In [0]:
import json
import numpy as np
from collections import Counter, defaultdict

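If you are running this notebook on Google Colab, the next cells are necessary: uncomment them to mount your Drive and change into the folder that contains the input files, laid out as shown below.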

In [0]:
#from google.colab import drive
#drive.mount('/content/drive')

In [0]:
#import os
#os.chdir("drive/My Drive/CL_Dataset")

In [0]:
#!ls

Because of the computation time, we used a smaller dataset (its paths are given below, currently commented out).

In [0]:
#train_path = './train1_corpus.json'
#test_path = './test1_corpus.json'

### Test / Train Set Path

In [0]:
# Paths (relative to the current working directory) of the JSON files holding
# the training corpus and the test corpus.
train_path, test_path = './train_corpus.json', './test_corpus.json'

### Actual Code


In [0]:
class Corpus(object):

    """
    A pre-processed corpus read from a file.

    A ``.json`` input is expected to contain a list of lists, where each inner list is a
    sentence encoded as a list of strings. Any other input is treated as plain text with one
    (already tokenized) sentence per line.
    """

    def __init__(self, path, t, n=3, bos_eos=True, vocab=None):

        """
        Reads the corpus and applies, in this order: frequency counting, optional UNK
        replacement, optional BOS/EOS padding.

        A Corpus object has the following attributes:
         - vocab: set or None (default). If a set is passed, words in the input file not
                         found in the set are replaced with the UNK string
         - path: str, the path to the file used to build the corpus object
         - t: int (or None/0 to disable), words with frequency count <= t are replaced with
                         the UNK string (a word is kept only if its count is > t)
         - ngram_size: int, 2 for bigrams, 3 for trigrams, and so on.
         - bos_eos: bool, default to True. If False, bos and eos symbols are not
                     prepended and appended to sentences.
         - sentences: list of lists, containing the input sentences after all the
                         pre-processing steps listed above
         - frequencies: Counter, mapping tokens to their frequency count in the corpus.
                         It is computed BEFORE UNK replacement and padding, so it never
                         contains 'UNK', '#bos#' or '#eos#' unless they occur in the input.
        """

        self.vocab = vocab
        self.path = path
        self.t = t
        self.ngram_size = n
        self.bos_eos = bos_eos

        self.sentences = self.read()
        # output --> [['i', 'am', 'home', '.'], ['you', 'went', 'to', 'the', 'park', '.'], ...]

        self.frequencies = self.freq_distr()
        # output --> Counter({'the': 485099, 'of': 301877, 'i': 286549, ...})
        # the numbers are made up, they aren't the actual frequency counts

        if self.t or self.vocab:
            # input --> [['i', 'am', 'home', '.'], ['you', 'went', 'to', 'the', 'park', '.'], ...]
            self.sentences = self.filter_words()
            # output --> [['i', 'am', 'home', '.'], ['you', 'went', 'to', 'the', 'UNK', '.'], ...]
            # supposing that park wasn't frequent enough or was outside of the training
            # vocabulary, it gets replaced by the UNK string

        if self.bos_eos:
            # input --> [['i', 'am', 'home', '.'], ...]   (with ngram_size == 2)
            self.sentences = self.add_bos_eos()
            # output --> [['#bos#', 'i', 'am', 'home', '.', '#eos#'], ...]

    def read(self):

        """
        Reads the sentences from ``self.path`` and returns them as a list of lists.

        - ``.json`` files are loaded as they are (no lowercasing or cleaning is applied).
        - Any other file is read line by line: surrounding whitespace is stripped, the
          characters ", ' and \\ are removed, the line is lowercased and split at single
          spaces.

        Both kinds of file are decoded as ISO-8859-1.
        """

        if self.path.endswith('.json'):
            # the context manager guarantees the file handle is closed after parsing
            with open(self.path, 'r', encoding='ISO-8859-1') as f:
                return json.load(f)

        # translation table deleting double quotes, single quotes and backslashes;
        # built once instead of once per line
        unwanted = {ord(char): None for char in '"\'\\'}
        sentences = []
        with open(self.path, 'r', encoding='ISO-8859-1') as f:
            for line in f:
                # first strip away newline symbols and the like, then drop the unwanted
                # characters and get rid of possible remaining trailing spaces
                line = line.strip().translate(unwanted).strip(' ')
                # lowercase and split at the white space (the corpus has been previously tokenized)
                sentences.append(line.lower().split(' '))

        return sentences

    def freq_distr(self):

        """
        Creates a counter mapping tokens to frequency counts. Equivalent to:

        count = Counter()
        for sentence in self.sentences:
            for word in sentence:
                count[word] += 1
        """

        return Counter(word for sentence in self.sentences for word in sentence)

    def filter_words(self):

        """
        Replaces illegal tokens with the UNK string and returns the filtered sentences.

        A token is illegal if its frequency count is not greater than the threshold ``t``
        and/or if it falls outside the specified vocabulary. The two filters can be both
        active at the same time but don't have to be. To exclude the frequency filter, set
        t=0 (or None) in the class call.

        Sentences that contain fewer than 2 tokens are dropped from the result.
        """

        # The active filter does not depend on the word, so it is chosen once up front.
        if self.t and self.vocab:
            # the word must be frequent enough and occur in the vocabulary
            def is_legal(word):
                return self.frequencies[word] > self.t and word in self.vocab
        elif self.t:
            # the word must be frequent enough
            def is_legal(word):
                return self.frequencies[word] > self.t
        else:
            # the word must occur in the vocabulary
            def is_legal(word):
                return word in self.vocab

        filtered_sentences = []
        for sentence in self.sentences:
            filtered_sentence = [word if is_legal(word) else 'UNK' for word in sentence]
            if len(filtered_sentence) > 1:
                # make sure that the sentence contains more than 1 token
                filtered_sentences.append(filtered_sentence)

        return filtered_sentences

    def add_bos_eos(self):

        """
        Adds the necessary number of BOS symbols and one EOS symbol to every sentence and
        returns the padded sentences.

        In a bigram model, you need one bos and one eos; in a trigram model you need two bos
        and one eos, and so on: ``ngram_size - 1`` '#bos#' symbols are prepended.
        """

        prefix = ['#bos#'] * (self.ngram_size - 1)
        return [prefix + sentence + ['#eos#'] for sentence in self.sentences]

In [0]:
class LM(object):

    """
    A language model which can be trained and tested. For ngram sizes larger than 1 it
    scores a token by linearly interpolating unigram, bigram and trigram relative
    frequencies.

    The language model has the following attributes:
     - vocab: set of strings (overwritten by update_counts)
     - ngram_size: int, the size of the ngrams used when scoring a corpus
     - counts_tri, counts_bi, counts, vocab_size: created by update_counts
    """

    # Interpolation weights, in the order (unigram, bigram, trigram).
    INTERPOLATION_WEIGHTS = (0.3, 0.4, 0.3)

    def __init__(self, n=3, vocab=None):

        self.vocab = vocab
        self.ngram_size = n

    @staticmethod
    def _slice_ngram(sentence, i, order):

        """
        Returns the ngram of the given order ending at index i: the bare token when
        order is 1, otherwise a (history, target) pair where history is a tuple with the
        order-1 tokens preceding the target.
        """

        if order == 1:
            return sentence[i]
        ngram = sentence[i-(order-1):i+1]
        return (tuple(ngram[:-1]), ngram[-1])

    def get_ngram(self, sentence, i):

        """
        Takes in a list of string and an index, and returns the history and current
        token of the appropriate size: the current token is the one at the provided
        index, while the history consists of the n-1 previous tokens. If the ngram
        size is 1, only the current token is returned.

        Example:
        input sentence: ['bos', 'i', 'am', 'home', 'eos']
        target index: 2
        ngram size: 3

        ngram = ['bos', 'i', 'am']
        #from index 2-(3-1) = 0 to index i (the +1 is just because of how Python slices lists)

        history = ('bos', 'i')
        target = 'am'
        return (('bos', 'i'), 'am')
        """

        return self._slice_ngram(sentence, i, self.ngram_size)

    @classmethod
    def _count_ngrams(cls, sentences, order):

        """
        Counts the ngrams of the given order over the sentences.

        Returns a Counter mapping tokens to frequencies when order is 1, otherwise a
        defaultdict mapping each history tuple to a dict {target: co-occurrence count}.
        Only positions with a full history (index >= order-1) are counted.
        """

        if order == 1:
            return Counter(word for sentence in sentences for word in sentence)

        counts = defaultdict(dict)
        for sentence in sentences:
            for idx in range(order-1, len(sentence)):
                history, target = cls._slice_ngram(sentence, idx, order)
                continuations = counts[history]
                continuations[target] = continuations.get(target, 0) + 1
        return counts

    def update_counts(self, corpus):

        """
        Estimates the model from a corpus object (anything exposing a ``sentences``
        attribute holding a list of token lists).

        Because probabilities are interpolated over trigrams, bigrams and unigrams, the
        three count tables are always built with orders 3, 2 and 1, independently of
        self.ngram_size (which is left untouched). For this reason the ngram size of the
        corpus is not required to match the one of the model.

        This method creates the following attributes for the language model object:
         - counts_tri: dict mapping 2-token histories to {target: count}
         - counts_bi: dict mapping 1-token histories to {target: count}
         - counts: Counter mapping tokens to frequencies
         - vocab: set, containing all the tokens in the corpus
         - vocab_size: int, indicating the number of tokens in the vocabulary
        """

        sentences = corpus.sentences

        self.counts_tri = self._count_ngrams(sentences, 3)
        self.counts_bi = self._count_ngrams(sentences, 2)
        self.counts = self._count_ngrams(sentences, 1)

        # The total number of unigram tokens is needed for every probability; computing
        # it once here avoids summing over the whole vocabulary at each call.
        self._unigram_total = sum(self.counts.values())

        # first loop through the sentences in the corpus, than loop through each word in a sentence
        self.vocab = {word for sentence in sentences for word in sentence}
        self.vocab_size = len(self.vocab)

    def _get_unigram_total(self):

        """
        Returns the total unigram count: the value cached by update_counts if present,
        otherwise the sum of self.counts computed on the fly.
        """

        total = self.__dict__.get('_unigram_total')
        if total is None:
            total = sum(self.counts.values())
        return total

    @staticmethod
    def _relative_frequency(table, history, target):

        """
        Returns count(history, target) / count(history) read from the given count table,
        or 0.0 when the history was never observed. The table is only read (``get``), so
        unseen histories are not inserted into it.
        """

        continuations = table.get(history)
        if not continuations:
            return 0.0
        return continuations.get(target, 0) / sum(continuations.values())

    def get_unigram_probability(self, ngram):

        """
        Returns the relative frequency of the given token in the training corpus (no
        smoothing is applied): 0.0 for tokens never observed or for an empty model.
        """

        total = self._get_unigram_total()
        if not total:
            return 0.0
        return self.counts.get(ngram, 0) / total

    def get_ngram_probability(self, history, target):

        """
        Returns the interpolated probability of target given history:

            l1 * P(target) + l2 * P(target | last history token)
                           + l3 * P(target | last two history tokens)

        where each P is a relative frequency and (l1, l2, l3) are INTERPOLATION_WEIGHTS.
        A component whose history was never observed contributes 0.

        Example:
        history = ("scary", "dog"), target = "barks"
        trigram history = ("scary", "dog"), bigram history = ("dog",)
        """

        history = tuple(history)
        l1, l2, l3 = self.INTERPOLATION_WEIGHTS

        tri = self._relative_frequency(self.counts_tri, history[-2:], target)
        bi = self._relative_frequency(self.counts_bi, history[-1:], target)
        uni = self.get_unigram_probability(target)

        return (uni*l1) + (bi*l2) + (tri*l3)

    def perplexity(self, test_corpus):

        """
        Uses the estimated language model to process a corpus and computes the perplexity
        of the language model over the corpus.

        The evaluation procedure was fixed by the assignment, so its logic is left as it was.
        """
        probs = []
        for sentence in test_corpus.sentences:
            for idx in range(self.ngram_size-1, len(sentence)):
                ngram = self.get_ngram(sentence, idx)
                if self.ngram_size == 1:
                    probs.append(self.get_unigram_probability(ngram))
                else:
                    probs.append(self.get_ngram_probability(ngram[0], ngram[1]))

        entropy = np.log2(probs)
        # this assertion makes sure that you retrieved valid probabilities, whose log must be <= 0
        assert all(entropy <= 0)

        avg_entropy = -1 * (sum(entropy) / len(entropy))

        return pow(2.0, avg_entropy)

### Implementing Models

In [0]:
# n is fixed to 3 for both the corpora and the model: the assignment did not
# allow any free parameters.

# Training corpus: frequency threshold of 10, no external vocabulary.
train_corpus = Corpus(path=train_path, t=10, n=3, bos_eos=True, vocab=None)

# NOTE: despite its name, `bigram_model` is created with n=3.
bigram_model = LM(n=3)
bigram_model.update_counts(corpus=train_corpus)

# For consistency, the test corpus is not frequency-filtered (t=None); instead it is
# filtered using the vocabulary of the trained language model.
test_corpus = Corpus(path=test_path, t=None, n=3, bos_eos=True, vocab=bigram_model.vocab)
bigram_model.perplexity(test_corpus=test_corpus)



180.35761362138015