# ASR Phonology Inference: Whisper v3 (French & Spanish)

This notebook quantifies which **phonology/prosody features** drive **WER** and how much of the **Cartesia transform** effect can be explained by those features.

**What it does** (for **French** and **Spanish** separately):
1. **Assemble datasets**:
   - *Line-paired deltas* (only lines present in both Original and Transformed)
   - *Speaker-paired deltas* (means per speaker, then difference)
   - *Pooled long set* (all observations with `treatment ∈ {0,1}`)
2. **Descriptives** and sanity checks.
3. **Δ‑Regression (primary inference)**: `Δ log1p(WER) ~ Σ βk ΔXk` with ridge CV → refit OLS for interpretable coefficients + clustered SEs (by speaker).
4. **Mixed‑effects confirmation** on the pooled set with random intercepts for speaker/country/gender and `treatment × features` interactions.
5. **Mediation**: estimate proportion of the transform’s effect explained via measured features (bootstrap CIs).
6. **Chi‑square tests**:
   - Improvement vs worsening counts (overall and by language)
   - Improvement vs feature‑change quartiles
7. **Robustness**: speaker‑paired deltas; winsorization/Huber; PCA variant.

> ⚠️ Assumptions:  
> • Your files are split into two top‑level folders: `original/` and `transformed/`, and further into `french/` and `spanish/`.  
> • Each CSV file corresponds to a `(country, gender)` combination and contains columns:  
> `speaker_id,line_id,rms_amplitude,dc_offset,articulation_rate,mean_pitch,pitch_std_dev,hnr,wer_score`.  
> • Filenames follow a pattern like: `belgium_female_*.csv` (country and gender recoverable via regex).  
> If your layout differs, edit the **Config** cell.


In [None]:
# === Config ===
from pathlib import Path
LANGUAGES = ["french", "spanish"]          # analyze separately
DATA_ROOT = Path("data")                    # change to your dataset root (relative to notebook)
ORIG_DIR = DATA_ROOT / "original"
TRANS_DIR = DATA_ROOT / "transformed"

# filename like "belgium_female_individual_sample_features.csv"
# adjust the regex if your names differ
FILENAME_RE = r"(?P<country>[a-z_]+)_(?P<gender>male|female).+\.csv"

FEATURES = ["rms_amplitude","dc_offset","articulation_rate","mean_pitch","pitch_std_dev","hnr"]
OUTCOME = "wer_score"
ID_COLS = ["speaker_id","line_id"]
GROUP_COLS = ["country","gender","language"]
TREATMENT_NAME = "treatment"               # 0=original, 1=transformed
RANDOM_SEED = 7


In [None]:
# === Imports ===
import re, math, json, warnings, numpy as np, pandas as pd
from pathlib import Path
from dataclasses import dataclass
import statsmodels.api as sm
from statsmodels.stats.multitest import multipletests
from statsmodels.formula.api import ols
from statsmodels.robust.robust_linear_model import RLM
from statsmodels.robust.norms import HuberT
from statsmodels.regression.mixed_linear_model import MixedLM
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import RidgeCV, LassoCV
from sklearn.model_selection import GroupKFold
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
warnings.filterwarnings("ignore")
np.random.seed(RANDOM_SEED)


In [None]:
# === Helper functions ===

import pandas as pd
import numpy as np
import re
from pathlib import Path

def parse_meta_from_filename(path: Path, language: str):
    m = re.search(FILENAME_RE, path.name)
    if not m:
        raise ValueError(f"Filename doesn't match expected pattern: {path.name}")
    return {
        "country": m.group("country"),
        "gender": m.group("gender"),
        "language": language
    }

