<a href="https://colab.research.google.com/github/felipednegredo/tcc-emocoes-musicais-codigo/blob/main/TCC_Fingerprint_Shazam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install -q -U kaleido pyarrow fastparquet

In [None]:
# ===============================================================
#  FINGERPRINT por música (blocks_by_song/*.parquet)
#  - STFT (sem re-log se já estiver em dB) + contraste por percentis
#  - Picos: limiar adaptativo + cotas por banda (+ equalização opcional)
#  - Saída: pasta por música + 2 imagens por bloco:
#      (1) STFT | Constellation (dual)
#      (2) STFT + Constellation (overlay)
#  - Exporta PARQUET com todos os picos da música
# ===============================================================

import pandas as pd, numpy as np, plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.io as pio
import librosa, io, zlib, math, os, pathlib, re, warnings
warnings.filterwarnings("ignore")

try:
    from scipy.ndimage import median_filter
    SCIPY_OK = True
except Exception:
    SCIPY_OK = False

# ---------- CONFIG ----------
IN_PARQUET     = "/content/drive/MyDrive/DataSet TCC/DEAM/parquet/blocks_by_song/song_2.parquet"
OUT_ROOT       = "/content/drive/MyDrive/DataSet TCC/DEAM/fingerprint"
SAVE_PNG_WANT  = True                  # PNG se kaleido ok; HTML sempre
IMG_W, IMG_H   = 1280, 640             # maior para evitar overlaps

# Visual
YLIM_HZ        = (50, 8000)
USE_LOG_Y      = False
PCT_LOW_HIGH   = (1, 99)               # contraste por percentis
SPEC_CMAP      = "Inferno"             # mais próximo do exemplo de referência

# Picos
BANDS          = [(80,300), (300,1200), (1200,4000), (4000,8000)]
TOPK_PER_BAND  = 1
USE_ADAPT_THR  = True
ADAPT_DROP_DB  = 6.0
DB_MIN_REL     = -30.0
EQUALIZE_FOR_PEAKS = True
EQ_SIZE_FREQ   = 15
ZLIM_FIXED = (-75, 0)              # contraste visual fixo

PK_MARKER = dict(symbol="x", size=7, opacity=0.8,
                 color="#000000", line=dict(width=1.2, color="#111111"))

TITLE_PREFIX = "Fingerprint espectral — "

# ---------- PLOTLY/KALEIDO robusto ----------
def setup_kaleido_defaults():
    try:
        import kaleido  # noqa: F401
        if getattr(pio, "kaleido", None) is None:
            try:
                _ = go.Figure().to_image(format="png")
            except Exception:
                pass
        if getattr(pio, "kaleido", None) and getattr(pio.kaleido, "scope", None):
            pio.kaleido.scope.default_format = "png"
            pio.kaleido.scope.default_scale  = 2
            pio.kaleido.scope.default_width  = IMG_W
            pio.kaleido.scope.default_height = IMG_H
            return True
    except Exception:
        return False
    return False

HAS_KALEIDO = setup_kaleido_defaults()
SAVE_PNG    = bool(HAS_KALEIDO and SAVE_PNG_WANT)
if not HAS_KALEIDO and SAVE_PNG_WANT:
    print("[WARN] kaleido indisponível; PNG desativado (HTML será salvo).")

# ---------- HELPERS ----------
def ensure_dir(path): pathlib.Path(path).mkdir(parents=True, exist_ok=True)

def slugify(s):
    s = (s or "desconhecida").strip().lower()
    s = re.sub(r"[^\w\s-]+", "", s, flags=re.UNICODE)
    s = re.sub(r"\s+", "_", s)
    return s[:60] or "sem_titulo"

def bytes_to_ndarray(b):
    if b is None: return np.array([])
    if isinstance(b, (bytes, bytearray)):
        try:
            raw = zlib.decompress(b)
            return np.load(io.BytesIO(raw), allow_pickle=False)
        except Exception:
            return np.load(io.BytesIO(b), allow_pickle=False)
    if isinstance(b, np.ndarray): return b
    return np.array([])

