# N gramas
The `NgramData` class, used to build language models with n-grams, is completely defined in the notebook.

In [24]:
#importar librerias
import nltk
import numpy as np
from nltk.tokenize import TweetTokenizer
from sklearn.manifold import TSNE
from collections import Counter
from nltk.util import ngrams


In [1]:
# 📌 This notebook assumes that corpus processing, tokenization and BoW construction was already performed on the notebook:
# 👉 'feature-extraction/bag_of_words.ipynb'

#The variables used here (such as `BoW_tr`, `tr_txt`, `V1`, `dict_indices1`) were built there.
#If you want to re-run the pipeline from scratch, check that file first.

> 🔗 **Note:** The corpus loading, tokenization and construction of the Bag of Words is at
> [`bag_of_words.ipynb`](./feature-extraction/bag_of_words.ipynb)

# Ngram Models

In [27]:
class NgramData():
    def __init__(self, N: int, vocab_max, tokenizer=None, embeddings_model=None):
        self.tokenizer = tokenizer if tokenizer else self.default_tokenizer
        self.punct = set(['.', '?', '!', ',', ';', ':', '^', '*', '+', '/', '\\', '"', "´", "`", "¨", "~", "{", "}", "[", "]", "(", ")", "_", "-", "&", "%", "$", "#", "@", "¿", "?", "¡", "!", "<", ">", "=", "|", "°", "¬", "¦", "ª", "º", "©", "®", "«", "»", "“", "”", "‘", "’", "…", "–", "—", "•", "·", "»", "«", "…", "‘", "’", "“", "”", "–", "—", "•", "·", "¡", "¿", "<url>", "@usuario", "..."])
        self.N = N
        self.vocab_max = vocab_max
        self.UNK = "<unk>"
        self.SOS = '<s>'
        self.EOS = '</s>'
        self.embeddings_model = embeddings_model
        self.ngram_counts = Counter()
        self.freq_counts = Counter()
    
    def get_vocab_size(self) -> int:
        return len(self.vocab)
    
    def default_tokenizer(self, doc: str) -> list:
        return doc.split(" ")

    def remove_word(self, word: str) -> bool:
        word = word.lower()
        is_punct = word in self.punct
        is_digit = word.isnumeric()
        return is_punct or is_digit
    
    def get_vocabulary(self, corpus: list) -> set:
        freq_dist = Counter([w.lower() for sentence in corpus for w in self.tokenizer(sentence) if not self.remove_word(w)])
        sorted_words = [word for word, _ in freq_dist.most_common(self.vocab_max - 3)]
        return set(sorted_words)
    
    def fit(self, corpus: list) -> None:
        self.vocab = self.get_vocabulary(corpus)
        self.vocab.update({self.UNK, self.SOS, self.EOS})
        
        self.w2id = {word: i for i, word in enumerate(self.vocab)}
        self.id2w = {i: word for word, i in self.w2id.items()}
        
        for doc in corpus:
            for ngram in self.get_ngram_doc(doc):
                self.ngram_counts[ngram] += 1
        

        self.freq_counts = Counter(self.ngram_counts.values())
    
    def transform(self, corpus: list) -> tuple:
        X_ngrams, y = [], []
        for doc in corpus:
            for words_window in self.get_ngram_doc(doc):
                words_window_ids = [self.w2id.get(w, self.w2id[self.UNK]) for w in words_window]
                X_ngrams.append(words_window_ids[:-1])
                y.append(words_window_ids[-1])
        return np.array(X_ngrams), np.array(y)
    
    def get_ngram_doc(self, doc: str) -> list:
        doc_tokens = [self.SOS] + self.replace_unk(self.tokenizer(doc)) + [self.EOS]
        return list(ngrams(doc_tokens, self.N))
    
    def replace_unk(self, doc_tokens: list) -> list:
        return [token if token.lower() in self.vocab else self.UNK for token in doc_tokens]
    
 
    def turing_smoothing(self, ngram: tuple) -> float:
        if any(word not in self.vocab for word in ngram):
            print(f"El n-grama {ngram} contiene palabras fuera del vocabulario.")
            return 1 / sum(self.ngram_counts.values())  # Probabilidad mínima
    
        count = self.ngram_counts.get(ngram, 0)
        next_count = count + 1
    
        if count == 0:
            return 1 / sum(self.ngram_counts.values())  
        if count not in self.freq_counts or next_count not in self.freq_counts:
            return count / sum(self.ngram_counts.values())
  
        if self.freq_counts[count] == 0:
            return 1 / sum(self.ngram_counts.values())     
        adjusted_count = (next_count * self.freq_counts[next_count]) / self.freq_counts[count]
        return adjusted_count / sum(self.ngram_counts.values())
    
    
    def turing_smoothing(self, ngram: tuple) -> float:
        if any(word not in self.vocab for word in ngram):
            return 1 / sum(self.ngram_counts.values())  
        count = self.ngram_counts.get(ngram, 0)
        next_count = count + 1
        if count == 0:
            return 1 / sum(self.ngram_counts.values())  
        if next_count not in self.freq_counts:
            adjusted_count = count
        else:
            adjusted_count = (next_count * self.freq_counts[next_count]) / self.freq_counts[count]
        return adjusted_count / sum(self.ngram_counts.values())


