In [None]:
# Imports
import torch
import torch.nn as nn
import math

In [None]:
# Defines a class for input embeddings with scaling based on the model's dimensionality.
class InputEmbeddings(nn.Module):
    """Token-embedding lookup whose output is multiplied by sqrt(d_model)."""

    def __init__(self, d_model: int, vocab_size: int) -> None:
        """
        Args:
            d_model: Size of each embedding vector.
            vocab_size: Number of rows in the embedding table.
        """
        super().__init__()
        self.d_model = d_model
        self.vocab_size = vocab_size
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_model)

    def forward(self, x):
        """Embed the ids in ``x`` and scale the vectors by sqrt(d_model)."""
        scale = math.sqrt(self.d_model)
        embedded = self.embedding(x)
        return embedded * scale

In [None]:
# Defines classes for positional encoding with sinusoidal functions and a residual connection with layer normalization.
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position encodings to its input, then applies dropout.

    The encoding table is built once in ``__init__`` and stored as the buffer
    ``pe`` with shape ``(1, seq_len, d_model)``.
    """

    def __init__(self, d_model: int, seq_len: int, dropout: float) -> None:
        """
        Args:
            d_model: Size of the last dimension of the inputs (even or odd).
            seq_len: Number of positions to precompute encodings for.
            dropout: Drop probability applied after the encodings are added.
        """
        super().__init__()
        self.d_model = d_model
        self.seq_len = seq_len
        self.dropout = nn.Dropout(dropout)
        pe = torch.zeros(seq_len, d_model)
        position = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (seq_len, ceil(d_model / 2))
        pe[:, 0::2] = torch.sin(angles)
        # Odd columns number d_model // 2, which is one fewer than the even
        # columns when d_model is odd; slice so the shapes match in both cases.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        pe = pe.unsqueeze(0)
        # Registered as a buffer: moves with .to(device) and is saved in the
        # state dict, but is not a trainable parameter.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Add the first ``x.shape[1]`` position encodings to ``x`` and apply dropout."""
        # The buffer does not require grad, so no explicit requires_grad_ call is needed.
        x = x + self.pe[:, :x.shape[1], :]
        return self.dropout(x)

class ResidualConnection(nn.Module):
    """Pre-norm residual wrapper computing ``x + dropout(sublayer(norm(x)))``.

    NOTE(review): a class with this same name is defined again in a later cell
    of this notebook; whichever cell runs last is the one bound to the name.
    """

    def __init__(self, features: int, dropout: float) -> None:
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        self.norm = LayerNormalization(features)

    def forward(self, x, sublayer):
        # Normalize first, run the wrapped sub-layer, then add the skip path.
        normalized = self.norm(x)
        branch = self.dropout(sublayer(normalized))
        return x + branch

In [None]:
# Implements layer normalization to stabilize and accelerate training by normalizing the input across features.
class LayerNormalization(nn.Module):
    """Normalizes the last dimension, then applies a learned scale and shift."""

    def __init__(self, features: int, eps:float=10**-6) -> None:
        """
        Args:
            features: Length of the learned ``alpha`` (scale) and ``bias`` (shift) vectors.
            eps: Constant added to the standard deviation before dividing.
        """
        super().__init__()
        self.eps = eps
        self.alpha = nn.Parameter(torch.ones(features))
        self.bias = nn.Parameter(torch.zeros(features))

    def forward(self, x):
        """Return ``alpha * (x - mean) / (std + eps) + bias`` over the last dimension."""
        centered = x - x.mean(dim=-1, keepdim=True)
        # torch.Tensor.std with its default settings; eps is added to std, not to the variance.
        spread = x.std(dim=-1, keepdim=True) + self.eps
        return self.alpha * centered / spread + self.bias

In [None]:
# Defines a feedforward neural network block with two linear layers and dropout for regularization.
class FeedForwardBlock(nn.Module):
    """Two linear layers with a ReLU and dropout in between: d_model -> d_ff -> d_model."""

    def __init__(self, d_model: int, d_ff: int, dropout: float) -> None:
        """
        Args:
            d_model: Input and output feature size.
            d_ff: Hidden feature size between the two linear layers.
            dropout: Drop probability applied after the ReLU.
        """
        super().__init__()
        self.linear_1 = nn.Linear(in_features=d_model, out_features=d_ff)
        self.dropout = nn.Dropout(p=dropout)
        self.linear_2 = nn.Linear(in_features=d_ff, out_features=d_model)

    def forward(self, x):
        """Apply linear_1, ReLU, dropout and linear_2 in that order."""
        hidden = torch.relu(self.linear_1(x))
        hidden = self.dropout(hidden)
        return self.linear_2(hidden)

