In [None]:
from collections import Counter

import numpy as np
import pandas as pd

from tqdm import tqdm


import spacy
nlp = spacy.load('en_core_web_sm')

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader

torch.manual_seed(1)

from torchtext.vocab import GloVe

import matplotlib.pyplot as plt

In [None]:
# Embedding width. Defined before the branch because MODEL_CONFIG reads
# `embed_dim` regardless of whether pretrained vectors are used.
embed_dim = 100

# NOTE: the answer typed here changes how the model is initialised, so the
# notebook's results depend on this interactive choice.
ask_pre_embed = input("Use Pretrained Embeddings? (y/yes)")
# Accept the answer irrespective of case or surrounding whitespace.
use_pre_embed = ask_pre_embed.strip().lower() in ('y', 'yes')

if use_pre_embed:
    print("Will use Pre trained Embeddings!")
    pre_embed = GloVe(name='6B', dim=embed_dim)
else:
    print("Will train own embeddings.")
    # Keep the name bound on both paths so later cells can test it.
    pre_embed = None

In [None]:
# Read at most 9000 rows of the jokes corpus and index them by joke ID.
# NOTE(review): `header=1` combined with explicit `names` appears to discard
# the first two lines of the file — confirm the CSV really starts with two
# non-data lines.
df = pd.read_csv(
    './Corpus/Jokes/reddit_jokes.csv',
    names=['ID', 'Joke'],
    header=1,
    nrows=9000,
).set_index('ID')

In [None]:
# Report which device PyTorch would pick. This cell only prints the device;
# it does not move anything onto it.
if torch.cuda.is_available():
    print('Using device:', torch.device('cuda'))
else:
    print('Using device:', torch.device('cpu'))

In [None]:
# --- Data / training hyper-parameters ---
SEQUENCE_LENGTH = 3   # tokens per training window
EPOCHS = 4            # full passes over the dataset
BATCH_SIZE = 32       # windows per DataLoader batch

# --- Model hyper-parameters (read by Model.__init__) ---
MODEL_CONFIG = dict(
    pre_embed=use_pre_embed,      # initialise the embedding layer from pretrained vectors?
    embedding_dim=embed_dim,      # width of each word vector
    lstm_cells=100,               # LSTM hidden size
    lstm_num_layers=2,            # stacked LSTM layers
    lstm_dropout=0.2,             # dropout passed to nn.LSTM
    bi_lstm=True,                 # bidirectional LSTM?
)

# --- Generation ---
PREDICTION_SIZE = 20  # default number of tokens predict() appends

In [None]:
class Dataset(torch.utils.data.Dataset):
    """Sliding-window next-token dataset built from the jokes DataFrame.

    The constructor reads the module-level ``df`` (column ``'Joke'``), ``nlp``
    and ``SEQUENCE_LENGTH``. Every item is a pair of index tensors: a window of
    ``SEQUENCE_LENGTH`` tokens and the same window shifted one token forward.
    """

    def __init__(self):
        self.SEQUENCE_LENGTH = SEQUENCE_LENGTH
        self.words = self.load_words()
        self.uniq_words = self.get_uniq_words()

        # Vocabulary index == rank in `uniq_words` (most frequent word is 0).
        self.index_to_word = dict(enumerate(self.uniq_words))
        self.word_to_index = {word: index for index, word in self.index_to_word.items()}

        self.words_indexes = [self.word_to_index[w] for w in self.words]

    def load_words(self):
        """Return the token texts of all jokes joined with single spaces."""
        text = df['Joke'].str.cat(sep=' ')
        # Only token texts are needed, so call the tokenizer directly instead
        # of the full pipeline. This avoids running the later pipeline
        # components on the whole corpus and the pipeline's text-length limit.
        return [token.text for token in nlp.tokenizer(text)]

    def get_uniq_words(self):
        """Return the distinct tokens ordered from most to least frequent."""
        word_counts = Counter(self.words)
        return sorted(word_counts, key=word_counts.get, reverse=True)

    def __len__(self):
        """Number of windows that still have a full shifted target window."""
        # Clamp at zero: a corpus shorter than the window has no samples, and
        # a negative value would make len() raise.
        return max(0, len(self.words_indexes) - self.SEQUENCE_LENGTH)

    def __getitem__(self, index):
        """Return ``(input_window, target_window)`` as index tensors."""
        stop = index + self.SEQUENCE_LENGTH
        return (
            torch.tensor(self.words_indexes[index:stop]),
            torch.tensor(self.words_indexes[index + 1:stop + 1]),
        )