def detect_scale_and_fix(
    S: np.ndarray,
    freqs: np.ndarray,
    times: np.ndarray,
    zlim_fixed: tuple | None = None,   # ex.: (-75, 0) para visual consistente
    pct_low_high: tuple = (1, 99)      # usado se zlim_fixed=None
):
    """
    - Garante orientação (freq x frames) quando possível pelos vetores freqs/times
    - Detecta se S já está em dB (evita re-log)
    - Conserta NaN/Inf e valores bizarros
    - Retorna S_db, freqs, times, zlim (dB)
    """
    if S is None or S.size == 0:
        raise ValueError("detect_scale_and_fix: matriz S vazia.")

    S = np.array(S, copy=False)

    # ---------- 1) ORIENTAÇÃO ----------
    # heurística: se bater com (frames=f_times, freqs=f_freqs), transpõe
    try:
        if freqs.size and times.size:
            if S.shape[0] == times.size and S.shape[1] == freqs.size:
                S = S.T  # agora (freq, frames)
        # fallback suave: se for muito “alta x estreita”, assume (freq, frames)
        # (não transpõe se já couber bem em freqs/times)
    except Exception:
        pass

    # ---------- 2) SANITIZAÇÃO ----------
    # troca Inf por NaN e depois preenche com mínimo finito
    S = np.where(np.isfinite(S), S, np.nan)
    if np.isnan(S).all():
        raise ValueError("detect_scale_and_fix: S só contém NaN/Inf.")
    finite_min = float(np.nanmin(S))
    S = np.nan_to_num(S, nan=finite_min)

    # ---------- 3) DETECÇÃO DE ESCALA ----------
    # heurísticas para "já em dB":
    # - máximos pequenos (<= +5 dB) e mínimo bem negativo (< -10 dB)
    # - grande fração de valores negativos
    S_min = float(np.min(S))
    S_max = float(np.max(S))
    frac_neg = float((S < 0).mean()) if S.size else 0.0

    is_db_like = ((S_max <= 5.0 and S_min < -10.0) or (frac_neg > 0.6))

    # se complexo, tira magnitude
    if np.iscomplexobj(S):
        S_abs = np.abs(S)
        is_db_like = False  # STFT complexa → não está em dB
    else:
        S_abs = np.abs(S)

    # ---------- 4) CONVERSÃO p/ dB ----------
    if is_db_like:
        S_db = S.astype(float, copy=False)
    else:
        # decide ref automaticamente (mais estável para diferentes músicas)
        # usa amplitude_to_db, que dá ~[-80, 0] após normalização pelo máximo
        ref = np.max(S_abs) if np.max(S_abs) > 0 else 1.0
        S_db = librosa.amplitude_to_db(S_abs, ref=ref)

    # ---------- 5) FREQS/TIMES CONSISTENTES ----------
    # se não vieram do parquet, cria e garante não-negatividade em freqs
    if freqs is None or freqs.size == 0:
        freqs = np.linspace(0.0, 22050.0, S_db.shape[0], dtype=float)
    else:
        freqs = np.maximum(np.asarray(freqs, dtype=float), 0.0)

    if times is None or times.size == 0:
        times = np.arange(S_db.shape[1], dtype=float)
    else:
        times = np.asarray(times, dtype=float)

    # ---------- 6) Z-LIMITS (dB) ----------
    if zlim_fixed is not None and len(zlim_fixed) == 2:
        zmin, zmax = float(zlim_fixed[0]), float(zlim_fixed[1])
    else:
        low, high = pct_low_high
        p1, p99 = np.nanpercentile(S_db, [low, high])
        zmin = max(-120.0, float(p1))
        zmax = min(0.0,    float(p99))
        # segurança: garante janela mínima de 20 dB
        if zmax - zmin < 20.0:
            zmin = min(zmin, zmax - 20.0)

    return S_db, freqs, times, (zmin, zmax)


def equalize_for_peaks(S_db):
    if not SCIPY_OK or EQ_SIZE_FREQ <= 1: return S_db
    return S_db - median_filter(S_db, size=(EQ_SIZE_FREQ, 1))

def estimate_peaks_per_band(S_db, freqs, times):
    if S_db.size == 0: return np.array([]), np.array([])
    pk_t, pk_f = [], []
    S_for = equalize_for_peaks(S_db) if EQUALIZE_FOR_PEAKS else S_db
    for t in range(S_db.shape[1]):
        col = S_for[:, t]
        thr = (np.percentile(col, 95) - ADAPT_DROP_DB) if USE_ADAPT_THR else (float(np.max(col)) + DB_MIN_REL)
        for (lo, hi) in BANDS:
            band = (freqs >= lo) & (freqs < hi)
            idxs = np.where((col >= thr) & band)[0]
            if idxs.size == 0: continue
            sel = idxs[np.argsort(col[idxs])[::-1][:TOPK_PER_BAND]]
            pk_t.extend([times[t]] * len(sel))
            pk_f.extend(freqs[sel].tolist())
    return np.asarray(pk_t), np.asarray(pk_f)

def add_titles_without_overlap(fig, main_title, subtitle):
    fig.update_layout(
        title=None,
        margin=dict(l=80, r=30, t=95, b=55),
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1.0)
    )
    fig.add_annotation(text=f"<b>{main_title}</b>", xref="paper", yref="paper",
                       x=0, y=1.16, showarrow=False, font=dict(size=22, color="#1f2a44"),
                       xanchor="left")
    fig.add_annotation(text=subtitle, xref="paper", yref="paper",
                       x=0, y=1.09, showarrow=False, font=dict(size=14, color="#324a66"),
                       xanchor="left")

