# Creando un (simple) modelo de lenguaje

## Preparación del texto

In [None]:
import nltk
nltk.download('punkt')
nltk.download('stopwords')

### Función tokenizadora

In [None]:
from nltk import sent_tokenize
from nltk.tokenize.toktok import ToktokTokenizer
import string
from typing import List

tokenizer = ToktokTokenizer()
stopword_es = set(nltk.corpus.stopwords.words('spanish'))
punctuation = set(string.punctuation + '¡¿…')

def tokenize_dialog(dialog: str) -> List[str]:
    tokens = []
    for sentence in sent_tokenize(dialog):
        for token in tokenizer.tokenize(sentence):
            token = token.lower()
            # if token in stopword_es:
            #     continue
            # if token in punctuation:
            #     continue
            tokens.append(token)
    return tokens

In [None]:
diálogo = ("Y también nos planteó que se atendiera a jóvenes en casas especiales para terapias y apoyo a personas "
           "con discapacidad. También, ya se buscó una alternativa y ya tenemos una respuesta.")

tokens = tokenize_dialog(diálogo)

print(" - ".join(tokens))

### Aplicándolo a los diálogos de las conferencias presidenciales

In [None]:
import gzip

dialogos = []

with gzip.open("conferencias.txt.gz", 'rt') as r:
    for line in r:
        dialogos.append(line.strip())

In [None]:
training_tokens = [
    tokenize_dialog(dialogo) for dialogo in dialogos
]

In [None]:
training_tokens[0]

## Train a new Vocabulary

## Clase `Vocabulary`

In [None]:
from collections import Counter
from itertools import chain
from typing import Optional, List


class Vocabulary:
    START_TOKEN = "<p>"
    END_TOKEN = "</p>"
    UNK_TOKEN = "<unk>"

    def __init__(self, size: Optional[int] = None):
        self.size = size

    def fit(self, tokenized_texts: List[List[str]]) -> "Vocabulary":
        # Creamos una sola lista con todos los tokens
        tokens = chain.from_iterable(tokenized_texts)

        # Contamos las ocurrencias de cada token
        self.single_token_counts = Counter(tokens)
        self.total_number_of_tokens = sum(self.single_token_counts.values())

        # Obtenemos los tokens más comunes con `most_common`, si `size` es `None` entonces se obtienen todos los tokens
        # Si `size` no es `None` entonces se obtienen los `size` tokens más comunes menos 3 (por los tokens especiales)
        top_counts = self.single_token_counts.most_common(None if self.size is None else (self.size - 3))

        # Creamos el vocabulario con los tokens especiales y los tokens más comunes
        vocab = [self.START_TOKEN, self.END_TOKEN, self.UNK_TOKEN] + [w for w, _ in top_counts]

        # Creamos los diccionarios para convertir de id a palabra y viceversa
        self.id_to_word = dict(enumerate(vocab))
        self.word_to_id = {v: k for k, v in iter(self.id_to_word.items())}

        # Establecemos estas variables para acceder más fácilmente a la cantidad de tokens y al tamaño del vocabulario
        self.size = len(self.id_to_word)
        self.tokenset = set(self.word_to_id.keys())

        # Almacenamos los ids de los tokens especiales
        self.start_id = self.word_to_id[self.START_TOKEN]
        self.end_id = self.word_to_id[self.END_TOKEN]
        self.unk_id = self.word_to_id[self.UNK_TOKEN]

        return self

    def transform(self, tokenized_texts: List[List[str]]):
        padded_posts = ([self.START_TOKEN, self.START_TOKEN] + p + [self.END_TOKEN] for p in tokenized_texts)
        cannonical_posts = [[self.word_to_id.get(w, self.unk_id) for w in p] for p in padded_posts]
        return cannonical_posts

    def words_to_ids(self, words: List[str]) -> List[int]:
        return [self.word_to_id.get(w, self.unk_id) for w in words]

    def ids_to_words(self, ids: List[int]) -> List[str]:
        return [self.id_to_word[i] for i in ids]


In [None]:
vocab = Vocabulary()
vocab.fit(training_tokens)

In [None]:
print(list(vocab.tokenset)[:10])
vocab.total_number_of_tokens

In [None]:
vocab.single_token_counts.most_common(10)

In [None]:
transformed = vocab.transform([['hola', 'cómo', 'están']])
print("Ids:", transformed)
words = vocab.ids_to_words(transformed[0])
print("Words:", words)

## Crea un nuevo modelo de lenguaje

In [None]:
from collections import defaultdict
from typing import List


