In [1]:
import math
import random
import numpy as np
import pandas as pd
import nltk
nltk.download('punkt')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


True

**Load and Preprocess Data**

In [2]:
def load_data(text):
  """Read an entire text file into a single string.

  Args:
    text: Path of the file to read.

  Returns:
    The complete contents of the file as one str.
  """
  # Decode explicitly as UTF-8: without `encoding`, open() falls back to the
  # platform locale, which can mis-decode or fail on emoji / non-ASCII text.
  with open(text, "r", encoding="utf-8") as f:
    return f.read()

In [3]:
# Location of the raw corpus. This is an absolute path (it looks like a Colab
# runtime location) — adjust it when running the notebook elsewhere.
data_path = "/content/twitter.txt"
data = load_data(data_path)

# Sanity check: total number of characters and the first 100 characters.
print("Length of the text corpus : ",len(data))
print("Staring of the data : ", data[:100])

Length of the text corpus :  3335477
Staring of the data :  How are you? Btw thanks for the RT. You gonna be in DC anytime soon? Love to see you. Been way, way 


In [4]:
def split_to_sentences(data):
  """Split raw text into non-empty, whitespace-trimmed lines.

  Args:
    data: The text to split; each '\\n'-separated line is one sentence.

  Returns:
    A list of the stripped lines, with lines that are empty after
    stripping removed.
  """
  stripped_lines = (line.strip() for line in data.split('\n'))
  return [line for line in stripped_lines if line]

In [5]:
def tokenize_sentences(sentences):
  """Lower-case every sentence and split it into tokens.

  Args:
    sentences: Iterable of sentence strings.

  Returns:
    A list with one token list per input sentence, produced by
    nltk.word_tokenize on the lower-cased sentence.
  """
  return [nltk.word_tokenize(sentence.lower()) for sentence in sentences]

In [6]:
def tokenize_data(data):
  """Turn raw text into a list of tokenized sentences.

  Args:
    data: The raw text, one sentence per line.

  Returns:
    The result of tokenize_sentences applied to the lines returned by
    split_to_sentences.
  """
  return tokenize_sentences(split_to_sentences(data))

In [7]:
def train_test_split(data, split_ratio=0.8, seed=None):
  """Tokenize raw text, shuffle the sentences and split them into train/test.

  Args:
    data: The raw text, passed to tokenize_data.
    split_ratio: Fraction of the shuffled sentences placed in the training
      split; must lie in [0, 1]. Defaults to 0.8.
    seed: Optional seed for a reproducible shuffle. When None (the default)
      the module-level `random` state is used, so each run shuffles
      differently unless that state was seeded elsewhere.

  Returns:
    A tuple (tokenized_data, train_data, test_data): the full shuffled list
    of tokenized sentences, its first int(len * split_ratio) items, and the
    remaining items.

  Raises:
    ValueError: If split_ratio is outside [0, 1].
  """
  if not 0 <= split_ratio <= 1:
    raise ValueError(f"split_ratio must be within [0, 1], got {split_ratio!r}")

  tokenized_data = tokenize_data(data)
  if seed is None:
    random.shuffle(tokenized_data)
  else:
    # A dedicated generator keeps the shuffle reproducible without touching
    # the global random state.
    random.Random(seed).shuffle(tokenized_data)

  split_index = int(len(tokenized_data) * split_ratio)
  train_data = tokenized_data[:split_index]
  test_data = tokenized_data[split_index:]

  return tokenized_data, train_data, test_data

In [8]:
# Bind the shuffled token lists to `tokenized_data`, not `tokenize_data`:
# reusing the function's name would replace it with a list, so re-running this
# cell (train_test_split calls tokenize_data) would fail with
# "TypeError: 'list' object is not callable".
tokenized_data, train_data, test_data = train_test_split(data)

print("Length of tokenized data : ", len(tokenized_data))
print("Length of train data     : ", len(train_data))
print("Length of split data     : ", len(test_data))

Length of tokenized data :  47961
Length of train data     :  38368
Length of split data     :  9593


In [9]:
def count_words(tokenized_sentences):
  """Count how often each token occurs across all sentences.

  Args:
    tokenized_sentences: Iterable of token lists.

  Returns:
    A dict mapping each token to its number of occurrences, with keys in
    first-seen order.
  """
  frequency = {}
  for tokens in tokenized_sentences:
    for word in tokens:
      frequency[word] = frequency.get(word, 0) + 1

  return frequency

