<a href="https://colab.research.google.com/github/TomasMendozaHN/ICDF_Class/blob/main/05112022_LSTM_TextGeneration.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Begin by defining the LSTM model

In [2]:
import torch
from torch import nn

class Model(nn.Module):
    """Word-level language model: embedding -> stacked LSTM -> linear layer.

    Args:
        dataset: object exposing ``uniq_words``; only ``len(dataset.uniq_words)``
            is used, to size the vocabulary.
        lstm_size: hidden size of every LSTM layer.
        embedding_dim: length of each word-embedding vector.
        num_layers: number of stacked LSTM layers.
        dropout: dropout probability applied between LSTM layers.
    """

    def __init__(self, dataset, lstm_size=128, embedding_dim=128,
                 num_layers=3, dropout=0.2):
        super().__init__()
        self.lstm_size = lstm_size
        self.embedding_dim = embedding_dim
        self.num_layers = num_layers

        # nn.Embedding is a trainable lookup table: every word index selects
        # one dense vector of length `embedding_dim`. It is NOT a one-hot
        # encoding - the vector length is fixed and does not grow with the
        # vocabulary; only the number of rows in the table does.
        # Words that are not in the vocabulary have no row, so the model can
        # neither read nor generate words it has not seen.
        n_vocab = len(dataset.uniq_words)
        self.embedding = nn.Embedding(
            num_embeddings=n_vocab,
            embedding_dim=self.embedding_dim,
        )
        self.lstm = nn.LSTM(
            # The LSTM consumes the embedding vectors, so its input size is
            # the embedding dimension (not the hidden size).
            input_size=self.embedding_dim,
            hidden_size=self.lstm_size,
            num_layers=self.num_layers,
            # Inter-layer dropout is meaningless with one layer and only
            # triggers a PyTorch warning.
            dropout=dropout if self.num_layers > 1 else 0.0,
        )
        self.fc = nn.Linear(self.lstm_size, n_vocab)

    def forward(self, x, prev_state):
        """Return ``(logits, state)`` for a tensor of word indexes.

        Args:
            x: integer tensor of word indexes.
            prev_state: ``(h, c)`` tuple, e.g. from :meth:`init_state` or a
                previous call.

        Returns:
            ``logits`` with the same leading dimensions as ``x`` plus a final
            vocabulary dimension, and the new ``(h, c)`` state.
        """
        embed = self.embedding(x)
        output, state = self.lstm(embed, prev_state)
        logits = self.fc(output)
        return logits, state

    def init_state(self, sequence_length):
        """Return a zero-filled ``(h, c)`` state.

        NOTE: the LSTM is built with the default ``batch_first=False``, so the
        second dimension of the state has to equal dimension 1 of the tensor
        given to :meth:`forward`. The callers in this notebook feed
        ``(batch, sequence_length)`` tensors, which is why this size is the
        sequence length rather than the batch size.
        """
        # Allocate on the same device as the weights so the model still works
        # after `.to(device)`.
        device = next(self.parameters()).device
        shape = (self.num_layers, sequence_length, self.lstm_size)
        return (torch.zeros(shape, device=device),
                torch.zeros(shape, device=device))

ValueError: ignored

# Test loading an online CSV with the dataset in it

In [None]:
import pandas as pd
# Download the jokes CSV straight from GitHub into a DataFrame (needs network access).
dataset = pd.read_csv("https://raw.githubusercontent.com/amoudgl/short-jokes-dataset/master/data/reddit-cleanjokes.csv")
print(dataset)

# Visualize the entire text (all jokes) as a single string

In [None]:
# Join every entry of the 'Joke' column into one long string, separated by single spaces.
text_as_a_single_string = dataset['Joke'].str.cat(sep=' ')
print(text_as_a_single_string)

# Separate all words in that string

In [None]:
# Split on single spaces only: punctuation stays attached to the words, and two
# consecutive spaces produce an empty-string "word".
individual_words = text_as_a_single_string.split(' ')
print(individual_words)

# Obtain a list containing all words (without repetition) in decreasing order of frequency

In [None]:
from collections import Counter
# Count the occurrences of every word, then keep just the words, ordered from
# the most frequent to the least frequent (ties stay in first-seen order).
unique_words = Counter(individual_words)
unique_words_in_order_of_frequency = [word for word, _ in unique_words.most_common()]
print(unique_words_in_order_of_frequency)
print(len(unique_words_in_order_of_frequency))