class AddKTrigramLM:
    """Modelo de lenguaje de trigramas con suavizado Add-k."""

    def __init__(self, k: float = 0.0):
        """Inicializa el modelo de lenguaje con el valor de k."""
        self.k = k

    def fit(self, corpus: List[List[str]]) -> "AddKTrigramLM":
        """Entrena el modelo de lenguaje a partir de un corpus."""

        # Creamos un diccionario de diccionarios para guardar las cuentas de los trigramas
        # En este diccionario las ocurrencias del token `w` dado `w_1 w_2` se almacenan como `counts[(w_2,w_1)][w]`
        counts = defaultdict(lambda: defaultdict(lambda: 0))

        # Necesitamos tener una lista de todos los tokens en el corpus
        # Esto es necesario para calcular el tamaño del vocabulario
        # y para calcular las probabilidades de los tokens
        all_tokens = set()

        # Iteramos sobre cada documento del corpus
        # Para cada documento iteramos sobre cada token
        # En cada iteración actualizamos el contexto y las cuentas de los trigramas
        w_1, w_2 = None, None
        for document in corpus:
            for token in document:
                all_tokens.add(token)
                if w_1 is not None and w_2 is not None:
                    counts[(w_2, w_1)][token] += 1
                # Update context
                w_2 = w_1
                w_1 = token

        # Convertir los defaultdicts en diccionarios normales
        # esto es solo por fines de presentación
        self.token_totals = dict()
        for context, ctr in counts.items():
            self.token_totals[context] = dict(ctr)

        # En `context_totals` almacenamos las ocurrencias de los bigramas `w_2,w_1`
        self.context_totals = dict()
        for context, ctr in counts.items():
            self.context_totals[context] = sum(ctr.values())

        self.tokens = list(all_tokens)
        self.V = len(self.tokens)

        return self

    def set_k(self, k: float = 0.0) -> None:
        self.k = k

    def next_token_proba(self, token: str, current_sequence: List[str]) -> float:
        """Calcula la probabilidad de un token dada una secuencia de tokens."""

        # Obtenemos los 2 últimos tokens de la secuencia: `w_1` y `w_2`
        context = tuple(current_sequence[-2:])

        # Count word es la cuenta de las veces que ocurren los tokens `w_1`, `w_2` y `token` en el corpus
        count_word = self.token_totals.get(context, {}).get(token, 0)

        # Context count es la cuenta de las veces que ocurren los tokens `w_1` y `w_2` en el corpus
        context_count = self.context_totals.get(context, 0)

        if self.k == 0:
            # Si k = 0, entonces no se aplica suavizado y la probabilidad es la división de la cuenta de los
            # tokens `w_1`, `w_2` y `token` entre la cuenta de los tokens `w_1` y `w_2`
            return count_word / context_count
        else:
            # El cáculo de la probabilidad es la división de la cuenta de los tokens `w_1`, `w_2` y `token` entre la
            # cuenta de los tokens `w_1` y `w_2`
            return (count_word + self.k) / (context_count + self.k * self.V)


In [None]:
lm = AddKTrigramLM()

lm.fit(vocab.transform(training_tokens))

In [None]:
token_inicial = 'hola'
id_inicial = vocab.word_to_id[token_inicial]
lm.token_totals[(vocab.start_id, vocab.start_id)][id_inicial]

In [None]:
lm.next_token_proba(id_inicial, [vocab.start_id,vocab.start_id])

In [None]:
from typing import Tuple

import numpy as np

from add_k_trigram_lm import AddKTrigramLM
from vocabulary import Vocabulary


class SequenceGenerator:
    def __init__(self, language_model: AddKTrigramLM, vocabulary: Vocabulary):
        self.lm = language_model
        self.vocab = vocabulary

    def sample_next_token(self, *sequence: Tuple[str]) -> str:
        # Esto busca cada palabra en el vocabulario y obtiene su probabilidad condicional.
        # Esto puede ser lento si el vocabulario es muy grande, ¿podríamos hacerlo mejor?
        probs = [self.lm.next_token_proba(word, sequence) for word in self.lm.tokens]

        # Elegimos una palabra al azar de acuerdo a sus probabilidades
        return np.random.choice(self.lm.tokens, p=probs)

    def generate_sequences(self, *start: Tuple[str], max_length: int = 200) -> str:
        # Given it the start sequence to indicate the start of a post.
        seq = [self.vocab.start_id, self.vocab.start_id]
        if start:
            start = [self.vocab.word_to_id[w] for w in start]
            seq.extend(start)
        for i in range(max_length):
            seq.append(self.sample_next_token(*seq))
            # Stop at post
            if seq[-1] == self.vocab.end_id:
                break
        return " ".join([f"{self.vocab.id_to_word[s]}" for s in seq])


In [None]:
# lm.set_k(1e-3)
lm.set_k(0)
generador = SequenceGenerator(lm, vocab)
start = ['el', 'pueblo']
for _ in range(5):
    print(generador.generate_sequences(*start))
    print()