In [10]:
def words_with_n_frequency(tokenized_sentences, threshold):
  """Build the closed vocabulary of sufficiently frequent words.

  Args:
    tokenized_sentences: Iterable of token lists.
    threshold: Minimum number of occurrences (inclusive) for a word to be kept.

  Returns:
    A list of the words whose count, as reported by count_words, is at
    least `threshold`.
  """
  occurrences = count_words(tokenized_sentences)
  return [word for word, count in occurrences.items() if count >= threshold]

In [11]:
def replace_oov_with_unk(tokenized_sentences, vocabulary, unknown_token="<unk>"):
  """Replace every out-of-vocabulary token with the unknown-word marker.

  Args:
    tokenized_sentences: Iterable of token lists.
    vocabulary: Collection of known words.
    unknown_token: Marker substituted for tokens not in `vocabulary`.

  Returns:
    A new list of token lists with the same shape as the input; the input
    sentences are not modified.
  """
  # Build a set once so each membership test is O(1); testing against the
  # vocabulary list directly costs O(len(vocabulary)) for every single token.
  known_words = set(vocabulary)

  return [
      [token if token in known_words else unknown_token for token in sentence]
      for sentence in tokenized_sentences
  ]

In [12]:
def preprocess_data(train_data, test_data, count_threshold, unknown_token="<unk>"):
  """Build the vocabulary from the training split and mask rare words.

  Args:
    train_data: Tokenized training sentences.
    test_data: Tokenized test sentences.
    count_threshold: Minimum training-set frequency for a word to enter the
      vocabulary.
    unknown_token: Marker that replaces out-of-vocabulary tokens.

  Returns:
    A tuple (train_data_replaced, test_data_replaced, vocabulary). The
    vocabulary is derived from `train_data` only and is applied to both
    splits.
  """
  vocabulary = words_with_n_frequency(train_data, count_threshold)
  train_replaced, test_replaced = (
      replace_oov_with_unk(split, vocabulary, unknown_token)
      for split in (train_data, test_data)
  )

  return train_replaced, test_replaced, vocabulary

In [13]:
# Keep only words that occur at least twice in the training split; every other
# token in both splits is replaced by the default "<unk>" marker.
train_data_processed, test_data_processed, vocabulary = preprocess_data(train_data,
                                                                        test_data,
                                                                        count_threshold=2)

**Develop n-gram based Language Models**

In [14]:
def count_n_gram(data, n, start_token='<s>', end_token = '<e>'):
  """Count every n-gram in a collection of tokenized sentences.

  Each sentence is padded with `n` start tokens in front and one end token
  at the back before its n-grams are extracted.

  Args:
    data: Iterable of token lists.
    n: Number of tokens per n-gram.
    start_token: Padding token prepended `n` times.
    end_token: Token appended once.

  Returns:
    A dict mapping each n-gram (a tuple of tokens) to its occurrence count.
  """
  gram_counts = {}
  prefix = [start_token] * n
  suffix = [end_token]
  for tokens in data:
    padded = tuple(prefix + tokens + suffix)
    for start in range(len(padded) - n + 1):
      window = padded[start:start + n]
      gram_counts[window] = gram_counts.get(window, 0) + 1

  return gram_counts

In [15]:
def estimate_probability_of_single_word(word, previous_n_gram,
                         n_gram_counts, n_plus1_gram_counts, vocabulary_size, k=1.0):
  """Estimate P(word | previous_n_gram) with add-k smoothing.

  Args:
    word: Candidate next word.
    previous_n_gram: Sequence of the preceding tokens (the context).
    n_gram_counts: Dict of n-gram tuple -> count.
    n_plus1_gram_counts: Dict of (n+1)-gram tuple -> count.
    vocabulary_size: Number of words in the vocabulary.
    k: Smoothing constant added to the numerator and, once per vocabulary
      word, to the denominator.

  Returns:
    (count(context + word) + k) / (count(context) + k * vocabulary_size),
    where n-grams missing from the dicts count as 0.
  """
  context = tuple(previous_n_gram)
  context_count = n_gram_counts.get(context, 0)
  joint_count = n_plus1_gram_counts.get(context + (word,), 0)

  return (joint_count + k) / (context_count + k * vocabulary_size)

