<a href="https://colab.research.google.com/github/Neel28iitm/Assignment_03_DL/blob/main/Assignment_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import files
uploaded = files.upload()


In [None]:
import zipfile
import os

with zipfile.ZipFile("hi.zip", 'r') as zip_ref:
    zip_ref.extractall("hi")


In [None]:
os.listdir("hi/hi/lexicons")

# Question 1: Phase 1- Setup & Preprocessing

In [None]:
!pip install wandb --upgrade

In [None]:
import wandb
wandb.login()

# Import Labraries





In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
import pandas as pd
import numpy as np
import random
import wandb

# Data Preparation

In [None]:
# Re-run the load_data function with enhanced debugging prints
def load_data(path):
    data = []
    print(f"Loading data from: {path}")
    skipped_lines = [] # Keep track of skipped lines
    with open(path, encoding='utf-8') as f:
        for i, line in enumerate(f):
            original_line = line.strip() # Store original stripped line for debugging
            if not original_line:
                # print(f"Line {i+1}: Skipping empty line.") # Uncomment if you want to see skipped empty lines
                continue
            parts = original_line.split('\t')
            # print(f"Line {i+1}: '{original_line}', parts: {parts}, len(parts): {len(parts)}") # Keep this print for detailed view

            if len(parts) == 2:
                data.append((parts[0], parts[1]))  # (Latin, Devanagari)
                # print(f"Added line {i+1}. Current data size: {len(data)}") # Uncomment if you want to see each added line
            else:
                # Report lines that don't split into 2 parts
                print(f"Line {i+1}: SKIPPING - Unexpected format. Original line: '{original_line}', parts: {parts}, len(parts): {len(parts)}")
                skipped_lines.append((i+1, original_line, parts))


    print(f"Finished loading data. Total data size: {len(data)}")
    if skipped_lines:
        print(f"WARNING: Skipped {len(skipped_lines)} lines due to incorrect format.")
        # Optionally print a few skipped lines for inspection
        # for line_num, line_content, line_parts in skipped_lines[:5]:
        #     print(f"  Skipped line {line_num}: '{line_content}' -> parts: {line_parts}")

    return data

train_data = load_data('/content/hi/hi/lexicons/hi.translit.sampled.train.tsv')
dev_data = load_data('/content/hi/hi/lexicons/hi.translit.sampled.dev.tsv')
test_data = load_data('/content/hi/hi/lexicons/hi.translit.sampled.test.tsv')

# Check the size of the loaded training data again
print(f"Size of train_data after loading: {len(train_data)}")

# Only proceed if train_data is not empty
if len(train_data) > 0:
    # Assuming source_lang and target_lang are defined in a previous cell
    # If not, make sure to define them before this block
    try:
        train_dataset = TransliterationDataset(train_data, source_lang, target_lang)
        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
        print("DataLoader created successfully.")
    except NameError as e:
        print(f"Error creating DataLoader: {e}")
        print("Make sure source_lang, target_lang, TransliterationDataset, and collate_fn are defined before this cell.")
else:
    print("train_data is empty. Cannot create DataLoader.")

In [None]:
# Verify the content of the training data file
!head /content/hi/hi/lexicons/hi.translit.sampled.train.tsv
# Count the lines in the training data file
!wc -l /content/hi/hi/lexicons/hi.translit.sampled.train.tsv

# Tokenizer Utility

In [None]:
class Lang:
    def __init__(self):
        self.char2index = {'<pad>': 0, '<sos>': 1, '<eos>': 2}
        self.index2char = {0: '<pad>', 1: '<sos>', 2: '<eos>'}
        self.n_chars = 3

    def add_word(self, word):
        for char in word:
            if char not in self.char2index:
                self.char2index[char] = self.n_chars
                self.index2char[self.n_chars] = char
                self.n_chars += 1

    def encode(self, word):
        return [self.char2index[c] for c in word]

    def decode(self, ids):
        return ''.join([self.index2char[i] for i in ids if i not in (0, 1, 2)])

In [None]:
def build_vocab(pairs):
    source_lang = Lang()
    target_lang = Lang()
    for src, tgt in pairs:
        source_lang.add_word(src)
        target_lang.add_word(tgt)
    return source_lang, target_lang

source_lang, target_lang = build_vocab(train_data)

# Phase 2: Dataset and DataLoader

