In [1]:
import torch
import torch.nn as nn
import numpy as np

In [None]:
# import torch

# # Show up to 10,000 elements, and widen the output lines
# torch.set_printoptions(threshold=10000, linewidth=200)

import numpy as np
import sys

np.set_printoptions(threshold=sys.maxsize, linewidth=200)

## Transformer Block

In [48]:
# Positional Encoding Module -------------------------------------------------------
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() *
            (-np.log(10000.0) / d_model
        ))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        print(f"\nPositional Encoding Input: {x.shape}")
        print(x.detach().numpy())
        output = x + self.pe[:, :x.size(1)]
        print("\nPositional Encoding Output:")
        print(output.detach().numpy())
        return output

# Embedding Layer ------------------------------------------------------------------
class Embeddings(nn.Module):
    def __init__(self, vocab_size, d_model):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, d_model)
        self.d_model = d_model

    def forward(self, x):
        print(f"\nEmbedding Input (token IDs): {x.cpu().numpy()}")
        output = self.embed(x) * np.sqrt(self.d_model)
        print(f"\nEmbedding Output: {output.shape}")
        print(output.detach().numpy())
        return output

# Multi-Head Attention Module ------------------------------------------------------
class MultiHeadAttention(nn.Module):
    def __init__(self, is_decoder=False):
        super().__init__()
        self.d_k = d_model // num_heads
        self.is_decoder = is_decoder
        self.WQ = nn.Linear(d_model, d_model)
        self.WK = nn.Linear(d_model, d_model)
        self.WV = nn.Linear(d_model, d_model)
        self.WO = nn.Linear(d_model, d_model)

    def forward(self, Q, K, V):
        batch_size = Q.size(0)
        print(f"\nMultiHeadAttention (Decoder={self.is_decoder})")
        print(f"Input Q: {Q.shape}, K: {K.shape}, V: {V.shape}")

        # Linear projections
        Q = self.WQ(Q).view(batch_size, -1, num_heads, self.d_k).transpose(1,2)
        K = self.WK(K).view(batch_size, -1, num_heads, self.d_k).transpose(1,2)
        V = self.WV(V).view(batch_size, -1, num_heads, self.d_k).transpose(1,2)

        # Attention calculation
        scores = torch.matmul(Q, K.transpose(-2, -1)) / np.sqrt(self.d_k)
        
        # Fix: Create proper 2D mask for decoder
        if self.is_decoder:
            seq_len = scores.size(-1)
            # Create square mask of size (seq_len, seq_len)
            mask = torch.triu(
                torch.ones(seq_len, seq_len, device=scores.device),
                diagonal=1
            ).bool()
            scores = scores.masked_fill(mask.unsqueeze(0).unsqueeze(0), float('-inf'))
        
        attn_weights = torch.softmax(scores, dim=-1)
        context = torch.matmul(attn_weights, V)
        
        # Concatenate heads
        context = context.transpose(1,2).contiguous().view(batch_size, -1, d_model)
        output = self.WO(context)
        
        print("\nAttention Weights Sample:")
        print(attn_weights[0,0,0].detach().numpy())
        print("\nAttention Output:")
        print(output.detach().numpy())
        return output

# Feed-Forward Network -------------------------------------------------------------
class FeedForward(nn.Module):
    def __init__(self):
        super().__init__()
        self.ffn = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )

    def forward(self, x):
        print(f"\nFeedForward Input: {x.shape}")
        print(x.detach().numpy())
        output = self.ffn(x)
        print("\nFeedForward Output:")
        print(output.detach().numpy())
        return output

# Encoder Layer --------------------------------------------------------------------
class EncoderLayer(nn.Module):
    def __init__(self):
        super().__init__()
        self.self_attn = MultiHeadAttention()
        self.ffn = FeedForward()
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x):
        print("\n" + "="*30 + " Encoder Layer " + "="*30)
        # Self-attention
        print("\nEncoder Self-Attention:")
        attn_output = self.self_attn(x, x, x)
        x = self.norm1(x + attn_output)
        print("\nPost Attention (Norm):")
        print(x.detach().numpy())
        
        # FFN
        print("\nEncoder Feed-Forward:")
        ffn_output = self.ffn(x)
        x = self.norm2(x + ffn_output)
        print("\nPost FFN (Norm):")
        print(x.detach().numpy())
        return x

