In [1]:
# Mount Google Drive so that files stored there are reachable under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
from pathlib import Path
# Project directory located under the Drive mount point (/content/drive).
folder_path = Path("/content/drive/My Drive/Transformer_from_scratch")

In [3]:
import os
from pathlib import Path

source_folder = folder_path
mount_point = "/content/Transformer_from_scratch"

# Create a shortcut (symbolic link) to the Drive project folder.
# os.path.lexists() is used instead of os.path.exists(): exists() follows the
# link and reports False when the link is present but its target is currently
# unavailable, in which case os.symlink() would raise FileExistsError.
if not os.path.lexists(mount_point):
    os.symlink(source_folder, mount_point)

# Text Preprocessing

In [4]:
# Read the raw corpus file into a list of lines (line endings are kept).
with open("/content/drive/My Drive/Transformer_from_scratch/Data/hin.txt", mode="r", encoding="utf-8") as file:
    content = list(file)

In [5]:
# To deal with weird characters if present
import unicodedata
def normalize(text):
    """Return ``text`` converted to Unicode NFKC normal form."""
    nfkc_text = unicodedata.normalize("NFKC", text)
    return nfkc_text

## Add `<start>` and `<end>` tokens

In [6]:
english = []
hindi = []

for line in content:
    # Each record is tab-separated; the first field is the English sentence and
    # the second the Hindi sentence. Any further fields are discarded.
    fields = line.rstrip("\n").split("\t")
    if len(fields) < 2:
        # Blank or malformed line (e.g. a trailing empty line): skip it instead
        # of failing on the tuple unpacking.
        continue
    eng, hind = fields[0], fields[1]
    # Lower-case, trim and NFKC-normalise, then wrap in sentence boundary markers.
    english.append(f"<start> {normalize(eng.strip().lower())} <end>")
    hindi.append(f"<start> {normalize(hind.strip().lower())} <end>")

## Creating English-to-Hindi map

In [7]:
# Sanity check: the two sentence lists must be the same length to stay paired.
len(english) == len(hindi)

True

In [8]:
# Pair every English sentence with the Hindi sentence at the same position.
# When an English sentence occurs more than once, the last Hindi pairing wins.
english_to_hindi_map = dict(zip(english, hindi))

In [9]:
# Display the English -> Hindi mapping built above.
english_to_hindi_map

