In [9]:
import torch as th
from torch import nn
from torchsummary import summary
from tensorflow.keras.layers import TextVectorization
import math

In [4]:
# Toy corpus: ten short English sentences used to exercise the tokenizer
# and the model components defined further down.
data = [
    "The cat is sleeping on the mat",
    "She loves to read books in the evening",
    "The sun rises in the east and sets in the west",
    "He bought a new car last week",
    "They are going to the park to play soccer",
    "The teacher explained the lesson clearly",
    "I enjoy drinking coffee in the morning",
    "The children laughed at the funny joke",
    "We visited the museum during our vacation",
    "The dog barked loudly at the stranger",
]

In [5]:
class Tokenizer:
    """Whitespace tokenizer that maps lower-cased words to integer ids.

    Id ``PAD_ID`` (0) is reserved for padding and is never assigned to a
    word, so padded positions cannot be confused with a real token. Word ids
    are assigned in sorted order, which makes the mapping identical from one
    run to the next.
    """

    PAD_ID = 0  # id appended by set_max_len; real words start at PAD_ID + 1

    def __init__(self):
        self.vocab = None  # set of known words, filled by get_tokenizer
        self.input_ids = None  # word -> id mapping, filled by mapper

    def get_tokenizer(self, data):
        """Collect the set of distinct lower-cased words in ``data``.

        Args:
            data: iterable of sentence strings.

        Returns:
            The set of words, which is also stored on ``self.vocab``.
        """
        # split() without an argument drops the empty tokens that repeated
        # or leading/trailing spaces would otherwise produce.
        self.vocab = {word for sentence in data for word in sentence.lower().split()}
        return self.vocab

    def mapper(self, vocab):
        """Assign an id to every word in ``vocab``.

        Args:
            vocab: iterable of words (must be sortable).

        Returns:
            Dict mapping each word to an id in ``1..len(vocab)``; also stored
            on ``self.input_ids``.
        """
        first_id = self.PAD_ID + 1
        self.input_ids = {
            word: idx for idx, word in enumerate(sorted(vocab), start=first_id)
        }
        return self.input_ids

    def __call__(self, data):
        """Fit on ``data`` and return the word -> id mapping."""
        return self.mapper(self.get_tokenizer(data))

    def set_max_len(self, x, max_len):
        """Return a copy of ``x`` truncated or right-padded to ``max_len``.

        The input list is left untouched; padding uses ``PAD_ID``.
        """
        fitted = list(x[:max_len])
        fitted.extend([self.PAD_ID] * (max_len - len(fitted)))
        return fitted

    def transform(self, data, tokens, max_len: int):
        """Encode sentences as a tensor of ids with ``max_len`` columns.

        Args:
            data: iterable of sentence strings.
            tokens: word -> id mapping, as returned by ``mapper``.
            max_len: number of ids kept per sentence.

        Returns:
            Integer tensor with one row per sentence.

        Raises:
            KeyError: if a sentence contains a word missing from ``tokens``.
        """
        rows = []
        for sentence in data:
            ids = [tokens[word] for word in sentence.lower().split()]
            rows.append(self.set_max_len(ids, max_len=max_len))
        return th.tensor(rows, dtype=th.long)

    def __len__(self):
        """Returns the number of distinct ids: vocabulary words plus padding.

        This is the table size an embedding layer needs to cover every id
        that ``transform`` can emit.
        """
        assert self.vocab is not None, "Tokenizer not fit"
        return len(self.vocab) + 1
    


In [6]:
# Fit the tokenizer on the raw sentences and build the word -> id lookup.
tokenizer = Tokenizer()
tokens = tokenizer(data)

vocab_size = len(tokenizer)  # passed on as the embedding table size
d_model = 50  # embedding width
max_len = 5  # every sentence is padded/truncated to this many ids
# NOTE: this rebinds `data` from a list of sentences to a tensor of ids, so
# this cell cannot be re-run without first re-running the cell that defines
# the sentences.
data = tokenizer.transform(data, tokens, max_len)

# Structure to follow:
1. Embeddings
2. Positional Encoding
3. Multi-head attention
4. Residual connection
5. Layer normalization
6. Encoder Block
6.1 Encoder


In [7]:
class EmbeddingLayer(nn.Module):
    """Lookup table that turns integer token ids into ``d_model``-wide vectors."""

    def __init__(self, vocab_size, d_model):
        """Create the table with ``vocab_size`` rows of width ``d_model``."""
        super(EmbeddingLayer, self).__init__()
        self.vocab_size, self.d_model = vocab_size, d_model
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=d_model,
        )

    def forward(self, x):
        """Return the embedding row for every id in ``x``."""
        looked_up = self.embedding(x)
        return looked_up
    

