In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Embedding, Dense, SimpleRNN, LSTM, GRU, Dropout
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
import wandb
from wandb.integration.keras import WandbCallback
import os
import time
import random
from sklearn.model_selection import train_test_split

# Set random seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)
random.seed(42)

In [2]:
def load_data(file_path):
    """
    Load transliteration data from TSV files
    Returns pairs of (native_script, latin_script)
    """
    data = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            parts = line.strip().split('\t')
            if len(parts) >= 2:
                native_script = parts[0]  # Devanagari
                latin_script = parts[1]   # Romanized
                data.append((native_script, latin_script))
    return data

# Load Hindi transliteration data
data_dir = "dakshina_dataset_v1.0/hi/lexicons"
train_data = load_data(os.path.join(data_dir, "hi.translit.sampled.train.tsv"))
dev_data = load_data(os.path.join(data_dir, "hi.translit.sampled.dev.tsv"))
test_data = load_data(os.path.join(data_dir, "hi.translit.sampled.test.tsv"))

print(f"Training examples: {len(train_data)}")
print(f"Development examples: {len(dev_data)}")
print(f"Test examples: {len(test_data)}")

# Display a few examples
print("\nSample data (Devanagari -> Latin):")
for i in range(5):
    print(f"{train_data[i][0]} -> {train_data[i][1]}")


Training examples: 44204
Development examples: 4358
Test examples: 4502

Sample data (Devanagari -> Latin):
अं -> an
अंकगणित -> ankganit
अंकल -> uncle
अंकुर -> ankur
अंकुरण -> ankuran


In [3]:
def create_tokenizers(data):
    """
    Create character-level tokenization for input and output sequences
    """
    # Get all unique characters in input (Latin) and output (Devanagari) sequences
    input_chars = set()
    output_chars = set()
    
    for native, latin in data:
        for char in latin:
            input_chars.add(char)
        for char in native:
            output_chars.add(char)
    
    # Add special tokens
    input_chars.add('\t')  # Start token
    input_chars.add('\n')  # End token
    output_chars.add('\t')  # Start token
    output_chars.add('\n')  # End token
    
    # Create character-to-index mappings
    input_char_to_idx = {char: i for i, char in enumerate(sorted(list(input_chars)))}
    output_char_to_idx = {char: i for i, char in enumerate(sorted(list(output_chars)))}
    
    # Create index-to-character mappings
    input_idx_to_char = {i: char for char, i in input_char_to_idx.items()}
    output_idx_to_char = {i: char for char, i in output_char_to_idx.items()}
    
    return (input_char_to_idx, input_idx_to_char, 
            output_char_to_idx, output_idx_to_char)

# Create tokenizers using all data
all_data = train_data + dev_data + test_data
input_char_to_idx, input_idx_to_char, output_char_to_idx, output_idx_to_char = create_tokenizers(all_data)

print(f"Input vocabulary size: {len(input_char_to_idx)}")
print(f"Output vocabulary size: {len(output_char_to_idx)}")


Input vocabulary size: 28
Output vocabulary size: 65


In [4]:
def preprocess_data(data, input_char_to_idx, output_char_to_idx, max_input_len=None, max_output_len=None):
    """
    Convert character sequences to integer sequences
    """
    # Determine max lengths if not provided
    if max_input_len is None:
        max_input_len = max([len(latin) for _, latin in data]) + 2  # +2 for start/end tokens
    if max_output_len is None:
        max_output_len = max([len(native) for native, _ in data]) + 2  # +2 for start/end tokens
    
    encoder_input_data = []
    decoder_input_data = []
    decoder_target_data = []
    
    for native, latin in data:
        # Encoder input (Latin script with start/end tokens)
        input_text = '\t' + latin + '\n'
        encoder_input = [input_char_to_idx[char] for char in input_text]
        # Pad encoder input
        encoder_input = encoder_input + [0] * (max_input_len - len(encoder_input))
        encoder_input_data.append(encoder_input)
        
        # Decoder input (Devanagari script with start token)
        target_text = '\t' + native + '\n'
        decoder_input = [output_char_to_idx[char] for char in target_text[:-1]]  # exclude the last character
        # Pad decoder input
        decoder_input = decoder_input + [0] * (max_output_len - 1 - len(decoder_input))
        decoder_input_data.append(decoder_input)
        
        # Decoder target (Devanagari script shifted by one timestep)
        decoder_target = [output_char_to_idx[char] for char in target_text[1:]]  # exclude the first character
        # Pad decoder target
        decoder_target = decoder_target + [0] * (max_output_len - 1 - len(decoder_target))
        decoder_target_data.append(decoder_target)
    
    return (np.array(encoder_input_data), 
            np.array(decoder_input_data), 
            np.array(decoder_target_data), 
            max_input_len, max_output_len)