# Decoder Layer --------------------------------------------------------------------
class DecoderLayer(nn.Module):
    def __init__(self):
        super().__init__()
        self.self_attn = MultiHeadAttention(is_decoder=True)
        self.cross_attn = MultiHeadAttention()
        self.ffn = FeedForward()
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)

    def forward(self, x, enc_output):
        print("\n" + "="*30 + " Decoder Layer " + "="*30)
        # Masked self-attention
        print("\nDecoder Self-Attention:")
        attn_output = self.self_attn(x, x, x)
        x = self.norm1(x + attn_output)
        print("\nPost Self-Attention (Norm):")
        print(x.detach().numpy())
        
        # Cross-attention
        print("\nDecoder Cross-Attention:")
        attn_output = self.cross_attn(x, enc_output, enc_output)
        x = self.norm2(x + attn_output)
        print("\nPost Cross-Attention (Norm):")
        print(x.detach().numpy())
        
        # FFN
        print("\nDecoder Feed-Forward:")
        ffn_output = self.ffn(x)
        x = self.norm3(x + ffn_output)
        print("\nPost FFN (Norm):")
        print(x.detach().numpy())
        return x

# Encoder Stack --------------------------------------------------------------------
class Encoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.embed = Embeddings(vocab_size, d_model)
        self.pe = PositionalEncoding(d_model)
        self.layers = nn.ModuleList([EncoderLayer() for _ in range(num_layers)])

    def forward(self, x):
        print("\n" + "="*40 + " Encoder Start " + "="*40)
        x = self.embed(x)
        x = self.pe(x)
        for layer in self.layers:
            x = layer(x)
        return x

# Decoder Stack --------------------------------------------------------------------
class Decoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.embed = Embeddings(vocab_size, d_model)
        self.pe = PositionalEncoding(d_model)
        self.layers = nn.ModuleList([DecoderLayer() for _ in range(num_layers)])

    def forward(self, x, enc_output):
        print("\n" + "="*40 + " Decoder Start " + "="*40)
        x = self.embed(x)
        x = self.pe(x)
        for layer in self.layers:
            x = layer(x, enc_output)
        return x

# Complete Transformer -------------------------------------------------------------
class Transformer(nn.Module):
    def __init__(self):
        super().__init__()
        self.encoder = Encoder()
        self.decoder = Decoder()
        self.final_layer = nn.Linear(d_model, vocab_size)

    def forward(self, src, tgt):
        print("\n" + "="*40 + " Forward Pass Start " + "="*40)
        enc_output = self.encoder(src)
        dec_output = self.decoder(tgt[:, :-1], enc_output)
        return self.final_layer(dec_output)

## Execution

#### Configuration parameters

In [3]:
# Configuration parameters
d_model = 3        # Embedding dimension (divisible by num_heads)
num_heads = 3      # Number of attention heads
d_ff = 6           # Feed-forward hidden dimension
num_layers = 2     # Number of encoder/decoder layers
max_seq_len = 5    # Maximum sequence length

In [49]:
# Configuration parameters
d_model = 6        # Embedding dimension (divisible by num_heads)
num_heads = 3      # Number of attention heads
d_ff = 12          # Feed-forward hidden dimension
num_layers = 2     # Number of encoder/decoder layers
max_seq_len = 5    # Maximum sequence length

#### Build vocabulary

In [50]:
# Example sentences
src_sentence = ["Nikhil", "likes", "to", "play", "football"]
tgt_sentence = ["<start>", "Nikhil", "likes", "to", "play"]

In [51]:
# Build vocabulary -----------------------------------------------------------------
print("\nBuilding vocabulary...")
special_tokens = ["<pad>", "<start>"]
all_words = list(set(src_sentence + tgt_sentence))

vocab = {}
vocab["<pad>"] = 0
vocab["<start>"] = 1
current_idx = 2
for word in all_words:
    if word not in special_tokens and word not in vocab:
        vocab[word] = current_idx
        current_idx += 1
vocab_size = len(vocab)
print(f"Vocabulary (size {vocab_size}): {vocab}")


Building vocabulary...
Vocabulary (size 7): {'<pad>': 0, '<start>': 1, 'football': 2, 'Nikhil': 3, 'play': 4, 'likes': 5, 'to': 6}


#### Convert sentences to token IDs

In [52]:
# Convert sentences to token IDs ---------------------------------------------------
src_ids = [vocab[w] for w in src_sentence]
tgt_ids = [vocab[w] for w in tgt_sentence]
print(f"\nSource tokens: {src_sentence} => IDs: {src_ids}")
print(f"Target tokens: {tgt_sentence} => IDs: {tgt_ids}")


