In [2]:
import numpy as np
import pandas as pd
import sympy as sy

# Try importing torch, but don't fail if it's not available
try:
    import torch
    TORCH_AVAILABLE = True
    print(f"PyTorch {torch.__version__} is available")
except ImportError:
    TORCH_AVAILABLE = False
    print("Warning: PyTorch is not installed. Using numpy/scipy for diagonalization.")
    torch = None

import ujson as uj

from copy import deepcopy
from functools import partial
from pathlib import Path

from sympy.physics.wigner import wigner_3j, wigner_6j
from numpy import linalg as LA
from IPython.display import Latex, display
import matplotlib.pyplot as plt
from matplotlib import colors
from matplotlib.collections import LineCollection
%matplotlib inline
import seaborn as sns

from scipy.optimize import least_squares
from scipy.stats import norm

import Energy_Levels as EL
from Energy_Levels import MoleculeLevels
from Energy_Levels import (
    branching_ratios,
    Calculate_TDMs,
    Calculate_TDM_evecs,
    Calculate_forbidden_TDM_evecs,
    Calculate_forbidden_TDMs,
)

print("All imports successful!")

# sns.set()
# sns.set_palette('bright')
# np.set_printoptions(precision=9, suppress=True)
# from tabulate import tabulate

PyTorch 2.9.1+cpu is available
All imports successful!


In [3]:
# Torch device configuration and GPU-ready diagonalization hooks
# Only configure torch if it's available
if TORCH_AVAILABLE:
    try:
        TORCH_DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using torch device: {TORCH_DEVICE}")
    except Exception as e:
        print(f"Warning: Could not configure torch device: {e}")
        TORCH_AVAILABLE = False
else:
    TORCH_DEVICE = None
    print("Torch not available, using numpy/scipy for diagonalization")

# Store original functions
if not hasattr(EL, "diagonalize_cpu"):
    EL.diagonalize_cpu = EL.diagonalize
    EL.diagonalize_batch_cpu = EL.diagonalize_batch

# Patch the diagonalize function in the Energy_Levels module
# This ensures internal calls to diagonalize() use our patched version
def diagonalize_with_device(matrix, method="torch", order=False, Normalize=False, round=10):
    """GPU/CPU-aware diagonalization with proper tensor handling."""
    if method == "torch" and TORCH_AVAILABLE:
        try:
            tensor = torch.from_numpy(matrix).to(TORCH_DEVICE)
            w, v = torch.linalg.eigh(tensor)
            # Use detach() to ensure we can convert to numpy even if requires_grad=True
            evals = np.round(w.detach().cpu().numpy(), round)
            evecs = np.round(v.detach().cpu().numpy().T, round)
            if order:
                idx_order = np.argsort(evals)
                evecs = evecs[idx_order, :]
                evals = evals[idx_order]
            return evals, evecs
        except Exception as e:
            print(f"Warning: Torch diagonalization failed: {e}")
            print("Falling back to numpy...")
            method = "numpy"
    # Fallback to original function for non-torch methods
    return EL.diagonalize_cpu(matrix, method=method, order=order, Normalize=Normalize, round=round)


def diagonalize_batch_with_device(matrix_array, method="torch", round=10):
    """GPU/CPU-aware batch diagonalization with proper tensor handling."""
    if method == "torch" and TORCH_AVAILABLE:
        try:
            tensors = torch.from_numpy(matrix_array).to(TORCH_DEVICE)
            w, v = torch.linalg.eigh(tensors)
            evals = np.round(w.detach().cpu().numpy(), round)
            evecs = np.round(v.detach().cpu().numpy().transpose(0, 2, 1), round)
            return evals, evecs
        except Exception as e:
            print(f"Warning: Torch batch diagonalization failed: {e}")
            print("Falling back to numpy...")
            method = "numpy"
    # Fallback to original function for non-torch methods
    return EL.diagonalize_batch_cpu(matrix_array, method=method, round=round)


# Patch the functions in the module namespace
# This replaces the function references so internal calls use the patched version
EL.diagonalize = diagonalize_with_device
EL.diagonalize_batch = diagonalize_batch_with_device
if TORCH_AVAILABLE:
    EL.TORCH_DEVICE = TORCH_DEVICE

print("Diagonalization functions configured successfully.")
print(f"Default method: {'torch' if TORCH_AVAILABLE else 'numpy'}")


Using torch device: cpu
Diagonalization functions configured successfully.
Default method: torch


