In [1]:
# from google.colab import drive
# drive.mount('/content/drive')

In [2]:
import csv
import random
import matplotlib.pyplot as plt
from matplotlib import font_manager as fm
import numpy as np
import pandas as pd
from tqdm import tqdm
import wandb
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim

# Additional setup
# Authenticate with Weights & Biases. No key is passed on purpose: wandb.login()
# picks credentials up from the WANDB_API_KEY environment variable, an existing
# netrc entry, or an interactive prompt. Never paste an API key into the notebook;
# any key that was previously committed here should be revoked and regenerated.
wandb.login()

# Device setup: use the GPU when one is visible to PyTorch, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")




wandb: Currently logged in as: bhavik-160990105023. Use `wandb login --relogin` to force relogin
wandb: Appending key for api.wandb.ai to your netrc file: C:\Users\Bhavik More\.netrc


True

In [None]:
# Step 0: Define the language
# Selecting Marathi language

# Step 1: Create file paths for train, test, and validation data
train_file = "./aksharantar_sampled/aksharantar_sampled/mar/mar_train.csv"
test_file = "./aksharantar_sampled/aksharantar_sampled/mar/mar_test.csv"
val_file = "./aksharantar_sampled/aksharantar_sampled/mar/mar_valid.csv"


def _read_word_pairs(csv_path):
    """Parse a header-less CSV once and return its first two columns as (english, marathi) Series."""
    # dtype=str + keep_default_na=False keep every cell as text, so a word such as
    # "nan" or "null" is not turned into a float NaN (which cannot be iterated below).
    frame = pd.read_csv(csv_path, header=None, dtype=str, keep_default_na=False)
    return frame.iloc[:, 0], frame.iloc[:, 1]


def _encode_word(word, char_to_id, max_len, marker_id, marker_first):
    """
    Turn one word into a fixed-length list of ids of length max_len + 2.

    Characters missing from char_to_id map to 0, the same id used for padding.
    marker_first=True gives  [marker] + ids + padding   (training layout);
    marker_first=False gives ids + padding + [marker]   (validation/test layout).
    """
    ids = [char_to_id.get(char, 0) for char in word]
    padding = [0] * (max_len - len(word) + 1)
    if marker_first:
        return [marker_id] + ids + padding
    return ids + padding + [marker_id]


# Step 2: Read CSV files and split data into English and Marathi columns
english_train, marathi_train = _read_word_pairs(train_file)
english_test, marathi_test = _read_word_pairs(test_file)
english_val, marathi_val = _read_word_pairs(val_file)

# Step 3: Create character lists and find maximum word lengths
# The vocabularies are built from the training split only.
english_char_set = set(char for word in english_train for char in word)
marathi_char_set = set(char for word in marathi_train for char in word)

# Sort the sets to create character lists
english_chars = sorted(english_char_set)
marathi_chars = sorted(marathi_char_set)

# Character -> id lookup tables (ids start at 1; 0 is padding / unknown).
# A dict lookup replaces a linear list.index() scan per character.
english_char_to_id = {char: idx + 1 for idx, char in enumerate(english_chars)}
marathi_char_to_id = {char: idx + 1 for idx, char in enumerate(marathi_chars)}

# Marker id placed at the start (train) or end (val/test) of every encoded word
english_marker_id = len(english_chars) + 1
marathi_marker_id = len(marathi_chars) + 1

# Find maximum word lengths across train, validation and test data
english_max_len = max(
    max(len(word) for word in english_train),
    max(len(word) for word in english_val),
    max(len(word) for word in english_test),
)
marathi_max_len = max(
    max(len(word) for word in marathi_train),
    max(len(word) for word in marathi_val),
    max(len(word) for word in marathi_test),
)

# Step 4: Convert training words to vector representations (marker first)
english_vectors = [
    _encode_word(word, english_char_to_id, english_max_len, english_marker_id, marker_first=True)
    for word in english_train
]
marathi_vectors = [
    _encode_word(word, marathi_char_to_id, marathi_max_len, marathi_marker_id, marker_first=True)
    for word in marathi_train
]

# Step 5: Create word matrices
english_matrix = torch.tensor(english_vectors)
marathi_matrix = torch.tensor(marathi_vectors)