# Find max sequence lengths
max_input_len = max([len(latin) for _, latin in all_data]) + 2
max_output_len = max([len(native) for native, _ in all_data]) + 2

print(f"Max input sequence length: {max_input_len}")
print(f"Max output sequence length: {max_output_len}")

# Preprocess data
encoder_input_train, decoder_input_train, decoder_target_train, _, _ = preprocess_data(
    train_data, input_char_to_idx, output_char_to_idx, max_input_len, max_output_len)
encoder_input_dev, decoder_input_dev, decoder_target_dev, _, _ = preprocess_data(
    dev_data, input_char_to_idx, output_char_to_idx, max_input_len, max_output_len)
encoder_input_test, decoder_input_test, decoder_target_test, _, _ = preprocess_data(
    test_data, input_char_to_idx, output_char_to_idx, max_input_len, max_output_len)

# Convert target data to one-hot encoding
decoder_target_train_onehot = tf.keras.utils.to_categorical(
    decoder_target_train, num_classes=len(output_char_to_idx))
decoder_target_dev_onehot = tf.keras.utils.to_categorical(
    decoder_target_dev, num_classes=len(output_char_to_idx))
decoder_target_test_onehot = tf.keras.utils.to_categorical(
    decoder_target_test, num_classes=len(output_char_to_idx))

Max input sequence length: 22
Max output sequence length: 21


In [5]:
def create_seq2seq_model(input_vocab_size, output_vocab_size, 
                         embedding_dim=256, hidden_dim=256, 
                         cell_type='lstm', num_encoder_layers=1, num_decoder_layers=1):
    """
    Create a sequence-to-sequence model with the specified parameters
    
    Args:
        input_vocab_size: Size of input vocabulary
        output_vocab_size: Size of output vocabulary
        embedding_dim: Dimension of character embeddings
        hidden_dim: Dimension of hidden states in RNN
        cell_type: Type of RNN cell ('rnn', 'lstm', or 'gru')
        num_encoder_layers: Number of layers in encoder
        num_decoder_layers: Number of layers in decoder
        
    Returns:
        model: Keras model
    """
    # Define RNN cell based on type
    if cell_type.lower() == 'lstm':
        RNN = LSTM
    elif cell_type.lower() == 'gru':
        RNN = GRU
    elif cell_type.lower() == 'rnn':
        RNN = SimpleRNN
    else:
        raise ValueError("cell_type must be one of 'rnn', 'lstm', or 'gru'")
    
    # Encoder
    encoder_inputs = Input(shape=(None,), name='encoder_inputs')
    encoder_embedding = Embedding(input_vocab_size, embedding_dim, name='encoder_embedding')(encoder_inputs)
    
    encoder_outputs = encoder_embedding
    encoder_states = []
    
    # Add encoder layers
    for i in range(num_encoder_layers):
        return_sequences = i < num_encoder_layers - 1
        return_state = i == num_encoder_layers - 1
        
        if cell_type.lower() == 'lstm':
            encoder = RNN(hidden_dim, return_sequences=return_sequences, return_state=return_state, 
                          name=f'encoder_lstm_{i}')
            if return_state:
                encoder_outputs, state_h, state_c = encoder(encoder_outputs)
                encoder_states = [state_h, state_c]
            else:
                encoder_outputs = encoder(encoder_outputs)
        else:
            encoder = RNN(hidden_dim, return_sequences=return_sequences, return_state=return_state, 
                          name=f'encoder_{cell_type}_{i}')
            if return_state:
                encoder_outputs, state_h = encoder(encoder_outputs)
                encoder_states = [state_h]
            else:
                encoder_outputs = encoder(encoder_outputs)
    
    # Decoder
    decoder_inputs = Input(shape=(None,), name='decoder_inputs')
    decoder_embedding = Embedding(output_vocab_size, embedding_dim, name='decoder_embedding')(decoder_inputs)
    
    decoder_outputs = decoder_embedding
    decoder_states = []
    
    # Add decoder layers
    for i in range(num_decoder_layers):
        return_sequences = True
        return_state = True
        
        if cell_type.lower() == 'lstm':
            decoder = RNN(hidden_dim, return_sequences=return_sequences, return_state=return_state, 
                          name=f'decoder_lstm_{i}')
            if i == 0:
                decoder_outputs, _, _ = decoder(decoder_outputs, initial_state=encoder_states)
            else:
                decoder_outputs, _, _ = decoder(decoder_outputs)
        else:
            decoder = RNN(hidden_dim, return_sequences=return_sequences, return_state=return_state, 
                          name=f'decoder_{cell_type}_{i}')
            if i == 0:
                decoder_outputs, _ = decoder(decoder_outputs, initial_state=encoder_states)
            else:
                decoder_outputs, _ = decoder(decoder_outputs)
    
    # Output layer
    decoder_dense = Dense(output_vocab_size, activation='softmax', name='decoder_dense')
    decoder_outputs = decoder_dense(decoder_outputs)
    
    # Define the model
    model = Model([encoder_inputs, decoder_inputs], decoder_outputs)
    
    return model

