In [8]:
# Post-Mix I/O Layer — Notebook Version
# - WAV-focused (robust float32 pipeline; no dependencies beyond NumPy and SciPy)
# - Safe loading/saving, resampling, preview slicing, hashing, path helpers
# - Designed to later lift into .py modules with minimal changes
#
# Note: This cell only defines functions/classes. No files are written.

from __future__ import annotations
from dataclasses import dataclass, asdict
from typing import Optional, Tuple, Dict, Any
import os
import math
import hashlib
import numpy as np
from scipy.io import wavfile
from scipy import signal
from datetime import datetime


# ----------------------------- Dataclasses -----------------------------

@dataclass
class AudioBuffer:
    """Audio samples bundled with their sample rate and optional provenance.

    Axis 0 of ``samples`` is the frame axis. A 1-D array counts as one
    channel; otherwise axis 1 is the channel axis. The loader in this layer
    stores float32 data in [-1, 1], but the class itself enforces neither
    dtype nor range.
    """
    sr: int
    samples: np.ndarray  # (N,) mono or (N, C) multi-channel
    path: Optional[str] = None
    meta: Optional[Dict[str, Any]] = None

    @property
    def n_samples(self) -> int:
        """Number of frames (length of axis 0)."""
        frames = self.samples.shape[0]
        return int(frames)

    @property
    def n_channels(self) -> int:
        """Channel count: 1 for a 1-D array, else the size of axis 1."""
        if self.samples.ndim == 1:
            return 1
        return int(self.samples.shape[1])

    @property
    def duration_s(self) -> float:
        """Length in seconds; a falsy sample rate is treated as 1 to avoid division by zero."""
        rate = self.sr if self.sr else 1
        return float(self.n_samples) / float(rate)

    @property
    def peak(self) -> float:
        """Largest absolute sample value, or 0.0 for an empty buffer."""
        if not self.n_samples:
            return 0.0
        return float(np.max(np.abs(self.samples)))

    @property
    def rms(self) -> float:
        """Root-mean-square over every sample of every channel, or 0.0 when empty."""
        if not self.n_samples:
            return 0.0
        return float(np.sqrt(np.mean(np.square(self.samples))))

    def summary(self) -> Dict[str, Any]:
        """Return a plain dict of headline stats, rounded for display."""
        report: Dict[str, Any] = {"sr": self.sr, "channels": self.n_channels}
        report["duration_s"] = round(self.duration_s, 3)
        report["peak"] = round(self.peak, 6)
        report["rms"] = round(self.rms, 6)
        report["path"] = self.path
        report["meta"] = self.meta or {}
        return report


# ----------------------------- Helpers -----------------------------

def _to_float32(x: np.ndarray) -> np.ndarray:
    """Convert common PCM/float types to float32 in [-1, 1]."""
    if x.dtype == np.int16:
        y = x.astype(np.float32) / 32768.0
    elif x.dtype == np.int32:
        y = x.astype(np.float32) / 2147483648.0
    elif x.dtype == np.uint8:
        # 8-bit WAV is offset binary [0..255]
        y = (x.astype(np.float32) - 128.0) / 128.0
    elif x.dtype in (np.float32, np.float64):
        y = x.astype(np.float32)
        # Assume already -1..1 but sanitize below
    else:
        raise ValueError(f"Unsupported WAV dtype: {x.dtype}")
    # Sanitize
    y = np.nan_to_num(y, nan=0.0, posinf=0.0, neginf=0.0)
    # Clip extreme outliers for safety
    y = np.clip(y, -1.0, 1.0)
    return y


def _ensure_stereo(x: np.ndarray) -> np.ndarray:
    if x.ndim == 1:
        return np.stack([x, x], axis=-1)
    if x.shape[1] == 1:
        return np.repeat(x, 2, axis=1)
    return x


def _downmix_mono(x: np.ndarray) -> np.ndarray:
    return x if x.ndim == 1 else np.mean(x, axis=1).astype(np.float32)


def tpdf_dither_16bit(x: np.ndarray) -> np.ndarray:
    """Quantise float samples to int16 after adding triangular-PDF dither.

    The input is clipped to [-1, 1]. The difference of two uniform draws,
    scaled to one LSB, is added; the sum is clipped again, scaled by 32767
    and rounded. Noise comes from NumPy's global RNG, so output is not
    reproducible unless the caller seeds it.
    """
    bounded = np.clip(x, -1.0, 1.0)
    lsb = 1.0 / 32768.0
    # The difference of two independent uniforms has a triangular distribution.
    first = np.random.rand(*bounded.shape).astype(np.float32)
    second = np.random.rand(*bounded.shape).astype(np.float32)
    dithered = np.clip(bounded + (first - second) * lsb, -1.0, 1.0)
    return np.int16(np.round(dithered * 32767.0))


def resample_poly(x: np.ndarray, sr_in: int, sr_out: int) -> Tuple[np.ndarray, int]:
    """Resample along axis 0 with ``scipy.signal.resample_poly``.

    Returns ``(samples, sample_rate)``. When both rates match, the input
    array is returned untouched (same object, original dtype); otherwise the
    result is float32 at ``sr_out``.
    """
    if sr_out == sr_in:
        return x, sr_in
    # Reduce the rate ratio to lowest terms for the polyphase up/down factors.
    common = math.gcd(sr_in, sr_out)
    up, down = sr_out // common, sr_in // common
    resampled = signal.resample_poly(x, up, down, axis=0)
    return resampled.astype(np.float32), sr_out


def ensure_dir(path: str) -> None:
    """Make sure the directory that would contain the file ``path`` exists."""
    parent = os.path.dirname(os.path.abspath(path))
    os.makedirs(parent, exist_ok=True)


def with_suffix(input_path: str, suffix: str, ext: Optional[str] = None) -> str:
    """Build a sibling path with ``suffix`` inserted between stem and extension.

    A truthy ``ext`` replaces the original extension; otherwise the original
    extension is kept.
    """
    folder, filename = os.path.split(input_path)
    stem, current_ext = os.path.splitext(filename)
    return os.path.join(folder, f"{stem}{suffix}{ext or current_ext}")


