# Dataset E — Single-source Helmholtz pairs (outer-collar PML)

This notebook generates a small supervised dataset of Helmholtz solutions:

- **Splits:** train/val/test = **50 / 5 / 5**
- **RHS:** **one point source** per sample (random location + random amplitude in **[1, 2]**, random phase)
- **Solve:** **direct sparse solve** on an **extended grid** with **outer-collar PML**, then **crop** back to the physical domain
- **Frequencies:** sampled from `OMEGA_LIST`.  
  We enforce the **“≥ 10 wavelengths across the domain”** requirement **only for ω ≥ 64** (as requested).

We save **physical-domain arrays only** (`f_real, f_imag, u_real, u_imag`) plus `meta_json`.


In [1]:
# ============================
# 0) Imports
# ============================
from __future__ import annotations

from pathlib import Path
import json
import time
import numpy as np
import scipy.sparse.linalg as spla

# Optional progress bar
try:
    from tqdm import tqdm
except Exception:
    tqdm = lambda x, **kw: x  # fallback

# --- project imports ---
from core.config import HelmholtzConfig, PMLConfig
from core.cases import make_default_cases
from core.medium import build_medium
from core.resolution import grid_from_ppw_with_pml_extension

from operators.assemble import assemble_helmholtz_matrix
from operators.pml import build_pml_profiles

print("✅ Imports OK")

ImportError: cannot import name 'PMLProfiles' from 'operators.pml' (C:\Users\31624\Documents\MIT\Programming\Freq2Transfer\src\operators\pml.py)

## 1) Experiment configuration

A few conventions we use:

- The **physical domain** is `[X_MIN, X_MIN+LX] × [Y_MIN, Y_MIN+LY]`.
- We build a **physical grid** targeting at least `PPW` points-per-wavelength (via `grid_from_ppw_with_pml_extension`).
- We then extend with a **PML collar** of thickness `NPML` grid points on each side.
- PML “strength” is stored as `eta = σ_max / |ω|` (dimensionless), consistent with your sweeps.

> **10 wavelengths condition (only for ω ≥ 64):**  
> Using conservative speed `C_MIN`, the number of wavelengths across `L` is  
> `waves = L / (2π C_MIN / ω) = ω L / (2π C_MIN)`.  
> We enforce `waves ≥ 10` for ω ≥ 64.


In [None]:
# ============================
# 1) Settings
# ============================
# Module-level configuration. The cells below read these names as globals,
# so renaming any of them requires updating the later cells too.

# Dataset sizes (number of samples generated per split)
N_TRAIN = 50
N_VAL   = 5
N_TEST  = 5

# Domain (edit if needed): physical extents and lower-left corner
LX, LY = 1.0, 1.0
X_MIN, Y_MIN = 0.0, 0.0

# Medium / wavespeed (conservative): used for the wavelength check and
# passed to the grid builder as `c_min`
C_MIN = 1.0

# Resolution target (points per wavelength handed to the grid builder)
PPW = 10.0

# Frequencies to sample from (one is drawn per sample)
OMEGA_LIST = [32.0, 64.0, 128.0]   # adjust if you want

# Enforce ≥10 wavelengths for omega >= this threshold
OMEGA_WAVES_ENFORCE_MIN = 64.0
N_WAVES_MIN = 10.0

# PML policy (baseline from your sweeps)
NPML = 40         # collar thickness in grid points on each side
ETA  = 6.0        # dimensionless strength, eta = sigma_max / |omega|
PML_POWER = 2.0   # polynomial order

# Source sampling
AMP_MIN, AMP_MAX = 1.0, 2.0
SOURCE_MARGIN = 0.10  # fraction of domain size (keep away from boundaries)

# Output path (relative to the current working directory)
DATASET_TAG = "E_outercollar_1src_50_5_5"
OUT_ROOT = Path(f"data/{DATASET_TAG}")

# Reproducibility: a single shared generator drives omega selection, source
# sampling and the visualization picks, so results depend on the order in
# which cells consume it.
SEED = 0
rng = np.random.default_rng(SEED)

