# Nombre: Juan Carlos Perez Ramirez
# T1: Attention Recurrent Language Model
## Language Modelling
### Vectorizacion

In [1]:
import os
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'  # activa trazas s√≠ncronas

from sklearn.feature_extraction.text import TfidfVectorizer
import nltk
from nltk.tokenize import TweetTokenizer
from nltk.corpus import stopwords
from nltk import FreqDist
import re
import torch
from torch.utils.data import Dataset, DataLoader, TensorDataset
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from math import exp, sqrt, log

stop = set(stopwords.words('spanish'))

In [None]:
# para leer datos
def get_texts_from_file(path_corpus, path_truth):
    tr_txt = []
    tr_y = []

    with open(path_corpus, "r") as f_corpus, open(path_truth, "r") as f_truth:
        for tweet in f_corpus:
            tr_txt += [tweet]
        for label in f_truth:
            tr_y += [label]
    return tr_txt, tr_y

In [None]:
# tokeniza y limpia el texto
tokenizer = TweetTokenizer(preserve_case=False, reduce_len=True)
def tweet_tokenizer(text):
    text = re.sub(r"http\S+|www\S+|https\S+", "", text, flags=re.MULTILINE)
    text = re.sub(r"\d+([.,]d+)*", "<num>", text)
    text = re.sub(r'["();:‚Ä¶]', " ", text)
    text = re.sub(r"\s+", " ", text).strip()
    text = text.lower()
    tokens = tokenizer.tokenize(text)
    filtered_tokens = [token for token in tokens if token not in stop]
    return filtered_tokens

In [4]:
class TweetsDataset(Dataset):
    def __init__(self, tweets, labels, vocab, max_len=50):
        self.tweets = tweets
        self.labels = labels
        self.vocab = vocab
        self.max_len = max_len
    '''Sustituye cada token por su id en el vocabulario. Si el token no est√° en el vocabulario,
    se sustituye por la id de <unk>. Adem√°s, se limita la longitud m√°xima de la secuencia a max_len.'''
    def encode(self, tweet):
        tokens = tweet_tokenizer(tweet)[:self.max_len]
        tokens = ["<s>"] + tokens[:self.max_len-2] + ["</s>"]
        ids = [self.vocab.get(token, self.vocab["<unk>"]) for token in tokens]
        return ids
    '''Tamano del dataset'''
    def __len__(self):
        return len(self.tweets)
    '''Devuelve los ids y la etiqueta de un tweet dado su √≠ndice'''
    def __getitem__(self, i):
        ids = self.encode(self.tweets[i])
        return ids

'''Realiza el padding para construir el batch para el DataLoader.'''
def collate(batch, pad_id=0):
    lens = torch.tensor([len(x) for x in batch], dtype=torch.long)
    T = lens.max().item()
    B = len(batch)
    padded = torch.full((B, T), pad_id, dtype=torch.long)
    for i, ids in enumerate(batch):
        padded[i, :len(ids)] = torch.tensor(ids, dtype=torch.long)
    return padded, lens

In [None]:
# mecanismo de atenci√≥n a la manera de transformer
class selfAttention(nn.Module):
    def __init__(self, d_model, n_heads=2, dropout=0.1):
        super().__init__()
        assert d_model % n_heads == 0
        self.h = n_heads
        self.dh = d_model // n_heads
        self.qkv = nn.Linear(d_model, 3*d_model, bias=False)
        self.o = nn.Linear(d_model, d_model, bias=False)
        self.drop = nn.Dropout(dropout)
        self.ln = nn.LayerNorm(d_model)

    def forward(self, X, pad_mask=None):
        B, T, D = X.shape
        X = self.ln(X)
        q, k, v = self.qkv(X).chunk(3, dim=-1)
        def split(x): return x.view(B, T, self.h, self.dh).transpose(1, 2)
        Q, K, V = map(split, (q, k, v))

        attention = (Q @ K.transpose(-2, -1)) / sqrt(self.dh)
        if pad_mask is not None:
            attention = attention.masked_fill(pad_mask[:, None, None, :], float('-inf'))
        causal = torch.triu(torch.ones(T, T, dtype=torch.bool, device=X.device), diagonal=1)
        attention = attention.masked_fill(causal, float('-inf'))

        A = torch.softmax(attention, dim=-1)
        A = self.drop(A)
        Y = (A @ V).transpose(1, 2).contiguous().view(B, T, D)

        return self.o(Y)


# arquitectura de rnn con self-attention
class LSTMSelfAttention(nn.Module):
    def __init__(self, vocab_size, emb_dim=128, hidden_dim=128, num_layers=1, num_heads=2, pad_id=0, dropout=0.2):
        super().__init__()
        self.pad_id = pad_id
        self.embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=pad_id)
        # definicion de la arquitectura
        self.lstm = nn.LSTM(
            input_size=emb_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=False,
            dropout=dropout if num_layers > 1 else 0.0
        )
        d_model = hidden_dim
        self.attention = selfAttention(d_model, n_heads=num_heads, dropout=dropout)
        self.dropout = nn.Dropout(dropout)

        self.head = nn.Linear(d_model, vocab_size, bias=False)
        self.head.weight = self.embedding.weight

    # definicion del metodo forward
    def forward(self, x, lens):
        pad_mask = (x == self.pad_id)
        X = self.embedding(x)
        if lens is not None:
            packed = nn.utils.rnn.pack_padded_sequence(
                X, lens.cpu(), batch_first=True, enforce_sorted=False
            )
            packed_out, _ = self.lstm(packed)
            H, _ = nn.utils.rnn.pad_packed_sequence(
                packed_out, batch_first=True
            )
        else:
            H, _ = self.lstm(X)

        H = self.attention(H, pad_mask=pad_mask)
        H = self.dropout(H)

        logits = self.head(H)
        return logits

