# Import Modules

In [16]:
import torch
import torch.nn as nn
import numpy as np
import math

# Concepts

## Position Encoding

$$
p_{i,j} = 
\begin{cases}
\sin \left(\frac{i}{10000^{\frac{j}{d_{model}}}}\right) & \text{if } j \text{ is even} \\
\cos \left(\frac{i}{10000^{\frac{j}{d_{model}}}}\right) & \text{if } j \text{ is odd}
\end{cases}
$$

# Naive Transformer Version 1

In [99]:
class InputEmbeddings(nn.Module):
    def __init__(self, vocab_size, embed_size):
        super().__init__()
        self.embed_size = embed_size
        self.vocab_size = vocab_size
        self.embedding  = nn.Embedding(vocab_size, embed_size)
    
    def forward(self, x):
        return self.embedding(x) * math.sqrt(self.embed_size)

class PositionalEncoding(nn.Module):

    def __init__(self, embed_size:int, seq_len:int, dropout: float):
        super().__init__()
        self.embed_size = embed_size
        self.seq_len = seq_len
        self.dropout = nn.Dropout(dropout)

        # Matrix shape (embed_size, seq_len)
        pe = torch.zeros(seq_len, embed_size)
        position = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1) # shape: (seq_len, 1)
        div_term = torch.exp(torch.arange(0, embed_size, 2).float() * -(math.log((10000.0))) / embed_size)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        pe = pe.unsqueeze(0) # Shape: (1, seq_len, embed_size)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x = x + (self.pe[:, :x.shape[1], :]).requires_grad_(False)
        x = (self.pe[:, :x.size(1)].detach()).requires_grad_(False)
        return self.dropout(x)
    
class LayerNorm(nn.Module):
    
    def __init__(self, eps: float = 1e-6):
        super().__init__()
        self.eps = eps
        self.alpha = nn.Parameter(torch.ones(1)) ## Multiplied
        self.bias = nn.Parameter(torch.zeros(1)) ## Added
    
    def forward(self, x):
        mean = x.mean(dim = -1, keepdim=True)
        std = x.std(dim = -1, keepdim=True)
        x = (x - mean) / (std + self.eps)
        return self.alpha * (x - mean) / (std + self.eps) + self.bias
    
class FeedForwardBlock(nn.Module):

    def __init__(self, embed_size: int, dff: int, dropout: float) -> None:
        super().__init__()
        self.linear1 = nn.Linear(embed_size, dff)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dff, embed_size)
        self.relu = nn.ReLU()
    
    def forward (self, x):
        # (Batch, Seq_len, Embed_size) -> (Batch, Seq_len, embed_size * expansion) --> (Batch, Seq_len, Embed_size)
        return self.linear2(self.dropout(self.relu(self.linear1(x))))

