# Spectral FM — Interactive Notebook

This notebook provides a clean, user-focused control panel for FM spectral matching.

Run cells from top to bottom: set parameters, preview target, optimize, synthesize, listen, and save.

## 1. Setup

In [1]:
import numpy as np
from py_scripts.interactive_controls import InteractiveSpectralUI
from py_scripts.generate_wave_file import generate_wave_file
# Add any other imports you need here (objective functions, FM synth, etc.)

## 2. Helper Wrappers
These helpers bridge your existing code into the UI callbacks.

In [2]:
# Save helper using the provided generate_wave_file()

def save_wav(y, fs, filename):
    return generate_wave_file(y, fs, fs_target_name='44.1kHz', bit_rate=16, custom_filename=filename, save_to_file=True)


def _build_waveforms():
    from py_scripts.waveform_generators import (
        sine_wave,
        square_wave,
        triangle_wave,
        sawtooth_wave,
    )
    return [sine_wave, square_wave, triangle_wave, sawtooth_wave]


def synthesize_from_params(settings, params):
    import numpy as np

    fs = int(settings.get('fs', 44100))
    duration = float(settings.get('duration', 2.0))
    num_ops = int(settings.get('num_operators', 4))

    n = int(fs * duration)
    y = np.zeros(n, dtype=np.float64)
    if params is None:
        return y.astype(np.float32), fs

    vec = np.array(params, dtype=float).reshape(-1)
    if vec.size < num_ops * 3:
        pad = np.zeros(num_ops * 3 - vec.size, dtype=float)
        vec = np.concatenate([vec, pad])

    waveforms = _build_waveforms()
    for i in range(num_ops):
        f = float(np.clip(vec[i * 3 + 0], 20.0, fs * 0.45))
        a = float(np.clip(vec[i * 3 + 1], 0.0, 1.0))
        w = int(np.clip(round(vec[i * 3 + 2]), 0, len(waveforms) - 1))
        y += waveforms[w](f, a, duration, fs)

    mx = np.max(np.abs(y))
    if mx > 0:
        y = y / mx
    return y.astype(np.float32), fs


def my_optimize(settings):
    import numpy as np
    try:
        from scipy import optimize as spo
    except Exception:
        spo = None
    try:
        from tqdm.auto import tqdm
    except Exception:
        tqdm = None

    # Load target spectrum using the active loader
    target_tsv = settings.get('target_tsv')
    freqs, amps = compute_target_spectrum(target_tsv)

    fs = int(settings.get('fs', 44100))
    duration = float(settings.get('duration', 2.0))
    num_ops = int(settings.get('num_operators', 4))
    iterations = int(settings.get('iterations', 200))
    optimizer_name = str(settings.get('optimizer', 'differential_evolution'))
    metric_key = str(settings.get('metric', 'spectral_convergence'))

    waveforms = _build_waveforms()

    from py_scripts import objective_functions as of
    metric_map = {
        'spectral_convergence': of.spectral_convergence_distance,
        'cosine_similarity': of.cosine_similarity,
        'itakura_saito': of.itakura_saito_distance,
        'euclidean': of.euclidean_distance,
        'manhattan': of.manhattan_distance,
        'kullback_leibler': of.kullback_leibler_divergence,
    }
    metric_fn = metric_map.get(metric_key, of.spectral_convergence_distance)

    def objective(vec):
        vec = np.array(vec, dtype=float)
        return metric_fn(vec, freqs, amps, waveforms, duration, fs)

    bounds = []
    for _ in range(num_ops):
        bounds.extend([
            (20.0, fs * 0.45),  # frequency
            (0.0, 1.0),         # amplitude
            (0.0, len(waveforms) - 1.0),  # waveform index (rounded in synth/objective)
        ])

    result = None
    log = ''

    if spo is not None:
        if optimizer_name == 'differential_evolution':
            pbar = tqdm(total=iterations, desc=f"DE ({num_ops} ops)") if tqdm else None
            def cb_de(xk, convergence):
                if pbar:
                    pbar.update(1)
                return False
            try:
                result = spo.differential_evolution(
                    objective,
                    bounds,
                    maxiter=iterations,
                    popsize=8,
                    polish=True,
                    tol=1e-6,
                    callback=cb_de,
                )
            finally:
                if pbar:
                    pbar.close()
        elif optimizer_name == 'basinhopping':
            # Seed x0 from strongest target peaks when available
            x0 = []
            if len(freqs) > 0:
                order = np.argsort(amps)[::-1]
            else:
                order = []
            for i in range(num_ops):
                base_f = float(freqs[order[i % len(freqs)]] if len(freqs) else 440.0)
                x0.extend([np.clip(base_f, 20.0, fs * 0.45), 0.5, 0.0])
            x0 = np.array(x0, dtype=float)
            minimizer_kwargs = {'method': 'L-BFGS-B', 'bounds': bounds}
            pbar = tqdm(total=max(1, iterations), desc=f"BH ({num_ops} ops)") if tqdm else None
            def cb_bh(x, f, accept):
                if pbar:
                    pbar.update(1)
                return False
            try:
                result = spo.basinhopping(
                    objective,
                    x0,
                    niter=max(1, iterations),
                    minimizer_kwargs=minimizer_kwargs,
                    disp=False,
                    callback=cb_bh,
                )
            finally:
                if pbar:
                    pbar.close()
        elif optimizer_name == 'dual_annealing':
            pbar = tqdm(total=iterations, desc=f"DA ({num_ops} ops)") if tqdm else None
            def cb_da(x, f, context):
                if pbar and context == 0:
                    pbar.update(1)
                return False
            try:
                try:
                    result = spo.dual_annealing(
                        objective,
                        bounds=bounds,
                        maxiter=iterations,
                        callback=cb_da,
                    )
                except TypeError:
                    result = spo.dual_annealing(
                        objective,
                        bounds=bounds,
                        maxiter=iterations,
                    )
            finally:
                if pbar:
                    pbar.close()
        else:
            pbar = tqdm(total=iterations, desc=f"DE ({num_ops} ops)") if tqdm else None
            def cb_de2(xk, convergence):
                if pbar:
                    pbar.update(1)
                return False
            try:
                result = spo.differential_evolution(
                    objective,
                    bounds,
                    maxiter=iterations,
                    popsize=8,
                    polish=True,
                    tol=1e-6,
                    callback=cb_de2,
                )
            finally:
                if pbar:
                    pbar.close()
        params = result.x if hasattr(result, 'x') else None
        best_val = result.fun if hasattr(result, 'fun') else None
        log = f"method={optimizer_name}, score={best_val}"
    else:
        # Fallback: simple random search if SciPy is unavailable
        rng = np.random.default_rng(0)
        best_val = float('inf')
        params = None
        trials = max(50, iterations * 5)
        iterator = range(trials)
        if tqdm:
            iterator = tqdm(iterator, desc=f"Random ({num_ops} ops)")
        for _ in iterator:
            cand = []
            for _ in range(num_ops):
                cand.extend([
                    rng.uniform(20.0, fs * 0.45),
                    rng.uniform(0.0, 1.0),
                    rng.uniform(0.0, len(waveforms) - 1.0),
                ])
            val = objective(cand)
            if val < best_val:
                best_val = val
                params = np.array(cand, dtype=float)
        log = f"random_search score={best_val}"

    y, fs2 = synthesize_from_params(settings, params)
    return {"params": np.array(params, dtype=float), "y": y, "fs": fs2, "log": log}