In [26]:
# funci√≥n de p√©rdida para entrenamiento
def loss(logits, y, ignore_pad_id=0):
    logits = logits[:, :-1, :].contiguous()
    y = y[:, 1:].contiguous()
    B, T, V = logits.shape
    loss_sum = F.cross_entropy(
        logits.view(B*T, V),
        y.view(B*T),
        ignore_index=ignore_pad_id,
        reduction='sum'
    )
    valid = (y != ignore_pad_id).sum().clamp(min=1)  # tensor
    return loss_sum, valid

# funci√≥n para calcular la perplexidad del modelo
@torch.no_grad()
def perplexity_corpus(model, loader, pad_id=0, device="cpu"):
    model.eval()
    total_loss_sum = 0.0
    total_valid = 0
    for x, lens in loader:
        x, lens = x.to(device), lens.to(device)
        logits = model(x, lens)
        loss_sum, valid = loss(logits, x, ignore_pad_id=pad_id)
        total_loss_sum += float(loss_sum)
        total_valid    += int(valid.item())
    return exp(total_loss_sum / max(total_valid, 1))

In [7]:
#device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device = torch.device('cpu')

In [8]:
tr_txt, tr_y = get_texts_from_file("../../corpus/mex20_train.txt", "../../corpus/mex20_train_labels.txt")

In [None]:
# obtiene la longitud m√°xima de las secuencias
lengths = [len(tweet_tokenizer(s)) for s in tr_txt]
import numpy as np
max_len = int(np.percentile(lengths, 95))  # p.ej. 95¬∫ percentil
print(f"max_len: {max_len}")

max_len: 18


In [None]:
# construye el vocabulario
fdist = FreqDist(
    tok
    for doc in tr_txt
    for tok in tweet_tokenizer(doc)
)
V = {"<pad>": 0, "<unk>": 1, "<s>": 2, "</s>": 3}
for i, (val, freq) in enumerate(fdist.most_common(10000-4)):
    if val not in V:
        V[val] = i + 4

In [11]:
V

{'<pad>': 0,
 '<unk>': 1,
 '<s>': 2,
 '</s>': 3,
 ',': 4,
 '.': 5,
 '!': 6,
 '@usuario': 7,
 'verga': 8,
 'madre': 9,
 '?': 10,
 '<num>': 11,
 'putos': 12,
 'si': 13,
 'putas': 14,
 '...': 15,
 'üòÇ': 16,
 'loca': 17,
 'gorda': 18,
 'bien': 19,
 '¬ø': 20,
 'feas': 21,
 'pinche': 22,
 'puta': 23,
 'ser': 24,
 'puto': 25,
 'hdp': 26,
 'tan': 27,
 'as√≠': 28,
 'mamar': 29,
 'q': 30,
 'quiero': 31,
 'solo': 32,
 '¬°': 33,
 '<url>': 34,
 'joto': 35,
 'cosas': 36,
 'marica': 37,
 'siempre': 38,
 'ahora': 39,
 'hace': 40,
 'ver': 41,
 'vale': 42,
 'mejor': 43,
 'hoy': 44,
 'gente': 45,
 'vida': 46,
 'Ô∏è': 47,
 'va': 48,
 'voy': 49,
 'pinches': 50,
 'd√≠a': 51,
 'jajaja': 52,
 'vez': 53,
 'mierda': 54,
 '-': 55,
 'luchona': 56,
 'üò≠': 57,
 'pues': 58,
 'hijo': 59,
 'üòç': 60,
 'jajajaja': 61,
 'alguien': 62,
 'tontas': 63,
 'hacer': 64,
 'üò°': 65,
 '‚Äú': 66,
 'toda': 67,
 'cagado': 68,
 'mas': 69,
 'gusta': 70,
 '‚Äù': 71,
 'pendejo': 72,
 'tonta': 73,
 'mamando': 74,
 'puedo': 75,
 'h

In [None]:
# inicializa el modelo, el optimizador y el DataLoader
model = LSTMSelfAttention(
    vocab_size=len(V),
    emb_dim=128,
    hidden_dim=128,
    num_layers=2,
    num_heads=4,
    pad_id=0,
    dropout=0.2
).to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-2)

tr_data = TweetsDataset(tr_txt, tr_y, V)
tr_loader = DataLoader(
    tr_data,
    batch_size=64,
    shuffle=True,
    collate_fn=lambda batch: collate(batch, pad_id=0)
)

## Entrenamiento

In [None]:
epochs = 200

model.train()
for epoch in range(epochs):
    total_loss = 0
    total_valid = 0
    for batch in tr_loader:
        x, lens = batch
        x, lens = x.to(device), lens.to(device)
        optimizer.zero_grad()
        logits = model(x, lens)

        loss_sum, valid = loss(logits, x, ignore_pad_id=0)
        loss_val = loss_sum / valid
        loss_val.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        total_loss += loss_val.item()
        total_valid += valid.item()
    ppl = exp(total_loss / max(total_valid, 1))
    print(f"Epoch {epoch + 1}/{epochs} | Loss: {total_loss / total_valid:.4f}")

