In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input mount recursively and echo the full path of every file.
for dirname, _, filenames in os.walk('/kaggle/input'):
    full_paths = [os.path.join(dirname, filename) for filename in filenames]
    for full_path in full_paths:
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

## Config

In [1]:
# Configurations

class Config:
    """Central place for the hyperparameters used by this notebook.

    Every value is a plain class attribute, so it can be read either from the
    class itself or from an instance created with ``Config()``.
    """

    # --- Model architecture ---
    embedding_dim: int = 256     # width of each character embedding vector
    hidden_dim: int = 512        # hidden-state width of the recurrent layers
    encoder_layers: int = 1      # stacked recurrent layers in the encoder
    decoder_layers: int = 1      # stacked recurrent layers in the decoder
    cell_type: str = 'LSTM'      # Options: 'RNN', 'LSTM', 'GRU'

    # --- Optimisation ---
    batch_size: int = 64
    num_epochs: int = 20
    learning_rate: float = 1e-3
    # Compared against a fresh random draw at each decoder step to decide
    # whether the ground-truth token is fed back in.
    teacher_forcing_ratio: float = 0.5

    # --- Data ---
    # Fixed token length of every encoded sequence, <sos>/<eos> included.
    max_input_length: int = 30
    max_output_length: int = 30


## Encoder Model

In [2]:
# Encoder Model

import torch
import torch.nn as nn

