In [4]:

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import os

# -----------------------------------------
# 🔸 Step 1: Load Dakshina Dataset
# -----------------------------------------
def load_dakshina_tsv(file_path):
    """Read a tab-separated file and return its first two columns as pairs.

    Every line is stripped of surrounding whitespace and split on tabs.
    Lines with fewer than two fields are skipped; fields after the second
    are ignored.

    NOTE(review): the sample predictions printed by this notebook show
    Devanagari text coming from the first column and Latin text from the
    second, so the pairs are (native script, romanization).

    Args:
        file_path: Path of the UTF-8 encoded TSV file.

    Returns:
        A list of ``(first_column, second_column)`` string tuples, in file
        order.
    """
    pairs = []
    with open(file_path, 'r', encoding='utf-8') as handle:
        for raw_line in handle:
            fields = raw_line.strip().split('\t')
            if len(fields) < 2:
                continue
            pairs.append((fields[0], fields[1]))
    return pairs

# Directory that holds the lexicon TSV files.
# Update this to your actual file path
base_path = '/content'

# NOTE(review): the pairs used for training are read from the *dev* split
# file, not from a train split -- confirm this is intentional.
train_pairs = load_dakshina_tsv(os.path.join(base_path, 'hi.translit.sampled.dev.tsv'))
# Pairs used only for the sample predictions printed at the end.
test_pairs = load_dakshina_tsv(os.path.join(base_path, 'hi.translit.sampled.test.tsv'))

# -----------------------------------------
# 🔸 Step 2: Build Character Vocab
# -----------------------------------------
# Distinct characters seen on each side of the training pairs, in sorted order.
input_chars = sorted({ch for src, _ in train_pairs for ch in src})
target_chars = sorted({ch for _, tgt in train_pairs for ch in tgt})

# Real characters are numbered from 1; id 0 is reserved for padding.
input_char2idx = dict(zip(input_chars, range(1, len(input_chars) + 1)))
input_char2idx["<pad>"] = 0

target_char2idx = dict(zip(target_chars, range(1, len(target_chars) + 1)))
target_char2idx.update({"<pad>": 0})
# <sos> and <eos> take the next two free ids after the characters.
target_char2idx["<sos>"] = len(target_char2idx)
target_char2idx["<eos>"] = len(target_char2idx)
idx2target_char = {index: token for token, index in target_char2idx.items()}

# Longest word on either side, plus two slots for <sos> and <eos>.
MAX_LENGTH = 2 + max(max(len(src), len(tgt)) for src, tgt in train_pairs)

# -----------------------------------------
# 🔸 Step 3: Dataset Class
# -----------------------------------------
class CharSeqDataset(Dataset):
    """Dataset of (source, target) string pairs encoded as padded id tensors.

    Encoding relies on the module-level ``input_char2idx``,
    ``target_char2idx`` and ``MAX_LENGTH``. A character missing from the
    relevant vocabulary raises ``KeyError``. Sequences longer than
    ``MAX_LENGTH`` are left as they are (no truncation).
    """

    def __init__(self, data_pairs):
        """Keep a reference to the sequence of ``(source, target)`` pairs."""
        self.data = data_pairs

    def __len__(self):
        """Number of pairs in the dataset."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(encoder_input, decoder_input, decoder_target)`` tensors.

        The target is wrapped as ``<sos> ... <eos>`` and padded; the decoder
        input is that sequence without its last element and the decoder
        target is the same sequence without its first element.
        """
        source_word, target_word = self.data[idx]

        encoder_ids = [input_char2idx[symbol] for symbol in source_word]
        wrapped_ids = [target_char2idx["<sos>"]]
        wrapped_ids.extend(target_char2idx[symbol] for symbol in target_word)
        wrapped_ids.append(target_char2idx["<eos>"])

        # A non-positive repeat count yields an empty list, so over-long
        # sequences receive no padding.
        encoder_ids.extend([input_char2idx["<pad>"]] * (MAX_LENGTH - len(encoder_ids)))
        wrapped_ids.extend([target_char2idx["<pad>"]] * (MAX_LENGTH - len(wrapped_ids)))

        encoder_input = torch.tensor(encoder_ids)
        decoder_input = torch.tensor(wrapped_ids[:-1])
        decoder_target = torch.tensor(wrapped_ids[1:])
        return encoder_input, decoder_input, decoder_target

