In [1]:
import torch
import torch.nn as nn
import math
import os
from tokenizers import Tokenizer

In [2]:
# Project directories, resolved relative to the notebook's working directory.
# Both keep their trailing slash because later cells append file names with `+`.
cwd = os.getcwd()
data_dir = os.path.join(cwd, "../data/")
artifacts_dir = os.path.join(cwd, "../artifacts/")

# Absolute, normalised paths to the two sides of the parallel corpus.
en_path, ar_path = (
    os.path.abspath(data_dir + corpus_file)
    for corpus_file in ('UNPC.ar-en.en', 'UNPC.ar-en.ar')
)

In [3]:
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to a batch of embeddings.

    A table of shape (1, max_seq_length, d_model) is precomputed once and stored
    as a buffer (saved with the module, moved by `.to()`, never trained).
    Even feature columns hold sines, odd feature columns hold cosines.

    Args:
        d_model: Size of the feature dimension. Both even and odd values work.
        max_seq_length: Longest sequence the table covers; `forward` cannot
            handle inputs whose dimension 1 exceeds this.
        dropout: Dropout probability applied after the encoding is added.
    """

    def __init__(self, d_model, max_seq_length= 512, dropout= 0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        # One row per position, one column per sin/cos frequency.
        position = torch.arange(max_seq_length).unsqueeze(1)  # (max_seq_length, 1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_seq_length, ceil(d_model / 2))

        pe = torch.zeros(max_seq_length, d_model)
        pe[:, 0::2] = torch.sin(angles)  # even feature columns: sin
        # For odd d_model there is one fewer odd column than even column, so the
        # last frequency is dropped for the cosine half instead of crashing.
        pe[:, 1::2] = torch.cos(angles[:, :d_model // 2])  # odd feature columns: cos

        # Register as buffer (non-learnable) with a leading broadcast batch axis.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return dropout(x + positional table truncated to x.size(1)).

        Args:
            x: Tensor whose dimension 1 is the sequence axis and whose last
               dimension equals d_model, e.g. (batch_size, seq_len, d_model).
        """
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)

class TransformerEmbeddings(nn.Module):
    """Token-embedding lookup scaled by sqrt(d_model), then positional encoding.

    Args:
        vocab_size: Number of rows in the embedding table.
        d_model: Width of each embedding vector.
        max_seq_length: Forwarded to PositionalEncoding.
        dropout: Forwarded to PositionalEncoding.
        padding_idx: Token id passed to nn.Embedding as `padding_idx`.
    """

    def __init__(self, vocab_size, d_model, max_seq_length= 512, dropout= 0.1, padding_idx= 1):
        super().__init__()
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=padding_idx)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length, dropout)

    def forward(self, x):
        """Map token ids `x` to scaled, position-encoded embeddings."""
        # The sqrt(d_model) scaling follows the original Transformer paper.
        scale = math.sqrt(self.d_model)
        token_vectors = self.embedding(x)
        return self.positional_encoding(token_vectors * scale)

In [5]:
# Load the trained BPE tokenizers and read their vocabulary sizes
ar_tokenizer = Tokenizer.from_file(artifacts_dir + 'bpe_tokenizer_ar.json')
en_tokenizer = Tokenizer.from_file(artifacts_dir + 'bpe_tokenizer_en.json')
ar_vocab_size, en_vocab_size = (
    ar_tokenizer.get_vocab_size(),
    en_tokenizer.get_vocab_size(),
)
d_model = 512

# One embedding stack per language: English is the source, Arabic the target.
# padding_idx=1 assumes the tokenizers use id 1 for padding.
src_embeddings = TransformerEmbeddings(
    vocab_size=en_vocab_size, d_model=d_model, padding_idx=1
)
tgt_embeddings = TransformerEmbeddings(
    vocab_size=ar_vocab_size, d_model=d_model, padding_idx=1
)

# Random token ids stand in for a real batch
batch_size = 32
seq_len = 512
src_tokens = torch.randint(0, en_vocab_size, (batch_size, seq_len))
tgt_tokens = torch.randint(0, ar_vocab_size, (batch_size, seq_len))

# Forward pass through both embedding stacks
src_embedded = src_embeddings(src_tokens)
tgt_embedded = tgt_embeddings(tgt_tokens)

# Both shapes are expected to be (32, 512, 512)
print(f"Source embedded shape: {src_embedded.shape}")
print(f"Target embedded shape: {tgt_embedded.shape}")