{'<start> wow! <end>': '<start> वाह! <end>',
 '<start> duck! <end>': '<start> बतख़! <end>',
 '<start> help! <end>': '<start> बचाओ! <end>',
 '<start> jump. <end>': '<start> छलांग. <end>',
 '<start> hello! <end>': '<start> नमस्कार। <end>',
 '<start> cheers! <end>': '<start> चियर्स! <end>',
 '<start> exhale. <end>': '<start> सांस छोड़ो। <end>',
 '<start> got it? <end>': '<start> समझे कि नहीं? <end>',
 "<start> i'm ok. <end>": '<start> मैं ठीक हूँ। <end>',
 '<start> inhale. <end>': '<start> सांस लो। <end>',
 '<start> thanks! <end>': '<start> धन्यवाद! <end>',
 '<start> we won. <end>': '<start> हम जीते। <end>',
 '<start> awesome! <end>': '<start> बहुत बढ़िया! <end>',
 '<start> come in. <end>': '<start> अंदर आ जाओ। <end>',
 '<start> get out! <end>': '<start> बाहर निकल जाओ! <end>',
 '<start> go away! <end>': '<start> चले जाओ! <end>',
 '<start> goodbye! <end>': '<start> ख़ुदा हाफ़िज़। <end>',
 '<start> perfect! <end>': '<start> सही! <end>',
 '<start> we lost. <end>': '<start> हम हार गए। <end>',

## Tokenizing using the Hugging Face Tokenizers library

In [10]:
# Dump each language's sentences to its own text file, one sentence per line,
# so the tokenizer trainers can read them from disk.
for _corpus_path, _sentences in (
    ("/content/drive/My Drive/Transformer_from_scratch/Data/english.txt", english),
    ("/content/drive/My Drive/Transformer_from_scratch/Data/hindi.txt", hindi),
):
    with open(_corpus_path, "w", encoding="utf-8") as f:
        f.writelines(f"{line}\n" for line in _sentences)

In [11]:
from tokenizers import Tokenizer, models, trainers, pre_tokenizers, normalizers

# English tokenizer
# unk_token is passed to the BPE model so that it has a token to emit for
# characters that never appeared in the training corpus; "<unk>" is already
# reserved in special_tokens below but was not wired into the model.
eng_tokenizer = Tokenizer(models.BPE(unk_token="<unk>"))
eng_tokenizer.pre_tokenizer = pre_tokenizers.Whitespace()

# NOTE(review): later cells pad with 0 and ignore index 0 in the loss, which
# presumably relies on "<pad>" being listed first here — confirm before reordering.
trainer = trainers.BpeTrainer(vocab_size=3000, special_tokens=["<pad>", "<unk>", "<start>", "<end>"])

eng_tokenizer.train(files=["/content/drive/My Drive/Transformer_from_scratch/Data/english.txt"], trainer=trainer)
eng_tokenizer.save("eng_tokenizer.json")

# Hindi tokenizer (same configuration, trained on the Hindi corpus)
hin_tokenizer = Tokenizer(models.BPE(unk_token="<unk>"))
hin_tokenizer.pre_tokenizer = pre_tokenizers.Whitespace()

trainer = trainers.BpeTrainer(vocab_size=3000, special_tokens=["<pad>", "<unk>", "<start>", "<end>"])

hin_tokenizer.train(files=["/content/drive/My Drive/Transformer_from_scratch/Data/hindi.txt"], trainer=trainer)
hin_tokenizer.save("hin_tokenizer.json")

In [12]:
from tokenizers import Tokenizer

# Reload both trained tokenizers from the JSON files saved in the working directory.
eng_tok, hin_tok = (
    Tokenizer.from_file(_tokenizer_file)
    for _tokenizer_file in ("eng_tokenizer.json", "hin_tokenizer.json")
)

In [13]:
# Token ids for every sentence pair (zip keeps the two sides aligned).
_encoded_pairs = [
    (eng_tok.encode(eng_sent).ids, hin_tok.encode(hin_sent).ids)
    for eng_sent, hin_sent in zip(english, hindi)
]

# Encoder sees the full English sequence.
encoder_input_ids = [eng_ids for eng_ids, _ in _encoded_pairs]
# Decoder input drops the last token; the target drops the first, so the
# target at each position is the token that follows the corresponding input.
decoder_input_ids = [hin_ids[:-1] for _, hin_ids in _encoded_pairs]
decoder_target_ids = [hin_ids[1:] for _, hin_ids in _encoded_pairs]


In [14]:
# Token -> id dictionaries of both tokenizers.
vocab_en, vocab_hi = eng_tok.get_vocab(), hin_tok.get_vocab()

# Vocabulary sizes, used to size the embedding and output layers.
vocab_size_en, vocab_size_hi = len(vocab_en), len(vocab_hi)

# Ids of the padding token in each vocabulary.
pad_idx_en, pad_idx_hi = vocab_en["<pad>"], vocab_hi["<pad>"]


## Padding

In [15]:
from torch.nn.utils.rnn import pad_sequence
import torch

# Convert to tensors.
# as_tensor with an explicit integer dtype keeps an empty sequence from
# defaulting to float32 and does not warn if this cell is re-run on data that
# has already been converted.
encoder_input_ids = [torch.as_tensor(seq, dtype=torch.long) for seq in encoder_input_ids]
decoder_input_ids = [torch.as_tensor(seq, dtype=torch.long) for seq in decoder_input_ids]
decoder_target_ids = [torch.as_tensor(seq, dtype=torch.long) for seq in decoder_target_ids]

# Pad all sequences to the longest one, using each tokenizer's own "<pad>" id
# rather than a hard-coded 0 so padding stays in sync with the vocabulary.
encoder_input_ids = pad_sequence(encoder_input_ids, batch_first=True, padding_value=pad_idx_en)
decoder_input_ids = pad_sequence(decoder_input_ids, batch_first=True, padding_value=pad_idx_hi)
decoder_target_ids = pad_sequence(decoder_target_ids, batch_first=True, padding_value=pad_idx_hi)


In [16]:
encoder_input_ids.shape,decoder_input_ids.shape,decoder_target_ids.shape

(torch.Size([3116, 31]), torch.Size([3116, 33]), torch.Size([3116, 33]))

# Custom Dataset class

In [17]:
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split

In [18]:
class EngToHindiDataset(Dataset):
    """Train or test view over the padded English/Hindi id tensors.

    Every instance performs the same 80/20 split (``random_state=42``) and
    serves either the training part or the test part depending on ``train``.
    Items are dicts with keys ``encoder_input``, ``decoder_input`` and
    ``decoder_target``.
    """

    def __init__(self,train:bool,encoder_input_ids=encoder_input_ids,decoder_input_ids=decoder_input_ids,decoder_target_ids=decoder_target_ids):
        super().__init__()
        self.train = train
        self.encoder_input_ids = encoder_input_ids
        self.decoder_input_ids = decoder_input_ids
        self.decoder_target_ids = decoder_target_ids

        # Keep decoder input and target together so they are split as one unit.
        self.target = list(zip(decoder_input_ids, decoder_target_ids))

        split_parts = train_test_split(
            self.encoder_input_ids,
            self.target,
            test_size=0.2,
            random_state=42,
        )
        self.X_train, self.X_test, self.y_train, self.y_test = split_parts

    def _active_split(self):
        """Return the (encoder inputs, decoder pairs) of the selected split."""
        if self.train:
            return self.X_train, self.y_train
        return self.X_test, self.y_test

    def __len__(self):
        _, decoder_pairs = self._active_split()
        return len(decoder_pairs)

    def __getitem__(self,idx):
        encoder_inputs, decoder_pairs = self._active_split()
        decoder_input, decoder_target = decoder_pairs[idx]
        return {
            "encoder_input": encoder_inputs[idx],
            "decoder_input": decoder_input,
            "decoder_target": decoder_target,
        }

In [19]:
# Build the training and test views; both use the dataset's default tensors.
train_data = EngToHindiDataset(train=True)
test_data = EngToHindiDataset(train=False)
# Show the number of examples in each split.
len(train_data),len(test_data)

(2492, 624)

# DataLoader

In [20]:
from torch.utils.data import DataLoader
import os

BATCH_SIZE = 32
# os.cpu_count() can return None when the count cannot be determined, while
# DataLoader needs an integer; fall back to loading in the main process.
NUM_WORKERS = os.cpu_count() or 0
# Pinned host memory is only useful for host-to-GPU copies, so enable it for
# both loaders exactly when CUDA is available.
PIN_MEMORY = torch.cuda.is_available()
train_dataloader = DataLoader(train_data,num_workers=NUM_WORKERS,batch_size=BATCH_SIZE,shuffle=True,pin_memory=PIN_MEMORY)
test_dataloader = DataLoader(test_data,num_workers=NUM_WORKERS,batch_size=BATCH_SIZE,shuffle=False,pin_memory=PIN_MEMORY)

# Transformer from scratch

In [21]:
from torch import nn
import torch
import math

## Self Attention Implementation

In [22]:
class SelfAttention(nn.Module):
    """Scaled dot-product attention: softmax(Q K^T / sqrt(d_k)) V."""

    def __init__(self):
        super().__init__()

    def forward(self,Q,K,V,mask=None):
        """Compute attention output and weights.

        Args:
            Q, K, V: tensors whose last dimension is the feature size and whose
                second-to-last dimension is the sequence length.
            mask: optional tensor broadcastable to the score matrix; positions
                where ``mask == 0`` are excluded from attention.

        Returns:
            Tuple ``(output, attention)``. Masked positions get weight 0. A
            query row whose keys are all masked gets all-zero weights and an
            all-zero output instead of NaN.
        """
        d_k = Q.size(-1)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d_k)

        blocked = None
        if mask is not None:
            blocked = mask == 0
            # Use the most negative finite value rather than -inf: with -inf a
            # fully masked row makes softmax evaluate (-inf) - (-inf) = NaN.
            scores = scores.masked_fill(blocked, torch.finfo(scores.dtype).min)

        attention = torch.softmax(scores, dim=-1)

        if blocked is not None:
            # Force masked positions to exactly zero. This only changes fully
            # masked rows (which would otherwise become uniform).
            attention = attention.masked_fill(blocked, 0.0)

        output = torch.matmul(attention, V)

        return output, attention