# -----------------------------------------
# 🔸 Step 4: Seq2Seq Model
# -----------------------------------------
class Seq2SeqModel(nn.Module):
    """Recurrent encoder-decoder that maps source ids to target-vocab logits.

    The encoder's final recurrent state is used as the decoder's initial
    state; a linear layer projects every decoder output step onto the
    target vocabulary.

    Args:
        input_vocab_size: Number of rows in the encoder embedding table.
        target_vocab_size: Number of rows in the decoder embedding table and
            width of the output logits.
        embedding_dim: Embedding size used on both sides.
        hidden_dim: Hidden size of the encoder and decoder recurrent layers.
        num_layers: Number of stacked recurrent layers on each side.
        cell_type: ``'rnn'``, ``'lstm'`` or ``'gru'`` (case-insensitive).

    Raises:
        KeyError: If ``cell_type`` is not one of the supported names.
    """

    def __init__(self, input_vocab_size, target_vocab_size, embedding_dim, hidden_dim, num_layers=1, cell_type='lstm'):
        super().__init__()
        self.cell_type = cell_type.lower()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # padding_idx=0: id 0 is treated as padding by both embedding tables.
        self.encoder_embedding = nn.Embedding(input_vocab_size, embedding_dim, padding_idx=0)
        self.decoder_embedding = nn.Embedding(target_vocab_size, embedding_dim, padding_idx=0)

        recurrent_classes = {'rnn': nn.RNN, 'lstm': nn.LSTM, 'gru': nn.GRU}
        build_recurrent = recurrent_classes[self.cell_type]
        recurrent_kwargs = dict(num_layers=num_layers, batch_first=True)
        self.encoder = build_recurrent(embedding_dim, hidden_dim, **recurrent_kwargs)
        self.decoder = build_recurrent(embedding_dim, hidden_dim, **recurrent_kwargs)

        self.output_fc = nn.Linear(hidden_dim, target_vocab_size)

    def forward(self, src, tgt):
        """Encode ``src``, decode ``tgt`` from the encoder state, return logits.

        Args:
            src: Source id tensor fed to the encoder embedding.
            tgt: Decoder-input id tensor fed to the decoder embedding.

        Returns:
            Logits from ``output_fc`` applied to every decoder output step.
        """
        _, encoder_state = self.encoder(self.encoder_embedding(src))
        decoder_states, _ = self.decoder(self.decoder_embedding(tgt), encoder_state)
        return self.output_fc(decoder_states)

# -----------------------------------------
# 🔸 Step 5: Training Loop
# -----------------------------------------
def train_seq2seq(model, dataloader, criterion, optimizer, device, epochs=10):
    """Train ``model`` with teacher forcing and print the mean loss per epoch.

    Args:
        model: Module called as ``model(src, tgt_input)`` that returns logits
            whose last dimension is the target vocabulary.
        dataloader: Iterable of ``(src, tgt_input, tgt_output)`` batches; it
            must also support ``len()``.
        criterion: Loss applied to the flattened logits and flattened targets.
        optimizer: Optimizer that updates the model parameters.
        device: Device each batch is moved to before the forward pass.
        epochs: Number of passes over ``dataloader``.

    Returns:
        None. One ``Epoch N, Loss: X`` line is printed per epoch.
    """
    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for batch in dataloader:
            encoder_in, decoder_in, gold = (tensor.to(device) for tensor in batch)

            optimizer.zero_grad()
            logits = model(encoder_in, decoder_in)
            # Collapse every leading dimension so each time step is one row.
            flat_logits = logits.view(-1, logits.size(-1))
            batch_loss = criterion(flat_logits, gold.view(-1))
            batch_loss.backward()
            optimizer.step()
            total_loss += batch_loss.item()

        print(f"Epoch {epoch+1}, Loss: {total_loss / len(dataloader):.4f}")