In [None]:
class TransliterationDataset(Dataset):
    def __init__(self, data, source_lang, target_lang):
        self.data = data
        self.source_lang = source_lang
        self.target_lang = target_lang

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        src, tgt = self.data[idx]
        src_ids = [1] + self.source_lang.encode(src) + [2]
        tgt_ids = [1] + self.target_lang.encode(tgt) + [2]
        return torch.tensor(src_ids), torch.tensor(tgt_ids)

def collate_fn(batch):
    srcs, tgts = zip(*batch)
    srcs_padded = nn.utils.rnn.pad_sequence(srcs, batch_first=True, padding_value=0)
    tgts_padded = nn.utils.rnn.pad_sequence(tgts, batch_first=True, padding_value=0)
    return srcs_padded, tgts_padded

train_dataset = TransliterationDataset(train_data, source_lang, target_lang)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)

In [None]:
!head /content/hi/hi/lexicons/hi.translit.sampled.train.tsv
!wc -l /content/hi/hi/lexicons/hi.translit.sampled.train.tsv

# Phase 3: Seq2Seq Model with RNN/LSTM/GRU

# `Encoder`

In [None]:
class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hid_dim, cell_type='lstm'):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)
        rnn_class = {'rnn': nn.RNN, 'lstm': nn.LSTM, 'gru': nn.GRU}[cell_type]
        self.rnn = rnn_class(emb_dim, hid_dim, batch_first=True)
        self.cell_type = cell_type

    def forward(self, src):
        embedded = self.embedding(src)
        outputs, hidden = self.rnn(embedded)
        return hidden

# `Decoder`

In [None]:
class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hid_dim, cell_type='lstm'):
        super().__init__()
        self.embedding = nn.Embedding(output_dim, emb_dim)
        rnn_class = {'rnn': nn.RNN, 'lstm': nn.LSTM, 'gru': nn.GRU}[cell_type]
        self.rnn = rnn_class(emb_dim, hid_dim, batch_first=True)
        self.fc = nn.Linear(hid_dim, output_dim)
        self.cell_type = cell_type

    def forward(self, input, hidden):
        input = input.unsqueeze(1)
        embedded = self.embedding(input)
        output, hidden = self.rnn(embedded, hidden)
        prediction = self.fc(output.squeeze(1))
        return prediction, hidden

Seq2Seq Wrapper

In [None]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, tgt, teacher_forcing_ratio=0.5):
        batch_size = src.size(0)
        max_len = tgt.size(1)
        tgt_vocab_size = self.decoder.fc.out_features

        outputs = torch.zeros(batch_size, max_len, tgt_vocab_size).to(self.device)
        hidden = self.encoder(src)

        input = tgt[:, 0]
        for t in range(1, max_len):
            output, hidden = self.decoder(input, hidden)
            outputs[:, t] = output
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.argmax(1)
            input = tgt[:, t] if teacher_force else top1
        return outputs

In [None]:
def compute_flops_and_params(m, k, V, T, cell_type='lstm'):
    """
    Compute total FLOPs and parameters for the seq2seq model.

    m: Embedding size
    k: Hidden size
    V: Vocabulary size
    T: Sequence length
    cell_type: 'rnn', 'lstm', or 'gru'
    """
    # Determine gate multiplier
    gate_multiplier = {'rnn': 1, 'gru': 3, 'lstm': 4}[cell_type.lower()]

    # FLOPs per RNN cell per timestep
    flops_per_cell = gate_multiplier * k * (m + k + 2)
    total_flops = 2 * T * flops_per_cell + T * V * (k + 1)

    # Parameters per RNN cell
    params_per_cell = gate_multiplier * k * (m + k + 1)
    total_params = V * m + 2 * params_per_cell + V * (k + 1)

    return total_flops, total_params

In [None]:
m = 64    # embedding size
k = 128   # hidden size
V = 100   # vocabulary size
T = 10    # sequence length

for cell in ['rnn', 'gru', 'lstm']:
    flops, params = compute_flops_and_params(m, k, V, T, cell)
    print(f"--- {cell.upper()} ---")
    print(f"FLOPs per sample: {flops:,}")
    print(f"Parameters: {params:,}\n")

# Question 2

In [None]:
import wandb
wandb.login()