Source embedded shape: torch.Size([32, 512, 512])
Target embedded shape: torch.Size([32, 512, 512])


In [6]:
# Test 1: Verify padding tokens are handled correctly.
# nn.Embedding initialises the padding_idx row to zeros, so the raw lookup
# (before positional encoding and dropout) must be all-zero at pad positions.
pad_id = 1
test_tokens = torch.tensor([[pad_id, 2, 3], [4, pad_id, pad_id]])
embedded = src_embeddings(test_tokens)
assert embedded.shape == (2, 3, src_embeddings.d_model)
pad_vectors = src_embeddings.embedding(test_tokens)[test_tokens == pad_id]
assert torch.all(pad_vectors == 0), "padding rows of the embedding table should be zero"
print("Embedding test passed - padding tokens handled")

# Test 2: Verify positional encoding adds different values for different positions
test_same_token = torch.tensor([[10, 10]])  # Same token at positions 0 and 1
embedded_same = src_embeddings(test_same_token)
assert not torch.allclose(embedded_same[0, 0], embedded_same[0, 1], atol=1e-6)
print("Positional encoding test passed - different positions have different encodings")

# Test 3: Check gradient flow.
# Backpropagate through the batch embedded earlier and confirm the embedding
# table actually received a gradient, with the padding row left untouched.
src_embedded.sum().backward()
embedding_grad = src_embeddings.embedding.weight.grad
assert embedding_grad is not None, "no gradient reached the embedding table"
assert embedding_grad.abs().sum() > 0, "embedding gradient is identically zero"
assert torch.all(embedding_grad[pad_id] == 0), "padding row should not receive gradient"
print("Gradient test passed - gradients are flowing through embeddings")

Embedding test passed - padding tokens handled
Positional encoding test passed - different positions have different encodings
Gradient test passed - gradients are flowing through embeddings


In [7]:
def scaled_dot_product_attention(query, key, value, mask= None, dropout= None):
    """
    Compute scaled dot-product attention: softmax(Q K^T / sqrt(d_k)) V.

    Args:
        query: Query tensor of shape (batch_size, num_heads, q_len, d_k)
        key: Key tensor of shape (batch_size, num_heads, kv_len, d_k)
        value: Value tensor of shape (batch_size, num_heads, kv_len, d_v)
        mask: Optional tensor broadcastable to (batch_size, num_heads, q_len, kv_len).
            Positions where `mask == 0` (or False) are excluded from attention.
        dropout: Optional callable (e.g. nn.Dropout) applied to the attention weights

    Returns:
        attn_output: Tensor of shape (batch_size, num_heads, q_len, d_v)
        attn_weights: Tensor of shape (batch_size, num_heads, q_len, kv_len),
            returned after dropout when `dropout` is given
    """
    d_k = query.size(-1)
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    if mask is not None:
        # Use the most negative finite value of the score dtype rather than a
        # hard-coded -1e9: that constant is not representable in float16 and
        # makes masked_fill raise under half precision / autocast.
        scores = scores.masked_fill(mask == 0, torch.finfo(scores.dtype).min)
    attn_weights = scores.softmax(dim=-1)

    if dropout is not None:
        attn_weights = dropout(attn_weights)
    attn_output = torch.matmul(attn_weights, value)
    return attn_output, attn_weights

In [8]:
class MultiHeadAttention(nn.Module):
    """Multi-head attention built on `scaled_dot_product_attention`.

    Inputs are projected with separate linear layers, split into `num_heads`
    heads of width d_model // num_heads, attended per head, concatenated and
    passed through a final output projection.
    """

    def __init__(self, d_model, num_heads, dropout= 0.1):
        super().__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads  # per-head feature width

        # Input projections (query / key / value) and the output projection
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)

        # Applied to the attention weights inside scaled_dot_product_attention
        self.dropout = nn.Dropout(dropout)

    def _split_heads(self, projected, batch_size):
        """Reshape (batch, seq, d_model) into (batch, num_heads, seq, d_k)."""
        per_head = projected.view(batch_size, -1, self.num_heads, self.d_k)
        return per_head.transpose(1, 2)

    def forward(self, query, key, value, mask= None):
        """
        Args:
            query: Tensor of shape (batch_size, seq_len, d_model)
            key: Tensor of shape (batch_size, seq_len, d_model)
            value: Tensor of shape (batch_size, seq_len, d_model)
            mask: Optional mask handed unchanged to scaled_dot_product_attention,
                where it is applied to the per-head score tensor

        Returns:
            output: Tensor of shape (batch_size, seq_len, d_model)
        """
        n_batch = query.size(0)

        # Project, then split the feature axis into heads
        heads_q = self._split_heads(self.w_q(query), n_batch)
        heads_k = self._split_heads(self.w_k(key), n_batch)
        heads_v = self._split_heads(self.w_v(value), n_batch)

        # Per-head attention; the attention weights are not needed here
        context, _ = scaled_dot_product_attention(heads_q, heads_k, heads_v, mask, self.dropout)

        # Merge heads back into a single d_model-wide feature axis
        merged = context.transpose(1, 2).contiguous().view(n_batch, -1, self.d_model)
        return self.w_o(merged)


