In [None]:
!pip -q install lightkurve==2.* numpy pandas scipy astropy tqdm pyarrow

**Detrending + BLS pipeline**

In [None]:
from typing import Optional, Dict, Any, List, Tuple
import numpy as np
import pandas as pd
from tqdm import tqdm
from astropy.timeseries import BoxLeastSquares
from scipy.signal import savgol_filter
import lightkurve as lk

def _to_clean_arrays(time_in, flux_in):
    t = np.array(time_in, dtype=float)
    f = np.array(flux_in, dtype=float)
    m = np.isfinite(t) & np.isfinite(f)
    t, f = t[m], f[m]
    if len(t) == 0:
        return t, f
    order = np.argsort(t)
    return t[order], f[order]

def _estimate_baseline_days(time):
    if len(time) < 2:
        return 0.0
    return float(np.nanmax(time) - np.nanmin(time))

def _savgol_detrend(time, flux, window_length=401, polyorder=2):
    if len(flux) < 7:
        return time, flux
    wl = max(5, int(window_length))
    wl = wl if wl % 2 == 1 else wl + 1
    wl = min(wl, len(flux) - (1 - len(flux) % 2))
    if wl < 5:
        return time, flux
    trend = savgol_filter(flux, window_length=wl, polyorder=polyorder, mode="interp")
    med = np.nanmedian(trend) if np.isfinite(trend).any() else 1.0
    trend = np.where(trend == 0, med, trend)
    detrended = flux / trend - 1.0
    return time, detrended

def _lk_flatten(time, flux, window_length=401, polyorder=2, break_tolerance=5):
    """Flatten a light curve with lightkurve, falling back to ``_savgol_detrend``.

    Args:
        time: Time stamps.
        flux: Flux samples.
        window_length: Requested filter window in samples; raised to at least
            5 and forced odd before being handed to ``LightCurve.flatten``.
        polyorder: Polynomial order passed to ``LightCurve.flatten``.
        break_tolerance: Passed through to ``LightCurve.flatten``.

    Returns:
        ``(time, flux)`` arrays taken from the flattened light curve, or the
        result of ``_savgol_detrend`` if anything in the lightkurve path
        raises. A ``RuntimeWarning`` is emitted when the fallback is used.

    NOTE(review): the fallback returns ``flux / trend - 1``; lightkurve's
    flattened flux is presumably centred on 1 rather than 0 — confirm. Callers
    that need a common zero point should subtract the median themselves.
    """
    import warnings  # local import so the block does not depend on file-level changes

    try:
        lc = lk.LightCurve(time=time, flux=flux)
        wl = max(5, int(window_length))
        wl = wl if wl % 2 == 1 else wl + 1
        lc_flat = lc.flatten(window_length=wl, polyorder=polyorder, break_tolerance=break_tolerance)
        return np.array(lc_flat.time.value), np.array(lc_flat.flux.value)
    except Exception as exc:
        # Best-effort fallback is kept, but no longer silent: the caller may
        # still be labelling the result as lightkurve-detrended.
        warnings.warn(
            f"lightkurve flatten failed ({exc!r}); falling back to Savitzky-Golay detrending",
            RuntimeWarning,
            stacklevel=2,
        )
        return _savgol_detrend(time, flux, window_length=window_length, polyorder=polyorder)

def detrend_series(time_in, flux_in,
                   method: str = "lightkurve",
                   window_length: Optional[int] = None,
                   polyorder: int = 2,
                   cadence_min: Optional[float] = None,
                   max_transit_hours: float = 10.0) -> Tuple[np.ndarray, np.ndarray]:
    """Clean a light curve and remove its slow trend.

    Args:
        time_in: Time stamps (any array-like convertible to float).
        flux_in: Flux samples, same length as ``time_in``.
        method: ``"lightkurve"`` uses ``_lk_flatten``; any other value uses
            ``_savgol_detrend``.
        window_length: Filter window in samples. When ``None`` it is derived
            from the cadence so the window spans 2.5 x ``max_transit_hours``,
            with a floor of 51 samples, and is forced odd.
        polyorder: Polynomial order of the smoothing filter.
        cadence_min: Sampling cadence in minutes. ``None``, NaN, non-numeric or
            non-positive values are treated as "unknown", in which case the
            cadence is estimated from the median spacing of the time stamps.
        max_transit_hours: Longest transit duration the window must preserve.

    Returns:
        ``(time, flux)`` after cleaning and detrending. Series with fewer than
        50 valid samples are returned cleaned but not detrended.
    """
    t, f = _to_clean_arrays(time_in, flux_in)
    if len(t) < 50:
        return t, f

    if window_length is None:
        # A missing cell in a pandas row arrives as NaN rather than None, so
        # validate the value instead of only testing for None.
        try:
            cadence = float(cadence_min) if cadence_min is not None else float("nan")
        except (TypeError, ValueError):
            cadence = float("nan")
        if not np.isfinite(cadence) or cadence <= 0:
            # t holds at least 50 sorted samples here, so the diff is non-empty.
            dt_days = np.nanmedian(np.diff(t))
            cadence = float(dt_days * 24.0 * 60.0)

        target_minutes = max_transit_hours * 60.0 * 2.5
        window_length = max(51, int(round(target_minutes / max(cadence, 1e-6))))
        if window_length % 2 == 0:
            window_length += 1

    if method == "lightkurve":
        return _lk_flatten(t, f, window_length=window_length, polyorder=polyorder)
    else:
        return _savgol_detrend(t, f, window_length=window_length, polyorder=polyorder)