# Create a model with default parameters
model = create_seq2seq_model(
    input_vocab_size=len(input_char_to_idx),
    output_vocab_size=len(output_char_to_idx),
    embedding_dim=256,
    hidden_dim=256,
    cell_type='lstm',
    num_encoder_layers=1,
    num_decoder_layers=1
)

# Compile the model
model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# Print model summary
model.summary()


In [11]:
def create_inference_models(model, cell_type='lstm'):
    """
    Create encoder and decoder models for inference
    """
    if cell_type.lower() == 'lstm':
        # Encoder model
        encoder_inputs = Input(shape=(None,), name='encoder_inference_inputs')
        encoder = model.get_layer('encoder_lstm_0')
        encoder_embedding = model.get_layer('encoder_embedding')
        
        x = encoder_embedding(encoder_inputs)
        encoder_outputs, state_h, state_c = encoder(x)
        encoder_states = [state_h, state_c]
        
        encoder_model = Model(encoder_inputs, encoder_states)
        
        # Decoder model
        decoder_inputs = Input(shape=(None,), name='decoder_inference_inputs')
        decoder_state_input_h = Input(shape=(encoder.units,))
        decoder_state_input_c = Input(shape=(encoder.units,))
        decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
        
        decoder_embedding = model.get_layer('decoder_embedding')
        decoder_lstm = model.get_layer('decoder_lstm_0')
        decoder_dense = model.get_layer('decoder_dense')
        
        decoder_outputs = decoder_embedding(decoder_inputs)
        decoder_outputs, state_h, state_c = decoder_lstm(
            decoder_outputs, initial_state=decoder_states_inputs)
        decoder_states = [state_h, state_c]
        decoder_outputs = decoder_dense(decoder_outputs)
        
        decoder_model = Model(
            [decoder_inputs] + decoder_states_inputs,
            [decoder_outputs] + decoder_states
        )
        
    else:
        # For RNN/GRU
        encoder_inputs = Input(shape=(None,), name='encoder_inference_inputs')
        encoder = model.get_layer(f'encoder_{cell_type}_0')
        encoder_embedding = model.get_layer('encoder_embedding')
        
        x = encoder_embedding(encoder_inputs)
        encoder_outputs, state_h = encoder(x)
        encoder_states = [state_h]
        
        encoder_model = Model(encoder_inputs, encoder_states)
        
        # Decoder model
        decoder_inputs = Input(shape=(None,), name='decoder_inference_inputs')
        decoder_state_input_h = Input(shape=(encoder.units,))
        decoder_states_inputs = [decoder_state_input_h]
        
        decoder_embedding = model.get_layer('decoder_embedding')
        decoder_rnn = model.get_layer(f'decoder_{cell_type}_0')
        decoder_dense = model.get_layer('decoder_dense')
        
        decoder_outputs = decoder_embedding(decoder_inputs)
        decoder_outputs, state_h = decoder_rnn(
            decoder_outputs, initial_state=decoder_states_inputs)
        decoder_states = [state_h]
        decoder_outputs = decoder_dense(decoder_outputs)
        
        decoder_model = Model(
            [decoder_inputs] + decoder_states_inputs,
            [decoder_outputs] + decoder_states
        )
    
    return encoder_model, decoder_model