class Encoder(nn.Module):
    """Embed source token ids and run them through a recurrent network.

    Args:
        input_dim: size of the source vocabulary (rows of the embedding table).
        embedding_dim: width of each token embedding.
        hidden_dim: hidden-state width of the recurrent layers.
        num_layers: number of stacked recurrent layers.
        cell_type: 'RNN', 'GRU' or 'LSTM'.

    Raises:
        ValueError: if ``cell_type`` is not one of the supported names.
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim, num_layers=1, cell_type='LSTM'):
        super().__init__()

        self.embedding = nn.Embedding(input_dim, embedding_dim)

        # Table-driven selection of the recurrent layer; the for/else falls
        # through to the error when no name matches.
        supported_cells = (('RNN', nn.RNN), ('GRU', nn.GRU), ('LSTM', nn.LSTM))
        for cell_name, cell_class in supported_cells:
            if cell_type == cell_name:
                self.rnn = cell_class(embedding_dim, hidden_dim, num_layers, batch_first=True)
                break
        else:
            raise ValueError(f"Unknown RNN cell type: {cell_type}")

        # Kept so Seq2Seq can check encoder/decoder compatibility.
        self.cell_type = cell_type
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

    def forward(self, src):
        """Encode a batch of source sequences.

        Args:
            src: token ids of shape (batch_size, src_len).

        Returns:
            ``(outputs, hidden)`` exactly as produced by the recurrent layer:
            outputs has shape (batch_size, src_len, hidden_dim); for LSTM,
            hidden is a ``(hidden_state, cell_state)`` tuple.
        """
        token_vectors = self.embedding(src)  # (batch_size, src_len, embedding_dim)
        per_step_outputs, final_state = self.rnn(token_vectors)
        return per_step_outputs, final_state


## Decoder Model

In [3]:
# Decoder Model

class Decoder(nn.Module):
    """Single-step recurrent decoder that maps a token and a state to logits.

    Args:
        output_dim: size of the target vocabulary (embedding rows and logits).
        embedding_dim: width of each token embedding.
        hidden_dim: hidden-state width of the recurrent layers.
        num_layers: number of stacked recurrent layers.
        cell_type: 'RNN', 'GRU' or 'LSTM'.

    Raises:
        ValueError: if ``cell_type`` is not one of the supported names.
    """

    def __init__(self, output_dim, embedding_dim, hidden_dim, num_layers=1, cell_type='LSTM'):
        super().__init__()

        self.embedding = nn.Embedding(output_dim, embedding_dim)

        # Table-driven selection of the recurrent layer; the for/else falls
        # through to the error when no name matches.
        supported_cells = (('RNN', nn.RNN), ('GRU', nn.GRU), ('LSTM', nn.LSTM))
        for cell_name, cell_class in supported_cells:
            if cell_type == cell_name:
                self.rnn = cell_class(embedding_dim, hidden_dim, num_layers, batch_first=True)
                break
        else:
            raise ValueError(f"Unknown RNN cell type: {cell_type}")

        # Projects the recurrent output onto target-vocabulary logits.
        self.fc_out = nn.Linear(hidden_dim, output_dim)

        # Kept so Seq2Seq can check encoder/decoder compatibility.
        self.cell_type = cell_type
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

    def forward(self, input, hidden):
        """Run one decoding step.

        Args:
            input: current token ids, shape (batch_size,).
            hidden: recurrent state to continue from.

        Returns:
            ``(prediction, hidden)`` where prediction holds the logits of
            shape (batch_size, output_dim) and hidden is the updated state.
        """
        step_tokens = input.unsqueeze(1)                # (batch_size, 1)
        step_vectors = self.embedding(step_tokens)      # (batch_size, 1, embedding_dim)
        step_output, next_state = self.rnn(step_vectors, hidden)  # (batch_size, 1, hidden_dim)
        logits = self.fc_out(step_output.squeeze(1))    # (batch_size, output_dim)
        return logits, next_state


## Seq2Seq wrapper

In [4]:
# Seq2Seq Model

class Seq2Seq(nn.Module):
    """Glue module that encodes a source batch and decodes it step by step.

    Args:
        encoder: module returning ``(outputs, hidden)`` for a source batch.
        decoder: module mapping ``(token, hidden)`` to ``(logits, hidden)``
            and exposing ``fc_out.out_features`` as the vocabulary size.
        device: device on which the output buffer is placed.
        teacher_forcing_ratio: per-step probability of feeding the
            ground-truth token instead of the model's own prediction.
    """

    def __init__(self, encoder, decoder, device, teacher_forcing_ratio=0.5):
        super().__init__()

        # The encoder's final state is handed to the decoder unchanged, so the
        # two must agree on state layout.
        assert encoder.hidden_dim == decoder.hidden_dim, "Encoder and Decoder hidden dimensions must match!"
        assert encoder.num_layers == decoder.num_layers, "Encoder and Decoder must have same number of layers!"
        assert encoder.cell_type == decoder.cell_type, "Encoder and Decoder must have same RNN cell type!"

        self.encoder = encoder
        self.decoder = decoder
        self.device = device
        self.teacher_forcing_ratio = teacher_forcing_ratio

    def forward(self, src, trg, teacher_forcing=True):
        """Produce logits for every target position after the first.

        Args:
            src: source sequences (batch_size, src_len).
            trg: target sequences (batch_size, trg_len); column 0 is used as
                the first decoder input.
            teacher_forcing: when falsy, the decoder always consumes its own
                argmax prediction and no random number is drawn.

        Returns:
            Tensor of shape (batch_size, trg_len, output_dim); position 0 is
            left as zeros because decoding starts at step 1.
        """
        n_sequences = src.shape[0]
        n_steps = trg.shape[1]
        vocab_size = self.decoder.fc_out.out_features

        # Buffer that collects the logits of every decoding step.
        all_logits = torch.zeros(n_sequences, n_steps, vocab_size).to(self.device)

        # Only the encoder's final state is used; per-step outputs are ignored.
        _, state = self.encoder(src)

        # Decoding starts from the first target column (the <sos> tokens).
        step_input = trg[:, 0]

        for t in range(1, n_steps):
            step_logits, state = self.decoder(step_input, state)
            all_logits[:, t] = step_logits

            # Short-circuit keeps the RNG untouched when teacher forcing is off.
            feed_truth = teacher_forcing and (torch.rand(1).item() < self.teacher_forcing_ratio)
            step_input = trg[:, t] if feed_truth else step_logits.argmax(1)

        return all_logits


## Dataset preparation

In [5]:
# Data Loading

import pandas as pd
import numpy as np
from torch.utils.data import Dataset, DataLoader

class DakshinaDataset(Dataset):
    """Torch dataset over a two-column, tab-separated transliteration lexicon.

    Each row of the file is read as ``latin<TAB>devanagari`` (no header row).
    Items are returned as fixed-length tensors of token ids produced by the
    supplied tokenizers.
    """

    def __init__(self, data_path, input_tokenizer, output_tokenizer, max_input_len=30, max_output_len=30):
        """
        data_path: path to the tab-separated lexicon file
        input_tokenizer: tokenizer for Latin script
        output_tokenizer: tokenizer for Devanagari script
        max_input_len: length passed to the input tokenizer's text_to_sequence
        max_output_len: length passed to the output tokenizer's text_to_sequence
        """
        # dtype=str + keep_default_na=False: pandas would otherwise turn
        # legitimate words such as "nan" or "null" into float NaN and
        # digit-only fields into ints, both of which crash the character-level
        # tokenizers when they try to iterate the value.
        self.data = pd.read_csv(
            data_path,
            sep='\t',
            header=None,
            names=['latin', 'devanagari'],
            dtype=str,
            keep_default_na=False,
        )

        self.input_tokenizer = input_tokenizer
        self.output_tokenizer = output_tokenizer
        self.max_input_len = max_input_len
        self.max_output_len = max_output_len

    def __len__(self):
        """Number of word pairs in the lexicon."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``{'input': LongTensor, 'target': LongTensor}`` for row ``idx``."""
        # Materialise the row once instead of once per column.
        row = self.data.iloc[idx]

        input_seq = self.input_tokenizer.text_to_sequence(row['latin'], self.max_input_len)
        output_seq = self.output_tokenizer.text_to_sequence(row['devanagari'], self.max_output_len)

        return {
            'input': torch.tensor(input_seq, dtype=torch.long),
            'target': torch.tensor(output_seq, dtype=torch.long)
        }

