<a href="https://colab.research.google.com/github/ashmcmn/ANLE-Labs/blob/main/week2/Lab2V2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1 Getting started 
During labs this term, you are going to be exploring language models and applying them to the Mi crosoft Research Sentence Completion Challenge (Zweig and Burges, 2011). Today, you are going to start by building an n-gram language model based on the training corpus for the Challenge. All of the data for the Sentence Completion Challenge is contained in this Lab2resources directory in the sub-directory titled sentence-completion. You may need to unzip training data.tgz so that the Holmes Training Data sub-directory is visible. 
You can use the following code to list the directory and select 50% of the files (holding back the other 50% so that you can evaluate your language model).

In [None]:
import nltk
!gdown --id '1iif5UfVD-z5wCmXuKzp7zGyz22L_mlii'
!unzip 'lab2data.zip'
nltk.download('punkt')

In [None]:
import os,random,math 
from nltk import word_tokenize as tokenize 
import operator
import random
import numpy as np
import pandas as pd

TRAINING_DIR='lab2data/Holmes_Training_Data'

def get_training_testing(training_dir=TRAINING_DIR,split=0.5,amount=6): 
  filenames=os.listdir(training_dir) 
  random.shuffle(filenames)
  selection=random.sample(filenames, amount) 
  index=int(amount*split) 
  return(selection[:index],selection[index:]) 

trainingfiles,heldoutfiles=get_training_testing()


# 2 A Unigram Model 
The code below implements a simple unigram model. The class stores the unigram probabliity distribution and provide methods for training and lookup. 

Look through the code and you should see the following steps in the training process. 
*   Each of the subset of datafiles is processed in turn. 
*   In each datafile, each line is tokenized using nltk.word tokenize() 
*   START and END tokens are added to the beginning and end of each line. 
*   A count is kept of how many times each token occurs. This is stored in a dictionary self.unigram 
*   Once all of the datafiles have been processed, 
the counts are converted into probabilities (by dividing all of them by the sum of all of the values in the dictionary). 

Test your model by looking up the probabilities of some (known) words. At this stage, if the word was not seen in the training data, the probability returned should be zero (we will worry about smoothing later!) 


In [None]:
class language_model(): 
  def __init__(self, trainingdir=TRAINING_DIR, files=[], verbose=True): 
    self.training_dir = trainingdir 
    self.files = files 
    self.verbose = verbose
    self.train() 

  def train(self): 
    self.unigram={} 
    self._processfiles() 
    self._convert_to_probs() 

  def _processline(self, line): 
    tokens = ['_START'] + tokenize(line) + ['_END'] 
    for token in tokens: 
      self.unigram[token] = self.unigram.get(token, 0) + 1 

  def _processfiles(self): 
    for afile in self.files:
      if self.verbose: 
        print('Processing {}'.format(afile)) 
      try: 
        with open(os.path.join(self.training_dir, afile)) as instream: 
          for line in instream: 
            line = line.rstrip() 
            if len(line) > 0: 
              self._processline(line) 
      except UnicodeDecodeError: 
        print('UnicodeDecodeError processing {}: ignoring file'.format(afile)) 

  def _convert_to_probs(self): 
    self.unigram={w:f / sum(self.unigram.values()) for (w, f) in self.unigram.items()} 

  def get_prob(self, token, method='unigram'): 
    if method == 'unigram': 
      return self.unigram.get(token,0) 
    else: 
      print('Not implemented: {}'.format(method)) 
      return

In [None]:
mylm=language_model(files=trainingfiles) 

In [None]:
mylm.get_prob('man')

# 2.2 Generation 
Add some functionality to your class so that you can generate a string of highly probable words. You can do this by sorting the dictionary of unigram probabilities and then randomly choosing 1 of the top k words. Repeatedly choose words until a particular token is generated (e.g., ’.’) or a maximum length is exceeded. 

As an extension, you could try to sample from the distribution. Assign each of the words a range of numbers - the size of which is proportional to its probability. You can do this by considering the cumulative probability distribution (iterate through the words in the distribution, adding the probability of the current word to the sum of all probabilities seen so far). Then you just need to pick a random number and select the word which has this number in its assigned range. Of course, there are library methods which can do this for you to — check out random.choices() at https://www.kite.com/python/ answers/how-to-sample-a-random-number-from-a-probability-distribution-in-python 

