In [10]:
import torch 
import torch.nn as nn
import math

### Input Embedding 


In [11]:
class InputEmbeddings(nn.Module): 
    
    def __inti__(self, d_model: int, vocab_size: int) -> None: 
        super().__init__()  
        self.d_model = d_model  # size of model embedding
        self.vocab_size = vocab_size 
        self.embedding = nn.Embedding(vocab_size, d_model) 
    
    def forward(self, x): 
        """
            (batch, seq_len) -> (batch, seq_len, d_model) 
            embedding(vocab, d_model) -> maps indices to a d model dimensional vector. 
            * math.sqrt(self.d_model) -> scale the embedding by sqrt(d_model) 
        """
        return self.embedding(x) * math.sqrt(self.d_model) 

### Position Encoding 


In [12]:
class PositionalEncoding(nn.Module): 
    
    def __init__(self, d_model: int, seq_len: int, dropout: float) -> None:
        super().__init__()
        self.d_model = d_model 
        self.seq_len = seq_len 
        self.dropout = nn.Dropout(dropout) 
        # positional encoding for each token in the sequence has d_model dimensions.
        self.pe = torch.zeros(seq_len, d_model) 
        self.position = torch.arange(0, seq_len, dtype=torch.float()).unsqueeze(1) # (seq_len, 1) 
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        self.pe[:, 0::2] = torch.sin(self.position * div_term)
        self.pe[:, 1::2] = torch.cos(self.position * div_term)
        self.pe = self.pe.unsqueeze(0)
        self.register_buffer('pe', self.pe) 
    
    def forward(self, x): 
        x = x + self.pe[:, : x.shape[1], :].requires_grad(False)   # (batch, seq_len, d_model)
        return self.dropout(x) 

### LayerNormalization Class

In [13]:
class LayerNormalization(nn.Module): 
        
        def __init__(self, d_model: int, eps: float = 1e-6) -> None: 
            super().__init__() 
            self.d_model = d_model 
            self.eps = eps 
            self.gamma = nn.Parameter(torch.ones(d_model)) 
            self.beta = nn.Parameter(torch.zeros(d_model)) 
        
        def forward(self, x): 
            mean = x.mean(dim=-1, keepdim=True) # get mean 
            std = x.std(dim=-1, keepdim=True)   # get varianceb 
            # normalize 
            x = (x - mean) / (std + self.eps)
            # scale and shift: y = gamma * x + beta
            # gamma for scaling, beta for shifting
            y = self.gamma * x + self.beta
            return y

### FeedForwardBlock Class

In [14]:
class FeedForwardBlock(nn.Module): 
    def __init__(self, d_model: int, d_ff: int, dropout: float) -> None: 
        super().__init__() 
        self.d_model = d_model 
        self.d_ff = d_ff 
        self.dropout = nn.Dropout(dropout) 
        self.linear1 = nn.Linear(d_model, d_ff) 
        self.linear2 = nn.Linear(d_ff, d_model) 
        self.relu = nn.ReLU()
    
    def forward(self, x: torch.Tensor) -> torch.Tensor: 
        # just a simple fully connected feed forward network
        x = self.linear1(x) 
        x = self.relu(x) 
        x = self.dropout(x) 
        x = self.linear2(x) 
        return x

### Multi-head attention:
- gồm nhiều self-attention với hi vọng mỗi self attention nhìn ở nhiều đặc điểm khác nhau sẽ hiểu ngữ cảnh tốt hơn. 
- d_k = d_model / head 
    - d_k là số chiều mà 1 head (1 attention nhìn được trong d_model - dimension of feature vector)
    - d_model : là số chiều của feature vector 
    - head: là số self attention được khởi tạo. 
    - VD : Tôi đang đi đâu đó 
         -  10   10  10  10  10   (mỗi từ được đại diện bởi vector 10 dimension - sử dụng 2 head)
    - head1 5    5   5   5   5
    - head2 5    5   5   5   5 
    - -> gom lại rồi trả về shape như đầu vào.

