# b3 from Kn + raw data -- SPS MBA

**Measurement session:** `20251212_171026_SPS_MBA`  
**Magnet:** MBA (dipole, normal)  
**Project:** SPS  

This notebook **bypasses the GUI** and computes field harmonics directly
from the Kn calibration files and per-run raw measurement data.

### Pre-requisites

Replace the all-zero Kn files with the correct calibration constants:

```
measurements/20251212_171026_SPS_MBA/Kn_values_Seg_CS.txt
measurements/20251212_171026_SPS_MBA/Kn_values_Seg_NCS.txt
```

Each file: 15 rows (harmonics n=1..15) x 4 columns  
`AbsRe  AbsIm  CmpRe  CmpIm`  (complex Kn for absolute and compensated channels)

## 0. Imports

In [None]:
from pathlib import Path
import re
import warnings

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

%matplotlib inline
plt.rcParams.update({
    "figure.figsize": (10, 5),
    "axes.grid": True,
    "grid.alpha": 0.3,
})

## 1. Configuration (from Parameters.txt)

In [None]:
SESSION = "20251212_171026_SPS_MBA"
MEAS_SUBDIR = "20251212_171620_MBA"

# Repo root -- walk up from CWD until we find pyproject.toml or .git
REPO_ROOT = Path(".").resolve()
while REPO_ROOT != REPO_ROOT.parent:
    if (REPO_ROOT / "pyproject.toml").exists() or (REPO_ROOT / ".git").exists():
        break
    REPO_ROOT = REPO_ROOT.parent

SESSION_DIR = REPO_ROOT / "measurements" / SESSION
RUN_DIR = SESSION_DIR / MEAS_SUBDIR

# Magnet / measurement parameters (from Parameters.txt)
R_REF = 0.02            # reference radius [m]
L_COIL = 0.47           # shaft / coil length [m]
SAMPLES_PER_TURN = 1024 # encoder pulses per revolution
MAGNET_ORDER = 1        # dipole

print(f"Repo root   : {REPO_ROOT}")
print(f"Session dir : {SESSION_DIR}")
print(f"Run dir     : {RUN_DIR}")
print(f"R_ref       : {R_REF} m")
print(f"Samples/turn: {SAMPLES_PER_TURN}")

## 2. Load and validate Kn files

In [None]:
def load_kn(path: Path):
    """Load a Kn file (15 x 4) and return complex arrays kn_abs, kn_cmp."""
    arr = np.loadtxt(path)  # (15, 4): AbsRe AbsIm CmpRe CmpIm
    kn_abs = arr[:, 0] + 1j * arr[:, 1]
    kn_cmp = arr[:, 2] + 1j * arr[:, 3]
    return kn_abs, kn_cmp


kn_cs_path = SESSION_DIR / "Kn_values_Seg_CS.txt"
kn_ncs_path = SESSION_DIR / "Kn_values_Seg_NCS.txt"

kn_abs_cs, kn_cmp_cs = load_kn(kn_cs_path)
kn_abs_ncs, kn_cmp_ncs = load_kn(kn_ncs_path)

# Validate: all-zero Kn will produce garbage
for name, kn in [("CS abs", kn_abs_cs), ("CS cmp", kn_cmp_cs),
                  ("NCS abs", kn_abs_ncs), ("NCS cmp", kn_cmp_ncs)]:
    if np.all(kn == 0):
        warnings.warn(f"{name} Kn is ALL ZEROS -- replace the file before running!",
                       stacklevel=1)
    else:
        print(f"{name:>8s}: {np.count_nonzero(kn):2d}/15 non-zero, "
              f"|Kn| range [{np.abs(kn[kn!=0]).min():.4e}, {np.abs(kn[kn!=0]).max():.4e}]")

In [None]:
# Display Kn values as a table
kn_table = pd.DataFrame({
    "n": np.arange(1, 16),
    "CS_abs_Re": kn_abs_cs.real,
    "CS_abs_Im": kn_abs_cs.imag,
    "CS_cmp_Re": kn_cmp_cs.real,
    "CS_cmp_Im": kn_cmp_cs.imag,
    "NCS_abs_Re": kn_abs_ncs.real,
    "NCS_abs_Im": kn_abs_ncs.imag,
    "NCS_cmp_Re": kn_cmp_ncs.real,
    "NCS_cmp_Im": kn_cmp_ncs.imag,
}).set_index("n")