In [16]:
def estimate_probability_of_all_words(previous_n_gram, n_gram_counts, n_plus1_gram_counts, vocabulary, k=1.0):
  """Estimate the smoothed probability of every candidate next word.

  The candidates are the vocabulary extended with '<e>' and '<unk>'; the
  extended length is used as the vocabulary size for smoothing.

  Args:
    previous_n_gram: Sequence of the preceding tokens (the context).
    n_gram_counts: Dict of n-gram tuple -> count.
    n_plus1_gram_counts: Dict of (n+1)-gram tuple -> count.
    vocabulary: List of known words.
    k: Smoothing constant.

  Returns:
    A dict mapping each candidate word to its estimated probability.
  """
  context = tuple(previous_n_gram)
  candidates = vocabulary + ['<e>', '<unk>']
  candidate_count = len(candidates)

  return {
      candidate: estimate_probability_of_single_word(
          candidate, context, n_gram_counts, n_plus1_gram_counts,
          candidate_count, k=k)
      for candidate in candidates
  }

In [17]:
def make_count_matrix(n_plus1_gram_counts, vocabulary):
    """Arrange (n+1)-gram counts as a context-by-next-word table.

    Args:
        n_plus1_gram_counts: Dict of (n+1)-gram tuple -> count.
        vocabulary: List of known words; '<e>' and '<unk>' are appended to
            form the columns.

    Returns:
        A pandas DataFrame with one row per distinct n-gram prefix (all
        tokens but the last) and one column per word. Each cell holds the
        count of prefix + word, or 0 when that (n+1)-gram was not seen.
        (n+1)-grams whose last token is not a column are ignored.
    """
    vocabulary = vocabulary + ["<e>", "<unk>"]

    # dict.fromkeys de-duplicates while keeping first-seen order, so the row
    # order is reproducible; set iteration order changes between runs because
    # string hashes are randomised per process.
    n_grams = list(dict.fromkeys(
        n_plus1_gram[:-1] for n_plus1_gram in n_plus1_gram_counts))

    row_index = {n_gram: i for i, n_gram in enumerate(n_grams)}
    col_index = {word: j for j, word in enumerate(vocabulary)}

    count_matrix = np.zeros((len(n_grams), len(vocabulary)))
    for n_plus1_gram, count in n_plus1_gram_counts.items():
        # Look the word up in col_index (O(1)) rather than scanning the
        # vocabulary list for every (n+1)-gram.
        j = col_index.get(n_plus1_gram[-1])
        if j is None:
            continue
        count_matrix[row_index[n_plus1_gram[:-1]], j] = count

    return pd.DataFrame(count_matrix, index=n_grams, columns=vocabulary)

In [18]:
def make_probability_matrix(n_plus1_gram_counts, vocabulary, k):
    """Convert (n+1)-gram counts into add-k smoothed conditional probabilities.

    Args:
        n_plus1_gram_counts: Dict of (n+1)-gram tuple -> count.
        vocabulary: List of known words, forwarded to make_count_matrix.
        k: Smoothing constant added to every cell before normalising.

    Returns:
        The table from make_count_matrix with `k` added to every cell and
        each row divided by its row sum.
    """
    # Use the `vocabulary` argument: the previous reference to the undefined
    # global `unique_words` raised NameError on a fresh kernel.
    count_matrix = make_count_matrix(n_plus1_gram_counts, vocabulary)
    count_matrix += k
    prob_matrix = count_matrix.div(count_matrix.sum(axis=1), axis=0)
    return prob_matrix

**Perplexity**

In [19]:
def calculate_perplexity(sentence, n_gram_counts, n_plus1_gram_counts, vocabulary_size, start_token='<s>', end_token = '<e>', k=1.0):
    """Compute the perplexity of one tokenized sentence under an n-gram model.

    The sentence is padded with `n` start tokens and one end token, where `n`
    is the length of the keys in `n_gram_counts`. With N the padded length,
    the result is (prod over predicted positions of 1 / P(word | context))
    ** (1 / N).

    Args:
        sentence: List of tokens.
        n_gram_counts: Non-empty dict of n-gram tuple -> count.
        n_plus1_gram_counts: Dict of (n+1)-gram tuple -> count.
        vocabulary_size: Number of words in the vocabulary.
        start_token: Padding token prepended `n` times.
        end_token: Token appended once.
        k: Smoothing constant forwarded to the probability estimate.

    Returns:
        The perplexity as a float.

    Raises:
        IndexError: If `n_gram_counts` is empty.
        ZeroDivisionError: If an estimated probability is zero.
    """
    # Peek at a single key to learn the model order instead of copying every
    # key into a list on each call.
    first_n_gram = next(iter(n_gram_counts), None)
    if first_n_gram is None:
        raise IndexError("n_gram_counts is empty; cannot infer the n-gram order")
    n = len(first_n_gram)

    padded = tuple([start_token] * n + sentence + [end_token])
    N = len(padded)

    # Accumulate log(1/p) rather than multiplying 1/p directly: the raw
    # product overflows to inf on long sentences, the log sum does not.
    log_inverse_probability_sum = 0.0
    for t in range(n, N):
        probability = estimate_probability_of_single_word(
            padded[t], padded[t-n:t], n_gram_counts, n_plus1_gram_counts,
            vocabulary_size, k)
        # The explicit 1/probability keeps ZeroDivisionError for zero
        # probabilities.
        log_inverse_probability_sum += math.log(1 / probability)

    return math.exp(log_inverse_probability_sum / N)

