<a href="https://colab.research.google.com/github/Mustaq7777777/DL-ASSIGNMENT3/blob/main/DL_Assignment3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Setup and Imports

In [None]:
# Import all necessary libraries
import torch
import torch.nn as nn
import torch.nn.functional as func
import torch.optim as optim
import torch.utils.data as data
import numpy as np
import matplotlib.pyplot as plt
import math
import os
import pandas as pd
import random
import wandb
from tqdm.auto import tqdm

# For reproducibility
def seed_everything(seed=42):
    """Seed Python, NumPy and PyTorch RNGs and force deterministic cuDNN.

    Args:
        seed: Integer seed handed to every random number generator.
    """
    # Every seeding entry point takes the same integer, so drive them in one loop
    seeders = (
        random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
        np.random.seed,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    # Ask cuDNN for deterministic kernels and disable its auto-tuner
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

# Fix all RNG seeds up front so repeated runs start from the same state
seed_everything(seed=42)

# Prefer the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

Downloading and Extracting the Dakshina Dataset

In [None]:
# Download the Dakshina dataset
# The tarball is saved into the notebook's current working directory.
# NOTE(review): load_dakshina_data()'s built-in default path is under
# /kaggle/working; when running elsewhere (e.g. Colab) confirm the extracted
# location matches, or pass base_path explicitly.
!yes | wget "https://storage.googleapis.com/gresearch/dakshina/dakshina_dataset_v1.0.tar"

# Extract the downloaded tar file
# Unpacks into the current working directory; presumably this creates the
# dakshina_dataset_v1.0/ folder that the loader expects -- TODO confirm.
!yes | tar xopf dakshina_dataset_v1.0.tar

Data Loading and Processing Functions

In [None]:
def read_tsv(file_path):
    """Parse a tab-separated lexicon file into parallel word lists.

    Each line is stripped and split on tabs. Lines with fewer than two
    fields are skipped; any fields past the second are ignored.

    Args:
        file_path: Path of the UTF-8 encoded TSV file.

    Returns:
        ``(eng_words, tel_words)`` where ``eng_words`` holds the second
        column and ``tel_words`` the first column, in file order.
    """
    sources, targets = [], []
    with open(file_path, encoding='utf-8') as handle:
        for raw_line in handle:
            fields = raw_line.strip().split('\t')
            if len(fields) < 2:
                continue  # blank or malformed row
            # Column 0 is the target-script word, column 1 the English spelling
            native_word, latin_word = fields[0], fields[1]
            targets.append(native_word)
            sources.append(latin_word)
    return sources, targets

def load_dakshina_data(language='tel', base_path=None):
    """Load the Dakshina transliteration lexicons and derive vocabulary stats.

    Args:
        language: Language code used both as the sub-directory name and as
            the file-name prefix of the lexicon TSV files.
        base_path: Directory holding the
            ``<language>.translit.sampled.{train,dev,test}.tsv`` files.
            When ``None``, ``/kaggle/working/dakshina_dataset_v1.0/<language>/lexicons``
            is used; if that directory does not exist but
            ``dakshina_dataset_v1.0/<language>/lexicons`` exists relative to
            the current working directory, the relative one is used instead.

    Returns:
        A 10-tuple ``(eng_list_train, tel_list_train, eng_list_valid,
        tel_list_valid, eng_list_test, tel_list_test, eng_vocab, tel_vocab,
        max_eng_len, max_tel_len)``. The vocabularies are sorted lists of the
        distinct characters seen in the *training* words only; the max
        lengths cover train, dev and test words (``-1`` if all are empty).

    Raises:
        Any error raised by ``read_tsv`` while opening the files propagates.
    """
    if base_path is None:
        lexicon_subdir = os.path.join('dakshina_dataset_v1.0', language, 'lexicons')
        kaggle_path = os.path.join('/kaggle/working', lexicon_subdir)
        # Keep the historical Kaggle default unless it is missing AND the
        # dataset was extracted next to the notebook (e.g. on Colab).
        if os.path.isdir(kaggle_path) or not os.path.isdir(lexicon_subdir):
            base_path = kaggle_path
        else:
            base_path = lexicon_subdir

    # Paths to data files
    train_file = os.path.join(base_path, f"{language}.translit.sampled.train.tsv")
    valid_file = os.path.join(base_path, f"{language}.translit.sampled.dev.tsv")
    test_file = os.path.join(base_path, f"{language}.translit.sampled.test.tsv")

    # Load data
    eng_list_train, tel_list_train = read_tsv(train_file)
    eng_list_valid, tel_list_valid = read_tsv(valid_file)
    eng_list_test, tel_list_test = read_tsv(test_file)

    # Character vocabularies come from the training split only
    eng_vocab = sorted({letter for word in eng_list_train for letter in word})
    tel_vocab = sorted({letter for word in tel_list_train for letter in word})

    # Longest word per side across all three splits (-1 when there are no words)
    max_eng_len = max(
        (len(word)
         for words in (eng_list_train, eng_list_valid, eng_list_test)
         for word in words),
        default=-1,
    )
    max_tel_len = max(
        (len(word)
         for words in (tel_list_train, tel_list_valid, tel_list_test)
         for word in words),
        default=-1,
    )

    # Print a short summary of the loaded data
    print(f"English vocabulary size: {len(eng_vocab)}")
    print(f"Target language vocabulary size: {len(tel_vocab)}")
    print(f"Max English length: {max_eng_len}")
    print(f"Max target language length: {max_tel_len}")
    print(f"Training examples: {len(eng_list_train)}")

    return (eng_list_train, tel_list_train, eng_list_valid, tel_list_valid,
            eng_list_test, tel_list_test, eng_vocab, tel_vocab,
            max_eng_len, max_tel_len)

Data Vectorization

In [None]:
def word_to_vector(language, word, eng_vocab, tel_vocab, max_eng_len, max_tel_len):
    """Encode a word as a fixed-length list of integer token ids.

    Layout of the result: ``[start, id_1, ..., id_k, 0, ..., 0, 0]`` where
    ``start`` is ``len(vocab) + 1``, each character id is its 1-based position
    in the vocabulary, and ``0`` serves as both padding and end marker.
    The result has ``max_len + 2`` entries whenever the encoded word fits in
    ``max_len`` characters. Characters missing from the vocabulary are skipped.

    Args:
        language: ``"english"`` selects ``eng_vocab``/``max_eng_len``; any
            other value selects ``tel_vocab``/``max_tel_len``.
        word: The string to encode.
        eng_vocab: Sequence of distinct source characters.
        tel_vocab: Sequence of distinct target characters.
        max_eng_len: Maximum source word length used for padding.
        max_tel_len: Maximum target word length used for padding.

    Returns:
        A list of ints as described above.
    """
    if language == "english":
        vocab, max_len = eng_vocab, max_eng_len
    else:
        vocab, max_len = tel_vocab, max_tel_len

    # One dict lookup per character instead of scanning the whole vocabulary
    index_of = {symbol: position + 1 for position, symbol in enumerate(vocab)}

    # Start token
    vec = [len(vocab) + 1]
    # Word content (unknown characters are dropped)
    vec.extend(index_of[letter] for letter in word if letter in index_of)
    # Padding up to max_len + 1 entries (no-op if already that long)
    vec.extend([0] * (max_len + 1 - len(vec)))
    # End token
    vec.append(0)
    return vec

def prepare_matrices(eng_list, tel_list, eng_vocab, tel_vocab, max_eng_len, max_tel_len):
    """Vectorize two word lists and stack each into a tensor.

    Every English word is encoded with ``word_to_vector("english", ...)`` and
    every target word with ``word_to_vector("telugu", ...)``.

    Returns:
        ``(eng_tensor, tel_tensor)`` built with ``torch.tensor`` from the
        per-word id lists, one row per word.
    """
    # The vocab / length arguments are identical for every call
    shared_args = (eng_vocab, tel_vocab, max_eng_len, max_tel_len)

    eng_rows = [word_to_vector("english", entry, *shared_args) for entry in eng_list]
    tel_rows = [word_to_vector("telugu", entry, *shared_args) for entry in tel_list]

    return torch.tensor(eng_rows), torch.tensor(tel_rows)

Loading Data

In [None]:
# Load the Telugu lexicons and unpack the 10-tuple in a single statement.
# NOTE(review): binding `data` here shadows the `torch.utils.data as data`
# import at the top of the notebook -- confirm no later cell needs that alias.
(
    eng_list_train, tel_list_train,
    eng_list_valid, tel_list_valid,
    eng_list_test, tel_list_test,
    eng_vocab, tel_vocab,
    max_eng_len, max_tel_len,
) = data = load_dakshina_data('tel')

# Vectorize each split with the shared vocabularies and padding lengths
eng_matrix_train, tel_matrix_train = prepare_matrices(
    eng_list=eng_list_train, tel_list=tel_list_train,
    eng_vocab=eng_vocab, tel_vocab=tel_vocab,
    max_eng_len=max_eng_len, max_tel_len=max_tel_len,
)

eng_matrix_valid, tel_matrix_valid = prepare_matrices(
    eng_list=eng_list_valid, tel_list=tel_list_valid,
    eng_vocab=eng_vocab, tel_vocab=tel_vocab,
    max_eng_len=max_eng_len, max_tel_len=max_tel_len,
)

eng_matrix_test, tel_matrix_test = prepare_matrices(
    eng_list=eng_list_test, tel_list=tel_list_test,
    eng_vocab=eng_vocab, tel_vocab=tel_vocab,
    max_eng_len=max_eng_len, max_tel_len=max_tel_len,
)

# Report the resulting tensor shapes for each split
print(f"Training matrices shape: English {eng_matrix_train.shape}, Telugu {tel_matrix_train.shape}")
print(f"Validation matrices shape: English {eng_matrix_valid.shape}, Telugu {tel_matrix_valid.shape}")
print(f"Test matrices shape: English {eng_matrix_test.shape}, Telugu {tel_matrix_test.shape}")

Encoder

In [None]:
class Encoder(nn.Module):
    """Embed source token ids and run them through an RNN, GRU or LSTM stack.

    Args:
        input_size: Number of rows in the embedding table.
        embedding_size: Width of each embedding vector.
        enc_layers: Number of stacked recurrent layers.
        hidden_size: Hidden state width of the recurrent cell.
        cell_type: ``"RNN"`` or ``"GRU"``; any other value selects an LSTM.
        bi_directional_bit: Whether the recurrent stack is bidirectional.
        dropout: Dropout probability, used for ``self.dropout`` and (when there
            is more than one layer) between recurrent layers.
        batch_size: Fixed batch size used to reshape inputs and size the
            initial states.
    """

    def __init__(self, input_size, embedding_size, enc_layers, hidden_size,
                 cell_type, bi_directional_bit, dropout, batch_size):
        super().__init__()
        self.input_size = input_size
        self.embedding_size = embedding_size
        self.enc_layers = enc_layers
        self.cell_type = cell_type
        self.bi_directional_bit = bi_directional_bit
        # Registered on the module, but forward() below does not apply it.
        self.dropout = nn.Dropout(dropout)
        self.embedding = nn.Embedding(input_size, embedding_size)
        self.hidden_size = hidden_size
        self.batch_size = batch_size

        # PyTorch applies recurrent dropout only between layers; with a single
        # layer a non-zero value has no effect and just emits a warning.
        rnn_dropout = dropout if enc_layers > 1 else 0

        # Initialize RNN based on cell type (attribute name depends on the type)
        if cell_type == "RNN":
            self.rnn = nn.RNN(embedding_size, hidden_size, enc_layers,
                              dropout=rnn_dropout, bidirectional=bi_directional_bit)
        elif cell_type == "GRU":
            self.gru = nn.GRU(embedding_size, hidden_size, enc_layers,
                              dropout=rnn_dropout, bidirectional=bi_directional_bit)
        else:  # LSTM
            self.lstm = nn.LSTM(embedding_size, hidden_size, enc_layers,
                                dropout=rnn_dropout, bidirectional=bi_directional_bit)

    def forward(self, x, hidden, cell):
        """Run the encoder over a batch of token ids.

        Args:
            x: Integer tensor of token ids.
            hidden: Initial hidden state (see ``initialize_hidden``).
            cell: Initial cell state; only used by the LSTM branch.

        Returns:
            ``(output, hidden)`` for RNN/GRU, ``(output, hidden, cell)`` for LSTM.
        """
        # NOTE(review): .view() reinterprets memory rather than transposing, so
        # this yields a correct [seq_len, batch, emb] layout only if x is already
        # time-major ([seq_len, batch]) -- confirm against the caller.
        embedding = self.embedding(x).view(-1, self.batch_size, self.embedding_size)

        # Pass through the appropriate RNN type
        if self.cell_type == "RNN":
            output, hidden = self.rnn(embedding, hidden)
        elif self.cell_type == "GRU":
            output, hidden = self.gru(embedding, hidden)
        else:  # LSTM
            output, (hidden, cell) = self.lstm(embedding, (hidden, cell))
            return output, hidden, cell

        return output, hidden

    def _zero_state(self):
        """Return a zero tensor of shape [directions * layers, batch, hidden]."""
        directions = 2 if self.bi_directional_bit else 1
        # Allocate on the module's own device so the state always matches the
        # weights, instead of relying on a notebook-level global.
        return torch.zeros(directions * self.enc_layers, self.batch_size,
                           self.hidden_size, device=self.embedding.weight.device)

    def initialize_hidden(self):
        """Initialize hidden state tensor"""
        return self._zero_state()

    def initialize_cell(self):
        """Initialize cell state tensor (for LSTM)"""
        return self._zero_state()

Bahdanau Attention Mechanism

In [None]:
class BahdanauAttention(nn.Module):
    """Additive attention: scores each encoder step against a decoder state."""

    def __init__(self, enc_hid_dim, dec_hid_dim):
        super().__init__()
        # Projects [decoder state ; encoder output] down to dec_hid_dim
        self.attn = nn.Linear(enc_hid_dim + dec_hid_dim, dec_hid_dim)
        # Collapses each projected vector to one scalar score
        self.v = nn.Linear(dec_hid_dim, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        """Return attention weights over the source positions.

        Args:
            hidden: Decoder state, ``[batch_size, dec_hid_dim]``.
            encoder_outputs: ``[src_len, batch_size, enc_hid_dim]``.

        Returns:
            ``[batch_size, src_len]`` tensor; each row is a softmax
            distribution over source positions.
        """
        steps = encoder_outputs.size(0)

        # Batch-major view of the encoder outputs: [batch, src_len, enc_hid_dim]
        keys = encoder_outputs.transpose(0, 1)

        # Broadcast the decoder state along the source axis:
        # [batch, dec_hid_dim] -> [batch, src_len, dec_hid_dim]
        query = hidden.unsqueeze(1).expand(-1, steps, -1)

        # Additive scoring: v^T tanh(W [query ; key]) -> [batch, src_len, 1]
        joined = torch.cat((query, keys), dim=2)
        scores = self.v(torch.tanh(self.attn(joined)))

        # Drop the trailing singleton and normalize over source positions
        return func.softmax(scores.squeeze(-1), dim=1)

Decoder (without attention)

In [None]:
class Decoder(nn.Module):
    """Single-step decoder: embeds one token per batch item and predicts the next.

    Args:
        input_size: Number of rows in the embedding table.
        embedding_size: Width of each embedding vector.
        hidden_size: Hidden state width of the recurrent cell.
        dec_layers: Number of stacked recurrent layers.
        dropout: Dropout probability applied to the embeddings and (when there
            is more than one layer) between recurrent layers.
        cell_type: ``"RNN"`` or ``"GRU"``; any other value selects an LSTM.
        output_size: Size of the output projection (number of logits).
    """

    def __init__(self, input_size, embedding_size, hidden_size, dec_layers,
                 dropout, cell_type, output_size):
        super().__init__()
        self.input_size = input_size
        self.embedding_size = embedding_size
        self.hidden_size = hidden_size
        self.dec_layers = dec_layers
        self.dropout = nn.Dropout(dropout)
        self.cell_type = cell_type
        self.embedding = nn.Embedding(input_size, embedding_size)

        # PyTorch applies recurrent dropout only between layers; with a single
        # layer a non-zero value has no effect and just emits a warning.
        rnn_dropout = dropout if dec_layers > 1 else 0

        # Initialize RNN based on cell type (attribute name depends on the type)
        if cell_type == "RNN":
            self.rnn = nn.RNN(embedding_size, hidden_size, dec_layers, dropout=rnn_dropout)
        elif cell_type == "GRU":
            self.gru = nn.GRU(embedding_size, hidden_size, dec_layers, dropout=rnn_dropout)
        else:  # LSTM
            self.lstm = nn.LSTM(embedding_size, hidden_size, dec_layers, dropout=rnn_dropout)

        # Output projection
        self.fully_conc = nn.Linear(hidden_size, output_size)

    def forward(self, x, prev_output, prev_hidden, cell=0):
        """Decode one time step.

        Args:
            x: Tensor of token ids; a leading length-1 sequence axis is added.
            prev_output: Accepted for signature compatibility; not used here.
            prev_hidden: Hidden state passed to the recurrent cell.
            cell: LSTM cell state. A non-tensor value (such as the default
                ``0``) is treated as an all-zero cell state shaped like
                ``prev_hidden``. Ignored for RNN/GRU.

        Returns:
            ``(pred, hidden)`` for RNN/GRU and ``(pred, hidden, cell)`` for
            LSTM, where ``pred`` is the projection output with the leading
            sequence axis squeezed away.
        """
        # Add the sequence axis and embed (with dropout on the embeddings)
        x = x.unsqueeze(0).int()
        embedding = self.dropout(self.embedding(x))

        # Pass through the appropriate RNN type
        if self.cell_type == "RNN":
            outputs, hidden = self.rnn(embedding, prev_hidden)
        elif self.cell_type == "GRU":
            outputs, hidden = self.gru(embedding, prev_hidden)
        else:  # LSTM
            if not torch.is_tensor(cell):
                # The placeholder default cannot be fed to nn.LSTM directly
                cell = torch.zeros_like(prev_hidden)
            outputs, (hidden, cell) = self.lstm(embedding, (prev_hidden, cell))

        # Project to vocabulary size and remove the sequence dimension
        pred = self.fully_conc(outputs).squeeze(0)

        if self.cell_type in ("RNN", "GRU"):
            return pred, hidden

        return pred, hidden, cell