In [3]:
# Intended to bring the contents of the GitHub repository into the working
# directory and then install the packages listed in requirements.txt.
# NOTE(review): `cp` operates on local paths, so a URL source presumably fails
# here; `git clone` was likely intended — confirm before relying on this cell.
!cp -r https://github.com/gil-kapel/cs236299_spring2022/* .
!pip install -r requirements.txt

SyntaxError: ignored

In [2]:
import os
import itertools
import math
import random
import re
import wget
import torchtext.legacy as tt
import copy
# import torch

from collections import defaultdict, Counter
from sys import getsizeof


ModuleNotFoundError: ignored

In [None]:
# Set random seeds
# SEED is reused by sample_sequence, which re-seeds `random` on every call.
SEED = 305323776
random.seed(SEED)
# The torch device selection below is disabled (the torch import is commented out).
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# print (device)


In [None]:
# Utilities for manipulating the corpus (taken from lab 2-1)

def preprocess(text):
    """Return the non-empty lines of `text` with `#` comments removed.

    Every line is whitespace-trimmed first; then everything from the first
    `#` up to the end of the line is cut away. Lines that are empty after
    both steps are discarded. Whitespace that preceded a comment is kept.
    """
    cleaned = (re.sub('#.*$', '', raw.strip()) for raw in text.split("\n"))
    return [line for line in cleaned if line != '']

def geah_tokenize(lines):
    """Specialized tokenizer for GEaH corpus handling speaker IDs.

    Each line is tokenized with the "basic_english" tokenizer. The first
    token of every line must be the speaker ID `sam` or `guy`; it is replaced
    by `SAM:` or `GUY:` respectively. A `<s>` start-of-sentence token is
    placed in front of every line's tokens.

    Returns a single flat list containing the tokens of all lines, in order.

    Raises:
        ValueError: if a line yields no tokens or does not start with a
            recognized speaker ID.
    """
    # Build the tokenizer once instead of once per line.
    tokenize = tt.data.get_tokenizer("basic_english")
    speaker_ids = {"sam": "SAM:", "guy": "GUY:"}
    result = []
    for line in lines:
        tokens = tokenize(line)
        # An empty token list is a format problem too (previously an IndexError).
        if not tokens or tokens[0] not in speaker_ids:
            raise ValueError("format problem - bad speaker ID")
        # revert the speaker ID token
        tokens[0] = speaker_ids[tokens[0]]
        # add a start of sentence token
        result += ["<s>"] + tokens
    return result
                    
def postprocess(tokens):
    """Join `tokens` into one string, starting a new line at every `<s>`.

    Tokens are separated by single spaces; each occurrence of the
    start-of-sentence marker followed by a space becomes a newline.
    """
    joined = ' '.join(tokens)
    return joined.replace("<s> ", "\n")

def split(list, portions, offset):
    """Partition `list` into a (large, small) pair of lists.

    Elements are dealt round-robin into `portions` disjoint pieces by index.
    The element at index i belongs to the small part when
    `i % portions == offset`, and to the large part otherwise. Relative
    order is preserved in both parts.
    """
    large, small = [], []
    for index, item in enumerate(list):
        bucket = small if index % portions == offset else large
        bucket.append(item)
    return (large, small)
    

In [None]:
# Read the GEaH data and preprocess into training and test streams of tokens
# NOTE(review): the URL below is a GitHub "blob" page whose file name refers to
# "The Philosopher's Stone", not a GEaH corpus — confirm the intended source.
# geah_tokenize raises ValueError unless every line starts with a sam/guy
# speaker token.
geah_filename = ("https://github.com/khushmeeet/potter-nlp/blob/master/data/"
                 "Book 1 - The Philosopher's Stone_djvu.txt.html")
os.makedirs('data', exist_ok=True)
# Presumably saves the file under data/ using the URL's base name — verify.
wget.download(geah_filename, out="data/")

with open("data/Book 1 - The Philosopher's Stone_djvu.txt.html", 'r') as fin:
    lines = preprocess(fin.read())
    # Lines at indices 0, 12, 24, ... form the test set; the rest is training.
    train_lines, test_lines = split(lines, 12, 0)
    train_tokens = geah_tokenize(train_lines)
    test_tokens = geah_tokenize(test_lines)

