# In [None]:  (Jupyter cell marker left over from the notebook export)
import wandb
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import pandas as pd
import os
from dataclasses import dataclass

# ======================
# 1. Data Loading Components
# ======================
class CharTokenizer:
    """Character-level vocabulary with three reserved special tokens.

    ``<pad>``, ``<sos>`` and ``<eos>`` always occupy ids 0, 1 and 2; every
    other character receives the next free id in sorted order.
    """

    def __init__(self):
        self.special_tokens = {'<pad>': 0, '<sos>': 1, '<eos>': 2}
        self.char2idx = {}
        self.idx2char = {}
        # Defined here so the attribute exists (as 0) before build_vocab()
        # has been called, instead of raising AttributeError.
        self.vocab_size = 0

    def build_vocab(self, texts):
        """Rebuild the vocabulary from an iterable of strings.

        Any previously built vocabulary is discarded. Characters are assigned
        ids in sorted order after the special tokens, so the mapping is
        deterministic for a given set of characters.

        Args:
            texts: Iterable of strings whose characters form the vocabulary.
        """
        self.char2idx = self.special_tokens.copy()
        chars = {char for text in texts for char in text}
        for char in sorted(chars):
            # Guard keeps the reserved ids intact if an item equals a special token.
            if char not in self.char2idx:
                self.char2idx[char] = len(self.char2idx)
        self.idx2char = {idx: char for char, idx in self.char2idx.items()}
        self.vocab_size = len(self.char2idx)

class TransliterationDataset(Dataset):
    """Dataset that pairs source and target sequences by position."""

    def __init__(self, src_sequences, tgt_sequences):
        self.src, self.tgt = src_sequences, tgt_sequences

    def __len__(self):
        # The size follows the source side only; targets are not consulted.
        return len(self.src)

    def __getitem__(self, idx):
        source_item = self.src[idx]
        target_item = self.tgt[idx]
        return source_item, target_item

def load_data(batch_size, base_dir='dakshina_dataset_v1.0/hi/lexicons/'):
    """Build train/validation DataLoaders and tokenizers for the Hindi lexicon.

    Args:
        batch_size: Batch size used for both loaders.
        base_dir: Directory holding ``hi.translit.sampled.train.tsv`` and
            ``hi.translit.sampled.dev.tsv``. Defaults to the path that was
            previously hard-coded.

    Returns:
        ``(train_loader, val_loader, input_tokenizer, output_tokenizer)``.
        Both vocabularies are built from the training split only. Latin text
        is the model input and Devanagari text is the target.
    """
    columns = ['devanagari', 'latin', 'freq']

    def read_split(split):
        """Return (latin_words, devanagari_words) for one split file."""
        frame = pd.read_csv(
            os.path.join(base_dir, f'hi.translit.sampled.{split}.tsv'),
            sep='\t',
            names=columns,
            # Without these two options pandas parses words such as "nan",
            # "null" or "na" as missing values, which then became '' below.
            dtype={'devanagari': str, 'latin': str},
            keep_default_na=False,
        )
        # Rows with missing trailing fields can still yield NaN; blank them.
        latin = frame['latin'].fillna('').astype(str).str.strip().tolist()
        devanagari = frame['devanagari'].fillna('').astype(str).str.strip().tolist()
        return latin, devanagari

    train_inputs, train_targets = read_split('train')
    val_inputs, val_targets = read_split('dev')

    # Build tokenizers from the training split only.
    input_tokenizer = CharTokenizer()
    input_tokenizer.build_vocab(train_inputs)
    output_tokenizer = CharTokenizer()
    output_tokenizer.build_vocab(train_targets)

    def process_sequences(words, tokenizer):
        """Encode each word as <sos> + character ids + <eos>."""
        lookup = tokenizer.char2idx
        sos, eos = lookup['<sos>'], lookup['<eos>']
        # Characters missing from the vocabulary fall back to id 0, which is
        # shared with <pad>.
        return [
            torch.tensor([sos, *(lookup.get(c, 0) for c in word), eos], dtype=torch.long)
            for word in words
        ]

    # Each split is padded (with 0) to the longest sequence in that split.
    train_src = pad_sequence(process_sequences(train_inputs, input_tokenizer), batch_first=True)
    train_tgt = pad_sequence(process_sequences(train_targets, output_tokenizer), batch_first=True)
    val_src = pad_sequence(process_sequences(val_inputs, input_tokenizer), batch_first=True)
    val_tgt = pad_sequence(process_sequences(val_targets, output_tokenizer), batch_first=True)

    train_loader = DataLoader(
        TransliterationDataset(train_src, train_tgt),
        batch_size=batch_size, shuffle=True, drop_last=True,
    )
    # Validation keeps its final partial batch so every sample is scored.
    val_loader = DataLoader(
        TransliterationDataset(val_src, val_tgt),
        batch_size=batch_size, drop_last=False,
    )

    return train_loader, val_loader, input_tokenizer, output_tokenizer