In [28]:
class NgramLanguageModel:
    def __init__(self, N, lambda1=0.4, lambda2=0.3, lambda3=0.3, lambda4=0.0):
        self.N = N  # (1 para unigramas, 2 para bigramas, etc.)
        
        self.lambda1 = lambda1 
        self.lambda2 = lambda2  
        self.lambda3 = lambda3  
        self.lambda4 = lambda4  
        self.unigram_counts = Counter()
        self.bigram_counts = Counter()
        self.trigram_counts = Counter()
        self.tetragram_counts = Counter()
        
        self.vocab = set()
        self.total_tokens = 0
        self.V = 0
    
    def train(self, transformed_corpus):
        for tokens in transformed_corpus:
            self.total_tokens += len(tokens)
            for i, w in enumerate(tokens):
                self.unigram_counts[w] += 1
                if i > 0:
                    w_prev = tokens[i - 1]
                    self.bigram_counts[(w_prev, w)] += 1
                if i > 1:
                    w_prev2 = tokens[i - 2]
                    self.trigram_counts[(w_prev2, w_prev, w)] += 1
                if i > 2:
                    w_prev3 = tokens[i - 3]
                    self.tetragram_counts[(w_prev3, w_prev2, w_prev, w)] += 1
        
        self.vocab = set(self.unigram_counts.keys())
        self.V = len(self.vocab)
    
    def mask_oov(self, word):
        return "<unk>" if word not in self.vocab else word
    
    def unigram_prob(self, w):
        return (self.unigram_counts.get(self.mask_oov(w), 0) + 1) / (self.total_tokens + self.V)
    
    def bigram_prob(self, w_prev, w):
        return (self.bigram_counts.get((self.mask_oov(w_prev), self.mask_oov(w)), 0) + 1) / (self.unigram_counts.get(self.mask_oov(w_prev), 0) + self.V)
    
    def trigram_prob(self, w_prev2, w_prev, w):
        return (self.trigram_counts.get((self.mask_oov(w_prev2), self.mask_oov(w_prev), self.mask_oov(w)), 0) + 1) / (self.bigram_counts.get((self.mask_oov(w_prev2), self.mask_oov(w_prev)), 0) + self.V)
    
    def tetragram_prob(self, w_prev3, w_prev2, w_prev, w):
        return (self.tetragram_counts.get((self.mask_oov(w_prev3), self.mask_oov(w_prev2), self.mask_oov(w_prev), self.mask_oov(w)), 0) + 1) / (self.trigram_counts.get((self.mask_oov(w_prev3), self.mask_oov(w_prev2), self.mask_oov(w_prev)), 0) + self.V)
    
    def probability_of_word(self, context, w):
        context = [self.mask_oov(w) for w in context]
        if self.N == 1:
            return self.unigram_prob(w)
        elif self.N == 2:
            return self.lambda3 * self.unigram_prob(w) + self.lambda2 * self.bigram_prob(context[0], w)
        elif self.N == 3:
            return self.lambda3 * self.unigram_prob(w) + self.lambda2 * self.bigram_prob(context[1], w) + self.lambda1 * self.trigram_prob(context[0], context[1], w)
        elif self.N == 4:
            return self.lambda4 * self.tetragram_prob(context[0], context[1], context[2], w) + self.lambda3 * self.unigram_prob(w) + self.lambda2 * self.bigram_prob(context[2], w) + self.lambda1 * self.trigram_prob(context[1], context[2], w)
    
    def sequence_probability(self, sequence):
        prob = 1.0
        for i in range(self.N - 1, len(sequence)):
            context = sequence[i - self.N + 1:i]
            w = sequence[i]
            p = self.probability_of_word(context, w)
            if p == 0:
                return 0.0
            prob *= p
        return prob

In [29]:
# Configuración
N = 3  # trigramas
vocab_max = 5000

# Instancia y entrenamiento del extractor de n-gramas
ngram_data = NgramData(N=N, vocab_max=vocab_max, tokenizer=lambda x: tokenizer.tokenize(x.lower()))
ngram_data.fit(tr_txt)  # Aprende vocabulario y cuenta n-gramas


In [30]:
X, y = ngram_data.transform(tr_txt)

print("Ejemplo de entrada (IDs):", X[0])
print("Palabras del n-grama:", [ngram_data.id2w[i] for i in X[0]])
print("Palabra objetivo:", ngram_data.id2w[y[0]])


Ejemplo de entrada (IDs): [3116 3655]
Palabras del n-grama: ['<s>', '<unk>']
Palabra objetivo: <unk>


In [31]:
# Convertir todo el corpus a listas de tokens para el modelo
corpus_tokenizado = [ngram_data.replace_unk(ngram_data.tokenizer(doc)) for doc in tr_txt]

# Agregar <s> y </s> a cada oración
corpus_tokens = [[ngram_data.SOS] + tokens + [ngram_data.EOS] for tokens in corpus_tokenizado]

