In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import numpy as np
import pandas as pd
import random

In [3]:
# Use the CUDA device when one is available, otherwise run on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
device

device(type='cuda')

### Prepare Data

In [4]:
# Relative locations of the train / dev / test lexicon files.
TRAIN_FilePath, DEV_FilePath, TEST_FilePath = (
    'dakshina_dataset_v1.0/hi/lexicons/hi.translit.sampled.train.tsv',
    'dakshina_dataset_v1.0/hi/lexicons/hi.translit.sampled.dev.tsv',
    'dakshina_dataset_v1.0/hi/lexicons/hi.translit.sampled.test.tsv',
)

In [5]:
# Colab-only step: mount Google Drive and point the dataset paths at it.
# The google.colab package does not exist outside Colab, so the import is
# guarded; when it is missing, the path constants already defined earlier in
# the notebook are left untouched instead of crashing the run.
try:
    from google.colab import drive
except ImportError:
    drive = None
    print("google.colab is not available; keeping the previously defined dataset paths.")

if drive is not None:
    drive.mount('/content/drive')
    TRAIN_FilePath = '/content/drive/My Drive/hi/lexicons/hi.translit.sampled.train.tsv'
    DEV_FilePath = '/content/drive/My Drive/hi/lexicons/hi.translit.sampled.dev.tsv'
    TEST_FilePath = '/content/drive/My Drive/hi/lexicons/hi.translit.sampled.test.tsv'

Mounted at /content/drive


In [6]:
# Read each tab-separated, header-less split into its own DataFrame
train_df, dev_df, test_df = (
    pd.read_csv(split_path, sep='\t', header=None)
    for split_path in (TRAIN_FilePath, DEV_FilePath, TEST_FilePath)
)

# Give all three frames the same descriptive column labels
for split_df in (train_df, dev_df, test_df):
    split_df.columns = ['devanagari', 'latin', 'frequency']

# Report the number of rows in each split
print(f"Train Dataset Size : {train_df.shape[0]}\nDev Dataset Size   : {dev_df.shape[0]}\nTest Dataset Size  : {test_df.shape[0]}")

Train Dataset Size : 44204
Dev Dataset Size   : 4358
Test Dataset Size  : 4502


In [7]:
# Preview the first rows of the training frame as the cell's rich output.
train_df.head()

Unnamed: 0,devanagari,latin,frequency
0,अं,an,3
1,अंकगणित,ankganit,3
2,अंकल,uncle,4
3,अंकुर,ankur,4
4,अंकुरण,ankuran,3


In [8]:
class Vocabulary:
    """Character-level vocabulary with fixed special tokens.

    Indices 0-3 are reserved for ``<pad>``, ``<sos>``, ``<eos>`` and ``<unk>``
    (in that order); every other character receives the next free index the
    first time ``build_vocabulary`` encounters it.
    """

    def __init__(self):
        self.pad_token = "<pad>"
        self.sos_token = "<sos>"
        self.eos_token = "<eos>"
        self.unk_token = "<unk>"

        # Initialize mappings (special tokens occupy the first four indices)
        self.char2idx = {self.pad_token: 0, self.sos_token: 1, self.eos_token: 2, self.unk_token: 3}
        self.idx2char = {0: self.pad_token, 1: self.sos_token, 2: self.eos_token, 3: self.unk_token}
        self.vocab_size = 4

    def build_vocabulary(self, text_data):
        """Register every previously unseen character found in ``text_data``.

        Args:
            text_data: iterable of items; each item is converted with ``str()``
                and scanned character by character.
        """
        for text in text_data:
            for char in str(text):
                if char not in self.char2idx:
                    self.char2idx[char] = self.vocab_size
                    self.idx2char[self.vocab_size] = char
                    self.vocab_size += 1

    def encode(self, text, add_special_tokens=True):
        """Convert ``text`` into a list of vocabulary indices.

        Args:
            text: value to encode; converted with ``str()`` first.
            add_special_tokens: when true, wrap the result in the ``<sos>`` and
                ``<eos>`` indices.

        Returns:
            list[int]: one index per character; characters missing from the
            vocabulary map to the ``<unk>`` index.
        """
        unk_idx = self.char2idx[self.unk_token]
        indices = [self.char2idx.get(char, unk_idx) for char in str(text)]

        if add_special_tokens:
            indices = [self.char2idx[self.sos_token]] + indices + [self.char2idx[self.eos_token]]

        return indices

    def decode(self, indices, remove_special_tokens=True):
        """Convert a sequence of indices back into a string.

        Args:
            indices: iterable of ints or scalar ``torch.Tensor`` items.
            remove_special_tokens: when true, ``<pad>``, ``<sos>``, ``<eos>``
                and ``<unk>`` are omitted from the result.

        Returns:
            str: the concatenated characters. Indices that are not in the
            vocabulary are skipped silently.
        """
        # Built once per call instead of once per token.
        special_tokens = {self.pad_token, self.sos_token, self.eos_token, self.unk_token}

        chars = []
        for idx in indices:
            if isinstance(idx, torch.Tensor):
                idx = idx.item()
            # Direct dict lookup replaces the former linear scan over a key list.
            char = self.idx2char.get(idx)
            if char is None:
                continue
            if remove_special_tokens and char in special_tokens:
                continue
            chars.append(char)

        return "".join(chars)