# Helper class for tokenizing characters
class CharTokenizer:
    """Character-level vocabulary with <pad>=0, <sos>=1 and <eos>=2 specials."""

    def __init__(self, texts):
        """Build the vocabulary from an iterable of strings.

        Characters receive consecutive ids starting at 3, in order of first
        appearance.
        """
        self.char2idx = {'<pad>': 0, '<sos>': 1, '<eos>': 2}
        self.idx2char = {0: '<pad>', 1: '<sos>', 2: '<eos>'}

        idx = 3
        for text in texts:
            for ch in text:
                if ch not in self.char2idx:
                    self.char2idx[ch] = idx
                    self.idx2char[idx] = ch
                    idx += 1

    def text_to_sequence(self, text, max_len):
        """Encode ``text`` as a list of ids of length ``max_len``.

        The result is ``<sos> chars... <eos>`` followed by ``<pad>`` up to
        ``max_len``. Characters missing from the vocabulary are mapped to
        ``<pad>`` (0). When the text is too long it is truncated, and for
        ``max_len >= 2`` the final slot is still ``<eos>`` so every encoded
        sequence carries an end marker.
        """
        pad_id = self.char2idx['<pad>']
        sos_id = self.char2idx['<sos>']
        eos_id = self.char2idx['<eos>']

        seq = [sos_id] + [self.char2idx.get(ch, pad_id) for ch in text] + [eos_id]

        if len(seq) > max_len:
            seq = seq[:max_len]
            # A plain slice would cut <eos> off long words, so the decoder
            # would never see where those sequences stop.
            if max_len >= 2:
                seq[-1] = eos_id
        else:
            seq += [pad_id] * (max_len - len(seq))

        return seq

    def sequence_to_text(self, sequence):
        """Decode an iterable of ids back to text, skipping the special ids.

        Ids may be Python ints, numpy integers or 0-d torch tensors; each is
        coerced with ``int()`` first, because tensor elements hash by identity
        and would otherwise never match a key of ``idx2char``.
        """
        special_ids = (0, 1, 2)
        chars = []
        for raw_idx in sequence:
            idx = int(raw_idx)
            if idx in special_ids:
                continue
            chars.append(self.idx2char.get(idx, ''))
        return ''.join(chars)

    def vocab_size(self):
        """Number of entries in the vocabulary, specials included."""
        return len(self.char2idx)


