# Imports

In [None]:
import time
import random
import csv
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

# Preparing Data

In [None]:
# !wget https://storage.googleapis.com/gresearch/dakshina/dakshina_dataset_v1.0.tar
# !tar xvf dakshina_dataset_v1.0.tar

In [None]:
# Word-aligned romanization file: tab-separated fields, one row per word.
path = "./dakshina_dataset_v1.0/hi/romanized/hi.romanized.rejoined.aligned.cased_nopunct.tsv"
# Explicit UTF-8 so the Devanagari column decodes the same way on every
# platform; newline="" is what the csv module asks of the files it reads.
# The handle is left open on purpose: `reader` is consumed lazily by the
# parsing loop that follows.
rawData = open(path, encoding="utf-8", newline="")
reader = csv.reader(rawData, delimiter="\t")
dataset = []

In [None]:
# Group the word pairs into sentences. A row whose first field is "</s>"
# closes the sentence that is currently being collected.
error = []  # 1-based numbers of rows that did not hold two fields
sent = []  # pairs collected for the sentence currently being read
i = 0  # number of sentence terminators seen so far
for rowno, row in enumerate(reader, start=1):
    # csv.reader yields an empty list for a blank line; indexing it would
    # raise, so record the row and move on.
    if not row:
        error.append(rowno)
        continue

    if row[0] == "</s>":
        dataset.append(sent)
        sent = []
        i += 1
        continue

    # Check the width before touching the fields so a short row can never
    # leave a half-built pair behind for the next row to extend.
    if len(row) < 2:
        error.append(rowno)
        continue

    sent.append([row[0], row[1]])

# Keep a trailing sentence that is not followed by a "</s>" row.
if sent:
    dataset.append(sent)

In [None]:
# Sentence-level split: the first 6000 sentences are used for training, the
# next 2000 for validation, and everything after that for testing.
_train_end, _dev_end = 6000, 8000
train_data = dataset[:_train_end]
dev_data = dataset[_train_end:_dev_end]
test_data = dataset[_dev_end:]

# Seq2Seq Model

### Encoder

In [None]:
class Encoder(nn.Module):
    """Single-layer GRU encoder over a padded batch of index sequences.

    Args:
        input_dim: Number of rows in the embedding table.
        emb_dim: Width of each embedding vector.
        hid_dim: Width of the GRU hidden state.
        dropout: Dropout probability applied to the embeddings.
    """

    def __init__(self, input_dim, emb_dim, hid_dim, dropout):
        super().__init__()
        self.hid_dim = hid_dim
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.rnn = nn.GRU(emb_dim, hid_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, src_size):
        """Encode ``src`` and return ``(output, hidden)``.

        Args:
            src: Batch-first integer tensor of symbol indices.
            src_size: Unpadded length of every sequence in ``src``.

        Returns:
            A tuple ``(output, hidden)``: ``output`` holds the per-step GRU
            states, batch-first, padded back to the longest sequence in the
            batch; ``hidden`` is the final state as returned by ``nn.GRU``.
        """
        embedded_batch = self.embedding(src)
        embedded_batch = self.dropout(embedded_batch)

        # Packing lets the GRU stop at each sequence's true length instead
        # of running over the padding.
        packed_input = nn.utils.rnn.pack_padded_sequence(
            embedded_batch, src_size, batch_first=True, enforce_sorted=False
        )
        packed_states, hidden = self.rnn(packed_input)
        output, _lengths = nn.utils.rnn.pad_packed_sequence(
            packed_states, batch_first=True
        )
        return output, hidden

### Decoder