# ======================
# 2. Model Architecture
# ======================
@dataclass
class ModelConfig:
    """Hyperparameters read by the Encoder and Decoder constructors."""
    # Number of rows in the encoder's embedding table.
    input_vocab_size: int
    # Rows in the decoder's embedding table and width of its output projection.
    output_vocab_size: int
    # Embedding width used by both the encoder and the decoder.
    embedding_size: int
    # Hidden-state width of both recurrent stacks.
    hidden_size: int
    # Number of stacked recurrent layers in the encoder.
    encoder_layers: int
    # Number of stacked recurrent layers in the decoder.
    decoder_layers: int
    # Recurrent cell selector; Encoder/Decoder look it up among 'rnn', 'gru', 'lstm'.
    cell_type: str
    # Dropout probability; only passed to a stack that has more than one layer.
    dropout: float

class Encoder(nn.Module):
    """Embeds source token ids and feeds them through a recurrent stack."""

    def __init__(self, config):
        super().__init__()
        self.embedding = nn.Embedding(config.input_vocab_size, config.embedding_size)

        cell_classes = {'rnn': nn.RNN, 'gru': nn.GRU, 'lstm': nn.LSTM}
        recurrent_cls = cell_classes[config.cell_type]

        # Dropout is only requested for stacks deeper than one layer.
        if config.encoder_layers > 1:
            layer_dropout = config.dropout
        else:
            layer_dropout = 0

        self.rnn = recurrent_cls(
            config.embedding_size,
            config.hidden_size,
            num_layers=config.encoder_layers,
            dropout=layer_dropout,
            batch_first=True,
        )

    def forward(self, x):
        """Return only the recurrent module's final state for batch ``x``."""
        _per_step, final_state = self.rnn(self.embedding(x))
        return final_state

class Decoder(nn.Module):
    """Recurrent decoder: embedding, recurrent stack, then a vocabulary projection."""

    def __init__(self, config):
        super().__init__()
        self.embedding = nn.Embedding(config.output_vocab_size, config.embedding_size)

        cell_classes = {'rnn': nn.RNN, 'gru': nn.GRU, 'lstm': nn.LSTM}
        recurrent_cls = cell_classes[config.cell_type]

        # Dropout is only requested for stacks deeper than one layer.
        if config.decoder_layers > 1:
            layer_dropout = config.dropout
        else:
            layer_dropout = 0

        self.rnn = recurrent_cls(
            config.embedding_size,
            config.hidden_size,
            num_layers=config.decoder_layers,
            dropout=layer_dropout,
            batch_first=True,
        )
        self.fc = nn.Linear(config.hidden_size, config.output_vocab_size)

    def forward(self, x, hidden):
        """Return ``(projected outputs, new hidden state)`` for tokens ``x``."""
        step_outputs, next_hidden = self.rnn(self.embedding(x), hidden)
        logits = self.fc(step_outputs)
        return logits, next_hidden

