# Statistical Model Languages

Tarea 3 - Luis Eduardo Robles Jimenez

## Language Model and Evaluation

In [46]:
import os
import nltk
import numpy as np
from nltk.tokenize import TweetTokenizer
from sklearn.model_selection import train_test_split, ParameterGrid

### 1. Preprocessing

In [47]:
tokens = {'begin': '<s>', 'end': '</s>', 'unknown': '<unk>', 'separator': '<sep>'}

In [48]:
class Corpus:
    def __init__(self, vocabSize = 100):
        self.size = vocabSize
        self.corpus = []

    def _createVocabulary(self):
        words, tokenizer, corpusByWords = [], TweetTokenizer(), []
        for doc in self.corpus:
            tokens = tokenizer.tokenize(doc)
            words += tokens
            corpusByWords += [tokens]
        count = nltk.FreqDist(words)
        count = sorted([(count[key], key) for key in count])[::-1]
        if self.size != -1: count = count[:self.size]
        return [word for _, word in count], corpusByWords

    def _readFile(self, path, divideByLine = True):
        file = []
        with open(path, "r") as f_corpus:
            for line in f_corpus:
                if not line.isspace():
                    file += [line[:-1]]
        if not divideByLine: 
            f = ""
            for line in file: f += line + "\n"
            file = [f]
        self.corpus += file

    def _buildCorpus(self, vocab, tokenized):
        self.corpus = []
        for doc in tokenized:
            tweet = []
            tweet.append(tokens['begin'])
            for word in doc:
                tweet.append(tokens['unknown'] if word not in vocab else word.lower().strip())
            tweet.append(tokens['end'])
            self.corpus.append(tweet)

    def loadCorpus(self):
        raise NotImplementedError()
    
    def describe(self):
        print(f'')

class Tweets(Corpus):
    def __init__(self, vocabSize = 100):
        super().__init__(vocabSize)
        self.path_corpus = "../../data/agresividad/mex_train.txt"

    def loadCorpus(self):
        self._readFile(self.path_corpus)
        vocab, tokenized = self._createVocabulary()
        self._buildCorpus(vocab, tokenized)
        return self.corpus
    

class Mananera(Corpus):
    def __init__(self, nFiles = 3, vocabSize = 100):
        super().__init__(vocabSize)
        self.nFiles = nFiles
        self.path_corpus = '../../data/presidente/estenograficas_limpias_por_fecha/'

    def loadCorpus(self):
        for f, file in enumerate(os.listdir(self.path_corpus)):
            if f >= self.nFiles: break
            file_path = os.path.join(self.path_corpus, file)
            if os.path.isfile(file_path):
                self._readFile(file_path, divideByLine = False)
        vocab, tokenized = self._createVocabulary()
        self._buildCorpus(vocab, tokenized)
        return self.corpus

In [49]:
vocabulary_size = 100

In [50]:
corpus = Tweets(vocabulary_size).loadCorpus()

#### Comment

### 2. Models Training