def _period_limits_from_baseline(baseline_days: float,
                                 min_period: float = 0.3,
                                 max_period: Optional[float] = None) -> Tuple[float, float]:
    if max_period is None:
        max_period = max(1.0, 0.9 * max(baseline_days, 1.0))
        max_period = min(max_period, 100.0)  # safety cap
    return max(min_period, 0.05), max_period

def run_bls(time: np.ndarray,
            flux: np.ndarray,
            min_period: float = 0.3,
            max_period: Optional[float] = None,
            min_dur_hours: float = 0.5,
            max_dur_hours: float = 10.0,
            n_durations: int = 25,
            oversample: int = 10,
            minimum_n_transit: int = 3) -> Dict[str, Any]:
    """Run a Box Least Squares search and return the periodogram as arrays.

    Args:
        time: Time stamps; durations are converted from hours to days before
            use, so the time axis is expected to be in days.
        flux: Detrended flux samples, same length as ``time``.
        min_period: Shortest period to keep in the output.
        max_period: Longest period to keep; ``None`` derives it from the
            baseline via ``_period_limits_from_baseline``.
        min_dur_hours: Shortest trial transit duration, in hours.
        max_dur_hours: Longest trial transit duration, in hours.
        n_durations: Number of linearly spaced trial durations.
        oversample: How many times finer than astropy's default the period
            grid should be. Non-positive values fall back to the default grid.
        minimum_n_transit: Passed to ``BoxLeastSquares.autopower``.

    Returns:
        ``{}`` when there are fewer than 100 samples or no periodogram point
        survives the period/finite-power filter. Otherwise a dict with the
        arrays ``period``, ``power``, ``duration``, ``transit_time``,
        ``depth``, ``depth_err`` and ``snr`` (``depth / depth_err``, NaN where
        the error is zero); the last three are ``None`` if the result object
        does not expose depth information.

    Note:
        ``autopower`` is called without explicit period bounds, so astropy
        chooses the searched range; ``min_period``/``max_period`` only filter
        the returned grid.
    """
    if len(time) < 100:
        return {}

    baseline = _estimate_baseline_days(time)
    pmin, pmax = _period_limits_from_baseline(baseline, min_period=min_period, max_period=max_period)

    dur_grid_days = np.linspace(min_dur_hours/24.0, max_dur_hours/24.0, n_durations)

    # astropy spaces trial frequencies by
    #   df = frequency_factor * min(duration) / baseline**2,
    # so a LARGER frequency_factor gives a COARSER grid. An oversampling
    # factor therefore maps to its reciprocal; the number of trial periods
    # (and the run time) grows in proportion to `oversample`.
    frequency_factor = 1.0 / oversample if oversample and oversample > 0 else 1.0

    bls = BoxLeastSquares(time, flux)
    res = bls.autopower(dur_grid_days,
                        minimum_n_transit=minimum_n_transit,
                        frequency_factor=frequency_factor)

    sel = (res.period >= pmin) & (res.period <= pmax) & np.isfinite(res.power)
    if not np.any(sel):
        return {}

    out = {
        "period": np.array(res.period[sel]),
        "power": np.array(res.power[sel]),
        "duration": np.array(res.duration[sel]),
        "transit_time": np.array(res.transit_time[sel]),
        "depth": np.array(res.depth[sel]) if hasattr(res, "depth") else None,
        "depth_err": np.array(res.depth_err[sel]) if hasattr(res, "depth_err") else None,
        "snr": None,
    }
    if out["depth"] is not None and out["depth_err"] is not None:
        # A zero depth error is mapped to NaN so the ratio is NaN, not inf.
        with np.errstate(divide="ignore", invalid="ignore"):
            out["snr"] = out["depth"] / np.where(out["depth_err"] == 0, np.nan, out["depth_err"])

    return out