In [4]:
X010_173 = MoleculeLevels.initialize_state(
    "YbOH",
    "173",
    "X010",
    [1, 2],
    M_values="all",
    I=[5 / 2, 1 / 2],
    S=1 / 2,
    round=8,
    P_values=[1 / 2, 3 / 2],
)


KeyboardInterrupt: 

In [None]:
X010_173 = MoleculeLevels.initialize_state(
    "YbOH",
    "173",
    "X010",
    [1, 2],
    M_values="all",
    I=[5 / 2, 1 / 2],
    S=1 / 2,
    round=8,
    P_values=[1 / 2, 3 / 2],
)

BASE_PARAMETERS = deepcopy(X010_173.parameters)
FIT_PARAMETER_NAMES = [
    "Be",
    "Gamma_SR",
    "Gamma_Prime",
    "bFYb",
    "cYb",
    "bFH",
    "cH",
    "e2Qq0",
    "q_lD",
    "p_lD",
    "muE",
    "g_S_eff",
]
PARAM_PRIORS = {key: BASE_PARAMETERS[key] for key in FIT_PARAMETER_NAMES}


def _default_bound(value, frac=0.2, floor=1e-6):
    span = max(abs(value) * frac, floor)
    return value - span, value + span


FIT_PARAMETER_BOUNDS = {key: _default_bound(PARAM_PRIORS[key]) for key in FIT_PARAMETER_NAMES}

print(f"Loaded {X010_173.iso_state} with {len(FIT_PARAMETER_NAMES)} fit parameters tracked.")

In [None]:
# --- Synthetic "experimental" peak generator for this repo (windowed & sparse) ---
# Run this AFTER you have X010_173 initialized.
# Goal: for EACH spectrum, produce 5–10 observed peaks between 300 and 400 MHz.

import numpy as np
import pandas as pd

# ------------------- helpers -------------------
def _to_numpy_1d(x):
    """Convert torch/numpy/list to a 1D float numpy array."""
    try:
        import torch
        if isinstance(x, torch.Tensor):
            return x.detach().cpu().numpy().astype(float).ravel()
    except Exception:
        pass
    return np.asarray(x, dtype=float).ravel()

def get_evals(state, Ez, Bz, method="torch"):
    """
    Robustly get eigenvalues from MoleculeLevels regardless of return style.
    Uses set_attr=True then reads state.evals if needed.
    """
    out = state.eigensystem(Ez, Bz, order=True, method=method, set_attr=True)
    if out is not None:
        evals = out[0]
    else:
        evals = getattr(state, "evals", None)
        if evals is None:
            raise RuntimeError("Couldn't find eigenvalues. state.eigensystem() didn't return and state.evals not set.")
    evals = _to_numpy_1d(evals)
    if evals.size < 2:
        raise RuntimeError(f"Too few eigenvalues ({evals.size}). Basis/truncation may be too small.")
    return evals

def predict_lines_from_levels(evals,
                             n_levels=120,
                             fmin=300.0,
                             fmax=400.0,
                             max_lines=50000,
                             seed=0):
    """
    Candidate transition frequencies from pairwise differences of the lowest n_levels energies.
    NOT enforcing selection rules (pipeline testing only).

    Returns only lines within [fmin, fmax] MHz (no auto-expansion).
    """
    rng = np.random.default_rng(seed)

    e = np.sort(_to_numpy_1d(evals))
    n = min(int(n_levels), e.size)
    if n < 2:
        return np.array([], dtype=float)
    e = e[:n]

    # Positive pairwise differences: e[j] - e[i] for j>i
    diffs = (e[None, :] - e[:, None])[np.triu_indices(n, k=1)]
    diffs = diffs[(diffs >= fmin) & (diffs <= fmax)]

    if diffs.size == 0:
        return np.array([], dtype=float)

    diffs = np.unique(np.round(np.sort(diffs), 6))

    # If too many, subsample to max_lines for speed
    if diffs.size > max_lines:
        idx = rng.choice(diffs.size, size=max_lines, replace=False)
        diffs = np.sort(diffs[idx])

    return diffs.astype(float)

