### Importing Required Libraries

In [1]:
import re
import unicodedata
import numpy as np
import tensorflow as tf
import warnings

warnings.filterwarnings("ignore")

### Load the Dataset

In [2]:
with open("../datasmalltalk.txt", "r") as file:
    raw_data = [line.split("\t") for line in file.read().split("\n")]
raw_data_np = np.array(raw_data)
questions = raw_data_np[:, 0]
answers = np.where(raw_data_np[:, 1] != "", raw_data_np[:, 1], "")

## Data Preprocessing

### Data Tokenizing

In [3]:
from typing import List, Optional, Tuple, Any, Union


def tokenize(
    lang: List[str],
    max_vocab_size: Optional[int] = None,
    max_seq_length: Optional[int] = None,
    padding_type: str = "post",
) -> Tuple[np.ndarray, tf.keras.preprocessing.text.Tokenizer]:
    """
    Tokenizes a list of sentences and pads sequences to a maximum length.

    Args:
        lang (list): A list of sentences to be tokenized.
        max_vocab_size (int, optional): Maximum vocabulary size. Words outside the vocabulary will be ignored. Defaults to None.
        max_seq_length (int, optional): Maximum sequence length. Sequences longer than this will be truncated, and sequences shorter will be padded. Defaults to None.
        padding_type (str, optional): Type of padding. Can be 'pre' or 'post'. Defaults to 'post'.

    Returns:
        tensor (numpy.ndarray): A 2D numpy array representing the tokenized and padded sequences.
        lang_tokenizer (tf.keras.preprocessing.text.Tokenizer): A Tokenizer object fitted on the input language.
    """
    # Initialize a Tokenizer object
    lang_tokenizer = tf.keras.preprocessing.text.Tokenizer(
        num_words=max_vocab_size, filters=""
    )

    # Fit the Tokenizer on the input language
    lang_tokenizer.fit_on_texts(lang)

    # Convert sentences to sequences of tokens
    tensor = lang_tokenizer.texts_to_sequences(lang)

    # Pad sequences to a maximum length if provided
    if max_seq_length is not None:
        tensor = tf.keras.preprocessing.sequence.pad_sequences(
            tensor, padding=padding_type, maxlen=max_seq_length
        )
    else:
        tensor = tf.keras.preprocessing.sequence.pad_sequences(
            tensor, padding=padding_type
        )

    return tensor, lang_tokenizer

### Text Preprocessing

In [4]:
def preprocess_sentence(sentence: str) -> str:
    """
    Preprocesses a single sentence by removing diacritics, converting to lowercase,
    adding start and end tokens, and cleaning punctuation.

    Args:
        sentence (str): Input sentence to be preprocessed.

    Returns:
        str: Preprocessed sentence.
    """
    # Remove diacritics
    sentence = "".join(
        c
        for c in unicodedata.normalize("NFD", sentence)
        if unicodedata.category(c) != "Mn"
    )

    # Convert to lowercase and strip leading/trailing whitespaces
    sentence = sentence.lower().strip()

    # Add space around punctuation marks
    sentence = re.sub(r"([?.!,¿])", r" \1 ", sentence)

    # Replace multiple spaces with a single space
    sentence = re.sub(r'[" "]+', " ", sentence)

    # Remove any characters that are not letters, punctuation marks, or spaces
    sentence = re.sub(r"[^a-zA-Z?.!,¿]+", " ", sentence)

    # Add start and end tokens
    sentence = "<start> " + sentence + " <end>"

    return sentence

In [5]:
import multiprocessing


def preprocess_sentences(sentences: List[str]) -> List[str]:
    """
    Preprocesses a list of sentences in parallel using multiprocessing.

    Args:
        sentences (list): List of sentences to be preprocessed.

    Returns:
        list: Preprocessed sentences.
    """
    with multiprocessing.Pool() as pool:
        preprocessed_sentences = pool.map(preprocess_sentence, sentences)
    return preprocessed_sentences


# Preprocess questions and answers using multiprocessing
pre_questions = preprocess_sentences(questions)
pre_answers = preprocess_sentences(answers)
data = pre_answers, pre_questions