print("Writing dataset to:", OUT_ROOT.resolve())

## 2) Small helpers (wavelength feasibility + grid utilities)

The **10 wavelengths** requirement is a *physics/domain* requirement, not a grid requirement:

- Wavelength: `λ = 2π C_MIN / ω`
- Number of wavelengths across `L`: `L/λ = ω L / (2π C_MIN)`

So for ω ≥ 64 we explicitly check that `min(LX, LY)` contains at least 10 wavelengths.


In [None]:
def waves_across_domain(*, omega: float, lx: float, ly: float, c_min: float) -> tuple[float, float]:
    """Return (waves_x, waves_y), the number of wavelengths spanning the domain.

    The wavelength is computed from the conservative speed ``c_min`` as
    ``lam = 2*pi*c_min / |omega|``; the sign of ``omega`` is ignored.

    Args:
        omega: Angular frequency (must be non-zero).
        lx: Domain length in x.
        ly: Domain length in y.
        c_min: Wave speed used for the wavelength (must be positive).

    Returns:
        ``(lx / lam, ly / lam)`` as Python floats.

    Raises:
        ValueError: If ``omega`` is zero or ``c_min`` is not positive, since
            no meaningful wavelength exists in either case.
    """
    omega_abs = abs(float(omega))
    speed = float(c_min)
    if omega_abs == 0.0:
        raise ValueError("omega must be non-zero to define a wavelength.")
    # `not speed > 0` also rejects NaN.
    if not speed > 0.0:
        raise ValueError(f"c_min must be positive, got {c_min}.")
    lam = 2.0 * np.pi * speed / omega_abs
    return float(lx / lam), float(ly / lam)


def assert_min_waves_if_needed(*, omega: float, lx: float, ly: float, c_min: float,
                               omega_enforce_min: float, n_waves_min: float) -> None:
    """Check the minimum-wavelength-count rule, but only at high frequency.

    Frequencies strictly below ``omega_enforce_min`` are accepted without any
    check. Otherwise both directions must span at least ``n_waves_min``
    wavelengths (as computed by ``waves_across_domain``).

    Raises:
        ValueError: If the rule applies and either direction falls short.
    """
    omega = float(omega)

    # Low frequencies are exempt from the requirement.
    if omega < float(omega_enforce_min):
        return None

    wx, wy = waves_across_domain(omega=omega, lx=lx, ly=ly, c_min=c_min)
    too_few = wx < n_waves_min or wy < n_waves_min
    if not too_few:
        return None

    message = (
        f"Wavelength constraint failed for omega={omega}: waves_x={wx:.2f}, waves_y={wy:.2f} < {n_waves_min}. "
        f"Increase omega, increase domain, or lower the waves requirement."
    )
    raise ValueError(message)


def mesh_ij(grid):
    """Build node-coordinate arrays (X, Y) for ``grid`` using ``ij`` indexing.

    ``grid`` must expose ``x_min``, ``y_min``, ``lx``, ``ly``, ``nx`` and
    ``ny``. Both returned arrays have shape ``(nx, ny)``; X varies along the
    first axis and Y along the second, with the end points included.
    """
    def _axis_nodes(origin, extent, count):
        # Evenly spaced nodes from origin to origin + extent, inclusive.
        start = float(origin)
        return np.linspace(start, start + float(extent), int(count))

    x_nodes = _axis_nodes(grid.x_min, grid.lx, grid.nx)
    y_nodes = _axis_nodes(grid.y_min, grid.ly, grid.ny)
    grid_x, grid_y = np.meshgrid(x_nodes, y_nodes, indexing="ij")
    return grid_x, grid_y

## 3) RHS: one point source (random location + amplitude)

We generate a **single** point source per sample:

- location sampled uniformly in the interior, keeping a margin away from boundaries
- amplitude uniform in `[1, 2]`
- phase uniform in `[0, 2π)`

RHS is injected on the **nearest grid node**.