def make_fake_experiment_in_window(lines,
                                   nu_min=300.0,
                                   nu_max=400.0,
                                   n_keep_min=5,
                                   n_keep_max=10,
                                   sigma_MHz=0.25,
                                   spurious=1,
                                   seed=0):
    """
    Create sparse, unassigned observed peaks in [nu_min, nu_max] MHz.

    - Picks a random number of peaks between n_keep_min and n_keep_max from the candidate lines.
    - If not enough candidate lines exist, fills remaining peaks uniformly in-window (spurious).
    - Adds Gaussian jitter sigma_MHz to mimic centroid uncertainty.
    - Adds extra spurious peaks (optional) uniformly in-window.
    """
    rng = np.random.default_rng(seed)
    lines = _to_numpy_1d(lines)

    # Ensure windowing (should already be windowed, but keep safe)
    inwin = lines[(lines >= nu_min) & (lines <= nu_max)]
    inwin = np.unique(np.round(np.sort(inwin), 6))

    n_target = int(rng.integers(int(n_keep_min), int(n_keep_max) + 1))

    if inwin.size >= n_target:
        idx = rng.choice(inwin.size, size=n_target, replace=False)
        obs = np.sort(inwin[idx])
    else:
        obs = inwin.copy()
        n_missing = n_target - obs.size
        fill = rng.uniform(nu_min, nu_max, size=n_missing)
        obs = np.sort(np.concatenate([obs, fill]))

    # Add measurement jitter and clip to window
    obs = obs + rng.normal(0.0, sigma_MHz, size=obs.size)
    obs = np.clip(obs, nu_min, nu_max)

    # Add extra spurious peaks (optional)
    if spurious and int(spurious) > 0:
        spur = rng.uniform(nu_min, nu_max, size=int(spurious))
        obs = np.sort(np.concatenate([obs, spur]))

    return np.sort(obs)

def generate_synthetic_peaks_csv_windowed(state,
                                         conditions,
                                         output_csv="synthetic_peaks.csv",
                                         method="torch",
                                         # window constraints
                                         window_min=300.0,
                                         window_max=400.0,
                                         # eigenlevel sampling
                                         n_levels_start=120,
                                         n_levels_max=600,
                                         # observed sparsity
                                         n_keep_min=5,
                                         n_keep_max=10,
                                         sigma_MHz=0.25,
                                         spurious=1,
                                         seed_base=0,
                                         verbose=True):
    """
    Generate a multi-spectrum unassigned peak CSV with 5–10 peaks per spectrum
    in a specified frequency window [window_min, window_max] MHz.
    """
    rows = []

    for idx, (spec_id, Ez, Bz) in enumerate(conditions):
        evals = get_evals(state, Ez, Bz, method=method)

        # Try increasing n_levels until we get some lines in the window
        lines = np.array([], dtype=float)
        n_levels = int(n_levels_start)

        while lines.size == 0 and n_levels <= int(n_levels_max):
            lines = predict_lines_from_levels(
                evals,
                n_levels=n_levels,
                fmin=window_min,
                fmax=window_max,
                max_lines=50000,
                seed=seed_base + 100 + idx,
            )
            n_levels = int(np.ceil(n_levels * 1.5))

        # If still none, widen the window slightly (still centered around requested region)
        if lines.size == 0:
            lines = predict_lines_from_levels(
                evals,
                n_levels=min(int(n_levels_max), evals.size),
                fmin=window_min - 50.0,
                fmax=window_max + 50.0,
                max_lines=50000,
                seed=seed_base + 200 + idx,
            )

        if lines.size == 0:
            raise RuntimeError(
                f"[{spec_id}] No candidate lines found near {window_min}-{window_max} MHz "
                f"(even after widening). Try increasing n_levels_max or widening the window."
            )

        peaks = make_fake_experiment_in_window(
            lines,
            nu_min=window_min,
            nu_max=window_max,
            n_keep_min=n_keep_min,
            n_keep_max=n_keep_max,
            sigma_MHz=sigma_MHz,
            spurious=spurious,
            seed=seed_base + 300 + idx,
        )

        if verbose:
            print(f"{spec_id}: Ez={Ez} V/cm, Bz={Bz} G | evals={evals.size} | "
                  f"cand_lines_in_window={lines.size} | peaks_written={peaks.size} | "
                  f"window=[{window_min},{window_max}] MHz")

        for nu in peaks:
            rows.append({
                "spectrum_id": spec_id,
                "Ez_V_per_cm": float(Ez),
                "Bz_G": float(Bz),
                "nu_obs_MHz": float(nu),
                "sigma_MHz": float(sigma_MHz),
            })

    df = pd.DataFrame(rows).sort_values(["spectrum_id", "nu_obs_MHz"]).reset_index(drop=True)
    df.to_csv(output_csv, index=False)
    return df