def style_axes(fig, fmin, fmax, row=None, col=None):
    # aplica com/sem grid (row/col opcionais)
    ykw = dict(type="log", range=[math.log10(max(1, fmin)), math.log10(fmax)]) if USE_LOG_Y \
          else dict(type="linear", range=[fmin, fmax])
    if row is None and col is None:
        fig.update_yaxes(**ykw, showgrid=True, zeroline=False)
        fig.update_xaxes(showline=True, mirror=True, ticks="outside", showgrid=True)
    else:
        fig.update_yaxes(**ykw, showgrid=True, zeroline=False, row=row, col=col)
        fig.update_xaxes(showline=True, mirror=True, ticks="outside", showgrid=True, row=row, col=col)

# ---------- PLOT POR BLOCO ----------
def plot_block_dual_and_overlay(row, out_dir):
    # blobs
    S_blk  = bytes_to_ndarray(row.get("stft_bytes"))
    freqs  = bytes_to_ndarray(row.get("freqs_bytes"))
    times  = bytes_to_ndarray(row.get("times_bytes"))
    if S_blk.size == 0: raise ValueError(f"Bloco {row['block_idx']}: stft_bytes vazio.")

    # correções
    S_db, freqs, times, zlim = detect_scale_and_fix(S_blk, freqs, times)

    # recorte Y
    fmin, fmax = YLIM_HZ
    if freqs.size == 0: freqs = np.linspace(0, 22050, S_db.shape[0])
    freqs = np.maximum(freqs.astype(float), 0.0)
    band = (freqs >= fmin) & (freqs <= fmax)
    S_show, F_show = S_db[band, :], freqs[band]

    # picos
    peaks_f = bytes_to_ndarray(row.get("peaks_f_bytes"))
    peaks_t = bytes_to_ndarray(row.get("peaks_t_bytes"))
    if peaks_f.size == 0 or peaks_t.size == 0:
        pt, pf = estimate_peaks_per_band(S_db, freqs, times)
    else:
        pt, pf = peaks_t, peaks_f
    if pf.size:
        sel = (pf >= fmin) & (pf <= fmax)
        pf, pt = pf[sel], pt[sel]

    # textos
    title  = row.get("title", "Desconhecida")
    val, aro = row.get("valence", None), row.get("arousal", None)
    emo_lbl = row.get("emotion_label", None)
    nfft = int(row.get("stft_n_fft", 2048) or 2048)
    hop  = int(row.get("stft_hop",   512)  or 512)
    win  = str(row.get("stft_window","hann"))

    main_title = f"{TITLE_PREFIX}{title}"
    subtitle   = f"Bloco {int(row['block_idx'])} — STFT: {win}, {nfft}/{hop}"
    if pd.notna(val) and pd.notna(aro): subtitle += f" — Valência {val:.3f}, Arousal {aro:.3f}"
    if isinstance(emo_lbl, str) and emo_lbl: subtitle += f" — Rótulo: {emo_lbl}"

    # -------- FIGURA 1: Dual (STFT | Constellation) --------
    fig = make_subplots(rows=1, cols=2, column_widths=[0.6, 0.4], horizontal_spacing=0.08)

    # STFT (esq.)
    fig.add_trace(
        go.Heatmap(z=S_show, x=times, y=F_show, colorscale=SPEC_CMAP,
                   zmin=float(zlim[0]), zmax=float(zlim[1]),
                   colorbar=dict(title="Magnitude (dB)", thickness=14, x=1.02),
                   zsmooth=False,
                   hovertemplate="t=%{x:.3f}s<br>f=%{y:.0f} Hz<br>%{z:.1f} dB<extra></extra>",
                   name="STFT"),
        row=1, col=1
    )
    fig.update_xaxes(title_text="Tempo (s)", row=1, col=1)
    style_axes(fig, fmin, fmax, row=1, col=1)

    # Constellation (dir.)
    fig.add_trace(
        go.Scatter(x=pt if pt.size else None, y=pf if pf.size else None,
                   mode="markers", marker=PK_MARKER, name="Picos",
                   hovertemplate="t=%{x:.3f}s<br>f=%{y:.0f} Hz<extra></extra>"),
        row=1, col=2
    )
    fig.update_xaxes(title_text="Tempo (s)", rangeslider=dict(visible=True),
                     row=1, col=2)
    style_axes(fig, fmin, fmax, row=1, col=2)

    add_titles_without_overlap(fig, main_title, subtitle)
    fig.update_layout(template="plotly_white", width=IMG_W, height=IMG_H, hovermode="x unified")

    blk = int(row["block_idx"]); base = os.path.join(out_dir, f"block_{blk:04d}")
    ensure_dir(out_dir)
    fig.write_html(base + "_dual.html", include_plotlyjs="cdn")
    if SAVE_PNG:
        try:
            fig.write_image(base + "_dual.png", engine="kaleido")
        except Exception as e:
            print(f"[WARN] PNG dual falhou: {e}")

    # -------- FIGURA 2: Overlay (grid 1x1 para evitar erro row/col) --------
    fig2 = make_subplots(rows=1, cols=1)
    fig2.add_trace(go.Heatmap(z=S_show, x=times, y=F_show, colorscale=SPEC_CMAP,
                              zmin=float(zlim[0]), zmax=float(zlim[1]),
                              colorbar=dict(title="Magnitude (dB)", thickness=14, x=1.02),
                              zsmooth=False,
                              hovertemplate="t=%{x:.3f}s<br>f=%{y:.0f} Hz<br>%{z:.1f} dB<extra></extra>",
                              name="STFT"),
                   row=1, col=1)
    fig2.add_trace(go.Scatter(x=pt if pt.size else None, y=pf if pf.size else None,
                              mode="markers", marker=PK_MARKER, name="Picos",
                              hovertemplate="t=%{x:.3f}s<br>f=%{y:.0f} Hz<extra></extra>"),
                   row=1, col=1)
    fig2.update_xaxes(title_text="Tempo (s)", row=1, col=1)
    style_axes(fig2, fmin, fmax, row=1, col=1)
    add_titles_without_overlap(fig2, main_title, subtitle + " — Overlay")
    fig2.update_layout(template="plotly_white", width=IMG_W, height=IMG_H)

    fig2.write_html(base + "_overlay.html", include_plotlyjs="cdn")
    if SAVE_PNG:
        try:
            fig2.write_image(base + "_overlay.png", engine="kaleido")
        except Exception as e:
            print(f"[WARN] PNG overlay falhou: {e}")

    # DF de picos
    return pd.DataFrame({
        "song_id":  [row.get("song_id")] * len(pt),
        "block_idx":[blk] * len(pt),
        "time_s":   pt.astype(float) if pt.size else np.array([], dtype=float),
        "freq_hz":  pf.astype(float) if pf.size else np.array([], dtype=float),
        "title":    [row.get("title")] * len(pt)
    })