class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes one target step at a time."""

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder, self.decoder = encoder, decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Decode ``trg.size(1) - 1`` steps and return the per-step outputs.

        Position 0 of the returned tensor is never written and stays zero.
        At every step a fresh random draw decides whether the next decoder
        input is the ground-truth token or the decoder's own argmax.
        """
        n_rows, n_steps = trg.size(0), trg.size(1)
        vocab_width = self.decoder.fc.out_features
        outputs = torch.zeros(n_rows, n_steps, vocab_width).to(self.device)

        state = self.encoder(src)

        # Make the encoder state match the decoder's layer count.
        depth = self.decoder.rnn.num_layers
        if isinstance(state, tuple):
            # LSTM: hidden and cell states are adjusted separately.
            hid, cell = state
            state = (self._adjust_hidden(hid, depth), self._adjust_hidden(cell, depth))
        else:
            # GRU / vanilla RNN: a single state tensor.
            state = self._adjust_hidden(state, depth)

        # The first decoder input is column 0 of the target batch.
        step_input = trg[:, 0].unsqueeze(1)

        for t in range(1, n_steps):
            logits, state = self.decoder(step_input, state)
            outputs[:, t] = logits.squeeze(1)

            use_truth = torch.rand(1).item() < teacher_forcing_ratio
            if use_truth:
                step_input = trg[:, t].unsqueeze(1)
            else:
                step_input = logits.argmax(-1)

        return outputs

    def _adjust_hidden(self, hidden, target_layers):
        """Pad (by repeating the last layer) or trim ``hidden`` along dim 0."""
        missing = target_layers - hidden.size(0)
        if missing == 0:
            return hidden
        if missing < 0:
            # More layers than needed: keep only the first ``target_layers``.
            return hidden[:target_layers]
        filler = hidden[-1:, :, :].repeat(missing, 1, 1)
        return torch.cat([hidden, filler], dim=0)



# ======================
# 3. Training Components
# ======================
def train():
    """Run one W&B-configured training run with early stopping.

    Hyperparameters are read from ``wandb.config``. After each epoch the
    model is scored with ``evaluate``; whenever validation accuracy improves,
    the weights, both tokenizers' ``char2idx`` maps and the model config are
    saved to ``best_model.pth``. Training stops once validation accuracy has
    not improved for ``patience`` consecutive epochs.
    """
    wandb.init()
    config = wandb.config
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Load data
    train_loader, val_loader, input_tokenizer, output_tokenizer = load_data(config.batch_size)

    # Model config
    model_config = ModelConfig(
        input_vocab_size=input_tokenizer.vocab_size,
        output_vocab_size=output_tokenizer.vocab_size,
        embedding_size=config.embedding_size,
        hidden_size=config.hidden_size,
        encoder_layers=config.encoder_layers,
        decoder_layers=config.decoder_layers,
        cell_type=config.cell_type,
        dropout=config.dropout
    )

    # Initialize model
    encoder = Encoder(model_config)
    decoder = Decoder(model_config)
    model = Seq2Seq(encoder, decoder, device).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=config.learning_rate)
    # Index 0 is the padding id, so padded positions do not contribute to the loss.
    criterion = nn.CrossEntropyLoss(ignore_index=0)

    best_val_acc = 0.0
    epochs_no_improve = 0
    patience = 5

    for epoch in range(config.epochs):
        model.train()
        train_loss = 0
        train_correct = 0
        train_total = 0

        for src, trg in train_loader:
            src, trg = src.to(device), trg.to(device)

            optimizer.zero_grad()
            output = model(src, trg, config.teacher_forcing)

            # Step 0 of the output is never filled by the model, so both the
            # loss and the accuracy skip it.
            output_flat = output[:, 1:].reshape(-1, output.shape[-1])
            targets_flat = trg[:, 1:].reshape(-1)
            loss = criterion(output_flat, targets_flat)

            # Token-level training accuracy over non-padding positions.
            predictions = output_flat.argmax(-1)
            mask = targets_flat != 0
            train_correct += (predictions[mask] == targets_flat[mask]).sum().item()
            train_total += mask.sum().item()

            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        avg_train_loss = train_loss / len(train_loader)
        train_acc = train_correct / train_total if train_total > 0 else 0

        # Validation
        val_loss, val_acc = evaluate(model, val_loader, criterion, device)

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            epochs_no_improve = 0
            # NOTE(review): every call to train() starts from best_val_acc = 0.0
            # and writes to this same path, so a later sweep run overwrites the
            # checkpoint of an earlier, possibly better, run.
            torch.save({
                'model_state': model.state_dict(),
                'input_tokenizer': input_tokenizer.char2idx,
                'output_tokenizer': output_tokenizer.char2idx,
                'config': model_config.__dict__
            }, 'best_model.pth')
        else:
            epochs_no_improve += 1

        # Log before the early-stopping check so the final epoch is recorded too.
        wandb.log({
            "epoch": epoch,
            "train_loss": avg_train_loss,
            "train_acc": train_acc,
            "val_loss": val_loss,
            "val_acc": val_acc,
            "best_val_acc": best_val_acc
        })

        print(f"Epoch {epoch+1:03} | "
              f"Train Loss: {avg_train_loss:.4f} | Train Acc: {train_acc:.2%} | "
              f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.2%}")

        if epochs_no_improve >= patience:
            break