# Step 6: Create word matrices for validation and test data (marker last)
# NOTE(review): training rows carry the marker at position 0 while these rows carry
# it at the end, yet Model.forward feeds row position 0 to the decoder as its first
# input in both cases. The original layout is kept unchanged here; confirm that the
# difference is intended before relying on validation/test numbers.
english_matrix_val = torch.tensor([
    _encode_word(word, english_char_to_id, english_max_len, english_marker_id, marker_first=False)
    for word in english_val
])
english_matrix_test = torch.tensor([
    _encode_word(word, english_char_to_id, english_max_len, english_marker_id, marker_first=False)
    for word in english_test
])

marathi_matrix_val = torch.tensor([
    _encode_word(word, marathi_char_to_id, marathi_max_len, marathi_marker_id, marker_first=False)
    for word in marathi_val
])
marathi_matrix_test = torch.tensor([
    _encode_word(word, marathi_char_to_id, marathi_max_len, marathi_marker_id, marker_first=False)
    for word in marathi_test
])

In [14]:


class Encoder(nn.Module):
    """
    Encoder module for sequence-to-sequence models.

    Args:
        input_size (int): Size of the input vocabulary.
        embedding_dim (int): Dimension of the embedding layer.
        hidden_size (int): Size of the hidden state in the recurrent layer.
        num_layers (int): Number of layers in the recurrent layer.
        batch_size (int): Batch size used by initialize_hidden_state.
        dropout_prob (float): Dropout probability for regularization.
        bidirectional (bool): Whether to use a bidirectional recurrent layer.
        cell_type (str): Type of recurrent cell ('RNN' or 'LSTM'; any other value selects GRU).

    Attributes:
        embedding (nn.Embedding): Embedding layer for input tokens.
        rnn (nn.RNNBase): Recurrent layer (RNN, LSTM, or GRU).
        dropout (nn.Dropout): Dropout layer applied to the embeddings.
    """

    def __init__(self, input_size, embedding_dim, hidden_size, num_layers, batch_size, dropout_prob, bidirectional, cell_type):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.batch_size = batch_size
        self.dropout = nn.Dropout(dropout_prob)
        self.bidirectional = bidirectional
        self.embedding_dim = embedding_dim
        self.embedding = nn.Embedding(input_size, embedding_dim)
        self.cell_type = cell_type

        self.rnn = self._get_rnn_layer(embedding_dim, hidden_size, num_layers, dropout_prob, bidirectional, cell_type)

    def _get_rnn_layer(self, embedding_dim, hidden_size, num_layers, dropout_prob, bidirectional, cell_type):
        """
        Helper function to create the appropriate recurrent layer.

        Args:
            embedding_dim (int): Dimension of the embedding layer.
            hidden_size (int): Size of the hidden state in the recurrent layer.
            num_layers (int): Number of layers in the recurrent layer.
            dropout_prob (float): Dropout probability between stacked layers.
            bidirectional (bool): Whether to use a bidirectional recurrent layer.
            cell_type (str): Type of recurrent cell ('RNN' or 'LSTM'; any other value selects GRU).

        Returns:
            nn.RNNBase: Recurrent layer (RNN, LSTM, or GRU).
        """
        rnn_class = nn.RNN if cell_type == "RNN" else (nn.LSTM if cell_type == "LSTM" else nn.GRU)
        # The recurrent layer's own dropout acts only between stacked layers; PyTorch
        # emits a UserWarning when it is non-zero with a single layer, so pass 0 there.
        inter_layer_dropout = dropout_prob if num_layers > 1 else 0.0
        return rnn_class(embedding_dim, hidden_size, num_layers, dropout=inter_layer_dropout, bidirectional=bidirectional)

    def forward(self, input_sequence):
        """
        Forward pass of the Encoder module.

        Args:
            input_sequence (torch.Tensor): Input sequence of token indices.

        Returns:
            tuple: (outputs, hidden, cell) when cell_type is 'LSTM',
            otherwise (outputs, hidden). `outputs` is the per-step output of the
            recurrent layer; `hidden` / `cell` are its final states.
        """
        embedded = self.dropout(self.embedding(input_sequence))
        outputs, hidden_states = self.rnn(embedded)

        if self.cell_type == "LSTM":
            # nn.LSTM returns its final state as an (h_n, c_n) pair
            return outputs, hidden_states[0], hidden_states[1]
        else:
            return outputs, hidden_states

    def initialize_hidden_state(self, device):
        """
        Create all-zero initial state(s) for the recurrent layer.

        Args:
            device (torch.device): Device to create the state(s) on.

        Returns:
            torch.Tensor or tuple: A zero tensor of shape
            (num_layers * num_directions, batch_size, hidden_size); for 'LSTM' a
            (hidden_state, cell_state) pair of two such tensors.
        """
        num_directions = 2 if self.bidirectional else 1
        state_shape = (self.num_layers * num_directions, self.batch_size, self.hidden_size)
        hidden_state = torch.zeros(*state_shape, device=device)

        if self.cell_type == "LSTM":
            cell_state = torch.zeros(*state_shape, device=device)
            return hidden_state, cell_state
        else:
            return hidden_state