## Multi head Attention Implementation

In [23]:
class MultiHeadAttention(nn.Module):
    """Multi-head attention built on :class:`SelfAttention`.

    Queries, keys and values are linearly projected, split into ``num_heads``
    heads of size ``embed_dim // num_heads``, attended per head, concatenated
    and passed through a final linear layer.

    Raises:
        ValueError: if ``num_heads`` is not positive or does not divide
            ``embed_dim`` (the head split in ``forward`` would otherwise fail
            with an opaque ``view`` error).
    """

    def __init__(self,embed_dim,num_heads):

        super().__init__()
        if num_heads <= 0 or embed_dim % num_heads != 0:
            raise ValueError(
                f"embed_dim ({embed_dim}) must be divisible by num_heads ({num_heads})"
            )
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = self.embed_dim // self.num_heads

        self.q_linear = nn.Linear(self.embed_dim, self.embed_dim)
        self.k_linear = nn.Linear(self.embed_dim, self.embed_dim)
        self.v_linear = nn.Linear(self.embed_dim, self.embed_dim)

        self.fc_out = nn.Linear(self.embed_dim, self.embed_dim)
        self.attention = SelfAttention()

    def forward(self,query,key,value,mask=None):
        """Attend ``query`` over ``key``/``value``.

        Args:
            query, key, value: 3-D tensors ``(batch, length, embed_dim)``.
            mask: optional mask forwarded unchanged to the attention module.

        Returns:
            Tensor of shape ``(batch, query_length, embed_dim)``.
        """
        B, q_len, _ = query.shape
        _, k_len, _ = key.shape
        _, v_len, _ = value.shape

        Q = self.q_linear(query)
        K = self.k_linear(key)
        V = self.v_linear(value)

        # (B, len, embed_dim) -> (B, num_heads, len, head_dim)
        Q = Q.view(B, q_len, self.num_heads, self.head_dim).transpose(1, 2)
        K = K.view(B, k_len, self.num_heads, self.head_dim).transpose(1, 2)
        V = V.view(B, v_len, self.num_heads, self.head_dim).transpose(1, 2)

        # The attention weights are not needed here.
        out, _ = self.attention(Q, K, V, mask)

        # Merge the heads back: (B, num_heads, q_len, head_dim) -> (B, q_len, embed_dim)
        out = out.transpose(1, 2).contiguous().view(B, q_len, self.embed_dim)

        out = self.fc_out(out)

        return out

