In [1]:
# Switch the working directory to the project's apps folder.
%cd /content/drive/MyDrive/Colab Notebooks/nlp/apps

/content/drive/MyDrive/Colab Notebooks/nlp/apps


In [2]:
# Smaller file holding just the first novel; we use it for quick testing
testing_path = '/content/drive/MyDrive/Colab Notebooks/nlp/apps/data/study in scarlet.txt'

# File holding the whole corpus; this is the one used to build the model
path = '/content/drive/MyDrive/Colab Notebooks/nlp/apps/data/sherlock_novels.txt'

# Preprocessing the corpus

These are the preprocessing steps that we are going to use:

- lowercase the text
- remove special characters
- split text to list of sentences
- split sentences into list of words

Note that we will treat each line of the file as one sentence for this language model.

In [3]:
import nltk
import re
import numpy as np
import pandas as pd

nltk.download('punkt')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [4]:
def remove_special(sentence):
    """
    Deletes every character that is not a letter, a digit, a space
    or one of the punctuation marks . , ? !
    Args:
        sentence: str
    returns:
        str. The sentence without the disallowed characters
    """
    # anything outside the allowed character class is dropped, not replaced
    disallowed = re.compile(r'[^a-zA-Z0-9.,?! ]+')

    return disallowed.sub('', sentence)

def get_text(path, encoding='utf-8'):
    """
    It reads a txt file and returns a string with all the corpus.
    The file is decoded with an explicit encoding so the result does not
    depend on the default encoding of the machine running the notebook.
    Args:
        path: str
        encoding: str. Text encoding used to decode the file (default 'utf-8')
    returns:
        text: str
    """
    with open(path, encoding=encoding) as f:
        text = f.read()

    return text

def get_sentences(text):
    """
    Takes a whole text, lowercases it, splits it on line breaks and removes
    the special characters of every line. Each line is treated as one sentence.
    Lines that end up with no content (empty, whitespace only, or made only
    of special characters) are dropped.
    Args:
        text: str
    returns:
        sentences: list of str
    """
    sentences = []
    for line in text.lower().split('\n'):
        cleaned = remove_special(line.strip())
        # Filter AFTER stripping and cleaning: checking the raw line length would
        # let whitespace-only or symbol-only lines through as empty sentences.
        if cleaned.strip():
            sentences.append(cleaned)

    return sentences

# Uncomment to see the 10 first sentences

# text = get_text(testing_path)
# sentences = get_sentences(text)
# for s in sentences[:10]:
#     print(s)


# Tokenize the corpus

In [5]:
def tokenize(sentences):
    """
    Splits every sentence (a string) into its tokens using
    nltk.word_tokenize.
    Args:
        sentences: list of str
    returns:
        tokenized_sentences: list. One list of tokens per input sentence,
            in the same order
    """
    tokenized_sentences = []
    for sentence in sentences:
        tokenized_sentences.append(nltk.word_tokenize(sentence))

    return tokenized_sentences

# Uncomment to test

# text = get_text(testing_path)
# sentences = get_sentences(text)
# tokenized_sentences = tokenize(sentences[:10])
# for s in tokenized_sentences:
#     print(s)

In [6]:
def get_tokens(path):
    """
    Full loading pipeline for a txt file: reads it with get_text(),
    splits it into cleaned sentences with get_sentences() and
    tokenizes them with tokenize().
    Args:
        path: str
    returns:
        tokenized_sentences: list
    """
    # read -> split into sentences -> tokenize
    return tokenize(get_sentences(get_text(path)))
    

# Count words

We are going to go through every sentence and every token, counting how many times each token occurs in the corpus.

This will help us keep only the tokens that appear at least N times in the corpus, and it is also needed to calculate probabilities.

In [7]:
def get_token_counts(tokenized_sentences):
    """
    Counts how many times every token appears across all the
    tokenized sentences.
    Args:
        tokenized_sentences: list (list of list of tokens)
    returns:
        token_counts: dict. token -> number of occurrences
    """
    token_counts = {}
    for sentence in tokenized_sentences:
        for token in sentence:
            # unseen tokens start from 0
            token_counts[token] = token_counts.get(token, 0) + 1

    return token_counts

# Uncomment for testing

# to = get_tokens(testing_path)
# counts = get_token_counts(to)
# from collections import Counter
# c = Counter(counts)
# c.most_common(10)     

# Handling out of vocabulary words

Because it is likely that at some point we are going to encounter words that were not in our training dataset, we need to handle out-of-vocabulary words. Otherwise, we won't be able to predict the next word.

In this case, we are going to add an "unk" token, which replaces the words with fewer than N occurrences in the training data; the words that are left are going to be our vocab.


In [8]:
# Minimum number of occurrences a token needs in the training data to be kept in the vocab
threshold = 2

