In [4]:
import torch
import torch.nn as nn 
import math 


In [4]:
class InputEmbeddings(nn.Module):
    """Token-embedding lookup whose output is scaled by ``sqrt(d_model)``.

    Args:
        d_model: Size of each embedding vector.
        vocabu_size: Number of rows in the embedding table.
    """

    def __init__(self, d_model: int, vocabu_size: int):
        super().__init__()
        self.d_model, self.vocabu_size = d_model, vocabu_size
        self.embedding = nn.Embedding(num_embeddings=vocabu_size, embedding_dim=d_model)

    def forward(self, x):
        """Look up the embeddings for the indices in ``x`` and scale them."""
        scale = math.sqrt(self.d_model)
        looked_up = self.embedding(x)
        return looked_up * scale
    

In [None]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a batch of embeddings.

    Even feature columns hold ``sin(pos * w_i)`` and odd columns hold
    ``cos(pos * w_i)`` with ``w_i = 10000 ** (-2i / d_model)``.

    Args:
        d_model: Size of the feature (last) dimension.
        seq_len: Number of positions to precompute.
        dropout: Dropout probability applied after the encodings are added.
    """

    def __init__(self, d_model:int, seq_len:int, dropout:float)->None:
        super().__init__()
        self.d_model = d_model
        self.seq_len = seq_len
        self.dropout = nn.Dropout(dropout)

        # Encoding table of shape (seq_len, d_model).
        pe = torch.zeros(seq_len, d_model)
        # Column vector of position indices, shape (seq_len, 1).
        position = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1)
        # Frequencies computed in log space: exp(2i * -ln(10000) / d_model).
        # The scaling must happen inside exp(), not after it.
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one fewer odd column than even columns.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # Leading axis of size 1 so the table broadcasts over the batch.
        pe = pe.unsqueeze(0)
        # A buffer follows the module (device, state_dict) but is not a
        # parameter, so the optimizer never updates it.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Add the first ``x.size(1)`` position encodings to ``x``, then apply dropout.

        ``x`` must broadcast against ``(1, x.size(1), d_model)`` and
        ``x.size(1)`` must not exceed ``seq_len``.
        """
        x = x + self.pe[:, :x.size(1), :]
        return self.dropout(x)
    


In [1]:
#import torch

#pos = torch.arange(0, 5)          # positions: 0..4
#print("pos:", pos)
#print("pos.shape:", pos.shape)     # (5,)

#pos_u1 = pos.unsqueeze(1)          # add dim at index 1
#print("\npos_u1:\n", pos_u1)
#print("pos_u1.shape:", pos_u1.shape)  # (5, 1)

#pos_u0 = pos.unsqueeze(0)          # add dim at index 0
#print("\npos_u0:\n", pos_u0)
#print("pos_u0.shape:", pos_u0.shape)  # (1, 5)

pos: tensor([0, 1, 2, 3, 4])
pos.shape: torch.Size([5])

pos_u1:
 tensor([[0],
        [1],
        [2],
        [3],
        [4]])
pos_u1.shape: torch.Size([5, 1])

pos_u0:
 tensor([[0, 1, 2, 3, 4]])
pos_u0.shape: torch.Size([1, 5])


In [None]:
class LayerNormalization(nn.Module):
    """Layer normalization over the trailing dimensions of the input.

    Args:
        parameters_shape: Shape of the trailing dimensions to normalize over.
            Either a single int (normalize over the last dimension only) or a
            sequence of ints.
        eps: Constant added to the variance before the square root.
    """

    def __init__(self, parameters_shape, eps=1e-5):
        super().__init__()
        # Other blocks in this notebook pass a plain int (the feature count);
        # normalize it to a tuple so len() in forward() works.
        if isinstance(parameters_shape, int):
            parameters_shape = (parameters_shape,)
        self.parameters_shape = tuple(parameters_shape)
        self.eps = eps
        # Learnable scale (gamma) and shift (beta).
        self.gamma = nn.Parameter(torch.ones(self.parameters_shape))
        self.beta = nn.Parameter(torch.zeros(self.parameters_shape))

    def forward(self, x):
        """Normalize ``x`` over its trailing dimensions, then scale and shift."""
        # One negative index per normalized dimension, e.g. [-1] or [-1, -2].
        dims = [-(i + 1) for i in range(len(self.parameters_shape))]
        mean = x.mean(dim=dims, keepdim=True)
        # Biased variance (divide by N), computed by hand.
        var = ((x - mean) ** 2).mean(dim=dims, keepdim=True)
        std = (var + self.eps).sqrt()
        y = (x - mean) / std
        return self.gamma * y + self.beta
        


