<a href="https://colab.research.google.com/github/A-Dharnish/tamil-dialect-standardization/blob/main/dot22.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
import math
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
import numpy as np
from google.colab import files

uploaded = files.upload()

# Special tokens and indices
PAD_TOKEN = "<pad>"
SOS_TOKEN = "<sos>"
EOS_TOKEN = "<eos>"
UNK_TOKEN = "<unk>"

PAD_IDX, SOS_IDX, EOS_IDX, UNK_IDX = 0, 1, 2, 3

import torch
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
import torch.nn as nn
from torch.utils.data import DataLoader
from time import time

# Early Stopping class
class EarlyStopping:
    def _init_(self, patience=5, delta=0):
        self.patience = patience  # how many epochs to wait for improvement
        self.delta = delta        # minimum change to qualify as an improvement
        self.counter = 0          # how many epochs without improvement
        self.best_loss = float('inf')
        self.early_stop = False   # Flag to stop training if True

    def _call_(self, val_loss, model):
        # If the validation loss decreases, reset the counter and update the best_loss
        if val_loss < self.best_loss - self.delta:
            self.best_loss = val_loss
            self.counter = 0
            torch.save(model.state_dict(), "best_seq2seq_model.pt")  # Save the model with the best validation loss
        else:
            self.counter += 1

        # If counter exceeds patience, set early_stop to True
        if self.counter >= self.patience:
            self.early_stop = True




# -----------------------------
# Data Preparation
# -----------------------------

def tokenize(text):
    return text.strip().split()

def build_vocab(sentences, min_freq=1):
    freq = {}
    for sent in sentences:
        for word in tokenize(sent):
            freq[word] = freq.get(word, 0) + 1

    vocab = {PAD_TOKEN: PAD_IDX, SOS_TOKEN: SOS_IDX, EOS_TOKEN: EOS_IDX, UNK_TOKEN: UNK_IDX}
    idx = 4
    for word, count in freq.items():
        if count >= min_freq:
            vocab[word] = idx
            idx += 1
    return vocab

def numericalize(text, vocab):
    tokens = tokenize(text)
    return [vocab.get(token, UNK_IDX) for token in tokens]

class TranslationDataset(Dataset):
    def _init_(self, filepath, src_vocab=None, trg_vocab=None):
        df = pd.read_excel(filepath)
        self.src_sentences = df.iloc[:, 0].astype(str).tolist()
        self.trg_sentences = df.iloc[:, 1].astype(str).tolist()

        self.src_vocab = build_vocab(self.src_sentences) if src_vocab is None else src_vocab
        self.trg_vocab = build_vocab(self.trg_sentences) if trg_vocab is None else trg_vocab
    def _getitems_(self, indices):
       return [self[i] for i in indices]

    def _len_(self):
        return len(self.src_sentences)

    def _getitem_(self, index):
        src = self.src_sentences[index]
        trg = self.trg_sentences[index]

        src_ids = [SOS_IDX] + numericalize(src, self.src_vocab) + [EOS_IDX]
        trg_ids = [SOS_IDX] + numericalize(trg, self.trg_vocab) + [EOS_IDX]

        return torch.tensor(src_ids), torch.tensor(trg_ids)

def pad_collate(batch):
    src_batch, trg_batch = zip(*batch)
    src_lens = [len(s) for s in src_batch]
    trg_lens = [len(t) for t in trg_batch]

    src_max_len = max(src_lens)
    trg_max_len = max(trg_lens)

    padded_src = [torch.cat([s, torch.full((src_max_len - len(s),), PAD_IDX, dtype=torch.long)]) for s in src_batch]
    padded_trg = [torch.cat([t, torch.full((trg_max_len - len(t),), PAD_IDX, dtype=torch.long)]) for t in trg_batch]

    return torch.stack(padded_src), torch.stack(padded_trg)

# -----------------------------
# Model
# -----------------------------
class Encoder(nn.Module):
    def _init_(self, input_dim, emb_dim, hid_dim, num_layers=1, dropout=0.5):
        super()._init_()
        self.embedding = nn.Embedding(input_dim, emb_dim, padding_idx=PAD_IDX)
        self.lstm = nn.LSTM(emb_dim, hid_dim, num_layers, dropout=dropout, batch_first=True)

    def forward(self, src):
        embedded = self.embedding(src)
        outputs, (hidden, cell) = self.lstm(embedded)
        return hidden, cell