def evaluate(model, loader, criterion, device):
    """Score ``model`` on ``loader`` without teacher forcing.

    Args:
        model: Callable as ``model(src, trg, teacher_forcing_ratio=0)``; it is
            switched to eval mode and left there.
        loader: Sized iterable of ``(src, trg)`` batches.
        criterion: Loss applied to the flattened outputs and targets.
        device: Device the batches are moved to.

    Returns:
        ``(avg_loss, accuracy)``: the mean per-batch loss, and token-level
        accuracy over non-padding target positions. Both are 0 when there is
        nothing to evaluate.
    """
    model.eval()
    total_loss = 0
    correct = 0
    total = 0

    with torch.no_grad():
        for src, trg in loader:
            src, trg = src.to(device), trg.to(device)
            output = model(src, trg, teacher_forcing_ratio=0)

            # Step 0 holds the start token on the target side and is skipped,
            # exactly as the training loop does. Scoring it would compare the
            # start token against an output row the model never fills.
            output_flat = output[:, 1:].reshape(-1, output.shape[-1])
            targets_flat = trg[:, 1:].reshape(-1)
            total_loss += criterion(output_flat, targets_flat).item()

            predictions = output_flat.argmax(-1)
            mask = targets_flat != 0  # ignore padding
            correct += (predictions[mask] == targets_flat[mask]).sum().item()
            total += mask.sum().item()

    num_batches = len(loader)
    avg_loss = total_loss / num_batches if num_batches > 0 else 0.0
    accuracy = correct / total if total > 0 else 0
    return avg_loss, accuracy


# ======================
# 4. Sweep Configuration
# ======================
# "Vanilla Sweep - 4" was previously stored under "method", which must name a
# search strategy ("grid", "random" or "bayes"). It is now the sweep's display
# name, and Bayesian search is used to maximise the declared "val_acc" metric.
sweep_config = {
    "name": "Vanilla Sweep - 4",
    "method": "bayes",
    "metric": {"name": "val_acc", "goal": "maximize"},
    "parameters": {
        "embedding_size": {"values": [16, 32, 64, 128, 256]},
        "hidden_size": {"values": [16, 32, 64, 128, 256]},
        "encoder_layers": {"values": [1, 2, 3]},
        "decoder_layers": {"values": [1, 2, 3]},
        "cell_type": {"values": ["rnn", "gru", "lstm"]},
        "dropout": {"values": [0.2, 0.3]},
        "batch_size": {"values": [64, 128]},
        "learning_rate": {"values": [0.001, 0.0005]},
        "teacher_forcing": {"values": [0.5, 0.7]},
        "epochs": {"values": [15, 20]}
    }
}