In [51]:
class LanguageModel:
    def __init__(self, corpus = None):
        self.corpus = corpus
        self.nGrams, self.vocab = dict(), set()
        if corpus is None: return
        for line in corpus:
            self.vocab.update(line)
            for g, grams in enumerate(self.getNGrams(line)):
                gram = self.toString(grams)
                if not gram in self.nGrams: self.nGrams[gram] = 0
                self.nGrams[gram] += 1
        self.sGrams = dict()
        for gram in self.nGrams:
            smallerGram = self.toString(self.toTokens(gram)[: -1])
            if not smallerGram in self.sGrams: self.sGrams[smallerGram] = 0
            self.sGrams[smallerGram] += self.nGrams[gram]
        self.vocab = list(self.vocab)

    def toString(self, gramList):
        gram = ""
        for i, g in enumerate(gramList):
            if i: gram += tokens['separator']
            gram += g
        return gram

    def toTokens(self, gram):
        assert isinstance(gram, str), 'Gram is not a string'
        return gram.split(tokens['separator'])

    def flatten(self, sentence):
        if isinstance(sentence[0], list): sentence = sum(sentence, [])
        return sentence

    def P(self, *words):
        # Laplace smoothing
        assert len(words) == self.gramLen, "n-gram doesn't match the expected length"
        words = [(w if w in self.vocab else tokens['unknown']) for w in words]
        return self._Laplace(words)

    def _Laplace(self, words):
        count = 0
        gram = self.toString(words)
        if gram in self.nGrams: count = self.nGrams[gram]
        ctx = self.toString(words[: -1])
        ctxCount = self.sGrams[ctx] if ctx in self.sGrams else 0
        return (count + 1) / (ctxCount + len(self.vocab))

    def getNGrams(self, line):
        return [line[start: start + self.gramLen] for start in range(len(line) - self.gramLen + 1)]

    def getProbs(self, sentence, log = False):
        sentence = self.flatten(sentence)
        logProb = 0
        for gram in self.getNGrams(sentence):
            p = self.P(*gram)
            logProb += np.log(p)
            assert p > 0, "Probability is zero"
        if log: return logProb
        return np.exp(logProb)

    # Include the <s> and </s> tokens, but don't count </s> - (Page 8, Dan Jurafsky on Language Models)
    def perplexity(self, sentence):
        sentence = self.flatten(sentence)
        pp = 1
        for g in self.getNGrams(sentence):
            pp *= self.P(*g) ** (-1 / (len(sentence) - sentence.count(tokens['begin'])))
        return pp

    def tweet(self, length = 50):
        tweet = [tokens['begin'] for _ in range(self.gramLen - 1)]
        for _ in range(length):
            ctx = tweet[-self.gramLen + 1] if self.gramLen > 1 else []
            probs = []
            for _, w in enumerate(self.vocab):
                w = ctx + [w]
                p = self.P(*w)
                probs.append(p)
            choice = np.random.choice(self.vocab, p = probs / np.sum(probs))
            tweet += [choice]
            if choice == tokens['end']: break
        return tweet

    def test(self):
        # Hypothesis: The sum of probabilities for a model is: vocabSize ^ (gramLength - 1)
        raise NotImplementedError()

In [52]:
class Unigram(LanguageModel):
    def __init__(self, corpus = None):
        self.gramLen = 1
        super().__init__(corpus)
    
    def test(self):
        p = 0
        for w in self.vocab: p += self.P(w)
        assert np.round(p, decimals = 3) == 1, "Probs don't sum up the expected value"


class Bigram(LanguageModel):
    def __init__(self, corpus = None):
        self.gramLen = 2
        super().__init__(corpus)      

    def test(self):
        for w1 in self.vocab:
            p = 0
            for w2 in self.vocab:
                p += self.P(w1, w2)
            assert np.round(p, decimals = 3) == 1, "Probs don't sum up the expected value"


class Trigram(LanguageModel):
    def __init__(self, corpus = None):
        self.gramLen = 3
        super().__init__(corpus)    

    def test(self):
        for w1 in self.vocab:
            for w2 in self.vocab:
                p = 0   
                for w3 in self.vocab:
                    p += self.P(w1, w2, w3)
                assert np.round(p, decimals = 3) == 1, "Probs don't sum up the expected value"


class N_Gram(LanguageModel):
    def __init__(self, gramLen, corpus = None):
        self.gramLen = gramLen
        super().__init__(corpus)    


class Interpolated(LanguageModel):
    def __init__(self, models = None, lambdas = None):
        #super().__init__(corpus)   
        self.models = models
        self.lambdas = lambdas
        assert len(models) == len(self.lambdas), "The number of models doesn't match the number of lambdas"
        self.vocab = models[0].vocab

    def getProbs(self, sentence, log = False):
        raise NotImplementedError()
    
    # Unlike the super().P(...), this one takes only the context it needs, so it can receive long sentences
    def P(self, *words):
        prob = 0
        for m, model in enumerate(self.models):
            nGram = words[-model.gramLen: ]
            prob += self.lambdas[m] * model.P(*nGram)
        return prob
    
    def perplexity(self, sentence):
        sentence = self.flatten(sentence)
        begins = sentence.count(tokens['begin'])
        pp = 1
        unigrams = Unigram().getNGrams(sentence)
        for u, _ in enumerate(unigrams):
            prob = 0
            for m, model in enumerate(self.models):
                # Start index where the context will be taken from
                idx = u + 1 - model.gramLen
                nGram = []
                if idx < 0:
                    nGram += np.abs(idx) * [tokens['begin']]
                    nGram += unigrams[: u + 1]
                else: nGram = unigrams[idx: u + 1]
                prob += model.P(*nGram) * self.lambdas[m]
            pp *= (prob) ** (-1 / (len(sentence) - begins))
        return pp
    
    def tweet(self, length = 50):
        tweet = [tokens['begin']] * max([model.gramLen for model in self.models])
        for _ in range(length):
            probs = []
            for w in self.vocab:
                ctx = tweet + [w]
                p = self.P(*ctx)
                probs.append(p)
            choice = np.random.choice(self.vocab, p = probs / np.sum(probs))
            tweet += [choice]
            #if choice == tokens['end']: break
        return tweet