In [9]:
def generate_causal_mask(seq_len, device=None):
    """
    Generate a causal (look-ahead) mask for decoder self-attention.

    The mask follows the convention used by `scaled_dot_product_attention` and
    `generate_padding_mask`: True marks a position that may be attended to,
    False (zero) marks a masked position. Query position i may attend to key
    positions 0..i, so the result is lower-triangular including the diagonal.

    Args:
        seq_len: Sequence length
        device: Optional device on which to create the mask

    Returns:
        mask: Bool tensor of shape (1, 1, seq_len, seq_len) with zeros for masked positions
    """
    # tril keeps the diagonal and everything below it; the previous triu(diagonal=1)
    # produced the complement, which exposed future tokens and hid past ones.
    allowed = torch.tril(torch.ones(seq_len, seq_len, device=device)).bool()
    return allowed.unsqueeze(0).unsqueeze(0)  # (1, 1, seq_len, seq_len)
def generate_padding_mask(seq, pad_id= 1):
    """
    Build a boolean mask that is False wherever `seq` holds the padding id.

    Args:
        seq: Tensor of shape (batch_size, seq_len) containing token indices
        pad_id: Padding token ID

    Returns:
        mask: Bool tensor of shape (batch_size, 1, 1, seq_len); True for real
            tokens, False (zero) for padded positions
    """
    is_real_token = seq.ne(pad_id)  # (batch_size, seq_len)
    # Insert two singleton axes so the mask broadcasts over heads and query positions.
    return is_real_token[:, None, None, :]

In [15]:
# Smoke test for MultiHeadAttention: run it without a mask, with a causal mask
# and with a padding mask, printing the output shape each time.
# Test parameters
d_model = 512
num_heads = 8
batch_size = 32
seq_len = 512

# Create attention module
attention = MultiHeadAttention(d_model, num_heads)

# Create sample input tensors (random stand-ins for projected embeddings)
query = torch.randn(batch_size, seq_len, d_model)
key = torch.randn(batch_size, seq_len, d_model)
value = torch.randn(batch_size, seq_len, d_model)

# Test without mask
output = attention(query, key, value)
print(f"Output shape without mask: {output.shape}")  # Should be (32, 512, 512)

# Test with causal mask
causal_mask = generate_causal_mask(seq_len)
output_masked = attention(query, key, value, mask=causal_mask)
print(f"Output shape with causal mask: {output_masked.shape}")  # Should be (32, 512, 512)

# Test with padding mask (simulate padded sequence)
pad_id = 1
# Random ids in [0, 100); note that pad_id itself can also be drawn at random.
dummy_tokens = torch.randint(0, 100, (batch_size, seq_len))
dummy_tokens[0, 5:] = pad_id  # Pad everything after the first 5 tokens of the first sequence
padding_mask = generate_padding_mask(dummy_tokens, pad_id)
output_padded = attention(query, key, value, mask=padding_mask)
print(f"Output shape with padding mask: {output_padded.shape}")  # Should be (32, 512, 512)

Output shape without mask: torch.Size([32, 512, 512])
Output shape with causal mask: torch.Size([32, 512, 512])
Output shape with padding mask: torch.Size([32, 512, 512])