print("Kn calibration constants (real / imaginary parts):")
kn_table

## 3. Harmonics engine

Replicates the core of `kn_pipeline.py`: FFT of integrated flux, then Kn calibration.

$$
C_n = \frac{2\,\mathrm{FFT}[\Phi]_n}{N_s}
      \;\cdot\;
      \frac{r_\mathrm{ref}^{\,n-1}}{K_n^*}
$$

In [None]:
def integrate_flux(df_incremental: np.ndarray) -> np.ndarray:
    """Cumulative-sum integration with linear-drift removal per turn.

    Parameters
    ----------
    df_incremental : (n_turns, Ns) incremental flux values

    Returns
    -------
    flux : (n_turns, Ns) drift-corrected integrated flux
    """
    flux = np.cumsum(df_incremental, axis=1)
    Ns = flux.shape[1]
    # Remove linear drift so the flux is periodic
    drift = np.linspace(0, 1, Ns)[None, :] * (flux[:, -1:] - flux[:, :1])
    return flux - drift


def compute_harmonics(flux: np.ndarray, kn: np.ndarray, Rref: float) -> np.ndarray:
    """FFT + Kn calibration.

    Parameters
    ----------
    flux : (n_turns, Ns) integrated flux
    kn   : (H,) complex calibration constants
    Rref : reference radius [m]

    Returns
    -------
    C : (n_turns, H) complex harmonic coefficients Bn + j*An
    """
    Ns = flux.shape[1]
    H = len(kn)

    # Normalised FFT: keep harmonics 1..H (skip DC at index 0)
    f = (2.0 * np.fft.fft(flux, axis=1)) / float(Ns)
    f = f[:, 1:H + 1]  # (n_turns, H)

    # Sensitivity: sens_n = Rref^(n-1) / conj(kn_n)
    idx = np.arange(H, dtype=float)  # 0, 1, ..., H-1 = n-1 for n=1..H
    sens = (Rref ** idx) / np.conj(kn)

    return f * sens[None, :]  # broadcast (n_turns, H)


print("Engine ready.")

## 4. Discover per-run raw-measurement files

In [None]:
def parse_run_files(run_dir: Path):
    """Return sorted list of (run_index, current_A, segment, path)."""
    pattern = re.compile(
        r"Run_(\d+)_I_([\d.]+)A_(N?CS)_raw_measurement_data\.txt$"
    )
    entries = []
    for f in sorted(run_dir.iterdir()):
        m = pattern.search(f.name)
        if m:
            entries.append((
                int(m.group(1)),    # run index
                float(m.group(2)),  # nominal current
                m.group(3),         # segment: CS or NCS
                f,                  # full path
            ))
    return entries


run_files = parse_run_files(RUN_DIR)
n_cs = sum(1 for r in run_files if r[2] == "CS")
n_ncs = sum(1 for r in run_files if r[2] == "NCS")
print(f"Found {len(run_files)} raw files  (CS: {n_cs}, NCS: {n_ncs})")
print(f"Current range: {min(r[1] for r in run_files):.0f} .. {max(r[1] for r in run_files):.0f} A")

## 5. Process all runs

For each run: read raw data &#x2192; split into turns &#x2192; integrate &#x2192; FFT + Kn &#x2192; average over turns.

In [None]:
def process_run(path: Path, kn_abs, kn_cmp, Rref, Ns):
    """Return (C_abs_mean, C_cmp_mean, I_mean, time_mean, n_turns)."""
    raw = np.loadtxt(path)  # columns: time, df_abs, df_cmp, I, ramprate

    n_samples = raw.shape[0]
    n_turns = n_samples // Ns
    usable = n_turns * Ns  # trim incomplete last turn

    time_arr = raw[:usable, 0].reshape(n_turns, Ns)
    df_abs   = raw[:usable, 1].reshape(n_turns, Ns)
    df_cmp   = raw[:usable, 2].reshape(n_turns, Ns)
    I_arr    = raw[:usable, 3].reshape(n_turns, Ns)

    flux_abs = integrate_flux(df_abs)
    flux_cmp = integrate_flux(df_cmp)

    C_abs = compute_harmonics(flux_abs, kn_abs, Rref)
    C_cmp = compute_harmonics(flux_cmp, kn_cmp, Rref)

    return (
        C_abs.mean(axis=0),   # (H,) complex
        C_cmp.mean(axis=0),
        I_arr.mean(),         # scalar
        time_arr.mean(),
        n_turns,
    )