In [53]:
uni = Unigram(corpus)
#uni.test()

# Very frequent
print(uni.P("que"))
print(uni.P(tokens['begin']))

# Doesn't exist
print(uni.P("otorrinolaringologo"))

0.030060494079397367
0.04925692661650662
0.39978502838158353


In [54]:
bi = Bigram(corpus)
#bi.test()

# Very frequent 
print(bi.P('.', tokens['end']))
print(bi.P("es", "que"))

# Doesn't exist
print(bi.P(tokens['begin'], tokens['end']))

0.47758081334723673
0.08350515463917525
0.00017708517797060386


In [55]:
tri = Trigram(corpus)
#tri.test()

# Very frequent
print(tri.P('!', '!', '!'))
print(tri.P('es', 'que', 'no'))

# Doesn't exist
print(tri.P('Luis', 'Eduardo', 'Robles'))


0.30403458213256485
0.04918032786885246
0.36449394038650507


#### Comment

### 3. Interpolated Model

In [56]:
#c_train, c_test = train_test_split(corpus, test_size = 0.001, train_size = 0.999)
c_train, c_test = train_test_split(corpus, test_size = 0.2, train_size = 0.8)
c_test, c_val = train_test_split(c_test, test_size = 0.5, train_size = 0.5)
print(f'Lengths of stratified sets:\n\tTrain: {len(c_train)}\n\tTest: {len(c_test)}\n\tValidation: {len(c_val)}')

Lengths of stratified sets:
	Train: 4435
	Test: 554
	Validation: 555


In [57]:
models = [Unigram(c_train), Bigram(c_train), Trigram(c_train)]
""" Delete, just to see the behavior of each model
for m in models: print(f'PP {m.perplexity(c_val)}\tlogP {m.getProbs(c_val, log = True)} \tP {m.getProbs(c_val)}')
"""

" Delete, just to see the behavior of each model\nfor m in models: print(f'PP {m.perplexity(c_val)}\tlogP {m.getProbs(c_val, log = True)} \tP {m.getProbs(c_val)}')\n"

In [58]:
#params = [(1/3, 1/3, 1/3), (0.4, 0.4, 0.2), (0.2, 0.4, 0.4), (0.5, 0.4, 0.1), (0.1, 0.4, 0.5), (0.9, 0.05, 0.05)]
params = [(1/3, 1/3, 1/3), (0.4, 0.4, 0.2), (0.2, 0.4, 0.4), (0.5, 0.4, 0.1), (0.1, 0.4, 0.5)]

In [59]:
bestParam, bestValue = 0, np.inf
for i, param in enumerate(params):
    m = Interpolated(models = [uni, bi, tri], lambdas = param)
    pp = m.perplexity(c_val)
    if pp < bestValue: bestValue, bestParam = pp, i
    print(f'Model {i + 1}: Params: {np.round(param, decimals = 3)} \t\tPerplexity: {pp}')

Model 1: Params: [0.333 0.333 0.333] 		Perplexity: 2.8726662975495065
Model 2: Params: [0.4 0.4 0.2] 		Perplexity: 2.8688029634557943
Model 3: Params: [0.2 0.4 0.4] 		Perplexity: 2.9280553605582855
Model 4: Params: [0.5 0.4 0.1] 		Perplexity: 2.8400518242712756
Model 5: Params: [0.1 0.4 0.5] 		Perplexity: 2.9585956785203513


In [60]:
goodInterpolated = Interpolated(models = [uni, bi, tri], lambdas = params[bestParam])
print(f'Best params {params[bestParam]} have a perplexity = {goodInterpolated.perplexity(c_test)} on the test set')

Best params (0.5, 0.4, 0.1) have a perplexity = 2.836409786819076 on the test set


#### Comment

## Text Generation