def sha256_file(path: str, chunk_size: int = 1 << 20) -> str:
    """Return the hex SHA-256 digest of a file, read in ``chunk_size``-byte pieces."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        # iter() with a sentinel stops at the first empty read, i.e. end of file.
        for block in iter(lambda: handle.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()


def slice_preview(buf: AudioBuffer, start_s: float = 0.0, dur_s: float = 30.0) -> AudioBuffer:
    """Return a copy of a time window of ``buf`` for fast preview rendering.

    Args:
        buf: Source buffer; it is not modified.
        start_s: Window start in seconds (sample index is clamped at 0).
        dur_s: Window length in seconds (the end is clamped at the buffer length).

    Returns:
        A new AudioBuffer with the same ``sr`` and ``path``. Its ``meta`` is
        the source meta plus ``"slice": [start_s, dur_s]`` as requested.
    """
    n0 = max(0, int(start_s * buf.sr))
    n1 = min(buf.n_samples, n0 + int(dur_s * buf.sr))
    window = buf.samples[n0:n1].copy()
    # Drop any inherited "slice" entry first: previously the source meta was
    # unpacked last, so slicing an already-sliced buffer kept the old window.
    inherited = {k: v for k, v in (buf.meta or {}).items() if k != "slice"}
    meta = {"slice": [start_s, dur_s], **inherited}
    return AudioBuffer(sr=buf.sr, samples=window, path=buf.path, meta=meta)


# ----------------------------- I/O API -----------------------------

def load_wav(path: str,
             target_sr: Optional[int] = None,
             mono: bool = False,
             sanitize: bool = True) -> AudioBuffer:
    """Read a WAV file into an AudioBuffer of float32 samples.

    Steps, in order: decode with ``wavfile.read``; convert via
    ``_to_float32`` (which always zeroes NaN/Inf and clips to [-1, 1]);
    optional mono downmix; optional resample; optional final sanitize pass.

    Args:
        path: WAV file to read.
        target_sr: Resample to this rate when given and different from the file's.
        mono: Average the channels into one before resampling.
        sanitize: Repeat the NaN/Inf zeroing and [-1, 1] clip after the
            downmix/resample stage. With ``False`` the resampled signal is
            returned unclipped.

    Returns:
        AudioBuffer carrying the absolute source path and a ``meta`` dict with
        ``loaded_at`` (UTC ISO-8601 with a ``Z`` suffix), ``source_sha256``,
        ``dtype_in`` and ``channels_in``.
    """
    # Function-scope import: this cell only imports ``datetime`` itself.
    from datetime import timezone

    sr, data = wavfile.read(path)
    y = _to_float32(data)
    if mono:
        y = _downmix_mono(y)
    if target_sr is not None and target_sr != sr:
        y, sr = resample_poly(y, sr, target_sr)
    if sanitize:
        y = np.nan_to_num(y, nan=0.0, posinf=0.0, neginf=0.0)
        y = np.clip(y, -1.0, 1.0)

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # time and drop tzinfo so the stored string keeps its "...Z" shape.
    loaded_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    meta = {
        "loaded_at": loaded_at,
        "source_sha256": sha256_file(path),
        "dtype_in": str(data.dtype),
        "channels_in": 1 if data.ndim == 1 else int(data.shape[1]),
    }
    return AudioBuffer(sr=sr, samples=y, path=os.path.abspath(path), meta=meta)


def save_wav(path: str,
             buf: AudioBuffer | np.ndarray,
             sr: Optional[int] = None,
             bitdepth: str = "float32",
             dither_16bit: bool = True) -> str:
    """Write audio to a WAV file and return the absolute output path.

    Args:
        path: Destination file; its parent directory is created if missing.
        buf: An AudioBuffer (its own ``sr`` is used) or a raw array.
        sr: Sample rate, required only when ``buf`` is a raw array.
        bitdepth: ``"float32"`` or ``"int16"``.
        dither_16bit: For int16 output, add TPDF dither before quantising.

    Raises:
        ValueError: raw array without ``sr``, or unknown ``bitdepth``.
    """
    if isinstance(buf, AudioBuffer):
        samples, rate = buf.samples, buf.sr
    elif sr is None:
        raise ValueError("When saving a raw ndarray, 'sr' must be provided.")
    else:
        samples, rate = np.asarray(buf, dtype=np.float32), sr

    ensure_dir(path)
    # Zero non-finite values and bound to [-1, 1] before any encoding.
    cleaned = np.nan_to_num(samples, nan=0.0, posinf=0.0, neginf=0.0)
    cleaned = np.clip(cleaned, -1.0, 1.0).astype(np.float32)

    if bitdepth == "float32":
        wavfile.write(path, rate, cleaned)
    elif bitdepth == "int16":
        if dither_16bit:
            pcm = tpdf_dither_16bit(cleaned)
        else:
            pcm = np.int16(np.round(cleaned * 32767.0))
        wavfile.write(path, rate, pcm)
    else:
        raise ValueError("bitdepth must be 'float32' or 'int16'")

    return os.path.abspath(path)


# ----------------------------- Convenience Summaries -----------------------------

def print_audio_summary(buf: AudioBuffer, name: str = "Audio"):
    """Print a one-line stats header for ``buf``, then path and source details when present."""
    info = buf.summary()
    print(f"{name}: sr={info['sr']} | ch={info['channels']} | dur={info['duration_s']}s | peak={info['peak']} | rms={info['rms']}")
    if buf.path:
        print(f"  path: {buf.path}")

    meta = buf.meta
    if not meta:
        return
    if "source_sha256" in meta:
        # Only the first 16 hex characters are shown.
        print(f"  sha256: {meta['source_sha256'][:16]}...")
    if "dtype_in" in meta:
        print(f"  src dtype: {meta['dtype_in']} | src ch: {meta.get('channels_in')}")


def auto_out_path(input_path: str, stage: str, ext: str = ".wav") -> str:
    """Return ``<folder>/<stem>__<stage><ext>`` in the same folder as ``input_path``."""
    folder = os.path.dirname(input_path)
    stem = os.path.splitext(os.path.basename(input_path))[0]
    return os.path.join(folder, f"{stem}__{stage}{ext}")


print("Post-Mix I/O layer loaded: AudioBuffer, load_wav, save_wav, resample_poly, slice_preview, with_suffix, auto_out_path, sha256_file, print_audio_summary.")


Post-Mix I/O layer loaded: AudioBuffer, load_wav, save_wav, resample_poly, slice_preview, with_suffix, auto_out_path, sha256_file, print_audio_summary.


In [51]:
%%writefile myscript.py

UsageError: %%writefile is a cell magic, but the cell body is empty.


In [52]:
# Post-Mix I/O — Convenience helpers for notebook workflows
# Adds:
# - batch_load_wavs (accepts glob or list)
# - run workspace scaffolding (timestamped work dir tree)
# - manifest creation/update (JSON)
# - artifact registration (copy + hash + metadata)
# - small utilities for copying, naming, and environment capture
#
# This cell *extends* the I/O layer previously loaded.


from __future__ import annotations
from dataclasses import dataclass, asdict, field
from typing import Optional, Tuple, Dict, Any, List, Iterable, Union
import os, glob, json, shutil, sys, platform
from datetime import datetime
import numpy as np

# --- Workspace paths model ---
@dataclass
class RunPaths:
    """Directory layout of one run workspace, as built by ``make_workspace``."""
    root: str      # workspace root directory
    inputs: str    # <root>/inputs
    work: str      # <root>/work
    outputs: str   # <root>/outputs
    reports: str   # <root>/reports

def _timestamp() -> str:
    return datetime.utcnow().strftime("%Y%m%d-%H%M%S")

def make_workspace(base_dir: str = "./postmix_runs", project: str = "default", slug: Optional[str] = None) -> RunPaths:
    """Create a timestamped workspace directory tree and return its paths.

    The root is ``<base_dir>/<project>_<timestamp>[_<slug>]`` (made absolute)
    with ``inputs``, ``work``, ``outputs`` and ``reports`` subdirectories.
    Existing directories are left in place.
    """
    stamp = _timestamp()
    folder = f"{project}_{stamp}" + (f"_{slug}" if slug else "")
    root = os.path.abspath(os.path.join(base_dir, folder))
    subdirs = {name: os.path.join(root, name) for name in ("inputs", "work", "outputs", "reports")}
    paths = RunPaths(root=root, **subdirs)
    for directory in (paths.root, paths.inputs, paths.work, paths.outputs, paths.reports):
        os.makedirs(directory, exist_ok=True)
    print(f"Workspace created at: {paths.root}")
    return paths

# --- Batch load WAVs ---
def batch_load_wavs(paths: Union[str, os.PathLike, Iterable[Union[str, os.PathLike]]], target_sr: Optional[int] = None, mono: bool = False) -> Dict[str, AudioBuffer]:
    """Load several WAV files and print a summary line for each.

    Args:
        paths: A glob pattern or single path (``str`` or ``os.PathLike``,
            expanded with ``glob.glob`` and sorted), or an iterable of paths
            used in the order given.
        target_sr: Forwarded to ``load_wav``.
        mono: Forwarded to ``load_wav``.

    Returns:
        ``{stem: AudioBuffer}`` keyed by file name without extension. Files
        that share a stem overwrite one another (the last one loaded wins).
        Files that fail to load are reported and skipped; an empty dict is
        returned when nothing matches.
    """
    # A lone pathlib.Path used to fall into the iterable branch and raise
    # TypeError; treat any path-like object the same way as a string.
    if isinstance(paths, (str, os.PathLike)):
        file_list = sorted(glob.glob(os.fspath(paths)))
    else:
        file_list = [os.fspath(p) for p in paths]
    if not file_list:
        print("No files matched.")
        return {}

    buffers: Dict[str, AudioBuffer] = {}
    for p in file_list:
        # Best-effort batch: one unreadable file must not abort the others.
        try:
            buf = load_wav(p, target_sr=target_sr, mono=mono)
            stem = os.path.splitext(os.path.basename(p))[0]
            buffers[stem] = buf
            print_audio_summary(buf, name=stem)
        except Exception as e:
            print(f"Failed to load {p}: {e}")
    return buffers

# --- Environment capture (for reproducibility) ---
def env_fingerprint() -> Dict[str, Any]:
    """Capture interpreter, platform and library versions for reproducibility.

    Returns a dict with keys ``python``, ``platform``, ``numpy``, ``scipy``
    (``None`` unless scipy has already been imported) and ``timestamp_utc``
    (ISO-8601 with a ``Z`` suffix).
    """
    # Function-scope import: this cell only imports ``datetime`` itself.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # time and drop tzinfo so the string keeps its "...Z" shape.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return {
        "python": sys.version.split()[0],
        "platform": platform.platform(),
        "numpy": np.__version__,
        # Report scipy only if an earlier cell imported it; never import it here.
        "scipy": __import__("scipy").__version__ if "scipy" in sys.modules else None,
        "timestamp_utc": now_utc.isoformat() + "Z",
    }

# --- Manifest model ---
@dataclass
class Manifest:
    """Record of one run: workspace, registered inputs, parameters, produced artifacts and environment."""
    project: str
    workspace: RunPaths
    # Input files with their hashes (filled by register_input).
    inputs: List[Dict[str, Any]] = field(default_factory=list)
    # Optional top-level run parameters.
    params: Dict[str, Any] = field(default_factory=dict)
    # Produced files (filled by register_artifact).
    artifacts: List[Dict[str, Any]] = field(default_factory=list)
    # Captured when the manifest is instantiated.
    env: Dict[str, Any] = field(default_factory=env_fingerprint)

    def to_dict(self) -> Dict[str, Any]:
        """Return the manifest as nested plain dicts, with the workspace paths expanded."""
        return {**asdict(self), "workspace": asdict(self.workspace)}

def manifest_path(paths: RunPaths) -> str:
    """Return the location of ``manifest.json`` directly under the workspace root."""
    filename = "manifest.json"
    return os.path.join(paths.root, filename)

def write_manifest(man: Manifest) -> str:
    """Serialise ``man`` as indented JSON into its workspace and return the file path."""
    target = manifest_path(man.workspace)
    with open(target, "w") as handle:
        json.dump(man.to_dict(), handle, indent=2)
    print(f"Manifest written: {target}")
    return target

def read_manifest(path: str) -> Dict[str, Any]:
    """Parse the JSON file at ``path`` and return its content."""
    with open(path, "r") as handle:
        parsed = json.load(handle)
    return parsed

# --- Artifact registration ---
def copy_into(dst_dir: str, src_path: str, new_name: Optional[str] = None) -> str:
    """Copy a file into ``dst_dir`` (created if missing) and return the absolute destination.

    The copy keeps the source file name unless a truthy ``new_name`` is
    given. ``shutil.copy2`` is used, so an existing destination is overwritten.
    """
    os.makedirs(dst_dir, exist_ok=True)
    filename = new_name or os.path.basename(src_path)
    target = os.path.join(dst_dir, filename)
    shutil.copy2(src_path, target)
    return os.path.abspath(target)

def register_input(man: Manifest, path: str, alias: Optional[str] = None) -> Dict[str, Any]:
    """Append an input record (alias, absolute path, SHA-256) to ``man.inputs`` and return it.

    When ``alias`` is falsy, the file name without extension is used.
    """
    default_alias = os.path.splitext(os.path.basename(path))[0]
    record: Dict[str, Any] = {
        "alias": alias or default_alias,
        "path": os.path.abspath(path),
        "sha256": sha256_file(path),
    }
    man.inputs.append(record)
    return record

def register_artifact(man: Manifest, file_path: str, kind: str, params: Optional[Dict[str, Any]] = None, stage: Optional[str] = None) -> Dict[str, Any]:
    """Append a record for a produced file to ``man.artifacts`` and return it.

    Args:
        man: Manifest to update in place.
        file_path: The artifact on disk; it is hashed with ``sha256_file``.
        kind: Artifact category, e.g. "premaster", "master_landr", "stream_spotify".
        params: Parameters used to produce the file (defaults to ``{}``).
        stage: Optional free-form stage label.

    Returns:
        The record, with keys ``kind``, ``stage``, ``path`` (absolute),
        ``sha256``, ``params`` and ``created_at`` (UTC ISO-8601, ``Z`` suffix).
    """
    # Function-scope import: this cell only imports ``datetime`` itself.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # time and drop tzinfo so the string keeps its "...Z" shape.
    created_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    rec = {
        "kind": kind,
        "stage": stage,
        "path": os.path.abspath(file_path),
        "sha256": sha256_file(file_path),
        "params": params or {},
        "created_at": created_at,
    }
    man.artifacts.append(rec)
    return rec

# --- Convenience: bring source mix into workspace/inputs ---
def import_mix(paths: RunPaths, source_path: str, alias: Optional[str] = None) -> str:
    """Copy a source mix into the workspace ``inputs`` folder and return the new path.

    A truthy ``alias`` is used verbatim as the destination file name
    (no extension is added); otherwise the source file name is kept.
    """
    filename = alias if alias else os.path.basename(source_path)
    dst = copy_into(paths.inputs, source_path, new_name=filename)
    print(f"Imported mix → {dst}")
    return dst

print("Convenience helpers loaded: make_workspace, batch_load_wavs, env_fingerprint, Manifest, write/read_manifest, register_input, register_artifact, import_mix.")


Convenience helpers loaded: make_workspace, batch_load_wavs, env_fingerprint, Manifest, write/read_manifest, register_input, register_artifact, import_mix.


### Analysis layer

In [10]:
# Analysis Layer — Notebook Version
# Robust audio analysis utilities for post-mix:
# - Health checks (DC, peak/RMS, true-peak approx, headroom, NaN/Inf)
# - Loudness (K-weighted momentary/short-term + approx integrated LUFS)
# - Dynamics proxies (crest, short-term distribution, DR proxy)
# - Spectrum & band energy (bass/air %), spectral flatness
# - Stereo metrics (phase correlation, width proxy, mid/side peaks)
# - Plots: spectrum, short-term loudness, waveform excerpt (matplotlib; no seaborn)
#
# Designed to integrate with the earlier I/O layer (AudioBuffer, load_wav).

from __future__ import annotations
from dataclasses import dataclass, asdict
from typing import Dict, Any, Optional, Tuple
import numpy as np
import pandas as pd
from scipy import signal
from scipy.io import wavfile
import matplotlib.pyplot as plt

try:
    from caas_jupyter_tools import display_dataframe_to_user
except Exception:
    display_dataframe_to_user = None


# ----------------------------- Utilities -----------------------------

def _lin_to_db(x: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    return 20.0 * np.log10(np.maximum(eps, np.abs(x)))

def _db_to_lin(db: float) -> float:
    return 10.0 ** (db / 20.0)

def _ensure_stereo(x: np.ndarray) -> np.ndarray:
    if x.ndim == 1:
        return np.stack([x, x], axis=-1)
    if x.shape[1] == 1:
        return np.repeat(x, 2, axis=1)
    return x

def _mono(x: np.ndarray) -> np.ndarray:
    return x if x.ndim == 1 else np.mean(x, axis=1)

def _sanitize(x: np.ndarray) -> np.ndarray:
    return np.clip(np.nan_to_num(x, nan=0.0, posinf=0.0, neginf=0.0), -1.0, 1.0)


# ----------------------------- True Peak (approx) -----------------------------

def true_peak_dbfs(x: np.ndarray, sr: int, oversample: int = 4) -> float:
    """Estimate true peak in dBFS by upsampling and taking the largest magnitude.

    The input is passed through ``_sanitize`` first, then upsampled by
    ``oversample`` along axis 0 with polyphase resampling. ``sr`` is accepted
    for signature symmetry but is not used in the computation.
    """
    cleaned = _sanitize(x)
    upsampled = signal.resample_poly(cleaned, oversample, 1, axis=0)
    peak_lin = float(np.max(np.abs(upsampled)))
    peak_db = _lin_to_db(np.array([peak_lin]))
    return float(peak_db[0])


# ----------------------------- K-Weighting (BS.1770-style) -----------------------------

def _k_weighting_sos(sr: int):
    """
    Return SOS for the K-weighting pre-filter + high-frequency shelf per ITU-R BS.1770.
    Using standard bilinear transforms for the defined z-plane filters.
    """
    # High-pass (2nd order) at 38 Hz (pre-filter)
    f_hp = 38.0
    # High-shelf (2nd order) with +4 dB above ~1 kHz
    f_shelf = 1681.974450955533
    Q_shelf = 0.7071752369554196
    gain_db = 3.99984385397  # ~+4 dB

    # HPF
    sos_hp = signal.butter(2, f_hp/(sr*0.5), btype='highpass', output='sos')

    # High-shelf (RBJ biquad in SOS form)
    A = 10**(gain_db/40.0)
    w0 = 2*np.pi*f_shelf/sr
    alpha = np.sin(w0)/(2*Q_shelf)
    cosw0 = np.cos(w0)
    b0 =    A*((A+1) + (A-1)*cosw0 + 2*np.sqrt(A)*alpha)
    b1 = -2*A*((A-1) + (A+1)*cosw0)
    b2 =    A*((A+1) + (A-1)*cosw0 - 2*np.sqrt(A)*alpha)
    a0 =        (A+1) - (A-1)*cosw0 + 2*np.sqrt(A)*alpha
    a1 =    2*((A-1) - (A+1)*cosw0)
    a2 =        (A+1) - (A-1)*cosw0 - 2*np.sqrt(A)*alpha
    sos_shelf = signal.tf2sos([b0/a0, b1/a0, b2/a0], [1.0, a1/a0, a2/a0])

    return np.vstack([sos_hp, sos_shelf])

def k_weight(x: np.ndarray, sr: int) -> np.ndarray:
    """Run ``x`` through the K-weighting SOS cascade built for sample rate ``sr``."""
    return signal.sosfilt(_k_weighting_sos(sr), x)

def lufs_momentary(x_mono: np.ndarray, sr: int, window_s: float = 0.4) -> Tuple[np.ndarray, np.ndarray]:
    """Per-sample momentary loudness (K-weighted, ungated) over a sliding window.

    Args:
        x_mono: Mono signal.
        sr: Sample rate in Hz.
        window_s: Averaging window in seconds (400 ms by default).

    Returns:
        ``(time_axis, lufs)``, both the same length as ``x_mono``. Empty input
        yields two empty arrays.
    """
    xk = k_weight(x_mono, sr)
    power = np.square(np.asarray(xk, dtype=np.float64))
    if power.size == 0:
        return np.zeros(0), np.zeros(0)

    win = int(max(1, round(window_s * sr)))
    kernel = np.full(win, 1.0 / win)
    # np.convolve(mode='same') returns max(len(signal), len(kernel)) samples,
    # so a signal shorter than the window produced an over-long result, and
    # its direct convolution costs O(N*win). fftconvolve keeps the output the
    # size of its first argument, with the same centring.
    ms = signal.fftconvolve(power, kernel, mode="same")
    # FFT round-off can leave tiny negative values; the floor absorbs them.
    # -0.691 is the BS.1770 absolute scale offset.
    lufs = -0.691 + 10.0 * np.log10(np.maximum(1e-12, ms))
    t = np.arange(len(lufs)) / sr
    return t, lufs

def lufs_integrated_approx(x_mono: np.ndarray, sr: int) -> float:
    """Approximate integrated loudness: K-weighted mean square over the whole signal, no gating."""
    weighted = k_weight(x_mono, sr)
    mean_square = np.mean(weighted**2)
    # -0.691 is the BS.1770 absolute scale offset; the floor avoids log10(0).
    return float(-0.691 + 10.0 * np.log10(np.maximum(1e-12, mean_square)))


# ----------------------------- Spectrum & Bands -----------------------------

def spectrum_db(mono: np.ndarray, sr: int, n_fft: int = 1<<16) -> Tuple[np.ndarray, np.ndarray]:
    """Magnitude spectrum (dB) of the first ``n_fft`` samples, Hann-windowed.

    Only the leading ``n_fft`` samples are analysed. Segments shorter than
    2048 samples are zero-padded to 2048 before windowing.
    Returns ``(freqs_hz, magnitude_db)``.
    """
    frame = mono[:min(len(mono), n_fft)]
    if len(frame) < 2048:
        padded = np.zeros(2048, dtype=frame.dtype)
        padded[:len(frame)] = frame
        frame = padded
    windowed = frame * np.hanning(len(frame))
    freqs = np.fft.rfftfreq(len(frame), 1/sr)
    magnitude = np.abs(np.fft.rfft(windowed))
    return freqs, _lin_to_db(magnitude)

def band_energy_percent(mono: np.ndarray, sr: int, f_lo: float, f_hi: float) -> float:
    """Percentage of spectral power inside ``[f_lo, f_hi)``.

    Only the first 65536 samples are analysed (Hann-windowed rFFT), so the
    figure describes the start of the signal rather than the whole of it.
    """
    frame = mono[:min(len(mono), 1 << 16)]
    spectrum = np.fft.rfft(frame * np.hanning(len(frame)))
    power = np.abs(spectrum)**2
    freqs = np.fft.rfftfreq(len(frame), 1/sr)
    in_band = (freqs >= f_lo) & (freqs < f_hi)
    # The tiny constant keeps all-zero input from dividing by zero.
    return float(100.0 * np.sum(power[in_band]) / (np.sum(power) + 1e-20))

def spectral_flatness(mono: np.ndarray, sr: int) -> float:
    """Geometric-to-arithmetic mean ratio of the magnitude spectrum, clipped to [0, 1].

    Only the first 16384 samples are analysed (Hann-windowed rFFT). ``sr`` is
    accepted for signature symmetry but does not affect the result.
    """
    frame = mono[:min(len(mono), 1 << 14)]
    windowed = frame * np.hanning(len(frame))
    # Small offset keeps log() finite for empty bins.
    magnitude = np.abs(np.fft.rfft(windowed)) + 1e-12
    geometric = np.exp(np.mean(np.log(magnitude)))
    arithmetic = np.mean(magnitude)
    return float(np.clip(geometric / arithmetic, 0.0, 1.0))


# ----------------------------- Stereo Metrics -----------------------------

def stereo_metrics(x: np.ndarray) -> Dict[str, float]:
    """Compute basic stereo-image metrics from a mono or stereo signal.

    Returns a dict with:
        phase_correlation: zero-lag correlation of L and R normalised by their
            energies, in [-1, 1]; 0.0 when either channel is silent.
        stereo_width: mean |side| divided by mean |mid|.
        mid_peak_db / side_peak_db: peak level of the mid and side signals in dB.
    """
    x = _ensure_stereo(x)
    L = x[:, 0]
    R = x[:, 1]

    # The previous per-sample form (L*R)/(|L|*|R|) reduces to the mean of
    # sign(L*R): every sample counted equally regardless of level and silent
    # samples counted as 0, so dual-mono audio with pauses read below +1.
    # Normalising by channel energy gives +1 for identical channels.
    Ld = np.asarray(L, dtype=np.float64)
    Rd = np.asarray(R, dtype=np.float64)
    energy = float(np.sqrt(np.sum(Ld * Ld) * np.sum(Rd * Rd)))
    corr = float(np.sum(Ld * Rd) / energy) if energy > 1e-12 else 0.0

    # Width proxy using mid/side.
    M = 0.5 * (L + R)
    S = 0.5 * (L - R)
    width = float(np.mean(np.abs(S)) / (np.mean(np.abs(M)) + 1e-12))
    return {
        "phase_correlation": float(np.clip(corr, -1.0, 1.0)),
        "stereo_width": width,
        "mid_peak_db": float(_lin_to_db(np.array([np.max(np.abs(M))]))[0]),
        "side_peak_db": float(_lin_to_db(np.array([np.max(np.abs(S))]))[0]),
    }


# ----------------------------- Health & Dynamics -----------------------------

def health_metrics(x: np.ndarray, sr: int) -> Dict[str, float]:
    """Level and DC diagnostics, all measured on the channel-averaged (mono) signal.

    Returns peak and RMS in dBFS, crest factor (peak minus RMS, dB), the raw
    DC offset with its dB value, and the percentage of energy below 30 Hz.
    """
    mono = _mono(x)

    def _scalar_db(value: float) -> float:
        # _lin_to_db works on arrays, so wrap the scalar and unwrap the result.
        return float(_lin_to_db(np.array([value]))[0])

    dc = float(np.mean(mono))
    peak = float(np.max(np.abs(mono)))
    rms = float(np.sqrt(np.mean(mono**2)))
    peak_db = _scalar_db(peak)
    rms_db = _scalar_db(rms)
    return {
        "peak_dbfs": peak_db,
        "rms_dbfs": rms_db,
        "crest_db": peak_db - rms_db,
        "dc_offset": dc,
        "dc_dbfs": _scalar_db(abs(dc) if abs(dc) > 0 else 1e-12),
        "sub_30Hz_%": band_energy_percent(mono, sr, 0.0, 30.0),
    }

def short_term_loudness(mono: np.ndarray, sr: int, win_s: float = 3.0, hop_s: float = 0.5) -> Tuple[np.ndarray, np.ndarray]:
    """Sliding-window RMS level in dBFS, sampled every ``hop_s`` seconds.

    No K-weighting is applied here; this is plain RMS.

    Args:
        mono: Mono signal.
        sr: Sample rate in Hz.
        win_s: Averaging window in seconds (at least 2 samples are used).
        hop_s: Spacing between reported values in seconds (at least 1 sample).

    Returns:
        ``(times_s, level_db)`` with one entry per hop. Empty input yields
        two empty arrays.
    """
    win = int(max(2, round(win_s * sr)))
    hop = int(max(1, round(hop_s * sr)))
    power = np.square(np.asarray(mono, dtype=np.float64))
    idx = np.arange(0, len(power), hop)
    if power.size == 0:
        return idx / sr, np.zeros(0)

    kernel = np.full(win, 1.0 / win)
    # np.convolve(mode='same') returns max(len(signal), len(kernel)) samples,
    # which misaligned results for signals shorter than the window, and its
    # direct convolution with a multi-second kernel costs O(N*win).
    # fftconvolve keeps the output the size of its first argument.
    mean_sq = signal.fftconvolve(power, kernel, mode="same")
    # The floor also absorbs tiny negative values from FFT round-off.
    rms = np.sqrt(np.maximum(1e-20, mean_sq))
    return idx / sr, _lin_to_db(rms[idx])

def dr_proxy(mono: np.ndarray, sr: int) -> float:
    """Dynamic-range proxy: 95th minus 10th percentile of the 3 s / 0.5 s-hop short-term level."""
    _, levels_db = short_term_loudness(mono, sr, win_s=3.0, hop_s=0.5)
    loud = np.percentile(levels_db, 95)
    quiet = np.percentile(levels_db, 10)
    return float(loud - quiet)


# ----------------------------- Analyzer Facade -----------------------------

@dataclass
class AnalysisReport:
    """Results of one analysis pass, as produced by ``analyze_audio_array``."""
    sr: int
    duration_s: float
    basic: Dict[str, float]    # health/level metrics
    stereo: Dict[str, float]   # stereo-image metrics
    lufs_integrated: float
    true_peak_dbfs: float
    bass_energy_pct: float
    air_energy_pct: float
    spectral_flatness: float

    def to_dataframe(self) -> pd.DataFrame:
        """Flatten the report into a two-column (``metric``, ``value``) table.

        Row order: sr, duration, every ``basic`` entry, every ``stereo``
        entry, then the remaining scalar fields.
        """
        pairs = [("sr", self.sr), ("duration_s", self.duration_s)]
        pairs.extend(self.basic.items())
        pairs.extend(self.stereo.items())
        pairs.extend([
            ("lufs_integrated", self.lufs_integrated),
            ("true_peak_dbfs", self.true_peak_dbfs),
            ("bass_energy_%", self.bass_energy_pct),
            ("air_energy_%", self.air_energy_pct),
            ("spectral_flatness", self.spectral_flatness),
        ])
        return pd.DataFrame([{"metric": label, "value": number} for label, number in pairs])

def analyze_audio_array(x: np.ndarray, sr: int) -> AnalysisReport:
    """Run the full metric suite on an in-memory signal and bundle the results.

    The input is sanitised and expanded to stereo first. Spectral and
    loudness metrics use the channel average; stereo, health and true-peak
    metrics receive the stereo array.
    """
    stereo_sig = _ensure_stereo(_sanitize(x))
    mono = _mono(stereo_sig)

    return AnalysisReport(
        sr=sr,
        duration_s=len(mono) / sr,
        basic=health_metrics(stereo_sig, sr),
        stereo=stereo_metrics(stereo_sig),
        lufs_integrated=lufs_integrated_approx(mono, sr),
        true_peak_dbfs=true_peak_dbfs(stereo_sig, sr, oversample=4),
        bass_energy_pct=band_energy_percent(mono, sr, 20.0, 120.0),
        air_energy_pct=band_energy_percent(mono, sr, 8000.0, sr/2),
        spectral_flatness=spectral_flatness(mono, sr),
    )

def analyze_wav(path: str, target_sr: Optional[int] = None) -> AnalysisReport:
    """Read a WAV file, scale it to float32, optionally resample, and analyse it.

    int16, int32 and uint8 data are scaled to roughly [-1, 1]; any other
    dtype is cast to float32 unchanged. Resampling happens only when
    ``target_sr`` is truthy and differs from the file's rate.
    """
    sr, data = wavfile.read(path)
    if data.dtype == np.int16:
        x = data.astype(np.float32) / 32768.0
    elif data.dtype == np.int32:
        x = data.astype(np.float32) / 2147483648.0
    elif data.dtype == np.uint8:
        x = (data.astype(np.float32) - 128.0) / 128.0
    else:
        x = data.astype(np.float32)

    if target_sr and target_sr != sr:
        common = np.gcd(sr, target_sr)
        x = signal.resample_poly(x, target_sr//common, sr//common, axis=0)
        sr = target_sr
    return analyze_audio_array(x, sr)


# ----------------------------- Plot Helpers -----------------------------

def plot_spectrum(path_or_array, sr: Optional[int] = None, fmax: float = 20000.0):
    """Plot the magnitude spectrum of a WAV file or an array up to ``fmax`` Hz.

    For a path, the file is also analysed and the resulting AnalysisReport
    is returned; for an array (which requires ``sr``) the return is ``None``.
    """
    if isinstance(path_or_array, str):
        rep = analyze_wav(path_or_array)
        sr0, data = wavfile.read(path_or_array)
        if data.dtype == np.int16:
            x = data.astype(np.float32) / 32768.0
        elif data.dtype == np.int32:
            x = data.astype(np.float32) / 2147483648.0
        elif data.dtype == np.uint8:
            x = (data.astype(np.float32) - 128.0) / 128.0
        else:
            x = data.astype(np.float32)
        freqs, mag_db = spectrum_db(_mono(x), sr0)
    else:
        assert sr is not None, "When passing an array, provide sr."
        rep = None
        freqs, mag_db = spectrum_db(_mono(path_or_array), sr)

    visible = freqs <= fmax
    plt.figure()
    plt.plot(freqs[visible], mag_db[visible])
    plt.xlabel("Frequency (Hz)")
    plt.ylabel("Magnitude (dB)")
    plt.title("Magnitude Spectrum")
    plt.show()
    return rep

def plot_short_term_loudness(path_or_array, sr: Optional[int] = None, win_s: float = 3.0, hop_s: float = 0.5):
    """Plot the short-term RMS level over time for a WAV file or an array.

    An array input requires ``sr``; a path uses the file's own sample rate.
    """
    if isinstance(path_or_array, str):
        rate, data = wavfile.read(path_or_array)
        if data.dtype == np.int16:
            x = data.astype(np.float32) / 32768.0
        elif data.dtype == np.int32:
            x = data.astype(np.float32) / 2147483648.0
        elif data.dtype == np.uint8:
            x = (data.astype(np.float32) - 128.0) / 128.0
        else:
            x = data.astype(np.float32)
    else:
        assert sr is not None, "When passing an array, provide sr."
        x, rate = path_or_array, sr

    t, st = short_term_loudness(_mono(x), rate, win_s=win_s, hop_s=hop_s)

    plt.figure()
    plt.plot(t, st)
    plt.xlabel("Time (s)")
    plt.ylabel("Short-term RMS (dBFS)")
    plt.title("Short-term Loudness")
    plt.show()

def plot_waveform_excerpt(path_or_array, sr: Optional[int] = None, start_s: float = 0.0, dur_s: float = 10.0):
    """Plot the channel-averaged waveform for a time window of a WAV file or an array.

    An array input requires ``sr``; a path uses the file's own sample rate.
    The window end is clamped to the signal length.
    """
    if isinstance(path_or_array, str):
        sr, data = wavfile.read(path_or_array)
        if data.dtype == np.int16:
            x = data.astype(np.float32) / 32768.0
        elif data.dtype == np.int32:
            x = data.astype(np.float32) / 2147483648.0
        elif data.dtype == np.uint8:
            x = (data.astype(np.float32) - 128.0) / 128.0
        else:
            x = data.astype(np.float32)
    else:
        x = path_or_array
        assert sr is not None, "When passing an array, provide sr."

    x = _ensure_stereo(x)
    first = int(start_s * sr)
    last = min(int((start_s + dur_s) * sr), x.shape[0])
    t = np.arange(first, last) / sr
    if x.ndim > 1:
        mono = _mono(x[first:last, :])
    else:
        mono = _mono(x[first:last])

    plt.figure()
    plt.plot(t, mono)
    plt.xlabel("Time (s)")
    plt.ylabel("Amplitude (mono)")
    plt.title(f"Waveform Excerpt ({start_s:.1f}s–{start_s+dur_s:.1f}s)")
    plt.show()


# ----------------------------- Tabular Summary -----------------------------

def analysis_table(report: AnalysisReport, name: str = "Track Analysis") -> pd.DataFrame:
    """Show the report as a table and return the underlying DataFrame.

    Uses ``display_dataframe_to_user`` when that optional helper was
    imported; otherwise prints the table as plain text.
    """
    df = report.to_dataframe()
    if not display_dataframe_to_user:
        print(df.to_string(index=False))
    else:
        display_dataframe_to_user(name, df)
    return df

print("Analysis layer loaded: analyze_wav/analyze_audio_array, analysis_table, plot_spectrum, plot_short_term_loudness, plot_waveform_excerpt, LUFS approx, true-peak approx, stereo & health metrics.")


Analysis layer loaded: analyze_wav/analyze_audio_array, analysis_table, plot_spectrum, plot_short_term_loudness, plot_waveform_excerpt, LUFS approx, true-peak approx, stereo & health metrics.


### DSP Primitives Layer

In [11]:
# DSP Primitives Layer — Notebook Implementation
# Reusable, low-level DSP blocks for building post‑mix features.
# Dependencies: numpy, scipy.signal
#
# All functions accept/return numpy arrays:
# - audio: shape (N,) mono or (N,2) stereo
# - sr: sample rate (int)
#
# Notes:
# - Uses numerically-stable SOS filters where applicable
# - Sanitizes NaN/Inf and clamps frequencies to safe ranges
# - Stereo‑aware (processes both channels consistently)
#
# ---------------------------------------------------------------------

from __future__ import annotations
import numpy as np
from scipy import signal
from typing import Tuple

# --------- Core helpers ---------

def _sanitize(x: np.ndarray) -> np.ndarray:
    """Replace NaN/Inf and clamp extreme outliers to avoid IIR blowups."""
    return np.clip(np.nan_to_num(x, nan=0.0, posinf=0.0, neginf=0.0), -4.0, 4.0).astype(np.float32)

def _ensure_stereo(x: np.ndarray) -> np.ndarray:
    if x.ndim == 1:
        return np.stack([x, x], axis=-1)
    if x.shape[1] == 1:
        return np.repeat(x, 2, axis=1)
    return x

def _mono(x: np.ndarray) -> np.ndarray:
    return x if x.ndim == 1 else np.mean(x, axis=1)

def _safe_freq(f: float, sr: int, lo: float = 10.0, hi_ratio: float = 0.49) -> float:
    """Clamp frequency to a safe absolute Hz range based on sample rate."""
    return float(max(lo, min(f, hi_ratio * sr)))

def _db_to_lin(db: float) -> float:
    return 10.0 ** (db / 20.0)

def _lin_to_db(lin: float, eps: float = 1e-12) -> float:
    return 20.0 * np.log10(max(eps, abs(lin)))

# --------- Gain & leveling ---------

def apply_gain_db(audio: np.ndarray, db: float) -> np.ndarray:
    """Scale sanitised audio by ``db`` decibels and return float32 (same gain on every channel)."""
    factor = _db_to_lin(db)
    scaled = _sanitize(audio) * factor
    return scaled.astype(np.float32)

def measure_peak(audio: np.ndarray) -> float:
    """Peak linear amplitude of the sanitised, channel-averaged signal."""
    collapsed = _mono(_sanitize(audio))
    return float(np.max(np.abs(collapsed)))

def measure_rms(audio: np.ndarray) -> float:
    """Linear RMS of the sanitised, channel-averaged signal."""
    collapsed = _mono(_sanitize(audio))
    mean_square = np.mean(collapsed**2)
    return float(np.sqrt(mean_square))

def normalize_peak(audio: np.ndarray, target_dbfs: float = -1.0, eps: float = 1e-9) -> np.ndarray:
    """Scale the sanitised signal so its largest absolute sample sits at ``target_dbfs``.

    The peak is taken over all channels. A signal whose peak is below
    ``eps`` is returned as float32 zeros rather than amplified.
    """
    cleaned = _sanitize(audio)
    peak = float(np.max(np.abs(cleaned)))
    if peak < eps:
        return np.zeros_like(cleaned, dtype=np.float32)
    gain = _db_to_lin(target_dbfs) / peak
    scaled = cleaned * gain
    return scaled.astype(np.float32)

# --------- K‑weighting & LUFS (approx) ---------

def _k_weighting_sos(sr: int) -> np.ndarray:
    """Two-section SOS approximating BS.1770 K-weighting at sample rate ``sr``.

    Row 0: 2nd-order Butterworth high-pass at 38 Hz (cutoff passed through
    ``_safe_freq``). Row 1: RBJ high-shelf biquad with roughly +4 dB of gain.
    """
    hp_cutoff_hz = 38.0
    highpass = signal.butter(2, _safe_freq(hp_cutoff_hz, sr)/(sr*0.5), btype='highpass', output='sos')

    # High-shelf parameters.
    shelf_hz = 1681.974450955533
    shelf_q = 0.7071752369554196
    shelf_gain_db = 3.99984385397

    A = 10**(shelf_gain_db/40.0)
    w0 = 2*np.pi*shelf_hz/sr
    cos_w0 = np.cos(w0)
    alpha = np.sin(w0)/(2*shelf_q)

    # Terms shared by several coefficients of the RBJ high-shelf formulas.
    slope = 2*np.sqrt(A)*alpha
    plus = (A+1) + (A-1)*cos_w0
    minus = (A+1) - (A-1)*cos_w0

    b0 = A*(plus + slope)
    b1 = -2*A*((A-1) + (A+1)*cos_w0)
    b2 = A*(plus - slope)
    a0 = minus + slope
    a1 = 2*((A-1) - (A+1)*cos_w0)
    a2 = minus - slope

    # Normalise by a0 so the denominator's leading coefficient is 1.
    shelf = signal.tf2sos([b0/a0, b1/a0, b2/a0], [1.0, a1/a0, a2/a0])
    return np.vstack([highpass, shelf])

def k_weight(audio: np.ndarray, sr: int) -> np.ndarray:
    """Sanitise, average to mono, apply the K-weighting filter, and return float32."""
    collapsed = _mono(_sanitize(audio))
    weighted = signal.sosfilt(_k_weighting_sos(sr), collapsed)
    return weighted.astype(np.float32)

def lufs_integrated_approx(audio: np.ndarray, sr: int) -> float:
    """Estimate integrated loudness in LUFS without gating.

    Computes the mean square of the K-weighted mono signal and converts it
    with ``-0.691 + 10*log10(ms)``. The mean square is floored at 1e-12 so
    silence maps to a finite value.
    """
    weighted = k_weight(audio, sr)
    mean_square = float(np.mean(weighted**2))
    floored = max(1e-12, mean_square)
    return -0.691 + 10.0 * np.log10(floored)

def normalize_lufs(audio: np.ndarray, sr: int, target_lufs: float = -14.0) -> np.ndarray:
    """Apply a static gain so the ungated loudness estimate matches ``target_lufs``.

    Loudness is measured with ``lufs_integrated_approx`` on the sanitized
    signal, and the dB difference is applied via ``apply_gain_db``.
    """
    clean = _sanitize(audio)
    gain_db = target_lufs - lufs_integrated_approx(clean, sr)  # dB to add
    return apply_gain_db(clean, gain_db)

# --------- Filtering (SOS) ---------

def highpass_filter(audio: np.ndarray, sr: int, cutoff_hz: float, order: int = 4) -> np.ndarray:
    """Butterworth high-pass (SOS form) applied along the sample axis.

    ``cutoff_hz`` is passed through ``_safe_freq`` before being normalized by
    the Nyquist frequency. Returns float32.
    """
    clean = _sanitize(audio)
    normalized_cutoff = _safe_freq(cutoff_hz, sr) / (sr * 0.5)
    sections = signal.butter(order, normalized_cutoff, btype='highpass', output='sos')
    filtered = signal.sosfilt(sections, clean, axis=0)
    return filtered.astype(np.float32)

def lowpass_filter(audio: np.ndarray, sr: int, cutoff_hz: float, order: int = 4) -> np.ndarray:
    """Butterworth low-pass (SOS form) applied along the sample axis.

    ``cutoff_hz`` is passed through ``_safe_freq`` before being normalized by
    the Nyquist frequency. Returns float32.
    """
    clean = _sanitize(audio)
    normalized_cutoff = _safe_freq(cutoff_hz, sr) / (sr * 0.5)
    sections = signal.butter(order, normalized_cutoff, btype='lowpass', output='sos')
    filtered = signal.sosfilt(sections, clean, axis=0)
    return filtered.astype(np.float32)

def bandpass_filter(audio: np.ndarray, sr: int, f_lo: float, f_hi: float, order: int = 4) -> np.ndarray:
    """Butterworth band-pass (SOS form) between ``f_lo`` and ``f_hi``.

    Both edges go through ``_safe_freq``. If the upper edge does not end up
    above the lower one, it is replaced by the larger of ``lo * 1.2`` and
    ``lo + 5`` Hz, capped at ``0.49 * sr``. Returns float32.
    """
    clean = _sanitize(audio)
    nyquist = sr * 0.5
    lo = _safe_freq(f_lo, sr)
    hi = _safe_freq(f_hi, sr)
    if hi <= lo:
        # Degenerate band: widen upward so butter() gets a valid interval.
        widened = max(lo * 1.2, lo + 5.0)
        hi = min(widened, 0.49 * sr)
    band = [lo / nyquist, hi / nyquist]
    sections = signal.butter(order, band, btype='bandpass', output='sos')
    filtered = signal.sosfilt(sections, clean, axis=0)
    return filtered.astype(np.float32)

# --------- Shelving & Parametric EQ (RBJ biquads -> SOS) ---------

def _biquad_peaking_sos(sr: int, f0: float, gain_db: float, Q: float = 0.707) -> np.ndarray:
    """Return a single-section SOS for an RBJ peaking EQ biquad.

    ``f0`` is clamped through ``_safe_freq``; ``gain_db`` is the boost (or
    cut, when negative) at the centre and ``Q`` sets the bandwidth.
    """
    amp = 10 ** (gain_db / 40.0)
    omega = 2 * np.pi * _safe_freq(f0, sr) / sr
    alpha = np.sin(omega) / (2 * Q)
    neg_two_cos = -2 * np.cos(omega)  # shared by b1 and a1

    a0 = 1 + alpha / amp
    numerator = np.array([1 + alpha * amp, neg_two_cos, 1 - alpha * amp]) / a0
    denominator = np.array([1.0, neg_two_cos / a0, (1 - alpha / amp) / a0])
    return signal.tf2sos(numerator, denominator)

def _biquad_lowshelf_sos(sr: int, f0: float, gain_db: float, S: float = 0.5) -> np.ndarray:
    """Return a single-section SOS for an RBJ low-shelf biquad.

    ``f0`` is clamped through ``_safe_freq``; ``gain_db`` is the shelf gain
    and ``S`` is the RBJ shelf-slope parameter.
    """
    amp = 10 ** (gain_db / 40.0)
    omega = 2 * np.pi * _safe_freq(f0, sr) / sr
    cos_w = np.cos(omega)
    sin_w = np.sin(omega)
    alpha = sin_w / 2 * np.sqrt((amp + 1 / amp) * (1 / S - 1) + 2)
    root_term = 2 * np.sqrt(amp) * alpha
    plus = amp + 1
    minus = amp - 1

    b0 = amp * (plus - minus * cos_w + root_term)
    b1 = 2 * amp * (minus - plus * cos_w)
    b2 = amp * (plus - minus * cos_w - root_term)
    a0 = plus + minus * cos_w + root_term
    a1 = -2 * (minus + plus * cos_w)
    a2 = plus + minus * cos_w - root_term

    numerator = np.array([b0, b1, b2]) / a0
    denominator = np.array([1.0, a1 / a0, a2 / a0])
    return signal.tf2sos(numerator, denominator)

def _biquad_highshelf_sos(sr: int, f0: float, gain_db: float, S: float = 0.5) -> np.ndarray:
    """Return a single-section SOS for an RBJ high-shelf biquad.

    ``f0`` is clamped through ``_safe_freq``; ``gain_db`` is the shelf gain
    and ``S`` is the RBJ shelf-slope parameter.
    """
    amp = 10 ** (gain_db / 40.0)
    omega = 2 * np.pi * _safe_freq(f0, sr) / sr
    cos_w = np.cos(omega)
    sin_w = np.sin(omega)
    alpha = sin_w / 2 * np.sqrt((amp + 1 / amp) * (1 / S - 1) + 2)
    root_term = 2 * np.sqrt(amp) * alpha
    plus = amp + 1
    minus = amp - 1

    b0 = amp * (plus + minus * cos_w + root_term)
    b1 = -2 * amp * (minus + plus * cos_w)
    b2 = amp * (plus + minus * cos_w - root_term)
    a0 = plus - minus * cos_w + root_term
    a1 = 2 * (minus - plus * cos_w)
    a2 = plus - minus * cos_w - root_term

    numerator = np.array([b0, b1, b2]) / a0
    denominator = np.array([1.0, a1 / a0, a2 / a0])
    return signal.tf2sos(numerator, denominator)

def peaking_eq(audio: np.ndarray, sr: int, f0: float, gain_db: float, Q: float = 0.707) -> np.ndarray:
    """Apply one parametric bell centred at ``f0`` with ``gain_db`` of boost/cut.

    Coefficients come from ``_biquad_peaking_sos``; filtering runs along the
    sample axis and the result is float32.
    """
    clean = _sanitize(audio)
    sections = _biquad_peaking_sos(sr, f0, gain_db, Q)
    filtered = signal.sosfilt(sections, clean, axis=0)
    return filtered.astype(np.float32)

def shelf_filter(audio: np.ndarray, sr: int, cutoff_hz: float, gain_db: float, kind: str = "low", S: float = 0.5) -> np.ndarray:
    """Apply an RBJ shelving EQ.

    Args:
        kind: ``'low'`` for a low shelf, ``'high'`` for a high shelf.

    Raises:
        ValueError: If ``kind`` is neither ``'low'`` nor ``'high'``.
    """
    clean = _sanitize(audio)
    if kind == "low":
        design = _biquad_lowshelf_sos
    elif kind == "high":
        design = _biquad_highshelf_sos
    else:
        raise ValueError("kind must be 'low' or 'high'")
    sections = design(sr, cutoff_hz, gain_db, S)
    filtered = signal.sosfilt(sections, clean, axis=0)
    return filtered.astype(np.float32)

def notch_filter(audio: np.ndarray, sr: int, f0: float, Q: float = 10.0) -> np.ndarray:
    """Apply a narrow notch at ``f0`` designed with ``scipy.signal.iirnotch``.

    Args:
        audio: Input samples; sanitized before filtering.
        sr: Sample rate in Hz.
        f0: Notch centre frequency (clamped through ``_safe_freq``).
        Q: Quality factor handed to ``iirnotch``; higher is narrower.

    Returns:
        float32 array filtered along the sample axis.
    """
    # Sanitize once and work on the result; the previous version read
    # ``audio.ndim`` from the raw argument, which fails for array-likes that
    # _sanitize would otherwise accept.
    x = _sanitize(audio)
    w0 = _safe_freq(f0, sr) / (sr * 0.5)
    b, a = signal.iirnotch(w0, Q)
    sos = signal.tf2sos(b, a)
    return signal.sosfilt(sos, x, axis=0).astype(np.float32)

def tilt_eq(audio: np.ndarray, sr: int, pivot_hz: float = 1000.0, gain_db: float = 1.5) -> np.ndarray:
    """Approximate a tilt EQ with two broad bells around ``pivot_hz``.

    A bell at ``max(80, pivot_hz / 5)`` gets ``-gain_db / 2`` and a bell at
    ``min(0.45 * sr, pivot_hz * 5)`` gets ``+gain_db / 2``, both with Q = 0.7.
    """
    low_center = max(80.0, pivot_hz / 5)
    high_center = min(sr * 0.45, pivot_hz * 5)
    half_gain = gain_db / 2

    dipped = peaking_eq(audio, sr, f0=low_center, gain_db=-half_gain, Q=0.7)
    lifted = peaking_eq(dipped, sr, f0=high_center, gain_db=+half_gain, Q=0.7)
    return lifted.astype(np.float32)

# --------- Mid/Side & Stereo ---------

def mid_side_encode(audio: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Split a stereo signal into mid and side components.

    The input is sanitized and passed through ``_ensure_stereo``; columns 0
    and 1 are then treated as left and right.

    Returns:
        ``(mid, side)`` as float32 arrays, with ``mid = 0.5 * (L + R)`` and
        ``side = 0.5 * (L - R)``.
    """
    stereo = _ensure_stereo(_sanitize(audio))
    left = stereo[:, 0]
    right = stereo[:, 1]
    mid = 0.5 * (left + right)
    side = 0.5 * (left - right)
    return mid.astype(np.float32), side.astype(np.float32)