In [9]:
class TransliterationDataset(Dataset):
    """Dataset of encoded (source, target) index sequences read from a TSV file.

    Column 1 of the file is encoded with ``src_vocab`` and column 0 with
    ``tgt_vocab``; any further columns are ignored.

    Args:
        data_path: path to a tab-separated file without a header row.
        src_vocab: object whose ``encode(text)`` returns a list of ints for the
            source side.
        tgt_vocab: object whose ``encode(text)`` returns a list of ints for the
            target side.
    """

    def __init__(self, data_path, src_vocab, tgt_vocab):
        # na_filter=False + dtype=str keep every cell as the literal text found
        # in the file. With pandas' default NA handling, words such as "null",
        # "NA" or "nan" are parsed as NaN and would then be encoded as the
        # string "nan" instead of the real word.
        df = pd.read_csv(
            data_path,
            sep='\t',
            header=None,
            usecols=[0, 1],
            dtype=str,
            na_filter=False,
        )

        # Encode column-wise; avoids the per-row overhead of DataFrame.iterrows()
        self.source_sequences = [src_vocab.encode(word) for word in df[1]]
        self.target_sequences = [tgt_vocab.encode(word) for word in df[0]]

    def __len__(self):
        """Return the number of (source, target) pairs."""
        return len(self.source_sequences)

    def __getitem__(self, idx):
        """Return the ``idx``-th pair as two ``torch.long`` tensors (source, target)."""
        return torch.tensor(self.source_sequences[idx], dtype=torch.long), torch.tensor(self.target_sequences[idx], dtype=torch.long)


In [10]:
def collate_fn(batch):
    """Collate (source, target) tensor pairs into two right-padded batches.

    Args:
        batch: sequence of items whose element 0 is the source tensor and
            element 1 the target tensor.

    Returns:
        tuple: ``(src_batch_padded, tgt_batch_padded)``, each batch-first and
        padded with 0 up to the longest sequence on its own side.
    """
    sources, targets = [], []
    for sample in batch:
        sources.append(sample[0])
        targets.append(sample[1])

    # Source and target sides are padded independently of each other
    return (
        pad_sequence(sources, batch_first=True, padding_value=0),
        pad_sequence(targets, batch_first=True, padding_value=0),
    )

In [12]:
# Build Source and Target Vocabularies
src_vocab = Vocabulary()
tgt_vocab = Vocabulary()

# Read only the two text columns into a dedicated frame so that `train_df`
# (with its named columns) is not overwritten by this cell.
# na_filter=False + dtype=str keep words such as "null" or "NA" as literal
# text instead of letting pandas turn them into NaN.
vocab_df = pd.read_csv(
    TRAIN_FilePath,
    sep='\t',
    header=None,
    usecols=[0, 1],
    dtype=str,
    na_filter=False,
)

# Column 1 feeds the source vocabulary, column 0 the target vocabulary
src_vocab.build_vocabulary(vocab_df[1])
tgt_vocab.build_vocabulary(vocab_df[0])

In [17]:
# Encode every split with the shared vocabularies
train_dataset, dev_dataset, test_dataset = (
    TransliterationDataset(split_path, src_vocab, tgt_vocab)
    for split_path in (TRAIN_FilePath, DEV_FilePath, TEST_FilePath)
)

# Only the training loader shuffles; dev and test keep the file order
train_loader = DataLoader(train_dataset, collate_fn=collate_fn, batch_size=32, shuffle=True)
dev_loader = DataLoader(dev_dataset, collate_fn=collate_fn, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, collate_fn=collate_fn, batch_size=32, shuffle=False)