# --- main processing loop ---
records = []
kn_map = {
    "CS":  (kn_abs_cs, kn_cmp_cs),
    "NCS": (kn_abs_ncs, kn_cmp_ncs),
}

any_kn_zero = any(np.all(k == 0) for k in [kn_abs_cs, kn_cmp_cs, kn_abs_ncs, kn_cmp_ncs])
if any_kn_zero:
    print("WARNING: At least one Kn array is all zeros.")
    print("Replace the Kn files and re-run this cell.")
    print("Skipping computation.")
else:
    for run_idx, I_nom, seg, path in run_files:
        kn_a, kn_c = kn_map[seg]
        try:
            C_abs, C_cmp, I_mean, t_mean, nturns = process_run(
                path, kn_a, kn_c, R_REF, SAMPLES_PER_TURN
            )
            rec = {
                "run": run_idx,
                "segment": seg,
                "I_nom_A": I_nom,
                "I_mean_A": I_mean,
                "time_s": t_mean,
                "n_turns": nturns,
            }
            # Store all Bn and An (using the compensated channel, which is
            # standard for higher harmonics; absolute for n=1)
            H = len(C_abs)
            for n in range(1, H + 1):
                i = n - 1
                C = C_abs[i] if n == MAGNET_ORDER else C_cmp[i]
                rec[f"B{n}"] = C.real
                rec[f"A{n}"] = C.imag
                # Also keep absolute-channel values for reference
                rec[f"B{n}_abs"] = C_abs[i].real
                rec[f"A{n}_abs"] = C_abs[i].imag
            records.append(rec)
        except Exception as exc:
            print(f"  SKIP {path.name}: {exc}")

    print(f"\nProcessed {len(records)} runs successfully.")

In [None]:
if not records:
    raise RuntimeError(
        "No data processed.  Replace the Kn files and re-run from cell 2."
    )

df = pd.DataFrame(records)
df.sort_values(["segment", "run"], inplace=True, ignore_index=True)

print(f"{len(df)} rows, segments: {sorted(df['segment'].unique())}")
df[["run", "segment", "I_nom_A", "I_mean_A", "time_s", "B1", "B3", "n_turns"]].head(10)

---
## 6. b3 vs time

In [None]:
fig, ax = plt.subplots()
for seg in sorted(df["segment"].unique()):
    sub = df[df["segment"] == seg]
    ax.plot(sub["time_s"], sub["B3"], "o-", markersize=3, label=seg)

ax.set_xlabel("Time (s)")
ax.set_ylabel("B3 (T)")
ax.set_title(f"b3 vs time  --  {SESSION}")
ax.legend()
fig.tight_layout()
plt.show()

## 7. b3 vs current (scatter)

In [None]:
fig, ax = plt.subplots()
for seg in sorted(df["segment"].unique()):
    sub = df[df["segment"] == seg]
    ax.plot(sub["I_mean_A"], sub["B3"], "o", markersize=4, label=seg)

ax.set_xlabel("I (A)")
ax.set_ylabel("B3 (T)")
ax.set_title(f"b3 vs current  --  {SESSION}")
ax.legend()
fig.tight_layout()
plt.show()

## 8. b3 histogram + summary statistics

In [None]:
fig, ax = plt.subplots()
for seg in sorted(df["segment"].unique()):
    vals = df.loc[df["segment"] == seg, "B3"]
    ax.hist(vals, bins="auto", alpha=0.6, label=seg,
            edgecolor="black", linewidth=0.5)

ax.set_xlabel("B3 (T)")
ax.set_ylabel("Count")
ax.set_title(f"b3 histogram  --  {SESSION}")
ax.legend()
fig.tight_layout()
plt.show()