In [16]:
class EncoderLayer(nn.Module):
    """One Transformer encoder block with post-layer normalisation.

    Two sub-layers are applied in sequence: multi-head self-attention and a
    position-wise feed-forward network (Linear -> GELU -> Dropout -> Linear).
    Each sub-layer output goes through dropout, is added to its input, and
    the sum is layer-normalised.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout= 0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)

        ffn_stages = [
            nn.Linear(d_model, d_ff),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(d_ff, d_model),
        ]
        self.feed_forward = nn.Sequential(*ffn_stages)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, mask= None):
        """Apply self-attention and the feed-forward network to `x`.

        Args:
            x: Input tensor of shape (batch_size, seq_len, d_model)
            mask: Optional attention mask forwarded to the self-attention module

        Returns:
            Tensor with the same shape as `x`
        """
        # Sub-layer 1: self-attention, residual add, then normalise
        attended = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout1(attended))

        # Sub-layer 2: feed-forward, residual add, then normalise
        transformed = self.feed_forward(x)
        return self.norm2(x + self.dropout2(transformed))

In [17]:
# Test the encoder layer
encoder_layer = EncoderLayer(d_model=512, num_heads=8, d_ff=2048)

# Test with sample input (using your previous tensors)
encoder_output = encoder_layer(query)  # Using query as input for self-attention
print(f"Encoder layer output shape: {encoder_output.shape}")  # Should be (2, 10, 512)

# Test with mask
encoder_output_masked = encoder_layer(query, mask=causal_mask)
print(f"Encoder layer masked output shape: {encoder_output_masked.shape}")  # Should be (2, 10, 512)

Encoder layer output shape: torch.Size([32, 512, 512])
Encoder layer masked output shape: torch.Size([32, 512, 512])


In [18]:
class TransformerEncoder(nn.Module):
    """Stack of `num_layers` EncoderLayer blocks applied one after another."""

    def __init__(self, num_layers, d_model, num_heads, d_ff, dropout= 0.1):
        super().__init__()
        self.layers = nn.ModuleList([
            EncoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])

    def forward(self, x, mask= None):
        """
        Args:
            x: Tensor of shape (batch_size, seq_len, d_model) - embedded input
            mask: Optional mask for padding, passed unchanged to every layer

        Returns:
            Tensor of same shape as input (the input itself when the stack is empty)
        """
        for layer in self.layers:
            x = layer(x, mask)
        # The return must sit after the loop: returning inside it ran only the
        # first layer and yielded None for an empty stack.
        return x

In [None]:
# Test the full encoder: shape check with and without a padding mask
num_encoder_layers = 6  # Common choice
encoder = TransformerEncoder(
    num_layers=num_encoder_layers,
    d_model=512,
    num_heads=8, 
    d_ff=2048  # Typically 4 * d_model
)

# Test with the embedded source batch produced in the embedding test cell
encoder_output = encoder(src_embedded)  # Using your src_embedded from embedding tests
print(f"Full encoder output shape: {encoder_output.shape}")  # Should be (32, 512, 512)

# Test with padding mask (using your previously created mask)
# NOTE: padding_mask was built from `dummy_tokens` in the attention test cell, not from
# the `src_tokens` behind src_embedded, so this only exercises shapes and broadcasting.
encoder_output_masked = encoder(src_embedded, mask=padding_mask)
print(f"Full encoder masked output shape: {encoder_output_masked.shape}")  # Should be (32, 512, 512)

Full encoder output shape: torch.Size([32, 512, 512])
Full encoder masked output shape: torch.Size([32, 512, 512])


In [22]:
class DecoderLayer(nn.Module):
    """One Transformer decoder block with post-layer normalisation.

    Three sub-layers are applied in sequence: self-attention over the target
    sequence, cross-attention over the encoder output, and a position-wise
    feed-forward network (Linear -> GELU -> Dropout -> Linear). Each sub-layer
    output goes through dropout, is added to its input, and the sum is
    layer-normalised.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout= 0.1):
        super().__init__()
        # Attention over the decoder's own sequence
        self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)
        # Attention from decoder states (queries) to encoder outputs (keys/values)
        self.cross_attn = MultiHeadAttention(d_model, num_heads, dropout)

        # Position-wise feed-forward network
        ffn_stages = [
            nn.Linear(d_model, d_ff),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(d_ff, d_model),
        ]
        self.feed_forward = nn.Sequential(*ffn_stages)

        # One LayerNorm and one Dropout per sub-layer
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)

        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, x, encoder_output, src_mask= None, tgt_mask= None):
        """
        Args:
            x: Decoder input (target sequence) of shape (batch_size, tgt_seq_len, d_model)
            encoder_output: Encoder output of shape (batch_size, src_seq_len, d_model)
            src_mask: Source mask (for padding) of shape (batch_size, 1, 1, src_seq_len)
            tgt_mask: Target mask (causal mask) of shape (batch_size, 1, tgt_seq_len, tgt_seq_len)

        Returns:
            Tensor with the same shape as `x`
        """
        # 1. Self-attention restricted by tgt_mask, residual add, normalise
        self_context = self.self_attn(x, x, x, mask=tgt_mask)
        x = self.norm1(x + self.dropout1(self_context))

        # 2. Cross-attention restricted by src_mask, residual add, normalise
        memory_context = self.cross_attn(x, encoder_output, encoder_output, mask=src_mask)
        x = self.norm2(x + self.dropout2(memory_context))

        # 3. Feed-forward network, residual add, normalise
        transformed = self.feed_forward(x)
        return self.norm3(x + self.dropout3(transformed))