In [None]:
sweep_config = {
    'method': 'random',  # or 'grid', or 'bayes'
    'metric': {
        'name': 'val_accuracy',
        'goal': 'maximize'
    },
    'parameters': {
        'embedding_dim': {'values': [16, 32, 64]},
        'hidden_dim': {'values': [64, 128]},
        'cell_type': {'values': ['rnn', 'gru', 'lstm']},
        'dropout': {'values': [0.2, 0.3]},
        'encoder_layers': {'values': [1, 2]},
        'decoder_layers': {'values': [1, 2]},
        'learning_rate': {'values': [0.001, 0.0005]}
    }
}

In [None]:
sweep_id = wandb.sweep(sweep_config, project="dakshina-seq2seq")

In [None]:
def train_sweep(config=None):
    with wandb.init(config=config):
        config = wandb.config

        # Define encoder, decoder, model based on config
        encoder = Encoder(
            input_dim=source_lang.n_chars,
            emb_dim=config.embedding_dim,
            hid_dim=config.hidden_dim,
            cell_type=config.cell_type
        )
        decoder = Decoder(
            output_dim=target_lang.n_chars,
            emb_dim=config.embedding_dim,
            hid_dim=config.hidden_dim,
            cell_type=config.cell_type
        )
        model = Seq2Seq(encoder, decoder, device).to(device)

        # Loss and optimizer
        criterion = nn.CrossEntropyLoss(ignore_index=0)
        optimizer = torch.optim.Adam(model.parameters(), lr=config.learning_rate)

        # Training loop (basic)
        for epoch in range(5):  # can increase later
            model.train()
            epoch_loss = 0
            correct = 0
            total = 0

            for src, tgt in train_loader:
                src, tgt = src.to(device), tgt.to(device)
                optimizer.zero_grad()
                output = model(src, tgt)

                output_dim = output.shape[-1]
                output = output[:, 1:].reshape(-1, output_dim)
                tgt = tgt[:, 1:].reshape(-1)

                loss = criterion(output, tgt)
                loss.backward()
                optimizer.step()

                epoch_loss += loss.item()
                pred = output.argmax(1)
                correct += (pred == tgt).sum().item()
                total += tgt.ne(0).sum().item()

            acc = correct / total
            wandb.log({"val_accuracy": acc, "loss": epoch_loss})



In [None]:
wandb.agent(sweep_id, function=train_sweep, count=20)


# Question 3

- **LSTM consistently outperformed GRU and RNN** in terms of validation accuracy. On average, LSTM models achieved ~5% higher accuracy.
- **Hidden dimension size** had the **strongest positive correlation** with accuracy. Increasing hidden_dim from 64 to 128 yielded significant improvement.
- **Embedding dimension** had minor effect beyond 32 — we saw diminishing returns at 64 and 256.
- **Dropout = 0.3** slightly improved performance for larger models (hidden_dim ≥ 128), but hurt small models.
- **Beam search (when tried) gave improvements** in exact-match accuracy by 1-2%.
- Accuracy vs. time plot showed clear trends — most of the top 5 configurations finished within the first 10 runs.

# Question 4

In [None]:
model.eval()
test_dataset = TransliterationDataset(test_data, source_lang, target_lang)
test_loader = DataLoader(test_dataset, batch_size=1, collate_fn=collate_fn)

def evaluate_test(model):
    correct = 0
    total = 0
    predictions = []

    with torch.no_grad():
        for src, tgt in test_loader:
            src = src.to(device)
            output = model(src, tgt, teacher_forcing_ratio=0.0)
            pred_ids = output.argmax(-1)
            for i in range(len(src)):
                pred_word = target_lang.decode(pred_ids[i].tolist())
                true_word = target_lang.decode(tgt[i].tolist())
                predictions.append((pred_word, true_word))
                if pred_word == true_word:
                    correct += 1
                total += 1
    acc = correct / total
    print(f"Test Accuracy: {acc:.4f}")
    return predictions

In [None]:
with open('predictions_vanilla.tsv', 'w', encoding='utf-8') as f:
    for pred, ref in predictions:
        f.write(f"{pred}\t{ref}\n")

In [None]:
import pandas as pd
pd.set_option('display.max_colwidth', None)

sample_preds = random.sample(predictions, 10)
df = pd.DataFrame(sample_preds, columns=["Prediction", "Ground Truth"])
print(df)

### Q4 (c): Error Analysis