class Decoder(nn.Module):
    def _init_(self, output_dim, emb_dim, hid_dim, num_layers=1, dropout=0.5):
        super()._init_()
        self.embedding = nn.Embedding(output_dim, emb_dim, padding_idx=PAD_IDX)
        self.lstm = nn.LSTM(emb_dim, hid_dim, num_layers, dropout=dropout, batch_first=True)
        self.fc_out = nn.Linear(hid_dim, output_dim)

    def forward(self, input, hidden, cell):
        input = input.unsqueeze(1)
        embedded = self.embedding(input)
        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        prediction = self.fc_out(output.squeeze(1))
        return prediction, hidden, cell

class Seq2Seq(nn.Module):
    def _init_(self, encoder, decoder, device):
        super()._init_()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        batch_size = src.shape[0]
        trg_len = trg.shape[1]
        trg_vocab_size = self.decoder.embedding.num_embeddings

        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(self.device)
        hidden, cell = self.encoder(src)
        input = trg[:, 0]

        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(input, hidden, cell)
            outputs[:, t] = output
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.argmax(1)
            input = trg[:, t] if teacher_force else top1

        return outputs

# -----------------------------
# Train / Eval
# -----------------------------
def train(model, dataloader, optimizer, criterion, clip, device):
    model.train()
    epoch_loss = 0

    for src, trg in dataloader:
        src, trg = src.to(device), trg.to(device)
        optimizer.zero_grad()
        output = model(src, trg)
        output_dim = output.shape[-1]
        output = output[:, 1:].reshape(-1, output_dim)
        trg = trg[:, 1:].reshape(-1)
        loss = criterion(output, trg)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item()

    return epoch_loss / len(dataloader)

def evaluate(model, dataloader, criterion, device, trg_vocab):
    model.eval()
    epoch_loss = 0
    all_preds = []
    all_trues = []
    bleu_scores = []
    inv_trg_vocab = {idx: token for token, idx in trg_vocab.items()}
    smoothie = SmoothingFunction().method4

    with torch.no_grad():
        for src, trg in dataloader:
            src, trg = src.to(device), trg.to(device)
            output = model(src, trg, teacher_forcing_ratio=0)
            output_dim = output.shape[-1]
            output_tokens = output.argmax(2)
            output_flat = output[:, 1:].reshape(-1, output_dim)
            trg_flat = trg[:, 1:].reshape(-1)

            for pred_seq, true_seq in zip(output_tokens, trg):
                pred_words = [inv_trg_vocab.get(tok.item(), UNK_TOKEN) for tok in pred_seq if tok != PAD_IDX]
                true_words = [inv_trg_vocab.get(tok.item(), UNK_TOKEN) for tok in true_seq if tok != PAD_IDX]
                bleu = sentence_bleu([true_words], pred_words, smoothing_function=smoothie)
                bleu_scores.append(bleu)

            all_preds.extend(output_flat.argmax(1).cpu().numpy())
            all_trues.extend(trg_flat.cpu().numpy())

            loss = criterion(output_flat, trg_flat)
            epoch_loss += loss.item()

    accuracy = accuracy_score(all_trues, all_preds)
    precision = precision_score(all_trues, all_preds, average='macro', zero_division=0)
    recall = recall_score(all_trues, all_preds, average='macro', zero_division=0)
    f1 = f1_score(all_trues, all_preds, average='macro', zero_division=0)
    bleu_avg = np.mean(bleu_scores)

    return epoch_loss / len(dataloader), accuracy, precision, recall, f1, bleu_avg

# -----------------------------
# Translate
# -----------------------------
def translate_sentence(sentence, src_vocab, trg_vocab, model, device, max_len=50):
    model.eval()
    tokens = [SOS_TOKEN] + tokenize(sentence) + [EOS_TOKEN]
    src_indexes = [src_vocab.get(token, UNK_IDX) for token in tokens]
    src_tensor = torch.LongTensor(src_indexes).unsqueeze(0).to(device)  # Add batch dimension

    with torch.no_grad():
        hidden, cell = model.encoder(src_tensor)

    trg_indexes = [SOS_IDX]

    for _ in range(max_len):
        trg_tensor = torch.LongTensor([trg_indexes[-1]]).to(device)
        with torch.no_grad():
            output, hidden, cell = model.decoder(trg_tensor, hidden, cell)
            pred_token = output.argmax(1).item()
        trg_indexes.append(pred_token)
        if pred_token == EOS_IDX:
            break

    inv_trg_vocab = {idx: token for token, idx in trg_vocab.items()}
    translated_tokens = [inv_trg_vocab.get(idx, UNK_TOKEN) for idx in trg_indexes[1:-1]]
    return ' '.join(translated_tokens)