# -----------------------------------------
# 🔸 Step 6: Prediction Function
# -----------------------------------------
def predict_seq2seq(model, src_seq, device, max_len=MAX_LENGTH):
    """Greedy-decode the output string for one source string.

    The source is encoded with the module-level ``input_char2idx``
    (characters missing from it map to the padding id) and right-padded to
    ``MAX_LENGTH``, the same width the training dataset pads to. Decoding
    starts from ``<sos>`` and stops at the first ``<eos>`` or after
    ``max_len`` steps.

    Special tokens never appear in the result: ``<eos>`` ends decoding, and
    a predicted ``<pad>`` or ``<sos>`` is fed back to the decoder but not
    written to the output (previously their literal names were appended).

    Args:
        model: Trained model exposing ``encoder_embedding``, ``encoder``,
            ``decoder_embedding``, ``decoder`` and ``output_fc``.
        src_seq: Source string to convert.
        device: Device on which the tensors are created.
        max_len: Maximum number of decoding steps.

    Returns:
        The decoded string. The model is left in eval mode.
    """
    model.eval()

    pad_id = input_char2idx["<pad>"]
    src_indices = [input_char2idx.get(ch, pad_id) for ch in src_seq]
    # A non-positive repeat count adds nothing, so over-long inputs are kept whole.
    src_indices += [pad_id] * (MAX_LENGTH - len(src_indices))
    src_tensor = torch.tensor(src_indices, dtype=torch.long, device=device).unsqueeze(0)

    sos_id = target_char2idx["<sos>"]
    eos_id = target_char2idx["<eos>"]
    # Ids that may be predicted but must not be written to the output.
    silent_ids = {target_char2idx["<pad>"], sos_id}

    decoded = []
    with torch.no_grad():
        _, hidden = model.encoder(model.encoder_embedding(src_tensor))
        decoder_input = torch.tensor([[sos_id]], device=device)

        for _ in range(max_len):
            output, hidden = model.decoder(model.decoder_embedding(decoder_input), hidden)
            logits = model.output_fc(output.squeeze(1))
            pred_id = logits.argmax(dim=1).item()
            if pred_id == eos_id:
                break
            if pred_id not in silent_ids:
                decoded.append(idx2target_char.get(pred_id, ""))
            # Feed the prediction back as the next decoder input.
            decoder_input = torch.tensor([[pred_id]], device=device)

    return ''.join(decoded)

# -----------------------------------------
# 🔸 Step 7: Run Everything
# -----------------------------------------
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Model hyperparameters.
embedding_dim = 64
hidden_dim = 128
num_layers = 1
cell_type = 'lstm'  # one of 'rnn', 'lstm', 'gru' (see Seq2SeqModel)

# Vocabulary sizes include the special tokens stored in the lookup tables.
input_vocab_size = len(input_char2idx)
target_vocab_size = len(target_char2idx)

# NOTE(review): no random seed is set in this cell, so shuffling and weight
# initialisation (and therefore the printed losses) can differ between runs.
train_dataset = CharSeqDataset(train_pairs)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

model = Seq2SeqModel(input_vocab_size, target_vocab_size, embedding_dim, hidden_dim,
                     num_layers=num_layers, cell_type=cell_type).to(device)

# ignore_index=0 excludes padded target positions (<pad> has id 0) from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(model.parameters(), lr=0.001)

train_seq2seq(model, train_loader, criterion, optimizer, device, epochs=30)

# -----------------------------------------
# 🔸 Step 8: Test Predictions
# -----------------------------------------
# Qualitative check only: greedy predictions for the first ten test pairs.
print("\nSample Predictions on Test Set:")
for x, y in test_pairs[:10]:
    prediction = predict_seq2seq(model, x, device)
    print(f"Input: {x} | Actual: {y} | Predicted: {prediction}")


