## SETUP

In [1]:
# Setup: paths, data helpers, dataset/splits/loaders, model and metrics (no training).
# Try to mount Google Drive. Outside Colab the import fails; any failure here
# (import or mount) is swallowed so the rest of the cell still runs.
try:
    from google.colab import drive
    drive.mount("/content/drive")
except Exception:
    pass

# --- Imports y constantes ---
import os, json, math, random, ast
from dataclasses import dataclass, asdict
from typing import List, Tuple, Dict
import numpy as np, pandas as pd
import torch, torch.nn as nn, torch.nn.functional as F
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

# Seed every RNG used in this notebook (Python, NumPy, PyTorch CPU and CUDA) with one value.
SEED = 42
random.seed(SEED); np.random.seed(SEED)
torch.manual_seed(SEED); torch.cuda.manual_seed_all(SEED)
# Request deterministic cuDNN kernels and switch off the cuDNN autotuner.
torch.backends.cudnn.deterministic = True; torch.backends.cudnn.benchmark = False
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# --- Project paths ---
BASE_DIR = "/content/drive/MyDrive/Colab Notebooks/TFG"
DATA_DIR = os.path.join(BASE_DIR, "Archivos preprocesamiento")  # artifacts produced by notebook 1
MODELS_DIR = os.path.join(BASE_DIR, "models")                   # same names/location as the original
os.makedirs(MODELS_DIR, exist_ok=True)

# The feature table is looked up as parquet first and as CSV second (see load_dataset_and_dicts).
DATA_PARQUET = os.path.join(DATA_DIR, "all_features_transformer.parquet")
DATA_CSV     = os.path.join(DATA_DIR, "all_features_transformer.csv")
CH2IDX_PATH  = os.path.join(DATA_DIR, "chord_to_idx.json")
IDX2CH_PATH  = os.path.join(DATA_DIR, "idx_to_chord.json")

# --- Load dataset + dictionaries + quick checks ---
MAX_LEN = 112  # canonical sequence length; the loader rejects rows of any other length
def _coerce_to_list(x):
    """Return ``x`` as a Python list, or ``[]`` when it cannot be read as one.

    Lists are returned unchanged, NumPy arrays are converted with ``tolist()``,
    and strings are parsed as Python literals. A string that does not parse, or
    that parses to something other than a list, yields ``[]``; so does any
    other input type.
    """
    if isinstance(x, np.ndarray):
        return x.tolist()
    if isinstance(x, list):
        return x
    if not isinstance(x, str):
        return []
    try:
        parsed = ast.literal_eval(x)
    except Exception:
        # Unparseable text is treated the same as "not a list".
        return []
    return parsed if isinstance(parsed, list) else []

def load_dataset_and_dicts():
    """Load the feature table and the chord vocabularies, then validate them.

    The table is read from ``DATA_PARQUET`` when that file exists, otherwise
    from ``DATA_CSV``. The sequence columns ``encoded_chords``,
    ``target_chords`` and ``attention_mask`` are converted with
    ``_coerce_to_list`` when their first value is not already a list or array
    (e.g. when they were stored as strings).

    Returns:
        ``(df, chord_to_idx, idx_to_chord, pad_idx, unk_idx)``. Both
        dictionaries are returned exactly as parsed from JSON.

    Raises:
        FileNotFoundError: neither the parquet nor the CSV file exists.
        ValueError: the table has no rows; the vocabulary lacks ``[PAD]`` or
            ``[UNK]``, or ``[PAD]`` is not index 0; a sequence column contains
            a row whose length is not ``MAX_LEN``; or ``attention_mask`` holds
            values other than 0 and 1.
    """
    if os.path.exists(DATA_PARQUET):
        df = pd.read_parquet(DATA_PARQUET)
    elif os.path.exists(DATA_CSV):
        df = pd.read_csv(DATA_CSV)
    else:
        raise FileNotFoundError("Faltan all_features_transformer.(parquet|csv) en DATA_DIR.")

    # Fail with a clear message instead of an IndexError from `.iloc[0]` below.
    if df.empty:
        raise ValueError("Dataset vacío.")

    seq_cols = ["encoded_chords", "target_chords", "attention_mask"]
    for col in seq_cols:
        # Only the first row is inspected to decide whether the column needs parsing.
        if not isinstance(df[col].iloc[0], (list, np.ndarray)):
            df[col] = df[col].apply(_coerce_to_list)

    with open(CH2IDX_PATH, "r", encoding="utf-8") as f:
        chord_to_idx = json.load(f)
    with open(IDX2CH_PATH, "r", encoding="utf-8") as f:
        idx_to_chord = json.load(f)

    pad_idx = chord_to_idx.get("[PAD]", None)
    unk_idx = chord_to_idx.get("[UNK]", None)
    if pad_idx is None or unk_idx is None or pad_idx != 0:
        raise ValueError("Vocab inválido: requiere [PAD]=0 y [UNK].")

    # Every sequence column must be exactly MAX_LEN long. Targets are checked
    # too: a mismatched target row would otherwise only fail later, when
    # batches are stacked or the loss is computed.
    for col in seq_cols:
        if not df[col].apply(len).eq(MAX_LEN).all():
            raise ValueError(f"Longitudes != {MAX_LEN}.")

    # The mask must be strictly binary.
    mask_vals = {int(x) for row in df["attention_mask"] for x in row}
    if not mask_vals.issubset({0, 1}):
        raise ValueError("attention_mask no binaria.")
    return df, chord_to_idx, idx_to_chord, pad_idx, unk_idx

