In [15]:
# ============================================================
# [1] Setup: imports, mount Drive, config
# ============================================================

# Colab-only step: mount Google Drive at /content/drive so that the dataset
# path used below (under /content/drive/MyDrive) can be read.
from google.colab import drive
drive.mount('/content/drive')

import os
import math
import random
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# Run on the GPU when one is visible to PyTorch, otherwise fall back to CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device:", device)

# ---- Hyperparameters ----
# Model size
EMBED_DIM      = 128     # width of each character embedding vector
HIDDEN_DIM     = 256     # recurrent state width, shared by encoder and decoder
NUM_LAYERS     = 1       # stacked recurrent layers in encoder and decoder
CELL_TYPE      = "gru"   # recurrent cell: "rnn", "gru", or "lstm"
# Optimisation
BATCH_SIZE     = 64
N_EPOCHS       = 20
LEARNING_RATE  = 1e-3
TEACHER_FORCE  = 0.5     # per-step probability of feeding the gold character
# Data
MAX_DATA_ROWS  = 6000    # cap on rows kept after loading; None keeps every row
# -------------------------


# ============================================================
# [2] Load Aksharantar Hindi dataset from Drive (no header)
# ============================================================

data_path = "/content/drive/MyDrive/aksharantar_sampled/hin/hin_train.csv"
if not os.path.exists(data_path):
    # Raise explicitly instead of using `assert`, which is removed under `python -O`.
    raise FileNotFoundError(f"File not found: {data_path}")

# The file has NO header row, so we pass header=None and name the columns.
# dtype=str and keep_default_na=False keep every field as literal text:
# with the pandas defaults, genuine words such as "nan", "null" or "NA" are
# parsed as missing values and would then be silently removed by dropna().
df = pd.read_csv(
    data_path,
    header=None,
    names=["src", "tgt"],
    dtype=str,
    keep_default_na=False,
)

# Discard incomplete pairs: rows with a missing field or an empty string.
df = df.dropna()
df = df[(df["src"] != "") & (df["tgt"] != "")]

if MAX_DATA_ROWS is not None:
    df = df.iloc[:MAX_DATA_ROWS].reset_index(drop=True)

print("Data shape:", df.shape)
print(df.head())

# Column names for the rest of the code
SRC_COL = "src"   # romanized (Latin) input
TGT_COL = "tgt"   # Devanagari output


# ============================================================
# [3] Build character vocabularies for source & target
# ============================================================

# Reserved symbols; build_char_vocab places them at indices 0-3, in this order.
PAD_TOKEN = "<pad>"
SOS_TOKEN = "<sos>"
EOS_TOKEN = "<eos>"
UNK_TOKEN = "<unk>"


def build_char_vocab(texts):
    """Build character <-> index lookup tables from an iterable of texts.

    Each item is converted with ``str`` and split into characters. The four
    reserved tokens come first, followed by the distinct characters in
    sorted order.

    Returns:
        (stoi, itos): ``stoi`` maps symbol -> index, ``itos`` maps
        index -> symbol.
    """
    alphabet = {ch for text in texts for ch in str(text)}
    symbols = [PAD_TOKEN, SOS_TOKEN, EOS_TOKEN, UNK_TOKEN, *sorted(alphabet)]
    itos = dict(enumerate(symbols))
    stoi = {symbol: index for index, symbol in itos.items()}
    return stoi, itos

# One vocabulary per side: source column (romanized) and target column (Devanagari).
src_stoi, src_itos = build_char_vocab(df[SRC_COL].tolist())
tgt_stoi, tgt_itos = build_char_vocab(df[TGT_COL].tolist())

# Vocabulary sizes include the four reserved tokens.
SRC_VOCAB_SIZE, TGT_VOCAB_SIZE = len(src_stoi), len(tgt_stoi)

print("Source vocab size:", SRC_VOCAB_SIZE)
print("Target vocab size:", TGT_VOCAB_SIZE)


# ============================================================
# [4] Encoding utilities & Dataset / DataLoader
# ============================================================

