In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math,copy,re
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import warnings
warnings.simplefilter("ignore")
print(torch.__version__)

2.5.1+cu124


# Word Embeddings
Each word in the input sequence would have a vector of embedding_dimension of 512.

Output dimension = (batch_size, sequence_length, embedding_dimension)
Word_Embedding_Matrix dimension = (Vocab_length, embedding_dimension)

In [2]:
class Embedding(nn.Module):
    def __init__(self,vocab_size,embedding_dim):
        super(Embedding,self).__init__()
        self.embed=nn.Embedding(vocab_size,embedding_dim)
    def forward(self,x):
        return self.embed(x)

# Positional Embeddings
Dimensions are similar to Word Embeddings
Later the outputs of these two layers are added

In [3]:
class PositionalEmbedding(nn.Module):
    def __init__(self,max_seq_len,embedding_dim):
        super(PositionalEmbedding,self).__init__()
        self.embedding_dim=embedding_dim
        pe =torch.zeros(max_seq_len,embedding_dim)
        # pos -> refers to order in the sentence
        # i -> refers to position along embedding vector dimension (i-even,i+1-odd)
        for pos in range(max_seq_len):
            for i in range(0,embedding_dim,2):
                pe[pos,i]=math.sin(pos/(10000**((2*i)/embedding_dim)))
                pe[pos,i+1]=math.cos(pos/(10000**((2*i)/embedding_dim)))
        pe=pe.unsqueeze(0) #
        self.register_buffer('positional_embedding',pe)
        #register_buffer ---> stored in state_dict but non-trainable
    def forward(self,x):
        x=x*math.sqrt(self.embedding_dim)
        seq_len=x.size(1) #current seq_len of the word instead of max_sequnece_length
        x=x+torch.autograd.Variable(self.pe[:,:seq_len,])
        return x



# Multi-Head Attention

Q_matrix,
K_matrix,
V_matrix

In [4]:
class MultiHeadAttention(nn.Module):
    def __init__(self,embedding_dim=512,num_heads=8):
        super(MultiHeadAttention,self).__init__()
        self.embedding_dim=embedding_dim
        self.num_heads=num_heads
        self.head_dim=embedding_dim//num_heads # 512/8 = 64
        #ALL the QKV matrices are 64,64
        self.key_matrix = nn.Linear(self.head_dim,self.head_dim,bias=False)
        self.query_matrix = nn.Linear(self.head_dim,self.head_dim,bias=False)
        self.value_matrix = nn.Linear(self.head_dim,self.head_dim,bias=False)
        self.out = nn.Linear(self.num_heads * self.head_dim,self.embedding_dim) #512 * 512
    def forward(self,key,query,value,mask=None):
        batch_size=key.size(0)
        seq_length=key.size(1)
        # query dimension can change in decoder during inference.
        # so we cant take general seq_length
        seq_length_query = query.size(1)
        key = key.view(batch_size,seq_length,self.n_heads,self.head_dim)
        #i.e 32,10,512 ---> 32,10,8,64
        query = query.view(batch_size,seq_length_query,self.n_heads,self.head_dim)
        #i.e 32,seq_len_query,512 ---> 32,seq_len_query,8,64
        value = value.view(batch_size,seq_length,self.n_heads,self.head_dim)
        #i.e 32,10,512 ---> 32,10,8,64
        k=self.key_matrix(key) # 64*64 x 32,10,8,64
        q=self.query_matrix(query)
        v=self.value_matrix(value)

        q=q.transpose(1,2) #32,8,10,64
        k=k.transpose(1,2) #32,8,10,64
        v=v.transpose(1,2) #32,8,10,64 each head has 10 seq len vectors each of 64 dimensional vector(each word has 64dim vector)

        # computes attention (QK.T)/d^1/2
        # adjust key for matrix multiplication
        k_adjusted = k.transpose(-1,-2) # 32,8,64,10
        product = torch.matmul(q, k_adjusted) #32,8,10,10

        # fill those positions of product matrix as (-1e20) where mask positions are 0
        if mask is not None:
             product = product.masked_fill(mask == 0, float("-1e20"))

        #divising by square root of key dimension
        product = product / math.sqrt(self.single_head_dim) #  sqrt(64) ~ 8

        #applying softmax
        scores = F.softmax(product, dim=-1) #32,8,10,10 last layer is normalized
        scores = torch.matmul(scores, v)  # (32,8,10,10) x (32,8,10,64) = (32,8,10,64)
        scores.transpose(1,2).contiguous().view(batch_size, seq_length_query, self.single_head_dim*self.n_heads)  # (32,8,10,64)  -> (32,10,8,64)   -> (32,10,512)
        #contiguous becuase the memory copy is no stored as contiguos memeory
        output = self.out(scores)  # (32,10,512) -> (32,10,512)
        return output