Epoch 1/200 | Loss: 0.0115 | PPL: 1.0116
Epoch 2/200 | Loss: 0.0109 | PPL: 1.0110
Epoch 3/200 | Loss: 0.0109 | PPL: 1.0109
Epoch 4/200 | Loss: 0.0108 | PPL: 1.0108
Epoch 5/200 | Loss: 0.0107 | PPL: 1.0107
Epoch 6/200 | Loss: 0.0106 | PPL: 1.0107
Epoch 7/200 | Loss: 0.0106 | PPL: 1.0106
Epoch 8/200 | Loss: 0.0105 | PPL: 1.0106
Epoch 9/200 | Loss: 0.0105 | PPL: 1.0105
Epoch 10/200 | Loss: 0.0104 | PPL: 1.0105
Epoch 11/200 | Loss: 0.0104 | PPL: 1.0105
Epoch 12/200 | Loss: 0.0104 | PPL: 1.0104
Epoch 13/200 | Loss: 0.0103 | PPL: 1.0104
Epoch 14/200 | Loss: 0.0102 | PPL: 1.0103
Epoch 15/200 | Loss: 0.0102 | PPL: 1.0102
Epoch 16/200 | Loss: 0.0101 | PPL: 1.0102
Epoch 17/200 | Loss: 0.0101 | PPL: 1.0102
Epoch 18/200 | Loss: 0.0101 | PPL: 1.0101
Epoch 19/200 | Loss: 0.0100 | PPL: 1.0100
Epoch 20/200 | Loss: 0.0099 | PPL: 1.0100
Epoch 21/200 | Loss: 0.0099 | PPL: 1.0100
Epoch 22/200 | Loss: 0.0099 | PPL: 1.0099
Epoch 23/200 | Loss: 0.0098 | PPL: 1.0099
Epoch 24/200 | Loss: 0.0097 | PPL: 1.0098
E

In [14]:
print(f"Perplexity (train): {perplexity_corpus(model, tr_loader, pad_id=0, device=device):.4f}")

Perplexity (train): 725.9629


## Validacion

In [None]:
#se obtiene una perplejidad menor para el conjunto de validaci√≥n respe to a los otros dos modelos
val_txt, val_y = get_texts_from_file("../../corpus/mex20_val.txt", "../../corpus/mex20_val_labels.txt")
val_data = TweetsDataset(val_txt, val_y, V)
val_loader = DataLoader(
    val_data,
    batch_size=64,
    shuffle=True,
    collate_fn=lambda batch: collate(batch, pad_id=0)
)
print(f"Perplexity (val): {perplexity_corpus(model, val_loader, pad_id=0, device=device):.4f}")

Perplexity (val): 346.0325


# Statistical Language Model

In [28]:
class TrigramData:

    def __init__(self, vocab_max, tokenizer):
        self.vocab_max = vocab_max
        self.tokenizer = tokenizer
        self.UNK = "<unk>"
        self.SOS = "<s>"
        self.EOS = "</s>"
        self.final_vocabulary = set()

    def _tok(self, txt: str):
        # si es funci√≥n, ll√°mala; si es objeto con .tokenize, √∫salo
        return self.tokenizer(txt) if callable(self.tokenizer) else self.tokenizer.tokenize(txt)

    def fit(self, raw_texts):

        freq_dist = FreqDist()
        tokenized_corpus = []

        for txt in raw_texts:
            tokens = self._tok(txt)
            tokenized_corpus.append(tokens)
            for w in tokens:
                freq_dist[w] += 1

        self.final_vocabulary = {tok for tok, _ in freq_dist.most_common(self.vocab_max)}
        self.final_vocabulary.update([self.UNK, self.SOS, self.EOS])

        transformed_corpus = []
        for tokens in tokenized_corpus:
            transformed_corpus.append(self.transform(tokens))
        return transformed_corpus
    
    def mask_oov(self, w):
        return self.UNK if w not in self.final_vocabulary else w
    
    def add_sos_eos(self, tokens):
        return [self.SOS, self.SOS] + tokens + [self.EOS]

    def transform(self, tokens):
        transformed = []
        for w in tokens:
            transformed.append(self.mask_oov(w))
        transformed = self.add_sos_eos(transformed)
        return transformed
    
    def transform_text(self, txt: str):
        return self.transform(self._tok(txt))
    
