In [8]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Embedding, LSTM, GRU, SimpleRNN, Dense
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import matplotlib.pyplot as plt


In [12]:
def load_dakshina_pairs(filepath, num_samples=None):
    with open(filepath, 'r', encoding='utf-8') as f:
        lines = f.read().strip().split('\n')
    pairs = [line.split('\t') for line in lines]
    if num_samples:
        pairs = pairs[:num_samples]
    return [(latin, '<start> ' + devanagari + ' <end>') for latin, devanagari in pairs]


In [14]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import unicodedata
import string
import os
import random

# ---------------------
# CONFIGURATION
# ---------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
EMBEDDING_DIM = 64
HIDDEN_DIM = 128
NUM_LAYERS = 1
CELL_TYPE = 'LSTM'  # Options: 'RNN', 'LSTM', 'GRU'
BATCH_SIZE = 32
NUM_EPOCHS = 15
TEACHER_FORCING_RATIO = 0.5
MAX_LENGTH = 30

# ---------------------
# DATA LOADING AND PREPROCESSING
# ---------------------
class TransliterationDataset(Dataset):
    def __init__(self, path):
        self.pairs = []
        self.input_chars = set()
        self.output_chars = set()

        with open(path, 'r', encoding='utf-8') as f:
            for line in f:
                latin, devanagari = line.strip().split('\t')
                self.pairs.append((latin, devanagari))
                self.input_chars.update(latin)
                self.output_chars.update(devanagari)

        self.input_chars = sorted(list(self.input_chars))
        self.output_chars = sorted(list(self.output_chars))

        self.input_char2idx = {ch: i + 1 for i, ch in enumerate(self.input_chars)}
        self.output_char2idx = {ch: i + 1 for i, ch in enumerate(self.output_chars)}

        self.input_char2idx['<pad>'] = 0
        self.output_char2idx['<pad>'] = 0
        self.output_char2idx['<sos>'] = len(self.output_char2idx)
        self.output_char2idx['<eos>'] = len(self.output_char2idx)

        self.input_idx2char = {i: ch for ch, i in self.input_char2idx.items()}
        self.output_idx2char = {i: ch for ch, i in self.output_char2idx.items()}

    def __len__(self):
        return len(self.pairs)

    def encode_seq(self, seq, char2idx, add_sos_eos=False):
        indices = [char2idx[c] for c in seq]
        if add_sos_eos:
            indices = [char2idx['<sos>']] + indices + [char2idx['<eos>']]
        return indices

    def pad_seq(self, indices, max_length):
        return indices + [0] * (max_length - len(indices))

    def __getitem__(self, idx):
        input_seq, target_seq = self.pairs[idx]
        input_encoded = self.encode_seq(input_seq, self.input_char2idx)
        target_encoded = self.encode_seq(target_seq, self.output_char2idx, add_sos_eos=True)

        input_encoded = self.pad_seq(input_encoded, MAX_LENGTH)
        target_encoded = self.pad_seq(target_encoded, MAX_LENGTH + 2)

        return (
            torch.tensor(input_encoded, dtype=torch.long),
            torch.tensor(target_encoded, dtype=torch.long)
        )

# ---------------------
# MODEL DEFINITION
# ---------------------
class Encoder(nn.Module):
    def __init__(self, input_size, embedding_dim, hidden_dim, num_layers, cell_type='LSTM'):
        super().__init__()
        self.embedding = nn.Embedding(input_size, embedding_dim, padding_idx=0)
        rnn_cell = getattr(nn, cell_type)
        self.rnn = rnn_cell(embedding_dim, hidden_dim, num_layers, batch_first=True)

    def forward(self, x):
        embedded = self.embedding(x)
        outputs, hidden = self.rnn(embedded)
        return outputs, hidden

