# LSTM Bot

## Project Overview

In this project, you will build a chatbot that can converse with you at the command line. The chatbot will use a sequence-to-sequence (Seq2Seq) text generation architecture with an LSTM as its memory unit. You will also learn to use pretrained word embeddings to improve the performance of the model. At the conclusion of the project, you will be able to show your chatbot to potential employers.

Using pretrained word embeddings is optional. We have loaded Brown embeddings from Gensim in the starter code below, so you can compare the performance of your model with pretrained embeddings against a model without them.
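
One way to set up that comparison is to build an embedding matrix whose rows follow the model's word indices. The sketch below is an illustration under assumptions, not the starter code: `pretrained` is a hypothetical stand-in for vectors looked up from the Gensim embeddings, `build_embedding_matrix` is a made-up helper name, and words without a pretrained vector simply receive small random values.

```python
import random
from typing import Dict, List


def build_embedding_matrix(
    word2index: Dict[str, int],
    pretrained: Dict[str, List[float]],
    dim: int,
    seed: int = 0,
) -> List[List[float]]:
    """Return one row per vocabulary index, copying pretrained vectors where available."""
    rng = random.Random(seed)
    # Assumes indices run from 0 to len(word2index) - 1 without gaps.
    matrix = [[0.0] * dim for _ in range(len(word2index))]
    for word, index in word2index.items():
        if word in pretrained:
            matrix[index] = list(pretrained[word])
        elif word != "<PAD>":
            # No pretrained vector: fall back to a small random initialization.
            matrix[index] = [rng.uniform(-0.1, 0.1) for _ in range(dim)]
    return matrix


# Hypothetical toy data; real vectors would come from the loaded embeddings.
word2index = {"<PAD>": 0, "<UNK>": 1, "hello": 2, "world": 3}
pretrained = {"hello": [0.1, 0.2, 0.3], "world": [0.4, 0.5, 0.6]}
matrix = build_embedding_matrix(word2index, pretrained, dim=3)
```

A matrix like this could then be handed to `torch.nn.Embedding.from_pretrained(torch.tensor(matrix), freeze=False)`, while the baseline model keeps a randomly initialized `nn.Embedding`.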



---



A sequence to sequence model (Seq2Seq) has two components:
- An Encoder consisting of an embedding layer and LSTM unit.
- A Decoder consisting of an embedding layer, LSTM unit, and linear output unit.

The Seq2Seq model works by feeding an input sequence into the Encoder, which passes its final hidden state to the Decoder. The Decoder then uses that state to output a series of token predictions, one token at a time.
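
The control flow described above can be sketched without any neural network at all. In the toy example below, `encode`, `decode_step`, and `generate` are hypothetical names, and the "state" is a plain tuple standing in for the LSTM's hidden and cell state; only the hand-off from Encoder to Decoder and the token-by-token loop are meant to carry over to the real model.

```python
from typing import List, Tuple

SOS, EOS = 1, 2  # special token ids (assumed; they match the Vocabulary class in this notebook)

State = Tuple[int, ...]  # stand-in for the LSTM's (hidden, cell) state


def encode(src: List[int]) -> State:
    """Toy encoder: summarize the source as a state (here, simply the tokens themselves)."""
    return tuple(src)


def decode_step(token: int, state: State) -> Tuple[int, State]:
    """Toy decoder step: emit the next remembered token, or EOS once the state is empty.

    A real decoder would also condition on `token`; this stand-in ignores it.
    """
    if not state:
        return EOS, state
    return state[0], state[1:]


def generate(src: List[int], max_len: int = 20) -> List[int]:
    """Encode once, then decode token by token, feeding each prediction back in."""
    state = encode(src)      # Encoder produces the state
    token, output = SOS, []  # Decoder starts from <SOS>
    for _ in range(max_len):
        token, state = decode_step(token, state)
        if token == EOS:
            break
        output.append(token)
    return output


print(generate([5, 6, 7]))  # prints [5, 6, 7]
```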

## Dependencies

- PyTorch
- NumPy
- pandas
- NLTK
- gzip (Python standard library)
- Gensim


Please choose a dataset from the torchtext website. We recommend looking at the SQuAD dataset first. Here is a link to the website where you can view your options:

- https://pytorch.org/text/stable/datasets.html





In [10]:
!pip uninstall torchtext==0.12.0 -y
!pip install torch==1.8.0 torchtext==0.9.0 torch-utils datasets transformers
!pip install ipywidgets

import ipywidgets

Found existing installation: torchtext 0.9.0
Uninstalling torchtext-0.9.0:
  Successfully uninstalled torchtext-0.9.0
Defaulting to user installation because normal site-packages is not writeable
Collecting torchtext==0.9.0
  Using cached torchtext-0.9.0-cp37-cp37m-manylinux1_x86_64.whl (7.1 MB)
Installing collected packages: torchtext
Successfully installed torchtext-0.9.0
Defaulting to user installation because normal site-packages is not writeable
Collecting ipywidgets
  Downloading ipywidgets-8.1.0-py3-none-any.whl (139 kB)
Collecting widgetsnbextension~=4.0.7
  Downloading widgetsnbextension-4.0.8-py3-none-any.whl (2.3 MB)
Collecting jupyterlab-widgets~=3.0.7
  Downloading jupyterlab_widgets-3.0.8-py3-none-any.whl (214 kB)
Collecting comm>=0.1.3
  Downloadin

In [1]:
"""My main Seq2Seq model, with encoder and decoder layers."""

import random
from typing import Any, Tuple

import torch
from torch import Tensor, nn

train_on_gpu = torch.cuda.is_available()

class Encoder(nn.Module):
    """My Encoder layer."""

    def __init__(self, input_size: int, hidden_size: int, embedding_dim: int) -> None:
        super(Encoder, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size

        self.embedding = nn.Embedding(input_size, embedding_dim)  # , hidden_size?
        self.lstm_1 = nn.LSTM(  # type: ignore
            embedding_dim, self.hidden_size * 2, 4, dropout=0.5, batch_first=True
        )
        # batch_first=True keeps the (batch, seq, feature) layout produced by lstm_1
        self.lstm_2 = nn.LSTM(  # type: ignore
            self.hidden_size * 2, self.hidden_size * 4, 4, dropout=0.2, batch_first=True
        )

    def forward(self, x: Tensor) -> Tuple[Tensor, Tensor, Tensor]:
        """
        Run the source sequence through the embedding and both LSTMs.

        Parameters
        ----------
            x : Tensor
                the source token indices

        Returns
        -------
            x : Tensor
                the encoder outputs
            h : Tensor
                the hidden state of the last LSTM
            c : Tensor
                the cell state of the last LSTM
        """
        x = self.embedding(x)
        x, _ = self.lstm_1(x)
        # lstm_1's state has width hidden_size * 2 but lstm_2 expects
        # hidden_size * 4, so lstm_2 starts from its default zero state.
        x, (h, c) = self.lstm_2(x)

        return x, h, c


class Decoder(nn.Module):
    """My decoder with a LSTM layer."""

    def __init__(self, hidden_size: int, output_size: int, embedding_dim: int) -> None:
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size

        self.embedding = nn.Embedding(self.output_size, embedding_dim)
        self.lstm_1 = nn.LSTM(  # type: ignore
            embedding_dim, hidden_size, num_layers=2, dropout=0.3, batch_first=True
        )

        self.output = nn.Linear(hidden_size, self.output_size)
        # Normalize over the vocabulary axis; dim=1 is the sequence axis here
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x: Tensor, h: Tuple[Tensor, Tensor]) -> Tuple[Any, Any]:
        """
        Forward method for the tensor network.

        Notes
        -----
            LSTMs, or Long Short-Term Memory units, are a type of recurrent neural
            network (RNN) that have feedback connections. This means they can process
            sequences of data, retaining a 'memory' of the previous states of the data
            as they process each new timestep. This makes them ideal for tasks
            involving sequential data, like time series prediction, natural language
            processing, and more.

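        Parameters
        ----------
            x : Tensor
                the decoder input token indices, shape (batch, seq_len)
            h : Tuple[Tensor, Tensor]
                the (hidden state, cell state) pair

        Returns
        -------
            x : Tensor
                the unnormalized scores (logits) over the vocabulary
            h : Tuple[Tensor, Tensor]
                the updated (hidden state, cell state) pair
        """
        x = self.embedding(x)
        x, h = self.lstm_1(x, h)

        # Return raw logits: nn.CrossEntropyLoss applies log-softmax itself, and
        # argmax over logits picks the same token as argmax over probabilities.
        x = self.output(x)

        return x, h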


class Seq2Seq(nn.Module):
    """The main NN architecture."""

    def __init__(
        self,
        encoder_input_size: int,
        encoder_hidden_size: int,
        decoder_hidden_size: int,
        decoder_output_size: int,
    ) -> None:
        super(Seq2Seq, self).__init__()
        self.hidden_dim = 10
        self.encoder = Encoder(encoder_input_size, encoder_hidden_size, self.hidden_dim)
        self.decoder = Decoder(decoder_hidden_size, decoder_output_size, self.hidden_dim)

    def forward(
        self, src: Tensor, trg: Tensor, teacher_forcing_ratio: float = 0.5
    ) -> Tensor:
        """
        Forward method for the tensor network.

        Parameters
        ----------
            src : Tensor
                the source token indices, shape (batch, src_len)
            trg : Tensor
                the target token indices, shape (batch, trg_len)
            teacher_forcing_ratio : float
                probability of feeding the ground-truth token as the next decoder input

        Returns
        -------
            outputs : Tensor
                the decoder outputs, shape (batch, trg_len, output_size);
                position 0 is left as zeros
        """
        # Initialize an empty tensor to store decoder outputs
        outputs = torch.zeros(trg.shape[0], trg.shape[1], self.decoder.output_size, device=trg.device)

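        # First, the source sequence (src) is passed through the encoder
        _, hidden, cell = self.encoder(src)

        # The encoder's last LSTM has more layers than the decoder's, so keep only
        # the top layers. This still requires decoder_hidden_size to equal the
        # encoder's final state width (encoder_hidden_size * 4).
        n_layers = self.decoder.lstm_1.num_layers
        hidden, cell = hidden[-n_layers:].contiguous(), cell[-n_layers:].contiguous()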

        # The initial decoder input is the <sos> token, i.e., the first token of the target sequence
        decoder_input = trg[:, 0].unsqueeze(1)

        # Iteratively decode each time step
        for t in range(1, trg.shape[1]):
            # Pass the decoder input, hidden, and cell states to the decoder
            decoder_output, (hidden, cell) = self.decoder(decoder_input, (hidden, cell))

            # Store the decoder output in the outputs tensor
            outputs[:, t] = decoder_output.squeeze(1)

            # Decide if we will use teacher forcing or not
            teacher_force = random.random() < teacher_forcing_ratio

            # Get the most probable token
            top1 = decoder_output.argmax(2)

            # If teacher forcing, use the actual next token as next input. If not, use the predicted token
            decoder_input = trg[:, t].unsqueeze(1) if teacher_force else top1

        return outputs

    def init_hidden(self, batch_size: int) -> Tuple[Tuple[Tensor, Tensor], Tuple[Tensor, Tensor]]:
        """Return zeroed (hidden, cell) states for the encoder's last LSTM and the decoder."""
        # nn.LSTM already defaults to zero states when none are passed, so this
        # helper is only needed when the states are managed explicitly.
        # new_zeros keeps the dtype and device of the model's parameters.
        weight = next(self.parameters()).data

        # The encoder's second LSTM has 4 layers of width hidden_size * 4
        # (the two LSTMs are separate modules, so their layers do not stack to 8)
        encoder_h = weight.new_zeros(4, batch_size, self.encoder.hidden_size * 4)
        encoder_c = weight.new_zeros(4, batch_size, self.encoder.hidden_size * 4)

        # The decoder has 1 LSTM with 2 layers
        decoder_h = weight.new_zeros(2, batch_size, self.decoder.hidden_size)
        decoder_c = weight.new_zeros(2, batch_size, self.decoder.hidden_size)

        return ((encoder_h, encoder_c), (decoder_h, decoder_c))

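The `teacher_forcing_ratio` argument of `Seq2Seq.forward` decides, step by step, whether the decoder is fed the ground-truth token or its own previous prediction. The sketch below isolates that decision in plain Python; `decoder_inputs` and `toy_predict` are hypothetical names, and the "decoder" is a stand-in that always guesses the next integer.

```python
import random
from typing import Callable, List


def decoder_inputs(
    trg: List[int],
    predict: Callable[[int], int],
    teacher_forcing_ratio: float,
    rng: random.Random,
) -> List[int]:
    """Return the token fed to the decoder at each step of a training pass."""
    fed = []
    decoder_input = trg[0]  # the first target token, i.e. <SOS>
    for t in range(1, len(trg)):
        fed.append(decoder_input)
        prediction = predict(decoder_input)
        # rng.random() is in [0, 1): ratio 1.0 always forces, ratio 0.0 never does.
        teacher_force = rng.random() < teacher_forcing_ratio
        decoder_input = trg[t] if teacher_force else prediction
    return fed


def toy_predict(token: int) -> int:
    """Hypothetical stand-in for the decoder: always guesses the next integer."""
    return token + 1


trg = [1, 10, 20, 30]
always = decoder_inputs(trg, toy_predict, 1.0, random.Random(0))
never = decoder_inputs(trg, toy_predict, 0.0, random.Random(0))
print(always)  # [1, 10, 20]
print(never)   # [1, 2, 3]
```

With a ratio of 1.0 the decoder always sees the true previous token; with 0.0 it only ever sees its own guesses, so early mistakes carry forward through the rest of the sequence.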

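`torch.nn.CrossEntropyLoss`, which the training code in this notebook uses as its criterion, applies log-softmax internally, so it expects raw scores (logits) rather than probabilities. The pure-Python sketch below mirrors that computation for a single example to show what happens when softmax is applied twice. The helper names and the example scores are assumptions made for illustration.

```python
import math
from typing import List


def log_softmax(scores: List[float]) -> List[float]:
    """Numerically stable log-softmax via the log-sum-exp trick."""
    m = max(scores)
    lse = m + math.log(sum(math.exp(s - m) for s in scores))
    return [s - lse for s in scores]


def softmax(scores: List[float]) -> List[float]:
    """Probabilities that sum to 1."""
    return [math.exp(v) for v in log_softmax(scores)]


def cross_entropy(scores: List[float], target: int) -> float:
    """Per-example cross-entropy as computed from raw scores: -log_softmax(scores)[target]."""
    return -log_softmax(scores)[target]


logits = [4.0, 1.0, 0.0]  # hypothetical decoder scores; class 0 is the correct token
loss_from_logits = cross_entropy(logits, 0)          # about 0.07
loss_from_probs = cross_entropy(softmax(logits), 0)  # softmax applied twice: about 0.59
print(loss_from_logits, loss_from_probs)
```

Once the scores are squeezed into the range 0 to 1, the loss stays well above zero even for a confident, correct prediction, which weakens the training signal.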
In [2]:
"""My Vocabulary class."""

from typing import List

class Vocabulary:
    """A vocabulary."""

    def __init__(self, debug=False) -> None:
        """
        Vocabulary.

        Attributes
        ----------
        word2index : dict
            A dictionary mapping words to indices.
        index2word : dict
            A dictionary mapping indices to words.
        n_words : int
            The number of words in the vocabulary.

        Parameters
        ----------
        debug : bool
            If true, will print useful debug information.
        """
        self.word2index = {
            "<PAD>": 0,
            "<SOS>": 1,
            "<EOS>": 2,
            "<UNK>": 3
        }
        self.index2word = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
        self.n_words = 4  # Starting count considering the special tokens
        self.debug = debug

    def add_sentence(self, sentence: List[str]) -> None:
        """
        Add a sentence to the vocabulary.

        Parameters
        ----------
        sentence : list
            A list of words.
        """
        for word in sentence:
            self.add_word(word)

    def add_word(self, word: str) -> None:
        """
        Add a word to the vocabulary.

        Parameters
        ----------
        word : str
            A word.
        """
        if word not in self.word2index:
            self.word2index[word] = self.n_words
            self.index2word[self.n_words] = word
            self.n_words += 1
            if self.debug:
                # n_words was already incremented, so report the index actually assigned
                print(f"Added word: {word} with index: {self.word2index[word]}")

    def to_index(self, word: str) -> int:
        """
        Convert a word to its index.

        Parameters
        ----------
        word : str
            A word.

        Returns
        -------
        int
            The index of the word or index of "<UNK>" for unknown words.
        """
        return self.word2index.get(word, self.word2index["<UNK>"])

    def to_word(self, index: int) -> str:
        """
        Convert an index to its word.

        Parameters
        ----------
        index : int
            An index.

        Returns
        -------
        str
            The word or "<UNK>" for unknown indices.
        """
        return self.index2word.get(index, "<UNK>")


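Before a sentence reaches the model, its words have to be turned into a fixed-length list of indices. The sketch below shows one way to do that with the same four special tokens as the `Vocabulary` class; `encode` is a hypothetical helper, and the small `word2index` dictionary is toy data.

```python
from typing import Dict, List

PAD, SOS, EOS, UNK = 0, 1, 2, 3  # same ids as the special tokens in Vocabulary


def encode(tokens: List[str], word2index: Dict[str, int], max_len: int) -> List[int]:
    """Map words to ids, wrap them in <SOS>/<EOS>, and pad to max_len (assumes max_len >= 2)."""
    body = [word2index.get(token, UNK) for token in tokens][: max_len - 2]
    ids = [SOS] + body + [EOS]
    return ids + [PAD] * (max_len - len(ids))


word2index = {"<PAD>": 0, "<SOS>": 1, "<EOS>": 2, "<UNK>": 3, "hello": 4, "world": 5}
print(encode(["hello", "there", "world"], word2index, max_len=6))  # [1, 4, 3, 5, 2, 0]
```

The unknown word "there" maps to `<UNK>`, and the trailing `<PAD>` brings the sequence to the requested length so that several sentences can be stacked into one batch.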
In [4]:
"""
My main Chat-Bot file, potentially to be converted to the main CLI file.
"""

# from .vocabulary import Vocabulary
# from .model import Seq2Seq
from typing import List, Dict, Any, cast

import torch
import os
import sys
import contextlib
from torch.utils.data import DataLoader
import numpy as np
from datasets import load_dataset, disable_progress_bar
from transformers import AutoTokenizer

# Disable all progress bars to work around the outdated Jupyter/ipywidgets in the Udacity workspace
import tqdm

train_on_gpu = torch.cuda.is_available()


class ChatBot:
    """
    A ChatBot utilizing a Sequence to Sequence model.

    Attributes
    ----------
    debug : bool
        Whether to print debugging information.
    vocabulary : Vocabulary
        The vocabulary object.
    model : Seq2Seq
        The sequence-to-sequence model.
    dataset : Dataset
        The dataset to be used.
    """

    def __init__(self, debug=False):
        """
        Initialize the ChatBot.

        Parameters
        ----------
        debug : bool, optional
            Whether to print debugging information. Defaults to False.
        """
        self.debug = debug
        self.vocabulary = Vocabulary(debug=self.debug)
        self.model = None
        self.dataset = None

    @staticmethod
    def get_batches(arr, batch_size: int, seq_length: int):
        """Create a generator that returns batches of size
        batch_size x seq_length from arr.

        Arguments
        ---------
            arr : array-like or Tensor
                1-D sequence of encoded tokens you want to make batches from.
            batch_size : int
                Batch size, the number of sequences per batch.
            seq_length : int
                Number of encoded tokens in a sequence.
        """

        if not isinstance(arr, torch.Tensor):
            arr = torch.tensor(arr)  # Convert to PyTorch tensor

        # Determine the number of batches we can make
        total = batch_size * seq_length
        n_batches = len(arr) // total

        # Keep only enough characters to make full batches
        arr = arr[: n_batches * total]

        # Reshape into batch_size rows
        arr = arr.reshape((batch_size, -1))

        # Iterate over the batches using a window of size seq_length
        for n in range(0, arr.shape[1], seq_length):
            x = arr[:, n : n + seq_length]
            y = torch.zeros_like(x)
            print(f"x slice shape: {x.shape}")
            print(f"y slice shape: {y.shape}")

            try:
                y[:, :-1], y[:, -1] = x[:, 1:], arr[:, n + seq_length]
            except IndexError:
                y[:, :-1], y[:, -1] = x[:, 1:], arr[:, 0]
            yield x, y

    def load_hyperparameters(self):
        """
        Load the hyperparameters for the model.
        """
        # Set your hyperparameters here. For instance:
        self.embedding_dim = 256
        self.hidden_size = 512
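        # NOTE: both sizes must exceed the largest token id fed to nn.Embedding.
        # The vocabulary holds only the 4 special tokens until add_sentence() is called.
        self.encoder_input_size = len(self.vocabulary.word2index)
        self.decoder_output_size = len(self.vocabulary.word2index)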
        self.learning_rate = 0.001
        self.epochs = 10
        self.batch_size = 128
        self.clip = 5.0

    def initialize_model(self):
        """
        Initialize the Seq2Seq model with the given hyperparameters.
        """
        self.model = Seq2Seq(
            encoder_input_size=self.encoder_input_size,
            encoder_hidden_size=self.hidden_size,
            # The encoder's final LSTM state has width hidden_size * 4
            decoder_hidden_size=self.hidden_size * 4,
            decoder_output_size=self.decoder_output_size,
        )
        self.optimizer = torch.optim.Adam(
            self.model.parameters(), lr=self.learning_rate
        )
        self.criterion = torch.nn.CrossEntropyLoss()

    def train(self):
        """
        Train the model on the dataset.
        """
        if not self.dataset:
            raise ValueError("Dataset has not been loaded.")

        if train_on_gpu:
            self.model.cuda()

        counter = 0
        n_words = len(self.vocabulary.word2index)
        if self.debug:
            print("Vocabulary Size (n_words):", n_words)
        for epoch in range(self.epochs):
            if self.debug:
                print(f"Training epoch: {epoch+1}/{self.epochs}")

            # With set_format(type="torch"), each batch is a dict keyed by column name
            for batch in self.dataloader:
                counter += 1

                # nn.Embedding expects integer token ids, so no one-hot encoding is needed
                src = batch["input_ids"]
                # ASSUMPTION: this pipeline tokenizes only "sentence1", and "label" is a
                # class id rather than a token sequence, so the input doubles as the
                # target here. Replace trg with real response tokens for a chatbot.
                trg = src
                if train_on_gpu:
                    src, trg = src.cuda(), trg.cuda()

                if self.debug:
                    print(f"src shape: {src.shape}, trg shape: {trg.shape}")

                self.model.zero_grad()

                # Seq2Seq.forward returns (batch, trg_len, vocab); position 0 is never filled
                output = self.model(src, trg)
                if self.debug:
                    print(f"output shape: {output.shape}")

                # CrossEntropyLoss wants (N, vocab) scores and (N,) target ids
                loss = self.criterion(
                    output[:, 1:].reshape(-1, output.shape[-1]), trg[:, 1:].reshape(-1)
                )

                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.clip)
                self.optimizer.step()

                if counter % 100 == 0:
                    print(f"Loss: {loss.item()}")

    @contextlib.contextmanager
    def suppress_output(self):
        """
        Silence stdout and stderr at the file-descriptor level.

        The Udacity workspace has not updated its Jupyter notebook, so this is
        used to suppress:

        ImportError: FloatProgress not found. Please update jupyter and ipywidgets.
        See https://ipywidgets.readthedocs.io/en/stable/user_install.html
        """
        with open(os.devnull, 'w') as fnull:
            old_out = os.dup(1)
            old_err = os.dup(2)
            os.dup2(fnull.fileno(), 1)
            os.dup2(fnull.fileno(), 2)
            try:
                yield
            finally:
                os.dup2(old_out, 1)
                os.dup2(old_err, 2)
                os.close(old_out)
                os.close(old_err)

    def load_dataset(self, dataset_name="glue"):
        """
        Load a dataset using huggingface datasets.

        Parameters
        ----------
        dataset_name : str, optional
            The name of the dataset. Defaults to "glue".
        """

        with self.suppress_output():
            self.dataset = load_dataset(dataset_name, "mrpc", split="train")
            tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

            self.dataset = self.dataset.map(
                lambda e: tokenizer(e["sentence1"], truncation=True, padding="max_length"),
                batched=True,
            )

            if train_on_gpu:
                self.dataset.set_format(
                    type="torch",
                    columns=["input_ids", "token_type_ids", "attention_mask", "label"],
                    device="cuda",
                )
            else:
                self.dataset.set_format(
                    type="torch",
                    columns=["input_ids", "token_type_ids", "attention_mask", "label"],
                )
            # self.dataloader = DataLoader(self.dataset, batch_size=self.batch_size)
            self.dataloader = DataLoader(self.dataset)

    def assert_seq2seq(self):
        """
        Assert the Seq2Seq model to ensure its correctness.
        """
        # Add your assertion code here
        pass

    def use_pretrained_embeddings(self, embeddings):
        """
        Optionally use pretrained word embeddings in the model.

        Parameters
        ----------
        embeddings : Any
            The pre-trained embeddings.
        """
        # If you decide to use pre-trained embeddings, implement this method.
        pass

    def evaluate(self):
        """
        Evaluate the model's performance.
        """
        # You can implement methods to evaluate your model's performance here.
        pass

    def interact(self):
        """
        Interact with the chatbot.
        """
        # Here you'll write code to interact with the model in a dialogue manner.
        pass


def main():
    """
    Main function to run the chatbot.
    """
    bot = ChatBot(debug=True)


    disable_progress_bar()
    tqdm.tqdm.disable = True
    bot.load_dataset()
    bot.load_hyperparameters()
    bot.initialize_model()

    bot.train()

    bot.interact()


if __name__ == "__main__":
    main()


Found cached dataset glue (/root/.cache/huggingface/datasets/glue/mrpc/1.0.0/dacbe3125aa31d7f70367a07a8a9e72a5a0bfeb5fc42e75c9db75b96da6053ad)
Loading cached processed dataset at /root/.cache/huggingface/datasets/glue/mrpc/1.0.0/dacbe3125aa31d7f70367a07a8a9e72a5a0bfeb5fc42e75c9db75b96da6053ad/cache-a9d2111ec7901b64.arrow


RuntimeError: cuDNN error: CUDNN_STATUS_NOT_INITIALIZED
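
The `interact` method is still a stub. One possible shape for the command-line loop promised in the overview is sketched below. `chat_loop` and `placeholder_reply` are hypothetical names; in the real bot, the reply function would tokenize the text, run the trained model, and decode the predicted tokens.

```python
from typing import Callable, Iterable, Iterator, Tuple


def chat_loop(
    reply_fn: Callable[[str], str],
    lines: Iterable[str],
    quit_words: Tuple[str, ...] = ("quit", "exit"),
) -> Iterator[str]:
    """Yield one reply per non-empty user line until a quit word is seen."""
    for line in lines:
        text = line.strip()
        if text.lower() in quit_words:
            break
        if text:
            yield reply_fn(text)


def placeholder_reply(text: str) -> str:
    """Hypothetical stand-in for tokenizing, running the model, and decoding."""
    return f"You said: {text}"


replies = list(chat_loop(placeholder_reply, ["Hello", "", "quit", "ignored"]))
print(replies)  # ['You said: Hello']
```

Because `chat_loop` is a generator that accepts any iterable of lines, the same function can be driven from a terminal with `for reply in chat_loop(placeholder_reply, sys.stdin): print(reply)` or tested with a plain list as above.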