In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import random
import os
import wandb
from tqdm import tqdm
import re
import matplotlib.pyplot as plt
import pandas as pd
import json

In [2]:
from kaggle_secrets import UserSecretsClient
user_secrets = UserSecretsClient()
secret_value_0 = user_secrets.get_secret("wandb_key")

os.environ['WANDB_API_KEY'] = secret_value_0

In [3]:
DEVICE   = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
BASE_DIR = '/kaggle/input/dakshina-dataset/dakshina_dataset_v1.0/hi/lexicons'

In [4]:
class CharacterEmbedding(nn.Module):
    # Creating an embedding layer that maps input character indices to embedding vectors.
    # input_size: number of unique characters (vocabulary size)
    # embedding_dim: size of each embedding vector
    def __init__(self, input_size, embedding_dim):
        super(CharacterEmbedding, self).__init__()
        self.embedding = nn.Embedding(input_size, embedding_dim)

    # Returns corresponding embedding vectors of shape (batch_size, seq_length, embedding_dim)
    def forward(self, input_seq):
        # input_seq: a tensor of character indices, typically of shape (batch_size, seq_length)
        return self.embedding(input_seq)

In [5]:
# EncoderRNN transforms sequences of token IDs into contextual hidden states
# Supports GRU, LSTM, or vanilla RNN cells
# input_size: number of unique tokens
# hidden_size: size of the RNN hidden state
# embedding_dim: size of token embedding vectors
# num_layers: number of stacked recurrent layers
# cell_type: 'GRU', 'LSTM', or 'RNN'
# dropout_p: dropout probability between RNN layers (only if num_layers > 1)
# bidirectional: whether to run the RNN in both forward and backward directions
class EncoderRNN(nn.Module):
    def __init__(self, input_size, hidden_size, embedding_dim, num_layers=1,cell_type='GRU', dropout_p=0.1, bidirectional=False):
        super(EncoderRNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.cell_type = cell_type
        self.bidirectional = bidirectional
        self.directions = 2 if bidirectional else 1
        
        # Embedding layer
        self.embedding = nn.Embedding(input_size, embedding_dim)
        
        # Dropout before the RNN (applied to embeddings)
        self.dropout = nn.Dropout(dropout_p)
        dropout_p = dropout_p if num_layers > 1 else 0
        
        # RNN layer
        if cell_type == 'GRU':
            self.rnn = nn.GRU(embedding_dim, hidden_size, num_layers,dropout=dropout_p,bidirectional=bidirectional, batch_first=True)
        elif cell_type == 'LSTM':
            self.rnn = nn.LSTM(embedding_dim, hidden_size, num_layers,dropout=dropout_p,bidirectional=bidirectional, batch_first=True)
        else:  # Default to RNN
            self.rnn = nn.RNN(embedding_dim, hidden_size, num_layers,dropout=dropout_p,bidirectional=bidirectional, nonlinearity='tanh', batch_first=True)

    # Forward pass through the encoder
    def forward(self, input_seq):
        # Input shape: [batch_size, seq_len]
        batch_size = input_seq.size(0)
        
        # Convert indices to embeddings and apply dropout to embeddings
        embedded = self.embedding(input_seq)  # [batch_size, seq_len, embedding_dim]
        embedded = self.dropout(embedded)
        
        # Pass through RNN
        outputs, hidden = self.rnn(embedded)
        
        return outputs, hidden

In [6]:
class Attention(nn.Module):
    def __init__(self, hidden_dim):
        super(Attention, self).__init__()
        self.hidden_dim = hidden_dim
        self.energy_layer = nn.Linear(hidden_dim * 2, hidden_dim)
        self.context_vector = nn.Parameter(torch.empty(hidden_dim))
        nn.init.uniform_(self.context_vector, -0.1, 0.1)

    def forward(self, decoder_hidden, encoder_output_seq):
        # decoder_hidden: [batch_size, hidden_dim]
        # encoder_output_seq: [batch_size, seq_length, hidden_dim]

        batch_sz = encoder_output_seq.size(0)
        seq_len = encoder_output_seq.size(1)

        # Expand decoder hidden state across time dimension
        repeated_hidden = decoder_hidden.unsqueeze(1).expand(-1, seq_len, -1)  # [batch_size, seq_len, hidden_dim]

        # Compute attention energies
        concat_inputs = torch.cat((repeated_hidden, encoder_output_seq), dim=2)  # [batch_size, seq_len, 2*hidden_dim]
        energy_scores = torch.tanh(self.energy_layer(concat_inputs))            # [batch_size, seq_len, hidden_dim]
        energy_scores = energy_scores.transpose(1, 2)                            # [batch_size, hidden_dim, seq_len]

        # Prepare context vector for batch multiplication
        context = self.context_vector.unsqueeze(0).expand(batch_sz, -1).unsqueeze(1)  # [batch_size, 1, hidden_dim]

        # Compute alignment scores
        alignment = torch.bmm(context, energy_scores).squeeze(1)  # [batch_size, seq_len]

        # Normalize scores into probabilities
        attention_weights = F.softmax(alignment, dim=1)           # [batch_size, seq_len]

        return attention_weights

In [7]:
class DecoderRNNWithAttention(nn.Module):
    def __init__(self, output_size, hidden_size, embedding_dim, num_layers=1, 
                 cell_type='GRU', dropout_p=0.1):
        super(DecoderRNNWithAttention, self).__init__()
        
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.num_layers = num_layers
        self.cell_type = cell_type
        
        # Embedding for decoder input tokens
        self.embedding = nn.Embedding(output_size, embedding_dim)
        self.embedding_dropout = nn.Dropout(dropout_p)

        adjusted_dropout = dropout_p if num_layers > 1 else 0
        rnn_input_dim = embedding_dim + hidden_size  # includes attention context

        # Choose RNN type
        if cell_type == 'GRU':
            self.rnn = nn.GRU(rnn_input_dim, hidden_size, num_layers, 
                              dropout=adjusted_dropout, batch_first=True)
        elif cell_type == 'LSTM':
            self.rnn = nn.LSTM(rnn_input_dim, hidden_size, num_layers, 
                               dropout=adjusted_dropout, batch_first=True)
        else:
            self.rnn = nn.RNN(rnn_input_dim, hidden_size, num_layers, 
                              dropout=adjusted_dropout, nonlinearity='tanh', batch_first=True)
        
        # Attention module
        self.attention = Attention(hidden_size)
        
        # Output transformation
        self.output_dropout = nn.Dropout(dropout_p)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, input_token, hidden_state, encoder_outputs):
        # input_token: [batch_size]
        # hidden_state: hidden (or hidden, cell) for LSTM
        # encoder_outputs: [batch_size, seq_len, hidden_size]

        bsz = input_token.size(0)

        # Embed the current input token
        embedded_input = self.embedding(input_token.squeeze(1)).unsqueeze(1)
        embedded_input = self.embedding_dropout(embedded_input)

        # Select last hidden state (handle LSTM separately)
        if self.cell_type == 'LSTM':
            current_hidden = hidden_state[0][-1]
        else:
            current_hidden = hidden_state[-1]

        # Compute attention weights
        attention_scores = self.attention(current_hidden, encoder_outputs)  # [batch_size, seq_len]
        attention_scores = attention_scores.unsqueeze(1)                    # [batch_size, 1, seq_len]

        # Compute context vector via weighted sum
        context_vector = torch.bmm(attention_scores, encoder_outputs)       # [batch_size, 1, hidden_size]

        # Concatenate embedding and context for RNN input
        combined_input = torch.cat((embedded_input, context_vector), dim=2) # [batch_size, 1, input_dim]

        # Run the RNN cell
        rnn_output, new_hidden_state = self.rnn(combined_input, hidden_state)

        # Generate output
        rnn_output = self.output_dropout(rnn_output)
        logits = self.fc(rnn_output.squeeze(1))  # [batch_size, output_size]
        
        return F.log_softmax(logits, dim=1), new_hidden_state, attention_scores.squeeze(1)

