[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/antoniotrapote/chord-prediction-tfm/blob/main/anexos/notebooks/03_modelado/01_modelos_kn_hmm.ipynb)
[![View on GitHub](https://img.shields.io/badge/View_on-GitHub-black?logo=github)](https://github.com/antoniotrapote/chord-prediction-tfm/blob/main/anexos/notebooks/03_modelado/01_modelos_kn_hmm.ipynb)

# Traditional models: Kneser–Ney and HMM (T/S/D)
This notebook implements two baselines on **functional progressions** from `songdb_funcional_v4.csv`:

- **N-gram model with Kneser–Ney smoothing (trigram)** over functional tokens (e.g., `I`, `ii`, `V7`, `bVII7`), with `<unk>` for rare tokens and the sequence boundaries `<s>`, `</s>`.
- **Function HMM (T/S/D)** with hidden states **Tonic (T)**, **Subdominant (S)**, and **Dominant (D)** derived heuristically from the functional label; the emissions are **functional tokens**.

We evaluate **next-chord prediction** with **Top-k** accuracy and **MRR**, using a **train/val/test split by song**.


In [None]:
import pandas as pd
import numpy as np
import re
from collections import Counter, defaultdict
from pathlib import Path
from typing import List, Tuple, Dict

## 1) Loading the dataset
We read the CSV and build one token sequence per song. The song ID is `title` + `composedby`.

In [None]:
# Configuration for fetching the dataset `songdb_funcional_v4.csv` from GitHub
USER = "antoniotrapote"
REPO = "chord-prediction-tfm"
BRANCH = "main"
PATH_IN_REPO = "anexos/data/songdb_funcional_v4.csv"
URL = f"https://raw.githubusercontent.com/{USER}/{REPO}/{BRANCH}/{PATH_IN_REPO}"

In [3]:
# Load the song dataset with the functional progressions
df = pd.read_csv(URL)

def tokenize_progression(prog: str):
    """
    Split a progression string into a list of chord tokens.
    """
    if pd.isna(prog):
        return []
    return [t for t in str(prog).strip().split() if t]

def build_sequences_by_song(df: pd.DataFrame):
    """
    Build one chord sequence per song from the DataFrame.
    """
    if "title" in df.columns and "composedby" in df.columns:
        song_ids = (df["title"].astype(str) + " — " + df["composedby"].astype(str)).tolist()
    elif "title" in df.columns:
        song_ids = df["title"].astype(str).tolist()
    else:
        song_ids = [f"song_{i}" for i in range(len(df))]
    seqs = {}
    for sid, prog in zip(song_ids, df["funcional_prog"].tolist()):
        seqs[sid] = tokenize_progression(prog)
    return seqs

seqs = build_sequences_by_song(df)
# Keep only songs with at least 3 tokens
seqs = {k: v for k, v in seqs.items() if len(v) >= 3}
len(seqs), list(seqs)[:3]  # size and first keys


(2591,
 ['Lullaby of Birdland — George Shearing',
  "It's A Most Unusual Day — Jimmy McHugh and HYarold Adamson",
  'Jump Monk — Charles Mingus'])

## 2) Train/val/test split by song
We use an 80/10/10 split with a deterministic shuffle (fixed seed).

In [4]:

rng = np.random.default_rng(42)
song_list = list(seqs.keys())
rng.shuffle(song_list)

n = len(song_list)
n_train = int(n * 0.8)
n_val   = int(n * 0.1)
train_ids = song_list[:n_train]
val_ids   = song_list[n_train:n_train+n_val]
test_ids  = song_list[n_train+n_val:]

train_seqs = [seqs[sid] for sid in train_ids]
val_seqs   = [seqs[sid] for sid in val_ids]
test_seqs  = [seqs[sid] for sid in test_ids]

len(train_seqs), len(val_seqs), len(test_seqs)


(2072, 259, 260)
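
Because the split is made by song, no song should appear in more than one partition. A minimal sketch of that sanity check follows. It assumes a hypothetical helper `split_by_song` and toy song IDs, and it shuffles with the standard-library `random` module rather than NumPy's generator, so the resulting order differs from the one used in the cell above.

```python
import random

def split_by_song(song_ids, train_frac=0.8, val_frac=0.1, seed=42):
    """Shuffle song IDs deterministically and cut them into three partitions."""
    ids = list(song_ids)
    random.Random(seed).shuffle(ids)  # stdlib shuffle, not np.random.default_rng
    n_train = int(len(ids) * train_frac)
    n_val = int(len(ids) * val_frac)
    return ids[:n_train], ids[n_train:n_train + n_val], ids[n_train + n_val:]

# Toy IDs (invented); only the count matches the dataset size reported above.
toy_ids = [f"song_{i}" for i in range(2591)]
train, val, test = split_by_song(toy_ids)

# The partitions must be pairwise disjoint and together cover every song.
assert not set(train) & set(val)
assert not set(train) & set(test)
assert not set(val) & set(test)
assert len(train) + len(val) + len(test) == len(toy_ids)

print(len(train), len(val), len(test))
```

With 2591 IDs, the integer truncation gives the same partition sizes as the notebook output (2072, 259, 260); the test set absorbs the remainder.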

## 3) Kneser–Ney n-gram (trigram)
We implement interpolated Kneser–Ney with absolute discount `D=0.75`. Tokens with frequency ≤ 1 in train are mapped to `<unk>`.
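
For reference, the recursion computed by the `KNTrigramLM` class can be written out as follows. This is a transcription read from the code, not a formula stated in the original notebook. Here $c(\cdot)$ denotes raw n-gram counts in train, $c(u,v)=\sum_w c(u,v,w)$, $c(v)=\sum_w c(v,w)$, and $N_{1+}$ counts distinct types.

```latex
\begin{aligned}
P(w \mid u, v) &= \frac{\max\{c(u,v,w) - D,\ 0\}}{c(u,v)} + \lambda(u,v)\, P(w \mid v),
  &\quad \lambda(u,v) &= \frac{D \cdot N_{1+}(u,v,\bullet)}{c(u,v)} \\[4pt]
P(w \mid v) &= \frac{\max\{c(v,w) - D,\ 0\}}{c(v)} + \lambda(v)\, P_{\text{cont}}(w),
  &\quad \lambda(v) &= \frac{D \cdot N_{1+}(v,\bullet)}{c(v)} \\[4pt]
P_{\text{cont}}(w) &= \frac{N_{1+}(\bullet, w)}{N_{1+}(\bullet,\bullet)}
\end{aligned}
```

When a context has zero count, the discounted term is dropped and $\lambda = 1$, so the estimate falls back entirely to the lower order. Note that the bigram level uses raw counts rather than continuation counts, which is a simplification of the textbook formulation; `bigram_continuation_counts` is computed in `fit` but not used by the probability methods.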

In [5]:

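class KNTrigramLM:
    """Trigram LM with interpolated absolute discounting; the unigram level uses Kneser–Ney continuation counts."""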
    def __init__(self, discount: float = 0.75, unk_threshold: int = 1):
        self.D = discount
        self.unk_threshold = unk_threshold
        self.vocab = set()
        self.uni = Counter(); self.bi = Counter(); self.tri = Counter()
        self.continuation_counts = Counter()
        self.bigram_continuation_counts = Counter()
        self.context_totals = Counter()
        self.trigram_context_totals = Counter()
        self.total_unique_bigrams = 0  # set in fit(); initialized so prob_unigram works before fitting
        self.fitted = False

    @staticmethod
    def add_bounds(seq):
        return ["<s>", "<s>"] + seq + ["</s>"]

    def fit(self, sequences):
        token_counts = Counter(t for seq in sequences for t in seq)
        vocab = set([t for t,c in token_counts.items() if c > self.unk_threshold])
        vocab.update({"<s>", "</s>", "<unk>"})
        self.vocab = vocab

        def map_unk(seq):
            return [t if t in vocab else "<unk>" for t in seq]

        uni, bi, tri = Counter(), Counter(), Counter()
        left_contexts_for_w = defaultdict(set)
        left_contexts_for_bigram = defaultdict(set)

        for seq in sequences:
            seq2 = self.add_bounds(map_unk(seq))
            for i in range(len(seq2)):
                w = seq2[i]; uni[w] += 1
                if i >= 1:
                    w1 = seq2[i-1]; bi[(w1, w)] += 1; left_contexts_for_w[w].add(w1)
                if i >= 2:
                    w2, w1 = seq2[i-2], seq2[i-1]
                    tri[(w2, w1, w)] += 1; left_contexts_for_bigram[(w1, w)].add(w2)

        self.uni, self.bi, self.tri = uni, bi, tri
        for (w1, w), c in bi.items():
            self.context_totals[w1] += c
        for (w2, w1, w), c in tri.items():
            self.trigram_context_totals[(w2, w1)] += c

        self.continuation_counts = Counter({w: len(ctxs) for w, ctxs in left_contexts_for_w.items()})
        self.bigram_continuation_counts = Counter({(w1, w): len(ctxs) for (w1, w), ctxs in left_contexts_for_bigram.items()})
        self.total_unique_bigrams = len(self.bi)  # number of distinct bigram types
        self.fitted = True

    def prob_unigram(self, w):
        cc = self.continuation_counts.get(w, 0)
        if self.total_unique_bigrams == 0:
            return 1.0 / max(1, len(self.vocab))
        return cc / self.total_unique_bigrams

    def prob_bigram(self, w_prev, w):
        c_wprev = self.context_totals.get(w_prev, 0)
        c_wprev_w = self.bi.get((w_prev, w), 0)
        if c_wprev > 0:
            lambda_wprev = (self.D * len([u for u in self.vocab if self.bi.get((w_prev, u), 0) > 0])) / c_wprev
        else:
            lambda_wprev = 1.0
        p_cont = self.prob_unigram(w)
        base = max(c_wprev_w - self.D, 0) / c_wprev if c_wprev > 0 else 0.0
        return base + lambda_wprev * p_cont

    def prob_trigram(self, w_prev2, w_prev, w):
        c_ctx = self.trigram_context_totals.get((w_prev2, w_prev), 0)
        c_trigram = self.tri.get((w_prev2, w_prev, w), 0)
        if c_ctx > 0:
            num_continuations = len([u for u in self.vocab if self.tri.get((w_prev2, w_prev, u), 0) > 0])
            lambda_ctx = (self.D * num_continuations) / c_ctx
        else:
            lambda_ctx = 1.0
        base = max(c_trigram - self.D, 0) / c_ctx if c_ctx > 0 else 0.0
        return base + lambda_ctx * self.prob_bigram(w_prev, w)

    def predict_ranking(self, history):
        hist = ["<s>", "<s>"] + [t if t in self.vocab else "<unk>" for t in history]
        w_prev2, w_prev = hist[-2], hist[-1]
        candidates = [w for w in self.vocab if w not in {"<s>"}]
        scores = []
        for w in candidates:
            p = self.prob_trigram(w_prev2, w_prev, w)
            scores.append((w, p))
        scores.sort(key=lambda x: x[1], reverse=True)
        return scores

kn = KNTrigramLM(discount=0.75, unk_threshold=1)
kn.fit(train_seqs)
len(kn.vocab)


85
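
To see how absolute discounting redistributes probability mass at a single level, here is a small self-contained sketch. The function `discounted_bigram_probs`, the toy counts, and the toy lower-order distribution are all invented for illustration; it mirrors the structure of `prob_bigram` but is not the class above.

```python
from collections import Counter

def discounted_bigram_probs(follow_counts, lower_order, discount=0.75):
    """Interpolated absolute discounting for one context.

    follow_counts: Counter of tokens observed after the context.
    lower_order:   dict token -> lower-order probability (should sum to 1).
    """
    total = sum(follow_counts.values())
    if total == 0:
        # Unseen context: all mass goes to the lower-order distribution.
        return dict(lower_order)
    # Mass freed by discounting: D * (distinct continuations) / total.
    backoff_weight = discount * len(follow_counts) / total
    return {
        w: max(follow_counts.get(w, 0) - discount, 0.0) / total + backoff_weight * p
        for w, p in lower_order.items()
    }

# Toy data (invented): what followed "V7" in a tiny corpus.
follow_v7 = Counter({"I": 6, "vi": 2})
# Toy lower-order distribution over a three-token vocabulary.
lower = {"I": 0.5, "vi": 0.3, "ii": 0.2}

probs = discounted_bigram_probs(follow_v7, lower)
for token, p in sorted(probs.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{token:>3}  {p:.4f}")
print("sum =", round(sum(probs.values()), 10))
```

In this toy case the discount removes 0.75 from each of the two observed continuations, freeing 1.5/8 = 0.1875 of the mass. The unseen token `ii` receives 0.1875 × 0.2 = 0.0375, and the distribution still sums to 1.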

## 4) HMM (hidden Markov model) over functions (T/S/D)
We derive the hidden state of each token (`T`, `S`, `D`) from the **degree** of its Roman numeral (e.g., `I, III, VI → T`; `ii, IV → S`; `V, vii → D`). We treat `♭II, ♭VI, ♭VII` as **S** and `♭III` as **T**. **Transitions** and **emissions** are estimated with add-k smoothing.
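
The add-k estimates used by `SupervisedHMM_TSD.fit` can be written explicitly. This is read from the code rather than stated in the original text; $c_0(s)$ counts sequences starting in state $s$, $c(s \to s')$ counts state transitions, $c(s, w)$ counts emissions, $|S| = 3$, and $V$ is the number of distinct training tokens (the extra $+1$ reserves mass for `<unk>`).

```latex
\begin{aligned}
\pi(s) &= \frac{c_0(s) + k}{\sum_{s'} c_0(s') + k\,|S|} \\[4pt]
A(s, s') &= \frac{c(s \to s') + k}{\sum_{s''} c(s \to s'') + k\,|S|} \\[4pt]
B(s, w) &= \frac{c(s, w) + k}{\sum_{w'} c(s, w') + k\,(V + 1)},
\qquad B(s, \texttt{<unk>}) = \frac{k}{\sum_{w'} c(s, w') + k\,(V + 1)}
\end{aligned}
```

The model fitted in this section uses $k = 0.5$ (`add_k=0.5`).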

In [6]:

ROMAN_MAP = {"I":1,"II":2,"III":3,"IV":4,"V":5,"VI":6,"VII":7}

def extract_roman_base(token: str):
    t = token.split('/')[0]
    m = re.match(r'^[b♭#♯]*([ivIV]+)', t)
    is_flat = bool(re.match(r'^[b♭]', t))
    if not m: return ("", is_flat)
    return (m.group(1).upper(), is_flat)

def degree_from_roman(r: str) -> int:
    return ROMAN_MAP.get(r, 0)

def function_from_token(token: str) -> str:
    base, is_flat = extract_roman_base(token)
    deg = degree_from_roman(base)
    if is_flat and deg in {2,6,7}:  # bII, bVI, bVII
        return "S"
    if is_flat and deg == 3:        # bIII
        return "T"
    if deg in {1,3,6}:
        return "T"
    if deg in {2,4}:
        return "S"
    if deg in {5,7}:
        return "D"
    return "T"  # fallback: tokens without a recognizable degree default to tonic

STATES = ["T","S","D"]

class SupervisedHMM_TSD:
    def __init__(self, add_k: float = 1.0):
        self.add_k = add_k
        self.states = STATES
        self.A = None; self.B = None; self.pi = None
        self.vocab = set()

    def fit(self, train_sequences):
        tok_counts = Counter(t for seq in train_sequences for t in seq)
        self.vocab = set(tok_counts.keys())
        trans = {s: Counter() for s in self.states}
        emit  = {s: Counter() for s in self.states}
        pi_counts = Counter()

        for seq in train_sequences:
            if not seq: continue
            states_seq = [function_from_token(t) for t in seq]
            pi_counts[states_seq[0]] += 1
            for t, s in zip(seq, states_seq):
                emit[s][t] += 1
            for s1, s2 in zip(states_seq[:-1], states_seq[1:]):
                trans[s1][s2] += 1

        total_pi = sum(pi_counts.values()) + self.add_k * len(self.states)
        self.pi = {s: (pi_counts[s] + self.add_k) / total_pi for s in self.states}

        self.A = {}
        for s in self.states:
            total = sum(trans[s].values()) + self.add_k * len(self.states)
            self.A[s] = {s2: (trans[s][s2] + self.add_k) / total for s2 in self.states}

        self.B = {}
        V = len(self.vocab)
        for s in self.states:
            total = sum(emit[s].values()) + self.add_k * (V + 1)
            self.B[s] = defaultdict(float)
            for w in self.vocab:
                self.B[s][w] = (emit[s][w] + self.add_k) / total
            self.B[s]["<unk>"] = self.add_k / total

    def predict_ranking(self, history):
        if len(history) == 0:
            state_probs = self.pi
        else:
            s_last = function_from_token(history[-1])
            state_probs = self.A[s_last]

        scores = {}
        for w in list(self.vocab) + ["<unk>"]:
            p = 0.0
            for s_next in self.states:
                p += state_probs[s_next] * self.B[s_next].get(w, self.B[s_next]["<unk>"])
            scores[w] = p
        scores.pop("<unk>", None)
        items = list(scores.items())
        items.sort(key=lambda x: x[1], reverse=True)
        return items

hmm = SupervisedHMM_TSD(add_k=0.5)
hmm.fit(train_seqs)
STATES, list(hmm.A.items())[:1]


(['T', 'S', 'D'],
 [('T',
   {'T': 0.44894101307819917,
    'S': 0.3055823463600941,
    'D': 0.24547664056170673})])
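
The ranking step in `predict_ranking` is a mixture: each candidate token is scored by summing, over the possible next states, the transition probability times the emission probability. The sketch below reproduces that computation with toy parameters invented for illustration (they are not the fitted values), using a hypothetical helper `next_token_scores`.

```python
# Toy parameters (invented): P(next state | last state = T) and P(token | state).
A_ROW_T = {"T": 0.5, "S": 0.3, "D": 0.2}
B = {
    "T": {"I": 0.7, "vi": 0.3},
    "S": {"ii": 0.6, "IV": 0.4},
    "D": {"V7": 0.8, "vii": 0.2},
}

def next_token_scores(state_probs, emissions):
    """Score each token as sum_s P(s) * P(token | s), sorted from best to worst."""
    scores = {}
    for state, p_state in state_probs.items():
        for token, p_emit in emissions[state].items():
            scores[token] = scores.get(token, 0.0) + p_state * p_emit
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)

ranking = next_token_scores(A_ROW_T, B)
for token, score in ranking:
    print(f"{token:>3}  {score:.2f}")
```

With only three states, every token emitted by the same state shares the same transition factor, so the ranking within a state is fixed by the emission frequencies alone regardless of which specific chord came before.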

## 5) Evaluation (Top-k, MRR)
We measure **next-token prediction** quality on the test set and report **Top@1/3/5** and **MRR**.

In [7]:

def evaluate_next_token_ranking(model, test_sequences, topk_list=(1,3,5)):
    total_positions = 0
    topk_hits = {k: 0 for k in topk_list}
    mrr_sum = 0.0
    for seq in test_sequences:
        for i in range(len(seq)-1):
            history = seq[:i+1]
            gold = seq[i+1]
            ranking = model.predict_ranking(history)
            ranks = {w: r+1 for r, (w, _) in enumerate(ranking)}
            # Gold tokens absent from the ranking (e.g. out-of-vocabulary) count as misses
            rank_gold = ranks.get(gold, None)
            total_positions += 1
            if rank_gold is not None:
                for k in topk_list:
                    if rank_gold <= k:
                        topk_hits[k] += 1
                mrr_sum += 1.0 / rank_gold
    results = {
        "positions": total_positions,
        "MRR": mrr_sum / total_positions if total_positions > 0 else 0.0,
    }
    for k in topk_list:
        results[f"Top@{k}"] = topk_hits[k] / total_positions if total_positions > 0 else 0.0
    return results

class KNWrapperForEval:
    def __init__(self, kn_model):
        self.kn = kn_model
    def predict_ranking(self, history):
        hist = [t if t in self.kn.vocab else "<unk>" for t in history]
        return self.kn.predict_ranking(hist)

kn_res = evaluate_next_token_ranking(KNWrapperForEval(kn), test_seqs, topk_list=(1,3,5))
hmm_res = evaluate_next_token_ranking(hmm, test_seqs, topk_list=(1,3,5))
pd.DataFrame([{"Model":"Kneser–Ney (trigram)", **kn_res},
              {"Model":"HMM functions (T/S/D)", **hmm_res}])


| | Model | positions | MRR | Top@1 | Top@3 | Top@5 |
|---|---|---|---|---|---|---|
| 0 | Kneser–Ney (trigram) | 12779 | 0.56052 | 0.415212 | 0.651616 | 0.737538 |
| 1 | HMM functions (T/S/D) | 12779 | 0.34576 | 0.191564 | 0.41889 | 0.512481 |


The Kneser–Ney trigram clearly outperforms the HMM on every metric.

Column interpretation:
- positions: 12,779 prediction points evaluated (the same positions for both models).
- Top@k: fraction of positions where the true chord appears among the k highest-ranked suggestions.
  - Top@1: exact hit. 41.52% vs 19.16%.
  - Top@3: the n-gram LM reaches 65.16% versus 41.89%.
  - Top@5: 73.75% vs 51.25%.
- MRR: mean of 1/rank of the true chord (rewards ranking it near the top). 0.5605 vs 0.3458 confirms the better overall ranking of the KN trigram.
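
As a small worked example of these metrics, the sketch below computes Top@k and MRR from a list of gold ranks. The helper `topk_and_mrr` and the toy ranks are invented for illustration; a rank of `None` stands for a true chord that does not appear in the ranking and is counted as a miss, as in the evaluation function above.

```python
def topk_and_mrr(gold_ranks, ks=(1, 3, 5)):
    """gold_ranks: 1-based rank of the true token per position, or None if absent."""
    n = len(gold_ranks)
    if n == 0:
        return {"MRR": 0.0, **{f"Top@{k}": 0.0 for k in ks}}
    # Missing gold tokens contribute 0 to the reciprocal-rank sum.
    metrics = {"MRR": sum(1.0 / r for r in gold_ranks if r is not None) / n}
    for k in ks:
        hits = sum(1 for r in gold_ranks if r is not None and r <= k)
        metrics[f"Top@{k}"] = hits / n
    return metrics

# Four toy positions: ranked 1st, 2nd, 4th, and not ranked at all.
toy_ranks = [1, 2, 4, None]
print(topk_and_mrr(toy_ranks))
```

For these four positions, MRR is (1 + 1/2 + 1/4 + 0) / 4 = 0.4375, while Top@1, Top@3, and Top@5 are 0.25, 0.5, and 0.75 respectively.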

Conclusion: the KN model captures specific local dependencies between chords; the HMM (only 3 states T/S/D plus emissions) loses granularity and spreads probability more diffusely, which lowers its accuracy.

## 6) Roadmap (next improvements)
- Add a **4-gram KN** model and tune `D` on the validation set.
- For the HMM: try **EM (Baum–Welch)** (tried: worse results, approach discarded).
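
Tuning `D` on the validation set could take the form of a simple grid search. The sketch below is one possible shape for it, not something the notebook implements: `select_discount` is a hypothetical helper, and `toy_score` is a placeholder invented so the example runs on its own.

```python
def select_discount(candidates, score_fn):
    """Return (best_discount, scores); score_fn maps a discount to a validation
    score where higher is better."""
    scores = {d: score_fn(d) for d in candidates}
    best = max(scores, key=scores.get)
    return best, scores

# Placeholder score (invented). In the notebook it could be replaced by, e.g.:
#   def val_mrr(d):
#       model = KNTrigramLM(discount=d, unk_threshold=1)
#       model.fit(train_seqs)
#       return evaluate_next_token_ranking(model, val_seqs)["MRR"]
def toy_score(d):
    return -(d - 0.7) ** 2

best_d, all_scores = select_discount([0.5, 0.6, 0.7, 0.8, 0.9], toy_score)
print("best D:", best_d)
```

Selecting on `val_seqs` and reporting only the final figure on `test_seqs` keeps the test metrics untouched by the tuning.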