class AttentionBlock(nn.Module):

    def __init__(self, embed_size: int, heads: int, dropout: float) -> None:
        super().__init__()
        self.embed_size = embed_size
        self.heads = heads

        assert embed_size % heads == 0, "Embedding size must be divisible by heads"


        self.d_k = embed_size // heads # Dimension of vector seen by each head
        self.w_q = nn.Linear(embed_size, embed_size, bias=False) # Wq
        self.w_k = nn.Linear(embed_size, embed_size, bias=False) # Wk
        self.w_v = nn.Linear(embed_size, embed_size, bias=False) # Wv
        self.w_o = nn.Linear(embed_size, embed_size, bias=False) # Wo

        self.dropout = nn.Dropout(dropout)
    
    @staticmethod
    def scaled_dot_product_attention(query, key, value, mask, dropout: nn.Dropout):
        # (Batch, Heads, sequence_length, embed_size)
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(query.shape[-1])
        # scores = torch.einsum("bhqd,bhkd->bhqk", [query, key]) / math.sqrt(query.shape[-1])
        if mask is not None: 
            scores = scores.masked_fill(mask == 0, float('-1e20'))
        if dropout is not None:
            scores = dropout(scores)

        # (batch, heads, sequence_len, d_k) --> (batch, heads, sequence_len, sequence_len)
        attention_score = torch.softmax(scores, dim=-1) # (batch, h, seq_len, seq_len)
        out = torch.matmul(attention_score, value)
        # out = torch.einsum("bhqk, bhkd -> bhqd", [attention_score, value])
        return out, attention_score
    
    def forward(self, q, k, v, mask):
        query = self.w_q(q) 
        key = self.w_k(k) 
        value = self.w_v(v) 

        # (batch, seq_len, embed_size) --> (batch, seq_len, h, d_k) --> (batch, h, seq_len, d_k)
        query = query.view(query.shape[0], query.shape[1], self.heads, self.d_k).transpose(1, 2)
        key = key.view(key.shape[0], key.shape[1], self.heads, self.d_k).transpose(1, 2)
        value = value.view(value.shape[0], value.shape[1], self.heads, self.d_k).transpose(1, 2)

        # Calculate attention
        x, self.attention_scores = self.scaled_dot_product_attention(query, key, value, mask, self.dropout)
        
        # (batch, h, seq_len, d_k) --> (batch, seq_len, h, d_k) --> (batch, seq_len, embed_size)
        x = x.transpose(1, 2).contiguous().view(x.shape[0], -1, self.heads * self.d_k)

        # (batch, seq_len, embed_size) --> (batch, seq_len, embed_size)  
        return self.w_o(x)
    
class ResidualConnection(nn.Module):

    def __init__(self, features: int, dropout: float):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.norm = LayerNorm(features)
    
    def forward(self, x, sublayer):

        residual = x
        x = self.norm(x)
        x = sublayer(x)
        x = residual + self.dropout(x)

        # return x + self.dropout(sublayer(self.norm(x)))
        return x
    
class EncoderBlock(nn.Module):
    def __init__(self,
                 features: int,
                 attention_block: AttentionBlock,
                 FullyConnected_block: FeedForwardBlock,
                 dropout: float,
                 ):
        super().__init__()
        self.attention_block = attention_block
        self.FullyConnected_block = FullyConnected_block
        self.residual_connections = nn.ModuleList([ResidualConnection(features, dropout) for _ in range(2)])
    
    def forward(self, x, source_mask):
        x = self.residual_connections[0](x, lambda x: self.attention_block(x, x, x, source_mask))
        x = self.residual_connections[1](x, self.FullyConnected_block)
        return x
    
class Encoder(nn.Module):

    def __init__(self,
                 features: int,
                 layers: nn.ModuleList) -> None:
        super().__init__()
        self.layers = layers
        self.norm = LayerNorm(features)

    def forward(self, x, mask):
        for layer in self.layers:
            x = layer(x, mask)
        return self.norm(x)

class DecoderBlock(nn.Module):

    def __init__(
        self,
        features: int,
        attention_block: AttentionBlock,
        cross_attention_block: AttentionBlock,
        FullyConnected_block: FeedForwardBlock,
        dropout: float,
    ) -> None:
        super().__init__()
        self.attention_block = attention_block
        self.FullyConnected_block = FullyConnected_block
        self.cross_attention_block = cross_attention_block
        self.residual_connections = nn.ModuleList([ResidualConnection(features, dropout) for _ in range(3)]
    )
    
    def forward(self, x, enc_out, source_mask, target_mask):
        x = self.residual_connections[0](x, lambda x: self.attention_block(x,x,x,target_mask))
        x = self.residual_connections[1](x, lambda x: self.cross_attention_block(x,enc_out,enc_out,source_mask))
        x = self.residual_connections[2](x, self.FullyConnected_block)
        return x
    
class Decoder(nn.Module):

    def __init__(self,
                 features: int,
                 layers: nn.ModuleList) -> None:
        super().__init__()
        self.layers = layers
        self.norm = LayerNorm(features)
    
    def forward(self, x, enc_out, source_mask, target_mask):
        for layer in self.layers:
            x = layer(x, enc_out, source_mask, target_mask)
        return self.norm(x)