def my_synthesize(settings, params=None):
    return synthesize_from_params(settings, params)


In [3]:
# Robust TSV loader: handles headers, BOM, bad rows, and missing pandas

def compute_target_spectrum(tsv_path):
    import os
    import numpy as np

    # Normalize path to project tsv folder if a bare name was provided
    if tsv_path and not os.path.exists(tsv_path):
        candidate = os.path.join("tsv", os.path.basename(tsv_path))
        if os.path.exists(candidate):
            tsv_path = candidate

    # Try pandas first for convenience
    try:
        import pandas as pd
        df = pd.read_csv(tsv_path, sep='\t', encoding='utf-8-sig')
        cols_map = {str(c).strip().lower(): c for c in df.columns}
        if 'frequency (hz)' in cols_map and 'amplitude' in cols_map:
            fcol, acol = cols_map['frequency (hz)'], cols_map['amplitude']
            freqs = pd.to_numeric(df[fcol], errors='coerce').to_numpy()
            amps = pd.to_numeric(df[acol], errors='coerce').to_numpy()
        else:
            # Fallback: take first two numeric columns
            num_df = df.select_dtypes(include=['number'])
            if num_df.shape[1] >= 2:
                freqs = num_df.iloc[:, 0].to_numpy()
                amps = num_df.iloc[:, 1].to_numpy()
            else:
                raise ValueError('No numeric columns detected in TSV')
        mask = np.isfinite(freqs) & np.isfinite(amps)
        return freqs[mask], amps[mask]
    except Exception:
        # Robust manual parse: ignore headers/malformed rows and accept tabs or whitespace
        freqs_list = []
        amps_list = []
        with open(tsv_path, 'r', encoding='utf-8-sig') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                parts = line.split('\t') if '\t' in line else line.split()
                if len(parts) < 2:
                    continue
                try:
                    fval = float(parts[0])
                    aval = float(parts[1])
                    freqs_list.append(fval)
                    amps_list.append(aval)
                except ValueError:
                    # Skip non-numeric (e.g., header)
                    continue
        if not freqs_list:
            raise ValueError('No numeric data found in TSV')
        freqs = np.asarray(freqs_list, dtype=float)
        amps = np.asarray(amps_list, dtype=float)
        return freqs, amps



## 3. Control Panel

In [4]:
ui = InteractiveSpectralUI(
    mode='fm',
    optimize_fn=my_optimize,
    synthesize_fn=my_synthesize,
    compute_target_spectrum_fn=compute_target_spectrum,
    save_wav_fn=save_wav,
    defaults={
        'target_tsv': 'tsv/cello_single.tsv',
        'num_operators': 4,
        'duration': 2.0,
        'fs': 44100,
        'optimizer': 'differential_evolution',
        'metric': 'spectral_convergence',
        'iterations': 200,
        'output_name': 'optimized_output_fm.wav',
    }
)
ui.display()

# You can get the current settings from the UI at any time:
# current = ui.get_settings(); current

VBox(children=(VBox(children=(HTML(value="<h3 style='margin:4px 0'>Spectral FM Control Panel</h3>"), Dropdown(…