# Profiling `malca.events` on SkyPatrol light curves

Use `cProfile` to time the Bayesian event scorer on real SkyPatrol CSVs stored in `input/skypatrol2`. The notebook is self-contained, so it can run directly against the repo without extra setup.

## Environment setup

Locate the repo root, add it to `sys.path`, and collect the SkyPatrol CSVs we'll profile.

In [None]:
import sys
from pathlib import Path

import numpy as np
import pandas as pd

# Look for the package folder in the working directory and its two nearest
# ancestors, so the notebook works from notebooks/ as well as the repo root.
cwd = Path.cwd()
candidates = [cwd, cwd.parent, cwd.parent.parent]
repo_root = cwd  # fallback when no candidate contains malca/
for candidate in candidates:
    if (candidate / "malca").exists():
        repo_root = candidate
        break

# Prepend the repo root and the package directory to sys.path, skipping
# entries that are already present.
for path in (repo_root, repo_root / "malca"):
    sp = str(path.resolve())
    if sp in sys.path:
        continue
    sys.path.insert(0, sp)

data_dir = repo_root.joinpath("input", "skypatrol2")
lc_paths = sorted(data_dir.glob("*-light-curves.csv"))

print(f"Repo root: {repo_root}")
print(f"Found {len(lc_paths)} light curves in {data_dir}")
# Fail early: every later cell indexes into lc_paths.
if not lc_paths:
    raise FileNotFoundError("No SkyPatrol CSVs found; adjust data_dir above.")

## Quick peek at one SkyPatrol light curve

Read a single CSV to confirm the loader works and to see the columns that flow into the scorer.

In [None]:
from malca.plot import read_skypatrol_csv

# Load the first discovered CSV with the package reader, then show its row
# count and leading rows as a sanity check on the input columns.
example_path = lc_paths[0]
df_example = read_skypatrol_csv(example_path)
print(f"{example_path.name}: {len(df_example)} rows")
display(df_example.head())

## Profiling helpers

Wrap `_process_one` so we can reuse the same kwargs as the CLI and capture both `cProfile` output and a tidy summary table of the hottest functions.

In [None]:
import cProfile
import io
import pstats
import time

import malca.events as events

# Keyword arguments that score_single_lc forwards to events._process_one.
# Per the notebook text these are meant to mirror the CLI defaults; confirm
# against the CLI if the two ever drift apart.
DEFAULT_EVENT_KWARGS = dict(
    # Trigger and significance thresholds
    trigger_mode="posterior_prob",
    logbf_threshold_dip=5.0,
    logbf_threshold_jump=5.0,
    significance_threshold=99.99997,
    # Probability grid (None leaves the bounds to the scorer)
    p_points=80,
    p_min_dip=None,
    p_max_dip=None,
    p_min_jump=None,
    p_max_jump=None,
    # Run-building options
    run_min_points=3,
    run_allow_gap_points=1,
    run_max_gap_days=None,
    run_min_duration_days=None,
    run_sum_threshold=None,
    run_sum_multiplier=2.5,
    # Baseline and error-model switches
    baseline_tag="gp",
    use_sigma_eff=True,
    require_sigma_eff=True,
    compute_event_prob=True,
)

def score_single_lc(path: Path, **overrides):
    """Score one light-curve file with ``events._process_one``.

    The call uses ``DEFAULT_EVENT_KWARGS`` as its keyword arguments, with any
    ``overrides`` taking precedence. ``path`` is passed on as a string. The
    shared defaults dict is never modified.
    """
    merged = {**DEFAULT_EVENT_KWARGS, **overrides}
    return events._process_one(str(path), **merged)

def stats_to_frame(stats_obj, limit=20):
    """Turn the ``.stats`` mapping of a ``pstats.Stats``-like object into a table.

    Each entry is keyed by ``(filename, lineno, funcname)`` and holds a
    five-item tuple. The second, third and fourth items are reported as
    ``ncalls``, ``tottime_s`` and ``cumtime_s``.

    Returns the ``limit`` rows with the largest ``cumtime_s``, sorted in
    descending order, or an empty DataFrame when there are no entries.
    """
    records = [
        {
            "func": f"{funcname} ({Path(filename).name}:{lineno})",
            "ncalls": total_calls,
            "tottime_s": own_time,
            "cumtime_s": cumulative_time,
        }
        for (filename, lineno, funcname), (
            _primitive_calls,
            total_calls,
            own_time,
            cumulative_time,
            _callers,
        ) in stats_obj.stats.items()
    ]
    table = pd.DataFrame(records)
    if not table.empty:
        table = table.sort_values("cumtime_s", ascending=False).head(limit)
    return table