Epoch 1, Loss: 2.7276
Epoch 2, Loss: 2.3748
Epoch 3, Loss: 2.2465
Epoch 4, Loss: 2.1283
Epoch 5, Loss: 1.9964
Epoch 6, Loss: 1.8454
Epoch 7, Loss: 1.6614
Epoch 8, Loss: 1.5077
Epoch 9, Loss: 1.3286
Epoch 10, Loss: 1.1913
Epoch 11, Loss: 1.0675
Epoch 12, Loss: 0.9728
Epoch 13, Loss: 0.8948
Epoch 14, Loss: 0.8284
Epoch 15, Loss: 0.7780
Epoch 16, Loss: 0.7308
Epoch 17, Loss: 0.6728
Epoch 18, Loss: 0.6356
Epoch 19, Loss: 0.6205
Epoch 20, Loss: 0.5700
Epoch 21, Loss: 0.5471
Epoch 22, Loss: 0.5273
Epoch 23, Loss: 0.5007
Epoch 24, Loss: 0.4954
Epoch 25, Loss: 0.4605
Epoch 26, Loss: 0.4413
Epoch 27, Loss: 0.4369
Epoch 28, Loss: 0.4202
Epoch 29, Loss: 0.4018
Epoch 30, Loss: 0.3827

Sample Predictions on Test Set:
Input: अंक | Actual: ank | Predicted: ank
Input: अंक | Actual: anka | Predicted: ank
Input: अंकित | Actual: ankit | Predicted: ankit
Input: अंकों | Actual: anakon | Predicted: ankon
Input: अंकों | Actual: ankhon | Predicted: ankon
Input: अंकों | Actual: ankon | Predicted: ankon
Input: 

Imports

In [5]:

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import os

Step 1: Load Dakshina Dataset

In [6]:
def load_dakshina_tsv(file_path):
    """Read a tab-separated file and return its first two columns as pairs.

    Every line is stripped of surrounding whitespace and split on tabs.
    Lines with fewer than two fields are skipped; fields after the second
    are ignored.

    NOTE(review): the sample predictions printed by this notebook show
    Devanagari text coming from the first column and Latin text from the
    second, so the pairs are (native script, romanization).

    Args:
        file_path: Path of the UTF-8 encoded TSV file.

    Returns:
        A list of ``(first_column, second_column)`` string tuples, in file
        order.
    """
    pairs = []
    with open(file_path, 'r', encoding='utf-8') as handle:
        for raw_line in handle:
            fields = raw_line.strip().split('\t')
            if len(fields) < 2:
                continue
            pairs.append((fields[0], fields[1]))
    return pairs

# Directory that holds the lexicon TSV files.
# Update this to your actual file path
base_path = '/content'

# NOTE(review): the pairs used for training are read from the *dev* split
# file, not from a train split -- confirm this is intentional.
train_pairs = load_dakshina_tsv(os.path.join(base_path, 'hi.translit.sampled.dev.tsv'))
# Pairs used only for the sample predictions printed at the end.
test_pairs = load_dakshina_tsv(os.path.join(base_path, 'hi.translit.sampled.test.tsv'))

Step 2: Build Character Vocab

In [8]:
# Distinct characters seen on each side of the training pairs, in sorted order.
input_chars = sorted({ch for src, _ in train_pairs for ch in src})
target_chars = sorted({ch for _, tgt in train_pairs for ch in tgt})

# Real characters are numbered from 1; id 0 is reserved for padding.
input_char2idx = dict(zip(input_chars, range(1, len(input_chars) + 1)))
input_char2idx["<pad>"] = 0

target_char2idx = dict(zip(target_chars, range(1, len(target_chars) + 1)))
target_char2idx.update({"<pad>": 0})
# <sos> and <eos> take the next two free ids after the characters.
target_char2idx["<sos>"] = len(target_char2idx)
target_char2idx["<eos>"] = len(target_char2idx)
idx2target_char = {index: token for token, index in target_char2idx.items()}

