# Assignment 3

Import required libraries

In [1]:
import torch
import random
import os
import numpy as np

In [2]:
# Set seeds for reproducibility
def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True

set_seed(42)

In [3]:
# Check for CUDA
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: cuda


## Load and Prepare Data

In [None]:
import pandas
from torch.utils.data import Dataset, DataLoader

from dataset import TransliterationDataset, collate_fn

In [5]:
def prepare_data(dataset_path, batch_size=64, shuffle=True):
    dataset = TransliterationDataset(dataset_path)
    data_loader = DataLoader(
        dataset, 
        batch_size=batch_size, 
        shuffle=shuffle, 
        collate_fn=collate_fn
    )

    return dataset, data_loader

In [6]:
# Dataset paths
train_path = 'dakshina_dataset_v1.0\ml\lexicons\ml.translit.sampled.train.tsv'
val_path = 'dakshina_dataset_v1.0\ml\lexicons\ml.translit.sampled.dev.tsv'
test_path = 'dakshina_dataset_v1.0\ml\lexicons\ml.translit.sampled.test.tsv'

# Create dataloaders
train_dataset, train_loader = prepare_data(train_path, batch_size=64)
val_dataset, val_loader = prepare_data(val_path, batch_size=64)
test_dataset, test_loader = prepare_data(test_path, batch_size=64)

  train_path = 'dakshina_dataset_v1.0\ml\lexicons\ml.translit.sampled.train.tsv'
  val_path = 'dakshina_dataset_v1.0\ml\lexicons\ml.translit.sampled.dev.tsv'
  test_path = 'dakshina_dataset_v1.0\ml\lexicons\ml.translit.sampled.test.tsv'


In [7]:
# Model parameters
input_size = train_dataset.get_vocab_size('source')
output_size = train_dataset.get_vocab_size('target')

# Print vocabulary sizes
print(f"Source vocabulary size: {input_size}")
print(f"Target vocabulary size: {output_size}")

Source vocabulary size: 29
Target vocabulary size: 73


## Train Model

In [8]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from tqdm import trange

from models import Encoder, Decoder, Seq2Seq
from training import train, evaluate, transliterate, calculate_accuracy

In [None]:
def train_model(input_size, output_size, train_loader, val_loader, device, embedding_size=256, hidden_size=512, 
                n_layers=1, dropout=0.1, cell_type='lstm', epochs=10, teacher_forcing_ratio=0.5, clip=1.0):  
    print(f"Using device: {device}")
    
    # Create model
    encoder = Encoder(
        input_size=input_size,
        embedding_size=embedding_size,
        hidden_size=hidden_size,
        n_layers=n_layers,
        dropout=dropout,
        cell_type=cell_type
    ).to(device)
    
    decoder = Decoder(
        output_size=output_size,
        embedding_size=embedding_size,
        hidden_size=hidden_size,
        n_layers=n_layers,
        dropout=dropout,
        cell_type=cell_type
    ).to(device)
    
    model = Seq2Seq(encoder, decoder, device).to(device)
    
    # Define optimizer and loss function
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss(ignore_index=0)  # Ignore padding index (0)

    best_val_loss = float('inf')
    
    # Lists to store losses
    train_losses = []
    val_losses = []

    # Training loop
    print("Starting training...")
    pbar = trange(epochs, desc="Epoch", dynamic_ncols=True)
    for epoch in pbar:
        # Train
        train_loss = train(
            model=model,
            device=device,
            dataloader=train_loader,
            optimizer=optimizer,
            criterion=criterion,
            clip=clip,
            teacher_forcing_ratio=teacher_forcing_ratio
        )
        train_losses.append(train_loss)
        
        # Validate
        val_loss = evaluate(
            model=model,
            device=device,
            dataloader=val_loader,
            criterion=criterion
        )
        val_losses.append(val_loss)
        
        # Update tqdm bar description
        pbar.set_description(f"Epoch {epoch+1}/{epochs}")
        pbar.set_postfix(train_loss=f"{train_loss:.4f}", val_loss=f"{val_loss:.4f}")
            
        # Save best model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_model.pth')
            print("Best model saved!")

    return train_losses, val_losses

In [19]:
train_losses, val_losses = train_model(input_size=input_size, output_size=output_size, train_loader=train_loader, 
                                       val_loader=val_loader, device=device)

Using device: cuda
Starting training...


Epoch 1/20:   5%|▌         | 1/20 [00:48<15:18, 48.34s/it, train_loss=1.2254, val_loss=9.5475]

Best model saved!


Epoch 4/20:  20%|██        | 4/20 [03:16<13:05, 49.09s/it, train_loss=0.2214, val_loss=12.8525]