In [6]:
def prepare_data(
    data: Tuple[list, list],
    max_vocab_size: Optional[int] = None,
    max_seq_length: Optional[int] = None,
) -> Tuple[
    np.ndarray,
    np.ndarray,
    tf.keras.preprocessing.text.Tokenizer,
    tf.keras.preprocessing.text.Tokenizer,
]:
    """
    Prepares data for training by tokenizing input and target languages.

    Args:
        data (Tuple[list, list]): A tuple containing target and input language data.
        max_vocab_size (Optional[int], optional): Maximum vocabulary size for tokenization. Defaults to None.
        max_seq_length (Optional[int], optional): Maximum sequence length for padding. Defaults to None.

    Returns:
        Tuple[np.ndarray, np.ndarray, tf.keras.preprocessing.text.Tokenizer, tf.keras.preprocessing.text.Tokenizer]: A tuple containing input tensor, target tensor, input language tokenizer, and target language tokenizer.
    """
    targ_lang, inp_lang = data

    # Tokenize input and target languages
    input_tensor, inp_lang_tokenizer = tokenize(
        inp_lang, max_vocab_size, max_seq_length
    )
    target_tensor, targ_lang_tokenizer = tokenize(
        targ_lang, max_vocab_size, max_seq_length
    )

    return input_tensor, target_tensor, inp_lang_tokenizer, targ_lang_tokenizer

In [7]:
input_tensor, target_tensor, inp_lang, targ_lang = prepare_data(data)
max_length_targ = target_tensor.shape[1]
max_length_inp = input_tensor.shape[1]
print(max_length_targ)
print(max_length_inp)

24
24


### Download Tokenizers

In [8]:
import pickle


def save_tokenizer(tokenizer, filename):
    with open(filename, "wb") as handle:
        pickle.dump(tokenizer, handle, protocol=pickle.HIGHEST_PROTOCOL)


save_tokenizer(inp_lang, "input_tokenizer.pkl")
save_tokenizer(targ_lang, "target_tokenizer.pkl")

## Data Splitting

In [9]:
def save_tokenizer(tokenizer: Any, filename: str) -> None:
    """
    Saves a tokenizer object to a file using pickle serialization.

    Args:
        tokenizer (Any): Tokenizer object to be saved.
        filename (str): Name of the file to save the tokenizer to.
    """
    with open(filename, "wb") as handle:
        pickle.dump(tokenizer, handle, protocol=pickle.HIGHEST_PROTOCOL)


def load_tokenizer(filename: str) -> Any:
    """
    Loads a tokenizer object from a file using pickle deserialization.

    Args:
        filename (str): Name of the file containing the tokenizer object.

    Returns:
        Any: Loaded tokenizer object.
    """
    with open(filename, "rb") as handle:
        tokenizer = pickle.load(handle)
    return tokenizer


save_tokenizer(inp_lang, "input_tokenizer.pkl")
save_tokenizer(targ_lang, "target_tokenizer.pkl")

In [10]:
from sklearn.model_selection import train_test_split

# Splitting the data into 90% train, 10% validation
input_tensor_train, input_tensor_val, target_tensor_train, target_tensor_val = (
    train_test_split(input_tensor, target_tensor, test_size=0.1, random_state=42)
)

## Model Building

### Defining the PipeLine

In [11]:
BUFFER_SIZE = len(input_tensor_train)
BATCH_SIZE = 64
steps_per_epoch = len(input_tensor_train) // BATCH_SIZE
embedding_dim = 400
units = 1500
vocab_inp_size = len(inp_lang.word_index) + 1
vocab_tar_size = len(targ_lang.word_index) + 1


def create_dataset(
    input_tensor: tf.Tensor, target_tensor: tf.Tensor, batch_size: int
) -> tf.data.Dataset:
    """
    Creates a TensorFlow dataset from input and target tensors.

    Args:
        input_tensor (tf.Tensor): Tensor containing input sequences.
        target_tensor (tf.Tensor): Tensor containing target sequences.
        batch_size (int): Batch size for the dataset.

    Returns:
        tf.data.Dataset: TensorFlow dataset containing input-target pairs.
    """
    # Determine buffer size for shuffling
    buffer_size = len(input_tensor)

    # Create dataset from tensors and shuffle
    dataset = tf.data.Dataset.from_tensor_slices((input_tensor, target_tensor)).shuffle(
        buffer_size
    )

    # Batch the dataset
    dataset = dataset.batch(batch_size, drop_remainder=True)

    return dataset


# Create dataset
dataset = create_dataset(input_tensor_train, target_tensor_train, BATCH_SIZE)

# Example input and target batches
example_input_batch, example_target_batch = next(iter(dataset))
example_input_batch.shape, example_target_batch.shape

