Generates **baseline noise audio** and overwrites:
- `experiment/resources/lists/base_run1.csv`
- `experiment/resources/lists/base_run2.csv`
- `experiment/resources/audio/base_run1_*.wav`
- `experiment/resources/audio/base_run2_*.wav`

**Inputs:**
- `experiment/resources/lists/con_run1.csv`, `abs_run1.csv`, `con_run2.csv`, `abs_run2.csv`
- corresponding audio files referenced in `stimFile`


In [1]:
from __future__ import annotations

import re
from pathlib import Path

import numpy as np
import pandas as pd
import librosa

try:
    import soundfile as sf
except Exception as e:
    raise RuntimeError(
        "Missing dependency: soundfile. Install in your env: pip install soundfile"
    ) from e

from scipy.signal import butter, filtfilt


In [2]:
# paths

THIS_DIR = Path.cwd().resolve()
ROOT = THIS_DIR.parents[1]  

LISTS_DIR = ROOT / "experiment" / "resources" / "lists"
AUDIO_DIR = ROOT / "experiment" / "resources" / "audio"
AUDIO_DIR.mkdir(parents=True, exist_ok=True)

LISTS_DIR, AUDIO_DIR


(WindowsPath('C:/Users/kinga/Documents/Blindbrain/4. Courses/fMRI - design of the experiment and data analysis/cognes-auditory-1back-pilot/experiment/resources/lists'),
 WindowsPath('C:/Users/kinga/Documents/Blindbrain/4. Courses/fMRI - design of the experiment and data analysis/cognes-auditory-1back-pilot/experiment/resources/audio'))

In [3]:
# config

CSV_ENCODING = "utf-8-sig"
RNG_SEED = 12345
rng = np.random.default_rng(RNG_SEED)

SR_TARGET = 48000 # fixed sampling rate across outputs

SMOOTH_HZ = 220.0 # spectral smoothing: larger -> less speech-like detail

HP_HZ = 120.0 # mild high-pass to remove "horror" low end

SSN_SMOOTH_HZ = 260.0   
SSN_HP_HZ = 320.0       
SSN_LP_HZ = 5200.0      
SSN_BP_HZ = (320.0, 5200.0)  

N_BASE_PER_RUN: int | None = None

SR_TARGET, SMOOTH_HZ, HP_HZ


(48000, 220.0, 120.0)

In [4]:
# audio helpers

def read_wav(path: Path) -> tuple[np.ndarray, int]:
    x, sr = sf.read(path, dtype="float32")
    if x.ndim > 1:
        x = x.mean(axis=1)
    return x.astype(np.float32), int(sr)

def write_wav(path: Path, x: np.ndarray, sr: int) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    x = np.asarray(x, dtype=np.float32)
    sf.write(path, x, sr, subtype="PCM_16")

def resample_if_needed(x: np.ndarray, sr: int, target_sr: int) -> tuple[np.ndarray, int]:
    if sr == target_sr:
        return x, sr
    y = librosa.resample(x, orig_sr=sr, target_sr=target_sr)
    return y.astype(np.float32), target_sr

def pad_or_trim(x: np.ndarray, target_len: int) -> np.ndarray:
    x = np.asarray(x, dtype=np.float32)
    if len(x) == target_len:
        return x
    if len(x) > target_len:
        return x[:target_len]
    pad = np.zeros(target_len - len(x), dtype=np.float32)
    return np.concatenate([x, pad])

def rms(x: np.ndarray) -> float:
    x = np.asarray(x, dtype=np.float32)
    return float(np.sqrt(np.mean(x * x) + 1e-12))

def match_rms(y: np.ndarray, target_rms: float) -> np.ndarray:
    y_rms = rms(y)
    if y_rms <= 0:
        return y
    return (y * (target_rms / y_rms)).astype(np.float32)

def hard_clip(x: np.ndarray, limit: float = 0.98) -> np.ndarray:
    return np.clip(x, -limit, limit).astype(np.float32)


In [5]:
# speech-shaped noise

