# N-Grams

The goal here is to implement a simple n-gram model for text generation. The model will be trained on a given text and then used to generate new text. We will also use a perplexity metric to evaluate the model.

In [96]:
import random 
import math


random.seed(42)

## Calculate Document N-Grams

In [10]:
def _find_ngrams(tokenised_document, n, min_frequency, ignore_case=True):
    '''Find n-grams in a tokenised document.
    
    tokenised_document: A list of string tokens.
    n: The n in n-grams.
    min_frequency: The minimum frequency of an n-gram to be outputted.
    ignore_case: If True, n-grams are case-insensitive.
    returns: A list of tuples: n-gram, n-gram frequency.'''

    if ignore_case:
        tokenised_document = [word.lower() for word in tokenised_document]
    ngrams = {}
    for token_index in range(len(tokenised_document) - n + 1):
        ngram = tuple(tokenised_document[token_index:token_index + n])
        if ngram in ngrams:
            ngrams[ngram] += 1
        else:
            ngrams[ngram] = 1
    ngrams_subset = sorted([k for k, v in ngrams.items() if v >= min_frequency], key=ngrams.get, reverse=True)

    return [(ngram, ngrams[ngram]) for ngram in ngrams_subset]

def find_all_ngrams(tokenised_document, max_n, ignore_case=True):
    '''Find all n-grams in a tokenised document, up to a maximum n.
    
    tokenised_document: A list of string tokens.
    max_n: The maximum n in n-grams.
    ignore_case: If True, n-grams are case-insensitive.
    returns: A list per n, each with a list of tuples: n-gram, n-gram frequency.'''

    all_ngrams = []
    for n in range(1, max_n + 1):
        ngrams = _find_ngrams(tokenised_document, n, min_frequency=1, ignore_case=ignore_case)
        all_ngrams.append(ngrams)
    return all_ngrams

Calculating n-grams can be slow, so we persist the results, for decoupled analysis.

In [None]:
def calculate_and_store_all_ngrams(filename, max_n):
    print('Calculating all n-grams for ' + filename)
    with open(filename, 'r', encoding="mbcs") as file:
        document = file.read()
        tokenised_document = document.split()
        all_ngrams = find_all_ngrams(tokenised_document, max_n)
        ngram_counts = [len(ngram) for ngram in all_ngrams]
        print(f'N-Gram counts: {ngram_counts}')
        for n_index, ngrams in enumerate(all_ngrams):
            with open(f'{filename}.{n_index + 1}.ngrams', 'w') as file:
                for ngram, frequency in ngrams:
                    file.write(str(ngram) + ',' + str(frequency) + '\n')

calculate_and_store_all_ngrams('./datasets/Alice.txt', max_n=10)
calculate_and_store_all_ngrams('./datasets/Shakespeare.txt', max_n=10)

Here we load the n-grams from disk. We prefer to always load from disk, rather than from memory, as a simple way to guarantee consistency of the in memory data.

In [None]:
def load_all_ngrams(root_filename, max_n):
    print(f'Loading all n-grams up to n={max_n} for {root_filename}')
    all_ngrams = []
    for n in range(1, max_n + 1):
        print(f'Loading {root_filename}.{n}.ngrams')
        with open(f'{root_filename}.{n}.ngrams', 'r') as file:
            all_ngrams.append([eval(line.strip()) for line in file])
    ngrams_counts = [len(ngrams) for ngrams in all_ngrams]
    print(f'N-Grams\' counts: {ngrams_counts}')
    return all_ngrams
    

In [None]:
all_alice_ngrams = load_all_ngrams('./datasets/Alice.txt', max_n=10)

In [None]:
all_shakespeare_ngrams = load_all_ngrams('./datasets/Shakespeare.txt', max_n=10)

## Generate text from n-grams

First preprocess the ngrams ready for usage.

In [77]:
def ngrams_grouped_by_prior_tokens(all_ngrams):
    return [_group_by_prior_tokens(ngrams) for ngrams in all_ngrams]

