In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as functional_utils
import torch.optim as optimization_lib

import numpy as np
import pandas as pdtools
import csv as csvmodule

import random as rnd
import heapq as heapstructure

from tqdm import tqdm as progress_tracker
import matplotlib.pyplot as plotter
import wandb as experiment_logger

# Throwaway references to the aliased csv, pandas and random imports so that
# linters do not report them as unused; every result is discarded by rebinding `_`.
_ = csvmodule.Dialect
_ = pdtools.Series()
_ = rnd.seed


In [None]:
# Report, one value per line: the installed PyTorch version, whether CUDA is
# usable from this kernel, and the CUDA version PyTorch was built against.
for environment_fact in (torch.__version__, torch.cuda.is_available(), torch.version.cuda):
    print(environment_fact)

In [None]:
def choose_execution_unit():
    """Pick the compute device for this session.

    Returns:
        str: "cuda" when ``torch.cuda.is_available()`` is true, otherwise "cpu".
        The value is the string form of the ``torch.device``, not the device
        object itself.
    """
    # torch.device(...) always yields a torch.device, so no type re-validation
    # or CPU fallback is needed after construction.
    return str(torch.device("cuda" if torch.cuda.is_available() else "cpu"))


selected_unit = choose_execution_unit()
print(selected_unit)


In [None]:
# Authenticate with Weights & Biases without embedding the API key in the notebook.
# With no explicit key, wandb.login() uses the WANDB_API_KEY environment variable or
# a previously stored login, and otherwise prompts for the key interactively.
# NOTE(review): an API key was previously hardcoded in this cell; treat it as leaked
# and revoke it in the W&B account settings.
experiment_logger.login()

In [None]:
def load_data(lang='hin', data_root='/kaggle/input/vocabs/Dataset'):
    """Read the train/valid/test CSV files of one language and add boundary markers.

    Each file ``<data_root>/<lang>/<lang>_{train,valid,test}.csv`` is read as
    UTF-8 CSV. Column 0 is the source word and column 1 the target word. A '$'
    is appended to every source word; every target word is wrapped as
    '#' + word + '$'. Completely empty CSV rows are skipped.

    Args:
        lang (str): Language code; used both as sub-directory and file prefix.
        data_root (str): Directory containing one sub-directory per language.

    Returns:
        dict: "train_x", "train_y", "val_x", "val_y", "test_x", "test_y"
        (NumPy string arrays) plus "max_decoder_length" / "max_encoder_length",
        the longest target / source string over all three splits, markers included.

    Raises:
        OSError: If one of the CSV files cannot be opened.
        IndexError: If a non-empty row has fewer than two columns.
    """
    from os.path import join as path_join

    dir_root = path_join(data_root, lang)

    sources_by_split = {}
    targets_by_split = {}
    for split in ("train", "valid", "test"):
        sources, targets = [], []
        # `with` guarantees the handle is closed even when parsing fails;
        # newline='' is what the csv module expects for files it reads.
        with open(path_join(dir_root, f"{lang}_{split}.csv"), encoding='utf-8', newline='') as handle:
            for row in csvmodule.reader(handle):
                if not row:  # blank line in the file
                    continue
                sources.append(row[0] + '$')
                targets.append('#' + row[1] + '$')
        # dtype=str keeps an empty split as a string array so concatenation still works.
        sources_by_split[split] = np.array(sources, dtype=str)
        targets_by_split[split] = np.array(targets, dtype=str)

    train_x, val_x, test_x = (sources_by_split[s] for s in ("train", "valid", "test"))
    train_y, val_y, test_y = (targets_by_split[s] for s in ("train", "valid", "test"))

    all_y = np.concatenate((train_y, val_y, test_y))
    all_x = np.concatenate((train_x, val_x, test_x))

    max_decoder_length = max(map(len, all_y))
    max_encoder_length = max(map(len, all_x))

    return {
        "train_x": train_x,
        "train_y": train_y,
        "val_x": val_x,
        "val_y": val_y,
        "test_x": test_x,
        "test_y": test_y,
        "max_decoder_length": max_decoder_length,
        "max_encoder_length": max_encoder_length
    }