class TrigramLanguageModel:

    def __init__(self, lambda1=0.4, lambda2=0.3, lambda3=0.3):
        self.lambda1 = lambda1 # trigramas
        self.lambda2 = lambda2 # bigramas
        self.lambda3 = lambda3 # unigramas

        # Contadores
        self.unigram_counts = {}
        self.bigram_counts = {}
        self.trigram_counts = {}

        self.vocab = 0
        self.total_tokens = 0
        self.V = 0

    def train(self, transformed_corpus, final_vocabulary):
        self.vocab = final_vocabulary
        self.V = len(final_vocabulary)

        for tokens in transformed_corpus:
            for i, w in enumerate(tokens):

                # Unigramas
                self.unigram_counts[w] = self.unigram_counts.get(w, 0) + 1

                # Bigramas
                if i > 0:
                    w_prev = tokens[i-1]
                    self.bigram_counts[(w_prev, w)] = self.bigram_counts.get((w_prev, w), 0) + 1
                    
                # Trigramas
                if i > 1:
                    w_prev2 = tokens[i-2]
                    self.trigram_counts[(w_prev2, w_prev, w)] = \
                        self.trigram_counts.get((w_prev2, w_prev, w), 0) + 1
        self.total_tokens = sum(self.unigram_counts.values())

    def mask_oov(self, w):
        return "<unk>" if w not in self.vocab else w

    def unigram_probability(self, w):
        return (self.unigram_counts.get(self.mask_oov(w), 0) + 1) / (self.total_tokens + self.V)
    
    def bigram_probability(self, w_prev, w):
        w_prev = self.mask_oov(w_prev)
        w = self.mask_oov(w)
        
        numerator = self.bigram_counts.get((w_prev, w), 0) + 1
        denominator = self.unigram_counts.get(w_prev, 0) + self.V
        return numerator / denominator
    
    def trigram_probability(self, w_prev2, w_prev, w):
        w_prev2 = self.mask_oov(w_prev2)
        w_prev = self.mask_oov(w_prev)
        w = self.mask_oov(w)

        numerator = self.trigram_counts.get((w_prev2, w_prev, w), 0) + 1
        denominator = self.bigram_counts.get((w_prev2, w_prev), 0) + self.V
        return numerator / denominator
    
    def probability_of_word(self, w_prev2, w_prev, w):
        return self.lambda1 * self.trigram_probability(w_prev2, w_prev, w) + \
                self.lambda2 * self.bigram_probability(w_prev, w) + \
                self.lambda3 * self.unigram_probability(w)
    
    def sequence_probability(self, sequence):
        import math
        log_prob = 0.0
        for i in range(2, len(sequence)):
            w_prev2 = sequence[i-2]
            w_prev = sequence[i-1]
            w = sequence[i]

            p = self.probability_of_word(w_prev2, w_prev, w)
            log_prob += math.log(p)
        return math.exp(log_prob)
    
    def check_prob(self):
        print(sum(self.unigram_probability(w) for w in self.vocab))

        print(sum(self.bigram_probability("hola", w) for w in self.vocab))

        print(sum(self.trigram_probability("hola", "como", w) for w in self.vocab))

In [29]:
def corpus_perplexity_trigram(model, transformed_corpus):
    nll_sum = 0.0
    tok_count = 0
    for seq in transformed_corpus:
        for i in range(2, len(seq)):
            w2, w1, w = seq[i-2], seq[i-1], seq[i]
            p = model.probability_of_word(w2, w1, w)
            if p <= 0.0: p = 1e-12
            nll_sum += -log(p)
            tok_count += 1
    return exp(nll_sum / max(tok_count, 1))


In [31]:
# 1) Fit en TRAIN: arma vocab y corpus transformado con <s>,</s>,<unk>
tri_data = TrigramData(vocab_max=5000, tokenizer=tweet_tokenizer)
train_transformed = tri_data.fit(tr_txt)   # lista de listas de tokens
# 2) Instancia y ‚Äúentrena‚Äù (cuenta n-gramas)
tri_lm = TrigramLanguageModel(lambda1=0.4, lambda2=0.3, lambda3=0.3)
tri_lm.train(train_transformed, tri_data.final_vocabulary)

# 3) TRANSFORM de VAL/TEST usando el MISMO vocab
def transform_corpus(raw_texts, tri_data):
    out = []
    for txt in raw_texts:
        toks = tri_data.tokenizer(txt)
        out.append(tri_data.transform(toks))   # aplica UNK + <s>,<s>,...,</s>
    return out

val_transformed = transform_corpus(val_txt, tri_data)

train_ppl = corpus_perplexity_trigram(tri_lm, train_transformed)
val_ppl   = corpus_perplexity_trigram(tri_lm, val_transformed)
print(f"[TRIGRAM] train PPL={train_ppl:.3f} | val PPL={val_ppl:.3f}")


[TRIGRAM] train PPL=463.030 | val PPL=411.093


# Neural Language Model

In [32]:
from types import SimpleNamespace
from torch.utils.data import TensorDataset, DataLoader