class FullyConnected(nn.Module):
    def __init__(self, embed_size: int, target_vocab_size: int) -> None:
        super().__init__()
        self.fc = nn.Linear(embed_size, target_vocab_size)
    
    def forward(self, x):
        x = self.fc(x)
        x = torch.log_softmax(x, dim=-1)

        # return torch.log_softmax(self.fc(x), dim=-1)
        return x
    
class Transformer(nn.Module):
    def __init__(self, encoder, decoder, input_embedding, target_embedding,
                 positional_encoding, target_positional_encoding, fc):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.input_embedding = input_embedding
        self.target_embedding = target_embedding
        self.positional_encoding = positional_encoding
        self.target_positional_encoding = target_positional_encoding
        self.fc = fc

    def encode(self, source, source_mask):
        source = self.input_embedding(source)
        source = self.positional_encoding(source)
        return self.encoder(source, source_mask)

    def decode(self, target, enc_out, source_mask, target_mask):
        target = self.target_embedding(target)
        target = self.target_positional_encoding(target)
        return self.decoder(target, enc_out, source_mask, target_mask)

    def forward(self, source, target, source_mask, target_mask):
        enc_out = self.encode(source, source_mask)
        dec_out = self.decode(target, enc_out, source_mask, target_mask)
        return self.fc(dec_out)


def my_transformer(source_vocab_size: int, 
                   target_vocab_size: int, 
                   source_seq_len: int, 
                   target_seq_len: int,
                   embed_size: int = 512, 
                   Nx: int = 6, 
                   heads: int = 8, 
                   dff: int = 2048, 
                   dropout: float = 0.1):
    
    source_embed = InputEmbeddings(source_vocab_size, embed_size)
    target_embed = InputEmbeddings(target_vocab_size, embed_size)
    source_pe = PositionalEncoding(embed_size, source_seq_len, dropout)
    target_pe = PositionalEncoding(embed_size, target_seq_len, dropout)

    encoder_blocks = nn.ModuleList([
        EncoderBlock(embed_size,
                     AttentionBlock(embed_size, heads, dropout),
                     FeedForwardBlock(embed_size, dff, dropout),
                     dropout)
        for _ in range(Nx)
    ])

    decoder_blocks = nn.ModuleList([
        DecoderBlock(embed_size,
                     AttentionBlock(embed_size, heads, dropout),
                     AttentionBlock(embed_size, heads, dropout),
                     FeedForwardBlock(embed_size, dff, dropout),
                     dropout)
        for _ in range(Nx)
    ])

    encoder = Encoder(embed_size, encoder_blocks)
    decoder = Decoder(embed_size, decoder_blocks)
    fc = FullyConnected(embed_size, target_vocab_size)

    model = Transformer(encoder, decoder, source_embed, target_embed, source_pe, target_pe, fc)

    for p in model.parameters():
        if p.dim() > 1:
            nn.init.xavier_uniform_(p)

    return model

## Testing on a Dummy Data

In [100]:
# ==== TEST ====
# Dummy test
batch_size = 3
src_vocab_size = 1000
tgt_vocab_size = 1000
src_seq_len = 8
tgt_seq_len = 7

model = my_transformer(src_vocab_size, tgt_vocab_size, src_seq_len, tgt_seq_len)

source = torch.randint(0, src_vocab_size, (batch_size, src_seq_len))
target = torch.randint(0, tgt_vocab_size, (batch_size, tgt_seq_len))
source_mask = target_mask = None

output = model(source, target, source_mask, target_mask)
# print("Output shape:", output.shape)
print(output)  # Expected: (batch_size, tgt_seq_len, tgt_vocab_size)

tensor([[[-6.9502, -6.8750, -6.8972,  ..., -6.9416, -6.9061, -6.9215],
         [-6.9502, -6.8750, -6.8973,  ..., -6.9415, -6.9060, -6.9215],
         [-6.9502, -6.8750, -6.8973,  ..., -6.9415, -6.9060, -6.9215],
         ...,
         [-6.9503, -6.8750, -6.8972,  ..., -6.9416, -6.9062, -6.9216],
         [-6.9503, -6.8751, -6.8971,  ..., -6.9417, -6.9063, -6.9216],
         [-6.9503, -6.8751, -6.8970,  ..., -6.9417, -6.9063, -6.9216]]],
       grad_fn=<LogSoftmaxBackward0>)