In [None]:
def create_corpus(dictionary : dict):
    """Build the character vocabularies for the source and target sides.

    The input vocabulary is fixed: '#', '$' and the lowercase Latin letters are
    numbered from 1 in that order, and the empty string takes index 0. The
    output vocabulary is derived from the data: every distinct character found
    in the "train_y", "val_y" and "test_y" words plus the empty string,
    numbered in sorted order (the empty string sorts first, so it is index 0).

    Args:
        dictionary (dict): Must provide "train_y", "val_y" and "test_y", each an
            iterable of strings.

    Returns:
        dict: Vocabulary sizes, both char -> index maps and their index -> char
        inverses.
    """
    fixed_input_symbols = "#$abcdefghijklmnopqrstuvwxyz"

    # Collect the distinct target-side characters across all three splits.
    target_symbols = set()
    for split_key in ("train_y", "val_y", "test_y"):
        for word in dictionary[split_key]:
            target_symbols.update(word)
    target_symbols.add('')

    # Source side: positions 1..N for the fixed alphabet, 0 for the empty string.
    char_to_input_id = dict(zip(fixed_input_symbols, range(1, len(fixed_input_symbols) + 1)))
    char_to_input_id[''] = 0

    # Target side: consecutive ids in sorted character order.
    char_to_output_id = {}
    for position, symbol in enumerate(sorted(target_symbols)):
        char_to_output_id[symbol] = position

    input_id_to_char = {index: symbol for symbol, index in char_to_input_id.items()}
    output_id_to_char = {index: symbol for symbol, index in char_to_output_id.items()}

    return {
        "input_corpus_length": len(char_to_input_id),
        "output_corpus_length": len(char_to_output_id),
        "input_corpus_dict": char_to_input_id,
        "output_corpus_dict": char_to_output_id,
        "reversed_input_corpus": input_id_to_char,
        "reversed_output_corpus": output_id_to_char
    }


In [None]:
def create_tensor(data_dict, corpus_dict):
    """Encode every split as a zero-padded index tensor of shape (length, words).

    All six tensors share the same first dimension: the larger of
    ``data_dict["max_encoder_length"]`` and ``data_dict["max_decoder_length"]``.
    Column ``j`` holds the character ids of word ``j``; characters missing from
    the vocabulary, and positions past the end of a word, are 0.

    Args:
        data_dict (dict): Provides the "*_x" / "*_y" word collections and the
            two maximum lengths.
        corpus_dict (dict): Provides "input_corpus_dict" and
            "output_corpus_dict" (char -> index).

    Returns:
        dict: int64 tensors under "train_input", "train_output", "val_input",
        "val_output", "test_input" and "test_output".
    """
    padded_length = max(data_dict["max_encoder_length"], data_dict["max_decoder_length"])

    def encode_split(words, char_to_index):
        # One column per word; untouched cells keep the padding value 0.
        grid = np.zeros((padded_length, len(words)), dtype='int64')
        for column, word in enumerate(words):
            for row, symbol in enumerate(word):
                grid[row, column] = char_to_index.get(symbol, 0)
        return torch.tensor(grid)

    # (result key, key of the words in data_dict, key of the vocabulary in corpus_dict)
    layout = (
        ("train_input", "train_x", "input_corpus_dict"),
        ("train_output", "train_y", "output_corpus_dict"),
        ("val_input", "val_x", "input_corpus_dict"),
        ("val_output", "val_y", "output_corpus_dict"),
        ("test_input", "test_x", "input_corpus_dict"),
        ("test_output", "test_y", "output_corpus_dict"),
    )

    encoded = {}
    for result_key, words_key, vocab_key in layout:
        encoded[result_key] = encode_split(data_dict[words_key], corpus_dict[vocab_key])
    return encoded


In [None]:
def preprocess_data(lang: str):
    """Run loading, vocabulary building and tensor encoding for one language.

    Calls ``load_data``, ``create_corpus`` and ``create_tensor`` in that order
    and gathers selected entries of their results into a single flat dict.

    Args:
        lang (str): Language code forwarded to ``load_data``.

    Returns:
        dict: The six encoded tensors, the vocabulary sizes and lookup tables,
        the raw word arrays and the two maximum sequence lengths.
    """
    raw_splits = load_data(lang)
    vocabularies = create_corpus(raw_splits)
    tensors = create_tensor(raw_splits, vocabularies)

    # Each stage's result paired with the keys it contributes, in output order.
    stage_exports = (
        (tensors, (
            "train_input", "train_output",
            "val_input", "val_output",
            "test_input", "test_output",
        )),
        (vocabularies, (
            "input_corpus_length", "output_corpus_length",
            "input_corpus_dict", "output_corpus_dict",
            "reversed_input_corpus", "reversed_output_corpus",
        )),
        (raw_splits, (
            "train_x", "train_y",
            "val_x", "val_y",
            "test_x", "test_y",
            "max_decoder_length", "max_encoder_length",
        )),
    )

    return {key: stage[key] for stage, keys in stage_exports for key in keys}