# Entrenar el modelo
lm = NgramLanguageModel(N=3)
lm.train(corpus_tokens)


In [32]:
# Probar probabilidad de una secuencia
ejemplo = ["odio", "a", "todos"]
ejemplo_ids = [w if w in ngram_data.vocab else "<unk>" for w in ejemplo]
print("Probabilidad de la secuencia:", lm.sequence_probability(ejemplo_ids))


Probabilidad de la secuencia: 0.0018864636553304092


# N gram Models with $\lambda$ interpolated 

In [40]:
import math
import random

In [41]:
class InterpolatedNgramModel:
    def __init__(self, lambda1=0.4, lambda2=0.3, lambda3=0.2, lambda4=0.1):
        self.lambda1 = lambda1  
        self.lambda2 = lambda2  
        self.lambda3 = lambda3  
        self.lambda4 = lambda4  
        
        self.unigram_counts = Counter()
        self.bigram_counts = Counter()
        self.trigram_counts = Counter()
        self.tetragram_counts = Counter()
        self.total_tokens = 0
        self.vocab = set()
        self.V = 0
    
    def train(self, corpus):
        for tokens in corpus:
            self.total_tokens += len(tokens)
            for i, w in enumerate(tokens):
                self.unigram_counts[w] += 1
                if i > 0:
                    self.bigram_counts[(tokens[i - 1], w)] += 1
                if i > 1:
                    self.trigram_counts[(tokens[i - 2], tokens[i - 1], w)] += 1
                if i > 2:
                    self.tetragram_counts[(tokens[i - 3], tokens[i - 2], tokens[i - 1], w)] += 1
        
        self.vocab = set(self.unigram_counts.keys())
        self.V = len(self.vocab)
    
    def mask_oov(self, word):
        return "<unk>" if word not in self.vocab else word
    
    def unigram_prob(self, w):
        return (self.unigram_counts.get(self.mask_oov(w), 0) + 1) / (self.total_tokens + self.V)
    
    def bigram_prob(self, w_prev, w):
        return (self.bigram_counts.get((self.mask_oov(w_prev), self.mask_oov(w)), 0) + 1) / (self.unigram_counts.get(self.mask_oov(w_prev), 0) + self.V)
    
    def trigram_prob(self, w_prev2, w_prev, w):
        return (self.trigram_counts.get((self.mask_oov(w_prev2), self.mask_oov(w_prev), self.mask_oov(w)), 0) + 1) / (self.bigram_counts.get((self.mask_oov(w_prev2), self.mask_oov(w_prev)), 0) + self.V)
    
    def tetragram_prob(self, w_prev3, w_prev2, w_prev, w):
        return (self.tetragram_counts.get((self.mask_oov(w_prev3), self.mask_oov(w_prev2), self.mask_oov(w_prev), self.mask_oov(w)), 0) + 1) / (self.trigram_counts.get((self.mask_oov(w_prev3), self.mask_oov(w_prev2), self.mask_oov(w_prev)), 0) + self.V)
    
    def interpolated_probability(self, context, w):
        context = [self.mask_oov(w) for w in context]
        return (self.lambda1 * self.tetragram_prob(*context[-3:], w) +
                self.lambda2 * self.trigram_prob(*context[-2:], w) +
                self.lambda3 * self.bigram_prob(*context[-1:], w) +
                self.lambda4 * self.unigram_prob(w))
    
    def perplexity(self, corpus):
        log_prob = 0
        N = sum(len(sentence) for sentence in corpus)
        for sentence in corpus:
            for i in range(3, len(sentence)):
                context = sentence[i - 3:i]
                w = sentence[i]
                prob = self.interpolated_probability(context, w)
                log_prob += math.log(prob)
        return math.exp(-log_prob / N)

In [42]:
def split_data(corpus):
    random.shuffle(corpus)
    train_size = int(0.8 * len(corpus))
    val_size = int(0.1 * len(corpus))
    train = corpus[:train_size]
    val = corpus[train_size:train_size + val_size]
    test = corpus[train_size + val_size:]
    return train, val, test

In [43]:
tokenizer = TweetTokenizer()
ngram_model = NgramData(N=4, vocab_max=5000, tokenizer=lambda x: tokenizer.tokenize(x.lower()))
ngram_model.fit(tr_txt)


In [44]:
# Convertir las oraciones al formato con tokens especiales
corpus = [[ngram_model.SOS] + ngram_model.replace_unk(ngram_model.tokenizer(doc)) + [ngram_model.EOS]
          for doc in tr_txt]


In [45]:
from random import shuffle
import math

train_corpus, val_corpus, test_corpus = split_data(corpus)


In [46]:
inter_model = InterpolatedNgramModel(lambda1=0.4, lambda2=0.3, lambda3=0.2, lambda4=0.1)
inter_model.train(train_corpus)


In [47]:
print(f"Perplejidad en validación: {inter_model.perplexity(val_corpus):.2f}")
print(f"Perplejidad en prueba: {inter_model.perplexity(test_corpus):.2f}")


Perplejidad en validación: 216.37
Perplejidad en prueba: 204.77