class Decoder(nn.Module):
    def __init__(self, output_size, embedding_dim, hidden_dim, num_layers, cell_type='LSTM'):
        super().__init__()
        self.embedding = nn.Embedding(output_size, embedding_dim, padding_idx=0)
        rnn_cell = getattr(nn, cell_type)
        self.rnn = rnn_cell(embedding_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_size)

    def forward(self, x, hidden):
        x = x.unsqueeze(1)
        embedded = self.embedding(x)
        output, hidden = self.rnn(embedded, hidden)
        prediction = self.fc(output.squeeze(1))
        return prediction, hidden

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, cell_type='LSTM'):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.cell_type = cell_type

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        batch_size = src.size(0)
        trg_len = trg.size(1)
        trg_vocab_size = self.decoder.fc.out_features

        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(device)

        encoder_outputs, hidden = self.encoder(src)

        input = trg[:, 0]  # <sos> token

        for t in range(1, trg_len):
            output, hidden = self.decoder(input, hidden)
            outputs[:, t] = output
            top1 = output.argmax(1)
            input = trg[:, t] if random.random() < teacher_forcing_ratio else top1

        return outputs

# ---------------------
# TRAINING AND EVALUATION
# ---------------------
def train_model(model, dataloader, criterion, optimizer):
    model.train()
    total_loss = 0

    for src, trg in dataloader:
        src, trg = src.to(device), trg.to(device)
        optimizer.zero_grad()
        output = model(src, trg, TEACHER_FORCING_RATIO)

        output_dim = output.shape[-1]
        output = output[:, 1:].reshape(-1, output_dim)
        trg = trg[:, 1:].reshape(-1)

        loss = criterion(output, trg)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    return total_loss / len(dataloader)

def evaluate_model(model, dataloader, dataset):
    model.eval()
    correct = 0
    total = 0
    sample_outputs = []

    with torch.no_grad():
        for src, trg in dataloader:
            src = src.to(device)
            outputs = model(src, trg=None, teacher_forcing_ratio=0)

            preds = outputs.argmax(2).cpu().numpy()
            for i in range(src.size(0)):
                pred_seq = preds[i]
                gold_seq = trg[i].cpu().numpy()
                pred_chars = [dataset.output_idx2char[idx] for idx in pred_seq if idx != 0 and idx not in [dataset.output_char2idx['<sos>'], dataset.output_char2idx['<eos>']]]
                gold_chars = [dataset.output_idx2char[idx] for idx in gold_seq if idx != 0 and idx not in [dataset.output_char2idx['<sos>'], dataset.output_char2idx['<eos>']]]

                if pred_chars == gold_chars:
                    correct += 1
                total += 1
                if len(sample_outputs) < 5:
                    input_str = ''.join([dataset.input_idx2char[idx.item()] for idx in src[i] if idx.item() != 0])
                    sample_outputs.append((input_str, ''.join(gold_chars), ''.join(pred_chars)))

    accuracy = correct / total * 100
    return accuracy, sample_outputs

# ---------------------
# MAIN TRAINING LOOP
# ---------------------
def run_training(data_path):
    dataset = TransliterationDataset(data_path)
    dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

    encoder = Encoder(
        input_size=len(dataset.input_char2idx),
        embedding_dim=EMBEDDING_DIM,
        hidden_dim=HIDDEN_DIM,
        num_layers=NUM_LAYERS,
        cell_type=CELL_TYPE
    ).to(device)

    decoder = Decoder(
        output_size=len(dataset.output_char2idx),
        embedding_dim=EMBEDDING_DIM,
        hidden_dim=HIDDEN_DIM,
        num_layers=NUM_LAYERS,
        cell_type=CELL_TYPE
    ).to(device)

    model = Seq2Seq(encoder, decoder, cell_type=CELL_TYPE).to(device)
    criterion = nn.CrossEntropyLoss(ignore_index=0)
    optimizer = torch.optim.Adam(model.parameters())

    for epoch in range(NUM_EPOCHS):
        loss = train_model(model, dataloader, criterion, optimizer)
        print(f"Epoch {epoch + 1}/{NUM_EPOCHS}, Loss: {loss:.4f}")

    accuracy, samples = evaluate_model(model, dataloader, dataset)
    print(f"\n✅ Final Accuracy: {accuracy:.2f}%")
    print("\n🔤 Sample Predictions:")
    for i, (inp, tgt, pred) in enumerate(samples):
        print(f"{i+1}. {inp} ➡️ {pred} (Target: {tgt})")

# ---------------------
# RUN IT
# ---------------------
# Example: 'data/dakshina_dataset_v1.0/hi/lexicons/hi.translit.sampled.train.tsv'
run_training("PATH_TO_TRAIN_FILE")


FileNotFoundError: [Errno 2] No such file or directory: 'PATH_TO_TRAIN_FILE'