In [1]:
#QUESTION1
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import random
import numpy as np


In [2]:
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

In [3]:
# Data Processing
def read_tsv(path):
    data = []
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            target, source, freq = line.strip().split('\t')
            data.extend([(source, target)] * int(freq))
    return data

def build_vocab(data):
    vocab = {'<pad>': 0, '<sos>': 1, '<eos>': 2}
    for word in data:
        for char in word:
            if char not in vocab:
                vocab[char] = len(vocab)
    return vocab

In [4]:
# Dataset and Collate
class TransliterationDataset(Dataset):
    def __init__(self, pairs, src_vocab, tgt_vocab):
        self.pairs = pairs
        self.src_vocab = src_vocab
        self.tgt_vocab = tgt_vocab

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        src_word, tgt_word = self.pairs[idx]
        src_ids = [self.src_vocab[c] for c in src_word]
        tgt_ids = [self.tgt_vocab['<sos>']] + [self.tgt_vocab[c] for c in tgt_word] + [self.tgt_vocab['<eos>']]
        return torch.tensor(src_ids, dtype=torch.long), torch.tensor(tgt_ids, dtype=torch.long)

def collate_fn(batch):
    src_seqs, tgt_seqs = zip(*batch)
    src_padded = nn.utils.rnn.pad_sequence(src_seqs, batch_first=True, padding_value=0)
    tgt_padded = nn.utils.rnn.pad_sequence(tgt_seqs, batch_first=True, padding_value=0)
    return src_padded, tgt_padded

In [5]:
# Encoder and Decoder
class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hidden_dim, n_layers):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.rnn = nn.GRU(emb_dim, hidden_dim, n_layers, batch_first=True)

    def forward(self, src):
        embedded = self.embedding(src)
        _, hidden = self.rnn(embedded)
        return hidden

class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hidden_dim, n_layers):
        super().__init__()
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.GRU(emb_dim, hidden_dim, n_layers, batch_first=True)
        self.fc_out = nn.Linear(hidden_dim, output_dim)

    def forward(self, input, hidden):
        input = input.unsqueeze(1)  # (batch_size, 1)
        embedded = self.embedding(input)
        output, hidden = self.rnn(embedded, hidden)
        return self.fc_out(output.squeeze(1)), hidden

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        batch_size, trg_len = trg.size()
        vocab_size = self.decoder.fc_out.out_features
        outputs = torch.zeros(batch_size, trg_len, vocab_size).to(self.device)

        hidden = self.encoder(src)
        input = trg[:, 0]  # <sos>

        for t in range(1, trg_len):
            output, hidden = self.decoder(input, hidden)
            outputs[:, t] = output
            teacher_force = random.random() < teacher_forcing_ratio
            input = trg[:, t] if teacher_force else output.argmax(1)

        return outputs


In [6]:
# Utilities
def predict(model, word, src_vocab, tgt_vocab, max_len=30):
    model.eval()
    rev_tgt_vocab = {i: c for c, i in tgt_vocab.items()}
    src_tensor = torch.tensor([src_vocab[c] for c in word], dtype=torch.long).unsqueeze(0).to(model.device)
    hidden = model.encoder(src_tensor)
    input = torch.tensor([tgt_vocab['<sos>']], dtype=torch.long).to(model.device)
    output = []

    for _ in range(max_len):
        out, hidden = model.decoder(input, hidden)
        top1 = out.argmax(1).item()
        if rev_tgt_vocab[top1] == '<eos>':
            break
        output.append(rev_tgt_vocab[top1])
        input = torch.tensor([top1], dtype=torch.long).to(model.device)

    return ''.join(output)

def evaluate_accuracy(model, loader, tgt_vocab):
    model.eval()
    total, correct = 0, 0
    with torch.no_grad():
        for src, trg in loader:
            src, trg = src.to(model.device), trg.to(model.device)
            outputs = model(src, trg, 0)  # no teacher forcing
            preds = outputs.argmax(2)
            for p, t in zip(preds, trg):
                if torch.equal(p[1:], t[1:]):  # ignoring <sos>
                    correct += 1
                total += 1
    return correct / total

In [10]:
# Training Loop
def train(model, loader, optimizer, criterion, clip=1):
    model.train()
    total_loss = 0
    for src, trg in loader:
        src, trg = src.to(model.device), trg.to(model.device)
        optimizer.zero_grad()
        output = model(src, trg)
        output = output[:, 1:].reshape(-1, output.size(-1))
        trg = trg[:, 1:].reshape(-1)
        loss = criterion(output, trg)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(loader)

# Putting it All Together
dev_path = "/content/data1.tsv"  # Make sure this path is correct
data_pairs = read_tsv(dev_path)

src_vocab = build_vocab([p[0] for p in data_pairs])
tgt_vocab = build_vocab([p[1] for p in data_pairs])

dataset = TransliterationDataset(data_pairs, src_vocab, tgt_vocab)
loader = DataLoader(dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
ENC = Encoder(len(src_vocab), emb_dim=64, hidden_dim=128, n_layers=1)
DEC = Decoder(len(tgt_vocab), emb_dim=64, hidden_dim=128, n_layers=1)
model = Seq2Seq(ENC, DEC, DEVICE).to(DEVICE)

optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss(ignore_index=0)

# -----------------------------
# Optional: Train the model
# -----------------------------
# for epoch in range(10):
#     loss = train(model, loader, optimizer, criterion)
#     acc = evaluate_accuracy(model, loader, tgt_vocab)
#     print(f"Epoch {epoch+1}: Loss = {loss:.4f}, Accuracy = {acc:.4f}")

# Sample Predictions
for i in range(5):
    src, tgt = data_pairs[i]
    pred = predict(model, src, src_vocab, tgt_vocab)
    print(f"Input: {src} | Target: {tgt} | Predicted: {pred}")

Input: an | Target: अं | Predicted: झऐणऔर्खण्ए<pad>ततरथथह<pad>तरथहहइऔाथथहह
Input: an | Target: अं | Predicted: झऐणऔर्खण्ए<pad>ततरथथह<pad>तरथहहइऔाथथहह
Input: an | Target: अं | Predicted: झऐणऔर्खण्ए<pad>ततरथथह<pad>तरथहहइऔाथथहह
Input: ankganit | Target: अंकगणित | Predicted: झऐणऔर्खण्ए<pad>ततरथथह<pad>तरथहहइऔाथथहह
Input: ankganit | Target: अंकगणित | Predicted: झऐणऔर्खण्ए<pad>ततरथथह<pad>तरथहहइऔाथथहह