In [None]:
class Model(nn.Module):
    """Word-level language model: embedding -> (bi)LSTM -> linear vocabulary scores.

    Hyper-parameters are read from the module-level ``MODEL_CONFIG`` dict.

    NOTE(review): the LSTM is created without ``batch_first=True``, so it
    treats dim 0 of its input as the time axis. The callers in this notebook
    pass ``(batch, window)`` index tensors and size the state with the window
    length — confirm that this axis convention is intended.
    """

    def __init__(self, dataset, needed_vector_data=None):
        """
        Initialises the model with the given configuration.
        Sub class of nn.Module
        inputs:
            dataset: object exposing get_uniq_words(); the number of words it
                returns is the vocabulary size
            needed_vector_data: tensor copied into the embedding weights when
                MODEL_CONFIG['pre_embed'] is truthy; unused otherwise
        raises:
            ValueError: pretrained embeddings requested but no vectors given
        """
        super(Model, self).__init__()

        # CONFIGURATION: EMBEDDING
        self.embedding_dim = MODEL_CONFIG['embedding_dim']
        self.vocab_size = len(dataset.get_uniq_words())
        # CONFIGURATION: LSTM
        self.lstm_cells = MODEL_CONFIG['lstm_cells']
        self.bi_directional = MODEL_CONFIG['bi_lstm']
        self.num_directions = 2 if self.bi_directional else 1
        self.num_layers = MODEL_CONFIG['lstm_num_layers']
        self.lstm_dropout = MODEL_CONFIG['lstm_dropout']

        # LAYER: EMBEDDING
        self.embedding = nn.Embedding(
            num_embeddings=self.vocab_size,
            embedding_dim=self.embedding_dim
        )

        # LOADING WEIGHTS
        if MODEL_CONFIG['pre_embed']:
            if needed_vector_data is None:
                raise ValueError(
                    "MODEL_CONFIG['pre_embed'] is set but no pretrained vectors were supplied."
                )
            # Pretrained vectors are only the starting point; they stay trainable.
            self.embedding.weight.requires_grad = True
            self.embedding.weight.data.copy_(needed_vector_data)

        # LAYER: LSTM
        # The LSTM consumes embedding vectors, so its input width is the
        # embedding dimension (not the hidden size).
        self.lstm = nn.LSTM(
            input_size=self.embedding_dim,
            hidden_size=self.lstm_cells,
            num_layers=self.num_layers,
            dropout=self.lstm_dropout,
            bidirectional=self.bi_directional
        )

        # LAYER: OUTPUT
        # A bidirectional LSTM concatenates both directions, so its output
        # width is hidden_size * num_directions.
        self.fc = nn.Linear(
            in_features=self.lstm_cells * self.num_directions,
            out_features=self.vocab_size
        )

    def forward(self, x, prev_state):
        """
        Makes a forward pass through the model as created above.
        inputs:
            self
            x: The new input (tensor of word indexes)
            prev_state: (h, c) tuple used by the LSTM
        returns:
            (logits, state): per-position vocabulary scores and the new (h, c)
        """
        embed = self.embedding(x)
        output, state = self.lstm(embed, prev_state)
        logits = self.fc(output)

        return logits, state

    def init_lstm(self, SEQUENCE_LENGTH):
        """Return zero-filled (h, c) tensors of shape
        (num_layers * num_directions, SEQUENCE_LENGTH, lstm_cells)."""
        state_shape = (self.num_layers * self.num_directions, SEQUENCE_LENGTH, self.lstm_cells)
        return (torch.zeros(*state_shape), torch.zeros(*state_shape))