# Load the data once at cell execution; the names below are used as globals by later code.
df, chord_to_idx, idx_to_chord, pad_idx, unk_idx = load_dataset_and_dicts()

# --- Datasets, splits (80/10/10 stratified by main_genre), dataloaders ---
class ChordsDataset(torch.utils.data.Dataset):
    """Map-style dataset over three parallel, index-aligned sequence collections.

    Item ``i`` is the tuple ``(encoded[i], target[i], mask[i])`` with each
    element converted to a ``torch.long`` tensor.
    """

    def __init__(self, encoded, target, mask):
        self.encoded = encoded
        self.target = target
        self.mask = mask

    def __len__(self):
        # The dataset length is taken from the encoded inputs only.
        return len(self.encoded)

    def __getitem__(self, idx):
        row = (self.encoded[idx], self.target[idx], self.mask[idx])
        return tuple(torch.tensor(seq, dtype=torch.long) for seq in row)

def make_splits(df, seed=SEED, test_size=0.1, val_size=0.1):
    """Split ``df`` into train, validation and test frames.

    ``test_size`` and ``val_size`` are fractions of the full table. The split
    is done in two stages (train vs. hold-out, then hold-out into validation
    and test), both with the same ``seed``. When ``df`` has a ``main_genre``
    column both stages are stratified on it; otherwise neither is.

    Returns:
        ``(df_train, df_val, df_test)``, each with its index reset.
    """
    holdout = test_size + val_size
    use_strata = "main_genre" in df.columns

    def _labels(frame):
        # Stratification labels for `frame`, or None for a plain random split.
        return frame["main_genre"] if use_strata else None

    df_train, df_rest = train_test_split(
        df, test_size=holdout, random_state=seed, stratify=_labels(df))

    # Fraction of the hold-out part that becomes the test set.
    test_share = 1 - val_size / holdout
    df_val, df_test = train_test_split(
        df_rest, test_size=test_share, random_state=seed, stratify=_labels(df_rest))

    return tuple(part.reset_index(drop=True) for part in (df_train, df_val, df_test))

def to_dataset(dframe):
    """Wrap the three sequence columns of ``dframe`` in a ``ChordsDataset``."""
    columns = ("encoded_chords", "target_chords", "attention_mask")
    return ChordsDataset(*(dframe[name].tolist() for name in columns))

def collate_batch(batch):
    """Stack a list of ``(encoded, target, mask)`` tensor triples along a new first axis.

    Returns:
        Tuple ``(encoded, target, mask)`` of stacked tensors.
    """
    encoded, targets, masks = (torch.stack(field) for field in zip(*batch))
    return encoded, targets, masks