# ------------------- Example usage -------------------
spectra_conditions = [
    ("spec0", 0.0, 0.0),
    ("spec1", 20.0, 5.0),
    ("spec2", 40.0, 5.0),
    ("spec3", 60.0, 10.0),
]

df = generate_synthetic_peaks_csv_windowed(
    X010_173,
    spectra_conditions,
    output_csv="synthetic_peaks.csv",
    method="torch",
    window_min=300.0,
    window_max=400.0,
    n_levels_start=120,
    n_levels_max=600,
    n_keep_min=5,
    n_keep_max=10,
    sigma_MHz=0.25,
    spurious=1,          # set 0 for none
    seed_base=123,
    verbose=True,
)

df.head(20)
# File "synthetic_peaks.csv" now has ~5–10 peaks per spectrum in 300–400 MHz.


In [None]:
def set_state_parameters(state, updates=None):
    if updates is None:
        updates = {}
    new_params = dict(state.parameters)
    new_params.update(updates)

    state.parameters = new_params
    state.library.parameters[state.iso_state] = new_params

    b = state.library.H_builders[state.iso_state]
    state.H_function, state.H_symbolic = b(
        state.q_numbers,
        params=new_params,
        M_values=state.M_values,
        precision=state.round,
    )

    state.eigensystem(0, 1e-8, order=True, method="torch", set_attr=True)
    state.generate_parities(state.evecs0)



def parameter_vector_to_dict(vector):
    return {name: value for name, value in zip(FIT_PARAMETER_NAMES, vector)}


def current_parameter_dict(state=None):
    source = state.parameters if state is not None else BASE_PARAMETERS
    return {name: source[name] for name in FIT_PARAMETER_NAMES}


def parameters_to_vector(params):
    return np.array([params[name] for name in FIT_PARAMETER_NAMES], dtype=float)

set_state_parameters(X010_173)
baseline_parameter_vector = parameters_to_vector(current_parameter_dict(X010_173))

In [None]:
# Experimental data setup and frequency transform configuration
EXPERIMENTAL_DATA_PATH = Path("/absolute/path/to/your/experimental_assignments.csv")  # TODO: update
REQUIRED_COLUMNS = ["state index 0", "state index 1", "freq_obs"]

if EXPERIMENTAL_DATA_PATH.exists():
    observed_df = pd.read_csv(EXPERIMENTAL_DATA_PATH)
    observed_df.columns = [col.strip() for col in observed_df.columns]
    missing = [col for col in REQUIRED_COLUMNS if col not in observed_df.columns]
    if missing:
        raise ValueError(f"Missing required columns in experimental data: {missing}")
else:
    observed_df = pd.DataFrame(columns=REQUIRED_COLUMNS + ["uncertainty"])
    print("Experimental data file not found. Update EXPERIMENTAL_DATA_PATH and re-run this cell.")

if "uncertainty" not in observed_df.columns:
    observed_df["uncertainty"] = 1.0  # MHz weights (set to 1 if unknown)

observed_df = observed_df.copy()
for col in ["freq_obs", "uncertainty"]:
    if col in observed_df.columns:
        observed_df[col] = pd.to_numeric(observed_df[col], errors="coerce")

observed_df = observed_df.dropna(subset=["freq_obs"]).reset_index(drop=True)

# Frequency transform controls (matching the previous plotting convention)
FREQ_OFFSET = (106.089 - 97.58) * 2  # MHz
FREQ_SCALE = 0.5  # Divide by two for two-photon frequency conversion
FREQ_SHIFT = -90.0  # Additional shift applied after scaling


def model_frequency_transform(raw_freq):
    """Map raw transition frequency from the model to the experimental frequency axis."""
    return (raw_freq - FREQ_OFFSET) * FREQ_SCALE + FREQ_SHIFT


TRANSITION_INDEX_SET = None  # Replace with a list of indices to restrict transitions if desired
LASER_POLARIZATION = "orth"
PARITY_SIGN = 1
INTENSITY_THRESHOLD = None  # Set to a float to discard transitions with weaker intensity
EZ_FIELD = 40  # Update if the experimental conditions change
B_FIELD = 1e-8
RUN_OPTIMIZATION = False  # Set to True to launch least-squares fitting