# Register the sweep under the W&B project and let one agent execute `train`
# for up to 50 runs.
if __name__ == "__main__":
    sweep_id = wandb.sweep(sweep_config, project="DA6401_Assignment_3")
    wandb.agent(sweep_id, train,count=50)

import torch
import pandas as pd
import os
from torch.nn.utils.rnn import pad_sequence

def load_test_data(tokenizer_path='best_model.pth'):
    """Read the Hindi test lexicon and return its word lists.

    Args:
        tokenizer_path: Unused. Kept so existing calls keep working; the
            checkpoint was previously loaded here but never read.

    Returns:
        ``(latin_words, devanagari_words)``: two equally long lists of
        whitespace-stripped strings.
    """
    base_dir = 'dakshina_dataset_v1.0/hi/lexicons/'
    test_path = os.path.join(base_dir, 'hi.translit.sampled.test.tsv')

    # Fall back to the gzip-compressed file when the plain one is absent.
    if not os.path.exists(test_path):
        test_path += '.gz'

    test_df = pd.read_csv(
        test_path,
        sep='\t',
        names=['devanagari', 'latin', 'freq'],
        # Without these two options pandas parses words such as "nan" or
        # "null" as missing values, which were then replaced by ''.
        dtype={'devanagari': str, 'latin': str},
        keep_default_na=False,
        compression='gzip' if test_path.endswith('.gz') else None,
    )

    # Rows with missing trailing fields can still yield NaN; blank them.
    latin = test_df['latin'].fillna('').astype(str).str.strip()
    devanagari = test_df['devanagari'].fillna('').astype(str).str.strip()

    return latin.tolist(), devanagari.tolist()

class Translator:
    """Greedy character-by-character decoder around a saved Seq2Seq checkpoint."""

    def __init__(self, model_path='best_model.pth', device='cpu'):
        """Rebuild the model and vocabularies stored in ``model_path``.

        Args:
            model_path: Checkpoint with the keys ``config``, ``model_state``,
                ``input_tokenizer`` and ``output_tokenizer``.
            device: Device used for the weights and for inference. Defaults to
                ``'cpu'``, which is what the code previously forced.
        """
        self.device = device
        checkpoint = torch.load(model_path, map_location=device)

        # Expose the saved config dict through attribute access, which is how
        # Encoder and Decoder read it.
        class TempConfig:
            def __init__(self, config_dict):
                self.__dict__.update(config_dict)

        config = TempConfig(checkpoint['config'])

        encoder = Encoder(config)
        decoder = Decoder(config)
        self.model = Seq2Seq(encoder, decoder, device).to(device)
        self.model.load_state_dict(checkpoint['model_state'])
        self.model.eval()

        # Both tokenizers are stored as plain char -> id dicts.
        self.input_tokenizer = checkpoint['input_tokenizer']
        self.output_tokenizer = checkpoint['output_tokenizer']
        self.output_inv_vocab = {v: k for k, v in self.output_tokenizer.items()}

    def adjust_hidden(self, hidden, target_layers):
        """Pad or trim an encoder state to ``target_layers`` layers.

        Missing layers are filled by repeating the last layer; extra layers
        are dropped. A tuple state (LSTM) has both members adjusted.
        """
        def pad_or_trim(h):
            missing = target_layers - h.size(0)
            if missing == 0:
                return h
            if missing < 0:
                return h[:target_layers]
            return torch.cat([h, h[-1:, :, :].repeat(missing, 1, 1)], dim=0)

        if isinstance(hidden, tuple):
            h, c = hidden
            return pad_or_trim(h), pad_or_trim(c)
        return pad_or_trim(hidden)

    def translate(self, word, max_length=20):
        """Return the greedy decoding of ``word``.

        Decoding stops at the first ``<eos>`` prediction or after
        ``max_length`` steps. Ids missing from the output vocabulary are
        rendered as ``'<unk>'``.
        """
        # 1 = <sos>, 2 = <eos>; unknown input characters map to 0.
        seq = [1] + [self.input_tokenizer.get(c, 0) for c in word] + [2]
        src = torch.tensor(seq).unsqueeze(0).to(self.device)

        with torch.no_grad():
            hidden = self.model.encoder(src)
            # The encoder and decoder may have different layer counts; without
            # this step the decoder rejects a mismatched hidden state.
            hidden = self.adjust_hidden(hidden, self.model.decoder.rnn.num_layers)

            trg = torch.tensor([[1]], device=self.device)  # start with <sos>
            output_chars = []
            for _ in range(max_length):
                output, hidden = self.model.decoder(trg, hidden)
                pred = output.argmax(-1).item()
                char = self.output_inv_vocab.get(pred, '<unk>')
                if char == '<eos>':
                    break
                output_chars.append(char)
                trg = torch.tensor([[pred]], device=self.device)

        return ''.join(output_chars)