In [None]:
class Decoder(nn.Module):
    """Single-step GRU decoder that scores the next output symbol.

    Args:
        output_dim: Size of the output vocabulary (embedding rows and
            number of scores produced per step).
        emb_dim: Width of each embedding vector.
        hid_dim: Width of the GRU hidden state.
        dropout: Dropout probability applied to the embeddings.
    """

    def __init__(self, output_dim, emb_dim, hid_dim, dropout):
        super().__init__()
        self.hid_dim = hid_dim
        self.output_dim = output_dim
        self.embedding = nn.Embedding(output_dim, emb_dim)
        # forward() feeds the GRU a (batch, 1, emb_dim) tensor, so the GRU
        # must be batch-first. Without it the batch axis is read as the time
        # axis and any batch size above 1 fails on the hidden-state shape.
        self.rnn = nn.GRU(emb_dim, hid_dim, batch_first=True)
        self.fc_out = nn.Sequential(
            nn.Linear(self.hid_dim, emb_dim),
            nn.LeakyReLU(),
            nn.Linear(emb_dim, self.output_dim),
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, context):
        """Run one decoding step.

        Args:
            input: Integer tensor of shape ``(batch, 1)`` holding the
                previous symbol of every sequence.
            hidden: GRU state of shape ``(1, batch, hid_dim)``.
            context: Accepted for interface compatibility; not used.

        Returns:
            A tuple ``(output, hidden)``: ``output`` has shape
            ``(batch, output_dim)`` with unnormalised scores, ``hidden`` is
            the updated GRU state.
        """
        embedded = self.dropout(self.embedding(input))
        output, hidden = self.rnn(embedded, hidden)
        # Collapse the length-1 time axis: (batch, 1, hid) -> (batch, hid).
        output = output.reshape(-1, output.size(2))
        output = self.fc_out(output)
        return output, hidden

### Seq2Seq Connection

In [None]:
class Seq2Seq(nn.Module):
    """Couples an encoder and a decoder into one step-by-step decoding loop.

    Args:
        encoder: Module called as ``encoder(src, src_size)`` that returns
            ``(output, hidden)``.
        decoder: Module called as ``decoder(input, hidden, context)`` that
            returns ``(scores, hidden)`` and exposes ``output_dim``.
        device: Device on which the result tensor is placed.
    """

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, tgt, src_size, teacher_forcing_ratio=0.5):
        """Decode ``tgt.size(1)`` steps and collect the per-step scores.

        Args:
            src: Source batch handed to the encoder.
            tgt: Batch-first target indices; column 0 seeds the decoder.
            src_size: Source lengths handed to the encoder.
            teacher_forcing_ratio: Probability, drawn once per step for the
                whole batch, of feeding the reference symbol instead of the
                decoder's own prediction.

        Returns:
            Tensor of shape ``(batch, decoder.output_dim, tgt.size(1))``.
        """
        batch_size, tgt_len = tgt.shape[0], tgt.size(1)

        encoder_output, decoder_hidden = self.encoder(src, src_size)

        outputs = torch.zeros(batch_size, self.decoder.output_dim, tgt_len).to(
            self.device
        )
        # Step 0 is never decoded; give index 1 (the "^" start marker in
        # this file's Script vocabulary) the highest score there.
        outputs[:, 1, 0] = 1

        decoder_input = tgt[:, 0].unsqueeze(1)
        for step in range(1, tgt_len):
            step_scores, decoder_hidden = self.decoder(
                decoder_input, decoder_hidden, encoder_output
            )
            outputs[:, :, step] = step_scores

            use_reference = random.random() < teacher_forcing_ratio
            if use_reference:
                next_symbols = tgt[:, step]
            else:
                next_symbols = step_scores.argmax(1)
            decoder_input = next_symbols.unsqueeze(1)

        return outputs

### Weight Initialization

In [None]:
def init_weights(model):
    """Overwrite every parameter of ``model`` with draws from N(0, 0.01**2).

    Weights and biases are treated alike; the tensors are modified in place
    and nothing is returned.
    """
    for tensor in model.parameters():
        nn.init.normal_(tensor.data, mean=0, std=0.01)

# Data Handling