example_input_batch, example_target_batch = next(iter(dataset))
example_input_batch.shape, example_target_batch.shape

(TensorShape([64, 24]), TensorShape([64, 24]))

In [12]:
class Encoder(tf.keras.Model):
    """
    Encoder class for sequence-to-sequence models.

    Args:
        vocab_size (int): Size of the vocabulary.
        embedding_dim (int): Dimensionality of the embedding space.
        enc_units (int): Number of units in the encoder LSTM layer.
        batch_sz (int): Batch size.

    Attributes:
        batch_sz (int): Batch size.
        enc_units (int): Number of units in the encoder LSTM layer.
        embedding (tf.keras.layers.Embedding): Embedding layer.
        lstm (tf.keras.layers.LSTM): LSTM layer.

    Methods:
        call(x, hidden):
            Performs the forward pass of the encoder.
        initialize_hidden_state():
            Initializes the hidden state of the encoder.
    """

    def __init__(
        self, vocab_size: int, embedding_dim: int, enc_units: int, batch_sz: int
    ):
        super(Encoder, self).__init__()
        self.batch_sz = batch_sz
        self.enc_units = enc_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.lstm = tf.keras.layers.LSTM(
            self.enc_units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer="glorot_uniform",
        )

    def call(self, x: tf.Tensor, hidden: tf.Tensor) -> Union[tf.Tensor, list]:
        """
        Performs the forward pass of the encoder.

        Args:
            x (tf.Tensor): Input tensor of shape (batch_size, sequence_length).
            hidden (tf.Tensor): Initial hidden state of the LSTM.

        Returns:
            output (tf.Tensor): Output tensor of shape (batch_size, sequence_length, enc_units).
            state (list): List containing the final hidden states.
        """
        x = self.embedding(x)
        output, state_h, state_c = self.lstm(x, initial_state=hidden)
        state = [state_h, state_c]
        return output, state

    def initialize_hidden_state(self) -> list:
        """
        Initializes the hidden state of the encoder.

        Returns:
            list: List containing the initial hidden states.
        """
        return [
            tf.zeros((self.batch_sz, self.enc_units)),
            tf.zeros((self.batch_sz, self.enc_units)),
        ]


encoder = Encoder(vocab_inp_size, embedding_dim, units, BATCH_SIZE)

In [13]:
class Attention(tf.keras.layers.Layer):
    """
    Custom attention layer for sequence-to-sequence models.

    Args:
        units (int): Number of units for the attention mechanism.

    Attributes:
        W1 (tf.keras.layers.Dense): Dense layer for query transformation.
        W2 (tf.keras.layers.Dense): Dense layer for values transformation.
        V (tf.keras.layers.Dense): Dense layer for attention score computation.

    Methods:
        call(query, values):
            Computes the attention weights and context vector.
    """

    def __init__(self, units: int):
        """
        Initializes the attention layer.

        Args:
            units (int): Number of units for the attention mechanism.
        """
        super(Attention, self).__init__()
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, query: tf.Tensor, values: tf.Tensor) -> Union[tf.Tensor, tf.Tensor]:
        """
        Performs the forward pass of the attention mechanism.

        Args:
            query (tf.Tensor): Query tensor of shape (batch_size, query_length, hidden_size).
            values (tf.Tensor): Values tensor of shape (batch_size, values_length, hidden_size).

        Returns:
            context_vector (tf.Tensor): Context vector of shape (batch_size, hidden_size).
            attention_weights (tf.Tensor): Attention weights of shape (batch_size, values_length, 1).
        """
        query_with_time_axis = tf.expand_dims(query, 1)
        score = self.V(tf.nn.tanh(self.W1(query_with_time_axis) + self.W2(values)))
        attention_weights = tf.nn.softmax(score, axis=1)
        context_vector = attention_weights * values
        context_vector = tf.reduce_sum(context_vector, axis=1)

        return context_vector, attention_weights