def mid_side_decode(M: np.ndarray, S: np.ndarray) -> np.ndarray:
    """Rebuild a two-column stereo array from mid ``M`` and side ``S``.

    Column 0 is ``M + S`` (left) and column 1 is ``M - S`` (right); the result
    is float32.
    """
    channels = (M + S, M - S)
    return np.column_stack(channels).astype(np.float32)

def stereo_widener(audio: np.ndarray, width: float = 1.0) -> np.ndarray:
    """
    Change stereo width by scaling the side signal before decoding.

    width == 1.0 leaves the image unchanged, values above 1 widen it and
    values below 1 narrow it.
    """
    stereo = _ensure_stereo(_sanitize(audio))
    mid, side = mid_side_encode(stereo)
    scaled_side = side * float(width)
    return mid_side_decode(mid, scaled_side).astype(np.float32)

# --------- Dynamics ---------

def _envelope_detector(mono: np.ndarray, sr: int, attack_ms: float = 10.0, release_ms: float = 100.0) -> np.ndarray:
    a_a = np.exp(-1.0 / ((attack_ms/1000.0) * sr))
    a_r = np.exp(-1.0 / ((release_ms/1000.0) * sr))
    env = np.zeros_like(mono, dtype=np.float32)
    prev = 0.0
    for i, v in enumerate(np.abs(mono)):
        if v > prev:
            prev = a_a*prev + (1 - a_a)*v
        else:
            prev = a_r*prev + (1 - a_r)*v
        env[i] = prev
    return env

def compressor(audio: np.ndarray, sr: int,
               threshold_db: float = -18.0, ratio: float = 2.0,
               attack_ms: float = 15.0, release_ms: float = 120.0,
               makeup_db: float = 0.0, knee_db: float = 3.0,
               link_stereo: bool = True) -> np.ndarray:
    """
    Feed-forward compressor with a soft knee.

    The level is tracked with ``_envelope_detector`` and converted to a static
    gain curve in the dB domain:

      * more than ``knee_db`` below the threshold: unity gain;
      * more than ``knee_db`` above the threshold: ``over * (1/ratio - 1)`` dB;
      * in between: a quadratic blend that meets both segments continuously.

    The previous implementation evaluated identical expressions in its "knee"
    and "above knee" branches, so ``knee_db`` had no effect and the curve was
    a hard knee. Outside the knee region the gain is unchanged from that
    version.

    Args:
        audio: Input samples; sanitized and passed through ``_ensure_stereo``.
        sr: Sample rate in Hz.
        threshold_db: Level where compression is centred.
        ratio: Compression ratio (input dB over threshold per output dB).
        attack_ms, release_ms: Envelope follower time constants.
        makeup_db: Static gain applied after compression (skipped when 0).
        knee_db: Half-width of the knee in dB; values <= 0 give a hard knee.
        link_stereo: If True, one gain derived from the mono-collapsed signal
            drives both channels; otherwise each channel has its own detector.

    Returns:
        float32 array with two columns.

    Raises:
        ValueError: If ``ratio`` is not positive.
    """
    if ratio <= 0.0:
        raise ValueError("ratio must be > 0")

    x = _ensure_stereo(_sanitize(audio))
    half_knee_db = max(0.0, float(knee_db))
    slope = 1.0 / float(ratio) - 1.0  # dB of gain per dB above threshold

    def gains_for(env: np.ndarray) -> np.ndarray:
        """Map an envelope (linear) to per-sample linear gains, vectorized."""
        level_db = 20.0 * np.log10(np.maximum(np.asarray(env, dtype=np.float64), 1e-12))
        over_db = level_db - threshold_db
        gain_db = np.where(over_db > 0.0, slope * over_db, 0.0)
        if half_knee_db > 0.0:
            # Quadratic knee: 0 dB at -half_knee, slope*half_knee at +half_knee.
            knee_gain_db = slope * (over_db + half_knee_db) ** 2 / (4.0 * half_knee_db)
            gain_db = np.where(np.abs(over_db) <= half_knee_db, knee_gain_db, gain_db)
        return (10.0 ** (gain_db / 20.0)).astype(np.float32)

    if link_stereo:
        env = _envelope_detector(_mono(x), sr, attack_ms, release_ms)
        gains = gains_for(env)[:, None]
    else:
        gains = np.column_stack([
            gains_for(_envelope_detector(x[:, 0], sr, attack_ms, release_ms)),
            gains_for(_envelope_detector(x[:, 1], sr, attack_ms, release_ms)),
        ])

    y = (x * gains).astype(np.float32)
    if makeup_db != 0.0:
        y = apply_gain_db(y, makeup_db)
    return y.astype(np.float32)

def transient_shaper(audio: np.ndarray, sr: int,
                     attack_gain_db: float = 0.0, sustain_gain_db: float = 0.0,
                     split_hz: float = 4000.0) -> np.ndarray:
    """
    Simple two-band transient shaper primitive.

    The signal is split at ``split_hz`` with a 2nd-order low-pass. The
    residual high band (used as a proxy for transients) is scaled by
    ``attack_gain_db`` and the low band by ``sustain_gain_db``, then the bands
    are summed. This is a static band gain, not a true envelope splitter.

    Returns:
        float32 array; the input is passed through ``_ensure_stereo`` first.
    """
    x = _ensure_stereo(_sanitize(audio))
    low = lowpass_filter(x, sr, cutoff_hz=split_hz, order=2)
    high = x - low
    # The earlier version also ran a per-sample envelope follower over the
    # high band and discarded the result; that dead O(N) Python loop is gone.
    att = apply_gain_db(high, attack_gain_db)
    sus = apply_gain_db(low, sustain_gain_db)
    return (att + sus).astype(np.float32)

# --------- Fades ---------

def fade_in(audio: np.ndarray, sr: int, dur_s: float = 0.01) -> np.ndarray:
    """Apply a linear 0 -> 1 ramp over the first ``dur_s`` seconds.

    The ramp is at least one sample long. If the requested fade is longer
    than the clip, the ramp is shortened to span the whole clip (previously
    this raised a broadcasting error). Empty input is returned unchanged.

    Returns:
        float32 copy of the sanitized input with the fade applied.
    """
    x = _sanitize(audio)
    n = min(int(max(1, dur_s * sr)), x.shape[0])
    y = x.copy()
    if n > 0:
        env = np.linspace(0.0, 1.0, n, dtype=np.float32)
        if x.ndim == 1:
            y[:n] *= env
        else:
            y[:n, :] *= env[:, None]
    return y.astype(np.float32)

def fade_out(audio: np.ndarray, sr: int, dur_s: float = 0.01) -> np.ndarray:
    """Apply a linear 1 -> 0 ramp over the last ``dur_s`` seconds.

    The ramp is at least one sample long. If the requested fade is longer
    than the clip, the ramp is shortened to span the whole clip (previously
    this raised a broadcasting error). Empty input is returned unchanged.

    Returns:
        float32 copy of the sanitized input with the fade applied.
    """
    x = _sanitize(audio)
    n = min(int(max(1, dur_s * sr)), x.shape[0])
    y = x.copy()
    if n > 0:  # y[-0:] would select the whole array, so skip when empty
        env = np.linspace(1.0, 0.0, n, dtype=np.float32)
        if x.ndim == 1:
            y[-n:] *= env
        else:
            y[-n:, :] *= env[:, None]
    return y.astype(np.float32)

# Notebook load banner: printed when this cell runs, listing the public
# helpers grouped by category.
print("DSP Primitives Layer loaded:")
print("- Gain/level: apply_gain_db, normalize_peak, normalize_lufs, measure_peak, measure_rms")
print("- Filters: highpass_filter, lowpass_filter, bandpass_filter, shelf_filter, peaking_eq, notch_filter, tilt_eq")
print("- Stereo: mid_side_encode, mid_side_decode, stereo_widener")
print("- Dynamics: compressor (soft‑knee), transient_shaper")
print("- Fades: fade_in, fade_out")
print("- K‑weighting/LUFS approx: k_weight, lufs_integrated_approx")


DSP Primitives Layer loaded:
- Gain/level: apply_gain_db, normalize_peak, normalize_lufs, measure_peak, measure_rms
- Filters: highpass_filter, lowpass_filter, bandpass_filter, shelf_filter, peaking_eq, notch_filter, tilt_eq
- Stereo: mid_side_encode, mid_side_decode, stereo_widener
- Dynamics: compressor (soft‑knee), transient_shaper
- Fades: fade_in, fade_out
- K‑weighting/LUFS approx: k_weight, lufs_integrated_approx


### Processors (Feature Macros) layer

In [12]:
# Processors (Feature Macros) — Notebook Layer
# High-level, musical “dials” built on top of DSP primitives.
# - Bass (low-shelf + optional dynamic control)
# - Punch (kick/bass tightening via low-band ducking)
# - Clarity (mud dip around 160–250 Hz)
# - Air (HF shelf)
# - Width (M/S scaling with safety)
# - Pre‑master Prep (DC/sub cleanup + headroom target)
# - Dial mapping 0–100 → safe internal params
# - Fast preview via one-time preprocess cache
#
# Assumes the DSP primitives cell has been executed already.

from __future__ import annotations
from dataclasses import dataclass, asdict
from typing import Dict, Any, Optional, Tuple
import numpy as np

# ---------- Dial mapping helpers ----------

def _map(amount: float, a0: float, a1: float) -> float:
    """Linear map amount (0..100) to [a0..a1]."""
    amt = float(np.clip(amount, 0.0, 100.0))
    return a0 + (a1 - a0) * (amt / 100.0)

def _exp_map(amount: float, a0: float, a1: float) -> float:
    """Exponential-ish feel (more resolution at low values)."""
    amt = float(np.clip(amount, 0.0, 100.0)) / 100.0
    t = amt**1.6
    return a0 + (a1 - a0) * t

# ---------- Feature Macros (stateless) ----------

def make_bassier(x: np.ndarray, sr: int, amount: float,
                 base_hz: float = 80.0, max_db: float = 6.0,
                 dynamic: bool = False) -> Tuple[np.ndarray, Dict[str, Any]]:
    """
    Bass dial: low shelf at ``base_hz``; amount 0..100 maps to 0..``max_db`` dB
    through ``_exp_map``.

    If ``dynamic`` is True and the shelf gain exceeds 0.5 dB, the band below
    ~120 Hz is replaced by a mildly compressed copy (ratio 1.8) to control
    boom.

    Returns:
        ``(audio, params)`` where ``audio`` is float32 and ``params`` reports
        ``bass_gain_db``, ``bass_hz`` and ``bass_dynamic``.
    """
    gain_db = _exp_map(amount, 0.0, max_db)  # gentle taper
    y = shelf_filter(x, sr, cutoff_hz=base_hz, gain_db=gain_db, kind="low", S=0.5)
    params = {"bass_gain_db": round(gain_db, 2), "bass_hz": base_hz}

    use_dynamic = bool(dynamic and gain_db > 0.5)
    if use_dynamic:
        low = lowpass_filter(y, sr, cutoff_hz=120.0, order=4)
        low_c = compressor(low, sr, threshold_db=-28.0, ratio=1.8, attack_ms=10, release_ms=120, makeup_db=0.0)
        # compressor() always returns two columns. For a mono signal, fold
        # the result back to one channel so the band swap below broadcasts.
        # NOTE(review): assumes _ensure_stereo duplicates a mono input into
        # both columns, in which case the column mean equals the compressed
        # mono signal - confirm.
        if np.ndim(y) == 1 and low_c.ndim == 2:
            low_c = low_c.mean(axis=1)
        y = y - low + low_c
    params["bass_dynamic"] = use_dynamic
    return y.astype(np.float32), params

def make_punchier(x: np.ndarray, sr: int, amount: float,
                  kick_lo: float = 40.0, kick_hi: float = 110.0,
                  low_cutoff: float = 120.0) -> Tuple[np.ndarray, Dict[str, Any]]:
    """
    Punch dial: duck non-kick lows under the kick envelope.

    amount 0..100 maps to a ducking depth of 0..6 dB. The low band
    (< ``low_cutoff``) is split into the kick band (``kick_lo``..``kick_hi``)
    and the remainder; the remainder is attenuated in proportion to the kick
    envelope, normalized by its 95th percentile.

    Returns:
        ``(audio, params)`` where ``audio`` is float32 with the shape of the
        filtered input.
    """
    depth_db = _map(amount, 0.0, 6.0)
    # NOTE(review): these two values are only reported in ``params``. The
    # envelope follower below uses fixed 4 ms / 90 ms constants (the same
    # ones build_preview_cache uses). Decide whether they should drive the
    # detector or be dropped from the report.
    atk_ms = _map(amount, 6.0, 3.0)
    rel_ms = _map(amount, 80.0, 140.0)

    # Build bands & envelope (single pass per call; for fast preview use the cache).
    low = lowpass_filter(x, sr, cutoff_hz=low_cutoff, order=4)
    kick = bandpass_filter(x, sr, f_lo=kick_lo, f_hi=kick_hi, order=4)
    nonkick_low = low - kick

    env = _envelope_detector(_mono(kick), sr, attack_ms=4.0, release_ms=90.0)
    # Normalize the envelope robustly to 0..1 using the 95th percentile.
    p95 = np.percentile(env, 95) if env.size else 0.0
    if p95 <= 1e-9:
        gain_curve = np.ones_like(env, dtype=np.float32)
    else:
        env = np.clip(env / p95, 0.0, 1.0)
        floor_gain = 10**(-abs(depth_db)/20.0)
        gain_curve = (floor_gain + (1.0 - floor_gain) * (1.0 - env)).astype(np.float32)

    # Only add a channel axis for multi-channel audio. Against a mono (N,)
    # band, an (N, 1) gain would broadcast to an (N, N) matrix.
    if nonkick_low.ndim > 1:
        gain_curve = gain_curve[:, None]

    ducked_nonkick = nonkick_low * gain_curve
    lows_tight = ducked_nonkick + kick
    high = x - low
    y = lows_tight + high

    params = {"punch_depth_db": round(depth_db, 2), "kick_lo": kick_lo, "kick_hi": kick_hi, "low_cutoff": low_cutoff,
              "attack_ms": round(atk_ms, 1), "release_ms": round(rel_ms, 1)}
    return y.astype(np.float32), params

def reduce_mud(x: np.ndarray, sr: int, amount: float,
               mud_hz_center: float = 200.0) -> Tuple[np.ndarray, Dict[str, Any]]:
    """
    Clarity dial: a Q=1 bell cut in the low mids; amount 0..100 maps to a
    0..3 dB cut through ``_exp_map``.

    The bell centre slides with the dial from ``mud_hz_center - 20`` Hz to
    ``mud_hz_center + 30`` Hz. With the default centre of 200 Hz this is the
    180..230 Hz sweep used before; previously ``mud_hz_center`` was accepted
    but ignored.

    Returns:
        ``(audio, params)`` with ``mud_cut_db`` and ``mud_hz`` reported.
    """
    cut_db = -_exp_map(amount, 0.0, 3.0)
    hz = _map(amount, mud_hz_center - 20.0, mud_hz_center + 30.0)
    y = peaking_eq(x, sr, f0=hz, gain_db=cut_db, Q=1.0)
    params = {"mud_cut_db": round(cut_db, 2), "mud_hz": round(hz, 1)}
    return y.astype(np.float32), params

def add_air(x: np.ndarray, sr: int, amount: float) -> Tuple[np.ndarray, Dict[str, Any]]:
    """
    Air dial: high shelf at 10 kHz; amount 0..100 maps to 0..4 dB of boost
    through ``_exp_map``.

    Returns ``(audio, params)`` with ``air_db`` and ``air_hz`` reported.
    """
    boost_db = _exp_map(amount, 0.0, 4.0)
    shaped = shelf_filter(x, sr, cutoff_hz=10000.0, gain_db=boost_db, kind="high", S=0.5)
    report = {"air_db": round(boost_db, 2), "air_hz": 10000.0}
    return shaped.astype(np.float32), report

def widen_stereo(x: np.ndarray, sr: int, amount: float) -> Tuple[np.ndarray, Dict[str, Any]]:
    """
    Width dial: scale the side channel by 1.0..1.4, with a mono-compatibility
    guard.

    If the left/right correlation of the input is already low (< 0.15), the
    width factor is capped at 1.2 to avoid over-widening decorrelated
    material.

    The previous estimate averaged ``sign(M * S)`` per sample, which hovers
    around zero for ordinary stereo material and therefore applied the cap
    almost unconditionally. This version uses the normalized L/R
    cross-correlation instead.

    Args:
        x: Input audio (encoded with ``mid_side_encode``).
        sr: Sample rate; unused, kept for a uniform processor signature.
        amount: Dial position 0..100.

    Returns:
        ``(audio, params)`` with ``width_factor`` reported.
    """
    width = _map(amount, 1.0, 1.4)
    M, S = mid_side_encode(x)

    # Rebuild L/R in float64 for a stable energy estimate.
    left = M.astype(np.float64) + S
    right = M.astype(np.float64) - S
    energy = float(np.sum(left * left)) * float(np.sum(right * right))
    if energy > 1e-18:
        corr_est = float(np.sum(left * right) / np.sqrt(energy))
    else:
        # Silent or single-sided input: widening changes nothing, so no cap.
        corr_est = 1.0

    if corr_est < 0.15:
        width = min(width, 1.2)  # avoid over-wide if already decorrelated
    y = mid_side_decode(M, S * width)
    return y.astype(np.float32), {"width_factor": round(width, 3)}

def premaster_prep(x: np.ndarray, sr: int,
                   target_peak_dbfs: float = -6.0,
                   hpf_hz: float = 20.0) -> Tuple[np.ndarray, Dict[str, Any]]:
    """
    Pre-master prep: a 2nd-order high-pass at ``hpf_hz`` followed by peak
    normalization to ``target_peak_dbfs``.

    Returns ``(audio, params)`` echoing the two settings used.
    """
    filtered = highpass_filter(x, sr, cutoff_hz=hpf_hz, order=2)
    leveled = normalize_peak(filtered, target_dbfs=target_peak_dbfs)
    report = {"hpf_hz": hpf_hz, "target_peak_dbfs": target_peak_dbfs}
    return leveled.astype(np.float32), report

# ---------- Fast preview via preprocess cache ----------

@dataclass
class PreviewCache:
    """Precomputed band split and kick envelope for fast dial re-rendering.

    Instances are produced by ``build_preview_cache`` and consumed by
    ``render_from_cache``, so the filters and the envelope follower run once
    per source instead of once per dial change.
    """
    sr: int                 # sample rate the bands were computed at
    high: np.ndarray        # > low_cutoff (input minus the low band)
    low: np.ndarray         # < low_cutoff
    kick: np.ndarray        # kick band (kick_lo..kick_hi band-pass)
    env01: np.ndarray       # normalized kick envelope 0..1
    low_cutoff: float       # low/high split frequency in Hz
    kick_lo: float          # lower edge of the kick band in Hz
    kick_hi: float          # upper edge of the kick band in Hz

def build_preview_cache(x: np.ndarray, sr: int,
                        low_cutoff: float = 120.0,
                        kick_lo: float = 40.0,
                        kick_hi: float = 110.0) -> PreviewCache:
    """Run the expensive filtering once and package the results.

    Produces the low band (4th-order low-pass at ``low_cutoff``), the
    complementary high band (``x - low``), the kick band (4th-order band-pass
    ``kick_lo``..``kick_hi``) and the kick envelope normalized to 0..1 by its
    95th percentile (all zeros when that percentile is <= 1e-9).
    """
    low_band = lowpass_filter(x, sr, cutoff_hz=low_cutoff, order=4)
    high_band = x - low_band
    kick_band = bandpass_filter(x, sr, f_lo=kick_lo, f_hi=kick_hi, order=4)

    envelope = _envelope_detector(_mono(kick_band), sr, attack_ms=4.0, release_ms=90.0)
    p95 = np.percentile(envelope, 95) if envelope.size else 0.0
    if p95 <= 1e-9:
        env01 = np.zeros_like(envelope, dtype=np.float32)
    else:
        env01 = np.clip(envelope / p95, 0.0, 1.0).astype(np.float32)

    return PreviewCache(
        sr=sr,
        high=high_band.astype(np.float32),
        low=low_band.astype(np.float32),
        kick=kick_band.astype(np.float32),
        env01=env01,
        low_cutoff=low_cutoff,
        kick_lo=kick_lo,
        kick_hi=kick_hi,
    )