In [None]:
# The 26 lowercase ASCII letters, "a" through "z", in alphabetical order.
english_lower_script = [chr(code) for code in range(ord("a"), ord("z") + 1)]
# Devanagari symbols recognised on the native-script side. The order matters:
# Script assigns vocabulary indices by position in this list.
devanagari_script = [
    # Independent vowels (including vocalic and candra forms) and OM.
    "ऄ",
    "अ",
    "आ",
    "इ",
    "ई",
    "उ",
    "ऊ",
    "ऍ",
    "ऎ",
    "ए",
    "ऐ",
    "ऑ",
    "ऒ",
    "ओ",
    "औ",
    "ऋ",
    "ॠ",
    "ऌ",
    "ॡ",
    "ॲ",
    "ॐ",
    # Consonants.
    "क",
    "ख",
    "ग",
    "घ",
    "ङ",
    "च",
    "छ",
    "ज",
    "झ",
    "ञ",
    "ट",
    "ठ",
    "ड",
    "ढ",
    "ण",
    "त",
    "थ",
    "द",
    "ध",
    "न",
    "ऩ",
    "प",
    "फ",
    "ब",
    "भ",
    "म",
    "य",
    "र",
    "ऱ",
    "ल",
    "ळ",
    "ऴ",
    "व",
    "श",
    "ष",
    "स",
    "ह",
    # Consonants carrying a nukta.
    "क़",
    "ख़",
    "ग़",
    "ज़",
    "ड़",
    "ढ़",
    "फ़",
    "य़",
    # Virama, followed by the dependent vowel signs.
    "्",
    "ा",
    "ि",
    "ी",
    "ु",
    "ू",
    "ॅ",
    "ॆ",
    "े",
    "ै",
    "ॉ",
    "ॊ",
    "ो",
    "ौ",
    "ृ",
    "ॄ",
    "ॢ",
    "ॣ",
    # Candrabindu, anusvara, visarga, nukta, udatta accent and avagraha.
    "ँ",
    "ं",
    "ः",
    "़",
    "॑",
    "ऽ",
    # Zero-width non-joiner (U+200C) and zero-width joiner (U+200D).
    chr(0x200C),
    chr(0x200D),
]

In [None]:
class Script:
    """Symbol-level vocabulary for one writing system.

    Indices 0, 1 and 2 are reserved for the markers "_", "^" and "$"; the
    supplied graphemes follow from index 3 in the order given.
    """

    def __init__(self, language_script=devanagari_script):
        self.graphemes = language_script
        self.char2index = {}
        self.index2char = {}

        # The reserved markers come first so that they occupy indices 0-2.
        symbols = ["_", "^", "$"] + list(self.graphemes)
        for position, symbol in enumerate(symbols):
            self.char2index[symbol] = position
            self.index2char[position] = symbol

    def size(self):
        """Return the number of entries in the vocabulary."""
        return len(self.char2index)

    def word2vector(self, word):
        """Encode ``word`` as an int64 array framed by "^" and "$".

        Symbols that are not in the vocabulary are left out of the result.
        """
        known = [
            self.char2index[symbol] for symbol in word if symbol in self.char2index
        ]
        indices = [self.char2index["^"], *known, self.char2index["$"]]
        return np.asarray(indices, dtype=np.int64)

    def vector2word(self, vector):
        """Decode indices to text and strip every "_", "^" and "$"."""
        decoded = "".join(self.index2char[index] for index in vector)
        return decoded.translate(str.maketrans("", "", "_^$"))

In [None]:
class Transliteration_Dataset(Dataset):
    """Word-pair dataset built from sentences of ``[source, target]`` pairs.

    A pair is kept only when every symbol of the source word is in
    ``src_script.graphemes`` and every symbol of the target word is in
    ``tgt_script.graphemes``. Kept words are encoded with the scripts'
    ``word2vector`` and padded with zeros to the longest encoded word on
    their side when an item is fetched.

    Args:
        data: Iterable of sentences; each sentence is an iterable of
            two-element ``[source_word, target_word]`` pairs. Empty
            sentences are skipped.
        src_script: Vocabulary object for the source side.
        tgt_script: Vocabulary object for the target side.
    """

    def __init__(self, data, src_script, tgt_script):
        # Sets make each membership test O(1) and allow the two words to be
        # validated independently of one another.
        src_alphabet = set(src_script.graphemes)
        tgt_alphabet = set(tgt_script.graphemes)

        src_data = []
        tgt_data = []
        for sentence in data:
            for src_word, tgt_word in sentence:
                src_ok = all(symbol in src_alphabet for symbol in src_word)
                tgt_ok = all(symbol in tgt_alphabet for symbol in tgt_word)
                if src_ok and tgt_ok:
                    src_data.append(src_word)
                    tgt_data.append(tgt_word)

        self.src_script = src_script
        self.tgt_script = tgt_script
        # Misspelled names kept as aliases for code that already uses them.
        self.src_sript = src_script
        self.tgt_sript = tgt_script
        self.src = [src_script.word2vector(word) for word in src_data]
        self.tgt = [tgt_script.word2vector(word) for word in tgt_data]
        self.max_src_size = max((len(vector) for vector in self.src), default=0)
        self.max_tgt_size = max((len(vector) for vector in self.tgt), default=0)

    def __len__(self):
        """Return the number of word pairs that passed the filter."""
        return len(self.src)

    def __getitem__(self, index):
        """Return ``(padded_source, padded_target, source_length)``.

        ``source_length`` is the length of the source vector before padding.
        """
        src_vector = self.pad_sequence(self.src[index], self.max_src_size)
        tgt_vector = self.pad_sequence(self.tgt[index], self.max_tgt_size)
        return src_vector, tgt_vector, len(self.src[index])

    def pad_sequence(self, vector, max_size):
        """Return ``vector`` as an int64 array of exactly ``max_size`` items.

        Shorter vectors are right-padded with zeros; longer ones are cut.
        """
        padded_vector = np.zeros((max_size), dtype=np.int64)
        if len(vector) > max_size:
            padded_vector[:] = vector[:max_size]
        else:
            padded_vector[: len(vector)] = vector
        return padded_vector

