# Acoustic Feature Extraction for Pathological and Healthy Speech

This notebook extracts a broad set of **acoustic features** from audio recordings of both healthy and pathological speech.  
Any subsequent analysis is handled in a **separate notebook** `filter_synth_samples.ipynb`.

### Goals
- Provide a reproducible pipeline for **audio feature extraction**.  
- Combine **signal-level** and **speech-level** measures into a single dataset.  
- Prepare data for downstream tasks such as feature importance analysis and classification.  

### Input
- A CSV file listing audio file paths and optional metadata  
  (e.g., `text`, `health_condition`, `session_id`).  
- Corresponding audio files in **MP3** format.  

### Process
1. **Load and decode audio** from the provided paths.  
2. (Optional) Apply **Voice Activity Detection (VAD)** to remove non-speech segments.  
3. Extract acoustic measures using:  
   - **Librosa / NumPy (signal-level)**: duration, RMS energy, zero-crossing rate, peak amplitude, dBFS.  
   - **Praat via Parselmouth (speech-level)**: pitch statistics, jitter, shimmer, HNR/NHR, intensity, spectral slope, spectral centroid, formants (F1–F3 + bandwidths), and MFCCs.  
4. Store all results in a structured DataFrame.  

### Output
- A single `.csv` file with one row per audio file.  
- Each row includes both metadata and the extracted acoustic features, ready for use in later analysis or modeling.  


In [52]:
# ===============================================
# CONFIG — configure your paths and columns here
# ===============================================

# --- Paths ---
CSV_PATH      = "/data/users/akloeckner/collected_dataset/dataset_mp3.csv"
AUDIO_ROOT    = "/data/users/akloeckner/collected_dataset"

# --- Column config in your CSV ---
COL_AUDIO     = "filename_path"
COLUMNS_KEEP  = ["text", "is_health", "session_id"]

# --- Output ---
OUTPUT_CSV    = "./audio_features_out.csv"



In [53]:

import os, math, warnings, logging, concurrent.futures
from pathlib import Path
from dataclasses import dataclass
from typing import Dict, Any, Optional, List, Tuple

import numpy as np
import pandas as pd
from tqdm import tqdm

import soundfile as sf
import librosa

# Praat / Parselmouth
try:
    import parselmouth
    from parselmouth.praat import call
except Exception as e:
    raise RuntimeError(
        "Parselmouth is required for pitch/jitter/shimmer/HNR/formants. "
        "Install with `pip install praat-parselmouth` and restart the kernel."
    ) from e

# --- Feature extraction config ---

LOG_EVERY_N   = 50
TARGET_SR     = None       
MONO          = True
CENTER_FRAMES = True
NUM_WORKERS   = 0

USE_VAD             = True
VAD_FRAME_MS        = 30
VAD_AGGRESSIVENESS  = 1

warnings.filterwarnings("ignore", category=UserWarning)
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")

rng = np.random.default_rng(42)

def _safe_path(audio_path: str) -> str:
    p = Path(audio_path)
    if not p.is_absolute() and AUDIO_ROOT:
        p = Path(AUDIO_ROOT) / p
    return str(p)

# Read audio file, convert to float32 PCM,
# optionally resample and downmix to mono.
def _read_audio(path: str, target_sr: Optional[int], mono: bool) -> Tuple[np.ndarray, int]:
    y, sr = sf.read(path, always_2d=False)

    # Fallback for MP3 if libsndfile lacks mp3 support:
    if isinstance(y, np.ndarray) and y.size == 0:
        y, sr = librosa.load(path, sr=None, mono=False)

    # downmix to mono if needed
    if y.ndim == 2:
        y = np.mean(y, axis=1)

    if target_sr is not None and sr != target_sr:
        y = librosa.resample(y.astype(np.float32), orig_sr=sr, target_sr=target_sr)
        sr = target_sr

    return y.astype(np.float32), sr