In [None]:
def compute_model_transitions(
    state,
    Ez=EZ_FIELD,
    B=B_FIELD,
    indices=None,
    parity_sign=PARITY_SIGN,
    polarization=LASER_POLARIZATION,
):
    index_list = indices if indices is not None else TRANSITION_INDEX_SET
    if index_list is None:
        index_list = list(range(84))

    result = state.calculate_two_photon_spectrum(
        Ez,
        B,
        index_list,
        parity_sign=parity_sign,
        laser_polarization=polarization,
    )
    transitions = pd.DataFrame(result[1])

    if not transitions.empty:
        transitions["freq_model"] = transitions["freq"].apply(model_frequency_transform)
        if INTENSITY_THRESHOLD is not None:
            intensity_key = next(
                (key for key in ["intensity", "Intensity", "strength"] if key in transitions.columns),
                None,
            )
            if intensity_key is not None:
                transitions = transitions[transitions[intensity_key] >= INTENSITY_THRESHOLD].reset_index(drop=True)

    return transitions, result


def merge_predictions_with_experiment(predicted_df, experimental_df):
    if experimental_df.empty:
        predicted_df = predicted_df.copy()
        predicted_df["freq_obs"] = np.nan
        predicted_df["residual"] = np.nan
        predicted_df["weight"] = 1.0
        return predicted_df, pd.DataFrame(), predicted_df

    merge_cols = ["state index 0", "state index 1"]
    for col in ["M0", "M1"]:
        if col in experimental_df.columns and col in predicted_df.columns:
            merge_cols.append(col)

    merged = experimental_df.merge(predicted_df, how="left", on=merge_cols, suffixes=("_obs", "_model"))
    missing = merged[merged["freq_model"].isna()].copy()

    merged["freq_model"] = merged["freq_model"].astype(float)
    merged["residual"] = merged["freq_model"] - merged["freq_obs"]
    if "uncertainty" in merged.columns:
        weights = merged["uncertainty"].replace(0, np.nan).fillna(1.0)
    else:
        weights = pd.Series(1.0, index=merged.index)
    merged["weight"] = weights
    merged["weighted_residual"] = merged["residual"] / merged["weight"]

    matched = merged[merged["freq_model"].notna()].copy()
    return matched, missing, predicted_df


def summarize_fit(matched_df):
    if matched_df.empty:
        return {"rms": np.nan, "weighted_rms": np.nan, "n_points": 0}

    valid = np.isfinite(matched_df["residual"]) & np.isfinite(matched_df["weight"])
    if not valid.any():
        return {"rms": np.nan, "weighted_rms": np.nan, "n_points": 0}

    residuals = matched_df.loc[valid, "residual"].to_numpy()
    weights = matched_df.loc[valid, "weight"].to_numpy()
    rms = np.sqrt(np.mean(residuals**2))
    weighted_rms = np.sqrt(np.mean((residuals / weights) ** 2))
    return {"rms": rms, "weighted_rms": weighted_rms, "n_points": int(valid.sum())}

In [None]:
baseline_predicted_df, baseline_raw = compute_model_transitions(X010_173)
baseline_matched_df, baseline_missing_df, baseline_predicted_df = merge_predictions_with_experiment(
    baseline_predicted_df, observed_df
)
baseline_summary = summarize_fit(baseline_matched_df)

print("Baseline comparison summary:", baseline_summary)
if not baseline_missing_df.empty:
    print(f"Unmatched experimental assignments: {len(baseline_missing_df)}")


def weighted_residual_vector(param_vector):
    updates = parameter_vector_to_dict(param_vector)
    set_state_parameters(X010_173, updates)
    predicted_df, _ = compute_model_transitions(X010_173)
    matched_df, missing_df, _ = merge_predictions_with_experiment(predicted_df, observed_df)
    if matched_df.empty:
        raise ValueError(
            "No model transitions matched the experimental assignments. "
            "Adjust TRANSITION_INDEX_SET or check the assignments."
        )
    return matched_df["weighted_residual"].to_numpy()


optimization_result = None
fit_parameters = parameter_vector_to_dict(baseline_parameter_vector)
fit_predicted_df = baseline_predicted_df
fit_matched_df = baseline_matched_df
fit_missing_df = baseline_missing_df
fit_summary = baseline_summary

if RUN_OPTIMIZATION and not observed_df.empty:
    lower_bounds = np.array([FIT_PARAMETER_BOUNDS[name][0] for name in FIT_PARAMETER_NAMES])
    upper_bounds = np.array([FIT_PARAMETER_BOUNDS[name][1] for name in FIT_PARAMETER_NAMES])

    optimization_result = least_squares(
        weighted_residual_vector,
        x0=baseline_parameter_vector,
        bounds=(lower_bounds, upper_bounds),
        verbose=2,
    )
    fit_parameters = parameter_vector_to_dict(optimization_result.x)
    set_state_parameters(X010_173, fit_parameters)
    fit_predicted_df, _ = compute_model_transitions(X010_173)
    fit_matched_df, fit_missing_df, fit_predicted_df = merge_predictions_with_experiment(
        fit_predicted_df, observed_df
    )
    fit_summary = summarize_fit(fit_matched_df)
    print("Fit summary:", fit_summary)