**Auto-complete System**

In [20]:
def suggest_a_word(previous_tokens, n_gram_counts, n_plus1_gram_counts, vocabulary, end_token='<e>', unknown_token="<unk>", k=1.0, start_with=None):
    """Suggest the most probable next word for the given context.

    Args:
        previous_tokens: List of tokens typed so far.
        n_gram_counts: Non-empty dict of n-gram tuple -> count; the key
            length determines how many trailing tokens form the context.
        n_plus1_gram_counts: Dict of (n+1)-gram tuple -> count.
        vocabulary: List of known words.
        end_token: Accepted for interface compatibility; not used here.
        unknown_token: Accepted for interface compatibility; not used here.
        k: Smoothing constant.
        start_with: Optional prefix; when given, only words starting with it
            are considered.

    Returns:
        A tuple (suggestion, probability). When several words share the top
        probability the one iterated last wins; (None, 0) is returned when no
        word qualifies.
    """
    n = len(list(n_gram_counts.keys())[0])

    # Pad on the left so that short inputs still yield a full-length context.
    padded_tokens = ['<s>'] * n + previous_tokens
    context = padded_tokens[-n:]
    probabilities = estimate_probability_of_all_words(
        context, n_gram_counts, n_plus1_gram_counts, vocabulary, k=k)

    best_word, best_prob = None, 0
    for candidate, candidate_prob in probabilities.items():
        prefix_mismatch = (start_with is not None
                           and not candidate.startswith(start_with))
        if prefix_mismatch:
            continue
        if candidate_prob >= best_prob:
            best_word, best_prob = candidate, candidate_prob

    return best_word, best_prob

In [22]:
def get_suggestions(previous_tokens, n_gram_counts_list, vocabulary, k=1.0, start_with=None):
    """Collect one suggestion from each pair of consecutive n-gram models.

    Args:
        previous_tokens: List of tokens typed so far.
        n_gram_counts_list: List of n-gram count dicts; each adjacent pair
            (item i, item i+1) is passed to suggest_a_word as the n-gram and
            (n+1)-gram counts.
        vocabulary: List of known words.
        k: Smoothing constant.
        start_with: Optional prefix the suggested word must start with.

    Returns:
        A list of (word, probability) tuples, one per adjacent pair; empty
        when fewer than two count dicts are supplied.
    """
    consecutive_models = zip(n_gram_counts_list, n_gram_counts_list[1:])
    return [
        suggest_a_word(previous_tokens, lower_order_counts, higher_order_counts,
                       vocabulary, k=k, start_with=start_with)
        for lower_order_counts, higher_order_counts in consecutive_models
    ]

In [28]:
# Count n-grams of the processed training data for n = 1, 2, 3.
# Position i of the list holds the (i + 1)-gram counts.
n_gram_counts_list = []
for n in range(1, 4):
    print("Computing n-gram counts with n =", n, "...")
    n_model_counts = count_n_gram(train_data_processed, n)
    n_gram_counts_list.append(n_model_counts)

Computing n-gram counts with n = 1 ...
Computing n-gram counts with n = 2 ...
Computing n-gram counts with n = 3 ...


In [29]:
# Demo: request one suggestion from every adjacent pair of n-gram models.
# NOTE(review): the corpus is lower-cased during tokenization, so the
# capitalised "Who" presumably never matches a training token — consider
# passing ["who", "are"]; confirm against the intended demo.
previous_tokens = ["Who", "are"]
suggest = get_suggestions(previous_tokens, n_gram_counts_list, vocabulary, k=1.0)

print(f"The previous words are {previous_tokens}, the suggestions are:")
print(suggest)

The previous words are ['Who', 'are'], the suggestions are:
[('you', 0.022645393721049924), ('<unk>', 6.75995403231258e-05)]
