# Module C — Precompute Validation
This notebook mirrors the `make precompute-data` target. It orchestrates the 12 canonical configuration runs, captures metadata, and ensures the generated artifacts under `python/validation/artifacts/precompute_data/` stay in sync with the Rust core implementation.

## 1. Configure Validation Paths and Environment
Resolve paths relative to this notebook, hydrate optional environment variables, and surface the validation binary along with the artifact directory used throughout this run.

In [None]:
from __future__ import annotations
import json
import os
import subprocess
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from dotenv import load_dotenv
from IPython.display import HTML, display
from plotly.subplots import make_subplots

# Locate the directory that holds this notebook. Some notebook frontends do
# not define __file__, so the working directory stands in for it there.
try:
    NOTEBOOK_DIR = Path(__file__).resolve().parent
except NameError:
    NOTEBOOK_DIR = Path.cwd()

# Hydrate optional overrides (VALIDATION_BIN, CARGO, ...) from a .env file in
# the working directory; this must happen before the environment is read below.
ENV_PATH = Path.cwd().joinpath(".env")
if ENV_PATH.exists():
    load_dotenv(ENV_PATH)

# The notebook directory doubles as the validation directory; the repository
# root is taken to be two levels above it.
VALIDATION_DIR = NOTEBOOK_DIR
REPO_ROOT = VALIDATION_DIR.parent.parent

# Every artifact and log produced by this notebook is written here.
ARTIFACT_DIR = VALIDATION_DIR.joinpath("artifacts", "precompute_data")
ARTIFACT_DIR.mkdir(parents=True, exist_ok=True)

# Command-line pieces for the cargo invocation, overridable via environment.
CARGO_CMD = os.getenv("CARGO", "cargo")
VALIDATION_BIN = os.getenv("VALIDATION_BIN", "mpb2d-validation")

print(f"Notebook dir: {NOTEBOOK_DIR}")
print(f"Artifact dir: {ARTIFACT_DIR}")
print(f"Validation binary: {VALIDATION_BIN}")

## 2. Declare Precompute Parameter Grid
Mirror the Makefile tuples (lattice, resolution, radii, permittivities, mesh sizes) so the notebook can orchestrate the same runs deterministically.

In [None]:
def build_param_grid() -> pd.DataFrame:
    """Build the table of precompute configurations to run.

    Each base configuration (lattice, resolution, radius, background and
    inside permittivity) is expanded over the mesh sizes 1 and 4, giving
    twelve rows in total.

    Returns:
        A DataFrame with the columns ``lattice``, ``resolution``, ``radius``,
        ``eps_bg``, ``eps_inside``, ``mesh_size`` and ``tag``, plus
        ``output_json`` (the artifact file name) and ``output_path`` (that
        name joined onto ``ARTIFACT_DIR``).
    """
    # (lattice, resolution, radius, eps_bg, eps_inside)
    base_configs = (
        ("square", 24, 0.28, 11.0, 1.0),
        ("square", 48, 0.25, 8.5, 2.0),
        ("square", 128, 0.20, 16.0, 4.0),
        ("triangular", 24, 0.22, 10.5, 3.5),
        ("triangular", 48, 0.18, 6.0, 1.5),
        ("triangular", 128, 0.15, 14.0, 5.5),
    )
    mesh_sizes = (1, 4)

    # Mesh size varies fastest so both meshes of a configuration sit together.
    records = [
        {
            "lattice": lattice,
            "resolution": resolution,
            "radius": radius,
            "eps_bg": eps_bg,
            "eps_inside": eps_inside,
            "mesh_size": mesh_size,
            "tag": f"{lattice}_res{resolution}_mesh{mesh_size}",
        }
        for lattice, resolution, radius, eps_bg, eps_inside in base_configs
        for mesh_size in mesh_sizes
    ]
    grid = pd.DataFrame(records)

    grid["output_json"] = [
        f"precompute_{row.lattice}_res{row.resolution}_bg{row.eps_bg:.1f}_hole{row.eps_inside:.1f}_mesh{row.mesh_size}.json"
        for row in grid.itertuples(index=False)
    ]
    grid["output_path"] = [ARTIFACT_DIR / name for name in grid["output_json"]]
    return grid