## Training Function

In [5]:
# Training Loop

import torch.optim as optim

def train(model, dataloader, optimizer, criterion, device):
    """Run one optimisation pass over ``dataloader``.

    Args:
        model: callable as ``model(src, trg)`` returning logits of shape
            (batch_size, trg_len, output_dim).
        dataloader: iterable of dicts with 'input' and 'target' tensors.
        optimizer: optimizer stepped once per batch.
        criterion: loss applied to flattened logits and targets.
        device: device the batch tensors are moved to.

    Returns:
        Sum of the per-batch losses divided by ``len(dataloader)``.
    """
    model.train()

    running_loss = 0

    for batch in dataloader:
        source_ids = batch['input'].to(device)     # (batch_size, src_len)
        target_ids = batch['target'].to(device)    # (batch_size, trg_len)

        optimizer.zero_grad()

        logits = model(source_ids, target_ids)     # (batch_size, trg_len, output_dim)

        # Position 0 is the <sos> slot, which the model never predicts, so it
        # is dropped from both sides before flattening for the loss.
        vocab_size = logits.shape[-1]
        flat_logits = logits[:, 1:].reshape(-1, vocab_size)
        flat_targets = target_ids[:, 1:].reshape(-1)

        batch_loss = criterion(flat_logits, flat_targets)
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()

    return running_loss / len(dataloader)


## Validation Function

In [6]:
# Evaluation

def evaluate(model, dataloader, criterion, device):
    """Compute the mean per-batch loss over ``dataloader`` without training.

    The model is switched to eval mode, gradients are disabled, and the model
    is called with ``teacher_forcing=False``.

    Args:
        model: callable as ``model(src, trg, teacher_forcing=False)`` returning
            logits of shape (batch_size, trg_len, output_dim).
        dataloader: iterable of dicts with 'input' and 'target' tensors.
        criterion: loss applied to flattened logits and targets.
        device: device the batch tensors are moved to.

    Returns:
        Sum of the per-batch losses divided by ``len(dataloader)``.
    """
    model.eval()

    running_loss = 0

    with torch.no_grad():
        for batch in dataloader:
            source_ids = batch['input'].to(device)
            target_ids = batch['target'].to(device)

            # No teacher forcing during eval
            logits = model(source_ids, target_ids, teacher_forcing=False)

            # Drop the <sos> position from both sides, then flatten.
            vocab_size = logits.shape[-1]
            flat_logits = logits[:, 1:].reshape(-1, vocab_size)
            flat_targets = target_ids[:, 1:].reshape(-1)

            running_loss += criterion(flat_logits, flat_targets).item()

    return running_loss / len(dataloader)


## wandb logging setup

In [8]:
# wandb Setup

import os

import wandb

# Never commit API keys to a notebook: the key is read from the environment.
# When WANDB_API_KEY is unset, key=None lets wandb fall back to previously
# stored credentials or its own login prompt.
# NOTE(review): the key that used to be hardcoded here has been exposed and
# should be revoked and regenerated in the W&B account settings.
wandb.login(key=os.environ.get("WANDB_API_KEY"))


def init_wandb(project_name, config):
    """Start a wandb run and record the hyperparameters held by ``config``.

    Args:
        project_name: name of the wandb project the run is logged under.
        config: object exposing the hyperparameter attributes listed below
            (the notebook's ``Config`` class provides all of them).
    """
    wandb.init(
        project=project_name,
        config={
            "embedding_dim": config.embedding_dim,
            "hidden_dim": config.hidden_dim,
            "encoder_layers": config.encoder_layers,
            "decoder_layers": config.decoder_layers,
            "cell_type": config.cell_type,
            "batch_size": config.batch_size,
            # Previously the only Config field missing from the logged run.
            "num_epochs": config.num_epochs,
            "learning_rate": config.learning_rate,
            "teacher_forcing_ratio": config.teacher_forcing_ratio,
            "max_input_length": config.max_input_length,
            "max_output_length": config.max_output_length
        }
    )


