[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/crunchdao/quickstarters/blob/master/competitions/structural-break/quickstarters/baseline/baseline.ipynb)

![Banner](https://raw.githubusercontent.com/crunchdao/quickstarters/refs/heads/master/competitions/structural-break/assets/banner.webp)

# ADIA Lab Structural Break Challenge

## Challenge Overview

Welcome to the ADIA Lab Structural Break Challenge! In this challenge, you will analyze univariate time series data to determine whether a structural break has occurred at a specified boundary point.

### What is a Structural Break?

A structural break occurs when the process governing the data generation changes at a certain point in time. These changes can be subtle or dramatic, and detecting them accurately is crucial across various domains such as climatology, industrial monitoring, finance, and healthcare.

![Structural Break Example](https://raw.githubusercontent.com/crunchdao/competitions/refs/heads/master/competitions/structural-break/quickstarters/baseline/images/example.png)

### Your Task

For each time series in the test set, you need to predict a score between `0` and `1`:
- Values closer to `0` indicate no structural break at the specified boundary point;
- Values closer to `1` indicate a structural break did occur.

### Evaluation Metric

The evaluation metric is [ROC AUC (Area Under the Receiver Operating Characteristic Curve)](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.roc_auc_score.html), which measures the performance of detection algorithms regardless of their specific calibration.

- ROC AUC around `0.5`: No better than random chance;
- ROC AUC approaching `1.0`: Perfect detection.

# Setup

The first steps to get started are:
1. Get the setup command
2. Execute it in the cell below

### >> https://hub.crunchdao.com/competitions/structural-break/submit/notebook

![Reveal token](https://raw.githubusercontent.com/crunchdao/competitions/refs/heads/master/documentation/animations/reveal-token.gif)

In [1]:
%pip install crunch-cli --upgrade --quiet --progress-bar off
!crunch setup-notebook structural-break XYSTlFGgHTHExFaBYwUyEgKo

crunch-cli, version 6.5.0
delete /content/.crunchdao
main.py: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/submissions/21798/main.py (12366 bytes)
notebook.ipynb: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/submissions/21798/notebook.ipynb (55168 bytes)
requirements.txt: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/submissions/21798/requirements.original.txt (222 bytes)
data/X_train.parquet: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/data-releases/146/X_train.parquet (204327238 bytes)
data/X_test.reduced.parquet: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/data-releases/146/X_test.reduced.parquet (2380918 bytes)
data/y_train.parquet: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/data-releases/146/y_train.parquet (61003 bytes)
data/y_test.reduced.parquet: download f

In [2]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.stats import ttest_ind
from scipy.stats import t
import time
from multiprocessing import Pool, cpu_count
from scipy.stats import shapiro
from statsmodels.tsa.stattools import adfuller
from statsmodels.graphics.tsaplots import plot_acf,plot_pacf
import statsmodels.api as sm
from statsmodels.tsa.arima.model import ARIMA
from scipy.stats import skew, kurtosis, iqr, ttest_ind, ks_2samp
# from pykalman import KalmanFilter
import scipy

import joblib
import typing

from xgboost import XGBClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.linear_model import LinearRegression
from statsmodels.tsa.stattools import adfuller
# ---------------- Custom Transformer ----------------
from multiprocessing import Pool, cpu_count
import warnings

# Your model

## Setup

In [3]:
import os
import typing

# Import your dependencies
import joblib
import pandas as pd
import scipy
import sklearn.metrics

In [4]:
import crunch

# Load the Crunch Toolings
crunch = crunch.load_notebook()

loaded inline runner with module: <module '__main__'>

cli version: 6.5.0
available ram: 12.67 gb
available cpu: 2 core
----


## Understanding the Data

The dataset consists of univariate time series, each containing ~2,000-5,000 values with a designated boundary point. For each time series, you need to determine whether a structural break occurred at this boundary point.

The data was downloaded when you setup your local environment and is now available in the `data/` directory.

In [5]:
# Load the data simply
X_train, y_train, X_test = crunch.load_data()

data/X_train.parquet: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/data-releases/146/X_train.parquet (204327238 bytes)
data/X_train.parquet: already exists, file length match
data/X_test.reduced.parquet: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/data-releases/146/X_test.reduced.parquet (2380918 bytes)
data/X_test.reduced.parquet: already exists, file length match
data/y_train.parquet: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/data-releases/146/y_train.parquet (61003 bytes)
data/y_train.parquet: already exists, file length match
data/y_test.reduced.parquet: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/data-releases/146/y_test.reduced.parquet (2655 bytes)
data/y_test.reduced.parquet: already exists, file length match


### Understanding `X_train`

The training data is structured as a pandas DataFrame with a MultiIndex:

**Index Levels:**
- `id`: Identifies the unique time series
- `time`: The timestep within each time series

**Columns:**
- `value`: The actual time series value at each timestep
- `period`: A binary indicator where `0` represents the **period before** the boundary point, and `1` represents the **period after** the boundary point

### Understanding `y_train`

This is a simple `pandas.Series` that tells if a dataset id has a structural breakpoint or not.

**Index:**
- `id`: the ID of the dataset

**Value:**
- `structural_breakpoint`: Boolean indicating whether a structural break occurred (`True`) or not (`False`)

### Understanding `X_test`

The test data is provided as a **`list` of `pandas.DataFrame`s** with the same format as [`X_train`](#understanding-X_test).

It is structured as a list to encourage processing records one by one, which will be mandatory in the `infer()` function.

## Strategy Implementation

There are multiple approaches you can take to detect structural breaks:

1. **Statistical Tests**: Compare distributions before and after the boundary point;
2. **Feature Engineering**: Extract features from both segments for comparison;
3. **Time Series Modeling**: Detect deviations from expected patterns;
4. **Machine Learning**: Train models to recognize break patterns from labeled examples.

The baseline implementation below uses a simple statistical approach: a t-test to compare the distributions before and after the boundary point.

### The `train()` Function

In this function, you build and train your model for making inferences on the test data. Your model must be stored in the `model_directory_path`.

The baseline implementation below doesn't require a pre-trained model, as it uses a statistical test that will be computed at inference time.

In [6]:
indices_x = X_train.index.get_level_values(0).unique().tolist()
indices_x.sort()

In [7]:
def kalman_stats(series):
    kf = KalmanFilter(initial_state_mean=0, n_dim_obs=1)
    # fewer EM iterations
    kf = kf.em(series, n_iter=1)
    state_means, _ = kf.smooth(series)
    state_means = state_means.flatten()

    smoothed_mean = np.mean(state_means)
    smoothed_std = np.std(state_means)
    residual = series - state_means
    residual_std = np.std(residual)
    trend_strength = smoothed_std / (np.std(series) + 1e-6)

    return {
        'kf_mean': smoothed_mean,
        'kf_std': smoothed_std,
        'kf_residual_std': residual_std,
        'kf_trend_strength': trend_strength
    }

def ou_process_params(series):
    X = series[:-1].reshape(-1, 1)
    Y = series[1:].reshape(-1,1)

    meanX = np.mean(X)
    meanY = np.mean(Y)
    varX = np.var(X)
    varY = np.var(Y)
    CovXY = np.mean((X-meanX)*(Y-meanY))
    R2 = CovXY**2/(varX*varY)

    beta = CovXY/varX
    cs = (meanY - beta*meanX)


    kappa = -np.log(abs(beta))
    mu = (cs/(1-beta+0.000001))

    residuals = Y - beta*X - cs
    sigma_hat = np.std(residuals, ddof=1)
    sigma_squaroot = (sigma_hat) / np.sqrt(1 - beta ** 2 + 0.000001)

    return kappa, mu, sigma_squaroot


def find_d(series, max_d=2):
    series = pd.Series(series)

    for d in range(max_d + 1):
        diffed = series.diff(d).dropna()

        # Skip if constant or too short
        if diffed.nunique() <= 1 or len(diffed) < 10:
            continue

        try:
            p_value = adfuller(diffed)[1]
            if p_value < 0.05:
                return d
        except ValueError:
            continue  # Skip constant series or bad input

    return max_d  # fallback

def find_best_arima_coeffs(series, max_p=1, max_q=1, d=None):

    warnings.filterwarnings("ignore")

    if d is None:
        d = find_d(series)

    best_aic = np.inf
    best_order = None
    best_model = None

    for p in range(max_p + 1):
        for q in range(max_q + 1):
            try:
                model = ARIMA(series, order=(p, d, q)).fit()
                if model.aic < best_aic:
                    best_aic = model.aic
                    best_order = (p, d, q)
                    best_model = model
            except:
                continue

    if best_order is None:
        return None, [0],0, [0], None

    p, d, q = best_order

    # Extract AR (phi) and MA (theta) coefficients
    params = best_model.arparams if p > 0 else []
    p_vec = list(params[:1]) + [0.0] * (1 - len(params))

    params = best_model.maparams if q > 0 else []
    q_vec = list(params[:1]) + [0.0] * (1 - len(params))

    return best_order, p_vec, d, q_vec, best_model



def fast_ema(x, span):
    alpha = 2/(span+1)
    ema = np.empty_like(x)
    ema[0] = x[0]
    for i in range(1, len(x)):
        ema[i] = alpha * x[i] + (1-alpha) * ema[i-1]
    return ema[-1]

def extract_features(a, b):
    features = {}

    # Basic statistics
    features['mean_a'] = np.mean(a)
    features['mean_b'] = np.mean(b)
    features['std_a'] = np.std(a)
    features['std_b'] = np.std(b)
    features['skew_a'] = skew(a)
    features['skew_b'] = skew(b)
    features['kurt_a'] = kurtosis(a)
    features['kurt_b'] = kurtosis(b)
    features['iqr_a'] = iqr(a)
    features['iqr_b'] = iqr(b)
    features['std_ratio'] = features['std_b'] / (features['std_a'] + 1e-10)  # avoid div0

    # Fit t-distribution (consider sampling if large arrays)
    params_a = scipy.stats.t.fit(a)
    params_b = scipy.stats.t.fit(b)
    features['t_val_a'] = params_a[0]
    features['t_mu_a'] = params_a[1]
    features['t_sigma_a'] = params_a[2]
    features['t_val_b'] = params_b[0]
    features['t_mu_b'] = params_b[1]
    features['t_sigma_b'] = params_b[2]

    # Differences
    features['mean_diff'] = features['mean_b'] - features['mean_a']
    features['std_diff'] = features['std_b'] - features['std_a']

    # Statistical tests
    features['ttest_p'] = ttest_ind(a, b, equal_var=False).pvalue
    features['ks_p'] = ks_2samp(a, b).pvalue

    result0 = adfuller(a)
    result1 = adfuller(b)

    if result0[1] < 0.05:
        features['mean_reverting_0'] = 1
        features['kappa_0'],features['mu_0'],features['var_0'] = ou_process_params(a)


    else:
        features['mean_reverting_0'] = 0
        features['kappa_0'],features['mu_0'],features['var_0'] = 0,0,0

    # best_order, p_vec, d, q_vec, best_model = find_best_arima_coeffs(a)
    # for i in range(1):
    #     features[f'arima0_p{i}'] = p_vec[i]
    #     features[f'arima0_q{i}'] = q_vec[i]
    #     features['arima0_d'] = d

    # best_order, p_vec, d, q_vec, best_model = find_best_arima_coeffs(b)
    # for i in range(1):
    #     features[f'arima1_p{i}'] = p_vec[i]
    #     features[f'arima_p{i}_diff'] = features[f'arima0_p{i}'] - p_vec[i]
    #     features[f'arima1_q{i}'] = q_vec[i]
    #     features[f'arima_q{i}_diff'] = features[f'arima0_q{i}'] - p_vec[i]
    #     features['arima1_d'] = d
    #     features['arima1_d_diff'] = features['arima0_d'] - d

    if result1[1] < 0.05:
        features['mean_reverting_1'] = 1
        features['kappa_1'],features['mu_1'],features['var_1'] = ou_process_params(b)
    else:
        features['mean_reverting_1'] = 0
        features['kappa_1'],features['mu_1'],features['var_1'] = 0,0,0

    # Kalman features
    # try:

    #   kf_a = kalman_stats(np.array(a))
    #   kf_b = kalman_stats(np.array(b))
    #   for k, v in kf_a.items():
    #       features[f'{k}_a'] = v
    #   for k, v in kf_b.items():
    #       features[f'{k}_b'] = v
    #   features['kf_trend_strength_diff'] = features['kf_trend_strength_b'] - features['kf_trend_strength_a']

    # except:
    #   pass

    # EMA features (use fast_ema)
    ema_windows = [5, 10, 15, 20, 25]
    for w in ema_windows:
        ema_a = fast_ema(a, w)
        ema_b = fast_ema(b, w)
        features[f'ema{w}_a'] = ema_a
        features[f'ema{w}_b'] = ema_b
        features[f'ema{w}_diff'] = ema_b - ema_a

    # Cross-correlation
    min_len = min(len(a), len(b))
    a = a[:min_len]
    b = b[:min_len]
    max_lag = min(10, min_len - 1)
    xcorr = []
    for lag in range(1, max_lag + 1):
        corr = np.corrcoef(a[:-lag], b[lag:])[0, 1]
        xcorr.append(corr)

    features['max_xcorr'] = np.nanmax(xcorr)
    features['mean_xcorr'] = np.nanmean(xcorr)

    return features


In [8]:
index_x = X_train.index.get_level_values(0).unique().tolist()
len(index_x)

10001

In [9]:
def process_chunk(temp):
  t0_series = temp[temp.period == 0].value.values
  t1_series = temp[temp.period == 1].value.values
  features_v = extract_features(t0_series, t1_series)
  return features_v

def process_wrapper(i_X):
    i, X = i_X
    if i%100 == 0:
        print(f"presently running {i} data item to extract")
    return i, process_chunk(X.loc[i])



def train(
    X_train: pd.DataFrame,
    y_train: pd.Series,
    model_directory_path: str,
):
    os.makedirs(model_directory_path, exist_ok=True)

    # try:
    #   !pip install pykalman
    # except:
    #   pass



    # For our baseline t-test approach, we don't need to train a model
    # This is essentially an unsupervised approach calculated at inference time
    #data_file = os.path.join(model_directory_path,"data.csv")

    index_x = X_train.index.get_level_values(0).unique().tolist()

    print(f"Spawning {cpu_count()} parallel workers...")

    # Prepare iterable of (i, X) tuples
    args = [(i, X_train) for i in index_x]

    with Pool(processes=cpu_count()) as pool:
        results = list(pool.imap_unordered(process_wrapper, args, chunksize=10))

    # Unpack results
    index_x, features_list = zip(*results)

    data = pd.DataFrame.from_records(features_list, index=index_x)

    data.index.name = "time"

    print(f"shape of data is {data.shape} and len is {len(index_x)}")
      # data.to_csv(data_file)

    base_model = XGBClassifier(
    eval_metric='auc',
    random_state=42,

)

    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('clf', base_model)
    ])

    param_grid = {
        'clf__n_estimators': [100, 200, 300],
        'clf__max_depth': [5, 10],
        'clf__learning_rate': [0.01, 0.1,0.2],
        'clf__subsample': [0.8, 1.0],
        'clf__colsample_bytree': [0.8, 1.0]
    }

    grid = GridSearchCV(
        estimator=pipeline,
        param_grid=param_grid,
        scoring='roc_auc',
        cv=3,
        n_jobs=-1,
        verbose=1
    )

    grid.fit(data, y_train)

    best_model = grid.best_estimator_
    os.makedirs(model_directory_path, exist_ok=True)
    #best_model.save_model('model.json')
    joblib.dump(best_model, os.path.join(model_directory_path, 'model.joblib'))

### The `infer()` Function

In the inference function, your trained model (if any) is loaded and used to make predictions on test data.

**Important workflow:**
1. Load your model;
2. Use the `yield` statement to signal readiness to the runner;
3. Process each dataset one by one within the for loop;
4. For each dataset, use `yield prediction` to return your prediction.

**Note:** The datasets can only be iterated once!

In [10]:
# def infer(
#     X_test: typing.Iterable[pd.DataFrame],
#     model_directory_path: str,
# ):
#     model = joblib.load(os.path.join(model_directory_path, 'model.joblib'))

#     yield  # Mark as ready

#     # X_test can only be iterated once.
#     # Before getting the next dataset, you must predict the current one.
#     for dataset in X_test:
#         # Baseline approach: Compute t-test between values before and after boundary point
#         # The negative p-value is used as our score - smaller p-values (larger negative numbers)
#         # indicate more evidence against the null hypothesis that distributions are the same,
#         # suggesting a structural break
#         def t_test(u: pd.DataFrame):
#             return -scipy.stats.ttest_ind(
#                 u["value"][u["period"] == 0],  # Values before boundary point
#                 u["value"][u["period"] == 1],  # Values after boundary point
#             ).pvalue

#         prediction = t_test(dataset)
#         yield prediction  # Send the prediction for the current dataset

#         # Note: This baseline approach uses a t-test to compare the distributions
#         # before and after the boundary point. A smaller p-value (larger negative number)
#         # suggests stronger evidence that the distributions are different,
#         # indicating a potential structural break.


def infer(
    X_test: typing.Iterable[pd.DataFrame],
    model_directory_path: str,
):
    # try:
    #   !pip install pykalman
    # except:
    #   pass
    # Load the trained model
    model = joblib.load(os.path.join(model_directory_path, 'model.joblib'))

    yield  # Mark as ready for inference

    for dataset in X_test:
        # Feature extraction using the same logic as in training
        features = process_chunk(dataset)
        features_df = pd.DataFrame([features])

        # Predict probability of the positive class (1)
        prediction = model.predict_proba(features_df)[0, 1]

        yield prediction


## Local testing

To make sure your `train()` and `infer()` function are working properly, you can call the `crunch.test()` function that will reproduce the cloud environment locally. <br />
Even if it is not perfect, it should give you a quick idea if your model is working properly.

In [None]:
crunch.test(
)

18:39:04 no forbidden library found
18:39:04 
18:39:04 started
18:39:04 running local test
18:39:04 internet access isn't restricted, no check will be done
18:39:04 
18:39:06 starting unstructured loop...
18:39:06 executing - command=train


data/X_train.parquet: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/data-releases/146/X_train.parquet (204327238 bytes)
data/X_train.parquet: already exists, file length match
data/X_test.reduced.parquet: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/data-releases/146/X_test.reduced.parquet (2380918 bytes)
data/X_test.reduced.parquet: already exists, file length match
data/y_train.parquet: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/data-releases/146/y_train.parquet (61003 bytes)
data/y_train.parquet: already exists, file length match
data/y_test.reduced.parquet: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/data-releases/146/y_test.reduced.parquet (2655 bytes)
data/y_test.reduced.parquet: already exists, file length match
Spawning 2 parallel workers...


## Results

Once the local tester is done, you can preview the result stored in `data/prediction.parquet`.

In [12]:
prediction = pd.read_parquet("data/prediction.parquet")
prediction

Unnamed: 0_level_0,prediction
id,Unnamed: 1_level_1
10001,0.250684
10002,0.166222
10003,0.287946
10004,0.263036
10005,0.246365
...,...
10097,0.387867
10098,0.273245
10099,0.507799
10100,0.204416


### Local scoring

You can call the function that the system uses to estimate your score locally.

In [13]:
# Load the targets
target = pd.read_parquet("data/y_test.reduced.parquet")["structural_breakpoint"]

# Call the scoring function
sklearn.metrics.roc_auc_score(
    target,
    prediction,
)

np.float64(0.6446009389671362)

# Submit your Notebook

To submit your work, you must:
1. Download your Notebook from Colab
2. Upload it to the platform
3. Create a run to validate it

### >> https://hub.crunchdao.com/competitions/structural-break/submit/notebook

![Download and Submit Notebook](https://raw.githubusercontent.com/crunchdao/competitions/refs/heads/master/documentation/animations/download-and-submit-notebook.gif)