<a href="https://colab.research.google.com/github/NVN404/ai-assignment/blob/main/Q3code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
import os
import io
import random
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from google.colab import drive

# Mount Google Drive so the dataset CSVs stored under MyDrive are reachable.
drive.mount('/drive')
baseDir = '/drive/MyDrive/aksharantar_sampled'
lang = 'tam'
trainPath = os.path.join(baseDir, lang, f'{lang}_train.csv')
validPath = os.path.join(baseDir, lang, f'{lang}_valid.csv')
testPath = os.path.join(baseDir, lang, f'{lang}_test.csv')

# Fail early with one clear message rather than a pandas error on the first
# missing file.
if not all(os.path.exists(p) for p in (trainPath, validPath, testPath)):
    raise FileNotFoundError("One or more dataset files missing. Check baseDir and lang variables.")

# Read every cell as literal text. With pandas' default NA handling, words
# such as "null", "nan" or "NA" are parsed as missing values and would then be
# stringified to "nan" by astype(str), silently corrupting those word pairs.
trainDf = pd.read_csv(trainPath, header=0, dtype=str, keep_default_na=False)
validDf = pd.read_csv(validPath, header=0, dtype=str, keep_default_na=False)
testDf = pd.read_csv(testPath, header=0, dtype=str, keep_default_na=False)

# Converting dataframes to lists of (first column, second column) word pairs;
# the rest of the script treats the first as the source and the second as the
# target word.
trainPairs = list(zip(trainDf.iloc[:, 0].astype(str).tolist(), trainDf.iloc[:, 1].astype(str).tolist()))
validPairs = list(zip(validDf.iloc[:, 0].astype(str).tolist(), validDf.iloc[:, 1].astype(str).tolist()))
testPairs = list(zip(testDf.iloc[:, 0].astype(str).tolist(), testDf.iloc[:, 1].astype(str).tolist()))
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Model and training hyperparameters
embedDim = 128  # size of each character embedding vector
hiddenDim = 256  # hidden state size of the encoder and decoder RNNs
encoderLayers = 1
decoderLayers = 1
rnnType = 'GRU'  # 'GRU' or 'LSTM'; any other value selects a plain nn.RNN
batchSize = 64
learningRate = 1e-3
epochs = 20
teacherForcingRatio = 0.5  # per-step probability of feeding the gold character to the decoder during training
modelSavePath = f'/drive/MyDrive/akshantar_models/{lang}_seq2seq_attention.pth' # checkpoint file for the attention model
# Create the checkpoint directory up front; exist_ok makes re-running the cell safe.
os.makedirs(os.path.dirname(modelSavePath), exist_ok=True)

class Vocab:
    """Two-way character <-> index table for one script.

    Indices 0, 1 and 2 are reserved for <pad> (padding for short words),
    <sos> (start of string) and <eos> (end of string). The supplied
    characters follow, numbered in sorted order.
    """

    def __init__(self, chars, name):
        self.name = name
        specials = ("<pad>", "<sos>", "<eos>")
        self.charToIndex = {token: i for i, token in enumerate(specials)}
        self.indexToChar = dict(enumerate(specials))
        self.size = len(specials)
        for ch in sorted(chars):
            self.addChar(ch)

    def addChar(self, char):
        """Give `char` the next free index; already-known characters are ignored."""
        if char in self.charToIndex:
            return
        newIndex = self.size
        self.charToIndex[char] = newIndex
        self.indexToChar[newIndex] = char
        self.size = newIndex + 1

def buildVocabs(pairs):
    """Build the source ("latin") and target ("native") vocabularies.

    `pairs` is a sequence of (source word, target word) strings. Every
    character seen on the source side goes into the first vocabulary and every
    character seen on the target side into the second.

    Returns a (srcVocab, tgtVocab) tuple of Vocab objects.
    """
    latinChars = {ch for srcWord, _ in pairs for ch in srcWord}
    nativeChars = {ch for _, tgtWord in pairs for ch in tgtWord}
    return Vocab(latinChars, "latin"), Vocab(nativeChars, "native")
# Two vocabularies built from the training pairs only: one for the latin
# (source) side and one for the native (target) side.
srcVocab, tgtVocab = buildVocabs(trainPairs)