In [8]:
def beam_search_decode(model, src, sos_idx, eos_idx, max_len=30, beam_width=3, device='cuda'):
    model.eval()
    with torch.no_grad():
        # Encode source sequence
        encoder_outputs, encoder_hidden = model.encoder(src)

        # Initialize decoder hidden state
        if model.bidirectional:
            if model.cell_type == 'LSTM':
                h_enc, c_enc = encoder_hidden
                h_dec = torch.zeros(model.decoder.num_layers, 1, model.decoder.hidden_size, device=device)
                c_dec = torch.zeros(model.decoder.num_layers, 1, model.decoder.hidden_size, device=device)
                for i in range(model.encoder.num_layers):
                    h_comb = torch.cat((h_enc[2*i], h_enc[2*i+1]), dim=1)
                    c_comb = torch.cat((c_enc[2*i], c_enc[2*i+1]), dim=1)
                    h_dec[i] = model.hidden_transform(h_comb)
                    c_dec[i] = model.hidden_transform(c_comb)
                decoder_hidden = (h_dec, c_dec)
            else:
                h_dec = torch.zeros(model.decoder.num_layers, 1, model.decoder.hidden_size, device=device)
                for i in range(model.encoder.num_layers):
                    h_comb = torch.cat((encoder_hidden[2*i], encoder_hidden[2*i+1]), dim=1)
                    h_dec[i] = model.hidden_transform(h_comb)
                decoder_hidden = h_dec
        else:
            decoder_hidden = encoder_hidden

        # Initialize beam search
        beams = [([sos_idx], 0.0, decoder_hidden)]
        completed = []

        for _ in range(max_len):
            candidates = []
            for seq, score, hidden in beams:
                if seq[-1] == eos_idx:
                    completed.append((seq, score))
                    continue

                inp = torch.tensor([seq[-1]], device=device)
                output, next_hidden, _ = model.decoder(inp, hidden, encoder_outputs)
                
                top_log_probs, top_indices = torch.topk(output.squeeze(0), beam_width)
                for i in range(beam_width):
                    next_token = top_indices[i].item()
                    new_seq = seq + [next_token]
                    new_score = score + top_log_probs[i].item()
                    if model.cell_type == 'LSTM':
                        detached_hidden = tuple(h.detach() for h in next_hidden)
                        candidates.append((new_seq, new_score, detached_hidden))
                    else:
                        candidates.append((new_seq, new_score, next_hidden.detach()))

            # Select top-k beams
            beams = sorted(candidates, key=lambda x: x[1], reverse=True)[:beam_width]
            if not beams:
                break

        # Collect finished sequences
        completed.extend([(seq, score) for seq, score, _ in beams if seq[-1] == eos_idx])

        # Fallback if no completed sequence
        if not completed:
            completed = beams

        # Return sorted results
        completed = sorted(completed, key=lambda x: x[1], reverse=True)
        return completed