In [None]:
def generate_sentence(self, n=None, method='unigram'):
  k = 10
  blacklist = ['_START', '_END']
  if method == 'unigram':
    words = {k:v for k,v in self.unigram.items() if k not in blacklist}
    top_k = sorted(words, key=words.get, reverse=True)[:k]
    sentence = [random.choice(top_k)]

    if n == None:
      print('Parameter \'n\' must be provided when generating sentence from the unigram model.')
      return

    while len(sentence) < n:
      words = {k:v for k,v in self.unigram.items() if k not in blacklist}
      top_k = sorted(words, key=words.get, reverse=True)[:k]
      sentence.append(random.choice(top_k))

    return ' '.join(sentence)
  else:
    print('Not implemented: {}'.format(method)) 
    return

language_model.generate_sentence = generate_sentence

In [None]:
mylm.generate_sentence(10)

# 3 Adding Bigrams 
# 3.1 Training 
Extend your class so that, at training time, you also store bigram counts. I suggest that you use a dictionary of dictionaries for the bigram counts. The key for the outer dictionary should be the previous
word (which may be a START token) and the key for the inner dictionary should be the current word. The value is then how many times the current word has occurred after the previous word. For example, if you see the sentence ”When did the cat sit on the mat?”, you should get the following dictionary: 

bigram={_START:{When:1}, 
When:{did:1}, 
did:{the:1}, 
the:{cat:1, mat: 1}, 
cat:{sit:1}, 
sit:{on:1}, 
on:{the:1}, 
mat:{?:1},} 

You need to normalise each of the internal dictionaries to create probability distributions for P(w2|w1). You do this by summing the values in each internal dictionary and then dividing each count by the sum of values for that dictionary. 


In [None]:
def train(self): 
  self.unigram={} 
  self.bigram={}

  self._processfiles() 
  self._convert_to_probs() 

def _processline(self, line): 
  tokens = ['_START'] + tokenize(line) + ['_END']
  prevToken = '_END'

  for i, token in enumerate(tokens):
    self.unigram[token] = self.unigram.get(token, 0) + 1

    tokenDict = self.bigram.get(prevToken, {})
    tokenDict[token] = tokenDict.get(token, 0) + 1
    self.bigram[prevToken] = tokenDict
    prevToken = token

def _convert_to_probs(self): 
  self.unigram = {w:f / sum(self.unigram.values()) for (w, f) in self.unigram.items()} 
  self.bigram = {w1:{w2:f / sum(words.values()) for (w2, f) in words.items()} for (w1, words) in self.bigram.items()}

def get_prob(self, token, token2=None, method='unigram'): 
  if method == 'unigram': 
    return self.unigram.get(token,0) 
  elif method == 'bigram':
    return self.bigram.get(token2,{}).get(token,0)
  else: 
    print('Not implemented: {}'.format(method)) 
    return

language_model.train = train
language_model._processline = _processline
language_model._convert_to_probs = _convert_to_probs
language_model.get_prob = get_prob
mylm.train()

In [None]:
mylm.get_prob('man', 'The', 'bigram')

# 3.2 Generation 
Can you use your bigram probabilities to generate plausible sounding sentences? Initialise the current token as START and then randomly choose one of the top k most probable words for the next token. Then use this as the current token to generate the next token and so on. Experiment with different values of k 

In [None]:
def generate_sentence(self, n=None, method='unigram'):
  k = 10
  blacklist = ['_START']
  if method == 'unigram':
    words = {k:v for k,v in self.unigram.items() if k not in blacklist+['_END']}
    top_k = sorted(words, key=words.get, reverse=True)[:k]
    sentence = [random.choice(top_k)]

    if n == None:
      print('Parameter \'n\' must be provided when generating sentence from the unigram model.')
      return

    while len(sentence) < n:
      words = {k:v for k,v in self.unigram.items() if k not in blacklist+['_END']}
      top_k = sorted(words, key=words.get, reverse=True)[:k]
      sentence.append(random.choice(top_k))
      
    return ' '.join(sentence)
  elif method == 'bigram':
    words = {k:v for k,v in self.bigram['_START'].items() if k not in blacklist}
    top_k = sorted(words, key=words.get, reverse=True)[:k]
    sentence = [random.choice(top_k)]

    if n == None:
      while sentence[-1] != '_END':
        words = {k:v for k,v in self.bigram[sentence[-1]].items() if k not in blacklist}
        top_k = sorted(words, key=words.get, reverse=True)[:k]
        sentence.append(random.choice(top_k))

      sentence.pop()
    else:
      while len(sentence) < n:
        words = {k:v for k,v in self.bigram[sentence[-1]].items() if k not in blacklist+['_END']}
        top_k = sorted(words, key=words.get, reverse=True)[:k]
        sentence.append(random.choice(top_k))
    return ' '.join(sentence)
  else:
    print('Not implemented: {}'.format(method)) 
    return