Source tokens: ['Nikhil', 'likes', 'to', 'play', 'football'] => IDs: [3, 5, 6, 4, 2]
Target tokens: ['<start>', 'Nikhil', 'likes', 'to', 'play'] => IDs: [1, 3, 5, 6, 4]


In [53]:
# Convert to tensors
src = torch.LongTensor(src_ids).unsqueeze(0)  # (1,5)
tgt = torch.LongTensor(tgt_ids).unsqueeze(0)  # (1,5)

#### Initialize Model and Training Components

In [54]:
# Initialize Model and Training Components -----------------------------------------
model = Transformer()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss(ignore_index=vocab["<pad>"])

In [55]:
# optimizer

#### Forward Pass

In [70]:
# Forward Pass ---------------------------------------------------------------------
print("\n" + "="*40 + " Starting Forward Pass " + "="*40)
outputs = model(src, tgt)  # Shape: (1,4, vocab_size)





Embedding Input (token IDs): [[3 5 6 4 2]]

Embedding Output: torch.Size([1, 5, 6])
[[[-3.5316937e+00  1.6512412e+00  2.7454066e+00 -2.3487000e+00 -8.1406766e-01  3.5352362e-03]
  [ 2.9384294e+00  5.3358903e+00 -1.2465447e+00  2.3554676e+00 -2.9935477e+00 -1.7453690e+00]
  [-1.4768381e+00  2.8245058e+00 -1.8362260e+00  3.9558768e+00  4.9465564e-01 -1.7560655e+00]
  [-4.9802560e-01 -1.5842095e-01  1.4116590e+00 -2.3076582e+00 -2.0125613e+00 -5.9117329e-01]
  [-1.2358226e+00 -8.2297730e-01 -4.2127466e-01 -7.5625145e-01 -1.6337032e+00  2.1894784e+00]]]