[34m[1mwandb[0m: Using wandb-core as the SDK backend.  Please refer to https://wandb.me/wandb-core for more information.
[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /Users/sathwikpentela/.netrc
[34m[1mwandb[0m: Currently logged in as: [33mda24m017[0m ([33mda24m017-indian-institute-of-technology-madras[0m) to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin


## Main

In [None]:
# Main Run

# 1. Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Seed the RNGs so weight init, batch shuffling and the teacher-forcing draws
# are repeatable across "Restart & Run All".
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# 2. Initialize config
config = Config()

# 3. Load dataset
train_data_path = 'dakshina_dataset_v1.0/hi/lexicons/romanized_train.tsv'
val_data_path = 'dakshina_dataset_v1.0/hi/lexicons/romanized_val.tsv'

# First, gather all characters for tokenizers.
# dtype=str + keep_default_na=False: pandas would otherwise parse words such
# as "nan" or "null" as float NaN, which crashes CharTokenizer when it tries
# to iterate over the value.
lexicon_read_options = dict(
    sep='\t',
    header=None,
    names=['latin', 'devanagari'],
    dtype=str,
    keep_default_na=False,
)
train_df = pd.read_csv(train_data_path, **lexicon_read_options)
val_df = pd.read_csv(val_data_path, **lexicon_read_options)

# Vocabularies are built from the training split only; characters that occur
# only in validation are treated as unknown by the tokenizers.
input_texts = train_df['latin'].tolist()
output_texts = train_df['devanagari'].tolist()

input_tokenizer = CharTokenizer(input_texts)
output_tokenizer = CharTokenizer(output_texts)

# Create datasets
train_dataset = DakshinaDataset(train_data_path, input_tokenizer, output_tokenizer, config.max_input_length, config.max_output_length)
val_dataset = DakshinaDataset(val_data_path, input_tokenizer, output_tokenizer, config.max_input_length, config.max_output_length)

# Create dataloaders
train_loader = DataLoader(train_dataset, batch_size=config.batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=config.batch_size)

# 4. Initialize models
encoder = Encoder(
    input_dim=input_tokenizer.vocab_size(),
    embedding_dim=config.embedding_dim,
    hidden_dim=config.hidden_dim,
    num_layers=config.encoder_layers,
    cell_type=config.cell_type
)

decoder = Decoder(
    output_dim=output_tokenizer.vocab_size(),
    embedding_dim=config.embedding_dim,
    hidden_dim=config.hidden_dim,
    num_layers=config.decoder_layers,
    cell_type=config.cell_type
)

model = Seq2Seq(encoder, decoder, device, config.teacher_forcing_ratio).to(device)

# 5. Loss function and optimizer
criterion = nn.CrossEntropyLoss(ignore_index=0)  # ignore padding index
optimizer = optim.Adam(model.parameters(), lr=config.learning_rate)

# 6. Initialize wandb
init_wandb(project_name="seq2seq-dakshina", config=config)

# 7. Training loop
# The finally-clause closes the wandb run even if training is interrupted or
# raises, so no run is left open.
try:
    for epoch in range(config.num_epochs):
        train_loss = train(model, train_loader, optimizer, criterion, device)
        val_loss = evaluate(model, val_loader, criterion, device)

        print(f"Epoch [{epoch+1}/{config.num_epochs}] Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")

        # Log to wandb (epoch included so curves can be plotted against it)
        wandb.log({"epoch": epoch + 1, "train_loss": train_loss, "val_loss": val_loss})
finally:
    wandb.finish()