class NgramData():
  """
  Esta clase toma un corpus y a trav√©s de los m√©todos fit y transform, se crea una lista de 
  n-gramas pensada para el entrenamiento de la red neuronal de Bengio pensando en una CBOW.
  """

  def __init__(self,
               N: int,
               vocab_max: int = 5000,
               tokenizer: callable = None,
               embeddings: np.ndarray = None):
    """
    Constructor de la clase.

    Args:
        N (int): Tama√±o de los n-gramas.
        vocab_max (int, optional): Tama√±o m√°ximo del vocabulario a considerar. Defaults to 5000.
        tokenizier (callable, optional): Tokenizador. Defaults to None.
        embeddings (np.ndarray, optional): Matriz de embeddings pre-entrenada. Debe entrar en el orden en el que entran las palabras. Defaults to None.
    """
    self.N = N
    self.vocab_max = vocab_max
    self.tokenizer = tokenizer if tokenizer else self.default_tokenizer
    self.embeddings = embeddings

    # Tokens que no queremos en nuestro corpus.
    self.punct = ['.', ',', ';', ':', '-', '^', '"'
                  '"', '!', '¬°', '¬ø', '?', '<url>', '#', '@usuario']

    # Tokens especiales
    self.UNK = "<unk>"
    self.SOS = "<s>"
    self.EOS = "</s>"

  def get_vocab_size(self) -> int:
    """
    Devuelve el tama√±o del vocabulario.

    Returns:
        int: Tama√±o del vocabulario.
    """
    return len(self.vocab)

  def default_tokenizer(self, doc: str) -> list:
    """
    Tokenizador por defecto. Simplemente separa cada oraci√≥n por espacios.

    Args:
        doc (str): Documento a tokenizar.

    Returns:
        list: Lista de tokens.
    """
    return doc.split(" ")

  def remove_word(self, word: str) -> bool:
    """
    Verifica si la palabra en cuesti√≥n debe eliminarse seg√∫n los siguientes criterios:
    - Es un signo de puntuaci√≥n
    - Es un d√≠gito

    Args:
        word (str): Palabra a evaluar.

    Returns:
        bool: True si se elimina.
    """
    word = word.lower()
    is_punct = True if word in self.punct else False
    is_digit = word.isnumeric()
    return is_punct or is_digit

  def sortFreqDist(self, freq_dist: nltk.FreqDist) -> list:
    """
    Devuelve una lista con el top de palabras por frecuencia. El tama√±o de la lista es self.vocab_max.

    Args:
        freq_dist (nltk.FreqDist): Objeto de frecuencias (nltk) del corpus considerado.

    Returns:
        list: Lista de tama√±o self.vocab_max.
    """
    freq_dist = dict(freq_dist)
    # Aqu√≠ key es una funci√≥n que se aplica a cada par√°metro
    # antes de compararlo. En este caso se pasa
    # freq_dist.get para asegurarse de que el ordenamiento
    # se haga por las frecuencias y no por orden alfab√©tico.
    return sorted(freq_dist,
                  key=freq_dist.get,
                  reverse=True)

  def get_vocab(self, corpus: list[str]) -> set:
    """
    Devuelve el vocabulario a partir de un corpus dado.

    Args:
        corpus (list[str]): Corpus del cual se quiere obtener el vocabulario. Lista de documentos.

    Returns:
        set: Vocabulario.
    """
    freq_dist = FreqDist(
      [w.lower()
       for sentence in corpus
       for w in self.tokenizer(sentence)
       if not self.remove_word(w)]
    )
    sorted_words = self.sortFreqDist(freq_dist)[:self.vocab_max-3]
    return set(sorted_words)

  def fit(self, corpus: list[str]) -> None:
    """
    Carga el vocabulario y crea diccionarios de √≠ndices <-> palabras. Adem√°s, si se aporta una matriz de embeddings pre-entrenados, tambi√©n construye la submatriz con los elementos del vocabulario.

    Args:
        corpus (list[str]): Lista de documentos.
    """
    # Cargamos el vocabulario
    self.vocab = self.get_vocab(corpus)
    self.vocab.add(self.UNK)
    self.vocab.add(self.SOS)
    self.vocab.add(self.EOS)

    # Diccionarios palabras <-> ids
    self.w2id = dict()
    self.id2w = dict()

    if self.embeddings:
      self.embeddings_matrix = np.empty([self.vocab_max,
                                         self.embeddings.vector_size])

    id = 0
    for doc in corpus:
      for word in self.tokenizer(doc):
        word_ = word.lower()
        if (word_ in self.vocab) and (not word_ in self.w2id):
          self.w2id[word_] = id
          self.id2w[id] = word_

          # Si se aporta una matriz de embeddings,
          # aqu√≠ se crea la submatriz.
          if self.embeddings:
            if word in self.embeddings:
              self.embeddings_matrix[id] = self.embeddings[word_]
            else:
              self.embeddings_matrix[id] = np.random.rand(
                self.embeddings.vector_size)

          id += 1

    # A√±adirmos los tokens especiales a los diccionarios.
    self.w2id.update(
      {self.UNK: id,
       self.SOS: id + 1,
       self.EOS: id + 2}
    )
    self.id2w.update(
      {id: self.UNK,
       id + 1: self.SOS,
       id + 2: self.EOS}
    )

  def get_ngram_doc(self, doc: str) -> list:
    """
    Devuelve una lista con n-gramas de un documento dado.

    Args:
        doc (str): Documento del que se quieren obtener los n-gramas.

    Returns:
        list: Lista de n-gramas.
    """
    doc_tokens = self.tokenizer(doc)
    doc_tokens = self.replace_unk(doc_tokens)
    doc_tokens = [w.lower()
                  for w in doc_tokens]
    doc_tokens = [self.SOS] * (self.N - 1) + doc_tokens + [self.EOS]

    return list(nltk.ngrams(doc_tokens, self.N))

  def replace_unk(self, doc_tokens: list[str]) -> list:
    """
    Toma un lista de tokens e intercambia los tokens out-of-vocabulary por el token especial self.UNK.

    Args:
        doc_tokens (list[str]): Lista de tokens.

    Returns:
        list: Lista de tokens procesada.
    """
    for i, token in enumerate(doc_tokens):
      if token.lower() not in self.vocab:
        doc_tokens[i] = self.UNK
    return doc_tokens

  def transform(self, corpus: list[str]) -> tuple[np.ndarray, np.ndarray]:
    """
    Devuelve una tupla de arreglos de Numpy. El primero tendr√° los ids de las palabras en el contexto, mientras que la segunda el id de la palabra que se debe predecir.

    Se piensa en un modelo de CBOW. Damos el contexto y queremos predecir la palabra que sigue.

    Args:
        corpus (list[str]): Lista de documentos.

    Returns:
        tuple[np.ndarray, np.ndarray]: Arreglos de numpy con ids de los contextos y id de la palabra objetivo.
    """
    X_ngrams = list()
    y = []

    for doc in corpus:
      doc_ngram = self.get_ngram_doc(doc)
      for words_window in doc_ngram:
        words_window_ids = [self.w2id[w]
                            for w in words_window]
        X_ngrams.append(list(words_window_ids[:-1]))
        y.append(words_window_ids[-1])

    return np.array(X_ngrams), np.array(y)
  