def get_full_vocabulary(all_ngrams):
    return list(set([token for ngrams in all_ngrams for tokens, _ in ngrams for token in tokens]))

def _group_by_prior_tokens(ngrams):
    grouped_tokens = {}
    for token, frequency in ngrams:
        prior_tokens = token[:-1]
        final_token = token[-1]
        if prior_tokens in grouped_tokens:
            grouped_tokens[prior_tokens].append((final_token, frequency))
        else:
            grouped_tokens[prior_tokens] = [(final_token, frequency)]
    return grouped_tokens

In [78]:
alice_vocabulary = get_full_vocabulary(all_alice_ngrams)
alice_ngrams_grouped_by_prior_tokens = ngrams_grouped_by_prior_tokens(all_alice_ngrams)

In [87]:
shakespeare_vocabulary = get_full_vocabulary(all_shakespeare_ngrams)
shakespeare_ngrams_grouped_by_prior_tokens = ngrams_grouped_by_prior_tokens(all_shakespeare_ngrams)

Given the input token sequence we build probabilities for all possible next tokens.

Note: This implementation doesn't include handling of 'out of vocabulary' words.

In [88]:
def get_vocabulary_probabilities(vocabulary, tokenised_input, ngrams_grouped_by_prior_tokens, max_n, laplace_constant=0.0001):
    '''Get the probability of each token in the vocabulary given the input and n-grams.
    
    vocabulary: A list of all tokens in the inputs and outputs.
    tokenised_input: A list of tokens.
    ngrams_grouped_by_prior_tokens: A list of dictionaries, each with n-gram prior tokens as keys and a list of tuples: n-gram final token, n-gram frequency.
    max_n: The maximum n in n-grams.
    laplace_constant: A constant to add to the probability of each token to avoid zero probabilities.'''
    
    vocabulary_size = len(vocabulary)
    weight_per_n_function = lambda n: n
    n_values = range(1, max_n+1)
    total_ngram_length_weights = sum([weight_per_n_function(n) for n in n_values])
    ngram_length_weights_normalised = {n: (weight_per_n_function(n) / total_ngram_length_weights) for n in n_values}
    unigram_additive_probability = (ngram_length_weights_normalised[1] / vocabulary_size) * laplace_constant
    token_probabilities = {token: unigram_additive_probability for token in vocabulary}
    for n in reversed(n_values[1:]):
        _add_probablities_from_ngrams(tokenised_input, ngrams_grouped_by_prior_tokens[n - 1], ngram_length_weights_normalised[n], token_probabilities, n)
    return token_probabilities

def _add_probablities_from_ngrams(tokenised_input, ngrams_grouped_by_prior_tokens, ngram_length_weight, token_probabilities, n):
    ngram_to_find = tuple(tokenised_input[-n+1:])
    non_zero_frequencies = (ngrams_grouped_by_prior_tokens[ngram_to_find] if ngram_to_find in ngrams_grouped_by_prior_tokens else [])
    additive_probabilities = _get_ngram_additive_probabilities(ngram_length_weight, non_zero_frequencies)
    for token in additive_probabilities:
        token_probabilities[token] += additive_probabilities[token]

def _get_ngram_additive_probabilities(ngram_weight, non_zero_frequencies):
    if not non_zero_frequencies:
        return {}
    total_frequencies = sum([frequency for _, frequency in non_zero_frequencies])
    frequency_unit_probability = ngram_weight / total_frequencies
    additive_probabilities = {token: frequency*frequency_unit_probability for token, frequency in non_zero_frequencies}
    return additive_probabilities