fit_summary


In [None]:
if not fit_matched_df.empty:
    display(fit_matched_df[["state index 0", "state index 1", "freq_obs", "freq_model", "residual"]].head())

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.errorbar(
        fit_matched_df["freq_obs"],
        fit_matched_df["freq_model"],
        yerr=fit_matched_df.get("uncertainty", pd.Series(0, index=fit_matched_df.index)),
        fmt="o",
        ms=6,
        alpha=0.7,
        label="Matched transitions",
    )
    lims = [
        min(fit_matched_df["freq_obs"].min(), fit_matched_df["freq_model"].min()) - 0.1,
        max(fit_matched_df["freq_obs"].max(), fit_matched_df["freq_model"].max()) + 0.1,
    ]
    ax.plot(lims, lims, "k--", label="Perfect agreement")
    ax.set_xlim(lims)
    ax.set_ylim(lims)
    ax.set_xlabel("Observed frequency (MHz)")
    ax.set_ylabel("Model frequency (MHz)")
    ax.set_title("Observed vs. model transition frequencies")
    ax.grid(True, alpha=0.3)
    ax.legend()
    plt.show()
else:
    print("No matched transitions to visualize yet. Load experimental data and rerun the fit.")


In [None]:
# Unassigned frequency data configuration
UNASSIGNED_DATA_PATH = Path("/absolute/path/to/your/unassigned_frequencies.csv")  # TODO: update
UNASSIGNED_FREQ_COLUMN = "freq_obs"  # Column name in the CSV file
UNASSIGNED_WEIGHT_COLUMN = None  # Set to column name if relative intensities are available
UNASSIGNED_SIGMA = 0.03  # MHz Gaussian width used to broaden stick spectra

if UNASSIGNED_DATA_PATH.exists():
    unassigned_df = pd.read_csv(UNASSIGNED_DATA_PATH)
    if UNASSIGNED_FREQ_COLUMN not in unassigned_df.columns:
        raise ValueError(
            f"Column '{UNASSIGNED_FREQ_COLUMN}' not found in unassigned data: {unassigned_df.columns.tolist()}"
        )
    observed_unassigned_freqs = pd.to_numeric(
        unassigned_df[UNASSIGNED_FREQ_COLUMN], errors="coerce"
    ).dropna().to_numpy()
    if UNASSIGNED_WEIGHT_COLUMN and UNASSIGNED_WEIGHT_COLUMN in unassigned_df.columns:
        observed_unassigned_weights = pd.to_numeric(
            unassigned_df[UNASSIGNED_WEIGHT_COLUMN], errors="coerce"
        ).fillna(1.0).to_numpy()
    else:
        observed_unassigned_weights = np.ones_like(observed_unassigned_freqs)
    print(f"Loaded {len(observed_unassigned_freqs)} unassigned transition frequencies.")
else:
    observed_unassigned_freqs = np.array([])
    observed_unassigned_weights = np.array([])
    print("Unassigned frequency data not found. Update UNASSIGNED_DATA_PATH and re-run.")


In [None]:
def gaussian_broadened_spectrum(frequencies, weights=None, freq_axis=None, sigma=UNASSIGNED_SIGMA):
    if freq_axis is None:
        if frequencies.size == 0:
            return np.linspace(0, 1, 1000), np.zeros(1000)
        f_min, f_max = frequencies.min() - 3 * sigma, frequencies.max() + 3 * sigma
        freq_axis = np.linspace(f_min, f_max, 2000)
    if weights is None:
        weights = np.ones_like(frequencies)
    if frequencies.size == 0:
        return freq_axis, np.zeros_like(freq_axis)
    diff = freq_axis[:, None] - frequencies[None, :]
    spectrum = np.exp(-(diff**2) / (2 * sigma**2)) @ weights
    return freq_axis, spectrum