class NeuralLanguageModel(nn.Module):
  """
  Red neuronal de Bengio :)
  """

  def __init__(self, args):
    """
    Constructor  de la clase.

    El modelo de red neuronal par lenguaje de Bengio tiene la siguiente estructura:
    Para un modelo de n-gramas, se dan las primeras n-1 palabras como contexto y se intenta predecir la n-√©sima palabra.
    (1) n-1 representaciones iniciales: suelen ser one-hot. Pero aqu√≠ se toman de NgramData.
        x
    (2) n-1 representaciones aprendidas de tama√±o m: se obtienen de manera individual (por palabra). 
        (x = Cx)
        En esta implementaci√≥n C se inicia de manera aleatoria.
    (3) Capa oculta de tama√±o h: se mezclan las n-1 representaciones del paso anterior y se aplica tanh. 
        (h = tanh(Hx + d))
        Nosotros vamos a usar ReLu en vez de tanh.
    (4) Capa de salida de tama√±o m: se aplica softmax a la salida de la capa anterior.
        (y = softmax(Uh + b))
        Nosotros no vamos a aplicar softmax aqu√≠, sino afuerita.

    Args:
        args (Any): Diccionario de variables.
    """
    super(NeuralLanguageModel, self).__init__()

    self.window_size = args.N - 1  # Las n-1 palabras que entran (el contexto).
    self.embedding_size = args.m  # Tama√±o de las representaciones.

    # Matriz C para convertir las representaciones. Pero est√° chido porque sus entradas son "entrenables".
    self.emb = nn.Embedding(args.vocab_size, args.m)
    # Primera capa oculta de las representaciones aprendidas a la oculta.
    self.fc1 = nn.Linear(args.m * (args.N - 1), args.d_h)
    # Un dropout para alocarnos
    self.drop1 = nn.Dropout(p=args.dropout)
    # Aqu√≠ solamente se va a hacer el producto por la matriz U.
    # La softmax se va a aplicar por fuera de la red para obtener la siguiente palabra seg√∫n la red.
    self.fc2 = nn.Linear(args.d_h, args.vocab_size, bias=False)

  def forward(self, x):
    # Aqu√≠ se cambia la representaci√≥n inicial por la aprendida.
    # Es un producto matricial. Aqu√≠ las representaciones siguen siendo matrices.
    x = self.emb(x)
    # Se cambia el tama√±o para que se considere como una sola capa.
    x = x.view(-1, self.window_size * self.embedding_size)
    # Aqu√≠ se hace relu(Hx + d)
    h = F.relu(self.fc1(x))  # relu(z) = max{0, z}
    # El dropout para alocarnoooos wuuuuuuuu
    h = self.drop1(h)

    # Devolvemos solamente (Uh + b)
    return self.fc2(h)
  
def get_preds(raw_logits: torch.Tensor) -> torch.Tensor:
  """
  Aqu√≠ se toma la salida de la red neuronal (las neuronas de la √∫ltima capa oculta).
  Uh + b
  Se les aplica la softmax
  softmax(Uh + b)
  Y luego se devuelve el √≠ndice de la neurona de mayor valor.

  Args:
      raw_logits (torch.Tensor | float): La salida de la red (Uh + b)

  Returns:
      torch.Tensor | int: √çndice de la neurona con mayor valor despu√©s de softmax.
  """
  # Se aplica softmax.
  probs = F.softmax(raw_logits.detach(), dim=1)
  # Se obtiene el √≠ndice del valor m√°ximo.
  y_pred = torch.argmax(probs, dim=1).cpu().numpy()

  return y_pred

In [None]:
def loss_ngram(logits, y):
    loss_sum = F.cross_entropy(logits, y, reduction='sum')
    valid = torch.tensor(y.numel(), device=y.device)
    return loss_sum, valid

@torch.no_grad()
def perplexity_corpus_ngram(model, loader, device='cpu'):
    model.eval()
    tot_sum, tot_valid = 0.0, 0
    for X_win, y in loader:
        X_win, y = X_win.to(device), y.to(device)
        logits = model(X_win)
        ls, v = loss_ngram(logits, y)
        tot_sum += ls.item()
        tot_valid += int(v.item())
    return exp(tot_sum / max(tot_valid, 1))


In [None]:
N = 5
ng = NgramData(N=N, vocab_max=5000, tokenizer=tweet_tokenizer)
ng.fit(tr_txt)                                 # arma vocab y mapeos
X_np, y_np = ng.transform(tr_txt)              # X: (num_samples, N-1) ; y: (num_samples,)

# 3.2 Dataset/Loader
X_t = torch.from_numpy(X_np).long()
y_t = torch.from_numpy(y_np).long()
ds_ng = TensorDataset(X_t, y_t)
loader_ng = DataLoader(ds_ng, batch_size=256, shuffle=True)

# 3.3 Modelo de Bengio
args = SimpleNamespace(
    N=N,
    m=128,                # tama√±o de embedding
    d_h=256,              # tama√±o de capa oculta
    dropout=0.2,
    vocab_size=ng.get_vocab_size()
)
model_ng = NeuralLanguageModel(args).to(device)
opt_ng = torch.optim.AdamW(model_ng.parameters(), lr=1e-3, weight_decay=1e-2)

## Entrenamiento