def profile_light_curve(path: Path, stats_limit=25, **overrides):
    """Run ``score_single_lc`` under ``cProfile`` and summarise the profile.

    Returns a 4-tuple ``(result, stats_text, top_df, elapsed)``:

    * ``result``: whatever ``score_single_lc(path, **overrides)`` returned.
    * ``stats_text``: the printed ``pstats`` report, directories stripped,
      sorted by cumulative time and limited to ``stats_limit`` entries.
    * ``top_df``: the same stats as a DataFrame via ``stats_to_frame``.
    * ``elapsed``: wall-clock seconds around the profiled call, so it
      includes profiler overhead.
    """
    profiler = cProfile.Profile()
    t_begin = time.perf_counter()
    result = profiler.runcall(score_single_lc, path, **overrides)
    elapsed = time.perf_counter() - t_begin

    # Send the pstats report to an in-memory buffer instead of stdout.
    report = io.StringIO()
    stats_obj = pstats.Stats(profiler, stream=report)
    stats_obj.strip_dirs()
    stats_obj.sort_stats("cumtime")
    stats_obj.print_stats(stats_limit)

    return result, report.getvalue(), stats_to_frame(stats_obj, limit=stats_limit), elapsed

## Profile a single light curve

Run the Bayesian scorer on one SkyPatrol CSV (default parameters match the CLI). `stats_df_single` highlights the functions consuming the most cumulative time.

In [None]:
# Profile the first light curve. Keep the raw pstats text and the tabular
# summary (limited to 25 entries) so both can be inspected below.
single_path = lc_paths[0]
result_single, stats_text_single, stats_df_single, elapsed_single = profile_light_curve(single_path, stats_limit=25)

print(f"Profiled {single_path.name} in {elapsed_single:.2f} s")
print(stats_text_single)
# A bare trailing expression is rendered as the cell output.
stats_df_single

## Batch timing on a handful of files (optional)

Process a small subset sequentially to gauge throughput without full profiling. Adjust `N_LC` to cover more files if needed.

In [None]:
N_LC = 3
subset_paths = lc_paths[:N_LC]

# Output column -> key read from the dict returned by score_single_lc.
result_columns = {
    "n_points": "n_points",
    "dip_sig": "dip_significant",
    "jump_sig": "jump_significant",
    "dip_bf": "dip_bayes_factor",
    "jump_bf": "jump_bayes_factor",
}

timing_rows = []
for path in subset_paths:
    # Time the scoring call alone, without the profiler attached.
    t0 = time.perf_counter()
    res = score_single_lc(path)
    elapsed_s = time.perf_counter() - t0

    row = {"path": path.name, "elapsed_s": elapsed_s}
    for column, key in result_columns.items():
        row[column] = res.get(key)
    timing_rows.append(row)

pd.DataFrame(timing_rows)

## Grid resolution sweep
Compare how `p_points` (probability grid) and `mag_points` (magnitude grid) affect runtime and outputs on a small SkyPatrol subset.

In [None]:
import time
from malca.baseline import per_camera_gp_baseline
import malca.events as events

# Shallow copy of the package's default baseline kwargs, so edits made in this
# notebook do not mutate events.DEFAULT_BASELINE_KWARGS itself.
BASELINE_KWARGS = dict(events.DEFAULT_BASELINE_KWARGS)