In [None]:
class Encoder(nn.Module):
    """Embeds a source sequence and returns the recurrent layer's final state.

    Args:
        params (dict): Hyperparameters. Keys read: "encoder_input_size"
            (embedding vocabulary size), "embedding_size", "hidden_size",
            "num_layers", "drop_prob", "cell_type" ("LSTM", "GRU" or "RNN")
            and "bidirectional".

    Raises:
        KeyError: If a required key is missing or "cell_type" is not one of
            the three supported names.
    """

    def __init__(self, params):
        super().__init__()

        # Extract hyperparameters from the input dictionary
        self.input_vocab_size = params["encoder_input_size"]
        self.embedding_dim = params["embedding_size"]
        self.hidden_dim = params["hidden_size"]
        self.num_rnn_layers = params["num_layers"]
        self.dropout_prob = params["drop_prob"]
        self.rnn_cell_type = params["cell_type"]
        # Short alias so callers can query the cell kind as `encoder.cell_type`.
        self.cell_type = self.rnn_cell_type
        self.is_bidirectional = params["bidirectional"]

        # Initialize layers and RNN cell selection
        self.embeddings = nn.Embedding(self.input_vocab_size, self.embedding_dim)
        self.rnn_dropout = nn.Dropout(self.dropout_prob)

        rnn_cell_choices = {
            "LSTM": nn.LSTM,
            "GRU": nn.GRU,
            "RNN": nn.RNN
        }
        # PyTorch only applies recurrent dropout between stacked layers and warns
        # when a non-zero value is given for a single layer, so pass 0 in that case.
        inter_layer_dropout = self.dropout_prob if self.num_rnn_layers > 1 else 0.0
        self.rnn_layer = rnn_cell_choices[self.rnn_cell_type](
            self.embedding_dim, self.hidden_dim, self.num_rnn_layers,
            dropout=inter_layer_dropout, bidirectional=self.is_bidirectional
        )

    def forward(self, inputs):
        """Encode ``inputs`` and return the final recurrent state.

        Args:
            inputs (torch.Tensor): Integer token indices; passed to the
                embedding layer and then, sequence-first, to the recurrent layer.

        Returns:
            For "RNN"/"GRU": the final hidden state tensor. For "LSTM": the
            tuple ``(final_hidden_state, final_cell_state)``.
        """
        embedded = self.rnn_dropout(self.embeddings(inputs))
        # nn.RNN / nn.GRU return the hidden tensor as the second value, nn.LSTM
        # returns the (hidden, cell) tuple, so it can be handed back as-is.
        _, final_state = self.rnn_layer(embedded)
        return final_state


In [None]:
class Decoder(nn.Module):
    """Decoder-side module: token embedding, dropout and a recurrent layer.

    NOTE(review): this definition appears truncated in the notebook. The last
    statement of ``__init__`` is cut off mid-expression and no ``forward``
    method is visible, so the output projection and the call contract cannot
    be documented from this cell. Restore the missing code before use.

    Args:
        config (dict): Hyperparameters. Keys read: "decoder_input_size"
            (embedding vocabulary size), "embedding_size", "hidden_size",
            "decoder_output_size", "num_layers", "drop_prob", "cell_type"
            ("LSTM", "GRU" or "RNN") and "bidirectional".
    """

    def __init__(self, config):
        super(Decoder, self).__init__()

        # Extract hyperparameters from the configuration
        self.vocab_size = config["decoder_input_size"]
        self.embedding_dim = config["embedding_size"]
        self.hidden_dim = config["hidden_size"]
        self.output_vocab_size = config["decoder_output_size"]
        self.rnn_layers = config["num_layers"]
        self.dropout_prob = config["drop_prob"]
        self.rnn_cell_type = config["cell_type"]
        self.use_bidirectional = config["bidirectional"]

        # Layers initialization
        self.embedding_layer = nn.Embedding(self.vocab_size, self.embedding_dim)
        self.dropout_layer = nn.Dropout(self.dropout_prob)

        # Map the configured cell name to its torch.nn class; an unknown name
        # raises KeyError at the lookup below.
        rnn_cells = {
            "LSTM": nn.LSTM,
            "GRU": nn.GRU,
            "RNN": nn.RNN
        }
        self.rnn_cell = rnn_cells[self.rnn_cell_type](
            self.embedding_dim, self.hidden_dim, self.rnn_layers,
            dropout=self.dropout_prob, bidirectional=self.use_bidirectional
        )

        # Fully connected layer to predict output tokens
        # NOTE(review): the assignment below is cut off -- `se_` is not defined
        # anywhere in this class, so it raises NameError when executed. It is
        # presumably the start of a `self.`-expression; confirm against the
        # original notebook.
        output_dim = se_


