# Programming Component
## yz4152

In [None]:
import sys
from collections import defaultdict
import math
import random
import os
import os.path
"""
COMS W4705 - Natural Language Processing
Homework 1 - Programming Component: Trigram Language Models
Yassine Benajiba
"""

In [57]:
import numpy as np
from copy import copy

In [58]:
def corpus_reader(corpusfile, lexicon=None):
    with open(corpusfile,'r') as corpus:
        for line in corpus:
            if line.strip():
                sequence = line.lower().strip().split()
                if lexicon:
                    yield [word if word in lexicon else "UNK" for word in sequence]
                else:
                    yield sequence


def get_lexicon(corpus):
    word_counts = defaultdict(int)
    for sentence in corpus:
        for word in sentence:
            word_counts[word] += 1
    return set(word for word in word_counts if word_counts[word] > 1)

In [59]:
def get_ngrams(sequence, n):
    """
    COMPLETE THIS FUNCTION (PART 1)
    Given a sequence, this function should return a list of n-grams, where each n-gram is a Python tuple.
    This should work for arbitrary values of 1 <= n < len(sequence).
    """
    ngrams = []
    sequence_c = copy(sequence)
    
    if n-1 > 0:
        padding_start = ['START'] * (n-1)
    else:
        padding_start = ['START']

    padding_end = ['STOP']
    sequence_c = padding_start + sequence_c + padding_end
    for i in range(len(sequence_c) - n + 1):
        ngrams.append(tuple(sequence_c[i:i+n]))
    return ngrams


In [60]:
class TrigramModel(object):

    def __init__(self, corpusfile):

        # Iterate through the corpus once to to build a lexicon
        generator = corpus_reader(corpusfile)
        self.lexicon = get_lexicon(generator)
        self.lexicon.add("UNK")
        self.lexicon.add("START")
        self.lexicon.add("STOP")

        # Now iterate through the corpus again and count ngrams
        generator = corpus_reader(corpusfile, self.lexicon)
        self.count_ngrams(generator)
        
        # caculating the denominator for raw_unigram_probability
        self.n_counts = sum(self.unigramcounts.values())


    def count_ngrams(self, corpus):
        """
        COMPLETE THIS METHOD (PART 2)
        Given a corpus iterator, populate dictionaries of unigram, bigram,
        and trigram counts.
        """

        self.unigramcounts = defaultdict(int)
        self.bigramcounts = defaultdict(int)
        self.trigramcounts = defaultdict(int)

        ##Your code here
        for i in corpus:
            for unigram in get_ngrams(i, 1):
                self.unigramcounts[unigram] += 1

            for bigram in get_ngrams(i, 2):
                self.bigramcounts[bigram] += 1

            for trigram in get_ngrams(i, 3):
                self.trigramcounts[trigram] += 1


    def raw_trigram_probability(self,trigram):
        """
        COMPLETE THIS METHOD (PART 3)
        Returns the raw (unsmoothed) trigram probability
        """

        # denominator = 0, prob = 0
        if self.bigramcounts[trigram[0:2]] == 0:
            return 0

        return self.trigramcounts[trigram] / self.bigramcounts[trigram[0:2]]


    def raw_bigram_probability(self, bigram):
        """
        COMPLETE THIS METHOD (PART 3)
        Returns the raw (unsmoothed) bigram probability
        """

        if self.unigramcounts[(bigram[0],)] == 0:
            return 0

        return self.bigramcounts[bigram] / self.unigramcounts[(bigram[0],)]


    def raw_unigram_probability(self, unigram):
        """
        COMPLETE THIS METHOD (PART 3)
        Returns the raw (unsmoothed) unigram probability.
        """
        
        #self.n_counts = sum(self.unigramcounts.values())
        # call this function every time could be very slow
        return self.unigramcounts[unigram] / self.n_counts


    # Generate trigram probability for every trigram that starts with a certain bigram
    # This is a helper function for generate_sentence
    def generate_trigram(self, given):
        possible_w = []
        prob = []

        w_1 = given[1]
        w_2 = given[2]
        for trigram in self.trigramcounts:
            if trigram[2] != 'START' and trigram[:2] == (w_1, w_2):
                possible_w.append(trigram[2])
                prob.append(self.raw_trigram_probability(trigram))

        return possible_w, prob


    def generate_sentence(self, t=20): 
        """
        COMPLETE THIS METHOD (OPTIONAL)
        Generate a random sentence from the trigram model. t specifies the
        max length, but the sentence may be shorter if STOP is reached.
        """
        current_tri = (None, 'START', 'START')
        possible_w = []
        prob = []
        sentence = []
        i = 0
        
        for i in range(0,t):
            possible_w, prob = self.generate_trigram(tuple(current_tri))
            w_1 = current_tri[1]
            w_2 = current_tri[2]
            
            # n_w = np.random.choice(possible_w, 1, p = prob)[0]
            # wrong for the sum of p != 1            
            pprob = np.random.multinomial(10, prob)
            # return a same size list while every element denotes how many times 
            # the word occurs (10 times random sampling with replacement)
            # all elements of the list sum to 10
            
            idx = np.argmax(pprob)
            n_w = possible_w[idx]
            current_tri = (w_1, w_2, n_w)
            sentence.append(n_w)
            i += 1
            
            if n_w == 'STOP':
                break
            
        return sentence 



    def smoothed_trigram_probability(self, trigram):
        """
        COMPLETE THIS METHOD (PART 4)
        Returns the smoothed trigram probability (using linear interpolation).
        """

        lambda1 = 1/3.0
        lambda2 = 1/3.0
        lambda3 = 1/3.0

        t_prob = lambda1 * self.raw_trigram_probability(trigram)
        b_prob = lambda2 * self.raw_bigram_probability(trigram[1:3])
        u_prob = lambda3 * self.raw_unigram_probability((trigram[2],))
        p = t_prob + b_prob + u_prob
        
        return p


    def sentence_logprob(self, sentence):
        """
        COMPLETE THIS METHOD (PART 5)
        Returns the log probability of an entire sequence.
        """

        ngrams = get_ngrams(sentence, 3)
        log_p = 0
        for i in range(len(ngrams)):
            trigram = ngrams[i]
            s_p = self.smoothed_trigram_probability(trigram)
            if s_p == 0:
                return float('inf')
            else:
                log_p += math.log2(s_p)
            
        return log_p


    def perplexity(self, corpus):
        """
        COMPLETE THIS METHOD (PART 6)
        Returns the log probability of an entire sequence.
        """
        l = 0
        m = 0
        for s in corpus:
            l += self.sentence_logprob(s)
            m += len(s)
        l /= m
        l = 2**(-l)
        
        return l