In [None]:
def sample_single_source(*, lx: float, ly: float, x_min: float, y_min: float,
                         margin_frac: float, amp_min: float, amp_max: float,
                         rng: np.random.Generator) -> dict:
    """Draw one random point source inside the domain.

    The location is uniform in the rectangle obtained by shrinking the domain
    by ``margin_frac`` of its size on every side; amplitude is uniform in
    ``[amp_min, amp_max)`` and phase is uniform in ``[0, 2*pi)``.

    Exactly four values are drawn from ``rng``, in the order x, y, amplitude,
    phase, so results are reproducible for a given generator state.

    Args:
        lx, ly: Domain lengths.
        x_min, y_min: Lower-left corner of the domain.
        margin_frac: Fraction of the domain size kept free along each edge;
            must lie in ``[0, 0.5]``.
        amp_min, amp_max: Amplitude bounds, with ``amp_min <= amp_max``.
        rng: NumPy random generator to draw from.

    Returns:
        Dict with float entries ``"x"``, ``"y"``, ``"amp"`` and ``"phase"``.

    Raises:
        ValueError: If ``margin_frac`` is outside ``[0, 0.5]`` or the
            amplitude bounds are inverted. Either would hand ``rng.uniform``
            an interval with ``high < low``, whose result is undefined.
    """
    # Validate before touching the generator so a bad call does not
    # consume random numbers.
    if not 0.0 <= margin_frac <= 0.5:
        raise ValueError(f"margin_frac must lie in [0, 0.5], got {margin_frac}.")
    if amp_max < amp_min:
        raise ValueError(f"amp_max ({amp_max}) must be >= amp_min ({amp_min}).")

    mx = margin_frac * lx
    my = margin_frac * ly
    x = rng.uniform(x_min + mx, x_min + lx - mx)
    y = rng.uniform(y_min + my, y_min + ly - my)
    amp = rng.uniform(amp_min, amp_max)
    phase = rng.uniform(0.0, 2.0 * np.pi)
    return {"x": float(x), "y": float(y), "amp": float(amp), "phase": float(phase)}


def build_rhs_from_source(grid_phys, source: dict) -> np.ndarray:
    """Place a single point source on the node closest to its location.

    ``grid_phys`` must expose ``x_min``, ``y_min``, ``lx``, ``ly``, ``nx`` and
    ``ny``; ``source`` must provide ``"x"``, ``"y"``, ``"amp"`` and
    ``"phase"``. The result is a complex128 array of shape ``(nx, ny)`` that
    is zero everywhere except at the nearest node, which holds
    ``amp * exp(1j * phase)``. A location outside the grid maps to the
    closest boundary node.
    """
    nx, ny = int(grid_phys.nx), int(grid_phys.ny)
    x0, y0 = float(grid_phys.x_min), float(grid_phys.y_min)

    # Node coordinates along each axis (end points included).
    x_nodes = np.linspace(x0, x0 + float(grid_phys.lx), nx)
    y_nodes = np.linspace(y0, y0 + float(grid_phys.ly), ny)

    # Index of the node nearest to the requested source position.
    i_near = int(np.abs(x_nodes - source["x"]).argmin())
    j_near = int(np.abs(y_nodes - source["y"]).argmin())

    rhs = np.zeros((nx, ny), dtype=np.complex128)
    rhs[i_near, j_near] += source["amp"] * np.exp(1j * source["phase"])
    return rhs

## 4) One forward solve (direct)

For each sample we:

1. Enforce the “≥10 wavelengths” rule for ω ≥ 64
2. Build a **physical** grid satisfying `PPW`
3. Extend with an **outer-collar PML** of thickness `NPML`
4. Build `HelmholtzConfig(omega, grid_ext, PMLConfig(thickness, eta, power))`
5. Assemble and solve on the **extended** grid
6. Crop the solution back to the **physical** grid
7. Return the physical-domain arrays + metadata (writing them to disk is handled by the helpers in Section 5)