def spectral_residual(predicted_freqs, observed_freqs, predicted_weights=None, observed_weights=None, sigma=UNASSIGNED_SIGMA):
    if observed_freqs.size == 0 or predicted_freqs.size == 0:
        return np.inf
    freq_axis, observed_spec = gaussian_broadened_spectrum(observed_freqs, observed_weights, sigma=sigma)
    _, predicted_spec = gaussian_broadened_spectrum(predicted_freqs, predicted_weights, freq_axis=freq_axis, sigma=sigma)
    observed_spec /= observed_spec.max() if observed_spec.max() else 1
    predicted_spec /= predicted_spec.max() if predicted_spec.max() else 1
    return np.sqrt(np.mean((predicted_spec - observed_spec) ** 2))


def transition_frequency_set(state, Ez=EZ_FIELD, B=B_FIELD, indices=None, **kwargs):
    transitions, raw = compute_model_transitions(state, Ez=Ez, B=B, indices=indices, **kwargs)
    if transitions.empty:
        return np.array([]), np.array([]), raw
    weights = None
    for candidate in ["intensity", "Intensity", "strength", "Strength"]:
        if candidate in transitions.columns:
            weights = transitions[candidate].to_numpy()
            break
    return transitions["freq_model"].to_numpy(), weights, raw


def unassigned_spectrum_loss(state, Ez=EZ_FIELD, B=B_FIELD, sigma=UNASSIGNED_SIGMA, observed_freqs=None, observed_weights=None, **kwargs):
    if observed_freqs is None:
        observed_freqs = observed_unassigned_freqs
    if observed_weights is None:
        observed_weights = observed_unassigned_weights
    predicted_freqs, predicted_weights, _ = transition_frequency_set(state, Ez=Ez, B=B, **kwargs)
    return spectral_residual(predicted_freqs, observed_freqs, predicted_weights, observed_weights, sigma=sigma)


In [None]:
# Parameter space exploration for unassigned spectra
UNASSIGNED_PARAMETER_NAMES = FIT_PARAMETER_NAMES  # Override to restrict search
UNASSIGNED_PARAMETER_BOUNDS = {name: FIT_PARAMETER_BOUNDS[name] for name in UNASSIGNED_PARAMETER_NAMES}
UNASSIGNED_RANDOM_SAMPLES = 100
UNASSIGNED_TOP_K = 10
USE_GAUSSIAN_PROCESS = True  # Toggle Bayesian surrogate refinement


def sample_parameter_vector(bounds_dict):
    return np.array([
        np.random.uniform(low=low, high=high) for (low, high) in bounds_dict.values()
    ])


def evaluate_parameter_vector(param_vector, state=None, observed_freqs=None, observed_weights=None):
    state = state or X010_173
    updates = parameter_vector_to_dict(param_vector)
    set_state_parameters(state, updates)
    loss = unassigned_spectrum_loss(
        state,
        observed_freqs=observed_freqs,
        observed_weights=observed_weights,
    )
    return loss


def random_explore_unassigned(state, n_samples=UNASSIGNED_RANDOM_SAMPLES, bounds=None, observed_freqs=None, observed_weights=None):
    bounds = bounds or UNASSIGNED_PARAMETER_BOUNDS
    names = list(bounds.keys())
    samples = []
    for _ in range(n_samples):
        vec = sample_parameter_vector(bounds)
        loss = evaluate_parameter_vector(vec, state=state, observed_freqs=observed_freqs, observed_weights=observed_weights)
        samples.append({"params": parameter_vector_to_dict(vec), "vector": vec, "loss": loss})
    samples.sort(key=lambda x: x["loss"])
    return names, samples


random_search_results = None
if observed_unassigned_freqs.size:
    names, random_search_results = random_explore_unassigned(
        X010_173,
        n_samples=UNASSIGNED_RANDOM_SAMPLES,
        bounds=UNASSIGNED_PARAMETER_BOUNDS,
        observed_freqs=observed_unassigned_freqs,
        observed_weights=observed_unassigned_weights,
    )
    best_candidates = random_search_results[:UNASSIGNED_TOP_K]
    print("Top random-search candidates (loss is spectrum RMSE):")
    for idx, candidate in enumerate(best_candidates, 1):
        print(f"#{idx}: loss={candidate['loss']:.5f}")
else:
    print("No unassigned data loaded; skipping random exploration.")


surrogate_model = None
surrogate_history = []