# Naive Transformer 2

In [22]:
import torch
import torch.nn as nn


class AttentionBlock(nn.Module):
    def __init__(self, embed_size, heads):
        super(AttentionBlock, self).__init__()
        self.embed_size = embed_size
        self.heads = heads
        self.d_k = embed_size // heads

        assert self.embed_size % self.heads == 0

        self.values = nn.Linear(embed_size, embed_size)
        self.keys = nn.Linear(embed_size, embed_size)
        self.queries = nn.Linear(embed_size, embed_size)
        self.fc_out = nn.Linear(embed_size, embed_size)

    def forward(self, values, keys, query, mask):
        # Get number of training examples
        N = query.shape[0]

        value_len, key_len, query_len = values.shape[1], keys.shape[1], query.shape[1]

        values = self.values(values)  # (N, value_len, embed_size)
        keys = self.keys(keys)  # (N, key_len, embed_size)
        queries = self.queries(query)  # (N, query_len, embed_size)

        values = values.reshape(N, value_len, self.heads, self.d_k)
        keys = keys.reshape(N, key_len, self.heads, self.d_k)
        queries = queries.reshape(N, query_len, self.heads, self.d_k)

        energy = torch.einsum("bqhd,bkhd->bhqk", [queries, keys])
        if mask is not None:
            energy = energy.masked_fill(mask == 0, float("-1e20"))
        attention = torch.softmax(energy / (self.embed_size ** (1 / 2)), dim=3)
        # attention shape: (N, heads, query_len, key_len)

        out = torch.einsum("bhql,blhd->bqhd", [attention, values]).reshape(
            N, query_len, self.heads * self.d_k
        )
        out = self.fc_out(out)
        return out


class TransformerBlock(nn.Module):
    def __init__(self, embed_size, heads, dropout, expansion):
        super(TransformerBlock, self).__init__()
        self.attention = AttentionBlock(embed_size, heads)
        self.norm1 = nn.LayerNorm(embed_size)
        self.norm2 = nn.LayerNorm(embed_size)

        self.FullyConnected = nn.Sequential(
            nn.Linear(embed_size, expansion * embed_size),
            nn.ReLU(),
            nn.Linear(expansion * embed_size, embed_size),
        )

        self.dropout = nn.Dropout(dropout)

    def forward(self, value, key, query, mask):
        attention = self.attention(value, key, query, mask)

        # Add skip connection, run through normalization and finally dropout
        x = self.dropout(self.norm1(attention + query))
        forward = self.FullyConnected(x)
        out = self.dropout(self.norm2(forward + x))
        return out


class Encoder(nn.Module):
    def __init__(
        self,
        source_vocab_size,
        embed_size,
        num_layers,
        heads,
        device,
        expansion,
        dropout,
        max_length,
    ):

        super(Encoder, self).__init__()
        self.embed_size = embed_size
        self.device = device
        self.word_embedding = nn.Embedding(source_vocab_size, embed_size)
        self.position_embedding = nn.Embedding(max_length, embed_size)

        self.layers = nn.ModuleList(
            [
                TransformerBlock(
                    embed_size,
                    heads,
                    dropout=dropout,
                    expansion=expansion,
                )
                for _ in range(num_layers)
            ]
        )

        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        N, seq_length = x.shape
        positions = torch.arange(0, seq_length).expand(N, seq_length).to(self.device)
        out = self.dropout(
            (self.word_embedding(x) + self.position_embedding(positions))
        )

        # In the Encoder the query, key, value are all the same, it's in the
        # decoder this will change. This might look a bit odd in this case.
        for layer in self.layers:
            out = layer(out, out, out, mask)

        return out


