# 10 — Alignment & Binning (FGS1 ↔ AIRS)

**Goal**: robust, physics‑aware alignment and binning utilities to transform raw/calibrated FGS1 & AIRS data into
model‑ready tensors. This notebook is Kaggle‑aware, zero‑internet, and uses only `matplotlib` for plots.

**Covers**
- Environment detection & path resolution (Kaggle vs local repo)
- Quick inventory of candidate inputs
- Time‑domain alignment & **phase folding** helpers
- **Binning strategies** (time binning, adaptive binning, fixed spectral bin checks)
- Lightweight **jitter/centroid** outline and sanity checks
- Exports concise artifacts under `outputs/`

> Keep this light (<~3 min) and reproducible; heavy lifting should live in library code and DVC stages.

## 🧭 Environment Detection & Paths

In [None]:
import os, sys, platform, glob, math, json
from pathlib import Path
from typing import Optional, Dict, Tuple

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Number of spectral bins expected in the AIRS tables.
BIN_COUNT = 283
# Competition dataset mount point; its presence is what marks a Kaggle run.
COMP_DIR = Path('/kaggle/input/ariel-data-challenge-2025')
# Directories probed, in order, when looking for the repository root.
REPO_ROOT_CANDIDATES = [Path.cwd(), Path.cwd().parent, Path.cwd().parent.parent]

def detect_env() -> Dict:
    """Describe the runtime environment.

    Returns a dict with the keys ``is_kaggle`` (whether ``COMP_DIR`` exists),
    ``platform``, ``python`` (version string with newlines replaced by spaces),
    ``cwd`` and ``repo_root``. ``repo_root`` is the resolved path (as a string)
    of the first entry of ``REPO_ROOT_CANDIDATES`` that contains both a
    ``configs`` and a ``schemas`` entry, or ``None`` when no candidate matches.
    """
    root_hit = next(
        (
            str(candidate.resolve())
            for candidate in REPO_ROOT_CANDIDATES
            if (candidate/'configs').exists() and (candidate/'schemas').exists()
        ),
        None,
    )
    return {
        "is_kaggle": COMP_DIR.exists(),
        "platform": platform.platform(),
        "python": sys.version.replace("\n", " "),
        "cwd": str(Path.cwd()),
        "repo_root": root_hit,
    }

ENV = detect_env()
ENV

In [None]:
def resolve_paths(env: Dict) -> Dict[str, Optional[Path]]:
    """Build the mapping of working directories for this run.

    ``env`` must provide ``repo_root`` (a path string or a falsy value) and
    ``is_kaggle``. Repository-relative entries are ``None`` when no repo root
    is known, and ``competition`` is ``None`` outside Kaggle. The relative
    ``outputs`` directory is created as a side effect.
    """
    root_str = env['repo_root']
    repo_root = Path(root_str) if root_str else None

    def data_dir(stage: str) -> Optional[Path]:
        # <repo_root>/data/<stage>, or None when there is no repo root.
        if not repo_root:
            return None
        return repo_root/'data'/stage

    paths = {
        "competition": COMP_DIR if env['is_kaggle'] else None,
        "repo_root": repo_root,
        "data_raw": data_dir('raw'),
        "data_interim": data_dir('interim'),
        "data_processed": data_dir('processed'),
        "outputs": Path('outputs'),
    }
    paths["outputs"].mkdir(parents=True, exist_ok=True)
    return paths

# Resolve the working paths once from the detected environment; the cells
# below read their input/output locations from PATHS.
PATHS = resolve_paths(ENV)
PATHS  # bare expression: shows the resolved mapping as the cell result

## 📦 Inventory (Fast)