def encode_src(text):
    """Map each character of ``str(text)`` to its source-vocabulary id.

    Characters missing from ``src_stoi`` are mapped to the <unk> id. No
    <sos>/<eos> markers are added on the source side.
    """
    unk_id = src_stoi[UNK_TOKEN]
    ids = []
    for ch in str(text):
        ids.append(src_stoi.get(ch, unk_id))
    return ids

def encode_tgt(text):
    """Return target-vocabulary ids for ``str(text)`` framed by <sos> ... <eos>.

    Characters missing from ``tgt_stoi`` are mapped to the <unk> id.
    """
    unk_id = tgt_stoi[UNK_TOKEN]
    ids = [tgt_stoi[SOS_TOKEN]]
    ids.extend(tgt_stoi.get(ch, unk_id) for ch in str(text))
    ids.append(tgt_stoi[EOS_TOKEN])
    return ids

class TransliterationDataset(Dataset):
    """Dataset of (source ids, target ids) tensor pairs, one per dataframe row.

    The raw strings are stored as lists; encoding to id tensors happens
    lazily in ``__getitem__``.
    """

    def __init__(self, df):
        # Keep plain Python lists of the two text columns.
        self.src_texts = df[SRC_COL].tolist()
        self.tgt_texts = df[TGT_COL].tolist()

    def __len__(self):
        return len(self.src_texts)

    def __getitem__(self, idx):
        # Source ids are unframed; target ids carry <sos>/<eos> (see encode_tgt).
        encoded_pair = (
            encode_src(self.src_texts[idx]),
            encode_tgt(self.tgt_texts[idx]),
        )
        return tuple(torch.tensor(ids, dtype=torch.long) for ids in encoded_pair)

def collate_fn(batch):
    """Collate (src_tensor, tgt_tensor) pairs into right-padded batch tensors.

    Each side is padded up to the longest sequence in the batch using that
    side's <pad> id.

    Returns:
        padded_src:  [batch, max_src]
        padded_tgt:  [batch, max_tgt]
        src_lengths: tensor of unpadded source lengths
        tgt_lengths: tensor of unpadded target lengths
    """
    src_seqs, tgt_seqs = zip(*batch)

    # Record the true lengths before any padding is applied.
    src_lengths = torch.tensor([len(seq) for seq in src_seqs])
    tgt_lengths = torch.tensor([len(seq) for seq in tgt_seqs])

    # pad_sequence appends the padding value on the right of shorter sequences.
    padded_src = nn.utils.rnn.pad_sequence(
        list(src_seqs), batch_first=True, padding_value=src_stoi[PAD_TOKEN]
    )
    padded_tgt = nn.utils.rnn.pad_sequence(
        list(tgt_seqs), batch_first=True, padding_value=tgt_stoi[PAD_TOKEN]
    )

    return padded_src, padded_tgt, src_lengths, tgt_lengths

# Wrap the dataframe and serve shuffled mini-batches padded by collate_fn.
dataset = TransliterationDataset(df)
train_loader = DataLoader(
    dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    collate_fn=collate_fn,
)

print("Number of training examples:", len(dataset))


# ============================================================
# [5] RNN factory, Encoder, Decoder, Seq2Seq model
# ============================================================

def make_rnn(cell_type, input_size, hidden_size, num_layers=1, batch_first=True):
    """Construct the recurrent module named by ``cell_type``.

    ``cell_type`` is matched case-insensitively against "lstm", "gru" and
    "rnn" (the plain RNN uses a tanh nonlinearity).

    Raises:
        ValueError: if ``cell_type`` is not one of the supported names.
    """
    cell_type = cell_type.lower()

    # name -> (module class, extra constructor keyword arguments)
    registry = {
        "lstm": (nn.LSTM, {}),
        "gru": (nn.GRU, {}),
        "rnn": (nn.RNN, {"nonlinearity": "tanh"}),
    }
    if cell_type not in registry:
        raise ValueError(f"Unknown CELL_TYPE: {cell_type}")

    rnn_cls, extra_kwargs = registry[cell_type]
    return rnn_cls(
        input_size,
        hidden_size,
        num_layers=num_layers,
        batch_first=batch_first,
        **extra_kwargs,
    )