# (a) Calculate Test Accuracy
def evaluate_test_set():
    """Translate every test word and report exact-match accuracy.

    Returns:
        ``(results, accuracy)`` where ``results`` is a list of
        ``(latin, prediction, devanagari)`` tuples and ``accuracy`` is the
        fraction of predictions equal to their target (0.0 when the test set
        is empty).
    """
    input_words, output_words = load_test_data()
    translator = Translator()

    results = []
    correct = 0
    for latin, devanagari in zip(input_words, output_words):
        pred = translator.translate(latin)
        results.append((latin, pred, devanagari))
        correct += int(pred == devanagari)

    total = len(results)
    # Guard against an empty test set instead of dividing by zero.
    accuracy = correct / total if total > 0 else 0.0
    print(f"Test Accuracy: {accuracy:.2%}")
    return results, accuracy




# (b) Generate predictions
def save_predictions(results, folder='predictions_vanilla'):
    """Write predictions to ``folder`` and print a small random sample.

    Creates ``predictions.csv`` and ``all_predictions.txt`` inside ``folder``
    (the directory is created if needed).

    Args:
        results: Iterable of ``(input, prediction, target)`` triples.
        folder: Output directory.
    """
    os.makedirs(folder, exist_ok=True)

    df = pd.DataFrame(results, columns=['Input', 'Prediction', 'Target'])
    df.to_csv(os.path.join(folder, 'predictions.csv'), index=False)

    # Sampling more rows than exist raises, so cap the sample at the frame size.
    sample_df = df.sample(min(10, len(df)), random_state=42)
    print("\nSample Predictions:")
    try:
        table = sample_df.to_markdown(index=False)
    except ImportError:
        # to_markdown needs the optional 'tabulate' package.
        table = sample_df.to_string(index=False)
    print(table)

    # Explicit UTF-8: the predictions contain Devanagari text, which the
    # platform default encoding may be unable to represent.
    with open(os.path.join(folder, 'all_predictions.txt'), 'w', encoding='utf-8') as f:
        f.writelines(
            f"Input: {row[0]}\nPrediction: {row[1]}\nTarget: {row[2]}\n\n"
            for row in results
        )

# Execute the test-set evaluation and write the prediction files.
# NOTE: the `if __name__ == "__main__":` guard is commented out, so these two
# statements run whenever this code is executed, including on import.
test_results, test_acc = evaluate_test_set()
save_predictions(test_results)

import torch
import pandas as pd
import os
import time
from tqdm import tqdm
from torch.nn.utils.rnn import pad_sequence