# Training and Evaluation

## Dataset Creation

In [None]:
# Devanagari is the source side and lowercase Latin the target side.
src_script = Script(devanagari_script)
tgt_script = Script(english_lower_script)

# Build one dataset per split, in train / dev / test order.
train_dataset, dev_dataset, test_dataset = (
    Transliteration_Dataset(split, src_script, tgt_script)
    for split in (train_data, dev_data, test_data)
)

# One word pair per batch, visited in a freshly shuffled order each pass.
train_dataloader, dev_dataloader, test_dataloader = (
    DataLoader(split_dataset, batch_size=1, shuffle=True)
    for split_dataset in (train_dataset, dev_dataset, test_dataset)
)

### Hyperparameters & Model Architecture

In [None]:
EPOCHS = 10

# Vocabulary sizes set the embedding-table heights and the output width.
INPUT_DIM, OUTPUT_DIM = src_script.size(), tgt_script.size()
ENC_EMB_DIM = DEC_EMB_DIM = 256
HID_DIM = 512
ENC_DROPOUT = DEC_DROPOUT = 0.5

# Use CUDA when PyTorch reports it as available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
# Show the device that the model and the batches are moved to.
print(device)

### Model Creation

In [None]:
enc = Encoder(
    input_dim=INPUT_DIM,
    emb_dim=ENC_EMB_DIM,
    hid_dim=HID_DIM,
    dropout=ENC_DROPOUT,
)
dec = Decoder(
    output_dim=OUTPUT_DIM,
    emb_dim=DEC_EMB_DIM,
    hid_dim=HID_DIM,
    dropout=DEC_DROPOUT,
)

model = Seq2Seq(encoder=enc, decoder=dec, device=device)
model = model.to(device)
# Module.apply calls init_weights on every submodule and on the model itself.
model.apply(init_weights)

### Optimization Handling

In [None]:
# Adam over all model parameters with a fixed learning rate.
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)
# "mean" is the default reduction, spelled out here: the criterion returns a
# single scalar averaged over every position it is given.
criterion = nn.CrossEntropyLoss(reduction="mean")

In [None]:
def _truncate_at_eos(vector, eos_index):
    """Return the part of ``vector`` that precedes the first ``eos_index``."""
    hits = np.flatnonzero(vector == eos_index)
    return vector[: hits[0]] if hits.size else vector


def calculate_accuracy(pred_tensor, tgt_tensor, script):
    """Return the fraction of sequences whose decoded word matches exactly.

    Both the prediction and the reference are cut at their first "$" before
    decoding, so whatever the model emits after the end marker cannot turn a
    correct word into a mismatch.

    Args:
        pred_tensor: Scores of shape ``(batch, vocab, length)``; the most
            likely symbol per step is taken with ``argmax`` over dim 1.
        tgt_tensor: Reference indices of shape ``(batch, length)``.
        script: Vocabulary exposing ``char2index`` and ``vector2word``.

    Returns:
        A 0-dim tensor holding the exact-match accuracy (``0.0`` for an
        empty batch).
    """
    pred_sequence = torch.argmax(pred_tensor, dim=1).cpu().numpy()
    tgt_sequence = tgt_tensor.cpu().numpy()

    batch_size = pred_sequence.shape[0]
    if batch_size == 0:
        return torch.tensor(0.0)

    eos_index = script.char2index["$"]
    count = 0
    for pred_row, tgt_row in zip(pred_sequence, tgt_sequence):
        pred = script.vector2word(_truncate_at_eos(pred_row, eos_index))
        tgt = script.vector2word(_truncate_at_eos(tgt_row, eos_index))
        if pred == tgt:
            count += 1
    return torch.tensor(count / batch_size)

