In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import math

In [None]:
# --- Step 1: Define Key Modules ---

class PositionalEncoding(nn.Module):
    """
    Adds fixed sinusoidal positional information to the input embeddings.

    Since Transformers process sequences in parallel, they have no inherent
    sense of word order. Positional encoding provides this information.

    The class, its constructor and its forward pass are defined together in
    a single cell: methods placed in separate cells become unrelated
    module-level functions and are never attached to the class.

    Args:
        d_model: size of the embedding dimension (even or odd).
        max_len: number of positions for which encodings are precomputed.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()

        # Column vector of positions 0..max_len-1, shape (max_len, 1)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)

        # Denominator term of the sine/cosine arguments, one entry per
        # even embedding index -> shape (ceil(d_model / 2),)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        # Broadcast to the full angle table, shape (max_len, ceil(d_model / 2))
        angles = position * div_term

        # Sine on even embedding indices, cosine on odd ones
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # When d_model is odd there is one fewer odd column than even column,
        # so the angle table is trimmed to the number of odd columns.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Insert a broadcastable batch axis: (max_len, 1, d_model)
        pe = pe.unsqueeze(1)

        # Register as a buffer so it follows the module across devices and is
        # saved in the state dict, but is not a trainable parameter
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        x: input tensor of shape (seq_len, batch_size, d_model)

        Returns a tensor of the same shape with the encodings of the first
        seq_len positions added (broadcast over the batch dimension).
        """
        return x + self.pe[:x.size(0)]

In [None]:
class MultiHeadAttention(nn.Module):
    """
    Multi-head scaled dot-product attention.

    Each of the n_heads heads attends over the key/value sequence using its
    own d_model // n_heads slice of the projected features; the head outputs
    are concatenated and passed through a final linear layer.
    """
    def __init__(self, d_model, n_heads):
        super().__init__()
        assert d_model % n_heads == 0

        self.d_model = d_model
        self.n_heads = n_heads
        # Width of the feature slice handled by a single head
        self.d_k = d_model // n_heads

        # Input projections for queries, keys and values
        self.query_linear = nn.Linear(d_model, d_model)
        self.key_linear = nn.Linear(d_model, d_model)
        self.value_linear = nn.Linear(d_model, d_model)

        # Projection applied to the concatenated head outputs
        self.output_linear = nn.Linear(d_model, d_model)

        self.dropout = nn.Dropout(p=0.1)

    def _split_heads(self, projected, batch_size):
        """Reshape (seq_len, batch, d_model) into (batch, n_heads, seq_len, d_k)."""
        per_head = projected.view(-1, batch_size, self.n_heads, self.d_k)
        return per_head.permute(1, 2, 0, 3)

    def forward(self, query, key, value, mask=None):
        """
        query, key, value: tensors of shape (seq_len, batch_size, d_model)
        mask: optional tensor broadcastable to the score tensor of shape
              (batch_size, n_heads, query_len, key_len); positions where the
              mask equals 0 receive no attention.

        Returns a tensor of shape (query_len, batch_size, d_model).
        """
        batch_size = query.size(1)

        # Project the inputs, then move the head axis in front of the sequence axis
        q_heads = self._split_heads(self.query_linear(query), batch_size)
        k_heads = self._split_heads(self.key_linear(key), batch_size)
        v_heads = self._split_heads(self.value_linear(value), batch_size)

        # Similarity of every query with every key, scaled by sqrt(d_k)
        scores = torch.matmul(q_heads, k_heads.transpose(-2, -1)) / math.sqrt(self.d_k)

        if mask is not None:
            # -inf scores turn into zero weights after the softmax
            scores = scores.masked_fill(mask == 0, float('-inf'))

        weights = self.dropout(torch.softmax(scores, dim=-1))

        # Weighted sum of the values, then merge the heads back into d_model features
        context = torch.matmul(weights, v_heads)
        merged = context.permute(2, 0, 1, 3).contiguous().view(-1, batch_size, self.d_model)

        return self.output_linear(merged)

In [None]:
class FeedForward(nn.Module):
    """
    Position-wise feed-forward network: Linear -> ReLU -> Dropout -> Linear.

    The linear layers act on the last dimension only, so every position of
    the sequence is transformed independently of the others.
    """
    def __init__(self, d_model, d_ff):
        super().__init__()
        # Expand to the hidden width d_ff, then project back to d_model
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(p=0.1)

    def forward(self, x):
        """Map a (..., d_model) tensor to a tensor of the same shape."""
        hidden = self.dropout(torch.relu(self.linear1(x)))
        return self.linear2(hidden)

