In [None]:
# Parámetros e imports para Spectral Flux (SF)

from pathlib import Path
import numpy as np
import pandas as pd
import librosa

# Carpeta con los .wav (una subcarpeta por especie)
IN_DIR = "../data/Tinamidae"

# Archivo de salida
OUT_CSV = "../traits/output_csv/SF.csv"

# Parámetros de audio y espectrograma Mel
SR_TARGET  = 44100      # Hz
WIN_LENGTH = 1024       # muestras (ventana Hann)
HOP_LENGTH = WIN_LENGTH // 2
N_FFT      = 2048
N_MELS     = 128
FMIN, FMAX = 0.0, None
POWER      = 2.0        # potencia (magnitud^2), no dB

# Estabilidad numérica
EPS = 1e-12


In [None]:
# Utilidades: listar archivos, extraer especie, cargar audio y z-score

def list_wavs(base_dir: str):
    base = Path(base_dir)
    return sorted([p for p in base.rglob("*.wav") if p.is_file()])

def species_from_path(p: Path, base_dir: str):
    """
    Asume estructura base_dir/especie/archivo.wav
    Devuelve el nombre de la carpeta inmediatamente bajo base_dir.
    """
    base = Path(base_dir).resolve()
    rel  = p.resolve().relative_to(base)
    return rel.parts[0] if len(rel.parts) > 1 else "unknown"

def load_mono(path: Path, sr_target: int):
    """
    Carga el audio como mono y lo re-muestrea a sr_target.
    Devuelve: y (np.ndarray), sr_target
    """
    y, sr = librosa.load(path, sr=None, mono=True)
    if sr != sr_target:
        y = librosa.resample(y, orig_sr=sr, target_sr=sr_target)
        sr = sr_target
    return y.astype(np.float64, copy=False), sr

def zscore_signal(y: np.ndarray, eps: float = EPS):
    """
    Estandariza la señal: (y - media) / sigma.
    El SF se verá afectado por re-escalado, pero al z-scorear
    quedan todas las señales en una escala comparable.
    """
    if y.size == 0:
        return y
    mu = float(np.mean(y))
    sigma = float(np.std(y, ddof=0))
    return (y - mu) / (sigma + eps)


In [None]:
# Funciones para Mel, serie SF_t y SF (log(sigma_SF))

def melspec_power(y_hat: np.ndarray, sr: int):
    """
    Espectrograma Mel de potencia lineal:
    P[f, t] = potencia en banda Mel f y frame t.
    """
    P = librosa.feature.melspectrogram(
        y=y_hat, sr=sr,
        n_fft=N_FFT, hop_length=HOP_LENGTH, win_length=WIN_LENGTH,
        window="hann", center=True, power=POWER,
        n_mels=N_MELS, fmin=FMIN, fmax=FMAX
    )
    return P  # matriz de potencia (F x T)

def sf_series(Pmel: np.ndarray):
    """
    Serie temporal SF_t a partir de la potencia Mel:

    1) S[f,t] = sqrt(Pmel[f,t])  (amplitud por banda)
    2) Delta[f,t] = S[f,t] - S[f,t-1]
    3) SF_t = sqrt( sum_f Delta[f,t]^2 ) para t = 1..T-1

    Devuelve vector de longitud T-1.
    """
    if Pmel.size == 0:
        return np.zeros(0, dtype=float)
    S = np.sqrt(np.maximum(Pmel, 0.0))
    D = S[:, 1:] - S[:, :-1]
    SF_t = np.sqrt(np.sum(D**2, axis=0))
    return SF_t

def compute_SF_from_series(SF_t: np.ndarray, eps: float = EPS):
    """
    A partir de SF_t calcula:
    SF_sigma = desviación estándar muestral de SF_t
    SF_raw   = log(SF_sigma)
    """
    if SF_t.size <= 1:
        return dict(SF_raw=np.nan, SF_sigma=np.nan, n_SF=int(SF_t.size))
    SF_sigma = float(np.std(SF_t, ddof=1))
    SF_raw   = float(np.log(SF_sigma + eps))
    return dict(SF_raw=SF_raw, SF_sigma=SF_sigma, n_SF=int(SF_t.size))


In [None]:
# Cálculo de SF crudo (SF_raw) por archivo

files = list_wavs(IN_DIR)
print(f"Archivos .wav encontrados: {len(files)}")

rows = []

for fp in files:
    especie = species_from_path(fp, IN_DIR)
    try:
        y, sr = load_mono(fp, SR_TARGET)
        y_hat = zscore_signal(y)
        Pmel  = melspec_power(y_hat, sr)   # POTENCIA lineal
        SF_t  = sf_series(Pmel)            # SF_t según definición
        stats = compute_SF_from_series(SF_t)

        rows.append({
            "species": especie,
            "relpath": str(fp.resolve()),
            "sr": sr,
            "T_frames": int(Pmel.shape[1]),
            **stats
        })
    except Exception as e:
        print(f"[WARN] Error en {fp}: {e}")
        rows.append({
            "species": especie,
            "relpath": str(fp.resolve()),
            "sr": np.nan,
            "T_frames": 0,
            "SF_raw": np.nan,
            "SF_sigma": np.nan,
            "n_SF": 0,
            "error": f"{type(e).__name__}: {e}"
        })

df_sf_files = pd.DataFrame(rows)
print("Filas en df_sf_files:", df_sf_files.shape[0])
df_sf_files.head()


In [None]:
# Corrección logarítmica por duración:
# SF_hat = SF_raw - (a + b * log(T_frames))
# Aquí estimamos a y b por MCO y los mostramos en pantalla.

df_dur = df_sf_files.dropna(subset=["SF_raw", "T_frames"]).copy()
df_dur = df_dur[df_dur["T_frames"] > 1].copy()

df_dur["log_T"] = np.log(df_dur["T_frames"].astype(float))

x = df_dur["log_T"].values
y = df_dur["SF_raw"].values

# Ajuste lineal: SF_raw ~ a + b * log(T_frames)
b, a = np.polyfit(x, y, 1)  # slope=b, intercept=a

print(f"Coeficientes de corrección (MCO) usando N = {len(x)} audios:")
print(f"  a (intercepto) = {a:.6f}")
print(f"  b (pendiente)  = {b:.6f}")

# Aplicamos la corrección SF_hat
df_dur["SF_hat"] = df_dur["SF_raw"] - (a + b * df_dur["log_T"])

print("\nEjemplo de primeras filas con SF_raw y SF_hat:")
df_dur[["species", "T_frames", "SF_raw", "SF_hat"]].head()


In [None]:
# Resumen por especie y exporte de SF.csv

df_species = (
    df_dur
    .dropna(subset=["SF_raw", "SF_hat"])
    .groupby("species")
    .agg(
        n_muestras     = ("relpath", "count"),
        T_frames_mean  = ("T_frames", "mean"),
        SF_raw_mean    = ("SF_raw", "mean"),
        SF_raw_median  = ("SF_raw", "median"),
        SF_hat_mean    = ("SF_hat", "mean"),
        SF_hat_median  = ("SF_hat", "median"),
    )
    .reset_index()
    .sort_values("n_muestras", ascending=False)
)

print("Especies con datos:", df_species.shape[0])
df_species.head(10)

df_species.to_csv(OUT_CSV, index=False)
print(f"[OK] Archivo guardado en: {OUT_CSV}")