In [None]:
# Implements a multi-head attention block with query, key, and value transformations, and scaled dot-product attention.
class MultiHeadAttentionBlock(nn.Module):
    """Multi-head scaled dot-product attention with bias-free Q/K/V/output projections."""

    def __init__(self, d_model: int, h: int, dropout: float) -> None:
        """
        Args:
            d_model: Feature size of the inputs and outputs; must be divisible by ``h``.
            h: Number of attention heads.
            dropout: Drop probability applied to the attention weights.
        """
        super().__init__()
        self.d_model = d_model
        self.h = h
        assert d_model % h == 0, "d_model is not divisible by h"

        # Per-head feature size.
        self.d_k = d_model // h
        self.w_q = nn.Linear(d_model, d_model, bias=False)
        self.w_k = nn.Linear(d_model, d_model, bias=False)
        self.w_v = nn.Linear(d_model, d_model, bias=False)
        self.w_o = nn.Linear(d_model, d_model, bias=False)
        self.dropout = nn.Dropout(dropout)

    @staticmethod
    def attention(query, key, value, mask, dropout: nn.Dropout):
        """Return ``(weights @ value, weights)`` for scaled dot-product attention.

        Positions where ``mask == 0`` are filled with -1e9 before the softmax;
        ``mask`` and ``dropout`` may each be ``None`` to skip that step.
        """
        head_dim = query.shape[-1]
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(head_dim)
        if mask is not None:
            scores.masked_fill_(mask == 0, -1e9)
        weights = scores.softmax(dim=-1)
        if dropout is not None:
            weights = dropout(weights)
        return torch.matmul(weights, value), weights

    def _split_heads(self, projected):
        """Reshape (batch, length, d_model) into (batch, h, length, d_k)."""
        batch, length = projected.shape[0], projected.shape[1]
        return projected.view(batch, length, self.h, self.d_k).transpose(1, 2)

    def forward(self, q, k, v, mask):
        """Project q/k/v, attend per head, merge the heads and apply ``w_o``.

        The attention weights of the latest call are kept on ``self.attention_scores``.
        """
        query = self._split_heads(self.w_q(q))
        key = self._split_heads(self.w_k(k))
        value = self._split_heads(self.w_v(v))
        context, self.attention_scores = MultiHeadAttentionBlock.attention(
            query, key, value, mask, self.dropout
        )
        batch = context.shape[0]
        # (batch, h, length, d_k) -> (batch, length, h * d_k)
        merged = context.transpose(1, 2).contiguous().view(batch, -1, self.h * self.d_k)
        return self.w_o(merged)

In [None]:
# Defines a residual connection with layer normalization and dropout for improved gradient flow.
class ResidualConnection(nn.Module):
    """Skip connection around a sub-layer, with layer normalization applied before it."""

    def __init__(self, features: int, dropout: float) -> None:
        """
        Args:
            features: Feature size handed to LayerNormalization.
            dropout: Drop probability applied to the sub-layer output.
        """
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.norm = LayerNormalization(features)

    def forward(self, x, sublayer):
        """Return ``x`` plus the dropped-out result of ``sublayer`` on the normalized ``x``."""
        sublayer_out = sublayer(self.norm(x))
        return x + self.dropout(sublayer_out)

In [None]:
# Implements an encoder block with self-attention, feedforward layers, and residual connections, as well as an encoder composed of multiple layers.
class EncoderBlock(nn.Module):
    """One encoder layer: self-attention then feed-forward, each inside a residual connection."""

    def __init__(self, features: int, self_attention_block: MultiHeadAttentionBlock, feed_forward_block: FeedForwardBlock, dropout: float) -> None:
        """
        Args:
            features: Feature size passed to each ResidualConnection.
            self_attention_block: Attention module called with the same tensor as q, k and v.
            feed_forward_block: Module applied after the attention sub-layer.
            dropout: Drop probability passed to each ResidualConnection.
        """
        super().__init__()
        self.self_attention_block = self_attention_block
        self.feed_forward_block = feed_forward_block
        attention_residual = ResidualConnection(features, dropout)
        feed_forward_residual = ResidualConnection(features, dropout)
        self.residual_connections = nn.ModuleList([attention_residual, feed_forward_residual])

    def forward(self, x, src_mask):
        """Run both sub-layers on ``x``; ``src_mask`` is forwarded to the attention block."""

        def self_attend(normed):
            return self.self_attention_block(normed, normed, normed, src_mask)

        attention_residual, feed_forward_residual = self.residual_connections
        attended = attention_residual(x, self_attend)
        return feed_forward_residual(attended, self.feed_forward_block)