summary = (
    df.groupby("segment")["B3"]
    .agg(["count", "mean", "std", "min", "max"])
    .rename(columns={"count": "N"})
)
print("\nSummary statistics (B3, in Tesla):")
print(summary.to_string())

## 9. b3 ramp-up vs ramp-down (hysteresis)

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(14, 5), sharey=True)

for i, seg in enumerate(sorted(df["segment"].unique())):
    ax = axes[i]
    sub = df[df["segment"] == seg].copy()

    # Split by ramp direction: current first rises then falls
    I_vals = sub["I_nom_A"].values
    peak_idx = np.argmax(I_vals)
    up = sub.iloc[:peak_idx + 1]
    down = sub.iloc[peak_idx:]

    ax.plot(up["I_mean_A"], up["B3"], "o-", markersize=3, label="ramp up")
    ax.plot(down["I_mean_A"], down["B3"], "s-", markersize=3, label="ramp down")
    ax.set_xlabel("I (A)")
    ax.set_ylabel("B3 (T)")
    ax.set_title(seg)
    ax.legend()

fig.suptitle(f"b3 hysteresis  --  {SESSION}", fontsize=13)
fig.tight_layout()
plt.show()

## 10. Full harmonic spectrum at peak current

In [None]:
bn_cols = [c for c in df.columns if re.match(r"^B\d+$", c)]
bn_cols = sorted(bn_cols, key=lambda c: int(c[1:]))

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

for i, seg in enumerate(sorted(df["segment"].unique())):
    sub = df[df["segment"] == seg]
    row = sub.loc[sub["I_mean_A"].abs().idxmax()]
    orders = [int(c[1:]) for c in bn_cols]
    vals = [row[c] for c in bn_cols]

    ax = axes[i]
    colors = ["tab:red" if o == 3 else "tab:blue" for o in orders]
    ax.bar(orders, vals, color=colors)
    ax.set_xlabel("Harmonic order n")
    ax.set_ylabel("Bn (T)")
    ax.set_title(f"{seg} at I = {row['I_mean_A']:.0f} A")
    ax.axhline(0, color="grey", linewidth=0.5)

fig.suptitle(f"Normal harmonic spectrum  --  {SESSION}", fontsize=13)
fig.tight_layout()
plt.show()

## 11. Export computed results

Save to `output/` for archival and further analysis.

In [None]:
out_dir = REPO_ROOT / "output" / SESSION
out_dir.mkdir(parents=True, exist_ok=True)

for seg in sorted(df["segment"].unique()):
    sub = df[df["segment"] == seg]
    fname = f"MBA_{seg}_computed_results.csv"
    sub.to_csv(out_dir / fname, index=False)
    print(f"Wrote {out_dir / fname}  ({len(sub)} rows)")

print("\nDone.")

---
## Reproducing in the GUI

- [ ] **Launch:** `py -m rotating_coil_analyzer.gui.app`
- [ ] **Load:** `measurements/20251212_171026_SPS_MBA`
- [ ] **Select** aperture / segment (CS, NCS, or both)
- [ ] **Set** magnet order = 1 (dipole), reference radius r_ref = 0.02 m
- [ ] **Ensure** the correct Kn files are in place (non-zero!)
- [ ] **Run** harmonics analysis
- [ ] **Export** results to `output/`
- [ ] **Compare** with the CSV exported by cell 11 above

---
## What to check if GUI and notebook disagree

| Symptom | Likely cause | Fix |
|---------|-------------|-----|
| b3 values differ slightly | Turn averaging or drift correction differs | Compare n_turns used; check GUI drift-correction setting |
| Sign flip on b3 | Different sign convention or rotation angle | Check whether the GUI applies rotation ("rot" option) |
| Amplitude scale differs | Different R_ref | Verify both use R_ref = 0.02 m |
| Spectrum shape differs | Notebook uses cmp for n>1, abs for n=1 | GUI may use a different merge strategy |
| Abs vs Cmp channel swap | Column order in raw file differs | Try swapping columns 1 and 2 in `process_run` |
| Harmonics all zero | Kn file still contains zeros | Replace the Kn files and re-run |
| b3 is NaN/Inf | Kn value for n=3 is zero | Check row 3 of the Kn file (non-zero AbsRe/Im and CmpRe/Im) |