# Patch: fix render_from_cache width call (was returning a (array, params) tuple)
# Now we correctly unpack the tuple so Y stays a numpy array.

def render_from_cache(cache: PreviewCache,
                      bass_amount: float = 0.0,
                      punch_amount: float = 0.0,
                      clarity_amount: float = 0.0,
                      air_amount: float = 0.0,
                      width_amount: float = 0.0,
                      target_peak_dbfs: Optional[float] = None) -> Tuple[np.ndarray, Dict[str, Any]]:
    """
    Fast re-render from cached bands/envelope for interactive dials.

    Processing order: punch (duck the non-kick lows with the cached kick
    envelope), bass (static gain on the whole low band, acting like a shelf),
    recombine with the high band, then clarity (bell cut), air (high shelf),
    width (side scaling) and optional peak normalization.

    Fixes relative to the earlier version:
      * The bass gain was applied to a copy of the low band that was never
        mixed back in, so the bass dial had no audible effect. It is now
        applied to the low band that reaches the output.
      * The ducking gain only receives a channel axis for multi-channel
        bands; with mono bands the old ``[:, None]`` broadcast produced an
        (N, N) array.

    Args:
        cache: Bands and envelope from ``build_preview_cache``. Not modified.
        bass_amount, punch_amount, clarity_amount, air_amount, width_amount:
            Dial positions 0..100.
        target_peak_dbfs: If not None, peak-normalize the result to this level.

    Returns:
        ``(audio, params)`` where ``audio`` is float32.
    """
    low = cache.low
    high = cache.high
    kick = cache.kick
    nonkick = low - kick

    # Punch: duck non-kick lows with the precomputed envelope.
    depth_db = _map(punch_amount, 0.0, 6.0)
    floor_gain = 10**(-abs(depth_db)/20.0)
    g_sc = (floor_gain + (1.0 - floor_gain) * (1.0 - cache.env01)).astype(np.float32)
    if nonkick.ndim > 1:
        g_sc = g_sc[:, None]
    lows_tight = nonkick * g_sc + kick

    # Bass: scale the (tightened) low band as a whole.
    bass_db = _exp_map(bass_amount, 0.0, 6.0)
    lows_tight = lows_tight * (10**(bass_db/20.0))

    Y = lows_tight + high

    # Clarity: light peaking dip around 180-230 Hz.
    if clarity_amount > 0.0:
        cut_db = -_exp_map(clarity_amount, 0.0, 3.0)
        Y = peaking_eq(Y, cache.sr, f0=_map(clarity_amount, 180.0, 230.0), gain_db=cut_db, Q=1.0)

    # Air: high shelf at 10 kHz.
    if air_amount > 0.0:
        air_db = _exp_map(air_amount, 0.0, 4.0)
        Y = shelf_filter(Y, cache.sr, cutoff_hz=10000.0, gain_db=air_db, kind="high", S=0.5)

    # Width: widen_stereo returns (audio, params); keep only the audio.
    if width_amount > 0.0:
        Y, _params_w = widen_stereo(Y, cache.sr, amount=width_amount)

    params = {
        "bass_db": round(bass_db, 2),
        "punch_depth_db": round(depth_db, 2),
        "clarity_db": round(-_exp_map(clarity_amount, 0.0, 3.0), 2) if clarity_amount>0 else 0.0,
        "air_db": round(_exp_map(air_amount, 0.0, 4.0), 2) if air_amount>0 else 0.0,
        "width_amount": round(width_amount, 2),
    }

    if target_peak_dbfs is not None:
        Y = normalize_peak(Y, target_dbfs=target_peak_dbfs)

    return Y.astype(np.float32), params

# Notebook load banners for this cell (patch notice, then the list of macros).
print("Patched: render_from_cache now unpacks widen_stereo tuple correctly.")


print("Processors (Feature Macros) loaded: make_bassier, make_punchier, reduce_mud, add_air, widen_stereo, premaster_prep, build_preview_cache, render_from_cache.")


Patched: render_from_cache now unpacks widen_stereo tuple correctly.
Processors (Feature Macros) loaded: make_bassier, make_punchier, reduce_mud, add_air, widen_stereo, premaster_prep, build_preview_cache, render_from_cache.


### RENDER ENGINE

In [18]:
# Render Engine — Notebook Layer
# Requires previous cells (I/O, Analysis, DSP Primitives, Processors) to be loaded.

from __future__ import annotations
from dataclasses import dataclass, asdict
from typing import Optional, Dict, Any, List, Tuple
import os
import numpy as np
import soundfile as sf

# ---- Dial state (what the user controls) ----

@dataclass
class DialState:
    """User-facing dial positions; RenderEngine forwards each to render_from_cache."""
    bass: float = 0.0     # 0..100
    punch: float = 0.0    # 0..100
    clarity: float = 0.0  # 0..100
    air: float = 0.0      # 0..100
    width: float = 0.0    # 0..100

@dataclass
class PreprocessConfig:
    """Band-split settings that RenderEngine.preprocess passes to build_preview_cache."""
    low_cutoff: float = 120.0  # low/high split frequency in Hz
    kick_lo: float = 40.0      # lower edge of the kick band in Hz
    kick_hi: float = 110.0     # upper edge of the kick band in Hz

@dataclass
class RenderOptions:
    """Output options consumed by RenderEngine.preview and RenderEngine.commit."""
    target_peak_dbfs: Optional[float] = -1.0   # normalize peak at the end; set None to skip
    hpf_hz: Optional[float] = None             # optional extra HPF applied by commit() (None = skip)
    bit_depth: str = "PCM_24"                  # "PCM_24" (recommended), "FLOAT", "PCM_16"; commit() falls back to PCM_24 for other values
    save_headroom_first: bool = False          # if True, commit() runs a premaster-style -6 dB pass before dials

# ---- Render Engine ----

class RenderEngine:
    """Preview and export engine built on the cached band split.

    The engine holds the source audio and a lazily built ``PreviewCache``.
    ``preview`` renders a time window from the cache; ``commit`` renders the
    full length and writes it to disk; ``commit_variants`` does so for several
    named dial settings.

    Changes relative to the earlier version:
      * ``commit(save_headroom_first=True)`` no longer overwrites ``self.x``
        and ``self.cache``. Previously every such commit (and every variant in
        a batch) re-applied the premaster pass on top of the last one.
      * The optional extra high-pass now runs before peak normalization, so
        the exported peak honours ``target_peak_dbfs``.
      * Preview window bounds are clamped to the available samples; an empty
        window is rendered without peak normalization instead of crashing.
    """

    def __init__(self, x: np.ndarray, sr: int, preprocess: "Optional[PreprocessConfig]" = None):
        """
        x: input stereo/mono numpy array
        sr: sample rate
        preprocess: parameters for cache building (bands + kick envelope)
        """
        self.x = x.astype(np.float32)
        self.sr = int(sr)
        self.pre_cfg = preprocess or PreprocessConfig()
        self.cache = None  # filled by self.preprocess()

    def _build_cache(self, x: np.ndarray):
        """Build a preview cache for ``x`` using this engine's band settings."""
        return build_preview_cache(
            x, self.sr,
            low_cutoff=self.pre_cfg.low_cutoff,
            kick_lo=self.pre_cfg.kick_lo,
            kick_hi=self.pre_cfg.kick_hi
        )

    def preprocess(self) -> Dict[str, Any]:
        """Build fast preview cache (low/high split, kick band, envelope)."""
        self.cache = self._build_cache(self.x)
        return {
            "sr": self.sr,
            "n_samples": int(self.x.shape[0]),
            "low_cutoff": self.pre_cfg.low_cutoff,
            "kick_lo": self.pre_cfg.kick_lo,
            "kick_hi": self.pre_cfg.kick_hi
        }

    def _ensure_cache(self):
        """Build the cache on first use."""
        if self.cache is None:
            self.preprocess()

    def _window_bounds(self, start_s: float, dur_s: Optional[float], n_total: int) -> Tuple[int, int]:
        """Convert a (start, duration) window in seconds to clamped sample bounds.

        Returns ``(n0, n1)`` with ``0 <= n0 <= n1 <= n_total``. A ``dur_s`` of
        None means "to the end".
        """
        n0 = min(n_total, int(max(0, start_s * self.sr)))
        if dur_s is None:
            n1 = n_total
        else:
            n1 = int(min(n_total, n0 + dur_s * self.sr))
        return n0, max(n0, n1)

    @staticmethod
    def _normalize_peak(y: np.ndarray, target_dbfs: float, eps: float = 1e-9) -> np.ndarray:
        """Scale ``y`` so its absolute peak sits at ``target_dbfs`` (zeros if silent).

        Implemented locally with NumPy because the notebook rebinds the global
        ``normalize_peak`` to a variant with a different return type in the
        Pre-Master Prep cell.
        """
        y = np.asarray(y, dtype=np.float32)
        peak = float(np.max(np.abs(y))) if y.size else 0.0
        if peak < eps:
            return np.zeros_like(y)
        return (y * (10.0 ** (target_dbfs / 20.0) / peak)).astype(np.float32)

    # ---------- PREVIEW ----------
    def preview(self,
                dials: "DialState",
                start_s: float = 0.0,
                dur_s: Optional[float] = 30.0,
                opts: "Optional[RenderOptions]" = None) -> Tuple[np.ndarray, Dict[str, Any]]:
        """
        Fast preview using cached bands/envelope. Optional time window.
        Returns (audio_preview, params).

        Only ``opts.target_peak_dbfs`` is used here; ``opts.hpf_hz`` and the
        export options apply to ``commit`` only.
        """
        self._ensure_cache()
        opts = opts or RenderOptions()

        # window the cached arrays (no re-filtering)
        n0, n1 = self._window_bounds(start_s, dur_s, int(self.cache.high.shape[0]))

        # build a temporary mini-cache slice for fast render
        slice_cache = type(self.cache)(
            sr=self.cache.sr,
            high=self.cache.high[n0:n1],
            low=self.cache.low[n0:n1],
            kick=self.cache.kick[n0:n1],
            env01=self.cache.env01[n0:n1],
            low_cutoff=self.cache.low_cutoff,
            kick_lo=self.cache.kick_lo,
            kick_hi=self.cache.kick_hi
        )

        # An empty window has no peak to normalize; skip that step rather
        # than fail inside the peak reduction.
        target = opts.target_peak_dbfs if n1 > n0 else None

        y, params = render_from_cache(
            slice_cache,
            bass_amount=dials.bass,
            punch_amount=dials.punch,
            clarity_amount=dials.clarity,
            air_amount=dials.air,
            width_amount=dials.width,
            target_peak_dbfs=target
        )
        return y, params

    # ---------- COMMIT (FULL RENDER) ----------
    def commit(self,
               out_path: str,
               dials: "DialState",
               opts: "Optional[RenderOptions]" = None) -> Dict[str, Any]:
        """
        Full-length render using the precomputed cache (fast) and export to disk.
        Returns a dict of render metadata.

        With ``opts.save_headroom_first`` the source is first run through
        ``premaster_prep`` (-6 dBFS, 20 Hz HPF) and a temporary cache is built
        for that signal; the engine's own source and cache are left untouched.
        """
        opts = opts or RenderOptions()

        pre_meta = None
        if opts.save_headroom_first:
            x_work, pre_meta = premaster_prep(self.x, self.sr, target_peak_dbfs=-6.0, hpf_hz=20.0)
            cache = self._build_cache(np.asarray(x_work, dtype=np.float32))
        else:
            self._ensure_cache()
            cache = self.cache

        # When an extra HPF is requested, defer normalization until after it:
        # filtering can change the peak, so normalizing first would miss the target.
        extra_hpf = opts.hpf_hz is not None
        y, params = render_from_cache(
            cache,
            bass_amount=dials.bass,
            punch_amount=dials.punch,
            clarity_amount=dials.clarity,
            air_amount=dials.air,
            width_amount=dials.width,
            target_peak_dbfs=None if extra_hpf else opts.target_peak_dbfs
        )

        if extra_hpf:
            y = highpass_filter(y, self.sr, cutoff_hz=opts.hpf_hz, order=2)
            if opts.target_peak_dbfs is not None:
                y = self._normalize_peak(y, opts.target_peak_dbfs)

        # export
        os.makedirs(os.path.dirname(os.path.abspath(out_path)), exist_ok=True)
        subtype = {
            "PCM_24": "PCM_24",
            "PCM_16": "PCM_16",
            "FLOAT": "FLOAT"
        }.get(opts.bit_depth.upper(), "PCM_24")
        sf.write(out_path, y, self.sr, subtype=subtype)

        meta = {
            "sr": self.sr,
            "samples": int(y.shape[0]),
            "dials": asdict(dials),
            "preprocess": asdict(self.pre_cfg),
            "params": params,
            "options": asdict(opts),
            "out_path": os.path.abspath(out_path),
            "bit_depth": subtype
        }
        if pre_meta:
            meta["premaster_first"] = pre_meta
        return meta

    # ---------- BATCH VARIANTS ----------
    def commit_variants(self,
                        base_outdir: str,
                        variants: "List[Tuple[str, DialState]]",
                        opts: "Optional[RenderOptions]" = None) -> List[Dict[str, Any]]:
        """
        Render multiple named variants and return their metadata.
        variants: list of (name, DialState); each is written to
        ``<base_outdir>/<name>.wav`` via ``commit``.
        """
        results = []
        for name, d in variants:
            out_path = os.path.join(base_outdir, f"{name}.wav")
            results.append(self.commit(out_path, dials=d, opts=opts))
        return results


### Pre-Master Prep

In [None]:
# --- Pre-Master Prep Layer ---

import numpy as np
import soundfile as sf

def sanitize_audio(x: np.ndarray) -> np.ndarray:
    """Return ``x`` as float32 with NaN/Inf zeroed and values clamped to [-4, 4]."""
    finite = np.nan_to_num(x, nan=0.0, posinf=0.0, neginf=0.0)
    clamped = np.clip(finite, -4.0, 4.0)
    return clamped.astype(np.float32)

def highpass_filter(audio: np.ndarray, sr: int, cutoff_hz: float = 20.0, order: int = 2) -> np.ndarray:
    """Zero-phase Butterworth high-pass (forward-backward ``filtfilt``) along axis 0.

    Intended to clear subsonic content. The result keeps ``filtfilt``'s output
    dtype; no float32 cast is applied here.
    """
    from scipy.signal import butter, filtfilt

    nyquist = sr / 2.0
    numerator, denominator = butter(order, cutoff_hz / nyquist, btype='highpass')
    return filtfilt(numerator, denominator, audio, axis=0)

def normalize_peak(audio: np.ndarray, target_dbfs: float = -6.0, eps: float = 1e-9) -> tuple[np.ndarray, float]:
    """Scale so the peak hits ``target_dbfs``.

    NOTE: unlike the DSP-primitives function of the same name, this variant
    returns a tuple. Running this cell rebinds the global name.

    Args:
        audio: Input samples; cleaned with ``sanitize_audio`` first.
        target_dbfs: Desired peak level in dBFS.
        eps: Peaks below this linear value are treated as silence.

    Returns:
        ``(scaled_audio, applied_gain_db)``. Silent or empty input is returned
        sanitized with a gain of 0.0 (empty input previously raised inside
        ``np.max``). The gain is a built-in ``float``.
    """
    x = sanitize_audio(audio)
    peak = float(np.max(np.abs(x))) if x.size else 0.0
    if peak < eps:
        return x, 0.0
    current_db = 20 * np.log10(peak + eps)
    gain_db = float(target_dbfs - current_db)
    gain_lin = 10 ** (gain_db/20)
    return (x * gain_lin).astype(np.float32), gain_db

def premaster_prep(audio: np.ndarray, sr: int,
                   target_peak_dbfs: float = -6.0,
                   hpf_hz: float = 20.0) -> tuple[np.ndarray, dict]:
    """Run the full pre-master chain: sanitize, optional HPF, peak-normalize.

    The high-pass is skipped when ``hpf_hz`` is falsy (0 or None).

    Returns:
        ``(audio, meta)`` where ``meta`` records the target, the applied gain
        (rounded to 0.01 dB), the HPF frequency, the sample rate and the
        resulting peak.
    """
    cleaned = sanitize_audio(audio)
    filtered = highpass_filter(cleaned, sr, cutoff_hz=hpf_hz) if hpf_hz else cleaned
    leveled, gain_db = normalize_peak(filtered, target_dbfs=target_peak_dbfs)

    peak_after = float(np.max(np.abs(leveled)))
    meta = {
        "target_peak_dbfs": target_peak_dbfs,
        "applied_gain_db": round(gain_db, 2),
        "hpf_hz": hpf_hz,
        "sr": sr,
        "peak_after": round(peak_after, 4)
    }
    return leveled, meta


### Mastering Orchestrator

In [47]:
# Mastering Orchestrator — Notebook Layer
# Provider-agnostic runner that:
#  - accepts a pre-master WAV
#  - runs 1..N mastering providers (local + external)
#  - collects outputs, level-matches (optional), and registers artifacts
#
# Includes:
#  - LocalMasterProvider: simple "house master" with a few styles (neutral/warm/bright/loud)
#  - LandrProvider (stub): method contracts to implement when you wire real API calls

# ---- SAFE Mastering patch: limiter + toned-down styles ----
import numpy as np
from dataclasses import dataclass
from typing import Tuple, Dict, Any
import soundfile as sf
from scipy import signal
import os

def _sanitize(x): 
    return np.clip(np.nan_to_num(x, nan=0.0, posinf=0.0, neginf=0.0), -4.0, 4.0).astype(np.float32)

def _db_to_lin(db): return 10.0**(db/20.0)

def true_peak_db_approx(x: np.ndarray, sr: int, oversample: int = 4) -> float:
    """Estimate the true peak in dB by polyphase oversampling.

    The sanitized signal is upsampled by ``oversample`` along the sample axis
    with ``scipy.signal.resample_poly`` and the absolute maximum is converted
    to dB (floored at 1e-12, i.e. -240 dB). ``sr`` is not used by the
    estimate.

    The resampling axis is always 0. The previous ``0 if x.ndim>1 else 0``
    was a no-op that also required the raw argument to expose ``.ndim``.
    """
    x_os = signal.resample_poly(_sanitize(x), oversample, 1, axis=0)
    tp = float(np.max(np.abs(x_os)))
    return 20*np.log10(max(1e-12, tp))

def normalize_true_peak(x: np.ndarray, sr: int, target_dbtp: float = -1.0) -> np.ndarray:
    """Apply a static gain so the 4x-oversampled peak estimate equals ``target_dbtp``.

    The peak is measured with ``true_peak_db_approx``; the gain is applied to
    the sanitized signal and the result is float32.
    """
    measured_dbtp = true_peak_db_approx(x, sr, oversample=4)
    correction_db = target_dbtp - measured_dbtp
    scaled = _sanitize(x) * _db_to_lin(correction_db)
    return scaled.astype(np.float32)

# Gentle shelves/peaks (unchanged; just keep amounts conservative)
def lowshelf(x, sr, hz, db):
    """Apply an RBJ low-shelf biquad (shelf slope S = 0.5) at ``hz`` with ``db`` of gain.

    The input is sanitized once and filtered along the sample axis; the
    result is float32. The filter axis is always 0. The previous
    ``0 if x.ndim>1 else 0`` was a no-op that also required the raw argument
    to expose ``.ndim``.
    """
    clean = _sanitize(x)
    S = 0.5
    A = 10**(db/40.0)
    w0 = 2*np.pi*hz/sr
    cosw0 = np.cos(w0)
    sinw0 = np.sin(w0)
    alpha = sinw0/2*np.sqrt((A+1/A)*(1/S-1)+2)

    b0 = A*((A+1)-(A-1)*cosw0+2*np.sqrt(A)*alpha)
    b1 = 2*A*((A-1)-(A+1)*cosw0)
    b2 = A*((A+1)-(A-1)*cosw0-2*np.sqrt(A)*alpha)
    a0 = (A+1)+(A-1)*cosw0+2*np.sqrt(A)*alpha
    a1 = -2*((A-1)+(A+1)*cosw0)
    a2 = (A+1)+(A-1)*cosw0-2*np.sqrt(A)*alpha

    sos = signal.tf2sos([b0/a0, b1/a0, b2/a0], [1.0, a1/a0, a2/a0])
    return signal.sosfilt(sos, clean, axis=0).astype(np.float32)

def highshelf(x, sr, hz, db):
    """Apply an RBJ high-shelf biquad (shelf slope S = 0.5) at ``hz`` with ``db`` of gain.

    Fix: the ``b2`` coefficient was missing its ``- 2*sqrt(A)*alpha`` term
    (compare ``_biquad_highshelf_sos`` in the DSP primitives cell), which
    distorted the response away from the intended shelf. With the term
    restored the numerator matches the RBJ cookbook formula.

    The input is sanitized once and filtered along the sample axis; the
    result is float32.
    """
    clean = _sanitize(x)
    S = 0.5
    A = 10**(db/40.0)
    w0 = 2*np.pi*hz/sr
    cosw0 = np.cos(w0)
    sinw0 = np.sin(w0)
    alpha = sinw0/2*np.sqrt((A+1/A)*(1/S-1)+2)

    b0 = A*((A+1)+(A-1)*cosw0+2*np.sqrt(A)*alpha)
    b1 = -2*A*((A-1)+(A+1)*cosw0)
    b2 = A*((A+1)+(A-1)*cosw0-2*np.sqrt(A)*alpha)
    a0 = (A+1)-(A-1)*cosw0+2*np.sqrt(A)*alpha
    a1 = 2*((A-1)-(A+1)*cosw0)
    a2 = (A+1)-(A-1)*cosw0-2*np.sqrt(A)*alpha

    sos = signal.tf2sos([b0/a0, b1/a0, b2/a0], [1.0, a1/a0, a2/a0])
    return signal.sosfilt(sos, clean, axis=0).astype(np.float32)

def broad_peak(x, sr, f0, db, Q=0.7):
    """Apply an RBJ peaking biquad centred at ``f0`` with ``db`` of gain and quality ``Q``.

    The input is sanitized once and filtered along the sample axis; the
    result is float32. The filter axis is always 0. The previous
    ``0 if x.ndim>1 else 0`` was a no-op that also required the raw argument
    to expose ``.ndim``.
    """
    clean = _sanitize(x)
    A = 10**(db/40.0)
    w0 = 2*np.pi*f0/sr
    alpha = np.sin(w0)/(2*Q)
    cosw0 = np.cos(w0)

    b0 = 1+alpha*A
    b1 = -2*cosw0
    b2 = 1-alpha*A
    a0 = 1+alpha/A
    a1 = -2*cosw0
    a2 = 1-alpha/A

    sos = signal.tf2sos([b0/a0, b1/a0, b2/a0], [1.0, a1/a0, a2/a0])
    return signal.sosfilt(sos, clean, axis=0).astype(np.float32)

# --- new: lookahead soft-knee limiter (no screechy clamp) ---
def lookahead_limiter(x: np.ndarray, sr: int,
                      ceiling_dbfs: float = -1.0,
                      lookahead_ms: float = 2.0,
                      attack_ms: float = 1.0,
                      release_ms: float = 50.0,
                      knee_db: float = 1.5) -> np.ndarray:
    """
    Feed-forward, soft-knee, lookahead limiter. Mono/stereo safe.

    The detector runs on the undelayed signal while the gain is applied to a
    copy delayed by `lookahead_ms`, so gain reduction starts before a peak
    arrives. Each channel has its own envelope and gain.

    Args:
        x: Audio with time along axis 0, shape (N,) or (N, C); sanitized first.
        sr: Sample rate in Hz.
        ceiling_dbfs: Level in dBFS the gain computer steers peaks towards.
        lookahead_ms: Detector lead time (at least one sample is used).
        attack_ms: Decay constant used on samples whose magnitude exceeds the
            envelope. The envelope then jumps to that magnitude anyway, so
            rising peaks are tracked immediately.
        release_ms: Decay constant of the envelope while the signal is below it.
        knee_db: Softness of the blend between unity gain and full limiting.

    Returns:
        float32 array with the same length as the input. Because of the soft
        blend the output is pulled towards the ceiling, not hard-clamped to it.
    """
    x = _sanitize(x)
    la = max(1, int(sr * lookahead_ms / 1000.0))
    c = _db_to_lin(ceiling_dbfs)
    knee = _db_to_lin(-abs(knee_db))  # knee expressed as a soft blend near the ceiling

    # Lookahead via simple delay: zero-pad in front for the audio path and
    # behind for the detector path so both have the same length.
    pad = np.zeros((la,) + x.shape[1:], dtype=x.dtype)
    x_del = np.concatenate([pad, x], axis=0)
    x_for_det = np.concatenate([x, pad], axis=0)

    # Peak detector with attack/release (one-pole decay, instant rise)
    atk = np.exp(-1.0 / max(1, int(sr * attack_ms / 1000.0)))
    rel = np.exp(-1.0 / max(1, int(sr * release_ms / 1000.0)))
    env = np.zeros_like(x_for_det, dtype=np.float32)
    mag = np.abs(x_for_det)
    if x.ndim == 1:
        e = 0.0
        for n, m in enumerate(mag):
            e = max(m, e * (atk if m > e else rel))
            env[n] = e
    else:
        e = np.zeros(x.shape[1:], dtype=np.float32)
        for n, cur in enumerate(mag):
            # Pick the coefficient per channel. A single scalar chosen with
            # np.any() made a quiet channel decay at the attack rate whenever
            # the other channel was rising, unlike the mono path.
            coef = np.where(cur > e, atk, rel)
            e = np.maximum(cur, e * coef)
            env[n] = e

    # Gain computer
    # soft knee near the ceiling: reduce ratio smoothly as we approach c
    eps = 1e-12
    knee_mix = env / (env + knee * c + eps)
    raw_gain = c / (env + eps)
    gain = 1.0 - knee_mix + knee_mix * np.minimum(1.0, raw_gain)

    # Apply gain to delayed signal, trim back to original length
    y = (x_del * gain).astype(np.float32)
    return y[la: la + len(x)]