In [16]:

class Decoder(nn.Module):
    """
    Single-step decoder for sequence-to-sequence models, with optional attention.

    Args:
        input_size (int): Size of the target vocabulary fed to the embedding.
        embedding_size (int): Dimension of the embedding layer.
        hidden_size (int): Size of the hidden state in the recurrent layer.
        output_size (int): Number of classes produced by the output layer.
        num_layers (int): Number of layers in the recurrent layer.
        dropout (float): Dropout probability (embeddings and between stacked layers).
        cell_type (str): 'RNN' or 'LSTM'; any other value selects GRU.
        attention (bool): Whether to attend over the encoder outputs.
        bidirectional (bool): Whether the encoder is bidirectional; this only sizes
            the attention-combine layer for encoder outputs of width 2 * hidden_size.
        max_length (int, optional): Number of encoder time steps the attention layer
            scores. When None (default) it is taken from the module-level
            `english_matrix`, as before.
    """

    def __init__(self, input_size, embedding_size, hidden_size, output_size, num_layers, dropout, cell_type, attention=False, bidirectional=False, max_length=None):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.num_layers = num_layers
        self.dropout = nn.Dropout(dropout)
        self.cell_type = cell_type
        self.attention = attention
        self.bidirectional = bidirectional
        # Fall back to the encoded source width from the data-preparation cell
        self.max_length = len(english_matrix[0]) if max_length is None else max_length
        # Holds the attention weights of the most recent forward() call
        self.attn_weights = 0

        self.embedding = nn.Embedding(input_size, embedding_size)
        # With attention the RNN consumes the output of attn_combine (hidden_size wide)
        rnn_input_size = hidden_size if attention else embedding_size
        rnn_class = nn.RNN if cell_type == "RNN" else (nn.LSTM if cell_type == "LSTM" else nn.GRU)
        # The recurrent layer's own dropout acts only between stacked layers; PyTorch
        # emits a UserWarning when it is non-zero with a single layer, so pass 0 there.
        inter_layer_dropout = dropout if num_layers > 1 else 0.0
        self.rnn = rnn_class(rnn_input_size, hidden_size, num_layers, dropout=inter_layer_dropout)
        self.fc = nn.Linear(hidden_size, output_size)

        if attention:
            self.attn = nn.Linear(hidden_size + embedding_size, self.max_length)
            encoder_width = hidden_size * 2 if bidirectional else hidden_size
            self.attn_combine = nn.Linear(encoder_width + embedding_size, hidden_size)

    def forward(self, x, output, hidden, cell=None):
        """
        Decode one time step.

        Args:
            x (torch.Tensor): Current input token ids, one per batch element.
            output (torch.Tensor): Encoder outputs; only used when attention is on,
                where it is permuted to batch-first and weighted by the attention.
            hidden (torch.Tensor): Current hidden state of the recurrent layer.
            cell (torch.Tensor, optional): Current cell state (LSTM only).

        Returns:
            tuple: (output_predictions, hidden, cell); `cell` is None unless
            cell_type is 'LSTM'.
        """
        x = x.unsqueeze(0)  # add a length-1 sequence dimension
        embedded = self.dropout(self.embedding(x))

        if self.attention:
            # Score every encoder position from the embedded token and the first-layer hidden state
            attn_weights = F.softmax(self.attn(torch.cat((embedded[0], hidden[0]), 1)), dim=1)
            self.attn_weights = attn_weights  # Store attention weights
            # Weighted sum of encoder outputs (batch-first for bmm)
            attn_applied = torch.bmm(attn_weights.unsqueeze(1), output.permute(1, 0, 2)).squeeze(1)
            op = torch.cat((embedded[0], attn_applied), 1)
            op = self.attn_combine(op).unsqueeze(0)
            op = F.relu(op)
        else:
            op = embedded

        if self.cell_type == "LSTM":
            outputs, (hidden, cell) = self.rnn(op, (hidden, cell))
        else:
            outputs, hidden = self.rnn(op, hidden)

        output_predictions = self.fc(outputs)
        output_predictions = output_predictions.squeeze(0)

        if self.cell_type == "LSTM":
            return output_predictions, hidden, cell
        else:
            return output_predictions, hidden, None

    def init_hidden(self):
        """Return a zero tensor of shape (1, 1, hidden_size) on this module's own device."""
        return torch.zeros(1, 1, self.hidden_size, device=self.embedding.weight.device)

