# TOI-5807.01 Robustness Checks (TIC 188646744)

This notebook complements `04-real-candidate-validation.ipynb` by running three robustness-oriented diagnostics:

1. **Re-run V16 after per-sector detrending** to see whether the transit-only model becomes preferred.
2. **Run per-sector pixel vetting (V08–V10)** with per-sector TPFs (requires `lightkurve` + network).
3. **Inspect V19 phase-shift event diagnostics** to understand non-transit phase structure.

These are metrics-only diagnostics. They do not impose new validation thresholds.


## Setup

In [None]:
import csv
from pathlib import Path

import numpy as np

import tess_vetter.api as btv
from tess_vetter.api.catalogs import fetch_exofop_toi_table

# TIC identifier of the target; used to match the ExoFOP row, to label
# vet_candidate calls, and to build the "TIC <id>" TPF search string.
TIC_ID = 188646744
# Sectors iterated by every diagnostic in this notebook; each needs a
# matching CSV under DATA_DIR.
SECTORS = [55, 75, 82, 83]
# Directory holding the per-sector "sector<N>_pdcsap.csv" light-curve files
# (relative path, so it is resolved against the current working directory).
DATA_DIR = Path("data/tic188646744")


def load_sector_lc(sector: int) -> btv.LightCurve:
    """Load one sector's light curve from CSV, keeping only ``quality == 0`` rows.

    The file is ``DATA_DIR / f"sector{sector}_pdcsap.csv"``. Leading lines that
    are blank or start with ``#`` are skipped; the first remaining line is the
    CSV header, which must provide the columns ``time_btjd``, ``flux``,
    ``flux_err`` and ``quality``.

    Args:
        sector: Sector number used to build the file name.

    Returns:
        A ``btv.LightCurve`` built from the rows whose ``quality`` value is 0.

    Raises:
        ValueError: If the file contains no header line (or a numeric field
            cannot be parsed).
        KeyError: If a required column is absent from the header.
    """
    path = DATA_DIR / f"sector{sector}_pdcsap.csv"
    time: list[float] = []
    flux: list[float] = []
    flux_err: list[float] = []
    quality: list[int] = []

    # Explicit encoding so parsing does not depend on the platform locale.
    with path.open(newline="", encoding="utf-8") as f:
        for line in f:
            # Skip the "#" preamble. Blank lines are skipped too, so an empty
            # line is never mistaken for the header (which would yield empty
            # field names and a confusing KeyError further down).
            if line.strip() and not line.startswith("#"):
                header = line
                break
        else:
            raise ValueError(f"Missing CSV header in {path}")

        # Parse the header once and let DictReader stream the remaining lines
        # straight from the file instead of materialising them with readlines().
        fieldnames = next(csv.reader([header]))
        for row in csv.DictReader(f, fieldnames=fieldnames):
            time.append(float(row["time_btjd"]))
            flux.append(float(row["flux"]))
            flux_err.append(float(row["flux_err"]))
            quality.append(int(row["quality"]))

    t = np.asarray(time, dtype=np.float64)
    f_arr = np.asarray(flux, dtype=np.float64)
    e_arr = np.asarray(flux_err, dtype=np.float64)
    q = np.asarray(quality, dtype=np.int32)

    # Only cadences with a zero quality value are passed on.
    ok = q == 0
    return btv.LightCurve(time=t[ok], flux=f_arr[ok], flux_err=e_arr[ok])


# The ephemeris is taken from the ExoFOP TOI table (same source as tutorial Step 1).
exofop = fetch_exofop_toi_table()
rows = list(
    filter(lambda entry: str(entry.get("tic_id", "")) == str(TIC_ID), exofop.rows)
)
if len(rows) == 0:
    raise RuntimeError(f"No ExoFOP TOI entry found for TIC {TIC_ID}")
row = rows[0]  # the first matching entry is used

period_days = float(row["period_days"])
epoch_bjd = float(row["epoch_bjd"])
# BTJD is BJD with the 2457000-day offset removed.
t0_btjd = epoch_bjd - 2457000.0