def add_unk_token(tokenized_sentences, vocab, unk_token):
    """
    Builds a copy of the tokenized sentences in which every token
    missing from the vocab is swapped for the unk token.
    Args:
        tokenized_sentences: list
        vocab: set
        unk_token: str
    returns:
        tokenized_sentences_with_unk: list. New list of list of tokens;
            the input lists are not modified
    """
    return [
        [token if token in vocab else unk_token for token in sentence]
        for sentence in tokenized_sentences
    ]

def create_new_sentences(tokenized_sentences, mode='train', vocab=None, threshold=2, unk_token='unk'):
    """
    It replaces out of vocabulary tokens with the "unk" token.
    In 'train' mode the vocab is built here: the token occurrences are counted
    and only the tokens with at least `threshold` occurrences are kept.
    In 'test' mode the vocab obtained from train must be passed in.
    Args:
        tokenized_sentences: list
        mode: str. (train or test)
        vocab: set or list. Only used in 'test' mode (we get the vocab from
            train and use it again for test); ignored in 'train' mode
        threshold: int. Only used in 'train' mode
        unk_token: str
    returns:
        if mode == 'train':
            (tokenized_sentences_with_unk, vocab): list of list of tokens, set
        if mode == 'test':
            tokenized_sentences_with_unk: list of list of tokens
    raises:
        ValueError: if mode is not 'train' or 'test', or if mode is 'test'
            and no vocab was passed
    """
    if mode == 'train':
        token_counts = get_token_counts(tokenized_sentences)

        # a set allows fast membership checks while replacing tokens
        vocab = {word for word, count in token_counts.items() if count >= threshold}

        tokenized_sentences_with_unk = add_unk_token(tokenized_sentences, vocab, unk_token)

        return tokenized_sentences_with_unk, vocab

    if mode == 'test':
        if vocab is None:
            # fail early with a clear message instead of a TypeError deep inside the loop
            raise ValueError("A vocab must be passed when mode is 'test'")

        # the vocab may arrive as a list; a set keeps the lookups fast
        vocab = set(vocab)

        return add_unk_token(tokenized_sentences, vocab, unk_token)

    raise ValueError("Wrong mode was passed")


# Uncomment for testing

# tokens = get_tokens(testing_path)
# updated_tokens, vocab = create_new_sentences(tokens, threshold=threshold)
# from collections import Counter
# token_counts = get_token_counts(updated_tokens)
# c = Counter(token_counts)
# print(c['unk'])
# c.most_common(10)


# Create the corpus and split into train and test sets

In [9]:
import random

# Load and tokenize the whole corpus
tokenized_sentences = get_tokens(path)
# Fixed seed so the in-place shuffle (and therefore the split below) is reproducible
random.seed(10)
random.shuffle(tokenized_sentences)
print(f'Amount of sentences {len(tokenized_sentences)}')

Amount of sentences 60198


### Because the corpus is big enough we can test using just 10% of the sentences

In [10]:
# The first 90% of the shuffled sentences go to train, the remaining 10% to test
size = int(len(tokenized_sentences) * 0.9)
train = tokenized_sentences[:size]
test = tokenized_sentences[size:]
print(f'Training size: {len(train)}')
print(f'Testing size: {len(test)}')

Training size: 54178
Testing size: 6020


# Preprocess the data

In this step, we are going to join the functions that we have been creating to process our train and test datasets.

In [11]:
def preprocess(train, test, threshold=2):
    """
    Applies the unk token replacement to the train and test datasets
    (list of list of tokens). The vocab is obtained from the train
    dataset only and then reused for the test dataset.
    Args:
        train: list
        test: list
        threshold: int
    returns:
        train_sentences: list
        test_sentences: list
        vocab: set
    """
    # the vocab comes from train only
    train_with_unk, train_vocab = create_new_sentences(train, threshold=threshold)
    # test reuses the train vocab
    test_with_unk = create_new_sentences(test, mode='test', vocab=train_vocab)

    return train_with_unk, test_with_unk, train_vocab



In [12]:
# Replace out of vocabulary tokens in both splits, using the threshold defined above
train_sentences, test_sentences, vocab = preprocess(train, test, threshold=threshold)

# N-gram model

The model is going to predict the next word based on the previous n-gram. It uses the conditional probability of a word appearing after a series of words. So,

- The numerator is the number of times that the word appears after the series of previous words.
- The denominator is the number of times that this series of words appears in the training data.

Also, to be able to compute the probabilities of a word at the beginning and at the end of a sentence, we have to add two special tokens: a start token and an end token.

Depending on the number of words that we are going to use for the n-gram we have to add n - 1 tokens at the beginning and always one at the end.