def speech_shaped_noise(
    x: np.ndarray,
    sr: int,
    seed: int = 0,
    smooth_hz: float = 260.0,
    hp_hz: float | None = None,
    lp_hz: float | None = None,
    bp_hz: tuple[float, float] | None = (320.0, 5200.0),
) -> np.ndarray:
    """Generate noise shaped to a smoothed magnitude spectrum of x (FFT-based)."""
    from scipy.signal import butter, filtfilt

    rng_local = np.random.default_rng(seed)

    x = np.asarray(x, dtype=np.float32)
    x = x / (np.max(np.abs(x)) + 1e-8)

    n = len(x)

    # magnitude spectrum of speech
    X = np.fft.rfft(x)
    mag = np.abs(X)

    # smooth magnitude in frequency domain (moving average, width â‰ˆ smooth_hz)
    freqs = np.fft.rfftfreq(n, d=1.0 / sr)
    df = float(freqs[1] - freqs[0]) if len(freqs) > 1 else 1.0
    win = max(3, int(round(smooth_hz / df)))
    if win % 2 == 0:
        win += 1
    kernel = np.ones(win, dtype=np.float32) / win
    mag_s = np.convolve(mag, kernel, mode="same")

    # random-phase noise with same (smoothed) magnitude
    noise = rng_local.standard_normal(n).astype(np.float32)
    N = np.fft.rfft(noise)
    phase = np.exp(1j * np.angle(N))
    Y = mag_s * phase
    y = np.fft.irfft(Y, n=n).astype(np.float32)

    # filtering: prefer band-pass to keep it higher + not harsh
    if bp_hz is not None:
        lo, hi = bp_hz
        lo = max(10.0, float(lo))
        hi = min(float(hi), sr / 2 - 100.0)
        b_bp, a_bp = butter(2, [lo / (sr / 2), hi / (sr / 2)], btype="bandpass")
        y = filtfilt(b_bp, a_bp, y).astype(np.float32)
    else:
        if hp_hz is not None and hp_hz > 0:
            b_hp, a_hp = butter(2, float(hp_hz) / (sr / 2), btype="highpass")
            y = filtfilt(b_hp, a_hp, y).astype(np.float32)

        if lp_hz is not None and 0 < lp_hz < (sr / 2):
            b_lp, a_lp = butter(2, float(lp_hz) / (sr / 2), btype="lowpass")
            y = filtfilt(b_lp, a_lp, y).astype(np.float32)


    y = y / (np.max(np.abs(y)) + 1e-8)
    return y



In [6]:
# load con/abs lists (read-only) 

def load_list(path: Path) -> pd.DataFrame:
    if not path.exists():
        raise FileNotFoundError(f"Missing list: {path}")
    df = pd.read_csv(path, encoding="utf-8-sig")
    for col in ["stimFile", "word"]:
        if col not in df.columns:
            raise ValueError(f"{path} must contain column '{col}'")
    return df

con1 = load_list(LISTS_DIR / "con_run1.csv")
abs1 = load_list(LISTS_DIR / "abs_run1.csv")
con2 = load_list(LISTS_DIR / "con_run2.csv")
abs2 = load_list(LISTS_DIR / "abs_run2.csv")

len(con1), len(abs1), len(con2), len(abs2)


(24, 24, 24, 24)

In [7]:
# audio paths referenced by stimFile

def resolve_audio(stim_file: str) -> Path:
    stim_file = str(stim_file).replace("\\", "/").strip()
    return ROOT / "experiment" / stim_file

def get_audio_paths(df: pd.DataFrame) -> list[Path]:
    paths = [resolve_audio(s) for s in df["stimFile"].tolist()]
    missing = [p for p in paths if not p.exists()]
    if missing:
        raise FileNotFoundError(f"Missing audio files (first 5): {missing[:5]}")
    return paths

con1_audio = get_audio_paths(con1)
abs1_audio = get_audio_paths(abs1)
con2_audio = get_audio_paths(con2)
abs2_audio = get_audio_paths(abs2)

con1_audio[0]


WindowsPath('C:/Users/kinga/Documents/Blindbrain/4. Courses/fMRI - design of the experiment and data analysis/cognes-auditory-1back-pilot/experiment/resources/audio/con_run1_001.wav')

In [8]:
# target length per run (match existing stimuli)

def max_len_samples(paths: list[Path]) -> int:
    mx = 0
    for p in paths:
        x, sr = read_wav(p)
        x, sr = resample_if_needed(x, sr, SR_TARGET)
        mx = max(mx, len(x))
    return int(mx)

target_len_run1 = max_len_samples(con1_audio + abs1_audio)
target_len_run2 = max_len_samples(con2_audio + abs2_audio)

target_len_run1, target_len_run2


(72000, 72000)

In [9]:
# build baseline per trial

def pick_sources(con_paths: list[Path], abs_paths: list[Path], n: int, rng_local: np.random.Generator):
    """Pick ~50/50 CON/ABS source files without replacement."""
    n_con = n // 2
    n_abs = n - n_con

    con_sel = rng_local.choice(con_paths, size=min(n_con, len(con_paths)), replace=False).tolist()
    abs_sel = rng_local.choice(abs_paths, size=min(n_abs, len(abs_paths)), replace=False).tolist()

    chosen = [("CON", p) for p in con_sel] + [("ABS", p) for p in abs_sel]

    if len(chosen) < n:
        pool = [("CON", p) for p in con_paths if p not in con_sel] + [("ABS", p) for p in abs_paths if p not in abs_sel]
        rng_local.shuffle(pool)
        chosen += pool[: (n - len(chosen))]

    rng_local.shuffle(chosen)
    return chosen[:n]