def prepare_light_curve(path):
    """Load, filter and baseline one SkyPatrol CSV ahead of a grid sweep.

    Rows are kept only when ``JD``, ``mag`` and ``error`` are finite and
    ``0 < error < 10``. The result goes through ``events.clean_lc``, and
    ``per_camera_gp_baseline`` is run once with ``BASELINE_KWARGS``.

    Returns ``(df, baseline_precomputed, baseline_mag, mags_for_grid)``:

    * ``df``: the cleaned light curve.
    * ``baseline_precomputed``: a callable that ignores its arguments and
      returns the cached baseline frame, so the baseline fit is not redone
      for each grid setting.
    * ``baseline_mag``: nan-median of the ``baseline`` column, or of ``mag``
      when that column is absent.
    * ``mags_for_grid``: the baseline frame's ``mag`` values, or the cleaned
      frame's when that column is absent.

    Raises:
        ValueError: if fewer than 10 rows survive the validity filter.
    """
    raw = read_skypatrol_csv(path)
    err = raw["error"]
    is_finite = np.isfinite(raw["JD"]) & np.isfinite(raw["mag"]) & np.isfinite(err)
    usable = is_finite & (err > 0) & (err < 10)

    df = raw[usable].copy()
    if len(df) < 10:
        raise ValueError(f"Insufficient valid data points ({len(df)}) in {path.name}")
    df = events.clean_lc(df)

    df_base = per_camera_gp_baseline(df, **BASELINE_KWARGS)

    if "baseline" in df_base.columns:
        baseline_values = df_base["baseline"].to_numpy(float)
    else:
        baseline_values = df_base["mag"].to_numpy(float)
    baseline_mag = float(np.nanmedian(baseline_values))

    if "mag" in df_base.columns:
        mags_for_grid = df_base["mag"].to_numpy(float)
    else:
        mags_for_grid = df["mag"].to_numpy(float)

    def baseline_precomputed(df_in, **kwargs):
        # Hand back the cached baseline whatever the scorer passes in.
        return df_base

    return df, baseline_precomputed, baseline_mag, mags_for_grid

def run_grid_setting(path, p_points, mag_points, label):
    """Time ``events.run_bayesian_significance`` for one grid configuration.

    The light curve and its baseline come from ``prepare_light_curve``. The
    dip and jump magnitude grids are built with ``events.default_mag_grid``
    using ``n=mag_points``. Only the scorer call is timed.

    Returns a flat dict with the file name, ``label``, the two grid sizes, the
    elapsed seconds, and the ``significant``, ``bayes_factor`` and ``best_p``
    entries of the ``"dip"`` and ``"jump"`` results.
    """
    lc, precomputed_baseline, median_baseline, grid_mags = prepare_light_curve(path)
    dip_grid = events.default_mag_grid(median_baseline, grid_mags, "dip", n=mag_points)
    jump_grid = events.default_mag_grid(median_baseline, grid_mags, "jump", n=mag_points)

    scorer_options = {
        "baseline_func": precomputed_baseline,
        "baseline_kwargs": {},
        "p_points": p_points,
        "mag_grid_dip": dip_grid,
        "mag_grid_jump": jump_grid,
        "trigger_mode": "posterior_prob",
        "logbf_threshold_dip": 5.0,
        "logbf_threshold_jump": 5.0,
        "significance_threshold": 99.99997,
        "run_min_points": 3,
        "run_allow_gap_points": 1,
        "run_max_gap_days": None,
        "run_min_duration_days": None,
        "run_sum_threshold": None,
        "run_sum_multiplier": 2.5,
        "use_sigma_eff": True,
        "require_sigma_eff": True,
        "compute_event_prob": True,
    }

    t_begin = time.perf_counter()
    res = events.run_bayesian_significance(lc, **scorer_options)
    elapsed = time.perf_counter() - t_begin

    row = {
        "path": path.name,
        "config": label,
        "p_points": p_points,
        "mag_points": mag_points,
        "elapsed_s": elapsed,
    }
    # Column suffix -> key inside each per-kind result; dip precedes jump.
    for suffix, key in (("sig", "significant"), ("bf", "bayes_factor"), ("best_p", "best_p")):
        for kind in ("dip", "jump"):
            row[f"{kind}_{suffix}"] = res[kind][key]
    return row

In [None]:
# Score every (light curve, grid setting) pair on a small subset of files
N_LC = 3
subset_paths = lc_paths[:N_LC]

grid_settings = [
    dict(label="baseline_80x60", p_points=80, mag_points=60),
    dict(label="coarse_40x30", p_points=40, mag_points=30),
    dict(label="fine_p_160x60", p_points=160, mag_points=60),
    dict(label="fine_mag_80x120", p_points=80, mag_points=120),
]

# Paths form the outer loop, so rows for one file stay adjacent.
rows = [
    run_grid_setting(path, cfg["p_points"], cfg["mag_points"], cfg["label"])
    for path in subset_paths
    for cfg in grid_settings
]

df_grid = pd.DataFrame(rows)
df_grid