def load_language_tables(language: str, base_orig: Path, base_trans: Path):
    # Load original & transformed CSVs for a given language
    orig_dir = base_orig / language
    trans_dir = base_trans / language
    if not orig_dir.exists() or not trans_dir.exists():
        raise FileNotFoundError(f"Expected folders for language '{language}' not found at {orig_dir} or {trans_dir}")
    
    def load_dir(d: Path, treatment_flag: int):
        frames = []
        for p in sorted(d.glob("*.csv")):
            meta = parse_meta_from_filename(p, language)
            df = pd.read_csv(p)
            # attach meta
            for k,v in meta.items():
                df[k] = v
            df["treatment"] = treatment_flag
            frames.append(df)
        if not frames:
            raise FileNotFoundError(f"No CSV files found in {d}")
        return pd.concat(frames, ignore_index=True)
    
    orig = load_dir(orig_dir, 0)
    trans = load_dir(trans_dir, 1)
    return orig, trans

def zscore_within_language(df: pd.DataFrame, feature_cols):
    # standardize features within each language on pooled (orig+trans) data
    df = df.copy()
    for lang, sub in df.groupby("language"):
        scaler = StandardScaler()
        vals = scaler.fit_transform(sub[feature_cols])
        df.loc[sub.index, [f"{c}_z" for c in feature_cols]] = vals
    return df

def make_line_paired_deltas(orig: pd.DataFrame, trans: pd.DataFrame, feature_cols):
    # Inner-join on (country, gender, language, speaker_id, line_id)
    idx_cols = ["country","gender","language","speaker_id","line_id"]
    left = orig[idx_cols + feature_cols + [OUTCOME]].rename(columns={c:f"{c}_orig" for c in feature_cols+[OUTCOME]})
    right = trans[idx_cols + feature_cols + [OUTCOME]].rename(columns={c:f"{c}_trans" for c in feature_cols+[OUTCOME]})
    merged = pd.merge(left, right, on=idx_cols, how="inner")
    if merged.empty:
        print("⚠️ No line-level pairs found after inner join. Check IDs/cleaning.")
    # deltas
    for c in feature_cols:
        merged[f"d_{c}"] = merged[f"{c}_trans"] - merged[f"{c}_orig"]
    merged["d_wer"] = merged[f"{OUTCOME}_trans"] - merged[f"{OUTCOME}_orig"]
    # log1p WER
    merged["d_log1p_wer"] = np.log1p(merged[f"{OUTCOME}_trans"]) - np.log1p(merged[f"{OUTCOME}_orig"])
    return merged

def make_speaker_paired_deltas(orig: pd.DataFrame, trans: pd.DataFrame, feature_cols):
    grp = ["country","gender","language","speaker_id"]
    agg = {c:"mean" for c in feature_cols+[OUTCOME]}
    o = orig.groupby(grp, as_index=False).agg(agg).rename(columns={c:f"{c}_orig" for c in feature_cols+[OUTCOME]})
    t = trans.groupby(grp, as_index=False).agg(agg).rename(columns={c:f"{c}_trans" for c in feature_cols+[OUTCOME]})
    merged = pd.merge(o, t, on=grp, how="inner")
    # counts for weights
    cnt_o = orig.groupby(grp)["line_id"].count().rename("n_orig")
    cnt_t = trans.groupby(grp)["line_id"].count().rename("n_trans")
    merged = merged.merge(cnt_o, on=grp).merge(cnt_t, on=grp)
    merged["weight"] = 2 * merged["n_orig"] * merged["n_trans"] / (merged["n_orig"] + merged["n_trans"]).replace(0, np.nan)
    for c in feature_cols:
        merged[f"d_{c}"] = merged[f"{c}_trans"] - merged[f"{c}_orig"]
    merged["d_wer"] = merged[f"{OUTCOME}_trans"] - merged[f"{OUTCOME}_orig"]
    merged["d_log1p_wer"] = np.log1p(merged[f"{OUTCOME}_trans"]) - np.log1p(merged[f"{OUTCOME}_orig"])
    return merged

def make_pooled_long(orig: pd.DataFrame, trans: pd.DataFrame):
    pooled = pd.concat([orig, trans], ignore_index=True)
    pooled["log1p_wer"] = np.log1p(pooled[OUTCOME])
    return pooled

def bh_fdr(pvals, alpha=0.05):
    rej, pcor, _, _ = multipletests(pvals, alpha=alpha, method="fdr_bh")
    return rej, pcor