In [None]:
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes step by step with teacher forcing.

    Args:
        enc_model (nn.Module): Called as ``enc_model(source)``; returns either a
            hidden-state tensor or a ``(hidden, cell)`` tuple.
        dec_model (nn.Module): Called as ``dec_model(token, hidden, cell)``;
            returns ``(logits, hidden)`` when ``cell`` is None and
            ``(logits, hidden, cell)`` otherwise.
        hyperparams (dict): Must provide "tfr", the teacher forcing ratio.
        data_info (dict): Must provide "output_corpus_length", the target
            vocabulary size.
    """

    def __init__(self, enc_model, dec_model, hyperparams, data_info):
        super().__init__()
        self.encoder_model = enc_model
        self.decoder_model = dec_model
        self.teacher_forcing_prob = hyperparams["tfr"]  # Teacher forcing ratio
        self.processed_data_info = data_info

    def forward(self, source, target_seq):
        """
        Forward pass of the Seq2Seq model.

        Args:
            source (torch.Tensor): Source token indices, indexed as
                (sequence position, batch).
            target_seq (torch.Tensor): Target token indices, indexed as
                (sequence position, batch); row 0 is fed as the first decoder input.

        Returns:
            torch.Tensor: Decoder outputs of shape
            (target length, batch, target vocabulary size). Row 0 is left as zeros
            because decoding starts at position 1.
        """

        batch_size = source.shape[1]
        target_len = target_seq.shape[0]
        target_vocab_size = self.processed_data_info["output_corpus_length"]

        # Allocate on the same device as the input instead of relying on a
        # module-level `device` global.
        predicted_logits = torch.zeros(
            target_len, batch_size, target_vocab_size, device=source.device
        )

        # An LSTM encoder yields (hidden, cell); GRU/RNN encoders yield only the
        # hidden state. Detect this from the result rather than from an encoder
        # attribute name.
        encoder_state = self.encoder_model(source)
        if isinstance(encoder_state, tuple):
            hidden_state, cell_state = encoder_state
        else:
            hidden_state, cell_state = encoder_state, None
        carries_cell_state = cell_state is not None

        # Start with first token of target sequence
        current_input = target_seq[0]

        for t in range(1, target_len):
            if carries_cell_state:
                step_logits, hidden_state, cell_state = self.decoder_model(
                    current_input, hidden_state, cell_state
                )
            else:
                step_logits, hidden_state = self.decoder_model(current_input, hidden_state, None)

            predicted_logits[t] = step_logits

            # With probability `tfr` feed the ground-truth token, otherwise the
            # decoder's own most likely token.
            if rnd.random() < self.teacher_forcing_prob:
                current_input = target_seq[t]
            else:
                current_input = step_logits.argmax(dim=1)

        return predicted_logits


In [None]:
def configure_optimizer(optimizer_name, neural_network, lr_value):
    """
    Creates an optimizer object based on the specified name and learning rate.

    Args:
        optimizer_name (str): One of "adam", "sgd", "rmsprop" or "adagrad"
            (lower-case, matched exactly).
        neural_network (nn.Module): The PyTorch model whose parameters are optimized.
        lr_value (float): The learning rate to use for training.

    Returns:
        torch.optim.Optimizer: The created optimizer object.

    Raises:
        ValueError: If ``optimizer_name`` is not one of the supported names.
    """
    # torch.optim is imported in this notebook under the alias `optimization_lib`.
    optimizer_classes = {
        "adam": optimization_lib.Adam,
        "sgd": optimization_lib.SGD,
        "rmsprop": optimization_lib.RMSprop,
        "adagrad": optimization_lib.Adagrad,
    }

    if optimizer_name not in optimizer_classes:
        raise ValueError(f"Invalid optimizer name: {optimizer_name}")

    return optimizer_classes[optimizer_name](neural_network.parameters(), lr=lr_value)


In [None]:
def beam_search_decoding(settings, seq2seq_model, input_word, computation_device, data_info):
    """
    Beam search decoding for sequence-to-sequence models.

    This function does not switch the model to evaluation mode; call
    ``seq2seq_model.eval()`` first if dropout should be disabled.

    Args:
        settings (dict): Model hyperparameters.
            - cell_type (str): Type of RNN cell (LSTM, GRU, RNN).
            - beam_width (int): Number of hypotheses kept after every step.
            - length_penalty (float): Exponent of the length normalisation
              ``((len - 1) / 5) ** length_penalty`` applied to each step's log-probability.
        seq2seq_model (nn.Module): Model exposing ``encoder_model`` and ``decoder_model``.
        input_word (str): Input word to translate (without the end marker).
        computation_device (torch.device): Device to use for computations (CPU or GPU).
        data_info (dict) : Contains all information of processed data.
            - input_corpus_dict (dict): Maps input characters to integer indices.
            - output_corpus_dict (dict): Maps output characters to integer indices.
            - reversed_output_corpus (dict): Maps integer indices back to output characters.
            - max_encoder_length (int): Maximum length of the encoder input sequence.

    Returns:
        str: Best decoded word, without the start marker and without the end
        marker when the hypothesis produced one.

    Raises:
        IndexError: If ``input_word`` is longer than ``max_encoder_length``.
    """

    input_vocab = data_info["input_corpus_dict"]
    output_vocab = data_info["output_corpus_dict"]
    max_input_length = data_info["max_encoder_length"]
    reverse_output_vocab = data_info["reversed_output_corpus"]

    uses_cell_state = settings["cell_type"] == "LSTM"
    beam_width = settings["beam_width"]
    end_symbol = output_vocab['$']

    # Encode the word as a (length, 1) index column; unknown characters map to
    # the padding index 0, the same rule create_tensor applies to the datasets.
    input_tensor = torch.zeros((max_input_length + 1, 1), dtype=torch.long, device=computation_device)
    for position, char in enumerate(input_word):
        input_tensor[position, 0] = input_vocab.get(char, 0)
    # len(input_word) rather than the loop variable, so an empty word works too.
    input_tensor[len(input_word), 0] = input_vocab['$']  # Add end-of-sentence marker

    # Inference only: keep encoder and every decoder step out of autograd.
    with torch.no_grad():
        encoder_state = seq2seq_model.encoder_model(input_tensor)
        if uses_cell_state:
            encoder_hidden, encoder_cell = encoder_state
        else:
            encoder_hidden, encoder_cell = encoder_state, None

        start_sequence = torch.tensor([output_vocab['#']], device=computation_device)
        # Every hypothesis carries its own recurrent state (hidden and, for LSTM,
        # cell) so that expanding one hypothesis never alters another one.
        beam = [(0.0, start_sequence, encoder_hidden, encoder_cell)]

        # At most len(output_vocab) decoding steps, as before.
        for _step in range(len(output_vocab)):
            # Once every hypothesis has ended, further steps cannot change the beam.
            if all(entry[1][-1].item() == end_symbol for entry in beam):
                break

            candidate_sequences = []
            for score, sequence, hidden_state, cell_state in beam:
                # Finished hypotheses are carried over unchanged.
                if sequence[-1].item() == end_symbol:
                    candidate_sequences.append((score, sequence, hidden_state, cell_state))
                    continue

                last_token = sequence[-1].unsqueeze(0).to(computation_device)

                if uses_cell_state:
                    output, next_hidden, next_cell = seq2seq_model.decoder_model(
                        last_token, hidden_state, cell_state
                    )
                else:
                    output, next_hidden = seq2seq_model.decoder_model(last_token, hidden_state, None)
                    next_cell = None

                # log_softmax is the numerically stable form of log(softmax(x)).
                log_probabilities = functional_utils.log_softmax(output, dim=1)
                expansions = min(beam_width, log_probabilities.shape[1])
                top_log_probs, top_tokens = torch.topk(log_probabilities, k=expansions)

                for log_prob, token in zip(top_log_probs[0], top_tokens[0]):
                    new_sequence = torch.cat((sequence, token.unsqueeze(0)), dim=0)
                    length_penalty = ((len(new_sequence) - 1) / 5) ** settings["length_penalty"]
                    candidate_score = score + log_prob.item() / length_penalty
                    candidate_sequences.append((candidate_score, new_sequence, next_hidden, next_cell))

            # Keep the best `beam_width` hypotheses for the next step.
            beam = heapstructure.nlargest(beam_width, candidate_sequences, key=lambda entry: entry[0])

    best_sequence = max(beam, key=lambda entry: entry[0])[1]

    # Drop the start token, and the end token only when it is actually present
    # (a hypothesis that hit the step limit has no end token to remove).
    token_ids = [token.item() for token in best_sequence[1:]]
    if token_ids and token_ids[-1] == end_symbol:
        token_ids.pop()

    return ''.join(reverse_output_vocab[token_id] for token_id in token_ids)