KeyboardInterrupt: 

### Visualize training results

In [None]:
import matplotlib.pyplot as plt

In [None]:
# Plot loss curves
plt.figure(figsize=(10, 5))
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.title('Training and Validation Loss')
plt.savefig('loss_plot.png')
plt.close()

## Hyperparameter Tuning with Wandb

In [None]:
import wandb

In [None]:
# Train the model with wandb integration
def train_with_wandb(config=None):
    with wandb.init(config=config):
        # Access wandb config
        config = wandb.config
        
        # Load datasets
        train_dataset = TransliterationDataset('dakshina_dataset_v1.0/hi/lexicons/hi.translit.sampled.train.tsv')
        val_dataset = TransliterationDataset('dakshina_dataset_v1.0/hi/lexicons/hi.translit.sampled.dev.tsv')
        
        # Make sure we're using the same vocabulary across datasets
        val_dataset.source_char_to_idx = train_dataset.source_char_to_idx
        val_dataset.source_idx_to_char = train_dataset.source_idx_to_char
        val_dataset.target_char_to_idx = train_dataset.target_char_to_idx
        val_dataset.target_idx_to_char = train_dataset.target_idx_to_char
        
        # Create dataloaders
        train_dataloader = DataLoader(train_dataset, batch_size=config.batch_size, shuffle=True, collate_fn=collate_fn)
        val_dataloader = DataLoader(val_dataset, batch_size=config.batch_size, shuffle=False, collate_fn=collate_fn)
        
        # Initialize model components
        input_size = train_dataset.get_vocab_size('source')
        output_size = train_dataset.get_vocab_size('target')
        
        encoder = Encoder(
            input_size=input_size, 
            embedding_size=config.embedding_size, 
            hidden_size=config.hidden_size, 
            n_layers=config.encoder_layers, 
            dropout=config.dropout,
            cell_type=config.cell_type
        )
        
        decoder = Decoder(
            output_size=output_size, 
            embedding_size=config.embedding_size, 
            hidden_size=config.hidden_size, 
            n_layers=config.decoder_layers, 
            dropout=config.dropout,
            cell_type=config.cell_type
        )
        
        model = Seq2Seq(encoder, decoder, device).to(device)
        
        # Set up optimizer and loss function
        optimizer = optim.Adam(model.parameters(), lr=config.learning_rate)
        criterion = nn.CrossEntropyLoss(ignore_index=train_dataset.target_char_to_idx['<PAD>'])
        
        # Track best validation loss
        best_valid_loss = float('inf')
        
        # Training loop
        for epoch in range(config.epochs):
            # Train
            train_loss = train(model, train_dataloader, optimizer, criterion, 
                             clip=config.clip, teacher_forcing_ratio=config.teacher_forcing_ratio)
            
            # Evaluate
            val_loss = evaluate(model, val_dataloader, criterion)
            
            # Log metrics
            wandb.log({
                'train_loss': train_loss,
                'val_loss': val_loss,
                'epoch': epoch
            })
            
            # Save best model
            if val_loss < best_valid_loss:
                best_valid_loss = val_loss
                torch.save(model.state_dict(), f'model_{config.cell_type}.pt')
                print(f'Model saved at epoch {epoch+1}')
            
            print(f'Epoch: {epoch+1}')
            print(f'\tTrain Loss: {train_loss:.3f}')
            print(f'\tVal Loss: {val_loss:.3f}')
        
        # Evaluation on validation set
        val_accuracy, val_predictions = calculate_accuracy(model, val_dataloader, train_dataset)
        
        # Log final metrics
        wandb.log({
            'val_accuracy': val_accuracy
        })
        
        print(f'Validation Accuracy: {val_accuracy:.3f}')
        
        # Sample predictions
        for i, (source, pred, target) in enumerate(val_predictions[:10]):
            print(f'Source: {source}')
            print(f'Prediction: {pred}')
            print(f'Target: {target}')
            print()

In [None]:
# Define sweep configuration
sweep_config = {
    'method': 'grid',  # We can also use 'random' or 'bayes' for more efficient searching
    'metric': {
        'name': 'val_loss',
        'goal': 'minimize'
    },
    'parameters': {
        'learning_rate': {
            'values': [0.001, 0.0001]
        },
        'batch_size': {
            'values': [32, 64]
        },
        'embedding_size': {
            'values': [64, 128, 256]
        },
        'hidden_size': {
            'values': [128, 256]
        },
        'encoder_layers': {
            'values': [1, 2]
        },
        'decoder_layers': {
            'values': [1, 2]
        },
        'dropout': {
            'values': [0.1, 0.3]
        },
        'cell_type': {
            'values': ['rnn', 'lstm', 'gru']
        },
        'teacher_forcing_ratio': {
            'values': [0.5, 0.7]
        },
        'clip': {
            'values': [1.0]
        },
        'epochs': {
            'values': [10]
        }
    }
}