# --- patched LocalMasterProvider using safe limiter and milder EQ ---
class LocalMasterProvider(MasteringProvider):
    """In-process mastering chain: gentle EQ, parallel limiting, true-peak rescale.

    `bit_depth` is the soundfile subtype used when writing the result.
    """
    name = "local"

    # Styles with a dedicated EQ curve; any other value is processed as "neutral".
    _KNOWN_STYLES = ("neutral", "warm", "bright", "loud")

    def __init__(self, bit_depth: str = "PCM_24"):
        self.bit_depth = bit_depth

    def _process(self, x: np.ndarray, sr: int, style: str, strength: float) -> Tuple[np.ndarray, Dict[str, Any]]:
        """Run the mastering chain on `x` and return (audio, params).

        Args:
            x: Input audio, time along axis 0.
            sr: Sample rate in Hz.
            style: One of "neutral", "warm", "bright", "loud"; anything else
                is treated exactly like "neutral" (EQ, glue and reported name).
            strength: Amount in 0..1 (clipped) scaling the EQ gains and glue mix.

        Returns:
            float32 audio and a dict with the effective style, the clipped
            strength, the glue mix and the measured true peak.
        """
        s = float(np.clip(strength, 0.0, 1.0))
        y = _sanitize(x)

        # tiny safety headroom before EQ
        y = normalize_true_peak(y, sr, target_dbtp=-2.5)

        # Resolve the fallback first so an unknown style gets the neutral EQ
        # as well. Previously it was only relabelled "neutral" and skipped
        # the EQ, so the reported params did not describe the processing.
        if style not in self._KNOWN_STYLES:
            style = "neutral"

        # toned-down styles (keep boosts small; avoid big >10 kHz lifts)
        if style == "warm":
            y = lowshelf(y, sr, 120, +1.2*s)
            y = broad_peak(y, sr, 3500, -0.6*s, Q=1.0)
            glue = 0.12 + 0.12*s
        elif style == "bright":
            # cap bright lift and keep the shelf lower (8–10 kHz) to avoid fizz
            y = highshelf(y, sr, 8500, +1.4*s)
            y = broad_peak(y, sr, 220, -0.5*s, Q=0.9)
            glue = 0.10 + 0.12*s
        elif style == "loud":
            y = highshelf(y, sr, 9000, +1.0*s)
            y = lowshelf(y, sr, 90, +0.8*s)
            glue = 0.16 + 0.18*s
        else:  # "neutral"
            y = highshelf(y, sr, 9000, +0.8*s)
            y = lowshelf(y, sr, 90, +0.5*s)
            glue = 0.10 + 0.10*s

        # "glue" via parallel into limiter (safe, lookahead)
        limited = lookahead_limiter(y, sr, ceiling_dbfs=-1.2, lookahead_ms=2.0, attack_ms=1.0, release_ms=60.0, knee_db=1.5)
        y = (1.0 - glue)*y + glue*limited

        # Final rescale so the approximate true peak sits at -1.0 dBTP
        y = normalize_true_peak(y, sr, target_dbtp=-1.0)

        params = {
            "style": style,
            "strength": s,
            "glue": round(glue, 3),
            "true_peak_dbtp": round(true_peak_db_approx(y, sr), 3)
        }
        return y.astype(np.float32), params

    def submit(self, req: MasterRequest) -> str:
        """Return a constant job id; local processing is synchronous."""
        return "local-sync"

    def run_sync(self, req: MasterRequest, out_path: str) -> MasterResult:
        """Read `req.input_path`, process it, write `out_path`, and describe the result.

        The output directory is created if needed. The returned `MasterResult`
        carries the style and strength as requested; the effective values
        are in its `params`.
        """
        y, sr = sf.read(req.input_path)
        y_proc, params = self._process(y, sr, req.style, req.strength)
        os.makedirs(os.path.dirname(os.path.abspath(out_path)), exist_ok=True)
        sf.write(out_path, y_proc, sr, subtype=self.bit_depth)
        return MasterResult(provider=self.name, style=req.style, strength=req.strength,
                            out_path=os.path.abspath(out_path), sr=sr, bit_depth=self.bit_depth, params=params)

# --- LANDR provider (stub) ---
class LandrProvider(MasteringProvider):
    """
    Placeholder adapter for the LANDR service; no API call is implemented yet.

    Intended shape once wired:
      - __init__(api_key: str)
      - submit(): upload pre-master, select style/strength, returns job_id
      - poll(job_id): query job status ("queued"|"processing"|"done"|"error")
      - download(job_id, out_path): fetch mastered WAV to out_path

    Until then submit/poll/download each raise NotImplementedError.
    """
    name = "landr"

    def __init__(self, api_key: Optional[str] = None, bit_depth: str = "PCM_24"):
        # An explicit (non-empty) key wins; otherwise use LANDR_API_KEY from
        # the environment, which may leave the key as None.
        if not api_key:
            api_key = os.environ.get("LANDR_API_KEY")
        self.api_key = api_key
        self.bit_depth = bit_depth
        # TODO: set self.endpoint once the real API base URL is known.

    def submit(self, req: MasterRequest) -> str:
        """Not implemented: would upload the pre-master and return a job id."""
        raise NotImplementedError("LANDR adapter not wired yet. Implement API call here.")

    def poll(self, job_id: str) -> str:
        """Not implemented: would return the job status string."""
        raise NotImplementedError("LANDR adapter not wired yet. Implement job status polling.")

    def download(self, job_id: str, out_path: str) -> MasterResult:
        """Not implemented: would fetch the mastered file to `out_path`."""
        raise NotImplementedError("LANDR adapter not wired yet. Implement download to out_path.")

# --- Orchestrator ---
class MasteringOrchestrator:
    """
    Runs 1..N providers for a given pre-master and registers outputs.

    `workspace_paths` must expose an `outputs` directory attribute; `manifest`
    is handed to `register_artifact` for every file written.
    """
    def __init__(self, workspace_paths, manifest):
        self.paths = workspace_paths
        self.man = manifest

    def run(self,
            premaster_path: str,
            providers: List[MasteringProvider],
            styles: List[Tuple[str,float]],     # list of (style, strength 0..1)
            out_tag: str = "master",
            level_match_preview_lufs: Optional[float] = None  # if set, write *preview* copies level-matched for A/B
            ) -> List[MasterResult]:
        """Render every (provider, style, strength) combination.

        Outputs go to `<outputs>/<out_tag>/<provider>_<style>_<strength%>.wav`
        and are registered in the manifest. A `LocalMasterProvider` runs
        synchronously; any other provider goes through submit -> poll ->
        download, polling every 2 s with no timeout. Jobs that report "error"
        are logged and skipped.

        Returns the `MasterResult` of every successful render, in run order.
        """
        results: List[MasterResult] = []
        base_outdir = os.path.join(self.paths.outputs, out_tag)
        os.makedirs(base_outdir, exist_ok=True)

        for prov in providers:
            for style, strength in styles:
                name = f"{prov.name}_{style}_{int(round(strength*100))}"
                out_path = os.path.join(base_outdir, f"{name}.wav")
                req = MasterRequest(premaster_path, style=style, strength=strength)

                if isinstance(prov, LocalMasterProvider):
                    res = prov.run_sync(req, out_path)
                else:
                    res = self._run_remote(prov, req, out_path)
                    if res is None:
                        print(f"[{prov.name}] job failed for style={style} strength={strength}")
                        continue

                # register artifact
                register_artifact(self.man, res.out_path, kind=out_tag, params={
                    "provider": res.provider,
                    "style": res.style,
                    "strength": res.strength,
                    **res.params
                }, stage=name)

                results.append(res)

                # optional preview copies level-matched to a LUFS target (for A/B only)
                if level_match_preview_lufs is not None:
                    self._write_level_matched_preview(res, base_outdir, name, out_tag,
                                                      level_match_preview_lufs)

        return results

    @staticmethod
    def _run_remote(prov, req, out_path):
        """Drive an external provider (submit -> poll -> download); None if the job errors."""
        job_id = prov.submit(req)
        status = prov.poll(job_id)
        while status not in ("done", "error"):
            time.sleep(2.0)
            status = prov.poll(job_id)
        if status == "error":
            return None
        return prov.download(job_id, out_path)

    def _write_level_matched_preview(self, res, base_outdir, name, out_tag, target_lufs):
        """Write and register a copy of `res.out_path` gain-matched to `target_lufs`."""
        from scipy import signal

        x, sr = sf.read(res.out_path)
        mono = x if x.ndim == 1 else np.mean(x, axis=1)

        # lightweight LUFS approx: 38 Hz high-pass only, ungated mean square
        sos_hp = signal.butter(2, 38.0/(sr*0.5), btype='highpass', output='sos')
        xk = signal.sosfilt(sos_hp, mono)
        ms = float(np.mean(xk**2))
        cur_lufs = -0.691 + 10*np.log10(max(1e-12, ms))
        delta = target_lufs - cur_lufs
        x_matched = (x * _db_to_lin(delta)).astype(np.float32)

        # keep true peak safe: only pull the level down when the matched copy
        # exceeds -1.0 dBTP. Normalising unconditionally rescaled every preview
        # to the same peak and threw the loudness match away.
        if true_peak_db_approx(x_matched, sr) > -1.0:
            x_matched = normalize_true_peak(x_matched, sr, target_dbtp=-1.0)

        prev_path = os.path.join(base_outdir, f"{name}__LM{int(target_lufs)}LUFS.wav")
        sf.write(prev_path, x_matched, sr, subtype=res.bit_depth)
        register_artifact(self.man, prev_path, kind=f"{out_tag}_preview", params={
            "provider": res.provider, "style": res.style, "strength": res.strength,
            "level_matched_lufs": target_lufs
        }, stage=f"{name}__preview")


### Streaming Normalization Simulator

In [21]:
# Streaming Normalization Simulator — Notebook Layer
# Requires: I/O + Analysis layer (for lufs_integrated_approx), and manifest helpers.
# Outputs "as-heard" WAVs for each platform and returns a summary table.

from __future__ import annotations
from dataclasses import dataclass, asdict
from typing import Dict, Any, List, Optional, Tuple
import os
import numpy as np
import pandas as pd
import soundfile as sf
from scipy import signal

# ---- Reuse analysis helpers if present; else provide fallbacks ----
def _sanitize(x: np.ndarray) -> np.ndarray:
    """Return `x` as float32 with NaN/±inf replaced by 0 and values clipped to [-4, 4]."""
    finite = np.nan_to_num(x, nan=0.0, posinf=0.0, neginf=0.0)
    bounded = np.clip(finite, -4.0, 4.0)
    return bounded.astype(np.float32)

def _db_to_lin(db: float) -> float:
    """Convert a level in dB to a linear amplitude factor (20 dB per decade)."""
    exponent = db / 20.0
    return 10.0 ** exponent

def _lin_to_db(l: float, eps: float = 1e-12) -> float:
    """Convert a linear amplitude to dB, flooring the magnitude at `eps` to avoid log(0)."""
    magnitude = abs(l)
    floored = max(eps, magnitude)
    return 20.0 * np.log10(floored)

def _k_weighting_sos(sr: int):
    """Build the two-section K-weighting approximation used for loudness.

    Section 1 is a 2nd-order Butterworth high-pass at 38 Hz. Section 2 is a
    high-shelf biquad of about +4 dB with its corner parameter near 1.68 kHz.
    Returns a (2, 6) second-order-sections array for sample rate `sr`.
    """
    nyquist = sr * 0.5
    sos_hp = signal.butter(2, 38.0 / nyquist, btype="highpass", output="sos")

    # High-shelf stage
    shelf_hz = 1681.974450955533
    shelf_q = 0.7071752369554196
    shelf_db = 3.99984385397
    A = 10 ** (shelf_db / 40.0)
    w0 = 2 * np.pi * shelf_hz / sr
    cosw0 = np.cos(w0)
    alpha = np.sin(w0) / (2 * shelf_q)
    tilt = 2 * np.sqrt(A) * alpha
    up = (A + 1) + (A - 1) * cosw0      # shared by the numerator's outer taps
    down = (A + 1) - (A - 1) * cosw0    # shared by the denominator's outer taps

    num = [A * (up + tilt), -2 * A * ((A - 1) + (A + 1) * cosw0), A * (up - tilt)]
    den = [down + tilt, 2 * ((A - 1) - (A + 1) * cosw0), down - tilt]
    a0 = den[0]
    sos_sh = signal.tf2sos([v / a0 for v in num], [1.0, den[1] / a0, den[2] / a0])
    return np.vstack([sos_hp, sos_sh])

def _lufs_integrated_approx(x: np.ndarray, sr: int) -> float:
    """Integrated-loudness estimate in LUFS.

    Delegates to the Analysis layer's `lufs_integrated_approx` when that name
    exists. Otherwise it K-weights a mono downmix and returns the ungated
    mean-square level with the -0.691 offset.
    """
    try:
        # Prefer the Analysis-layer implementation when the notebook defines it.
        result = lufs_integrated_approx(x, sr)
    except NameError:
        downmix = np.mean(x, axis=1) if x.ndim != 1 else x
        weighted = signal.sosfilt(_k_weighting_sos(sr), downmix)
        mean_square = float(np.mean(weighted**2))
        result = -0.691 + 10.0*np.log10(max(1e-12, mean_square))
    return result

def _true_peak_dbtp(x: np.ndarray, sr: int, oversample: int = 4) -> float:
    """Approximate true peak of `x` in dB.

    Delegates to `true_peak_dbfs` when that name exists. Otherwise the
    sanitized signal is upsampled by `oversample` along the time axis and the
    largest absolute sample is converted to dB (floored at 1e-12, i.e. -240 dB).
    """
    try:
        # Prefer previously defined function name if present
        return true_peak_dbfs(x, sr, oversample=oversample)  # returns dBFS (≈ dBTP here)
    except NameError:
        # Time is axis 0 for mono and multi-channel input alike; the old
        # conditional picked 0 in both branches.
        x_os = signal.resample_poly(_sanitize(x), oversample, 1, axis=0)
        tp = float(np.max(np.abs(x_os)))
        return 20.0*np.log10(max(1e-12, tp))

def _normalize_true_peak(x: np.ndarray, sr: int, target_dbtp: float = -1.0, oversample: int = 4) -> Tuple[np.ndarray, float]:
    """Scale `x` so its approximate true peak equals `target_dbtp`.

    Returns (scaled float32 signal, applied gain in dB). The gain is whatever
    moves the measured peak onto the target, so it can be positive or negative.
    """
    measured = _true_peak_dbtp(x, sr, oversample=oversample)
    gain_db = target_dbtp - measured
    scaled = _sanitize(x) * _db_to_lin(gain_db)
    return scaled.astype(np.float32), gain_db

def _gentle_limiter(x: np.ndarray, ceiling_dbfs: float = -1.0, knee_db: float = 0.8) -> np.ndarray:
    """Soft preview limiter: blend the signal with a hard-clamped copy.

    The output is 60% sanitized input plus 40% of the input clamped to the
    ceiling, so overshoots are reduced but not removed. This is not a
    brickwall true-peak limiter (good enough for preview).

    NOTE: `knee_db` is kept for interface compatibility but does not affect
    the result. The previous body derived a knee term from it and then
    discarded it; that dead computation has been removed, output unchanged.
    """
    c = _db_to_lin(ceiling_dbfs)
    dry = _sanitize(x)
    clamped = np.sign(dry) * np.minimum(np.abs(dry), c)
    # gentle blend to reduce clicks
    return (0.6*dry + 0.4*clamped).astype(np.float32)

# ---- Profiles ----
@dataclass
class StreamingProfile:
    """Playback-normalization settings for one streaming platform.

    Consumed by `simulate_streaming_as_heard`: the track is gained to
    `target_lufs`, and if its approximate true peak then exceeds
    `tp_ceiling_db` the `tp_strategy` decides what happens.
    """
    name: str
    target_lufs: float       # platform loudness target (track mode)
    tp_ceiling_db: float     # approximate true-peak ceiling
    tp_strategy: str = "trim"  # "limit" lightly limits; any other value (default "trim") reduces gain

def default_streaming_profiles() -> Dict[str, StreamingProfile]:
    """Return a fresh dict of preview profiles keyed by platform name.

    Targets are typical, commonly cited values (approximate, for preview);
    override as needed. All profiles use a -1.0 dB ceiling and "trim".
    """
    catalogue = (
        ("Spotify", -14.0),
        ("AppleMusic", -16.0),
        ("YouTube", -14.0),
        ("TIDAL", -14.0),
        ("Amazon", -14.0),
    )
    return {
        platform: StreamingProfile(platform, target_lufs=lufs, tp_ceiling_db=-1.0, tp_strategy="trim")
        for platform, lufs in catalogue
    }

# ---- Core simulation ----
def simulate_streaming_as_heard(x: np.ndarray, sr: int, profile: StreamingProfile,
                                oversample_tp: int = 4) -> Tuple[np.ndarray, Dict[str, Any]]:
    """
    Simulate platform playback: static gain to the loudness target, then a
    true-peak guard if the result exceeds the profile's ceiling.

    Returns (audio, meta). `meta` records the input and output loudness and
    peak, the gain applied, which guard ran ("limit", "trim" or None), and
    the remaining distance to the loudness target.
    """
    clean = _sanitize(x)
    in_lufs = _lufs_integrated_approx(clean, sr)
    in_tp = _true_peak_dbtp(clean, sr, oversample=oversample_tp)

    # Step 1: pure gain towards the platform target
    delta_db = profile.target_lufs - in_lufs
    y = (clean * _db_to_lin(delta_db)).astype(np.float32)

    # Step 2: guard only when the gained signal is over the ceiling
    after_tp = _true_peak_dbtp(y, sr, oversample=oversample_tp)
    exceeds = (after_tp - profile.tp_ceiling_db) > 0.0
    tp_action = None
    trim_db = 0.0
    if exceeds and profile.tp_strategy == "limit":
        y = _gentle_limiter(y, ceiling_dbfs=profile.tp_ceiling_db, knee_db=0.8)
        tp_action = "limit"
    elif exceeds:
        # trim enough to meet the ceiling
        y, trim_db = _normalize_true_peak(y, sr, target_dbtp=profile.tp_ceiling_db, oversample=oversample_tp)
        tp_action = "trim"

    out_lufs = _lufs_integrated_approx(y, sr)
    out_tp = _true_peak_dbtp(y, sr, oversample=oversample_tp)

    extra_trim = round(trim_db, 2) if tp_action == "trim" else 0.0
    meta = {
        "profile": asdict(profile),
        "in_lufs": round(in_lufs, 2),
        "in_true_peak_dbTP": round(in_tp, 2),
        "gain_to_target_db": round(delta_db, 2),
        "tp_action": tp_action,
        "extra_trim_db": extra_trim,
        "out_lufs": round(out_lufs, 2),
        "out_true_peak_dbTP": round(out_tp, 2),
        # non-zero if the guard changed the level
        "lufs_error_db": round(out_lufs - profile.target_lufs, 2),
    }
    return y, meta

# ---- Batch runner + export ----
def simulate_and_export_for_platforms(
    input_path: str,
    out_dir: str,
    profiles: Optional[Dict[str, StreamingProfile]] = None,
    bit_depth: str = "PCM_24",
    register_to_manifest: Optional[tuple] = None,  # (manifest, kind_str)
) -> Tuple[List[str], pd.DataFrame]:
    """
    Write one 'as-heard' file per platform profile for `input_path`.

    Files are named `<input stem>__asheard_<platform>.wav` inside `out_dir`
    (created if missing). A missing or empty `profiles` falls back to
    `default_streaming_profiles()`. When `register_to_manifest` is given,
    each file is registered under that kind.

    Returns (list_of_paths, summary_dataframe), one row per platform.
    """
    if not profiles:
        profiles = default_streaming_profiles()

    audio, sr = sf.read(input_path)
    os.makedirs(out_dir, exist_ok=True)

    stem = os.path.splitext(os.path.basename(input_path))[0]
    meta_cols = ("in_lufs", "in_true_peak_dbTP", "gain_to_target_db", "tp_action",
                 "extra_trim_db", "out_lufs", "out_true_peak_dbTP", "lufs_error_db")
    table_rows = []
    written = []

    for name, prof in profiles.items():
        y, meta = simulate_streaming_as_heard(audio, sr, prof)
        out_path = os.path.join(out_dir, f"{stem}__asheard_{name}.wav")
        sf.write(out_path, y, sr, subtype=bit_depth)
        written.append(out_path)

        row = {
            "platform": name,
            "target_lufs": prof.target_lufs,
            "tp_ceiling_db": prof.tp_ceiling_db,
            "tp_strategy": prof.tp_strategy,
        }
        row.update((key, meta[key]) for key in meta_cols)
        row["asheard_path"] = out_path
        table_rows.append(row)

        # optional manifest registration
        if register_to_manifest is not None:
            man, kind = register_to_manifest
            register_artifact(man, out_path, kind=kind, params={"profile": name, **meta}, stage=f"asheard_{name}")

    return written, pd.DataFrame(table_rows)

def print_streaming_summary(df: pd.DataFrame):
    """Print the summary table, restricted to the key columns when all are present."""
    wanted = [
        "platform","target_lufs","tp_ceiling_db",
        "in_lufs","gain_to_target_db","tp_action","extra_trim_db",
        "out_lufs","lufs_error_db","out_true_peak_dbTP"
    ]
    # Fall back to the whole frame if any expected column is missing.
    view = df[wanted] if set(wanted).issubset(df.columns) else df
    print(view.to_string(index=False))


### Comparison & Reporting

In [23]:
# ============================================
# Comparison & Reporting — Notebook Layer
# ============================================
# What this provides:
# - collect_metrics(paths): one-call metrics for many files (uses your Analysis layer)
# - build_comparison_tables(...): summary + deltas vs reference
# - plot_overlays(...): spectrum + short-term loudness overlays (saved to reports/)
# - make_blind_ab_pack(...): copies/renames files for unbiased listening
# - write_report_html(...): self-contained HTML report with tables + plots
# - write_report_bundle(...): one-shot wrapper that does all of the above
#
# Notes:
# - Importing this cell only defines functions/config, apart from matplotlib/warnings setup and a short load banner; you’ll call the functions later.
# - Uses only matplotlib (no seaborn), and saves figures (no blocking .show()).
# - Registers generated artifacts in your manifest when you pass it in.

from __future__ import annotations
from dataclasses import dataclass, asdict
from typing import List, Dict, Any, Optional, Tuple
import os, io, shutil, warnings, json, uuid
import numpy as np
import pandas as pd
import matplotlib
matplotlib.rcParams["figure.dpi"] = 110
import matplotlib.pyplot as plt
import soundfile as sf
from scipy.io import wavfile
try:
    from scipy.io.wavfile import WavFileWarning
except ImportError:
    class WavFileWarning(UserWarning): pass
warnings.filterwarnings("ignore", category=WavFileWarning)

# ---------- Config dataclasses ----------

@dataclass
class CompareConfig:
    """Options shared by the comparison tables and overlay plots."""
    preview_seconds: int = 60        # leading seconds of each file read for the overlay plots
    nfft: int = 1 << 16     # spectrum segment size
    win_s: float = 3.0      # short-term loudness window
    hop_s: float = 0.5               # spacing between short-term loudness points, in seconds
    # NOTE(review): level_match_mode / target_lufs are not read by any function
    # visible in this cell — confirm where they are consumed.
    level_match_mode: str = "none"   # "none" | "ref_lufs" | "target_lufs"
    target_lufs: float = -14.0       # used when level_match_mode == "target_lufs"
    reference_name: Optional[str] = None  # which row is the reference for Δ tables

# ---------- Internals: fast audio readers & helpers ----------

def _read_mono_preview(path: str, preview_seconds: Optional[int]) -> Tuple[int, np.ndarray]:
    """Read a WAV file and return (sample_rate, mono float32 samples).

    int16, int32 and uint8 PCM are scaled to roughly [-1, 1]; any other dtype
    is only cast to float32. Multi-channel audio is averaged to mono. When
    `preview_seconds` is not None, only that many leading seconds are kept.
    """
    rate, raw = wavfile.read(path)

    # (dtype, offset, full-scale) for the PCM formats that need rescaling
    pcm_formats = (
        (np.int16, 0.0, 32768.0),
        (np.int32, 0.0, 2147483648.0),
        (np.uint8, 128.0, 128.0),
    )
    samples = raw.astype(np.float32)
    for pcm_dtype, offset, full_scale in pcm_formats:
        if raw.dtype == pcm_dtype:
            samples = (samples - offset) / full_scale
            break

    if samples.ndim != 1:
        samples = np.mean(samples, axis=1)
    if preview_seconds is not None:
        keep = int(min(len(samples), preview_seconds * rate))
        samples = samples[:keep]
    return rate, samples

def _spectrum_xy(mono: np.ndarray, sr: int, nfft: int) -> Tuple[np.ndarray, np.ndarray]:
    """Return (frequencies in Hz, magnitude in dB) for the first `nfft` samples.

    Segments shorter than 2048 samples are zero-padded to 2048. A Hann window
    is applied before the real FFT, and magnitudes are floored at 1e-12 (-240 dB).
    """
    min_len = 2048
    frame = mono[:min(len(mono), nfft)]
    shortfall = min_len - len(frame)
    if shortfall > 0:
        frame = np.concatenate([frame, np.zeros(shortfall, dtype=frame.dtype)])

    size = len(frame)
    spectrum = np.fft.rfft(frame * np.hanning(size))
    freqs = np.fft.rfftfreq(size, 1/sr)
    magnitude = np.maximum(1e-12, np.abs(spectrum))
    return freqs, 20*np.log10(magnitude)