def train_model(model, encoder_input_train, decoder_input_train, decoder_target_train_onehot,
               encoder_input_dev, decoder_input_dev, decoder_target_dev_onehot,
               input_vocab_size, output_vocab_size, embedding_dim, hidden_dim, cell_type,
               epochs=20, batch_size=64):
    """
    Train the sequence-to-sequence model
    """
    # Initialize wandb
    wandb.init(
        project="seq2seq-transliteration",
        name=f"seq2seq-{cell_type}",
        config={
            "cell_type": cell_type,
            "embedding_dim": embedding_dim,
            "hidden_dim": hidden_dim,
            "epochs": epochs,
            "batch_size": batch_size
        }
    )
    
    # Create callbacks
    checkpoint = ModelCheckpoint(
        f"checkpoints/seq2seq_{cell_type}_model.keras",
        monitor='val_loss',
        save_best_only=True,
        verbose=1
    )
    
    early_stopping = EarlyStopping(
        monitor='val_loss',
        patience=5,
        restore_best_weights=True,
        verbose=1
    )
    
    # Train the model
    history = model.fit(
        [encoder_input_train, decoder_input_train],
        decoder_target_train_onehot,
        batch_size=batch_size,
        epochs=epochs,
        validation_data=([encoder_input_dev, decoder_input_dev], decoder_target_dev_onehot),
        callbacks=[checkpoint, early_stopping]
    )
    
    # Create inference models
    encoder_model, decoder_model = create_inference_models(model, cell_type)
    
    return decoder_model, history


def decode_sequence(input_seq, encoder_model, decoder_model, 
                   output_char_to_idx, output_idx_to_char,
                   max_output_len, cell_type):
    """
    Decode an input sequence using the trained encoder and decoder models
    """
    # Encode the input as state vectors
    states_value = encoder_model.predict(input_seq, verbose=0)
    
    # Generate empty target sequence of length 1, with the start token
    target_seq = np.zeros((1, 1))
    target_seq[0, 0] = output_char_to_idx['\t']
    
    # Sampling loop for a batch of sequences
    stop_condition = False
    decoded_seq = ''
    
    while not stop_condition:
        # Predict next character
        if cell_type.lower() == 'lstm':
            output_tokens, h, c = decoder_model.predict([target_seq] + states_value, verbose=0)
            states_value = [h, c]
        else:
            output_tokens, h = decoder_model.predict([target_seq] + states_value, verbose=0)
            states_value = [h]
        
        # Sample a token
        sampled_token_index = np.argmax(output_tokens[0, 0, :])
        sampled_char = output_idx_to_char[sampled_token_index]
        
        # Exit condition: either hit max length or find stop character
        if sampled_char == '\n' or len(decoded_seq) >= max_output_len - 2:
            stop_condition = True
        else:
            decoded_seq += sampled_char
        
        # Update the target sequence (length 1)
        target_seq = np.zeros((1, 1))
        target_seq[0, 0] = sampled_token_index
    
    return decoded_seq


