In [None]:
from collections import Counter
from typing import List, Tuple, Callable
from string import punctuation

# Sentinel tokens shared by the whole notebook:
#   START_TOKEN / END_TOKEN pad every sentence before n-grams are counted,
#   UNKNOWN_TOKEN stands in for words that are folded out of the vocabulary.
START_TOKEN: str = "<s>"
END_TOKEN: str = "</s>"
UNKNOWN_TOKEN: str = "<UNK>"

def get_sanitized_n_gram(filename: str, counter_fn: Callable[[Counter, List[str]], None]) -> Counter:
    """Build an n-gram Counter from a one-sentence-per-line text file.

    Every non-blank line is whitespace-tokenized, stripped of punctuation
    tokens, padded with START_TOKEN / END_TOKEN and handed to ``counter_fn``
    together with the shared result counter.

    Args:
        filename: Path of the text file to read.
        counter_fn: Callback ``(counter, words)`` that records the n-grams it
            cares about by mutating ``counter`` in place; its return value is
            ignored.

    Returns:
        The Counter populated by ``counter_fn`` over all processed lines.

    Raises:
        OSError: If ``filename`` cannot be opened.
    """
    result_counter: Counter = Counter()
    with open(filename, "r") as file_object:
        # File is already sentence tokenized; stream it line by line instead
        # of materializing every line with readlines().
        for comment in file_object:
            raw_tokens = comment.split()
            if not raw_tokens:
                # A blank line is not a sentence. Counting it would add a
                # spurious (START_TOKEN, END_TOKEN) pair that perplexity()
                # never evaluates, because it skips blank lines too.
                continue

            # Keep a punctuation-free vocabulary. `in punctuation` is a
            # substring test against string.punctuation: single marks such as
            # "," are dropped, while tokens like "..." are kept.
            kept_tokens = [token for token in raw_tokens if token not in punctuation]

            # Pad each comment with start..stop tokens.
            words = [START_TOKEN, *kept_tokens, END_TOKEN]

            # result_counter is passed as an out-param.
            counter_fn(result_counter, words)

    return result_counter
            


In [None]:
### UNIT TESTS ###

def _counter_fn(ctr, words):
    """Smoke-test callback: tally every token of one padded sentence into ``ctr``."""
    # Mutates the caller's Counter in place, which is what the out-param contract needs.
    ctr.update(words)

_test_words: Counter = get_sanitized_n_gram("train.txt", _counter_fn)
assert len(_test_words) > 0
list(_test_words.items())[:5]


In [None]:
def collect_unigram(filename: str) -> Counter:
    """Count how often each token occurs in ``filename``.

    Sentences are read and padded by ``get_sanitized_n_gram``; the returned
    Counter maps token -> number of occurrences.
    """
    def _tally_tokens(totals: Counter, sentence: List[str]):
        # Counter.update adds one per element of the iterable.
        totals.update(sentence)

    return get_sanitized_n_gram(filename, _tally_tokens)

def collect_bigram(filename: str) -> Counter:
    """Count how often each adjacent token pair occurs in ``filename``.

    Sentences are read and padded by ``get_sanitized_n_gram``; the returned
    Counter maps ``(first_word, second_word)`` -> number of occurrences.
    """
    def _tally_pairs(totals: Counter, sentence: List[str]):
        # Pairing the sentence with itself shifted by one yields every
        # adjacent pair; the last word has no successor and starts no bigram.
        totals.update(zip(sentence, sentence[1:]))

    return get_sanitized_n_gram(filename, _tally_pairs)


In [None]:
# Raw (no <UNK> folding) unigram and bigram counts for the training corpus.
# Each call reads "train.txt" from the working directory.
train_unigram_counter: Counter = collect_unigram("train.txt") 
train_bigram_counter: Counter = collect_bigram("train.txt")

