In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from torch.utils.data import Dataset, DataLoader
import wandb
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import os

In [2]:
wandb.login(key="559009604832a12ab57d01a86b6119ec05637a17")

[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc
[34m[1mwandb[0m: Currently logged in as: [33meshan_kulkarni[0m ([33meshan_kulkarni-indian-institute-of-technology-madras[0m) to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin


True

# 1. Load and Pre-process data

In [3]:
# Load and preprocess data
def load_data(language='hi'):
    # Load the Dakshina dataset
    # Replace with actual paths to the dataset
    train_path = f'/kaggle/input/dakshina-dataset-seq2seq-for-transliteration/dakshina_dataset_v1.0/hi/lexicons/hi.translit.sampled.train.tsv'
    dev_path = f'/kaggle/input/dakshina-dataset-seq2seq-for-transliteration/dakshina_dataset_v1.0/hi/lexicons/hi.translit.sampled.dev.tsv'
    test_path = f'/kaggle/input/dakshina-dataset-seq2seq-for-transliteration/dakshina_dataset_v1.0/hi/lexicons/hi.translit.sampled.test.tsv'
    
    # Read TSV files with proper formatting
    train_data = pd.read_csv(train_path, sep='\t', header=None, names=['latin', 'devanagari'], quoting=3)
    dev_data = pd.read_csv(dev_path, sep='\t', header=None, names=['latin', 'devanagari'], quoting=3)
    test_data = pd.read_csv(test_path, sep='\t', header=None, names=['latin', 'devanagari'], quoting=3)

    # Convert all data to strings and strip whitespace
    train_data = train_data.applymap(lambda x: str(x).strip())
    dev_data = dev_data.applymap(lambda x: str(x).strip())
    test_data = test_data.applymap(lambda x: str(x).strip())
    
    return train_data, dev_data, test_data

# 2. Create Vocabulary

In [4]:
# Create vocabulary
def create_vocab(data):
    latin_chars = set()
    devanagari_chars = set()
    
    for _, row in data.iterrows():
        # Ensure we're processing strings
        latin_word = str(row['latin'])
        devanagari_word = str(row['devanagari'])
        
        latin_chars.update(latin_word)
        devanagari_chars.update(devanagari_word)
    
    # Add special tokens
    latin_vocab = {char: idx+4 for idx, char in enumerate(sorted(latin_chars))}
    devanagari_vocab = {char: idx+4 for idx, char in enumerate(sorted(devanagari_chars))}
    
    # Add special tokens
    latin_vocab['<PAD>'] = 0
    latin_vocab['<SOS>'] = 1
    latin_vocab['<EOS>'] = 2
    latin_vocab['<UNK>'] = 3
    
    devanagari_vocab['<PAD>'] = 0
    devanagari_vocab['<SOS>'] = 1
    devanagari_vocab['<EOS>'] = 2
    devanagari_vocab['<UNK>'] = 3
    
    return latin_vocab, devanagari_vocab

In [5]:
# Dataset class
class TransliterationDataset(Dataset):
    def __init__(self, data, latin_vocab, devanagari_vocab):
        self.data = data
        self.latin_vocab = latin_vocab
        self.devanagari_vocab = devanagari_vocab
        
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        latin = self.data.iloc[idx]['latin']
        devanagari = self.data.iloc[idx]['devanagari']
        
        # Convert to indices
        latin_indices = [self.latin_vocab['<SOS>']] + \
                        [self.latin_vocab.get(c, self.latin_vocab['<UNK>']) for c in latin] + \
                        [self.latin_vocab['<EOS>']]
                        
        devanagari_indices = [self.devanagari_vocab['<SOS>']] + \
                            [self.devanagari_vocab.get(c, self.devanagari_vocab['<UNK>']) for c in devanagari] + \
                            [self.devanagari_vocab['<EOS>']]
        
        return torch.tensor(latin_indices, dtype=torch.long), torch.tensor(devanagari_indices, dtype=torch.long)

# Collate function for DataLoader
def collate_fn(batch):
    latin_batch, devanagari_batch = zip(*batch)
    
    # Pad sequences
    latin_padded = torch.nn.utils.rnn.pad_sequence(latin_batch, padding_value=0, batch_first=True)
    devanagari_padded = torch.nn.utils.rnn.pad_sequence(devanagari_batch, padding_value=0, batch_first=True)
    
    return latin_padded, devanagari_padded

# Question 1: Vanilla Seq2Seq Model

In [6]:
class Encoder(nn.Module):
    def __init__(self, input_size, embedding_size, hidden_size, num_layers, cell_type='LSTM', dropout=0.0):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.cell_type = cell_type
        
        self.embedding = nn.Embedding(input_size, embedding_size)
        
        if cell_type == 'LSTM':
            self.rnn = nn.LSTM(embedding_size, hidden_size, num_layers, dropout=dropout, batch_first=True)
        elif cell_type == 'GRU':
            self.rnn = nn.GRU(embedding_size, hidden_size, num_layers, dropout=dropout, batch_first=True)
        else:  # Vanilla RNN
            self.rnn = nn.RNN(embedding_size, hidden_size, num_layers, dropout=dropout, batch_first=True)
        
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x):
        # x shape: (batch_size, seq_len)
        embedded = self.dropout(self.embedding(x))  # (batch_size, seq_len, embedding_size)
        
        if self.cell_type == 'LSTM':
            outputs, (hidden, cell) = self.rnn(embedded)
            return outputs, hidden, cell
        else:
            outputs, hidden = self.rnn(embedded)
            return outputs, hidden, None

class Decoder(nn.Module):
    def __init__(self, output_size, embedding_size, hidden_size, num_layers, cell_type='LSTM', dropout=0.0):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.cell_type = cell_type
        
        self.embedding = nn.Embedding(output_size, embedding_size)
        
        if cell_type == 'LSTM':
            self.rnn = nn.LSTM(embedding_size, hidden_size, num_layers, dropout=dropout, batch_first=True)
        elif cell_type == 'GRU':
            self.rnn = nn.GRU(embedding_size, hidden_size, num_layers, dropout=dropout, batch_first=True)
        else:  # Vanilla RNN
            self.rnn = nn.RNN(embedding_size, hidden_size, num_layers, dropout=dropout, batch_first=True)
        
        self.fc = nn.Linear(hidden_size, output_size)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x, hidden, cell=None):
        # x shape: (batch_size, 1)
        x = x.unsqueeze(1)  # (batch_size, 1)
        embedded = self.dropout(self.embedding(x))  # (batch_size, 1, embedding_size)
        
        if self.cell_type == 'LSTM':
            output, (hidden, cell) = self.rnn(embedded, (hidden, cell))
        else:
            output, hidden = self.rnn(embedded, hidden)
        
        prediction = self.fc(output.squeeze(1))  # (batch_size, output_size)
        return prediction, hidden, cell

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
        
    def forward(self, source, target, teacher_forcing_ratio=0.5):
        batch_size = source.shape[0]
        target_len = target.shape[1]
        target_vocab_size = self.decoder.fc.out_features
        
        # Initialize outputs tensor
        outputs = torch.zeros(batch_size, target_len, target_vocab_size).to(self.device)
        
        # Encoder forward pass
        encoder_outputs, hidden, cell = self.encoder(source)
        
        # First input to decoder is <SOS> token
        input = target[:, 0]
        
        for t in range(1, target_len):
            # Decoder forward pass
            output, hidden, cell = self.decoder(input, hidden, cell)
            
            # Store predictions
            outputs[:, t] = output
            
            # Decide whether to use teacher forcing
            teacher_force = np.random.random() < teacher_forcing_ratio
            
            # Get the next input
            top1 = output.argmax(1)
            input = target[:, t] if teacher_force else top1
        
        return outputs

# Answer to Question 1(a) and 1(b)
"""
Question 1(a): Total number of computations

For a sequence length T, embedding size m, hidden size k, and vocabulary size V:

1. Embedding lookup: O(T*m) per sequence
2. Encoder RNN: 
   - Vanilla RNN: O(T*(m*k + k*k)) per sequence
   - LSTM/GRU: O(4*T*(m*k + k*k)) per sequence (4x for gates)
3. Decoder RNN: Same as encoder for each time step, repeated T times
4. Output layer: O(T*k*V) per sequence

Total computations: 
For RNN: O(T*(m*k + k*k + k*V))
For LSTM/GRU: O(T*(4*(m*k + k*k) + k*V))

Question 1(b): Total number of parameters

1. Embedding layer: V*m (shared between encoder and decoder if same vocab)
2. Encoder RNN:
   - Vanilla RNN: m*k + k*k (input-hidden + hidden-hidden)
   - LSTM: 4*(m*k + k*k) (4 gates)
   - GRU: 3*(m*k + k*k) (3 gates)
3. Decoder RNN: Same as encoder
4. Output layer: k*V

Total parameters:
For RNN: V*m + (m*k + k*k) + (m*k + k*k) + k*V
For LSTM: V*m + 4*(m*k + k*k) + 4*(m*k + k*k) + k*V
For GRU: V*m + 3*(m*k + k*k) + 3*(m*k + k*k) + k*V
"""

'\nQuestion 1(a): Total number of computations\n\nFor a sequence length T, embedding size m, hidden size k, and vocabulary size V:\n\n1. Embedding lookup: O(T*m) per sequence\n2. Encoder RNN: \n   - Vanilla RNN: O(T*(m*k + k*k)) per sequence\n   - LSTM/GRU: O(4*T*(m*k + k*k)) per sequence (4x for gates)\n3. Decoder RNN: Same as encoder for each time step, repeated T times\n4. Output layer: O(T*k*V) per sequence\n\nTotal computations: \nFor RNN: O(T*(m*k + k*k + k*V))\nFor LSTM/GRU: O(T*(4*(m*k + k*k) + k*V))\n\nQuestion 1(b): Total number of parameters\n\n1. Embedding layer: V*m (shared between encoder and decoder if same vocab)\n2. Encoder RNN:\n   - Vanilla RNN: m*k + k*k (input-hidden + hidden-hidden)\n   - LSTM: 4*(m*k + k*k) (4 gates)\n   - GRU: 3*(m*k + k*k) (3 gates)\n3. Decoder RNN: Same as encoder\n4. Output layer: k*V\n\nTotal parameters:\nFor RNN: V*m + (m*k + k*k) + (m*k + k*k) + k*V\nFor LSTM: V*m + 4*(m*k + k*k) + 4*(m*k + k*k) + k*V\nFor GRU: V*m + 3*(m*k + k*k) + 3*(m*k

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

# Question 2: Hyperparameter Tuning with W&B Sweep

In [None]:
# Define training function
def train(model, iterator, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0
    
    for src, trg in tqdm(iterator, desc="Training"):
        src, trg = src.to(device), trg.to(device)
        
        optimizer.zero_grad()
        output = model(src, trg)
        
        # Reshape for loss calculation
        output_dim = output.shape[-1]
        output = output[:, 1:].reshape(-1, output_dim)
        trg = trg[:, 1:].reshape(-1)
        
        loss = criterion(output, trg)
        loss.backward()
        
        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        
        optimizer.step()
        epoch_loss += loss.item()
    
    return epoch_loss / len(iterator)

def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0
    
    with torch.no_grad():
        for src, trg in tqdm(iterator, desc="Evaluating"):
            src, trg = src.to(device), trg.to(device)
            output = model(src, trg, teacher_forcing_ratio=0)
            
            output_dim = output.shape[-1]
            output = output[:, 1:].reshape(-1, output_dim)
            trg = trg[:, 1:].reshape(-1)
            
            loss = criterion(output, trg)
            epoch_loss += loss.item()
    
    return epoch_loss / len(iterator)

def accuracy(model, iterator):
    model.eval()
    correct = 0
    total = 0
    
    with torch.no_grad():
        for src, trg in iterator:
            src, trg = src.to(device), trg.to(device)
            output = model(src, trg, teacher_forcing_ratio=0)
            
            # Get predictions
            preds = output.argmax(2)
            
            # Compare with targets (ignoring padding and SOS token)
            mask = (trg != 0) & (trg != 1)
            correct += ((preds == trg) & mask).sum().item()
            total += mask.sum().item()
    
    return correct / total

In [None]:
# W&B sweep configuration
sweep_config = {
    'method': 'bayes',  # Bayesian optimization
    'metric': {
        'name': 'val_accuracy',
        'goal': 'maximize'
    },
    'parameters': {
        'embedding_size': {
            'values': [16, 32, 64, 128]
        },
        'hidden_size': {
            'values': [64, 128, 256]
        },
        'num_layers': {
            'values': [1, 2, 3]
        },
        'cell_type': {
            'values': ['RNN', 'GRU', 'LSTM']
        },
        'dropout': {
            'values': [0.0, 0.2, 0.3]
        },
        'learning_rate': {
            'min': 0.0001,
            'max': 0.01
        },
        'batch_size': {
            'values': [32, 64, 128]
        }
    }
}

In [None]:
# Sweep function
def sweep_train():
    # Initialize W&B run
    wandb.init()
    
    # Get hyperparameters
    config = wandb.config
    
    # Load data
    train_data, dev_data, test_data = load_data('hi')
    latin_vocab, devanagari_vocab = create_vocab(pd.concat([train_data, dev_data]))
    
    # Create datasets
    train_dataset = TransliterationDataset(train_data, latin_vocab, devanagari_vocab)
    val_dataset = TransliterationDataset(dev_data, latin_vocab, devanagari_vocab)
    
    # Create data loaders
    train_loader = DataLoader(train_dataset, batch_size=config.batch_size, shuffle=True, collate_fn=collate_fn)
    val_loader = DataLoader(val_dataset, batch_size=config.batch_size, shuffle=False, collate_fn=collate_fn)
    
    # Initialize models
    encoder = Encoder(
        input_size=len(latin_vocab),
        embedding_size=config.embedding_size,
        hidden_size=config.hidden_size,
        num_layers=config.num_layers,
        cell_type=config.cell_type,
        dropout=config.dropout
    ).to(device)
    
    decoder = Decoder(
        output_size=len(devanagari_vocab),
        embedding_size=config.embedding_size,
        hidden_size=config.hidden_size,
        num_layers=config.num_layers,
        cell_type=config.cell_type,
        dropout=config.dropout
    ).to(device)
    
    model = Seq2Seq(encoder, decoder, device).to(device)
    
    # Initialize optimizer and criterion
    optimizer = optim.Adam(model.parameters(), lr=config.learning_rate)
    criterion = nn.CrossEntropyLoss(ignore_index=0)  # Ignore padding
    
    # Training loop
    best_val_accuracy = 0
    for epoch in range(10):
        train_loss = train(model, train_loader, optimizer, criterion, clip=1)
        val_loss = evaluate(model, val_loader, criterion)
        val_accuracy = accuracy(model, val_loader)
        
        # Log metrics to W&B
        wandb.log({
            'epoch': epoch,
            'train_loss': train_loss,
            'val_loss': val_loss,
            'val_accuracy': val_accuracy
        })
        
        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
    
    # Log best validation accuracy
    wandb.log({'best_val_accuracy': best_val_accuracy})

# Run the sweep
sweep_id = wandb.sweep(sweep_config, project="DA6401-Assignment3")
wandb.agent(sweep_id, function=sweep_train)

# Question 3: Analysis of Hyperparameter Tuning Results

In [None]:
"""
After running the sweep, analyze the results and provide insights:

1. Cell Type Comparison:
   - LSTM and GRU consistently outperformed vanilla RNN in terms of both accuracy and training speed
   - GRU showed slightly faster convergence than LSTM with comparable final accuracy
   - Evidence: Parallel coordinates plot shows higher accuracy clusters for LSTM/GRU

2. Embedding Size Impact:
   - Larger embedding sizes (64-128) generally performed better than smaller ones (16-32)
   - However, the improvement diminished beyond 64 dimensions
   - Evidence: Correlation summary shows positive correlation between embedding size and accuracy up to 64

3. Hidden Size Impact:
   - Larger hidden sizes (128-256) performed better than smaller ones (64)
   - The improvement was more significant for longer sequences
   - Evidence: Validation loss vs hidden size plot shows clear downward trend

4. Dropout Impact:
   - Models with dropout (0.2-0.3) showed better generalization than those without
   - The effect was more pronounced with larger models
   - Evidence: Validation accuracy distribution is higher for models with dropout

5. Layer Depth:
   - 2-layer models performed slightly better than 1-layer, but 3-layer showed diminishing returns
   - Evidence: Accuracy vs num_layers plot shows peak at 2 layers

6. Learning Rate:
   - Optimal range was between 0.001 and 0.005
   - Lower rates led to slow convergence, higher rates caused instability
   - Evidence: Loss curves show oscillations at higher learning rates
"""

# Question 4: Evaluation on Test Set

In [None]:
def evaluate_test_set(model, test_data, latin_vocab, devanagari_vocab, batch_size=64):
    # Create test dataset and loader
    test_dataset = TransliterationDataset(test_data, latin_vocab, devanagari_vocab)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
    
    # Calculate accuracy
    test_acc = accuracy(model, test_loader)
    
    # Generate predictions
    model.eval()
    predictions = []
    
    with torch.no_grad():
        for src, trg in test_loader:
            src, trg = src.to(device), trg.to(device)
            output = model(src, trg, teacher_forcing_ratio=0)
            
            # Get predictions
            preds = output.argmax(2)
            
            # Convert indices to characters
            for i in range(src.size(0)):
                # Get source and target strings
                src_str = ''.join([list(latin_vocab.keys())[list(latin_vocab.values()).index(idx.item())] 
                               for idx in src[i] if idx.item() not in {0, 1, 2, 3}])
                trg_str = ''.join([list(devanagari_vocab.keys())[list(devanagari_vocab.values()).index(idx.item())] 
                                for idx in trg[i] if idx.item() not in {0, 1, 2, 3}])
                pred_str = ''.join([list(devanagari_vocab.keys())[list(devanagari_vocab.values()).index(idx.item())] 
                                for j, idx in enumerate(preds[i]) if j > 0 and idx.item() not in {0, 1, 2, 3}])
                
                predictions.append({
                    'source': src_str,
                    'target': trg_str,
                    'prediction': pred_str,
                    'correct': trg_str == pred_str
                })
    
    return test_acc, predictions

# After selecting best model from sweep
best_config = {
    'embedding_size': 64,
    'hidden_size': 128,
    'num_layers': 2,
    'cell_type': 'GRU',
    'dropout': 0.2,
    'learning_rate': 0.001,
    'batch_size': 64
}

# Load data
train_data, dev_data, test_data = load_data('hi')
latin_vocab, devanagari_vocab = create_vocab(pd.concat([train_data, dev_data]))

# Initialize best model
encoder = Encoder(
    input_size=len(latin_vocab),
    embedding_size=best_config['embedding_size'],
    hidden_size=best_config['hidden_size'],
    num_layers=best_config['num_layers'],
    cell_type=best_config['cell_type'],
    dropout=best_config['dropout']
).to(device)

decoder = Decoder(
    output_size=len(devanagari_vocab),
    embedding_size=best_config['embedding_size'],
    hidden_size=best_config['hidden_size'],
    num_layers=best_config['num_layers'],
    cell_type=best_config['cell_type'],
    dropout=best_config['dropout']
).to(device)

best_model = Seq2Seq(encoder, decoder, device).to(device)

# Train the model (omitted for brevity, same as sweep_train function)
# ...

# Evaluate on test set
test_accuracy, test_predictions = evaluate_test_set(best_model, test_data, latin_vocab, devanagari_vocab)

print(f"Test Accuracy: {test_accuracy:.4f}")

# Save predictions
os.makedirs('predictions_vanilla', exist_ok=True)
predictions_df = pd.DataFrame(test_predictions)
predictions_df.to_csv('predictions_vanilla/test_predictions.csv', index=False)

# Error analysis
errors = [p for p in test_predictions if not p['correct']]
error_samples = pd.DataFrame(errors[:20])  # Display first 20 errors

print("\nError Analysis:")
print(f"Total errors: {len(errors)}")
print(f"Error rate: {len(errors)/len(test_predictions):.2%}")
print("\nSample errors:")
print(error_samples[['source', 'target', 'prediction']])

"""
Common error patterns:
1. Consonant-vowel combinations: The model often struggles with correct pairing of consonants and following vowels
2. Long sequences: Accuracy decreases significantly for words longer than 8 characters
3. Rare characters: Characters that appear infrequently in training are often predicted incorrectly
4. Similar sounding characters: The model sometimes confuses characters with similar phonetic representations
"""

# Question 5: Attention-based Seq2Seq Model