In [9]:
# Seq2Seq implements an Encoder and Decoder for end-to-end sequence-to-sequence modeling
# input_size: size of source vocabulary
# output_size: size of target vocabulary
# embedding_dim: dimension of embeddings in both encoder and decoder
# hidden_size: size of hidden states in encoder and decoder (must match for vanilla seq2seq)
# encoder_layers / decoder_layers: number of stacked RNN layers
# cell_type: 'GRU', 'LSTM', or 'RNN'
# dropout_p: dropout probability for embeddings and RNN layers
# bidirectional_encoder: if True, runs encoder bidirectionally and transforms hidden state

class Seq2Seq(nn.Module):
    def __init__(self, input_size, output_size, embedding_dim=256, hidden_size=256,
                 encoder_layers=1, decoder_layers=1, cell_type='GRU', dropout_p=0.2,
                 bidirectional_encoder=False):
        super(Seq2Seq, self).__init__()

        self.bidirectional = bidirectional_encoder
        self.cell_type = cell_type

        self.encoder = EncoderRNN(input_size, hidden_size, embedding_dim,
                                  num_layers=encoder_layers, cell_type=cell_type,
                                  dropout_p=dropout_p, bidirectional=bidirectional_encoder)

        if self.bidirectional:
            self.hidden_transform = nn.Linear(hidden_size * 2, hidden_size)

        self.decoder = DecoderRNNWithAttention(output_size, hidden_size, embedding_dim,
                                               num_layers=decoder_layers, cell_type=cell_type,
                                               dropout_p=dropout_p)

    def _match_decoder_layers(self, hidden, batch_size):
        current_layers = hidden.size(0)
        if current_layers > self.decoder.num_layers:
            return hidden[:self.decoder.num_layers]
        elif current_layers < self.decoder.num_layers:
            padding = torch.zeros(self.decoder.num_layers - current_layers, batch_size,
                                  self.decoder.hidden_size, device=hidden.device)
            return torch.cat([hidden, padding], dim=0)
        else:
            return hidden

    def forward(self, src, trg, teacher_forcing_ratio=0.5, return_attention=False):
        batch_size = src.size(0)
        target_len = trg.size(1)
        vocab_size = self.decoder.output_size

        outputs = torch.zeros(batch_size, target_len, vocab_size, device=src.device)
        attentions = [] if return_attention else None

        encoder_outputs, encoder_hidden = self.encoder(src)
        decoder_hidden = None

        if self.bidirectional:
            if self.cell_type == 'LSTM':
                h_n, c_n = encoder_hidden
                h_dec = torch.zeros(self.decoder.num_layers, batch_size, self.decoder.hidden_size, device=src.device)
                c_dec = torch.zeros(self.decoder.num_layers, batch_size, self.decoder.hidden_size, device=src.device)
                
                for i in range(self.decoder.num_layers):
                    enc_layer = min(i, self.encoder.num_layers - 1)
                    h_cat = torch.cat((h_n[2 * enc_layer], h_n[2 * enc_layer + 1]), dim=1)
                    c_cat = torch.cat((c_n[2 * enc_layer], c_n[2 * enc_layer + 1]), dim=1)
                    h_dec[i] = self.hidden_transform(h_cat)
                    c_dec[i] = self.hidden_transform(c_cat)

                decoder_hidden = (h_dec, c_dec)

            else:
                h_n = encoder_hidden
                h_dec = torch.zeros(self.decoder.num_layers, batch_size, self.decoder.hidden_size, device=src.device)
                
                for i in range(self.decoder.num_layers):
                    enc_layer = min(i, self.encoder.num_layers - 1)
                    h_cat = torch.cat((h_n[2 * enc_layer], h_n[2 * enc_layer + 1]), dim=1)
                    h_dec[i] = self.hidden_transform(h_cat)

                decoder_hidden = h_dec
        else:
            if self.cell_type == 'LSTM':
                h_n, c_n = encoder_hidden
                decoder_hidden = (
                    self._match_decoder_layers(h_n, batch_size),
                    self._match_decoder_layers(c_n, batch_size)
                )
            else:
                decoder_hidden = self._match_decoder_layers(encoder_hidden, batch_size)

        input_token = trg[:, 0].unsqueeze(1)

        for t in range(1, target_len):
            output, decoder_hidden, attn = self.decoder(input_token, decoder_hidden, encoder_outputs)
            outputs[:, t, :] = output

            if return_attention:
                attentions.append(attn.unsqueeze(1))

            use_teacher = random.random() < teacher_forcing_ratio
            top1 = output.argmax(1).unsqueeze(1)
            input_token = trg[:, t].unsqueeze(1) if use_teacher else top1

        if return_attention:
            attentions = torch.cat(attentions, dim=1)
            return outputs, attentions

        return outputs