In [None]:
class NGramModel:
  def __init__(self, unigram_counts: Counter, bigram_counts: Counter=None):
    self.unigram_counts: Counter = unigram_counts # ( word, number_occurrence)
    self.bigram_counts: Counter = bigram_counts
    self.total_unigram_count: int = sum(self.unigram_counts.values())

    if bigram_counts: # If asking for Bigram Model
      self.bigram_context_counter: Counter = Counter()
      for (word_1, word_2), count in bigram_counts.items():
        self.bigram_context_counter[word_1] += count # This is the same as the unigarm_counts
      

  def unigram_probability(self, word: str) -> float:
    return self.unigram_counts[word] / self.total_unigram_count
  
  def bigram_probability(self, ask: str, given: Tuple[str, str]) -> float:
    if self.bigram_context_counter[given] == 0: 
      print("Unseen Before")
      return 0 # Unseen before
    else:
      # Return number of times { ask | Given} \over number of {Given}
      return self.bigram_counts[(given, ask)]/ self.bigram_context_counter[given]
  
  def add_k_smoothing(self, w1: str, w2: str | None=None, k: int=1) -> float: #Add 1 Method
    v_size = len(self.unigram_counts)
    if w2 is None:  # Unigram
        return (self.unigram_counts[w1] + k) / (self.total_unigram_count + ( k * v_size ))
    else:  # Bigram 
        return (self.bigram_counts[(w1, w2)] + k) / ( self.bigram_context_counter[w1] + (v_size  * k))


In [None]:
# Sanity check on the raw training counts: print how often 'I' occurs next to
# how often 'I' starts a bigram. Every token except the sentence-final
# END_TOKEN starts exactly one bigram, so the two numbers are expected to match.
tester = NGramModel(train_unigram_counter, train_bigram_counter)
print(tester.unigram_counts['I'], tester.bigram_context_counter["I"])

In [None]:
# Build a small model from "example.txt" and print its counts and a few
# probabilities for manual inspection.
example_bi = collect_bigram("example.txt")
example_uni = collect_unigram("example.txt")
example_tester = NGramModel(example_uni, example_bi)
print(example_bi)
print(example_tester.bigram_context_counter)
print(example_tester.unigram_counts)
# Maximum-likelihood unigram estimates for "the" and "like".
print(example_tester.unigram_probability("the"), example_tester.unigram_probability("like"))
# P("the" | "like") and P("students" | "the"); the positional call is (ask, given).
print(example_tester.bigram_probability(ask="the", given="like"), example_tester.bigram_probability("students", "the")) # author's expected output: 1.0, 0.5


In [None]:
def handle_unknown_words(word_counts: Counter, bigram_counts: Counter, threshold: int = 5) -> Tuple[set, Counter, Counter]:
  """Fold rare words into UNKNOWN_TOKEN in both unigram and bigram counts.

  A word stays in the vocabulary when its count is strictly greater than
  ``threshold`` or when it is one of the special tokens; every other word
  (count <= threshold) is replaced by UNKNOWN_TOKEN.

  Args:
    word_counts: Mapping word -> number of occurrences.
    bigram_counts: Mapping (first_word, second_word) -> number of occurrences.
    threshold: Largest count that is still treated as "rare".

  Returns:
    A tuple ``(vocab, unigram_counts, bigram_counts)`` where ``vocab`` is the
    set of kept words (always containing the three special tokens) and the
    two Counters are the input counts with rare words merged into
    UNKNOWN_TOKEN. UNKNOWN_TOKEN always has a unigram entry, possibly 0.
  """
  special_tokens = {START_TOKEN, END_TOKEN, UNKNOWN_TOKEN}
  vocab = set(special_tokens)

  processed_unigram = Counter()
  # Create the <UNK> entry up front so it exists even when nothing is rare.
  processed_unigram[UNKNOWN_TOKEN] = 0

  for word, count in word_counts.items():
    # Special tokens are kept even if the file has fewer than `threshold` of them.
    if count > threshold or word in special_tokens:
      vocab.add(word)
      # `+=` rather than `=`: if the corpus itself contains UNKNOWN_TOKEN,
      # its own count must be added to the folded mass, not replace it.
      processed_unigram[word] += count
    else:
      processed_unigram[UNKNOWN_TOKEN] += count

  processed_bigram = Counter()
  for (w1, w2), count in bigram_counts.items():
    new_w1 = w1 if w1 in vocab else UNKNOWN_TOKEN
    new_w2 = w2 if w2 in vocab else UNKNOWN_TOKEN
    # Several original bigrams can collapse onto the same <UNK> pair.
    processed_bigram[(new_w1, new_w2)] += count

  return vocab, processed_unigram, processed_bigram