# Build the lookup from each integer index back to its word (index_to_word)

In [None]:
# Position in the frequency-ordered list -> word.
index_to_word = dict(enumerate(unique_words_in_order_of_frequency))
print(index_to_word)

# Build the lookup from each word to its integer index (word_to_index), the form the embedding layer consumes

In [None]:
# Word -> position in the frequency-ordered list (the inverse of index_to_word).
word_to_index = dict(
    zip(
        unique_words_in_order_of_frequency,
        range(len(unique_words_in_order_of_frequency)),
    )
)
print(word_to_index)

# Convert the entire string of jokes into integers (using the word_to_index dictionary)

In [None]:
# Replace every word of the corpus, in order, by its integer index.
words_indexes = list(map(word_to_index.__getitem__, individual_words))
print(words_indexes)

# Now that we have seen how the data must be prepared, we create a Dataset class that does all of this automatically

In [None]:
import torch
import pandas as pd
from collections import Counter

# Remember: every map-style dataset class you create MUST provide these three methods:
# 1. __init__      --> reads and prepares the data
# 2. __len__       --> returns the number of samples in the entire dataset
# 3. __getitem__   --> returns ONE sample (the DataLoader groups samples into batches)
class Dataset(torch.utils.data.Dataset):
    """Sliding-window word dataset built from the 'Joke' column of a CSV file.

    Sample ``i`` is the pair ``(words[i : i + n], words[i + 1 : i + n + 1])``
    expressed as word indexes, where ``n`` is ``sequence_length``: the target
    is the input shifted one word to the right.

    Args:
        sequence_length: number of words in every input/target window (>= 1).
        csv_path: path or URL of a CSV file with a ``Joke`` column; defaults
            to the reddit-cleanjokes dataset on GitHub.
    """

    DEFAULT_CSV = "https://raw.githubusercontent.com/amoudgl/short-jokes-dataset/master/data/reddit-cleanjokes.csv"

    def __init__(self, sequence_length, csv_path=DEFAULT_CSV):
        if sequence_length < 1:
            raise ValueError("sequence_length must be at least 1")
        self.sequence_length = sequence_length
        self.csv_path = csv_path
        self.words = self.load_words()
        self.uniq_words = self.get_uniq_words()

        self.index_to_word = {index: word for index, word in enumerate(self.uniq_words)}
        self.word_to_index = {word: index for index, word in enumerate(self.uniq_words)}

        self.words_indexes = [self.word_to_index[w] for w in self.words]

    def __len__(self):
        """Return the number of full (input, target) windows; never negative."""
        # Every sample needs sequence_length + 1 words, so the last valid start
        # is len - sequence_length - 1. A corpus that is too short yields 0
        # samples instead of a negative length (which would make len() raise).
        return max(0, len(self.words_indexes) - self.sequence_length)

    def __getitem__(self, index):
        """Return the ``(input, target)`` index tensors of sample ``index``.

        Raises:
            IndexError: if ``index`` is outside ``[-len(self), len(self))``.
        """
        size = len(self)
        if index < 0:
            index += size
        # Slicing never fails on its own, so without this check plain
        # iteration (`for sample in dataset`, `list(dataset)`) would never
        # stop and would return truncated windows past the end.
        if not 0 <= index < size:
            raise IndexError(f"sample index out of range for dataset of size {size}")
        return (
            torch.tensor(self.words_indexes[index:index+self.sequence_length]),
            torch.tensor(self.words_indexes[index+1:index+self.sequence_length+1]),
        )

    def load_words(self):
        """Read the CSV and return all jokes as one list of space-separated words."""
        train_df = pd.read_csv(self.csv_path)
        text = train_df['Joke'].str.cat(sep=' ')
        return text.split(' ')

    def get_uniq_words(self):
        """Return the distinct words, most frequent first."""
        word_counts = Counter(self.words)
        return sorted(word_counts, key=word_counts.get, reverse=True)



# Initialize the dataset

In [None]:
# Number of words in every training window.
sequence_length = 6
# Rebinds `dataset` (previously the raw DataFrame) to the Dataset object;
# constructing it downloads the CSV and builds the vocabulary.
dataset = Dataset(sequence_length=sequence_length)

# Test the output of the dataset

In [None]:
# Look at the first sample only: `a` is the input window and `b` is the same
# window shifted one word to the right (the prediction target).
a, b = dataset[0]
print(f"originally, our dataset returns a = {a}, b = {b}")

