In [None]:
import numpy as np
import librosa
import pandas as pd

# ===========================
# CONFIG
# ===========================
SR = 44100
FRAME_LENGTH = 1024        # ~23 ms a 44.1k
HOP_LENGTH = 512           # 50% overlap
TEXTURE_WIN = 512          # frames (~6s)
TEXTURE_HOP = 256
N_MFCC = 13
N_MEL_BANDS = 40
J_MOD_BANDS = 8            # per modulation features (MMFCC/MOSC)

EPS = 1e-10


# ===========================
# UTILS
# ===========================

def summarize_time_series(name, X, stats=("mean", "std")):
    """
    X shape: (D, T) or (T,)
    Restituisce dict di statistiche per dimensione.
    """
    X = np.asarray(X)
    if X.ndim == 1:
        X = X[None, :]
    D, T = X.shape
    out = {}
    for d in range(D):
        x = X[d]
        if "mean" in stats:
            out[f"{name}_{d}_mean"] = float(np.mean(x))
        if "std" in stats:
            out[f"{name}_{d}_std"] = float(np.std(x))
    return out


def frame_rate(sr=SR, hop_length=HOP_LENGTH):
    return sr / hop_length


def octave_subbands(sr=SR, n_fft=FRAME_LENGTH):
    """
    Ottave come nel paper:
    0-100,100-200,...,8000-22050 Hz  (8 subband)
    Restituisce lista di (fmin, fmax, mask_bin)
    """
    freqs = librosa.fft_frequencies(sr=sr, n_fft=n_fft)
    bands = [
        (0, 100),
        (100, 200),
        (200, 400),
        (400, 800),
        (800, 1600),
        (1600, 3200),
        (3200, 8000),
        (8000, sr / 2.0),
    ]
    out = []
    for fmin, fmax in bands:
        mask = (freqs >= fmin) & (freqs < fmax)
        out.append((fmin, fmax, mask))
    return out


# ===========================
# CORE SHORT-TERM FEATURES
# ===========================

def compute_short_term_features(y, sr=SR):
    """
    Calcola:
    - RMS
    - ZCR
    - centroid, bandwidth, rolloff, flatness
    - Spectral Flux
    - OSC (octave-based spectral contrast: valley + contrast)
    - HLCR + low-energy ratio
    - MFCC + delta MFCC
    """
    # Base STFT magnitude
    S = librosa.stft(
        y,
        n_fft=FRAME_LENGTH,
        hop_length=HOP_LENGTH,
        window="hann",
        center=True,
    )
    S_mag = np.abs(S)
    T = S_mag.shape[1]

    # Energies
    rms = librosa.feature.rms(
        S=S_mag,
        frame_length=FRAME_LENGTH,
        hop_length=HOP_LENGTH,
        center=True,
    )[0]  # (T,)

    zcr = librosa.feature.zero_crossing_rate(
        y,
        frame_length=FRAME_LENGTH,
        hop_length=HOP_LENGTH,
        center=True,
    )[0]

    centroid = librosa.feature.spectral_centroid(
        S=S_mag, sr=sr
    )[0]

    bandwidth = librosa.feature.spectral_bandwidth(
        S=S_mag, sr=sr
    )[0]

    rolloff = librosa.feature.spectral_rolloff(
        S=S_mag, sr=sr, roll_percent=0.85
    )[0]

    flatness = librosa.feature.spectral_flatness(
        S=S_mag
    )[0]

    # Spectral flux via onset strength
    flux = librosa.onset.onset_strength(
        S=S_mag,
        sr=sr,
        hop_length=HOP_LENGTH
    )
    if flux.shape[0] != T:
        # Align length if onset_strength uses a different framing
        flux = np.interp(
            np.linspace(0, len(flux) - 1, T),
            np.arange(len(flux)),
            flux
        )

    # OSC: octave-based spectral contrast (valley + contrast) per band
    freqs = librosa.fft_frequencies(sr=sr, n_fft=FRAME_LENGTH)
    bands = octave_subbands(sr=sr, n_fft=FRAME_LENGTH)
    n_bands = len(bands)

    osc_valley = np.zeros((n_bands, T))
    osc_contrast = np.zeros((n_bands, T))

    for b, (_, _, mask) in enumerate(bands):
        band_mag = S_mag[mask, :] + EPS  # (F_b, T)
        # Log domain
        band_log = np.log10(band_mag)
        # Approx: valley = 10° percentile, peak = 90°
        valley = np.percentile(band_log, 10, axis=0)
        peak = np.percentile(band_log, 90, axis=0)
        contrast = peak - valley
        osc_valley[b, :] = valley
        osc_contrast[b, :] = contrast

    # HLCR & low-energy: based on RMS (come TTF/low energy)
    rms_mean = np.mean(rms)
    low_energy_mask = rms < rms_mean
    low_energy_ratio = np.sum(low_energy_mask) / float(len(rms))
    high_energy_count = (~low_energy_mask).sum()
    low_energy_count = low_energy_mask.sum()
    hlcr = high_energy_count / (low_energy_count + EPS)

    # MFCC + delta
    mfcc = librosa.feature.mfcc(
        y=y,
        sr=sr,
        n_mfcc=N_MFCC,
        n_fft=FRAME_LENGTH,
        hop_length=HOP_LENGTH,
        center=True,
    )  # (13, T)
    d_mfcc = librosa.feature.delta(mfcc, order=1)

    feat_dict = {}
    # Core summaries
    feat_dict.update(summarize_time_series("rms", rms))
    feat_dict.update(summarize_time_series("zcr", zcr))
    feat_dict.update(summarize_time_series("centroid", centroid))
    feat_dict.update(summarize_time_series("bandwidth", bandwidth))
    feat_dict.update(summarize_time_series("rolloff", rolloff))
    feat_dict.update(summarize_time_series("flatness", flatness))
    feat_dict.update(summarize_time_series("flux", flux))
    feat_dict.update(summarize_time_series("osc_valley", osc_valley))
    feat_dict.update(summarize_time_series("osc_contrast", osc_contrast))

    # HLCR + low-energy (scalari)
    feat_dict["low_energy_ratio"] = float(low_energy_ratio)
    feat_dict["hlcr"] = float(hlcr)

    # MFCC + delta
    feat_dict.update(summarize_time_series("mfcc", mfcc))
    feat_dict.update(summarize_time_series("d_mfcc", d_mfcc))

    return feat_dict, S_mag, mfcc, osc_valley, osc_contrast