In [22]:
# Sanity check: look at the first training batch only
for batch in train_loader:
    inputs, targets = batch
    first_input, first_target = inputs[0], targets[0]

    # Batch-level shapes
    print("Inputs shape:", inputs.shape)
    print("Targets shape:", targets.shape)

    # One sample, first as raw indices and then decoded back to text
    print("Sample input:", first_input)
    print("Sample target:", first_target)
    print(src_vocab.decode(first_input))
    print(tgt_vocab.decode(first_target))
    break

Inputs shape: torch.Size([32, 14])
Targets shape: torch.Size([32, 13])
Sample input: tensor([ 1,  6,  4, 23, 10,  6,  9,  4,  2,  0,  0,  0,  0,  0])
Sample target: tensor([ 1,  6, 21, 29, 12,  6, 10, 21,  2,  0,  0,  0,  0])
kamukta
कामुकता


## Model

In [19]:
class Encoder(nn.Module):
    """Embedding + recurrent encoder.

    Args:
        input_size: number of rows in the embedding table (source vocabulary size).
        embedding_size: dimensionality of the embeddings.
        hidden_size: hidden size of the recurrent layer.
        num_layers: number of stacked recurrent layers.
        cell_type: ``"LSTM"`` or ``"GRU"``; any other value selects ``nn.RNN``.
        dropout: dropout probability applied to the embeddings and, when more
            than one layer is stacked, between the recurrent layers.
    """

    def __init__(self, input_size, embedding_size, hidden_size, num_layers=1, cell_type="RNN", dropout=0.0):
        super().__init__()

        # Inter-layer dropout has no effect on a single-layer RNN, so it is
        # disabled for the recurrent module only. The embedding dropout keeps
        # the requested rate (previously it was zeroed as well).
        rnn_dropout = 0 if num_layers == 1 else dropout

        self.cell_type = cell_type

        # Embedding Layer
        self.embedding = nn.Embedding(input_size, embedding_size)

        # Recurrent Layer (unknown cell types fall back to a vanilla RNN)
        rnn_class = {"LSTM": nn.LSTM, "GRU": nn.GRU}.get(cell_type, nn.RNN)
        self.recurrent_layer = rnn_class(embedding_size, hidden_size, num_layers, dropout=rnn_dropout, batch_first=True)

        # Dropout layer (applied to the embeddings)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input):
        """Encode a batch of index sequences.

        Args:
            input: index tensor of shape ``batch_size x seq_len``.

        Returns:
            tuple: ``(outputs, state)`` exactly as returned by the recurrent
            layer; for ``"LSTM"`` ``state`` is the ``(hidden, cell)`` tuple,
            otherwise it is the hidden tensor.
        """
        embeddings = self.dropout(self.embedding(input))

        # All three recurrent modules return (outputs, state), so no
        # per-cell-type branching is needed here.
        outputs, state = self.recurrent_layer(embeddings)
        return outputs, state


In [23]:
class Decoder(nn.Module):
    """Single-step recurrent decoder with a linear projection to the vocabulary.

    Args:
        output_size: target vocabulary size (embedding rows and logit width).
        embedding_size: dimensionality of the embeddings.
        hidden_size: hidden size of the recurrent layer.
        num_layers: number of stacked recurrent layers.
        cell_type: ``"LSTM"`` or ``"GRU"``; any other value selects ``nn.RNN``.
        dropout: dropout probability applied to the embeddings and, when more
            than one layer is stacked, between the recurrent layers.
    """

    def __init__(self, output_size, embedding_size, hidden_size, num_layers=1, cell_type="RNN", dropout=0.0):
        super().__init__()

        # Inter-layer dropout has no effect on a single-layer RNN, so it is
        # disabled for the recurrent module only. The embedding dropout keeps
        # the requested rate (previously it was zeroed as well).
        rnn_dropout = 0 if num_layers == 1 else dropout

        self.output_size = output_size
        self.cell_type = cell_type

        # Embedding Layer
        self.embedding = nn.Embedding(output_size, embedding_size)

        # Dropout layer (applied to the embeddings)
        self.dropout = nn.Dropout(dropout)

        # Recurrent Layer (unknown cell types fall back to a vanilla RNN)
        rnn_class = {"LSTM": nn.LSTM, "GRU": nn.GRU}.get(cell_type, nn.RNN)
        self.recurrent_layer = rnn_class(embedding_size, hidden_size, num_layers, dropout=rnn_dropout, batch_first=True)

        # Output layer
        self.fc_out = nn.Linear(hidden_size, output_size)

    def forward(self, input, hidden):
        """Run one decoding step.

        Args:
            input: 1-D tensor with one token index per batch element.
            hidden: recurrent state from the previous step; a
                ``(hidden, cell)`` tuple for ``"LSTM"``, a tensor otherwise.

        Returns:
            tuple: ``(prediction, hidden)`` where ``prediction`` holds the
            output-layer logits for this step and ``hidden`` is the updated
            state in the same form it was passed in.
        """
        # Add a length-1 sequence dimension so the batch_first RNN accepts it
        embeddings = self.dropout(self.embedding(input.unsqueeze(1)))

        # The state is forwarded unchanged: an LSTM takes and returns the
        # (hidden, cell) tuple, GRU/RNN take and return a single tensor.
        outputs, hidden = self.recurrent_layer(embeddings, hidden)

        prediction = self.fc_out(outputs.squeeze(1))
        return prediction, hidden