class DecoderBlock(nn.Module):
    def __init__(self, embed_size, heads, expansion, dropout, device):
        super(DecoderBlock, self).__init__()
        self.norm = nn.LayerNorm(embed_size)
        self.attention = AttentionBlock(embed_size, heads=heads)
        self.transformer_block = TransformerBlock(
            embed_size, heads, dropout, expansion
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, value, key, source_mask, target_mask):
        attention = self.attention(x, x, x, target_mask)
        query = self.dropout(self.norm(attention + x))
        out = self.transformer_block(value, key, query, source_mask)
        return out


class Decoder(nn.Module):
    def __init__(
        self,
        target_vocab_size,
        embed_size,
        num_layers,
        heads,
        expansion,
        dropout,
        device,
        max_length,
    ):
        super(Decoder, self).__init__()
        self.device = device
        self.word_embedding = nn.Embedding(target_vocab_size, embed_size)
        self.position_embedding = nn.Embedding(max_length, embed_size)

        self.layers = nn.ModuleList(
            [
                DecoderBlock(embed_size, heads, expansion, dropout, device)
                for _ in range(num_layers)
            ]
        )
        self.fc_out = nn.Linear(embed_size, target_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_out, source_mask, target_mask):
        N, seq_length = x.shape
        positions = torch.arange(0, seq_length).expand(N, seq_length).to(self.device)
        x = self.dropout((self.word_embedding(x) + self.position_embedding(positions)))

        for layer in self.layers:
            x = layer(x, enc_out, enc_out, source_mask, target_mask)

        x = self.fc_out(x)

        return x


class Transformer(nn.Module):
    def __init__(
        self,
        source_vocab_size,
        target_vocab_size,
        src_pad_idx,
        trg_pad_idx,
        embed_size=512,
        num_layers=6,
        expansion=4,
        heads=8,
        dropout=0,
        device="cpu",
        max_length=100,
    ):

        super(Transformer, self).__init__()

        self.encoder = Encoder(
            source_vocab_size,
            embed_size,
            num_layers,
            heads,
            device,
            expansion,
            dropout,
            max_length,
        )

        self.decoder = Decoder(
            target_vocab_size,
            embed_size,
            num_layers,
            heads,
            expansion,
            dropout,
            device,
            max_length,
        )

        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.device = device

    def make_source_mask(self, src):
        source_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)
        # (N, 1, 1, src_len)
        return source_mask.to(self.device)

    def make_target_mask(self, trg):
        N, trg_len = trg.shape
        target_mask = torch.tril(torch.ones((trg_len, trg_len))).expand(
            N, 1, trg_len, trg_len
        )

        return target_mask.to(self.device)

    def forward(self, source, target):
        source_mask = self.make_source_mask(source)
        target_mask = self.make_target_mask(target)
        enc_src = self.encoder(source, source_mask)
        out = self.decoder(target, enc_src, source_mask, target_mask)
        return out

## Testing on Dummy Data

In [23]:
# Hyperparameters
batch_size = 2
source_vocab_size = 10  # Source vocabulary size (e.g., English)
trg_vocab_size = 10  # Target vocabulary size (e.g., French)
max_length = 5        # Maximum sequence length
embed_size = 256      # Embedding dimension
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = "cpu"

# Dummy source and target data (indices representing tokens)
src_data = torch.randint(0,source_vocab_size, (batch_size, max_length)).to(device)
trg_data = torch.randint(0, trg_vocab_size, (batch_size, max_length)).to(device)

# Pad indices (for masking)
src_pad_idx = 0
trg_pad_idx = 0

# Initialize the Transformer model
model = Transformer(
    source_vocab_size=source_vocab_size,
    target_vocab_size=trg_vocab_size,
    src_pad_idx=src_pad_idx,
    trg_pad_idx=trg_pad_idx,
    embed_size=embed_size,
    device=device,
).to(device)

# Forward pass
output = model(src_data, trg_data[:, :-1])  # Teacher forcing: exclude last token for trg
# print("Output shape:", output.shape) 
print(f"{output}") # Should be [batch_size, trg_seq_len - 1, trg_vocab_size]