In [None]:
def list_files(base: Optional[Path], patterns=('*.csv','*.parquet','*.npy','*.npz'), limit: Optional[int] = 80) -> list:
    """Recursively list files under ``base`` that match any of ``patterns``.

    Parameters
    ----------
    base : directory to search; ``None`` or a non-existent path yields ``[]``.
    patterns : glob patterns handed to ``Path.rglob``.
    limit : maximum number of paths returned after sorting. Defaults to 80
        (the previous hard-coded cap); pass ``None`` to return every match.

    Returns
    -------
    Sorted list of matching paths as strings, truncated to ``limit`` entries.
    """
    if not base or not base.exists():
        return []
    matches = sorted(str(p) for pat in patterns for p in base.rglob(pat))
    if limit is None:
        return matches
    # Clamp so a negative limit cannot turn into a "drop from the end" slice.
    return matches[:max(0, limit)]

# Inventory label -> PATHS key whose directory is scanned for that label.
inventory = {
    label: list_files(PATHS[path_key])
    for label, path_key in (
        ("kaggle_input", "competition"),
        ("data_raw", "data_raw"),
        ("data_interim", "data_interim"),
    )
}
inventory

## 🧮 Alignment & Phase Folding Helpers

These utilities are **guarded** and unit‑test‑friendly. They do not assume specific file structures.

In [None]:
from dataclasses import dataclass

@dataclass
class Ephemeris:
    """Orbital ephemeris used to fold timestamps into orbital phase."""
    period: float           # orbital period (same time unit as timestamps)
    t0: float               # reference transit epoch
    duration: Optional[float] = None  # optional: transit duration (same unit)

def phase_fold(t: np.ndarray, ephem: Ephemeris) -> np.ndarray:
    """Return the orbital phase in [-0.5, 0.5) for timestamps ``t``.

    Parameters
    ----------
    t : timestamps; any array-like (ndarray, list, scalar) convertible to
        float. The input is never modified.
    ephem : ephemeris providing ``period`` and ``t0``; phase 0 corresponds
        to ``t == t0``.

    Returns
    -------
    Float ndarray with the same shape as ``t``.

    Raises
    ------
    ValueError
        If ``ephem.period`` is not strictly positive (this includes NaN).
    """
    # `not (period > 0)` rather than `period <= 0` so a NaN period is rejected too.
    if not ephem.period > 0:
        raise ValueError("period must be positive")
    t_arr = np.asarray(t, dtype=float)
    phase = ((t_arr - ephem.t0) / ephem.period) % 1.0
    # Map [0.5, 1) onto [-0.5, 0); np.where also handles 0-d (scalar) input.
    return np.where(phase >= 0.5, phase - 1.0, phase)