In [18]:
#working with original decoder

class Model(nn.Module):
    """
    Sequence-to-sequence wrapper that runs an encoder once and a decoder step by step.

    Args:
        output_size (int): Number of classes the decoder predicts per step.
        cell_type (str): 'LSTM' makes the model carry a cell state as well as a hidden state.
        bidirectional (bool): Whether the encoder is bidirectional.
        enc_lyr (int): Number of recurrent layers in the encoder.
        dec_lyr (int): Number of recurrent layers in the decoder.
        encoder (nn.Module): Called as encoder(source); returns (outputs, hidden) or,
            for 'LSTM', (outputs, hidden, cell).
        decoder (nn.Module): Called as decoder(token, encoder_outputs, hidden[, cell]);
            returns (predictions, hidden, cell) and exposes `attn_weights`.
        attention (bool): Whether to collect the decoder's attention weights per step.
    """

    def __init__(self, output_size, cell_type, bidirectional, enc_lyr, dec_lyr, encoder, decoder, attention):
        super(Model, self).__init__()
        self.output_size = output_size
        self.cell_type = cell_type
        self.bidirectional = bidirectional
        self.enc_lyr = enc_lyr
        self.dec_lyr = dec_lyr
        self.encoder = encoder
        self.attention = attention
        self.decoder = decoder

    def _bridge_state(self, state):
        """Reshape a final encoder state into the (dec_lyr, batch, hidden) state the decoder expects."""
        if self.bidirectional:
            # Final states are ordered layer by layer with forward then backward
            # direction, so the last two entries belong to the top layer.
            top_layer = state[-2] + state[-1]
        elif self.enc_lyr != self.dec_lyr:
            top_layer = state[-1]
        else:
            # Same depth, one direction: the encoder state already has the right
            # shape. (Repeating it dec_lyr times would multiply the layer count.)
            return state
        # Every decoder layer starts from the encoder's top-layer state
        return top_layer.unsqueeze(0).repeat(self.dec_lyr, 1, 1)

    def forward(self, source, target, teacher_force_ratio=0.5):
        """
        Encode `source` and decode `target.shape[0] - 1` steps.

        Args:
            source (torch.Tensor): Source token ids, indexed (time, batch).
            target (torch.Tensor): Target token ids, indexed (time, batch); row 0 is
                the first decoder input.
            teacher_force_ratio (float): Probability, drawn per step, of feeding the
                ground-truth token instead of the model's own prediction.

        Returns:
            tuple: (outputs, attentions). `outputs` has shape
            (target_len, batch, output_size) with row 0 left at zero; `attentions`
            is a numpy array of per-step attention weights (empty without attention).
        """
        target_len = target.shape[0]
        batch_size = source.shape[1]
        outputs = torch.zeros(target_len, batch_size, self.output_size).to(source.device)

        # Encode the source sequence
        if self.cell_type == "LSTM":
            encoder_output, hidden, cell = self.encoder(source)
        else:
            encoder_output, hidden = self.encoder(source)
            cell = None

        # Prepare the decoder states
        hidden = self._bridge_state(hidden)
        if self.cell_type == "LSTM":
            cell = self._bridge_state(cell)

        # Decode the target sequence
        attentions = []
        current_token = target[0]

        for timestep in range(1, target_len):
            if self.cell_type == "LSTM":
                output, hidden, cell = self.decoder(current_token, encoder_output, hidden, cell)
            else:
                output, hidden, cell = self.decoder(current_token, encoder_output, hidden)
                cell = None

            outputs[timestep] = output

            if self.attention:
                attentions.append(self.decoder.attn_weights.detach().cpu().numpy())

            # One random draw per step decides between ground truth and prediction;
            # on the final step the prediction is used either way.
            use_teacher = random.random() < teacher_force_ratio
            if use_teacher and timestep < target_len - 1:
                current_token = target[timestep]
            else:
                current_token = output.argmax(1)

        attentions = np.array(attentions)
        return outputs, attentions



