<a href="https://colab.research.google.com/github/Roberock/structural-break/blob/master/roberock_structural_break.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

https://hub.crunchdao.com/competitions/structural-break/submit/notebook?projectName=semantic-penguin


In [9]:
%pip install crunch-cli --upgrade --quiet --progress-bar off
!crunch setup-notebook structural-break 7zhoh1AuNOkWercet1v9wiWi

crunch-cli, version 7.4.0
main.py: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/submissions/27380/main.py (21371 bytes)
notebook.ipynb: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/submissions/27380/notebook.ipynb (42046 bytes)
requirements.txt: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/submissions/27380/requirements.original.txt (212 bytes)
data/X_train.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/X_train.parquet (204327238 bytes)
data/X_test.reduced.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/X_test.reduced.parquet (2380918 bytes)
data/y_train.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/y_train.parquet (61003 bytes)
data/y_test.reduced.parquet: download from https:crunchdao--

In [14]:
import os
import typing
from pathlib import Path
import joblib
import warnings
from typing import Optional, Dict, Any

# Import
import numpy as np
import pandas as pd
import scipy
import sklearn.metrics

from scipy.stats import kurtosis, skew
from sklearn.model_selection import train_test_split
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import StratifiedKFold, GridSearchCV
from sklearn.utils.class_weight import compute_sample_weight

# --- Rich features: robust stats + higher moments + FFT + wavelets + Random rubbish
from numpy.fft import rfft
import pywt  # pip install PyWavelets



In [15]:
import crunch
# Load the Crunch Toolings
crunch = crunch.load_notebook()

loaded inline runner with module: <module '__main__'>

cli version: 7.4.0
available ram: 12.67 gb
available cpu: 2 core
----


In [16]:
# Load the data simply
X_train, y_train, X_test = crunch.load_data()

data/X_train.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/X_train.parquet (204327238 bytes)
data/X_train.parquet: already exists, file length match
data/X_test.reduced.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/X_test.reduced.parquet (2380918 bytes)
data/X_test.reduced.parquet: already exists, file length match
data/y_train.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/y_train.parquet (61003 bytes)
data/y_train.parquet: already exists, file length match
data/y_test.reduced.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/y_test.reduced.parquet (2655 bytes)
data/y_test.reduced.parquet: already exists, file length match



## Strategy Implementation

There are multiple approaches you can take to detect structural breaks:

1. **Statistical Tests**: Compare distributions before and after the boundary point;
2. **Feature Engineering**: Extract features from both segments for comparison;
3. **Time Series Modeling**: Detect deviations from expected patterns;
4. **Machine Learning**: Train models to recognize break patterns from labeled examples.

The baseline implementation below uses a simple statistical approach: a t-test to compare the distributions before and after the boundary point.

In [34]:
def _stats_robust(x):
    if x.size == 0:
        return {}
    q = np.percentile(x, [1, 5, 10, 25, 35, 50, 65, 75, 90, 95, 99])
    q1, q5, q10, q25, q35, q50, q65, q75, q90, q95, q99 = q
    mad = np.median(np.abs(x - q50))  # Median Absolute Deviation
    iqr = q75 - q25

    # basic stats
    out = {
        "mean": float(np.mean(x)), "std": float(np.std(x)), "var": float(np.var(x)), "min": float(np.min(x)), "max": float(np.max(x)),
        "median": float(q50),  "q1": float(q1), "q5": float(q5), "q10": float(q10),
        "q25": float(q25), "q35": float(q35), "q65": float(q65), "q75": float(q75),
        "q90": float(q90), "q95": float(q95), "q99": float(q99), "mad": float(mad), "iqr": float(iqr),
        "skew": float(skew(x, bias=False)) if x.size > 2 else 0.0,
        "kurt": float(kurtosis(x, fisher=True, bias=False)) if x.size > 3 else 0.0,
        "rms": float(np.sqrt(np.mean(x**2))),
        "ptp": float(np.ptp(x)),  # peak-to-peak
    }

    # additional robust metrics
    out.update({
        "cv": float(out["std"] / (out["mean"] + 1e-12)),   # coefficient of variation
        "range": float(out["max"] - out["min"]),
        "ratio_max_min": float(out["max"] / (out["min"] + 1e-12)) if out["min"] != 0 else 0.0,
        "range_iqr_ratio": float(out["ptp"] / (out["iqr"] + 1e-12)) if out["iqr"] > 0 else 0.0,
        "entropy": float(-np.sum(p*np.log(p+1e-12)) if (p:=np.histogram(x, bins="fd", density=True)[0]).sum() > 0 else 0.0),
    })

    return out