In [None]:
# === Load data (edit paths if needed) ===
all_results = {}
for language in LANGUAGES:
    print(f"Loading language: {language}")
    orig, trans = load_language_tables(language, ORIG_DIR, TRANS_DIR)
    # attach to dict
    all_results[language] = {
        "orig": orig,
        "trans": trans,
        "line_paired": make_line_paired_deltas(orig, trans, FEATURES),
        "speaker_paired": make_speaker_paired_deltas(orig, trans, FEATURES),
        "pooled": make_pooled_long(orig, trans),
    }
print("Done.")


In [None]:
# === Descriptives & sanity checks ===
for language, d in all_results.items():
    print(f"\n--- {language.upper()} ---")
    lp = d["line_paired"]
    sp = d["speaker_paired"]
    pooled = d["pooled"]
    print(f"Line-paired rows: {len(lp):,}")
    print(f"Speaker-paired rows: {len(sp):,}")
    print(f"Pooled rows: {len(pooled):,}")
    
    # Basic summaries
    print(pooled.groupby("treatment")[OUTCOME].describe()[["mean","std","50%","min","max"]])
    
    # Improvement counts (line-paired only)
    if len(lp):
        improved = (lp["d_wer"] < 0).sum()
        worsened = (lp["d_wer"] >= 0).sum()
        print(f"Improved (ΔWER<0): {improved}, Worsened: {worsened}")


In [None]:
# === Δ-Regression (primary inference) ===
from sklearn.pipeline import Pipeline

results_delta = {}
for language, d in all_results.items():
    lp = d["line_paired"].copy()
    if lp.empty:
        print(f"{language}: no line-paired data; skipping Δ-regression.")
        continue
    
    # Standardize Δ features within language
    dcols = [f"d_{c}" for c in FEATURES]
    # Replace any inf/nan
    lp = lp.replace([np.inf,-np.inf], np.nan).dropna(subset=dcols+["d_log1p_wer","speaker_id"])
    
    X = lp[dcols].values
    y = lp["d_log1p_wer"].values
    speakers = lp["speaker_id"].astype(str).values
    
    # Ridge CV with GroupKFold by speaker for λ selection
    gkf = GroupKFold(n_splits=min(5, len(np.unique(speakers))))
    alphas = np.logspace(-3, 3, 25)
    ridge = RidgeCV(alphas=alphas, cv=gkf)
    ridge.fit(X, y, groups=speakers)
    
    # Selected alpha and coefficients
    coefs = pd.Series(ridge.coef_, index=dcols, name="ridge_coef")
    
    # Refit OLS for interpretable β + clustered SEs (by speaker)
    df_model = lp[dcols + ["d_log1p_wer","speaker_id"]].copy()
    formula = "d_log1p_wer ~ " + " + ".join(dcols)
    ols_fit = ols(formula, data=df_model).fit(cov_type="cluster", cov_kwds={"groups": df_model["speaker_id"]})
    summ = pd.DataFrame({
        "beta_ols": ols_fit.params.reindex(["Intercept"]+dcols, fill_value=np.nan),
        "se_clustered": ols_fit.bse.reindex(["Intercept"]+dcols, fill_value=np.nan),
        "pval": ols_fit.pvalues.reindex(["Intercept"]+dcols, fill_value=np.nan)
    })
    # FDR on feature p-values
    mask = summ.index.isin(dcols)
    rej, pcor = bh_fdr(summ.loc[mask,"pval"].values, alpha=0.05)
    summ.loc[mask,"pval_fdr"] = pcor
    summ.loc[mask,"significant_fdr"] = rej
    
    results_delta[language] = {
        "ridge_alpha": ridge.alpha_,
        "ridge_coefs": coefs,
        "ols_summary": summ,
        "ols_model": ols_fit
    }
    print(f"\\n[{language.upper()}] Δ-Regression done. Selected ridge alpha = {ridge.alpha_:.4g}")
    display(summ)


In [None]:
# === Chi-square tests ===
from scipy.stats import chi2_contingency, binomtest