# ===========================
# HARMONIC / RHYTHMIC / HARM-PERC
# ===========================

def compute_harmonic_rhythmic_features(y, sr=SR):
    feat_dict = {}

    # Chroma, Chroma CENS, Tonnetz
    chroma_cqt = librosa.feature.chroma_cqt(y=y, sr=sr)
    chroma_cens = librosa.feature.chroma_cens(y=y, sr=sr)
    tonnetz = librosa.feature.tonnetz(y=y, sr=sr)

    feat_dict.update(summarize_time_series("chroma_cqt", chroma_cqt))
    feat_dict.update(summarize_time_series("chroma_cens", chroma_cens))
    feat_dict.update(summarize_time_series("tonnetz", tonnetz))

    # Onset strength + tempo
    onset_env = librosa.onset.onset_strength(y=y, sr=sr)
    tempo, _ = librosa.beat.beat_track(
        onset_envelope=onset_env, sr=sr
    )
    feat_dict["tempo_bpm"] = float(tempo)
    feat_dict.update(summarize_time_series("onset_env", onset_env))

    # Tempogram (riassunto solo stat)
    tempogram = librosa.feature.tempogram(
        onset_envelope=onset_env,
        sr=sr,
        hop_length=HOP_LENGTH,
    )
    feat_dict.update(summarize_time_series("tempogram", tempogram))

    # Harmonic/Percussive (HPSS)
    y_harm, y_perc = librosa.effects.hpss(y)
    # RMS su harm/percuss
    rms_harm = librosa.feature.rms(y=y_harm)[0]
    rms_perc = librosa.feature.rms(y=y_perc)[0]

    feat_dict.update(summarize_time_series("rms_harm", rms_harm))
    feat_dict.update(summarize_time_series("rms_perc", rms_perc))

    # Rapporto medio
    harm_energy = np.mean(rms_harm**2)
    perc_energy = np.mean(rms_perc**2)
    feat_dict["harmonic_energy_mean"] = float(harm_energy)
    feat_dict["percussive_energy_mean"] = float(perc_energy)
    feat_dict["harm_perc_ratio"] = float(
        harm_energy / (perc_energy + EPS)
    )

    return feat_dict


# ===========================
# POWER-UP: SPETTRALI (entropy, crest, slope, decrease)
# ===========================