In [None]:
def train(dataset):
    """Train the module-level ``model`` on ``dataset`` for ``EPOCHS`` epochs.

    Uses the module-level ``optimizer``, ``criterion``, ``BATCH_SIZE`` and
    ``SEQUENCE_LENGTH``. After each epoch the mean batch loss is appended to
    ``loss_values`` and the 1-based epoch number to ``epoch_numbers``.

    inputs:
        dataset: map-style dataset yielding (input, target) index tensors
    raises:
        ValueError: if ``dataset`` contains no samples
    """
    if len(dataset) == 0:
        raise ValueError("Cannot train on an empty dataset.")

    model.train()
    data_generator = DataLoader(dataset, batch_size=BATCH_SIZE)

    for epoch in range(EPOCHS):
        state_h, state_c = model.init_lstm(SEQUENCE_LENGTH)
        running_loss = 0.0
        batch_count = 0

        for x, y_true in tqdm(data_generator):
            optimizer.zero_grad()

            y_pred, (state_h, state_c) = model(x, (state_h, state_c))
            # CrossEntropyLoss wants the class scores on dim 1.
            loss = criterion(y_pred.transpose(1, 2), y_true)

            # Carry the state values into the next batch, but cut the graph so
            # backward() never reaches into previous batches.
            state_h = state_h.detach()
            state_c = state_c.detach()

            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            batch_count += 1

        # Report the epoch average instead of whatever the last batch produced.
        epoch_loss = running_loss / batch_count
        loss_values.append(epoch_loss)
        epoch_numbers.append(epoch + 1)
        print(f"Epoch: {epoch}, loss: {epoch_loss}")

In [None]:
dataset = Dataset()

# Build the pretrained weight matrix only when pretrained embeddings were
# requested; `pre_embed` holds no vectors otherwise. Rows follow
# `dataset.uniq_words`, i.e. the same order used for the word indexes.
if use_pre_embed:
    needed_vector_data = torch.stack([pre_embed[word] for word in dataset.uniq_words])
else:
    needed_vector_data = None

model = Model(dataset, needed_vector_data)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Filled in by train(): one entry per epoch.
loss_values = []
epoch_numbers = []
train(dataset)

In [None]:
# Loss curve: epoch number on the x-axis, recorded loss on the y-axis.
plt.plot(epoch_numbers, loss_values)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training loss');

In [None]:
# Persist only the learned parameters (the state dict), not the Model object.
torch.save(obj=model.state_dict(), f='TorchJokes.model')

In [None]:
# Restore the saved parameters into the existing `model` instance.
model.load_state_dict(state_dict=torch.load(f="TorchJokes.model"))

In [None]:
def predict(dataset, model, text, next_words=PREDICTION_SIZE):
    """Extend ``text`` by sampling ``next_words`` tokens from ``model``.

    inputs:
        dataset: provides the ``word_to_index`` / ``index_to_word`` mappings
        model: model exposing ``init_lstm`` and returning ``(logits, state)``
        text: prompt; it is tokenised with the module-level ``nlp``
        next_words: number of tokens to append
    returns:
        list of token strings: the prompt tokens followed by the sampled ones
    raises:
        ValueError: the prompt produces no tokens
        KeyError: a prompt token is not in the dataset vocabulary
    """
    model.eval()
    words = [token.text for token in nlp(text)]

    if not words:
        raise ValueError("predict() needs a prompt with at least one token.")
    # Fail early with the offending tokens instead of a bare KeyError from
    # inside the sampling loop.
    unknown = [w for w in words if w not in dataset.word_to_index]
    if unknown:
        raise KeyError(f"Prompt tokens missing from the vocabulary: {unknown}")

    state_h, state_c = model.init_lstm(len(words))

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        for i in range(next_words):
            # `words` grows by one per step while the start moves by one, so
            # the window keeps the prompt's length and matches the state size.
            window = words[i:]
            x = torch.tensor([[dataset.word_to_index[w] for w in window]])
            y_pred, (state_h, state_c) = model(x, (state_h, state_c))

            last_word_logits = y_pred[0][-1]
            p = torch.nn.functional.softmax(last_word_logits, dim=0).numpy()
            # Sample rather than take the argmax so the output varies.
            word_index = np.random.choice(len(last_word_logits), p=p)
            words.append(dataset.index_to_word[word_index])

    return words

In [None]:
# Generate a continuation of the prompt and print it as one string, with
# every token followed by a single space.
words_list = predict(dataset, model, text='Knock knock. Whos there?')
joke = "".join(f"{word} " for word in words_list)
print(joke)