class Encoder(nn.Module):
    """
    Character-level encoder: an embedding layer feeding an RNN/LSTM/GRU.

    Only the final recurrent state is returned; per-step outputs are
    discarded.
    """
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers=1, cell_type="gru"):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.cell_type = cell_type.lower()

        # The source <pad> id is the embedding's padding index.
        pad_id = src_stoi[PAD_TOKEN]
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_id)
        self.rnn = make_rnn(self.cell_type, embed_dim, hidden_dim, num_layers)

    def forward(self, src, src_lengths):
        """Encode a padded batch.

        src:         [batch, src_len] token ids
        src_lengths: unpadded length of each row

        Returns whatever the recurrent module returns as its final state
        (h_n, or an (h_n, c_n) pair for LSTM).
        """
        # Packing makes the RNN stop at each row's true length, so padded
        # positions do not influence the final state. Lengths are moved to
        # the CPU before packing.
        packed = nn.utils.rnn.pack_padded_sequence(
            self.embedding(src),          # [batch, src_len, embed_dim]
            src_lengths.cpu(),
            batch_first=True,
            enforce_sorted=False,
        )
        _, final_state = self.rnn(packed)
        return final_state


class Decoder(nn.Module):
    """
    Character-level decoder that advances one time step per call:
    embedding -> RNN/LSTM/GRU -> linear projection to vocabulary logits.
    """
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers=1, cell_type="gru"):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.cell_type = cell_type.lower()

        # The target <pad> id is the embedding's padding index.
        pad_id = tgt_stoi[PAD_TOKEN]
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_id)
        self.rnn = make_rnn(self.cell_type, embed_dim, hidden_dim, num_layers)
        self.fc_out = nn.Linear(hidden_dim, vocab_size)

    def forward(self, input_step, hidden):
        """Run a single decoding step.

        input_step: [batch] token ids for the current step
        hidden:     recurrent state from the previous step (or the encoder)

        Returns (logits [batch, vocab_size], updated recurrent state).
        """
        step_ids = input_step[:, None]                      # [batch, 1]
        step_embedding = self.embedding(step_ids)           # [batch, 1, embed_dim]
        rnn_out, hidden = self.rnn(step_embedding, hidden)  # [batch, 1, hidden_dim]
        # Drop the length-1 time axis before projecting to the vocabulary.
        logits = self.fc_out(rnn_out[:, 0])                 # [batch, vocab_size]
        return logits, hidden


class Seq2Seq(nn.Module):
    """
    Encoder-decoder wrapper: the encoder consumes the whole source sequence,
    then the decoder is unrolled one target position at a time starting from
    the encoder's final state.
    """
    def __init__(self, encoder, decoder, cell_type="gru"):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.cell_type = cell_type.lower()

    def forward(self, src, src_lengths, tgt, teacher_forcing=0.5):
        """
        src: [batch, src_len]
        tgt: [batch, tgt_len] with <sos> at index 0
        teacher_forcing: per-step probability of feeding the gold token
            instead of the model's own argmax prediction

        Returns logits of shape [batch, tgt_len, vocab_size]; position 0 is
        never written and stays all-zero.
        """
        batch_size, tgt_len = tgt.size()
        vocab_size = self.decoder.fc_out.out_features
        outputs = torch.zeros(batch_size, tgt_len, vocab_size, device=src.device)

        # The encoder's final state seeds the decoder.
        state = self.encoder(src, src_lengths)

        # Every sequence starts decoding from its first target token (<sos>).
        step_input = tgt[:, 0]  # [batch]

        for t in range(1, tgt_len):
            step_logits, state = self.decoder(step_input, state)
            outputs[:, t] = step_logits

            # One random draw per step decides the next input for the whole batch.
            if random.random() < teacher_forcing:
                step_input = tgt[:, t]
            else:
                step_input = step_logits.argmax(1)  # [batch]

        return outputs