df_train, df_val, df_test = make_splits(df, seed=SEED, test_size=0.1, val_size=0.1)
BATCH_SIZE = 256
# Only the training loader shuffles; validation and test iterate in row order.
train_loader = torch.utils.data.DataLoader(to_dataset(df_train), batch_size=BATCH_SIZE, shuffle=True,  collate_fn=collate_batch)
val_loader   = torch.utils.data.DataLoader(to_dataset(df_val),   batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_batch)
test_loader  = torch.utils.data.DataLoader(to_dataset(df_test),  batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_batch)

# --- Decoder-only Transformer model (same as the original) ---
@dataclass
class ModelConfig:
    """Hyper-parameters consumed by ``CausalTransformer``."""

    # Vocabulary description (required; no defaults).
    vocab_size: int
    pad_idx: int
    unk_idx: int
    # Size of the positional-embedding table.
    max_len: int = MAX_LEN
    # Transformer dimensions.
    d_model: int = 256
    n_layers: int = 4
    n_heads: int = 8
    d_ff: int = 1024
    dropout: float = 0.1

class PositionalEmbedding(nn.Module):
    """Learned absolute positional embedding with ``max_len`` positions."""

    def __init__(self, max_len, d_model):
        super().__init__()
        self.pos_emb = nn.Embedding(max_len, d_model)

    def forward(self, x):
        """Return position embeddings for a 2-D input ``x`` of shape (batch, steps).

        Only the shape and device of ``x`` are used; its values are ignored.
        """
        batch, steps = x.shape
        positions = torch.arange(steps, device=x.device)
        return self.pos_emb(positions.unsqueeze(0).expand(batch, steps))

class CausalTransformer(nn.Module):
    """Causally masked Transformer language model over chord indices.

    Token and learned positional embeddings are summed, passed through
    dropout and a stack of pre-norm ``nn.TransformerEncoderLayer`` blocks, and
    projected to vocabulary logits. A boolean upper-triangular mask stops each
    position from attending to later ones.
    """

    def __init__(self, cfg: ModelConfig):
        super().__init__()
        width = cfg.d_model
        # Attribute names and registration order are kept stable: they define
        # the state_dict keys and the order in which `_init_w` draws random numbers.
        self.tok_emb = nn.Embedding(cfg.vocab_size, width, padding_idx=cfg.pad_idx)
        self.pos_emb = PositionalEmbedding(cfg.max_len, width)
        layer = nn.TransformerEncoderLayer(
            d_model=width,
            nhead=cfg.n_heads,
            dim_feedforward=cfg.d_ff,
            dropout=cfg.dropout,
            activation="gelu",
            batch_first=True,
            norm_first=True,
        )
        self.trf = nn.TransformerEncoder(layer, num_layers=cfg.n_layers)
        self.drop = nn.Dropout(cfg.dropout)
        self.lm_head = nn.Linear(width, cfg.vocab_size)
        self.apply(self._init_w)

    def _init_w(self, m):
        """Initialise Linear/Embedding weights from N(0, 0.02) and zero Linear biases."""
        if not isinstance(m, (nn.Linear, nn.Embedding)):
            return
        # NOTE: this covers the whole embedding matrix, including the padding row.
        nn.init.normal_(m.weight, mean=0.0, std=0.02)
        if isinstance(m, nn.Linear) and m.bias is not None:
            nn.init.zeros_(m.bias)

    def _causal_mask(self, T, device):
        """Return a (T, T) boolean mask that is True strictly above the diagonal."""
        return torch.ones(T, T, device=device, dtype=torch.bool).triu(diagonal=1)

    def forward(self, x, attention_mask=None):
        """Compute logits for a 2-D batch of token indices.

        Args:
            x: token indices of shape (batch, steps).
            attention_mask: optional tensor shaped like ``x``; positions equal
                to 0 are treated as padding keys.

        Returns:
            The output of ``lm_head``: one vocabulary-sized logit vector per position.
        """
        _, steps = x.shape
        hidden = self.drop(self.tok_emb(x) + self.pos_emb(x))
        future = self._causal_mask(steps, x.device)
        pad_keys = None if attention_mask is None else (attention_mask == 0)
        hidden = self.trf(hidden, mask=future, src_key_padding_mask=pad_keys)
        return self.lm_head(hidden)