import matplotlib.pyplot as plt
import random
# -----------------------------
# Main
# -----------------------------


Saving CIP_DATASETS.xlsx to CIP_DATASETS.xlsx


In [None]:
class TranslationDataset(Dataset):
    def __init__(self, dataframe):
        self.data = dataframe
        self.src_texts = dataframe['Tirunelveli Tamil'].tolist()
        self.trg_texts = dataframe['Senthamil'].tolist()

        self.src_vocab = self.build_vocab(self.src_texts)
        self.trg_vocab = self.build_vocab(self.trg_texts)

    def __len__(self):
        return len(self.src_texts)

    def __getitem__(self, idx):
        src = self.encode(self.src_texts[idx], self.src_vocab)
        trg = self.encode(self.trg_texts[idx], self.trg_vocab)
        return torch.tensor(src), torch.tensor(trg)

    def build_vocab(self, texts):
        vocab = {'<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}
        idx = 4
        for text in texts:
            for token in text.split():
                if token not in vocab:
                    vocab[token] = idx
                    idx += 1
        return vocab

    def encode(self, text, vocab):
        tokens = text.split()
        encoded = [vocab.get(token, vocab['<unk>']) for token in tokens]
        return [vocab['<sos>']] + encoded + [vocab['<eos>']]


In [None]:
# ==== Configuration ====
# ==== Configuration ====
BATCH_SIZE = 32
EMBED_SIZE = 256
HIDDEN_SIZE = 512
NUM_LAYERS = 2
DROPOUT = 0.3
LEARNING_RATE = 0.001
NUM_EPOCHS = 80
PAD_IDX = 0  # will be updated after vocab build
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def main():
    # Load and preprocess data
    df = pd.read_excel('CIP_DATASETS.xlsx')
    df=df.dropna()
    if df.isnull().values.any():
        raise ValueError("CSV file contains NaN values.")

    dataset = TranslationDataset(df)
    global PAD_IDX
    PAD_IDX = dataset.src_vocab["<pad>"]

    # Train, validation, and test split
    train_size = int(0.8 * len(dataset))
    val_size = int(0.1 * len(dataset))
    test_size = len(dataset) - train_size - val_size

    train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(
        dataset, [train_size, val_size, test_size]
    )

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=pad_collate)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=pad_collate)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=pad_collate)

    # Initialize model, optimizer, and loss
    encoder = Encoder(len(dataset.src_vocab), EMBED_SIZE, HIDDEN_SIZE, NUM_LAYERS, DROPOUT)
    decoder = Decoder(len(dataset.trg_vocab), EMBED_SIZE, HIDDEN_SIZE, NUM_LAYERS, DROPOUT)
    model = Seq2Seq(encoder, decoder, PAD_IDX).to(DEVICE)

    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

    # Training loop
    for epoch in range(NUM_EPOCHS):
        train_loss = train(model, train_loader, optimizer, criterion, DEVICE)
        val_loss, val_acc, val_prec, val_rec, val_f1, val_bleu = evaluate(model, val_loader, criterion, DEVICE, dataset.trg_vocab)

        print(f"Epoch [{epoch+1}/{NUM_EPOCHS}]")
        print(f"Train Loss   : {train_loss:.3f}")
        print(f"Val Loss     : {val_loss:.3f}")
        print(f"Accuracy     : {val_acc:.4f}")
        print(f"Precision    : {val_prec:.4f}")
        print(f"Recall       : {val_rec:.4f}")
        print(f"F1 Score     : {val_f1:.4f}")
        print(f"BLEU Score   : {val_bleu:.4f}")
        print("-" * 40)

    # Final evaluation on test set
    test_loss, test_acc, test_prec, test_rec, test_f1, test_bleu = evaluate(
        model, test_loader, criterion, DEVICE, dataset.trg_vocab
    )

    print("\n--- Test Set Evaluation ---")
    print(f"Test Loss    : {test_loss:.3f}")
    print(f"Accuracy     : {test_acc:.4f}")
    print(f"Precision    : {test_prec:.4f}")
    print(f"Recall       : {test_rec:.4f}")
    print(f"F1 Score     : {test_f1:.4f}")
    print(f"BLEU Score   : {test_bleu:.4f}")

    # Plot confusion matrix (optional: can be val_loader or test_loader)
    plot_confusion_matrix(model, test_loader, dataset.trg_vocab, DEVICE, pad_idx=PAD_IDX)

    return model, dataset, DEVICE, test_loader