# Instantiate model, loss, optimizer
# Encoder and decoder share the embedding/hidden sizes, depth and cell type.
encoder = Encoder(
    SRC_VOCAB_SIZE, EMBED_DIM, HIDDEN_DIM,
    num_layers=NUM_LAYERS, cell_type=CELL_TYPE,
)
decoder = Decoder(
    TGT_VOCAB_SIZE, EMBED_DIM, HIDDEN_DIM,
    num_layers=NUM_LAYERS, cell_type=CELL_TYPE,
)
model = Seq2Seq(encoder, decoder, cell_type=CELL_TYPE).to(device)

# Target positions holding <pad> are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=tgt_stoi[PAD_TOKEN])
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

print(model)


# ============================================================
# [6] Training loop (updated epochs)
# ============================================================

def train_one_epoch(model, loader, optimizer, criterion, teacher_forcing=0.5,
                    max_grad_norm=1.0):
    """Train ``model`` for one pass over ``loader`` and return the mean batch loss.

    Args:
        model: called as ``model(src, src_lens, tgt, teacher_forcing=...)`` and
            expected to return logits of shape [batch, tgt_len, vocab_size].
        loader: iterable of ``(src, tgt, src_lens, tgt_lens)`` batches.
        optimizer: optimizer stepped once per batch.
        criterion: loss applied to flattened logits and targets.
        teacher_forcing: forwarded unchanged to the model.
        max_grad_norm: gradient-norm clipping threshold (default 1.0, the
            value that was previously hard-coded).

    Returns:
        Mean of the per-batch losses, or 0.0 when ``loader`` yields no batches.
    """
    model.train()
    total_loss = 0.0
    num_batches = 0

    for src, tgt, src_lens, _tgt_lens in loader:
        src, tgt, src_lens = src.to(device), tgt.to(device), src_lens.to(device)

        optimizer.zero_grad()

        outputs = model(src, src_lens, tgt, teacher_forcing=teacher_forcing)
        # outputs: [batch, tgt_len, vocab_size]

        # Position 0 corresponds to <sos> and is never predicted, so skip it.
        logits = outputs[:, 1:].reshape(-1, outputs.size(-1))   # [batch*(tgt_len-1), vocab]
        targets = tgt[:, 1:].reshape(-1)                        # [batch*(tgt_len-1)]

        loss = criterion(logits, targets)
        loss.backward()

        nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    # Count the batches actually seen rather than calling len(loader): this
    # avoids a ZeroDivisionError on an empty loader and also works for
    # iterables that do not define __len__.
    if num_batches == 0:
        return 0.0
    return total_loss / num_batches


# Epochs are numbered from 1 so the log reads "01/N" ... "N/N".
for epoch in range(1, N_EPOCHS + 1):
    loss = train_one_epoch(
        model,
        train_loader,
        optimizer,
        criterion,
        teacher_forcing=TEACHER_FORCE,
    )
    print(f"Epoch {epoch:02d}/{N_EPOCHS} - Loss: {loss:.4f}")


# ============================================================
# [7] Inference: transliterate new words
# ============================================================

def decode_tokens(token_ids):
    """Convert target token ids back into a string.

    Decoding stops at the first <eos>; <pad> and <sos> are skipped. Ids
    missing from ``tgt_itos`` are rendered as the <unk> token text.
    """
    skipped = (PAD_TOKEN, SOS_TOKEN)
    pieces = []
    for raw_id in token_ids:
        symbol = tgt_itos.get(int(raw_id), UNK_TOKEN)
        if symbol == EOS_TOKEN:
            break
        if symbol in skipped:
            continue
        pieces.append(symbol)
    return "".join(pieces)