# --- Métricas y utilidades de entrenamiento ---
def topk_accuracy(logits, target, k=1, mask=None):
    """Fraction of counted positions whose target is among the ``k`` highest logits.

    A position is counted when ``mask`` is 1 there (all positions when
    ``mask`` is None) and its target differs from the module-level ``pad_idx``.
    Returns a Python float; 0.0 when no position is counted.
    """
    with torch.no_grad():
        if k == 1:
            hits = logits.argmax(-1) == target
        else:
            top_idx = logits.topk(k, dim=-1).indices
            # Broadcast the target against the k candidates of each position.
            hits = (top_idx == target.unsqueeze(-1)).any(dim=-1)
        if mask is None:
            mask = torch.ones_like(target, dtype=torch.long)
        counted = (mask == 1) & (target != pad_idx)
        n_correct = (hits & counted).sum().item()
        n_counted = counted.sum().item()
        return n_correct / max(1, n_counted)


Mounted at /content/drive


## ENTRENAMIENTO

In [None]:
# Training: same architecture/hyperparameters, linear scheduler with warmup and grad clipping; saves checkpoints+config.
cfg = ModelConfig(vocab_size=len(chord_to_idx), pad_idx=pad_idx, unk_idx=unk_idx, max_len=MAX_LEN)
model = CausalTransformer(cfg).to(device)

LR = 2e-4          # peak learning rate (scaled by the scheduler's multiplier)
EPOCHS = 15
WARMUP_PCT = 0.05  # fraction of the total optimizer steps spent in warmup
optimizer = torch.optim.AdamW(model.parameters(), lr=LR, betas=(0.9,0.999), weight_decay=0.01)

def build_scheduler(optimizer, num_steps, warmup_steps):
    """Build a ``LambdaLR`` with linear warmup followed by linear decay.

    The learning-rate multiplier rises from 0 to 1 over the first
    ``warmup_steps`` steps, then falls linearly to 0 at ``num_steps`` and
    stays at 0 afterwards.
    """
    ramp_len = float(max(1, warmup_steps))
    decay_len = float(max(1, num_steps - warmup_steps))

    def lr_lambda(step):
        if step >= warmup_steps:
            return max(0.0, float(num_steps - step) / decay_len)
        return float(step) / ramp_len

    return torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)

# Total optimizer steps: one per training batch, counting the last partial batch of each epoch.
num_train_steps = EPOCHS * math.ceil(len(df_train)/BATCH_SIZE)
scheduler = build_scheduler(optimizer, num_steps=num_train_steps, warmup_steps=int(num_train_steps*WARMUP_PCT))
# Targets equal to pad_idx are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)

# One entry per epoch for each tracked metric.
history = {"train_loss":[], "val_loss":[], "val_top1":[], "val_top5":[]}
# Lowest validation loss seen so far, and the file that holds the matching weights.
best_val, best_path = float("inf"), os.path.join(MODELS_DIR, "checkpoint_best.pt")

def run_epoch(loader, train: bool):
    """Run one pass over ``loader`` and return ``(avg_loss, avg_top1, avg_top5)``.

    Uses the module-level ``model``, ``criterion``, ``optimizer``,
    ``scheduler``, ``cfg`` and ``device``.

    Args:
        loader: iterable of ``(encoded, target, mask)`` tensor batches.
        train: when True the model is put in training mode and every batch
            performs a backward pass, gradient clipping (max norm 1.0), an
            optimizer step and a scheduler step. When False the model is put
            in eval mode and no gradients are tracked.

    Returns:
        Loss, top-1 accuracy and top-5 accuracy, each averaged over batches
        with weights equal to the batch size.

    NOTE: ``criterion`` already averages over the non-pad targets of a batch,
    so weighting by the number of sequences is an approximation of the true
    per-token mean whenever batches hold different numbers of valid tokens.
    """
    model.train(train)
    total_loss, total_items, total_t1, total_t5 = 0.0, 0, 0.0, 0.0
    # Build the autograd graph only when it is needed: a validation pass
    # otherwise keeps activations alive for a backward pass that never runs.
    with torch.set_grad_enabled(train):
        for enc, tgt, msk in loader:
            enc, tgt, msk = enc.to(device), tgt.to(device), msk.to(device)
            logits = model(enc, attention_mask=msk)
            loss = criterion(logits.reshape(-1, cfg.vocab_size), tgt.reshape(-1))
            B = enc.size(0)
            if train:
                optimizer.zero_grad(set_to_none=True)
                loss.backward()
                nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                optimizer.step()
                scheduler.step()  # the schedule advances once per batch, not per epoch
            total_loss += loss.item()*B
            total_items += B
            total_t1 += topk_accuracy(logits, tgt, k=1, mask=msk)*B
            total_t5 += topk_accuracy(logits, tgt, k=5, mask=msk)*B
    avg_loss = total_loss / max(1, total_items)
    avg_t1 = total_t1 / max(1, total_items)
    avg_t5 = total_t5 / max(1, total_items)
    return avg_loss, avg_t1, avg_t5