def compute_spectral_power_features(S_mag, sr=SR):
    """
    S_mag: (F, T)
    """
    feat_dict = {}
    S_power = S_mag**2 + EPS
    S_norm = S_power / (np.sum(S_power, axis=0, keepdims=True) + EPS)

    # Entropy
    entropy = -np.sum(S_norm * np.log2(S_norm + EPS), axis=0)

    # Crest: max / mean
    crest = np.max(S_mag, axis=0) / (np.mean(S_mag, axis=0) + EPS)

    # Slope & Decrease
    freqs = librosa.fft_frequencies(sr=sr, n_fft=(S_mag.shape[0]-1)*2)
    # Assicuriamoci dimensione compatibile
    freqs = freqs[:S_mag.shape[0]]

    # slope via regressione lineare in log-domain
    S_db = librosa.amplitude_to_db(S_mag, ref=np.max)
    slope = np.zeros(S_mag.shape[1])
    decrease = np.zeros(S_mag.shape[1])

    # Peeters' spectral decrease
    k = np.arange(1, S_mag.shape[0])  # 1..F-1
    for t in range(S_mag.shape[1]):
        y = S_db[:, t]
        # regressione lineare freq-vs-dB
        coef = np.polyfit(freqs, y, 1)
        slope[t] = coef[0]

        X = S_mag[:, t]
        num = np.sum((X[1:] - X[0]) / k)
        den = np.sum(X[1:] + EPS)
        decrease[t] = num / (den + EPS)

    feat_dict.update(summarize_time_series("spec_entropy", entropy))
    feat_dict.update(summarize_time_series("spec_crest", crest))
    feat_dict.update(summarize_time_series("spec_slope", slope))
    feat_dict.update(summarize_time_series("spec_decrease", decrease))

    return feat_dict


# ===========================
# LOG-MEL BANDS
# ===========================

def compute_logmel_features(y, sr=SR):
    feat_dict = {}
    mel_spec = librosa.feature.melspectrogram(
        y=y,
        sr=sr,
        n_fft=FRAME_LENGTH,
        hop_length=HOP_LENGTH,
        n_mels=N_MEL_BANDS,
        power=2.0,
    )
    logmel = librosa.power_to_db(mel_spec, ref=np.max)
    feat_dict.update(summarize_time_series("logmel", logmel))
    return feat_dict


# ===========================
# MODULATION FEATURES (MSFM, MSCM, MMFCC, MOSC)
# ===========================