def time_bin(x: np.ndarray, y: np.ndarray, bins: int = 100) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Bin ``y`` into ``bins`` equal-width intervals spanning the range of ``x``.

    Parameters
    ----------
    x : sample positions (e.g. timestamps); need not be sorted.
    y : sample values, same length as ``x``.
    bins : number of equal-width bins between ``x.min()`` and ``x.max()``.

    Returns
    -------
    (x_mean, y_mean, y_std) with one entry per non-empty bin: the mean ``x``
    of the samples in the bin, their mean ``y``, and the sample standard
    deviation of ``y`` (``ddof=1``; 0.0 for single-sample bins). Three empty
    arrays are returned when the inputs are empty or differ in length.

    Raises
    ------
    ValueError
        If ``bins`` is smaller than 1.
    """
    x = np.asarray(x)
    y = np.asarray(y)
    if len(x) != len(y) or len(x) == 0:
        return np.array([]), np.array([]), np.array([])
    if bins < 1:
        raise ValueError("bins must be a positive integer")
    order = np.argsort(x)
    x, y = x[order], y[order]
    edges = np.linspace(x.min(), x.max(), bins + 1)
    # np.digitize uses half-open intervals, so samples equal to the last edge
    # (x.max(), or every sample when all x are identical) land at index
    # `bins`. Clip them into the final bin instead of silently dropping them.
    idx = np.clip(np.digitize(x, edges) - 1, 0, bins - 1)
    xcs, ym, ys = [], [], []
    for b in range(bins):
        m = idx == b
        if not m.any():
            continue
        xcs.append(x[m].mean())
        ym.append(y[m].mean())
        ys.append(y[m].std(ddof=1) if m.sum() > 1 else 0.0)
    return np.array(xcs), np.array(ym), np.array(ys)

def adaptive_bin(x: np.ndarray, y: np.ndarray, target_count: int = 200) -> Tuple[np.ndarray, np.ndarray]:
    """Bin by sample count so every bin holds roughly ``target_count`` samples.

    The samples are sorted by ``x`` and split into
    ``max(1, n // target_count)`` contiguous groups whose sizes differ by at
    most one, so no undersized trailing bin is produced.

    Parameters
    ----------
    x : sample positions (e.g. phase); need not be sorted.
    y : sample values, same length as ``x``.
    target_count : desired number of samples per bin; values below 1 are
        treated as 1.

    Returns
    -------
    (centers, means): the mean ``x`` and mean ``y`` of each group. Two empty
    arrays are returned when the inputs are empty or differ in length.
    """
    x = np.asarray(x)
    y = np.asarray(y)
    if len(x) != len(y) or len(x) == 0:
        return np.array([]), np.array([])
    order = np.argsort(x)
    x, y = x[order], y[order]
    n = len(x)
    n_bins = max(1, n // max(1, target_count))
    # array_split spreads the remainder over the leading groups; a fixed-stride
    # slice would instead leave a final bin with as little as one sample.
    centers = [chunk.mean() for chunk in np.array_split(x, n_bins)]
    means = [chunk.mean() for chunk in np.array_split(y, n_bins)]
    return np.array(centers), np.array(means)

## 🧷 Jitter / Centroid Outline (FGS1)

If centroid tracks (x(t), y(t)) are provided (from calibrated cubes), we can regress out common‑mode jitter.
Below is a simple linear decorrelation utility; in practice you may want higher‑order or spline terms.

In [None]:
def decorrelate_jitter(flux: np.ndarray, cx: Optional[np.ndarray], cy: Optional[np.ndarray]) -> np.ndarray:
    """Remove the linear centroid dependence from ``flux``.

    Fits ``flux ~ a + b*cx + c*cy`` by least squares and subtracts the
    mean-centred model, so the overall flux level is preserved.

    Parameters
    ----------
    flux : flux samples.
    cx, cy : centroid tracks with the same length as ``flux``, or ``None``.

    Returns
    -------
    The decorrelated flux as a new float array. ``flux`` itself is returned
    unchanged when a centroid track is missing, the lengths disagree, the
    input is empty, no sample is fully finite, or the solver fails.

    Notes
    -----
    Samples with a non-finite flux or centroid value are excluded from the
    fit and left uncorrected, so a single NaN no longer disables (or
    poisons) the correction for every other sample.
    """
    if cx is None or cy is None or len(flux) != len(cx) or len(cx) != len(cy) or len(flux) == 0:
        return flux
    flux_arr = np.asarray(flux, dtype=float)
    cx_arr = np.asarray(cx, dtype=float)
    cy_arr = np.asarray(cy, dtype=float)
    # A sample is usable only if its flux and both centroid values are finite.
    row_ok = (
        np.isfinite(cx_arr)
        & np.isfinite(cy_arr)
        & np.isfinite(flux_arr.reshape(len(flux_arr), -1)).all(axis=1)
    )
    if not row_ok.any():
        return flux
    X = np.vstack([np.ones_like(cx_arr[row_ok]), cx_arr[row_ok], cy_arr[row_ok]]).T
    try:
        beta, *_ = np.linalg.lstsq(X, flux_arr[row_ok], rcond=None)
    except np.linalg.LinAlgError:
        # Best effort: without a fit there is nothing to subtract.
        return flux
    model = X @ beta
    corrected = flux_arr.copy()
    corrected[row_ok] -= model - np.mean(model)  # keep original mean scale
    return corrected

## 📐 Demo (Guarded) with Synthetic or Detected Data

We attempt to detect plausible light‑curve columns. If none are found, we synthesize a small example.

In [None]:
# Shared, seeded generator so the synthetic demo is reproducible.
rng = np.random.default_rng(42)

def synth_lightcurve(n=5000, period=1.7, t0=0.3, depth=0.002, noise=5e-4):
    """Generate a synthetic box-transit light curve with injected jitter.

    Parameters
    ----------
    n : number of samples, drawn uniformly over ten periods and sorted.
    period, t0 : ephemeris of the injected transit; the box is centred on
        ``t0`` (phase 0), matching the ``Ephemeris`` that is returned.
    depth : fractional transit depth.
    noise : standard deviation of the white noise added to the flux.

    Returns
    -------
    (t, flux, cx, cy, ephemeris), where ``cx``/``cy`` are random centroid
    tracks whose linear imprint has been added to ``flux``, and
    ``ephemeris.duration`` is the full box width expressed in time units.
    """
    t = np.sort(rng.uniform(0, 10*period, size=n))
    # Phase centred on the transit: in [-0.5, 0.5) with 0 at t == t0, so the
    # dip sits at the epoch that the returned ephemeris advertises.
    phase = ((t - t0) / period + 0.5) % 1.0 - 0.5
    half_width = 0.02  # half-width of the box, in phase units
    transit = (np.abs(phase) < half_width)  # crude box
    flux = 1.0 - depth*transit + rng.normal(0, noise, size=n)
    cx = rng.normal(0, 0.05, size=n); cy = rng.normal(0, 0.05, size=n)
    flux += 1e-3*cx - 8e-4*cy  # inject jitter correlation
    # Ephemeris.duration shares the unit of period/timestamps, so convert the
    # box width from phase to time.
    return t, flux, cx, cy, Ephemeris(period=period, t0=t0, duration=2*half_width*period)

# Try loading from Kaggle tables; anything unusable falls through to the
# synthetic light curve below.
loaded = False
t, flux, cx, cy, eph = None, None, None, None, None

if PATHS["competition"]:
    # Heuristics: look for any CSV with columns like time, flux or fgs1_flux
    for c in ["train.csv", "test.csv"]:
        p = PATHS["competition"]/c
        if not p.exists():
            continue
        try:
            df = pd.read_csv(p, nrows=100000)
        except (OSError, ValueError) as exc:
            # pandas' parser, empty-file and decoding errors all derive from
            # ValueError; report the skip instead of hiding it.
            print(f'Skipping {p}: {exc}')
            continue
        cand_t = [col for col in df.columns if 'time' in col.lower() or col[:2].lower() == 't_']
        # Never let the same column act as both the time and the flux axis.
        cand_f = [col for col in df.columns
                  if col not in cand_t and ('flux' in col.lower() or 'fgs' in col.lower())]
        if not (cand_t and cand_f):
            continue
        # Coerce to float and keep only rows where both values are finite, so
        # the folding/binning helpers never see strings or NaNs.
        t_raw = pd.to_numeric(df[cand_t[0]], errors='coerce').to_numpy(dtype=float)
        f_raw = pd.to_numeric(df[cand_f[0]], errors='coerce').to_numpy(dtype=float)
        good = np.isfinite(t_raw) & np.isfinite(f_raw)
        if not good.any():
            continue
        t, flux = t_raw[good], f_raw[good]
        cx = np.zeros_like(t)  # placeholder if no centroid
        cy = np.zeros_like(t)
        # Plain float keeps the ephemeris JSON-serializable in the export cell.
        eph = Ephemeris(period=1.0, t0=float(t.min()))
        loaded = True
        break

if not loaded:
    t, flux, cx, cy, eph = synth_lightcurve()

In [None]:
# Decorrelate jitter (if centroids present)
flux_dc = decorrelate_jitter(flux, cx, cy)

# Phase fold
phase = phase_fold(t, eph)

# Bin in time and in phase
xb, yb, yerr = time_bin(t, flux_dc, bins=120)
pb, pfm = adaptive_bin(phase, flux_dc, target_count=200)

# Plots: default colors only
# 1) First 2000 samples in the time domain.
plt.figure(figsize=(6,4))
plt.plot(t[:2000], flux_dc[:2000], '.', ms=2)
plt.xlabel('time'); plt.ylabel('flux (decorrelated)'); plt.title('Segment (time domain)')
plt.tight_layout(); plt.show()

# 2) Time-binned flux. The figure is created inside the guard so that an
#    empty binning result does not leave a blank figure open.
if len(xb):
    plt.figure(figsize=(6,4))
    plt.errorbar(xb, yb, yerr=yerr, fmt='.', ms=4)
    plt.xlabel('time'); plt.ylabel('binned flux'); plt.title('Binned time flux')
    plt.tight_layout(); plt.show()

# 3) Phase-folded samples with the adaptive-bin mean overlaid.
plt.figure(figsize=(6,4))
plt.plot(phase, flux_dc, '.', ms=2, alpha=0.5)
if len(pb):
    plt.plot(pb, pfm, '-', lw=1)
plt.xlabel('phase'); plt.ylabel('flux'); plt.title('Phase folded (with adaptive mean)')
plt.tight_layout(); plt.show()

## 🔬 Spectral Bin Integrity (AIRS)

Quick check for the expected **283** spectral bins (`mu_*` and `sigma_*` columns) when the table layout provides them.

In [None]:
# Collected problem descriptions; filled by the checks below and exported later.
issues = []

def count_bin_columns(df: pd.DataFrame, prefix: str) -> int:
    """Return how many column names of ``df`` start with ``prefix``."""
    return sum(1 for name in df.columns if name.startswith(prefix))

# Location of the training table, or None when no competition data is mounted.
train_csv = PATHS["competition"]/'train.csv' if PATHS["competition"] else None

if train_csv is not None and train_csv.exists():
    try:
        d = pd.read_csv(train_csv, nrows=1000)
        mu_n, sigma_n = (count_bin_columns(d, pre) for pre in ('mu_', 'sigma_'))
        # A count of 0 means the layout has no such columns, so the check is skipped.
        if mu_n not in (0, BIN_COUNT):
            issues.append(f'Expected {BIN_COUNT} mu_* cols, found {mu_n}')
        if sigma_n not in (0, BIN_COUNT):
            issues.append(f'Expected {BIN_COUNT} sigma_* cols, found {sigma_n}')
    except Exception as e:
        issues.append(f'Failed to read train.csv: {e}')

issues or "Spectral bin shape checks passed (or skipped)."

## 💾 Export Artifacts

We persist a small alignment/phase summary and any detected issues.

In [None]:
# Summary of this run. "time_bins" reports the bins actually produced by
# time_bin (len(xb)); re-deriving it with np.digitize counted the sample at
# t.max() as an extra bin and could disagree with the binned arrays.
summary = {
    "env": ENV,
    "n_samples": int(len(t)),
    "phase_bins": int(len(pb)),
    "time_bins": int(len(xb)),
    "issues": issues,
    # Cast to built-in floats so numpy scalar types cannot break json.dump.
    "ephemeris": {
        "period": float(eph.period),
        "t0": float(eph.t0),
        "duration": None if eph.duration is None else float(eph.duration),
    } if eph is not None else None,
}
out_dir = Path('outputs')
out_dir.mkdir(exist_ok=True, parents=True)
with open(out_dir/'alignment_summary.json', 'w', encoding='utf-8') as f:
    json.dump(summary, f, indent=2)
np.savez(out_dir/'phase_binned.npz', phase_centers=pb, phase_means=pfm)
print('Wrote outputs/alignment_summary.json and outputs/phase_binned.npz')

## Next Steps
- Integrate centroid extraction from calibrated FGS1 cubes; extend decorrelation to higher‑order terms
- Replace synthetic demo with competition‑specific loaders once confirmed
- Parameterize ephemeris from metadata tables (period, t0) and propagate to Hydra configs
- Push these utilities into `src/spectramind/pipeline/calibrate.py` and cover with unit tests

**Done.** ✅