# Save the configuration (used to rebuild the model for evaluation/inference) BEFORE
# training: if the session is interrupted, the checkpoints already on disk can still be
# reloaded. Every value written here is fixed before the loop starts.
with open(os.path.join(MODELS_DIR, "config.json"), "w", encoding="utf-8") as f:
    json.dump({**asdict(cfg), "seed": SEED, "lr": LR, "epochs": EPOCHS, "warmup_pct": WARMUP_PCT}, f)

for epoch in range(1, EPOCHS+1):
    tr_loss, _, _   = run_epoch(train_loader, train=True)
    va_loss, v1, v5 = run_epoch(val_loader,   train=False)
    history["train_loss"].append(tr_loss); history["val_loss"].append(va_loss)
    history["val_top1"].append(v1);        history["val_top5"].append(v5)
    # Checkpointing: always overwrite "last"; overwrite "best" only when validation loss improves.
    torch.save({"model_state": model.state_dict(), "epoch": epoch}, os.path.join(MODELS_DIR, "checkpoint_last.pt"))
    if va_loss < best_val:
        best_val = va_loss
        torch.save({"model_state": model.state_dict(), "epoch": epoch}, best_path)

# Evaluación final rápida (test)
def evaluate(loader):
    """Evaluate the module-level ``model`` on ``loader`` without tracking gradients.

    Returns:
        ``(avg_loss, perplexity, top1, top5)``. Averages weight each batch by
        its number of sequences; perplexity is ``exp(avg_loss)``.
    """
    model.eval()
    loss_sum, top1_sum, top5_sum, n_seqs = 0.0, 0.0, 0.0, 0
    with torch.no_grad():
        for batch in loader:
            enc, tgt, msk = (t.to(device) for t in batch)
            logits = model(enc, attention_mask=msk)
            flat_logits = logits.reshape(-1, cfg.vocab_size)
            loss = criterion(flat_logits, tgt.reshape(-1))
            bsz = enc.size(0)
            n_seqs += bsz
            loss_sum += loss.item() * bsz
            top1_sum += topk_accuracy(logits, tgt, k=1, mask=msk) * bsz
            top5_sum += topk_accuracy(logits, tgt, k=5, mask=msk) * bsz
    denom = max(1, n_seqs)
    avg_loss = loss_sum / denom
    return avg_loss, math.exp(avg_loss), top1_sum / denom, top5_sum / denom

# Evaluates the model currently in memory (the weights after the final epoch);
# checkpoint_best.pt is not reloaded here.
test_loss, test_ppl, test_top1, test_top5 = evaluate(test_loader)
print(f"[TEST] loss={test_loss:.4f} | ppl={test_ppl:.2f} | top1={test_top1:.3f} | top5={test_top5:.3f}")


## CARGA + EVALUACIÓN

In [2]:
# Cell 3 — Load + Evaluation (TEST only; progress on CPU; same logic as the original)

# 1) Load dataset and dictionaries
df, chord_to_idx, idx_to_chord, pad_idx, unk_idx = load_dataset_and_dicts()

# 2) Rebuild the splits (same SEED) + diagnostics
df_train, df_val, df_test = make_splits(df, seed=SEED, test_size=0.1, val_size=0.1)
print(f"device={device.type} | val_size={len(df_val)} | test_size={len(df_test)}")