---



In [25]:
def train(epochs, lr, cell_type, bidirectional, enc_lyr, dec_lyr, batch_size, embedding_dim, hidden_lyr, encoder_dropout, decoder_dropout, attention, language="marathi"):
    """
    Build an encoder/decoder pair, train it, and log per-epoch metrics to wandb.

    Args:
        epochs (int): Number of passes over the training matrices.
        lr (float): Learning rate for Adam.
        cell_type (str): Recurrent cell type passed to Encoder, Decoder and the model.
        bidirectional (bool): Whether the encoder is bidirectional.
        enc_lyr (int): Number of encoder layers.
        dec_lyr (int): Number of decoder layers.
        batch_size (int): Rows per batch; a trailing partial batch is skipped.
        embedding_dim (int): Embedding size for both encoder and decoder.
        hidden_lyr (int): Hidden state size for both encoder and decoder.
        encoder_dropout (float): Dropout probability for the encoder.
        decoder_dropout (float): Dropout probability for the decoder.
        attention (bool): Whether the decoder uses attention.
        language (str): "english" or "marathi"; selects the character list used to
            decode validation predictions into strings.

    Logs "Epoch", "Loss", "Accuracy", "Val_Accuracy" and "Val_Loss" once per epoch.
    Both accuracies are token-level percentages over the positions that contribute
    to the loss. After each epoch the test set is evaluated and a heatmap plotted.
    """
    # Id of the marker token written by the data-preparation cell. It is the index
    # the loss ignores; padding itself is id 0 and therefore still counts.
    # NOTE(review): presumably padding stays in the loss so the decoder learns to
    # emit 0 after the last character (decoding below drops ids <= 0) -- confirm.
    pad_idx = len(marathi_chars) + 1
    # +2: one slot for padding/unknown (0) and one for the marker (len + 1)
    input_size_encoder = len(english_chars) + 2
    input_size_decoder = len(marathi_chars) + 2
    output_size = len(marathi_chars) + 2
    encoder = Encoder(input_size_encoder, embedding_dim, hidden_lyr, enc_lyr, batch_size, encoder_dropout, bidirectional, cell_type).to(device)
    decoder = Decoder(input_size_decoder, embedding_dim, hidden_lyr, output_size, dec_lyr, decoder_dropout, cell_type, attention, bidirectional).to(device)
    # NOTE(review): Seq2SeqModel is not defined in the cells visible next to this
    # function (the model class shown there is named Model, with the same
    # constructor arguments) -- make sure the defining cell runs before this one.
    model = Seq2SeqModel(output_size, cell_type, bidirectional, enc_lyr, dec_lyr, encoder, decoder, attention).to(device)
    optimizer = optim.Adam(model.parameters(), lr)
    criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)

    char_list = {"english": english_chars, "marathi": marathi_chars}[language]

    for epoch in range(epochs):
        print("Epoch: ", epoch+1)
        model.train()
        total_loss = 0
        val_loss = 0
        step = 0
        total_batches = len(english_matrix) // batch_size
        correct_count = 0
        scored_tokens = 0

        for batch_idx in tqdm(range(total_batches)):
            start_idx = batch_size * batch_idx
            end_idx = batch_size * (batch_idx + 1)
            inp_data = english_matrix[start_idx:end_idx].to(device)
            target = marathi_matrix[start_idx:end_idx].to(device)
            target = target.T  # the model indexes sequences as (time, batch)
            optimizer.zero_grad()
            output, attentions = model(inp_data.T, target)
            # Row 0 of the model output is never written, so drop it on both sides
            output = output[1:].reshape(-1, output.shape[2])
            target = target[1:].reshape(-1)
            loss = criterion(output, target)
            total_loss += loss.item()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)
            optimizer.step()
            step += 1

            # Training accuracy over the same positions the loss uses
            # (argmax of the logits equals argmax of their softmax)
            with torch.no_grad():
                scored = target != pad_idx
                predicted_tokens = output.argmax(dim=1)
                correct_count += ((predicted_tokens == target) & scored).sum().item()
                scored_tokens += scored.sum().item()

        model.eval()
        with torch.no_grad():
            val_batches = len(english_matrix_val) // batch_size
            val_correct_count = 0
            val_scored_tokens = 0
            val_predicted_words = []
            val_attention_list = [None]  # attention weights of the first validation batch

            for val_batch_idx in range(val_batches):
                val_start_idx = batch_size * val_batch_idx
                val_end_idx = batch_size * (val_batch_idx + 1)
                val_inp_data = english_matrix_val[val_start_idx:val_end_idx].to(device)
                val_target = marathi_matrix_val[val_start_idx:val_end_idx].to(device)
                val_target = val_target.T
                # Third argument is the teacher-forcing ratio: 0.0 so that evaluation
                # never feeds ground-truth tokens back into the decoder.
                val_output, attentions = model(val_inp_data.T, val_target, 0.0)

                # Per-step predictions, taken while the output is still (time, batch, classes)
                step_tokens = val_output.argmax(dim=2)

                flat_logits = val_output[1:].reshape(-1, val_output.shape[2])
                flat_target = val_target[1:].reshape(-1)
                val_loss += criterion(flat_logits, flat_target).item()

                # Validation accuracy over the positions the loss uses
                flat_predicted = step_tokens[1:].reshape(-1)
                scored = flat_target != pad_idx
                val_correct_count += ((flat_predicted == flat_target) & scored).sum().item()
                val_scored_tokens += scored.sum().item()

                # Predicted words and attention of the first batch
                if val_batch_idx == 0:
                    val_attention_list[0] = attentions
                for tokens in step_tokens.T.tolist():
                    # Skip the unwritten step 0; keep only ids that map to a real
                    # character (0 is padding, len(char_list) + 1 is the marker).
                    predicted_word = "".join(char_list[i - 1] for i in tokens[1:] if 0 < i <= len(char_list))
                    val_predicted_words.append(predicted_word)

            # max(..., 1) guards against data sets smaller than one batch
            val_loss /= max(val_batches, 1)
            mean_train_loss = total_loss / max(step, 1)

            training_accuracy = (correct_count * 100) / max(scored_tokens, 1)
            val_accuracy = (val_correct_count * 100) / max(val_scored_tokens, 1)

            wandb.log({
                "Epoch": epoch+1,
                "Loss": mean_train_loss,
                "Accuracy": training_accuracy,
                "Val_Accuracy": val_accuracy,
                "Val_Loss": val_loss
            })

            print(f"Loss: {mean_train_loss}\t Accuracy: {training_accuracy}\t Val_Accuracy: {val_accuracy}\t Val_Loss: {val_loss}")

        # NOTE(review): the test evaluation and heatmap run once per epoch, and the
        # heatmap is drawn even when attention is False -- confirm both are intended.
        test_accuracy, predicted_words, attention_list = calculate_accuracy_test(model, english_matrix_test, marathi_matrix_test, batch_size, language)

        wandb.log({'Test_Accuracy_Without_Attention': test_accuracy})
        plot_attention_heatmap_grid(marathi_test[:10], predicted_words[:10], attention_list[0][:10])