language_model.generate_sentence = generate_sentence

In [None]:
mylm.generate_sentence(10)

In [None]:
mylm.generate_sentence(method='bigram')

In [None]:
mylm.generate_sentence(10, method='bigram')

# 4 Perplexity 
Perplexity is a function of a model and some language data. If the observed language data is highly likely given the model then the perplexity will be low. 
* Write a method or function that calculates the log probability (LP) of a corpus using the unigram model and the bigram model. 
* Compute the perplexity using the formula p = e^(-lp/N) , where N is the number of words in the corpus

In [None]:
def _lp_line(self, line, method='unigram'):
  tokens = ['_START']+tokenize(line)+['_END']
  lp = 0

  for i, token in enumerate(tokens[1:]):
    lp += np.log(self.get_prob(token, tokens[i], method))
  
  return lp, len(tokens[1:])

def calculate_perplexity(self, corpus, method='unigram'):
  p, N = 0, 0

  for line in corpus:
    line = line.rstrip()
    if len(line) > 0:
      res = self._lp_line(line, method)
      p += res[0]
      N += res[1]

  return np.exp(-p/N)

language_model._lp_line = _lp_line
language_model.calculate_perplexity = calculate_perplexity

In [None]:
for afile in trainingfiles:
  print("Processing file {}".format(afile))
  try:
    with open(os.path.join(TRAINING_DIR, afile)) as instream:
      print(mylm.calculate_perplexity(instream))
  except UnicodeDecodeError:
    print('UnicodeDecodeError processing file {}: ignoring rest of file'.format(afile))

# 5 Dealing with Unseen Data 
If you run your perplexity function on the testingfiles (rather than the training files), you will quickly run into the problem that certain unigrams and bigrams in the testingfiles were not present in the trainingfiles. This means that their probability will be zero (making the probability of the whole corpus to be zero). 

# 5.1 Unknown words 
In order to have some probability mass for unseen unigrams, we need to pretend that some were unseen in the training data. Therefore, we replace low frequency unigrams in the training data with the UNK token. One way of implementing this is: 
* Before converting counts to probabilities, iterate over the unigram probability distribution and find words which have occurred less frequently than a threshold (e.g., 2). Remove this key from the unigram distribution del self.unigram[key] and add the count to the entry for the UNK token. 
* Iterate over the bigram distribution. 
– For each inner dictionary, use the same process as for the unigram distribution. – If any outer keys are no longer valid unigram keys, delete that outer key and add the whole contents of the dictionary associated with it to an entry in the bigram distribution for UNK 
* When looking up a probability or a distribution for a word, if it is not found then use the probability or distribution for UNK instead 
You should now be able to compute the perplexity of the unigram model for the testingfiles. What do you notice about the perplexity values for this model?

In [None]:
def _add_unknowns(self, k=2):
  for word, freq in list(self.unigram.items()):
    if freq < k:
      del self.unigram[word]
      self.unigram['_UNK'] = self.unigram.get('_UNK', 0) + freq
  
  for word, inner in list(self.bigram.items()):
    for word2, freq in list(inner.items()):
      if self.unigram.get(word2, 0) == 0:
        inner['_UNK'] = inner.get('_UNK', 0) + freq
        del inner[word2]
    
    if self.unigram.get(word, 0) == 0:
      del self.bigram[word]
      b = self.bigram.get('_UNK', {})
      b.update(inner)
      self.bigram['_UNK'] = b
    else:
      self.bigram[word] = inner