In [None]:
class EncoderLayer(nn.Module):
    """
    One block of the Transformer encoder.

    Two sub-layers are applied in sequence, self-attention and a
    feed-forward network. Each sub-layer output goes through dropout, is
    added to its input (residual connection) and is then layer-normalized.
    """
    def __init__(self, d_model, n_heads, d_ff):
        super().__init__()
        self.multi_head_attention = MultiHeadAttention(d_model, n_heads)
        self.feed_forward = FeedForward(d_model, d_ff)

        # One normalization and one dropout per sub-layer
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(p=0.1)
        self.dropout2 = nn.Dropout(p=0.1)

    def forward(self, src, src_mask):
        """
        src: tensor of shape (seq_len, batch_size, d_model)
        src_mask: mask forwarded unchanged to the self-attention sub-layer

        Returns a tensor with the same shape as src.
        """
        # Sub-layer 1: self-attention (query, key and value are all src)
        attended = self.multi_head_attention(src, src, src, src_mask)
        after_attention = self.norm1(src + self.dropout1(attended))

        # Sub-layer 2: position-wise feed-forward network
        transformed = self.feed_forward(after_attention)
        return self.norm2(after_attention + self.dropout2(transformed))

In [None]:
class DecoderLayer(nn.Module):
    """
    A single layer of the Transformer's Decoder.

    It contains a masked Multi-Head Attention, a second Multi-Head Attention
    over the encoder output, and a Feed-Forward sub-layer. Every sub-layer
    output goes through dropout, is added to its input (residual connection)
    and is then layer-normalized, mirroring EncoderLayer.
    """
    def __init__(self, d_model, n_heads, d_ff):
        super().__init__()
        self.masked_multi_head_attention = MultiHeadAttention(d_model, n_heads)
        self.multi_head_attention = MultiHeadAttention(d_model, n_heads)
        self.feed_forward = FeedForward(d_model, d_ff)

        # One LayerNorm and one Dropout per sub-layer
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)

        self.dropout1 = nn.Dropout(p=0.1)
        self.dropout2 = nn.Dropout(p=0.1)
        self.dropout3 = nn.Dropout(p=0.1)

    def forward(self, trg, enc_src, trg_mask, src_mask):
        """
        trg: decoder input of shape (trg_len, batch_size, d_model)
        enc_src: encoder output of shape (src_len, batch_size, d_model)
        trg_mask: mask passed to the masked self-attention over trg
        src_mask: mask passed to the attention over enc_src

        Returns a tensor with the same shape as trg.
        """
        # 1. Masked self-attention (to prevent looking at future tokens)
        attention1_output = self.masked_multi_head_attention(trg, trg, trg, trg_mask)
        trg = trg + self.dropout1(attention1_output)
        # Normalize after the first residual as well; without this step norm1
        # is never used and un-normalized activations reach the next sub-layer.
        trg = self.norm1(trg)

        # 2. Encoder-Decoder attention (to attend to the encoder's output)
        attention2_output = self.multi_head_attention(trg, enc_src, enc_src, src_mask)
        trg = trg + self.dropout2(attention2_output)
        trg = self.norm2(trg)

        # 3. Feed-forward network
        ff_output = self.feed_forward(trg)
        trg = trg + self.dropout3(ff_output)
        trg = self.norm3(trg)

        return trg



In [None]:
class Transformer(nn.Module):
    """
    Encoder-decoder Transformer built from the modules defined above.

    Source and target token ids are embedded, combined with positional
    encodings and passed through n_layers encoder / decoder layers. A final
    linear layer maps decoder states to scores over the target vocabulary.
    """
    def __init__(self, src_vocab_size, trg_vocab_size, d_model, n_heads, d_ff, n_layers, device):
        super().__init__()

        # Source side: embedding, positional encoding, encoder stack
        self.src_embedding = nn.Embedding(src_vocab_size, d_model)
        self.src_pos_encoding = PositionalEncoding(d_model)
        encoder_stack = []
        for _ in range(n_layers):
            encoder_stack.append(EncoderLayer(d_model, n_heads, d_ff))
        self.encoder_layers = nn.ModuleList(encoder_stack)

        # Target side: embedding, positional encoding, decoder stack
        self.trg_embedding = nn.Embedding(trg_vocab_size, d_model)
        self.trg_pos_encoding = PositionalEncoding(d_model)
        decoder_stack = []
        for _ in range(n_layers):
            decoder_stack.append(DecoderLayer(d_model, n_heads, d_ff))
        self.decoder_layers = nn.ModuleList(decoder_stack)

        # Projection from decoder states to target-vocabulary scores
        self.output_linear = nn.Linear(d_model, trg_vocab_size)

        self.dropout = nn.Dropout(p=0.1)
        # Stored for reference only; nothing in this class reads it
        self.device = device

    def _encode(self, src, src_mask):
        """Embed the source ids and run them through every encoder layer."""
        hidden = self.dropout(self.src_pos_encoding(self.src_embedding(src)))
        for encoder_layer in self.encoder_layers:
            hidden = encoder_layer(hidden, src_mask)
        return hidden

    def _decode(self, trg, memory, trg_mask, src_mask):
        """Embed the target ids and run them through every decoder layer."""
        hidden = self.dropout(self.trg_pos_encoding(self.trg_embedding(trg)))
        for decoder_layer in self.decoder_layers:
            hidden = decoder_layer(hidden, memory, trg_mask, src_mask)
        return hidden

    def forward(self, src, trg, src_mask, trg_mask):
        """
        src, trg: integer token-id tensors fed to the embedding layers
        src_mask, trg_mask: masks forwarded to the encoder / decoder layers

        Returns the output of the final linear layer; its last dimension has
        size trg_vocab_size.
        """
        memory = self._encode(src, src_mask)
        decoded = self._decode(trg, memory, trg_mask, src_mask)
        return self.output_linear(decoded)



        