In [28]:
# Define the sweep configuration
def _choices(*options):
    """Wrap the given options in the {"values": [...]} form used for a discrete sweep parameter."""
    return {"values": list(options)}


# Bayesian hyperparameter search that maximizes the logged validation accuracy.
# Every key under "parameters" matches a keyword argument of train().
sweep_config = {
    "method": "bayes",
    "metric": {"name": "Val_Accuracy", "goal": "maximize"},
    "parameters": {
        "epochs": _choices(5, 10, 15),
        "lr": _choices(1e-3, 1e-4),
        "cell_type": _choices("RNN", "LSTM", "GRU"),
        "bidirectional": _choices(True, False),
        "enc_lyr": _choices(1, 2, 3, 4),
        "dec_lyr": _choices(1, 2, 3, 4),
        "batch_size": _choices(32, 64, 128),
        "embedding_dim": _choices(32, 64, 128),
        "hidden_lyr": _choices(64, 128, 256),
        "encoder_dropout": _choices(0, 0.1, 0.2),
        "decoder_dropout": _choices(0, 0.1, 0.2),
        "attention": _choices(False),
    },
}

In [None]:

def main():
    """Run one sweep trial: start a wandb run, then train with that run's config values as keyword arguments."""
    wandb.init()
    # wandb.config is read after init() and unpacked straight into train()
    train(**wandb.config)

# Register the sweep defined by sweep_config and keep the id wandb returns for it
sweep_id = wandb.sweep(
    sweep_config,
    project="test123",
    entity="bhavik-160990105023",
)

# Let an agent call main() for up to 50 trials of that sweep
wandb.agent(
    sweep_id,
    function=main,
    count=50,
)