def train(self): 
  self.unigram={} 
  self.bigram={}
  
  self._processfiles()
  self._add_unknowns()
  self._convert_to_probs() 

def get_prob(self, token, token2=None, method='unigram'): 
  if method == 'unigram': 
    return self.unigram.get(token, self.unigram.get('_UNK', 0))
  elif method == 'bigram':
    b = self.bigram.get(token2, self.bigram.get('_UNK', {}))
    return b.get(token, b.get('_UNK', 0))
  else: 
    print('Not implemented: {}'.format(method)) 
    return


language_model._add_unknowns = _add_unknowns
language_model.train = train
language_model.get_prob = get_prob
mylm.train()

In [None]:
for afile in trainingfiles:
  print("Processing file {}".format(afile))
  try:
    with open(os.path.join(TRAINING_DIR, afile)) as instream:
      print(mylm.calculate_perplexity(instream))
  except UnicodeDecodeError:
    print('UnicodeDecodeError processing file {}: ignoring rest of file'.format(afile))

# 5.2 Discounting for unseen combinations 
Having dealt with unseen unigrams, we need to also deal with unseen bigrams i.e., bigrams where both words have been seen but not with each other. 

We could use Laplacian (add-k) smoothing but this is both computationally inefficient and it tends to assign too much probability mass to unseen events. A more common approach is to use some form of discounting and backoff. 