# Padded lengths are taken over all three splits so every word fits.
# Source sequences get one extra slot (<eos>), target sequences two
# (<sos> and <eos>).
combinedPairs = trainPairs + validPairs + testPairs
maxSrcLen = 1 + max(len(srcWord) for srcWord, _ in combinedPairs)
maxTgtLen = 2 + max(len(tgtWord) for _, tgtWord in combinedPairs)


class TransliterationDataset(Dataset):
    """PyTorch dataset that converts (latin, native) word pairs to index tensors.

    Source words are encoded as characters + <eos>, target words as
    <sos> + characters + <eos>. Both are right-padded with <pad> up to
    maxSrcLen / maxTgtLen. Characters missing from a vocabulary are encoded
    with the <pad> index.
    """

    def __init__(self, pairs, srcV, tgtV, maxSrcLen, maxTgtLen):
        self.pairs = pairs
        self.srcV, self.tgtV = srcV, tgtV
        self.maxSrcLen, self.maxTgtLen = maxSrcLen, maxTgtLen

    def __len__(self):
        return len(self.pairs)

    def encodeSrc(self, s):
        """Return the padded long tensor of source indices for the word `s`."""
        table = self.srcV.charToIndex
        padIdx = table["<pad>"]
        ids = [table.get(ch, padIdx) for ch in s]
        ids.append(table["<eos>"])
        ids += [padIdx] * (self.maxSrcLen - len(ids))
        return torch.tensor(ids, dtype=torch.long)

    def encodeTgt(self, t):
        """Return the padded long tensor of target indices for the word `t`."""
        table = self.tgtV.charToIndex
        padIdx = table["<pad>"]
        ids = [table["<sos>"]]
        ids += [table.get(ch, padIdx) for ch in t]
        ids.append(table["<eos>"])
        ids += [padIdx] * (self.maxTgtLen - len(ids))
        return torch.tensor(ids, dtype=torch.long)

    def __getitem__(self, idx):
        srcWord, tgtWord = self.pairs[idx]
        # Tensors are moved to the module-level `device` here, so batches
        # arrive at the model already on that device.
        srcTensor = self.encodeSrc(srcWord).to(device)
        tgtTensor = self.encodeTgt(tgtWord).to(device)
        return srcTensor, tgtTensor

# One dataset per split, all sharing the same vocabularies and padded lengths.
trainDataset, validDataset, testDataset = (
    TransliterationDataset(splitPairs, srcVocab, tgtVocab, maxSrcLen, maxTgtLen)
    for splitPairs in (trainPairs, validPairs, testPairs)
)

# Only the training loader shuffles; validation and test keep file order.
trainLoader, validLoader, testLoader = (
    DataLoader(splitDataset, batch_size=batchSize, shuffle=doShuffle)
    for splitDataset, doShuffle in (
        (trainDataset, True),
        (validDataset, False),
        (testDataset, False),
    )
)

class Encoder(nn.Module):
    """Reads the input word and compresses it into memory vectors plus a summary.

    forward(src) embeds the source indices and runs them through the RNN,
    returning (outputs, hidden): the per-position RNN outputs and the final
    hidden state (a (h, c) tuple when rnnType is 'LSTM').
    """

    def __init__(self, inputVocabSize, embedDim, hiddenDim, numLayers, rnnType):
        super().__init__()
        self.embedding = nn.Embedding(inputVocabSize, embedDim, padding_idx=0)
        # Any rnnType other than 'GRU' / 'LSTM' falls back to a plain nn.RNN.
        rnnClass = {'GRU': nn.GRU, 'LSTM': nn.LSTM}.get(rnnType, nn.RNN)
        self.rnn = rnnClass(embedDim, hiddenDim, numLayers, batch_first=True)

    def forward(self, src):
        return self.rnn(self.embedding(src))