In [None]:
def solve_one_sample(*, omega: float, ppw: float, case_name: str) -> tuple[np.ndarray, np.ndarray, dict]:
    """Generate one (RHS, solution) pair with a direct solve on the extended grid.

    Reads the module-level settings (LX, LY, X_MIN, Y_MIN, C_MIN, NPML, ETA,
    PML_POWER, SOURCE_MARGIN, AMP_MIN, AMP_MAX, the wavelength thresholds)
    and draws the source from the module-level ``rng``, so each call advances
    the shared generator state.

    Args:
        omega: Angular frequency for this sample.
        ppw: Points-per-wavelength target passed to the grid builder.
        case_name: Key into the dict returned by ``make_default_cases()``.

    Returns:
        ``(f_phys, u_phys, meta)``: the complex RHS on the physical grid, the
        solution cropped with ``ext.core_slices``, and a metadata dict
        (frequency, grids, PML settings, source, solve time and relative
        residual).

    Raises:
        ValueError: If the minimum-wavelength rule applies and is violated.
        KeyError: If ``case_name`` is not one of the default cases.
    """
    # 1) enforce wavelength condition (only for omega >= threshold)
    assert_min_waves_if_needed(
        omega=omega, lx=LX, ly=LY, c_min=C_MIN,
        omega_enforce_min=OMEGA_WAVES_ENFORCE_MIN,
        n_waves_min=N_WAVES_MIN,
    )

    # 2) build grids (physical + extended collar)
    ext = grid_from_ppw_with_pml_extension(
        omega=float(omega),
        ppw=float(ppw),
        lx=float(LX),
        ly=float(LY),
        npml=int(NPML),
        c_min=float(C_MIN),
        n_min_phys=201,         # keep consistent with your earlier datasets
        make_odd_phys=True,
        x_min_phys=float(X_MIN),
        y_min_phys=float(Y_MIN),
    )
    gphys = ext.grid_phys
    gext  = ext.grid_ext
    si, sj = ext.core_slices  # slices in (i,j) indexing for arrays shaped (nx,ny)

    # 3) configs (frozen dataclasses)
    pml_cfg = PMLConfig(thickness=int(NPML), strength=float(ETA), power=float(PML_POWER))
    cfg = HelmholtzConfig(omega=float(omega), grid=gext, pml=pml_cfg, ppw_target=float(ppw))

    # 4) medium on extended grid
    cases = make_default_cases()
    if case_name not in cases:
        raise KeyError(f"Unknown case '{case_name}'. Available: {list(cases.keys())}")
    case = cases[case_name]

    X, Y = mesh_ij(gext)
    c = build_medium(cfg=cfg, case=case, X=X, Y=Y)  # (nx,ny)

    # 5) assemble the system matrix on the extended grid
    # NOTE(review): the PML profiles are not computed in this function;
    # presumably assemble_helmholtz_matrix builds them from cfg.pml — confirm.
    A = assemble_helmholtz_matrix(cfg, c)

    # 6) RHS: single source on physical grid, embed into extended
    src = sample_single_source(
        lx=LX, ly=LY, x_min=X_MIN, y_min=Y_MIN,
        margin_frac=SOURCE_MARGIN, amp_min=AMP_MIN, amp_max=AMP_MAX, rng=rng
    )
    f_phys = build_rhs_from_source(gphys, src)  # (nxp,nyp)

    # The collar region of the extended RHS stays zero.
    f_ext = np.zeros((int(gext.nx), int(gext.ny)), dtype=np.complex128)
    f_ext[si, sj] = f_phys

    # Row-major flattening of the (nx,ny) field.
    # NOTE(review): assumes A uses the same unknown ordering — TODO confirm.
    b = f_ext.reshape(-1)

    # 7) solve (direct); only the spsolve call is timed
    t0 = time.perf_counter()
    u_vec = spla.spsolve(A, b)
    solve_time = time.perf_counter() - t0

    u_ext = u_vec.reshape(int(gext.nx), int(gext.ny))
    # Copy so the cropped result does not keep the extended array alive.
    u_phys = u_ext[si, sj].copy()

    # residual (relative; the tiny constant guards against a zero RHS norm)
    r = A @ u_vec - b
    res_rel = float(np.linalg.norm(r) / (np.linalg.norm(b) + 1e-30))

    wx, wy = waves_across_domain(omega=omega, lx=LX, ly=LY, c_min=C_MIN)

    # NOTE(review): the key "min_required_for_omega>=64" hard-codes 64 even
    # though the threshold is configurable via OMEGA_WAVES_ENFORCE_MIN.
    meta = {
        "omega": float(omega),
        "ppw": float(ppw),
        "case": case_name,
        "pml": {"npml": int(NPML), "eta": float(ETA), "power": float(PML_POWER)},
        "waves": {"waves_x": float(wx), "waves_y": float(wy), "min_required_for_omega>=64": float(N_WAVES_MIN)},
        "grid_phys": {"nx": int(gphys.nx), "ny": int(gphys.ny), "hx": float(gphys.hx), "hy": float(gphys.hy),
                        "x_min": float(gphys.x_min), "y_min": float(gphys.y_min), "lx": float(gphys.lx), "ly": float(gphys.ly)},
        "grid_ext": {"nx": int(gext.nx), "ny": int(gext.ny), "hx": float(gext.hx), "hy": float(gext.hy),
                       "x_min": float(gext.x_min), "y_min": float(gext.y_min), "lx": float(gext.lx), "ly": float(gext.ly)},
        "source": src,
        "solve_time_sec": float(solve_time),
        "res_rel": float(res_rel),
    }

    return f_phys, u_phys, meta

