# N-gram model

In [1]:
from nltk.corpus import brown
from sklearn.model_selection import train_test_split
import math
import pandas as pd
import random

In [2]:
# Define some global variables for word preprocessing and ngrams
START_CHAR = '<s>'
END_CHAR = '<e>'
UNK = '<u>'
PUNCTUATION = '".,;:+*`´¨!?¡¿&%()/' + "'"
N = 3

In [3]:
def preprocess(sentence, n):
    '''
    Remove whitespaces and punctuation, turn into lowercase, add necessary starting and ending
    dummy words. 
    Takes:
        - sentence, a list of strings
        - n, an int indicating the n-grams
    
    Returns:
        - words, a list of words stripped of punctuation and with additional characters
        to build ngram model
    '''
    words = ' '.join(sentence).encode('utf-8').lower().strip().split()
    words = [START_CHAR]*(n-1) + words + [END_CHAR]
    words = [word.strip(PUNCTUATION) for word in words]
    words = [word for word in words if len(word) != 0]

    return words

def init_dict():
    return {'_wordcount': 0}

def add_words(d, words, n):
    '''
    Modifies d, a dictionary with counts of ngrams, on site.
    Takes:
        - d, a dictionary where words will be added
        - words, a list of strings representing words
        - n, size of ngrams
    '''
    for i in range(len(words) - n):
        prefix = ' '.join(words[i:i + n])
        value  = words[i + n]
        d['_wordcount'] += 1
        if prefix not in d:
            d[prefix] = init_dict()
        temp = d[prefix]
        temp['_wordcount'] += 1
        if value not in temp:
            temp[value] = 0
        temp[value] += 1
        
def calculate_probability(words, corpus, n):
    '''
    Takes:
        words, a preprocessed list of words, according to n
        corpus, a dictionary with counts corresponding to a set of sentences
        n, an int indicating size of ngrams
        
    Returns:
        probability (float) of a word in the model
    '''
    logp = 0
    for i in range(len(words) - n + 1):
        
        if n == 1:
            if words[i] not in corpus['']: 
                temp_p = 1.0 / corpus['']['_wordcount'] 
            else:
                temp_p = 1.0 * corpus[''][words[i]] / corpus['']['_wordcount'] 
                
        else:
            prefix = ' '.join(words[i:i + n - 1])
            if (prefix not in corpus) or (words[i + n - 1] not in corpus[prefix]): 
                temp_p = 1.0 / corpus['_wordcount'] 
            else:
                temp_p = 1.0 * corpus[prefix][words[i + n - 1]] / corpus[prefix]['_wordcount']
        logp += math.log(temp_p)
    
    return math.exp(logp)

def linear_smoothing(lambdas, words_list, model_list, n):
    '''
    Takes:
        - lambdas, a list of float values to weight probabilities
        - words_list, a list of lists of words with START and END characters 
        - model_list, a list of models with ngram counts
        - n, an int indicating ngram size
    
    Returns:
        rv, a lambda-weighted probability of finding a word in the model_list
    '''
    rv = 0
    for l, words, model, i in zip(lambdas, words_list, model_list, range(n)):
        rv += l*calculate_probability(words, model, i+1)

    return rv


In [4]:
## Separate holdout data. Split remaining into train and test sets. 
class BrownData(object):
    def __init__(self):
        self.data = {}
        
    def save_data_by_genre(self, train, test, holdout, genre):
        self.data[genre] = {'train': train, 'test': test, 'holdout': holdout}  
        
    def split(self):
        '''
        Splits data into train, test, and holdout sets. Also modifies
        the training data to include <UNK> character to correct for unknown words.
        '''
        for genre in brown.categories():
            split = train_test_split(brown.sents(categories=genre))
            working, holdout = split[0], split[1]
            train, test = train_test_split(working)
            self.replace_unk(train)
            self.save_data_by_genre(train, test, holdout, genre)
            
    def replace_unk(self, train):
        '''
        Takes:
            - train, a list of lists of strings
        '''
        words = []
        for i in range(len(train)):
            for j in range(len(train[i])):
                if train[i][j] not in words:
                    words.append(train[i][j])
                    train[i][j] = UNK                   
        