In [14]:
class Decoder(tf.keras.Model):
    """
    Decoder class for sequence-to-sequence models with attention.

    Args:
        vocab_size (int): Size of the target vocabulary.
        embedding_dim (int): Dimensionality of the embedding space.
        dec_units (int): Number of units in the decoder LSTM layer.
        batch_sz (int): Batch size.

    Attributes:
        batch_sz (int): Batch size.
        dec_units (int): Number of units in the decoder LSTM layer.
        embedding (tf.keras.layers.Embedding): Embedding layer.
        lstm (tf.keras.layers.LSTM): LSTM layer.
        fc (tf.keras.layers.Dense): Fully connected layer.
        attention (Attention): Attention mechanism.

    Methods:
        call(x, hidden, enc_output):
            Performs the forward pass of the decoder.
    """

    def __init__(
        self, vocab_size: int, embedding_dim: int, dec_units: int, batch_sz: int
    ):
        super(Decoder, self).__init__()
        self.batch_sz = batch_sz
        self.dec_units = dec_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.lstm = tf.keras.layers.LSTM(
            self.dec_units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer="glorot_uniform",
        )
        self.fc = tf.keras.layers.Dense(vocab_size)
        self.attention = Attention(self.dec_units)

    def call(
        self, x: tf.Tensor, hidden: list, enc_output: tf.Tensor
    ) -> Union[tf.Tensor, list, tf.Tensor]:
        """
        Performs the forward pass of the decoder.

        Args:
            x (tf.Tensor): Input tensor of shape (batch_size, 1).
            hidden (list): List of initial hidden states.
            enc_output (tf.Tensor): Encoder output tensor of shape (batch_size, sequence_length, hidden_size).

        Returns:
            x (tf.Tensor): Output tensor of shape (batch_size, vocab_size).
            state (list): List of final hidden states.
            attention_weights (tf.Tensor): Attention weights of shape (batch_size, sequence_length, 1).
        """
        context_vector, attention_weights = self.attention(hidden[0], enc_output)

        x = self.embedding(x)
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)
        output, state_h, state_c = self.lstm(x, initial_state=hidden)
        state = [state_h, state_c]
        output = tf.reshape(output, (-1, output.shape[2]))
        x = self.fc(output)
        return x, state, attention_weights


decoder = Decoder(vocab_tar_size, embedding_dim, units, BATCH_SIZE)

###Adjusting Learning Rates

In [15]:
# Define the initial learning rate for adaptive learning rate
initial_learning_rate = 0.001

# Define the learning rate schedule using Exponential Decay
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=initial_learning_rate,
    decay_steps=1000,
    decay_rate=0.9,
    staircase=True,
)

# Define the optimizer with the adaptive learning rate
optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

# Define the loss function
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction="none"
)


def loss_function(real: tf.Tensor, pred: tf.Tensor) -> tf.Tensor:
    """
    Custom loss function for sequence-to-sequence models.

    Args:
        real (tf.Tensor): Ground truth labels, shape (batch_size, sequence_length).
        pred (tf.Tensor): Predicted logits, shape (batch_size, sequence_length, vocab_size).

    Returns:
        loss (tf.Tensor): Mean loss over the sequence, excluding padding tokens.
    """
    # Create a mask to exclude padding tokens from loss calculation
    mask = tf.math.logical_not(tf.math.equal(real, 0))

    # Calculate the loss using Sparse Categorical Crossentropy
    loss_ = loss_object(real, pred)

    # Apply the mask to the loss
    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask

    # Compute the mean loss over the sequence
    loss = tf.reduce_mean(loss_)
    return loss

###Defining Train Step

In [16]:
@tf.function
def train_step(inp: tf.Tensor, targ: tf.Tensor, enc_hidden: tf.Tensor) -> tf.Tensor:
    """
    Performs a single training step.

    Args:
        inp (tf.Tensor): Input sequences, shape (batch_size, input_sequence_length).
        targ (tf.Tensor): Target sequences, shape (batch_size, target_sequence_length).
        enc_hidden (tf.Tensor): Initial hidden state of the encoder, shape (batch_size, encoder_units).

    Returns:
        batch_loss (tf.Tensor): Average loss over the batch.
    """
    loss = 0

    with tf.GradientTape() as tape:
        # Forward pass through the encoder
        enc_output, enc_hidden = encoder(inp, enc_hidden)

        # Initialize decoder hidden state from encoder's last hidden state
        dec_hidden = [enc_hidden[0][:, :units], enc_hidden[1][:, :units]]

        # Initialize decoder input with start token
        dec_input = tf.expand_dims([targ_lang.word_index["<start>"]] * BATCH_SIZE, 1)

        # Teacher forcing - feeding the target as the next input
        for t in range(1, targ.shape[1]):
            # Forward pass through the decoder
            predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output)

            # Compute loss
            loss += loss_function(targ[:, t], predictions)

            # Use teacher forcing by passing the target as the next input
            dec_input = tf.expand_dims(targ[:, t], 1)

    # Compute average loss over the batch
    batch_loss = loss / int(targ.shape[1])

    # Get trainable variables
    variables = encoder.trainable_variables + decoder.trainable_variables

    # Compute gradients
    gradients = tape.gradient(loss, variables)

    # Update weights
    optimizer.apply_gradients(zip(gradients, variables))

    return batch_loss