In [15]:
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes step by step with optional teacher forcing."""

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, tgt, teacher_forcing_ratio=0.5):
        """Produce per-step logits for every position of ``tgt``.

        Args:
            src: source index batch passed to the encoder.
            tgt: target index batch; column 0 provides the first decoder input.
            teacher_forcing_ratio: at each step the ground-truth token is fed
                next when ``random.random()`` falls below this value,
                otherwise the decoder's own argmax prediction is fed.

        Returns:
            Tensor of shape ``(batch, tgt_len, decoder.output_size)``.
            Position 0 is never written and stays zero.
        """
        n_samples, n_steps = src.shape[0], tgt.shape[1]
        logits = torch.zeros(n_samples, n_steps, self.decoder.output_size).to(self.device)

        # Only the encoder's final state is used to initialise the decoder
        _, state = self.encoder(src)
        if self.encoder.cell_type == 'LSTM':
            hidden, cell = state
            state = (hidden, cell)

        step_input = tgt[:, 0]
        for step in range(1, n_steps):
            step_logits, state = self.decoder(step_input, state)
            logits[:, step] = step_logits

            # One coin flip per time step, shared by the whole batch
            use_teacher = random.random() < teacher_forcing_ratio
            step_input = tgt[:, step] if use_teacher else step_logits.argmax(1)

        return logits

In [16]:
def train(model, train_loader, optimizer, criterion, clip=1.0):
    """Run one training epoch and return the mean per-batch loss.

    Batches are moved to the module-level ``device``. Progress is printed
    every 100 batches.

    Args:
        model: callable as ``model(src, tgt)`` returning logits of shape
            ``[batch_size, tgt_len, output_dim]``.
        train_loader: iterable of ``(src, tgt)`` batches that supports ``len()``.
        optimizer: optimizer that updates ``model``'s parameters.
        criterion: loss taking flattened logits and flattened targets.
        clip: maximum gradient norm passed to ``clip_grad_norm_``.

    Returns:
        Sum of the batch losses divided by ``len(train_loader)``.
    """
    model.train()
    running_loss = 0

    for step, (src, tgt) in enumerate(train_loader, start=1):
        src, tgt = src.to(device), tgt.to(device)

        optimizer.zero_grad()
        logits = model(src, tgt)

        # Drop position 0 (the SOS slot) and flatten batch and time together
        vocab_dim = logits.shape[-1]
        flat_logits = logits[:, 1:].reshape(-1, vocab_dim)
        flat_targets = tgt[:, 1:].reshape(-1)

        loss = criterion(flat_logits, flat_targets)
        loss.backward()

        # Clip before the update to limit the gradient norm
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        batch_loss = loss.item()
        running_loss += batch_loss

        if step % 100 == 0:
            print(f'Batch {step}/{len(train_loader)} | Loss: {batch_loss:.4f}')

    return running_loss / len(train_loader)

In [17]:
def evaluate(model, val_loader, criterion):
    """Compute the mean per-batch loss on ``val_loader`` without gradient tracking.

    Batches are moved to the module-level ``device`` and the model is called
    with ``teacher_forcing_ratio=0``.

    Args:
        model: callable as ``model(src, tgt, teacher_forcing_ratio=0)``.
        val_loader: iterable of ``(src, tgt)`` batches that supports ``len()``.
        criterion: loss taking flattened logits and flattened targets.

    Returns:
        Sum of the batch losses divided by ``len(val_loader)``.
    """
    model.eval()
    total_loss = 0

    with torch.no_grad():
        for src, tgt in val_loader:
            src, tgt = src.to(device), tgt.to(device)

            # No teacher forcing during evaluation
            logits = model(src, tgt, teacher_forcing_ratio=0)

            # Drop position 0 (the SOS slot) and flatten batch and time together
            vocab_dim = logits.shape[-1]
            flat_logits = logits[:, 1:].reshape(-1, vocab_dim)
            flat_targets = tgt[:, 1:].reshape(-1)

            total_loss += criterion(flat_logits, flat_targets).item()

    return total_loss / len(val_loader)

In [18]:
def transliterate(model, src_text, src_vocab, tgt_vocab, device, max_length=100):
    """Greedily decode the transliteration of a single source string.

    Args:
        model: object exposing ``encoder`` (with a ``cell_type`` attribute) and
            ``decoder``; it is switched to eval mode.
        src_text: text to transliterate.
        src_vocab: vocabulary used to encode ``src_text``.
        tgt_vocab: vocabulary providing the SOS/EOS indices and ``decode``.
        device: device on which the input tensors are created.
        max_length: maximum number of decoding steps.

    Returns:
        The decoded string produced by ``tgt_vocab.decode``.
    """
    model.eval()

    sos_idx = tgt_vocab.char2idx[tgt_vocab.sos_token]
    eos_idx = tgt_vocab.char2idx[tgt_vocab.eos_token]

    # Encode the source text as a batch containing one sequence
    src_tensor = torch.tensor(src_vocab.encode(src_text), dtype=torch.long).unsqueeze(0).to(device)

    predicted = [sos_idx]
    with torch.no_grad():
        # Only the encoder's final state is needed to start decoding
        _, decoder_state = model.encoder(src_tensor)
        if model.encoder.cell_type == 'LSTM':
            hidden, cell = decoder_state
            decoder_state = (hidden, cell)

        next_input = torch.tensor([sos_idx], device=device)
        for _ in range(max_length):
            logits, decoder_state = model.decoder(next_input, decoder_state)

            # Greedy choice: the highest-scoring token
            token = logits.argmax(1).item()
            predicted.append(token)

            if token == eos_idx:
                break

            # Feed the prediction back in as the next decoder input
            next_input = torch.tensor([token], device=device)

    return tgt_vocab.decode(predicted)

In [19]:
# Hyperparameters
INPUT_SIZE = src_vocab.vocab_size
OUTPUT_SIZE = tgt_vocab.vocab_size
EMBEDDING_SIZE = 256
HIDDEN_SIZE = 512
NUM_LAYERS = 2
CELL_TYPE = "LSTM"  # Options: "RNN", "LSTM", "GRU"
DROPOUT = 0.2
LEARNING_RATE = 0.001
NUM_EPOCHS = 10
SEED = 42

# Seed every random number generator the notebook draws from (teacher forcing
# uses `random`, weight initialisation and shuffling use torch) so a fresh
# run starts from the same state. Seeding happens before the model is built
# so the initial weights are covered as well.
# NOTE(review): GPU kernels may still introduce run-to-run differences.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Initialize encoder, decoder, and seq2seq model
encoder = Encoder(
    input_size=INPUT_SIZE,
    embedding_size=EMBEDDING_SIZE,
    hidden_size=HIDDEN_SIZE,
    num_layers=NUM_LAYERS,
    cell_type=CELL_TYPE,
    dropout=DROPOUT
)

decoder = Decoder(
    output_size=OUTPUT_SIZE,
    embedding_size=EMBEDDING_SIZE,
    hidden_size=HIDDEN_SIZE,
    num_layers=NUM_LAYERS,
    cell_type=CELL_TYPE,
    dropout=DROPOUT
)

model = Seq2Seq(encoder, decoder, device).to(device)
criterion = nn.CrossEntropyLoss(ignore_index=0)  # Ignore padding token (index 0)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

In [20]:
# Training loop
CHECKPOINT_PATH = 'best_transliteration_model.pt'

print(f"Starting training for {NUM_EPOCHS} epochs...")
best_valid_loss = float('inf')

for epoch in range(NUM_EPOCHS):
    print(f"Epoch {epoch+1}/{NUM_EPOCHS}")

    # Train model
    train_loss = train(model, train_loader, optimizer, criterion)

    # Evaluate model
    valid_loss = evaluate(model, dev_loader, criterion)

    print(f"Train Loss: {train_loss:.4f} | Valid Loss: {valid_loss:.4f}")

    # Save the best model (lowest validation loss so far)
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), CHECKPOINT_PATH)
        print("Model saved!")

# Restore the best checkpoint so that later cells evaluate the weights with the
# lowest validation loss rather than whatever the final epoch produced.
# The guard skips the reload when no checkpoint was written during this run.
if best_valid_loss < float('inf'):
    model.load_state_dict(torch.load(CHECKPOINT_PATH, map_location=device))
    print(f"Restored best checkpoint (Valid Loss: {best_valid_loss:.4f})")

Starting training for 10 epochs...
Epoch 1/10
Batch 100/1382 | Loss: 3.0889
Batch 200/1382 | Loss: 2.6197
Batch 300/1382 | Loss: 2.6491
Batch 400/1382 | Loss: 2.0562
Batch 500/1382 | Loss: 1.6644
Batch 600/1382 | Loss: 1.2051
Batch 700/1382 | Loss: 1.1606
Batch 800/1382 | Loss: 1.3630
Batch 900/1382 | Loss: 1.0955
Batch 1000/1382 | Loss: 0.8668
Batch 1100/1382 | Loss: 1.4716
Batch 1200/1382 | Loss: 0.8982
Batch 1300/1382 | Loss: 0.9838
Train Loss: 1.6242 | Valid Loss: 1.1897
Model saved!
Epoch 2/10
Batch 100/1382 | Loss: 1.1630
Batch 200/1382 | Loss: 0.9476
Batch 300/1382 | Loss: 0.5444
Batch 400/1382 | Loss: 0.7372
Batch 500/1382 | Loss: 0.5402
Batch 600/1382 | Loss: 0.6457
Batch 700/1382 | Loss: 0.7539
Batch 800/1382 | Loss: 0.7224
Batch 900/1382 | Loss: 0.6856
Batch 1000/1382 | Loss: 0.6379
Batch 1100/1382 | Loss: 0.5698
Batch 1200/1382 | Loss: 0.4394
Batch 1300/1382 | Loss: 0.5429
Train Loss: 0.7176 | Valid Loss: 1.0699
Model saved!
Epoch 3/10
Batch 100/1382 | Loss: 0.4948
Batch 20

In [21]:
# Accuracy calculation function
def calculate_accuracy(model, data_loader, src_vocab, tgt_vocab, device):
    """Exact-match accuracy of greedy transliteration over ``data_loader``.

    Each source row is decoded back to text, transliterated with
    ``transliterate`` and compared with the decoded target row.

    Args:
        model: seq2seq model passed through to ``transliterate``.
        data_loader: iterable of ``(src, tgt)`` index batches.
        src_vocab: vocabulary used to decode (and re-encode) the source rows.
        tgt_vocab: vocabulary used to decode targets and predictions.
        device: device handed to ``transliterate`` for inference.

    Returns:
        float: fraction of samples whose prediction equals the target string;
        ``0.0`` when ``data_loader`` yields no samples.
    """
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for src, tgt in data_loader:
            # The batch is only converted back to text here, so it is not
            # moved to `device`; `transliterate` builds its own input tensors.
            for src_indices, tgt_indices in zip(src.tolist(), tgt.tolist()):
                # Get source text and actual target text
                src_text = src_vocab.decode(src_indices)
                actual_tgt_text = tgt_vocab.decode(tgt_indices)

                # Get predicted transliteration
                predicted_tgt_text = transliterate(model, src_text, src_vocab, tgt_vocab, device)

                # Check if prediction matches
                if predicted_tgt_text == actual_tgt_text:
                    correct += 1
                total += 1

    # An empty loader used to raise ZeroDivisionError
    if total == 0:
        return 0.0

    return correct / total

# Exact-match accuracy of the current model on the held-out test split
test_accuracy = calculate_accuracy(
    model=model,
    data_loader=test_loader,
    src_vocab=src_vocab,
    tgt_vocab=tgt_vocab,
    device=device,
)
print(f"\nTest Accuracy: {test_accuracy:.4f}")


Test Accuracy: 0.3634