# Return descriptive statistics (min, max, mean, median, std) for a 1D array.
def _stats(x: np.ndarray) -> Dict[str, float]:
    if x is None or len(x) == 0:
        return {"min": np.nan, "max": np.nan, "mean": np.nan, "median": np.nan, "std": np.nan}
    return {
        "min": float(np.nanmin(x)),
        "max": float(np.nanmax(x)),
        "mean": float(np.nanmean(x)),
        "median": float(np.nanmedian(x)),
        "std": float(np.nanstd(x, ddof=1)) if len(x) > 1 else 0.0,
    }


In [None]:
import webrtcvad

# calculate zero-crossing rate (crossings per second)
def _manual_zcr(y: np.ndarray, sr: int) -> float:
    if y.size == 0 or sr <= 0:
        return np.nan
    zero_crossings = np.sum(np.diff(np.sign(np.clip(y, -1, 1))) != 0)
    duration = len(y) / sr
    return float(zero_crossings / max(duration, 1e-9))

# Apply VAD using webrtcvad on 16kHz 16-bit PCM audio
def _apply_vad_numpy(y: np.ndarray, sr: int,
                     frame_ms: int = VAD_FRAME_MS,
                     aggressiveness: int = VAD_AGGRESSIVENESS) -> Tuple[np.ndarray, int]:
    if y.size == 0:
        return y, sr
    # VAD requires 16k/mono/16-bit
    if sr != 16000:
        y = librosa.resample(y, orig_sr=sr, target_sr=16000)
        sr = 16000
    vad = webrtcvad.Vad(aggressiveness)
    # 16-bit PCM bytes
    pcm16 = (np.clip(y, -1.0, 1.0) * 32767.0).astype(np.int16)
    frame_len = int(sr * frame_ms / 1000)
    voiced_chunks = []
    for i in range(0, len(pcm16) - frame_len + 1, frame_len):
        frm = pcm16[i:i+frame_len]
        if vad.is_speech(frm.tobytes(), sr):
            voiced_chunks.append(frm)
    if not voiced_chunks:
        return np.array([], dtype=np.float32), sr
    y_v = np.concatenate(voiced_chunks).astype(np.float32) / 32768.0
    return y_v, sr

# Lightweight features with librosa (RMS, peak, dbFS, manual ZCR)
def extract_librosa_features(y: np.ndarray, sr: int) -> Dict[str, Any]:
    feats = {}
    duration = len(y) / sr if sr else np.nan
    feats["duration_sec"] = float(duration)
    feats["sr"] = int(sr)

    # RMS
    rms = librosa.feature.rms(
        y=y, frame_length=2048, hop_length=512, center=CENTER_FRAMES
    )
    feats["rms_mean"] = float(np.mean(rms)) if rms.size else np.nan
    feats["rms_std"]  = float(np.std(rms))  if rms.size else np.nan

    feats["zcr_mean"] = _manual_zcr(y, sr)

    # Peak & dbFS
    peak = float(np.max(np.abs(y))) if y.size else 0.0
    feats["peak"] = peak
    feats["dbfs"] = float(20.0 * np.log10(peak + 1e-9))
    return feats