def _fft_features(
    x: np.ndarray,
    n_bands: int = 5,
    band_scale: str = "log",    # "equal" or "log"
    rolloff_pct: float = 0.85,  # spectral rolloff percentile
    use_hann: bool = True,      # taper to reduce leakage
) -> dict[str, float]:
    """spectral features from the (demeaned) 1D signal:
      - relative band energies (equal or log-spaced)
      - spectral centroid, spread (stdev), rolloff, flatness, flux
      - peak frequency (bin index) and peak/median ratio
      - spectral entropy
    All features are scale-invariant (power-normalized).
    """
    out = {}
    x = np.asarray(x, dtype=float)
    n = x.size
    if n < 8:
        # keep schema stable
        out.update({f"fft_band_{i}": 0.0 for i in range(n_bands)})
        out.update({
            "spec_centroid": 0.0, "spec_spread": 0.0, "spec_rolloff": 0.0,
            "spec_flatness": 0.0, "spec_flux": 0.0, "spec_peak_bin": 0.0,
            "spec_peak_med_ratio": 0.0, "spec_entropy": 0.0
        })
        return out

    # Demean + optional Hann window to reduce leakage
    x = x - np.mean(x)
    if use_hann:
        w = np.hanning(n)
        # preserve overall energy scale
        x = x * w / (np.sqrt((w**2).mean()) + 1e-12)

    # One-sided power spectrum (drop DC)
    spec = np.abs(rfft(x))**2
    if spec.size <= 1:
        spec = np.zeros(2, dtype=float)
    spec = spec[1:]  # drop DC
    P = spec.astype(float)
    P_sum = float(P.sum()) + 1e-12
    Pn = P / P_sum              # normalized power (probability over bins)
    K = Pn.size

    # Frequency bin vector (arbitrary units; bin index is fine for features)
    f = np.arange(1, K+1, dtype=float)

    # -------- Band energies --------
    if n_bands > 0:
        if band_scale == "equal" or K < n_bands:
            edges = np.linspace(0, K, n_bands+1, dtype=int)
        else:
            # log-spaced edges in bin index space
            # (ensure unique, monotone edges within [0, K])
            logs = np.linspace(0, 1, n_bands+1)
            e = np.unique(np.minimum(K, np.round((K)*(10**logs - 1)/(10 - 1)).astype(int)))
            # if uniqueness collapsed (tiny K), fall back to equal
            edges = e if e.size == n_bands+1 else np.linspace(0, K, n_bands+1, dtype=int)

        band_energies = []
        for i in range(len(edges)-1):
            a, b = int(edges[i]), int(edges[i+1])
            if a >= b:
                band_energies.append(0.0)
            else:
                band_energies.append(float(Pn[a:b].sum()))
        # normalize bands to sum to 1 (numerical safety)
        be = np.array(band_energies, dtype=float)
        be = be / (be.sum() + 1e-12)
        out.update({f"fft_band_{i}": float(v) for i, v in enumerate(be)})

    # -------- Spectral descriptors --------
    # centroid & spread (2nd central moment)
    centroid = float((f * Pn).sum())
    spread = float(np.sqrt(((f - centroid)**2 * Pn).sum()))
    out["spec_centroid"] = centroid
    out["spec_spread"]   = spread

    # rolloff: smallest bin where cumulative power >= rolloff_pct
    cdf = np.cumsum(Pn)
    roll_idx = int(np.searchsorted(cdf, rolloff_pct, side="left")) + 1  # 1-based-ish
    out["spec_rolloff"] = float(roll_idx)

    # flatness: geometric mean / arithmetic mean (on power)
    gm = float(np.exp(np.mean(np.log(P + 1e-12))))
    am = float(np.mean(P))
    out["spec_flatness"] = float(gm / (am + 1e-12))

    # spectral flux: L2 difference of successive normalized spectra (single frame ≈ gradient)
    # Here, approximate flux as variability across bins:
    dPn = np.diff(Pn, prepend=Pn[:1])
    out["spec_flux"] = float(np.sqrt((dPn**2).sum()))

    # peak frequency bin & peak/median power ratio
    peak_bin = int(np.argmax(P)) + 1
    medP = float(np.median(P) + 1e-12)
    out["spec_peak_bin"] = float(peak_bin)
    out["spec_peak_med_ratio"] = float(P.max() / medP)

    # spectral entropy (Shannon) of normalized power
    out["spec_entropy"] = float(-(Pn * np.log(Pn + 1e-12)).sum() / np.log(K + 1e-12))

    return out


def _wavelet_energies(x, wavelet="db4", level=None):
    """Wavelet packet-ish: energy per scale from DWT coefficients."""
    if x.size < 8:
        return {}
    coeffs = pywt.wavedec(x - np.mean(x), wavelet=wavelet, level=level)
    energies = [np.sum(c**2) for c in coeffs]  # [cA_L, cD_L, ..., cD1]
    tot = np.sum(energies) + 1e-12
    out = {"wl_cA": float(energies[0]/tot)}
    for i, e in enumerate(energies[1:], start=1):
        out[f"wl_cD_{i}"] = float(e/tot)
    return out