def _short_term_loudness(mono: np.ndarray, sr: int, win_s: float, hop_s: float) -> Tuple[np.ndarray, np.ndarray]:
    """Windowed RMS level of `mono` sampled every `hop_s` seconds.

    Args:
        mono: 1-D signal.
        sr: Sample rate in Hz.
        win_s: Window length in seconds (at least 2 samples are used).
        hop_s: Spacing between output points in seconds (at least 1 sample).

    Returns:
        (t, level_db): `t` holds the window-centre times in seconds and
        `level_db` the RMS level in dB. Each window covers
        [i - win//2, i + (win-1)//2] around sample i, samples outside the
        signal count as zero, and the sum is always divided by the full
        window length. Mean power is floored at 1e-20, so silence is -200 dB.

    The window sum is evaluated only at the hop positions. The previous
    `np.convolve` computed it at every sample (O(N * win)) and discarded all
    but every hop-th value. It also raised on empty input, which now yields
    two empty arrays. For signals shorter than one window the centred
    window above is used consistently.
    """
    win = int(max(2, round(win_s * sr)))
    hop = int(max(1, round(hop_s * sr)))
    sig = np.asarray(mono)
    idx = np.arange(0, len(sig), hop)
    t = idx / sr

    power = sig.astype(np.float64) ** 2
    back = win // 2                 # samples before the centre
    ahead = (win - 1) // 2 + 1      # exclusive end offset after the centre
    window_sums = np.array(
        [power[max(0, i - back): i + ahead].sum() for i in idx],
        dtype=np.float64,
    )
    rms = np.sqrt(np.maximum(1e-20, window_sums / float(win)))
    level_db = 20 * np.log10(np.maximum(1e-12, rms))
    # Keep the output precision in line with the input (float32 in, float32 out).
    return t, level_db.astype(np.result_type(sig.dtype, np.float32))

def _lufs_approx(x: np.ndarray, sr: int) -> float:
    """Loudness estimate in LUFS.

    Uses the Analysis layer's `lufs_integrated_approx` when that name exists.
    The fallback high-passes a mono downmix at 38 Hz and returns the ungated
    mean-square level with the -0.691 offset.
    """
    try:
        value = lufs_integrated_approx(x, sr)
    except NameError:
        # tiny inline fallback (not gated)
        from scipy import signal
        highpass = signal.butter(2, 38.0/(sr*0.5), btype='highpass', output='sos')
        downmix = np.mean(x, axis=1) if x.ndim != 1 else x
        filtered = signal.sosfilt(highpass, downmix)
        mean_square = float(np.mean(filtered**2))
        value = -0.691 + 10.0*np.log10(max(1e-12, mean_square))
    return value

def _tp_approx_db(x: np.ndarray, sr: int, oversample: int = 4) -> float:
    """Approximate true peak of `x` in dB.

    Uses `true_peak_dbfs` when that name exists. The fallback upsamples by
    `oversample` along the time axis and converts the largest absolute sample
    to dB, floored at 1e-12 (-240 dB).
    """
    try:
        return true_peak_dbfs(x, sr, oversample=oversample)
    except NameError:
        from scipy import signal
        # Time is axis 0 for mono and multi-channel input alike; the old
        # conditional picked 0 in both branches.
        x_os = signal.resample_poly(x, oversample, 1, axis=0)
        tp = float(np.max(np.abs(x_os)))
        return 20.0*np.log10(max(1e-12, tp))

# ---------- 1) Metrics collection ----------

def collect_metrics(file_paths: List[str]) -> pd.DataFrame:
    """
    Analyse every path with `analyze_wav` and return one metrics row per file.

    The `name` column is the file stem; `path` is the absolute path.
    """
    def _row(path: str) -> Dict[str, Any]:
        rep = analyze_wav(path)  # uses your Analysis layer
        return {
            "name": os.path.splitext(os.path.basename(path))[0],
            "path": os.path.abspath(path),
            "sr": rep.sr,
            "duration_s": rep.duration_s,
            "peak_dbfs": rep.basic["peak_dbfs"],
            "true_peak_dbfs": rep.true_peak_dbfs,
            "rms_dbfs": rep.basic["rms_dbfs"],
            "lufs_int": rep.lufs_integrated,
            "crest_db": rep.basic["crest_db"],
            "bass_%": rep.bass_energy_pct,
            "air_%": rep.air_energy_pct,
            "phase_corr": rep.stereo["phase_correlation"],
            "stereo_width": rep.stereo["stereo_width"],
            "spectral_flatness": rep.spectral_flatness,
        }

    return pd.DataFrame([_row(p) for p in file_paths])

# ---------- 2) Build comparison tables (summary + deltas) ----------

def build_comparison_tables(df_metrics: pd.DataFrame, cfg: CompareConfig) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]:
    """
    Returns (summary_df, deltas_df_vs_reference|None).

    The summary is a copy of `df_metrics`. The reference row is
    `cfg.reference_name`, or the first row when that is unset. If no
    reference can be resolved (empty table or unknown name) the deltas are
    None; otherwise each row gets "Δ <metric>" = row value - reference value.
    """
    summary = df_metrics.copy()

    ref_name = cfg.reference_name or (summary.iloc[0]["name"] if len(summary) else None)
    if not ref_name or ref_name not in list(summary["name"]):
        return summary, None

    reference = summary[summary["name"] == ref_name].iloc[0]
    # metrics to delta
    metric_cols = ["peak_dbfs","true_peak_dbfs","rms_dbfs","lufs_int","crest_db","bass_%","air_%","phase_corr","stereo_width","spectral_flatness"]
    delta_rows = [
        {"name": row["name"], **{f"Δ {col}": float(row[col] - reference[col]) for col in metric_cols}}
        for _, row in summary.iterrows()
    ]
    return summary, pd.DataFrame(delta_rows)

# ---------- 3) Plots: spectrum & loudness overlays ----------
# Saves files under reports_dir and returns list of paths

def plot_overlays(file_paths: List[str], labels: Optional[List[str]], reports_dir: str, cfg: CompareConfig) -> Dict[str, str]:
    """Save spectrum and short-term loudness overlay plots for the given files.

    Both plots use the first `cfg.preview_seconds` of each file (mono
    downmix). Missing labels default to the file stems. Loudness curves are
    cut to the shortest one so all traces share the same time span.

    Returns {"spectrum_png": path, "loudness_png": path}, both inside
    `reports_dir` (created if missing).
    """
    os.makedirs(reports_dir, exist_ok=True)
    if not labels:
        labels = [os.path.splitext(os.path.basename(p))[0] for p in file_paths]
    pairs = list(zip(file_paths, labels))

    # Spectrum
    fig, ax = plt.subplots()
    for path, lbl in pairs:
        sr, mono = _read_mono_preview(path, cfg.preview_seconds)
        freqs, mags = _spectrum_xy(mono, sr, cfg.nfft)
        ax.plot(freqs, mags, label=lbl)
    ax.set_xscale('log')
    ax.set_xlim(20, 20000)
    ax.set_xlabel("Frequency (Hz)")
    ax.set_ylabel("Magnitude (dB)")
    ax.set_title(f"Spectrum Overlay (first {cfg.preview_seconds}s)")
    ax.legend()
    spec_png = os.path.join(reports_dir, "spectrum_overlay.png")
    fig.savefig(spec_png, bbox_inches="tight")
    plt.close(fig)

    # Short-term loudness
    fig, ax = plt.subplots()
    curves = []
    for path, lbl in pairs:
        sr, mono = _read_mono_preview(path, cfg.preview_seconds)
        times, levels = _short_term_loudness(mono, sr, cfg.win_s, cfg.hop_s)
        curves.append((times, levels, lbl))
    shortest = min((len(levels) for _, levels, _ in curves), default=None)
    for times, levels, lbl in curves:
        ax.plot(times[:shortest], levels[:shortest], label=lbl)
    ax.set_xlabel("Time (s)")
    ax.set_ylabel("Short-term RMS (dBFS)")
    ax.set_title(f"Short-term Loudness Overlay (first {cfg.preview_seconds}s)")
    ax.legend()
    loud_png = os.path.join(reports_dir, "loudness_overlay.png")
    fig.savefig(loud_png, bbox_inches="tight")
    plt.close(fig)

    return {"spectrum_png": spec_png, "loudness_png": loud_png}

# ---------- 4) Blind A/B pack (optional) ----------

def make_blind_ab_pack(file_paths: List[str], out_dir: str, bit_depth: str = "PCM_24") -> List[str]:
    """
    Re-write the files under neutral names (Blind_A.wav, Blind_B.wav, ...) in
    a random order and return the new paths in label order.

    The mapping from label back to source file is not returned or stored.
    """
    os.makedirs(out_dir, exist_ok=True)
    order = np.arange(len(file_paths))
    np.random.shuffle(order)

    blind_paths = []
    for slot, src_index in enumerate(order):
        letter = chr(ord('A') + slot)
        audio, rate = sf.read(file_paths[src_index])
        target = os.path.join(out_dir, f"Blind_{letter}.wav")
        sf.write(target, audio, rate, subtype=bit_depth)
        blind_paths.append(target)
    return blind_paths

# ---------- 5) HTML Report ----------

def _df_to_html_table(df: pd.DataFrame, caption: str) -> str:
    """Render `df` as an HTML table preceded by an <h3> heading.

    Floats are formatted with 6 significant digits; the index is omitted.
    The caption is inserted verbatim (no HTML escaping).
    """
    heading = f"<h3>{caption}</h3>\n"
    table = df.to_html(index=False, float_format=lambda v: f"{v:.6g}")
    return heading + table

def write_report_html(
    summary_df: pd.DataFrame,
    deltas_df: Optional[pd.DataFrame],
    plots: Dict[str, str],
    out_path: str,
    title: str = "Post-Mix Comparison Report",
    extra_notes: Optional[str] = None
) -> str:
    """Write the comparison report as an HTML file and return its absolute path.

    Args:
        summary_df: Metrics table, always rendered.
        deltas_df: Optional delta table; rendered when not None and non-empty.
        plots: Mapping of plot key to image path. "spectrum_png" and
            "loudness_png" are embedded when the files exist.
        out_path: Destination HTML path; parent directories are created.
        title: Page title and heading (inserted verbatim).
        extra_notes: Optional note shown under the heading (inserted verbatim).

    Images are referenced by bare file name, so every existing plot file is
    copied next to the HTML. A plot that already lives there is left alone:
    copying it onto itself made `shutil.copy2` raise `SameFileError`, which
    is exactly the layout `write_report_bundle` produces.
    """
    out_dir = os.path.dirname(os.path.abspath(out_path))
    os.makedirs(out_dir, exist_ok=True)

    parts = [
        f"<!doctype html><html><head><meta charset='utf-8'><title>{title}</title>",
        "<style>body{font-family:system-ui,Arial,sans-serif;margin:24px} h1{margin-top:0} img{max-width:100%;height:auto} table{border-collapse:collapse} th,td{border:1px solid #ddd;padding:6px} caption{margin:6px 0}</style>",
        "</head><body>",
        f"<h1>{title}</h1>",
    ]
    if extra_notes:
        parts.append(f"<p><em>{extra_notes}</em></p>")
    # tables
    parts.append(_df_to_html_table(summary_df, "Summary Metrics"))
    if deltas_df is not None and len(deltas_df):
        parts.append(_df_to_html_table(deltas_df, "Δ vs Reference"))
    # plots
    if "spectrum_png" in plots and os.path.exists(plots["spectrum_png"]):
        parts.append("<h3>Spectrum Overlay</h3>")
        parts.append(f"<img src='{os.path.basename(plots['spectrum_png'])}' alt='Spectrum Overlay'/>")
    if "loudness_png" in plots and os.path.exists(plots["loudness_png"]):
        parts.append("<h3>Short-Term Loudness Overlay</h3>")
        parts.append(f"<img src='{os.path.basename(plots['loudness_png'])}' alt='Loudness Overlay'/>")
    parts.append("</body></html>")

    with open(out_path, "w", encoding="utf-8") as f:
        f.write("".join(parts))

    # copy images to same folder for portability
    for src in plots.values():
        if not os.path.exists(src):
            continue
        dst = os.path.join(out_dir, os.path.basename(src))
        if os.path.exists(dst) and os.path.samefile(src, dst):
            continue  # already in place; copy2 would raise SameFileError
        shutil.copy2(src, dst)
    return os.path.abspath(out_path)

# ---------- 6) One-shot bundle: metrics → plots → HTML (+ manifest) ----------

def write_report_bundle(
    file_paths: List[str],
    reports_dir: str,
    cfg: Optional[CompareConfig] = None,
    manifest: Optional[Any] = None,   # pass your Manifest object to auto-register
    report_name: str = "comparison_report.html",
    extra_notes: Optional[str] = None
) -> Dict[str, Any]:
    """
    One-shot report: metrics -> summary/delta tables -> overlay plots -> HTML.

    Everything is written into `reports_dir` (created if missing). When a
    manifest is given, the HTML is registered as a "report" artifact and each
    existing plot as a "report_asset" linked to it.

    Returns a dict with keys "html_path", "summary_df", "deltas_df", "plots".
    """
    if not cfg:
        cfg = CompareConfig()
    os.makedirs(reports_dir, exist_ok=True)

    # metrics and tables
    metrics = collect_metrics(file_paths)
    summary_df, deltas_df = build_comparison_tables(metrics, cfg)

    # plots, labelled with the metric names
    plot_paths = plot_overlays(file_paths, list(summary_df["name"]), reports_dir, cfg)

    # HTML
    html_path = write_report_html(
        summary_df, deltas_df, plot_paths,
        os.path.join(reports_dir, report_name),
        extra_notes=extra_notes,
    )

    # optional manifest registration (HTML + images as a single "report" artifact)
    if manifest is not None:
        report_params = {
            "type": "comparison",
            "reference": cfg.reference_name,
            "preview_seconds": cfg.preview_seconds,
        }
        register_artifact(manifest, html_path, kind="report", params=report_params, stage="compare_html")
        for asset in plot_paths.values():
            if os.path.exists(asset):
                register_artifact(manifest, asset, kind="report_asset", params={"linked_report": os.path.basename(html_path)})

    return {
        "html_path": html_path,
        "summary_df": summary_df,
        "deltas_df": deltas_df,
        "plots": plot_paths
    }

# Load banner: lists the public entry points this cell just defined.
print("\n".join((
    "Comparison & Reporting layer loaded:",
    "- collect_metrics, build_comparison_tables",
    "- plot_overlays (saves to reports/)",
    "- make_blind_ab_pack (optional)",
    "- write_report_html (self-contained)",
    "- write_report_bundle (one-shot with manifest)",
)))


Comparison & Reporting layer loaded:
- collect_metrics, build_comparison_tables
- plot_overlays (saves to reports/)
- make_blind_ab_pack (optional)
- write_report_html (self-contained)
- write_report_bundle (one-shot with manifest)


### Presets & Recommendations

In [24]:
# ============================================
# Presets & Recommendations — Notebook Layer
# ============================================
# Provides:
# - DialState presets (named, ready to render)
# - Rule-based recommender from analysis metrics → dial suggestions
# - Plan builders for batch rendering with RenderEngine / Orchestrator
#
# Nothing executes on import; you’ll call funcs later.

from __future__ import annotations
from dataclasses import dataclass, asdict
from typing import Dict, Any, List, Tuple, Optional
import math

# Reuse the DialState from Render Engine if already defined; else define a tiny fallback.
try:
    # Bare name lookup: raises NameError when no DialState has been defined yet.
    DialState
except NameError:
    # Minimal stand-in so this cell can run on its own.
    @dataclass
    class DialState:
        """Fallback dial container; every dial defaults to 0."""
        bass: float = 0.0     # 0..100
        punch: float = 0.0    # 0..100
        clarity: float = 0.0  # 0..100
        air: float = 0.0      # 0..100
        width: float = 0.0    # 0..100

# ---------- 1) Preset Library (you can tweak these) ----------

# Named dial settings. Dials are passed by keyword so the table does not
# depend on the field order of whichever DialState definition is active.
PRESETS: Dict[str, DialState] = {
    label: DialState(**dials)
    for label, dials in (
        # Gentle, safe polish
        ("Balanced Gentle", {"bass": 15, "punch": 15, "clarity": 10, "air": 10, "width": 5}),
        # Bass-forward modern pop/hip-hop
        ("Modern Low-End", {"bass": 38, "punch": 28, "clarity": 8, "air": 8, "width": 8}),
        # Tight low end for dance/electronic
        ("Tight & Punchy", {"bass": 28, "punch": 42, "clarity": 12, "air": 10, "width": 6}),
        # De-mud + sparkle for dense guitars/vocals
        ("Clarity & Air", {"bass": 10, "punch": 12, "clarity": 28, "air": 28, "width": 6}),
        # Wider image, modest tone moves
        ("Wide Pop", {"bass": 18, "punch": 14, "clarity": 10, "air": 16, "width": 18}),
        # Minimal changes (QA/reference)
        ("Transparent", {"bass": 0, "punch": 0, "clarity": 0, "air": 0, "width": 0}),
    )
}

def list_presets() -> List[str]:
    """Return the preset names stored in PRESETS, in insertion order."""
    return [preset_name for preset_name in PRESETS]

def get_preset(name: str) -> DialState:
    """Look up a preset by exact name; an unknown name raises KeyError.

    The object stored in PRESETS is returned as-is (not a copy).
    """
    preset = PRESETS[name]
    return preset

# ---------- 2) Metric-driven Recommendations ----------

@dataclass
class Recommendation:
    """One dial suggestion produced by recommend_from_analysis()."""
    name: str                       # a label for this suggestion
    dials: DialState                # suggested dial positions (0..100)
    priority: int                   # lower = earlier to try (results are sorted on this)
    rationale: List[str]            # bullet points (human-readable “why”)
    notes: Optional[str] = None     # extra context

def _clip01(x: float, lo=0.0, hi=100.0) -> float:
    """Clamp x into [lo, hi] (0..100 by default) and return it as a float."""
    capped = min(hi, x)
    return float(max(lo, capped))

def _scale(val: float, in_lo: float, in_hi: float, out_lo: float, out_hi: float) -> float:
    """Linearly map val from [in_lo, in_hi] onto [out_lo, out_hi], clamping at both ends.

    A zero-width input range returns out_lo.
    """
    if in_hi == in_lo:
        return out_lo
    in_span = in_hi - in_lo
    out_span = out_hi - out_lo
    fraction = (val - in_lo) / in_span
    # Clamp to [0, 1]: upper bound first, then lower bound.
    upper_bounded = min(1.0, fraction)
    fraction = max(0.0, upper_bounded)
    return out_lo + fraction * out_span

def recommend_from_analysis(rep) -> List[Recommendation]:
    """
    Build dial recommendations from analysis metrics using rule-based heuristics.

    Args:
        rep: An AnalysisReport (from analyze_wav) OR a dict with the same fields.
            A dict may nest "basic"/"stereo" sub-dicts or keep every metric at the
            top level. Metrics that are missing or None fall back to built-in defaults.

    Returns:
        Recommendation objects sorted by ascending priority. When no rule fires,
        a single "Balanced Gentle" suggestion is returned; its dials are a copy,
        so editing them does not alter the PRESETS library.
    """
    import copy  # local import: not part of this cell's top-level imports

    def _num(value, default):
        # Absent/None metrics use the default instead of crashing float(None).
        return float(default if value is None else value)

    def _get(dct, k, default=0.0):
        return _num(dct.get(k), default)

    # normalize input
    if hasattr(rep, "bass_energy_pct"):
        # It's an AnalysisReport
        basic = rep.basic
        stereo = rep.stereo
        lufs = _num(rep.lufs_integrated, -20.0)
        bass = _num(rep.bass_energy_pct, 40.0)
        airp = _num(rep.air_energy_pct, 0.2)
        flat = _num(rep.spectral_flatness, 0.05)
    else:
        # Expect a dict-like structure; flat dicts act as their own "basic"/"stereo"
        basic = rep.get("basic", rep)
        stereo = rep.get("stereo", rep)
        lufs = _get(rep, "lufs_integrated", -20.0)
        bass = _get(rep, "bass_energy_pct", 40.0)
        airp = _get(rep, "air_energy_pct", 0.2)
        flat = _get(rep, "spectral_flatness", 0.05)

    crest_db = _get(basic, "crest_db", 12.0)
    width    = _get(stereo, "stereo_width", 0.4)

    recs: List[Recommendation] = []

    # --- Heuristics (tunable) ---
    # Tonal balance
    very_bassy  = bass > 65.0
    super_bassy = bass > 80.0
    very_dark   = airp < 0.02
    dark        = airp < 0.1
    brightish   = airp > 0.7

    # Loudness/dynamics context
    very_dynamic = crest_db > 16

    # Stereo context
    too_narrow   = width < 0.35

    #################################################################
    # 1) Fix dark + bass-heavy → more Air, some Clarity, moderate Punch
    #################################################################
    if very_bassy and (dark or very_dark):
        air_amt = 25 if very_dark else 18
        clar_amt = 18 if super_bassy else 12
        punch_amt = 30 if very_dynamic else 22
        dials = DialState(
            bass=max(10, _scale(bass, 60, 90, 14, 28)),  # keep some bass but don’t add more
            punch=punch_amt,
            clarity=clar_amt,
            air=air_amt,
            width=8 if too_narrow else 6,
        )
        recs.append(Recommendation(
            name="De-mud & Open Top",
            dials=dials,
            priority=1,
            rationale=[
                f"bass_energy %{bass:.1f} is high → reduce mud via clarity + keep lows controlled",
                f"air_energy %{airp:.3f} is low → add air shelf for brightness",
                f"crest {crest_db:.1f} dB → moderate punch to emphasize transients",
            ],
            notes="Start here if the mix feels boomy/dull."
        ))

    #################################################################
    # 2) If very dynamic + quiet → add Punch, little Bass, gentle Air
    #################################################################
    if very_dynamic and lufs < -18:
        dials = DialState(
            bass=16,
            punch=38,
            clarity=10,
            air=14,
            width=8 if too_narrow else 6,
        )
        recs.append(Recommendation(
            name="Punch Up the Transients",
            dials=dials,
            priority=2,
            rationale=[
                f"crest {crest_db:.1f} dB (dynamic) and LUFS {lufs:.1f} (quiet) → add transient definition",
                "small air lift to help intelligibility",
            ]
        ))

    #################################################################
    # 3) If midrange congested (not dark, not bright, flatness in band) → Clarity focus
    #################################################################
    if 0.03 < flat < 0.12 and not dark and not brightish:
        dials = DialState(
            bass=10,
            punch=18,
            clarity=26,
            air=10,
            width=8,
        )
        recs.append(Recommendation(
            name="Clear Midrange",
            dials=dials,
            priority=3,
            rationale=[
                f"spectral_flatness {flat:.3f} suggests dense content → de-mud around 180–230 Hz",
                "moderate punch for definition without aggression",
            ]
        ))

    #################################################################
    # 4) If image is narrow → widen (safely)
    #################################################################
    if too_narrow:
        dials = DialState(
            bass=14,
            punch=14,
            clarity=10,
            air=12,
            width=_scale(width, 0.2, 0.4, 12, 22),
        )
        recs.append(Recommendation(
            name="Widen Image (Safe)",
            dials=dials,
            priority=4,
            rationale=[
                f"stereo_width {width:.2f} is narrow → add width",
                "tone moves kept gentle to avoid destabilizing center"
            ],
            notes="If mono compatibility is critical, keep width ≤ 15."
        ))

    #################################################################
    # 5) Default safe polish if no strong issues
    #################################################################
    if not recs:
        recs.append(Recommendation(
            name="Balanced Gentle",
            # Copy so callers can tweak the suggestion without mutating the shared preset.
            dials=copy.copy(PRESETS["Balanced Gentle"]),
            priority=9,
            rationale=["No strong issues detected → start with light, broad polish."]
        ))

    # Stable ordering
    recs.sort(key=lambda r: r.priority)
    return recs

# ---------- 3) Render/Orchestrator Plans (without executing) ----------

def build_premaster_plan_from_recs(
    recs: List[Recommendation],
    limit: int = 3,
    prefix: str = "PM"
) -> List[Tuple[str, DialState]]:
    """
    Turn the first `limit` recommendations into (tag, dials) pairs.

    Each tag is "<prefix><1-based rank>_<recommendation name without spaces>";
    the dials object is passed through unchanged. The docstring of the original
    names RenderEngine.commit_variants() as the intended consumer.
    """
    selected = recs[:limit]
    return [
        (f"{prefix}{rank}_{rec.name.replace(' ', '')}", rec.dials)
        for rank, rec in enumerate(selected, start=1)
    ]