## 5) Saving samples + manifest

We store each sample as a compressed `.npz` containing:

- `f_real, f_imag` (float32)
- `u_real, u_imag` (float32)
- `meta_json` (JSON string)

A JSONL manifest is written per split (`manifest_train.jsonl`, etc.).


In [None]:
def save_npz(path: Path, **arrays) -> None:
    """Write ``arrays`` to a compressed ``.npz`` at ``path``.

    Missing parent directories are created first. Each keyword becomes the
    name of one array inside the archive.
    """
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    np.savez_compressed(path, **arrays)


def save_sample_npz(out_path: Path, f_phys: np.ndarray, u_phys: np.ndarray, meta: dict) -> None:
    """Store one sample as a compressed ``.npz`` file.

    The complex fields are split into float32 real and imaginary parts
    (``f_real``, ``f_imag``, ``u_real``, ``u_imag``). ``meta`` is serialized
    to JSON and stored under ``meta_json`` as a one-element unicode string
    array.

    Args:
        out_path: Destination file.
        f_phys: Complex right-hand side.
        u_phys: Complex solution.
        meta: JSON-serializable metadata.
    """
    save_npz(
        out_path,
        f_real=np.real(f_phys).astype(np.float32),
        f_imag=np.imag(f_phys).astype(np.float32),
        u_real=np.real(u_phys).astype(np.float32),
        u_imag=np.imag(u_phys).astype(np.float32),
        # A plain unicode array (not dtype=object) avoids pickling, so the
        # file can be read with np.load's default allow_pickle=False while
        # readers indexing meta_json[0] keep working unchanged.
        meta_json=np.array([json.dumps(meta)]),
    )