tensor([[[-1.0755, -0.0155,  0.4992,  0.5091, -0.2652,  0.3990,  0.4569,
           0.3919, -0.2867,  0.4454],
         [-0.2295,  0.2734,  1.0342,  1.2918, -0.6223,  0.4974, -0.3705,
          -0.3416, -0.4456, -0.3181],
         [-0.0674,  0.5256,  1.0206,  0.4504, -0.6455, -0.3648,  0.6779,
          -0.1407,  0.1877,  0.4822],
         [-0.7602, -0.0344,  0.3387,  0.4943, -0.0548, -0.3248, -0.4568,
          -0.0907, -0.2651,  0.1696]],

        [[-0.4865, -0.1522,  0.4949,  0.7071, -0.7135,  0.4941, -0.1803,
           0.2173, -0.1299,  0.7150],
         [-0.3801, -0.1549,  1.2053,  1.2520, -0.3428,  0.4923, -0.3389,
          -0.2826,  0.0490,  0.2139],
         [ 0.0773, -0.4217,  0.5831,  0.6919, -0.1064,  0.2588, -0.5266,
           0.1088,  1.0204,  0.6728],
         [-1.0178, -0.2780,  0.9299,  0.9036, -0.2468,  0.1658, -0.3872,
           0.7223,  0.8983,  1.0028]]], grad_fn=<ViewBackward0>)


# PyTorch's nn.Transformer() module

* source: https://towardsdatascience.com/a-detailed-guide-to-pytorchs-nn-transformer-module-c80afbc9ffb1/

--> A detailed guide to Pytorch’s ``nn.Transformer()`` module.

## Creating the Transformer Block
using ```nn.Transformer()``` module

In [101]:
class Transformer(nn.Module):
    def __init__(
            self,
            num_tokens: int,
            dim_model: int,
            num_heads: int,
            num_enc_layers: int,
            num_dec_layers: int,
            dropout: float
    ):
        super().__init__()

        self.model_type = "Transformer"
        self.dim_model = dim_model


        # Layers
        self.position_encoder = PositionalEncoding(
            dim_model=dim_model,
            dropout=dropout,
            max_seq_len=5000,
        )

        self.embedding = nn.Embedding(num_tokens, dim_model)

        self.transformer = nn.Transformer(
            d_model=dim_model,
            nhead=num_heads,
            num_encoder_layers=num_enc_layers,
            num_decoder_layers=num_dec_layers,
            dropout=dropout,
            batch_first=True
        )
        self.fc_out = nn.Sequential(
            nn.Linear(dim_model, 150),
            nn.ReLU(),
            nn.Dropout(dropout), 
            nn.Linear(150, num_tokens)
        )
    
    def forward(
            self,
            source: torch.Tensor,
            target: torch.Tensor,
                ):
        # source size must be (batch_size, src sequence length)
        # target size must be (batch_size, tgt sequence length)
        
        # Embedding + positional encoding - Out size = (batch_size, sequence length, dim_model)
        source = self.embedding(source) * math.sqrt(self.dim_model)
        target = self.embedding(target) * math.sqrt(self.dim_model)
        source = self.position_encoder(source)
        target = self.position_encoder(target)

        # Permute to obtain size (sequence length, batch_size, dim_model)

        source = source.permute(1,0,2)
        target = target.permute(1,0,2)

        # Transformer blocks
        transformer_out = self.transformer(source,target)
        out = self.fc_out(transformer_out)

        return out
    
    def get_target_mask(self,
                        size: int) -> torch.Tensor:
        mask = torch.tril(torch.ones(size, size) == 1).float() ## Lower Triangular Matrix
        ## Fill the upper triangular part of the matrix with a very very small number
        mask = mask.masked_fill(mask == 0, float(-1e20))
        mask = mask.masked_fill(mask == 1, float(0.0))

         # Fill the upper triangular part of the matrix with a very very small number
        # mask = torch.where(mask == 0, float(-1e20), float(0.0))
        return mask
    
    def create_pad_mask(self,
                        matrix: torch.tensor,
                        pad_idx: int) -> torch.tensor:
        return (matrix == pad_idx)