def recommend_mastering_styles_from_metrics(rep) -> List[Tuple[str, float, str]]:
    """
    Suggest LocalMasterProvider styles from analysis metrics.

    Args:
        rep: An AnalysisReport-like object (attributes, with a `basic` dict) or a
            dict. A dict may nest a "basic" sub-dict or keep "crest_db" at the top
            level, mirroring what recommend_from_analysis() accepts. Metrics that
            are missing or None fall back to defaults.

    Returns:
        List of tuples (style, strength 0..1, why), one entry per style, always
        including a "neutral" entry.
    """
    def _read(src, key, default):
        # Dict-like inputs are read with .get(); anything else via attribute access.
        # (getattr on a dict would always return the default and ignore the data.)
        value = src.get(key, default) if hasattr(src, "get") else getattr(src, key, default)
        return default if value is None else float(value)

    is_mapping = hasattr(rep, "get")
    basic = rep.get("basic", rep) if is_mapping else getattr(rep, "basic", rep)
    if basic is None:
        basic = rep

    lufs  = _read(rep, "lufs_integrated", -20.0)
    crest = _read(basic, "crest_db", 12.0)
    airp  = _read(rep, "air_energy_pct", 0.2)
    bass  = _read(rep, "bass_energy_pct", 40.0)

    out: List[Tuple[str, float, str]] = []
    # If dark → try "bright"
    if airp < 0.05:
        out.append(("bright", 0.6, f"Low air ({airp:.3f}) → add sheen"))
    # If bass-heavy → try "neutral" vs "warm" (depending on taste)
    if bass > 70:
        out.append(("neutral", 0.5, f"High bass energy ({bass:.1f}%) → keep low-end in check"))
    else:
        out.append(("warm", 0.5, f"Moderate bass ({bass:.1f}%) → touch of weight"))
    # If very dynamic & quiet → try "loud" moderately
    if crest > 16 and lufs < -18:
        out.append(("loud", 0.55, f"Dynamic ({crest:.1f} dB) & quiet ({lufs:.1f} LUFS) → more forward"))

    # Always keep a neutral baseline
    if not any(style == "neutral" for style, _, _ in out):
        out.insert(0, ("neutral", 0.5, "Baseline reference"))

    # Deduplicate by style name, preserving order
    seen = set()
    filtered = []
    for entry in out:
        if entry[0] in seen:
            continue
        seen.add(entry[0])
        filtered.append(entry)
    return filtered

# ---------- 4) Human-readable summary helpers ----------

def recommendation_summary(recs: List[Recommendation]) -> str:
    """Render recommendations as a newline-joined, human-readable bullet list.

    Per recommendation: one header line (name, priority, rounded dial values),
    one indented bullet per rationale entry, and an optional notes line.
    """
    def _render(rec):
        d = rec.dials
        header = f"- {rec.name} (priority {rec.priority}): "
        dial_text = f"B{d.bass:.0f} P{d.punch:.0f} C{d.clarity:.0f} A{d.air:.0f} W{d.width:.0f}"
        yield header + dial_text
        for why in rec.rationale:
            yield f"    • {why}"
        if rec.notes:
            yield f"    ↪ {rec.notes}"

    return "\n".join(line for rec in recs for line in _render(rec))

# Cell banner: lists what this layer defines once the cell has run.
print("\n".join((
    "Presets & Recommendations layer loaded:",
    "- PRESETS dict, list_presets(), get_preset(name)",
    "- recommend_from_analysis(rep) → [Recommendation]",
    "- build_premaster_plan_from_recs(recs, limit)",
    "- recommend_mastering_styles_from_metrics(rep)",
    "- recommendation_summary(recs)",
)))


Presets & Recommendations layer loaded:
- PRESETS dict, list_presets(), get_preset(name)
- recommend_from_analysis(rep) → [Recommendation]
- build_premaster_plan_from_recs(recs, limit)
- recommend_mastering_styles_from_metrics(rep)
- recommendation_summary(recs)


### Logging, Versioning, Reproducibility

In [25]:
# ============================================
# Logging · Versioning · Reproducibility Layer
# ============================================
# What this provides:
# - RunLogger: creates a run_id and writes structured logs (JSONL) + summary JSON
# - Env capture: library versions, Python/OS, CPU info, pip freeze snapshot
# - Determinism: simple seed manager for NumPy & Python hash seed
# - Provenance: content hashes for audio/code/config; processing graph digest
# - Artifact registry helpers (alongside your Manifest)
# - Repro bundle: packs config, logs, environment, and outputs into a zip

from __future__ import annotations
from dataclasses import dataclass, asdict
from typing import Dict, Any, Optional, List, Iterable, Tuple
import os, sys, io, json, time, uuid, hashlib, platform, subprocess, zipfile, shutil, textwrap
from datetime import datetime
import numpy as np

# ---------- small utils ----------

def _now_iso() -> str:
    """Return the current UTC time as 'YYYY-MM-DDTHH:MM:SSZ' (second precision)."""
    from datetime import timezone  # local import: the cell only imports `datetime`

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC time and
    # drop tzinfo so isoformat() emits no "+00:00" before the explicit "Z" suffix.
    now_utc = datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None)
    return now_utc.isoformat() + "Z"

def _mkdirp(p: str) -> str:
    """Create directory p (including parents) if it is missing; return p unchanged."""
    os.makedirs(p, exist_ok=True)
    return p

def _sha256_bytes(b: bytes) -> str:
    """Return the SHA-256 digest of b as a lowercase hex string."""
    hasher = hashlib.sha256()
    hasher.update(b)
    return hasher.hexdigest()

def file_sha256(path: str, bufsize: int = 1<<20) -> str:
    """Return the SHA-256 hex digest of the file at path, read in bufsize-byte chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while True:
            piece = stream.read(bufsize)
            if not piece:
                # An empty read marks end of file.
                break
            digest.update(piece)
    return digest.hexdigest()

def json_sha256(obj: Any) -> str:
    """Hash obj's canonical JSON text (sorted keys, no whitespace) with SHA-256."""
    canonical = json.dumps(obj, sort_keys=True, separators=(",", ":"))
    return _sha256_bytes(canonical.encode("utf-8"))

# ---------- determinism ----------

class SeedScope:
    """
    Context manager that seeds NumPy's global RNG and sets PYTHONHASHSEED.
    Use: with SeedScope(42): ...  (or call SeedScope.set_global(42))

    On exit the PYTHONHASHSEED value that was in place when the scope was
    entered is restored (or the variable is removed if it was unset). The
    NumPy RNG state is not restored.

    NOTE: PYTHONHASHSEED is read by Python at interpreter start-up, so setting it
    here does not change str/bytes hashing in the running process; the value is
    visible to code that reads os.environ and to child processes that inherit it.
    """
    def __init__(self, seed: int = 42):
        self.seed = int(seed)
        self._prev_hashseed = os.environ.get("PYTHONHASHSEED")

    def __enter__(self):
        # Capture at entry (not construction) so the value restored on exit is the
        # one actually replaced, even if the scope is created early or reused.
        self._prev_hashseed = os.environ.get("PYTHONHASHSEED")
        np.random.seed(self.seed)
        os.environ["PYTHONHASHSEED"] = str(self.seed)
        return self

    def __exit__(self, exc_type, exc, tb):
        if self._prev_hashseed is None:
            os.environ.pop("PYTHONHASHSEED", None)
        else:
            os.environ["PYTHONHASHSEED"] = self._prev_hashseed

    @staticmethod
    def set_global(seed: int = 42):
        """Seed NumPy and set PYTHONHASHSEED without any restore step."""
        np.random.seed(int(seed))
        os.environ["PYTHONHASHSEED"] = str(int(seed))

# ---------- environment capture ----------

def capture_environment() -> Dict[str, Any]:
    """
    Snapshot the runtime environment for reproducibility records.

    Returns:
        Dict with keys "timestamp", "python", "platform" (system, release,
        version, machine, processor), "packages" (name → version for numpy plus
        whichever of scipy/soundfile/pandas/matplotlib can be imported) and
        "pip_freeze" (list of lines, or None when `pip freeze` fails or times out).
    """
    import importlib  # local import: not part of this cell's top-level imports

    info = {
        "timestamp": _now_iso(),
        "python": sys.version.replace("\n", " "),
        "platform": {
            "system": platform.system(),
            "release": platform.release(),
            "version": platform.version(),
            "machine": platform.machine(),
            "processor": platform.processor(),
        },
        "packages": {"numpy": np.__version__},
        "pip_freeze": None,
    }
    # library versions (best-effort, one package at a time so a single missing
    # or broken optional package does not discard the versions of the others)
    for pkg_name in ("scipy", "soundfile", "pandas", "matplotlib"):
        try:
            module = importlib.import_module(pkg_name)
        except Exception:
            continue
        version = getattr(module, "__version__", None)
        if version is not None:
            info["packages"][pkg_name] = version
    # pip freeze (optional, can be heavy)
    try:
        out = subprocess.check_output([sys.executable, "-m", "pip", "freeze"], timeout=20)
        info["pip_freeze"] = out.decode("utf-8").splitlines()
    except Exception:
        # Deliberately best-effort: no pip, a timeout or undecodable output leaves None.
        info["pip_freeze"] = None
    return info

# ---------- processing graph digest ----------

def processing_digest(name: str, *, code_versions: Dict[str, str], params: Dict[str, Any]) -> str:
    """
    Fingerprint a processing step from its name, code versions and parameters.

    code_versions: e.g., {"dsp_primitives":"v1.2.0+sha...", "processors":"v0.9.3", ...}
    params: the dial values, thresholds, etc.

    Returns the first 16 hex characters of the JSON hash of these three inputs.
    """
    fingerprint_input = {"step": name, "code": code_versions, "params": params}
    full_hash = json_sha256(fingerprint_input)
    short_id = full_hash[:16]
    return short_id

# ---------- RunLogger ----------

@dataclass
class RunLogger:
    """
    Per-run logger: appends structured events to a JSONL file and writes a summary JSON.

    Create instances with RunLogger.start(), which derives the run id and creates
    the directories below.
    """
    root_dir: str       # workspace root passed to start()
    run_id: str         # "<tag>_<UTC timestamp>_<6 hex chars>"
    dir_logs: str       # <root>/reports/logs/<run_id>
    dir_meta: str       # <root>/reports/meta/<run_id>
    dir_bundle: str     # <root>/reports/bundles/<run_id>
    summary_path: str   # <dir_meta>/summary.json
    jsonl_path: str     # <dir_logs>/events.jsonl

    @staticmethod
    def start(workspace_root: str, tag: str = "session") -> "RunLogger":
        """Create the run directories under workspace_root/reports and return a logger."""
        from datetime import timezone  # local import: the cell only imports `datetime`

        # datetime.utcnow() is deprecated since Python 3.12; the formatted stamp is unchanged.
        stamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
        rid = f"{tag}_{stamp}_{uuid.uuid4().hex[:6]}"
        logs = _mkdirp(os.path.join(workspace_root, "reports", "logs", rid))
        meta = _mkdirp(os.path.join(workspace_root, "reports", "meta", rid))
        bund = _mkdirp(os.path.join(workspace_root, "reports", "bundles", rid))
        return RunLogger(
            root_dir=workspace_root,
            run_id=rid,
            dir_logs=logs,
            dir_meta=meta,
            dir_bundle=bund,
            summary_path=os.path.join(meta, "summary.json"),
            jsonl_path=os.path.join(logs, "events.jsonl"),
        )

    # event logging (append JSON lines)
    def log_event(self, kind: str, payload: Dict[str, Any]):
        """
        Append one event line: {"ts", "run_id", "kind", ...payload}.

        The envelope keys "ts", "run_id" and "kind" are authoritative. A payload
        key with one of those names is stored as "payload_<key>" instead of
        overwriting the envelope (which previously let a payload "kind" replace
        the event type).
        """
        evt = {"ts": _now_iso(), "run_id": self.run_id, "kind": kind}
        reserved = tuple(evt)
        for key, value in payload.items():
            evt[f"payload_{key}" if key in reserved else key] = value
        with open(self.jsonl_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(evt, ensure_ascii=False) + "\n")

    # param/metric helpers
    def log_params(self, step: str, params: Dict[str, Any], code_versions: Optional[Dict[str,str]] = None):
        """Log a "params" event for step, including its processing_digest fingerprint."""
        digest = processing_digest(step, code_versions=code_versions or {}, params=params)
        self.log_event("params", {"step": step, "digest": digest, "params": params, "code_versions": code_versions})

    def log_metrics(self, step: str, metrics: Dict[str, Any]):
        """Log a "metrics" event for step."""
        self.log_event("metrics", {"step": step, "metrics": metrics})

    def log_artifact(self, kind: str, path: str, extra: Optional[Dict[str,Any]] = None):
        """
        Log an "artifact" event for the file at path.

        The artifact's own kind is recorded under "artifact_kind" so the event
        type stays "artifact"; `extra` entries are merged into the payload.
        """
        payload = {"artifact_kind": kind, "path": os.path.abspath(path)}
        if extra:
            payload.update(extra)
        self.log_event("artifact", payload)

    # summary (overwrites)
    def write_summary(self, summary: Dict[str, Any]):
        """Write summary as indented JSON to summary_path, replacing any previous file."""
        with open(self.summary_path, "w", encoding="utf-8") as f:
            json.dump(summary, f, indent=2, ensure_ascii=False)

# ---------- provenance for files & configs ----------

def capture_file_provenance(path: str, role: str = "input") -> Dict[str, Any]:
    """Describe a file for provenance: role, absolute path, SHA-256, size in bytes, integer mtime."""
    absolute = os.path.abspath(path)
    content_hash = file_sha256(path)
    size_bytes = os.path.getsize(path)
    modified = int(os.path.getmtime(path))
    return {
        "role": role,
        "path": absolute,
        "sha256": content_hash,
        "bytes": size_bytes,
        "mtime": modified,
    }

def capture_config_snapshot(config: Dict[str, Any], name: str = "config") -> Dict[str, Any]:
    """Wrap config with its name and its JSON hash; the config object itself is embedded as-is."""
    return {
        "name": name,
        "sha256": json_sha256(config),
        "config": config,
    }

# ---------- reproducibility bundle ----------

def make_repro_zip(
    out_zip_path: str,
    *,
    workspace_root: str,
    run_logger: RunLogger,
    env_info: Dict[str, Any],
    inputs: List[str],
    outputs: List[str],
    extra_jsons: Optional[Dict[str, Any]] = None,
    readme_text: Optional[str] = None
) -> str:
    """
    Creates a zip with:
      - logs/events.jsonl, meta/summary.json (whatever exists in the run's log/meta dirs)
      - meta/environment.json (+ pip_freeze)
      - meta/provenance.json: file provenance for inputs/outputs
      - meta/bundle_meta.json: all of the above plus any extra JSON you pass in
      - meta/README.txt

    The JSON/README files are written into run_logger.dir_meta first, then the
    contents of dir_logs and dir_meta are archived under "logs/" and "meta/".
    The input/output audio files themselves are hashed but not added to the zip.

    Returns absolute path to the zip.
    Raises whatever capture_file_provenance raises for unreadable input/output paths.
    """
    out_zip_abs = os.path.abspath(out_zip_path)
    os.makedirs(os.path.dirname(out_zip_abs), exist_ok=True)

    prov_inputs = [capture_file_provenance(p, role="input") for p in inputs]
    prov_outputs = [capture_file_provenance(p, role="output") for p in outputs]

    bundle_meta = {
        "created": _now_iso(),
        "run_id": run_logger.run_id,
        "workspace_root": os.path.abspath(workspace_root),
        "environment": env_info,
        "inputs": prov_inputs,
        "outputs": prov_outputs,
    }
    if extra_jsons:
        bundle_meta.update(extra_jsons)

    def _write_meta_json(filename: str, obj: Any) -> None:
        # All generated metadata lives in the run's meta dir under fixed names.
        with open(os.path.join(run_logger.dir_meta, filename), "w", encoding="utf-8") as f:
            json.dump(obj, f, indent=2, ensure_ascii=False)

    _write_meta_json("environment.json", env_info)
    _write_meta_json("provenance.json", {"inputs": prov_inputs, "outputs": prov_outputs})
    _write_meta_json("bundle_meta.json", bundle_meta)

    readme_md = os.path.join(run_logger.dir_meta, "README.txt")
    with open(readme_md, "w", encoding="utf-8") as f:
        f.write(textwrap.dedent(readme_text or f"""
        Post-Mix Reproducibility Bundle
        =================================
        Run ID: {run_logger.run_id}
        Created: {_now_iso()}

        Contents:
          - logs/events.jsonl      : structured event log
          - meta/summary.json      : high-level run summary
          - meta/environment.json  : Python/OS/pkg versions (+pip freeze when available)
          - meta/provenance.json   : input/output file hashes and sizes
          - meta/bundle_meta.json  : collected metadata for quick inspection

        To reproduce:
          1) Recreate Python env from pip_freeze (if present).
          2) Use the same inputs (verified by sha256) and configs.
          3) Run via the same code versions; dials/params are in events.jsonl and summary.json.
        """).strip() + "\n")

    # zip it up
    with zipfile.ZipFile(out_zip_path, "w", compression=zipfile.ZIP_DEFLATED) as z:
        # Archive names are built from a fixed "logs"/"meta" prefix plus the path
        # relative to that directory. Computing them relative to the parent of
        # dir_logs produced "../meta/..." entries for the meta files.
        for label, root in (("logs", run_logger.dir_logs), ("meta", run_logger.dir_meta)):
            for dirpath, _, filenames in os.walk(root):
                for fn in filenames:
                    ap = os.path.join(dirpath, fn)
                    if os.path.abspath(ap) == out_zip_abs:
                        # Never add the archive to itself if it is written inside these dirs.
                        continue
                    arc = os.path.join(label, os.path.relpath(ap, root))
                    z.write(ap, arcname=arc)

    return out_zip_abs

# ---------- convenience: wire into your Manifest ----------

def register_and_log_artifact(manifest, logger: RunLogger, path: str, kind: str, params: Optional[Dict[str,Any]] = None, stage: Optional[str] = None):
    """Register an artifact in the manifest, then record the same artifact in the run log.

    A missing/empty `params` is passed on as an empty dict to both calls.
    """
    manifest_params = params or {}
    log_extra = {"stage": stage, "params": params or {}}
    register_artifact(manifest, path, kind=kind, params=manifest_params, stage=stage)
    logger.log_artifact(kind, path, extra=log_extra)

# ---------- version stamps for your code layers (edit these in each layer once) ----------

# Hand-maintained version stamp per notebook layer (layer key → version string).
# Shaped like the `code_versions` mapping that processing_digest() and
# RunLogger.log_params() accept.
CODE_VERSIONS = {
    "io_layer":           "v0.1.0",
    "analysis_layer":     "v0.3.0",
    "dsp_primitives":     "v0.4.1",
    "processors":         "v0.5.0",
    "render_engine":      "v0.2.0",
    "premaster_prep":     "v0.1.0",
    "orchestrator":       "v0.1.0",
    "stream_sim":         "v0.1.0",
    "compare_reporting":  "v0.1.0",
    "presets_recs":       "v0.1.0",
}

# Cell banner: lists what this layer defines once the cell has run.
print("\n".join((
    "Logging · Versioning · Reproducibility layer loaded:",
    "- RunLogger.start(workspace_root, tag) → logger",
    "- logger.log_params/metrics/artifact(), logger.write_summary()",
    "- capture_environment(), file_sha256(), capture_config_snapshot()",
    "- processing_digest(name, code_versions, params)",
    "- make_repro_zip(out_zip, workspace_root, logger, env_info, inputs, outputs)",
    "- register_and_log_artifact(manifest, logger, path, kind, params, stage)",
    "- CODE_VERSIONS dict (update per layer when you change code)",
)))


Logging · Versioning · Reproducibility layer loaded:
- RunLogger.start(workspace_root, tag) → logger
- logger.log_params/metrics/artifact(), logger.write_summary()
- capture_environment(), file_sha256(), capture_config_snapshot()
- processing_digest(name, code_versions, params)
- make_repro_zip(out_zip, workspace_root, logger, env_info, inputs, outputs)
- register_and_log_artifact(manifest, logger, path, kind, params, stage)
- CODE_VERSIONS dict (update per layer when you change code)


### Utilities

In [26]:
# === Utility: exports + true-peak guard ===
import os, numpy as np, soundfile as sf
from dataclasses import dataclass

# Use your existing TP approx if available; else fallback
try:
    # Bare name lookup: raises NameError when no earlier cell has defined true_peak_dbfs.
    true_peak_dbfs
except NameError:
    from scipy import signal

    def true_peak_dbfs(x: np.ndarray, sr: int, oversample: int = 4) -> float:
        """Approximate the true peak of x in dBFS by polyphase upsampling along axis 0.

        `sr` is accepted for signature compatibility but not used here. The peak is
        floored at 1e-12 before the log, so all-zero input yields -240 dB.
        """
        samples = np.asarray(x, dtype=np.float32)
        upsampled = signal.resample_poly(samples, oversample, 1, axis=0)
        peak = float(np.max(np.abs(upsampled)))
        floored = max(1e-12, peak)
        return 20.0*np.log10(floored)

def _db_to_lin(db: float) -> float:
    """Convert a level in decibels to a linear amplitude factor, 10 ** (db / 20)."""
    exponent = db/20.0
    return 10.0**exponent

def save_wav_24bit(path: str, y: np.ndarray, sr: int):
    """Write y (cast to float32) to path with subtype PCM_24, creating parent dirs.

    Returns the absolute path of the written file.
    """
    absolute = os.path.abspath(path)
    os.makedirs(os.path.dirname(absolute), exist_ok=True)
    samples = np.asarray(y, dtype=np.float32)
    sf.write(path, samples, int(sr), subtype="PCM_24")
    return absolute

@dataclass
class TPGuardResult:
    """Result bundle returned by safe_true_peak()."""
    out: np.ndarray     # gain-adjusted samples (float32)
    gain_db: float      # gain that was applied, in dB (ceiling_db - in_dbtp)
    in_dbtp: float      # true-peak estimate of the input
    out_dbtp: float     # true-peak estimate re-measured on `out`
    ceiling_db: float   # the requested ceiling

def safe_true_peak(y: np.ndarray, sr: int, ceiling_db: float = -1.0, allow_boost: bool = True) -> TPGuardResult:
    """
    Apply one static gain so the estimated true peak meets `ceiling_db` (no limiting).

    Args:
        y: Audio samples; converted to float32 before the gain is applied.
        sr: Sample rate, forwarded to true_peak_dbfs().
        ceiling_db: Target true-peak level in dB.
        allow_boost: When True (default, the historical behaviour) the gain is
            `ceiling_db - measured_peak` in either direction, so material peaking
            below the ceiling is raised up to it. Pass False for a pure guard
            that only attenuates and leaves quieter material untouched.

    Returns:
        TPGuardResult with the processed samples, the applied gain and the
        true-peak estimates before and after.
    """
    tp_in = true_peak_dbfs(y, sr, oversample=4)
    gain_db = ceiling_db - tp_in
    if not allow_boost:
        # Guard-only mode: never amplify (avoids blowing up near-silent input).
        gain_db = min(0.0, gain_db)
    y2 = (np.asarray(y, dtype=np.float32) * _db_to_lin(gain_db)).astype(np.float32)
    tp_out = true_peak_dbfs(y2, sr, oversample=4)
    return TPGuardResult(out=y2, gain_db=gain_db, in_dbtp=tp_in, out_dbtp=tp_out, ceiling_db=ceiling_db)


In [27]:
# === Global config + validators ===
from dataclasses import dataclass, asdict
import numpy as np

@dataclass
class GlobalConfig:
    """Project-wide default settings, grouped by pipeline stage."""
    # I/O
    default_bit_depth: str = "PCM_24"       # same subtype string save_wav_24bit() passes to soundfile
    # Prep
    prep_hpf_hz: float = 20.0
    prep_peak_target_dbfs: float = -6.0
    # Rendering
    render_peak_target_dbfs: float = -1.0
    # Streaming sim
    tp_ceiling_db: float = -1.0
    # Reporting
    preview_seconds: int = 60
    nfft: int = 1<<16                       # 65536

# Module-level instance holding the defaults above.
CFG = GlobalConfig()

class InputError(Exception):
    """Raised by ensure_audio_valid() when an audio array fails validation."""

def ensure_audio_valid(x: np.ndarray, name: str = "audio"):
    """Validate an audio array; return None when valid, raise InputError otherwise.

    Checks, in order: x is a numpy array, is 1-D or 2-D, contains only finite
    values, is non-empty, and (if 2-D) has 1 or 2 columns. `name` prefixes the
    error message.
    """
    def _reject(problem: str):
        raise InputError(f"{name}: {problem}")

    if not isinstance(x, np.ndarray):
        _reject(f"expected numpy array, got {type(x)}")
    if x.ndim not in (1,2):
        _reject(f"expected 1D (mono) or 2D (stereo), got shape {x.shape}")
    all_finite = np.isfinite(x).all()
    if not all_finite:
        _reject("contains NaN/Inf values; sanitize before processing")
    if x.size == 0:
        _reject("empty array")
    is_2d = x.ndim == 2
    if is_2d and x.shape[1] not in (1,2):
        _reject(f"expected (N,), (N,1) or (N,2); got {x.shape}")


In [28]:
# === Reporting helpers (enhanced HTML) ===
import os, io, html
from typing import Optional, Dict, Any
import pandas as pd