def chisq_improvement(language, d):
    lp = d["line_paired"]
    if lp.empty:
        return None
    tbl = pd.crosstab(index=[language], columns=(lp["d_wer"]<0).map({True:"Improved", False:"Worsened"}))
    return tbl

# 1) Overall improved vs worsened per language + binomial test
all_tbls = []
for language, d in all_results.items():
    lp = d["line_paired"]
    if lp.empty:
        print(f"{language}: no line-paired data, skipping chi-square.")
        continue
    improved = int((lp["d_wer"]<0).sum())
    worsened = int((lp["d_wer"]>=0).sum())
    n = improved + worsened
    print(f"\\n[{language.upper()}] Improved={improved}, Worsened={worsened}")
    # Binomial test vs 50/50
    bt = binomtest(improved, n, p=0.5, alternative="two-sided")
    print(f"Binomial test p-value (improvement rate ≠ 50%): {bt.pvalue:.4g}")
    all_tbls.append(pd.DataFrame({"Improved":[improved],"Worsened":[worsened]}, index=[language]))
    
# 2) Chi-square comparing improvement rates across languages
if len(all_tbls)>=2:
    combo = pd.concat(all_tbls)
    chi2, p, dof, exp = chi2_contingency(combo.values, correction=False)
    print("\\n[FR vs ES] Chi-square on improvement rates")
    print(combo)
    print(f"Chi2={chi2:.3f}, dof={dof}, p={p:.4g}")

# 3) Improvement vs feature-change quartiles (per language, top features by |ridge_coef|)
for language, d in all_results.items():
    lp = d["line_paired"]
    if lp.empty:
        continue
    print(f"\\n[{language.upper()}] Chi-square: Improvement vs Δ-feature quartiles")
    # pick top 3 by absolute ridge coef (from delta results if available)
    dcols = [f"d_{c}" for c in FEATURES]
    if language in results_delta:
        top = results_delta[language]["ridge_coefs"].reindex(dcols).abs().sort_values(ascending=False).head(3).index.tolist()
    else:
        top = dcols[:3]
    for col in top:
        try:
            q = pd.qcut(lp[col], 4, labels=["Q1","Q2","Q3","Q4"])
            tbl = pd.crosstab(q, (lp["d_wer"]<0).map({True:"Improved",False:"Worsened"}))
            chi2, p, dof, exp = chi2_contingency(tbl.values, correction=False)
            print(f" - {col}: Chi2={chi2:.3f}, dof={dof}, p={p:.4g}")
        except Exception as e:
            print(f" - {col}: skipped ({e})")


In [None]:
# === Mixed-effects confirmation on pooled long set ===
mixed_results = {}
for language, d in all_results.items():
    pooled = d["pooled"].copy()
    if pooled.empty:
        continue
    # Standardize features within language (pooled)
    for c in FEATURES:
        pooled[f"{c}_z"] = (pooled[c] - pooled[c].mean()) / (pooled[c].std(ddof=0) + 1e-9)
    # Build fixed effects: treatment + features + interactions (limited to top 3 to keep model stable)
    # We'll pick top 3 Δ features from delta ridge for interactions guidance, otherwise first 3.
    if language in results_delta:
        top_base = [c.replace("d_","") for c in results_delta[language]["ridge_coefs"].abs().sort_values(ascending=False).head(3).index.tolist()]
    else:
        top_base = FEATURES[:3]
    fixed_cols = ["treatment"] + [f"{c}_z" for c in FEATURES] + [f"treatment:{c}_z" for c in top_base]
    
    # Design matrices
    exog = pd.DataFrame({"Intercept":1.0}, index=pooled.index)
    for c in FEATURES:
        exog[f"{c}_z"] = pooled[f"{c}_z"]
    exog["treatment"] = pooled["treatment"].astype(int)
    for c in top_base:
        exog[f"treatment:{c}_z"] = exog["treatment"] * exog[f"{c}_z"]
    endog = pooled["log1p_wer"]
    
    # Random intercepts: speaker, country, gender (use combined groups to approximate multiple REs)
    # statsmodels MixedLM accepts one grouping var; we fit with speaker_id, then as a robustness fit with country-gender.
    for grouping in ["speaker_id", None]:
        if grouping is None:
            pooled["cg"] = pooled["country"].astype(str) + "|" + pooled["gender"].astype(str)
            grouping = "cg"
        try:
            md = MixedLM(endog, exog, groups=pooled[grouping].astype(str))
            mdf = md.fit(method="lbfgs", reml=True, maxiter=200, disp=False)
            mixed_results[(language, grouping)] = mdf
            print(f"[{language.upper()}] MixedLM fit with groups={grouping}: converged, AIC={mdf.aic:.1f}")
        except Exception as e:
            print(f"[{language.upper()}] MixedLM failed with groups={grouping}: {e}")