# The vocabulary is the set of distinct token types seen in training.
vocabulary = list(set(train_tokens))


In [None]:
def all_ngrams(vocabulary, n):
    """Enumerate every possible `n`-gram over `vocabulary` as a list of tuples.

    The result is the `n`-fold Cartesian product of the vocabulary with
    itself, in the order produced by `itertools.product`.
    """
    combinations = itertools.product(vocabulary, repeat=n)
    return [combination for combination in combinations]

def ngrams(tokens, n):
    """List every contiguous window of `n` tokens in `tokens`, left to right.

    Each window is returned as a tuple. When `tokens` holds fewer than `n`
    items the result is empty.
    """
    windows = []
    window_count = len(tokens) - n + 1
    for start in range(0, window_count):
        windows.append(tuple(tokens[start : start + n]))
    return windows

def ngram_counts(vocabulary, tokens, n):
    """Count the `n`-grams occurring in `tokens`.

    The result is a two-level dictionary: the outer key is the (n-1)-gram
    context tuple, the inner key is the final (target) token, and the value
    is the number of times that n-gram occurs. Every context/target pair
    that can be formed from `vocabulary` is present, with count 0 when the
    n-gram was never observed.
    """
    counts = defaultdict(lambda: defaultdict(int))
    # Start every possible (context, target) pair at zero.
    contexts = all_ngrams(vocabulary, n - 1)
    for context, target in itertools.product(contexts, vocabulary):
        counts[context][target] = 0
    # Overwrite the zeros with the observed frequencies.
    observed = Counter(ngrams(tokens, n))
    for ngram, frequency in observed.items():
        context, target = ngram[:-1], ngram[-1]
        counts[context][target] = frequency
    return counts

In [None]:
def ngram_model(ngram_counts):
    """Returns an n-gram probability model calculated by normalizing the
       provided `ngram_counts` dictionary.

    `ngram_counts` maps each context to a dictionary of target -> count.
    The returned structure is a deep copy with the same shape, in which each
    count is divided by the total count of its context. Contexts whose total
    count is zero get probability 0 for every target. The input is not
    modified.
    """
    normalized = copy.deepcopy(ngram_counts)
    for context, targets in ngram_counts.items():
        # The context total is the same for every target: compute it once
        # per context rather than once per word.
        total = sum(targets.values())
        row = normalized[context]
        for word, count in targets.items():
            row[word] = count / total if total else 0
    return normalized


In [None]:
def sample(model, context):
    """Draw one token from the distribution `model[context]`.

    A single uniform random number is drawn and the (token, probability)
    pairs are walked in sorted order, subtracting each probability until the
    remainder falls below the current token's probability.

    Raises ValueError when the walk ends without selecting a token.
    """
    candidates = list(model[context].items())
    candidates.sort()
    remainder = random.random()
    for token, prob in candidates:
        if remainder < prob:
            return token
        remainder -= prob
    raise ValueError

def sample_sequence(model, start_context, count=100):
    """Extend `start_context` with `count` tokens sampled one at a time.

    Each new token is sampled from `model` conditioned on the most recent
    `len(start_context)` tokens of the sequence built so far. The returned
    list begins with the tokens of `start_context`, so its length is
    `count + len(start_context)`.
    """
    random.seed(SEED) # for reproducibility, do not change
    context_size = len(start_context)
    seq = list(start_context)
    for _ in range(count):
        # Explicit "last context_size tokens" slice (empty for unigram models).
        recent = seq[len(seq) - context_size:]
        seq.append(sample(model, tuple(recent)))
    return seq


In [None]:
def probability(tokens, model, n):
    """Compute the probability an `n`-gram `model` assigns to `tokens`.

    The first n-1 tokens only serve as the initial context and are not
    scored. Every later token contributes the factor
    `model[context][token]`, where the context is the tuple of the n-1
    tokens preceding it. The factors are multiplied as plain floats.
    """
    window = tokens[0:n-1]
    product = 1.0
    for token in tokens[n-1:]:
        conditional = model[tuple(window)][token]
        product = product * conditional
        # Slide the context: append the scored token, drop the oldest one.
        window = window + [token]
        window = window[1:]
    return product