# Longest word on either side, plus two slots for <sos> and <eos>.
MAX_LENGTH = 2 + max(max(len(src), len(tgt)) for src, tgt in train_pairs)

Step 3: Dataset Class

In [9]:
class CharSeqDataset(Dataset):
    """Dataset of (source, target) string pairs encoded as padded id tensors.

    Encoding relies on the module-level ``input_char2idx``,
    ``target_char2idx`` and ``MAX_LENGTH``. A character missing from the
    relevant vocabulary raises ``KeyError``. Sequences longer than
    ``MAX_LENGTH`` are left as they are (no truncation).
    """

    def __init__(self, data_pairs):
        """Keep a reference to the sequence of ``(source, target)`` pairs."""
        self.data = data_pairs

    def __len__(self):
        """Number of pairs in the dataset."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(encoder_input, decoder_input, decoder_target)`` tensors.

        The target is wrapped as ``<sos> ... <eos>`` and padded; the decoder
        input is that sequence without its last element and the decoder
        target is the same sequence without its first element.
        """
        source_word, target_word = self.data[idx]

        encoder_ids = [input_char2idx[symbol] for symbol in source_word]
        wrapped_ids = [target_char2idx["<sos>"]]
        wrapped_ids.extend(target_char2idx[symbol] for symbol in target_word)
        wrapped_ids.append(target_char2idx["<eos>"])

        # A non-positive repeat count yields an empty list, so over-long
        # sequences receive no padding.
        encoder_ids.extend([input_char2idx["<pad>"]] * (MAX_LENGTH - len(encoder_ids)))
        wrapped_ids.extend([target_char2idx["<pad>"]] * (MAX_LENGTH - len(wrapped_ids)))

        encoder_input = torch.tensor(encoder_ids)
        decoder_input = torch.tensor(wrapped_ids[:-1])
        decoder_target = torch.tensor(wrapped_ids[1:])
        return encoder_input, decoder_input, decoder_target

Step 4: Seq2Seq Model

In [10]:
class Seq2SeqModel(nn.Module):
    """Recurrent encoder-decoder that maps source ids to target-vocab logits.

    The encoder's final recurrent state is used as the decoder's initial
    state; a linear layer projects every decoder output step onto the
    target vocabulary.

    Args:
        input_vocab_size: Number of rows in the encoder embedding table.
        target_vocab_size: Number of rows in the decoder embedding table and
            width of the output logits.
        embedding_dim: Embedding size used on both sides.
        hidden_dim: Hidden size of the encoder and decoder recurrent layers.
        num_layers: Number of stacked recurrent layers on each side.
        cell_type: ``'rnn'``, ``'lstm'`` or ``'gru'`` (case-insensitive).

    Raises:
        KeyError: If ``cell_type`` is not one of the supported names.
    """

    def __init__(self, input_vocab_size, target_vocab_size, embedding_dim, hidden_dim, num_layers=1, cell_type='lstm'):
        super().__init__()
        self.cell_type = cell_type.lower()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # padding_idx=0: id 0 is treated as padding by both embedding tables.
        self.encoder_embedding = nn.Embedding(input_vocab_size, embedding_dim, padding_idx=0)
        self.decoder_embedding = nn.Embedding(target_vocab_size, embedding_dim, padding_idx=0)

        recurrent_classes = {'rnn': nn.RNN, 'lstm': nn.LSTM, 'gru': nn.GRU}
        build_recurrent = recurrent_classes[self.cell_type]
        recurrent_kwargs = dict(num_layers=num_layers, batch_first=True)
        self.encoder = build_recurrent(embedding_dim, hidden_dim, **recurrent_kwargs)
        self.decoder = build_recurrent(embedding_dim, hidden_dim, **recurrent_kwargs)

        self.output_fc = nn.Linear(hidden_dim, target_vocab_size)

    def forward(self, src, tgt):
        """Encode ``src``, decode ``tgt`` from the encoder state, return logits.

        Args:
            src: Source id tensor fed to the encoder embedding.
            tgt: Decoder-input id tensor fed to the decoder embedding.

        Returns:
            Logits from ``output_fc`` applied to every decoder output step.
        """
        _, encoder_state = self.encoder(self.encoder_embedding(src))
        decoder_states, _ = self.decoder(self.decoder_embedding(tgt), encoder_state)
        return self.output_fc(decoder_states)

