# Develop a Generative Chatbot

This notebook outlines the code to train a generative chatbot using the Ubuntu dialogue corpus which can be found here: https://www.kaggle.com/datasets/rtatman/ubuntu-dialogue-corpus.
We build two sequence-to-sequence models: one that uses an attention mechanism (either Luong or Bahdanau) and one that does not use attention.

## Step 1: Import Libraries

In [None]:
# Importing the libraries
import pandas as pd
import re
from torch import nn
from tqdm import tqdm
import torch
from torch.nn.utils.rnn import pad_sequence
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from tensorflow.keras.preprocessing.text import Tokenizer
import random
import sys
from sklearn.model_selection import train_test_split
import os
import numpy as np

from nltk.translate.bleu_score import sentence_bleu
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# For METEOR
import nltk
from nltk.translate.meteor_score import meteor_score

In [None]:
# Seed every random number generator used in this notebook (Python, NumPy, PyTorch CPU and GPU)
# so that repeated runs produce the same results.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
# Ask cuDNN for deterministic kernels and switch off its auto-tuner.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

## Step 2: Data Cleaning and Preparation

Before building the training dataset, we must clean the data to handle unexpected characters and extra whitespaces. We also need to add start and end sequence tokens to help the model learn token positions.

In [None]:
def clean_text(text: str) -> str:
    """
    Clean the text by removing unnecessary characters and normalizing whitespace.

    The text is lowercased, tokens starting with "http" are removed, every character that is
    neither a word character nor whitespace is dropped, and runs of whitespace are collapsed
    into a single space.

    Args:
        input: text (str): the text to be cleaned. Missing values (None/NaN) produce an empty
            string; any other non-string value is converted with str() first.

    Returns:
        output: text (str): the cleaned text
    """
    if pd.isnull(text):
        return ""

    # Non-string values (e.g. numbers) have no .lower(); convert them instead of crashing
    if not isinstance(text, str):
        text = str(text)

    # lowercase the text
    text = text.lower()

    # remove URLs if present
    text = re.sub(r'http\S+', '', text)

    # Remove special characters (anything that is not a word character or whitespace)
    text = re.sub(r'[^\w\s]', '', text)

    # Collapse repeated whitespace and trim both ends
    text = re.sub(r'\s+', ' ', text).strip()
    return text

In [None]:
# Load the raw dialogue data from the CSV file (the path is relative to the working directory)
data = pd.read_csv("dialogueText.csv")

In [None]:
# Drop rows with missing values in the 'text' column
data = data.dropna(subset=['text'])

# Fill missing values in 'from' and 'to' columns with 'unknown'
data['from'] = data['from'].fillna('unknown')
data['to'] = data['to'].fillna('unknown')

# Apply the clean_text function to the 'text' column only
data['text'] = data['text'].apply(clean_text)

# Drop messages that became empty after cleaning (e.g. URL-only or punctuation-only lines).
# Once the <sos>/<eos> markers are added such rows are no longer empty strings, so they
# could not be filtered out later. The copy avoids chained-assignment warnings in later cells.
data = data[data['text'] != ''].copy()

In [None]:
# Helper that wraps an already cleaned message in <sos> / <eos> markers
def add_sos_eos(text: str) -> str:
    """
    Surround the text with the <sos> (start of sentence) and <eos> (end of sentence) tokens.

    Args:
        input: text (str): the text to be wrapped

    Returns:
        output: text (str): the text preceded by "<sos> " and followed by " <eos>"
    """
    start_marker, end_marker = "<sos>", "<eos>"
    return "{} {} {}".format(start_marker, text, end_marker)

In [None]:
# Wrap every message in <sos> ... <eos> markers by applying add_sos_eos to the 'text' column only
data['text'] = data['text'].apply(add_sos_eos)

## Step 3: Tokenization

Sequence-to-sequence models cannot process text directly and require numeric representations. Therefore, we convert all sequences into numbers that map back to their string equivalents. We build a vocabulary from the text corpus, assigning each unique word an ID, and then convert each word to its corresponding ID.

In [None]:
def tokenize(df: pd.DataFrame, input_column: str, output_column: str) -> tuple[dict, dict, pd.DataFrame]:
    """
    Build a vocabulary from one text column and store the integer-encoded text in another column.

    Args:
        df (pd.DataFrame): The input DataFrame containing the text data. It is modified in place.
        input_column (str): The name of the column in the DataFrame to tokenize.
        output_column (str): The name of the column that receives the tokenized text.

    Returns:
        vocab (dict): A dictionary mapping words to their index ('<pad>' is registered at index 0).
        reverse_vocab (dict): A dictionary mapping indices to their words.
        df (pd.DataFrame): The same DataFrame, now with the additional tokenized column.
    """
    # Unknown words map to <unk>. The filter set contains neither '<' nor '>',
    # presumably so that markers such as <sos> and <eos> are kept intact.
    keras_tokenizer = Tokenizer(
        oov_token="<unk>",
        filters='!"#$%&()*+,-./:;=?@[\\]^_`{|}~\t\n',
    )

    # Learn the word index from the requested column
    corpus = df[input_column]
    keras_tokenizer.fit_on_texts(corpus)

    # Register the padding token at index 0 in both lookup tables
    keras_tokenizer.word_index['<pad>'] = 0
    keras_tokenizer.index_word[0] = '<pad>'

    vocab, reverse_vocab = keras_tokenizer.word_index, keras_tokenizer.index_word

    # Report the vocabulary size (the <pad> entry is included)
    print(f"Size of the vocabulary in the text: {len(vocab)}")

    # Encode every text as a list of word indices and attach it as a new column
    df[str(output_column)] = keras_tokenizer.texts_to_sequences(corpus)

    return vocab, reverse_vocab, df

In [None]:
# Tokenize the 'text' column: the integer sequences go into a new 'tokenized_text' column,
# and the word -> index and index -> word lookup tables are kept for later use
vocab, reverse_vocab, data = tokenize(data, 'text', 'tokenized_text')

In [None]:
# Check the vocabulary (word -> index mapping returned by tokenize)
vocab

In [None]:
# Check the reverse vocabulary (index -> word mapping returned by tokenize)
reverse_vocab

In [None]:
# Check the first five rows of the data
data[:5]

## Step 4: Split the data

With the data tokenized, we split it into training, validation, and test sets to train the model on one set and evaluate its performance on unseen data. Our split is 80% for training data, 10% for validation, and 10% for testing.

In [None]:
# Split by dialogueID rather than by row, so that all messages of one conversation end up in the same subset
# Collect the distinct dialogue identifiers
unique_ids = data['dialogueID'].unique()

# We split the IDs so that 80% are allocated to the training set and the remaining 20% are then divided evenly between the validation and test sets
train_ids, temp_ids = train_test_split(unique_ids, test_size=0.2, random_state=42)

# Split the temp set into validation and test sets. Each of them therefore holds 10% of all dialogue IDs
val_ids, test_ids = train_test_split(temp_ids, test_size=0.5, random_state=42)

# Filter the original DataFrame based on the split IDs
train_set = data[data['dialogueID'].isin(train_ids)]
val_set = data[data['dialogueID'].isin(val_ids)]
test_set = data[data['dialogueID'].isin(test_ids)]

In [None]:
# Number of rows in each subset, printed in the order: train, test, validation
print(len(train_set))
print(len(test_set))
print(len(val_set))