## Creating the Positional Encoding Part
class PositionalEncoding(nn.Module):
    def __init__(
            self,
            dim_model:int,
            dropout:float,
            max_seq_len: int
    ):
        super().__init__()
        self.dropout = nn.Dropout(dropout)

        #Encoding
        pe = torch.zeros(max_seq_len, dim_model)
        positions = torch.arange(0, max_seq_len, dtype=torch.float).view(-1,1)
        division_term = torch.exp(torch.arange(0, dim_model, 2)).float() * (-math.log(10000.0)/ dim_model)

        ## PE(position, 2i) = sin(pos/10000^(2i/d_model))
        pe[:, 0::2] = torch.sin(positions * division_term)

        ## PE(position, 2i+1) = cos(pos/10000^(2i/d_model))
        pe[:, 1::2] = torch.cos(positions * division_term)

        ## Saving the buffer 
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)
    
    def forward(self, token_embedding: torch.tensor) -> torch.tensor:
        """
        Residual Connection + Positional Encoding
        """
        return self.dropout(token_embedding + self.pe[:token_embedding.size(0)])


## Testing on Dummy Data

In [92]:
def generate(n):
    SOS_token = np.array([2])
    EOS_token = np.array([3])
    length = 8

    data = []

    for i in range(n // 3):
        X = np.concatenate((SOS_token, np.ones(length), EOS_token))
        Y = np.concatenate((SOS_token, np.ones(length), EOS_token))
        data.append((X, Y))
    
    # 0,0,0,0 -> 0,0,0,0
    for i in range(n // 3):
        X = np.concatenate((SOS_token, np.zeros(length), EOS_token))
        y = np.concatenate((SOS_token, np.zeros(length), EOS_token))
        data.append([X, y])
    return data

dummy_data = generate(2)
model = Transformer(
    num_tokens=4,
    dim_model=8,
    num_heads=2,
    num_enc_layers=2,
    num_dec_layers=2,
    dropout=0.5
)

In [102]:
# Parameters

num_tokens = 4  # Vocabulary size
batch_size = 8
seq_length = 10

# Create dummy input (source and target) - shape: (batch_size, sequence_length)
dummy_source = torch.randint(0, num_tokens, (batch_size, seq_length))
dummy_target = torch.randint(0, num_tokens, (batch_size, seq_length))

# Initialize the model
model = Transformer(
    num_tokens=num_tokens,
    dim_model=8,
    num_heads=2,
    num_enc_layers=2,
    num_dec_layers=2,
    dropout=0.5
)

# Forward pass
output = model(dummy_source, dummy_target)

print(f"Shape of the Output: {output.shape}")
print(f"Output: {output}")  


Shape of the Output: torch.Size([10, 8, 4])
Output: tensor([[[-0.0448,  0.6161,  0.0691, -0.0241],
         [ 0.1419,  0.3160,  0.3498,  0.1918],
         [ 0.4274,  0.5590, -0.1912, -0.2750],
         [ 0.1379,  0.7903,  0.5134,  0.3421],
         [ 0.0313,  0.4333,  0.4510,  0.1492],
         [-0.2876,  0.4828,  0.6478, -0.0035],
         [-0.4664,  0.1692,  0.0307,  0.0150],
         [ 0.0728,  0.1941,  0.2552,  0.0425]],

        [[-0.2108,  0.3502,  0.7501,  0.0520],
         [ 0.4513,  0.2603,  0.1949,  0.0433],
         [ 0.0080,  0.7172,  0.1981, -0.0311],
         [ 0.0503, -0.2382,  0.0246,  0.8067],
         [ 0.3114,  0.4575,  0.4704,  0.2556],
         [ 0.2219,  0.6273,  0.3405,  0.0311],
         [-0.1520,  0.3615,  0.2010,  0.2741],
         [-0.0747,  0.5056, -0.1597,  0.3879]],

        [[-0.5676,  0.0281,  0.2509,  0.0857],
         [ 0.2246, -0.0168,  0.1263,  0.1199],
         [-0.0148,  0.0115,  0.3053,  0.1922],
         [-0.2933,  0.3567, -0.3614, -0.1408],
    