# -------------------------
# Plotting Section
# -------------------------







from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import torch
import numpy as np

def plot_confusion_matrix(model, val_loader, trg_vocab, device, pad_idx=0, max_tokens=30):
    model.eval()
    all_preds = []
    all_trues = []

    with torch.no_grad():
        for src_batch, trg_batch in val_loader:
            src_batch, trg_batch = src_batch.to(device), trg_batch.to(device)
            output = model(src_batch, trg_batch[:, :-1])  # remove <eos> from input
            output_tokens = output.argmax(dim=-1)  # predicted token indices

            true_tokens = trg_batch[:, 1:]  # shift to match output

            for pred_seq, true_seq in zip(output_tokens, true_tokens):
                for pred_token, true_token in zip(pred_seq, true_seq):
                    if true_token.item() != pad_idx:
                        all_preds.append(pred_token.item())
                        all_trues.append(true_token.item())

    # Convert vocab indices to tokens for readable labels
    idx_to_token = {v: k for k, v in trg_vocab.items()}

    # Limit labels to top-N most common tokens
    labels = list(set(all_trues + all_preds))
    if len(labels) > max_tokens:
        labels = labels[:max_tokens]  # truncate to top N

    cm = confusion_matrix(all_trues, all_preds, labels=labels, normalize='true')

    label_names = [idx_to_token[i] for i in labels]

    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=label_names)
    fig, ax = plt.subplots(figsize=(12, 10))
    disp.plot(include_values=False, cmap='Blues', ax=ax, xticks_rotation='vertical')
    plt.title("Token-level Confusion Matrix")
    plt.tight_layout()
    plt.show()


# Run main and get the important objects
model, dataset, device = main()
val_loader = DataLoader(
    torch.utils.data.Subset(dataset, range(int(0.9 * len(dataset)), len(dataset))),
    batch_size=64,
    shuffle=False,
    collate_fn=pad_collate
)

plot_confusion_matrix(model, val_loader, dataset.trg_vocab, device, pad_idx=PAD_IDX)
# Now define your translation function
def trans(example):
    translation = translate_sentence(example, dataset.src_vocab, dataset.trg_vocab, model, device)


    return translation

TypeError: Encoder.__init__() takes 1 positional argument but 6 were given

In [None]:
import torch
print(torch.__version__)
print(torch.cuda.is_available())


2.6.0+cu124
True


In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from collections import Counter
import re

# ==== Configuration ====
BATCH_SIZE = 128
EMBED_SIZE = 512
HIDDEN_SIZE = 512
NUM_LAYERS = 5
DROPOUT = 0.3
LEARNING_RATE = 0.0001
NUM_EPOCHS = 80
PAD_IDX = 0  # placeholder
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ==== Tokenization ====
def tokenize(text):
    return re.findall(r"\b\w+\b", text.lower())