duration_hours = float(row["duration_hours"])
# A missing or falsy catalog depth is recorded as 0 ppm.
depth_ppm = float(row.get("depth_ppm") or 0.0)

candidate = btv.Candidate(
    ephemeris=btv.Ephemeris(
        period_days=period_days,
        t0_btjd=t0_btjd,
        duration_hours=duration_hours,
    ),
    depth_ppm=depth_ppm,
)

# Stellar parameters come from the same row; each falls back to a hard-coded
# value when the field is missing or falsy. logg is always the fixed value.
stellar = btv.StellarParams(
    radius=float(row.get("stellar_radius_r_sun") or 1.65),
    mass=float(row.get("stellar_mass_m_sun") or 1.47),
    teff=float(row.get("stellar_eff_temp_k") or 6816),
    logg=4.17,
)

print("Ephemeris:", period_days, t0_btjd, duration_hours)
print("Depth_ppm (ExoFOP):", depth_ppm)


## 1) V16 after per-sector detrending

We detrend each sector with a rolling median filter using a window larger than the transit duration, then re-run V16.

Notes:

- This is a pragmatic robustness test, not a production detrending recommendation.
- Median detrending is not transit-masked here; choose a window that does not track the transit shape.

In [None]:
def detrend_sector(lc: btv.LightCurve, *, window_days: float) -> btv.LightCurve:
    """Return a time-sorted copy of ``lc`` detrended with ``btv.median_detrend``.

    The window is given in days and converted to a number of samples using the
    median spacing of the (sorted) time stamps. The window is never shorter
    than 101 samples and is always odd.

    Flux errors are divided by an estimate of the removed trend, computed as
    ``original_flux / detrended_flux``.
    NOTE(review): this assumes ``btv.median_detrend`` divides the flux by its
    trend (multiplicative detrending) - confirm against the tess_vetter docs.

    Args:
        lc: Light curve exposing ``time``, ``flux`` and ``flux_err``.
        window_days: Width of the median filter, in days.

    Returns:
        A new ``btv.LightCurve`` with sorted time, detrended flux and rescaled
        flux errors.
    """
    t = np.asarray(lc.time, dtype=np.float64)
    f_arr = np.asarray(lc.flux, dtype=np.float64)
    e_arr = np.asarray(lc.flux_err, dtype=np.float64)

    # Sort by time so the sample-based window corresponds to a time span.
    idx = np.argsort(t)
    t = t[idx]
    f_arr = f_arr[idx]
    e_arr = e_arr[idx]

    # Median spacing drives the days -> samples conversion; with fewer than
    # two samples fall back to 1/48 day (30 minutes).
    if len(t) > 1:
        dt = float(np.median(np.diff(t)))
    else:
        dt = 1.0 / 48.0

    # Guard against a zero/negative spacing, enforce the 101-sample floor and
    # make the window odd.
    window_points = int(np.ceil(float(window_days) / max(dt, 1e-9)))
    window_points = max(101, window_points)
    if window_points % 2 == 0:
        window_points += 1

    f_d = btv.median_detrend(f_arr, window=window_points)

    # Approximate multiplicative trend for error propagation.
    with np.errstate(invalid="ignore", divide="ignore"):
        trend = f_arr / f_d

    # Samples where the trend estimate is unusable (NaN, +/-inf or exactly 0)
    # get the median of the usable ones. The median is taken over usable
    # entries only: np.nanmedian would ignore NaN but still let inf or 0 values
    # decide the fallback, which could turn the rescaled errors into 0 or inf.
    usable = np.isfinite(trend) & (trend != 0.0)
    trend_med = float(np.median(trend[usable])) if usable.any() else 1.0
    trend = np.where(usable, trend, trend_med)
    e_d = e_arr / trend

    return btv.LightCurve(time=t, flux=f_d, flux_err=e_d)


def _run_v16(light_curve):
    """Run only the V16 check on ``light_curve`` and return its first result."""
    return btv.vet_candidate(
        light_curve,
        candidate,
        stellar=stellar,
        network=False,
        tic_id=TIC_ID,
        preset="extended",
        checks=["V16"],
    ).results[0]