def _find_top_peaks(period, power, k_top=3, min_separation=0.02):
    if len(period) < 5:
        return np.argsort(power)[::-1][:k_top]

    is_peak = (power[1:-1] > power[:-2]) & (power[1:-1] > power[2:])
    peak_idx = np.where(is_peak)[0] + 1
    if len(peak_idx) == 0:
        peak_idx = np.array([np.argmax(power)])

    candidates = peak_idx[np.argsort(power[peak_idx])[::-1]]

    selected = []
    for i in candidates:
        if len(selected) >= k_top:
            break
        if all(np.abs(period[i] - period[j]) > min_separation * period[i] for j in selected):
            selected.append(i)

    if len(selected) < k_top:
        filler = np.argsort(power)[::-1]
        for i in filler:
            if i in selected:
                continue
            if all(np.abs(period[i] - period[j]) > min_separation * period[i] for j in selected):
                selected.append(i)
            if len(selected) >= k_top:
                break

    return np.array(selected[:k_top], dtype=int)

def process_block_row(row: pd.Series,
                      detrend_method: str = "lightkurve",
                      max_transit_hours: float = 10.0,
                      bls_top_k: int = 3,
                      save_power_every_n: int = 10,
                      decimate_power: int = 5) -> List[Dict[str, Any]]:
    """Detrend one observing block and return its top BLS candidates.

    Args:
        row: One row of the input table. ``time`` and ``flux`` are required;
            ``cadence_min`` and the identification columns (``obs_block_id``,
            ``target_id``, ``label``, ``mission``, ``sector``, ``quarter``,
            ``campaign``) are read if present and copied into each candidate.
        detrend_method: Passed to ``detrend_series`` and recorded per candidate.
        max_transit_hours: Longest transit duration, used both for the
            detrending window and as the longest BLS trial duration.
        bls_top_k: Maximum number of candidates returned for the row.
        save_power_every_n: Attach the decimated periodogram to the candidates
            of rows whose integer index label is a multiple of this value;
            falsy or non-positive disables it. Rows whose index label cannot
            be converted to an integer never get the periodogram attached.
        decimate_power: Keep every n-th periodogram point when attaching.

    Returns:
        A list of candidate dicts (rank 1 is the first peak returned by
        ``_find_top_peaks``), or ``[]`` when the row lacks data, has fewer than
        100 usable samples, or the BLS search returns nothing.
    """
    time = row.get("time")
    flux = row.get("flux")
    if time is None or flux is None:
        return []

    t, f = _to_clean_arrays(time, flux)
    if len(t) < 100:
        return []

    t_d, f_d = detrend_series(
        t, f,
        method=detrend_method,
        cadence_min=row.get("cadence_min"),
        max_transit_hours=max_transit_hours
    )

    # Detrending divides by a fitted trend and can therefore produce NaN/inf;
    # drop such samples so they cannot poison the periodogram.
    t_d, f_d = _to_clean_arrays(t_d, f_d)
    if len(t_d) < 100:
        return []

    # Centre on zero so both detrending paths share the same zero point.
    f_d = f_d - np.nanmedian(f_d)

    bls_out = run_bls(
        t_d, f_d,
        min_period=0.3,
        max_period=None,
        min_dur_hours=0.5,
        max_dur_hours=max_transit_hours,
        n_durations=25,
        oversample=10,
        minimum_n_transit=3
    )
    if not bls_out:
        return []

    period = bls_out["period"]
    power = bls_out["power"]
    duration = bls_out["duration"]
    t0 = bls_out["transit_time"]
    depth = bls_out["depth"]
    depth_err = bls_out["depth_err"]
    snr_arr = bls_out["snr"]

    peaks = _find_top_peaks(period, power, k_top=bls_top_k, min_separation=0.02)

    attach_power = False
    if save_power_every_n and save_power_every_n > 0:
        try:
            attach_power = (int(row.name) % save_power_every_n == 0)
        except (TypeError, ValueError):
            # Non-numeric (or missing) index label: skip the optional
            # periodogram instead of failing the whole row.
            attach_power = False

    # The decimated grids are identical for every candidate of this row, so
    # build them once; the same list objects are shared by those candidates.
    period_grid = power_grid = None
    if attach_power:
        step = max(1, int(decimate_power))
        period_grid = period[::step].tolist()
        power_grid = power[::step].tolist()

    n_points = int(len(t_d))
    baseline_days = float(_estimate_baseline_days(t_d))

    candidates = []
    for rank, i in enumerate(peaks, start=1):
        cand = {
            "obs_block_id": row.get("obs_block_id"),
            "target_id": row.get("target_id"),
            "label": row.get("label"),
            "mission": row.get("mission"),
            "sector": row.get("sector"),
            "quarter": row.get("quarter"),
            "campaign": row.get("campaign"),
            "rank": rank,
            "period": float(period[i]),
            "epoch": float(t0[i]),
            "duration": float(duration[i]),
            "depth": float(depth[i]) if depth is not None else np.nan,
            "depth_err": float(depth_err[i]) if depth_err is not None else np.nan,
            "snr": float(snr_arr[i]) if snr_arr is not None and np.isfinite(snr_arr[i]) else np.nan,
            "power": float(power[i]),
            "n_points": n_points,
            "baseline_days": baseline_days,
            "detrend_method": detrend_method,
        }
        if attach_power:
            cand["bls_period_grid"] = period_grid
            cand["bls_power_grid"] = power_grid
        candidates.append(cand)

    return candidates