Positional Encoding Input: torch.Size([1, 5, 6])
[[[-3.5316937e+00  1.6512412e+00  2.7454066e+00 -2.3487000e+00 -8.1406766e-01  3.5352362e-03]
  [ 2.9384294e+00  5.3358903e+00 -1.2465447e+00  2.3554676e+00 -2.9935477e+00 -1.7453690e+00]
  [-1.4768381e+00  2.8245058e+00 -1.8362260e+00  3.9558768e+00  4.9465564e-01 -1.7560655e+00]
  [-4.9802560e-01 -1.5842095e-01  1.4116590e+00 -2.3076582e+00 -2.0125613e+00 -5.9117329e-01]
  [-1.2358226e

#### Loss Calculation

In [71]:
# Loss Calculation -----------------------------------------------------------------
print("\n" + "="*40 + " Loss Calculation " + "="*40)
targets = tgt[:, 1:].contiguous().view(-1)
loss = criterion(outputs.view(-1, vocab_size), targets)
print(f"\nFinal Loss: {loss.item():.4f}")



Final Loss: 1.8822


#### Backward Pass

In [72]:
# Backward Pass --------------------------------------------------------------------
print("\n" + "="*40 + " Backward Pass " + "="*40)
loss.backward()




#### Gradient Monitoring

In [73]:
# Gradient Monitoring -------------------------------------------------------------
print("\n" + "="*40 + " Gradient Magnitudes " + "="*40)
for name, param in model.named_parameters():
    if param.grad is not None:
        print(f"{name:50} {param.grad.norm().item():.4f}")


encoder.embed.embed.weight                         0.0825
encoder.layers.0.self_attn.WQ.weight               0.0559
encoder.layers.0.self_attn.WQ.bias                 0.0065
encoder.layers.0.self_attn.WK.weight               0.0486
encoder.layers.0.self_attn.WK.bias                 0.0000
encoder.layers.0.self_attn.WV.weight               0.0697
encoder.layers.0.self_attn.WV.bias                 0.0254
encoder.layers.0.self_attn.WO.weight               0.0926
encoder.layers.0.self_attn.WO.bias                 0.0476
encoder.layers.0.ffn.ffn.0.weight                  0.0451
encoder.layers.0.ffn.ffn.0.bias                    0.0170
encoder.layers.0.ffn.ffn.2.weight                  0.1116
encoder.layers.0.ffn.ffn.2.bias                    0.0921
encoder.layers.0.norm1.weight                      0.0424
encoder.layers.0.norm1.bias                        0.0928
encoder.layers.0.norm2.weight                      0.0345
encoder.layers.0.norm2.bias                        0.0869
encoder.layer

#### Prediction/Inference Example

In [74]:
# Prediction/Inference Example -----------------------------------------------------
def predict(model, src_input, max_length=5):
    model.eval()
    with torch.no_grad():
        # Encode source sequence
        enc_out = model.encoder(src_input)
        
        # Initialize target with <start> token
        target = torch.LongTensor([[vocab["<start>"]]]).to(src_input.device)
        
        print("\n" + "="*40 + " Prediction Start " + "="*40)
        for i in range(max_length-1):
            # Generate prediction
            output = model.decoder(target, enc_out)
            logits = model.final_layer(output[:, -1, :])
            
            # Get next token
            next_token = logits.argmax(-1).item()
            print(f"Step {i+1}: Predicted token {next_token} ({list(vocab.keys())[list(vocab.values()).index(next_token)]})")
            
            # Append to target sequence
            target = torch.cat([target, torch.LongTensor([[next_token]]).to(src_input.device)], dim=1)
            
            if next_token == vocab["<pad>"]:  # Early stopping
                break
        return target

In [75]:
print("\n" + "="*40 + " Making Prediction " + "="*40)
prediction = predict(model, src)
prediction = prediction.squeeze().tolist()
print("\nFinal Prediction IDs:", prediction)




Embedding Input (token IDs): [[3 5 6 4 2]]

Embedding Output: torch.Size([1, 5, 6])
[[[-3.5316937e+00  1.6512412e+00  2.7454066e+00 -2.3487000e+00 -8.1406766e-01  3.5352362e-03]
  [ 2.9384294e+00  5.3358903e+00 -1.2465447e+00  2.3554676e+00 -2.9935477e+00 -1.7453690e+00]
  [-1.4768381e+00  2.8245058e+00 -1.8362260e+00  3.9558768e+00  4.9465564e-01 -1.7560655e+00]
  [-4.9802560e-01 -1.5842095e-01  1.4116590e+00 -2.3076582e+00 -2.0125613e+00 -5.9117329e-01]
  [-1.2358226e+00 -8.2297730e-01 -4.2127466e-01 -7.5625145e-01 -1.6337032e+00  2.1894784e+00]]]

Positional Encoding Input: torch.Size([1, 5, 6])
[[[-3.5316937e+00  1.6512412e+00  2.7454066e+00 -2.3487000e+00 -8.1406766e-01  3.5352362e-03]
  [ 2.9384294e+00  5.3358903e+00 -1.2465447e+00  2.3554676e+00 -2.9935477e+00 -1.7453690e+00]
  [-1.4768381e+00  2.8245058e+00 -1.8362260e+00  3.9558768e+00  4.9465564e-01 -1.7560655e+00]
  [-4.9802560e-01 -1.5842095e-01  1.4116590e+00 -2.3076582e+00 -2.0125613e+00 -5.9117329e-01]
  [-1.2358226e+

In [76]:
inverse_vocab = {v: k for k, v in vocab.items()}
pred = [inverse_vocab[w] for w in prediction]
pred


['<start>', 'football', 'football', 'football', 'football']

### EXTRA

In [28]:
def ids_to_text(predicted_ids, vocab):
    inverse_vocab = {v: k for k, v in vocab.items()}
    tokens = []
    for id in predicted_ids:
        if id == vocab["<pad>"]:
            continue
        token = inverse_vocab.get(id, "<unk>")
        if token == "<start>":
            continue
        tokens.append(token)
    return " ".join(tokens)

In [29]:
# Modified prediction function
def predict(model, src_input, vocab, max_length=5):
    model.eval()
    with torch.no_grad():
        enc_out = model.encoder(src_input)
        target = torch.LongTensor([[vocab["<start>"]]]).to(src_input.device)
        
        for _ in range(max_length-1):
            output = model.decoder(target, enc_out)
            logits = model.final_layer(output[:, -1, :])
            next_token = logits.argmax(-1)
            target = torch.cat([target, next_token.unsqueeze(0)], dim=1)
            
            if next_token.item() == vocab["<pad>"]:
                break
                
        return ids_to_text(target.squeeze().cpu().numpy(), vocab)

In [None]:
# Usage
print("\nGenerating Prediction...")
generated_text = predict(model, src, vocab)
print(f"Generated Sequence: {generated_text}")