# Materialise the grid once; later cells read this module-level table.
params_df = build_param_grid()
# Trailing bare expression so the notebook renders the table as cell output.
params_df

## 3. Helper to Invoke Cargo Validation Binary
Wrap `cargo run -p mpb2d-validation -- precompute-data …` with structured logging so every run surfaces its arguments, runtime, and stdout/stderr for later inspection.

In [None]:
@dataclass
class RunResult:
    """Outcome of one precompute invocation, as recorded by run_precompute_row."""

    # Identifying parameters copied from the parameter-grid row.
    tag: str
    lattice: str
    resolution: int
    mesh_size: int
    # JSON artifact path that was passed to the command via --output.
    output_path: Path
    # Elapsed time around the subprocess call, measured with time.perf_counter().
    duration_s: float
    # Exit status of the subprocess; the batch loop reports 0 as "OK".
    returncode: int
    # Log files holding the captured stdout / stderr of the run.
    stdout_path: Path
    stderr_path: Path

def run_precompute_row(row: pd.Series) -> RunResult:
    """Run the precompute command for one parameter-grid row and log its output.

    The command is ``<CARGO_CMD> run -p <VALIDATION_BIN> -- precompute-data``
    followed by the row's parameters, executed with ``REPO_ROOT`` as the
    working directory. A non-zero exit status does not raise; it is reported
    through ``RunResult.returncode`` so the caller can continue the batch.

    Args:
        row: A parameter-grid row providing ``lattice``, ``resolution``,
            ``radius``, ``eps_bg``, ``eps_inside``, ``mesh_size``,
            ``output_path`` and ``tag``.

    Returns:
        A ``RunResult`` with the elapsed time, exit status and the paths of
        the ``<tag>_stdout.log`` / ``<tag>_stderr.log`` files written into
        ``ARTIFACT_DIR``.
    """
    args = [
        CARGO_CMD,
        "run",
        "-p", VALIDATION_BIN,
        "--", "precompute-data",
        "--lattice", row.lattice,
        "--resolution", str(row.resolution),
        "--radius", str(row.radius),
        "--eps-bg", str(row.eps_bg),
        "--eps-inside", str(row.eps_inside),
        "--mesh-size", str(row.mesh_size),
        "--output", str(row.output_path),
        "--tag", row.tag,
    ]
    stdout_path = ARTIFACT_DIR / f"{row.tag}_stdout.log"
    stderr_path = ARTIFACT_DIR / f"{row.tag}_stderr.log"
    start = time.perf_counter()
    completed = subprocess.run(
        args,
        cwd=REPO_ROOT,
        capture_output=True,
        text=True,
        # Decode explicitly as UTF-8 instead of the locale encoding, and
        # replace undecodable bytes, so unexpected output from the child
        # process cannot abort the batch with a UnicodeDecodeError.
        encoding="utf-8",
        errors="replace",
        check=False,
    )
    duration = time.perf_counter() - start
    # Write the logs as UTF-8 as well, so any character decoded above can be
    # stored regardless of the platform's default encoding.
    stdout_path.write_text(completed.stdout, encoding="utf-8")
    stderr_path.write_text(completed.stderr, encoding="utf-8")
    return RunResult(
        tag=row.tag,
        lattice=row.lattice,
        resolution=row.resolution,
        mesh_size=row.mesh_size,
        output_path=row.output_path,
        duration_s=duration,
        returncode=completed.returncode,
        stdout_path=stdout_path,
        stderr_path=stderr_path,
    )

## 4. Batch Execution Across Square and Triangular Lattices
Iterate over the parameter grid, run each configuration exactly once, and capture timing plus success metrics. The logs are persisted next to the artifacts for later debugging.

In [None]:
# Run the grid sequentially, echoing a one-line status per configuration.
results: list[RunResult] = []
for _, row in params_df.iterrows():
    print(f"Running {row.tag} …", end=" ")
    run_result = run_precompute_row(row)
    if run_result.returncode == 0:
        status = "OK"
    else:
        status = f"FAIL ({run_result.returncode})"
    print(status)
    results.append(run_result)