In [None]:
def train_model(train_file, threshold = 5) -> NGramModel:
  """Train a bigram model with <UNK> handling from ``train_file``.

  Collects raw unigram and bigram counts, folds words whose count does not
  exceed ``threshold`` into UNKNOWN_TOKEN, and returns an NGramModel built
  from the folded counts with the kept vocabulary attached as ``model.vocab``.
  """
  raw_unigrams = collect_unigram(train_file)  # word -> frequency
  raw_bigrams = collect_bigram(train_file)    # (given, ask) -> frequency

  # vocab holds the kept words; folded_counts is (unigram, bigram) with <UNK>.
  vocab, *folded_counts = handle_unknown_words(raw_unigrams, raw_bigrams, threshold)

  model = NGramModel(*folded_counts)
  model.vocab = vocab
  return model

In [None]:
import math
import sys

def perplexity(val_file: str, model: NGramModel, k_smoothing: float, verbose: bool = False) -> float:
  """Compute the add-k smoothed bigram perplexity of ``model`` on ``val_file``.

  Each non-blank line is tokenized the same way as the training data
  (whitespace split, punctuation tokens dropped, START_TOKEN / END_TOKEN
  padding); words outside ``model.vocab`` are then mapped to UNKNOWN_TOKEN.

  Args:
    val_file: Path of a one-sentence-per-line text file.
    model: Trained model; must expose a ``vocab`` attribute (train_model sets it).
    k_smoothing: The k of add-k smoothing; may be fractional.
    verbose: When True, print every out-of-vocabulary word with a running count.

  Returns:
    exp(-(sum of log probabilities) / (number of scored tokens)).

  Raises:
    ValueError: If the file contains no non-blank line, so there is nothing to score.
    OSError: If ``val_file`` cannot be opened.
  """
  log_prob_sum: float = 0.0
  val_file_words: int = 0
  unknown_count: int = 0

  with open(val_file, "r") as f:
    for line in f:
      line = line.strip()
      if not line:
        continue

      # Drop punctuation BEFORE the <UNK> mapping. Training never saw these
      # tokens, so mapping first would turn each of them into <UNK> and the
      # punctuation test could no longer recognize them.
      raw_words: List[str] = [START_TOKEN]
      raw_words += [w for w in line.split() if w not in punctuation]
      raw_words.append(END_TOKEN)

      words: List[str] = []
      for raw in raw_words:
        if raw in model.vocab:
          words.append(raw)
          continue
        words.append(UNKNOWN_TOKEN)
        unknown_count += 1
        if verbose:
          print(f"Unknown word '{raw}' in {val_file}: {unknown_count}")

      for i, word in enumerate(words):
        if i == 0:
          # The first token has no context, so it is scored as a unigram.
          # NOTE(review): this counts START_TOKEN in the perplexity; many
          # formulations exclude it. Kept as-is to preserve reported numbers.
          prob = model.add_k_smoothing(word, k=k_smoothing)
        else:
          prob = model.add_k_smoothing(words[i - 1], word, k_smoothing)

        if prob == 0:
          # Best effort: warn and leave the token out of the log sum (it is
          # still counted in the denominator, as before).
          sys.stderr.write(f"Token '{word}' has 0 smoothed probability.\n")
        else:
          log_prob_sum += math.log(prob)

        val_file_words += 1

  if val_file_words == 0:
    raise ValueError(f"No tokens to score in {val_file}")
  return math.exp(-log_prob_sum / val_file_words)



In [None]:
def main() -> None:
  """Train on train.txt and report train/validation perplexity for several k.

  Prints the vocabulary size and total unigram count of the trained model,
  then one line per smoothing constant.
  """
  model = train_model("train.txt", threshold=5)

  print("Vocab size :", len(model.vocab))
  print("Unigram", model.total_unigram_count)

  # Smoothing constants to compare; the training file is evaluated before the
  # validation file for each of them.
  for k in (0.1, 0.6, 1, 2):
    train_pp, val_pp = (perplexity(path, model, k) for path in ("train.txt", "val.txt"))
    print(f"k= {k}: Train PP = {train_pp}, val pp = {val_pp}")

In [None]:
# Run the full train / evaluate pipeline when this cell executes.
main()