# ==== Dataset ====
class TranslationDataset(Dataset):
    def __init__(self, df):
        self.pairs = list(zip(df['Tirunelveli Tamil'], df['Senthamil']))
        self.src_vocab = {'<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}
        self.trg_vocab = {'<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}
        self.src_inv_vocab = {}
        self.trg_inv_vocab = {}
        self.build_vocab()

    def build_vocab(self):
        src_counter = Counter()
        trg_counter = Counter()
        for src, trg in self.pairs:
            src_counter.update(tokenize(src))
            trg_counter.update(tokenize(trg))
        for word in src_counter:
            if word not in self.src_vocab:
                self.src_vocab[word] = len(self.src_vocab)
        for word in trg_counter:
            if word not in self.trg_vocab:
                self.trg_vocab[word] = len(self.trg_vocab)
        self.src_inv_vocab = {i: w for w, i in self.src_vocab.items()}
        self.trg_inv_vocab = {i: w for w, i in self.trg_vocab.items()}

    def encode(self, sentence, vocab):
        tokens = tokenize(sentence)
        return [vocab.get(tok, vocab['<unk>']) for tok in tokens]

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        src, trg = self.pairs[idx]
        src_ids = [self.src_vocab['<sos>']] + self.encode(src, self.src_vocab) + [self.src_vocab['<eos>']]
        trg_ids = [self.trg_vocab['<sos>']] + self.encode(trg, self.trg_vocab) + [self.trg_vocab['<eos>']]
        return torch.tensor(src_ids), torch.tensor(trg_ids)

# ==== Padding ====
def pad_collate(batch):
    srcs, trgs = zip(*batch)
    srcs_padded = nn.utils.rnn.pad_sequence(srcs, batch_first=True, padding_value=PAD_IDX)
    trgs_padded = nn.utils.rnn.pad_sequence(trgs, batch_first=True, padding_value=PAD_IDX)
    return srcs_padded, trgs_padded

# ==== Encoder ====
class Encoder(nn.Module):
    def __init__(self, input_dim, embed_size, hidden_size, num_layers, dropout):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, embed_size, padding_idx=PAD_IDX)
        self.rnn = nn.GRU(embed_size, hidden_size, num_layers, dropout=dropout, batch_first=True)

    def forward(self, x):
        embedded = self.embedding(x)
        outputs, hidden = self.rnn(embedded)
        return hidden

# ==== Decoder ====
class Decoder(nn.Module):
    def __init__(self, output_dim, embed_size, hidden_size, num_layers, dropout):
        super().__init__()
        self.embedding = nn.Embedding(output_dim, embed_size, padding_idx=PAD_IDX)
        self.rnn = nn.GRU(embed_size, hidden_size, num_layers, dropout=dropout, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_dim)

    def forward(self, x, hidden):
        x = x.unsqueeze(1)  # batch_size -> (batch, 1)
        embedded = self.embedding(x)
        output, hidden = self.rnn(embedded, hidden)
        prediction = self.fc(output.squeeze(1))
        return prediction, hidden

# ==== Seq2Seq ====
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, pad_idx):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.pad_idx = pad_idx

    def forward(self, src, trg):
        batch_size, trg_len = trg.shape
        trg_vocab_size = self.decoder.embedding.num_embeddings
        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(DEVICE)
        hidden = self.encoder(src)
        input = trg[:, 0]
        for t in range(1, trg_len):
            output, hidden = self.decoder(input, hidden)
            outputs[:, t] = output
            input = output.argmax(1)
        return outputs

# ==== Training ====
def train(model, data_loader, optimizer, criterion):
    model.train()
    epoch_loss = 0
    for src, trg in data_loader:
        src, trg = src.to(DEVICE), trg.to(DEVICE)
        output = model(src, trg)
        output_dim = output.shape[-1]
        output = output[:, 1:].reshape(-1, output_dim)
        trg = trg[:, 1:].reshape(-1)
        loss = criterion(output, trg)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    return epoch_loss / len(data_loader)

# ==== Main ====
def main():
    df = pd.read_excel("CIP_DATASETS.xlsx")
    df=df.dropna()
    if df.isnull().any().any():
        raise ValueError("CSV file contains NaN values.")
    dataset = TranslationDataset(df)

    global PAD_IDX
    PAD_IDX = dataset.src_vocab["<pad>"]

    # Split
    total_len = len(dataset)
    train_end = int(0.8 * total_len)
    val_end = int(0.9 * total_len)
    train_dataset = torch.utils.data.Subset(dataset, range(train_end))
    val_dataset = torch.utils.data.Subset(dataset, range(train_end, val_end))
    test_dataset = torch.utils.data.Subset(dataset, range(val_end, total_len))

    # Loaders
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=pad_collate)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=pad_collate)

    # Model
    encoder = Encoder(len(dataset.src_vocab), EMBED_SIZE, HIDDEN_SIZE, NUM_LAYERS, DROPOUT)
    decoder = Decoder(len(dataset.trg_vocab), EMBED_SIZE, HIDDEN_SIZE, NUM_LAYERS, DROPOUT)
    model = Seq2Seq(encoder, decoder, PAD_IDX).to(DEVICE)

    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
    criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

    for epoch in range(NUM_EPOCHS):
        loss = train(model, train_loader, optimizer, criterion)
        print(f"Epoch {epoch+1}/{NUM_EPOCHS}, Loss: {loss:.4f}")

    return model, dataset, DEVICE