def _raw_n_peaks_and_valleys(x: np.ndarray, n=100) -> dict:
    """
    Extract 100 largest peaks and 100 lowest valleys from the raw signal x.
    Returns them as fixed-length features (padded with 0 if shorter).
    """
    if x.size == 0:
        return {f"raw_peak_{i}": 0.0 for i in range(n)} | {f"raw_valley_{i}": 0.0 for i in range(n)}

    # Sort values
    sorted_vals = np.sort(x)

    # Lowest 100
    valleys = sorted_vals[:n]
    # Highest 100
    peaks = sorted_vals[-n:][::-1]  # reverse to descending

    # Pad if signal shorter than 2n
    valleys = np.pad(valleys, (0, max(0, n - valleys.size)), constant_values=0.0)
    peaks   = np.pad(peaks,   (0, max(0, n - peaks.size)),   constant_values=0.0)

    # Build feature dictionary
    out = {f"raw_peak_{i}": float(peaks[i]) for i in range(n)}
    out.update({f"raw_valley_{i}": float(valleys[i]) for i in range(n)})
    return out


def _wavelet_energies_dwt(x: np.ndarray, wavelet: str = "db4", max_level: int | None = None) -> dict[str, float]:
    """
    Energy per DWT scale (approx + details), normalized to sum=1.
    Stable and fast. Returns fixed keys regardless of x length (given wavelet+level policy).
    """
    if pywt is None or x.size < 8:
        return {}
    x = x.astype(float, copy=False)
    x = x - x.mean()

    # choose maximal level by signal length if not provided
    if max_level is None:
        try:
            max_level = pywt.dwt_max_level(len(x), pywt.Wavelet(wavelet).dec_len)
            max_level = int(max(1, min(max_level, 6)))  # cap – avoid too deep trees
        except Exception:
            max_level = 4

    coeffs = pywt.wavedec(x, wavelet=wavelet, level=max_level)
    energies = [float(np.sum(c.astype(float)**2)) for c in coeffs]  # [cA_L, cD_L, ..., cD1]
    tot = float(np.sum(energies)) + 1e-12
    out = {"wl_dwt_cA": float(energies[0] / tot)}
    for i, e in enumerate(energies[1:], start=1):
        out[f"wl_dwt_cD_{i}"] = float(e / tot)

    # extra shape descriptors across scales
    e_arr = np.array(energies, dtype=float) / tot
    idx = np.arange(len(e_arr), dtype=float)
    out["wl_dwt_scale_centroid"] = float((idx * e_arr).sum() / (e_arr.sum() + 1e-12))
    out["wl_dwt_entropy"] = float(-np.sum(e_arr * np.log(e_arr + 1e-12)) / np.log(len(e_arr) + 1e-12))
    return out

def _segment_features(x):
    """Compose robust stats + spectral features for one segment."""
    f = {}
    f.update(_stats_robust(x))
    f.update(_fft_features(x, n_bands=5))
    f.update(_wavelet_energies(x))
    f.update(_raw_n_peaks_and_valleys(x,n=400))
    return f

def extract_features_rich(
    X: pd.DataFrame,
    windows: tuple[int, ...] = (500,),   # add more, e.g., (64, 256, 512)
) -> pd.DataFrame:
    """Per-id features using value and period columns + optional windowed contrasts."""
    feats = []
    print_count = 0

    for id_, g in X.groupby(level="id"):
        if print_count % 500 == 0 and print_count > 0:
            print(f"...features extracted from samples id: 0-{id_}")
        print_count += 1

        # Pull arrays
        v = g["value"].to_numpy(dtype=float, copy=False)
        p = g["period"].to_numpy(dtype=int, copy=False)

        # Boundary: first post-break index; if no post segment, boundary=end
        if np.any(p == 1):
            boundary_idx = int(np.argmax(p == 1))  # first True
        else:
            boundary_idx = len(v)

        # Full pre/post segments
        pre = v[:boundary_idx]
        post = v[boundary_idx:]

        d = {"id": id_}

        # --- Global features
        d.update({f"g_{k}": val for k, val in _segment_features(v).items()})

        # --- Pre/Post features
        d.update({f"pre_{k}": val for k, val in _segment_features(pre).items()})
        d.update({f"post_{k}": val for k, val in _segment_features(post).items()})

        # --- Deltas (post - pre) for key stats
        for k in ["mean", "std", "median", "mad", "skew", "kurt", "rms"]:
            d[f"delta_{k}"] = d.get(f"post_{k}", 0.0) - d.get(f"pre_{k}", 0.0)

        # --- Windowed features around boundary (pre/post windows per size)
        for W in windows:
            # slice bounds
            pre_win = v[max(0, boundary_idx - W): boundary_idx]
            post_win = v[boundary_idx: min(len(v), boundary_idx + W)]

            # stats per window
            pre_feats = _segment_features(pre_win)
            post_feats = _segment_features(post_win)

            # prefix & attach
            d.update({f"pre_win{W}_{k}": val for k, val in pre_feats.items()})
            d.update({f"post_win{W}_{k}": val for k, val in post_feats.items()})

            # deltas per window
            for k in ["mean", "std", "median", "mad", "skew", "kurt", "rms"]:
                d[f"delta_win{W}_{k}"] = post_feats.get(k, 0.0) - pre_feats.get(k, 0.0)

            """for k in ["spec_centroid", "spec_spread", "spec_rolloff", "spec_flatness", "spec_flux",
                      "spec_peak_bin", "spec_peak_med_ratio", "spec_entropy"]:
                          d[f"delta_{k}"] = d.get(f"post_{k}", 0.0) - d.get(f"pre_{k}", 0.0)
            """
            # counts & ratio for this window
            len_pre_w = pre_win.size
            len_post_w = post_win.size
            d[f"len_pre_win{W}"] = int(len_pre_w)
            d[f"len_post_win{W}"] = int(len_post_w)
            d[f"ratio_post_pre_win{W}"] = float(len_post_w / (len_pre_w + 1e-6))

        # --- basic counts
        d["len_total"] = int(v.size)
        d["n_pre"] = int(pre.size)
        d["n_post"] = int(post.size)
        d["ratio_post_pre"] = float(d["n_post"] / (d["n_pre"] + 1e-6))

        # collect
        feats.append(d)

    df = pd.DataFrame(feats).set_index("id")
    df = df.replace([np.inf, -np.inf], 0.0).fillna(0.0)
    return df