## Step 5: Create Dialogue Pairs

Our goal is to train a chatbot, so it must learn from question-answer pairs to identify questions and respond appropriately. Training on individual sequences without pairing them prevents learning this relationship, making dialogue pairs essential.

In [None]:
def create_pairs(df: pd.DataFrame, text_column: str, tokenized_column: str) -> tuple[list[tuple[str, str]], list[tuple[torch.Tensor, torch.Tensor]]]:
    """
    Create dialogue pairs from the DataFrame.

    Within every dialogue (rows sharing a 'dialogueID'), each message is paired with the
    message that follows it: the earlier message is the source and the later one the target.
    A pair is emitted only when the text and the tokenized form of both messages are non-empty,
    so the two returned lists always have the same length and describe the same pairs
    position by position.

    Args:
        df (pd.DataFrame): The input DataFrame containing the dialogue data. It must have
            'dialogueID' and 'date' columns; it is not modified.
        text_column (str): The name of the column in the DataFrame containing the text data.
        tokenized_column (str): The name of the column in the DataFrame containing the tokenized text data.

    Returns:
        text_dialogue_pairs (list[tuple[str, str]]): A list of dialogue pairs in text form.
        tokenized_dialogue_pairs (list[tuple[torch.Tensor, torch.Tensor]]): A list of dialogue pairs in tensor form.
    """

    # Sort by dialogueID and date so that, within a dialogue, rows follow the order of the 'date' column
    df = df.sort_values(by=['dialogueID', 'date'])

    text_dialogue_pairs = []
    tokenized_dialogue_pairs = []

    # We group by the dialogueID so that all sequences related to a specific conversation are grouped together
    grouped = df.groupby('dialogueID')

    for _, group in tqdm(grouped):
        # Pull both columns out once per dialogue; per-cell .loc lookups are far slower
        texts = group[text_column].tolist()
        token_lists = group[tokenized_column].tolist()

        # Walk over consecutive messages: (current, next) is (source, target)
        consecutive_turns = zip(texts, texts[1:], token_lists, token_lists[1:])
        for text_source, text_target, tokenized_source, tokenized_target in consecutive_turns:
            # Require every piece to be present and append to both lists together,
            # otherwise the text list and the tensor list could drift out of alignment
            if not (text_source and text_target and tokenized_source and tokenized_target):
                continue

            text_dialogue_pairs.append((text_source, text_target))
            tokenized_dialogue_pairs.append((torch.tensor(tokenized_source), torch.tensor(tokenized_target)))

    return text_dialogue_pairs, tokenized_dialogue_pairs

In [None]:
# Create dialogue pairs (text form and tensor form) for each subset
# Training set
train_text_dialogue_pairs, train_tokenized_dialogue_pairs = create_pairs(train_set, 'text', 'tokenized_text')
# Test set
test_text_dialogue_pairs, test_tokenized_dialogue_pairs = create_pairs(test_set, 'text', 'tokenized_text')
# Validation set
val_text_dialogue_pairs, val_tokenized_dialogue_pairs = create_pairs(val_set, 'text', 'tokenized_text')

In [None]:
# Display the first five training pairs in text form
for source, target in train_text_dialogue_pairs[:5]:
    print(f"Source: {source}")
    print(f"Target: {target}\n")

In [None]:
# Display the first five training pairs in tensor (token index) form
for source, target in train_tokenized_dialogue_pairs[:5]:
    print(f"Source: {source}")
    print(f"Target: {target}\n")

In [None]:
# Example of a tokenized pair: show the source and the target of the first training pair.
# Each pair is stored as (source, target), so index 0 is the source and index 1 the target.
print(f"Vocabulary Size: {len(vocab)}")
print(f"Source sentence: {train_text_dialogue_pairs[0][0]}")
print(f"Source tokens: {train_tokenized_dialogue_pairs[0][0]}")
print(f"Target sentence: {train_text_dialogue_pairs[0][1]}")
print(f"Target tokens: {train_tokenized_dialogue_pairs[0][1]}")

## Step 6: Create our Dataloaders

Training on batches of dialogue pairs, rather than on individual pairs, gives more stable gradient updates and speeds up training. The code below defines a collate function that pads each batch and the DataLoader objects that serve batches of dialogue pairs.

In [None]:
def collate_fn(batch: list[tuple[torch.Tensor, torch.Tensor]]) -> tuple[torch.Tensor, torch.Tensor, list[int]]:
    """
    Turn a list of (source, target) tensor pairs into two zero-padded batch tensors.

    Args:
        batch (list[tuple[torch.Tensor, torch.Tensor]]): A list of tuples containing source and target sequences.

    Returns:
        sources_padded (torch.Tensor): Source sequences padded with 0 to the longest source in the batch.
        targets_padded (torch.Tensor): Target sequences padded with 0 to the longest target in the batch.
        input_lengths (list[int]): Unpadded length of every source sequence, in batch order.
    """
    def _pad(sequences):
        # Batch-first layout, padded with index 0
        return pad_sequence(sequences, batch_first=True, padding_value=0)

    # Separate the (source, target) tuples into two parallel tuples
    sources, targets = zip(*batch)

    # Record the true source lengths before any padding is applied
    input_lengths = list(map(len, sources))

    return _pad(sources), _pad(targets), input_lengths

In [None]:
# Create one DataLoader per subset; collate_fn pads every batch to its longest sequence
batch_size = 64
# NOTE(review): shuffling is enabled for the test and validation loaders as well;
# evaluation normally does not need it — confirm this is intended
train_loader = DataLoader(train_tokenized_dialogue_pairs, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)
test_loader = DataLoader(test_tokenized_dialogue_pairs, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)
val_loader = DataLoader(val_tokenized_dialogue_pairs, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)

In [None]:
# Example of a batch: print the shapes and source lengths of the first training batch only
for src, tgt, input_lengths in train_loader:
    print(f"Source batch shape: {src.shape}")
    print(f"Target batch shape: {tgt.shape}")
    print(f"Input lengths: {input_lengths}")
    # One batch is enough for inspection
    break

In [None]:
def print_progress_bar(epoch: int, batch_idx: int, total_batches: int, batch_loss: float, total_epochs: int):
    """
    Print the progress bar during training.

    The bar is written to standard output with a leading carriage return and no newline,
    so successive calls overwrite the same console line.

    Args:
        epoch (int): The current epoch.
        batch_idx (int): The zero-based index of the current batch.
        total_batches (int): The total number of batches in the dataset.
        batch_loss (float): The average loss of the current batch.
        total_epochs (int): The total number of epochs.
    """

    progress = (batch_idx + 1) / total_batches
    bar_length = 40  # Length of the progress bar
    filled_length = int(bar_length * progress)
    # U+2588 FULL BLOCK, written as an escape so a wrong file encoding cannot corrupt the glyph
    bar = "\u2588" * filled_length + "-" * (bar_length - filled_length)
    percentage = int(progress * 100)
    sys.stdout.write(
        f"\rEpoch {epoch}/{total_epochs}: |{bar}| {percentage}% "
        f"({batch_idx + 1}/{total_batches} batches), Loss: {batch_loss:.4f}"
    )
    # Flush so the bar is shown immediately even though no newline was written
    sys.stdout.flush()

## Step 7: Defining the Sequence To Sequence Model Without Attention