def extract_parselmouth_features_from_sound(snd: "parselmouth.Sound") -> Dict[str, Any]:
    feats = {}
    try:
        pitch = call(snd, "To Pitch (cc)", *(0.0, 60.0, 400.0, "no", 0.01, 0.15, 0.001, 0.05, 0.0001, 400.0))
        mean_pitch = call(pitch, "Get mean", 0, 0, "Hertz")
        if np.isnan(mean_pitch):
            pitch = call(snd, "To Pitch (cc)", *(0.0, 60.0, 400.0 * 1.5, "no", 0.02, 0.30, 0.005, 0.10, 0.001, 400.0 * 1.5))
            mean_pitch = call(pitch, "Get mean", 0, 0, "Hertz")
            if np.isnan(mean_pitch):
                raise RuntimeError("Pitch detection failed")

        feats["mean_pitch"] = float(mean_pitch)
        feats["std_pitch"]  = float(call(pitch, "Get standard deviation", 0, 0, "Hertz"))
        # ===== Jitter (local) =====
        pp = call(snd, "To PointProcess (periodic, cc)", 60.0, 400.0)
        try:
            feats["jitter_local_percent"] = float(call(pp, "Get jitter (local)", 0, 0, 0.0001, 0.02, 1.3) * 100.0)
        except Exception:
            feats["jitter_local_percent"] = np.nan

        # shimmer (two tries)
        try:
            feats["shimmer_local_percent"] = float(call(
                [snd, pp], "Get shimmer (local)", 0, 0, 0.0001, 0.02, 1.3, 1.6
            ) * 100.0)
        except Exception:
            try:
                feats["shimmer_local_percent"] = float(call(
                    [snd, pp], "Get shimmer (local)", 0, 0, 0.0001, 0.05, 1.5, 2.0
                ) * 100.0)
            except Exception:
                feats["shimmer_local_percent"] = np.nan

        # ===== Harmonicity/HNR + NHR =====
        hnr = call(snd, "To Harmonicity (cc)", 0.01, max(75.0, 60.0), 0.1, 1.0)
        feats["hnr_mean"] = float(call(hnr, "Get mean", 0, 0))
        nhr = call(snd, "To Harmonicity (ac)", 0.03, max(75.0, 60.0), 0.1, 4.5)
        feats["nhr_mean"] = float(call(nhr, "Get mean", 0, 0))

        # ===== Intensity (+ dynamic range) =====
        intensity = call(snd, "To Intensity", 100, 0)
        feats["mean_intensity"] = float(call(intensity, "Get mean", 0, 0))
        feats["std_intensity"]  = float(call(intensity, "Get standard deviation", 0, 0))
        i_min = call(intensity, "Get minimum", 0, 0, "Parabolic")
        i_max = call(intensity, "Get maximum", 0, 0, "Parabolic")
        feats["dynamic_range"] = float(i_max - i_min)

        # ===== Spectrum: spectral slope & centroid (old choices) =====
        spec = call(snd, "To Spectrum", "yes")
        feats["spectral_slope"]    = float(call(spec, "Get band energy difference", 0, 1000, 1000, 4000))
        feats["spectral_centroid"] = float(call(spec, "Get centre of gravity", 2))

        # ===== Formants (Burg): means of F1..F3 + bandwidths at mid time =====
        formant = call(snd, "To Formant (burg)", 0.0, 5, 5000.0, 0.025, 50)
        feats["f1"] = float(call(formant, "Get mean", 1, 0, 0, "Hertz"))
        feats["f2"] = float(call(formant, "Get mean", 2, 0, 0, "Hertz"))
        feats["f3"] = float(call(formant, "Get mean", 3, 0, 0, "Hertz"))
        mid_t = snd.duration / 2.0
        feats["formant_bandwidth_1"] = float(call(formant, "Get bandwidth at time", 1, mid_t, "Hertz", "Linear"))
        feats["formant_bandwidth_2"] = float(call(formant, "Get bandwidth at time", 2, mid_t, "Hertz", "Linear"))
        feats["formant_bandwidth_3"] = float(call(formant, "Get bandwidth at time", 3, mid_t, "Hertz", "Linear"))

        # ===== Voice report (degree of voice breaks) =====
        try:
            vr = call([snd, pitch, pp], "Voice report", 0, 0, 75, 600, 1.3, 1.6, 0.03, 0.45)
            last = vr.strip().splitlines()[-1]
            feats["degree_of_voice_breaks"] = last.split()[-1]
        except Exception:
            feats["degree_of_voice_breaks"] = ""

        # ===== MFCCs from Praat (match your call) =====
        mfcc = call(snd, "To MFCC", 20, 0.015,
                    0.005, 100, 100, 0)
        # number of frames:
        try:
            n_frames = int(call(mfcc, "Get number of frames"))
        except Exception:
            n_frames = 0
        for i in range(1, 20):
            if  n_frames >= 1:
                val = call(mfcc, "Get value in frame", 1, i)
            else:
                # mean over frames for stability (if you flip the switch)
                vals = []
                for f in range(1, n_frames + 1):
                    vals.append(call(mfcc, "Get value in frame", f, i))
                val = np.nan if not vals else float(np.nanmean(vals))
            feats[f"mfcc_means_{i}"] = float(val) if np.isfinite(val) else np.nan

    except Exception as e:
        # If anything fails, fill with NaNs for the keys we advertise
        keys = [
            "mean_pitch","std_pitch","jitter_local_percent","shimmer_local_percent",
            "hnr_mean","nhr_mean","mean_intensity","std_intensity","dynamic_range",
            "spectral_slope","spectral_centroid","f1","f2","f3",
            "formant_bandwidth_1","formant_bandwidth_2","formant_bandwidth_3",
            "degree_of_voice_breaks"
        ] + [f"mfcc_means_{i}" for i in range(1, 20)]
        for k in keys: feats[k] = np.nan
        feats["_pm_error"] = f"{type(e).__name__}: {e}"
    return feats

