In [None]:
import wandb
from kaggle_secrets import UserSecretsClient
# Read the W&B API key from Kaggle's secret store so it never appears in the notebook.
user_secrets = UserSecretsClient()
personal_key_for_api = user_secrets.get_secret("wandb-key")

# Authenticate through the Python API instead of `! wandb login $key`, so the
# secret is not interpolated into a shell command line.
wandb.login(key=personal_key_for_api)

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from torch.optim.lr_scheduler import StepLR
import os

In [None]:
# Device used by the training / evaluation cells: the GPU when CUDA is available,
# otherwise the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
import os
import torch
from torch.utils.data import Dataset

class DakshinaDataset(Dataset):
    """Character-level transliteration pairs read from a Dakshina lexicon TSV file.

    Every usable line of the split file has three tab-separated fields. The
    first (Devanagari) becomes the target, the second (Latin) becomes the source
    and the third is ignored. Items are ``(src_tensor, trg_tensor)`` pairs of
    vocabulary indices wrapped in ``<sos>`` / ``<eos>``.
    """

    # Indices reserved for the special tokens in every vocabulary.
    PAD_IDX, SOS_IDX, EOS_IDX, UNK_IDX = 0, 1, 2, 3

    def __init__(self, root_dir, split, src_vocab=None, trg_vocab=None):
        """
        root_dir: Path to the root directory of the dataset.
        split: 'train', 'dev', or 'test'
        src_vocab: Optional existing source char -> index mapping to reuse.
        trg_vocab: Optional existing target char -> index mapping to reuse.

        When a vocabulary is not supplied it is built from this split's own
        data. Evaluation splits should be given the training vocabularies:
        otherwise the same character can receive a different index than the one
        the model was trained with.
        """
        assert split in ['train', 'dev', 'test'], "Split must be 'train', 'dev', or 'test'"

        # File mapping for the splits
        self.file_map = {
            'train': 'hi.translit.sampled.train.tsv',
            'dev': 'hi.translit.sampled.dev.tsv',
            'test': 'hi.translit.sampled.test.tsv'
        }

        self.data_path = os.path.join(root_dir, self.file_map[split])

        # Read data as (latin, devanagari) pairs, streaming the file line by line.
        self.data = []
        with open(self.data_path, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                parts = line.strip().split("\t")

                # Only lines with exactly three fields are usable; the third field is dropped.
                if len(parts) == 3:
                    devanagari, latin, _ = parts
                    self.data.append((latin, devanagari))
                else:
                    print(f" Warning: Line {line_num} in {self.data_path} is malformed: {line.strip()}")

        # Reuse the supplied vocabularies, or build them from this split.
        if src_vocab is None:
            src_vocab = self.build_vocab([pair[0] for pair in self.data])
        if trg_vocab is None:
            trg_vocab = self.build_vocab([pair[1] for pair in self.data])
        self.src_vocab = src_vocab
        self.trg_vocab = trg_vocab

    def build_vocab(self, sentences):
        """Return a char -> index dict: special tokens first, then characters in order of first appearance."""
        vocab = {
            "<pad>": self.PAD_IDX,
            "<sos>": self.SOS_IDX,
            "<eos>": self.EOS_IDX,
            "<unk>": self.UNK_IDX,
        }
        for sentence in sentences:
            for char in sentence:
                if char not in vocab:
                    vocab[char] = len(vocab)
        return vocab

    def __len__(self):
        """Number of (source, target) pairs read from the split file."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return pair ``idx`` as two 1-D index tensors, each framed by <sos> and <eos>."""
        src_sentence, trg_sentence = self.data[idx]

        # Characters missing from a vocabulary map to <unk>.
        src_indices = [self.src_vocab.get(char, self.UNK_IDX) for char in src_sentence]
        trg_indices = [self.trg_vocab.get(char, self.UNK_IDX) for char in trg_sentence]

        src_indices = [self.SOS_IDX] + src_indices + [self.EOS_IDX]
        trg_indices = [self.SOS_IDX] + trg_indices + [self.EOS_IDX]

        return torch.tensor(src_indices), torch.tensor(trg_indices)

In [None]:
# Hyperparameter search space for the W&B sweep.
# The metric name has to match the key that train_model logs ("Validation Accuracy").
# 'epochs' is not part of this search space; sweep_train supplies it through the
# default config it passes to wandb.init.
sweep_config = {
    'method': 'bayes',
    'metric': {'name': 'Validation Accuracy', 'goal': 'maximize'},
    'parameters': {
        'embed_dim': {'values': [64, 256]},
        'hidden_dim': {'values': [64, 256]},
        'encoder_layers': {'values': [2, 3]},
        'cell_type': {'values': ['RNN', 'GRU', 'LSTM']},
        'dropout': {'values': [0.2, 0.3]},
        'beam_width': {'values': [3, 5]},
        # Single-valued entries: held fixed across the sweep.
        'learning_rate' : {"values": [0.0001]},
        'teacher_forcing_ratio': {'values': [0.7]}
    }
}

In [None]:
import torch
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
    """
    Merge a list of (src, trg) tensor pairs into two right-padded batch tensors.

    Sources and targets are padded independently, each to the longest sequence
    of its kind in the batch, using index 0 (<pad>). Both results are laid out
    batch-first.
    """
    def _pad(sequences):
        return pad_sequence(sequences, batch_first=True, padding_value=0)

    sources, targets = zip(*batch)
    return _pad(sources), _pad(targets)

In [None]:
from torch.utils.data import DataLoader
import os

# Paths
root_dir = '/kaggle/input/dataset-assign3/dakshina_dataset_v1.0/hi/lexicons'

# Initialize datasets
train_dataset = DakshinaDataset(root_dir, 'train')
val_dataset = DakshinaDataset(root_dir, 'dev')
test_dataset = DakshinaDataset(root_dir, 'test')

# Each DakshinaDataset builds its vocabularies from its own split, so a character
# can get a different index in dev/test than in train. The model is sized and
# trained on the training vocabularies, so evaluation data must be encoded with
# them too. __getitem__ looks the vocabularies up at access time, so swapping
# them here is enough; characters unseen in training fall back to <unk>.
for eval_dataset in (val_dataset, test_dataset):
    eval_dataset.src_vocab = train_dataset.src_vocab
    eval_dataset.trg_vocab = train_dataset.trg_vocab

# Initialize DataLoaders (only the training data is shuffled)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)

# Vocabulary sizes (special tokens included) used to size the model's embedding/output layers
SRC_VOCAB_SIZE = len(train_dataset.src_vocab)
TRG_VOCAB_SIZE = len(train_dataset.trg_vocab)

print("Source Vocab Size:", SRC_VOCAB_SIZE)
print("Target Vocab Size:", TRG_VOCAB_SIZE)

In [None]:
import torch.nn as nn

In [None]:
class Seq2SeqRNN(nn.Module):
    """Encoder-decoder sequence model built from stacked RNN, GRU or LSTM layers.

    The encoder's final recurrent state initialises the decoder, so the decoder
    is given the same number of layers and the same hidden size as the encoder.
    """

    def __init__(self,
                 input_dim,
                 output_dim,
                 embed_dim=256,
                 hidden_dim=512,
                 encoder_layers=1,
                 cell_type='LSTM',
                 dropout=0.3):
        """
        input_dim: Size of the source vocabulary.
        output_dim: Size of the target vocabulary.
        embed_dim: Width of both embedding tables.
        hidden_dim: Hidden size of the recurrent layers.
        encoder_layers: Number of stacked layers (used for encoder and decoder).
        cell_type: 'RNN', 'GRU' or 'LSTM'; any other value raises KeyError.
        dropout: Dropout between stacked layers; disabled for a single layer.
        """
        super().__init__()

        self.encoder_embedding = nn.Embedding(input_dim, embed_dim)
        self.decoder_embedding = nn.Embedding(output_dim, embed_dim)

        recurrent_cls = {'RNN': nn.RNN, 'LSTM': nn.LSTM, 'GRU': nn.GRU}[cell_type]
        num_layers = encoder_layers
        # Dropout between layers is only requested when there is more than one layer.
        layer_dropout = dropout if num_layers > 1 else 0

        def build_stack():
            return recurrent_cls(embed_dim, hidden_dim, num_layers,
                                 batch_first=True, dropout=layer_dropout)

        self.encoder = build_stack()
        self.decoder = build_stack()

        self.fc_out = nn.Linear(hidden_dim, output_dim)
        self.cell_type = cell_type

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Decode ``trg`` step by step and return logits of shape (batch, trg_len, vocab).

        ``src`` and ``trg`` are batch-first index tensors. Position 0 of the
        result stays all-zero because decoding starts from ``trg[:, 0]`` and the
        first prediction is written at position 1. At every step one random draw
        decides, for the whole batch, whether the next decoder input is the
        ground-truth token or the model's own argmax.
        """
        batch_size, trg_len = trg.size()
        vocab_size = self.fc_out.out_features
        logits = torch.zeros(batch_size, trg_len, vocab_size, device=src.device)

        # `state` is the hidden tensor for RNN/GRU and the (hidden, cell) pair
        # for LSTM; it is handed to the decoder unchanged in both cases.
        _, state = self.encoder(self.encoder_embedding(src))

        step_input = trg[:, 0]
        for t in range(1, trg_len):
            step_embedded = self.decoder_embedding(step_input).unsqueeze(1)
            step_output, state = self.decoder(step_embedded, state)

            step_logits = self.fc_out(step_output.squeeze(1))
            logits[:, t, :] = step_logits

            use_ground_truth = torch.rand(1).item() < teacher_forcing_ratio
            step_input = trg[:, t] if use_ground_truth else step_logits.argmax(1)

        return logits


In [None]:
import torch

class BeamSearchDecoder:
    """Beam-search inference over a trained encoder-decoder model.

    The model must expose ``encoder_embedding``, ``encoder``,
    ``decoder_embedding``, ``decoder`` and ``fc_out``. Hypotheses are ranked by
    their summed negative log-probability, so a lower score is better.
    """

    def __init__(self, model, trg_vocab, beam_width=3, max_len=30):
        """
        model: Encoder-decoder model to decode with.
        trg_vocab: Target char -> index mapping; must contain '<sos>' and '<eos>'.
        beam_width: Number of hypotheses kept after every step.
        max_len: Maximum number of decoding steps.
        """
        self.model = model
        self.trg_vocab = trg_vocab
        self.beam_width = beam_width
        self.max_len = max_len
        self.EOS_token = trg_vocab['<eos>']
        self.SOS_token = trg_vocab['<sos>']

    def decode(self, src_input):
        """Return the best token-id sequence (starting with <sos>) for one source.

        src_input: 1-D tensor of source indices. It is encoded exactly as given,
            including any padding indices it contains.
        """
        device = src_input.device
        with torch.no_grad():
            # Encode the source as a batch of one.
            embedded_src = self.model.encoder_embedding(src_input.unsqueeze(0))
            # The recurrent state is a tensor for RNN/GRU and a tuple for LSTM;
            # it is passed to the decoder unchanged either way.
            _, hidden = self.model.encoder(embedded_src)

            # Each hypothesis: (token ids, accumulated negative log-prob, decoder state).
            sequences = [([self.SOS_token], 0.0, hidden)]

            for _ in range(self.max_len):
                # Nothing left to expand once every kept hypothesis has ended.
                if all(candidate[0][-1] == self.EOS_token for candidate in sequences):
                    break

                all_candidates = []
                for seq, score, hidden_state in sequences:
                    if seq[-1] == self.EOS_token:
                        # Finished hypotheses are carried over unchanged.
                        all_candidates.append((seq, score, hidden_state))
                        continue

                    input_tensor = torch.tensor([[seq[-1]]], device=device)  # (1, 1)
                    embedded = self.model.decoder_embedding(input_tensor)

                    output, next_state = self.model.decoder(embedded, hidden_state)
                    logits = self.model.fc_out(output.squeeze(1))  # (1, vocab_size)

                    # fc_out yields unnormalised logits. They must be turned into
                    # log-probabilities before being accumulated: the log of a raw
                    # logit is NaN for negative values and is not a probability
                    # for positive ones.
                    log_probs = torch.log_softmax(logits, dim=1)

                    # topk raises if asked for more entries than the vocabulary has.
                    k = min(self.beam_width, log_probs.size(1))
                    top_log_probs, top_tokens = torch.topk(log_probs, k, dim=1)

                    for log_prob, token in zip(top_log_probs[0].tolist(), top_tokens[0].tolist()):
                        all_candidates.append((seq + [token], score - log_prob, next_state))

                # Keep the beam_width hypotheses with the lowest negative log-prob.
                sequences = sorted(all_candidates, key=lambda candidate: candidate[1])[:self.beam_width]

            return sequences[0][0]


In [None]:
import torch.optim as optim

def train_model(model, train_loader, val_loader, config):
    """Train ``model`` for ``config.epochs`` epochs and validate after each one.

    Uses Adam (learning rate ``config.learning_rate``, weight decay 1e-5) and a
    cross-entropy loss that ignores index 0. The loss skips position 0 of both
    the logits and the target. Per-epoch metrics are printed and logged to W&B.

    Returns ``(val_acc, model)``, where ``val_acc`` comes from the last epoch.
    """
    loss_fn = nn.CrossEntropyLoss(ignore_index=0)
    optimizer = optim.Adam(model.parameters(), lr=config.learning_rate, weight_decay=1e-5)

    for epoch_idx in range(config.epochs):
        model.train()
        running_loss = 0

        for src, trg in train_loader:
            src = src.to(device)
            trg = trg.to(device)

            optimizer.zero_grad()
            logits = model(src, trg, config.teacher_forcing_ratio)

            # Flatten (batch, time) so every decoded position is one classification example.
            vocab_size = logits.shape[-1]
            flat_logits = logits[:, 1:].reshape(-1, vocab_size)
            flat_targets = trg[:, 1:].reshape(-1)

            batch_loss = loss_fn(flat_logits, flat_targets)
            batch_loss.backward()
            optimizer.step()

            running_loss += batch_loss.item()

        val_loss, val_acc = evaluate_model(model, val_loader)
        train_loss = running_loss / len(train_loader)
        print(f"Epoch {epoch_idx+1}: Train Loss = {train_loss:.4f}, Val Loss = {val_loss:.4f}, Val Accuracy = {val_acc*100:.2f}%")

        # Log to WandB
        epoch_metrics = {
            "Train Loss": train_loss,
            "Validation Loss": val_loss,
            "Validation Accuracy" : val_acc,
            "Epoch": epoch_idx + 1
        }
        wandb.log(epoch_metrics)

    return val_acc, model

def evaluate_model(model, val_loader):
    """Compute mean batch loss and token-level accuracy without teacher forcing.

    model: Called as ``model(src, trg, teacher_forcing_ratio=0)`` and expected
        to return logits of shape (batch, trg_len, vocab).
    val_loader: Iterable of ``(src, trg)`` batches that supports ``len()``.

    Returns ``(mean_loss, accuracy)``. Accuracy is the fraction of non-padding
    target tokens (position 0 excluded) whose argmax prediction matches. An
    empty loader yields ``(0.0, 0)``.
    """
    model.eval()
    criterion = nn.CrossEntropyLoss(ignore_index=0)
    val_loss = 0
    total, correct = 0, 0

    with torch.no_grad():
        for src, trg in val_loader:
            src, trg = src.to(device), trg.to(device)
            output = model(src, trg, teacher_forcing_ratio=0)  # no teacher forcing

            output_dim = output.shape[-1]
            output_flat = output[:, 1:].reshape(-1, output_dim)
            trg_flat = trg[:, 1:].reshape(-1)
            val_loss += criterion(output_flat, trg_flat).item()

            # Count matches with one masked tensor comparison per batch instead
            # of a Python loop over every token. Index 0 is only ever produced
            # by padding, and padding is trailing, so masking it out counts
            # the same tokens as stopping at the first pad.
            targets = trg[:, 1:]  # skip <sos>
            predictions = output[:, 1:].argmax(dim=-1)
            real_tokens = targets != 0
            total += real_tokens.sum().item()
            correct += ((predictions == targets) & real_tokens).sum().item()

    accuracy = correct / total if total != 0 else 0
    num_batches = len(val_loader)
    mean_loss = val_loss / num_batches if num_batches else 0.0
    return mean_loss, accuracy


In [None]:
# Register the search space in `sweep_config` as a sweep in the 'assignment3' project.
# NOTE(review): no entity is passed here, while sweep_train's wandb.init names
# entity "da6401-assignments" — confirm both resolve to the same entity.
sweep_id = wandb.sweep(sweep_config, project='assignment3')

In [None]:
def sweep_train(config=None):
    """Train one model inside a W&B run; intended as the callback for ``wandb.agent``.

    config: Optional dict of hyperparameter overrides, merged over the defaults
        below before they are handed to ``wandb.init``.

    The defaults now cover every value the training code reads, including
    ``learning_rate`` and ``teacher_forcing_ratio``, so the function also works
    when a sweep does not supply them.
    """
    default = {
        "epochs": 20,
        "embed_dim": 256,
        "hidden_dim": 256,
        "encoder_layers": 2,
        "cell_type": "GRU",
        "dropout": 0.3,
        "beam_width": 3,
        "learning_rate": 0.0001,
        "teacher_forcing_ratio": 0.7,
    }
    if config:
        default.update(dict(config))

    # The context manager finishes the run when the block exits, so consecutive
    # agent trials do not share a run.
    with wandb.init(project="assignment3", entity="da6401-assignments", config=default) as run:
        config = run.config

        model = Seq2SeqRNN(
            input_dim=SRC_VOCAB_SIZE,
            output_dim=TRG_VOCAB_SIZE,
            embed_dim=config.embed_dim,
            hidden_dim=config.hidden_dim,
            encoder_layers=config.encoder_layers,
            cell_type=config.cell_type,
            dropout=config.dropout
        ).to(device)

        val_accuracy, model = train_model(model, train_loader, val_loader, config)

        # Log the final value of the metric the sweep optimises.
        wandb.log({
            'Validation Accuracy': val_accuracy
        })


In [None]:
# Start a sweep agent in this process: it calls sweep_train once per trial, up to 200 trials.
wandb.agent(sweep_id, sweep_train, count=200)

In [None]:
# Pick the run with the highest validation accuracy.
# The key must match what train_model logs exactly ("Validation Accuracy",
# capital V). Runs that never logged the metric default to -inf so they cannot
# win the max().
metric_name = "Validation Accuracy"

api = wandb.Api()
runs = api.runs("da6401-assignments/assignment3")
best_run = max(runs, key=lambda run: run.summary.get(metric_name, float("-inf")))

print(f"Best run name: {best_run.name}. \nValidation Accuracy: {best_run.summary.get(metric_name)}")

In [None]:
def log_predictions_to_wandb(model, test_loader, trg_vocab, idx_to_trg, num_samples=10,
                             idx_to_src=None, beam_width=5):
    """Decode the first ``num_samples`` test examples and log them as a W&B table.

    model: Trained model handed to BeamSearchDecoder.
    test_loader: Iterable of ``(src_batch, trg_batch)`` index tensors.
    trg_vocab: Target char -> index mapping (must contain '<eos>').
    idx_to_trg: Target index -> char mapping used for the target and prediction columns.
    num_samples: Maximum number of rows to log.
    idx_to_src: Optional source index -> char mapping for the input column. When
        omitted it is derived from ``test_loader.dataset.src_vocab`` if that
        attribute exists; otherwise the raw source indices are shown.
    beam_width: Beam width used for decoding.

    Characters within a cell are separated by single spaces.
    """
    model.eval()
    decoder = BeamSearchDecoder(model, trg_vocab, beam_width=beam_width)
    eos = trg_vocab['<eos>']

    # The input column has to be decoded with the SOURCE vocabulary. Looking
    # source indices up in idx_to_trg produces characters of the wrong script.
    if idx_to_src is None:
        src_vocab = getattr(getattr(test_loader, "dataset", None), "src_vocab", None)
        idx_to_src = {index: char for char, index in src_vocab.items()} if src_vocab else {}
    if idx_to_src:
        src_to_text = lambda tok: idx_to_src.get(tok, "<unk>")
    else:
        src_to_text = str

    # <pad>, <sos> and <eos> are framing, not part of the input word.
    # NOTE(review): assumes the source vocabulary uses the same special-token
    # indices as trg_vocab — true for vocabularies built by DakshinaDataset.
    special_ids = {trg_vocab.get('<pad>', 0), trg_vocab.get('<sos>', 1), eos}

    table = wandb.Table(columns=["Input", "Target", "Prediction"])
    count = 0

    with torch.no_grad():
        for src_batch, trg_batch in test_loader:
            # Stop reading batches once enough rows are collected; the inner
            # `break` alone would leave this loop running over the whole loader.
            if count >= num_samples:
                break
            src_batch, trg_batch = src_batch.to(device), trg_batch.to(device)
            for src, trg in zip(src_batch, trg_batch):
                if count >= num_samples:
                    break

                # Prediction: drop <sos> and everything from the first <eos> on.
                pred_seq = decoder.decode(src)[1:]
                if eos in pred_seq:
                    pred_seq = pred_seq[:pred_seq.index(eos)]

                # Target: same trimming, done on a plain list of ints.
                trg_seq = trg[1:].tolist()
                if eos in trg_seq:
                    trg_seq = trg_seq[:trg_seq.index(eos)]

                # Convert token ids to strings
                input_str = " ".join(src_to_text(tok) for tok in src.tolist() if tok not in special_ids)
                target_str = " ".join(idx_to_trg[tok] for tok in trg_seq if tok in idx_to_trg)
                pred_str = " ".join(idx_to_trg.get(tok, "<unk>") for tok in pred_seq)

                table.add_data(input_str, target_str, pred_str)
                count += 1

    wandb.log({"Sample Predictions": table})


In [None]:
#best model
# NOTE(review): `default` is not referenced by the code in this cell, and its
# encoder_layers (2) differs from the sweep below and from the checkpoint-loading
# cell (both 3) — confirm which value is intended.
default = {
    "epochs": 20,
    "embed_dim": 64,
    "hidden_dim": 256,
    "encoder_layers": 2,
    "cell_type": "LSTM",
    "dropout": 0.2,
    "beam_width": 3,
    "learning_rate": 0.0001,
    "teacher_forcing_ratio": 0.7
}

# Every parameter has exactly one value, so this sweep describes a single
# configuration. Note that this rebinds the `sweep_config` name used by the
# earlier search cell.
sweep_config = {
    'method': 'bayes',
    'metric': {'name': 'Validation Accuracy', 'goal': 'maximize'},
    'parameters': {
        'epochs' : {'values':[20]},
        'embed_dim': {'values': [64]},
        'hidden_dim': {'values': [256]},
        'encoder_layers': {'values': [3]},
        'cell_type': {'values': ['LSTM']},
        'dropout': {'values': [0.2]},
        'beam_width': {'values': [3]},
        'learning_rate': {'values': [0.0001]},
        'teacher_forcing_ratio': {'values': [0.7]}
    }
}

def sweep_train_wrapper(idx_to_trg):
    """Return a zero-argument trial function suitable for ``wandb.agent``.

    idx_to_trg: Target index -> character mapping; captured by the returned
        function and passed on when sample predictions are logged.

    Each trial builds a model from the run's config, trains it, logs the final
    validation accuracy and sample test predictions, and saves the weights to
    ``model_<run name>.pt``.
    """
    def run_trial():
        with wandb.init() as run:
            hparams = run.config

            model_kwargs = dict(
                input_dim=SRC_VOCAB_SIZE,
                output_dim=TRG_VOCAB_SIZE,
                embed_dim=hparams.embed_dim,
                hidden_dim=hparams.hidden_dim,
                encoder_layers=hparams.encoder_layers,
                cell_type=hparams.cell_type,
                dropout=hparams.dropout,
            )
            net = Seq2SeqRNN(**model_kwargs).to(device)

            final_val_accuracy, net = train_model(net, train_loader, val_loader, hparams)
            wandb.log({"Validation Accuracy": final_val_accuracy})

            log_predictions_to_wandb(net, test_loader, train_dataset.trg_vocab, idx_to_trg, num_samples=10)

            # Checkpoint name is derived from the W&B run name.
            checkpoint_path = f"model_{run.name}.pt"
            torch.save(net.state_dict(), checkpoint_path)

    return run_trial

# Reverse target vocabulary (index -> character), used to render predictions as text.
idx_to_trg = {v: k for k, v in train_dataset.trg_vocab.items()}
# Register the single-configuration sweep and run exactly one trial of it.
sweep_id = wandb.sweep(sweep_config, project='assignment3')
wandb.agent(sweep_id, sweep_train_wrapper(idx_to_trg), count=1)


In [None]:
# Rebuild the architecture the checkpoint was trained with; the hyperparameters
# must match the saved weights or load_state_dict will fail.
model = Seq2SeqRNN(
    input_dim=SRC_VOCAB_SIZE,
    output_dim=TRG_VOCAB_SIZE,
    embed_dim=64,
    hidden_dim=256,
    encoder_layers=3,
    cell_type='LSTM',
    dropout=0.2
).to(device)
# map_location remaps the stored tensors onto the current device, so a
# checkpoint saved on a GPU also loads in a CPU-only session.
model.load_state_dict(torch.load("model_hearty-sweep-1.pt", map_location=device))
model.eval()


In [None]:
def compute_test_accuracy(model, test_loader, trg_vocab, idx_to_trg, beam_width=5):
    """Exact-match accuracy of beam-search predictions over ``test_loader``.

    model: Trained model handed to BeamSearchDecoder.
    test_loader: Iterable of ``(src_batch, trg_batch)`` index tensors.
    trg_vocab: Target char -> index mapping (must contain '<eos>').
    idx_to_trg: Unused; kept so existing callers keep working.
    beam_width: Beam width used for decoding.

    A prediction counts as correct only when, after dropping <sos> and
    everything from the first <eos> on, it equals the target token for token.
    Prints and returns the accuracy; an empty loader gives 0.0.
    """
    model.eval()
    decoder = BeamSearchDecoder(model, trg_vocab, beam_width=beam_width)
    eos = trg_vocab['<eos>']

    correct = 0
    total = 0

    with torch.no_grad():
        for src_batch, trg_batch in test_loader:
            src_batch, trg_batch = src_batch.to(device), trg_batch.to(device)

            for src, trg in zip(src_batch, trg_batch):
                # Skip all-padding rows before paying for a beam search.
                if (trg == 0).all():
                    continue

                # Remove <sos> and everything after <eos>
                pred_seq = decoder.decode(src)[1:]
                if eos in pred_seq:
                    pred_seq = pred_seq[:pred_seq.index(eos)]

                trg_seq = trg[1:].tolist()  # remove <sos>
                if eos in trg_seq:
                    trg_seq = trg_seq[:trg_seq.index(eos)]

                # Compare the full sequences. Matching only the first
                # len(pred_seq) target tokens would score a truncated (or
                # empty) prediction as correct.
                if pred_seq == trg_seq:
                    correct += 1
                total += 1

    test_acc = correct / total if total else 0.0
    print(f"Test Accuracy: {test_acc*100:.2f}%")
    return test_acc


In [None]:
# Create reverse vocab to convert index to character (for optional debug).
# It is built from the training target vocabulary.
idx_to_trg = {i: c for c, i in train_dataset.trg_vocab.items()}

# Evaluate the loaded model on the test set with beam-search decoding
test_accuracy = compute_test_accuracy(model, test_loader, train_dataset.trg_vocab, idx_to_trg)