In [None]:
# === Simple mediation estimate (proportion explained) ===
# We estimate how much of the treatment effect (theta) is explained by feature shifts.
# Approach: compare two pooled OLS models — with and without features — and compute the
# change in the estimated treatment coefficient. Bootstrap by speaker for CI.
from tqdm.auto import tqdm

def treatment_coef(pooled, with_features=True, top_feats=None):
    df = pooled.copy()
    df["log1p_wer"] = np.log1p(df[OUTCOME])
    if with_features:
        Xcols = ["treatment"] + top_feats
    else:
        Xcols = ["treatment"]
    X = sm.add_constant(df[Xcols])
    y = df["log1p_wer"]
    model = sm.OLS(y, X).fit(cov_type="cluster", cov_kwds={"groups": df["speaker_id"].astype(str)})
    return model.params.get("treatment", np.nan)

def bootstrap_prop_mediated(pooled, B=200, top_feats=None):
    speakers = pooled["speaker_id"].astype(str).unique()
    pm = []
    for b in range(B):
        samp = np.random.choice(speakers, size=len(speakers), replace=True)
        dfb = pd.concat([pooled[pooled["speaker_id"].astype(str)==s] for s in samp], ignore_index=True)
        theta0 = treatment_coef(dfb, with_features=False, top_feats=top_feats)
        theta1 = treatment_coef(dfb, with_features=True, top_feats=top_feats)
        pm.append(1 - (theta1/theta0) if theta0!=0 else np.nan)
    pm = pd.Series(pm).dropna()
    return pm.mean(), pm.quantile([0.025, 0.975]).values

for language, d in all_results.items():
    pooled = d["pooled"].copy()
    if pooled.empty:
        continue
    # choose top features by |ridge| on deltas (fallback to all z-features if not available)
    if language in results_delta:
        top = [c.replace("d_","") for c in results_delta[language]["ridge_coefs"].abs().sort_values(ascending=False).head(4).index.tolist()]
    else:
        top = FEATURES
    top_z = [f"{c}_z" for c in top]
    # create z features
    for c in FEATURES:
        pooled[f"{c}_z"] = (pooled[c] - pooled[c].mean()) / (pooled[c].std(ddof=0) + 1e-9)
    try:
        mean_pm, (lo, hi) = bootstrap_prop_mediated(pooled, B=200, top_feats=top_z)
        print(f"[{language.upper()}] Estimated proportion of treatment effect mediated by features: {mean_pm:.2%} (95% CI {lo:.2%}–{hi:.2%})")
    except Exception as e:
        print(f"[{language.upper()}] Mediation bootstrap failed: {e}")


In [None]:
# === Quick plots (optional) ===
for language, d in all_results.items():
    lp = d["line_paired"]
    if lp.empty:
        continue
    plt.figure()
    plt.hist(lp["d_wer"].dropna(), bins=40)
    plt.title(f"{language.capitalize()}: ΔWER histogram")
    plt.xlabel("ΔWER (transformed - original)")
    plt.ylabel("Count")
    plt.show()


---

## Next steps you may customize
- Adjust the **Config** cell paths/regex to match your data layout in Cursor.
- If you want stricter causal language, keep analysis descriptive + mediation as *attribution* (not identification).
- Consider exporting tables (coefficients, chi‑square results) to CSV for your report.