# ---------- EXECUÇÃO POR MÚSICA ----------
df = pd.read_parquet(IN_PARQUET)
df["block_idx"] = pd.to_numeric(df["block_idx"], errors="coerce").astype("Int64")
df = df.sort_values(by=["block_idx"]).reset_index(drop=True)

# pasta por música
song_id  = int(df.loc[df["block_idx"].idxmin(), "song_id"]) if "song_id" in df else 0
song_tit = str(df.loc[df["block_idx"].idxmin(), "title"]) if "title" in df else "Desconhecida"
song_dir = os.path.join(OUT_ROOT, f"{song_id:04d}_{slugify(song_tit)}")
ensure_dir(song_dir)

all_peaks = []
for _, row in df.iterrows():
    try:
        dfp = plot_block_dual_and_overlay(row, song_dir)
        all_peaks.append(dfp)
        print(f"[OK] Bloco {int(row['block_idx'])}  →  {song_dir}")
    except Exception as e:
        print(f"[ERRO] Bloco {row.get('block_idx')}: {e}")

# PARQUET de picos dessa música
if all_peaks:
    out_pq = os.path.join(song_dir, "peaks_all_blocks.parquet")
    pd.concat(all_peaks, ignore_index=True).to_parquet(out_pq, index=False, compression="snappy")
    print(f"[DONE] Parquet de picos salvo em: {out_pq}")
else:
    print("[DONE] Nenhum pico coletado.")


[WARN] kaleido indisponível; PNG desativado (HTML será salvo).
[OK] Bloco 0  →  /content/drive/MyDrive/DataSet TCC/DEAM/fingerprint/0002_tonight_a_lonely_century
[OK] Bloco 1  →  /content/drive/MyDrive/DataSet TCC/DEAM/fingerprint/0002_tonight_a_lonely_century
[OK] Bloco 2  →  /content/drive/MyDrive/DataSet TCC/DEAM/fingerprint/0002_tonight_a_lonely_century
[OK] Bloco 3  →  /content/drive/MyDrive/DataSet TCC/DEAM/fingerprint/0002_tonight_a_lonely_century
[DONE] Parquet de picos salvo em: /content/drive/MyDrive/DataSet TCC/DEAM/fingerprint/0002_tonight_a_lonely_century/peaks_all_blocks.parquet