# 3) DataLoaders (like the original: num_workers=0; smaller batch on CPU)
from torch.utils.data import DataLoader
pin = (device.type == "cuda")  # pinned host memory is only requested when a GPU is used
num_workers = 0
BATCH_EVAL = 64 if device.type == "cpu" else 256
val_loader  = DataLoader(to_dataset(df_val),  batch_size=BATCH_EVAL, shuffle=False,
                         collate_fn=collate_batch, pin_memory=pin, num_workers=num_workers, persistent_workers=False)
test_loader = DataLoader(to_dataset(df_test), batch_size=BATCH_EVAL, shuffle=False,
                         collate_fn=collate_batch, pin_memory=pin, num_workers=num_workers, persistent_workers=False)
print(f"BATCH_EVAL={BATCH_EVAL} | pin_memory={pin} | num_workers={num_workers}")

# 4) Rebuild the model from config.json + load checkpoint_best.pt
import math, time
import torch.nn as nn
import torch.nn.functional as F
with open(os.path.join(MODELS_DIR, "config.json"), "r", encoding="utf-8") as f:
    cfg_json = json.load(f)
# Vocabulary fields come from the dictionaries just loaded; architecture fields come from
# the saved config, falling back to the listed defaults when a key is missing.
cfg = ModelConfig(
    vocab_size=len(chord_to_idx), pad_idx=pad_idx, unk_idx=unk_idx,
    max_len=int(cfg_json.get("max_len", MAX_LEN)),
    d_model=int(cfg_json.get("d_model", 256)),
    n_layers=int(cfg_json.get("n_layers", 4)),
    n_heads=int(cfg_json.get("n_heads", 8)),
    d_ff=int(cfg_json.get("d_ff", 1024)),
    dropout=float(cfg_json.get("dropout", 0.1))
)
model = CausalTransformer(cfg).to(device)
# map_location places the stored tensors on the current device.
ckpt = torch.load(os.path.join(MODELS_DIR, "checkpoint_best.pt"), map_location=device)
model.load_state_dict(ckpt["model_state"])
model.eval()
criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)

# 5) Evaluation (TEST only; progress every 20 batches; new autocast API)
# On GPU this replaces the deterministic cuDNN settings chosen at setup time with the
# autotuned ones (presumably for throughput). The matmul-precision call is wrapped in
# try/except so a failure there is ignored.
if device.type == "cuda":
    try: torch.set_float32_matmul_precision("high")
    except Exception: pass
    torch.backends.cudnn.deterministic = False
    torch.backends.cudnn.benchmark = True

def evaluate(loader, name="EVAL"):
    """Evaluate the module-level ``model`` on ``loader`` and print progress.

    The forward pass and loss run under CUDA autocast when ``device`` is a
    GPU. A progress line is printed every 20 batches and on the last batch,
    followed by the elapsed time.

    Args:
        loader: iterable of ``(encoded, target, mask)`` tensor batches.
        name: label used in the progress messages.

    Returns:
        ``(avg_loss, perplexity, top1, top5)``. Averages weight each batch by
        its number of sequences; perplexity is ``exp(avg_loss)``.
    """
    model.eval()
    loss_sum, n_seqs = 0.0, 0
    top1_sum, top5_sum = 0.0, 0.0
    amp_on = device.type == "cuda"
    n_batches = len(loader)
    t0 = time.time()
    with torch.no_grad():
        for step, batch in enumerate(loader, 1):
            enc, tgt, msk = (t.to(device, non_blocking=True) for t in batch)
            with torch.amp.autocast("cuda", enabled=amp_on):
                logits = model(enc, attention_mask=msk)
                loss = criterion(logits.reshape(-1, cfg.vocab_size), tgt.reshape(-1))
            bsz = enc.size(0)
            n_seqs += bsz
            loss_sum += loss.item() * bsz
            top1_sum += topk_accuracy(logits, tgt, k=1, mask=msk) * bsz
            top5_sum += topk_accuracy(logits, tgt, k=5, mask=msk) * bsz
            if step == n_batches or step % 20 == 0:
                # Carriage return keeps the progress counter on a single line.
                print(f"[{name}] {step}/{n_batches} batches", end="\r")
    denom = max(1, n_seqs)
    avg_loss = loss_sum / denom
    ppl = math.exp(avg_loss)
    elapsed = time.time() - t0
    print(f"\n[{name}] done in {elapsed:.1f}s")
    return avg_loss, ppl, top1_sum / denom, top5_sum / denom