In [61]:
def essay_scoring_experiment(training_file1, training_file2, testdir1, testdir2):

        model1 = TrigramModel(training_file1) # trained on train_high.txt
        model2 = TrigramModel(training_file2) # trained on train_low.txt

        total = 0
        correct = 0

        # test_high
        for f in os.listdir(testdir1):
            p_high = model1.perplexity(corpus_reader(os.path.join(testdir1, f), model1.lexicon))
            p_low = model2.perplexity(corpus_reader(os.path.join(testdir1, f), model2.lexicon))

            if p_high < p_low:
                correct += 1
            total += 1

        # test_low 
        for f in os.listdir(testdir2):
            p_high = model1.perplexity(corpus_reader(os.path.join(testdir2, f), model1.lexicon))
            p_low = model2.perplexity(corpus_reader(os.path.join(testdir2, f), model2.lexicon))

            if p_low < p_high:
                correct += 1
            total += 1

        return correct / total


### Testing raw probability

In [62]:
if __name__ == '__main__':
    # Loading model
    data_dir = 'hw1_data/'
    model = TrigramModel(data_dir + 'brown_train.txt')
    print(model.raw_unigram_probability(('the',)))
    print(model.raw_bigram_probability(('the', 'jury',))) 

0.056658540702227214
0.0005697727420720192


### Testing perplexity

In [63]:
train_corpus = corpus_reader(data_dir + 'brown_train.txt', model.lexicon)
test_corpus = corpus_reader(data_dir + 'brown_test.txt', model.lexicon)
pptrain = model.perplexity(train_corpus)
pptest = model.perplexity(test_corpus)

print('Perplexity for brown_train: ')
print(pptrain)
print('Perplexity for brown_test: ')
print(pptest)

Perplexity for brown_train: 
18.503468749839712
Perplexity for brown_test: 
318.32613335065446


### Testing essay scoring accuracy

In [64]:
ets_toefl_data_dir = data_dir + 'ets_toefl_data/'
acc = essay_scoring_experiment(ets_toefl_data_dir + 'train_high.txt', ets_toefl_data_dir + 'train_low.txt', ets_toefl_data_dir + 'test_high', ets_toefl_data_dir + 'test_low')
print('Essay scoring accuracy: ')
print(acc)

Essay scoring accuracy: 
0.852589641434263


### Testing sentence generate

In [65]:
random_sentence = model.generate_sentence(10)
print('Sample random sentence of max length 10:')
print(random_sentence)

Sample random sentence of max length 10:
['eternity', 'is', 'no', 'reason', 'to', 'believe', 'that', 'the', 'united', 'states']