def append_jsonl(path: Path, row: dict) -> None:
    """Append ``row`` as one JSON line to ``path``.

    The parent directory is created if needed; the file is opened in append
    mode, so existing lines are preserved.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    line = json.dumps(row) + "\n"
    with path.open("a", encoding="utf-8") as handle:
        handle.write(line)

## 6) Generate a split

We generate `n_samples` items by sampling `omega` uniformly from `OMEGA_LIST`.


In [None]:
def generate_split(*, out_root: Path, split: str, n_samples: int, start_id: int, case_name: str) -> list[dict]:
    """Generate, save and log ``n_samples`` samples for one dataset split.

    For each sample a frequency is drawn from the module-level ``OMEGA_LIST``
    using the shared ``rng``, the problem is solved with ``solve_one_sample``
    (at the module-level ``PPW``), the result is written to
    ``out_root/split/<name>.npz`` and one row is appended to
    ``out_root/manifest_<split>.jsonl``.

    Args:
        out_root: Dataset root directory.
        split: Split name; used for the sub-directory and manifest name.
        n_samples: Number of samples to generate (may be 0).
        start_id: Sample id assigned to the first sample; ids then increase
            by one per sample.
        case_name: Medium case forwarded to ``solve_one_sample``.

    Returns:
        The manifest rows written, in generation order.
    """
    split_dir = out_root / split
    split_dir.mkdir(parents=True, exist_ok=True)
    manifest_path = out_root / f"manifest_{split}.jsonl"

    rows = []
    sid = int(start_id)

    t_start = time.perf_counter()
    solve_times = []

    for k in tqdm(range(int(n_samples)), desc=f"Generating {split}"):
        omega = float(rng.choice(OMEGA_LIST))

        f_phys, u_phys, meta = solve_one_sample(omega=omega, ppw=PPW, case_name=case_name)

        # filename encodes key parameters
        # (omega is truncated to an integer here; the sample id keeps names unique)
        fname = (
            f"{case_name}_sid{sid:06d}_"
            f"w{int(meta['omega'])}_ppw{PPW:g}_npml{meta['pml']['npml']}_eta{meta['pml']['eta']:g}_ns1.npz"
        )
        out_path = split_dir / fname
        save_sample_npz(out_path, f_phys, u_phys, meta)

        # log
        row = {
            "split": split,
            "file": str(out_path.as_posix()),
            "case": case_name,
            "sample_id": sid,
            "omega": meta["omega"],
            "ppw": meta["ppw"],
            "nx": meta["grid_phys"]["nx"],
            "ny": meta["grid_phys"]["ny"],
            "solve_time_sec": meta["solve_time_sec"],
            "res_rel": meta["res_rel"],
        }
        # Append immediately so a crash mid-run keeps the rows written so far.
        append_jsonl(manifest_path, row)
        rows.append(row)

        solve_times.append(meta["solve_time_sec"])
        sid += 1

        # lightweight progress info
        if (k + 1) % 10 == 0:
            avg = float(np.mean(solve_times[-10:]))
            total = time.perf_counter() - t_start
            print(f"  [{split}] {k+1:4d}/{n_samples} | avg solve(last10) {avg:.3f}s | total {total:.1f}s")

    total = time.perf_counter() - t_start
    # np.mean([]) emits a RuntimeWarning; report NaN directly for an empty split.
    avg_solve = float(np.mean(solve_times)) if solve_times else float("nan")
    print(f"✅ Done {split}: {n_samples} samples in {total:.1f}s (avg solve {avg_solve:.3f}s)")
    return rows

## 7) Run generation (50/5/5)

This will write files to:

- `data/<DATASET_TAG>/train/*.npz`
- `data/<DATASET_TAG>/val/*.npz`
- `data/<DATASET_TAG>/test/*.npz`

and manifests:

- `data/<DATASET_TAG>/manifest_train.jsonl`
- `data/<DATASET_TAG>/manifest_val.jsonl`
- `data/<DATASET_TAG>/manifest_test.jsonl`


In [None]:
# ============================
# 7) Generate dataset
# ============================

CASE_NAME = "const"

# optional: clean old data (sample files and manifests left by earlier runs)
for split in ("train", "val", "test"):
    stale_dir = OUT_ROOT / split
    stale_files = list(stale_dir.glob("*.npz")) if stale_dir.exists() else []
    for stale_file in stale_files:
        stale_file.unlink()
    stale_manifest = OUT_ROOT / f"manifest_{split}.jsonl"
    if stale_manifest.exists():
        stale_manifest.unlink()

# Sample ids run consecutively across the three splits: train, then val, then test.
print("Generating dataset...")
m_train = generate_split(
    out_root=OUT_ROOT, split="train", n_samples=N_TRAIN, start_id=0, case_name=CASE_NAME
)
m_val = generate_split(
    out_root=OUT_ROOT, split="val", n_samples=N_VAL, start_id=N_TRAIN, case_name=CASE_NAME
)
m_test = generate_split(
    out_root=OUT_ROOT, split="test", n_samples=N_TEST, start_id=N_TRAIN + N_VAL, case_name=CASE_NAME
)

print("✅ Dataset complete")
print("Manifests:")
for split in ("train", "val", "test"):
    print(" -", OUT_ROOT / f"manifest_{split}.jsonl")

## 8) Quick sanity checks (optional but recommended)

We verify:

- file counts per split
- residual statistics (direct solve should give very small `res_rel`)
- show a couple of samples (real/imag parts)

If you plan to train CNN/U-Nets later, also check whether shapes vary with ω.


In [None]:
def read_manifest(path: Path) -> list[dict]:
    """Read a JSONL manifest and return its rows in file order.

    Blank or whitespace-only lines are skipped, so a stray empty line (for
    example after manual editing) does not abort the whole read.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    with path.open("r", encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]