def generate_baseline_for_run(
    run: int,
    con_paths: list[Path],
    abs_paths: list[Path],
    target_len: int,
    out_list_path: Path,
    out_audio_prefix: str,
    n_trials: int,
    seed: int,
) -> pd.DataFrame:
    rng_local = np.random.default_rng(seed)
    chosen = pick_sources(con_paths, abs_paths, n_trials, rng_local)

    rows = []
    for i, (src_cond, src_path) in enumerate(chosen, start=1):
        x, sr = read_wav(src_path)
        x, sr = resample_if_needed(x, sr, SR_TARGET)
        x = pad_or_trim(x, target_len)

        y = speech_shaped_noise(
            x,
            sr=sr,
            seed=seed + i,
            smooth_hz=SSN_SMOOTH_HZ,
            bp_hz=SSN_BP_HZ,
        )


        y = match_rms(y, target_rms=rms(x))
        y = y * 0.45
        y = pad_or_trim(y, target_len)

        y = hard_clip(y, limit=0.98)

        out_wav_name = f"{out_audio_prefix}_{i:03d}.wav"
        out_wav_path = AUDIO_DIR / out_wav_name
        write_wav(out_wav_path, y, sr)

        rows.append(
            {
                "word": "<BASELINE>",
                "condition": "BASE",
                "run": run,
                "stimFile": f"resources/audio/{out_wav_name}",
            }
        )


    base_df = pd.DataFrame(rows)

    # zapis do PsychoPy: tylko 4 kolumny
    base_df[["word", "condition", "run", "stimFile"]].to_csv(
        out_list_path, index=False, encoding=CSV_ENCODING
    )

    return base_df



n_base1 = int(N_BASE_PER_RUN if N_BASE_PER_RUN is not None else len(con1))
n_base2 = int(N_BASE_PER_RUN if N_BASE_PER_RUN is not None else len(con2))

base1_df = generate_baseline_for_run(
    run=1,
    con_paths=con1_audio,
    abs_paths=abs1_audio,
    target_len=target_len_run1,
    out_list_path=LISTS_DIR / "base_run1.csv",
    out_audio_prefix="base_run1",
    n_trials=n_base1,
    seed=RNG_SEED,
)

base2_df = generate_baseline_for_run(
    run=2,
    con_paths=con2_audio,
    abs_paths=abs2_audio,
    target_len=target_len_run2,
    out_list_path=LISTS_DIR / "base_run2.csv",
    out_audio_prefix="base_run2",
    n_trials=n_base2,
    seed=RNG_SEED + 1,
)

base1_df.head(), base2_df.head()


(         word condition  run                           stimFile
 0  <BASELINE>      BASE    1  resources/audio/base_run1_001.wav
 1  <BASELINE>      BASE    1  resources/audio/base_run1_002.wav
 2  <BASELINE>      BASE    1  resources/audio/base_run1_003.wav
 3  <BASELINE>      BASE    1  resources/audio/base_run1_004.wav
 4  <BASELINE>      BASE    1  resources/audio/base_run1_005.wav,
          word condition  run                           stimFile
 0  <BASELINE>      BASE    2  resources/audio/base_run2_001.wav
 1  <BASELINE>      BASE    2  resources/audio/base_run2_002.wav
 2  <BASELINE>      BASE    2  resources/audio/base_run2_003.wav
 3  <BASELINE>      BASE    2  resources/audio/base_run2_004.wav
 4  <BASELINE>      BASE    2  resources/audio/base_run2_005.wav)

In [10]:
def list_audio(prefix: str):
    return sorted([p.name for p in AUDIO_DIR.glob(prefix)])

print("Run1 BASE wavs:", len(list_audio("base_run1_*.wav")))
print("Run2 BASE wavs:", len(list_audio("base_run2_*.wav")))
print("Saved lists:")
print(" -", LISTS_DIR / "base_run1.csv")
print(" -", LISTS_DIR / "base_run2.csv")

p = AUDIO_DIR / list_audio("base_run1_*.wav")[0]
x, sr = read_wav(p)
print("Example baseline:", p.name, "sr=", sr, "len_s=", len(x)/sr)

for label, df_ in [("Run1", base1_df), ("Run2", base2_df)]:
    if "source_condition" in df_.columns:
        print(label, "counterbalance:")
        print(df_["source_condition"].value_counts())
    else:
        print(label, "counterbalance: (source_condition not stored)")



Run1 BASE wavs: 24
Run2 BASE wavs: 24
Saved lists:
 - C:\Users\kinga\Documents\Blindbrain\4. Courses\fMRI - design of the experiment and data analysis\cognes-auditory-1back-pilot\experiment\resources\lists\base_run1.csv
 - C:\Users\kinga\Documents\Blindbrain\4. Courses\fMRI - design of the experiment and data analysis\cognes-auditory-1back-pilot\experiment\resources\lists\base_run2.csv
Example baseline: base_run1_001.wav sr= 48000 len_s= 1.5
Run1 counterbalance: (source_condition not stored)
Run2 counterbalance: (source_condition not stored)