class Attention(nn.Module):
    """Additive attention: scores the decoder state against every encoder output.

    It acts as a focus mechanism. For each source position the decoder state
    is concatenated with that position's encoder output, passed through a
    linear layer and tanh, and projected to a scalar score. The scores are
    normalised with a softmax over the source positions.
    """

    def __init__(self, hiddenDim):
        super().__init__()
        self.attn = nn.Linear(hiddenDim * 2, hiddenDim)
        self.v = nn.Linear(hiddenDim, 1, bias=False)

    def forward(self, hidden, encoderOutputs):
        """Return attention weights of shape (batch, srcLen) that sum to 1 per row.

        hidden:         (batch, hiddenDim) decoder state.
        encoderOutputs: (batch, srcLen, hiddenDim) encoder outputs.
        """
        srcLen = encoderOutputs.shape[1]
        # expand() broadcasts the state across source positions as a view, so
        # no per-position copy is materialised before the concatenation.
        hidden = hidden.unsqueeze(1).expand(-1, srcLen, -1)
        combined = torch.cat((hidden, encoderOutputs), dim=2)
        energy = torch.tanh(self.attn(combined))
        scores = self.v(energy).squeeze(2)
        return torch.softmax(scores, dim=1)


class Decoder(nn.Module):
    """Generates the output word one character at a time.

    Each step attends over the encoder outputs using the top layer of the
    current hidden state, feeds [embedded input ; context] to the RNN, and
    predicts the next character from [RNN output ; context].
    """

    def __init__(self, outputVocabSize, embedDim, hiddenDim, numLayers, rnnType, attention):
        super().__init__()
        self.outputVocabSize = outputVocabSize
        self.attention = attention

        self.embedding = nn.Embedding(outputVocabSize, embedDim, padding_idx=0)
        # Any rnnType other than 'GRU' / 'LSTM' falls back to a plain nn.RNN.
        rnnClass = {'GRU': nn.GRU, 'LSTM': nn.LSTM}.get(rnnType, nn.RNN)
        self.rnn = rnnClass(embedDim + hiddenDim, hiddenDim, numLayers, batch_first=True)
        self.out = nn.Linear(hiddenDim * 2, outputVocabSize)

    def forward(self, inputChar, hidden, encoderOutputs):
        """Run one decoding step and return (scores over the vocabulary, new hidden)."""
        embedded = self.embedding(inputChar)
        # An LSTM carries an (h, c) tuple; attention always looks at the last
        # layer of h.
        topState = hidden[0][-1] if isinstance(self.rnn, nn.LSTM) else hidden[-1]
        weights = self.attention(topState, encoderOutputs).unsqueeze(1)
        # Weighted sum of encoder outputs: (batch, 1, srcLen) x (batch, srcLen, hiddenDim).
        context = torch.bmm(weights, encoderOutputs)
        output, hidden = self.rnn(torch.cat((embedded, context), dim=2), hidden)
        features = torch.cat((output.squeeze(1), context.squeeze(1)), dim=1)
        return self.out(features), hidden

class Seq2Seq(nn.Module):
    """Orchestrates the entire process: encode the source, then decode step by step."""

    def __init__(self, encoder, decoder, targetVocabSize):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.targetVocabSize = targetVocabSize

    def forward(self, src, tgt, teacherForcingRatio=0.5):
        """Decode `tgt.shape[1]` positions for a batch and return the scores.

        src: (batch, srcLen) source indices.
        tgt: (batch, tgtLen) target indices; column 0 is used as the first
             decoder input.
        teacherForcingRatio: per-step probability of feeding the gold
             character instead of the model's own prediction.

        Returns a (batch, tgtLen, targetVocabSize) tensor. Position 0 is never
        predicted and stays all zeros.
        """
        batchSize = src.shape[0]
        maxTgt = tgt.shape[1]
        # Allocate on the inputs' device rather than a module-level global, so
        # the model works wherever its inputs live.
        outputs = torch.zeros(batchSize, maxTgt, self.targetVocabSize, device=src.device)
        encoder_outputs, encHidden = self.encoder(src)
        # The encoder's final state seeds the decoder.
        decoder_hidden = encHidden
        decoder_input = tgt[:, 0].unsqueeze(1)

        for t in range(1, maxTgt):
            output, decoder_hidden = self.decoder(decoder_input, decoder_hidden, encoder_outputs)
            outputs[:, t, :] = output
            # One coin flip per step decides for the whole batch.
            teacher_force = random.random() < teacherForcingRatio
            top1 = output.argmax(1)
            decoder_input = tgt[:, t].unsqueeze(1) if teacher_force else top1.unsqueeze(1)
        return outputs