def df_with_links(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of df with a clickable 'file' column built from 'path'.

    Each link points at the HTML-escaped basename of the path. When a 'name'
    column exists, 'file' is placed right after it. Without a 'path' column the
    input frame is returned unchanged (same object).
    """
    if "path" not in df.columns:
        return df

    def _anchor(path_value):
        label = html.escape(os.path.basename(str(path_value)))
        return f"<a href='{label}' target='_blank'>{label}</a>"

    linked = df.copy()
    linked["file"] = linked["path"].apply(_anchor)

    ordered = list(linked.columns)
    if "name" in ordered:
        # Slot position is computed before 'file' is taken out of the list.
        slot = ordered.index("name") + 1
        ordered.remove("file")
        ordered.insert(slot, "file")
        linked = linked[ordered]
    return linked

def html_header_block(title: str, code_versions: Dict[str,str], dials: Optional[Dict[str,Any]] = None, notes: Optional[str] = None) -> str:
    """
    Build the report header as an HTML fragment.

    Emits an <h1> title, an optional italic notes paragraph, an optional
    "Code Versions" list and an optional "Dial Snapshot" list. All text is
    HTML-escaped; keys and values are converted with str() first so non-string
    entries (e.g. numeric versions or dial values) do not crash html.escape().
    """
    buf = io.StringIO()
    buf.write(f"<h1>{html.escape(str(title))}</h1>\n")
    if notes:
        buf.write(f"<p><em>{html.escape(str(notes))}</em></p>\n")
    # versions
    if code_versions:
        buf.write("<h3>Code Versions</h3><ul>")
        for k, v in code_versions.items():
            buf.write(f"<li><code>{html.escape(str(k))}</code>: {html.escape(str(v))}</li>")
        buf.write("</ul>")
    # dials snapshot
    if dials:
        buf.write("<h3>Dial Snapshot</h3><ul>")
        for k, v in dials.items():
            buf.write(f"<li>{html.escape(str(k))}: {html.escape(str(v))}</li>")
        buf.write("</ul>")
    return buf.getvalue()

# Patch your write_report_html to use these (drop-in):
def write_report_html_enhanced(
    summary_df: pd.DataFrame,
    deltas_df: Optional[pd.DataFrame],
    plots: Dict[str, str],
    out_path: str,
    title: str = "Post-Mix Comparison Report",
    extra_notes: Optional[str] = None,
    code_versions: Optional[Dict[str,str]] = None,
    dial_snapshot: Optional[Dict[str,Any]] = None
) -> str:
    """
    Write the comparison report as an HTML file and copy plot images next to it.

    Args:
        summary_df: Metrics table; a 'path' column is turned into links via df_with_links().
        deltas_df: Optional deltas table, rendered only when non-empty.
        plots: Mapping of plot key → image path; "spectrum_png" and "loudness_png"
            are embedded (by basename) when the files exist.
        out_path: Destination HTML path; parent directories are created.
        title, extra_notes, code_versions, dial_snapshot: header content.

    Returns:
        Absolute path of the written HTML file.
    """
    os.makedirs(os.path.dirname(os.path.abspath(out_path)), exist_ok=True)
    # linkify
    summary_df2 = df_with_links(summary_df)
    # base HTML
    html_doc = io.StringIO()
    # The title is escaped here too, matching the <h1> produced by html_header_block().
    html_doc.write(f"<!doctype html><html><head><meta charset='utf-8'><title>{html.escape(str(title))}</title>")
    html_doc.write("<style>body{font-family:system-ui,Arial,sans-serif;margin:24px} h1{margin-top:0} img{max-width:100%;height:auto} table{border-collapse:collapse} th,td{border:1px solid #ddd;padding:6px} code{background:#f5f5f5;padding:2px 4px;border-radius:4px}</style>")
    html_doc.write("</head><body>")
    html_doc.write(html_header_block(title, code_versions or {}, dials=dial_snapshot, notes=extra_notes))
    # tables (escape=False so the generated <a> links in the summary render as links)
    html_doc.write("<h3>Summary Metrics</h3>")
    html_doc.write(summary_df2.to_html(index=False, escape=False, float_format=lambda v: f"{v:.6g}"))
    if deltas_df is not None and len(deltas_df):
        html_doc.write("<h3>Δ vs Reference</h3>")
        html_doc.write(deltas_df.to_html(index=False, float_format=lambda v: f"{v:.6g}"))
    # plots
    if "spectrum_png" in plots and os.path.exists(plots["spectrum_png"]):
        html_doc.write("<h3>Spectrum Overlay</h3>")
        html_doc.write(f"<img src='{os.path.basename(plots['spectrum_png'])}' alt='Spectrum Overlay'/>")
    if "loudness_png" in plots and os.path.exists(plots["loudness_png"]):
        html_doc.write("<h3>Short-Term Loudness Overlay</h3>")
        html_doc.write(f"<img src='{os.path.basename(plots['loudness_png'])}' alt='Loudness Overlay'/>")
    html_doc.write("</body></html>")
    with open(out_path, "w", encoding="utf-8") as f:
        f.write(html_doc.getvalue())
    # copy assets next to HTML, skipping files that already live there
    import shutil
    target_dir = os.path.dirname(os.path.abspath(out_path))
    for p in plots.values():
        if not p or not os.path.exists(p):
            continue
        dst = os.path.join(target_dir, os.path.basename(p))
        if os.path.abspath(p) == os.path.abspath(dst):
            continue
        try:
            shutil.copy2(p, dst)
        except shutil.SameFileError:
            # Source and destination resolve to the same file (e.g. via a link).
            pass
    return os.path.abspath(out_path)


## MAIN

In [36]:
# ---- Patch: make report writers skip same-file copies ----
import os, shutil

def _safe_copy_to_dir(src_path: str, target_dir: str):
    """Copy src_path into target_dir unless it is already that file.

    Returns the destination path, or None when src_path does not exist.
    """
    if not os.path.exists(src_path):
        return None

    dst = os.path.join(target_dir, os.path.basename(src_path))
    already_in_place = False
    try:
        if os.path.abspath(src_path) == os.path.abspath(dst):
            # identical path
            already_in_place = True
        elif os.path.exists(dst) and os.path.samefile(src_path, dst):
            # different spelling, same underlying file
            already_in_place = True
    except Exception:
        # os.path.samefile might fail on some platforms; treat as "not the same" and copy
        already_in_place = False

    if not already_in_place:
        shutil.copy2(src_path, dst)
    return dst

# Rebind write_report_html to use safe copy
def write_report_html(summary_df, deltas_df, plots, out_path, title="Post-Mix Comparison Report", extra_notes=None):
    """
    Write the basic comparison report as HTML and place plot images next to it.

    Args:
        summary_df: Metrics table rendered with DataFrame.to_html().
        deltas_df: Optional second table, rendered only when non-empty.
        plots: Mapping of plot key → image path; "spectrum_png" and "loudness_png"
            are embedded (by basename) when the files exist.
        out_path: Destination HTML path; parent directories are created.
        title: Page title and <h1>; HTML-escaped.
        extra_notes: Optional note paragraph; HTML-escaped (as in the enhanced writer).

    Returns:
        Absolute path of the written HTML file.
    """
    import io
    from html import escape  # local import; also avoids shadowing the `html` module with a buffer name

    os.makedirs(os.path.dirname(os.path.abspath(out_path)), exist_ok=True)
    safe_title = escape(str(title))
    doc = io.StringIO()
    doc.write(f"<!doctype html><html><head><meta charset='utf-8'><title>{safe_title}</title>")
    doc.write("<style>body{font-family:system-ui,Arial,sans-serif;margin:24px} h1{margin-top:0} img{max-width:100%;height:auto} table{border-collapse:collapse} th,td{border:1px solid #ddd;padding:6px} caption{margin:6px 0}</style>")
    doc.write("</head><body>")
    doc.write(f"<h1>{safe_title}</h1>")
    if extra_notes:
        doc.write(f"<p><em>{escape(str(extra_notes))}</em></p>")
    # tables
    doc.write(summary_df.to_html(index=False, float_format=lambda v: f"{v:.6g}"))
    if deltas_df is not None and len(deltas_df):
        doc.write(deltas_df.to_html(index=False, float_format=lambda v: f"{v:.6g}"))
    # plots
    if "spectrum_png" in plots and os.path.exists(plots["spectrum_png"]):
        doc.write("<h3>Spectrum Overlay</h3>")
        doc.write(f"<img src='{os.path.basename(plots['spectrum_png'])}' alt='Spectrum Overlay'/>")
    if "loudness_png" in plots and os.path.exists(plots["loudness_png"]):
        doc.write("<h3>Short-Term Loudness Overlay</h3>")
        doc.write(f"<img src='{os.path.basename(plots['loudness_png'])}' alt='Loudness Overlay'/>")
    doc.write("</body></html>")

    with open(out_path, "w", encoding="utf-8") as f:
        f.write(doc.getvalue())

    # copy assets next to HTML, but skip if already there
    target_dir = os.path.dirname(out_path)
    for p in plots.values():
        if p:
            _safe_copy_to_dir(p, target_dir)
    return os.path.abspath(out_path)

# If you use the enhanced writer, patch its copy loop too
try:
    # Probe: raises NameError when the enhanced writer was never defined, in which
    # case this whole patch is skipped. The old function object is kept for reference.
    _old_enh = write_report_html_enhanced  # if defined earlier

    def write_report_html_enhanced(summary_df, deltas_df, plots, out_path,
                                   title="Post-Mix Comparison Report",
                                   extra_notes=None, code_versions=None, dial_snapshot=None):
        """
        Enhanced HTML report writer with same-file-safe asset copying.

        Same content as the earlier enhanced writer (escaped header, code
        versions, dial snapshot, linkified summary table, optional deltas table,
        embedded plots); plot files are copied next to the report with
        _safe_copy_to_dir(), which skips files that are already in place.

        Returns the absolute path of the written HTML file.
        """
        import io, html
        os.makedirs(os.path.dirname(os.path.abspath(out_path)), exist_ok=True)

        safe_title = html.escape(str(title))
        html_doc = io.StringIO()
        html_doc.write(f"<!doctype html><html><head><meta charset='utf-8'><title>{safe_title}</title>")
        html_doc.write("<style>body{font-family:system-ui,Arial,sans-serif;margin:24px} h1{margin-top:0} img{max-width:100%;height:auto} table{border-collapse:collapse} th,td{border:1px solid #ddd;padding:6px} code{background:#f5f5f5;padding:2px 4px;border-radius:4px}</style>")
        html_doc.write("</head><body>")
        # simple header
        html_doc.write(f"<h1>{safe_title}</h1>")
        if extra_notes:
            html_doc.write(f"<p><em>{html.escape(extra_notes)}</em></p>")
        if code_versions:
            html_doc.write("<h3>Code Versions</h3><ul>")
            for k, v in code_versions.items():
                html_doc.write(f"<li><code>{html.escape(k)}</code>: {html.escape(str(v))}</li>")
            html_doc.write("</ul>")
        if dial_snapshot:
            html_doc.write("<h3>Dial Snapshot</h3><ul>")
            for k, v in dial_snapshot.items():
                html_doc.write(f"<li>{html.escape(k)}: {html.escape(str(v))}</li>")
            html_doc.write("</ul>")

        # Restore the link column of the earlier enhanced writer; escape=False below
        # exists so those generated <a> tags render as links.
        summary_linked = df_with_links(summary_df)
        html_doc.write("<h3>Summary Metrics</h3>")
        html_doc.write(summary_linked.to_html(index=False, escape=False, float_format=lambda v: f"{v:.6g}"))
        if deltas_df is not None and len(deltas_df):
            html_doc.write("<h3>Δ vs Reference</h3>")
            html_doc.write(deltas_df.to_html(index=False, float_format=lambda v: f"{v:.6g}"))

        if "spectrum_png" in plots and os.path.exists(plots["spectrum_png"]):
            html_doc.write("<h3>Spectrum Overlay</h3>")
            html_doc.write(f"<img src='{os.path.basename(plots['spectrum_png'])}' alt='Spectrum Overlay'/>")
        if "loudness_png" in plots and os.path.exists(plots["loudness_png"]):
            html_doc.write("<h3>Short-Term Loudness Overlay</h3>")
            html_doc.write(f"<img src='{os.path.basename(plots['loudness_png'])}' alt='Loudness Overlay'/>")

        html_doc.write("</body></html>")
        with open(out_path, "w", encoding="utf-8") as f:
            f.write(html_doc.getvalue())

        # safe-copy assets
        target_dir = os.path.dirname(out_path)
        for p in plots.values():
            if p:
                _safe_copy_to_dir(p, target_dir)
        return os.path.abspath(out_path)
except NameError:
    pass

In [49]:
# MAIN: end-to-end sanity test for I/O → Analysis → DSP Primitives → Processors
# Run this cell after loading the previous cells.
# It will:
#  - create a workspace
#  - import your mix (set MIX_SRC below)
#  - analyze the original
#  - build a preview cache
#  - render a few dial presets
#  - save outputs, register artifacts
#  - compare metrics in a table
#  - plot spectrum & loudness overlays
#
# Safe to re-run; it creates a new timestamped workspace each time.

import os, numpy as np, matplotlib.pyplot as plt
import soundfile as sf
import pandas as pd


# ---------- 1) SET YOUR SOURCE FILE HERE ----------
# Known test mixes. Exactly one is active per run: MIX_SRC is whichever entry
# is picked on the last line (currently the final one in the tuple).
MIX_SRC_CANDIDATES = (
    "/Users/itay/Documents/muxing/ITAY - CRASHING v.6 MIX ONLY.wav",
    "/Users/itay/Documents/muxing/ITAY - 4 CHORDS v.21 STEFAN.wav",
    "/Users/itay/Documents/muxing/Full Song-jerrysmith1984-C7F08-SONGIVA-V4.wav",
)
MIX_SRC = MIX_SRC_CANDIDATES[-1]   # <-- change me (pick another entry or set a path)



In [50]:
# ============================================
# MAIN — Full End-to-End Pipeline (fixed report paths)
# ============================================
# Saves plots AND the HTML report inside reports/assets/ to avoid SameFileError.

import os, json, numpy as np, soundfile as sf
from dataclasses import asdict

# ---------- 0) Project name ----------
# MIX_SRC is not defined in this cell; it must already be set by a previous cell.
PROJECT = "postmix_v1"  # passed to make_workspace, Manifest and the RunLogger tag

# ---------- 1) Global toggles ----------
DO_STREAMING_SIM = True       # gates the streaming-normalization step
RECS_LIMIT = 3                # `limit` passed to build_premaster_plan_from_recs
LM_PREVIEW_LUFS = -14.0       # `level_match_preview_lufs` passed to the orchestrator run
REPORT_REF_NAME = "Original"  # `reference_name` passed to CompareConfig

# ---------- 2) Workspace, manifest, logger, environment ----------
# Validate the source BEFORE anything is created: a bad MIX_SRC should fail fast
# instead of leaving behind a fresh timestamped workspace (plus whatever
# RunLogger.start sets up) that holds no input. isfile() also rejects a
# directory path, which exists() would have let through to import_mix.
if not os.path.isfile(MIX_SRC):
    raise FileNotFoundError(f"Set MIX_SRC to a valid path, got: {MIX_SRC}")

paths = make_workspace(project=PROJECT)
man   = Manifest(project=PROJECT, workspace=paths)

logger = RunLogger.start(paths.root, tag=PROJECT)
env    = capture_environment()
logger.log_event("env", {"environment": env})

# bring input into workspace + register/log
mix_path = import_mix(paths, MIX_SRC, alias="mix.wav")
register_input(man, mix_path, alias="mix")
register_and_log_artifact(man, logger, mix_path, kind="input", params={"alias":"mix"}, stage="import_mix")

# ---------- 3) Load, validate, analyze original ----------
# Raw samples + sample rate; `x` / `sr` feed the render stages further down.
x, sr = sf.read(mix_path)
ensure_audio_valid(x, "mix")

# Second read through the project loader, used here for the printed summary.
buf = load_wav(mix_path)
print_audio_summary(buf, "Original Mix")

# Analysis report of the untouched mix; selected fields go to the run log.
rep_orig = analyze_wav(mix_path)
original_metrics = {
    "sr": rep_orig.sr,
    "duration_s": rep_orig.duration_s,
    "peak_dbfs": rep_orig.basic["peak_dbfs"],
    "true_peak_dbfs": rep_orig.true_peak_dbfs,
    "rms_dbfs": rep_orig.basic["rms_dbfs"],
    "lufs_int": rep_orig.lufs_integrated,
    "crest_db": rep_orig.basic["crest_db"],
    "bass_%": rep_orig.bass_energy_pct,
    "air_%": rep_orig.air_energy_pct,
    "phase_corr": rep_orig.stereo["phase_correlation"],
    "stereo_width": rep_orig.stereo["stereo_width"],
    "spectral_flatness": rep_orig.spectral_flatness,
}
logger.log_metrics("analysis_original", original_metrics)

# ---------- 4) Recommendations (dials) ----------
recs = recommend_from_analysis(rep_orig)
print(recommendation_summary(recs))

# Plan of (name, DialState) pairs: recommended variants (limit=RECS_LIMIT),
# with an all-zero "transparent" baseline placed at index 0.
variants_plan = build_premaster_plan_from_recs(recs, limit=RECS_LIMIT, prefix="PM")
transparent_dials = DialState(bass=0, punch=0, clarity=0, air=0, width=0)
variants_plan.insert(0, ("PM0_Transparent", transparent_dials))

# Log the plan with each dial state flattened to a plain dict.
plan_payload = {"plan": [(label, asdict(dials)) for label, dials in variants_plan]}
logger.log_params(
    "recommendations",
    plan_payload,
    code_versions={"presets_recs": CODE_VERSIONS["presets_recs"]},
)

# ---------- 5) RenderEngine: preprocess + premaster variants ----------
engine = RenderEngine(
    x,
    sr,
    preprocess=PreprocessConfig(low_cutoff=120.0, kick_lo=40.0, kick_hi=110.0),
)
pre_meta = engine.preprocess()
logger.log_params(
    "render_preprocess_cache",
    pre_meta,
    code_versions={
        "processors": CODE_VERSIONS["processors"],
        "render_engine": CODE_VERSIONS["render_engine"],
    },
)

# A) classic premaster prep: high-pass + peak target, both taken from CFG
#    (the original note cites HPF 20 Hz and peak -6 dBFS).
prep_audio, prep_info = premaster_prep(
    x,
    sr,
    target_peak_dbfs=CFG.prep_peak_target_dbfs,
    hpf_hz=CFG.prep_hpf_hz,
)
premaster_dir = os.path.join(paths.outputs, "premaster")
premaster_prep_path = os.path.join(premaster_dir, "premaster_prep.wav")
os.makedirs(premaster_dir, exist_ok=True)
save_wav_24bit(premaster_prep_path, prep_audio, sr)
register_and_log_artifact(
    man,
    logger,
    premaster_prep_path,
    kind="premaster",
    params=prep_info,
    stage="premaster_prep",
)

# B) dial-based premaster variants (written under "premasters", plural,
#    which is a different folder from the "premaster" one used above).
variant_dir = os.path.join(paths.outputs, "premasters")
opts = RenderOptions(
    target_peak_dbfs=CFG.render_peak_target_dbfs,
    bit_depth=CFG.default_bit_depth,
    hpf_hz=None,
    save_headroom_first=False,
)
var_metas = engine.commit_variants(variant_dir, variants_plan, opts=opts)
for meta in var_metas:
    variant_file = os.path.basename(meta["out_path"])
    register_and_log_artifact(
        man,
        logger,
        meta["out_path"],
        kind="premaster",
        params=meta,
        stage=f"variant__{variant_file}",
    )

# Choose a “premaster for mastering”: the entry at index 1 (skipping the
# transparent baseline at index 0) when there is one, else the classic prep.
if len(var_metas) > 1:
    premaster_choice_path = var_metas[1]["out_path"]
else:
    premaster_choice_path = premaster_prep_path

# true-peak guard before sending to mastering
y_in, sr2 = sf.read(premaster_choice_path)
tpres = safe_true_peak(y_in, sr2, ceiling_db=CFG.tp_ceiling_db)  # TPGuardResult
premaster_for_mastering_path = os.path.join(premaster_dir, "premaster_for_mastering.wav")
save_wav_24bit(premaster_for_mastering_path, tpres.out, sr2)
tp_meta = {
    field: float(getattr(tpres, field))
    for field in ("gain_db", "in_dbtp", "out_dbtp", "ceiling_db")
}
register_and_log_artifact(
    man,
    logger,
    premaster_for_mastering_path,
    kind="premaster",
    params={"tp_guard": tp_meta},
    stage="premaster_for_mastering",
)

# ---------- 6) Mastering Orchestrator (Local provider; LANDR stub available) ----------
orch = MasteringOrchestrator(paths, man)

# Style suggestions come from a fresh analysis of the guarded premaster.
# Each suggestion is a (style, strength, why) triple; the `why` part is dropped.
premaster_report = analyze_wav(premaster_for_mastering_path)
styles = [
    (style, strength)
    for style, strength, _why in recommend_mastering_styles_from_metrics(premaster_report)
]
providers = [LocalMasterProvider(bit_depth=CFG.default_bit_depth)]
logger.log_params(
    "mastering_styles",
    {"styles": styles},
    code_versions={"orchestrator": CODE_VERSIONS["orchestrator"]},
)

master_results = orch.run(
    premaster_path=premaster_for_mastering_path,
    providers=providers,
    styles=styles,
    out_tag="master",
    level_match_preview_lufs=LM_PREVIEW_LUFS,
)
# Keep just the output file path of every result.
master_paths = [result.out_path for result in master_results]

# ---------- 7) Streaming Normalization Simulator (as-heard files) ----------
# Defaults used when the step is toggled off or no master was produced.
stream_paths, stream_df = [], None
if DO_STREAMING_SIM and master_paths:
    # Only the first master is fed to the simulator.
    stream_outdir = os.path.join(paths.outputs, "stream_previews")
    stream_paths, stream_df = simulate_and_export_for_platforms(
        input_path=master_paths[0],
        out_dir=stream_outdir,
        profiles=default_streaming_profiles(),
        bit_depth=CFG.default_bit_depth,
        register_to_manifest=(man, "stream_sim"),
    )
    if stream_df is None:
        stream_rows = None
    else:
        stream_rows = len(stream_df)
    logger.log_event("stream_sim_summary", {"rows": stream_rows})

# ---------- 8) Reporting (metrics, plots, HTML) ----------
# Compare the original mix, the guarded premaster and every master; paths that
# do not exist on disk are dropped before reporting.
compare_files = [mix_path, premaster_for_mastering_path] + master_paths
compare_files = [p for p in compare_files if os.path.exists(p)]

cfg_cmp = CompareConfig(
    preview_seconds=CFG.preview_seconds,
    nfft=CFG.nfft,
    reference_name=REPORT_REF_NAME
)

# Save plots AND HTML into reports/assets/ (no cross-copy collisions)
reports_assets_dir = os.path.join(paths.reports, "assets")
os.makedirs(reports_assets_dir, exist_ok=True)

bundle = write_report_bundle(
    file_paths=compare_files,
    reports_dir=reports_assets_dir,               # plots + HTML here
    cfg=cfg_cmp,
    manifest=man,
    report_name="comparison_report.html",         # filename only
    extra_notes="Auto-generated end-to-end run."
)

# Optional enhanced HTML (keep it in the same assets dir).
# The enhanced writer is defined under a NameError guard elsewhere in the
# notebook, so it may be missing; this step therefore stays best-effort.
try:
    html_enh = write_report_html_enhanced(
        bundle["summary_df"], bundle["deltas_df"], bundle["plots"],
        os.path.join(reports_assets_dir, "comparison_report_enhanced.html"),
        title="Post-Mix Comparison Report",
        extra_notes="Client review pack",
        code_versions=CODE_VERSIONS,
        # asdict() matches how dial states are serialized elsewhere in this
        # script and hands the writer a copy rather than the live __dict__.
        dial_snapshot=asdict(variants_plan[1][1]) if len(variants_plan) > 1 else {}
    )
    register_and_log_artifact(man, logger, html_enh, kind="report", params={"enhanced": True}, stage="compare_html_enhanced")
except NameError as err:
    # Still non-fatal, but no longer silent: a NameError raised from inside
    # the writer (not only a missing writer) would otherwise vanish unnoticed.
    print(f"Enhanced HTML report skipped: {err}")
    logger.log_event("compare_html_enhanced_skipped", {"error": str(err)})

# ---------- 9) Reproducibility bundle ----------
zip_path = os.path.join(paths.reports, "bundles", f"{logger.run_id}.zip")

# De-duplicate the output paths while keeping first-seen order. A set would
# also de-duplicate, but its iteration order for strings depends on the
# per-process hash seed, so the bundle's output list could differ run to run.
bundle_outputs = list(dict.fromkeys(
    [premaster_prep_path, premaster_for_mastering_path, *master_paths, *stream_paths]
))

repro_zip = make_repro_zip(
    zip_path,
    workspace_root=paths.root,
    run_logger=logger,
    env_info=env,
    inputs=[mix_path],
    outputs=bundle_outputs,
    extra_jsons={"code_versions": CODE_VERSIONS, "recommendations": [(n, asdict(d)) for (n,d) in variants_plan]},
    readme_text="Bundle created by MAIN end-to-end run."
)
register_and_log_artifact(man, logger, repro_zip, kind="bundle", params={"run_id": logger.run_id}, stage="repro_zip")

# ---------- 10) Finalize ----------
# Run summary handed to the logger, then the manifest is written out.
run_summary = {
    "project": PROJECT,
    "run_id": logger.run_id,
    "mix": mix_path,
    "premaster_choice": premaster_for_mastering_path,
    "masters": master_paths,
    "stream_previews": stream_paths,
    "report": bundle["html_path"],   # lives in reports/assets/
    "env_sha": json_sha256(env),
}
logger.write_summary(run_summary)
write_manifest(man)

# Console recap; list-valued entries print one path per " - " bullet line.
print("\n=== DONE ===")
print("Workspace:", paths.root)
print("Premaster (for mastering):", premaster_for_mastering_path)
print("Masters:", *master_paths, sep="\n - ")
if stream_paths:
    print("As-heard previews:", *stream_paths, sep="\n - ")
print("Report:", bundle["html_path"])
print("Repro bundle:", repro_zip)


Workspace created at: /Users/itay/Documents/muxing/postmix_runs/postmix_v1_20250820-134354
Imported mix → /Users/itay/Documents/muxing/postmix_runs/postmix_v1_20250820-134354/inputs/mix.wav
Original Mix: sr=48000 | ch=2 | dur=356.333s | peak=0.928901 | rms=0.251534
  path: /Users/itay/Documents/muxing/postmix_runs/postmix_v1_20250820-134354/inputs/mix.wav
  sha256: f6086ee74bf35fbf...
  src dtype: int32 | src ch: 2
- Balanced Gentle (priority 9): B15 P15 C10 A10 W5
    • No strong issues detected → start with light, broad polish.
Manifest written: /Users/itay/Documents/muxing/postmix_runs/postmix_v1_20250820-134354/manifest.json

=== DONE ===
Workspace: /Users/itay/Documents/muxing/postmix_runs/postmix_v1_20250820-134354
Premaster (for mastering): /Users/itay/Documents/muxing/postmix_runs/postmix_v1_20250820-134354/outputs/premaster/premaster_for_mastering.wav
Masters:
 - /Users/itay/Documents/muxing/postmix_runs/postmix_v1_20250820-134354/outputs/master/local_neutral_50.wav
 - /Users/