In [None]:
# --- Step 2: Data and Utility Functions ---

def create_masks(src, trg, pad_idx):
    """
    Create source and target masks to handle padding and prevent
    the decoder from cheating by looking at future tokens.

    The token tensors are sequence-first, like every other tensor in this
    notebook, so the padding masks are transposed to batch-first before the
    broadcast axes are inserted. This makes them broadcast against attention
    scores of shape (batch_size, n_heads, query_len, key_len).

    Args:
        src: source token ids of shape (src_len, batch_size)
        trg: target token ids of shape (trg_len, batch_size)
        pad_idx: id of the padding token

    Returns:
        src_mask: bool tensor of shape (batch_size, 1, 1, src_len),
            True where the source token is not padding.
        trg_mask: bool tensor of shape (batch_size, 1, trg_len, trg_len),
            True where query position i may attend to key position j,
            i.e. j <= i and key j is not padding.
    """
    # Source mask: prevents attention to padding tokens in the source sequence
    src_mask = (src != pad_idx).transpose(0, 1).unsqueeze(1).unsqueeze(2)

    # Target padding mask, same layout as the source mask
    trg_pad_mask = (trg != pad_idx).transpose(0, 1).unsqueeze(1).unsqueeze(2)

    # Lower-triangular mask: position i may look at positions 0..i only.
    # Built on trg's device because it is combined with trg_pad_mask.
    trg_len = trg.shape[0]
    causal_mask = torch.tril(torch.ones((trg_len, trg_len), device=trg.device)).bool()

    # Broadcasts (batch, 1, 1, trg_len) & (trg_len, trg_len) -> (batch, 1, trg_len, trg_len)
    trg_mask = trg_pad_mask & causal_mask

    return src_mask, trg_mask

In [None]:
def prepare_dummy_data():
    """
    Build a tiny hard-coded parallel corpus for a toy translation task,
    e.g. "hello world" -> "hallo welt".

    Returns:
        src_padded: source token ids, shape (max_len, n_sentences)
        trg_padded: target token ids, shape (max_len, n_sentences)
        src_vocab: source word -> id mapping
        trg_vocab: target word -> id mapping
    """
    # Word -> id tables; id 0 is reserved for padding on both sides
    src_vocab = {'<pad>': 0, 'hello': 1, 'world': 2, 'how': 3, 'are': 4, 'you': 5, 'thanks': 6}
    trg_vocab = {'<pad>': 0, 'hallo': 1, 'welt': 2, 'wie': 3, 'geht': 4, 'es': 5, 'danke': 6}

    # Sentence pairs, already converted to ids
    src_idx = [[1, 2], [3, 4, 5]]
    trg_idx = [[1, 2], [3, 4, 5]]

    # Both sides are padded to the length of the longest source sentence
    max_len = max(map(len, src_idx))

    def pad_to_width(sentences, pad_id):
        """Right-pad every sentence with pad_id up to max_len tokens."""
        rows = []
        for sentence in sentences:
            rows.append(sentence + [pad_id] * (max_len - len(sentence)))
        return rows

    # Stack as (n_sentences, max_len), then switch to sequence-first layout
    src_padded = torch.tensor(pad_to_width(src_idx, src_vocab['<pad>'])).transpose(0, 1)
    trg_padded = torch.tensor(pad_to_width(trg_idx, trg_vocab['<pad>'])).transpose(0, 1)

    return src_padded, trg_padded, src_vocab, trg_vocab

In [None]:
# --- Step 3: Instantiate and Train the Model ---

# Hyperparameters
D_MODEL = 256  # Dimension of embeddings and hidden states; MultiHeadAttention asserts it is divisible by N_HEADS
N_HEADS = 8    # Number of attention heads
D_FF = 512     # Dimension of the feed-forward network
N_LAYERS = 3   # Number of encoder and decoder layers
NUM_EPOCHS = 100  # Presumably the number of training epochs; the training loop is not part of the cells above
LEARNING_RATE = 0.0001  # Presumably the optimizer learning rate; the optimizer setup is not part of the cells above
PAD_IDX = 0  # Same id as the '<pad>' entry of both vocabularies built in prepare_dummy_data

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

In [None]:
# Build the toy corpus; the vocabulary sizes determine the width of the
# embedding tables and of the output layer
src_tensor, trg_tensor, src_vocab, trg_vocab = prepare_dummy_data()
src_vocab_size, trg_vocab_size = len(src_vocab), len(trg_vocab)

In [None]:
# Build the Transformer from the hyperparameters above and move its
# parameters and buffers to the selected device
model = Transformer(
    src_vocab_size=src_vocab_size,
    trg_vocab_size=trg_vocab_size,
    d_model=D_MODEL,
    n_heads=N_HEADS,
    d_ff=D_FF,
    n_layers=N_LAYERS,
    device=device,
)
model = model.to(device)