def extract_features_for_one(row: pd.Series) -> Dict[str, Any]:
    result = {}
    for c in COLUMNS_KEEP:
        result[c] = row.get(c, np.nan)

    path = _safe_path(str(row[COL_AUDIO]))
    result["audio_path"] = path

    try:
        y, sr = _read_audio(path, TARGET_SR, MONO) 
        result["channels"] = 1

        if USE_VAD:
            y, sr = _apply_vad_numpy(y, sr, VAD_FRAME_MS, VAD_AGGRESSIVENESS)
            if y.size < int(0.1 * (sr or 16000)):  # <100 ms => too short
                raise RuntimeError(f"Insufficient audio after VAD: {y.size/sr:.3f}s")

        lb = extract_librosa_features(y, sr)
        result.update(lb)

        snd = parselmouth.Sound(y, sr)
        pm = extract_parselmouth_features_from_sound(snd)
        result.update(pm)

        result["error"] = ""
    except Exception as e:
        base_keys = ["duration_sec","sr","rms_mean","rms_std","zcr_mean","peak","dbfs"]
        for k in base_keys: result[k] = np.nan
        result.update({ "channels": np.nan })
        # fill PM keys with NaN
        for k in [
            "mean_pitch","std_pitch","jitter_local_percent","shimmer_local_percent",
            "hnr_mean","nhr_mean","mean_intensity","std_intensity","dynamic_range",
            "spectral_slope","spectral_centroid","f1","f2","f3",
            "formant_bandwidth_1","formant_bandwidth_2","formant_bandwidth_3",
            "degree_of_voice_breaks"
        ] + [f"mfcc_means_{i}" for i in range(1, 20)]:
            result.setdefault(k, np.nan)
        result["error"] = f"extract_failed: {type(e).__name__}: {e}"
    return result


In [None]:

# === Load CSV ===
df = pd.read_csv(CSV_PATH)
if COL_AUDIO not in df.columns:
    raise ValueError(f"CSV does not contain the audio column '{COL_AUDIO}'. Available: {list(df.columns)}")

# Ensure passthrough columns exist
missing_keep = [c for c in COLUMNS_KEEP if c not in df.columns]
if missing_keep:
    logging.warning(f"The following COLUMNS_KEEP are not in the CSV and will be filled with NaN: {missing_keep}")

# === Extract ===
records: List[Dict[str, Any]] = []
indices = list(range(len(df)))

if NUM_WORKERS and NUM_WORKERS > 0:
    with concurrent.futures.ProcessPoolExecutor(max_workers=NUM_WORKERS) as ex:
        futures = {ex.submit(extract_features_for_one, df.iloc[i]): i for i in indices}
        for k, fut in enumerate(tqdm(concurrent.futures.as_completed(futures), total=len(futures), desc="Extracting")):
            rec = fut.result()
            records.append(rec)
else:
    for i in tqdm(indices, desc="Extracting"):
        rec = extract_features_for_one(df.iloc[i])
        records.append(rec)
        if (i+1) % LOG_EVERY_N == 0:
            logging.info(f"Processed {i+1}/{len(df)} files")

out = pd.DataFrame.from_records(records)

# Reorder columns: passthrough + audio_path + features
cols_front = list(COLUMNS_KEEP) + ["audio_path"]
other_cols = [c for c in out.columns if c not in cols_front]
out = out[cols_front + other_cols]

# === Save ===
out.to_csv(OUTPUT_CSV, index=False)
print(f"Saved {len(out)} rows to {OUTPUT_CSV}")