In [5]:
class NGramClassifier(object):
    
    def __init__(self, lambda1, lambda2, lambda3):
        #assert lambda1 + lambda2 + lambda3 == 1
        self.lambda1, self.lambda2, self.lambda3 = lambda1, lambda2, lambda3
        self._unigram, self._bigram, self._trigram = {}, {}, {}
        self.genres = set()
    
    @property
    def unigrams(self):
        return self._unigram
    
    @property
    def bigrams(self):
        return self._bigram
    
    @property
    def trigrams(self):
        return self._trigram
    
    @property
    def lambdas(self):
        return [self.lambda1, self.lambda2, self.lambda3]
    
    def modify_lambdas(self, l1, l2, l3):
        self.lambda1, self.lambda2, self.lambda3 = l1, l2, l3
    
    def build_ngram_model(self, n, list_of_sentences):
        '''
        Takes:
            - n, an int indicating number in n-grams
            - list_of_sentences, a list of strings with sentences
            - genre, a string indicating the genre
            
        Returns:
            - ngram, 
        '''
        model = init_dict()
        for sentence in list_of_sentences:
            words = preprocess(sentence, n)
            add_words(model, words, n-1)
            
        return model
              
    def train(self, list_of_sentences, genre):
        '''
        Takes:
            - list_of_sentences, a list of lists of strings [[...],[...],...,[...]]
            - genre, a string indicating the genre of interest
        '''
        self.genres.add(genre)
        self._unigram[genre] = self.build_ngram_model(1, list_of_sentences)
        self._bigram[genre] = self.build_ngram_model(2, list_of_sentences)
        self._trigram[genre] = self.build_ngram_model(3, list_of_sentences)
    
    def call_model_list_by_genre(self, genre):
        return [self.unigrams[genre], self.bigrams[genre], self.trigrams[genre]]
        
    def predict_one_sentence(self, sentence):
        '''
        Takes:
            - sentence, a list of strings (words)
            
        Returns:
            - results, a dictionary where the key, value pairs are genres, likelihood of
            text being of such genre
        '''
        lambdas = self.lambdas
        words_list = [preprocess(sentence, n+1) for n in range(N)]
        results = {}
        for genre in self.genres:
            model_list = self.call_model_list_by_genre(genre)
            results[genre] = linear_smoothing(lambdas, words_list, model_list, N)
        return results
    
    def predict(self, list_of_sentences):
        '''
        Takes:
            - list_of_sentences, a list of lists of strings [[...],[...],...,[...]]
        
        Returns:
            - rv, a list of predicted genres (strings) 
        '''
        rv = []
        for sentence in list_of_sentences:
            results = self.predict_one_sentence(sentence)
            max_cat = 0
            max_val = 0
            for key, value in results.items():
                if value > max_val:
                    max_val = value
                    max_cat = key
            #if max_cat == 0: print(results)
            rv.append(max_cat)
            
        return rv
    
    # results table
    def get_labeled_predictions(self, data):
        '''
        Takes:
            - data, a dictionary with train, test, and holdout data
            
        Returns:
            - rv, predicted labels for test data
        '''
        rv = pd.DataFrame(columns=['prediction', 'label'])
        for genre in self.genres:
            test = data.data[genre]['test']
            results = self.predict(test)
            labels = [genre]*len(results)
            temp = pd.DataFrame({'prediction': results, 'label': labels})
            rv = rv.append(temp)

        return rv

    #get rates from results table 
    def get_metrics(self, predictions):
        '''
        Takes:
            - predictions, a list of labels
        
        Returns:
            - tp, fn, fp, fn: true positive, false negative, false positive, and false negative
            rates
        '''
        tp, fn, fp, tn = {}, {}, {}, {}
        for genre in self.genres:
            g = predictions[predictions['label'] == genre]
            tp[genre] = g[g['prediction'] == genre].label.count()
            fn[genre] = g[g['prediction'] != genre].label.count()
            for other_genre in self.genres:
                if other_genre != genre:
                    og = predictions[predictions['label'] == other_genre]
                    fp[genre] = og[og['prediction'] == genre].label.count()
                    tn[genre] = og[og['prediction'] != genre].label.count()

        return tp, fn, fp, fn

    def micro_avg_precision(self, tp, fp):
        tp_sum = 0
        fp_sum = 0
        for genre in self.genres:
            tp_sum += tp[genre]
            fp_sum += fp[genre]

        return 1.0 * tp_sum / (tp_sum + fp_sum)

    def micro_avg_recall(self, tp, fn):
        tp_sum = 0
        fn_sum = 0
        for genre in self.genres:
            tp_sum += tp[genre]
            fn_sum += fn[genre]

        return 1.0 * tp_sum / (tp_sum + fn_sum)

In [6]:
def test(clf, data, l1 = 0, l2=0.5, l3=0.5):
    '''
    Takes:
        - data, a dictionary with train, test, and holdout data
        - l1, l2, l3, lambdas (floats) to weight ngram models
    '''
    clf.modify_lambdas(l1, l2, l3)
    labeled_preds = clf.get_labeled_predictions(data)
    tp, fn, fp, fn = clf.get_metrics(labeled_preds)
    p = clf.micro_avg_precision(tp, fp)
    r = clf.micro_avg_recall(tp, fn)
    
    return 2*p*r/(p+r)

In [7]:
# Do once, because it takes time to load all the words into the dictionary.
brown_data = BrownData()
brown_data.split()

# Create and train a classifier, do once.
clf = NGramClassifier(0, 0.5, 0.5) 
for category in brown_data.data.keys():
    clf.train(brown_data.data[category]['train'], category)


In [8]:
test(clf, brown_data)

0.4029676444322319

In [9]:
max_val = 0
max_weights = []
for j in range(10):
    for i in range(10):
        if i%4 == 0 : print('{}... '.format(i))
        l1, l2 = 0.1 * j, 0.1 * i
        l3 = 1 - l1 - l2
        f1 = test(clf, brown_data, l1, l2, l3)
        if f1 > max_val:
            max_val = f1
            max_weights = [l1, l2, l3]
            print(max_val, max_weights)

0... 
(0.1701093560145808, [0.0, 0.0, 1.0])
(0.4012103706760195, [0.0, 0.1, 0.9])
(0.4016500515641114, [0.0, 0.2, 0.8])
(0.4020894906866452, [0.0, 0.30000000000000004, 0.7])
4... 
(0.4029676444322319, [0.0, 0.4, 0.6])
(0.40346177622089435, [0.0, 0.6000000000000001, 0.3999999999999999])
(0.4036810658608613, [0.0, 0.7000000000000001, 0.29999999999999993])
8... 
(0.4039557722683882, [0.0, 0.9, 0.09999999999999998])
0... 
4... 
8... 
0... 
4... 
8... 
0... 
4... 
8... 
0... 
4... 
8... 
0... 
4... 
8... 
0... 
4... 
8... 
0... 
4... 
8... 
0... 
4... 
8... 
0... 
4... 
8... 