- self-attention : chắc năng chính là tổng hợp các feature tại từ đang xét với ngữ cảnh của các từ xung quanh 
- feedforwardblock : Suy diện (tăng tính biểu diễn của feature).

#### Mask in multi head attention
2 dạng mask được sử dụng : 
- padding mask : được dùng để đảm bảo rằng các padding tokens (khi mà chuẩn hóa input đầu với max input len, đảm bảo các câu trong batch sẽ có cùng len) không ảnh hưởng gì đến cơ chế attention 
- look-ahead mask (mask multi head attention - Causal mask): được dùng để đảm bảo trong quá trình đào tạo và suy luận mỗi VỊ TRÍ trong chuỗi chỉ có thể tham gia vào các vị trí trước đó và vị trí hiện tại chứ không liên quan đến bất kỳ vị trí nào trong tương lai.   

In [15]:
class MultiHeadAttentionBlock(nn.Module): 
    
    def __init__(self, d_model: int, h: int, dropout: float) -> None: 
        super().__init__()
        self.d_model = d_model 
        self.h = h 
        assert d_model % h == 0, "d_model must be divisible by h" 
        
        self.d_k = d_model // h 
        self.w_q = nn.Linear(d_model, d_model, bias=False)
        self.w_k = nn.Linear(d_model, d_model, bias=False) 
        self.w_v = nn.Linear(d_model, d_model, bias=False) 
        self.w_o = nn.Linear(self.h * self.d_k, d_model, bias=False)
        self.dropout = nn.Dropout(dropout)
        
    @classmethod
    def attention(query, key, value, mask, dropout: nn.Dropout):
        d_k = query.shape[-1] 
        
        # (batch, h, seq_len, d_k) -> (batch, h, seq_len, seq_len)
        attention_scores = (query @ key.transpose(-2, -1)) / math.sqrt(d_k) 
        
        if mask is not None: 
            attention_scores = attention_scores.masked_fill(mask == 0, -1e9) 
        attention_scores = attention_scores.softmax(dim=-1) 
        if dropout is not None: 
            attention_scores = dropout(attention_scores) 
        # (batch, h, seq_len, seq_len) @ (batch, h, seq_len, d_k) -> (batch, h, seq_len, d_k)
        return (attention_scores @ value), attention_scores 
    
    def forward(self, query, key, value, mask): 
        query = self.w_q(query)  # (batch, seq_len, d_model) * (batch, d_model, d_model)-> (batch, seq_len, d_model)
        key = self.w_k(key)      # same 
        value = self.w_v(value)  # same
        
        # split into h heads d_model = h * d_k
        # (batch, seq_len, d_model) -> (batch, seq_len, h, d_k) -> (batch, h, seq_len, d_k)
        query = query.view(query.shape[0], -1, self.h, self.d_k).transpose(1, 2) 
        key = key.view(key.shape[0], -1, self.h, self.d_k).transpose(1, 2) 
        value = value.view(value.shape[0], -1, self.h, self.d_k).transpose(1, 2)
        
        # apply attention 
        x, self.attention_scores = MultiHeadAttentionBlock.attention(query, key, value, mask, self.dropout) 
        
        # combine all heads together 
        # (batch, h, seq_len, d_k) -> (batch, seq_len, h, d_k) -> (batch, seq_len, h * d_k) 
        # contiguous() -> make sure the tensor is stored in a contiguous chunk of memory
        x = x.transpose(1, 2).contiguous().view(x.shape[0], -1, self.h * self.d_k)
        
        # Multiply by W_o to stabilize the output shape 
        return self.w_o(x)
        

### ResidualConnection
- xử lý vấn đề vanishing gradients 
- được dùng mỗi khi qua lớp multi head attention hoặc feed-forward layer -> xong rồi sẽ xử dụng layer normalization. 

In [16]:
class ResidualConnection(nn.Module): 
    
    def __init__(self, d_model: int, dropout: float) -> None: 
        super().__init__() 
        self.d_model = d_model 
        self.dropout = nn.Dropout(dropout) 
        self.norm = LayerNormalization(d_model)
    
    def forward(self, x, sublayer): 
        return x + self.dropout(sublayer(self.norm(x)))