In [17]:
@tf.function
def validation_step(
    inp: tf.Tensor, targ: tf.Tensor, enc_hidden: tf.Tensor
) -> tf.Tensor:
    """
    Performs a single validation step.

    Args:
        inp (tf.Tensor): Input sequences, shape (batch_size, input_sequence_length).
        targ (tf.Tensor): Target sequences, shape (batch_size, target_sequence_length).
        enc_hidden (tf.Tensor): Initial hidden state of the encoder, shape (batch_size, encoder_units).

    Returns:
        val_loss (tf.Tensor): Average validation loss over the batch.
    """
    val_loss = 0
    val_samples = 0

    # Forward pass through the encoder
    enc_output, enc_hidden = encoder(inp, enc_hidden)

    # Initialize decoder hidden state from encoder's last hidden state
    dec_hidden = [enc_hidden[0][:, :units], enc_hidden[1][:, :units]]

    # Initialize decoder input with start token
    dec_input = tf.expand_dims([targ_lang.word_index["<start>"]] * BATCH_SIZE, 1)

    for t in range(1, targ.shape[1]):
        # Forward pass through the decoder
        predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output)

        # Compute loss
        loss = loss_function(targ[:, t], predictions)
        val_loss += loss
        val_samples += 1

        # Use teacher forcing by passing the target as the next input
        dec_input = tf.expand_dims(targ[:, t], 1)

    # Compute average validation loss over the batch
    val_loss /= val_samples

    return val_loss

###Training the Pipeline

In [18]:
from tqdm import tqdm

# Define the number of epochs
EPOCHS = 5

# Lists to store training and validation losses
train_losses = []
val_losses = []

# Training loop
for epoch in range(1, EPOCHS + 1):
    enc_hidden = (
        encoder.initialize_hidden_state()
    )  # Initialize encoder hidden state for each epoch
    total_loss = 0

    # Create a tqdm progress bar for training batches
    with tqdm(total=steps_per_epoch, desc=f"Epoch {epoch}", unit="batch") as pbar:
        for batch, (inp, targ) in enumerate(dataset.take(steps_per_epoch)):
            batch_loss = train_step(inp, targ, enc_hidden)
            total_loss += batch_loss
            pbar.update(1)  # Update progress bar
            pbar.set_postfix(
                {"loss": total_loss / (batch + 1)}
            )  # Update loss in progress bar

    # Validation loop over batches
    validation_dataset = tf.data.Dataset.from_tensor_slices(
        (input_tensor_val, target_tensor_val)
    )
    validation_dataset = validation_dataset.batch(BATCH_SIZE, drop_remainder=True)
    val_loss = 0
    val_samples = 0
    for batch, (inp, targ) in enumerate(validation_dataset):
        enc_hidden = (
            encoder.initialize_hidden_state()
        )  # Initialize encoder hidden state for each batch
        val_batch_loss = validation_step(inp, targ, enc_hidden)
        val_loss += val_batch_loss
        val_samples += 1

    # Compute average validation loss
    val_loss /= val_samples

    # Store training and validation losses for plotting
    train_losses.append(total_loss / steps_per_epoch)
    val_losses.append(val_loss)

Epoch 1: 100%|██████████| 52/52 [01:21<00:00,  1.58s/batch, loss=tf.Tensor(2.0429952, shape=(), dtype=float32)]
Epoch 2: 100%|██████████| 52/52 [00:40<00:00,  1.27batch/s, loss=tf.Tensor(1.7405486, shape=(), dtype=float32)]
Epoch 3: 100%|██████████| 52/52 [00:22<00:00,  2.34batch/s, loss=tf.Tensor(1.5771211, shape=(), dtype=float32)]
Epoch 4: 100%|██████████| 52/52 [00:40<00:00,  1.27batch/s, loss=tf.Tensor(1.4425851, shape=(), dtype=float32)]
Epoch 5: 100%|██████████| 52/52 [00:21<00:00,  2.39batch/s, loss=tf.Tensor(1.3388522, shape=(), dtype=float32)]