class WithPositionalEncoding(nn.Module):
    """Token embedding plus a fixed sinusoidal positional encoding.

    Args:
        max_len: longest sequence the encoding table covers.
        d_model: embedding width (odd values are supported).
        vocab_size: number of rows in the embedding table.
    """

    def __init__(self, max_len: int, d_model:int, vocab_size:int):
        super(WithPositionalEncoding, self).__init__()
        self.d_model = d_model
        self.vocab_size = vocab_size
        self.max_len = max_len
        # The attribute spelling is kept as-is so parameter names in saved
        # state dicts do not change.
        self.embdding = EmbeddingLayer(vocab_size, d_model)

    def get_embedding(self):
        """Build the ``(max_len, d_model)`` sinusoidal table.

        Even columns hold sines and odd columns hold cosines of
        ``position * 10000 ** (-2i / d_model)``.
        """
        pe = th.zeros(self.max_len, self.d_model)
        position = th.arange(0, self.max_len).unsqueeze(1)
        div_term = th.exp(
            th.arange(0, self.d_model, 2) * -(th.log(th.tensor(10000.0)) / self.d_model)
        )
        angles = position * div_term  # one column per sine slot
        pe[:, 0::2] = th.sin(angles)
        # With an odd d_model there is one fewer cosine column than sine
        # columns, so drop the surplus frequency before assigning.
        pe[:, 1::2] = th.cos(angles[:, : self.d_model // 2])
        return pe

    def forward(self, x):
        """Embed token ids ``x`` and add the positional encoding.

        Raises:
            ValueError: if the last dimension of ``x`` exceeds ``max_len``.
        """
        seq_len = x.shape[-1]
        if seq_len > self.max_len:
            # Without this check the addition either fails with an opaque
            # shape error or, for max_len == 1, broadcasts silently.
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {self.max_len}"
            )
        token_embeddings = self.embdding(x)
        positions = self.get_embedding()[:seq_len, :].to(token_embeddings.device)
        return token_embeddings + positions


# Build the embedding + positional-encoding module and run the padded token
# ids through it.
ob = WithPositionalEncoding(max_len, d_model, vocab_size)    
embeddings = ob(data)    

In [7]:
# One d_model-wide vector per token position; the recorded run shows (10, 5, 50).
embeddings.shape

torch.Size([10, 5, 50])

# Step 3: Multihead attention


In [10]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    Args:
        d_model: width of the input and output vectors.
        h: number of heads; must divide ``d_model``.

    After each forward pass the softmax-normalised attention weights are
    available as ``self.attention_scores`` with shape
    ``(batch, h, seq_len, seq_len)``.
    """

    def __init__(self, d_model, h: int):
        super().__init__()
        self.d_model = d_model  # dimension of model/embeddings
        self.h = h  # number of heads

        # to ensure d_model is divisible by h
        assert d_model % h == 0, "d_model is not divisible by h"

        self.d_k = d_model // h  # dimension of the vector seen by each head
        self.q = nn.Linear(d_model, d_model, bias=False)  # query projection
        self.k = nn.Linear(d_model, d_model, bias=False)  # key projection
        self.v = nn.Linear(d_model, d_model, bias=False)  # value projection
        self.w = nn.Linear(d_model, d_model, bias=False)  # output projection

    @staticmethod
    def attention(query, key, value):
        """Scaled dot-product attention.

        Args:
            query, key, value: tensors shaped ``(batch, heads, seq_len, dim)``.

        Returns:
            Tuple ``(output, weights)`` where ``output`` is
            ``(batch, heads, seq_len, dim)`` and ``weights`` is
            ``(batch, heads, seq_len, seq_len)`` with rows summing to 1.
        """
        d_k = query.shape[-1]

        # (batch, heads, seq_len, dim) @ (batch, heads, dim, seq_len)
        #   -> (batch, heads, seq_len, seq_len)
        attention_scores = (query @ key.transpose(-2, -1)) / math.sqrt(d_k)
        attention_scores = attention_scores.softmax(dim=-1)

        # (batch, heads, seq_len, seq_len) @ (batch, heads, seq_len, dim)
        #   -> (batch, heads, seq_len, dim)
        return (attention_scores @ value), attention_scores

    def _split_heads(self, projected):
        """Reshape ``(batch, seq_len, d_model)`` to ``(batch, h, seq_len, d_k)``."""
        batch, seq_len = projected.shape[0], projected.shape[1]
        return projected.view(batch, seq_len, self.h, self.d_k).transpose(1, 2)

    def forward(self, x):
        """Apply self-attention to ``x`` of shape ``(batch, seq_len, d_model)``.

        Returns a tensor of the same shape.
        """
        # Project, then split the feature axis into h heads of width d_k.
        query = self._split_heads(self.q(x))
        keys = self._split_heads(self.k(x))
        values = self._split_heads(self.v(x))

        # calculate attention
        x, self.attention_scores = self.attention(query, keys, values)

        # combine all heads together:
        # (batch, h, seq_len, d_k) -> (batch, seq_len, h, d_k) -> (batch, seq_len, d_model)
        x = x.transpose(1, 2).contiguous().view(x.shape[0], -1, self.h * self.d_k)

        # final output projection keeps (batch, seq_len, d_model)
        return self.w(x)
        
        
        

# Steps 4–5: Residual Connection and Layer Normalization (plus the Feed-Forward Block)


In [44]:
class LayerNormalization(nn.Module):
    """Normalise the last axis to zero mean and unit spread, then rescale.

    Args:
        features: size of the last axis of the input.
        eps: small constant added to the standard deviation to avoid
            division by zero. Defaults to ``1e-6`` so the layer can be built
            from the feature count alone.
    """

    def __init__(self, features, eps: float = 1e-6):
        super().__init__()
        self.eps = eps
        self.alpha = nn.Parameter(th.ones(features))  # learnable scale
        self.beta = nn.Parameter(th.zeros(features))  # learnable shift

    def forward(self, x):
        """Return ``alpha * (x - mean) / (std + eps) + beta`` over the last axis."""
        mean = x.mean(dim=-1, keepdim=True)
        # NOTE(review): this uses Tensor.std with its default estimator and
        # adds eps outside the square root, so results differ slightly from
        # nn.LayerNorm — confirm if exact parity matters.
        std = x.std(dim=-1, keepdim=True)
        return self.alpha * ((x - mean) / (std + self.eps)) + self.beta
    
    
class ResidualConnection(nn.Module):
    """Add the layer-normalised input back onto the input.

    Args:
        features: size of the last axis of the tensors passed to ``forward``;
            forwarded to ``LayerNormalization``.
        eps: stabiliser forwarded to ``LayerNormalization``.
    """

    def __init__(self, features: int, eps: float = 1e-6):
        super().__init__()
        # LayerNormalization needs the feature count; constructing it with no
        # arguments raises TypeError.
        self.norm = LayerNormalization(features, eps)

    def forward(self, x):
        """Return ``x + norm(x)``."""
        return x + self.norm(x)
    
    
class FeedForwardBlock(nn.Module):
    """Position-wise feed-forward network: Linear -> ReLU -> Dropout -> Linear."""

    def __init__(self, d_model: int, d_ff: int, dropout: float) -> None:
        """Create the two projections (``d_model -> d_ff -> d_model``) and the dropout."""
        super(FeedForwardBlock, self).__init__()
        # expansion layer (w1 and b1)
        self.linear_1 = nn.Linear(in_features=d_model, out_features=d_ff)
        self.dropout = nn.Dropout(p=dropout)
        # contraction layer (w2 and b2)
        self.linear_2 = nn.Linear(in_features=d_ff, out_features=d_model)

    def forward(self, x):
        """Map ``(batch, seq_len, d_model)`` through the hidden width and back."""
        hidden = self.linear_1(x)          # (batch, seq_len, d_ff)
        activated = th.relu(hidden)
        regularised = self.dropout(activated)
        return self.linear_2(regularised)  # (batch, seq_len, d_model)
        
        

In [13]:
# Standard deviation over the last axis; keepdim=True leaves a size-1 axis in
# its place so the result can broadcast against `embeddings`.
embeddings.std(-1, keepdim=True).shape

torch.Size([10, 5, 1])

In [14]:
# Small random integer tensor (values 0-3) for checking the reductions in the
# following cells by hand. No seed is set, so the values change on every run.
x = th.randint(0, 4, size=(2, 4, 7))
x

tensor([[[1, 1, 0, 0, 0, 1, 0],
         [0, 2, 2, 3, 3, 2, 3],
         [0, 2, 0, 3, 1, 1, 0],
         [2, 2, 2, 3, 3, 3, 3]],

        [[1, 2, 1, 3, 0, 1, 3],
         [2, 3, 0, 1, 0, 0, 0],
         [0, 1, 2, 2, 0, 3, 2],
         [2, 3, 1, 2, 1, 2, 3]]])

In [42]:
# Mean over the last axis; dtype=float makes the integer tensor reduce in
# float64, and keepdim=True keeps a size-1 last axis.
x.mean(dim = -1, dtype=float, keepdim=True)

tensor([[[0.4286],
         [2.1429],
         [1.0000],
         [2.5714]],

        [[1.5714],
         [0.8571],
         [1.4286],
         [2.0000]]], dtype=torch.float64)

In [37]:
# Broadcasting check: the size-1 last axis of the mean expands across the 7
# columns, adding each row's mean to every element of that row.
x + x.mean(dim = -1, dtype=float, keepdim=True)

tensor([[[1.4286, 1.4286, 0.4286, 0.4286, 0.4286, 1.4286, 0.4286],
         [2.1429, 4.1429, 4.1429, 5.1429, 5.1429, 4.1429, 5.1429],
         [1.0000, 3.0000, 1.0000, 4.0000, 2.0000, 2.0000, 1.0000],
         [4.5714, 4.5714, 4.5714, 5.5714, 5.5714, 5.5714, 5.5714]],

        [[2.5714, 3.5714, 2.5714, 4.5714, 1.5714, 2.5714, 4.5714],
         [2.8571, 3.8571, 0.8571, 1.8571, 0.8571, 0.8571, 0.8571],
         [1.4286, 2.4286, 3.4286, 3.4286, 1.4286, 4.4286, 3.4286],
         [4.0000, 5.0000, 3.0000, 4.0000, 3.0000, 4.0000, 5.0000]]],
       dtype=torch.float64)

In [27]:
# Sum over every element of x.
x.sum()

tensor(84)

In [25]:
# Total element count and shape of x.
x.numel(), x.shape

(56, torch.Size([2, 4, 7]))

In [28]:
# Hand check of the global mean: the recorded x.sum() divided by the recorded
# x.numel(). The literals only match the x shown in the saved outputs.
84/56

1.5

In [32]:
# Hand check of one row mean: the literal is the second row of the first batch
# in the recorded x, and the result matches 2.1429 in the recorded x.mean output.
sum([0, 2, 2, 3, 3, 2, 3])/7

2.142857142857143

In [None]:
class AttentionHead(nn.Module):
    """Single-head scaled dot-product self-attention.

    Args:
        d_model: width of the input vectors; queries, keys and values are all
            projected to this same width.
    """

    def __init__(self, d_model):
        super().__init__()

        self.d_model = d_model
        self.q = nn.Linear(self.d_model, self.d_model)
        self.k = nn.Linear(self.d_model, self.d_model)
        self.v = nn.Linear(self.d_model, self.d_model)

    def forward(self, x):
        """Attend over the second-to-last axis of ``x``.

        Args:
            x: tensor whose last axis has size ``d_model``.

        Returns:
            Tensor with the same shape as ``x``.
        """
        query = self.q(x)
        keys = self.k(x)
        values = self.v(x)

        # Scale by this head's own width rather than a module-level variable,
        # so the layer is correct for any d_model it was built with.
        scores = th.matmul(query, keys.transpose(-1, -2)) / math.sqrt(self.d_model)
        attention_weights = th.softmax(scores, dim=-1)  # rows sum to 1
        return th.matmul(attention_weights, values)
        


    




# NOTE: this reuses the name of the ResidualConnection class defined earlier in
# the notebook and replaces it once this cell runs.
class ResidualConnection(nn.Module):
    """Work-in-progress residual block holding a layer norm and an attention head.

    Args:
        d_model: width of the vectors handled by both sub-layers.
    """

    def __init__(self, d_model):
        super().__init__()
        self.d_model = d_model
        # eps is passed explicitly because LayerNormalization requires it.
        self.layer_norm = LayerNormalization(self.d_model, eps=1e-6)
        self.attention = AttentionHead(self.d_model)

    def forward(self, x):
        """Not implemented yet; raises instead of silently returning ``None``."""
        # TODO: decide how layer_norm and attention combine with the skip path.
        raise NotImplementedError("ResidualConnection.forward is not implemented yet")

class FeedForward(nn.Module):
    """Work-in-progress feed-forward layer with a single square projection.

    Args:
        d_model: input and output width of the linear layer.

    No ``forward`` method is defined yet.
    """

    def __init__(self, d_model: int):
        super().__init__()
        # Store the width on the instance; it was previously read before
        # being assigned and mixed with a module-level variable.
        self.d_model = d_model
        self.linear = nn.Linear(self.d_model, self.d_model)
        
           

# Run single-head self-attention over the position-encoded embeddings. The
# head width must equal the embedding width, so reuse d_model instead of
# repeating the literal.
ob = AttentionHead(d_model)
attention_weight = ob(embeddings)