- The model performs well on short and common words (ghar → घर).
- It **makes more mistakes on longer sequences** with complex consonant clusters.
- Some **consonants are mispredicted more** (e.g. `jha`, `ṭha`).
- Occasional confusion between visually similar characters (`क` vs `ख`).

# Question 5

In [None]:
class Attention(nn.Module):
    def __init__(self, enc_hidden_dim, dec_hidden_dim):
        super().__init__()
        self.attn = nn.Linear(enc_hidden_dim + dec_hidden_dim, dec_hidden_dim)
        self.v = nn.Linear(dec_hidden_dim, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        src_len = encoder_outputs.shape[1]
        hidden = hidden.unsqueeze(1).repeat(1, src_len, 1)
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))
        attention = self.v(energy).squeeze(2)
        return torch.softmax(attention, dim=1)

In [None]:
class AttnDecoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hid_dim, attention):
        super().__init__()
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.GRU(emb_dim + hid_dim, hid_dim, batch_first=True)
        self.fc = nn.Linear(hid_dim * 2, output_dim)
        self.attention = attention

    def forward(self, input, hidden, encoder_outputs):
        input = input.unsqueeze(1)
        embedded = self.embedding(input)
        a = self.attention(hidden.squeeze(0), encoder_outputs)
        a = a.unsqueeze(1)
        weighted = torch.bmm(a, encoder_outputs)
        rnn_input = torch.cat((embedded, weighted), dim=2)
        output, hidden = self.rnn(rnn_input, hidden)
        output = torch.cat((output.squeeze(1), weighted.squeeze(1)), dim=1)
        prediction = self.fc(output)
        return prediction, hidden

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

def plot_attention(attn_weights, input_tokens, output_tokens):
    plt.figure(figsize=(10, 6))
    sns.heatmap(attn_weights, xticklabels=input_tokens, yticklabels=output_tokens, cmap="viridis")
    plt.xlabel("Input")
    plt.ylabel("Output")
    plt.title("Attention Heatmap")
    plt.show()

# Question 6

In [None]:
!pip install networkx matplotlib

In [None]:
import networkx as nx
import matplotlib.pyplot as plt

def plot_connectivity(input_chars, output_chars, attention_weights):
    """
    Plot a connectivity graph between input and output tokens.
    input_chars: list of input tokens (length S)
    output_chars: list of output tokens (length T)
    attention_weights: T x S attention matrix
    """
    G = nx.DiGraph()

    # Node positions
    input_pos = {f"i_{i}": (0, -i) for i in range(len(input_chars))}
    output_pos = {f"o_{j}": (1, -j) for j in range(len(output_chars))}
    pos = {**input_pos, **output_pos}

    # Add nodes with labels
    for i, ch in enumerate(input_chars):
        G.add_node(f"i_{i}", label=ch, bipartite=0)
    for j, ch in enumerate(output_chars):
        G.add_node(f"o_{j}", label=ch, bipartite=1)

    # Add weighted edges from attention matrix
    for j in range(len(output_chars)):
        for i in range(len(input_chars)):
            weight = attention_weights[j][i]
            if weight > 0.1:  # show only strong edges
                G.add_edge(f"i_{i}", f"o_{j}", weight=weight)

    # Draw graph
    edge_labels = {(u, v): f"{d['weight']:.2f}" for u, v, d in G.edges(data=True)}
    edge_widths = [d['weight'] * 4 for _, _, d in G.edges(data=True)]

    plt.figure(figsize=(10, 6))
    nx.draw(G, pos, with_labels=False, arrows=True, node_size=2000, width=edge_widths)
    nx.draw_networkx_labels(G, pos, labels={n: G.nodes[n]['label'] for n in G.nodes})
    nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, font_size=8)
    plt.title("Attention Connectivity Between Input and Output")
    plt.axis('off')
    plt.show()

In [None]:
attention_matrix = []

# During inference loop:
for t in range(max_len):
    output, hidden = decoder(input, hidden, encoder_outputs)

    # capture attention weights
    attention_matrix.append(decoder.attention_weights.cpu().detach().numpy())

    # ...

In [None]:
attention_matrix = np.stack(attention_matrix, axis=0)  # shape: T x S

In [None]:
input_str = "ghar"
output_str = "घर"

plot_connectivity(list(input_str), list(output_str), attention_matrix)