def transliterate(model, word, max_len=30):
    """Greedily decode the transliteration of a single word.

    Args:
        model: object exposing ``encoder(src_ids, src_len)`` and
            ``decoder(input_token, hidden)``; it is switched to eval mode.
        word: source word; converted with ``str`` before encoding.
        max_len: maximum number of decoding steps.

    Returns:
        The decoded target string. An empty input yields "" without running
        the model, because a zero-length sequence cannot be packed by the
        encoder.
    """
    model.eval()

    text = str(word)
    if not text:
        return ""

    eos_id = tgt_stoi[EOS_TOKEN]
    with torch.no_grad():
        src_ids = torch.tensor([encode_src(text)], dtype=torch.long, device=device)
        src_len = torch.tensor([src_ids.size(1)], dtype=torch.long, device=device)

        hidden = model.encoder(src_ids, src_len)
        input_token = torch.tensor([tgt_stoi[SOS_TOKEN]], dtype=torch.long, device=device)

        predicted = []
        for _ in range(max_len):
            logits, hidden = model.decoder(input_token, hidden)
            input_token = logits.argmax(1)      # greedy choice feeds the next step
            # Read the id once; each .item() call copies the value to the host.
            token_id = input_token.item()
            predicted.append(token_id)
            if token_id == eos_id:
                break

    return decode_tokens(predicted)

# Show predictions for ten words drawn at random from the training data.
print("\nSample transliterations:")
sample_pool = df[SRC_COL].tolist()  # build the list once, not on every draw
for _ in range(10):
    src_word = random.choice(sample_pool)
    print(src_word, "->", transliterate(model, src_word))


# ============================================================
# [8] Simple character-level accuracy evaluation
# ============================================================

def evaluate_char_accuracy(model, df, num_samples=500):
    """Position-wise character accuracy over the first ``num_samples`` rows.

    For each row the prediction is compared with the reference character by
    character at matching positions. The denominator is the length of the
    LONGER of the two strings, so missing or extra characters count as
    errors. (Scoring only the overlapping prefix would let a truncated
    prediction reach 100%.)

    Args:
        model: model passed through to ``transliterate``.
        df: dataframe holding the source and target columns.
        num_samples: maximum number of leading rows to evaluate.

    Returns:
        Accuracy in [0, 1]; 0.0 when there is nothing to score.
    """
    model.eval()
    total_chars = 0
    correct_chars = 0

    n_rows = min(num_samples, len(df))
    with torch.no_grad():
        for i in range(n_rows):
            row = df.iloc[i]
            tgt = str(row[TGT_COL])
            pred = transliterate(model, str(row[SRC_COL]))

            # zip stops at the shorter string; the unmatched tail of the
            # longer one contributes to the total but never to the matches.
            correct_chars += sum(t == p for t, p in zip(tgt, pred))
            total_chars += max(len(tgt), len(pred))

    if total_chars == 0:
        return 0.0

    return correct_chars / total_chars

char_acc = evaluate_char_accuracy(model, df, num_samples=500)
print(f"\nApprox. character-level accuracy on first 500 samples: {char_acc*100:.2f}%")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Using device: cuda
Data shape: (6000, 2)
           src         tgt
0  shastragaar  शस्त्रागार
1      bindhya    बिन्द्या
2    kirankant    किरणकांत
3  yagyopaveet   यज्ञोपवीत
4      ratania     रटानिया
Source vocab size: 30
Target vocab size: 66
Number of training examples: 6000
Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(30, 128, padding_idx=0)
    (rnn): GRU(128, 256, batch_first=True)
  )
  (decoder): Decoder(
    (embedding): Embedding(66, 128, padding_idx=0)
    (rnn): GRU(128, 256, batch_first=True)
    (fc_out): Linear(in_features=256, out_features=66, bias=True)
  )
)
Epoch 01/20 - Loss: 3.0970
Epoch 02/20 - Loss: 2.3244
Epoch 03/20 - Loss: 1.9072
Epoch 04/20 - Loss: 1.6440
Epoch 05/20 - Loss: 1.4464
Epoch 06/20 - Loss: 1.3225
Epoch 07/20 - Loss: 1.1712
Epoch 08/20 - Loss: 1.0786
Epoch 09/20 - Loss: 1.0069
Epoch 10/20 - Loss: 0.9285
Epo