In [None]:
def run_sweep():
    # Initialize wandb
    sweep_id = wandb.sweep(sweep_config, project='DA6401-Assignment-3')
    
    # Run sweep
    wandb.agent(sweep_id, train_model, count=24)  # You can adjust the count based on resources
    
    # Find best model configuration
    api = wandb.Api()
    sweep = api.sweep(f"your_username/DA6401-Assignment-3/{sweep_id}")
    best_run = sweep.best_run()
    best_config = best_run.config
    
    print("Best Configuration:")
    print(best_config)
    
    return best_config

In [None]:
# Run hyperparameter sweep
best_config = run_sweep()

In [None]:
def analyze_errors(predictions):
    """
    Analyze the errors made by the model
    """
    # Count total predictions and correct predictions
    total = len(predictions)
    correct = sum(1 for _, pred, target in predictions if pred == target)
    
    print(f'Accuracy: {correct/total:.3f} ({correct}/{total})')
    
    # Analyze error patterns
    errors = [(source, pred, target) for source, pred, target in predictions if pred != target]
    
    # Error by length
    length_errors = {}
    for source, _, target in errors:
        length = len(source)
        if length not in length_errors:
            length_errors[length] = 0
        length_errors[length] += 1
    
    # Sort by length
    sorted_length_errors = {k: v for k, v in sorted(length_errors.items())}
    
    plt.figure(figsize=(10, 6))
    plt.bar(sorted_length_errors.keys(), sorted_length_errors.values())
    plt.xlabel('Source Length')
    plt.ylabel('Number of Errors')
    plt.title('Errors by Source Length')
    plt.savefig('predictions_vanilla/errors_by_length.png')
    
    # Sample error analysis
    print("\nSample Error Analysis:")
    for i, (source, pred, target) in enumerate(errors[:10]):
        print(f'Source: {source}')
        print(f'Prediction: {pred}')
        print(f'Target: {target}')
        print()

In [None]:
def test_best_model(best_config):
    # Load datasets
    train_dataset = TransliterationDataset('dakshina_dataset_v1.0/hi/lexicons/hi.translit.sampled.train.tsv')
    test_dataset = TransliterationDataset('dakshina_dataset_v1.0/hi/lexicons/hi.translit.sampled.test.tsv')
    
    # Make sure test dataset uses the same vocabulary as training
    test_dataset.source_char_to_idx = train_dataset.source_char_to_idx
    test_dataset.source_idx_to_char = train_dataset.source_idx_to_char
    test_dataset.target_char_to_idx = train_dataset.target_char_to_idx
    test_dataset.target_idx_to_char = train_dataset.target_idx_to_char
    
    # Create dataloader
    test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)
    
    # Initialize model with best configuration
    input_size = train_dataset.get_vocab_size('source')
    output_size = train_dataset.get_vocab_size('target')
    
    best_encoder = Encoder(
        input_size=input_size, 
        embedding_size=best_config['embedding_size'], 
        hidden_size=best_config['hidden_size'], 
        n_layers=best_config['encoder_layers'], 
        dropout=best_config['dropout'],
        cell_type=best_config['cell_type']
    )
    
    best_decoder = Decoder(
        output_size=output_size, 
        embedding_size=best_config['embedding_size'], 
        hidden_size=best_config['hidden_size'], 
        n_layers=best_config['decoder_layers'], 
        dropout=best_config['dropout'],
        cell_type=best_config['cell_type']
    )
    
    best_model = Seq2Seq(best_encoder, best_decoder, device).to(device)
    
    # Load model parameters
    best_model.load_state_dict(torch.load(f"model_{best_config['cell_type']}.pt"))
    
    # Evaluate on test set
    test_accuracy, test_predictions = calculate_accuracy(best_model, test_dataloader, train_dataset)
    
    print(f'Test Accuracy: {test_accuracy:.3f}')
    
    # Save predictions to file
    with open('predictions_vanilla/test_predictions.txt', 'w', encoding='utf-8') as f:
        for source, pred, target in test_predictions:
            f.write(f'Source: {source}\n')
            f.write(f'Prediction: {pred}\n')
            f.write(f'Target: {target}\n\n')
    
    # Create error analysis
    analyze_errors(test_predictions)
    
    return test_predictions

In [None]:
# Make sure predictions directory exists
os.makedirs('predictions_vanilla', exist_ok=True)

In [None]:
# Test best model
test_predictions = test_best_model(best_config)