In [None]:
class MultiheadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    Args:
        d_model: Feature size of the inputs and of the output.
        n_heads: Number of attention heads; must divide ``d_model``.
        dropout: Dropout probability applied to the attention weights.
    """

    def __init__(self, d_model,n_heads, dropout) -> None:
        super().__init__()
        self.d_model = d_model
        self.n_heads = n_heads
        assert d_model % n_heads==0, "Model dimensions is not divisible by number of heads"
        # Per-head feature size.
        self.d_k = d_model // n_heads
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)

        self.w_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)

    @staticmethod
    def attention(query, key, value, mask, dropout: nn.Dropout):
        """Compute ``softmax(Q K^T / sqrt(d_k)) V``.

        Args:
            query, key, value: Tensors whose last two axes are
                ``(seq_len, d_k)``.
            mask: Optional tensor broadcastable to the score matrix; positions
                where ``mask == 0`` are excluded from attention.
            dropout: Optional dropout module applied to the attention weights.

        Returns:
            ``(output, attention_weights)``.
        """
        d_k = query.shape[-1]
        # True division: floor division would truncate the scores.
        attention_scores = (query @ key.transpose(-2, -1)) / math.sqrt(d_k)
        if mask is not None:
            # Large negative score -> ~0 probability after the softmax.
            attention_scores = attention_scores.masked_fill(mask == 0, -1e11)
        # Softmax is applied whether or not a mask was given.
        attention_scores = attention_scores.softmax(dim=-1)
        if dropout is not None:
            attention_scores = dropout(attention_scores)
        return attention_scores @ value, attention_scores

    def _split_heads(self, x):
        """(batch, seq_len, d_model) -> (batch, n_heads, seq_len, d_k)."""
        return x.view(x.shape[0], x.shape[1], self.n_heads, self.d_k).transpose(1, 2)

    def forward(self, q, k, v, mask):
        """Project ``q``/``k``/``v``, attend per head, and merge the heads.

        The inputs are viewed as ``(batch, seq_len, d_model)``. The attention
        weights of the last call are kept in ``self.attention_scores``.
        """
        # Linear projections, then split the feature axis into heads.
        query = self._split_heads(self.w_q(q))
        key = self._split_heads(self.w_k(k))
        value = self._split_heads(self.w_v(v))

        x, self.attention_scores = self.attention(query, key, value, mask, self.dropout)

        # Combine all the heads together:
        # (batch, n_heads, seq_len, d_k) -> (batch, seq_len, n_heads, d_k) -> (batch, seq_len, d_model)
        x = x.transpose(1, 2).contiguous().view(x.shape[0], -1, self.n_heads * self.d_k)

        return self.w_o(x)
        

In [7]:
class FeedForwardBlock(nn.Module):
    """Two-layer feed-forward network: Linear -> ReLU -> Dropout -> Linear.

    Args:
        d_model: Input and output feature size.
        d_ff: Hidden feature size.
        dropout: Dropout probability applied after the activation.
    """

    def __init__(self, d_model: int, d_ff: int, dropout: float) -> None:
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        # Lower-case name so it matches the attribute read in forward().
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        """Map the last axis ``d_model -> d_ff -> d_model``."""
        return self.linear2(self.dropout(torch.relu(self.linear1(x))))
        

In [6]:
class ResidualConnection(nn.Module):
    """Pre-norm residual wrapper: ``x + dropout(sublayer(norm(x)))``.

    Args:
        features: Passed to ``LayerNormalization`` as the shape to normalize.
        dropout: Dropout probability applied to the sublayer output.
    """

    def __init__(self, features, dropout) -> None:
        super().__init__()
        self.features = features
        self.dropout = nn.Dropout(dropout)
        self.norm = LayerNormalization(features)

    def forward(self, x, sublayer):
        """Apply ``sublayer`` to the normalized input and add the skip path."""
        normalized = self.norm(x)
        transformed = sublayer(normalized)
        return x + self.dropout(transformed)

In [None]:
class EncoderBlock(nn.Module):
    """One encoder layer: self-attention then feed-forward, each in a residual wrapper.

    Args:
        self_attention_block: Attention module called as ``(x, x, x, src_mask)``.
        feed_forward_block: Module applied to the attention output.
        features: Forwarded to each ``ResidualConnection``.
        dropout: Dropout probability forwarded to each ``ResidualConnection``.
    """

    def __init__(self, self_attention_block:MultiheadAttention, feed_forward_block: FeedForwardBlock, features:int, dropout:float ) -> None:
        super().__init__()
        self.self_attention_block = self_attention_block
        self.feed_forward_block = feed_forward_block
        # Index 0 wraps the attention sublayer, index 1 the feed-forward sublayer.
        wrappers = [ResidualConnection(features, dropout) for _ in range(2)]
        self.residual_connections = nn.ModuleList(wrappers)

    def forward(self, x, src_mask):
        """Run ``x`` through the attention and feed-forward sublayers."""

        def self_attend(normed):
            # Query, key and value all come from the same (normalized) input.
            return self.self_attention_block(normed, normed, normed, src_mask)

        attended = self.residual_connections[0](x, self_attend)
        return self.residual_connections[1](attended, self.feed_forward_block)


In [None]:
    
class Encoder(nn.Module):
    """Stack of encoder layers followed by a final layer normalization.

    Args:
        features: Passed to ``LayerNormalization`` as the shape to normalize.
        layers: Modules applied in order, each called as ``layer(x, mask)``.
    """

    def __init__(self,features:int, layers:nn.ModuleList):
        # super() must be *called*; ``super.__init__()`` raises TypeError.
        super().__init__()
        self.layers = layers
        self.features = features
        self.norm = LayerNormalization(features)

    def forward(self,x,mask):
        """Apply every layer in order, then normalize the result."""
        for layer in self.layers:
            x = layer(x, mask)
        return self.norm(x)



In [None]:
class DecoderBlock(nn.Module):
    """One decoder layer: self-attention, cross-attention, then feed-forward.

    Each sublayer is wrapped in its own ``ResidualConnection``.

    Args:
        features: Forwarded to each ``ResidualConnection``.
        self_attention_block: Attention over the decoder input itself.
        cross_attention_block: Attention whose keys and values are the
            encoder output.
        feed_forward_block: Module applied after both attention sublayers.
        dropout: Dropout probability forwarded to each ``ResidualConnection``.
    """

    def __init__(self, features: int, self_attention_block: MultiheadAttention, cross_attention_block: MultiheadAttention, feed_forward_block: FeedForwardBlock, dropout: float) -> None:
        super().__init__()
        self.self_attention_block = self_attention_block
        self.cross_attention_block = cross_attention_block
        self.feed_forward_block = feed_forward_block
        # Index 0: self-attention, 1: cross-attention, 2: feed-forward.
        self.residue_connections = nn.ModuleList([ResidualConnection(features, dropout) for _ in range(3)])

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        """Run ``x`` through the three sublayers.

        Args:
            x: Decoder-side input.
            encoder_output: Output of the encoder, used as keys and values in
                cross-attention.
            src_mask: Mask passed to the cross-attention sublayer.
            tgt_mask: Mask passed to the self-attention sublayer.
        """
        x = self.residue_connections[0](x, lambda x: self.self_attention_block(x, x, x, tgt_mask))
        # Cross-attention must use its own module (and weights), not the
        # self-attention block.
        x = self.residue_connections[1](x, lambda x: self.cross_attention_block(x, encoder_output, encoder_output, src_mask))
        x = self.residue_connections[2](x, self.feed_forward_block)
        return x

In [5]:
class Decoder(nn.Module):
    """Stack of decoder layers followed by a final layer normalization.

    Args:
        layers: Modules applied in order; each must accept
            ``(x, encoder_output=..., src_mask=..., tgt_mask=...)``.
        features: Passed to ``LayerNormalization`` as the shape to normalize.
    """

    def __init__(self, layers:nn.ModuleList, features: int):
        # super() must be *called*; ``super.__init__()`` raises TypeError.
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization(features)

    def forward(self, x, encoder_output, tgt_mask, src_mask):
        """Apply every layer in order, then normalize the result."""
        for layer in self.layers:
            # Keyword arguments keep the masks and encoder output matched to
            # the layer's parameters regardless of their positional order.
            x = layer(x, encoder_output=encoder_output, src_mask=src_mask, tgt_mask=tgt_mask)
        return self.norm(x)
        



In [8]:
class ProjectionLayer(nn.Module):
    """Linear projection from model features to vocabulary-sized outputs.

    Args:
        d_model: Input feature size.
        vocab_size: Output feature size.
    """

    def __init__(self, d_model, vocab_size):
        # super() must be *called*; ``super.__init__()`` raises TypeError.
        super().__init__()
        self.d_model = d_model
        self.vocab_size = vocab_size
        self.proj = nn.Linear(self.d_model, self.vocab_size)

    def forward(self,x):
        """Project the last axis of ``x`` from ``d_model`` to ``vocab_size``."""
        # (batch, seq_len, d_model) --> (batch, seq_len, vocab_size)
        return self.proj(x)



In [None]:
   
class Transformer(nn.Module):
    """Encoder-decoder container wiring embeddings, position encodings and projection.

    Args:
        encoder: Module called as ``encoder(src, src_mask)``.
        decoder: Module called with the embedded target plus
            ``encoder_output``, ``tgt_mask`` and ``src_mask`` keywords.
        src_embed: Embedding applied to source tokens.
        tgt_embed: Embedding applied to target tokens.
        src_pos: Positional encoding applied to the source embeddings.
        tgt_pos: Positional encoding applied to the target embeddings.
        projection_layer: Module applied by ``project``.
    """

    def __init__(self, encoder: Encoder, decoder: Decoder, src_embed: InputEmbeddings, tgt_embed: InputEmbeddings, src_pos: PositionalEncoding, tgt_pos: PositionalEncoding, projection_layer: ProjectionLayer) -> None:
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.src_embed = src_embed
        self.tgt_embed = tgt_embed
        self.src_pos = src_pos
        self.tgt_pos = tgt_pos
        self.projection_layer = projection_layer

    def encode(self, src, src_mask):
        """Embed ``src``, add source position encodings, and run the encoder."""
        src = self.src_embed(src)
        # The module stores ``src_pos`` / ``tgt_pos``; there is no ``pos``.
        src = self.src_pos(src)
        return self.encoder(src, src_mask)

    def decode (self, tgt, src_mask, encoder_output, tgt_mask):
        """Embed ``tgt``, add target position encodings, and run the decoder.

        The first parameter was previously named ``src`` while the body read
        an undefined ``tgt``, so every call raised ``NameError``. Its
        position is unchanged; only the name now matches what the body uses.
        """
        tgt = self.tgt_embed(tgt)
        tgt = self.tgt_pos(tgt)
        # Keywords keep each argument matched to the decoder's parameter of
        # the same name regardless of positional order.
        return self.decoder(tgt, encoder_output=encoder_output, tgt_mask=tgt_mask, src_mask=src_mask)

    def project(self, x):
        """Apply the projection layer to ``x``."""
        # (batch, seq_len, vocab_size)
        return self.projection_layer(x)




In [None]:
def build_transformer (src_vocab_size: int, tgt_vocab_size: int, src_seq_len: int, tgt_seq_len: int, d_model: int=512, N: int=6, h: int=8, dropout: float=0.1, d_ff: int=2048):
    # Create the embedding layers

    # Create the positional encoding layers

    # Create the encoder blocks

    # Create the decoder blocks

    # Create the encoder and decoder

    # Create the projection layer

    # Create the transformer

    # Initialize the parameters