def _v16_winner(result):
    """Return the ``winner`` metric of a result, or None when it has no metrics."""
    return result.metrics.get("winner") if result.metrics else None


# The light curve and the raw (un-detrended) V16 result do not depend on the
# detrending window, so load/compute them once per sector instead of once per
# (window, sector) pair.
_sector_lcs = {sector: load_sector_lc(sector) for sector in SECTORS}
_raw_v16 = {sector: _run_v16(sector_lc) for sector, sector_lc in _sector_lcs.items()}

for window_days in [1.0, 2.0, 4.0]:
    print(f"\nwindow_days={window_days}")
    for sector in SECTORS:
        lc = _sector_lcs[sector]
        raw = _raw_v16[sector]

        # Only the detrended run changes with the window.
        lc_d = detrend_sector(lc, window_days=window_days)
        det = _run_v16(lc_d)

        print(
            f"sector {sector}: raw winner={_v16_winner(raw)} flags={raw.flags} | "
            f"detr winner={_v16_winner(det)} flags={det.flags}"
        )


## 2) Multi-sector pixel vetting (V08–V10)

This step downloads a per-sector TPF from MAST and runs V08–V10 using the **matching per-sector light curve**.

Requirements:

- `lightkurve` installed
- network access

If you prefer offline-only execution, you can skip this section and use the bundled `sector83_tpf.npz` from `04-real-candidate-validation.ipynb` (but you will not be able to verify sector-to-sector consistency).

In [None]:
def tpf_exptime_seconds(entry) -> float:
    """Best-effort exposure time of one search-result entry.

    Reads ``entry.exptime`` (unwrapping a ``.value`` attribute when present) and
    returns its first element as a float. Any failure yields 120.0, so an
    unreadable entry is treated as if it had the preferred cadence.
    """
    try:
        raw = entry.exptime
        raw = getattr(raw, "value", raw)
        # The value may be a scalar or a length-1 array; indexing avoids the
        # deprecated float() conversion of a non-0-d array.
        return float(np.atleast_1d(raw)[0])
    except Exception:  # deliberate best-effort: never fail on odd metadata
        return 120.0


def select_tpf_product(search):
    """Return the entry of ``search`` whose exposure time is closest to 120 s.

    Ties go to the earliest entry. Entries with a non-finite exposure time rank
    last, so a selection is always made for a non-empty ``search``.
    """

    def distance(entry) -> float:
        exptime = tpf_exptime_seconds(entry)
        return abs(exptime - 120.0) if np.isfinite(exptime) else float("inf")

    # Entries are fetched by index because only len() and integer indexing are
    # relied upon for the search-result object.
    return min((search[i] for i in range(len(search))), key=distance)


def format_metric(result, key: str, spec: str) -> str:
    """Format ``result.metrics[key]`` with ``spec``, tolerating missing data.

    Falls back to ``str(value)`` (e.g. ``"None"``) when the result, its metrics
    or the requested key is missing, or when the value does not accept ``spec``.
    """
    metrics = getattr(result, "metrics", None) or {}
    value = metrics.get(key)
    try:
        return format(value, spec)
    except (TypeError, ValueError):
        return str(value)


# Only the import itself is guarded: an ImportError raised later (during the
# download or vetting) must not be reported as "lightkurve not installed".
try:
    import lightkurve as lk
except ImportError:
    print("lightkurve not installed - skipping multi-sector pixel checks")