def modulation_spectrum_average(x, win=TEXTURE_WIN, hop=TEXTURE_HOP):
    """
    x: (T,) time series (per band/per feature dim)
    restituisce: averaged modulation magnitude spectrum (L,)
    """
    x = np.asarray(x)
    T = x.shape[0]
    specs = []
    for start in range(0, T - win + 1, hop):
        seg = x[start:start + win]
        # rimuovo mean per evitare componente DC dominante
        seg = seg - np.mean(seg)
        X = np.fft.rfft(seg)
        mag = np.abs(X)
        specs.append(mag)
    if not specs:
        return np.zeros(win // 2 + 1)
    specs = np.stack(specs, axis=0)  # (n_win, L)
    return np.mean(specs, axis=0)


def modulation_subband_indices(L, J=J_MOD_BANDS):
    """
    Divide indice [1..L-1] in J subbande log-spaced.
    Ignoriamo bin 0 (DC) per la modulazione.
    """
    idx = np.arange(1, L)  # escludo DC
    # logspace tra 1 e L-1
    edges = np.unique(
        np.round(
            np.logspace(0, np.log10(L - 1 + EPS), J + 1)
        ).astype(int)
    )
    # garantisco che gli edges siano ordinati e >=1
    edges = edges[edges < L]
    if len(edges) <= 1:
        edges = np.array([1, L - 1])
    bands = []
    for i in range(len(edges) - 1):
        start = edges[i]
        end = edges[i + 1]
        if start >= end:
            continue
        mask = (idx >= start) & (idx < end)
        bands.append(np.where(mask)[0] + 1)  # +1 per mappare su 1..L-1
    # se troppe poche bande, riaggiusto
    if len(bands) == 0:
        bands = [np.arange(1, L)]
    return bands


def compute_MSFM_MSCM(S_mag, sr=SR):
    """
    MSFM e MSCM su 8 octave-based subbands
    """
    bands = octave_subbands(sr=sr, n_fft=(S_mag.shape[0] - 1) * 2)
    n_bands = len(bands)
    msfm = np.zeros(n_bands)
    mscm = np.zeros(n_bands)

    # per ogni banda: time series di energia
    for b, (_, _, mask) in enumerate(bands):
        band_power = np.mean(S_mag[mask, :]**2, axis=0)  # (T,)
        mod_spec = modulation_spectrum_average(
            band_power,
            win=TEXTURE_WIN,
            hop=TEXTURE_HOP
        )  # (L,)
        # MSFM: geometric / arithmetic mean
        gm = np.exp(np.mean(np.log(mod_spec + EPS)))
        am = np.mean(mod_spec + EPS)
        msfm[b] = gm / (am + EPS)
        # MSCM: max / mean
        mscm[b] = np.max(mod_spec) / (am + EPS)

    feat_dict = {}
    feat_dict.update(summarize_time_series("MSFM", msfm))
    feat_dict.update(summarize_time_series("MSCM", mscm))
    return feat_dict


def compute_MMFCC_MOSC(mfcc, osc_contrast, J=J_MOD_BANDS):
    """
    Modulation spectral analysis di:
    - MFCC (MMFCC)
    - OSC (MOSC) -> usiamo solo osc_contrast (D_osc x T)

    Implementiamo: per ogni dimensione d:
        1) modulazione media (fft sui frame, media sulle texture windows)
        2) divisione in J subband
        3) MSP (max), MSV (min), MSC (peak-valley) -> matrici D x J
        4) mean/std di righe e colonne per MSV e MSC => 4D + 4J

    Restituisce feature dict con chiavi "MMFCC_..." e "MOSC_..."
    """
    def modulation_matrix(feat_mat, name_prefix):
        D, T = feat_mat.shape
        # step 1: averaged modulation spectrum per feature dim
        # per ogni d: vettore mod_spec_d (L,)
        mod_specs = []
        for d in range(D):
            ms = modulation_spectrum_average(
                feat_mat[d],
                win=TEXTURE_WIN,
                hop=TEXTURE_HOP
            )  # (L,)
            mod_specs.append(ms)
        mod_specs = np.stack(mod_specs, axis=0)  # (D, L)
        L = mod_specs.shape[1]

        # step 2: modulation subbands
        bands_idx = modulation_subband_indices(L, J=J)
        J_eff = len(bands_idx)
        MSP = np.zeros((D, J_eff))
        MSV = np.zeros((D, J_eff))
        MSC = np.zeros((D, J_eff))

        for j, idxs in enumerate(bands_idx):
            sub = mod_specs[:, idxs]  # (D, n_idx)
            MSP[:, j] = np.max(sub, axis=1)
            MSV[:, j] = np.min(sub, axis=1)
            MSC[:, j] = MSP[:, j] - MSV[:, j]

        feat_dict_local = {}

        # row-wise mean/std per MSV & MSC
        for d in range(D):
            # MSV
            feat_dict_local[f"{name_prefix}_MSV_row_{d}_mean"] = float(
                np.mean(MSV[d])
            )
            feat_dict_local[f"{name_prefix}_MSV_row_{d}_std"] = float(
                np.std(MSV[d])
            )
            # MSC
            feat_dict_local[f"{name_prefix}_MSC_row_{d}_mean"] = float(
                np.mean(MSC[d])
            )
            feat_dict_local[f"{name_prefix}_MSC_row_{d}_std"] = float(
                np.std(MSC[d])
            )

        # column-wise mean/std per MSV & MSC
        for j in range(J_eff):
            feat_dict_local[f"{name_prefix}_MSV_col_{j}_mean"] = float(
                np.mean(MSV[:, j])
            )
            feat_dict_local[f"{name_prefix}_MSV_col_{j}_std"] = float(
                np.std(MSV[:, j])
            )
            feat_dict_local[f"{name_prefix}_MSC_col_{j}_mean"] = float(
                np.mean(MSC[:, j])
            )
            feat_dict_local[f"{name_prefix}_MSC_col_{j}_std"] = float(
                np.std(MSC[:, j])
            )

        return feat_dict_local

    feat_dict = {}
    # MFCC modulation
    feat_dict.update(modulation_matrix(mfcc, "MMFCC"))
    # OSC modulation - usiamo contrast come feature base (D_osc x T)
    feat_dict.update(modulation_matrix(osc_contrast, "MOSC"))
    return feat_dict


# ===========================
# MAIN EXTRACTION
# ===========================

def extract_all_features_for_file(path):
    print(f"Loading audio: {path}")
    y, sr = librosa.load(path, sr=SR, mono=True)

    all_feats = {}

    # Short-term core + MFCC/OSC
    st_feats, S_mag, mfcc, osc_valley, osc_contrast = compute_short_term_features(
        y, sr=sr
    )
    all_feats.update(st_feats)

    # Harmonic, rhythmic, HPSS-based
    hr_feats = compute_harmonic_rhythmic_features(y, sr=sr)
    all_feats.update(hr_feats)

    # Spectral entropy/crest/slope/decrease
    sp_feats = compute_spectral_power_features(S_mag, sr=sr)
    all_feats.update(sp_feats)

    # Log-mel bands
    lm_feats = compute_logmel_features(y, sr=sr)
    all_feats.update(lm_feats)

    # Modulation MSFM/MSCM
    msfm_mscm_feats = compute_MSFM_MSCM(S_mag, sr=sr)
    all_feats.update(msfm_mscm_feats)

    # MMFCC + MOSC
    mmfcc_mosc_feats = compute_MMFCC_MOSC(mfcc, osc_contrast)
    all_feats.update(mmfcc_mosc_feats)

    return all_feats


if __name__ == "__main__":
    audio_path = "beethoven_fr_elise_piano_version.wav"
    feats = extract_all_features_for_file(audio_path)

    # Mettiamo tutto in una Series e salviamo
    s = pd.Series(feats)
    s.to_csv("beethoven_fr_elise_features.csv")
    print("Features salvate in beethoven_fr_elise_features.csv")


: 