def initWeights(m):
    """Xavier-initialise the `weight` matrix of a module (for `model.apply`).

    Gives the model's weights a smart random starting point for training.
    Only modules that expose a `weight` tensor with more than one dimension
    are touched; biases and modules without such a tensor are left alone.

    For embeddings with a `padding_idx`, the padding row is reset to zero
    afterwards, because Xavier initialisation would otherwise overwrite the
    all-zero padding vector with random values.

    NOTE(review): nn.GRU / nn.LSTM / nn.RNN name their matrices
    weight_ih_l* / weight_hh_l*, not `weight`, so they keep PyTorch's default
    initialisation here -- confirm that is intended.
    """
    weight = getattr(m, 'weight', None)
    # `weight` can be None (e.g. a norm layer without affine parameters).
    if not isinstance(weight, torch.Tensor) or weight.dim() <= 1:
        return
    nn.init.xavier_uniform_(weight.data)
    padIdx = getattr(m, 'padding_idx', None)
    if padIdx is not None:
        weight.data[padIdx].zero_()

# Build the model pieces on the selected device. The attention module is
# created first because the decoder takes it as a constructor argument.
attention = Attention(hiddenDim).to(device)
encoder = Encoder(srcVocab.size, embedDim, hiddenDim, encoderLayers, rnnType).to(device)
decoder = Decoder(tgtVocab.size, embedDim, hiddenDim, decoderLayers, rnnType, attention).to(device)
model = Seq2Seq(encoder, decoder, tgtVocab.size).to(device)
# apply() visits every submodule, so initWeights runs on each layer.
model.apply(initWeights)

optimizer = optim.Adam(model.parameters(), lr=learningRate)
# Padding positions in the target do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=tgtVocab.charToIndex["<pad>"])

def trainEpoch(model, loader, optimizer, criterion, clip):
    """Train `model` for one pass over `loader` and return the mean batch loss.

    Decoding uses the module-level `teacherForcingRatio`. Gradients are
    clipped to a total norm of `clip` before each optimizer step.
    """
    model.train()
    runningLoss = 0
    for srcBatch, tgtBatch in loader:
        optimizer.zero_grad()
        logits = model(srcBatch, tgtBatch, teacherForcingRatio)
        vocabSize = logits.shape[-1]
        # Position 0 is dropped from both the predictions and the targets
        # before the loss; the remaining positions are flattened to
        # (batch * steps, vocab) vs (batch * steps,).
        batchLoss = criterion(
            logits[:, 1:, :].reshape(-1, vocabSize),
            tgtBatch[:, 1:].reshape(-1),
        )
        batchLoss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        runningLoss += batchLoss.item()
    return runningLoss / len(loader)

def evaluate(model, loader, criterion):
    """Return the mean batch loss of `model` over `loader` without updating it.

    The model is put in eval mode and decoded with a teacher-forcing ratio of
    0.0, i.e. it always consumes its own previous prediction.
    """
    model.eval()
    batchLosses = []
    with torch.no_grad():
        for srcBatch, tgtBatch in loader:
            logits = model(srcBatch, tgtBatch, 0.0)
            vocabSize = logits.shape[-1]
            # Position 0 is dropped from both sides before the loss.
            flatLogits = logits[:, 1:, :].reshape(-1, vocabSize)
            flatTargets = tgtBatch[:, 1:].reshape(-1)
            batchLosses.append(criterion(flatLogits, flatTargets).item())
    return sum(batchLosses) / len(loader)

print("Starting training...")
for epoch in range(1, epochs + 1):
    trainLoss = trainEpoch(model, trainLoader, optimizer, criterion, clip=1)
    validLoss = evaluate(model, validLoader, criterion)
    print(f'Epoch {epoch:02d} | Train Loss: {trainLoss:.4f} | Valid Loss: {validLoss:.4f}')
    # NOTE(review): the checkpoint is overwritten after every epoch regardless
    # of validation loss, so the file always holds the latest epoch, not the
    # best one.
    checkpoint = {
        'epoch': epoch,
        'model_state': model.state_dict(),
        'optimizer_state': optimizer.state_dict(),
    }
    torch.save(checkpoint, modelSavePath)
print("Training finished.")