In [35]:
epochs = 10
for epoch in range(1, epochs+1):
    model_ng.train()
    tot_sum, tot_valid = 0.0, 0

    for X_win, y in loader_ng:
        X_win, y = X_win.to(device), y.to(device)

        opt_ng.zero_grad()
        logits = model_ng(X_win)            # (B,V)
        ls, v = loss_ngram(logits, y)
        loss_val = ls / v
        loss_val.backward()
        torch.nn.utils.clip_grad_norm_(model_ng.parameters(), 1.0)
        opt_ng.step()

        tot_sum += ls.item()
        tot_valid += v.item()

    ppl_train = exp(tot_sum / max(tot_valid, 1))
    print(f"[NGRAM] Epoch {epoch}/{epochs} | train PPL: {ppl_train:.3f}")

# (opcional) ppl de validaci√≥n: perplexity_corpus_ngram(model_ng, loader_ng_val, device=device)


[NGRAM] Epoch 1/10 | train PPL: 332.248
[NGRAM] Epoch 2/10 | train PPL: 175.245
[NGRAM] Epoch 3/10 | train PPL: 125.279
[NGRAM] Epoch 4/10 | train PPL: 85.923
[NGRAM] Epoch 5/10 | train PPL: 54.140
[NGRAM] Epoch 6/10 | train PPL: 33.652
[NGRAM] Epoch 7/10 | train PPL: 23.067
[NGRAM] Epoch 8/10 | train PPL: 17.570
[NGRAM] Epoch 9/10 | train PPL: 14.266
[NGRAM] Epoch 10/10 | train PPL: 12.053


In [48]:
X_np, y_np = ng.transform(val_txt)              # X: (num_samples, N-1) ; y: (num_samples,)

# 3.2 Dataset/Loader
X_t = torch.from_numpy(X_np).long()
y_t = torch.from_numpy(y_np).long()
ds_ng = TensorDataset(X_t, y_t)
loader_val_ng = DataLoader(ds_ng, batch_size=256, shuffle=True)

ppl_train_ngram = perplexity_corpus_ngram(model_ng, loader_ng, device=device)
print(f"[NGRAM] train PPL: {ppl_train_ngram:.3f}")
ppl_val_ngram = perplexity_corpus_ngram(model_ng, loader_val_ng, device=device)
print(f"[NGRAM] val PPL: {ppl_val_ngram:.3f}")


[NGRAM] train PPL: 7.555
[NGRAM] val PPL: 564.345


|Modelo|PPL train|PP val|
|:----:|:-------:|:----:|
|LSTM + self att|725.9629|346.0325|
|Statistical|463.030|411.093|
|Neural|7.555|564.345|

# Generacion de texto

In [51]:
import math, torch, random
import torch.nn.functional as F

# construye vocabulario inverso (id: token)
def build_inv_vocab(V):
    # V: dict token->id
    inv = {i:t for t,i in V.items()}
    return inv

# muestrea un token a partir de los logits
def sample_from_logits(logits, temperature=1.0, top_p=None):
    """
    logits: 1D tensor (V,)
    Devuelve: √≠ndice amuestrado (int)
    """
    if temperature <= 0:
        return int(torch.argmax(logits).item())

    logits = logits / temperature

    # muestreo de probabilidad acumulada (top-p)
    if top_p is not None and 0 < top_p < 1.0:
        probs = torch.softmax(logits, dim=-1)
        sorted_probs, sorted_idx = torch.sort(probs, descending=True)
        cum = torch.cumsum(sorted_probs, dim=-1)
        cutoff = (cum > top_p).nonzero(as_tuple=True)[0]
        if len(cutoff) > 0:
            k = int(cutoff[0].item() + 1)
            keep = sorted_idx[:k]
            mask = torch.ones_like(logits, dtype=torch.bool)
            mask[keep] = False
            logits = logits.masked_fill(mask, float("-inf"))

    probs = torch.softmax(logits, dim=-1)
    idx = torch.multinomial(probs, num_samples=1)
    return int(idx.item())


In [65]:
@torch.no_grad()
def generate_lstm_lm(model, V, prompt_tokens=None, max_new_tokens=30,
                     temperature=1.0, top_k=None, top_p=None, device="cpu"):
    """
    model: LSTMSelfAttention entrenado (autoregresivo)
    V: dict token->id
    prompt_tokens: lista de tokens (si None, empieza con <s>)
    Devuelve: lista de tokens (incluye el prompt y lo generado)
    """
    model.eval()
    invV = build_inv_vocab(V)

    # tokens iniciales
    if prompt_tokens is None or len(prompt_tokens) == 0:
        toks = ["<s>"]
    else:
        toks = prompt_tokens

    # ids iniciales
    ids = [V.get(t, V["<unk>"]) for t in toks]
    
    for _ in range(max_new_tokens):
        x = torch.tensor(ids, dtype=torch.long, device=device).unsqueeze(0)
        lens = torch.tensor([x.size(1)], dtype=torch.long, device=device)
        logits = model(x, lens)
        next_logits = logits[0, -1, :]

        idx = sample_from_logits(next_logits, temperature, top_p)
        next_tok = invV.get(idx, "<unk>")
        ids.append(idx)
        toks.append(next_tok)
        if next_tok == "</s>":
            break

    return toks