else:
    print("Downloading per-sector TPFs and running V08–V10...")
    print(f"TIC {TIC_ID} sectors={SECTORS}")

    for sector in SECTORS:
        lc = load_sector_lc(sector)

        search = lk.search_targetpixelfile(f"TIC {TIC_ID}", sector=sector, mission="TESS")
        if len(search) == 0:
            print(f"sector {sector}: no TPF found")
            continue

        # Choose the product closest to 120s cadence when possible.
        best = select_tpf_product(search)

        tpf = best.download()
        # A collection may come back instead of a single file; use its first member.
        if type(tpf).__name__ == "TargetPixelFileCollection":
            tpf = tpf[0]

        # Optional attributes are looked up once and mapped to None when absent.
        tpf_flux_err = getattr(tpf, "flux_err", None)
        tpf_mask = getattr(tpf, "pipeline_mask", None)

        tpf_stamp = btv.TPFStamp(
            time=np.asarray(tpf.time.btjd, dtype=np.float64),
            flux=np.asarray(tpf.flux.value, dtype=np.float64),
            flux_err=(
                np.asarray(tpf_flux_err.value, dtype=np.float64)
                if tpf_flux_err is not None
                else None
            ),
            wcs=tpf.wcs,
            aperture_mask=(
                np.asarray(tpf_mask, dtype=bool) if tpf_mask is not None else None
            ),
            # Missing quality flags are replaced by all-zero flags.
            quality=np.asarray(
                getattr(tpf, "quality", np.zeros(len(tpf.time.btjd), dtype=np.int32)),
                dtype=np.int32,
            ),
        )

        bundle = btv.vet_candidate(
            lc,
            candidate,
            stellar=stellar,
            tpf=tpf_stamp,
            network=False,
            tic_id=TIC_ID,
            checks=["V08", "V09", "V10"],
        )

        # Index results by check id; a check that produced no result (or no
        # metric) is printed as None instead of aborting the whole loop.
        by = {r.id: r for r in bundle.results}
        v08 = by.get("V08")
        v09 = by.get("V09")
        v10 = by.get("V10")

        print(
            f"sector {sector}: "
            f"V08 shift_px={format_metric(v08, 'centroid_shift_pixels', '.4f')} "
            f"sig={format_metric(v08, 'significance_sigma', '.2f')}; "
            f"V09 offset_px={format_metric(v09, 'distance_to_target_pixels', '.3f')}; "
            f"V10 stability={format_metric(v10, 'stability_metric', '.3f')}"
        )


## 3) V19 Phase-Shift Event Diagnostics

V19 surfaces phase-shifted dip-like events elsewhere in the orbit. This is useful for diagnosing residual variability/systematics or additional signals.

We print the list of detected events per sector (phase, significance, depth estimate).

In [None]:
def _format_or_str(value, spec: str) -> str:
    """Format ``value`` with ``spec``; fall back to ``str(value)`` if it cannot be formatted."""
    try:
        return format(value, spec)
    except (TypeError, ValueError):
        return str(value)


# For every sector: run V19 alone, print its summary metrics, then list each
# phase-shift event found in the result's raw payload (if any).
for sector in SECTORS:
    lc = load_sector_lc(sector)
    r = btv.vet_candidate(
        lc,
        candidate,
        stellar=stellar,
        network=False,
        tic_id=TIC_ID,
        preset="extended",
        checks=["V19"],
    ).results[0]

    # Metrics may be absent; report None rather than failing.
    n_events = r.metrics.get("n_phase_shift_events") if r.metrics else None
    max_sigma = r.metrics.get("max_phase_shift_event_sigma") if r.metrics else None

    print(f"\nsector {sector}: status={r.status} n_events={n_events} max_sigma={max_sigma}")

    # The per-event list is only available when the raw payload is a dict
    # carrying the "phase_shift_events" key.
    raw_payload = getattr(r, "raw", None)
    events = None
    if isinstance(raw_payload, dict) and "phase_shift_events" in raw_payload:
        events = raw_payload["phase_shift_events"]

    if not events:
        continue

    for ev in events:
        phase = ev.get("phase")
        sig = ev.get("significance")
        depth = ev.get("depth_ppm")
        npts = ev.get("n_points")
        # Events may lack a significance or depth; print None for those fields
        # instead of raising on the numeric format spec.
        print(
            f"  phase={phase} sigma={_format_or_str(sig, '.2f')} "
            f"depth_ppm={_format_or_str(depth, '.1f')} n_points={npts}"
        )