In [10]:
class LexiconDataset(Dataset):
    def __init__(self, filepath, src_vocab=None, tgt_vocab=None, build_vocab=False):
        self.examples = []
        with open(filepath, encoding='utf-8') as file:
            for line in file:
                items = line.strip().split('\t')
                if len(items) < 2:
                    continue
                tgt_text, src_text = items[0], items[1]  # Hindi and romanized
                self.examples.append((src_text, tgt_text))

        if build_vocab:
            self.src_vocab = {'<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}
            self.tgt_vocab = {'<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}

            for src, tgt in self.examples:
                for ch in src:
                    if ch not in self.src_vocab:
                        self.src_vocab[ch] = len(self.src_vocab)
                for ch in tgt:
                    if ch not in self.tgt_vocab:
                        self.tgt_vocab[ch] = len(self.tgt_vocab)
        else:
            assert src_vocab is not None and tgt_vocab is not None, "Prebuilt vocabularies must be provided."
            self.src_vocab = src_vocab
            self.tgt_vocab = tgt_vocab

    def __len__(self):
        return len(self.examples)

    def __getitem__(self, index):
        src, tgt = self.examples[index]
        src_ids = [self.src_vocab.get(ch, self.src_vocab['<unk>']) for ch in src]
        tgt_ids = [self.tgt_vocab['<sos>']] + \
                  [self.tgt_vocab.get(ch, self.tgt_vocab['<unk>']) for ch in tgt] + \
                  [self.tgt_vocab['<eos>']]
        return torch.tensor(src_ids, dtype=torch.long), torch.tensor(tgt_ids, dtype=torch.long)


def collate_fn(batch):
    """
    Collates a batch of examples, padding each sequence to the maximum length in the batch.
    Returns:
        padded_src: Tensor of shape (batch_size, max_src_len)
        padded_tgt: Tensor of shape (batch_size, max_tgt_len)
    """
    src_seqs, tgt_seqs = zip(*batch)
    max_src_len = max(len(seq) for seq in src_seqs)
    max_tgt_len = max(len(seq) for seq in tgt_seqs)

    batch_size = len(batch)
    padded_src = torch.full((batch_size, max_src_len), 0, dtype=torch.long)
    padded_tgt = torch.full((batch_size, max_tgt_len), 0, dtype=torch.long)

    for i in range(batch_size):
        padded_src[i, :len(src_seqs[i])] = src_seqs[i]
        padded_tgt[i, :len(tgt_seqs[i])] = tgt_seqs[i]

    return padded_src, padded_tgt

def get_dataloaders(data_dir, batch_size, build_vocab=False):
    """
    Loads training, validation, and test data loaders.
    Returns:
        train_loader, val_loader, test_loader,
        src_vocab_size, tgt_vocab_size, pad_idx, src_vocab, tgt_vocab
    """
    train_file = os.path.join(data_dir, 'hi.translit.sampled.train.tsv')
    val_file   = os.path.join(data_dir, 'hi.translit.sampled.dev.tsv')
    test_file  = os.path.join(data_dir, 'hi.translit.sampled.test.tsv')

    train_dataset = LexiconDataset(train_file, build_vocab=build_vocab)
    src_vocab = train_dataset.src_vocab
    tgt_vocab = train_dataset.tgt_vocab

    val_dataset  = LexiconDataset(val_file, src_vocab, tgt_vocab)
    test_dataset = LexiconDataset(test_file, src_vocab, tgt_vocab)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True,  collate_fn=collate_fn)
    val_loader   = DataLoader(val_dataset,   batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
    test_loader  = DataLoader(test_dataset,  batch_size=1,          shuffle=False, collate_fn=collate_fn)

    return (train_loader, val_loader, test_loader,
            len(src_vocab), len(tgt_vocab), src_vocab['<pad>'],
            src_vocab, tgt_vocab)


In [11]:
class EarlyStopper:
    """Stops a run if the monitored metric doesn’t improve for `patience` steps."""
    def __init__(self, patience=5, min_delta=1e-4):
        self.patience, self.min_delta = patience, min_delta
        self.counter, self.best = 0, None

    def should_stop(self, current):
        if self.best is None or current > self.best + self.min_delta:
            self.best, self.counter = current, 0
        else:
            self.counter += 1
        return self.counter >= self.patience

In [12]:
CHAR2IDX_SRC = {
    "<pad>": 0,
    "<sos>": 1,
    "<eos>": 2,
    "<unk>": 3,
    **{c: i + 4 for i, c in enumerate("abcdefghijklmnopqrstuvwxyz")}
}
IDX2CHAR_SRC = {i: c for c, i in CHAR2IDX_SRC.items()}

# Load data, build vocabs, and create reverse target-char map
train_loader, val_loader, test_loader, src_size, tgt_size, pad_idx, src_vocab, tgt_vocab = get_dataloaders(
    BASE_DIR, batch_size=64, build_vocab=True
)

IDX2CHAR_TGT = {idx: ch for ch, idx in tgt_vocab.items()}  # Map decoder indices back to Hindi chars

# Model, optimizer, loss, early stopping

# parameters of best model
best_model = Seq2Seq(
    input_size=src_size,
    output_size=tgt_size,
    embedding_dim=64,
    hidden_size=256,
    encoder_layers=3,
    decoder_layers=1,
    cell_type='LSTM',  # or 'GRU' or 'RNN'
    dropout_p=0.4,
    bidirectional_encoder=False
).to(DEVICE)

optimizer = torch.optim.Adam(best_model.parameters(), lr=1e-3)
criterion = nn.NLLLoss(ignore_index=pad_idx)
stopper = EarlyStopper(patience=5)
best_val_acc = 0.0

# Training Loop

for epoch in range(1, 11):
    best_model.train()
    total_loss = 0.0
    for src, tgt in tqdm(train_loader, desc=f"[Epoch {epoch}] Training", leave=False):
        src, tgt = src.to(DEVICE), tgt.to(DEVICE)
        optimizer.zero_grad()
        out = best_model(src, tgt, teacher_forcing_ratio=1.0)
        loss = criterion(out.view(-1, tgt_size), tgt.view(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    # Validation: sequence-level accuracy
    best_model.eval()
    correct_seqs, total_seqs = 0, 0
    with torch.no_grad():
        for src, tgt in tqdm(val_loader, desc=f"[Epoch {epoch}] Validation", leave=False):
            src, tgt = src.to(DEVICE), tgt.to(DEVICE)
            out = best_model(src, tgt, teacher_forcing_ratio=0.0)
            preds = out.argmax(dim=2)
            for pred_seq, true_seq in zip(preds, tgt):
                # Remove <sos> and padding tokens for comparison
                pred_tokens = pred_seq[1:][true_seq[1:] != pad_idx]
                true_tokens = true_seq[1:][true_seq[1:] != pad_idx]
                if torch.equal(pred_tokens, true_tokens):
                    correct_seqs += 1
                total_seqs += 1

    val_acc = correct_seqs / total_seqs
    print(f"[Epoch {epoch}] Loss: {total_loss:.4f} | Val Acc: {val_acc:.4f}")

    if val_acc > best_val_acc:
        best_val_acc = val_acc
        # Optionally save model checkpoint here
    elif stopper.should_stop(val_acc):
        print("Early stopping triggered.")
        break

# Final Test Evaluation
best_model.eval()
correct_seqs, total_seqs = 0, 0
all_preds, all_trues = [], []

with torch.no_grad():
    for src, tgt in tqdm(test_loader, desc="Final Test Eval", leave=False):
        src, tgt = src.to(DEVICE), tgt.to(DEVICE)
        out = best_model(src, tgt, teacher_forcing_ratio=0.0)
        preds = out.argmax(dim=2)
        for pred_seq, true_seq in zip(preds, tgt):
            pred_tokens = pred_seq[1:][true_seq[1:] != pad_idx]
            true_tokens = true_seq[1:][true_seq[1:] != pad_idx]
            if torch.equal(pred_tokens, true_tokens):
                correct_seqs += 1
            total_seqs += 1

            all_preds.append(pred_tokens)
            all_trues.append(true_tokens)

test_acc = correct_seqs / total_seqs
print(f"\n Final Test Accuracy (Exact Word match): {test_acc:.4f}")

                                                                     

[Epoch 1] Loss: 1454.5247 | Val Acc: 0.0321


                                                                     

[Epoch 2] Loss: 699.1988 | Val Acc: 0.2295


                                                                     

[Epoch 3] Loss: 439.6851 | Val Acc: 0.3355


                                                                     

[Epoch 4] Loss: 344.9575 | Val Acc: 0.3531


                                                                     

[Epoch 5] Loss: 299.8262 | Val Acc: 0.3811


                                                                     

[Epoch 6] Loss: 271.3750 | Val Acc: 0.3878


                                                                     

[Epoch 7] Loss: 250.9414 | Val Acc: 0.4071


                                                                     

[Epoch 8] Loss: 235.4939 | Val Acc: 0.4169


                                                                     

[Epoch 9] Loss: 224.8430 | Val Acc: 0.3961


                                                                      

[Epoch 10] Loss: 213.9978 | Val Acc: 0.4312


                                                                     


 Final Test Accuracy (Exact Word match): 0.1724




In [15]:
romanized_test_words = []
test_path = os.path.join(BASE_DIR, 'hi.translit.sampled.test.tsv')

# Read romanized inputs from test file
with open(test_path, "r", encoding="utf-8") as file:
    for line in file:
        tgt, src, _ = line.strip().split()
        romanized_test_words.append(src)

best_model.eval()
results = []

with torch.no_grad():
    for i, (src_batch, tgt_batch) in enumerate(test_loader):
        src_batch, tgt_batch = src_batch.to(DEVICE), tgt_batch.to(DEVICE)
        output = best_model(src_batch, tgt_batch, teacher_forcing_ratio=0.0)
        predicted_tokens = output.argmax(dim=2)

        for j in range(src_batch.size(0)):
            pred_str = ''.join(
                IDX2CHAR_TGT[token.item()]
                for token in predicted_tokens[j][1:]  # skip <sos>
                if token.item() != pad_idx
            )
            tgt_str = ''.join(
                IDX2CHAR_TGT[token.item()]
                for token in tgt_batch[j][1:]  # skip <sos>
                if token.item() != pad_idx
            )

            original_input = romanized_test_words[i * src_batch.size(0) + j]

            results.append({
                'Input': original_input,
                'True Hindi': tgt_str,
                'Predicted Hindi': pred_str
            })

# Show a random sample of the results
sampled_results = random.sample(results, min(10, len(results)))
df = pd.DataFrame(sampled_results)
print(df.to_markdown(index=False))


| Input     | True Hindi   | Predicted Hindi   |
|:----------|:-------------|:------------------|
| bikherati | बिखेरती<eos>    | बिखेरते<eos>         |
| supachya  | सुपाच्य<eos>    | सुपच्य<eos><eos>    |
| rishikul  | ऋषिकुल<eos>    | रिशिकुल              |
| dabholkar | दाभोलकर<eos>   | दभोलकर<eos><eos>   |
| ashariri  | अशरीरी<eos>    | अशारीरी              |
| premlata  | प्रेमलता<eos>   | प्रेमलता<eos>        |
| hamaal    | हमाल<eos>     | हामाल               |
| mavey     | मावे<eos>      | मवे<eos><eos>      |
| kasida    | कसीदा<eos>     | कसीदा<eos>          |
| forcee    | फारसी<eos>     | फोर्सी               |


In [37]:
def highlight_pred(row):
    """
    Returns a list of CSS styles, one per column, 
    coloring the 'Predicted Hindi' cell green if correct else red.
    """
    styles = [''] * len(row)
    # Find the index of the Predicted column
    pred_idx = list(row.index).index('Predicted Hindi')
    if row['Predicted Hindi'] == row['True Hindi']:
        styles[pred_idx] = 'background-color: #c8e6c9; font-weight: bold;'  # light green
    else:
        styles[pred_idx] = 'background-color: #f8d7da; font-weight: bold;'  # light red
    return styles

# Apply to a random sample of 10 rows
subset = df.sample(n=min(10, len(df))).reset_index(drop=True)

styled = (
    subset.style
          .apply(highlight_pred, axis=1)
          .set_table_styles([
              # Center all text
              {'selector': 'td, th',
               'props': [('text-align', 'center'), ('padding', '6px')]},
              # Header style
              {'selector': 'th',
               'props': [('background-color', '#4F81BD'),
                         ('color', 'white'),
                         ('font-weight', 'bold'),
                         ('padding', '8px')]}
          ])
          .set_caption("✨ Sample Transliteration Predictions (Green = Correct, Red = Wrong) ✨")
)

# Display in a Jupyter/HTML context
display(styled)

Unnamed: 0,Input,True Hindi,Predicted Hindi
0,ria,रिया,रियर
1,mukartey,मुकरते,मुकरते
2,maulviyon,मौलवियों,मौलवियों
3,mauley,मौले,मौले
4,dreijer,ड्रेजर,ड़िजरेड
5,tapoowo,टापुओं,पपूवता
6,dikhaain,दिखाईं,दिखाईं
7,ghost,घोस्ट,घोस्सो
8,changul,चंगुल,चंगुल
9,moodi,मूडी,मूद्द


In [13]:
wandb.init(project="DL_Assignment_3")

[34m[1mwandb[0m: Using wandb-core as the SDK backend.  Please refer to https://wandb.me/wandb-core for more information.
[34m[1mwandb[0m: Currently logged in as: [33mcs24m019[0m ([33mcs24m019-iitm[0m) to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin


In [16]:
best_model.eval()
all_samples = []

# First, collect all model outputs
with torch.no_grad():
    for i, (src, tgt) in enumerate(test_loader):
        src, tgt = src.to(DEVICE), tgt.to(DEVICE)
        out, attn_weights = best_model(src, tgt, return_attention=True)
        batch_size = src.size(0)

        for b in range(batch_size):
            romanized = romanized_test_words[i * batch_size + b]
            src_tokens = list(romanized)
            tgt_tokens = [IDX2CHAR_TGT[idx.item()] for idx in tgt[b][1:] if idx.item() != pad_idx]
            pred_tokens = out.argmax(dim=2)[b][1:len(tgt_tokens)+1]
            pred_chars = [IDX2CHAR_TGT[idx.item()] for idx in pred_tokens]

            attn = attn_weights[b][:len(pred_chars), :len(src_tokens)].cpu().numpy().tolist()

            all_samples.append((src_tokens, pred_chars, attn))

# Select 10 random samples
random_samples = random.sample(all_samples, 5)

# Build HTML blocks
html_blocks = []
for sample_count, (src_tokens, pred_chars, attn) in enumerate(random_samples):
    input_tokens_js = json.dumps(src_tokens, ensure_ascii=False)
    output_tokens_js = json.dumps(pred_chars, ensure_ascii=False)
    attention_js = json.dumps(attn)

    html_block = f"""
    <div style="margin-bottom: 50px;">
      <h2>Sample {sample_count + 1}</h2>
      <div><strong>Input (English):</strong></div>
      <div id="input-tokens-{sample_count}"></div>
      <div><strong>Predicted Output (Hindi):</strong></div>
      <div id="output-tokens-{sample_count}"></div>
      <script>
        const inputTokens_{sample_count} = {input_tokens_js};
        const outputTokens_{sample_count} = {output_tokens_js};
        const attention_{sample_count} = {attention_js};

        const inputDiv_{sample_count} = d3.select("#input-tokens-{sample_count}");
        const outputDiv_{sample_count} = d3.select("#output-tokens-{sample_count}");

        inputTokens_{sample_count}.forEach((token, i) => {{
          inputDiv_{sample_count}.append("span")
            .attr("class", "token input")
            .attr("id", "input-{sample_count}-" + i)
            .text(token);
        }});

        outputTokens_{sample_count}.forEach((token, i) => {{
          outputDiv_{sample_count}.append("span")
            .attr("class", "token output")
            .text(token)
            .on("mouseover", () => {{
              d3.selectAll(".token.input").style("background-color", "#fff");
              attention_{sample_count}[i].forEach((score, j) => {{
                const color = d3.interpolateOranges(score);
                d3.select("#input-{sample_count}-" + j).style("background-color", color);
              }});
            }})
            .on("mouseout", () => {{
              d3.selectAll(".token.input").style("background-color", "#fff");
            }});
        }});
      </script>
    </div>
    """
    html_blocks.append(html_block)

# Full HTML document
full_html = f"""
<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8" />
  <title>Attention Visualizations</title>
  <script src="https://d3js.org/d3.v7.min.js"></script>
  <style>
    body {{ font-family: Arial, sans-serif; margin: 30px; }}
    .token {{
      display: inline-block;
      padding: 8px 12px;
      margin: 3px;
      border-radius: 5px;
      border: 1px solid #ccc;
      font-size: 20px;
      cursor: pointer;
      user-select: none;
      transition: background-color 0.3s;
    }}
  </style>
</head>
<body>
  <h1>Random Attention Visualizations (5 Samples)</h1>
  {''.join(html_blocks)}
</body>
</html>
"""

# Log to WandB
wandb.log({"attention_visualizations_random_10": wandb.Html(full_html)})

In [17]:
wandb.finish()