def neglogprob(tokens, model, n):
    """Returns the negative log probability (base 2) of a sequence of
       `tokens` according to an `n`-gram `model`.

    The first n-1 tokens serve only as the initial context. The result is
    accumulated as a sum of per-token `-log2(p)` terms rather than as the
    log of the multiplied probabilities, because that product underflows to
    0.0 on long sequences and would wrongly yield infinity.

    Returns `math.inf` as soon as a token has probability 0.
    """
    context = list(tokens[0:n-1])
    total_bits = 0.0
    for token in tokens[n-1:]:
        prob = model[tuple(context)][token]
        if prob == 0:
            return math.inf
        total_bits -= math.log2(prob)
        # Slide the context window forward by one token.
        context = (context + [token])[1:]
    return total_bits

def perplexity(tokens, model, n):
    """Returns the perplexity of a sequence of `tokens` according to an
       `n`-gram `model`.

    Perplexity is 2 raised to the average number of bits per scored token,
    where the number of scored tokens is `len(tokens) - n + 1`. It is
    computed directly from the negative log probability so that long
    sequences do not underflow. `tokens` must contain at least `n` tokens;
    with exactly n-1 tokens the division by zero is raised as before.

    Returns `math.inf` when the sequence has probability 0.
    """
    N = len(tokens) - n + 1
    bits = neglogprob(tokens, model, n)
    if bits == math.inf:
        return math.inf
    return 2 ** (bits / N)

In [None]:
def ngram_model_smoothed(ngram_counts, delta:float):
    """Returns an add-`delta` smoothed n-gram probability model.

    `ngram_counts` maps each context to a dictionary of target -> count.
    For every context, each target's probability is
    `(count + delta) / (context_total + K * delta)`, where K is the number
    of target entries stored for that context. Using K (rather than the
    number of contexts) makes every context's probabilities sum to 1 for
    any n-gram order.

    Contexts whose denominator is zero (no counts and `delta == 0`) get
    probability 0 for every target, mirroring `ngram_model`. The input is
    not modified; a deep copy with the same shape is returned.
    """
    normalized = copy.deepcopy(ngram_counts)
    for context, targets in ngram_counts.items():
        # The denominator is constant per context: compute it once.
        denominator = sum(targets.values()) + len(targets) * delta
        row = normalized[context]
        for word, count in targets.items():
            row[word] = (count + delta) / denominator if denominator else 0
    return normalized

In [None]:
# Count n-grams of order 1-3 over the training tokens.
unigram_counts = ngram_counts(vocabulary, train_tokens, 1)
bigram_counts = ngram_counts(vocabulary, train_tokens, 2)
trigram_counts = ngram_counts(vocabulary, train_tokens, 3)

# Unsmoothed (maximum-likelihood) models.
unigram_model = ngram_model(unigram_counts)
bigram_model = ngram_model(bigram_counts)
trigram_model = ngram_model(trigram_counts)

# Additive smoothing constant (1 = add-one smoothing).
delta = 1

# ngram_model_smoothed deep-copies its input itself, so the count tables can
# be passed directly; copying them a second time here only costs time and
# memory.
unigram_model_smoothed = ngram_model_smoothed(unigram_counts, delta)
bigram_model_smoothed = ngram_model_smoothed(bigram_counts, delta)
trigram_model_smoothed = ngram_model_smoothed(trigram_counts, delta)

print(f"Test perplexity - smoothed unigram: {perplexity(test_tokens, unigram_model_smoothed, 1):.3f}\n"
      f"Test perplexity - smoothed bigram: {perplexity(test_tokens, bigram_model_smoothed, 2):.3f}\n"
      f"Test perplexity - smoothed trigram: {perplexity(test_tokens, trigram_model_smoothed, 3):.3f}")

print(f"Test perplexity - unigram: {perplexity(test_tokens, unigram_model, 1):.3f}\n"
      f"Test perplexity -  bigram: {perplexity(test_tokens, bigram_model, 2):.3f}\n"
      f"Test perplexity - trigram: {perplexity(test_tokens, trigram_model, 3):.3f}")