In [None]:
# Join each row with the baseline-configuration row for the same file and
# report the differences in Bayes factor and runtime.
baseline_label = "baseline_80x60"
base = df_grid.loc[df_grid["config"].eq(baseline_label)]
comparison = pd.merge(df_grid, base, on="path", suffixes=("", "_base"))

for metric in ("dip_bf", "jump_bf"):
    comparison[f"{metric}_delta"] = comparison[metric] - comparison[f"{metric}_base"]
comparison["elapsed_delta_s"] = comparison["elapsed_s"] - comparison["elapsed_s_base"]

cols = ["path", "config", "p_points", "mag_points", "elapsed_s", "elapsed_delta_s"]
for kind in ("dip", "jump"):
    cols += [f"{kind}_sig", f"{kind}_sig_base", f"{kind}_bf", f"{kind}_bf_delta"]
comparison[cols]

## Simulated dips and jumps
Create a small suite of synthetic light curves (reusing a real SkyPatrol cadence) with injected dips, jumps, and a mixed case so grid sensitivity can be tested on known events.

In [None]:
# Build simulated light curves by injecting analytic events into a real cadence
from malca import events

# Seeded generator shared by the noise and gap helper below.
rng = np.random.default_rng(42)

# The first real light curve supplies the cadence for every simulated case.
base_sim_path = lc_paths[0]
df_sim_base_raw = read_skypatrol_csv(base_sim_path)

# Keep rows with finite JD/mag/error and 0 < error < 10.
mask = (
    np.isfinite(df_sim_base_raw["JD"])
    & np.isfinite(df_sim_base_raw["mag"])
    & np.isfinite(df_sim_base_raw["error"])
    & df_sim_base_raw["error"].gt(0)
    & df_sim_base_raw["error"].lt(10)
)
df_sim_base = events.clean_lc(df_sim_base_raw[mask].copy())

# Reference epoch: injected events are positioned relative to the median JD.
jd_med = float(df_sim_base["JD"].median())


def inject_event(df_in, kind="dip", shape="gaussian", amp=0.25, width=25.0, t0_offset=0.0):
    """Return a copy of ``df_in`` with one analytic event added to ``mag``.

    The event is centred at ``jd_med + t0_offset``. With ``kind == "dip"`` the
    profile is evaluated with amplitude ``+amp``; any other ``kind`` uses
    ``-amp``. ``shape`` selects ``events.gaussian`` or ``events.paczynski``,
    called as ``(times, amplitude, centre, width, 0.0)``. The trailing ``0.0``
    is presumably a baseline offset; confirm in ``malca.events``.

    Raises:
        ValueError: if ``shape`` is neither ``"gaussian"`` nor ``"paczynski"``.
    """
    injected = df_in.copy()
    centre = jd_med + float(t0_offset)
    if kind == "dip":
        signed_amp = float(amp)
    else:
        signed_amp = -float(amp)
    times = injected["JD"].to_numpy(float)

    if shape not in ("gaussian", "paczynski"):
        raise ValueError("shape must be gaussian or paczynski")
    # Both profile functions live on the events module under the shape's name.
    profile = getattr(events, shape)
    delta = profile(times, signed_amp, centre, float(width), 0.0)

    injected["mag"] = injected["mag"].to_numpy(float) + delta
    return injected


def inject_mixed(df_in):
    """Return a copy of ``df_in`` holding both a dip and a jump.

    A Gaussian dip (amp 0.18, width 20, offset -60) is injected first, then a
    Paczynski jump (amp 0.14, width 30, offset +40) is added on top of it.
    """
    with_dip = inject_event(df_in, kind="dip", shape="gaussian", amp=0.18, width=20.0, t0_offset=-60.0)
    return inject_event(with_dip, kind="jump", shape="paczynski", amp=0.14, width=30.0, t0_offset=40.0)


