In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import nltk
import string
import torch
import re
from nltk.util import ngrams
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from collections import Counter
from sklearn.model_selection import train_test_split
from google.colab import files, drive

In [2]:
nltk.download(['stopwords', 'wordnet', 'punkt', ])

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


True

In [3]:
torch.cuda.is_available()
drive.mount('/content/gdrive')
short_stories_filename = "/content/gdrive/My Drive/Colab Notebooks/CSC413/Project/reddit_short_stories.txt"

Mounted at /content/gdrive


In [4]:
with open(short_stories_filename, 'r') as file:
    data = file.read()

In [5]:
# Split the data string into a list of stories
stories = data.split("<eos>\n<sos>")

# Remove the first <sos> tag from the first story
stories[0] = stories[0].replace("<sos>", "")

# Remove the last <eos> tag from the last story
stories[-1] = stories[-1].replace("<eos>", "")

# Create a dataframe with one story per row
reddit_df = pd.DataFrame(stories, columns=['contents'])

In [6]:
def remove_punctuation(text):
    return re.sub(r'[{}]+'.format(string.punctuation), ' ', text)

def remove_stopwords(text):
    stopwords = nltk.corpus.stopwords.words('english')
    pattern = re.compile(r'\b(' + r'|'.join(stopwords) + r')\b\s*')
    return pattern.sub('', text)

def strip_spaces(text):
    text = re.sub('\s+', ' ', text)
    return text.strip()

def lemmatize_sentence(sentence):
    lemmatizer = nltk.stem.wordnet.WordNetLemmatizer()
    tokens = nltk.tokenize.word_tokenize(sentence)
    return ' '.join([lemmatizer.lemmatize(word) for word in tokens])

def clean_sentence(text):
    # Remove HTML tags
    text = re.sub(re.compile('<.*?>'), '', text)
    
    # Remove punctuation
    text = remove_punctuation(text)
    
    # Remove stop words
    text = remove_stopwords(text.lower())

    # Lemmatize words in the sentence
    text = lemmatize_sentence(text)

    # Strip spaces
    text = strip_spaces(text)

    return text

def create_ngrams(sentence, n_grams):
    tokenized = nltk.word_tokenize(sentence.lower())
    return list(ngrams(tokenized, n_grams))

def clean_story(text):
    sentences = []
    for sentence in text.split('.'):
        if sentence:
            sentences.append(clean_sentence(sentence))
    return sentences

def generate_n_grams(text, n_grams: int = 3):
    arr = []
    for sentence in text:
        arr += create_ngrams(sentence, n_grams)
    return arr 

In [7]:
# Decrease amount of stories as dataset is very large
reddit_df = reddit_df.iloc[:500]

In [8]:
# Perform story cleaning
reddit_df['sentences'] = reddit_df['contents'].apply(lambda text: clean_story(text))

In [9]:
# Generate ngrams
reddit_df['n_grams'] = reddit_df['sentences'].apply(lambda text: generate_n_grams(text, n_grams=5))

In [10]:
n_grams = np.concatenate(reddit_df['n_grams'].values).tolist()

In [11]:
n_grams