In [None]:
def calculate_loss(pred, tgt, pad_index=0):
    """Return the mean cross-entropy over the non-padding target positions.

    The per-position loss is computed here with ``reduction="none"`` rather
    than through the module-level ``criterion``: that criterion averages
    over every position first, so multiplying its scalar result by a mask
    only rescales the loss and never removes the padding from it.

    Args:
        pred: Scores of shape ``(batch, vocab, length)``.
        tgt: Integer targets of shape ``(batch, length)``.
        pad_index: Target value that marks padding and is excluded.

    Returns:
        A 0-dim tensor; ``0`` when every target position is padding.
    """
    per_position = nn.functional.cross_entropy(pred, tgt, reduction="none")
    mask = tgt.ne(pad_index).to(per_position.dtype)
    # clamp avoids a 0/0 when the batch holds nothing but padding.
    return (per_position * mask).sum() / mask.sum().clamp(min=1.0)

### Training

In [None]:
def train(model, iterator, optimizer, clip):
    """Run one optimisation pass over ``iterator``.

    Args:
        model: Called as ``model(src, tgt, src_size)``.
        iterator: Yields ``(src, tgt, src_size)`` batches.
        optimizer: Optimizer that updates the model's parameters.
        clip: Maximum gradient norm applied before every step.

    Returns:
        The summed batch losses divided by ``len(iterator)``.
    """
    model.train()
    running_loss = 0

    for src, tgt, src_size in iterator:
        src, tgt = src.to(device), tgt.to(device)

        optimizer.zero_grad()
        batch_loss = calculate_loss(model(src, tgt, src_size), tgt)
        batch_loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        running_loss += batch_loss.item()

    return running_loss / len(iterator)

### Evaluation

In [None]:
def evaluate(model, iterator):
    """Score ``model`` on ``iterator`` without updating it.

    Teacher forcing is switched off (ratio 0), so after the first symbol the
    decoder is fed its own predictions.

    Returns:
        A tuple ``(loss, accuracy)``, each the sum over batches divided by
        ``len(iterator)``.
    """
    model.eval()
    running_loss = 0
    running_accuracy = 0

    with torch.no_grad():
        for src, tgt, src_size in iterator:
            src, tgt = src.to(device), tgt.to(device)

            output = model(src, tgt, src_size, 0)
            running_loss += calculate_loss(output, tgt).item()
            running_accuracy += calculate_accuracy(output, tgt, tgt_script)

    num_batches = len(iterator)
    return running_loss / num_batches, running_accuracy / num_batches

### Training Loop

In [None]:
# Best validation result so far; the accuracy is the one measured in the
# epoch that produced the lowest validation loss.
best_dev_loss, best_dev_accuracy = float("inf"), 0

for epoch in range(EPOCHS):
    start_time = time.time()
    train_loss = train(model, train_dataloader, optimizer, clip=1)
    dev_loss, dev_accuracy = evaluate(model, dev_dataloader)
    end_time = time.time()

    # Wall-clock duration of the epoch, split into whole minutes and seconds.
    epoch_time = end_time - start_time
    epoch_mins = int(epoch_time / 60)
    epoch_secs = int(epoch_time - (epoch_mins * 60))

    # Checkpoint only when the validation loss improves.
    if dev_loss < best_dev_loss:
        best_dev_loss, best_dev_accuracy = dev_loss, dev_accuracy
        torch.save(model.state_dict(), "baseline.pt")

    print(
        f"Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s | Train Loss: {train_loss:.3f} | Val. Loss: {dev_loss:.3f} | Val. Acc: {dev_accuracy:.3f}"
    )

### Evaluation on Test Dataset

In [None]:
# Restore the checkpoint written by the training loop. map_location remaps
# the stored tensors onto the current `device`, so a checkpoint saved on a
# GPU runtime still loads when only the CPU is available.
model.load_state_dict(torch.load("baseline.pt", map_location=device))

test_loss, test_accuracy = evaluate(model, test_dataloader)

print(f"Test Loss: {test_loss:.3f} | Test Acc: {test_accuracy:.3f}")