In [None]:
class EncoderNoAttention(nn.Module):
    """
    Encoder module that encodes the input sequence without attention.

    The input token indices are embedded, passed through dropout and fed to an LSTM;
    only the final hidden and cell states are returned.
    """
    def __init__(self, input_vocab_len: int, embedding_size: int, hidden_dim: int, n_layers=1, drop_prob=0.1):
        """
        Args:
            input_vocab_len (int): The input vocabulary size.
            embedding_size (int): The size of the word embeddings.
            hidden_dim (int): The size of the hidden dimension.
            n_layers (int): The number of layers in the LSTM.
            drop_prob (float): The dropout probability (applied to the embeddings and, for
                stacked LSTMs, between the LSTM layers).
        """
        super().__init__()

        self.hidden_dim = hidden_dim
        self.n_layers = n_layers

        self.embedding = nn.Embedding(input_vocab_len, embedding_size)
        self.dropout = nn.Dropout(drop_prob)
        # nn.LSTM only applies dropout between stacked layers; with a single layer a
        # non-zero value has no effect and raises a UserWarning, so it is disabled there.
        self.lstm = nn.LSTM(embedding_size, hidden_dim, n_layers,
                            dropout=(drop_prob if n_layers > 1 else 0), batch_first=True)

    def forward(self, inputs: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        """
        Encode a batch of token index sequences.

        Args:
            inputs (torch.Tensor): Token indices in batch-first layout.

        Returns:
            hidden (torch.Tensor): Final hidden state of the LSTM.
            cell (torch.Tensor): Final cell state of the LSTM.
        """
        embedded = self.dropout(self.embedding(inputs))
        # The per-step outputs are not needed; the decoder is initialised from the final states
        _, (hidden, cell) = self.lstm(embedded)
        return hidden, cell

In [None]:
class DecoderNoAttention(nn.Module):
    """
    Decoder module that decodes the input sequence without attention.

    Each call processes exactly one time step: the previous token of every sequence in the
    batch plus the current LSTM state go in, log-probabilities over the vocabulary and the
    updated state come out.
    """
    def __init__(self, output_vocab_len: int, embedding_size: int, hidden_dim: int, n_layers=1, drop_prob=0.1):
        """
        Args:
            output_vocab_len (int): The output vocabulary size.
            embedding_size (int): The size of the word embeddings.
            hidden_dim (int): The size of the hidden dimension.
            n_layers (int): The number of layers in the LSTM.
            drop_prob (float): The dropout probability (applied to the embeddings and, for
                stacked LSTMs, between the LSTM layers).
        """
        super().__init__()

        self.hidden_dim = hidden_dim
        self.n_layers = n_layers

        self.embedding = nn.Embedding(output_vocab_len, embedding_size)
        self.dropout = nn.Dropout(drop_prob)
        # nn.LSTM only applies dropout between stacked layers; with a single layer a
        # non-zero value has no effect and raises a UserWarning, so it is disabled there.
        self.lstm = nn.LSTM(embedding_size, hidden_dim, n_layers,
                            dropout=(drop_prob if n_layers > 1 else 0), batch_first=True)
        self.classifier = nn.Linear(hidden_dim, output_vocab_len)

    def forward(self, inputs: torch.Tensor, hidden: torch.Tensor, cell: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Run one decoding step.

        Args:
            inputs (torch.Tensor): Previous token index of every sequence, shape (B,).
            hidden (torch.Tensor): Current LSTM hidden state.
            cell (torch.Tensor): Current LSTM cell state.

        Returns:
            output (torch.Tensor): Log-probabilities over the vocabulary, shape (B, output_vocab_len).
            hidden (torch.Tensor): Updated LSTM hidden state.
            cell (torch.Tensor): Updated LSTM cell state.
        """
        # (B,) -> (B, E) -> (B, 1, E): add a sequence dimension of length one for the LSTM
        embedded = self.embedding(inputs).unsqueeze(1)
        embedded = self.dropout(embedded)

        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))

        # Drop the length-one sequence dimension, then classify over the vocabulary
        output = F.log_softmax(self.classifier(output.squeeze(1)), dim=1)

        return output, hidden, cell

## Step 8: Defining the function to train the Sequence to Sequence model without attention

In [None]:
def train_seq2seq_no_attention(dataloader: DataLoader, vocab: dict, start_of_sequence_token_index: int, padding_token_index: int, embedding_size=100, hidden_dim=256, epochs=10, lr=0.001, teacher_forcing_prob=0.5, device='cpu', folder_path='', model_filename='trained_seq_2_seq_no_attention.pth'):
    """
    Train the sequence-to-sequence model without attention.

    A fresh encoder/decoder pair is created, trained for the requested number of epochs and
    saved (model and optimizer state dicts) to ``folder_path/model_filename``. Nothing is returned.

    Args:
        dataloader (DataLoader): The DataLoader object containing the tokenized dialogue pairs.
            Every batch must unpack to (sources, targets, input_lengths).
        vocab (dict): The vocabulary dictionary.
        start_of_sequence_token_index (int): The index of the <sos> token.
        padding_token_index (int): The index of the <pad> token.
        embedding_size (int): The size of the word embeddings.
        hidden_dim (int): The size of the hidden dimension.
        epochs (int): The number of epochs to train the model.
        lr (float): The learning rate.
        teacher_forcing_prob (float): The probability of using teacher forcing.
        device (str): The device to run the model on ('cpu' or 'cuda').
        folder_path (str): The folder where the output model parameters will be saved.
            It is created if it does not exist; an empty string means the working directory.
        model_filename (str): The name of the model parameter file.
    """

    # Make sure the destination exists before spending time on training
    if folder_path:
        os.makedirs(folder_path, exist_ok=True)

    # Initialize the encoder and decoder
    encoder = EncoderNoAttention(len(vocab), embedding_size, hidden_dim).to(device)
    decoder = DecoderNoAttention(len(vocab), embedding_size, hidden_dim).to(device)

    # Initialize the optimizers
    encoder_optimizer = optim.Adam(encoder.parameters(), lr=lr)
    decoder_optimizer = optim.Adam(decoder.parameters(), lr=lr)

    # Define the loss function. The decoder already returns log-probabilities; applying
    # CrossEntropyLoss on top of them is harmless because log-softmax is idempotent.
    loss_fnc = nn.CrossEntropyLoss(ignore_index=padding_token_index)
    total_batches = len(dataloader)

    # Train the model
    for epoch in range(epochs):
        epoch_loss = 0.
        for batch_idx, batch in enumerate(dataloader):
            # Move the batch tensors to the device
            q_inputs, a_inputs, _ = batch
            q_inputs, a_inputs = q_inputs.to(device), a_inputs.to(device)

            # Get batch size and max target length (targets are laid out as (batch, time))
            batch_size = q_inputs.size(0)
            max_target_len = a_inputs.size(1)

            # The targets built in this notebook start with <sos>, which is fed as the first
            # decoder input, so only positions 1..T-1 are predicted.
            decoding_steps = max_target_len - 1
            if decoding_steps < 1:
                # Nothing to predict for this batch
                continue

            # Zero the gradients to prevent accumulation across batches, which can distort training dynamics.
            encoder_optimizer.zero_grad()
            decoder_optimizer.zero_grad()

            # Hidden and cell states of the encoder are inputs to the decoder
            decoder_hidden, decoder_cell = encoder(q_inputs)

            # Initialize the decoder input with the start of sequence token
            decoder_input = torch.full((batch_size,), start_of_sequence_token_index, dtype=torch.long, device=device)
            loss = 0.
            for t in range(1, max_target_len):
                # Forward pass
                decoder_output, decoder_hidden, decoder_cell = decoder(decoder_input, decoder_hidden, decoder_cell)
                # Calculate the loss
                loss += loss_fnc(decoder_output, a_inputs[:, t])
                if random.random() < teacher_forcing_prob:
                    # Teacher forcing: next input is current target
                    decoder_input = a_inputs[:, t]
                else:
                    # Feed back the model's own top prediction; argmax over dim 1 keeps the
                    # (B,) shape even when the batch holds a single sequence
                    decoder_input = decoder_output.argmax(dim=1).detach()

            # Backward pass
            loss.backward()

            # Clip gradients to prevent exploding gradients
            torch.nn.utils.clip_grad_norm_(encoder.parameters(), 1)
            torch.nn.utils.clip_grad_norm_(decoder.parameters(), 1)

            # Update parameters
            encoder_optimizer.step()
            decoder_optimizer.step()

            # Average the summed loss over the number of decoding steps
            batch_loss = loss.item() / decoding_steps
            epoch_loss += batch_loss

            print_progress_bar(epoch + 1, batch_idx, total_batches, batch_loss, epochs)

        # max(..., 1) avoids a division by zero when the dataloader is empty
        print(f"\n Epoch {epoch + 1}/{epochs}, Loss: {epoch_loss/max(total_batches, 1):.4f}")

    # Define the full save path for the model
    save_path = os.path.join(folder_path, model_filename)
    # Save the model and optimizer states
    torch.save({
        'encoder': encoder.state_dict(),
        'decoder': decoder.state_dict(),
        'e_optimizer': encoder_optimizer.state_dict(),
        'd_optimizer': decoder_optimizer.state_dict()
    }, save_path)

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# The trained parameters are written to this folder under this file name
folder_path = './'
model_filename = "trained_seq_2_seq_no_attention.pth"
# Train the model without attention for 2 epochs on the training loader
train_seq2seq_no_attention(train_loader, vocab, vocab['<sos>'], vocab['<pad>'], epochs=2, device=device, folder_path=folder_path, model_filename=model_filename)

## Step 9: Defining the Sequence to Sequence model with Attention

In [None]:
class EncoderAttention(nn.Module):
    """
    Bidirectional GRU encoder used by the sequence-to-sequence model with attention.
    """
    def __init__(self, input_vocab_len: int, embedding_size: int, hidden_size: int, n_layers=1, dropout=0.1):
        """
        Args:
            input_vocab_len (int): The input vocabulary size.
            embedding_size (int): The size of the word embeddings.
            hidden_size (int): The size of the hidden dimension.
            n_layers (int): The number of layers in the GRU.
            dropout (float): The dropout probability, used between GRU layers when n_layers > 1.
        """
        super().__init__()

        self.hidden_size = hidden_size
        self.n_layers = n_layers
        self.dropout = dropout

        self.embedding = nn.Embedding(input_vocab_len, embedding_size)
        # Inter-layer dropout only makes sense for a stacked GRU
        inter_layer_dropout = self.dropout if n_layers > 1 else 0
        self.gru = nn.GRU(
            embedding_size,
            self.hidden_size,
            self.n_layers,
            dropout=inter_layer_dropout,
            bidirectional=True,
            batch_first=True,
        )

    def forward(self, x: torch.Tensor, input_lengths: list[int], hidden=None, device='cpu') -> tuple[torch.Tensor, torch.Tensor]:
        """
        Forward pass of the encoder module.

        Args:
            x (torch.Tensor): Padded token indices in batch-first layout.
            input_lengths (list[int]): The unpadded length of every input sequence.
            hidden (torch.Tensor): Optional initial hidden state for the GRU.
            device (str): The device the input and the results are moved to ('cpu' or 'cuda').

        Returns:
            outputs (torch.Tensor): Per-step outputs with both directions summed, (B, T, hidden_size).
            hidden (torch.Tensor): The final hidden state returned by the GRU.
        """
        # Embed the input after moving it to the requested device
        embedded = self.embedding(x.to(device)).to(device)

        # Pack so the GRU skips padded positions; sequences need not be sorted by length
        packed_input = nn.utils.rnn.pack_padded_sequence(
            embedded, input_lengths, batch_first=True, enforce_sorted=False
        )
        packed_output, hidden = self.gru(packed_input, hidden)

        # Back to a padded (B, T, 2 * hidden_size) tensor
        outputs, _ = nn.utils.rnn.pad_packed_sequence(packed_output, batch_first=True)

        # The first half of the last dimension is the forward direction, the second half the
        # backward direction; adding them yields (B, T, hidden_size)
        forward_half = outputs[:, :, :self.hidden_size]
        backward_half = outputs[:, :, self.hidden_size:]
        outputs = forward_half + backward_half

        if hidden is not None:
            hidden = hidden.to(device)
        return outputs.to(device), hidden

In [None]:
class Attn(nn.Module):
    """
    Attention module that computes the attention weights and the context vector.
    This attention class supports Luong's general, Luong's dot, and Bahdanau's concatenation attention methods.
    """
    def __init__(self, method: str, hidden_size: int):
        """
        Args:
            method (str): The attention method to use ('dot', 'general' or 'concat').
            hidden_size (int): The size of the hidden dimension.

        Raises:
            ValueError: If method is not one of the supported attention methods.
        """
        super().__init__()
        self.method = method
        self.hidden_size = hidden_size

        if self.method not in ['dot', 'general', 'concat']:
            raise ValueError(f"Unsupported attention method: {self.method}")

        # This handles one of Luong's Attention method
        if self.method == 'general':
            self.attn = nn.Linear(self.hidden_size, self.hidden_size)
        # This handles Bahdanau Attention method
        elif self.method == 'concat':
            self.attn = nn.Linear(self.hidden_size * 2, self.hidden_size)
            # Allocate v and give it a proper random initialisation: a bare
            # torch.FloatTensor(1, H) holds whatever happens to be in memory.
            self.v = nn.Parameter(torch.empty(1, hidden_size))
            bound = hidden_size ** -0.5
            nn.init.uniform_(self.v, -bound, bound)

    def forward(self, hidden: torch.Tensor, encoder_outputs: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        """
        Compute the attention weights and the resulting context vector.

        Args:
            hidden: Decoder hidden state, either (B, H) or (n_layers, B, H); for the latter
                the state of the last layer is used.
            encoder_outputs: Encoder outputs (B, T, H)

        Returns:
            attn_weights: Attention weights of shape (B, 1, T); they sum to 1 over T.
            context: Weighted sum of the encoder outputs, shape (B, 1, H)
        """
        # Ensure hidden is in (B, H) format
        if hidden.dim() == 3:
            hidden = hidden[-1]  # (B, H)
        # Compute the attention energies
        attn_energies = self.score(hidden, encoder_outputs)  # (B, T)
        # Normalize energies to get attention weights
        attn_weights = F.softmax(attn_energies, dim=1).unsqueeze(1)  # (B, 1, T)
        # Compute the context vector
        context = torch.bmm(attn_weights, encoder_outputs)  # (B, 1, H)
        return attn_weights, context

    def score(self, hidden: torch.Tensor, encoder_outputs: torch.Tensor) -> torch.Tensor:
        """
        Calculate the unnormalized attention scores.

        Args:
            hidden: Decoder hidden state (B, H)
            encoder_outputs: Encoder outputs (B, T, H)

        Returns:
            Attention scores of shape (B, T)
        """
        # Handle "dot" attention (Luong)
        if self.method == 'dot':
            return torch.bmm(hidden.unsqueeze(1), encoder_outputs.transpose(1, 2)).squeeze(1)  # (B, T)

        # Handle "general" attention (Luong)
        if self.method == 'general':
            energy = self.attn(encoder_outputs)  # (B, T, H)
            return torch.bmm(hidden.unsqueeze(1), energy.transpose(1, 2)).squeeze(1)  # (B, T)

        # Handle "concat" attention (Bahdanau)
        hidden_expanded = hidden.unsqueeze(1).expand(-1, encoder_outputs.size(1), -1)  # (B, T, H)
        energy = torch.tanh(self.attn(torch.cat((hidden_expanded, encoder_outputs), dim=2)))  # (B, T, H)
        # matmul broadcasts the (H, 1) vector over the batch; torch.bmm would require a
        # batch dimension of B on both operands and fail for B > 1
        attn_scores = torch.matmul(energy, self.v.t())  # (B, T, 1)
        return attn_scores.squeeze(2)  # (B, T)

In [None]:
class DecoderAttention(nn.Module):
    """
    Decoder module that decodes the input sequence with attention, one time step per call.
    This Decoder module supports Luong and Bahdanau's attention, which can be selected by passing in the attn_type and attn_approach.
    """
    def __init__(self, output_vocab_len: int, embedding_size: int, hidden_size: int, attn_type: str, attn_approach: str, n_layers=1, dropout=0.1):
        """
        Args:
            output_vocab_len (int): The output vocabulary size.
            embedding_size (int): The size of the word embeddings.
            hidden_size (int): The size of the hidden dimension.
            attn_type (str): The type of attention to use ('luong' or 'bahdanau').
            attn_approach (str): The score function used by the attention ('general', 'dot', or 'concat').
                The intended pairings are 'general' or 'dot' with 'luong' and 'concat' with 'bahdanau'.
            n_layers (int): The number of layers in the GRU.
            dropout (float): The dropout probability, used between GRU layers when n_layers > 1.

        Raises:
            ValueError: If attn_type is neither 'luong' nor 'bahdanau', or if attn_approach is
                not supported by the attention module.
        """
        super().__init__()

        # Fail at construction time instead of with an UnboundLocalError inside forward()
        if attn_type not in ('luong', 'bahdanau'):
            raise ValueError(f"Unsupported attention type: {attn_type}")

        # Keep for reference
        self.attn_type = attn_type
        self.attn_approach = attn_approach
        self.hidden_size = hidden_size
        self.n_layers = n_layers
        self.dropout = dropout

        # Define layers
        self.embedding = nn.Embedding(output_vocab_len, embedding_size)
        self.gru = nn.GRU(embedding_size, hidden_size, n_layers, dropout=(self.dropout if n_layers > 1 else 0), batch_first=True)
        # Luong projects the combined vector down to H before the output layer, whereas
        # Bahdanau feeds the 2H-wide concatenation straight into the output layer
        out_in_features = hidden_size * 2 if self.attn_type == 'bahdanau' else hidden_size
        self.out = nn.Linear(out_in_features, output_vocab_len)

        # Attention mechanism
        self.attn = Attn(self.attn_approach, hidden_size)
        if self.attn_type == 'luong':
            self.concat = nn.Linear(hidden_size * 2, hidden_size)

    def forward(self, input_seq: torch.Tensor, last_hidden: torch.Tensor, encoder_outputs: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Forward pass for the decoder (a single time step).

        Parameters:
        - input_seq: Previous token index of every sequence, shape (B), where B is the batch size.
        - last_hidden: Previous hidden state of shape (n_layers, B, H). A bidirectional state of
          shape (2 * n_layers, B, H) is also accepted; its two directions are summed.
        - encoder_outputs: Encoder outputs of shape (B, T, H), where T is sequence length.

        Returns:
        - output: Unnormalized vocabulary scores of shape (B, output_size).
        - hidden: Updated hidden state of shape (n_layers, B, H).
        - attn_weights: Attention weights of shape (B, 1, T).
        """

        # Handle bidirectional hidden states
        if last_hidden.size(0) == 2 * self.n_layers:
            # (2 * n_layers, B, H) -> (n_layers, 2, B, H) -> sum over directions -> (n_layers, B, H).
            # The result already has the layer dimension the GRU expects.
            last_hidden = last_hidden.reshape(self.n_layers, 2, -1, self.hidden_size).sum(1)

        # Embedding layer; add a sequence dimension of length one for the GRU
        embedded = self.embedding(input_seq).unsqueeze(1)  # (B, 1, E)

        # GRU step
        rnn_output, hidden = self.gru(embedded, last_hidden)  # rnn_output: (B, 1, H)
        rnn_output = rnn_output.squeeze(1)  # Remove the sequence dimension (B, H)

        # Attention mechanism
        if self.attn_type == 'luong':
            attn_weights, context = self.attn(rnn_output, encoder_outputs)  # (B, 1, T), (B, 1, H)
            context = context.squeeze(1)  # (B, H)
            concat_input = torch.cat((rnn_output, context), 1)  # (B, 2*H)
            concat_output = torch.tanh(self.concat(concat_input))  # (B, H)
            output = self.out(concat_output)  # (B, output_size)
        else:  # 'bahdanau' (validated in __init__)
            # Attend with the hidden state of the last GRU layer
            attn_weights, context = self.attn(hidden[-1], encoder_outputs)  # (B, 1, T), (B, 1, H)
            context = context.squeeze(1)  # (B, H)
            concatenated_output = torch.cat((rnn_output, context), dim=1)  # (B, 2*H)
            # Use the full linear layer so that its bias is applied (and trained) as well
            output = self.out(concatenated_output)  # (B, output_size)

        return output, hidden, attn_weights

## Step 10: Defining the functions to train the Sequence to Sequence model with attention

train_seq2seq_attention is the main function which in turn calls train_step_attention for each batch of training.

In [None]:
def train_step_attention(source: torch.Tensor, target: torch.Tensor, input_lengths: list[int],
                         encoder: nn.Module, decoder: nn.Module, encoder_optimizer: torch.optim.Optimizer,
                         decoder_optimizer: torch.optim.Optimizer, criterion: nn.Module, device: str, start_of_sequence_token_index: int) -> float:
    """
    Perform a single training step for the sequence-to-sequence model with attention.

    The decoder is always teacher-forced: at every step it receives the ground-truth
    token of the previous position.

    Args:
        source (torch.Tensor): The padded source batch, shape (B, S).
        target (torch.Tensor): The padded target batch, shape (B, T). Position 0 is expected
            to hold the <sos> token and is not predicted.
        input_lengths (list[int]): The lengths of the input sequences.
        encoder (nn.Module): The encoder model.
        decoder (nn.Module): The decoder model (must expose an ``n_layers`` attribute).
        encoder_optimizer (torch.optim.Optimizer): The encoder optimizer.
        decoder_optimizer (torch.optim.Optimizer): The decoder optimizer.
        criterion (nn.Module): The loss function.
        device (str): The device to run the model on ('cpu' or 'cuda').
        start_of_sequence_token_index (int): The index of the <sos> token.

    Returns:
        float: The loss summed over all decoding steps divided by the number of steps
        (0.0 when the target has fewer than two positions, in which case nothing is updated).
    """
    batch_size = source.size(0)
    max_target_len = target.size(1)

    # Positions 1..T-1 are predicted, i.e. T-1 decoding steps
    decoding_steps = max_target_len - 1
    if decoding_steps < 1:
        # Nothing to predict; avoids calling backward() on a plain number
        return 0.0

    # Move tensors to the specified device
    source, target = source.to(device), target.to(device)

    # Zero gradients
    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    # Encoder forward pass
    encoder_outputs, encoder_hidden = encoder(source, input_lengths, device=device)

    # Decoder initial state: start every sequence with <sos>
    decoder_input = torch.full((batch_size,), start_of_sequence_token_index, dtype=torch.long, device=device)
    # Keep only the first decoder.n_layers entries of the encoder state
    # NOTE(review): for a single-layer bidirectional encoder this is the forward direction;
    # for stacked encoders the entries interleave directions — confirm before using n_layers > 1
    decoder_hidden = encoder_hidden[:decoder.n_layers]

    loss = 0
    for t in range(1, max_target_len):
        decoder_output, decoder_hidden, _ = decoder(decoder_input, decoder_hidden, encoder_outputs)
        loss += criterion(decoder_output, target[:, t])
        decoder_input = target[:, t]  # Teacher forcing

    # Backpropagation
    loss.backward()

    # Clip gradients to prevent exploding gradients
    torch.nn.utils.clip_grad_norm_(encoder.parameters(), 1)
    torch.nn.utils.clip_grad_norm_(decoder.parameters(), 1)

    # Update parameters
    encoder_optimizer.step()
    decoder_optimizer.step()

    # Average over the steps that actually contributed to the loss
    return loss.item() / decoding_steps

In [None]:
def train_seq2seq_attention(dataloader: DataLoader, vocab: dict, start_of_sequence_token_index: int, padding_token_index: int, input_dim=100, hidden_dim=256, attn_type='luong', attn_approach='general', epochs=10, lr=0.001, device='cpu', folder_path='', model_filename='trained_seq_2_seq_no_attention.pth'):
    """
    Train the sequence-to-sequence model with attention and save a checkpoint.

    The checkpoint is a dict with the keys 'encoder', 'decoder', 'e_optimizer' and
    'd_optimizer', each holding the corresponding state dict.

    Args:
        dataloader (DataLoader): Yields (source, target, input_lengths) batches.
        vocab (dict): The vocabulary dictionary.
        start_of_sequence_token_index (int): The index of the <sos> token.
        padding_token_index (int): The index of the <pad> token (ignored by the loss).
        input_dim (int): The size of the word embeddings.
        hidden_dim (int): The size of the hidden dimension.
        attn_type (str): The attention mechanism which can be either luong or bahdanau.
        attn_approach (str): The actual attention calculation which can be either dot or general for luong or concat for bahdanau.
        epochs (int): The number of epochs to train the model.
        lr (float): The learning rate.
        device (str): The device to run the model on ('cpu' or 'cuda').
        folder_path (str): The folder where the output model parameters will be saved.
            An empty string saves into the current working directory.
        model_filename (str): The name of the model parameter file.
            NOTE: the default is the no-attention filename; pass an explicit name
            to avoid overwriting a no-attention checkpoint saved in the same folder.
    """
    # os.makedirs('') raises FileNotFoundError, so only create a folder when one is given
    if folder_path:
        os.makedirs(folder_path, exist_ok=True)

    vocab_size = len(vocab)

    encoder = EncoderAttention(vocab_size, input_dim, hidden_dim).to(device)
    decoder = DecoderAttention(vocab_size, input_dim, hidden_dim, attn_type, attn_approach).to(device)

    # Make the training mode explicit (enables dropout etc. if the models use it)
    encoder.train()
    decoder.train()

    encoder_optimizer = optim.Adam(encoder.parameters(), lr=lr)
    decoder_optimizer = optim.Adam(decoder.parameters(), lr=lr)

    # Padding positions do not contribute to the loss
    criterion = nn.CrossEntropyLoss(ignore_index=padding_token_index)

    # The number of batches does not change between epochs
    total_batches = len(dataloader)

    # Training loop
    for epoch in range(1, epochs + 1):
        epoch_loss = 0
        for batch_idx, (source, target, input_lengths) in enumerate(dataloader):
            batch_loss = train_step_attention(source, target, input_lengths, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, device, start_of_sequence_token_index)
            epoch_loss += batch_loss
            # Update progress bar
            print_progress_bar(epoch, batch_idx, total_batches, batch_loss, epochs)

        # Guard against an empty dataloader so the average cannot divide by zero
        print(f"\n Epoch {epoch}/{epochs}, Loss: {epoch_loss / max(total_batches, 1):.4f}")

    # Define the full save path for the model
    save_path = os.path.join(folder_path, model_filename)
    # Save the model and optimizer states so training can be resumed later
    torch.save({
        'encoder': encoder.state_dict(),
        'decoder': decoder.state_dict(),
        'e_optimizer': encoder_optimizer.state_dict(),
        'd_optimizer': decoder_optimizer.state_dict()
    }, save_path)

In [None]:
# Train the attention model (Luong, "general" scoring) on the GPU when one is
# available, and save the checkpoint next to the notebook.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
folder_path = './'
model_filename = "trained_seq_2_seq_attention.pth"
train_seq2seq_attention(
    train_loader,
    vocab,
    vocab['<sos>'],
    vocab['<pad>'],
    attn_type='luong',
    attn_approach='general',
    epochs=5,
    device=device,
    folder_path=folder_path,
    model_filename=model_filename,
)

## Step 11: Evaluating the Trained Model Without Attention

In [None]:
def generate_answer_no_attention(question, vocab, reverse_vocab, start_of_sequence_token_index, end_of_sequence_token_index, embedding_size=100, hidden_size=256, max_length=50, device='cpu', saved_model=''):
    """
    Generate an answer for a question with the seq2seq model without attention (greedy decoding).

    Args:
        question (str): The whitespace-separated question text.
        vocab (dict): Maps each word to its token index.
        reverse_vocab (dict): Maps each token index back to its word.
        start_of_sequence_token_index (int): The index of the <sos> token.
        end_of_sequence_token_index (int): The index of the <eos> token.
        embedding_size (int): The size of the word embeddings.
        hidden_size (int): The size of the hidden dimension.
        max_length (int): The maximum number of tokens to generate.
        device (str): The device to run the model on ('cpu' or 'cuda').
        saved_model (str): Path of the checkpoint to load; when empty the models keep their random initial weights.

    Returns:
        str: The generated answer, or an empty string when the question contains no words.

    Raises:
        KeyError: If a word of the question is not in vocab.
    """
    # Convert the words to token indices
    token_ids = [vocab[word] for word in question.split()]
    if not token_ids:
        # An empty question would produce an empty (float) tensor the encoder cannot embed
        return ''

    # Define the encoder and decoder models
    encoder = EncoderNoAttention(len(vocab), embedding_size, hidden_size).to(device)
    decoder = DecoderNoAttention(len(vocab), embedding_size, hidden_size).to(device)

    # If saved_model is not empty, attempt to load it
    if saved_model != '':
        checkpoint = torch.load(saved_model, map_location=device)
        # Load the saved model weights into the models
        encoder.load_state_dict(checkpoint['encoder'])
        decoder.load_state_dict(checkpoint['decoder'])

    # Put them in evaluation mode
    encoder.eval()
    decoder.eval()

    generated_answer = []

    # Inference only: no gradients are needed, so skip building the autograd graph
    with torch.no_grad():
        # Prepare the question input with a batch dimension of 1
        question_tensor = torch.tensor(token_ids, device=device).unsqueeze(0)

        # Encode the question
        decoder_hidden, decoder_cell = encoder(question_tensor)

        # Initialize the decoder input with the start of sequence token
        decoder_input = torch.tensor([start_of_sequence_token_index], device=device)

        for _ in range(max_length):
            # Pass the latest token, the previous hidden state (initially the encoder's) and the cell state to the decoder
            decoder_output, decoder_hidden, decoder_cell = decoder(decoder_input, decoder_hidden, decoder_cell)
            # Get the token with the highest score
            _, next_token = decoder_output.topk(1)
            token_index = next_token.item()
            # If the token is the <eos> token then stop the text generation
            if token_index == end_of_sequence_token_index:
                break
            generated_answer.append(reverse_vocab[token_index])
            # The predicted token becomes the decoder input of the next iteration
            decoder_input = next_token.reshape(1)

    return ' '.join(generated_answer)

In [None]:
# Example usage: answer the first test question with the model without attention
model_path = "trained_seq_2_seq_no_attention.pth"
question = test_text_dialogue_pairs[0][0]
print("Question:", question)
answer = generate_answer_no_attention(
    question,
    vocab,
    reverse_vocab,
    vocab["<sos>"],
    vocab["<eos>"],
    saved_model=model_path,
)
print("Answer:", answer)

## Step 12: Trained Attention Model Evaluation

In [None]:
def generate_answer_attention(question, vocab, reverse_vocab, start_of_sequence_token_index, end_of_sequence_token_index, embedding_size=100, hidden_size=256, attn_type='luong', attn_approach='general', max_length=50, device='cpu', saved_model=''):
    """
    Generate an answer for a question with the seq2seq model with attention (greedy decoding).

    Args:
        question (str): The whitespace-separated question text.
        vocab (dict): Maps each word to its token index.
        reverse_vocab (dict): Maps each token index back to its word.
        start_of_sequence_token_index (int): The index of the <sos> token.
        end_of_sequence_token_index (int): The index of the <eos> token.
        embedding_size (int): The size of the word embeddings.
        hidden_size (int): The size of the hidden dimension.
        attn_type (str): The attention mechanism which can be either luong or bahdanau.
        attn_approach (str): The attention calculation (dot or general for luong, concat for bahdanau).
        max_length (int): The maximum number of tokens to generate.
        device (str): The device to run the model on ('cpu' or 'cuda').
        saved_model (str): Path of the checkpoint to load; when empty the models keep their random initial weights.

    Returns:
        str: The generated answer, or an empty string when the question contains no words.

    Raises:
        KeyError: If a word of the question is not in vocab.
    """
    # Convert the words to token indices
    token_ids = [vocab[word] for word in question.split()]
    if not token_ids:
        # An empty question would produce an empty (float) tensor the encoder cannot embed
        return ''

    encoder = EncoderAttention(len(vocab), embedding_size, hidden_size).to(device)
    decoder = DecoderAttention(len(vocab), embedding_size, hidden_size, attn_type, attn_approach).to(device)

    # If saved_model is not empty, attempt to load it
    if saved_model != '':
        checkpoint = torch.load(saved_model, map_location=device)
        # Load the saved model weights into the models
        encoder.load_state_dict(checkpoint['encoder'])
        decoder.load_state_dict(checkpoint['decoder'])

    # Put them in evaluation mode
    encoder.eval()
    decoder.eval()

    generated_answer = []

    # Inference only: no gradients are needed, so skip building the autograd graph
    with torch.no_grad():
        # Prepare the question input with a batch dimension of 1
        question_tensor = torch.tensor(token_ids, device=device).unsqueeze(0)
        # NOTE(review): if the encoder packs sequences with pack_padded_sequence, the lengths
        # are expected on the CPU - confirm the encoder handles a tensor on `device`.
        input_lengths = torch.tensor([len(token_ids)]).to(device)

        # Encode the question
        encoder_outputs, encoder_hidden = encoder(question_tensor, input_lengths, device=device)

        # Initialize the decoder input with the start of sequence token
        decoder_input = torch.tensor([start_of_sequence_token_index], device=device)
        # Keep only the first n_layers hidden states as the decoder's initial state
        decoder_hidden = encoder_hidden[:decoder.n_layers]

        for _ in range(max_length):
            # Pass the latest token, the previous hidden state and the encoder outputs (for attention) to the decoder
            decoder_output, decoder_hidden, _ = decoder(decoder_input, decoder_hidden, encoder_outputs)
            # Get the token with the highest score
            _, next_token = decoder_output.topk(1)
            token_index = next_token.item()
            # If the token is the <eos> token then stop the text generation
            if token_index == end_of_sequence_token_index:
                break
            generated_answer.append(reverse_vocab[token_index])
            # The predicted token becomes the decoder input of the next iteration
            decoder_input = next_token.reshape(1)

    return ' '.join(generated_answer)

In [None]:
# Example usage: answer the first test question with the attention model
model_path = "trained_seq_2_seq_attention.pth"
question = test_text_dialogue_pairs[0][0]
print("Question:", question)
# The attention settings must match the ones the checkpoint was trained with
attention_type = 'luong'
attention_approach = 'general'
answer = generate_answer_attention(
    question,
    vocab,
    reverse_vocab,
    vocab["<sos>"],
    vocab["<eos>"],
    attn_type=attention_type,
    attn_approach=attention_approach,
    saved_model=model_path,
)
print("Answer:", answer)

## Step 13: Additional Model Testing

In [None]:
# Additional test: generate an answer with the model without attention
model_path = "trained_seq_2_seq_no_attention.pth"
question = test_text_dialogue_pairs[0][0]
print("Question:", question)
answer = generate_answer_no_attention(
    question,
    vocab,
    reverse_vocab,
    vocab["<sos>"],
    vocab["<eos>"],
    saved_model=model_path,
)
print("Answer:", answer)

In [None]:
# Additional test: generate an answer with the attention model
model_path = "trained_seq_2_seq_attention.pth"
question = test_text_dialogue_pairs[0][0]
print("Question:", question)
# The attention settings must match the ones the checkpoint was trained with
attention_type = 'luong'
attention_approach = 'general'
answer = generate_answer_attention(
    question,
    vocab,
    reverse_vocab,
    vocab["<sos>"],
    vocab["<eos>"],
    attn_type=attention_type,
    attn_approach=attention_approach,
    saved_model=model_path,
)
print("Answer:", answer)

## Step 14: Evaluation

In [None]:
def download_nltk_resources():
    """
    Fetch the NLTK resources used by the evaluation metrics.

    The resources are requested in order (wordnet, omw-1.4, punkt). If any
    request raises, the error is printed and the remaining ones are skipped.
    """
    resource_names = ('wordnet', 'omw-1.4', 'punkt')
    try:
        for resource_name in resource_names:
            nltk.download(resource_name)
    except Exception as e:
        print(f"Error downloading NLTK resources: {e}")

In [None]:
import torch
import numpy as np
from nltk.translate.bleu_score import sentence_bleu
from nltk.translate.meteor_score import meteor_score
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from tqdm import tqdm

class ModelEvaluator:
    """
    Compare the trained seq2seq models (with and without attention) on test pairs.

    Answers are generated with generate_answer_no_attention and
    generate_answer_attention, then scored against the ground truth with BLEU,
    bag-of-words cosine similarity and METEOR.
    """

    def __init__(self, model_no_attention, model_attention, test_loader,
                 vocab, reverse_vocab, start_token, end_token, device='cuda:0',
                 attn_type='luong', attn_approach='general'):
        """
        Store the evaluation settings and download the NLTK resources.

        Args:
            model_no_attention (str): Checkpoint path of the model without attention.
            model_attention (str): Checkpoint path of the model with attention.
            test_loader: Iterable of (question, ground_truth) text pairs.
            vocab (dict): Maps each word to its token index.
            reverse_vocab (dict): Maps each token index back to its word.
            start_token: The start-of-sequence token, as a vocab key (e.g. "<sos>") or a token index.
            end_token: The end-of-sequence token, as a vocab key (e.g. "<eos>") or a token index.
            device (str): The device to run the models on.
            attn_type (str): The attention mechanism of the attention checkpoint.
            attn_approach (str): The attention calculation of the attention checkpoint.
        """
        self.model_no_attention = model_no_attention
        self.model_attention = model_attention
        self.test_loader = test_loader
        self.vocab = vocab
        self.reverse_vocab = reverse_vocab
        self.start_token = start_token
        self.end_token = end_token
        self.device = device
        self.attn_type = attn_type
        self.attn_approach = attn_approach

        # Download the NLTK resources requested for the METEOR metric
        import nltk
        nltk.download('punkt')
        nltk.download('wordnet')

    def _token_index(self, token):
        """Return the index of a special token given as a vocab key or as an index."""
        if isinstance(token, str):
            return self.vocab[token]
        return token

    def _generate_answers(self, generate_fn, saved_model, description, **generate_kwargs):
        """Run generate_fn on every test pair and return (references, candidates)."""
        references = []
        candidates = []
        start_index = self._token_index(self.start_token)
        end_index = self._token_index(self.end_token)

        # NOTE: generate_fn reloads the checkpoint for every question, which is slow on large test sets.
        with torch.no_grad():
            for question, ground_truth in tqdm(self.test_loader, desc=description):
                generated_answer = generate_fn(
                    question, self.vocab, self.reverse_vocab,
                    start_index, end_index,
                    saved_model=saved_model, device=self.device,
                    **generate_kwargs
                )
                references.append(ground_truth)
                candidates.append(generated_answer)

        return references, candidates

    def generate_answers_without_attention(self):
        """
        Generate an answer for every test question with the model without attention.

        Returns:
            tuple[list[str], list[str]]: The ground-truth answers and the generated answers.
        """
        return self._generate_answers(
            generate_answer_no_attention, self.model_no_attention,
            'Generating answers without attention'
        )

    def generate_answers_with_attention(self):
        """
        Generate an answer for every test question with the attention model.

        Returns:
            tuple[list[str], list[str]]: The ground-truth answers and the generated answers.
        """
        return self._generate_answers(
            generate_answer_attention, self.model_attention,
            'Generating answers with attention',
            attn_type=self.attn_type, attn_approach=self.attn_approach
        )

    def calculate_bleu_score(self, references, candidates):
        """
        Return the mean sentence-level BLEU score over the (reference, candidate) pairs.

        Returns nan when there are no pairs.
        """
        if not references:
            return float('nan')

        bleu_scores = []
        # NOTE(review): no smoothing function is applied; consider nltk's SmoothingFunction
        # if short answers score close to zero - confirm before changing reported numbers.
        for ref, cand in tqdm(zip(references, candidates), desc='Calculating BLEU scores', total=len(references)):
            bleu_scores.append(sentence_bleu([ref.split()], cand.split()))

        return float(np.mean(bleu_scores))

    def calculate_cosine_similarity(self, references, candidates):
        """
        Return the mean cosine similarity between the bag-of-words vectors of each pair.

        Returns nan when there are no pairs.
        """
        if not references:
            return float('nan')

        from sklearn.preprocessing import normalize

        vectorizer = CountVectorizer().fit(references + candidates)
        # L2-normalise each row so that a row-wise dot product is the cosine similarity;
        # all-zero rows stay zero and therefore score 0
        ref_vectors = normalize(vectorizer.transform(references))
        cand_vectors = normalize(vectorizer.transform(candidates))
        # Only matching pairs are needed, so avoid building the full N x N similarity matrix
        pair_similarities = np.asarray(ref_vectors.multiply(cand_vectors).sum(axis=1)).ravel()
        return float(np.mean(pair_similarities))

    def calculate_meteor_score(self, references, candidates):
        """
        Return the mean METEOR score over the (reference, candidate) pairs.

        A pair whose score cannot be computed is reported and counted as 0.0.
        Returns nan when there are no pairs.
        """
        if not references:
            return float('nan')

        meteor_scores = []
        for ref, cand in tqdm(zip(references, candidates), desc='Calculating METEOR scores', total=len(references)):
            ref_tokens = ref.split()
            cand_tokens = cand.split()
            try:
                score = meteor_score([ref_tokens], cand_tokens)
            except Exception as e:
                # Best effort: keep evaluating the remaining pairs
                print(f"METEOR score calculation error: {e}")
                score = 0.0
            meteor_scores.append(score)

        return float(np.mean(meteor_scores))

    def _calculate_metrics(self, references, candidates):
        """Return the three metrics for one model as a dict."""
        return {
            'bleu_score': self.calculate_bleu_score(references, candidates),
            'cosine_similarity': self.calculate_cosine_similarity(references, candidates),
            'meteor_score': self.calculate_meteor_score(references, candidates)
        }

    def evaluate_models(self):
        """
        Generate answers with both models and score them.

        Returns:
            dict: The metric dicts under 'no_attention' and 'attention', plus the raw
            references and candidates of both models under 'attention_refs',
            'attention_cands', 'no_attention_refs' and 'no_attention_cands'.
        """
        # Generate answers
        attention_refs, attention_cands = self.generate_answers_with_attention()
        no_attention_refs, no_attention_cands = self.generate_answers_without_attention()

        # Calculate the metrics of each model
        no_attention_metrics = self._calculate_metrics(no_attention_refs, no_attention_cands)
        attention_metrics = self._calculate_metrics(attention_refs, attention_cands)

        return {
            'no_attention': no_attention_metrics,
            'attention': attention_metrics,
            'attention_refs': attention_refs,
            'attention_cands': attention_cands,
            'no_attention_refs': no_attention_refs,
            'no_attention_cands': no_attention_cands
        }


In [None]:
# Checkpoints produced by the two training runs
model_no_attention = "trained_seq_2_seq_no_attention.pth"
model_attention = "trained_seq_2_seq_attention.pth"

# Evaluate both models on the first 50 test pairs, on the CPU
evaluator = ModelEvaluator(
    model_no_attention,
    model_attention,
    test_text_dialogue_pairs[:50],
    vocab,
    reverse_vocab,
    "<sos>",
    "<eos>",
    device='cpu',
)

metrics = evaluator.evaluate_models()

# Output the metrics
print("No Attention Model Metrics:", metrics['no_attention'])
print("Attention Model Metrics:", metrics['attention'])