def load_test_data(tokenizer_path='best_model.pth', limit=None):
    """Read the Hindi test lexicon and return its word lists.

    Args:
        tokenizer_path: Unused. Kept so existing calls keep working; the
            checkpoint was previously loaded here but never read.
        limit: When not ``None``, only the first ``limit`` rows are returned.

    Returns:
        ``(latin_words, devanagari_words)``: two equally long lists of
        whitespace-stripped strings.
    """
    base_dir = 'dakshina_dataset_v1.0/hi/lexicons/'
    test_path = os.path.join(base_dir, 'hi.translit.sampled.test.tsv')

    # Fall back to the gzip-compressed file when the plain one is absent.
    if not os.path.exists(test_path):
        test_path += '.gz'

    test_df = pd.read_csv(
        test_path,
        sep='\t',
        names=['devanagari', 'latin', 'freq'],
        # Without these two options pandas parses words such as "nan" or
        # "null" as missing values, which were then replaced by ''.
        dtype={'devanagari': str, 'latin': str},
        keep_default_na=False,
        compression='gzip' if test_path.endswith('.gz') else None,
    )

    # Rows with missing trailing fields can still yield NaN; blank them.
    test_df['latin'] = test_df['latin'].fillna('').astype(str).str.strip()
    test_df['devanagari'] = test_df['devanagari'].fillna('').astype(str).str.strip()

    if limit is not None:
        test_df = test_df.head(limit)

    return test_df['latin'].tolist(), test_df['devanagari'].tolist()


class Translator:
    """Greedy character-by-character decoder around a saved Seq2Seq checkpoint."""

    def __init__(self, model_path='best_model.pth', device='cpu'):
        """Rebuild the model and vocabularies stored in ``model_path``.

        Args:
            model_path: Checkpoint with the keys ``config``, ``model_state``,
                ``input_tokenizer`` and ``output_tokenizer``.
            device: Device used for the weights and for inference. Defaults to
                ``'cpu'``, which is what the code previously forced after a
                CUDA probe whose result was discarded.
        """
        self.device = device
        checkpoint = torch.load(model_path, map_location=self.device)

        # Expose the saved config dict through attribute access, which is how
        # Encoder and Decoder read it.
        class TempConfig:
            def __init__(self, config_dict):
                self.__dict__.update(config_dict)

        config = TempConfig(checkpoint['config'])

        self.model = Seq2Seq(Encoder(config), Decoder(config), self.device).to(self.device)
        self.model.load_state_dict(checkpoint['model_state'])
        self.model.eval()

        # Both tokenizers are stored as plain char -> id dicts.
        self.input_tokenizer = checkpoint['input_tokenizer']
        self.output_tokenizer = checkpoint['output_tokenizer']
        self.output_inv_vocab = {v: k for k, v in self.output_tokenizer.items()}
        self.config = config

    def adjust_hidden(self, hidden, target_layers):
        """Pad or trim an encoder state to ``target_layers`` layers.

        Missing layers are filled by repeating the last layer; extra layers
        are dropped. A tuple state (LSTM) has both members adjusted.
        """
        def pad_or_trim(h):
            missing = target_layers - h.size(0)
            if missing == 0:
                return h
            if missing < 0:
                return h[:target_layers]
            return torch.cat([h, h[-1:, :, :].repeat(missing, 1, 1)], dim=0)

        if isinstance(hidden, tuple):
            h, c = hidden
            return pad_or_trim(h), pad_or_trim(c)
        return pad_or_trim(hidden)

    def translate(self, word, max_length=20):
        """Return the greedy decoding of ``word``.

        Decoding stops at the first ``<eos>`` prediction or after
        ``max_length`` steps. Ids missing from the output vocabulary are
        rendered as ``'<unk>'``.
        """
        # 1 = <sos>, 2 = <eos>; unknown input characters map to 0.
        seq = [1] + [self.input_tokenizer.get(c, 0) for c in word] + [2]
        src = torch.tensor(seq).unsqueeze(0).to(self.device)

        with torch.no_grad():
            hidden = self.model.encoder(src)
            # The encoder and decoder may have different layer counts.
            hidden = self.adjust_hidden(hidden, self.model.decoder.rnn.num_layers)

            trg = torch.tensor([[1]], device=self.device)  # start with <sos>
            output_chars = []

            for _ in range(max_length):
                output, hidden = self.model.decoder(trg, hidden)
                pred = output.argmax(-1).item()
                char = self.output_inv_vocab.get(pred, '<unk>')
                if char == '<eos>':
                    break
                output_chars.append(char)
                trg = torch.tensor([[pred]], device=self.device)

        return ''.join(output_chars)