class Encoder(nn.Module):
    """Applies a stack of layers in order, then a final LayerNormalization."""

    def __init__(self, features: int, layers: nn.ModuleList) -> None:
        """
        Args:
            features: Feature size passed to the final LayerNormalization.
            layers: Modules called as ``layer(x, mask)``, in iteration order.
        """
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization(features)

    def forward(self, x, mask):
        """Feed ``x`` through every layer with the same ``mask`` and normalize the result."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, mask)
        return self.norm(hidden)

In [None]:
# Implements a decoder block with self-attention, cross-attention, feedforward layers, and residual connections, as well as a decoder composed of multiple layers.
class DecoderBlock(nn.Module):
    """One decoder layer: self-attention, cross-attention and feed-forward, each with a residual connection."""

    def __init__(self, features: int, self_attention_block: MultiHeadAttentionBlock, cross_attention_block: MultiHeadAttentionBlock, feed_forward_block: FeedForwardBlock, dropout: float) -> None:
        """
        Args:
            features: Feature size passed to each ResidualConnection.
            self_attention_block: Attention module called with the decoder tensor as q, k and v.
            cross_attention_block: Attention module called with the decoder tensor as q and
                the encoder output as k and v.
            feed_forward_block: Module applied after the two attention sub-layers.
            dropout: Drop probability passed to each ResidualConnection.
        """
        super().__init__()
        self.self_attention_block = self_attention_block
        self.cross_attention_block = cross_attention_block
        self.feed_forward_block = feed_forward_block
        residuals = [ResidualConnection(features, dropout) for _ in range(3)]
        self.residual_connections = nn.ModuleList(residuals)

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        """Run the three sub-layers; ``tgt_mask`` masks self-attention, ``src_mask`` masks cross-attention."""

        def self_attend(normed):
            return self.self_attention_block(normed, normed, normed, tgt_mask)

        def cross_attend(normed):
            return self.cross_attention_block(normed, encoder_output, encoder_output, src_mask)

        self_residual, cross_residual, feed_forward_residual = self.residual_connections
        hidden = self_residual(x, self_attend)
        hidden = cross_residual(hidden, cross_attend)
        return feed_forward_residual(hidden, self.feed_forward_block)

class Decoder(nn.Module):
    """Applies a stack of decoder layers in order, then a final LayerNormalization."""

    def __init__(self, features: int, layers: nn.ModuleList) -> None:
        """
        Args:
            features: Feature size passed to the final LayerNormalization.
            layers: Modules called as ``layer(x, encoder_output, src_mask, tgt_mask)``.
        """
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization(features)

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        """Feed ``x`` through every layer with the same encoder output and masks, then normalize."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, encoder_output, src_mask, tgt_mask)
        return self.norm(hidden)