In [12]:
def evaluate_model(encoder_model, decoder_model, data, encoder_input_data,
                  output_char_to_idx, output_idx_to_char,
                  max_output_len, cell_type):
    """
    Evaluate the model on given data
    """
    correct = 0
    total = len(data)
    predictions = []
    
    for i, (native, latin) in enumerate(data):
        input_seq = encoder_input_data[i:i+1]
        predicted = decode_sequence(
            input_seq, encoder_model, decoder_model,
            output_char_to_idx, output_idx_to_char,
            max_output_len, cell_type
        )
        predictions.append((native, latin, predicted))
        
        if predicted == native:
            correct += 1
    
    accuracy = correct / total
    return accuracy, predictions


In [8]:
def train_model(model, encoder_input_train, decoder_input_train, decoder_target_train_onehot,
               encoder_input_dev, decoder_input_dev, decoder_target_dev_onehot,
               input_vocab_size, output_vocab_size, embedding_dim, hidden_dim, cell_type,
               epochs=20, batch_size=64):
    """
    Train the sequence-to-sequence model
    """
    # Initialize wandb
    wandb.init(
        project="seq2seq-transliteration",
        name=f"seq2seq-{cell_type}",
        config={
            "cell_type": cell_type,
            "embedding_dim": embedding_dim,
            "hidden_dim": hidden_dim,
            "epochs": epochs,
            "batch_size": batch_size
        }
    )
    
    # Create a valid checkpoint file path
    checkpoint_dir = "checkpoints"
    os.makedirs(checkpoint_dir, exist_ok=True)
    checkpoint_path = os.path.join(checkpoint_dir, f"seq2seq_{cell_type}_model.keras")
    
    # Define callbacks
    checkpoint = ModelCheckpoint(
        checkpoint_path,
        monitor='val_loss',
        save_best_only=True,
        verbose=1
    )
    
    early_stopping = EarlyStopping(
        monitor='val_loss',
        patience=5,
        restore_best_weights=True,
        verbose=1
    )
    
    # Train the model
    history = model.fit(
        [encoder_input_train, decoder_input_train],
        decoder_target_train_onehot,
        batch_size=batch_size,
        epochs=epochs,
        validation_data=([encoder_input_dev, decoder_input_dev], decoder_target_dev_onehot),
        callbacks=[checkpoint, early_stopping]
    )
    
    # Create inference models
    encoder_model, decoder_model = create_inference_models(model, cell_type)
    
    return model, encoder_model, decoder_model, history

In [13]:
def compute_total_computations(m, k, T, V):
    """
    Calculate the total number of computations in the seq2seq model
    
    Args:
        m: Input embedding size
        k: Hidden state size
        T: Sequence length
        V: Vocabulary size
    """
    # Encoder computations
    # Embedding layer: T * m (for each character in the input sequence)
    # RNN layer: T * (m*k + k*k) (for each character, compute from input and previous state)
    encoder_computations = T * m + T * (m*k + k*k)
    
    # Decoder computations
    # Embedding layer: T * m (for each character in the output sequence)
    # RNN layer: T * (m*k + k*k) (for each character, compute from input and previous state)
    # Output layer: T * k*V (for each character, compute probability for each character in vocab)
    decoder_computations = T * m + T * (m*k + k*k) + T * k * V
    
    total_computations = encoder_computations + decoder_computations
    return total_computations

def compute_total_parameters(m, k, T, V):
    """
    Calculate the total number of parameters in the seq2seq model
    
    Args:
        m: Input embedding size
        k: Hidden state size
        T: Sequence length
        V: Vocabulary size
    """
    # Encoder parameters
    # Embedding layer: V * m (one embedding vector for each character in vocab)
    # RNN layer: (m*k + k*k + k) (weights for input, weights for hidden state, bias)
    encoder_parameters = V * m + (m*k + k*k + k)
    
    # Decoder parameters
    # Embedding layer: V * m (one embedding vector for each character in vocab)
    # RNN layer: (m*k + k*k + k) (weights for input, weights for hidden state, bias)
    # Output layer: k*V + V (weights and bias)
    decoder_parameters = V * m + (m*k + k*k + k) + (k*V + V)
    
    total_parameters = encoder_parameters + decoder_parameters
    return total_parameters