In [65]:
from sklearn.model_selection import StratifiedKFold, GridSearchCV
from sklearn.utils.class_weight import compute_sample_weight
from sklearn.metrics import roc_auc_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, HistGradientBoostingClassifier, StackingClassifier
from sklearn.svm import NuSVC
from sklearn.calibration import CalibratedClassifierCV

try: # Optional: XGBoost (guarded import)
    from xgboost import XGBClassifier
    _HAS_XGB = True
except Exception:
    _HAS_XGB = False

import lightgbm as lgb
from sklearn.svm import NuSVC, SVC


In [36]:
# ---------------------------
# Utilities
# ---------------------------
def _safe_cv(n_splits=5, seed=42):
    return StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=seed)

def _supports_sample_weight(estimator) -> bool:
    # crude but practical check
    return "sample_weight" in estimator.fit.__code__.co_varnames

def _ensure_proba_model(model, method="sigmoid", cv=3, n_jobs=1, random_state=42):
    """
    Ensure we can call predict_proba(). If the model lacks it, wrap with CalibratedClassifierCV.
    For NuSVC we set probability=True already, but calibration can be more stable on small data.
    """
    if hasattr(model, "predict_proba"):
        return model
    return CalibratedClassifierCV(model, method=method, cv=cv, n_jobs=n_jobs)


# ---------------------------
# Tuners (cross val + grid search) per model type
# ---------------------------

def tune_rfc(Xf, y, n_jobs=2, verbose=3, seed=42):
    """ RandomForestClassifier tuner"""
    base = RandomForestClassifier(random_state=seed, n_estimators=300)
    grid = {
        "n_estimators": [200, 300, 500],
        "max_depth": [None, 8, 12],
        "min_samples_leaf": [1, 2, 5],
        "max_features": ["sqrt", 0.5, None],
    }
    cv = _safe_cv(5, seed)
    sw = compute_sample_weight("balanced", y.astype(int))
    gs = GridSearchCV(
        base, grid, scoring="roc_auc", cv=cv,
        n_jobs=n_jobs, pre_dispatch=n_jobs, refit=True, verbose=verbose, return_train_score=False
    )
    if _supports_sample_weight(base):
        gs.fit(Xf, y.astype(int), sample_weight=sw)
    else:
        gs.fit(Xf, y.astype(int))
    print("RFC BEST AUC:", gs.best_score_, "PARAMS:", gs.best_params_)
    return gs.best_estimator_

def tune_hgb(Xf, y, n_jobs=2, verbose=3, seed=42):
    """ HistGradientBoostingClassifier tuner"""
    est = HistGradientBoostingClassifier( random_state=seed, early_stopping=True,
                                         validation_fraction=0.15, max_iter=1000 )
    grid = {
        "learning_rate":     [0.02, 0.04, 0.06],
        "max_depth":         [6, 10, 15],
        "min_samples_leaf":  [20, 50],
        "l2_regularization": [5e-4, 1e-3, 1e-2],
        "max_bins":          [63, 127]
    }
    cv = _safe_cv(5, seed)
    sw = compute_sample_weight("balanced", y.astype(int))
    gs = GridSearchCV(
        est, grid, scoring="roc_auc", cv=cv,
        n_jobs=n_jobs, pre_dispatch=n_jobs, refit=True, verbose=verbose, return_train_score=False
    )
    # HGB supports sample_weight
    gs.fit(Xf, y.astype(int), sample_weight=sw)
    print("HGB BEST AUC:", gs.best_score_, "PARAMS:", gs.best_params_)
    return gs.best_estimator_