In [None]:
# Defines a projection layer that maps the model's output to the vocabulary size for each sequence element.
class ProjectionLayer(nn.Module):
    """Linear map from ``d_model`` features to ``vocab_size`` unnormalized scores."""

    def __init__(self, d_model: int, vocab_size: int) -> None:
        """
        Args:
            d_model: Size of the last dimension of the input.
            vocab_size: Size of the last dimension of the output.
        """
        super().__init__()
        self.proj = nn.Linear(d_model, vocab_size)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``self.proj(x)``; no softmax is applied here."""
        return self.proj(x)

In [None]:
def causal_mask(size):
    """Return a ``(size, size)`` float mask: 0.0 on and below the diagonal, -inf above it.

    NOTE: this is an additive mask (values to add to scores). It is not a
    keep/drop mask of ones and zeros.
    """
    blocked = torch.full((size, size), float('-inf'))
    # triu(diagonal=1) keeps the strictly-upper -inf entries and zero-fills the rest.
    return torch.triu(blocked, diagonal=1)


In [None]:
# Defines a Transformer model with encoding, decoding, and projection layers for sequence-to-sequence tasks.
class Transformer(nn.Module):
    """Bundles embeddings, positional encodings, encoder, decoder and projection.

    No ``forward`` is defined; callers invoke ``encode``, ``decode`` and
    ``project`` separately.
    """

    def __init__(self, encoder: Encoder, decoder: Decoder, src_embed: InputEmbeddings, tgt_embed: InputEmbeddings, src_pos: PositionalEncoding, tgt_pos: PositionalEncoding, projection_layer: ProjectionLayer) -> None:
        super().__init__()
        # Registration order below fixes the order of parameters() and state_dict().
        self.encoder = encoder
        self.decoder = decoder
        self.src_embed = src_embed
        self.tgt_embed = tgt_embed
        self.src_pos = src_pos
        self.tgt_pos = tgt_pos
        self.projection_layer = projection_layer

    def encode(self, src, src_mask):
        """Embed ``src``, add position encodings, and run the encoder with ``src_mask``."""
        embedded = self.src_pos(self.src_embed(src))
        return self.encoder(embedded, src_mask)

    def decode(self, encoder_output: torch.Tensor, src_mask: torch.Tensor, tgt: torch.Tensor, tgt_mask: torch.Tensor):
        """Embed ``tgt``, add position encodings, and run the decoder against ``encoder_output``."""
        embedded = self.tgt_pos(self.tgt_embed(tgt))
        return self.decoder(embedded, encoder_output, src_mask, tgt_mask)

    def project(self, x):
        """Apply the projection layer to ``x``."""
        return self.projection_layer(x)

In [None]:
# Builds a Transformer model with specified hyperparameters, including encoder and decoder blocks, embedding layers, and positional encodings.
def build_transformer(src_vocab_size: int, tgt_vocab_size: int, src_seq_len: int, tgt_seq_len: int, d_model: int=512, N: int=6, h: int=8, dropout: float=0.1, d_ff: int=2048) -> Transformer:
    """Assemble a Transformer and Xavier-initialize every parameter with more than one dimension.

    Args:
        src_vocab_size: Vocabulary size for the source embedding.
        tgt_vocab_size: Vocabulary size for the target embedding and the projection layer.
        src_seq_len: Number of positions precomputed for the source positional encoding.
        tgt_seq_len: Number of positions precomputed for the target positional encoding.
        d_model: Feature size used throughout the model.
        N: Number of encoder blocks and, separately, of decoder blocks.
        h: Number of attention heads.
        dropout: Drop probability passed to every sub-module that takes one.
        d_ff: Hidden size of each feed-forward block.

    Returns:
        The constructed Transformer.
    """

    def new_attention():
        return MultiHeadAttentionBlock(d_model, h, dropout)

    def new_feed_forward():
        return FeedForwardBlock(d_model, d_ff, dropout)

    def new_encoder_block():
        attention = new_attention()
        feed_forward = new_feed_forward()
        return EncoderBlock(d_model, attention, feed_forward, dropout)

    def new_decoder_block():
        self_attention = new_attention()
        cross_attention = new_attention()
        feed_forward = new_feed_forward()
        return DecoderBlock(d_model, self_attention, cross_attention, feed_forward, dropout)

    # Modules are created in a fixed order (embeddings, positions, encoder
    # blocks, decoder blocks, projection) so seeded runs stay reproducible.
    src_embed = InputEmbeddings(d_model, src_vocab_size)
    tgt_embed = InputEmbeddings(d_model, tgt_vocab_size)
    src_pos = PositionalEncoding(d_model, src_seq_len, dropout)
    tgt_pos = PositionalEncoding(d_model, tgt_seq_len, dropout)

    encoder_stack = nn.ModuleList([new_encoder_block() for _ in range(N)])
    decoder_stack = nn.ModuleList([new_decoder_block() for _ in range(N)])

    encoder = Encoder(d_model, encoder_stack)
    decoder = Decoder(d_model, decoder_stack)
    projection_layer = ProjectionLayer(d_model, tgt_vocab_size)
    transformer = Transformer(encoder, decoder, src_embed, tgt_embed, src_pos, tgt_pos, projection_layer)

    # Only tensors with more than one dimension are re-initialized; 1-D
    # parameters (biases, norm scale/shift) keep their constructor values.
    for param in transformer.parameters():
        if param.dim() > 1:
            nn.init.xavier_uniform_(param)

    return transformer

In [None]:
import pandas as pd

In [None]:
# Load the raw English-Hindi parallel corpus and preview its first five rows.
csv = pd.read_csv(filepath_or_buffer="/kaggle/input/english-to-hindi-parallel-dataset/newdata.csv")
csv.head()

Unnamed: 0.1,Unnamed: 0,english_sentence,hindi_sentence
0,0,politicians do not have permission to do what ...,"राजनीतिज्ञों के पास जो कार्य करना चाहिए, वह कर..."
1,1,"I'd like to tell you about one such child,",मई आपको ऐसे ही एक बच्चे के बारे में बताना चाहू...
2,2,This percentage is even greater than the perce...,यह प्रतिशत भारत में हिन्दुओं प्रतिशत से अधिक है।
3,3,what we really mean is that they're bad at not...,हम ये नहीं कहना चाहते कि वो ध्यान नहीं दे पाते
4,4,.The ending portion of these Vedas is called U...,इन्हीं वेदों का अंतिम भाग उपनिषद कहलाता है।


In [None]:
# Remove the leftover "Unnamed: 0" column and any rows with missing values.
# Using a non-inplace drop with errors="ignore" keeps this cell re-runnable:
# a second execution no longer raises KeyError for the already-removed column.
csv = csv.drop(columns=["Unnamed: 0"], errors="ignore").dropna()
csv.head(5)

Unnamed: 0,english_sentence,hindi_sentence
0,politicians do not have permission to do what ...,"राजनीतिज्ञों के पास जो कार्य करना चाहिए, वह कर..."
1,"I'd like to tell you about one such child,",मई आपको ऐसे ही एक बच्चे के बारे में बताना चाहू...
2,This percentage is even greater than the perce...,यह प्रतिशत भारत में हिन्दुओं प्रतिशत से अधिक है।
3,what we really mean is that they're bad at not...,हम ये नहीं कहना चाहते कि वो ध्यान नहीं दे पाते
4,.The ending portion of these Vedas is called U...,इन्हीं वेदों का अंतिम भाग उपनिषद कहलाता है।


In [None]:
from pathlib import Path

from tokenizers import Tokenizer, trainers
from tokenizers.models import WordLevel
from tokenizers.pre_tokenizers import Whitespace
from torch.optim import Adam
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm

In [None]:
# Processes the dataset by reading, tokenizing, splitting into training and validation sets, and creating data loaders.
def process_data(config):
    """Read the parallel corpus, build both tokenizers, and return train/validation loaders.

    Args:
        config: Mapping that provides "data_path", "tokenizer_file" (a format
            string with one placeholder), "seq_len" and "batch_size".

    Returns:
        Tuple ``(train_loader, val_loader, tokenizer_src, tokenizer_tgt)``.
    """
    frame = pd.read_csv(config["data_path"])
    frame = frame.dropna().reset_index(drop=True)

    tokenizer_template = config["tokenizer_file"]
    tokenizer_src = build_tokenizer(frame["english_sentence"], tokenizer_template.format("en"))
    tokenizer_tgt = build_tokenizer(frame["hindi_sentence"], tokenizer_template.format("hi"))

    # The first 90% of rows, in file order, are used for training; the rest for validation.
    split_at = int(len(frame) * 0.9)
    train_rows = frame.iloc[:split_at]
    val_rows = frame.iloc[split_at:]

    max_len = config["seq_len"]
    train_ds = BilingualDataset(train_rows, tokenizer_src, tokenizer_tgt, max_len)
    val_ds = BilingualDataset(val_rows, tokenizer_src, tokenizer_tgt, max_len)

    # Training batches are shuffled; validation yields one example at a time, unshuffled.
    train_loader = DataLoader(train_ds, batch_size=config["batch_size"], shuffle=True)
    val_loader = DataLoader(val_ds, batch_size=1)

    return train_loader, val_loader, tokenizer_src, tokenizer_tgt


In [None]:
# build_tokenizer creates or loads a tokenizer, while BilingualDataset tokenizes and prepares bilingual text data with special tokens and masks for training.
def build_tokenizer(texts, tokenizer_path):
    """Load the tokenizer stored at ``tokenizer_path``, or train and save a new one.

    Args:
        texts: Iterable of strings used for training when no saved file exists.
        tokenizer_path: File the tokenizer is loaded from or written to.

    Returns:
        The loaded or newly trained Tokenizer.
    """
    if not Path(tokenizer_path).exists():
        # Word-level vocabulary over whitespace pre-tokenized text; unknown
        # words map to "[UNK]".
        word_level = Tokenizer(WordLevel(unk_token="[UNK]"))
        word_level.pre_tokenizer = Whitespace()
        special_tokens = ["[UNK]", "[PAD]", "[SOS]", "[EOS]"]
        word_level.train_from_iterator(texts, trainers.WordLevelTrainer(special_tokens=special_tokens))
        word_level.save(tokenizer_path)
        return word_level

    return Tokenizer.from_file(tokenizer_path)

class BilingualDataset(Dataset):
    """Turns rows of an English/Hindi DataFrame into fixed-length training tensors.

    Each item holds the encoder input, the decoder input, the label sequence
    (the decoder input shifted left by one position), and the attention masks.
    In both masks a non-zero entry means "may attend", which is the convention
    of ``MultiHeadAttentionBlock.attention`` (it masks where ``mask == 0``).
    """

    def __init__(self, df, tokenizer_src, tokenizer_tgt, seq_len):
        """
        Args:
            df: DataFrame with "english_sentence" and "hindi_sentence" columns.
            tokenizer_src: Tokenizer for the English side.
            tokenizer_tgt: Tokenizer for the Hindi side.
            seq_len: Length every returned sequence is padded or truncated to.
        """
        self.df = df
        self.tokenizer_src = tokenizer_src
        self.tokenizer_tgt = tokenizer_tgt
        self.seq_len = seq_len

        # Target-side special ids (attribute names kept for existing callers).
        self.sos_token = tokenizer_tgt.token_to_id("[SOS]")
        self.eos_token = tokenizer_tgt.token_to_id("[EOS]")
        self.pad_token = tokenizer_tgt.token_to_id("[PAD]")

        # Source-side special ids: the encoder input is framed and padded with
        # the source tokenizer's own ids rather than the target tokenizer's.
        self.src_sos_token = tokenizer_src.token_to_id("[SOS]")
        self.src_eos_token = tokenizer_src.token_to_id("[EOS]")
        self.src_pad_token = tokenizer_src.token_to_id("[PAD]")

        # Lower-triangular (1, seq_len, seq_len) matrix: position i may attend
        # to positions <= i. Built once because it is identical for every item.
        self._causal_mask = torch.tril(torch.ones((1, seq_len, seq_len), dtype=torch.int))

    def __len__(self):
        return len(self.df)

    def _pad_to_len(self, ids, pad_id):
        """Right-pad ``ids`` with ``pad_id`` and cut the result to exactly ``seq_len`` items."""
        return (ids + [pad_id] * self.seq_len)[:self.seq_len]

    def __getitem__(self, idx):
        """Return the tensors, masks and raw texts for row ``idx``.

        ``encoder_mask`` has shape (1, 1, seq_len). ``decoder_mask`` has shape
        (1, seq_len, seq_len) and combines the padding mask with a causal mask
        so a position cannot attend to later target positions.
        """
        row = self.df.iloc[idx]
        src_text = row["english_sentence"]
        tgt_text = row["hindi_sentence"]

        # Truncate the raw token ids first so the special tokens always fit:
        # two slots for [SOS]/[EOS] on the source side, one on the target side.
        src_ids = self.tokenizer_src.encode(src_text).ids[:max(self.seq_len - 2, 0)]
        tgt_ids = self.tokenizer_tgt.encode(tgt_text).ids[:max(self.seq_len - 1, 0)]

        enc_input = self._pad_to_len([self.src_sos_token] + src_ids + [self.src_eos_token], self.src_pad_token)
        dec_input = self._pad_to_len([self.sos_token] + tgt_ids, self.pad_token)
        label = self._pad_to_len(tgt_ids + [self.eos_token], self.pad_token)

        enc_tensor = torch.tensor(enc_input)
        dec_tensor = torch.tensor(dec_input)

        encoder_mask = (enc_tensor != self.src_pad_token).unsqueeze(0).unsqueeze(0).int()
        # (1, 1, seq_len) padding mask AND (1, seq_len, seq_len) causal mask.
        decoder_padding = (dec_tensor != self.pad_token).unsqueeze(0).unsqueeze(0).int()
        decoder_mask = decoder_padding & self._causal_mask

        return {
            "encoder_input": enc_tensor,
            "decoder_input": dec_tensor,
            "label": torch.tensor(label),
            "encoder_mask": encoder_mask,
            "decoder_mask": decoder_mask,
            "src_text": src_text,
            "tgt_text": tgt_text,
        }

In [None]:
# Configuration dictionary
def main():
    """Build and return the training configuration dictionary.

    Returns:
        dict: Hyperparameters, data/tokenizer/checkpoint locations and the
        TensorBoard experiment name used by the cells below.
    """
    return {
        'batch_size': 8,
        'num_epochs': 1,
        'lr': 1e-4,
        'seq_len': 350,
        'd_model': 512,
        'data_path': "/kaggle/input/english-to-hindi-parallel-dataset/newdata.csv",
        'model_folder': 'weights',
        'model_basename': 'en_hi_transformer_',
        'preload': None,
        'tokenizer_file': 'tokenizer_{0}.json',
        'experiment_name': 'runs/en_hi_transformer'
    }


# The following cells read `config` at notebook level, so it must be bound
# here; as a local variable inside main() it was never visible to them.
config = main()


In [None]:
# Make sure the checkpoint directory exists (no error if it is already there).
Path(config['model_folder']).mkdir(exist_ok=True, parents=True)

In [None]:
# Build the tokenizers and the train/validation DataLoaders from the config.
print("Processing data...")
(
    train_dataloader,
    val_dataloader,
    tokenizer_src,
    tokenizer_tgt,
) = process_data(config)


In [None]:
# Use the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using device: {device}")


In [None]:
# Instantiate the Transformer on the selected device. Layer count, heads,
# dropout and d_ff are left at build_transformer's defaults.
model = build_transformer(
    tokenizer_src.get_vocab_size(),
    tokenizer_tgt.get_vocab_size(),
    config['seq_len'],
    config['seq_len'],
    d_model=config['d_model'],
)
model = model.to(device)


In [None]:
# Optimizer and loss function
optimizer = torch.optim.Adam(model.parameters(), lr=config['lr'], eps=1e-9)
# Labels are target-language token ids, so the padding id to ignore must come
# from the target tokenizer (not the source one).
loss_fn = nn.CrossEntropyLoss(
        ignore_index=tokenizer_tgt.token_to_id('[PAD]'),
        label_smoothing=0.1).to(device)


In [None]:
# TensorBoard writer
writer = SummaryWriter(config['experiment_name'])
print("Starting training...")
global_step = 0  # counts optimizer steps across epochs; x-axis for the logged loss

for epoch in range(config['num_epochs']):
    model.train()
    batch_iterator = tqdm(train_dataloader, desc=f"Processing Epoch {epoch:02d}")
    total_loss = 0

    for batch in batch_iterator:
        encoder_input = batch['encoder_input'].to(device)
        decoder_input = batch['decoder_input'].to(device)
        encoder_mask = batch['encoder_mask'].to(device)
        decoder_mask = batch['decoder_mask'].to(device)
        label = batch['label'].to(device)

        optimizer.zero_grad()
        encoder_output = model.encode(encoder_input, encoder_mask)
        decoder_output = model.decode(encoder_output, encoder_mask, decoder_input, decoder_mask)
        proj_output = model.project(decoder_output)

        # Flatten to (tokens, vocab) and (tokens,) as CrossEntropyLoss expects.
        loss = loss_fn(proj_output.view(-1, tokenizer_tgt.get_vocab_size()), label.view(-1))
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        batch_iterator.set_postfix({"loss": f"{loss.item():.4f}"})
        writer.add_scalar('train_loss', loss.item(), global_step)
        global_step += 1

    # max(..., 1) avoids ZeroDivisionError when the loader yields no batches.
    print(f"Epoch {epoch} Loss: {total_loss / max(len(train_dataloader), 1):.4f}")
    # Push pending scalars to disk so the epoch is visible in TensorBoard.
    writer.flush()


In [None]:
# Save model checkpoint
# NOTE(review): this cell is outside the epoch loop, so it runs once and uses
# the `epoch` value left over from the loop's last iteration.
checkpoint_path = f"{config['model_folder']}/{config['model_basename']}{epoch:02d}.pt"
torch.save(model.state_dict(), checkpoint_path)
print(f"Checkpoint saved: {checkpoint_path}")
if epoch % 5 == 0:
    # No validate() is defined in the visible cells, and epoch 0 satisfies the
    # condition above, so an unguarded call raises NameError. Call it only if
    # a validate function has been defined in the notebook namespace.
    validate_fn = globals().get("validate")
    if callable(validate_fn):
        validate_fn(model, val_dataloader, tokenizer_src, tokenizer_tgt, config, device)
    else:
        print("Skipping validation: validate() is not defined.")


In [None]:
# Function to translate an input sentence using a trained model
def translate_sentence(model, tokenizer_src, tokenizer_tgt, sentence, seq_len, device):
    """Greedily translate ``sentence`` with an encoder-decoder model.

    Args:
        model: Object exposing ``eval()``, ``encode(src, src_mask)``,
            ``decode(encoder_output, src_mask, tgt, tgt_mask)`` and ``project(x)``.
        tokenizer_src: Source tokenizer providing ``token_to_id`` and ``encode``.
        tokenizer_tgt: Target tokenizer providing ``token_to_id`` and ``decode``.
        sentence: Source-language text.
        seq_len: Padded source length and maximum number of decoding steps.
        device: Device on which the input tensors are created.

    Returns:
        The decoded target text, without the [SOS]/[EOS] markers.
    """
    model.eval()

    # Look the special-token ids up once instead of on every use.
    src_sos = tokenizer_src.token_to_id("[SOS]")
    src_eos = tokenizer_src.token_to_id("[EOS]")
    src_pad = tokenizer_src.token_to_id("[PAD]")
    tgt_sos = tokenizer_tgt.token_to_id("[SOS]")
    tgt_eos = tokenizer_tgt.token_to_id("[EOS]")

    # Reserve two slots for [SOS]/[EOS] so that truncating a long sentence
    # never cuts off the closing [EOS].
    src_ids = tokenizer_src.encode(sentence).ids[:max(seq_len - 2, 0)]
    input_tokens = [src_sos] + src_ids + [src_eos]
    input_tokens = (input_tokens + [src_pad] * seq_len)[:seq_len]
    input_tensor = torch.tensor([input_tokens], device=device)
    encoder_mask = (input_tensor != src_pad).unsqueeze(1).unsqueeze(1).int()

    output_tokens = []
    with torch.no_grad():
        encoder_output = model.encode(input_tensor, encoder_mask)
        decoder_input = torch.tensor([[tgt_sos]], device=device)

        for _ in range(seq_len):
            cur_len = decoder_input.size(1)
            # Keep-mask: 1 where attention is allowed (on/below the diagonal),
            # 0 where it is blocked. The attention block masks positions where
            # mask == 0, so an additive 0/-inf mask would be applied inverted.
            decoder_mask = torch.tril(
                torch.ones((1, cur_len, cur_len), dtype=torch.int, device=device)
            )
            decoder_output = model.decode(encoder_output, encoder_mask, decoder_input, decoder_mask)
            proj_output = model.project(decoder_output)

            # Greedy choice from the scores at the last decoded position.
            next_token = torch.argmax(proj_output[:, -1, :], dim=-1).item()
            if next_token == tgt_eos:
                break

            output_tokens.append(next_token)
            decoder_input = torch.cat(
                [decoder_input, torch.tensor([[next_token]], device=device)], dim=1
            )

    translated_sentence = tokenizer_tgt.decode(output_tokens)
    return translated_sentence



In [None]:
# Reload the saved English and Hindi tokenizers, rebuild the Transformer, load the
# stored weights, and print a greedy translation for each example sentence.
tokenizer_src = Tokenizer.from_file("/kaggle/working/tokenizer_en.json")
tokenizer_tgt = Tokenizer.from_file("/kaggle/working/tokenizer_hi.json")

# The architecture is rebuilt here so the checkpoint's state dict can be loaded into it.
model = build_transformer(
    tokenizer_src.get_vocab_size(),
    tokenizer_tgt.get_vocab_size(),
    350,
    350,
    d_model=512,
    N=6,
    h=8,
    dropout=0.1,
    d_ff=2048,
)
model = model.to(device)

model.load_state_dict(torch.load("/kaggle/working/weights/en_hi_transformer_00.pt", map_location=device))
model.eval()

sentences = [
    "How are you?",
    "What is your name?",
    "Politicians do not have permission to do what needs to be done.",
    "I love machine learning.",
]

for sentence in sentences:
    translated_text = translate_sentence(model, tokenizer_src, tokenizer_tgt, sentence, seq_len=350, device=device)
    print(
        f"Original: {sentence}",
        f"Translated: {translated_text}",
        "-" * 50,
        sep="\n",
    )