# ==== Run ====
model, dataset, device = main()


Epoch 1/50, Loss: 4.9021
Epoch 2/50, Loss: 3.8066
Epoch 3/50, Loss: 3.7547
Epoch 4/50, Loss: 3.7296
Epoch 5/50, Loss: 3.7104
Epoch 6/50, Loss: 3.6738
Epoch 7/50, Loss: 3.6424
Epoch 8/50, Loss: 3.6090
Epoch 9/50, Loss: 3.5747
Epoch 10/50, Loss: 3.5322
Epoch 11/50, Loss: 3.4902
Epoch 12/50, Loss: 3.4550
Epoch 13/50, Loss: 3.4242
Epoch 14/50, Loss: 3.3932
Epoch 15/50, Loss: 3.3672
Epoch 16/50, Loss: 3.3453
Epoch 17/50, Loss: 3.3178
Epoch 18/50, Loss: 3.2933
Epoch 19/50, Loss: 3.2714
Epoch 20/50, Loss: 3.2460
Epoch 21/50, Loss: 3.2232
Epoch 22/50, Loss: 3.1964
Epoch 23/50, Loss: 3.1680
Epoch 24/50, Loss: 3.1404
Epoch 25/50, Loss: 3.1175
Epoch 26/50, Loss: 3.0935
Epoch 27/50, Loss: 3.0706
Epoch 28/50, Loss: 3.0438
Epoch 29/50, Loss: 3.0165
Epoch 30/50, Loss: 2.9909
Epoch 31/50, Loss: 2.9672
Epoch 32/50, Loss: 2.9407
Epoch 33/50, Loss: 2.9225
Epoch 34/50, Loss: 2.8979
Epoch 35/50, Loss: 2.8710
Epoch 36/50, Loss: 2.8456
Epoch 37/50, Loss: 2.8257
Epoch 38/50, Loss: 2.8072
Epoch 39/50, Loss: 2.

In [None]:
from sklearn.metrics import f1_score
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

def evaluate(model, dataset, device):
    test_dataset = torch.utils.data.Subset(dataset, range(int(0.9 * len(dataset)), len(dataset)))
    test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False, collate_fn=pad_collate)
    model.eval()

    all_preds = []
    all_trues = []
    bleu_scores = []
    total_tokens = 0
    correct_tokens = 0

    with torch.no_grad():
        for src, trg in test_loader:
            src, trg = src.to(device), trg.to(device)
            output = model(src, trg)
            pred_ids = output.argmax(dim=-1).squeeze().tolist()
            true_ids = trg.squeeze().tolist()

            # Remove special tokens for metric calculation
            pred_tokens = [dataset.trg_inv_vocab.get(i, "") for i in pred_ids if i not in [PAD_IDX, dataset.trg_vocab['<sos>'], dataset.trg_vocab['<eos>']]]
            true_tokens = [dataset.trg_inv_vocab.get(i, "") for i in true_ids if i not in [PAD_IDX, dataset.trg_vocab['<sos>'], dataset.trg_vocab['<eos>']]]

            # For accuracy
            min_len = min(len(pred_tokens), len(true_tokens))
            correct_tokens += sum(p == t for p, t in zip(pred_tokens[:min_len], true_tokens[:min_len]))
            total_tokens += min_len

            # For BLEU
            smoothie = SmoothingFunction().method4
            bleu = sentence_bleu([true_tokens], pred_tokens, smoothing_function=smoothie)
            bleu_scores.append(bleu)

            # For F1
            all_preds.extend(pred_tokens)
            all_trues.extend(true_tokens)

    accuracy = correct_tokens / total_tokens if total_tokens else 0
    average_bleu = sum(bleu_scores) / len(bleu_scores) if bleu_scores else 0


    print(f"\nEvaluation Metrics:")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"BLEU Score: {average_bleu:.4f}")


# ==== Run Evaluation ====
evaluate(model, dataset, device)



Evaluation Metrics:
Accuracy: 0.1856
BLEU Score: 0.0510