First, we apply a discount to each bigram count. There are variations where the discount applied depends on the word or the frequency of the word or the frequency of the bigram, but applying a fixed or absolute discount of 0.75 to each bigram count is the simplest method and works very well. It is also necessary to store the total amount that has been discounted for each unigram (this will depend on how many words have been seen following it). You can store this in the bigram distribution (under self.bigram[unigram][" DISCOUNT"]. This means the total counts will not have changed and therefore you have effectively reserved some probability mass for unseen bigrams. 

Then, to estimate the smoothed probability of Pe(w2|w1): 
Pe(w2|w1) = Po(w2|w1) + Po( DISCOUNT |w1) × Po(w2) 
You should now be able to compute the perplexity of the bigram model with respect to the testing files.

In [None]:
def train(self): 
  self.unigram={}
  self.bigram={}

  self._processfiles()
  self._add_unknowns()
  self._discount()
  self._convert_to_probs() 

def _discount(self, d=0.75):
  for word1 in self.bigram:
    for word2 in self.bigram[word1]:
      self.bigram[word1][word2] -= d
    self.bigram[word1]['_DISCOUNT'] = d * len(self.bigram[word1])

def get_prob(self, token, token2=None, method='unigram'): 
  if method == 'unigram': 
    return self.unigram.get(token, self.unigram.get('_UNK', 0))
  elif method == 'bigram':
    b = self.bigram.get(token2, self.bigram.get('_UNK', {}))
    p1 = b.get(token, b.get('_UNK', 0))
    p2 = b['_DISCOUNT']
    p3 = self.unigram.get(token, self.unigram.get('_UNK', 0))
    if p1 < 0:
      print(token, token2)
    return p1 + p2 * p3
  else: 
    print('Not implemented: {}'.format(method)) 
    return

language_model._discount = _discount
language_model.train = train
language_model.get_prob = get_prob
mylm.train()

In [None]:
for afile in trainingfiles:
  print("Processing file {}".format(afile))
  try:
    with open(os.path.join(TRAINING_DIR, afile)) as instream:
      print(mylm.calculate_perplexity(instream, method='bigram'))
  except UnicodeDecodeError:
    print('UnicodeDecodeError processing file {}: ignoring rest of file'.format(afile))

# 6 Extensions 
1. Implement the Kneser-Ney backoff method and compare this to the absolute discounting method introduced above. 


In [None]:
def _discount(self, d=0.75):
  self.kn = {}

  for word1 in self.bigram:
    for word2 in self.bigram[word1]:
      self.bigram[word1][word2] -= d
      self.kn[word2] = self.kn.get(word2, 0) + 1
    self.bigram[word1]['_DISCOUNT'] = d * len(self.bigram[word1])

def _convert_to_probs(self): 
  self.unigram = {w:f / sum(self.unigram.values()) for (w, f) in self.unigram.items()} 
  self.bigram = {w1:{w2:f / sum(words.values()) for (w2, f) in words.items()} for (w1, words) in self.bigram.items()}
  self.kn = {w:f / sum(self.kn.values()) for (w,f) in self.kn.items()}

def get_prob(self, token, token2=None, method='unigram', smoothing=True): 
  if method == 'unigram': 
    return self.unigram.get(token, self.unigram.get('_UNK', 0))
  elif method == 'bigram':
    d = self.kn if smoothing else self.unigram
    b = self.bigram.get(token2, self.bigram.get('_UNK', {}))
    p1 = b.get(token, b.get('_UNK', 0))
    p2 = b['_DISCOUNT']
    p3 = d.get(token, d.get('_UNK', 0))
    return p1 + p2 * p3
  else: 
    print('Not implemented: {}'.format(method)) 
    return

def _lp_line(self, line, method='unigram', smoothing=True):
  tokens = ['_START']+tokenize(line)+['_END']
  lp = 0

  for i, token in enumerate(tokens[1:]):
    lp += np.log(self.get_prob(token, tokens[i], method, smoothing))
  
  return lp, len(tokens[1:])

def calculate_perplexity(self, corpus, method='unigram', smoothing=True):
  p, N = 0, 0

  for line in corpus:
    line = line.rstrip()
    if len(line) > 0:
      res = self._lp_line(line, method, smoothing)
      p += res[0]
      N += res[1]

  return np.exp(-p/N)

language_model._discount = _discount
language_model._convert_to_probs = _convert_to_probs
language_model.get_prob = get_prob
language_model._lp_line = _lp_line
language_model.calculate_perplexity = calculate_perplexity
mylm.train()

In [None]:
for afile in trainingfiles:
  print("Processing file {}".format(afile))
  try:
    with open(os.path.join(TRAINING_DIR, afile)) as instream:
      print(mylm.calculate_perplexity(instream, method='bigram'))
  except UnicodeDecodeError:
    print('UnicodeDecodeError processing file {}: ignoring rest of file'.format(afile))

2. Investigate how perplexity is affected by one or more of the following 
  * amount of training data 
  * amount of testing data 
  * the threshold frequency for unknown words 
  * the size of the discount applied to observed bigrams 


In [None]:
def __init__(self, trainingdir=TRAINING_DIR, files=[], verbose=True, k=2, d=0.75): 
  self.training_dir = trainingdir 
  self.files = files 
  self.verbose = verbose
  self.train(k, d) 

def train(self, k, d): 
  self.unigram={}
  self.bigram={}

  self._processfiles()
  self._add_unknowns(k)
  self._discount(d)
  self._convert_to_probs() 

def _add_unknowns(self, k):
  for word, freq in list(self.unigram.items()):
    if freq < k:
      del self.unigram[word]
      self.unigram['_UNK'] = self.unigram.get('_UNK', 0) + freq
  
  for word, inner in list(self.bigram.items()):
    for word2, freq in list(inner.items()):
      if self.unigram.get(word2, 0) == 0:
        inner['_UNK'] = inner.get('_UNK', 0) + freq
        del inner[word2]
    
    if self.unigram.get(word, 0) == 0:
      del self.bigram[word]
      b = self.bigram.get('_UNK', {})
      b.update(inner)
      self.bigram['_UNK'] = b
    else:
      self.bigram[word] = inner

def _discount(self, d):
  self.kn = {}

  for word1 in self.bigram:
    for word2 in self.bigram[word1]:
      self.bigram[word1][word2] -= d
      self.kn[word2] = self.kn.get(word2, 0) + 1
    self.bigram[word1]['_DISCOUNT'] = d * len(self.bigram[word1])

language_model.__init__ = __init__
language_model.train = train
language_model._add_unknowns = _add_unknowns
language_model._discount = _discount

In [None]:
splits = [0.2, 0.6, 0.9]
thresholds = [2, 4, 5]
discounts = [0.3, 0.6, 0.9]

results = []

for split in splits:
  for k in thresholds:
    for d in discounts:
      trainingfiles,heldoutfiles=get_training_testing(split=split,amount=10)
      mylm = language_model(files=trainingfiles, verbose=False, k=k, d=d)
      p = 0
      n = len(heldoutfiles)
      for afile in heldoutfiles:
        try:
          with open(os.path.join(TRAINING_DIR, afile)) as instream:
            p += mylm.calculate_perplexity(instream, method='bigram')
        except UnicodeDecodeError:
          print('UnicodeDecodeError processing file {}: ignoring rest of file'.format(afile))
          n -= 1
      if n == 0:
        print('Skipping test due to lack of test data')
        continue
      p /= n
      results.append([split, k, d, p])
      print('Split: {}, Threshold: {}, Discount: {}, Average Perplexity: {}'.format(split, k, d, p))

df = pd.DataFrame(results, columns=['Split', 'Threshold', 'Discount', 'Perplexity'])
df.sort_values(by=['Perplexity'])

3. Extend your model to trigrams. 


In [None]:
class language_model(): 
  def __init__(self, trainingdir=TRAINING_DIR, files=[], verbose=True, k=2, d=0.75): 
    self.training_dir = trainingdir 
    self.files = files 
    self.verbose = verbose
    self.train(k, d) 

  def train(self, k, d): 
    self.unigram={}
    self.bigram={}
    self.trigram={}

    self._processfiles()
    self._add_unknowns(k)
    self._discount(d)
    self._convert_to_probs() 

  def _processline(self, line): 
    tokens = ['_START'] + tokenize(line) + ['_END']
    prevToken = '_END'
    
    for i, token in enumerate(tokens):
      self.unigram[token] = self.unigram.get(token, 0) + 1

      tokenDict = self.bigram.get(prevToken, {})
      tokenDict[token] = tokenDict.get(token, 0) + 1
      self.bigram[prevToken] = tokenDict

      if i < len(tokens) - 1:
        nextToken = tokens[i+1]
        tokenDict = self.trigram.get(prevToken, {})
        tokenDict[token] = tokenDict.get(token, 0) + 1
        tokenDict[nextToken] = tokenDict.get(nextToken, 0) + 1
        self.trigram[prevToken] = tokenDict

      prevToken = token

  def _processfiles(self): 
    for afile in self.files:
      if self.verbose: 
        print('Processing {}'.format(afile)) 
      try: 
        with open(os.path.join(self.training_dir, afile)) as instream: 
          for line in instream: 
            line = line.rstrip() 
            if len(line) > 0: 
              self._processline(line) 
      except UnicodeDecodeError: 
        print('UnicodeDecodeError processing {}: ignoring file'.format(afile)) 

  def _convert_to_probs(self): 
    self.unigram = {w:f / sum(self.unigram.values()) for (w, f) in self.unigram.items()} 
    self.bigram = {w1:{w2:f / sum(words.values()) for (w2, f) in words.items()} for (w1, words) in self.bigram.items()}
    self.trigram = {w1:{w2:f / sum(words.values()) for (w2, f) in words.items()} for (w1, words) in self.trigram.items()}
    self.kn = {w:f / sum(self.kn.values()) for (w,f) in self.kn.items()}

  def get_prob(self, token, token2=None, method='unigram', smoothing=True): 
    if method == 'unigram': 
      return self.unigram.get(token, self.unigram.get('_UNK', 0))
    elif method == 'bigram':
      d = self.kn if smoothing else self.unigram
      b = self.bigram.get(token2, self.bigram.get('_UNK', {}))
      p1 = b.get(token, b.get('_UNK', 0))
      p2 = b['_DISCOUNT']
      p3 = d.get(token, d.get('_UNK', 0))
      return p1 + p2 * p3
    elif method == 'trigram':
      b = self.trigram.get(token2, self.trigram.get('_UNK', {}))
      p1 = b.get(token, b.get('_UNK', 0))
      p2 = b['_DISCOUNT']
      p3 = b.get(token, b.get('_UNK', 0))
      return p1 + p2 * p3
    else: 
      print('Not implemented: {}'.format(method)) 
      return

  def _lp_line(self, line, method='unigram', smoothing=True):
    tokens = ['_START']+tokenize(line)+['_END']
    lp = 0

    for i, token in enumerate(tokens[1:]):
      lp += np.log(self.get_prob(token, tokens[i], method, smoothing))
    
    return lp, len(tokens[1:])

  def calculate_perplexity(self, corpus, method='unigram', smoothing=True):
    p, N = 0, 0

    for line in corpus:
      line = line.rstrip()
      if len(line) > 0:
        res = self._lp_line(line, method, smoothing)
        p += res[0]
        N += res[1]

    return np.exp(-p/N)

  def _add_unknowns(self, k):
    for word, freq in list(self.unigram.items()):
      if freq < k:
        del self.unigram[word]
        self.unigram['_UNK'] = self.unigram.get('_UNK', 0) + freq
    
    for word, inner in list(self.bigram.items()):
      for word2, freq in list(inner.items()):
        if self.unigram.get(word2, 0) == 0:
          inner['_UNK'] = inner.get('_UNK', 0) + freq
          del inner[word2]
      
      if self.unigram.get(word, 0) == 0:
        del self.bigram[word]
        b = self.bigram.get('_UNK', {})
        b.update(inner)
        self.bigram['_UNK'] = b
      else:
        self.bigram[word] = inner

    for word, inner in list(self.trigram.items()):
      for word2, freq in list(inner.items()):
        if self.unigram.get(word2, 0) == 0:
          inner['_UNK'] = inner.get('_UNK', 0) + freq
          del inner[word2]
      
      if self.unigram.get(word, 0) == 0:
        del self.trigram[word]
        b = self.trigram.get('_UNK', {})
        b.update(inner)
        self.trigram['_UNK'] = b
      else:
        self.trigram[word] = inner

  def _discount(self, d):
    self.kn = {}

    for word1 in self.bigram:
      for word2 in self.bigram[word1]:
        self.bigram[word1][word2] -= d
        self.kn[word2] = self.kn.get(word2, 0) + 1
      self.bigram[word1]['_DISCOUNT'] = d * len(self.bigram[word1])

    for word1 in self.trigram:
      for word2 in self.trigram[word1]:
        self.trigram[word1][word2] -= d
      self.trigram[word1]['_DISCOUNT'] = d * len(self.trigram[word1])

  def generate_sentence(self, n=None, method='unigram', k=10):
    blacklist = ['_START', '_UNK', '_DISCOUNT']
    if method == 'unigram':
      words = {k:v for k,v in self.unigram.items() if k not in blacklist+['_END']}
      top_k = sorted(words, key=words.get, reverse=True)[:k]
      sentence = [random.choice(top_k)]

      if n == None:
        print('Parameter \'n\' must be provided when generating sentence from the unigram model.')
        return

      while len(sentence) < n:
        words = {k:v for k,v in self.unigram.items() if k not in blacklist+['_END']}
        top_k = sorted(words, key=words.get, reverse=True)[:k]
        sentence.append(random.choice(top_k))
        
      return ' '.join(sentence)
    elif method == 'bigram' or method == 'trigram':
      source = self.bigram if method == 'bigram' else self.trigram
      words = {k:v for k,v in source['_START'].items() if k not in blacklist}
      top_k = sorted(words, key=words.get, reverse=True)[:k]
      sentence = [random.choice(top_k)]

      if n == None:
        while sentence[-1] != '_END':
          words = {k:v for k,v in source[sentence[-1]].items() if k not in blacklist}
          top_k = sorted(words, key=words.get, reverse=True)[:k]
          sentence.append(random.choice(top_k))

        sentence.pop()
      else:
        while len(sentence) < n:
          words = {k:v for k,v in source[sentence[-1]].items() if k not in blacklist+['_END']}
          top_k = sorted(words, key=words.get, reverse=True)[:k]
          sentence.append(random.choice(top_k))
      return ' '.join(sentence)
    else:
      print('Not implemented: {}'.format(method)) 
      return

In [None]:
trainingfiles,heldoutfiles=get_training_testing(split=0.8,amount=5)
mylm = language_model(files=trainingfiles)

In [None]:
mylm.generate_sentence(10)

In [None]:
mylm.generate_sentence(method='bigram')

In [None]:
mylm.generate_sentence(10, method='bigram')

In [None]:
mylm.generate_sentence(method='trigram')

In [None]:
mylm.generate_sentence(10, method='trigram')

In [None]:
mylm.generate_sentence(method='bigram', k=20)

In [None]:
mylm.generate_sentence(method='trigram', k=20)