# Convert both tensors back into words. Use the dataset's own lookup table so
# this cell does not depend on the exploratory cells above having been run.
a = [dataset.index_to_word[i] for i in a.tolist()]
b = [dataset.index_to_word[i] for i in b.tolist()]

print(f"converting these back into text, we have: a = {a}, b = {b}")

# Initialize the model

In [None]:
# The model reads the vocabulary size from dataset.uniq_words.
model = Model(dataset)

# Begin training!

In [None]:
# Number of full passes over the dataset.
max_epochs = 10
# Number of samples the DataLoader groups into one batch.
batch_size = 128

In [None]:
import argparse
import torch
import numpy as np
from torch import nn, optim
from torch.utils.data import DataLoader

def train(dataset, model, max_epochs, sequence_length, batch_size):
    """Fit ``model`` on ``dataset`` with Adam (lr=0.001) and cross-entropy loss.

    Args:
        dataset: dataset yielding ``(input, target)`` index tensors.
        model: module called as ``model(x, (h, c))`` that also provides
            ``init_state``.
        max_epochs: number of passes over the dataset.
        sequence_length: value handed to ``model.init_state`` at the start of
            every epoch.
        batch_size: number of samples per batch.

    Prints one dict with the epoch, batch number and loss per batch; returns
    nothing.
    """
    model.train()

    loader = DataLoader(dataset, batch_size=batch_size)
    loss_fn = nn.CrossEntropyLoss()
    adam = optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(max_epochs):
        # Start every epoch from a fresh state; within the epoch the state is
        # carried over from batch to batch.
        hidden, cell = model.init_state(sequence_length)

        for batch, (inputs, targets) in enumerate(loader):
            adam.zero_grad()

            logits, (hidden, cell) = model(inputs, (hidden, cell))
            # CrossEntropyLoss expects the class dimension in position 1.
            loss = loss_fn(logits.transpose(1, 2), targets)
            loss.backward()
            adam.step()

            # Keep the state values but drop their history, so the next batch
            # does not backpropagate into this one.
            hidden, cell = hidden.detach(), cell.detach()

            print({'epoch': epoch, 'batch': batch, 'loss': loss.item()})

In [None]:
# Run the training loop; it prints the loss of every batch.
train(dataset, model, max_epochs=max_epochs, sequence_length=sequence_length, batch_size=batch_size)

In [None]:
def predict(dataset, model, text, next_words=100):
    """Extend ``text`` by sampling ``next_words`` words from the model.

    Args:
        dataset: object exposing ``word_to_index`` and ``index_to_word``.
        model: module called as ``model(x, (h, c))`` that also provides
            ``init_state``.
        text: prompt; it is split on single spaces and every resulting word
            must be in ``dataset.word_to_index``.
        next_words: number of words to generate.

    Returns:
        The prompt words followed by the generated words, as a list.

    Raises:
        KeyError: if the prompt contains a word missing from the vocabulary.
    """
    model.eval()

    words = text.split(' ')
    # Fail early with a readable message instead of a bare KeyError raised
    # from inside the generation loop.
    unknown = [w for w in words if w not in dataset.word_to_index]
    if unknown:
        raise KeyError(f"words not in the training vocabulary: {unknown}")
    state_h, state_c = model.init_state(len(words))

    # Inference only: without no_grad the carried state would keep the
    # autograd graph of every previous step alive, growing with each word.
    with torch.no_grad():
        for i in range(next_words):
            # The window slides with the output: it always holds the most
            # recent len(prompt) words.
            x = torch.tensor([[dataset.word_to_index[w] for w in words[i:]]])
            y_pred, (state_h, state_c) = model(x, (state_h, state_c))

            last_word_logits = y_pred[0][-1]
            p = torch.nn.functional.softmax(last_word_logits, dim=0).cpu().numpy()
            # Sample from the distribution rather than taking the argmax, so
            # repeated calls give different continuations.
            word_index = np.random.choice(len(last_word_logits), p=p)
            words.append(dataset.index_to_word[word_index])

    return words

In [None]:
# Every word of the prompt (split on single spaces) must exist in
# dataset.word_to_index, otherwise the lookup raises KeyError.
predict(dataset, model, text="Knock knock. Who's there?", next_words=5)

# The results are still pretty bad. You can always improve them by:


1.   Cleaning up the data by removing non-letter characters.
2.   Increasing the model capacity by adding more Linear or LSTM layers.
3.   Splitting the dataset into train, test, and validation sets.