#### Encoder

In [17]:
class EncoderBlock(nn.Module): 
    
    def __init__(self, multi_head_attention: MultiHeadAttentionBlock, feed_forward: FeedForwardBlock, dropout: float) -> None: 
        super().__init__() 
        self.multi_head_attention = multi_head_attention 
        self.feed_forward = feed_forward 
        self.residual_connection = ResidualConnection(multi_head_attention.d_model, dropout)
        self.feed_forward_residual_connection = ResidualConnection(multi_head_attention.d_model, dropout)
    
    def forward(self, x, src_mask): 
        x = self.residual_connection(x, lambda x: self.multi_head_attention(x, x, x, src_mask))
        x = self.residual_connection(x, self.feed_forward)
        return x

class Encoder(nn.Module): 
    
    def __init__(self, layers: nn.ModuleList) -> None: 
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization(layers[0].multi_head_attention.d_model)
    
    def forward(self, x, mask): 
        for layer in self.layers: 
            x = layer(x, mask)
        return self.norm(x)

### Decoder

In [18]:
class DecoderBlock(nn.Module): 
    
    def __init__(self, multi_head_attention: MultiHeadAttentionBlock, feed_forward: FeedForwardBlock, dropout: float) -> None: 
        super().__init__() 
        self.multi_head_attention = multi_head_attention 
        self.feed_forward = feed_forward 
        self.residual_connection = ResidualConnection(multi_head_attention.d_model, dropout)
        self.feed_forward_residual_connection = ResidualConnection(multi_head_attention.d_model, dropout)
        
    def forward(self, x, encoder_output, src_mask, tgt_mask): 
        x = self.residual_connection(x, lambda x: self.multi_head_attention(x, x, x, tgt_mask)) # mask multi head attention
        x = self.residual_connection(x, lambda x: self.multi_head_attention(x, encoder_output, encoder_output, src_mask))   
        x = self.residual_connection(x, self.feed_forward)
        return x

class Decoder(nn.Module): 
    
    def __init__(self, layers: nn.ModuleList) -> None: 
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization(layers[0].multi_head_attention.d_model)
    
    def forward(self, x, encoder_output, src_mask, tgt_mask): 
        for layer in self.layers: 
            x = layer(x, encoder_output, src_mask, tgt_mask)
        return self.norm(x)

### ProjectionLayers

In [None]:
class ProjectionLayer(nn.Module):

    def __init__(self, d_model, vocab_size) -> None:
        super().__init__()
        self.proj = nn.Linear(d_model, vocab_size)

    def forward(self, x) -> None:
        # (batch, seq_len, d_model) --> (batch, seq_len, vocab_size)
        return self.proj(x)

### Transformer


In [None]:
class Transformer(nn.Module):

    def __init__(self, encoder: Encoder, decoder: Decoder, src_embed: InputEmbeddings, tgt_embed: InputEmbeddings, src_pos: PositionalEncoding, tgt_pos: PositionalEncoding, projection_layer: ProjectionLayer) -> None:
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.src_embed = src_embed
        self.tgt_embed = tgt_embed
        self.src_pos = src_pos
        self.tgt_pos = tgt_pos
        self.projection_layer = projection_layer

    def encode(self, src, src_mask):
        # (batch, seq_len, d_model)
        src = self.src_embed(src)
        src = self.src_pos(src)
        return self.encoder(src, src_mask)
    
    def decode(self, encoder_output: torch.Tensor, src_mask: torch.Tensor, tgt: torch.Tensor, tgt_mask: torch.Tensor):
        # (batch, seq_len, d_model)
        tgt = self.tgt_embed(tgt)
        tgt = self.tgt_pos(tgt)
        return self.decoder(tgt, encoder_output, src_mask, tgt_mask)
    
    def project(self, x):
        # (batch, seq_len, vocab_size)
        return self.projection_layer(x)