[['learned', 'name', 'phonebook', 'shaking', 'finger'],
 ['name', 'phonebook', 'shaking', 'finger', 'carefully'],
 ['phonebook', 'shaking', 'finger', 'carefully', 'caressing'],
 ['shaking', 'finger', 'carefully', 'caressing', 'page'],
 ['finger', 'carefully', 'caressing', 'page', 'searched'],
 ['carefully', 'caressing', 'page', 'searched', 'address'],
 ['caressing', 'page', 'searched', 'address', 'seen'],
 ['page', 'searched', 'address', 'seen', 'many'],
 ['searched', 'address', 'seen', 'many', 'time'],
 ['43', 'mako', 'drive', 'small', 'brick'],
 ['mako', 'drive', 'small', 'brick', 'house'],
 ['drive', 'small', 'brick', 'house', 'corner'],
 ['small', 'brick', 'house', 'corner', 'braxton'],
 ['brick', 'house', 'corner', 'braxton', 'mako'],
 ['memorized', 'shape', 'home', 'week', 'bare'],
 ['shape', 'home', 'week', 'bare', 'foot'],
 ['home', 'week', 'bare', 'foot', 'sliding'],
 ['week', 'bare', 'foot', 'sliding', 'across'],
 ['bare', 'foot', 'sliding', 'across', 'wet'],
 ['foot', 'slidi

In [12]:
# Make a single array of all words from ngrams
split_sentences = []
for arr in n_grams:
    split_sentences += arr

In [13]:
def create_vocabulary(sentence):
    # Create empty dictionaries
    word_to_index = {}
    index_to_word = {}

    # Get array of set of words
    word_set = list(set(sentence))

    # Iterate over set of words and save them to dictionaries
    for i in range(len(word_set)):
        word_to_index[word_set[i]] = i
        index_to_word[i] = word_set[i]

    return word_to_index, index_to_word


def input_target_generator(n_grams: list, word_to_index: dict):
    inputs = []
    targets = []

    # Iterate over ngrams
    for i in range(len(n_grams)):

        # Separate input from ngram and target
        input_part = n_grams[i][:-1]
        target_word = n_grams[i][-1]

        # Create and append input and target
        _input = []
        _target = []
        for word in input_part:
            _input.append(word_to_index[word])
        _target.append(word_to_index[target_word])

        # Convert to torch arrays
        _input = torch.tensor(_input, dtype=torch.long)
        _target = torch.tensor(_target, dtype=torch.long)

        inputs.append(_input)
        targets.append(_target)

    return inputs, targets

# Create vocabulary and 'reverse' vocabulary
word_to_index, index_to_word = create_vocabulary(sentence=split_sentences)
# Create arrays of inputs and targets
inputs, targets = input_target_generator(n_grams=n_grams, word_to_index=word_to_index)

In [14]:
# Join sentences in each Story then join all stories
joined_sentences = [' '.join(sentences) for sentences in reddit_df['sentences']]
all_stories = ' '.join(joined_sentences)

# Tokenize the words
words = nltk.word_tokenize(all_stories)

# Count the occurrences of each word
word_counts = Counter(words)

# Top 10 most occuring words
top_10_words = word_counts.most_common(10)
print(top_10_words)

# The total number of words
total_words = sum(word_counts.values())
print(f"total words: {total_words}")

# Get the frequencies of each word
word_frequencies = {}
for word, count in word_counts.items():
    word_frequencies[word] = count / total_words
    
# Top 10 word's frequencies
sorted_word_frequencies = sorted(word_frequencies.items(), key=lambda x: x[1], reverse=True)
top_10_word_frequencies = sorted_word_frequencies[:10]
print("Frequencies of 10 most common word")
for word, freq in top_10_word_frequencies:
    print(f"{word}: {freq *100}")

[('said', 1363), ('one', 1348), ('like', 1225), ('time', 1013), ('would', 912), ('know', 910), ('back', 909), ('could', 888), ('eye', 733), ('man', 681)]
total words: 177772
Frequencies of 10 most common word
said: 0.76671241815359
one: 0.7582746439259276
like: 0.6890848952590959
time: 0.569831019508134
would: 0.5130166730418739
know: 0.5118916364781855
back: 0.5113291181963414
could: 0.499516234277614
eye: 0.4123259005917692
man: 0.38307494993587293


In [15]:
class StoryGenerator(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, n_grams_size, n_layers=1):
        super(StoryGenerator, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.n_layers = n_layers
        self.n_grams_size = n_grams_size - 1
        
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.lstm = nn.LSTM(input_size=hidden_size*self.n_grams_size,
                            hidden_size=hidden_size, 
                            num_layers=n_layers, 
                            batch_first=True)
        self.linear = nn.Linear(hidden_size, output_size)
    
    def forward(self, input, hidden):
        # Shape: (batch_size, seq_len, hidden_size)
        input = self.embedding(input)
        # Reshape to (batch_size, seq_len, hidden_size * (g_grams_size - 1))
        input = input.view(input.size(0), -1, self.hidden_size * self.n_grams_size)
        output, hidden = self.lstm(input, hidden)
        # Shape: (batch_size, output_size)
        output = self.linear(output[:, -1, :])
        return output, hidden

    def init_hidden(self, batch_size=1):
        # LSTM requires tuple as output
        return (torch.zeros(self.n_layers, batch_size, self.hidden_size),
                torch.zeros(self.n_layers, batch_size, self.hidden_size))

In [16]:
# Split the data into train and test sets
x_train, x_test, y_train, y_test = train_test_split(inputs, targets, test_size=0.2, random_state=42)
x_train, x_valid, y_train, y_valid = train_test_split(x_train, y_train, test_size=0.25, random_state=42)


In [17]:
def get_accuracy(model, dataset):
    # Set the model to evaluation mode
    model.eval()
    correct = 0
    total = 0

    # Create dataloader
    data_loader = DataLoader(dataset, batch_size=32, shuffle=False)

    # Disable gradient computation
    with torch.no_grad():
        for batch_data, batch_target in data_loader:
            hidden_h, hidden_c = model.init_hidden(batch_data.size(0))

            output, (hidden_h, hidden_c) = model(batch_data, (hidden_h, hidden_c))
            _, predicted = torch.max(output, dim=1)
            correct += (predicted == batch_target.squeeze()).sum().item()
            total += batch_target.size(0)
    accuracy = correct / total
    return accuracy


def train(model, train_data, train_target, valid_data, valid_target, num_epochs, learning_rate, batch_size=32, checkpoint_path=None):
    model.train()
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # Convert the lists of tensors to a single tensor
    train_data_tensor = torch.stack(train_data)
    train_target_tensor = torch.stack(train_target)

    valid_data_tensor = torch.stack(valid_data)
    valid_target_tensor = torch.stack(valid_target)

    # Create a TensorDataset and DataLoader for mini-batches
    train_dataset = TensorDataset(train_data_tensor, train_target_tensor)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)

    valid_dataset = TensorDataset(valid_data_tensor, valid_target_tensor)

    iters, losses = [], []
    iters_sub, train_accs, val_accs  = [], [] ,[]
    n = 0 # nums of iterations
    for i in range(0, num_epochs):
        hidden_h, hidden_c = model.init_hidden(batch_size)
        for batch_num, (batch_train_data, batch_train_target) in enumerate(train_loader):  
            # clean up gradient
            optimizer.zero_grad()
            # forward step
            output, (hidden_h, hidden_c) = model(batch_train_data, (hidden_h, hidden_c))
            # compute total loss
            loss = criterion(output, batch_train_target.squeeze())
            # detach hidden_h and hidden_c for next iteration
            hidden_h = hidden_h.detach()
            hidden_c = hidden_c.detach()
            
            # compute updates for each parameters and make the update
            loss.backward()
            optimizer.step()
            # clean up gradient
            optimizer.zero_grad()

            iters.append(n)
            losses.append(float(loss)/batch_size)
            n += 1

        train_accuracy = get_accuracy(model, train_dataset)
        valid_accuracy = get_accuracy(model, valid_dataset)
        train_cost = float(loss)/batch_size

        iters_sub.append(n)
        train_accs.append(train_accuracy)
        val_accs.append(valid_accuracy)
        print('Epoch {}. Iter {}. [Val Acc {}%] [Train Acc {}%, Loss {}]'.format(i+1, n, valid_accuracy * 100, train_accuracy * 100, train_cost))
        if checkpoint_path is not None:
            torch.save(model.state_dict(), checkpoint_path.format(i))
          

    # return iters, losses, iters_sub, train_accs, val_accs
    plot(iters, losses, iters_sub, train_accs, val_accs)



def plot(iters, losses, iters_sub, train_accs, val_accs):
    plt.title("Learning Curve: Loss per Iteration")
    plt.plot(iters, losses, label="Train")
    plt.xlabel("Iterations")
    plt.ylabel("Loss")
    plt.show()

    plt.title("Learning Curve: Accuracy per Iteration")
    plt.plot(iters_sub, train_accs, label="Train")
    plt.plot(iters_sub, val_accs, label="Validation")
    plt.xlabel("Iterations")
    plt.ylabel("Accuracy")
    plt.legend(loc='best')
    plt.show()

num_epochs = 15
hidden_size = 128
n_layers = 3
learning_rate = 0.015
# checkpoint_path = "./weights/epoch-{}.pk"

vocab_length = len(word_to_index)
story_model = StoryGenerator(input_size=vocab_length, hidden_size=hidden_size, output_size=vocab_length, n_layers=n_layers, n_grams_size=5)


train(story_model,  x_train, y_train, x_valid, y_valid, num_epochs, learning_rate, batch_size=128, checkpoint_path=None)

Epoch 1. Iter 374. [Val Acc 0.6446363750156465%] [Train Acc 0.7155672382859766%, Loss 0.0651494488120079]


KeyboardInterrupt: ignored

In [18]:
def generate(model, index_to_word, prompt='sliding across wet floor hit police', length=20, n_grams_size=5):
    model.eval()
    batch_size = 1
    hidden_h, hidden_c = story_model.init_hidden(batch_size)
    n_grams_size -= 1
    for prediction in range(length):
        prompt_split = [word_to_index[w] for w in prompt.split()]
        prompt_tensor = torch.tensor(prompt_split[-n_grams_size:]).unsqueeze(0)
        # Get predictions
        output, (hidden_h, hidden_c) = model(prompt_tensor, (hidden_h, hidden_c))
        # Make distribution
        distribution = output.data.view(-1).exp()
        # Sample from distribution
        sample = torch.multinomial(distribution, 1)[0].item()
        # Search for word in 'reverse' dictionry
        predicted_word = index_to_word[sample]
        # Add word to prompt
        prompt += " " + predicted_word

    return prompt

# Generate text based on input prompt
print(generate(story_model, index_to_word))

sliding across wet floor hit police 33rd witha never answered lifelong instead competent prayer man extra eternal fear table edge word fatal problem finally scope tee
