# Transformer Architecture (Eng-Hindi)

In [None]:
import torch
import torch.nn as nn
import math

# Create Input Embeddings

*   d_model = dimensionality of each token's embedding vector (how much information one token can carry)
*   vocab_size  =	Total number of tokens the model knows
* super() =	Initializes the parent nn.Module
* self = current object (the layer)
* x = Tensor of shape (batch_size, seq_len)

In [None]:
class InputEmbeddings(nn.Module):
  """Token-ID to vector lookup whose output is scaled by sqrt(d_model).

  Args:
    d_model: Width of each embedding vector.
    vocab_size: Number of rows in the lookup table (one per token ID).
  """

  def __init__(self, d_model: int, vocab_size: int):
    super().__init__()
    self.d_model = d_model
    self.vocab_size = vocab_size

    # Learnable table holding one d_model-wide row per token ID.
    self.embedding = nn.Embedding(vocab_size, d_model)

  def forward(self, x):
    """Return the embeddings of the token IDs in ``x`` times sqrt(d_model)."""
    scale = math.sqrt(self.d_model)
    return self.embedding(x) * scale

# Positional Encoding


* arange = creates positions
* unsqueeze(1) = makes it compatible with embedding dimensions
* arange(0, d_model, 2) = even dimensions
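* -log(10000) / d_model = scale factor that spaces the frequencies geometrically across dimensions
* exp() = turns the scaled even indices into per-dimension frequencies, i.e. 1 / 10000^(2i / d_model)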
* unsqueeze(0) =	Make shapes compatible with batch input
* register_buffer =	Store positional encoding safely without training



In [None]:
class PositionalEncoding(nn.Module):
  """Adds fixed sinusoidal position encodings to token embeddings.

  The table is computed once in ``__init__`` and stored as a buffer, so it
  follows the module across devices but is not trained.

  Args:
    d_model: Width of each embedding vector. Odd values are supported.
    seq_len: Number of positions in the precomputed table.
    dropout: Dropout probability applied after the encoding is added.
  """

  def __init__(
      self,
      d_model: int,
      seq_len: int,
      dropout: float
      ) -> None:

    super().__init__()
    self.d_model = d_model
    self.seq_len = seq_len
    self.dropout = nn.Dropout(dropout)

    # Table of shape (seq_len, d_model), filled in below.
    pe = torch.zeros(seq_len, d_model)

    # Column vector of positions 0..seq_len-1, shape (seq_len, 1).
    position = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1)

    # One frequency per even dimension: 10000^(-2i / d_model).
    div_term = torch.exp(
        torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
    )
    angles = position * div_term  # (seq_len, ceil(d_model / 2))

    # Even columns get sine, odd columns get cosine.
    pe[:, 0::2] = torch.sin(angles)
    # With an odd d_model there is one fewer odd column than even columns,
    # so the last frequency is dropped for the cosine half.
    pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

    # Leading batch dimension so the table broadcasts over the batch.
    pe = pe.unsqueeze(0)
    self.register_buffer('pe', pe)

  def forward(self, x):
    """Add the first ``x.shape[1]`` position rows to ``x``, then apply dropout.

    ``x`` is indexed as (batch, sequence, feature). ``x.shape[1]`` must not
    exceed ``seq_len``, otherwise the sliced table cannot be added to ``x``.
    """
    # Buffers never require grad, so no explicit detach is needed.
    x = x + self.pe[:, :x.shape[1], :]
    return self.dropout(x)

# Layer Normalization

In [None]:
class LayerNormalization(nn.Module):
  """Normalizes the last dimension, then applies a learned scale and shift.

  Args:
    d_model: Size of the last dimension of the input.
    eps: Constant added to the standard deviation to avoid division by zero.
  """

  def __init__(self, d_model: int, eps: float = 1e-6) -> None:
    super().__init__()
    self.eps = eps

    # Per-feature gain (starts at 1) and offset (starts at 0).
    self.alpha = nn.Parameter(torch.ones(d_model))
    self.bias = nn.Parameter(torch.zeros(d_model))

  def forward(self, x):
    """Return ``alpha * (x - mean) / (std + eps) + bias`` over the last dim."""
    centered = x - x.mean(dim=-1, keepdim=True)
    # Population standard deviation (unbiased=False); eps is added to the std.
    spread = x.std(dim=-1, keepdim=True, unbiased=False) + self.eps
    return self.alpha * (centered / spread) + self.bias

# Feed Forward Layer

* d_model = size of the input and output embeddings
* d_ff = hidden layer size of the feed-forward network
* dropout = controls how much regularization is applied during training

In [None]:
class FeedForwardBlock(nn.Module):
  """Two-layer MLP applied to the last dimension: expand, ReLU, dropout, contract.

  Args:
    d_model: Input and output width.
    d_ff: Width of the hidden layer.
    dropout: Dropout probability applied after the activation.
  """

  def __init__(self, d_model: int, d_ff: int, dropout: float) -> None:
    super().__init__()

    # The attribute names (including the "liner" spelling) are state_dict
    # keys, so they are left as they are.
    self.liner_1 = nn.Linear(d_model, d_ff)
    self.dropout = nn.Dropout(dropout)
    self.liner_2 = nn.Linear(d_ff, d_model)

  def forward(self, x):
    """Map ``x`` from d_model to d_ff and back to d_model."""
    hidden = torch.relu(self.liner_1(x))
    hidden = self.dropout(hidden)
    return self.liner_2(hidden)


# Multi-Head Attention

* @staticmethod = function inside a class that doesn’t need object or class data

* mask is used to control which positions the attention mechanism can “see.”

* query @ key.transpose(-2, -1) = matrix multiplication between the query vectors and the transposed key vectors

* transpose(-2, -1) = swaps the last two dimensions

* attention_scores shape = (batch_size, h, seq_len, seq_len)

* masked_fill(condition, value) =
Replaces values in attention_scores where condition is True with the given value.

* q, k, v shape = (batch_size, seq_len, d_model)

In [None]:
class MultiHeadAttention(nn.Module):
  """Multi-head scaled dot-product attention with an output projection.

  Args:
    d_model: Width of the query/key/value inputs and of the output.
    num_heads: Number of attention heads; must divide ``d_model``.
    dropout: Dropout probability applied to the attention weights.

  After each ``forward`` call the attention weights of that call are kept in
  ``self.attention_scores``.
  """

  def __init__(
      self,
      d_model: int,
      num_heads: int,
      dropout: float
      ) -> None:

    super().__init__()

    self.d_model = d_model
    self.num_heads = num_heads
    self.dropout = nn.Dropout(dropout)

    # Each head works on an equal slice of the model dimension.
    assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

    # Width of a single head.
    self.d_k = d_model // num_heads

    # Learnable input projections for queries, keys and values.
    self.w_q = nn.Linear(d_model, d_model)
    self.w_k = nn.Linear(d_model, d_model)
    self.w_v = nn.Linear(d_model, d_model)
    # Output projection applied after the heads are merged.
    self.w_o = nn.Linear(d_model, d_model)

  @staticmethod
  def attention(
      query,
      key,
      value,
      mask,
      dropout=None):
    """Compute softmax(Q K^T / sqrt(d_k)) V.

    Args:
      query, key, value: Tensors whose last two dimensions are
        (sequence, d_k); leading dimensions must be broadcast-compatible.
      mask: Optional tensor broadcastable to the score matrix. Positions
        where ``mask == 0`` are excluded from attention. ``None`` disables
        masking.
      dropout: Optional callable (for example an ``nn.Dropout`` instance)
        applied to the attention weights. ``None`` disables dropout.

    Returns:
      Tuple ``(output, weights)``: the weighted sum of ``value`` and the
      attention weights.
    """
    d_k = query.shape[-1]

    # (Q . K^T) / sqrt(d_k)
    attention_scores = (query @ key.transpose(-2, -1)) / math.sqrt(d_k)

    if mask is not None:
      # A large negative score becomes ~0 probability after softmax.
      attention_scores = attention_scores.masked_fill(mask == 0, -1e9)

    # Softmax must run whether or not a mask was supplied; otherwise the
    # unmasked path would return raw scores instead of probabilities.
    attention_scores = attention_scores.softmax(dim=-1)

    if dropout is not None:
      attention_scores = dropout(attention_scores)

    return (attention_scores @ value), attention_scores

  def _split_heads(self, x):
    """Reshape (batch, seq, d_model) into (batch, num_heads, seq, d_k)."""
    return x.view(x.shape[0], x.shape[1], self.num_heads, self.d_k).transpose(1, 2)

  def forward(self, q, k, v, mask):
    """Project q/k/v, attend per head, merge the heads and project the result.

    ``q``, ``k`` and ``v`` are reshaped as (batch, seq, d_model) tensors.
    The result has the batch and sequence sizes of ``q`` and width d_model.
    """
    query = self._split_heads(self.w_q(q))
    key = self._split_heads(self.w_k(k))
    value = self._split_heads(self.w_v(v))

    # Keep the weights on the module so they can be inspected afterwards.
    x, self.attention_scores = MultiHeadAttention.attention(
        query, key, value, mask, self.dropout
    )

    # (batch, heads, seq, d_k) -> (batch, seq, heads * d_k)
    x = x.transpose(1, 2).contiguous().view(x.shape[0], -1, self.num_heads * self.d_k)

    return self.w_o(x)

# Residual Connection

In [None]:
class ResidualConnection(nn.Module):
  """Pre-norm residual wrapper: ``x + dropout(sublayer(norm(x)))``.

  Args:
    features: Size of the last dimension, passed to LayerNormalization.
    dropout: Dropout probability applied to the sublayer output.
  """

  def __init__(self, features: int, dropout: float) -> None:
    super().__init__()
    self.dropout = nn.Dropout(dropout)
    self.norm = LayerNormalization(features)

  def forward(self, x, sublayer):
    """Normalize ``x``, run ``sublayer`` on it and add the result back to ``x``."""
    normed = self.norm(x)
    branch = self.dropout(sublayer(normed))
    return x + branch

# Encoder

*  x: (batch_size, seq_len, d_model)
* src_mask: padding mask for attention

In [None]:
class EncoderBlock(nn.Module):
  """One encoder layer: self-attention, then feed-forward, each in a residual.

  Args:
    d_model: Feature width handed to the residual connections.
    self_attention_block: Attention module called as ``(q, k, v, mask)``.
    feed_forward_block: Module applied to the attention output.
    dropout: Dropout probability used by both residual connections.
  """

  def __init__(
      self,
      d_model: int,
      self_attention_block: MultiHeadAttention,
      feed_forward_block : FeedForwardBlock,
      dropout: float
      ) -> None:

    super().__init__()

    self.self_attention_block = self_attention_block
    self.feed_forward_block = feed_forward_block
    # Index 0 wraps self-attention, index 1 wraps the feed-forward block.
    self.residual_connection = nn.ModuleList(
        [ResidualConnection(d_model, dropout) for _ in range(2)]
    )

  def forward(self, x, src_mask):
    """Run ``x`` through self-attention (with ``src_mask``) and feed-forward."""
    attn_residual, ffn_residual = self.residual_connection

    def attend(normed):
      # Queries, keys and values all come from the same normalized input.
      return self.self_attention_block(normed, normed, normed, src_mask)

    x = attn_residual(x, attend)
    return ffn_residual(x, self.feed_forward_block)

class Encoder(nn.Module):
  """Stack of encoder layers followed by a final layer normalization.

  Args:
    d_model: Feature width passed to the final LayerNormalization.
    layers: Layers called in order as ``layer(x, mask)``.
  """

  def __init__(self, d_model: int, layers: nn.ModuleList) -> None:
    super().__init__()
    self.layers = layers
    self.norm = LayerNormalization(d_model)

  def forward(self, x, mask):
    """Apply every layer in turn, then normalize the result."""
    out = x
    for block in self.layers:
      out = block(out, mask)
    return self.norm(out)

# Decoder Block

*  x: (batch_size, tgt_len, d_model)
* encoder_output: (batch_size, src_len, d_model)
* src_mask: padding mask for source
* tgt_mask: causal + padding mask for target

In [None]:
class DecoderBlock(nn.Module):
  """One decoder layer: masked self-attention, cross-attention, feed-forward.

  Each of the three sublayers is wrapped in its own residual connection.

  Args:
    d_model: Feature width handed to the residual connections.
    self_attention_block: Attention over the decoder's own input.
    cross_attention_block: Attention whose keys/values are the encoder output.
    feed_forward_block: Module applied after both attention steps.
    dropout: Dropout probability used by the residual connections.
  """

  def __init__(
      self,
      d_model: int,
      self_attention_block: MultiHeadAttention,
      cross_attention_block: MultiHeadAttention,
      feed_forward_block: FeedForwardBlock,
      dropout: float
  ) -> None:
    super().__init__()

    self.self_attention_block = self_attention_block
    self.cross_attention_block = cross_attention_block
    self.feed_forward_block = feed_forward_block
    # Index 0: self-attention, 1: cross-attention, 2: feed-forward.
    self.residual_connection = nn.ModuleList(
        [ResidualConnection(d_model, dropout) for _ in range(3)]
    )

  def forward(self, x, encoder_output, src_mask, tgt_mask):
    """Run ``x`` through the three sublayers in order."""
    self_residual, cross_residual, ffn_residual = self.residual_connection

    def attend_to_self(normed):
      # tgt_mask restricts which target positions may be attended to.
      return self.self_attention_block(normed, normed, normed, tgt_mask)

    def attend_to_encoder(normed):
      # Queries come from the decoder; keys and values from the encoder.
      return self.cross_attention_block(
          normed, encoder_output, encoder_output, src_mask
      )

    x = self_residual(x, attend_to_self)
    x = cross_residual(x, attend_to_encoder)
    return ffn_residual(x, self.feed_forward_block)

class Decoder(nn.Module):
  """Stack of decoder layers followed by a final layer normalization.

  Args:
    d_model: Feature width passed to the final LayerNormalization.
    layers: Layers called in order as
      ``layer(x, encoder_output, src_mask, tgt_mask)``.
  """

  def __init__(self, d_model: int, layers: nn.ModuleList) -> None:
    super().__init__()
    self.layers = layers
    self.norm = LayerNormalization(d_model)

  def forward(self, x, encoder_output, src_mask, tgt_mask):
    """Apply every layer in turn, then normalize the result."""
    out = x
    for block in self.layers:
      out = block(out, encoder_output, src_mask, tgt_mask)
    return self.norm(out)

# Linear / Projection Layer

In [None]:
class ProjectionLayer(nn.Module):
  """Linear map from the model dimension to vocabulary-sized scores.

  Args:
    d_model: Input feature width.
    vocab_size: Number of output scores per position.
  """

  def __init__(self, d_model: int, vocab_size: int) -> None:
    super().__init__()
    self.proj = nn.Linear(d_model, vocab_size)

  def forward(self, x):
    """Return the unnormalized scores (no softmax is applied here)."""
    logits = self.proj(x)
    return logits

# Transformer

In [None]:
class Transformer(nn.Module):
  """Container tying together embeddings, encoder, decoder and projection.

  No ``forward`` is defined; callers use ``encode``, ``decode`` and
  ``project`` as separate steps.
  """

  def __init__(
      self,
      encoder: Encoder,
      decoder: Decoder,
      src_embedding: InputEmbeddings,
      tgt_embedding: InputEmbeddings,
      src_pos_encoding: PositionalEncoding,
      tgt_pos_encoding: PositionalEncoding,
      projection_layer: ProjectionLayer
  ) -> None:
    super().__init__()
    # Registration order is kept as-is: it fixes the order of parameters().
    self.encoder = encoder
    self.decoder = decoder
    self.src_embedding = src_embedding
    self.tgt_embedding = tgt_embedding
    self.src_pos_encoding = src_pos_encoding
    self.tgt_pos_encoding = tgt_pos_encoding
    self.projection_layer = projection_layer

  def encode(self, src, src_mask):
    """Embed ``src``, add positions and run the encoder."""
    embedded = self.src_pos_encoding(self.src_embedding(src))
    return self.encoder(embedded, src_mask)

  def decode(self, encoder_output, src_mask, tgt, tgt_mask):
    """Embed ``tgt``, add positions and run the decoder over ``encoder_output``."""
    embedded = self.tgt_pos_encoding(self.tgt_embedding(tgt))
    return self.decoder(embedded, encoder_output, src_mask, tgt_mask)

  def project(self, x):
    """Map decoder output to vocabulary scores."""
    return self.projection_layer(x)

def build_transformer(
    src_vocab_size: int,
    tgt_vocab_size: int,
    src_seq_len: int,     # Max input sentence length
    tgt_seq_len: int,     # Max output sentence length
    d_model: int =512,    # hidden dimension of the Transformer
    N: int =6,            # number of encoder and decoder layers
    h: int =8,            # number of attention heads in Multi-Head Attention
    dropout: float =0.1,  # dropout probability
    d_ff: int =2048       # hidden dimension of the feed-forward network
) -> Transformer:
  """Assemble a Transformer with ``N`` encoder layers and ``N`` decoder layers.

  Returns:
    The assembled model, with every parameter of more than one dimension
    re-initialized using Xavier-uniform.

  Note:
    Earlier versions of this function returned from inside the decoder loop,
    so the model always had exactly one decoder layer (and ``N=0`` returned
    ``None``). Checkpoints saved from such a model hold a single decoder
    layer and will not load into a model built here with ``N > 1``.
  """

  # 1. Embeddings
  src_embedding = InputEmbeddings(d_model, src_vocab_size)
  tgt_embedding = InputEmbeddings(d_model, tgt_vocab_size)

  # 2. Positional encodings
  src_pos_encoding = PositionalEncoding(d_model, src_seq_len, dropout)
  tgt_pos_encoding = PositionalEncoding(d_model, tgt_seq_len, dropout)

  # 3. Encoder layers
  encoder_blocks = []
  for _ in range(N):
    encoder_self_attention_block = MultiHeadAttention(d_model, h, dropout)
    feed_forward_block = FeedForwardBlock(d_model, d_ff, dropout)
    encoder_blocks.append(
        EncoderBlock(
            d_model,
            encoder_self_attention_block,
            feed_forward_block,
            dropout
        )
    )

  # 4. Decoder layers
  decoder_blocks = []
  for _ in range(N):
    decoder_self_attention_block = MultiHeadAttention(d_model, h, dropout)
    decoder_cross_attention_block = MultiHeadAttention(d_model, h, dropout)
    feed_forward_block = FeedForwardBlock(d_model, d_ff, dropout)
    decoder_blocks.append(
        DecoderBlock(
            d_model,
            decoder_self_attention_block,
            decoder_cross_attention_block,
            feed_forward_block,
            dropout
        )
    )

  # Everything below runs once, after both loops have finished, so that all
  # N decoder layers end up in the model.

  # 5. Encoder and decoder containers
  encoder = Encoder(d_model, nn.ModuleList(encoder_blocks))
  decoder = Decoder(d_model, nn.ModuleList(decoder_blocks))

  # 6. Projection layer
  projection_layer = ProjectionLayer(d_model, tgt_vocab_size)

  # 7. Full model
  transformer = Transformer(
      encoder,
      decoder,
      src_embedding,
      tgt_embedding,
      src_pos_encoding,
      tgt_pos_encoding,
      projection_layer
  )

  # 8. Xavier initialization for weight matrices; 1-D parameters (biases,
  #    layer-norm scale and shift) keep their defaults.
  for p in transformer.parameters():
    if p.dim() > 1:
      nn.init.xavier_uniform_(p)

  return transformer

# Training Pipeline

In [None]:
# 0. Installs (if needed)
try:
    import datasets
except Exception:
    !pip install -q datasets
    import datasets
import os
import json
import math
from datasets import load_dataset
from tqdm.auto import tqdm
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import random
from typing import List, Tuple

#  Config

In [None]:
# 1. Config

# Hyperparameters and paths shared by the training cells below.
config = {
    "src_vocab_size": 20000,    # cap on source vocabulary size (incl. special tokens)
    "tgt_vocab_size": 20000,    # cap on target vocabulary size (incl. special tokens)
    "src_seq_len": 64,          # fixed length of encoded source sentences
    "tgt_seq_len": 64,          # fixed length of encoded target sentences
    "d_model": 256,             # embedding / hidden width
    "d_ff": 1024,               # feed-forward hidden width
    "num_layers": 4,            # passed to build_transformer as N
    "num_heads": 8,             # passed to build_transformer as h
    "dropout": 0.1,
    "batch_size":64,
    "lr": 3e-4,                 # Adam learning rate
    "epochs": 10,
    "device": torch.device("cuda" if torch.cuda.is_available() else "cpu"),
    "num_samples": 500000,      # max training pairs to use; None means the whole split
    "save_path": "transformer_weights.pth",
    "vocab_path": "vocab_iitb.json",
    "seed": 42,

}

# Seed the PyTorch and Python RNGs so runs are repeatable.
torch.manual_seed(config["seed"])
random.seed(config["seed"])

In [None]:
# 2. Dataset download from  Hugging Face
# Fetch the IITB English-Hindi parallel corpus and show its splits.
ds = load_dataset("cfilt/iitb-english-hindi")
print(ds)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md: 0.00B [00:00, ?B/s]

dataset_infos.json:   0%|          | 0.00/953 [00:00<?, ?B/s]

data/train-00000-of-00001.parquet:   0%|          | 0.00/190M [00:00<?, ?B/s]

data/validation-00000-of-00001.parquet:   0%|          | 0.00/85.7k [00:00<?, ?B/s]

data/test-00000-of-00001.parquet:   0%|          | 0.00/500k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/1659083 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/520 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/2507 [00:00<?, ? examples/s]

DatasetDict({
    train: Dataset({
        features: ['translation'],
        num_rows: 1659083
    })
    validation: Dataset({
        features: ['translation'],
        num_rows: 520
    })
    test: Dataset({
        features: ['translation'],
        num_rows: 2507
    })
})


In [None]:
# Inspect the first training pair; each row holds a dict with "en" and "hi".
print(ds["train"][0]["translation"])

{'en': 'Give your application an accessibility workout', 'hi': 'अपने अनुप्रयोग को पहुंचनीयता व्यायाम का लाभ दें'}


* `<pad>` =  used to pad shorter sentences to fixed length

* `<sos>` → “start of sentence”

* `<eos>` → “end of sentence”

* `<unk>` → unknown word (not in vocabulary)
* s is one sentence from the dataset
* tqdm = progress-bar library; the name comes from the Arabic word “taqaddum”, meaning “progress”


| Dictionary | Direction     | Used when             |
| ---------- | ------------- | --------------------- |
| `stoi`     | word → number | Encoding input text   |
| `itos`     | number → word | Decoding model output |





In [None]:
# 3. text into fixed-length numerical token IDs
# Reserved tokens and their fixed IDs; regular vocabulary IDs start after them.
SPECIAL_TOKS = {
    "<pad>": 0,  # padding up to the fixed sequence length
    "<sos>": 1,  # start of sentence
    "<eos>": 2,  # end of sentence
    "<unk>": 3   # token missing from the vocabulary
}

# converts a sentence into a list of tokens
def simple_tokenize(text: str) ->List[str]:
  """Split ``text`` on runs of whitespace; surrounding whitespace is ignored."""
  # str.split() with no argument already discards leading/trailing whitespace.
  return text.split()

# converts raw text into a numerical vocabulary
def build_vocab(samples: List[str], max_size: int) -> dict:
  """Build token<->ID maps from the most frequent lower-cased tokens.

  Args:
    samples: Sentences to count tokens from.
    max_size: Upper bound on the vocabulary size, special tokens included.

  Returns:
    ``{"stoi": token -> id, "itos": id -> token}``. The special tokens keep
    their reserved IDs; other tokens are numbered by descending frequency
    starting at ``len(SPECIAL_TOKS)``.
  """
  from collections import Counter
  counter = Counter()

  # Count how often every lower-cased token occurs.
  for s in tqdm(samples, desc = "Counting tokens for vocab"):
    counter.update(simple_tokenize(s.lower()))

  # A corpus token that literally spells a reserved token (e.g. "<unk>")
  # would take a frequency slot and then be overwritten by the reserved ID,
  # leaving a hole in the ID range. Drop such tokens before ranking.
  for reserved in SPECIAL_TOKS:
    counter.pop(reserved, None)

  # Most frequent tokens get the IDs right after the special tokens.
  most_common = counter.most_common(max(max_size - len(SPECIAL_TOKS), 0))
  stoi = {tok: i + len(SPECIAL_TOKS) for i, (tok, _) in enumerate(most_common)}

  # Insert the special tokens with their fixed, reserved IDs.
  stoi.update(SPECIAL_TOKS)

  itos = {i: t for t, i in stoi.items()}
  return {"stoi": stoi, "itos": itos}

# Convert a sentence into a fixed-length list of token IDs
def encode_sentence(text: str, vocab: dict, seq_len: int) ->List[int]:
  """Encode ``text`` as ``<sos> ids... <eos>`` padded with ``<pad>`` to ``seq_len``.

  Tokens are lower-cased; unknown tokens map to ``<unk>``. At most
  ``seq_len - 2`` word IDs are kept to leave room for the two markers.
  """
  unk_id = SPECIAL_TOKS["<unk>"]
  word_ids = [vocab["stoi"].get(tok, unk_id) for tok in simple_tokenize(text.lower())]

  ids = [SPECIAL_TOKS["<sos>"], *word_ids[:seq_len - 2], SPECIAL_TOKS["<eos>"]]

  # Multiplying a list by a non-positive count yields [], so no guard is needed.
  ids.extend([SPECIAL_TOKS["<pad>"]] * (seq_len - len(ids)))
  return ids

* s → source sentence (English)

* t → target sentence (Hindi)

In [None]:
from datasets.features import translation
# 4.Prepare a smaller training subset & build vocab

# Cap the number of training pairs at the size of the train split.
num_samples = config["num_samples"]
train_split = ds["train"]

if num_samples is None:
  num_samples = len(train_split)
else:
  num_samples = min(num_samples, len(train_split))
# Report how many samples will actually be used.
print(f"Using {num_samples} training samples(out of {len(train_split)})")

# Collect (English, Hindi) pairs from the first num_samples rows.
# Rows without both an "en" and a "hi" entry are skipped, so the final list
# can be shorter than num_samples.
sample_pairs = []
for i, item in enumerate(tqdm(train_split, desc = "Collecting samples")):
  if i >= num_samples:
    break
  tr = item["translation"]

  if (tr is None) or ("en" not in tr or "hi" not in tr):
    continue
  sample_pairs.append((tr["en"], tr["hi"]))


# Split the pairs into parallel source (s) and target (t) lists.
src_texts = [s for s, t in sample_pairs]
tgt_texts = [t for s, t in sample_pairs]


print("Building source vocabulary...")
src_vocab = build_vocab(src_texts, config["src_vocab_size"])

print("Building target vocabulary...")
tgt_vocab = build_vocab(tgt_texts, config["tgt_vocab_size"])


# Save both vocabularies to disk for later use.
# NOTE: JSON object keys are always strings, so the integer keys of "itos"
# are written as strings and must be converted back after loading.
with open(config["vocab_path"], "w", encoding = "utf-8") as f:
  json.dump({"src": src_vocab,  "tgt": tgt_vocab}, f, ensure_ascii = False, indent = 2)


Using 500000 training samples(out of 1659083)


Collecting samples:   0%|          | 0/1659083 [00:00<?, ?it/s]

Building source vocabulary...


Counting tokens for vocab:   0%|          | 0/500000 [00:00<?, ?it/s]

Building target vocabulary...


Counting tokens for vocab:   0%|          | 0/500000 [00:00<?, ?it/s]

In [None]:
# 5. PyTorch Dataset + collate

# ID of the padding token; used for the attention masks and ignored by the loss.
PAD_ID = SPECIAL_TOKS["<pad>"]

class ParallelTextDataset(Dataset):
  """Dataset of sentence pairs that encodes each pair to token IDs on access.

  Args:
    pairs: List of (source_text, target_text) tuples.
    src_vocab: Vocabulary passed to ``encode_sentence`` for the source side.
    tgt_vocab: Vocabulary passed to ``encode_sentence`` for the target side.
    src_seq_len: Fixed length of encoded source sentences.
    tgt_seq_len: Fixed length of encoded target sentences.
  """

  def __init__(
      self,
      pairs: List[Tuple[str, str]],
      src_vocab: dict,
      tgt_vocab: dict,
      src_seq_len: int,
      tgt_seq_len: int
  ):
    self.pairs = pairs
    self.src_vocab = src_vocab
    self.tgt_vocab = tgt_vocab
    self.src_seq_len = src_seq_len
    self.tgt_seq_len = tgt_seq_len

  def __len__(self):
    """Number of sentence pairs."""
    return len(self.pairs)

  def __getitem__(self, idx):
    """Return ``(src_ids, tgt_ids)`` for the pair at position ``idx``."""
    source_text, target_text = self.pairs[idx]
    return (
        encode_sentence(source_text, self.src_vocab, self.src_seq_len),
        encode_sentence(target_text, self.tgt_vocab, self.tgt_seq_len),
    )

def collate_fn(batch):
  """Stack a list of ``(src_ids, tgt_ids)`` pairs into two int64 tensors."""
  src_rows = list(map(lambda pair: pair[0], batch))
  tgt_rows = list(map(lambda pair: pair[1], batch))
  return (
      torch.tensor(src_rows, dtype = torch.long),
      torch.tensor(tgt_rows, dtype = torch.long),
  )

# Dataset over the collected pairs; sentences are encoded on access.
train_dataset = ParallelTextDataset(
    sample_pairs,
    src_vocab,
    tgt_vocab,
    config["src_seq_len"],
    config["tgt_seq_len"]
)

# Shuffled mini-batches of (src, tgt) int64 tensors.
train_loader = DataLoader(
    train_dataset,
    batch_size =config["batch_size"],
    shuffle = True,
    collate_fn = collate_fn
)

# A small random set of encoded examples for spot checks.
# NOTE: these are drawn from the training data itself, not from a held-out
# split, so they do not measure generalization.
NUM_EVAL_EXAMPLES =20
eval_indices = random.sample(range(len(train_dataset)), NUM_EVAL_EXAMPLES)
eval_examples = [train_dataset[i] for i in eval_indices]



* Source mask: hides padding in encoder

* Target mask: hides padding and future tokens in decoder

* unsqueeze(1).unsqueeze(2) reshapes it to (batch, 1, 1, src_len)

In [None]:
# 6. Helper: masks for your transformer (padding + causal)

def make_src_mask(src_ids, pad_idx = PAD_ID):
  """Return an int64 mask with 1 at non-padding positions, 0 at padding.

  Two singleton dimensions are inserted after the first dimension, so a
  (batch, src_len) input yields a (batch, 1, 1, src_len) mask.
  """
  is_token = src_ids != pad_idx
  return is_token[:, None, None].to(torch.int64)

def make_tgt_mask(tgt_ids, pad_idx = PAD_ID):
  """Return an int64 (batch, 1, tgt_len, tgt_len) mask for decoder self-attention.

  An entry is 1 only when the key position is not padding and is not later
  than the query position. ``tgt_ids`` must be 2-D: (batch, tgt_len).
  """
  _, tgt_len = tgt_ids.shape

  # (batch, 1, 1, tgt_len): True where the token is not padding.
  not_pad = (tgt_ids != pad_idx)[:, None, None]

  # (1, 1, tgt_len, tgt_len): lower triangle allows current and earlier positions.
  lower_triangle = torch.tril(torch.ones((tgt_len, tgt_len), dtype = torch.int64))
  causal = lower_triangle[None, None].to(not_pad.device)

  combined = not_pad & causal
  return combined.to(torch.int64)

In [None]:
# 7. Build transformer using your build_transformer function

# Build the model from the training config.
transformer = build_transformer(
    src_vocab_size = config["src_vocab_size"],
    tgt_vocab_size = config["tgt_vocab_size"],
    src_seq_len = config["src_seq_len"],
    tgt_seq_len = config["tgt_seq_len"],
    d_model = config["d_model"],
    N = config["num_layers"],
    h = config["num_heads"],
    dropout = config["dropout"],
    d_ff = config["d_ff"],
)

# Move all parameters and buffers to the selected device.
device = torch.device(config["device"])
transformer = transformer.to(device)

# Sanity check: the training loop relies on these two methods.
print(hasattr(transformer, "encode"))
print(hasattr(transformer, "decode"))

True
True


# Transformer  architecture

In [None]:
# Display the module tree (notebook repr of the last expression in the cell).
transformer

Transformer(
  (encoder): Encoder(
    (layers): ModuleList(
      (0-3): 4 x EncoderBlock(
        (self_attention_block): MultiHeadAttention(
          (dropout): Dropout(p=0.1, inplace=False)
          (w_q): Linear(in_features=256, out_features=256, bias=True)
          (w_k): Linear(in_features=256, out_features=256, bias=True)
          (w_v): Linear(in_features=256, out_features=256, bias=True)
          (w_o): Linear(in_features=256, out_features=256, bias=True)
        )
        (feed_forward_block): FeedForwardBlock(
          (liner_1): Linear(in_features=256, out_features=1024, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
          (liner_2): Linear(in_features=1024, out_features=256, bias=True)
        )
        (residual_connection): ModuleList(
          (0-1): 2 x ResidualConnection(
            (dropout): Dropout(p=0.1, inplace=False)
            (norm): LayerNormalization()
          )
        )
      )
    )
    (norm): LayerNormalization()
  )
  (de

In [None]:
# 8.  Training utilities

# Adam over all model parameters with the configured learning rate.
optimizer = torch.optim.Adam(transformer.parameters(), lr=config["lr"])
# Cross-entropy loss; label positions equal to PAD_ID do not contribute.
criterion = nn.CrossEntropyLoss(ignore_index = PAD_ID)  # LOSS FUNCTION

# Split a target batch into decoder input and labels for teacher forcing.
def shift_right(tgt_batch):
  """Return ``(decoder_input, labels)`` sliced along dimension 1.

  ``decoder_input`` drops the last position and ``labels`` drops the first,
  so the label at position i is the token that follows input position i.
  """
  decoder_input = tgt_batch[:, :-1]
  labels = tgt_batch[:, 1:]
  return decoder_input, labels


In [None]:
# 9. Training loop

# Train for config["epochs"] epochs, numbered from 1 (10 with the config above).
for epoch in range(1,config["epochs"] + 1):  # epochs run from 1 to config["epochs"] inclusive
  transformer.train()  # enable dropout
  running_loss = 0.0
  # Progress bar over the batches of this epoch
  pbar = tqdm(train_loader, desc=f"Epoch {epoch}/{config['epochs']}", leave=False)


  for batch_idx, (src_ids, tgt_ids) in enumerate(pbar):
    src_ids = src_ids.to(device)
    tgt_ids = tgt_ids.to(device)
    # Teacher forcing: the decoder sees tokens [0..T-2] and predicts [1..T-1].
    decoder_input, labels = shift_right(tgt_ids)
    src_mask = make_src_mask(src_ids, pad_idx = PAD_ID).to(device)
    tgt_mask = make_tgt_mask(decoder_input, pad_idx = PAD_ID).to(device)
    enc_out = transformer.encode(src_ids, src_mask)
    dec_out = transformer.decode(enc_out, src_mask, decoder_input, tgt_mask)
    logits = transformer.project(dec_out)

    # Flatten batch and sequence dimensions so each position is one sample.
    loss = criterion(
        logits.view(-1, logits.size(-1)),
        labels.reshape(-1))

    optimizer.zero_grad()
    loss.backward()
    # Clip the global gradient norm to 1.0 before the optimizer step.
    torch.nn.utils.clip_grad_norm_(transformer.parameters(),1.0)
    optimizer.step()
    running_loss += loss.item()
    pbar.set_postfix(loss = running_loss / (batch_idx + 1))
  # batch_idx is the last index from the loop above, so this is the mean batch
  # loss. NOTE(review): an empty train_loader would leave batch_idx undefined.
  avg_loss = running_loss / (batch_idx + 1)
  print(f"Epoch {epoch} avg_loss: {avg_loss:.4f}")



Epoch 1/10:   0%|          | 0/7813 [00:00<?, ?it/s]

Epoch 1 avg_loss: 3.5528


Epoch 2/10:   0%|          | 0/7813 [00:00<?, ?it/s]

Epoch 2 avg_loss: 1.8072


Epoch 3/10:   0%|          | 0/7813 [00:00<?, ?it/s]

Epoch 3 avg_loss: 1.3505


Epoch 4/10:   0%|          | 0/7813 [00:00<?, ?it/s]

Epoch 4 avg_loss: 1.1507


Epoch 5/10:   0%|          | 0/7813 [00:00<?, ?it/s]

Epoch 5 avg_loss: 1.0331


Epoch 6/10:   0%|          | 0/7813 [00:00<?, ?it/s]

Epoch 6 avg_loss: 0.9531


Epoch 7/10:   0%|          | 0/7813 [00:00<?, ?it/s]

Epoch 7 avg_loss: 0.8941


Epoch 8/10:   0%|          | 0/7813 [00:00<?, ?it/s]

Epoch 8 avg_loss: 0.8480


Epoch 9/10:   0%|          | 0/7813 [00:00<?, ?it/s]

Epoch 9 avg_loss: 0.8090


Epoch 10/10:   0%|          | 0/7813 [00:00<?, ?it/s]

Epoch 10 avg_loss: 0.7793


In [None]:
# ==========================
# Save model checkpoints
# ==========================
# NOTE: this cell sits outside the training loop, so it runs once after
# training and `epoch` holds the value left over from the last iteration.
# To checkpoint every epoch, this code would have to move into the loop body.

# 1. Save a checkpoint named after the current value of `epoch`
checkpoint_path = f"transformer_epoch_{epoch}.pth"
torch.save(transformer.state_dict(), checkpoint_path)
print(f"Saved checkpoint: {checkpoint_path}")

# 2. Save the final model only when `epoch` is the last configured epoch
if epoch == config["epochs"]:
    torch.save(transformer.state_dict(), config["save_path"])
    print("Saved final model state to", config["save_path"])

    # 3. Write the vocabularies again to the same path used when they were built
    with open(config["vocab_path"], "w", encoding="utf-8") as f:
        json.dump(
            {"src": src_vocab, "tgt": tgt_vocab},
            f,
            ensure_ascii=False,
            indent=2
        )

    print("Saved vocabs to", config["vocab_path"])

Saved checkpoint: transformer_epoch_10.pth
Saved final model state to transformer_weights.pth
Saved vocabs to vocab_iitb.json


In [None]:
# Point save_path at a new file name and save the weights there as well.
# Cells that read config["save_path"] after this one see the new path.
config["save_path"] = "transformer_iitb.pth"
torch.save(transformer.state_dict(), config["save_path"])
print("Saved final model to", config["save_path"])


Saved final model to transformer_iitb.pth


In [None]:
state_path = config["save_path"]

# Reload the saved weights into the trained network and switch it to eval
# mode (disables dropout). This cell previously used `model`, which is only
# created further down in the testing section, so running the notebook from
# top to bottom raised a NameError here.
transformer.load_state_dict(torch.load(state_path, map_location=device))
transformer.eval()

print("✅ Model loaded successfully")


✅ Model loaded successfully


# Testing Model

In [None]:
import torch
import json
import math
from typing import List

In [None]:
#  Config must match training config (d_model, seq lengths, vocab paths, etc.)

# Inference settings. The architecture values must match the ones used for
# training, otherwise the saved state dict will not fit the rebuilt model.
test_config = {
    "src_seq_len": 64,
    "tgt_seq_len": 64,
    "d_model": 256,
    "d_ff": 1024,
    "num_layers": 4,
    "num_heads": 8,
    "dropout": 0.1,
    "vocab_path": "vocab_iitb.json",       # written by the training section
    "save_path": "transformer_iitb.pth",   # written by the training section
    "device": "cuda" if torch.cuda.is_available() else "cpu"

}

device = torch.device(test_config["device"])
print("Device:", device)

Device: cuda


In [None]:
# Reserved tokens and their IDs; same values as in the training section.
SPECIAL_TOKS = {
    "<pad>": 0,
    "<sos>": 1,
    "<eos>": 2,
    "<unk>":3
}
# Shorthand constants for the IDs used during decoding.
PAD_ID = SPECIAL_TOKS["<pad>"]
SOS_ID = SPECIAL_TOKS["<sos>"]
EOS_ID = SPECIAL_TOKS["<eos>"]


def simple_tokenize(text: str) -> List[str]:
  """Split ``text`` on runs of whitespace; surrounding whitespace is ignored."""
  # str.split() with no argument already discards leading/trailing whitespace.
  return text.split()


def encode_sentence(text: str, stoi: dict, seq_len: int) -> List[int]:
  """Encode ``text`` as ``SOS ids... EOS`` padded with PAD to ``seq_len``.

  Unlike the training-time helper of the same name, this version takes the
  token->id mapping directly rather than the whole vocab dict.
  """
  unk_id = SPECIAL_TOKS["<unk>"]
  word_ids = [stoi.get(tok, unk_id) for tok in simple_tokenize(text.lower())]

  # Keep at most seq_len - 2 word IDs so the two markers still fit.
  ids = [SOS_ID, *word_ids[:seq_len - 2], EOS_ID]

  # Multiplying a list by a non-positive count yields [], so no guard is needed.
  ids.extend([PAD_ID] * (seq_len - len(ids)))
  return ids



# Masks (same as training)

def make_src_mask(src_ids, pad_idx=PAD_ID):
    """Return an int64 mask with 1 at non-padding positions, 0 at padding.

    Two singleton dimensions are inserted after the first dimension, so a
    (batch, src_len) input yields a (batch, 1, 1, src_len) mask.
    """
    is_token = src_ids != pad_idx
    return is_token[:, None, None].to(torch.int64)


def make_tgt_mask(tgt_ids, pad_idx = PAD_ID):
  """Return an int64 (batch, 1, tgt_len, tgt_len) mask for decoder self-attention.

  An entry is 1 only when the key position is not padding and is not later
  than the query position. ``tgt_ids`` must be 2-D: (batch, tgt_len).
  """
  _, tgt_len = tgt_ids.shape

  # (batch, 1, 1, tgt_len): True where the token is not padding.
  not_pad = (tgt_ids != pad_idx)[:, None, None]

  # (1, 1, tgt_len, tgt_len): lower triangle allows current and earlier positions.
  lower_triangle = torch.tril(torch.ones((tgt_len, tgt_len), dtype = torch.int64))
  causal = lower_triangle[None, None].to(not_pad.device)

  combined = not_pad & causal
  return combined.to(torch.int64)


#  Load vocab (normalize itos/stoi)
# Read the vocabularies written by the training section.
with open(test_config["vocab_path"], "r", encoding="utf-8") as f:
    vocabs = json.load(f)
src_vocab = vocabs["src"]
tgt_vocab = vocabs["tgt"]


# Normalize the vocab after the JSON round trip: JSON object keys are always
# strings, so the integer keys of itos come back as "4" instead of 4.
# Ensure stoi maps token -> int and itos maps int -> token.

def normalize_vocab(v):
  """Return a copy of vocab ``v`` with integer IDs on both mappings.

  Args:
    v: Dict with a "stoi" mapping (token -> id) and, optionally, an "itos"
      mapping (id -> token). IDs may be ints or int-like strings.

  Returns:
    ``{"stoi": token -> int, "itos": int -> token}``. A stoi value that
    cannot be converted is kept unchanged; an itos key that cannot be
    converted is dropped. If no usable itos entry remains (or "itos" is
    absent), itos is rebuilt by inverting stoi.

  Raises:
    KeyError: If ``v`` has no "stoi" entry.
  """
  conversion_errors = (TypeError, ValueError, OverflowError)

  new_stoi = {}
  for tok, idx in v["stoi"].items():
    try:
      new_stoi[tok] = int(idx)
    except conversion_errors:
      # Best effort: keep the original value rather than losing the token.
      new_stoi[tok] = idx

  new_itos = {}
  for idx, tok in v.get("itos", {}).items():
    try:
      new_itos[int(idx)] = tok
    except conversion_errors:
      # A key that is not an integer cannot be a token ID; skip it.
      continue

  if not new_itos:
    new_itos = {i: t for t, i in new_stoi.items()}
  return {"stoi": new_stoi, "itos": new_itos}

# Replace both loaded vocabularies with their integer-keyed versions.
src_vocab = normalize_vocab(src_vocab)
tgt_vocab = normalize_vocab(tgt_vocab)




In [None]:
# Vocabulary sizes the checkpoint was trained with; they set the shapes of
# the embedding and projection weights, so they must match the saved model.
TRAIN_SRC_VOCAB_SIZE = 20000
TRAIN_TGT_VOCAB_SIZE = 20000

# Rebuild the same architecture that was used for training.
model = build_transformer(
    src_vocab_size = TRAIN_SRC_VOCAB_SIZE,
    tgt_vocab_size = TRAIN_TGT_VOCAB_SIZE,
    src_seq_len = test_config["src_seq_len"],
    tgt_seq_len = test_config["tgt_seq_len"],
    d_model = test_config["d_model"],
    N = test_config["num_layers"],
    h = test_config["num_heads"],
    dropout = test_config["dropout"],
    d_ff= test_config["d_ff"],
)

model = model.to(device)

# Load the saved weights; any failure is re-raised as a RuntimeError that
# names the checkpoint path.
state_path = test_config["save_path"]
try:
  model.load_state_dict(torch.load(state_path, map_location=device))
  print(f"Loaded model state from {state_path}")
except Exception as e:
  raise RuntimeError(f"Could not load moodel state from {state_path}:{e}")



Loaded model state from transformer_iitb.pth


In [None]:
def greedy_decode(
    model,
    src_sentence: str,
    src_vocab,
    tgt_vocab,
    max_len: int
):
    """Translate one sentence by picking the highest-scoring token each step.

    Args:
        model: Object exposing ``encode``, ``decode`` and ``project``.
        src_sentence: Raw source text.
        src_vocab: Source vocabulary; its "stoi" mapping is used for encoding.
        tgt_vocab: Target vocabulary; its "itos" mapping is used for decoding.
        max_len: Maximum number of tokens to generate.
            NOTE(review): presumably must not exceed the target positional
            table length the model was built with; confirm against caller.

    Returns:
        The generated tokens joined by spaces, or "<empty>" if nothing was
        generated. IDs missing from "itos" are rendered as "<unk>".

    Reads the module-level ``test_config`` and ``device``.
    """
    model.eval()

    # Encode the source sentence into a padded batch of one.
    src_ids = encode_sentence(
        src_sentence,
        src_vocab["stoi"],
        test_config["src_seq_len"]
    )
    src_tensor = torch.tensor([src_ids], dtype=torch.long).to(device)
    src_mask = make_src_mask(src_tensor).to(device)

    generated = [SOS_ID]

    with torch.no_grad():
        # The encoder output is computed once and reused at every step.
        enc_out = model.encode(src_tensor, src_mask)

        for _ in range(max_len):
            dec_in = torch.tensor([generated], dtype=torch.long).to(device)
            tgt_mask = make_tgt_mask(dec_in).to(device)

            dec_out = model.decode(enc_out, src_mask, dec_in, tgt_mask)

            # Only the last position predicts the next token, so project
            # just that position instead of the whole sequence.
            logits = model.project(dec_out[:, -1])
            next_token = logits.argmax(dim=-1).item()

            # EOS ends generation and is never added to `generated`.
            if next_token == EOS_ID:
                break

            generated.append(next_token)

    # Convert IDs to words, skipping the leading SOS and any PAD the model
    # emitted. No EOS check is needed here: EOS is never appended above.
    itos = tgt_vocab["itos"]
    toks = [itos.get(i, "<unk>") for i in generated if i not in (SOS_ID, PAD_ID)]

    return " ".join(toks) if toks else "<empty>"


In [None]:
# Sample sentences to test
# English inputs for a quick qualitative check. Several of them use the same
# wording as the training-data sample printed earlier, so matching output on
# those does not show generalization.
test_sentences = [
    "The default plugin layout for the top panel",
    "Plugin with various methods of selecting accessibles quickly.",
    "Shows events as they occur from selected types and sources",
    "Validates application accessibility",
    "Hide private attributes",
    "Give your application an accessibility workout",
    "How are you",
    "What is your name?",
    "What are you doing? "
]
# Run inference and print each source sentence with its prediction.
for sent in test_sentences:
    pred = greedy_decode(model, sent, src_vocab, tgt_vocab,
max_len=test_config["tgt_seq_len"])
    print("SRC :", sent)
    print("PRED:", pred)
    print("-" * 60)


SRC : The default plugin layout for the top panel
PRED: ऊपरी पटल के लिए डिफोल्ट प्लग-इन खाका
------------------------------------------------------------
SRC : Plugin with various methods of selecting accessibles quickly.
PRED: प्लग-इन जिसमें हैं <unk> को तेजी से चुनने के लिए कई विधियां
------------------------------------------------------------
SRC : Shows events as they occur from selected types and sources
PRED: चुने गए <unk> और स्रोतों से घटनाएं जैसे-जैसे घटित होती हैं, उन्हें दर्शाता है
------------------------------------------------------------
SRC : Validates application accessibility
PRED: अनुप्रयोग की पहुंचनीयता का सत्यापन करता है
------------------------------------------------------------
SRC : Hide private attributes
PRED: निजी गुणों को छुपाएँ
------------------------------------------------------------
SRC : Give your application an accessibility workout
PRED: अपने अनुप्रयोग को पहुंचनीयता व्यायाम का लाभ दें
------------------------------------------------------------
SRC