def tune_nusvc(Xf, y, n_jobs=2, verbose=3, seed=42):
    """ NuSVC tuner"""
    est = NuSVC(kernel="rbf", probability=True, random_state=seed)
    grid = {
        "nu": [0.1, 0.2, 0.3, 0.5],
        "kernel": ["rbf", "sigmoid"],
        "gamma": ["scale", "auto"]
    }
    cv = _safe_cv(3, seed)
    sw = compute_sample_weight("balanced", y.astype(int))
    gs = GridSearchCV(
        est, grid, scoring="roc_auc", cv=cv,
        n_jobs=n_jobs, pre_dispatch=n_jobs, refit=True, verbose=verbose, return_train_score=False
    )
    # NuSVC does NOT support sample_weight directly
    gs.fit(Xf, y.astype(int))
    print("NuSVC BEST AUC:", gs.best_score_, "PARAMS:", gs.best_params_)
    return gs.best_estimator_


def tune_xgb(Xf, y, n_jobs=2, verbose=3, seed=42):
    """ XGBClassifier tuner"""
    if not _HAS_XGB:
        raise RuntimeError("XGBoost not installed/available")

    est = XGBClassifier(random_state=seed, eval_metric="logloss",
                        tree_method="hist",  n_estimators=500)
    grid = {
        "max_depth": [5, 10, 15],
        "learning_rate": [0.05, 0.1],
        "subsample": [0.7, 1.0],
        "colsample_bytree": [0.7, 1.0],
        "min_child_weight": [1, 5]
    }
    cv = _safe_cv(5, seed)
    sw = compute_sample_weight("balanced", y.astype(int))
    gs = GridSearchCV(
        est, grid, scoring="roc_auc", cv=cv,
        n_jobs=n_jobs, pre_dispatch=n_jobs, refit=True, verbose=verbose, return_train_score=False
    )
    gs.fit(Xf, y.astype(int), sample_weight=sw)
    print("XGB BEST AUC:", gs.best_score_, "PARAMS:", gs.best_params_)
    return gs.best_estimator_


# ---------------------------
# Model registry & builder
# ---------------------------
def get_model(model_class: str, seed: int = 42):

    model_class = model_class.strip()

    if model_class == "RandomForestClassifier":
        return RandomForestClassifier(n_estimators=300, random_state=seed)

    elif model_class == "HistGradientBoostingClassifier":
        return HistGradientBoostingClassifier(
            random_state=seed, early_stopping=True, validation_fraction=0.10,
            max_iter=2000, learning_rate=0.04,
            max_depth=10, min_samples_leaf=40, l2_regularization=1e-3
        )

    elif model_class == "NuSVC":
        return NuSVC(nu=0.5, kernel="rbf", probability=True, random_state=seed)

    elif model_class == "LogisticRegression":
        # always scale for LR
        return Pipeline([("scaler", StandardScaler(with_mean=True, with_std=True)),
                         ("clf", LogisticRegression(max_iter=2000, class_weight="balanced", random_state=seed))])

    elif model_class == "XGBClassifier":
        if not _HAS_XGB:
            raise RuntimeError("XGBClassifier requested but xgboost is not available.")
        return XGBClassifier(
            random_state=seed, eval_metric="logloss", tree_method="hist",
            n_estimators=600, max_depth=5, learning_rate=0.05, subsample=0.9, colsample_bytree=0.9
        )
    else:
        raise ValueError(f"Unknown model_class={model_class}")


def tune_model(model_class: str,
               Xf: pd.DataFrame,
               y: pd.Series,
               n_jobs: int = 2,
               seed: int = 42):

    if model_class == "RandomForestClassifier":
        return tune_rfc(Xf, y, n_jobs=n_jobs, seed=seed)

    elif model_class == "HistGradientBoostingClassifier":
        return tune_hgb(Xf, y, n_jobs=n_jobs, seed=seed)

    elif model_class == "NuSVC":
        return tune_nusvc(Xf, y, n_jobs=n_jobs, seed=seed)

    elif model_class == "XGBClassifier":
        return tune_xgb(Xf, y, n_jobs=n_jobs, seed=seed)

    elif model_class == "LogisticRegression":
        # typical LR tuning: C and penalty (kept small here)
        pipe = Pipeline([("scaler", StandardScaler()), ("clf", LogisticRegression(max_iter=2000, class_weight="balanced"))])
        grid = {"clf__C": [0.1, 1.0, 3.0], "clf__penalty": ["l2"]}
        cv = _safe_cv(5, seed)
        gs = GridSearchCV(pipe, grid, scoring="roc_auc", cv=cv, n_jobs=n_jobs, pre_dispatch=n_jobs, refit=True, verbose=2)
        gs.fit(Xf, y.astype(int))
        print("LR BEST AUC:", gs.best_score_, "PARAMS:", gs.best_params_)
        return gs.best_estimator_

    else:
        raise ValueError(f"No tuner implemented for model_class={model_class}")



### The `train()` Function

In this function, you build and train your model for making inferences on the test data. Your model must be stored in the `model_directory_path`.

The baseline implementation below doesn't require a pre-trained model, as it uses a statistical test that will be computed at inference time.

In [63]:

def train(
    X_train: pd.DataFrame,
    y_train: pd.Series,
    model_directory_path: str,
    is_debug= True,
    do_cross_val = False,
    model_class: str = "RandomForestClassifier",
    n_jobs: int = 2,
    seed: int = 42,
):
    """
    Train on per-id features, align y to ids, save bundle with feature schema.
    X_train: MultiIndex (id, time), columns ["value","period"]
    y_train: Series/DataFrame indexed by id with {0,1} or {False,True}
    """
    os.makedirs(model_directory_path, exist_ok=True)

    print("TRAINING MODEL CLASS {}".format(model_class))

    # 1) y: squeeze to Series[int], index = ids
    if isinstance(y_train, pd.DataFrame):
        y_train = y_train.squeeze()
    y = y_train.astype(int).copy()

    # 2) X: aggregate to per-id features
    print("Features extraction...")
    if is_debug:
          ids_subset = X_train.index.get_level_values("id").unique()[:501]
          y = y.loc[y.index.isin(ids_subset)]
          X_debug = X_train.loc[X_train.index.get_level_values("id").isin(ids_subset)]
          Xf = extract_features_rich(X_debug).replace([np.inf, -np.inf], 0.0).fillna(0.0)
    else:
          Xf = extract_features_rich(X_train).replace([np.inf, -np.inf], 0.0).fillna(0.0)

    # 3) align y to feature rows (ids)
    try: # ensure both indices are comparable types
        Xf.index = Xf.index.astype(int)
        y.index = y.index.astype(int)
    except Exception:
        pass
    y = y.reindex(Xf.index)
    if y.isna().any(): # Drop any ids without a label (shouldn't happen on official train)
        mask = ~y.isna()
        Xf, y = Xf.loc[mask],  y.loc[mask]


    # 4) fit model
    print("Fitting...")
    if do_cross_val: # gird search + cross validation
        model = tune_model(model_class, Xf, y, n_jobs=n_jobs, seed=seed)

    else: # fit the model with default params
        model = get_model(model_class, seed=seed)
        sw = compute_sample_weight("balanced", y) if _supports_sample_weight(model) else None
        if sw is not None:
            model.fit(Xf, y, sample_weight=sw)
        else:
            model.fit(Xf, y)
    print("Fitting...done")

    # 5) persist bundle with schema for inference
    bundle = {"model": model, "feature_names": list(Xf.columns), "model_class": model_class}
    joblib.dump(bundle, os.path.join(model_directory_path, "model.joblib"))
    print(f"[train] saved -> {os.path.join(model_directory_path, 'model.joblib')}")

### The `infer()` Function

In the inference function, your trained model (if any) is loaded and used to make predictions on test data.

**Important workflow:**
1. Load your model;
2. Use the `yield` statement to signal readiness to the runner;
3. Process each dataset one by one within the for loop;
4. For each dataset, use `yield prediction` to return your prediction.

**Note:** The datasets can only be iterated once!

In [64]:
import os, typing as t

def _to_feature_row(df_one_id: pd.DataFrame,
                    feature_names: t.List[str]) -> pd.DataFrame:
    """
    Turn a single-id time series DataFrame into a 1xD feature row,
    aligned to 'feature_names' (missing features -> 0.0).
    """
    # If df has MultiIndex (id, time), drop the id level
    if isinstance(df_one_id.index, pd.MultiIndex) and "id" in df_one_id.index.names:
        # Expect a single id per dataset; drop it
        df_one_id = df_one_id.droplevel("id")

    # Build features for a single id by reusing the rich extractor over a tiny fake batch
    # (wrap in a MultiIndex with id=0 temporarily)
    tmp = df_one_id.copy()
    tmp.index = pd.MultiIndex.from_product([[0], tmp.index], names=["id", "time"])
    feats = extract_features_rich(tmp)          # returns DataFrame indexed by id
    row = feats.iloc[[0]]                       # 1xD
    if feature_names:                           # align to saved schema
        row = row.reindex(columns=feature_names, fill_value=0.0)
    row = row.replace([np.inf, -np.inf], 0.0).fillna(0.0)
    return row

def infer(
    X_test: t.Iterable[pd.DataFrame],
    model_directory_path: str,
):
    """  Load trained model and yield P(y=1) for each incoming dataset. """
    bundle = joblib.load(os.path.join(model_directory_path, "model.joblib"))

    # Support both: (a) raw estimator, (b) dict/bundle with feature_names.
    if isinstance(bundle, dict) and "model" in bundle:
        model = bundle["model"]
        feature_names = bundle.get("feature_names", [])
    else:
        model = bundle
        # Try to load feature schema if saved separately
        feat_path = os.path.join(model_directory_path, "feature_names.joblib")
        feature_names = joblib.load(feat_path) if os.path.exists(feat_path) else []

    # print("\033[94mYield predictions for model: {}\033[0m".format(bundle["model_class"]))

    # Handshake: ready
    yield

    # Iterate ONCE over datasets
    for dataset in X_test:
        x_row = _to_feature_row(dataset, feature_names)  # Compute 1xD feature row aligned to training schema

        # Predict probability for the positive class (y=1: structural break)
        proba_pos = float(model.predict_proba(x_row)[:, 1][0])  # scikit-learn's predict_proba returns [:, 1] for the positive class

        # Yield a scalar in [0,1]
        yield proba_pos