# Quick preview
out.head(10)


Extracting: 100%|██████████| 317/317 [00:42<00:00,  7.47it/s]

Saved 317 rows to ./audio_features_out.csv





Unnamed: 0,text,is_health,session_id,audio_path,channels,duration_sec,sr,rms_mean,rms_std,zcr_mean,...,mfcc_means_11,mfcc_means_12,mfcc_means_13,mfcc_means_14,mfcc_means_15,mfcc_means_16,mfcc_means_17,mfcc_means_18,mfcc_means_19,error
0,Espressione culturale.,t,39,/data/users/akloeckner/collected_dataset/uploa...,1,1.89,16000,0.083396,0.054326,2176.719577,...,-11.376999,-40.571665,-1.895215,-32.770601,26.567442,-4.450149,-3.56062,-15.533003,1.470965,
1,Molte delle opere migliori di Leiber sono racc...,t,35,/data/users/akloeckner/collected_dataset/uploa...,1,8.67,16000,0.084598,0.069165,2241.06113,...,6.82438,-12.974968,-0.9358,9.995569,6.485008,-7.251986,8.848399,11.900274,3.718041,
2,Partire in quarta.,f,3,/data/users/akloeckner/collected_dataset/uploa...,1,2.91,16000,0.020815,0.02517,2976.975945,...,-6.667653,-9.12767,-15.162441,-10.354912,7.142628,22.172245,8.84873,13.792532,14.49297,
3,Sheila,f,3,/data/users/akloeckner/collected_dataset/uploa...,1,1.29,16000,0.075899,0.067996,4816.27907,...,1.330989,-9.111283,-5.456868,-13.483251,-8.976678,-8.595638,-4.360458,1.88918,7.782902,
4,Monsignor Grano pubblicò poco dei suoi lavori.,f,3,/data/users/akloeckner/collected_dataset/uploa...,1,5.31,16000,0.046767,0.034217,3827.118644,...,9.819802,-19.644541,-0.541881,2.903117,-7.024487,23.832253,-6.549482,7.446154,7.19625,
5,Felice,f,3,/data/users/akloeckner/collected_dataset/uploa...,1,1.02,16000,0.041975,0.047888,4830.392157,...,-37.786746,18.427481,-32.129431,-1.927786,-6.268077,-14.143534,22.943169,1.46914,-5.019424,
6,Cane,f,3,/data/users/akloeckner/collected_dataset/uploa...,1,1.59,16000,0.05529,0.078552,3493.710692,...,18.320755,-3.535728,-20.910417,6.34262,3.920014,1.786925,2.630666,1.929917,-3.582289,
7,Il numero corrisponde alla rappresentazione ne...,f,3,/data/users/akloeckner/collected_dataset/uploa...,1,8.25,16000,0.01249,0.009516,4846.545455,...,-12.961813,11.914366,17.339643,-3.006924,-14.511741,-1.28765,-3.654376,-5.155642,0.143173,
8,Ricevono anche opzioni aggiuntive per la condi...,f,3,/data/users/akloeckner/collected_dataset/uploa...,1,10.2,16000,0.01113,0.010364,4492.843137,...,-38.365647,4.958333,3.214852,11.516142,2.896622,21.518198,4.892908,-5.04325,11.605391,
9,Si trova nel omonimo distretto e divisione.,f,3,/data/users/akloeckner/collected_dataset/uploa...,1,4.65,16000,0.028283,0.030195,4952.903226,...,26.459349,-4.088275,11.467694,-1.257293,4.753715,9.742682,0.415452,2.851713,-2.567054,


In [50]:
# ==============================
# Top-5 features by Gini importance
# ==============================
# Config
FEATURES_CSV   = "./audio_features_out.csv"   # path to the CSV produced by the extractor
LABEL_COL      = "is_health"                  # column that says whether the sample is healthy
EXCLUDE_COLS   = {"audio_path", "text", "session_id", "error"}  # non-features to drop
N_TREES        = 1000
RANDOM_STATE   = 42

import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# ---------- Load ----------
df = pd.read_csv(FEATURES_CSV)