## Positional Encoding

In [24]:
import math
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position encodings to a batch of embeddings.

    Even feature indices hold ``sin(pos * w_i)`` and odd indices hold
    ``cos(pos * w_i)`` with ``w_i = 10000 ** (-2i / embed_dim)``. The table is
    precomputed for ``max_len`` positions and stored as a non-trainable buffer.
    """

    def __init__(self, embed_dim, max_len=5000):
        super().__init__()

        pe = torch.zeros(max_len, embed_dim)

        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # shape: (max_len, 1)

        div_term = torch.exp(torch.arange(0, embed_dim, 2).float() * (-math.log(10000.0) / embed_dim))

        pe[:, 0::2] = torch.sin(position * div_term)
        # For an odd embed_dim there is one fewer odd column than even columns,
        # so the frequency vector is trimmed to avoid a shape mismatch.
        pe[:, 1::2] = torch.cos(position * div_term[: embed_dim // 2])
        pe = pe.unsqueeze(0)  # shape: (1, max_len, embed_dim)

        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        Raises:
            ValueError: if the sequence is longer than the precomputed table.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise ValueError(
                f"sequence length {seq_len} exceeds positional encoding max_len {self.pe.size(1)}"
            )
        x = x + self.pe[:, :seq_len, :]
        return x

## Transformer Encoder implementation

In [25]:
class TransformerEncoder(nn.Module):
    """One encoder block: self-attention and a feed-forward network, each
    followed by dropout, a residual connection and layer normalisation."""

    def __init__(self,embed_dim,num_heads,ffn_hidden_units,dropout=0.1):
        super().__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ffn_hidden_units = ffn_hidden_units

        # Sub-modules are created in the same order as before so parameter
        # initialisation consumes the random stream identically.
        self.mha = MultiHeadAttention(embed_dim, num_heads)
        self.layernorm1 = nn.LayerNorm(embed_dim)
        feed_forward_layers = [
            nn.Linear(embed_dim, ffn_hidden_units),
            nn.ReLU(),
            nn.Linear(ffn_hidden_units, embed_dim),
        ]
        self.ff = nn.Sequential(*feed_forward_layers)
        self.layernorm2 = nn.LayerNorm(embed_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self,x,mask=None):
        """Apply the block to ``x``; ``mask`` is passed to the attention layer."""
        attended = self.dropout(self.mha(x, x, x, mask))
        x = self.layernorm1(x + attended)

        transformed = self.dropout(self.ff(x))
        return self.layernorm2(x + transformed)

### Full encoder implementation

In [26]:
class FullEncoder(nn.Module):
    """Token embedding + positional encoding + a stack of encoder blocks."""

    def __init__(self,vocab_size,embed_dim,num_heads,ffn_hidden_units,num_layers,dropout=0.1,max_len=5000):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.positional_encoding = PositionalEncoding(embed_dim, max_len)
        self.dropout = nn.Dropout(dropout)

        stacked_blocks = []
        for _ in range(num_layers):
            stacked_blocks.append(
                TransformerEncoder(embed_dim, num_heads, ffn_hidden_units, dropout)
            )
        self.encoder_blocks = nn.ModuleList(stacked_blocks)

    def forward(self,input_ids,mask=None):
        """Encode ``input_ids``; ``mask`` is handed to every encoder block."""
        hidden = self.dropout(self.positional_encoding(self.embedding(input_ids)))

        for encoder_block in self.encoder_blocks:
            hidden = encoder_block(hidden, mask)

        return hidden

## Decoder implementation

In [27]:
class TransformerDecoder(nn.Module):
    """One decoder block: masked self-attention, cross-attention over the
    encoder output, and a feed-forward network. Each sub-layer is followed by
    dropout, a residual connection and layer normalisation."""

    def __init__(self, embed_dim, num_heads, ffn_hidden_units, dropout=0.1):
        super().__init__()

        # Creation order is kept so parameter initialisation is unchanged.
        self.masked_mha = MultiHeadAttention(embed_dim, num_heads)
        self.cross_mha = MultiHeadAttention(embed_dim, num_heads)

        self.layernorm1, self.layernorm2, self.layernorm3 = (
            nn.LayerNorm(embed_dim),
            nn.LayerNorm(embed_dim),
            nn.LayerNorm(embed_dim),
        )

        expand = nn.Linear(embed_dim, ffn_hidden_units)
        contract = nn.Linear(ffn_hidden_units, embed_dim)
        self.ff = nn.Sequential(expand, nn.ReLU(), contract)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, tgt_mask=None, memory_mask=None):
        """Run the block on ``x`` using ``enc_output`` as cross-attention memory."""
        # 1) self-attention over the target sequence
        self_attended = self.masked_mha(x, x, x, tgt_mask)
        x = self.layernorm1(x + self.dropout(self_attended))

        # 2) attention from target positions to the encoder output
        cross_attended = self.cross_mha(x, enc_output, enc_output, memory_mask)
        x = self.layernorm2(x + self.dropout(cross_attended))

        # 3) position-wise feed-forward network
        fed_forward = self.ff(x)
        return self.layernorm3(x + self.dropout(fed_forward))


### Full Decoder implementation

In [28]:
class FullDecoder(nn.Module):
    """Token embedding + positional encoding + a stack of decoder blocks,
    followed by a linear projection to target-vocabulary logits."""

    def __init__(self,vocab_size, embed_dim, num_heads, ffn_hidden_units, num_layers, dropout=0.1, max_len=5000):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.positional_encoding = PositionalEncoding(embed_dim, max_len)

        self.dropout = nn.Dropout(dropout)

        self.decoder_blocks = nn.ModuleList([
            # Forward the configured dropout rate; previously it was omitted, so
            # the blocks always used TransformerDecoder's default of 0.1
            # regardless of the value given to this constructor.
            TransformerDecoder(embed_dim, num_heads, ffn_hidden_units, dropout)
            for _ in range(num_layers)
        ])

        self.fc_out = nn.Linear(embed_dim, vocab_size)

    def forward(self, tgt_ids, enc_output, tgt_mask=None, memory_mask=None):
        """Return logits of shape ``(*tgt_ids.shape, vocab_size)``.

        Args:
            tgt_ids: target token ids fed to the embedding layer.
            enc_output: encoder output used by every block's cross-attention.
            tgt_mask: mask for the decoder self-attention.
            memory_mask: mask for the cross-attention over ``enc_output``.
        """
        x = self.embedding(tgt_ids)
        x = self.positional_encoding(x)

        x = self.dropout(x)
        for block in self.decoder_blocks:
            x = block(x, enc_output, tgt_mask, memory_mask)

        out = self.fc_out(x)

        return out

## Putting everything together: the Transformer

In [29]:
class Transformer(nn.Module):
    """Encoder-decoder model: a :class:`FullEncoder` for the source sequence
    and a :class:`FullDecoder` producing target-vocabulary logits."""

    def __init__(self,
                 src_vocab_size, tgt_vocab_size,
                 embed_dim, num_heads,
                 ffn_hidden_units, num_layers,
                 dropout=0.1, max_len=5000):
        super().__init__()

        # Hyper-parameters shared by both halves of the model.
        shared_config = dict(
            embed_dim=embed_dim,
            num_heads=num_heads,
            ffn_hidden_units=ffn_hidden_units,
            num_layers=num_layers,
            dropout=dropout,
            max_len=max_len,
        )

        self.encoder = FullEncoder(vocab_size=src_vocab_size, **shared_config)
        self.decoder = FullDecoder(vocab_size=tgt_vocab_size, **shared_config)

    def forward(self, src_ids, tgt_ids, src_mask=None, tgt_mask=None):
        """Encode ``src_ids`` and decode ``tgt_ids`` against the result.

        ``src_mask`` is used both for the encoder self-attention and as the
        decoder's cross-attention (memory) mask.
        """
        memory = self.encoder(src_ids, src_mask)
        return self.decoder(tgt_ids, memory, tgt_mask, src_mask)

## Masking (padding and look ahead)

In [30]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [31]:
def create_padding_mask(seq, pad_idx=0):
    """Boolean mask that is True wherever ``seq`` differs from ``pad_idx``.

    Two singleton dimensions are inserted after the first one, so a
    ``(batch, length)`` input gives a ``(batch, 1, 1, length)`` mask.
    """
    is_real_token = seq.ne(pad_idx)
    return is_real_token[:, None, None, ...]

In [32]:
def create_look_ahead_mask(size):
    """Boolean ``(1, 1, size, size)`` mask, True strictly above the diagonal.

    Entry ``[.., i, j]`` is True when ``j > i``, i.e. when position ``j`` lies
    in the future of position ``i``.
    """
    positions = torch.arange(size)
    is_future = positions.unsqueeze(0) > positions.unsqueeze(1)
    return is_future[None, None]

In [33]:
def create_decoder_mask(tgt_seq, pad_idx=0):
    """Combine the padding mask of ``tgt_seq`` with a causal mask.

    A position is True only when it is not padding and is not in the future
    of the attending position. Both masks are moved to ``tgt_seq``'s device.
    """
    target_device = tgt_seq.device
    not_padding = create_padding_mask(tgt_seq, pad_idx).to(target_device)
    is_future = create_look_ahead_mask(tgt_seq.size(1)).to(target_device)
    return not_padding & is_future.logical_not()


# Training

In [47]:
def greedy_decode(model, src_ids, tokenizer_tgt, pad_idx, start_token="<start>", end_token="<end>", max_len=50, device="cuda"):
    """Greedily generate target token ids for a single source sequence.

    Args:
        model: object exposing ``encoder`` and ``decoder`` callables and ``eval()``.
        src_ids: source ids for one sentence, without a batch dimension (one is
            added here).
        tokenizer_tgt: target tokenizer; only ``token_to_id`` is used.
        pad_idx: padding id used to build both the source and the target masks.
        start_token, end_token: tokens that start and terminate generation.
        max_len: maximum number of tokens generated after the start token.
        device: device on which decoding runs.

    Returns:
        List of ids beginning with the start id. Always a list, including when
        ``max_len`` is 0.

    Raises:
        ValueError: if ``start_token`` is not in the target vocabulary.
    """
    model.eval()
    src_ids = src_ids.unsqueeze(0).to(device)
    src_mask = create_padding_mask(src_ids, pad_idx)

    # Loop-invariant lookups are done once.
    start_id = tokenizer_tgt.token_to_id(start_token)
    end_id = tokenizer_tgt.token_to_id(end_token)
    if start_id is None:
        raise ValueError(f"start token {start_token!r} is not in the target vocabulary")

    with torch.no_grad():
        enc_output = model.encoder(src_ids, src_mask)
        tgt_ids = torch.tensor([[start_id]], device=device)

        for _ in range(max_len):
            tgt_mask = create_decoder_mask(tgt_ids, pad_idx)
            output = model.decoder(tgt_ids, enc_output, tgt_mask, src_mask)
            # Only the prediction for the last position is needed.
            next_token_logits = output[:, -1, :]
            next_token = torch.argmax(next_token_logits, dim=-1).unsqueeze(1)

            tgt_ids = torch.cat([tgt_ids, next_token], dim=1)

            if next_token.item() == end_id:
                break

    # squeeze(0) removes only the batch dimension. A bare squeeze() would also
    # collapse a length-1 sequence to a scalar, making tolist() return an int.
    return tgt_ids.squeeze(0).tolist()


## BLEU score evaluation function

In [48]:
import nltk
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
nltk.download("punkt")

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [49]:
def evaluate_bleu(model, dataloader, tokenizer_src, tokenizer_tgt, pad_idx, max_samples=100, device="cuda"):
    """Print and return the average sentence-level BLEU of greedy translations.

    Args:
        model: translation model passed to ``greedy_decode``.
        dataloader: yields batches with ``"encoder_input"`` and
            ``"decoder_target"`` entries.
        tokenizer_src: unused; kept so existing callers keep working.
        tokenizer_tgt: target tokenizer used for id lookup and decoding.
        pad_idx: padding id; removed from references and predictions and
            passed on to ``greedy_decode``.
        max_samples: maximum number of sentences to score.
        device: device used for decoding.

    Returns:
        The average BLEU score, or 0 when no sentence was scored.
    """
    model.eval()
    smoothie = SmoothingFunction().method4

    # Ids stripped before detokenising; looked up once rather than per token.
    skip_ids = {
        pad_idx,
        tokenizer_tgt.token_to_id("<start>"),
        tokenizer_tgt.token_to_id("<end>"),
    }

    def _to_words(token_ids):
        """Decode ids (minus special ids) and split the text on whitespace."""
        kept = [tok for tok in token_ids if tok not in skip_ids]
        return tokenizer_tgt.decode(kept).split()

    total_bleu = 0
    count = 0

    for batch in dataloader:
        # Checked before doing any work so that max_samples=0 scores nothing.
        if count >= max_samples:
            break

        src_ids = batch["encoder_input"].to(device)
        # Targets are only converted to Python lists, so they stay where they are.
        tgt_target_ids = batch["decoder_target"]

        for src_sentence_ids, tgt_sentence_ids in zip(src_ids, tgt_target_ids):
            if count >= max_samples:
                break

            ref_tokens = _to_words(tgt_sentence_ids.tolist())

            pred_ids = greedy_decode(
                model,
                src_sentence_ids,
                tokenizer_tgt=tokenizer_tgt,
                pad_idx=pad_idx,
                start_token="<start>",
                end_token="<end>",
                device=device
            )
            pred_tokens = _to_words(pred_ids)

            total_bleu += sentence_bleu([ref_tokens], pred_tokens, smoothing_function=smoothie)
            count += 1

    avg_bleu = total_bleu / count if count > 0 else 0
    print(f"\nAverage BLEU Score (on {count} samples): {avg_bleu:.4f}")
    return avg_bleu


In [50]:
from tqdm.auto import tqdm
def train_model(
    model, train_dataloader, test_dataloader,
    tokenizer_src, tokenizer_tgt,
    optimizer, loss_fn,
    pad_idx_src, pad_idx_tgt,
    num_epochs=10, device="cuda"
):
    """Train ``model`` with teacher forcing and report BLEU after every epoch.

    Args:
        model: model called as ``model(src_ids, tgt_input_ids, src_mask, tgt_mask)``.
        train_dataloader: yields batches with ``"encoder_input"``,
            ``"decoder_input"`` and ``"decoder_target"`` entries.
        test_dataloader: loader handed to ``evaluate_bleu``.
        tokenizer_src, tokenizer_tgt: tokenizers forwarded to ``evaluate_bleu``.
        optimizer: optimizer stepped once per batch.
        loss_fn: called with flattened logits and flattened target ids.
        pad_idx_src, pad_idx_tgt: padding ids used to build the masks.
        num_epochs: number of passes over ``train_dataloader``.
        device: device for training and for the per-epoch evaluation.
    """
    for epoch in tqdm(range(num_epochs)):
        model.train()
        total_loss = 0

        for batch in train_dataloader:
            src_ids = batch["encoder_input"].to(device)
            tgt_input_ids = batch["decoder_input"].to(device)
            tgt_target_ids = batch["decoder_target"].to(device)

            src_mask = create_padding_mask(src_ids, pad_idx_src)
            tgt_mask = create_decoder_mask(tgt_input_ids, pad_idx_tgt)

            output = model(src_ids, tgt_input_ids, src_mask, tgt_mask)

            # Flatten to (batch * length, vocab) and (batch * length,) for the loss.
            output = output.reshape(-1, output.shape[-1])
            tgt_target_ids = tgt_target_ids.reshape(-1)

            loss = loss_fn(output, tgt_target_ids)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(train_dataloader)
        print(f"\nEpoch {epoch+1}/{num_epochs} | Train Loss: {avg_loss:.4f}")

        evaluate_bleu(
            model=model,
            dataloader=test_dataloader,
            tokenizer_src=tokenizer_src,
            tokenizer_tgt=tokenizer_tgt,
            pad_idx=pad_idx_tgt,
            max_samples=100,
            # Forward the training device; without it evaluate_bleu falls back
            # to its "cuda" default and fails when training runs on the CPU.
            device=device
        )


In [51]:
# Build the model with the tokenizer vocabulary sizes and move it to `device`.
model = Transformer(
    src_vocab_size=vocab_size_en,
    tgt_vocab_size=vocab_size_hi,
    embed_dim=256,
    num_heads=4,
    ffn_hidden_units=512,
    num_layers=2,
    dropout=0.1,
    max_len=100  # size of the positional-encoding table
).to(device)


In [52]:
# Ignore padded target positions in the loss. The id comes from the Hindi
# tokenizer's vocabulary instead of a hard-coded 0 so it cannot drift from
# the value used when padding and masking.
loss_fn = nn.CrossEntropyLoss(ignore_index=pad_idx_hi).to(device)
# Adam betas and eps, annotated by the original author as following the paper.
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

In [53]:
# Train for 50 epochs; train_model prints the loss and a BLEU estimate per epoch.
train_model(
    model=model,
    train_dataloader=train_dataloader,
    test_dataloader=test_dataloader,
    tokenizer_src=eng_tok,
    tokenizer_tgt=hin_tok,
    optimizer=optimizer,
    loss_fn=loss_fn,
    pad_idx_src=pad_idx_en,
    pad_idx_tgt=pad_idx_hi,
    num_epochs=50,
    device=device
)


  0%|          | 0/50 [00:00<?, ?it/s]


Epoch 1/50 | Train Loss: 6.4435

Average BLEU Score (on 100 samples): 0.0040

Epoch 2/50 | Train Loss: 5.3784

Average BLEU Score (on 100 samples): 0.0340

Epoch 3/50 | Train Loss: 5.0213

Average BLEU Score (on 100 samples): 0.0426

Epoch 4/50 | Train Loss: 4.7859

Average BLEU Score (on 100 samples): 0.0440

Epoch 5/50 | Train Loss: 4.5988

Average BLEU Score (on 100 samples): 0.0457

Epoch 6/50 | Train Loss: 4.4303

Average BLEU Score (on 100 samples): 0.0487

Epoch 7/50 | Train Loss: 4.2819

Average BLEU Score (on 100 samples): 0.0544

Epoch 8/50 | Train Loss: 4.1395

Average BLEU Score (on 100 samples): 0.0613

Epoch 9/50 | Train Loss: 4.0220

Average BLEU Score (on 100 samples): 0.0599

Epoch 10/50 | Train Loss: 3.9019

Average BLEU Score (on 100 samples): 0.0629

Epoch 11/50 | Train Loss: 3.7849

Average BLEU Score (on 100 samples): 0.0595

Epoch 12/50 | Train Loss: 3.6771

Average BLEU Score (on 100 samples): 0.0624

Epoch 13/50 | Train Loss: 3.5709

Average BLEU Score (on 100

In [64]:
def translate_sentence(sentence, model, tokenizer_src, tokenizer_tgt, pad_idx_src, start_token="<start>", end_token="<end>", device="cuda", pad_idx_tgt=None):
    """Translate one raw source sentence with greedy decoding.

    Args:
        sentence: raw source text.
        model: translation model passed to ``greedy_decode``.
        tokenizer_src: tokenizer used to encode the source sentence.
        tokenizer_tgt: tokenizer used to decode the generated ids.
        pad_idx_src: padding id of the source vocabulary.
        start_token, end_token: boundary tokens wrapped around the sentence
            and removed from the output.
        device: device on which decoding runs.
        pad_idx_tgt: padding id of the target vocabulary, removed from the
            output. Defaults to ``pad_idx_src``, which is what was used before
            this parameter existed.

    Returns:
        The decoded target text without start, end and padding tokens.
    """
    model.eval()

    if pad_idx_tgt is None:
        pad_idx_tgt = pad_idx_src

    # Apply the same preprocessing as the training corpus (trim, lower-case,
    # NFKC) so the tokenizer sees text in the form it was trained on.
    cleaned = normalize(sentence.strip().lower())
    sentence = f"{start_token} {cleaned} {end_token}"

    # greedy_decode adds the batch dimension itself, so pass a 1-D tensor.
    input_ids = torch.tensor(tokenizer_src.encode(sentence).ids, device=device)

    output_ids = greedy_decode(
        model=model,
        src_ids=input_ids,
        tokenizer_tgt=tokenizer_tgt,
        pad_idx=pad_idx_src,
        start_token=start_token,
        end_token=end_token,
        device=device
    )

    # Ids to drop from the generated sequence; looked up once.
    special_ids = {
        tokenizer_tgt.token_to_id(start_token),
        tokenizer_tgt.token_to_id(end_token),
        pad_idx_tgt,
    }
    output_text = tokenizer_tgt.decode([idx for idx in output_ids if idx not in special_ids])

    return output_text


In [70]:
# Translate a sample English sentence with the trained model and print the result.
random_english = "i am fine"
translated_hindi = translate_sentence(
    sentence=random_english,
    model=model,
    tokenizer_src=eng_tok,
    tokenizer_tgt=hin_tok,
    pad_idx_src=pad_idx_en,
    device=device
)

print(f"English: {random_english}")
print(f"Hindi Translation: {translated_hindi}")


English: i am fine
Hindi Translation: मैं ठीक हूँ हूँ क्या हूँ