### 1. Tweet Functionality

In [61]:
nExamples = 5
for _ in range(nExamples):
    for w in goodInterpolated.tweet():
        print(w, end = " ")
    print(end = '\n\n')

<s> <s> <s> por </s> que en la <unk> <unk> al <unk> de <unk> que no joto ser <unk> a las <unk> . <unk> que <unk> <unk> lo <unk> <unk> eso no . <unk> <unk> <unk> <s> @usuario <unk> <unk> y qué putas me <unk> <unk> <unk> no 😂 </s> o solo con 

<s> <s> <s> <unk> qué es <unk> voy a @usuario <unk> con <unk> <unk> verga <unk> <unk> <unk> nos todos ! </s> nos en . <unk> <unk> o <unk> que <unk> <unk> me <unk> a <unk> <unk> me <unk> ? </s> mejor eres muy </s> qué <unk> ? <unk> <unk> <unk> <unk> y 

<s> <s> <s> <s> . bien <unk> se <s> @usuario muy <unk> qué <unk> y los <unk> pero sus <unk> y no y </s> putas <unk> <unk> <unk> <unk> <unk> mis <unk> <unk> de <unk> <unk> <unk> hasta la madre . </s> ser <unk> y el <unk> que a <unk> <unk> </s> su 

<s> <s> <s> hoy los <unk> a las <unk> para <unk> <unk> </s> hdp <unk> y <unk> con <unk> todos <unk> son y cuando @usuario soy <unk> ? <unk> más o <unk> ️ <s> @usuario voy este ver que </s> <unk> <unk> esta tu <unk> y <unk> del <unk> <unk> <s> si <unk> 

<s>

#### Comment

### 2. AMLO model

In [62]:
conf = Mananera(nFiles = 3, vocabSize = vocabulary_size).loadCorpus()

In [63]:
models = [Unigram(conf), Bigram(conf), Trigram(conf), N_Gram(gramLen = 5, corpus = conf)]
lambdas = len(models) * [1 / len(models)]

In [64]:
AMLO = Interpolated(models = models, lambdas = lambdas)

In [65]:
for w in AMLO.tweet(length = 300):
    print(w, end = " ")

<s> <s> <s> <s> <s> policías va de pregunta de la <unk> estamos ; va caso <unk> si ; los están <unk> información sobre van esta , <unk> <unk> vamos cada me los <unk> <unk> que es en el de - <unk> son hay <unk> a , nacional por <unk> ya <unk> méxico lo méxico nos va tiene se tener <unk> <unk> sobre está <unk> mil presidente obrador en hay muy de todo gobierno ciudad tema tener , <unk> se entonces hay y homicidios usted se policías si <unk> nacional <unk> un presidente vamos : ¿ tiene </s> vamos </s> ahora <unk> seguridad - ustedes <unk> méxico <unk> pero , <unk> <unk> <unk> , me , mil policías les presidente les con días . de <unk> la gobierno les vamos como ser homicidios pregunta tener , <unk> . , hay no nos le y los <unk> el <unk> del le <unk> vamos del o también el caso pero méxico por la presidente estos como ser el ha <unk> <unk> <unk> <unk> , sobre <unk> una <unk> del policías <unk> ¿ en al a todo este - tiene días , <unk> </s> esta <unk> a mil policía <unk> <unk> . <unk> <unk> <

#### Comment

### 3. Evaluation with custom phrases

In [66]:
modelsEval = {"Tweets": goodInterpolated, "AMLO": AMLO}
phrases = ['sino gano me voy a la chingada', 'ya se va a acabar la corrupción']

In [67]:
for model in modelsEval:
    print(f'Model: {model}')
    for p in phrases:
        print(f'\tPerplexity on the phrase "{p}" is: {np.round(modelsEval[model].perplexity(p), decimals = 2)}')

Model: Tweets
	Perplexity on the phrase "sino gano me voy a la chingada" is: 7.13
	Perplexity on the phrase "ya se va a acabar la corrupción" is: 5.81
Model: AMLO
	Perplexity on the phrase "sino gano me voy a la chingada" is: 9.62
	Perplexity on the phrase "ya se va a acabar la corrupción" is: 7.31


#### Comment

### 4. More evaluation 

#### Comment

## El ahorcado

### 1. Norvig's Hangman

#### Comment

### 2. Follow-up

#### Comment