(15p) Using the test dataset, calculate the perplexity of each language model. Report the results obtained. If you experience variable overflow, use probabilities in log space.

El link de la carpeta donde estan los conjuntos de entrenamiento + testing y los modelos entrenados es:

https://uniandes-my.sharepoint.com/:f:/g/personal/a_mosquerah2_uniandes_edu_co/Em-od1gldI9BnnTjpUXqZXcB8fjXUoybI35zktWPWzyqpw?e=6dT4ng

(Solo se puede abrir con correo uniandes)

In [2]:
# carpeta conjunto de entrenamiento y de testing de 20news y de BAC
tercer_punto_folder = "tercer-punto"
# carpeta modelos de unigramas, bigramas y trigramas de cada corpus. 
cuarto_punto_folder = "cuarto-punto"

In [3]:
import json
import os
import math
from collections import defaultdict
from typing import Dict, Tuple, Optional, Any


def read_ngram_model(file_path: str) -> Optional[Dict[str, float]]:
    """
    Carga un modelo de n-gramas desde un archivo JSON.

    Args:
        file_path (str): Ruta del archivo JSON que contiene el modelo.

    Returns:
        Optional[Dict[str, float]]: Diccionario con las probabilidades del modelo
        o None si ocurre un error al leer el archivo.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            model: Dict[str, float] = json.load(f)
        print(f"[OK] Modelo cargado desde '{file_path}'")
        return model
    except FileNotFoundError:
        print(f"[ERROR] Archivo no encontrado: '{file_path}'")
        return None
    except Exception as e:
        print(f"[ERROR] No se pudo cargar el modelo desde '{file_path}': {e}")
        return None

def _normalize_ngram_keys(
    model_dict: Dict[str, float], n: int
) -> Tuple[Dict[Tuple[str, ...], float], Dict[Any, int]]:
    """
    Convierte llaves de un diccionario de n-gramas en tuplas para acceso O(1)
    y calcula el número de tipos de tokens siguientes por contexto.

    Args:
        model_dict (Dict[str, float]): Diccionario del modelo con claves en formato string.
        n (int): Tamaño del n-grama (1, 2 o 3).

    Returns:
        Tuple[Dict[Tuple[str, ...], float], Dict[Any, int]]:
            - Diccionario con claves como tuplas y sus probabilidades.
            - Diccionario con el número de tokens siguientes por contexto.
    """
    out: Dict[Tuple[str, ...], float] = {}
    followers: Dict[Any, int] = defaultdict(int)

    if n == 1:
        for k, v in model_dict.items():
            out[(k,)] = float(v)
        return out, {}

    elif n == 2:
        next_types = defaultdict(set)
        for k, v in model_dict.items():
            a, b = k.split(" ", 1)
            out[(a, b)] = float(v)
            next_types[a].add(b)
        followers = {a: len(bs) for a, bs in next_types.items()}
        return out, followers

    elif n == 3:
        next_types = defaultdict(set)
        for k, v in model_dict.items():
            a, b, c = k.split(" ", 2)
            out[(a, b, c)] = float(v)
            next_types[(a, b)].add(c)
        followers = {ctx: len(cs) for ctx, cs in next_types.items()}
        return out, followers

    else:
        raise ValueError("n debe ser 1, 2 o 3")
    

def calculate_perplexity(
    testing_file: str,
    unigram_model: Dict[str, float],
    bigram_model: Dict[str, float],
    trigram_model: Dict[str, float],
    vocab_size: Optional[int] = None,
) -> Tuple[Optional[float], Optional[float], Optional[float]]:
    """
    Calcula la perplejidad de un corpus de prueba usando modelos de unigramas,
    bigramas y trigramas con suavizado de Laplace.

    Args:
        testing_file (str): Ruta del archivo con el corpus de prueba.
        unigram_model (Dict[str, float]): Modelo de unigramas.
        bigram_model (Dict[str, float]): Modelo de bigramas.
        trigram_model (Dict[str, float]): Modelo de trigramas.
        vocab_size (Optional[int], opcional): Tamaño del vocabulario. 
                                              Si es None, se calcula a partir de los unigramas.

    Returns:
        Tuple[Optional[float], Optional[float], Optional[float]]:
            Perplejidad de unigramas, bigramas y trigramas.
            Devuelve None si ocurre un error, o inf si el corpus está vacío.
    """
    if any(m is None for m in (unigram_model, bigram_model, trigram_model)):
        return None, None, None

    uni, _ = _normalize_ngram_keys(unigram_model, 1)
    bi, followers_bi = _normalize_ngram_keys(bigram_model, 2)
    tri, followers_tri = _normalize_ngram_keys(trigram_model, 3)

    if vocab_size is None:
        vocab_size = len(uni)

    laplace_uni_denom: int = len(uni) + vocab_size

    total_log_u: float = 0.0
    total_log_b: float = 0.0
    total_log_t: float = 0.0
    total_tokens: int = 0

    try:
        with open(testing_file, "r", encoding="utf-8") as f:
            for raw in f:
                s: str = raw.strip()
                if not s:
                    continue
                tokens = s.split()
                n: int = len(tokens)
                if n == 0:
                    continue

                total_tokens += n

                # -------- Unigram --------
                for w in tokens:
                    p = uni.get((w,), 0.0)
                    if p <= 0.0:
                        p = 1.0 / laplace_uni_denom
                    total_log_u += math.log(p)

                # -------- Bigram --------
                # P(w1) + Π P(w_i | w_{i-1})
                # w1 como unigrama
                p1 = uni.get((tokens[0],), 0.0)
                if p1 <= 0.0:
                    p1 = 1.0 / laplace_uni_denom
                total_log_b += math.log(p1)

                for i in range(n - 1):
                    a, b_ = tokens[i], tokens[i + 1]
                    p = bi.get((a, b_), 0.0)
                    if p <= 0.0:
                        context_types = followers_bi.get(a, 0)
                        p = 1.0 / (context_types + vocab_size)
                    total_log_b += math.log(p)

                # -------- Trigram --------
                # P(w1) * P(w2|w1) * Π P(w_i | w_{i-2}, w_{i-1})
                # w1 como unigrama
                p1 = uni.get((tokens[0],), 0.0)
                if p1 <= 0.0:
                    p1 = 1.0 / laplace_uni_denom
                total_log_t += math.log(p1)

                if n >= 2:
                    p2 = bi.get((tokens[0], tokens[1]), 0.0)
                    if p2 <= 0.0:
                        ctx_types = followers_bi.get(tokens[0], 0)
                        p2 = 1.0 / (ctx_types + vocab_size)
                    total_log_t += math.log(p2)

                for i in range(n - 2):
                    a, b_, c = tokens[i], tokens[i + 1], tokens[i + 2]
                    p = tri.get((a, b_, c), 0.0)
                    if p <= 0.0:
                        ctx_types = followers_tri.get((a, b_), 0)
                        p = 1.0 / (ctx_types + vocab_size)
                    total_log_t += math.log(p)

    except Exception as e:
        print(f"[ERROR] No se pudo leer el archivo de prueba '{testing_file}': {e}")
        return None, None, None

    if total_tokens == 0:
        return float("inf"), float("inf"), float("inf")

    ppl_u: float = math.exp(-total_log_u / total_tokens)
    ppl_b: float = math.exp(-total_log_b / total_tokens)
    ppl_t: float = math.exp(-total_log_t / total_tokens)
    return ppl_u, ppl_b, ppl_t

# ---------- Script principal ----------

if __name__ == '__main__':
    group_code = "my_group"

    training_file_20N = os.path.join(tercer_punto_folder, f"20N_{group_code}_training.txt")
    testing_file_20N  = os.path.join(tercer_punto_folder, f"20N_{group_code}_testing.txt")
    training_file_BAC = os.path.join(tercer_punto_folder, f"BAC_{group_code}_training.txt")
    testing_file_BAC  = os.path.join(tercer_punto_folder, f"BAC_{group_code}_testing.txt")

    print("\n\n--- Calculando la perplejidad ---")

    # Modelos 20N
    unigrams_20N = read_ngram_model(os.path.join(cuarto_punto_folder, "models", f"20N_{group_code}_unigrams.json"))
    bigrams_20N  = read_ngram_model(os.path.join(cuarto_punto_folder, "models", f"20N_{group_code}_bigrams.json"))
    trigrams_20N = read_ngram_model(os.path.join(cuarto_punto_folder, "models", f"20N_{group_code}_trigrams.json"))

    # vocab: usa #unigramas; evita re-leer el corpus completo
    vocab_size_20N = len(unigrams_20N) if unigrams_20N else None

    if all([unigrams_20N, bigrams_20N, trigrams_20N]):
        pp_uni_20N, pp_bi_20N, pp_tri_20N = calculate_perplexity(
            testing_file_20N,
            unigrams_20N, bigrams_20N, trigrams_20N,
            vocab_size=vocab_size_20N
        )
        print(f"\nResultados de Perplejidad para 20N (Corpus):")
        print(f"  Unigramas: {pp_uni_20N:.2f}")
        print(f"  Bigramas:  {pp_bi_20N:.2f}")
        print(f"  Trigramas: {pp_tri_20N:.2f}")

    # Modelos BAC
    unigrams_BAC = read_ngram_model(os.path.join(cuarto_punto_folder, "models", f"BAC_{group_code}_unigrams.json"))
    bigrams_BAC  = read_ngram_model(os.path.join(cuarto_punto_folder, "models", f"BAC_{group_code}_bigrams.json"))
    trigrams_BAC = read_ngram_model(os.path.join(cuarto_punto_folder, "models", f"BAC_{group_code}_trigrams.json"))

    vocab_size_BAC = len(unigrams_BAC) if unigrams_BAC else None

    if all([unigrams_BAC, bigrams_BAC, trigrams_BAC]):
        pp_uni_BAC, pp_bi_BAC, pp_tri_BAC = calculate_perplexity(
            testing_file_BAC,
            unigrams_BAC, bigrams_BAC, trigrams_BAC,
            vocab_size=vocab_size_BAC
        )
        print(f"\nResultados de Perplejidad para BAC (Corpus):")
        print(f"  Unigramas: {pp_uni_BAC:.2f}")
        print(f"  Bigramas:  {pp_bi_BAC:.2f}")
        print(f"  Trigramas: {pp_tri_BAC:.2f}")



--- Calculando la perplejidad ---
[OK] Modelo cargado desde 'cuarto-punto/models/20N_my_group_unigrams.json'
[OK] Modelo cargado desde 'cuarto-punto/models/20N_my_group_bigrams.json'
[OK] Modelo cargado desde 'cuarto-punto/models/20N_my_group_trigrams.json'

Resultados de Perplejidad para 20N (Corpus):
  Unigramas: 1095.89
  Bigramas:  2327.66
  Trigramas: 11091.42
[OK] Modelo cargado desde 'cuarto-punto/models/BAC_my_group_unigrams.json'
[OK] Modelo cargado desde 'cuarto-punto/models/BAC_my_group_bigrams.json'
[OK] Modelo cargado desde 'cuarto-punto/models/BAC_my_group_trigrams.json'

Resultados de Perplejidad para BAC (Corpus):
  Unigramas: 850.07
  Bigramas:  1215.92
  Trigramas: 13157.86


(15p) Using your best language model, build a method/function that automatically generates sentences by receiving the first word of a sentence as input. Take different tests and document them.

In [4]:
import json
import os
import math
import random
from collections import defaultdict
from typing import Dict, Set, Tuple, List, Optional, Callable, Any


def _load_trigram_unigram(
    prefix: str = "20N", 
    group_code: str = "my_group", 
    base_dir: str = "cuarto-punto/models"
) -> Tuple[Dict[Tuple[str, str, str], float], Dict[Tuple[str, str], Set[str]], Dict[Tuple[str], float], Set[str]]:
    """
    Carga modelos de trigramas y unigramas desde archivos JSON.

    Args:
        prefix (str, opcional): Prefijo del corpus (ejemplo: "20N" o "BAC").
        group_code (str, opcional): Código de grupo para diferenciar los modelos.
        base_dir (str, opcional): Directorio base donde se almacenan los modelos.

    Returns:
        Tuple:
            - Dict[Tuple[str, str, str], float]: Diccionario de trigramas con probabilidades.
            - Dict[Tuple[str, str], Set[str]]: Diccionario de contextos de trigramas con tokens siguientes.
            - Dict[Tuple[str], float]: Diccionario de unigramas con probabilidades.
            - Set[str]: Conjunto del vocabulario.
    """
    with open(os.path.join(base_dir, f"{prefix}_{group_code}_trigrams.json"), "r", encoding="utf-8") as f:
        tri_raw: Dict[str, float] = json.load(f)
    with open(os.path.join(base_dir, f"{prefix}_{group_code}_unigrams.json"), "r", encoding="utf-8") as f:
        uni_raw: Dict[str, float] = json.load(f)

    tri: Dict[Tuple[str, str, str], float] = {}
    followers: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
    for k, v in tri_raw.items():
        a, b, c = k.split(" ", 2)
        tri[(a, b, c)] = float(v)
        followers[(a, b)].add(c)

    uni: Dict[Tuple[str], float] = {(k,): float(v) for k, v in uni_raw.items()}
    vocab: Set[str] = {k[0] for k in uni.keys()}
    return tri, followers, uni, vocab


def _sample(
    scored: List[Tuple[str, float]], 
    temperature: float = 0.9, 
    top_k: int = 50, 
    top_p: float = 0.95, 
    rng: Optional[random.Random] = None
) -> str:
    """
    Realiza un muestreo de tokens usando temperatura, top-k y top-p.

    Args:
        scored (List[Tuple[str, float]]): Lista de pares (token, score).
        temperature (float, opcional): Factor de suavizado de probabilidades.
        top_k (int, opcional): Número máximo de tokens candidatos.
        top_p (float, opcional): Probabilidad acumulada mínima para aplicar nucleus sampling.
        rng (Optional[random.Random], opcional): Generador de números aleatorios reproducible.

    Returns:
        str: Token seleccionado.
    """
    if rng is None:
        rng = random

    scored = sorted(scored, key=lambda x: x[1], reverse=True)

    if top_k and top_k > 0:
        scored = scored[:min(top_k, len(scored))]

    total: float = sum(s for _, s in scored) or 1e-12
    probs: List[float] = [s / total for _, s in scored]

    cut: List[Tuple[str, float]] = []
    acc: float = 0.0
    for (t, s), p in zip(scored, probs):
        cut.append((t, s))
        acc += p
        if top_p is not None and acc >= top_p:
            break
    scored = cut

    logs: List[float] = [math.log(max(1e-12, s)) / max(1e-6, temperature) for _, s in scored]
    m: float = max(logs)
    exps: List[float] = [math.exp(x - m) for x in logs]
    z: float = sum(exps)
    probs = [e / z for e in exps]

    r: float = rng.random()
    acc = 0.0
    for (tok, _), p in zip(scored, probs):
        acc += p
        if r <= acc:
            return tok
    return scored[-1][0]


def build_trigram_only_generator(
    prefix: str = "20N", 
    group_code: str = "my_group", 
    base_dir: str = "cuarto-punto/models"
) -> Callable[..., Tuple[str, List[str]]]:
    """
    Construye un generador de oraciones basado únicamente en trigramas
    con retroceso a unigramas en caso de que el contexto no exista.

    Args:
        prefix (str, opcional): Prefijo del corpus (ejemplo: "20N" o "BAC").
        group_code (str, opcional): Código de grupo para diferenciar modelos.
        base_dir (str, opcional): Directorio base donde se almacenan los modelos.

    Returns:
        Callable[..., Tuple[str, List[str]]]: Función generadora que recibe
        la primera palabra y devuelve la oración generada y la lista de tokens.
    """
    tri, followers, uni, vocab = _load_trigram_unigram(prefix, group_code, base_dir)

    def gen(
        first_word: str, 
        max_len: int = 30, 
        temperature: float = 0.9, 
        top_k: int = 50, 
        top_p: float = 0.95, 
        seed: Optional[int] = None
    ) -> Tuple[str, List[str]]:
        """
        Genera una oración a partir de una palabra inicial.

        Args:
            first_word (str): Primera palabra de la oración.
            max_len (int, opcional): Longitud máxima de la oración.
            temperature (float, opcional): Factor de suavizado de probabilidades.
            top_k (int, opcional): Número máximo de candidatos para muestreo top-k.
            top_p (float, opcional): Probabilidad acumulada mínima para nucleus sampling.
            seed (Optional[int], opcional): Semilla para reproducibilidad.

        Returns:
            Tuple[str, List[str]]:
                - str: Oración generada sin etiquetas.
                - List[str]: Tokens generados incluyendo etiquetas <s> y </s>.
        """
        rng = random.Random(seed)
        fw: str = first_word.strip().lower()
        if fw not in vocab:
            fw = "<unk>" if "<unk>" in vocab else fw

        tokens: List[str] = ["<s>", fw]
        while len(tokens) < max_len:
            if tokens[-1] == "</s>":
                break
            prev2: str = tokens[-2] if len(tokens) >= 2 else "<s>"
            prev1: str = tokens[-1]

            cand: Optional[Set[str]] = followers.get((prev2, prev1), None)
            if cand:
                scored: List[Tuple[str, float]] = [(w, tri.get((prev2, prev1, w), 0.0)) for w in cand]
                nxt: str = _sample(scored, temperature, top_k, top_p, rng)
            else:
                scored = [(w, uni.get((w,), 0.0)) for w in vocab]
                nxt = _sample(scored, temperature, top_k, top_p, rng)

            tokens.append(nxt)
            if len(tokens) >= max_len - 1 and "</s>" in vocab:
                tokens.append("</s>")
                break

        surface: List[str] = [t for t in tokens if t not in ("<s>", "</s>")]
        return " ".join(surface), tokens

    return gen


if __name__ == "__main__":
    gen_tri_20n = build_trigram_only_generator(prefix="20N", group_code="my_group")
    gen_tri_bac = build_trigram_only_generator(prefix="BAC", group_code="my_group")

    for w in ["the", "this", "i", "if"]:
        s, _ = gen_tri_20n(w, max_len=25, temperature=0.9, top_k=60, top_p=0.95, seed=7)
        print(f"[20N|tri-only] '{w}': {s}")

    for w in ["i", "my", "today", "we"]:
        s, _ = gen_tri_bac(w, max_len=25, temperature=0.9, top_k=60, top_p=0.95, seed=7)
        print(f"[BAC|tri-only] '{w}': {s}")

[20N|tri-only] 'the': the fact that he was still too cold
[20N|tri-only] 'this': this is a different kind of scary when you have a NUM bit serial converters someone was kind to ones intuition can be
[20N|tri-only] 'i': i am not able to run
[20N|tri-only] 'if': if you have an NUM simulator on an absolute truth is that the majority rather than a pawn of the new reform party
[BAC|tri-only] 'i': i was a big deal to you by urllink quizilla urllink what kind of strange
[BAC|tri-only] 'my': my first day i was able to get some sleep
[BAC|tri-only] 'today': today i was supposed to be in the back of my life
[BAC|tri-only] 'we': we have to admit that im going to get a good thing