Step 5: Training Loop

In [11]:
def train_seq2seq(model, dataloader, criterion, optimizer, device, epochs=10):
    """Train ``model`` with teacher forcing and print the mean loss per epoch.

    Args:
        model: Module called as ``model(src, tgt_input)`` that returns logits
            whose last dimension is the target vocabulary.
        dataloader: Iterable of ``(src, tgt_input, tgt_output)`` batches; it
            must also support ``len()``.
        criterion: Loss applied to the flattened logits and flattened targets.
        optimizer: Optimizer that updates the model parameters.
        device: Device each batch is moved to before the forward pass.
        epochs: Number of passes over ``dataloader``.

    Returns:
        None. One ``Epoch N, Loss: X`` line is printed per epoch.
    """
    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for batch in dataloader:
            encoder_in, decoder_in, gold = (tensor.to(device) for tensor in batch)

            optimizer.zero_grad()
            logits = model(encoder_in, decoder_in)
            # Collapse every leading dimension so each time step is one row.
            flat_logits = logits.view(-1, logits.size(-1))
            batch_loss = criterion(flat_logits, gold.view(-1))
            batch_loss.backward()
            optimizer.step()
            total_loss += batch_loss.item()

        print(f"Epoch {epoch+1}, Loss: {total_loss / len(dataloader):.4f}")

Step 6: Prediction Function

In [12]:
def predict_seq2seq(model, src_seq, device, max_len=MAX_LENGTH):
    """Greedy-decode the output string for one source string.

    The source is encoded with the module-level ``input_char2idx``
    (characters missing from it map to the padding id) and right-padded to
    ``MAX_LENGTH``, the same width the training dataset pads to. Decoding
    starts from ``<sos>`` and stops at the first ``<eos>`` or after
    ``max_len`` steps.

    Special tokens never appear in the result: ``<eos>`` ends decoding, and
    a predicted ``<pad>`` or ``<sos>`` is fed back to the decoder but not
    written to the output (previously their literal names were appended).

    Args:
        model: Trained model exposing ``encoder_embedding``, ``encoder``,
            ``decoder_embedding``, ``decoder`` and ``output_fc``.
        src_seq: Source string to convert.
        device: Device on which the tensors are created.
        max_len: Maximum number of decoding steps.

    Returns:
        The decoded string. The model is left in eval mode.
    """
    model.eval()

    pad_id = input_char2idx["<pad>"]
    src_indices = [input_char2idx.get(ch, pad_id) for ch in src_seq]
    # A non-positive repeat count adds nothing, so over-long inputs are kept whole.
    src_indices += [pad_id] * (MAX_LENGTH - len(src_indices))
    src_tensor = torch.tensor(src_indices, dtype=torch.long, device=device).unsqueeze(0)

    sos_id = target_char2idx["<sos>"]
    eos_id = target_char2idx["<eos>"]
    # Ids that may be predicted but must not be written to the output.
    silent_ids = {target_char2idx["<pad>"], sos_id}

    decoded = []
    with torch.no_grad():
        _, hidden = model.encoder(model.encoder_embedding(src_tensor))
        decoder_input = torch.tensor([[sos_id]], device=device)

        for _ in range(max_len):
            output, hidden = model.decoder(model.decoder_embedding(decoder_input), hidden)
            logits = model.output_fc(output.squeeze(1))
            pred_id = logits.argmax(dim=1).item()
            if pred_id == eos_id:
                break
            if pred_id not in silent_ids:
                decoded.append(idx2target_char.get(pred_id, ""))
            # Feed the prediction back as the next decoder input.
            decoder_input = torch.tensor([[pred_id]], device=device)

    return ''.join(decoded)