if USE_GAUSSIAN_PROCESS and observed_unassigned_freqs.size and random_search_results:
    try:
        from sklearn.gaussian_process import GaussianProcessRegressor
        from sklearn.gaussian_process.kernels import Matern, WhiteKernel
        from sklearn.preprocessing import StandardScaler

        kernel = Matern(length_scale=np.ones(len(names)), nu=2.5) + WhiteKernel(noise_level=1e-6)
        gp = GaussianProcessRegressor(kernel=kernel, normalize_y=True, n_restarts_optimizer=3)
        scaler = StandardScaler()

        X_samples = np.array([entry["vector"] for entry in random_search_results])
        y_samples = np.array([entry["loss"] for entry in random_search_results])
        scaler.fit(X_samples)
        gp.fit(scaler.transform(X_samples), y_samples)

        surrogate_model = {"gp": gp, "scaler": scaler, "names": names}

        def propose_next(gp_model, n_candidates=512):
            raw = np.array([sample_parameter_vector(UNASSIGNED_PARAMETER_BOUNDS) for _ in range(n_candidates)])
            mu, sigma = gp_model["gp"].predict(gp_model["scaler"].transform(raw), return_std=True)
            acquisition = mu - 1.96 * sigma  # Lower Confidence Bound
            best_idx = np.argmin(acquisition)
            return raw[best_idx], acquisition[best_idx]

        for iteration in range(10):
            candidate_vec, acquisition_value = propose_next(surrogate_model)
            candidate_loss = evaluate_parameter_vector(
                candidate_vec,
                state=X010_173,
                observed_freqs=observed_unassigned_freqs,
                observed_weights=observed_unassigned_weights,
            )
            surrogate_history.append({
                "vector": candidate_vec,
                "params": parameter_vector_to_dict(candidate_vec),
                "loss": candidate_loss,
                "acquisition": acquisition_value,
            })
            # Update GP with new observation
            X_samples = np.vstack([X_samples, candidate_vec])
            y_samples = np.append(y_samples, candidate_loss)
            scaler.fit(X_samples)
            gp.fit(scaler.transform(X_samples), y_samples)

        print(f"Gaussian-process refinement completed with {len(surrogate_history)} additional evaluations.")
    except ImportError:
        print("scikit-learn not available; skipping Gaussian-process refinement.")


In [None]:
def collect_candidate_table(random_results=None, surrogate_results=None, top_k=UNASSIGNED_TOP_K):
    records = []
    for source, entries in [("random", random_results or []), ("gp", surrogate_results or [])]:
        for entry in (entries if isinstance(entries, list) else []):
            row = {"source": source, "loss": entry["loss"], **entry.get("params", {})}
            records.append(row)
    if not records:
        return pd.DataFrame()
    df = pd.DataFrame(records)
    df = df.sort_values("loss").head(top_k).reset_index(drop=True)
    return df


candidate_table = collect_candidate_table(
    random_results=random_search_results,
    surrogate_results=surrogate_history,
)

if not candidate_table.empty:
    display(candidate_table)
else:
    print("No parameter candidates generated yet.")


In [None]:
def plot_spectrum_comparison(candidate_params, observed_freqs=None, observed_weights=None, sigma=UNASSIGNED_SIGMA, title_suffix=""):
    observed_freqs = observed_freqs if observed_freqs is not None else observed_unassigned_freqs
    observed_weights = observed_weights if observed_weights is not None else observed_unassigned_weights
    if observed_freqs.size == 0:
        print("No unassigned frequencies to visualize.")
        return
    set_state_parameters(X010_173, candidate_params)
    predicted_freqs, predicted_weights, _ = transition_frequency_set(X010_173)
    freq_axis, observed_spec = gaussian_broadened_spectrum(observed_freqs, observed_weights, sigma=sigma)
    _, predicted_spec = gaussian_broadened_spectrum(predicted_freqs, predicted_weights, freq_axis=freq_axis, sigma=sigma)
    observed_spec /= observed_spec.max() if observed_spec.max() else 1
    predicted_spec /= predicted_spec.max() if predicted_spec.max() else 1
    plt.figure(figsize=(12, 5))
    plt.plot(freq_axis, observed_spec, label="Observed", lw=2)
    plt.plot(freq_axis, predicted_spec, label="Predicted", lw=2)
    plt.xlabel("Frequency (MHz)")
    plt.ylabel("Normalized intensity")
    plt.title(f"Spectrum overlay {title_suffix}")
    plt.grid(True, alpha=0.3)
    plt.legend()
    plt.show()


if candidate_table.shape[0]:
    best_candidate_params = candidate_table.iloc[0].drop(labels=["source", "loss"]).to_dict()
    print("Plotting spectrum overlay for top candidate...")
    plot_spectrum_comparison(best_candidate_params, title_suffix=f"loss={candidate_table.iloc[0]['loss']:.4f}")