@torch.no_grad()
def generate_bengio_ngram(model, ng, prompt_tokens=None, max_new_tokens=20,
                          temperature=1.0, top_p=None, device="cpu"):
    model.eval()
    Nm1 = getattr(model, "window_size")  # usa el tama√±o real del modelo

    # contexto inicial de longitud EXACTA Nm1
    prompt_tokens = prompt_tokens or []
    ctx = (["<s>"]*(Nm1-1) + prompt_tokens)[-Nm1:]

    def id_of(tok): return ng.w2id.get(tok, ng.w2id["<unk>"])

    out = list(prompt_tokens)  # lo que imprimimos
    for _ in range(max_new_tokens):
        x_win = torch.tensor([[id_of(t) for t in ctx]], dtype=torch.long, device=device)  # (1, Nm1)
        logits = model(x_win)[0]  # (V,)
        idx = sample_from_logits(logits, temperature=temperature, top_p=top_p)
        tok = ng.id2w.get(idx, "<unk>")

        out.append(tok)
        ctx = (ctx + [tok])[-Nm1:]
        if tok == "</s>":
            break
    return out


def generate_trigram(tri_lm, tri_data, prompt_tokens=None, max_new_tokens=20,
                     temperature=1.0, top_p=None, greedy=False):
    """
    tri_lm: TrigramLanguageModel entrenado
    tri_data: TrigramData (para vocab y mask_oov)
    """
    # contexto inicial: dos BOS
    ctx = ["<s>", "<s>"]
    if prompt_tokens:
        ctx += prompt_tokens
    ctx = ctx[-2:]

    toks = list(ctx)

    # lista estable del vocabulario (para indexar)
    vocab = sorted(list(tri_lm.vocab))  # set -> lista
    # opcional: evita escoger <s> como siguiente palabra
    if "<s>" in vocab:
        vocab_wo_bos = [w for w in vocab if w != "<s>"]
    else:
        vocab_wo_bos = vocab

    for _ in range(max_new_tokens):
        w2, w1 = ctx[-2], ctx[-1]

        # construye la distribuci√≥n sobre todo el vocabulario
        probs = np.array([tri_lm.probability_of_word(w2, w1, w) for w in vocab_wo_bos], dtype=np.float64)
        probs = np.clip(probs, 1e-12, 1.0)
        probs = probs / probs.sum()

        # temperatura/top-p opcional (en numpy)
        if temperature > 0 and temperature != 1.0:
            # aplicamos a logits ficticios: log(p)^1/T -> p^(1/T) (aprox estabilizada)
            logits = np.log(probs + 1e-12) / temperature
            probs = np.exp(logits - logits.max())
            probs = probs / probs.sum()

        if top_p is not None and 0 < top_p < 1.0:
            sidx = np.argsort(-probs)
            sorted_probs = probs[sidx]
            csum = np.cumsum(sorted_probs)
            k = np.searchsorted(csum, top_p) + 1
            keep = sidx[:k]
            mask = np.ones_like(probs, dtype=bool)
            mask[keep] = False
            probs = np.where(mask, 0.0, probs)
            probs = probs / probs.sum()

        if greedy or temperature <= 0:
            next_w = vocab_wo_bos[int(np.argmax(probs))]
        else:
            next_w = np.random.choice(vocab_wo_bos, p=probs)

        toks.append(next_w)
        ctx = (ctx + [next_w])[-2:]
        if next_w == "</s>":
            break

    return toks


# LSTM

In [None]:
gen = generate_lstm_lm(
    model, V,
    prompt_tokens=["<s>", "tu", "eres", "un"],
    max_new_tokens=30, temperature=0.9, top_p=0.9, device=device
)
print(" ".join(gen))


tu eres un #gay asco ! </s>


In [59]:
gen = generate_lstm_lm(
    model, V,
    prompt_tokens=[],
    max_new_tokens=30, temperature=0.9, top_p=0.9, device=device
)
print(" ".join(gen))


<s> hora dos as√≠ che pasada coca üòç </s>


# Statistical

In [100]:
gen_b = generate_bengio_ngram(
    model_ng, ng,
    prompt_tokens=["<s>", "tu", "eres", "un"],
    max_new_tokens=20, temperature=0.9, top_p=0.9, device=device
)
print(" ".join(gen_b))


<s> tu eres un pinche calor inmamable mosquitos <unk> </s>


In [108]:
gen_b = generate_bengio_ngram(
    model_ng, ng,
    prompt_tokens=["<s>", "<s>", "<s>", "<s>"],
    max_new_tokens=20, temperature=0.9, top_p=0.9, device=device
)
print(" ".join(gen_b))


<s> <s> <s> <s> dice s√∫per <unk> viendo <unk> <unk> puede ser c√≠nico tipo <unk> </s>


## Neural

In [60]:
gen_tri = generate_trigram(
    tri_lm, tri_data,
    prompt_tokens=tweet_tokenizer("tu eres un"),
    max_new_tokens=25, top_p=0.9, temperature=0.9
)
print(" ".join(gen_tri))

<s> <s> pinche #vivam√©xico ü§ó ay <unk> ‚úäüèº estr√©s contando sueldo temprano bueno d'alessio sacarle haha . </s>


In [62]:
gen_tri = generate_trigram(
    tri_lm, tri_data,
    prompt_tokens=[],
    max_new_tokens=25, top_p=0.9, temperature=0.9
)
print(" ".join(gen_tri))

<s> <s> aniversario bonita ctm ‚òπ </s>


Todos los modelos generan textos con cierto nivel de coherencia, sin embargo muchos de los textos generados son agresivos debido a la naturaleza del corpus.

Fue complicado decidir que arquitectura de RNN proponer desde cero, particularmente la parte de decidir hiperparametros para poder vencer a los otros modelos de lenguaje; si bien se logro una perplejidad menor a la de los otros dos, la generacion de texto aun es complicada debido a que no es el objetivo principal de los modelos de lenguaje.