# 6) Run the evaluation (TEST only, like the original)
test_loss, test_ppl, test_top1, test_top5 = evaluate(test_loader, name="TEST")
print(f"[TEST] loss={test_loss:.4f} | ppl={test_ppl:.2f} | top1={test_top1:.3f} | top5={test_top5:.3f}")

# 7) Ejemplos cualitativos (top-k en validación; muestra pequeña)
def idxs_to_chords(idxs):
    """Translate chord indices to chord names via the module-level ``idx_to_chord``.

    Each index is converted to ``int`` and looked up by its string form; an
    index with no entry is rendered as ``"<index>"``.
    """
    names = []
    for raw in idxs:
        key = int(raw)
        names.append(idx_to_chord.get(str(key), f"<{key}>"))
    return names

def qualitative_examples(df_split, n=6, k=5):
    """Print top-``k`` next-chord predictions for up to ``n`` randomly chosen rows.

    Rows are drawn without replacement using NumPy's global RNG. For each row
    the logits are read at index ``(number of 1s in the mask) - 1`` (index 0
    when the mask is all zeros), and the last 8 tokens before that count are
    printed as context.
    """
    model.eval()
    print("\n[Qualitative] Predicción del siguiente acorde (top-k)")
    n_rows = len(df_split)
    picked = np.random.choice(n_rows, size=min(n, n_rows), replace=False)

    def _row_tensor(column, row):
        # One row of `column` as a (1, steps) long tensor on the target device.
        values = df_split[column].iloc[row]
        return torch.tensor(values, dtype=torch.long).unsqueeze(0).to(device, non_blocking=True)

    for i in picked:
        enc = _row_tensor("encoded_chords", i)
        msk = _row_tensor("attention_mask", i)
        with torch.inference_mode():
            logits = model(enc, attention_mask=msk)
            valid_len = int(msk.sum().item())
            last = max(1, valid_len) - 1
            probs = F.softmax(logits[0, last, :], dim=-1)
            best = torch.topk(probs, k)
            ranked = [(idxs_to_chords([chord])[0], float(p))
                      for chord, p in zip(best.indices.tolist(), best.values.tolist())]
        tail = enc[0, max(0, valid_len-8):valid_len].tolist()
        print(f"- idx={i} | contexto últimos 8: {idxs_to_chords(tail)}")
        print(f"  top-{k}: {ranked}")

# Show a handful of sample predictions drawn from the validation split.
qualitative_examples(df_val, n=6, k=5)


device=cuda | val_size=30156 | test_size=30157
BATCH_EVAL=256 | pin_memory=True | num_workers=0




[TEST] 118/118 batches
[TEST] done in 8.2s
[TEST] loss=0.8000 | ppl=2.23 | top1=0.764 | top5=0.960

[Qualitative] Predicción del siguiente acorde (top-k)
- idx=22830 | contexto últimos 8: ['B', 'A#m', 'D#m', 'C#', 'B', 'A#m', 'D#m', 'C#']
  top-5: [('B', 0.8965573310852051), ('F#', 0.06570390611886978), ('D#m', 0.026767481118440628), ('G#m', 0.004140423145145178), ('A#m', 0.0032838312909007072)]
- idx=8796 | contexto últimos 8: ['A', 'A', 'A', 'A', 'A', 'A', 'A', 'A']
  top-5: [('A', 0.8251206278800964), ('F#m', 0.1399698406457901), ('D', 0.007280388846993446), ('E7', 0.006934138480573893), ('A7', 0.004347603768110275)]
- idx=25843 | contexto últimos 8: ['B7', 'E', 'Bm', 'Am', 'Bm', 'Am', 'B7', 'E']
  top-5: [('B7', 0.5017744898796082), ('A', 0.17257916927337646), ('B', 0.11076533794403076), ('F#m', 0.09145978093147278), ('G#7', 0.024557190015912056)]
- idx=28339 | contexto últimos 8: ['E', 'A', 'D', 'A', 'E', 'A', 'E', 'A']
  top-5: [('D', 0.4962645471096039), ('E', 0.4248898625373840