In [26]:
def test_decoder_layer():
    """Smoke-test a single DecoderLayer on random tensors and return its output.

    Source and target lengths differ (512 vs 510) so that cross-attention is
    exercised with non-square attention scores.
    """
    # Model and batch dimensions
    d_model, num_heads, d_ff = 512, 8, 2048
    batch_size = 32
    src_seq_len, tgt_seq_len = 512, 510

    layer_under_test = DecoderLayer(d_model, num_heads, d_ff)

    # Random stand-ins for target embeddings and encoder output
    tgt_states = torch.randn(batch_size, tgt_seq_len, d_model)
    memory = torch.randn(batch_size, src_seq_len, d_model)

    # Causal mask for self-attention; an all-True source mask hides nothing
    look_ahead_mask = generate_causal_mask(tgt_seq_len)
    memory_mask = torch.ones(batch_size, 1, 1, src_seq_len).bool()

    output = layer_under_test(
        tgt_states, memory, src_mask=memory_mask, tgt_mask=look_ahead_mask
    )
    print(f"Decoder layer output shape: {output.shape}")  # Should be (32, 510, 512)

    return output

decoder_output = test_decoder_layer()

Decoder layer output shape: torch.Size([32, 510, 512])


In [27]:
class TransformerDecoder(nn.Module):
    """Stack of `num_layers` DecoderLayer blocks applied one after another."""

    def __init__(self, num_layers, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.layers = nn.ModuleList()
        for _ in range(num_layers):
            self.layers.append(DecoderLayer(d_model, num_heads, d_ff, dropout))

    def forward(self, x, encoder_output, src_mask=None, tgt_mask=None):
        """
        Run `x` through every decoder layer in order.

        Args:
            x: Decoder input (target sequence) of shape (batch_size, tgt_seq_len, d_model)
            encoder_output: Encoder output of shape (batch_size, src_seq_len, d_model)
            src_mask: Source mask for padding of shape (batch_size, 1, 1, src_seq_len)
            tgt_mask: Target causal mask of shape (batch_size, 1, tgt_seq_len, tgt_seq_len)

        Returns:
            Output of the last layer (the input itself when the stack is empty)
        """
        hidden = x
        # The same encoder output and masks are handed to every layer.
        for block in self.layers:
            hidden = block(hidden, encoder_output, src_mask, tgt_mask)
        return hidden

In [None]:
def test_full_decoder():
    """Smoke-test a six-layer TransformerDecoder on random tensors and return its output."""
    # Model and batch dimensions
    num_decoder_layers = 6
    d_model, num_heads, d_ff = 512, 8, 2048
    batch_size = 32
    src_seq_len, tgt_seq_len = 512, 510

    decoder = TransformerDecoder(num_decoder_layers, d_model, num_heads, d_ff)

    # Random stand-ins for target embeddings and encoder output
    tgt_states = torch.randn(batch_size, tgt_seq_len, d_model)
    memory = torch.randn(batch_size, src_seq_len, d_model)

    # Causal mask for self-attention; an all-True source mask hides nothing
    look_ahead_mask = generate_causal_mask(tgt_seq_len)
    memory_mask = torch.ones(batch_size, 1, 1, src_seq_len).bool()

    decoder_output = decoder(
        tgt_states, memory, src_mask=memory_mask, tgt_mask=look_ahead_mask
    )
    print(f"Full decoder output shape: {decoder_output.shape}")  # Should be (32, 510, 512)

    return decoder_output

decoder_output = test_full_decoder()