In [5]:
a = torch.rand(2,3,4)
print(a)
b=a.view(2,3,2,2)
print(b)
c=b.transpose(1,2)
print(c)

tensor([[[0.0076, 0.7412, 0.3044, 0.6124],
         [0.6822, 0.0135, 0.4996, 0.0588],
         [0.5986, 0.3923, 0.6880, 0.0489]],

        [[0.7553, 0.7426, 0.1792, 0.7642],
         [0.5237, 0.2528, 0.6227, 0.2845],
         [0.4573, 0.2274, 0.1588, 0.6087]]])
tensor([[[[0.0076, 0.7412],
          [0.3044, 0.6124]],

         [[0.6822, 0.0135],
          [0.4996, 0.0588]],

         [[0.5986, 0.3923],
          [0.6880, 0.0489]]],


        [[[0.7553, 0.7426],
          [0.1792, 0.7642]],

         [[0.5237, 0.2528],
          [0.6227, 0.2845]],

         [[0.4573, 0.2274],
          [0.1588, 0.6087]]]])
tensor([[[[0.0076, 0.7412],
          [0.6822, 0.0135],
          [0.5986, 0.3923]],

         [[0.3044, 0.6124],
          [0.4996, 0.0588],
          [0.6880, 0.0489]]],


        [[[0.7553, 0.7426],
          [0.5237, 0.2528],
          [0.4573, 0.2274]],

         [[0.1792, 0.7642],
          [0.6227, 0.2845],
          [0.1588, 0.6087]]]])


#ENCODER

In [19]:
class TransformerBlock(nn.Module):
    def __init__(self,embedding_dim,expansion_factor=4,num_heads=8):
        super(TransformerBlock,self).__init__()
        self.attention = MultiHeadAttention(embedding_dim, num_heads)
        self.norm1 = nn.LayerNorm(embedding_dim)
        self.norm2 = nn.LayerNorm(embedding_dim)
        self.feed_forward=nn.Sequential(
            nn.Linear(embedding_dim,expansion_factor*embedding_dim),
            nn.ReLU(),
            nn.Linear(expansion_factor*embedding_dim,embedding_dim)
        ) #this is like expanding into more dimensional and reducing back to original dimension
        self.dropout1 = nn.Dropout(0.2)
        self.dropout2 = nn.Dropout(0.2)
    def forward(self,key,query,output):
        attention_ouput=self.attention(key,query,output)
        attention_residual=attention_ouput+output
        x=self.dropout1(self.norm1(attention_residual))
        feed_forward_output=self.feed_forward(x)
        feed_forward_residual=feed_forward_output+x
        output=self.dropout2(self.norm2(feed_forward_residual))
        return output
class TransformerEncoder(nn.Module):
    def __init__(self,seq_len,vocab_size,embedding_dim,num_layers=2,expansion_factor=4,num_heads=8):
        super(TransformerEncoder,self).__init__()
        self.embedding_dim=embedding_dim
        self.positional_encoder = PositionalEmbedding(seq_len, embedding_dim)
        self.layers = nn.ModuleList([TransformerBlock(embedding_dim, expansion_factor, num_heads) for _ in range(num_layers)])
    def forward(self,x):
        embed_out = self.embedding_layer(x)
        out = self.positional_encoder(embed_out)
        for layer in self.layers:
            out = layer(out, out, out)
        return out

#Decoder
In Decoder during cross attention key,value vectors are from Encoder Output whereas Query is from previous decoder block
Also masking is applied during training.

In [24]:
class DecoderBlock(nn.Module):
    def __init__(self,embedding_dim,expansion_factor=4,num_heads=8):
        super(DecoderBlock,self).__init__()
        self.attention = MultiHeadAttention(embedding_dim, num_heads)
        self.norm = nn.LayerNorm(embedding_dim)
        self.dropout = nn.Dropout(0.2)
        self.transfomer_block = TransformerBlock(embedding_dim, expansion_factor, num_heads)
    def forward(self,key,query,x,mask):
        #we need to pass mask to only the first attention
        attention = self.attention(x,x,x,mask=mask) #32x10x512
        value = self.dropout(self.norm(attention + x))
        out = self.transformer_block(key, query, value)
        return out