def evaluate_test_set(limit=None):
    """Translate the test words and report exact-match accuracy.

    Args:
        limit: Forwarded to ``load_test_data``; ``None`` evaluates everything.

    Returns:
        ``(results, accuracy)`` where ``results`` is a list of
        ``(latin, prediction, devanagari)`` tuples and ``accuracy`` is the
        fraction of predictions equal to their target (0.0 when no samples
        were evaluated, e.g. ``limit=0``).
    """
    input_words, output_words = load_test_data(limit=limit)
    translator = Translator()

    results = []
    correct = 0

    print(f"Translating {len(input_words)} test samples...")
    for latin, devanagari in tqdm(zip(input_words, output_words), total=len(input_words)):
        pred = translator.translate(latin)
        results.append((latin, pred, devanagari))
        correct += int(pred == devanagari)

    total = len(results)
    # Guard against an empty evaluation set instead of dividing by zero.
    accuracy = correct / total if total > 0 else 0.0
    print(f"\n✅ Test Accuracy: {accuracy:.2%}")
    return results, accuracy


def save_predictions(results, folder='predictions_vanilla'):
    """Write predictions to ``folder`` and print a small random sample.

    Creates four files inside ``folder`` (the directory is created if needed):
    ``predictions.csv`` (Input/Prediction/Target), ``true_predictions.csv``
    and ``false_predictions.csv`` (the same columns plus ``Correct``, split
    by exact match), and ``all_predictions.txt``.

    Args:
        results: Iterable of ``(input, prediction, target)`` triples.
        folder: Output directory.
    """
    os.makedirs(folder, exist_ok=True)

    # Written before the 'Correct' column is added, so it has three columns.
    df = pd.DataFrame(results, columns=['Input', 'Prediction', 'Target'])
    df.to_csv(os.path.join(folder, 'predictions.csv'), index=False)

    # Split exact matches from mismatches.
    df['Correct'] = df['Prediction'] == df['Target']
    df[df['Correct']].to_csv(os.path.join(folder, 'true_predictions.csv'), index=False)
    df[~df['Correct']].to_csv(os.path.join(folder, 'false_predictions.csv'), index=False)

    # Sampling more rows than exist raises, so cap the sample at the frame size.
    sample_df = df.sample(min(10, len(df)), random_state=42)
    print("\nSample Predictions:")
    sample_view = sample_df[['Input', 'Prediction', 'Target', 'Correct']]
    try:
        table = sample_view.to_markdown(index=False)
    except ImportError:
        # to_markdown needs the optional 'tabulate' package.
        table = sample_view.to_string(index=False)
    print(table)

    # Explicit UTF-8: the predictions contain Devanagari text, which the
    # platform default encoding may be unable to represent.
    with open(os.path.join(folder, 'all_predictions.txt'), 'w', encoding='utf-8') as f:
        f.writelines(
            f"Input: {inp}\nPrediction: {pred}\nTarget: {tgt}\nCorrect: {ok}\n\n"
            for inp, pred, tgt, ok in zip(df['Input'], df['Prediction'], df['Target'], df['Correct'])
        )


# Script entry point: evaluate on the test set, report the elapsed time, then
# write the prediction files.
if __name__ == "__main__":
    start_time = time.time()
    
    # Set limit=100 if debugging, or None for full test set
    test_results, test_acc = evaluate_test_set(limit=None)
    
    # The elapsed time is printed before save_predictions runs, so it covers
    # only the evaluation above.
    print(f"\n⏱️ Total Time Taken: {time.time() - start_time:.2f} seconds")
    save_predictions(test_results)