In [13]:
def count_ngrams(tokenized_sentences, n, start_token='<s>', end_token='<e>'):
    """
    Counts every n-gram found in the tokenized sentences. Each sentence
    is padded with n start tokens at the beginning and one end token at
    the end before its n-grams are extracted.
    Args:
        tokenized_sentences: list (list of tokenized sentences)
        n: int
        start_token: str
        end_token: str
    returns:
        ngram_count: dict. n-gram (tuple of tokens) -> number of occurrences
    """
    ngram_count = {}

    for tokens in tokenized_sentences:
        # tuples are hashable, so every slice can be used directly as a dict key
        padded = tuple([start_token] * n + tokens + [end_token])

        # slide a window of n tokens over the padded sentence
        last_start = len(padded) - n
        for start in range(last_start + 1):
            window = padded[start:start + n]
            ngram_count[window] = ngram_count.get(window, 0) + 1

    return ngram_count




In [14]:
def compute_probability(word, previous_ngram, ngram_counts, nplus1_gram_counts, vocab_size, k=1.0):
    """
    Estimates the probability of `word` coming right after `previous_ngram`
    from the n-gram counts, applying k-smoothing to both the numerator
    and the denominator.
    Args:
        word: str
        previous_ngram: list. (sequence of previous n words)
        ngram_counts: dict
        nplus1_gram_counts: dict
        vocab_size: int
        k: int
    returns:
        probability: float (the probability of current word appearing after the previous ngram)
    """
    # the count dicts are keyed by tuples
    context = tuple(previous_ngram)
    # the (n+1)-gram is the context followed by the candidate word
    context_plus_word = context + (word,)

    # n-grams that were never seen count as 0
    numerator = nplus1_gram_counts.get(context_plus_word, 0) + k
    denominator = ngram_counts.get(context, 0) + k * vocab_size

    return numerator / denominator



# Computing probabilities for all the words in the data


In [15]:
def compute_all_probabilities(previous_ngram, ngram_counts, nplus1_gram_counts, vocab, k=1.0):
    """
    It calculates, for every word in the vocab (plus '<e>' and 'unk'), the
    probability of appearing after a sequence of previous words.
    Args:
        previous_ngram: list
        ngram_counts: dict
        nplus1_gram_counts: dict
        vocab: list or set
        k: int. Smoothing constant forwarded to compute_probability
    returns:
        probabilities: dict. word -> probability of the word following previous_ngram
    """

    previous_ngram = tuple(previous_ngram)

    # add <e> and unk to the vocab
    # we don't add <s> because it should not appear as a next word
    # list(...) lets the vocab be a set as well as a list
    vocab = list(vocab) + ['<e>', 'unk']
    vocab_size = len(vocab)

    probabilities = dict()

    # every candidate word is scored and k is passed through,
    # so the caller's smoothing constant is the one actually applied
    for word in vocab:
        probabilities[word] = compute_probability(
            word, previous_ngram, ngram_counts, nplus1_gram_counts, vocab_size, k=k
        )

    return probabilities



In [43]:
def create_matrix(nplus1_gram_counts, vocab):
    """
    It creates a matrix where the columns are the words in the vocabulary
    (plus '<e>' and 'unk') and the rows are the ngram previous to the target
    word. Each cell holds how many times the word followed that ngram.
    (n+1)-grams whose last word is not one of the columns are ignored.

    Args:
        nplus1_gram_counts: dict
        vocab: list or set
    returns:
        count_matrix: Pandas DataFrame
    """
    # list(...) lets the vocab be a set as well as a list
    vocab = list(vocab) + ['<e>', 'unk']

    # obtain unique ngrams; dict.fromkeys removes duplicates while keeping the
    # first-seen order, so the row order is the same on every run
    ngrams = list(dict.fromkeys(ngram_plus1[:-1] for ngram_plus1 in nplus1_gram_counts))

    row_index = {ngram: i for i, ngram in enumerate(ngrams)}
    col_index = {word: i for i, word in enumerate(vocab)}

    matrix = np.zeros((len(ngrams), len(vocab)))

    for ngram_plus1, count in nplus1_gram_counts.items():
        # dict lookup instead of scanning the vocab list for every n-gram
        j = col_index.get(ngram_plus1[-1])
        if j is None:
            continue
        i = row_index[ngram_plus1[:-1]]
        matrix[i, j] = count

    count_matrix = pd.DataFrame(matrix, index=ngrams, columns=vocab)

    return count_matrix

def create_probability_matrix(count_matrix, k):
    """
    It adds smoothing to the values in the count_matrix to avoid dividing by zero
    error, then it calculates the probability of each word appearing after
    a previous series of words (ngrams) by dividing every row by its sum.
    The count_matrix passed in is left untouched.

    Args:
        count_matrix: pandas DataFrame
        k: int
    returns:
        probability_matrix: pandas DataFrame
    """
    # build a new frame instead of `count_matrix += k`: the in-place version changed
    # the caller's counts, so running the cell again applied the smoothing twice
    smoothed_counts = count_matrix + k
    probability_matrix = smoothed_counts.div(smoothed_counts.sum(axis=1), axis=0)

    return probability_matrix