# One record per run; the trailing expression renders the table in the notebook.
results_df = pd.DataFrame([vars(run) for run in results])
results_df

## 5. Validate Generated JSON Artifacts
Confirm every expected JSON file exists, then spot-check a subset to ensure the schema exposes TE/TM effective epsilons, FFT stats, and file references.

In [None]:
def validate_json_artifacts(params: pd.DataFrame, sample_size: int = 3) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Check that each expected JSON artifact and its referenced CSVs exist.

    Args:
        params: Table with one row per configuration; each row must provide
            ``output_path`` (a ``Path`` to the JSON artifact) and ``tag``.
        sample_size: Maximum number of payloads to include in the sample table.

    Returns:
        ``(summaries, samples)``: a per-artifact status table and a table with
        selected fields of the first ``sample_size`` payloads that were read.

    Raises:
        FileNotFoundError: After scanning all rows, if any JSON artifact is
            missing, or if any CSV named in a payload's ``files`` section is
            not found next to its JSON file.
    """
    absent_json: list[Path] = []
    absent_csv: list[Path] = []
    summary_rows: list[dict[str, Any]] = []
    sample_rows: list[dict[str, Any]] = []
    payload_fields = (
        "lattice",
        "resolution",
        "mesh_size",
        "te_eps_eff",
        "tm_eps_eff",
        "clamp_fraction",
    )

    for _, entry in params.iterrows():
        artifact: Path = entry.output_path
        if not artifact.exists():
            absent_json.append(artifact)
            continue

        payload = json.loads(artifact.read_text())
        files = payload.get("files", {})
        # Fall back to the grid tag when the payload carries none.
        tag = payload.get("tag") or entry.tag

        # CSV names are resolved relative to the JSON file. A reference that
        # is absent from the payload is reported as False but is not an error.
        csv_found: dict[str, bool] = {}
        for key in ("epsilon_fourier_csv", "fft_workspace_csv"):
            csv_name = files.get(key)
            if not csv_name:
                csv_found[key] = False
                continue
            sibling = artifact.with_name(csv_name)
            csv_found[key] = sibling.exists()
            if not csv_found[key]:
                absent_csv.append(sibling)

        summary: dict[str, Any] = {"tag": tag}
        summary.update({field: payload.get(field) for field in payload_fields})
        summary["has_eps_fourier_csv"] = csv_found["epsilon_fourier_csv"]
        summary["has_fft_workspace_csv"] = csv_found["fft_workspace_csv"]
        summary["json_path"] = str(artifact)
        summary["output_path"] = str(artifact)
        summary_rows.append(summary)

        if len(sample_rows) < sample_size:
            sample_rows.append(
                {
                    "tag": tag,
                    "te_eps_eff": payload.get("te_eps_eff"),
                    "tm_eps_eff": payload.get("tm_eps_eff"),
                    "k_plus_g_sq": payload.get("k_plus_g_sq"),
                    "workspace": payload.get("workspace"),
                    "files": files,
                }
            )

    # Report problems only after the full scan so every missing file is listed.
    if absent_json:
        raise FileNotFoundError(f"Missing JSON artifacts: {[str(p) for p in absent_json]}")
    if absent_csv:
        raise FileNotFoundError(f"Referenced CSV artifacts not found: {[str(p) for p in absent_csv]}")
    return pd.DataFrame(summary_rows), pd.DataFrame(sample_rows)

# Validate the artifacts for the whole grid; this raises FileNotFoundError if
# any JSON file or referenced CSV is missing.
artifact_status_df, sample_payloads_df = validate_json_artifacts(params_df)
# Trailing bare expression so the notebook shows both tables as cell output.
artifact_status_df, sample_payloads_df

## 6. Persist Run + Artifact Metadata
Merge the runtime metadata with artifact stats and persist compact CSV/JSON summaries so downstream notebooks can ingest the same information without re-running the expensive simulations.

In [None]:
def _prepare_run_df() -> pd.DataFrame:
    """Return the run-metadata table with path columns converted to strings.

    When the batch cell has populated the module-level ``results_df``, a copy
    of it is used. Otherwise a placeholder is derived from ``params_df`` with
    the runtime columns filled with ``pd.NA``.
    """
    if "results_df" not in globals():
        df = params_df[["tag", "lattice", "resolution", "mesh_size", "output_path"]].copy()
        for placeholder in ("duration_s", "returncode", "stdout_path", "stderr_path"):
            df[placeholder] = pd.NA
    else:
        df = results_df.copy()

    # Only Path objects are stringified; any other value is left untouched.
    path_columns = [
        col for col in ("output_path", "stdout_path", "stderr_path") if col in df.columns
    ]
    for col in path_columns:
        df[col] = df[col].apply(lambda value: str(value) if isinstance(value, Path) else value)
    return df

def persist_metadata(run_df: pd.DataFrame, artifacts_df: pd.DataFrame) -> dict[str, Path]:
    """Join artifact status with run metadata and write the summary files.

    Args:
        run_df: Run metadata with ``tag``, ``lattice``, ``resolution``,
            ``mesh_size`` and ``output_path`` columns.
        artifacts_df: Artifact status table holding a ``json_path`` and/or an
            ``output_path`` column. When both are present, ``json_path`` wins
            and is renamed to ``output_path``.

    Returns:
        A mapping from a short label to each file written into ``ARTIFACT_DIR``.

    Raises:
        ValueError: If ``artifacts_df`` has neither ``json_path`` nor
            ``output_path``.
    """
    status = artifacts_df.copy()
    has_json_path = "json_path" in status.columns
    if has_json_path and "output_path" in status.columns:
        status = status.drop(columns=["output_path"])
    if has_json_path:
        status = status.rename(columns={"json_path": "output_path"})
    if "output_path" not in status.columns:
        raise ValueError("artifact_status_df must contain an output_path or json_path column")
    status["output_path"] = status["output_path"].astype(str)

    # Left join: every artifact row is kept even without a matching run record.
    join_keys = ["tag", "lattice", "resolution", "mesh_size", "output_path"]
    merged = status.merge(run_df.copy(), on=join_keys, how="left", suffixes=("", "_run"))

    written = {
        "artifact_status_csv": ARTIFACT_DIR / "precompute_artifact_status.csv",
        "run_metadata_csv": ARTIFACT_DIR / "precompute_run_metadata.csv",
        "run_metadata_json": ARTIFACT_DIR / "precompute_run_metadata.json",
    }
    status.to_csv(written["artifact_status_csv"], index=False)
    merged.to_csv(written["run_metadata_csv"], index=False)
    merged.to_json(written["run_metadata_json"], orient="records", indent=2)
    return written

# Build the run table (path columns stringified) and write the summary files.
run_df_prepared = _prepare_run_df()
output_paths = persist_metadata(run_df_prepared, artifact_status_df)
# Trailing bare expression so the notebook lists the written file paths.
output_paths

## 7. Visualize ε(G) Spectra and FFT Workspace Grids
Build per-configuration heatmaps for the complex ε(G) spectrum (magnitude, real part, and imaginary part) and the FFT workspace diagnostics (|k+G|² plus clamp mask). Title each figure with the TE/TM effective permittivities to quickly compare materials across the parameter grid.

In [None]:
def _pivot_matrix(df: pd.DataFrame, value_col: str) -> np.ndarray:
    """Reshape long-format rows into a 2-D array of ``value_col``.

    Rows of the result follow ``iy`` in descending order and columns follow
    ``ix`` in ascending order.
    """
    grid = df.pivot(index="iy", columns="ix", values=value_col)
    grid = grid.sort_index(ascending=False)
    return grid.values

def _safe_show(fig):
    """Show ``fig``, falling back to inline HTML when ``fig.show()`` fails over nbformat.

    A ``ValueError`` whose message mentions "nbformat" (case-insensitive)
    triggers the HTML fallback; any other ``ValueError`` is re-raised.
    """
    try:
        fig.show()
    except ValueError as err:
        if "nbformat" not in str(err).lower():
            raise
        markup = fig.to_html(include_plotlyjs="cdn", full_html=False)
        display(HTML(markup))

def _load_epsilon_fourier_maps(csv_path: Path) -> dict[str, np.ndarray]:
    """Read an ε(G) CSV and return grids keyed ``magnitude``, ``real`` and ``imag``.

    ``real`` and ``imag`` are coerced to numbers, with unparsable entries
    becoming NaN. The magnitude treats missing parts as zero, while the
    ``real`` and ``imag`` grids keep their NaNs.
    """
    table = pd.read_csv(csv_path)
    table["real"] = pd.to_numeric(table["real"], errors="coerce")
    table["imag"] = pd.to_numeric(table["imag"], errors="coerce")

    real_part = table["real"].fillna(0.0)
    imag_part = table["imag"].fillna(0.0)
    table["magnitude"] = np.sqrt(real_part ** 2 + imag_part ** 2)

    return {name: _pivot_matrix(table, name) for name in ("magnitude", "real", "imag")}

def _load_fft_workspace_maps(csv_path: Path) -> dict[str, np.ndarray]:
    """Read an FFT workspace CSV and return grids keyed ``k_plus_g_sq`` and ``clamped``.

    Whichever of ``k_plus_g_sq``, ``kx_plus_g`` and ``ky_plus_g`` are present
    get coerced to numbers (unparsable entries become NaN). ``clamped`` is
    cast to ``int``.
    """
    table = pd.read_csv(csv_path)
    present = [
        name for name in ("k_plus_g_sq", "kx_plus_g", "ky_plus_g") if name in table.columns
    ]
    for name in present:
        table[name] = pd.to_numeric(table[name], errors="coerce")
    table["clamped"] = table["clamped"].astype(int)
    return {key: _pivot_matrix(table, key) for key in ("k_plus_g_sq", "clamped")}

def _resolve_artifacts(row: pd.Series) -> tuple[dict[str, Any], Path, Path]:
    """Load a configuration's JSON payload and locate its two CSV companions.

    Args:
        row: Provides ``output_path`` (location of the JSON artifact) and
            ``tag`` (used in error messages).

    Returns:
        ``(payload, eps_path, fft_path)``; both CSV paths are resolved next to
        the JSON file.

    Raises:
        FileNotFoundError: If the payload lacks either CSV reference, or a
            referenced CSV does not exist.
    """
    manifest = Path(row["output_path"])
    payload = json.loads(manifest.read_text())
    references = payload.get("files", {})
    eps_name = references.get("epsilon_fourier_csv")
    fft_name = references.get("fft_workspace_csv")
    if not (eps_name and fft_name):
        raise FileNotFoundError(f"CSV references missing for {row['tag']}")

    eps_path, fft_path = manifest.with_name(eps_name), manifest.with_name(fft_name)
    if not eps_path.exists():
        raise FileNotFoundError(f"Missing epsilon(G) CSV: {eps_path}")
    if not fft_path.exists():
        raise FileNotFoundError(f"Missing FFT workspace CSV: {fft_path}")
    return payload, eps_path, fft_path

def plot_config_artifacts(row: pd.Series) -> None:
    """Render the ε(G) and FFT-workspace heatmap figures for one configuration.

    Two figures are shown: a three-panel ε(G) figure (magnitude, real part,
    imaginary part) and a two-panel FFT workspace figure (|k+G|² and the
    clamp mask). Both titles carry the TE/TM effective permittivities read
    from the JSON payload.

    Args:
        row: Provides ``output_path`` (the JSON artifact) and ``tag``.

    Raises:
        FileNotFoundError: Propagated from ``_resolve_artifacts`` when a CSV
            reference or file is missing.
    """
    payload, eps_path, fft_path = _resolve_artifacts(row)
    eps_maps = _load_epsilon_fourier_maps(eps_path)
    fft_maps = _load_fft_workspace_maps(fft_path)

    def _format_eps(key: str) -> str:
        # Artifact validation reads these fields with .get(), i.e. treats them
        # as optional. A missing or non-numeric value must therefore not abort
        # plotting; it is shown as "n/a" in the title instead.
        try:
            return f"{payload.get(key):.3f}"
        except (TypeError, ValueError):
            return "n/a"

    title_suffix = f"TE eff={_format_eps('te_eps_eff')} · TM eff={_format_eps('tm_eps_eff')}"
    fig_eps = make_subplots(
        rows=1, cols=3,
        subplot_titles=("|ε(G)|", "Re ε(G)", "Im ε(G)"),
        horizontal_spacing=0.05,
    )
    # Each panel gets its own color axis so the three scales stay independent.
    fig_eps.add_trace(go.Heatmap(z=eps_maps["magnitude"], coloraxis="coloraxis"), row=1, col=1)
    fig_eps.add_trace(go.Heatmap(z=eps_maps["real"], coloraxis="coloraxis2"), row=1, col=2)
    fig_eps.add_trace(go.Heatmap(z=eps_maps["imag"], coloraxis="coloraxis3"), row=1, col=3)
    fig_eps.update_layout(
        title=f"{row['tag']} — ε(G) spectra ({title_suffix})",
        height=450,
        width=1350,
        coloraxis=dict(colorscale="Viridis", colorbar_title="|ε(G)|"),
        coloraxis2=dict(
            colorscale="RdBu",
            colorbar=dict(title="Re ε(G)", x=0.53, xanchor="left", len=0.8, xpad=30)
        ),
        coloraxis3=dict(colorscale="PuOr", colorbar_title="Im", colorbar=dict(x=0.93)),
    )
    _safe_show(fig_eps)

    fig_fft = make_subplots(
        rows=1, cols=2,
        subplot_titles=("k+G squared", "Clamp mask"),
        horizontal_spacing=0.08,
    )
    fig_fft.add_trace(go.Heatmap(z=fft_maps["k_plus_g_sq"], colorscale="Magma", colorbar=dict(title="|k+G|²")), row=1, col=1)
    fig_fft.add_trace(
        go.Heatmap(
            z=fft_maps["clamped"],
            # Two-stop scale: the low end maps to blue and the high end to red.
            colorscale=[[0.0, "#1f77b4"], [1.0, "#d62728"]],
            showscale=False,
        ),
        row=1,
        col=2,
    )
    fig_fft.update_layout(
        title=f"{row['tag']} — FFT workspace diagnostics ({title_suffix})",
        height=400,
        width=900,
    )
    _safe_show(fig_fft)

In [None]:
def render_all_artifact_plots(order_by: tuple[str, ...] = ("lattice", "resolution", "mesh_size"), limit: int | None = None) -> None:
    """Render the diagnostic figures for every row of ``artifact_status_df``.

    Args:
        order_by: Columns used to sort the status table before rendering.
        limit: When not ``None``, only the rows selected by
            ``DataFrame.head(limit)`` are rendered, so ``limit=0`` renders
            nothing. ``None`` renders every row.

    Raises:
        RuntimeError: If the module-level ``artifact_status_df`` has not been
            created yet.
    """
    if "artifact_status_df" not in globals():
        raise RuntimeError("Run section 5 first to populate artifact_status_df")
    ordered = artifact_status_df.sort_values(list(order_by)).reset_index(drop=True)
    # Compare against None explicitly: a truthiness test would treat limit=0
    # as "no limit" and render the whole grid.
    rows = ordered if limit is None else ordered.head(limit)
    total = len(rows)
    for position, (_, row) in enumerate(rows.iterrows(), start=1):
        print(f"Rendering plots for {row['tag']} ({position}/{total})")
        try:
            plot_config_artifacts(row)
        except FileNotFoundError as exc:
            # Best effort: a configuration with missing files is reported and
            # skipped so the remaining ones still render.
            print(f"  Skipping due to missing files: {exc}")

In [None]:
# Render every configuration in the status table. Pass limit=N to render only
# the first N rows (after sorting) for a quicker spot-check.
render_all_artifact_plots()