def add_noise_and_gaps(
    df_in,
    jitter_mag=0.02,
    gap_frac=0.15,
    spike_amp=0.25,
    spike_width=0.08,
    spike_count=3,
    random_state=None,
):
    """Return a degraded copy of ``df_in``: jitter, random gaps and outliers.

    Steps, in the order the random draws are consumed:

    1. Add Gaussian jitter with standard deviation ``jitter_mag`` to ``mag``.
    2. Rescale ``error`` by ``1 + N(0, 0.05)`` and clip it to at least 1e-3.
    3. Drop each row with probability ``gap_frac``.
    4. Add ``spike_count`` outliers, each ``N(spike_amp, spike_width)``, to
       the ``mag`` of a randomly chosen surviving row.
    5. Sort by ``JD`` and reset the index.

    Args:
        df_in: Frame with ``JD``, ``mag`` and ``error`` columns; not modified.
        jitter_mag: Standard deviation of the jitter added to ``mag``.
        gap_frac: Probability of dropping each row.
        spike_amp: Mean of the outlier offset.
        spike_width: Standard deviation of the outlier offset.
        spike_count: Number of outliers to add (cast with ``int``).
        random_state: Optional ``numpy.random.Generator``. When ``None`` the
            notebook-level ``rng`` is used.

    Returns:
        The degraded frame, or an unmodified copy of ``df_in`` if every row
        was dropped.
    """
    gen = rng if random_state is None else random_state

    df = df_in.copy()
    n = len(df)
    df["mag"] = df["mag"].to_numpy(float) + gen.normal(0.0, jitter_mag, n)
    df["error"] = (df["error"].to_numpy(float) * (1 + gen.normal(0.0, 0.05, n))).clip(min=1e-3)

    keep = gen.random(n) > gap_frac
    # Reset the index after dropping rows so no stale labels remain.
    df = df.loc[keep].reset_index(drop=True)
    if df.empty:
        return df_in.copy()

    # Spike rows are drawn as positions in [0, len(df)), so write with iloc.
    # Label-based .loc raised KeyError when the drawn number was a dropped
    # label, or when the input index was not 0..n-1.
    mag_col = df.columns.get_loc("mag")
    for _ in range(int(spike_count)):
        pos = int(gen.integers(0, len(df)))
        df.iloc[pos, mag_col] += gen.normal(spike_amp, spike_width)

    return df.sort_values("JD").reset_index(drop=True)

# Named suite of synthetic light curves built on the real cadence in
# df_sim_base. The "messy_*" entries draw from the shared seeded rng inside
# add_noise_and_gaps, so their content depends on the order in which the
# entries of this dict are evaluated.
simulated_lcs = {
    "dip_gaussian": inject_event(df_sim_base, kind="dip", shape="gaussian", amp=0.22, width=25.0, t0_offset=-30.0),
    "dip_paczynski": inject_event(df_sim_base, kind="dip", shape="paczynski", amp=0.28, width=18.0, t0_offset=20.0),
    "dip_shallow_fast": inject_event(df_sim_base, kind="dip", shape="gaussian", amp=0.12, width=8.0, t0_offset=-10.0),
    # Two dips of different shapes, injected one after the other.
    "dip_double": inject_event(inject_event(df_sim_base, kind="dip", shape="gaussian", amp=0.18, width=15.0, t0_offset=-50.0), kind="dip", shape="paczynski", amp=0.15, width=12.0, t0_offset=25.0),
    "jump_gaussian": inject_event(df_sim_base, kind="jump", shape="gaussian", amp=0.20, width=22.0, t0_offset=-15.0),
    "jump_paczynski": inject_event(df_sim_base, kind="jump", shape="paczynski", amp=0.24, width=16.0, t0_offset=45.0),
    "microlens_weak": inject_event(df_sim_base, kind="jump", shape="paczynski", amp=0.10, width=10.0, t0_offset=5.0),
    "microlens_strong": inject_event(df_sim_base, kind="jump", shape="paczynski", amp=0.30, width=26.0, t0_offset=70.0),
    "mixed": inject_mixed(df_sim_base),
    # Degraded variants: extra jitter, randomly dropped points and outliers.
    "messy_dip": add_noise_and_gaps(inject_event(df_sim_base, kind="dip", shape="gaussian", amp=0.20, width=18.0, t0_offset=-5.0), jitter_mag=0.03, gap_frac=0.2, spike_count=3),
    "messy_jump": add_noise_and_gaps(inject_event(df_sim_base, kind="jump", shape="paczynski", amp=0.18, width=14.0, t0_offset=35.0), jitter_mag=0.03, gap_frac=0.2, spike_count=3),
    "messy_mixed": add_noise_and_gaps(inject_mixed(df_sim_base), jitter_mag=0.04, gap_frac=0.25, spike_count=4),
}

print(f"Simulated light curves: {list(simulated_lcs.keys())}")
display(df_sim_base.head())

In [None]:
from malca.baseline import per_camera_gp_baseline


