In [1]:
import numpy as np
import pandas as pd
import nltk

In [2]:
from nltk.tokenize import sent_tokenize, word_tokenize
from sklearn.model_selection import train_test_split

In [3]:
FILENAME = "data/en_US.twitter.txt"

In [4]:
def load_data(filename):
    with open(filename, "rb") as data:
        text = data.read()
    text = text.decode("utf-8")
    return text

In [5]:
data = load_data(FILENAME)

In [6]:
def tokenize(data, return_sentences = True):
    sentences = data.split('\r\n')
    sentences = [sent.strip() for sent in sentences]
    sentences = [sent for sent in sentences if len(sent) > 0]

    word_tokens = [word_tokenize(sent.lower()) for sent in sentences]
    if return_sentences:
        return sentences, word_tokens
    return word_tokens

In [7]:
sentences, word_tokens = tokenize(data)

In [8]:
def split_data(word_tokens, test_size=0.2, random_state=42):
    train_data, test_data = train_test_split(word_tokens, test_size=0.2, random_state=42)
    return train_data, test_data

In [9]:
train_data, test_data = split_data(word_tokens)

### Count Words and Select vocabulary

In [10]:
from collections import Counter

def count_words(data):
    cnt = Counter()
    for words in data:
        cnt.update(words)
    return cnt

In [11]:
word_count = count_words(train_data)

In [12]:
# [word[0] for word in word_count.most_common(10)]

In [13]:
def create_vocabulary(word_count, min_frequency=0, max_words=0):
    if min_frequency > 0:
        vocabulary = []
        for word, count in word_count.items():
            if count >= min_frequency:
                vocabulary.append(word)
    if max_words > 0:
        vocabulary = [word[0] for word in word_count.most_common(max_words)]

    return vocabulary

In [14]:
# word_count.items()

### Replace words by unknown tokens

In [15]:
def handle_unk_words(data, vocabulary, unk_token="<unk>"):
    handled_data = []
    for sentences in data:
        sentence = []
        for word in sentences:
            if word in vocabulary:
                sentence.append(word)
            else:
                sentence.append(unk_token)
        handled_data.append(sentence)
    return handled_data

In [16]:
vocabulary = create_vocabulary(word_count, min_frequency=3)
preprocessed_data = handle_unk_words(train_data, vocabulary=vocabulary)

In [17]:
preprocessed_data_test = handle_unk_words(test_data, vocabulary=vocabulary)

## Create n-gram model
### Count n-grams

In [18]:
from collections import defaultdict
def count_n_grams(data, n, start_token="<s>", end_token="<e>"):
    """
    Define and count the numbers of n-grams in text corpus.

    Args:
        data : preprocessed text corpus
        n : n in n_gram
        start_token: starting token symbol
        end_token: ending token symbol

    Returns:
        n_grams: Dictionary containing n_gram tuples as keys and their counts as
        values
    """

    n_grams = defaultdict(int)
    for sentence in data:
        sentence = [start_token]*n + sentence + [end_token]
        
        """
            If we are calculating 1-gram we need to include all single words so we
            need to loop over all the words of sentence
            but if n > 2 we need to include last word only once, so we cannot
            loop over all the words but n-1
            e.g. ```
                A = ["A","B","C","D"]
                if n=1, then len = len(A) = 4,
                A1 = ["A"], A2 = ["B"], A3=["C"], A4=["D"]
                if n> 2,
                then len = len(A)-1 = 3
                A1 = ["A","B"], A2 = ["B","C"] A3 = ["C","D"]
            ```
        """
        m = len(sentence) if n == 1 else len(sentence)-1


        for i in range(m):
            n_gram = sentence[i:i+n]
            n_gram = tuple(n_gram)
            n_grams[n_gram] += 1
    return n_grams

In [19]:
def calculate_probability(word, previous_n_gram, 
                          n_gram_counts, n_plusone_gram_counts,
                          vocabulary_size,
                          k=1):
    
    """
    Calculates the probability of given word in sentences given the previous n
    words (n-gram)

    Args:
        word: current word
        previous_n_gram: Previous n words
        n_gram_counts : dictionary of n-gram counts
        n_plusone_gram_counts: dictionary of (n+1)-gram counts
        vocabulary_size: Size of the vocabulary
        k: smoothing constant

    Returns:
        Probability
        P = {count(previous_n_gram + current_word) + k} 
            / {count(previous_n_gram)+ k * voc_size}
    """

    n_plus_one_gram = tuple(previous_n_gram + [word])
    n_plus_one_gram_count = n_plusone_gram_counts[n_plus_one_gram] \
                    if n_plus_one_gram in n_plusone_gram_counts else 0
    numerator = n_plus_one_gram_count + k

    previous_n_gram = tuple(previous_n_gram)
    previous_n_gram_count = n_gram_counts[previous_n_gram] \
                    if previous_n_gram in n_gram_counts else 0

    denominator = previous_n_gram_count + k * vocabulary_size


    probability = numerator / denominator
    return probability

### Evaluation of Language Model using Perplexity

In [20]:
def perplexity(sentence,n_gram_counts, 
               n_plusone_gram_counts, 
               vocabulary, k=1.0, log_perplexity=True):
    # find n as in n-grams
    n = len(list(n_gram_counts.keys())[0])

    sentence = ["<s>"] * n + sentence + ["<e>"]
    sentence = tuple(sentence)

    vocabulary_size = len(vocabulary)

    # N = len(sentence) if n == 1 else len(sentence)-1
    N = len(sentence)

    if log_perplexity:
        temp_sum = 0.0
    else:
        temp_product = 1.0

    for t in range(n,N):
        n_gram = sentence[t-n:t] 
        # Because the loop starts at n i.e. it excludes first n words as
        # part of n-gram where t is current word so n-gram will be previous n
        # words

        word = sentence[t]

        probability = calculate_probability(word, n_gram, 
                                            n_gram_counts,
                                            n_plusone_gram_counts,
                                            vocabulary, k=k)
        if log_perplexity:
            temp_sum += (1 / probability)
        else:
            temp_product *= (1 / probability)
        
    if log_perplexity:
        perplexity = - (1/N) * temp_sum
    else:
        perplexity = temp_product**(1/N)

    return perplexity