## Local testing

To make sure your `train()` and `infer()` function are working properly, you can call the `crunch.test()` function that will reproduce the cloud environment locally. <br />
Even if it is not perfect, it should give you a quick idea if your model is working properly.

In [67]:
target = pd.read_parquet("data/y_test.reduced.parquet")["structural_breakpoint"]

# Random Forest
train(X_train, y_train, model_directory_path='resources', do_cross_val=False, model_class="RandomForestClassifier")
crunch.test(force_first_train=False)
ROC_AUC_Forest = sklearn.metrics.roc_auc_score( target,    pd.read_parquet("data/prediction.parquet") ) # Call the scoring function
print("\033[94m ROC_AUC_Forest = {}\033[0m".format(ROC_AUC_Forest))

# NuSVC
train(X_train, y_train, "resources", is_debug= True, do_cross_val=False, model_class="NuSVC")
crunch.test(force_first_train=False)
ROC_AUC_NuSVC = sklearn.metrics.roc_auc_score( target,    pd.read_parquet("data/prediction.parquet") ) # Call the scoring function
print("\033[94m ROC_AUC_NuSVC = {}\033[0m".format(ROC_AUC_NuSVC))

# LightGBM (if available)
train(X_train, y_train, "resources", is_debug= True, do_cross_val=False, model_class="LogisticRegression")
crunch.test(force_first_train=False)
ROC_AUC_LightGBM = sklearn.metrics.roc_auc_score( target,    pd.read_parquet("data/prediction.parquet") ) # Call the scoring funct
print("\033[94m ROC_AUC = {}\033[0m".format(ROC_AUC_LightGBM))

# XGBoost (if available)
train(X_train, y_train, "resources", is_debug= True, do_cross_val=False, model_class="XGBClassifier")
crunch.test(force_first_train=False)
ROC_AUC_XGBoost = sklearn.metrics.roc_auc_score( target,    pd.read_parquet("data/prediction.parquet") ) # Call the scoring funct
print("\033[94m ROC_AUC = {}\033[0m".format(ROC_AUC_XGBoost))

TRAINING MODEL CLASS RandomForestClassifier
FEATURES EXTRACTION...
Progress report...extracting features from id: 500
Fitting...
Fitting...done
[train] saved -> resources/model.joblib
ignoring cell #5: invalid syntax (<unknown>, line 12)


08:49:29 no forbidden library found
08:49:29 
08:49:30 started
08:49:30 running local test
08:49:30 internet access isn't restricted, no check will be done
08:49:30 
08:49:31 starting unstructured loop...
08:49:31 executing - command=infer


data/X_train.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/X_train.parquet (204327238 bytes)
data/X_train.parquet: already exists, file length match
data/X_test.reduced.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/X_test.reduced.parquet (2380918 bytes)
data/X_test.reduced.parquet: already exists, file length match
data/y_train.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/y_train.parquet (61003 bytes)
data/y_train.parquet: already exists, file length match
data/y_test.reduced.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/y_test.reduced.parquet (2655 bytes)
data/y_test.reduced.parquet: already exists, file length match


08:49:43 checking determinism by executing the inference again with 30% of the data (tolerance: 1e-08)
08:49:43 executing - command=infer
08:49:46 determinism check: passed
08:49:46 save prediction - path=data/prediction.parquet
08:49:46 ended
08:49:46 duration - time=00:00:16
08:49:46 memory - before="1.56 GB" after="1.55 GB" consumed="-3145728 bytes"