def infer(model, srcText, srcV, tgtV, maxTgtLen):
    """Greedily transliterate one source word and return the predicted string.

    The word is encoded like the training data: characters + <eos>,
    right-padded with <pad> to the module-level `maxSrcLen`, with unknown
    characters mapped to <pad>. Decoding starts from <sos> and stops at the
    first <eos> or <pad> prediction, or after `maxTgtLen` steps.
    """
    model.eval()
    srcTable = srcV.charToIndex
    tgtTable = tgtV.charToIndex
    stopTokens = ("<eos>", "<pad>")
    with torch.no_grad():
        srcIdx = [srcTable.get(ch, srcTable["<pad>"]) for ch in srcText]
        srcIdx.append(srcTable["<eos>"])
        srcIdx += [srcTable["<pad>"]] * (maxSrcLen - len(srcIdx))
        # Wrapping in a list gives the batch dimension of size 1.
        srcTensor = torch.tensor([srcIdx], dtype=torch.long, device=device)
        encoderOutputs, decHidden = model.encoder(srcTensor)
        decInput = torch.tensor([[tgtTable["<sos>"]]], dtype=torch.long, device=device)

        chars = []
        for _ in range(maxTgtLen):
            scores, decHidden = model.decoder(decInput, decHidden, encoderOutputs)
            best = scores.argmax(1)
            ch = tgtV.indexToChar[best.item()]
            if ch in stopTokens:
                break
            chars.append(ch)
            # Feed the prediction back in as the next input.
            decInput = best.unsqueeze(1)
        return "".join(chars)

print("\nSample predictions on test set:")
# Slice rather than index 0..9 so that a test set with fewer than ten pairs
# prints what it has instead of raising IndexError.
for s, t in testPairs[:10]:
    pred = infer(model, s, srcVocab, tgtVocab, maxTgtLen)
    print(f'Input: {s:<12} | Target: {t:<12} | Pred: {pred:<12}')

def testAccuracy(model, loader, tgtV):
    model.eval()
    totalTokens = 0
    correctTokens = 0
    ignore_indices = [tgtV.charToIndex["<pad>"], tgtV.charToIndex["<sos>"], tgtV.charToIndex["<eos>"]]

    with torch.no_grad():
        for src, tgt in loader:
            output = model(src, tgt, 0.0)
            preds = output.argmax(-1)
            tgtFlat = tgt[:,1:]
            predsFlat = preds[:,1:]
            mask = torch.ones_like(tgtFlat, dtype=torch.bool, device=device)
            for idx in ignore_indices:
                mask &= (tgtFlat != idx)
            totalTokens += mask.sum().item()
            correctTokens += ((predsFlat == tgtFlat) & mask).sum().item()

    return correctTokens / totalTokens if totalTokens > 0 else 0.0

# Character-level accuracy on the held-out test split, reported as a percentage.
acc = testAccuracy(model, testLoader, tgtVocab)
print(f'\nTest token-level accuracy: {acc*100:.2f}%')

Drive already mounted at /drive; to attempt to forcibly remount, call drive.mount("/drive", force_remount=True).
Starting training...
Epoch 01 | Train Loss: 0.9740 | Valid Loss: 0.8254
Epoch 02 | Train Loss: 0.2172 | Valid Loss: 0.6881
Epoch 03 | Train Loss: 0.1702 | Valid Loss: 0.7314
Epoch 04 | Train Loss: 0.1508 | Valid Loss: 0.7534
Epoch 05 | Train Loss: 0.1351 | Valid Loss: 0.7200
Epoch 06 | Train Loss: 0.1249 | Valid Loss: 0.6939
Epoch 07 | Train Loss: 0.1162 | Valid Loss: 0.6721
Epoch 08 | Train Loss: 0.1079 | Valid Loss: 0.7118
Epoch 09 | Train Loss: 0.1005 | Valid Loss: 0.7315
Epoch 10 | Train Loss: 0.0921 | Valid Loss: 0.7196
Epoch 11 | Train Loss: 0.0892 | Valid Loss: 0.7525
Epoch 12 | Train Loss: 0.0796 | Valid Loss: 0.7300
Epoch 13 | Train Loss: 0.0745 | Valid Loss: 0.7722
Epoch 14 | Train Loss: 0.0739 | Valid Loss: 0.7774
Epoch 15 | Train Loss: 0.0652 | Valid Loss: 0.8307
Epoch 16 | Train Loss: 0.0650 | Valid Loss: 0.8020
Epoch 17 | Train Loss: 0.0590 | Valid Loss: 0.8240