if LABEL_COL not in df.columns:
    raise ValueError(f"Label column '{LABEL_COL}' not found. Available: {list(df.columns)}")

# ---------- Robust label mapping ----------
# We will create y_pathological = 1 for pathological, 0 for healthy.
def to_bool_health(v):
    if pd.isna(v): return np.nan
    s = str(v).strip().lower()
    if s in {"1","true","t","yes","y","healthy"}:  # healthy
        return 1
    if s in {"0","false","f","no","n","pathological","pathology","pat"}:  # not healthy
        return 0
    # try numeric
    try:
        f = float(s)
        # assume 1=healthy, 0=pathological if strictly 0/1
        if f in (0.0, 1.0):
            return int(f)
    except:
        pass
    return np.nan

health_bool = df[LABEL_COL].apply(to_bool_health)
if health_bool.isna().any():
    raise ValueError(
        "Some labels in is_health could not be parsed. "
        "Please standardize to {0/1, True/False, yes/no, healthy/pathological}."
    )

# pathological = 1 if not healthy
y = (1 - health_bool).astype(int).values

# ---------- Build feature matrix ----------
drop_cols = set([LABEL_COL]) | EXCLUDE_COLS
X = df.drop(columns=[c for c in df.columns if c in drop_cols])

# Keep only numeric columns (RF needs numeric)
X = X.select_dtypes(include=[np.number]).copy()

# Fill NaNs with column medians
for c in X.columns:
    if X[c].isna().any():
        X[c] = X[c].fillna(X[c].median())

# Quick sanity
assert len(X) == len(y) and X.shape[1] > 0, "Feature matrix is empty or misaligned."

# ---------- Train RF (Gini importance) ----------
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=RANDOM_STATE
)
rf = RandomForestClassifier(
    n_estimators=N_TREES,
    random_state=RANDOM_STATE,
    n_jobs=-1,
    class_weight="balanced_subsample",
    criterion="gini",
    max_features="sqrt",
)
rf.fit(X_train, y_train)

print("Held-out performance (for context):")
print(classification_report(y_test, rf.predict(X_test), target_names=["healthy(0)","pathological(1)"]))

# ---------- Rank importances ----------
imp = pd.Series(rf.feature_importances_, index=X.columns).sort_values(ascending=False)
top5 = imp.head(10).to_frame("gini_importance")

# Add class-wise means to show directionality (not part of Gini, just helpful)
means = df.assign(_y=y).groupby("_y")[X.columns].mean().rename(index={0:"healthy_mean",1:"pathological_mean"})
top5 = top5.join(means.T.loc[:, ["healthy_mean", "pathological_mean"]])

# Save + display
top5_path = "top5_gini_features.csv"
top5.to_csv(top5_path)
print("\nTop-5 features by Gini importance (saved to top5_gini_features.csv):")
display(top5)

# If you also want the full ranked list:
full_rank_path = "all_features_gini_ranked.csv"
imp.to_frame("gini_importance").to_csv(full_rank_path)
print(f"\nFull ranked list saved to {full_rank_path}")


Held-out performance (for context):
                 precision    recall  f1-score   support

     healthy(0)       0.87      0.92      0.89        36
pathological(1)       0.88      0.82      0.85        28

       accuracy                           0.88        64
      macro avg       0.88      0.87      0.87        64
   weighted avg       0.88      0.88      0.87        64


Top-5 features by Gini importance (saved to top5_gini_features.csv):


Unnamed: 0,gini_importance,healthy_mean,pathological_mean
hnr_mean,0.17549,12.290471,2.834379
nhr_mean,0.173725,10.325148,1.903261
mean_pitch,0.088516,168.200367,130.527787
jitter_local_percent,0.080545,2.099072,5.308571
shimmer_local_percent,0.058262,11.232805,18.689299
spectral_centroid,0.042919,760.705399,1485.460413
spectral_slope,0.037043,-9.605625,-4.328553
zcr_mean,0.029676,2413.017144,3222.318236
std_pitch,0.028905,51.503224,69.604382
std_intensity,0.026699,22.465571,15.204507



Full ranked list saved to all_features_gini_ranked.csv