def stats(xs):
    """Summarize ``xs`` as min, median (p50), 90th percentile (p90) and max.

    The input is converted to a float array; every value in the returned
    dict is a plain Python float.
    """
    values = np.asarray(xs, dtype=float)

    summary = {"min": float(values.min())}
    for label, q in (("p50", 0.50), ("p90", 0.90)):
        summary[label] = float(np.quantile(values, q))
    summary["max"] = float(values.max())
    return summary


def load_npz(path: Path):
    """Load one sample and return ``(f, u, meta)``.

    ``f`` and ``u`` are rebuilt as complex arrays from the stored real and
    imaginary parts; ``meta`` is decoded from the JSON string in the first
    element of ``meta_json``.

    The archive is opened in a ``with`` block so its file handle is closed
    before returning; an open handle can prevent the file from being deleted
    later (notably on Windows).
    """
    # SECURITY: allow_pickle=True is needed for archives whose meta_json was
    # stored as an object array; only load files from a trusted source.
    with np.load(path, allow_pickle=True) as d:
        meta = json.loads(str(d["meta_json"][0]))
        f = d["f_real"] + 1j * d["f_imag"]
        u = d["u_real"] + 1j * d["u_imag"]
    return f, u, meta


def quick_report(split: str):
    """Print a short summary for one split and return its manifest rows.

    Reports the row count, the distribution of solve times and relative
    residuals across the whole manifest, and the distinct ``(f, u)`` array
    shapes found in the first ten sample files.
    """
    man = read_manifest(OUT_ROOT / f"manifest_{split}.jsonl")
    print(f"[{split}] N={len(man)}")

    for label, key in (
        ("  solve_time_sec:", "solve_time_sec"),
        ("  res_rel       :", "res_rel"),
    ):
        print(label, stats([row[key] for row in man]))

    # Only the first few files are opened, to keep the check fast.
    shapes = set()
    for row in man[:10]:
        f_arr, u_arr, _ = load_npz(Path(row["file"]))
        shapes.add((f_arr.shape, u_arr.shape))
    print("  shapes (first 10):", shapes)
    return man


# Print the summary for each split. The returned manifest rows are kept;
# `mtr` is reused by the visualization cell below.
mtr = quick_report("train")
mva = quick_report("val")
mte = quick_report("test")

In [None]:
# Visualize 2 random training samples: real/imag of u
import matplotlib.pyplot as plt

# Pick up to two distinct training rows using the shared generator.
n_show = min(2, len(mtr))
for idx in rng.choice(len(mtr), size=n_show, replace=False):
    sample_path = Path(mtr[idx]["file"])
    f, u, meta = load_npz(sample_path)
    print("Sample:", sample_path.name, "| omega:", meta["omega"], "| res_rel:", meta["res_rel"])

    plt.figure(figsize=(10,3))
    # One panel per component; fields are (nx,ny), so transpose to put x on
    # the horizontal axis.
    for panel, (component, title) in enumerate(((np.real, "Re(u)"), (np.imag, "Im(u)")), start=1):
        plt.subplot(1, 2, panel)
        plt.imshow(component(u).T, origin="lower", aspect="auto")
        plt.title(title)
        plt.colorbar()

    plt.tight_layout()
    plt.show()