In [89]:
def generate_and_print_sentence(input_string, vocabulary, ngrams_grouped_by_prior_tokens, max_length=100):
    '''Generate and print a sentence given an input string, vocabulary, and n-grams.
    
    input_string: A string of space-separated tokens.
    vocabulary: A list of all tokens in the inputs and outputs.
    ngrams_grouped_by_prior_tokens: A list of dictionaries, each with n-gram prior tokens as keys and a list of tuples: n-gram final token, n-gram frequency.
    max_length: The maximum length of the sentence to generate.'''
    
    print(f'Generating sentence for input: {input_string}')
    tokenised_input = input_string.split()
    max_n = len(ngrams_grouped_by_prior_tokens)
    for _ in range(max_length):
        token_probabilities = get_vocabulary_probabilities(vocabulary, tokenised_input, ngrams_grouped_by_prior_tokens, max_n)
        next_token = _generate_token_from_token_probabilities(token_probabilities)
        tokenised_input.append(next_token)
    print(' '.join(tokenised_input))

def _generate_token_from_token_probabilities(token_probabilities):
    tokens = list(token_probabilities.keys())
    token_weights = [token_probabilities[token] for token in tokens]
    random_token = random.choices(tokens, weights=token_weights)[0]
    return random_token


In [94]:
generate_and_print_sentence('Alice was', alice_vocabulary, alice_ngrams_grouped_by_prior_tokens, max_length=50)

Generating sentence for input: Alice was
Alice was reading, but it did not appear, and after a minute or two she walked on in the direction in which the march hare was said to live. â€œiâ€™ve seen hatters before,â€ she said to herself; â€œthe march hare will be much the most interesting, and perhaps as this is may


In [95]:
generate_and_print_sentence('Today we', shakespeare_vocabulary, shakespeare_ngrams_grouped_by_prior_tokens, max_length=50)

Generating sentence for input: Today we
Today we should have spoke; for his father's sake, in honour of a true plantagenet, and rise resty grown; he bade me trust you; but i do not think i flatter; for what advancement may i think, is a smock, creaking my shoes on the plain words, by thy true-telling friend. and


## Perplexity

Note: we didn't split training and test data, so perplexity is not really meaningful here, but this is the approach.

First we load a 'test set' (in this case it matches the training set)

In [108]:
def load_tokenised_file(filename):
    print('Retrieving token/words for ' + filename)
    with open(filename, 'r', encoding="mbcs") as file:
        document = file.read()
        tokenised_document = [token.casefold() for token in document.split()]
        print(f'tokenised_document length: {len(tokenised_document)}')
        return tokenised_document

alice_test_set = load_tokenised_file('./datasets/Alice.txt') 

Retrieving token/words for ../Word2Vec/Alice.txt
tokenised_document length: 29594


We use the log based perplexity calculation, as it the multiplication of many small probabilities will likely underflow the float, and as perplexity is a relative measure, the log based version is equally relevant.

In [104]:
def calculate_log_based_perplexity(model, test_set):
    '''Calculate the perplexity of a model on a training set.
    
    model: A function that generates the probabilities of the next token in a sequence.
    test_set: A list of string tokens.
    returns: The perplexity of the model on the training set.'''

    N = len(test_set)
    log_probabilities = [
        (math.log(model(test_set[:i])[test_set[i]])) 
        for i in range(1, N)
    ]
    return math.exp(-sum(log_probabilities)/N)


As a baseline, we calculate the perplexity of a random model.

In [109]:
def build_random_model(vocabulary):
    '''Build a random model.
    
    vocabulary: A list of string tokens.
    returns: A function that generates the next token in a sequence.'''

    static_equal_probabilities = {token: 1/len(vocabulary) for token in vocabulary}
    def random_model(_):
        return static_equal_probabilities
    return random_model

random_model = build_random_model(set(alice_test_set))
perplexity = calculate_log_based_perplexity(random_model, alice_test_set)
print('Perplexity: ' + str(perplexity))

Perplexity: 5647.351155318548


Here we see that the ngram model has an excellent perplexity, as expected, as it is trained on the same data.

In [106]:
alice_model = lambda tokenised_input: get_vocabulary_probabilities(alice_vocabulary, tokenised_input, alice_ngrams_grouped_by_prior_tokens, max_n=10)
perplexity = calculate_log_based_perplexity(alice_model, alice_test_set)
print('Perplexity: ' + str(perplexity))

Perplexity: 1.0788138979888258