def prepare_df_for_sim(df):
    """Clean and baseline an in-memory light curve ahead of a grid sweep.

    Runs ``events.clean_lc`` followed by ``per_camera_gp_baseline`` with
    ``BASELINE_KWARGS``.

    Returns ``(df_clean, baseline_precomputed, baseline_mag, mags_for_grid)``:

    * ``df_clean``: the cleaned light curve.
    * ``baseline_precomputed``: a callable that ignores its arguments and
      returns the cached baseline frame.
    * ``baseline_mag``: nan-median of the ``baseline`` column, or of ``mag``
      when that column is absent.
    * ``mags_for_grid``: the baseline frame's ``mag`` values, or the cleaned
      frame's when that column is absent.
    """
    df_clean = events.clean_lc(df)
    df_base = per_camera_gp_baseline(df_clean, **BASELINE_KWARGS)

    if "baseline" in df_base.columns:
        baseline_values = df_base["baseline"].to_numpy(float)
    else:
        baseline_values = df_base["mag"].to_numpy(float)
    baseline_mag = float(np.nanmedian(baseline_values))

    if "mag" in df_base.columns:
        mags_for_grid = df_base["mag"].to_numpy(float)
    else:
        mags_for_grid = df_clean["mag"].to_numpy(float)

    def baseline_precomputed(df_in, **kwargs):
        # Hand back the cached baseline whatever the scorer passes in.
        return df_base

    return df_clean, baseline_precomputed, baseline_mag, mags_for_grid


def run_grid_setting_sim(name, df, p_points, mag_points, label):
    """Time ``events.run_bayesian_significance`` on one simulated light curve.

    The frame is prepared with ``prepare_df_for_sim``. The dip and jump
    magnitude grids are built with ``events.default_mag_grid`` using
    ``n=mag_points``. Only the scorer call is timed.

    Returns a flat dict with the case ``name``, ``label``, the two grid sizes,
    the elapsed seconds, and the ``significant``, ``bayes_factor`` and
    ``best_p`` entries of the ``"dip"`` and ``"jump"`` results.
    """
    lc, precomputed_baseline, median_baseline, grid_mags = prepare_df_for_sim(df)
    dip_grid = events.default_mag_grid(median_baseline, grid_mags, "dip", n=mag_points)
    jump_grid = events.default_mag_grid(median_baseline, grid_mags, "jump", n=mag_points)

    scorer_options = {
        "baseline_func": precomputed_baseline,
        "baseline_kwargs": {},
        "p_points": p_points,
        "mag_grid_dip": dip_grid,
        "mag_grid_jump": jump_grid,
        "trigger_mode": "posterior_prob",
        "logbf_threshold_dip": 5.0,
        "logbf_threshold_jump": 5.0,
        "significance_threshold": 99.99997,
        "run_min_points": 3,
        "run_allow_gap_points": 1,
        "run_max_gap_days": None,
        "run_min_duration_days": None,
        "run_sum_threshold": None,
        "run_sum_multiplier": 2.5,
        "use_sigma_eff": True,
        "require_sigma_eff": True,
        "compute_event_prob": True,
    }

    t_begin = time.perf_counter()
    res = events.run_bayesian_significance(lc, **scorer_options)
    elapsed = time.perf_counter() - t_begin

    row = {
        "case": name,
        "config": label,
        "p_points": p_points,
        "mag_points": mag_points,
        "elapsed_s": elapsed,
    }
    # Column suffix -> key inside each per-kind result; dip precedes jump.
    for suffix, key in (("sig", "significant"), ("bf", "bayes_factor"), ("best_p", "best_p")):
        for kind in ("dip", "jump"):
            row[f"{kind}_{suffix}"] = res[kind][key]
    return row

In [None]:
# Score every simulated case at each grid resolution, coarse to fine
sim_grid_settings = [
    dict(label="sim_10x10", p_points=10, mag_points=10),
    dict(label="sim_25x25", p_points=25, mag_points=25),
    dict(label="sim_50x50", p_points=50, mag_points=50),
    dict(label="sim_80x60", p_points=80, mag_points=60),
]

# Cases form the outer loop, so rows for one case stay adjacent.
sim_rows = [
    run_grid_setting_sim(name, df_sim, cfg["p_points"], cfg["mag_points"], cfg["label"])
    for name, df_sim in simulated_lcs.items()
    for cfg in sim_grid_settings
]

sim_results = pd.DataFrame(sim_rows)
sim_results