In [1]:
import torch
# Confirm that this PyTorch build can see a CUDA device before going further.
print(torch.cuda.is_available())

True


In [2]:
import torch.nn as nn 
import math

## 1.0 Input Embedding

In [13]:
class InputEmbeddings(nn.Module):
    """Embedding lookup whose output is multiplied by sqrt(d_model).

    Args:
        vocab_size: number of rows in the embedding table.
        d_model: width of each embedding vector.
    """

    def __init__(self, vocab_size: int, d_model: int) -> None:
        super().__init__()
        self.vocab_size = vocab_size
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, d_model)

    def forward(self, x):
        """Return the embeddings of the indices in ``x``, scaled by sqrt(d_model)."""
        scale = math.sqrt(self.d_model)
        embedded = self.embedding(x)
        return embedded * scale

In [14]:
# Instantiate the embedding layer with a 10-entry vocabulary and 512-wide vectors;
# as the cell's last expression, the module repr is what gets displayed.
InputEmbeddings(vocab_size=10,
                d_model=512)

InputEmbeddings(
  (embedding): Embedding(10, 512)
)

## 2.0 PositionalEmbedding

In [None]:
class PositionalEmbedding(nn.Module):
    """Add fixed sinusoidal position encodings to the input, then apply dropout.

    The encoding table is computed once in ``__init__`` and stored as the
    buffer ``pe`` with shape ``(1, seq_len, d_model)``.

    Args:
        d_model: size of the last dimension of the inputs.
        seq_len: number of positions to precompute.
        dropout: dropout probability applied to the sum.
    """

    def __init__(self,
                 d_model: int,
                 seq_len: int,
                 dropout: float) -> None:

        super().__init__()
        self.d_model = d_model
        self.seq_len = seq_len
        self.dropout = nn.Dropout(dropout)

        # Table of shape (seq_len, d_model), filled in below.
        pe = torch.zeros(seq_len, d_model)

        # Column vector of positions, shape (seq_len, 1).
        position = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1)

        # One frequency per even column index, shape (ceil(d_model / 2),):
        # exp(i * -ln(10000) / d_model) for i = 0, 2, 4, ...
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )

        # Sine on the even columns.
        pe[:, 0::2] = torch.sin(position * div_term)

        # Cosine on the odd columns. When d_model is odd there is one fewer odd
        # column than even columns, so the last frequency is dropped.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])

        # Leading batch dimension so the table broadcasts: (1, seq_len, d_model).
        pe = pe.unsqueeze(0)

        # A buffer is saved with the module and moved by .to()/.cuda(), but it is
        # not a parameter and is never updated by the optimizer.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``dropout(x + pe[:, :x.shape[1], :])``.

        ``x`` is indexed as ``x.shape[1]`` for its length, so it must have at
        least two dimensions and a last dimension that broadcasts with d_model.
        """
        # `pe` is a buffer and does not require grad, so no gradient flows into it.
        x = x + self.pe[:, :x.shape[1], :]

        return self.dropout(x)


## 3.0 LayerNormalization

In [None]:
class LayerNormalization(nn.Module):
    """Normalize the last dimension, then apply a learnable scale and shift.

    Computes ``alpha * (x - mean) / (std + eps) + bias`` over the last
    dimension. The standard deviation comes from ``Tensor.std`` with its
    default settings, and ``eps`` is added to the standard deviation itself
    (not to the variance).

    Args:
        features: size of the last dimension of the inputs.
        eps: small constant added to the standard deviation so the division
            stays finite when it is (near) zero.
    """

    def __init__(self,
                 features: int,
                 eps: float = 1e-6) -> None:
        super().__init__()

        self.eps = eps
        # Scale starts at one and shift at zero, so a freshly built layer
        # returns the plain normalized input.
        self.alpha = nn.Parameter(torch.ones(features))   # learnable multiplicative scale
        self.bias = nn.Parameter(torch.zeros(features))   # learnable additive shift

    def forward(self, x):
        """Return the normalized, scaled and shifted tensor (same shape as ``x``)."""
        # keepdim=True keeps a trailing dimension of size 1 for broadcasting.
        mean = x.mean(dim=-1, keepdim=True)

        # Standard deviation over the last dimension.
        std = x.std(dim=-1, keepdim=True)

        return self.alpha * (x - mean) / (std + self.eps) + self.bias




## 4.0 FeedForward Network

In [None]:
class FeedForwardNetwork(nn.Module):
    """Two linear layers with a ReLU and dropout in between.

    Args:
        d_model: input and output width.
        d_ff: width of the hidden layer.
        dropout: dropout probability applied after the ReLU.
    """

    def __init__(self, d_model: int, d_ff: int, dropout: float) -> None:
        super().__init__()
        self.linear_1 = nn.Linear(d_model, d_ff)   # d_model -> d_ff
        self.dropout = nn.Dropout(dropout)
        self.linear_2 = nn.Linear(d_ff, d_model)   # d_ff -> d_model

    def forward(self, x):
        """Compute ``linear_2(dropout(relu(linear_1(x))))``."""
        hidden = self.linear_1(x)
        activated = torch.relu(hidden)
        dropped = self.dropout(activated)
        return self.linear_2(dropped)
    

    

## 5.0 MultiHeadAttention

In [None]:
class MultiHeadAttentionBlock(nn.Module):
    """Multi-head scaled dot-product attention.

    The inputs are projected with ``w_q`` / ``w_k`` / ``w_v``, split into ``h``
    heads of width ``d_k = d_model // h``, attended per head, merged back to
    ``d_model`` and projected with ``w_o``.

    Args:
        h: number of heads; must divide ``d_model``.
        d_model: width of the inputs and of the output.
        dropout: dropout probability applied to the attention weights.

    Raises:
        AssertionError: if ``d_model`` is not divisible by ``h``.
    """

    def __init__(self,
                 h: int,
                 d_model: int,
                 dropout: float) -> None:
        super().__init__()
        # Check first so a bad configuration fails before any layer is built.
        assert d_model % h == 0, "d_model must be divisible by h"

        self.h = h
        self.d_model = d_model
        self.dropout = nn.Dropout(dropout)

        self.d_k = d_model // h
        self.w_q = nn.Linear(d_model, d_model, bias=False)
        self.w_k = nn.Linear(d_model, d_model, bias=False)
        self.w_v = nn.Linear(d_model, d_model, bias=False)
        self.w_o = nn.Linear(d_model, d_model, bias=False)

    @staticmethod
    def _attention_with_weights(query, key, value, mask, dropout):
        """Return ``(softmax(Q K^T / sqrt(d_k)) @ V, attention_weights)``.

        Positions where ``mask == 0`` get a score of -1e9 before the softmax.
        ``mask`` and ``dropout`` may each be ``None`` to skip that step.
        """
        d_k = query.shape[-1]
        scores = (query @ key.transpose(-2, -1)) / math.sqrt(d_k)

        if mask is not None:
            # Out-of-place fill: the freshly computed scores are left untouched.
            scores = scores.masked_fill(mask == 0, -1e9)

        weights = scores.softmax(dim=-1)

        if dropout is not None:
            weights = dropout(weights)

        return weights @ value, weights

    @staticmethod
    def attention(query, key, value, mask, dropout: nn.Dropout):
        """Scaled dot-product attention; returns only the attended values."""
        output, _ = MultiHeadAttentionBlock._attention_with_weights(
            query, key, value, mask, dropout
        )
        return output

    def forward(self, k, q, v, mask):
        """Attend from ``q`` over ``k`` / ``v`` and return a ``(batch, q_len, d_model)`` tensor.

        NOTE: the positional order is ``(k, q, v, mask)`` - the key input comes
        first. Pass ``q=``, ``k=`` and ``v=`` by keyword whenever they differ.

        The (post-dropout) attention weights of the most recent call are kept
        on ``self.attention_scores`` with shape ``(batch, h, q_len, k_len)``.
        """
        query = self.w_q(q)
        key = self.w_k(k)
        value = self.w_v(v)

        # (batch, seq_len, d_model) -> (batch, seq_len, h, d_k) -> (batch, h, seq_len, d_k)
        # The transpose puts the head axis next to the batch axis so the matmuls
        # inside attention run over positions, once per head.
        query = query.view(query.shape[0], query.shape[1], self.h, self.d_k).transpose(1, 2)
        key = key.view(key.shape[0], key.shape[1], self.h, self.d_k).transpose(1, 2)
        value = value.view(value.shape[0], value.shape[1], self.h, self.d_k).transpose(1, 2)

        x, self.attention_scores = MultiHeadAttentionBlock._attention_with_weights(
            query, key, value, mask, self.dropout
        )

        # (batch, h, seq_len, d_k) -> (batch, seq_len, h, d_k) -> (batch, seq_len, d_model)
        x = x.transpose(1, 2).contiguous().view(x.shape[0], -1, self.h * self.d_k)

        return self.w_o(x)
    

## 6.0 Residual Connection

In [None]:
class ResidualConnection(nn.Module):
    """Skip connection around a sublayer: ``x + dropout(sublayer(norm(x)))``.

    Args:
        features: size passed to the internal ``LayerNormalization``.
        dropout: dropout probability applied to the sublayer output.
    """

    def __init__(self,
                 features: int,
                 dropout: float) -> None:
        super().__init__()

        self.dropout = nn.Dropout(dropout)
        self.norm = LayerNormalization(features)

    def forward(self, x, sublayer):
        """Apply ``sublayer`` to the normalized input and add the result back onto ``x``.

        ``sublayer`` is any callable taking one tensor and returning a tensor
        that can be added to ``x``.
        """
        # Adding `x` back is what makes this a residual connection; without it
        # the block is just norm -> sublayer -> dropout.
        return x + self.dropout(sublayer(self.norm(x)))

## 7.0 EncoderBlock

In [None]:
class EncoderBlock(nn.Module):
    """One encoder layer: self-attention, then feed-forward, each inside a residual connection.

    Args:
        self_attention_block: attention module called as ``(x, x, x, src_mask)``.
        feed_forward_block: module applied to the output of the attention step.
        features: size passed to each ``ResidualConnection``.
        dropout: dropout probability passed to each ``ResidualConnection``.
    """

    def __init__(self,
                 self_attention_block: MultiHeadAttentionBlock,
                 feed_forward_block: FeedForwardNetwork,
                 features: int,
                 dropout: float) -> None:
        super().__init__()
        self.self_attention_block = self_attention_block
        self.feed_forward_block = feed_forward_block
        self.residual_connection = nn.ModuleList([
            ResidualConnection(features, dropout) for _ in range(2)
        ])

    def forward(self, x, src_mask):
        """Run both sublayers on ``x`` and return the result.

        ``src_mask`` is forwarded to the attention block; its purpose is to keep
        padding tokens from interacting with the other tokens.
        """
        # First residual connection: self-attention (same tensor for all three inputs).
        x = self.residual_connection[0](
            x,
            lambda normed: self.self_attention_block(normed, normed, normed, src_mask),
        )

        # Second residual connection: feed-forward network.
        # The ModuleList is indexed with [], not called.
        x = self.residual_connection[1](x, self.feed_forward_block)

        return x

## 8.0 Encoder

In [None]:
class Encoder(nn.Module):
    """Apply a sequence of layers in order, then a final layer normalization.

    Args:
        features: size passed to the final ``LayerNormalization``.
        layers: the layers to apply; each is called as ``layer(x, mask)``.
    """

    def __init__(self, features: int, layers: nn.ModuleList) -> None:
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization(features)

    def forward(self, x, mask):
        """Feed ``x`` through every layer with ``mask`` and normalize the result."""
        hidden = x
        for encoder_layer in self.layers:
            hidden = encoder_layer(hidden, mask)
        normalized = self.norm(hidden)
        return normalized

In [None]:
# NOTE: this cell defines `Encoder` a second time; when the notebook is run top
# to bottom, this definition replaces the one from the preceding cell.
class Encoder(nn.Module):
    """Stack of layers followed by one layer normalization.

    Args:
        features: size passed to the final ``LayerNormalization``.
        layers: the layers to apply; each is called as ``layer(x, mask)``.
    """

    def __init__(self, features: int, layers: nn.ModuleList) -> None:
        super().__init__()
        self.norm = LayerNormalization(features)
        self.layers = layers

    def forward(self, x, mask):
        """Pass ``x`` through each layer in turn, then through ``self.norm``."""
        out = x
        for block in self.layers:
            out = block(out, mask)
        return self.norm(out)

## 1.1 DecoderBlock

In [None]:
class DecoderBlock(nn.Module):
    """One decoder layer: self-attention, cross-attention over the encoder output, then feed-forward.

    Each of the three sublayers runs inside its own ``ResidualConnection``.

    Args:
        features: size passed to each ``ResidualConnection``.
        self_attention_block: attention module applied to the decoder input itself.
        cross_attention_block: attention module whose keys and values come from
            ``encoder_output``.
        feed_forward_block: module applied after the two attention steps.
        dropout: dropout probability passed to each ``ResidualConnection``.
    """

    def __init__(self,
                 features: int,
                 self_attention_block: MultiHeadAttentionBlock,
                 cross_attention_block: MultiHeadAttentionBlock,
                 feed_forward_block: FeedForwardNetwork,
                 dropout: float) -> None:
        super().__init__()
        self.self_attention_block = self_attention_block
        self.cross_attention_block = cross_attention_block
        self.feed_forward_block = feed_forward_block
        self.residual_connections = nn.ModuleList([ResidualConnection(features, dropout) for _ in range(3)])

    def forward(self, x, src_mask, tgt_mask, encoder_output):
        """Run the three sublayers on ``x`` and return the result.

        NOTE: the positional order is ``(x, src_mask, tgt_mask, encoder_output)``;
        pass the last three by keyword to avoid mixing them up.
        """
        # 1) Self-attention over the decoder input, restricted by the target mask.
        x = self.residual_connections[0](
            x,
            lambda normed: self.self_attention_block(normed, normed, normed, tgt_mask),
        )

        # 2) Cross-attention: queries come from the decoder, keys and values from
        #    the encoder output, restricted by the source mask. Keywords are used
        #    because MultiHeadAttentionBlock.forward declares its inputs in the
        #    order (k, q, v, mask), so positional arguments would misroute them.
        x = self.residual_connections[1](
            x,
            lambda normed: self.cross_attention_block(
                q=normed, k=encoder_output, v=encoder_output, mask=src_mask
            ),
        )

        # 3) Feed-forward network, wrapped by the third residual connection.
        x = self.residual_connections[2](x, self.feed_forward_block)
        return x
        

## 1.2 Decoder

In [None]:
class Decoder(nn.Module):
    """Apply a sequence of decoder layers in order, then a final layer normalization.

    Args:
        features: size passed to the final ``LayerNormalization``.
        layers: the layers to apply; each must accept ``x`` positionally plus the
            keyword arguments ``encoder_output``, ``src_mask`` and ``tgt_mask``.
    """

    def __init__(self,
                 features: int,
                 layers: nn.ModuleList) -> None:
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization(features)

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        """Feed ``x`` through every layer and normalize the result."""
        for layer in self.layers:
            # Passed by keyword: DecoderBlock.forward declares its parameters as
            # (x, src_mask, tgt_mask, encoder_output), so positional arguments in
            # this method's order would land in the wrong slots.
            x = layer(x,
                      encoder_output=encoder_output,
                      src_mask=src_mask,
                      tgt_mask=tgt_mask)
        return self.norm(x)

## 1.3 Projection Layer

In [None]:
class ProjectionLayer(nn.Module):
    """Single linear layer mapping the last dimension from ``d_model`` to ``vocab_size``.

    Args:
        d_model: size of the last dimension of the inputs.
        vocab_size: size of the last dimension of the outputs.
    """

    def __init__(self,
                 d_model: int,
                 vocab_size: int) -> None:
        super().__init__()
        self.proj = nn.Linear(d_model, vocab_size)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``self.proj(x)``; no softmax or log-softmax is applied here."""
        return self.proj(x)
    

## 1.4 Transformer

In [None]:
class Transformer(nn.Module):

    def __init__(self,
                 encoder) -> None:
        super().__init__()