# Answer to question 1(a)
m = 256  # embedding size
k = 256  # hidden state size
T = 20   # approximate sequence length
V = 100  # approximate vocabulary size

total_computations = compute_total_computations(m, k, T, V)
print(f"1(a) Total computations: {total_computations}")

# Answer to question 1(b)
total_parameters = compute_total_parameters(m, k, T, V)
print(f"1(b) Total parameters: {total_parameters}")

def character_level_accuracy(true_text, pred_text):
    """
    Calculate character-level accuracy between two strings
    """
    min_len = min(len(true_text), len(pred_text))
    matches = sum(true_text[i] == pred_text[i] for i in range(min_len))
    return matches / max(len(true_text), len(pred_text))

def evaluate_model_with_char_accuracy(encoder_model, decoder_model, data, encoder_input_data,
                                     output_char_to_idx, output_idx_to_char,
                                     max_output_len, cell_type):
    """
    Evaluate the model on given data with both word and character accuracy
    """
    word_correct = 0
    total_char_accuracy = 0
    total = len(data)
    predictions = []
    
    for i, (native, latin) in enumerate(data):
        input_seq = encoder_input_data[i:i+1]
        predicted = decode_sequence(
            input_seq, encoder_model, decoder_model,
            output_char_to_idx, output_idx_to_char,
            max_output_len, cell_type
        )
        predictions.append((native, latin, predicted))
        
        # Word-level accuracy
        if predicted == native:
            word_correct += 1
            
        # Character-level accuracy
        char_acc = character_level_accuracy(native, predicted)
        total_char_accuracy += char_acc
    
    word_accuracy = word_correct / total
    avg_char_accuracy = total_char_accuracy / total
    
    return word_accuracy, avg_char_accuracy, predictions


1(a) Total computations: 5765120
1(b) Total parameters: 339556


In [14]:
# Train a simple model
embedding_dim = 256
hidden_dim = 256
cell_type = 'lstm'
epochs = 2
batch_size = 256

model = create_seq2seq_model(
    input_vocab_size=len(input_char_to_idx),
    output_vocab_size=len(output_char_to_idx),
    embedding_dim=embedding_dim,
    hidden_dim=hidden_dim,
    cell_type=cell_type
)

model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

model, encoder_model, decoder_model, history = train_model(
    model, encoder_input_train, decoder_input_train, decoder_target_train_onehot,
    encoder_input_dev, decoder_input_dev, decoder_target_dev_onehot,
    len(input_char_to_idx), len(output_char_to_idx), embedding_dim, hidden_dim, cell_type,
    epochs=epochs, batch_size=batch_size
)

# Evaluate the model
word_accuracy, char_accuracy, dev_predictions = evaluate_model_with_char_accuracy(
    encoder_model, decoder_model, dev_data, encoder_input_dev,
    output_char_to_idx, output_idx_to_char,
    max_output_len, cell_type
)

print(f"Development word accuracy: {word_accuracy:.4f}")
print(f"Development character accuracy: {char_accuracy:.4f}")

# Show some predictions
print("\nSample predictions:")
for i in range(10):
    native, latin, predicted = dev_predictions[i]
    print(f"Input: {latin}, Target: {native}, Predicted: {predicted}")


Epoch 1/2
[1m173/173[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 114ms/step - accuracy: 0.6744 - loss: 1.4344
Epoch 1: val_loss improved from inf to 0.92819, saving model to checkpoints/seq2seq_lstm_model.keras
[1m173/173[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 122ms/step - accuracy: 0.6746 - loss: 1.4327 - val_accuracy: 0.7491 - val_loss: 0.9282
Epoch 2/2
[1m173/173[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 126ms/step - accuracy: 0.7487 - loss: 0.9225
Epoch 2: val_loss improved from 0.92819 to 0.77273, saving model to checkpoints/seq2seq_lstm_model.keras
[1m173/173[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 130ms/step - accuracy: 0.7488 - loss: 0.9222 - val_accuracy: 0.7831 - val_loss: 0.7727
Restoring model weights from the end of the best epoch: 2.


ValueError: not enough values to unpack (expected 4, got 2)