[94m ROC_AUC_Forest = 0.5401408450704226[0m
TRAINING MODEL CLASS NuSVC
FEATURES EXTRACTION...
Progress report...extracting features from id: 500
Fitting...
Fitting...done
[train] saved -> resources/model.joblib
ignoring cell #5: invalid syntax (<unknown>, line 12)


08:50:01 no forbidden library found
08:50:01 
08:50:01 started
08:50:01 running local test
08:50:01 internet access isn't restricted, no check will be done
08:50:01 
08:50:02 starting unstructured loop...
08:50:02 executing - command=infer


data/X_train.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/X_train.parquet (204327238 bytes)
data/X_train.parquet: already exists, file length match
data/X_test.reduced.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/X_test.reduced.parquet (2380918 bytes)
data/X_test.reduced.parquet: already exists, file length match
data/y_train.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/y_train.parquet (61003 bytes)
data/y_train.parquet: already exists, file length match
data/y_test.reduced.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/y_test.reduced.parquet (2655 bytes)
data/y_test.reduced.parquet: already exists, file length match


08:50:12 checking determinism by executing the inference again with 30% of the data (tolerance: 1e-08)
08:50:12 executing - command=infer
08:50:16 determinism check: passed
08:50:16 save prediction - path=data/prediction.parquet
08:50:16 ended
08:50:16 duration - time=00:00:14
08:50:16 memory - before="1.55 GB" after="1.54 GB" consumed="-17809408 bytes"


[94m ROC_AUC_NuSVC = 0.49436619718309865[0m
TRAINING MODEL CLASS LogisticRegression
FEATURES EXTRACTION...
Progress report...extracting features from id: 500
Fitting...
Fitting...done
[train] saved -> resources/model.joblib
ignoring cell #5: invalid syntax (<unknown>, line 12)


08:50:35 no forbidden library found
08:50:35 
08:50:35 started
08:50:35 running local test
08:50:35 internet access isn't restricted, no check will be done
08:50:35 
08:50:36 starting unstructured loop...
08:50:36 executing - command=infer


data/X_train.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/X_train.parquet (204327238 bytes)
data/X_train.parquet: already exists, file length match
data/X_test.reduced.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/X_test.reduced.parquet (2380918 bytes)
data/X_test.reduced.parquet: already exists, file length match
data/y_train.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/y_train.parquet (61003 bytes)
data/y_train.parquet: already exists, file length match
data/y_test.reduced.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/y_test.reduced.parquet (2655 bytes)
data/y_test.reduced.parquet: already exists, file length match


08:50:47 checking determinism by executing the inference again with 30% of the data (tolerance: 1e-08)
08:50:47 executing - command=infer
08:50:49 determinism check: passed
08:50:50 save prediction - path=data/prediction.parquet
08:50:50 ended
08:50:50 duration - time=00:00:14
08:50:50 memory - before="1.55 GB" after="1.55 GB" consumed="-1392640 bytes"


[94m ROC_AUC = 0.3704225352112676[0m
TRAINING MODEL CLASS XGBClassifier
FEATURES EXTRACTION...
Progress report...extracting features from id: 500
Fitting...
Fitting...done
[train] saved -> resources/model.joblib
ignoring cell #5: invalid syntax (<unknown>, line 12)


08:54:05 no forbidden library found
08:54:05 
08:54:05 started
08:54:05 running local test
08:54:05 internet access isn't restricted, no check will be done
08:54:05 
08:54:06 starting unstructured loop...
08:54:06 executing - command=infer


data/X_train.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/X_train.parquet (204327238 bytes)
data/X_train.parquet: already exists, file length match
data/X_test.reduced.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/X_test.reduced.parquet (2380918 bytes)
data/X_test.reduced.parquet: already exists, file length match
data/y_train.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/y_train.parquet (61003 bytes)
data/y_train.parquet: already exists, file length match
data/y_test.reduced.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/y_test.reduced.parquet (2655 bytes)
data/y_test.reduced.parquet: already exists, file length match


08:54:47 checking determinism by executing the inference again with 30% of the data (tolerance: 1e-08)
08:54:47 executing - command=infer
08:55:00 determinism check: passed
08:55:00 save prediction - path=data/prediction.parquet
08:55:00 ended
08:55:00 duration - time=00:00:54
08:55:00 memory - before="1.58 GB" after="1.58 GB" consumed="0 bytes"


[94m ROC_AUC = 0.4863849765258216[0m


In [48]:
crunch.test(
    # Uncomment to disable the train
    # force_first_train=False,

    # Uncomment to disable the determinism check
    # no_determinism_check=True,
)

ignoring cell #5: invalid syntax (<unknown>, line 12)


08:27:46 no forbidden library found
08:27:46 
08:27:47 started
08:27:47 running local test
08:27:47 internet access isn't restricted, no check will be done
08:27:47 
08:27:48 starting unstructured loop...
08:27:48 executing - command=train


data/X_train.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/X_train.parquet (204327238 bytes)
data/X_train.parquet: already exists, file length match
data/X_test.reduced.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/X_test.reduced.parquet (2380918 bytes)
data/X_test.reduced.parquet: already exists, file length match
data/y_train.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/y_train.parquet (61003 bytes)
data/y_train.parquet: already exists, file length match
data/y_test.reduced.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/y_test.reduced.parquet (2655 bytes)
data/y_test.reduced.parquet: already exists, file length match
Progress report...extracting features per id
Progress report...extracting features from id: 500
Progress report...fit

08:28:13 executing - command=infer


Progress report...fitting the model....done
[train] saved -> resources/model.joblib
RandomForestClassifier


08:28:26 checking determinism by executing the inference again with 30% of the data (tolerance: 1e-08)
08:28:26 executing - command=infer


RandomForestClassifier


08:28:30 determinism check: passed
08:28:30 save prediction - path=data/prediction.parquet
08:28:30 ended
08:28:30 duration - time=00:00:43
08:28:30 memory - before="1.43 GB" after="1.47 GB" consumed="38.65 MB"


In [52]:
# Load the predictions and the targets
prediction = pd.read_parquet("data/prediction.parquet")
target = pd.read_parquet("data/y_test.reduced.parquet")["structural_breakpoint"]
sklearn.metrics.roc_auc_score( target,    prediction) # Call the scoring function

np.float64(0.5401408450704226)

 - Best ROC_AUC score so far -> 0.7427230  (HistGradientBoostingClassifier)
 - ....(addign CWT scalogram features (Complex Shannon via spkit))
 - ....(