class TransformerDecoder(nn.Module):
    def __init__(self,target_vocab_size,embedding_dim,seq_len, num_layers=2, expansion_factor=4, num_heads=8):
        super(TransformerDecoder,self).__init__()
        self.embedding_dim=embedding_dim
        self.positional_encoder = PositionalEmbedding(seq_len, embedding_dim)
        self.layers =nn.ModuleList([DecoderBlock(embedding_dim, expansion_factor, num_heads) for _ in range(num_layers)])
        self.fc_out = nn.Linear(embedding_dim, target_vocab_size)
        self.dropout = nn.Dropout(0.2)
    def forward(self,x,enc_out,mask):
        x = self.word_embedding(x)  #32x10x512
        x = self.position_embedding(x) #32x10x512
        x = self.dropout(x)
        for layer in self.layers:
            x = layer(enc_out, x, enc_out, mask)  #key,query,x

        out = F.softmax(self.fc_out(x))

        return out

In [21]:
class Transformer(nn.Module):
    def __init__(self, embedding_dim, src_vocab_size, target_vocab_size, seq_length,num_layers=2, expansion_factor=4, num_heads=8):
        super(Transformer, self).__init__()
        self.target_vocab_size = target_vocab_size

        self.encoder = TransformerEncoder(seq_length, src_vocab_size, embedding_dim, num_layers=num_layers, expansion_factor=expansion_factor, num_heads=num_heads)
        self.decoder = TransformerDecoder(target_vocab_size, embedding_dim, seq_length, num_layers=num_layers, expansion_factor=expansion_factor, num_heads=num_heads)

    def make_trg_mask(self, trg):
        batch_size, trg_len = trg.shape
        # returns the lower triangular part of matrix filled with ones
        trg_mask = torch.tril(torch.ones((trg_len, trg_len))).expand(
            batch_size, 1, trg_len, trg_len
        )
        return trg_mask

    def decode(self,src,trg):
        trg_mask = self.make_trg_mask(trg)
        enc_out = self.encoder(src)
        out_labels = []
        batch_size,seq_len = src.shape[0],src.shape[1]
        #outputs = torch.zeros(seq_len, batch_size, self.target_vocab_size)
        out = trg
        for i in range(seq_len): #10
            out = self.decoder(out,enc_out,trg_mask) #bs x seq_len x vocab_dim
            # taking the last token
            out = out[:,-1,:]

            out = out.argmax(-1)
            out_labels.append(out.item())
            out = torch.unsqueeze(out,axis=0)
        return out_labels
    def forward(self, src, trg):
        trg_mask = self.make_trg_mask(trg)
        enc_out = self.encoder(src)
        outputs = self.decoder(trg, enc_out, trg_mask)
        return outputs


In [25]:
src_vocab_size = 11
target_vocab_size = 11
num_layers = 6
seq_length= 12


# let 0 be sos token and 1 be eos token
src = torch.tensor([[0, 2, 5, 6, 4, 3, 9, 5, 2, 9, 10, 1],
                    [0, 2, 8, 7, 3, 4, 5, 6, 7, 2, 10, 1]])
target = torch.tensor([[0, 1, 7, 4, 3, 5, 9, 2, 8, 10, 9, 1],
                       [0, 1, 5, 6, 2, 4, 7, 6, 2, 8, 10, 1]])

print(src.shape,target.shape)
model = Transformer(embedding_dim=512, src_vocab_size=src_vocab_size,target_vocab_size=target_vocab_size, seq_length=seq_length,num_layers=num_layers, expansion_factor=4, num_heads=8)
model



torch.Size([2, 12]) torch.Size([2, 12])


Transformer(
  (encoder): TransformerEncoder(
    (positional_encoder): PositionalEmbedding()
    (layers): ModuleList(
      (0-5): 6 x TransformerBlock(
        (attention): MultiHeadAttention(
          (key_matrix): Linear(in_features=64, out_features=64, bias=False)
          (query_matrix): Linear(in_features=64, out_features=64, bias=False)
          (value_matrix): Linear(in_features=64, out_features=64, bias=False)
          (out): Linear(in_features=512, out_features=512, bias=True)
        )
        (norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
        (feed_forward): Sequential(
          (0): Linear(in_features=512, out_features=2048, bias=True)
          (1): ReLU()
          (2): Linear(in_features=2048, out_features=512, bias=True)
        )
        (dropout1): Dropout(p=0.2, inplace=False)
        (dropout2): Dropout(p=0.2, inplace=False)
      )
    )
  )
  (decoder): TransformerDe