def run_bls_pipeline(vet_df: pd.DataFrame,
                     detrend_method: str = "lightkurve",
                     max_transit_hours: float = 10.0,
                     bls_top_k: int = 3,
                     save_power_every_n: int = 10,
                     decimate_power: int = 5) -> pd.DataFrame:
    """Run ``process_block_row`` over every row and collect the candidates.

    A row that raises is reported with a ``[WARN]`` line on stdout and
    skipped; the remaining rows are still processed.

    Returns:
        One row per candidate, ordered by ``target_id``, then ``rank``, then
        descending ``power``, with a fresh integer index. An empty,
        column-less DataFrame is returned when no candidate was produced.
    """
    # Options forwarded unchanged to every per-row call.
    row_options = dict(
        detrend_method=detrend_method,
        max_transit_hours=max_transit_hours,
        bls_top_k=bls_top_k,
        save_power_every_n=save_power_every_n,
        decimate_power=decimate_power,
    )

    collected: List[Dict[str, Any]] = []
    progress = tqdm(vet_df.iterrows(), total=len(vet_df), desc="Detrend+BLS")
    for idx, row in progress:
        try:
            collected.extend(process_block_row(row, **row_options))
        except Exception as e:
            obid = row.get("obs_block_id", "?")
            print(f"[WARN] failed on row {idx} ({obid}): {repr(e)}")

    if len(collected) == 0:
        return pd.DataFrame()

    return (
        pd.DataFrame.from_records(collected)
        .sort_values(by=["target_id", "rank", "power"], ascending=[True, True, False])
        .reset_index(drop=True)
    )


In [None]:
# Colab-only cell: mount Google Drive, then load the table of observing blocks.
from google.colab import drive
drive.mount('/content/drive')
# Parquet file read below; each row is reported as one observing block.
INPUT_PARQUET = "/content/vetting_kepler.parquet"
# NOTE(review): OUTPUT_PARQUET is not referenced by any other visible cell (the
# export cell writes to a local file and downloads it) — confirm it is still needed
# and that the path points at a writable location inside the mounted drive.
OUTPUT_PARQUET = "/content/drive/candidates_bls.parquet"

vet_df = pd.read_parquet(INPUT_PARQUET)
print("Loaded", len(vet_df), "observing blocks")
# Preview the first two rows as a quick check of the loaded columns.
display(vet_df.head(2))


In [None]:
# Run detrending + BLS over every row of vet_df; cand_df holds one row per
# returned candidate. decimate_power=10 overrides the function default of 5,
# i.e. every 10th periodogram point is kept when the periodogram is attached.
cand_df = run_bls_pipeline(
    vet_df,
    detrend_method="lightkurve",
    max_transit_hours=10.0,
    bls_top_k=3,
    save_power_every_n=10,
    decimate_power=10
)

print("Found", len(cand_df), "BLS candidates")
# Show the first few candidates.
display(cand_df.head())

In [None]:
# Colab-only cell: write the candidate table to a Parquet file in the current
# working directory (without the DataFrame index) and pass it to Colab's
# files.download.
from google.colab import files
tmp_out = "candidates_bls.parquet"
cand_df.to_parquet(tmp_out, index=False)
files.download(tmp_out)