In [21]:
def calculate_probabilities(previous_n_gram, 
                            n_gram_counts, n_plusone_gram_counts, 
                            vocabulary, k=1.0):
    probabilities = dict()
    vocabulary = vocabulary + ["<e>","<unk>"]
    vocabulary_size = len(vocabulary)

    for word in vocabulary:
        probability = calculate_probability(word,
                                            previous_n_gram, 
                                            n_gram_counts, 
                                            n_plusone_gram_counts, 
                                            vocabulary_size,
                                            k=1.0)
        probabilities[word] = probability

    return probabilities

In [55]:
def suggest_a_word(previous_tokens, 
                   n_gram_counts, 
                   n_plusone_gram_counts, 
                   vocabulary, 
                   k=1.0,
                   start_with=None):
    
    n = len(list(n_gram_counts.keys())[0])

    previous_n_gram = previous_tokens[-n:] 

    probabilities = calculate_probabilities(previous_n_gram,
                                            n_gram_counts,
                                            n_plusone_gram_counts,
                                            vocabulary,
                                            k=k)
    max_probability = 0
    suggestion = None

    for word, prob in probabilities.items():
        if start_with:
            if not word.startswith(start_with):
                continue
        
        if prob > max_probability:
            max_probability = prob
            suggestion = word
    return (suggestion, max_probability)

In [26]:
# def get_suggestions(previous_tokens, n_gram_counts_list, vocabulary, k=1.0, start_with=None):
#     model_counts = len(n_gram_counts_list) # 5
#     suggestions = []
#     for i in range(model_counts-1): #0 
#         n_gram_counts = n_gram_counts_list[i] # uni
#         n_plus1_gram_counts = n_gram_counts_list[i+1] # bi
        
#         suggestion = suggest_a_word(previous_tokens, n_gram_counts,
#                                     n_plus1_gram_counts, vocabulary,
#                                     k=k, start_with=start_with)
#         suggestions.append(suggestion)
    # return suggestions

In [64]:
def get_suggestions(previous_tokens, 
                    n_gram_counts, 
                    n_plusone_gram_counts,
                    vocabulary,
                    k=1.0,
                    n_suggestions=4,
                    starts_with=None):
    
    n = len(list(n_gram_counts.keys())[0])

    previous_n_gram = previous_tokens[-n:]

    probabilities = calculate_probabilities(
        previous_n_gram,
        n_gram_counts,
        n_plusone_gram_counts,
        vocabulary,
        k=k
    )

    sorted_probabilities = {
        word : probability for word, probability in \
        sorted(probabilities.items(), key=lambda item: item[1], reverse=True)
            }
    if starts_with:
        suggestions = dict()
        for word, prob in sorted_probabilities.items():
            if word.startswith(starts_with):
                suggestions[word] = prob
            else:
                continue
        return list(suggestions.items())[:n_suggestions]

    return list(sorted_probabilities.items())[:n_suggestions]

{3: 4, 4: 3}

In [65]:
sentences = [['i', 'like', 'a', 'cat'],
             ['this', 'dog', 'is', 'like', 'a', 'cat']]
unique_words = list(set(sentences[0] + sentences[1]))

unigram_counts = count_n_grams(sentences, 1)
bigram_counts = count_n_grams(sentences, 2)

previous_tokens = ["i", "like"]
tmp_suggest1 = suggest_a_word(previous_tokens, unigram_counts, bigram_counts, unique_words, k=1.0)
print(f"The previous words are 'i like',\n\tand the suggested word is `{tmp_suggest1[0]}` with a probability of {tmp_suggest1[1]:.4f}")

print()
# test your code when setting the starts_with
tmp_starts_with = 'c'
tmp_suggest2 = suggest_a_word(previous_tokens, unigram_counts, bigram_counts, unique_words, k=1.0, start_with=tmp_starts_with)
print(f"The previous words are 'i like', the suggestion must start with `{tmp_starts_with}`\n\tand the suggested word is `{tmp_suggest2[0]}` with a probability of {tmp_suggest2[1]:.4f}")

The previous words are 'i like',
	and the suggested word is `a` with a probability of 0.2727

The previous words are 'i like', the suggestion must start with `c`
	and the suggested word is `cat` with a probability of 0.0909


In [66]:
get_suggestions(previous_tokens,
                unigram_counts,
                bigram_counts,
                unique_words,
                k=1,
                n_suggestions=4,
                starts_with="c"
                )

[('cat', 0.09090909090909091)]

In [68]:
bigram_counts = count_n_grams(preprocessed_data, 2)
trigram_counts = count_n_grams(preprocessed_data, 3)

In [69]:
c

In [71]:
get_suggestions(previous_tokens, bigram_counts, trigram_counts, vocabulary, k=1.0)

[('the', 0.001925113100394648),
 ('to', 0.0014438348252959862),
 ('it', 0.0014438348252959862),
 ('that', 0.0013475791702762538)]

In [73]:
previous_tokens = previous_tokens + ['the']
get_suggestions(previous_tokens, unigram_counts, bigram_counts, unique_words, k=1.0)

[('<unk>', 175.77777777777777),
 ('<e>', 4.666666666666667),
 ('dog', 1.5555555555555556),
 ('cat', 1.0)]