[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/crunchdao/quickstarters/blob/master/competitions/structural-break/quickstarters/baseline/baseline.ipynb)

![Banner](https://raw.githubusercontent.com/crunchdao/quickstarters/refs/heads/master/competitions/structural-break/assets/banner.webp)

# ADIA Lab Structural Break Challenge

## Challenge Overview

Welcome to the ADIA Lab Structural Break Challenge! In this challenge, you will analyze univariate time series data to determine whether a structural break has occurred at a specified boundary point.

### What is a Structural Break?

A structural break occurs when the process governing the data generation changes at a certain point in time. These changes can be subtle or dramatic, and detecting them accurately is crucial across various domains such as climatology, industrial monitoring, finance, and healthcare.

![Structural Break Example](https://raw.githubusercontent.com/crunchdao/competitions/refs/heads/master/competitions/structural-break/quickstarters/baseline/images/example.png)

### Your Task

For each time series in the test set, you need to predict a score between `0` and `1`:
- Values closer to `0` indicate no structural break at the specified boundary point;
- Values closer to `1` indicate a structural break did occur.

### Evaluation Metric

The evaluation metric is [ROC AUC (Area Under the Receiver Operating Characteristic Curve)](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.roc_auc_score.html), which measures the performance of detection algorithms regardless of their specific calibration.

- ROC AUC around `0.5`: No better than random chance;
- ROC AUC approaching `1.0`: Perfect detection.

# Setup

The first steps to get started are:
1. Get the setup command
2. Execute it in the cell below

### >> https://hub.crunchdao.com/competitions/structural-break/submit/notebook

![Reveal token](https://raw.githubusercontent.com/crunchdao/competitions/refs/heads/master/documentation/animations/reveal-token.gif)

In [1]:
# # Install the Crunch CLI
# %pip install --upgrade crunch-cli

# # Setup your local environment
# !crunch setup --notebook structural-break hello --token aaaabbbbccccddddeeeeffff
%pip install crunch-cli --upgrade --quiet --progress-bar off
!crunch setup-notebook structural-break 49Uj7NG28SSPE9LovUPCKGGh

crunch-cli, version 6.5.0
main.py: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/submissions/21403/main.py (49073 bytes)
notebook.ipynb: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/submissions/21403/notebook.ipynb (102634 bytes)
requirements.txt: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/submissions/21403/requirements.original.txt (206 bytes)
data/X_train.parquet: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/data-releases/146/X_train.parquet (204327238 bytes)
data/X_test.reduced.parquet: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/data-releases/146/X_test.reduced.parquet (2380918 bytes)
data/y_train.parquet: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/data-releases/146/y_train.parquet (61003 bytes)
data/y_test.reduced.parquet: download from https:crunchdao--compe

In [4]:
if True:
    !pip install xgboost
    !pip install lightgbm
    !pip install torch
    !pip install rich
    !pip install nolds

Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-curand-cu12==10.3.5.147 (from torch)
  Downloading nvidia_curand_cu12-10.3.5

# Your model

## Setup

In [2]:
import os
import typing

# Import your dependencies
import joblib
import pandas as pd
import scipy
import sklearn.metrics

In [5]:
import numpy as np
import pandas as pd
from scipy import stats
from typing import List, Tuple, Dict, Union, Optional
from pydantic import BaseModel, Field, validator
import matplotlib.pyplot as plt
from scipy.stats import entropy
from scipy.special import kl_div
# Import your dependencies
import os
import typing


import joblib
import pandas as pd
import scipy
import sklearn.metrics

import numpy as np
from typing import Tuple, Literal, List, Dict, Any, Optional, Union
from pydantic import BaseModel, Field, field_validator
from scipy import stats
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, roc_auc_score
from sklearn.base import BaseEstimator
from sklearn.preprocessing import StandardScaler

import xgboost as xgb
import lightgbm as lgb
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from torch.nn.functional import binary_cross_entropy_with_logits
import statsmodels.api as sm
import nolds
from rich import print
from scipy import signal

from tqdm.auto import tqdm
# import crunch

In [6]:
# Libraries
class TestResult(BaseModel):
    """Base class for test results"""
    test_name: str
    statistic: float
    p_value: float
    reject_null: bool
    alpha: float = 0.05

    def __str__(self) -> str:
        return (f"{self.test_name} Test Result:\n"
                f"  Statistic: {self.statistic:.6f}\n"
                f"  p-value: {self.p_value:.6f}\n"
                f"  Reject Null Hypothesis: {self.reject_null} (α = {self.alpha})")

class KSTestResult(TestResult):
    """Kolmogorov-Smirnov test results"""
    test_name: str = "Kolmogorov-Smirnov"

class ADTestResult(TestResult):
    """Anderson-Darling test results"""
    test_name: str = "Anderson-Darling"
    critical_values: List[float] = Field(default_factory=list)
    significance_levels: List[float] = Field(default_factory=list)

    def __str__(self) -> str:
        base_str = super().__str__()
        crit_vals = "\n  ".join([f"{sl:.3f}: {cv:.3f}" for sl, cv in
                                zip(self.significance_levels, self.critical_values)])
        return (f"{base_str}\n"
                f"  Critical Values:\n  {crit_vals}")

class MannWhitneyTestResult(TestResult):
    """Mann-Whitney U test results"""
    test_name: str = "Mann-Whitney U"

class KLDivergenceResult(BaseModel):
    """Kullback-Leibler divergence results"""
    test_name: str = "Kullback-Leibler Divergence"
    kl_xy: float  # KL(x||y)
    kl_yx: float  # KL(y||x)
    symmetric_kl: float  # Symmetric KL

    def __str__(self) -> str:
        return (f"{self.test_name} Result:\n"
                f"  KL(x||y): {self.kl_xy:.6f}\n"
                f"  KL(y||x): {self.kl_yx:.6f}\n"
                f"  Symmetric KL: {self.symmetric_kl:.6f}\n"
                f"  Note: Higher values indicate more dissimilar distributions")

class PermutationTestResult(TestResult):
    """Permutation test results"""
    test_name: str = "Permutation"
    n_permutations: int
    test_statistic_name: str

    def __str__(self) -> str:
        base_str = super().__str__()
        return (f"{base_str}\n"
                f"  Test Statistic: {self.test_statistic_name}\n"
                f"  Number of Permutations: {self.n_permutations}")

class TimeSeriesComparison:
    """
    A class to compare two time series using various statistical tests to determine
    if they are drawn from the same or different distributions.
    """

    def __init__(self, default_alpha: float = 0.05):
        """
        Initialize the TimeSeriesComparison class.

        Parameters:
        -----------
        default_alpha : float, optional
            Default significance level for hypothesis tests (default is 0.05)
        """
        self.default_alpha = default_alpha

    def _validate_inputs(self, x: Union[List, np.ndarray], y: Union[List, np.ndarray]) -> Tuple[np.ndarray, np.ndarray]:
        """
        Validate input time series data.

        Parameters:
        -----------
        x, y : array-like
            Time series data

        Returns:
        --------
        Tuple[np.ndarray, np.ndarray]
            Validated numpy arrays
        """
        x_array = np.asarray(x, dtype=float)
        y_array = np.asarray(y, dtype=float)

        # Check for NaN values
        if np.isnan(x_array).any() or np.isnan(y_array).any():
            raise ValueError("Input data contains NaN values")

        # Ensure we have enough data points
        if len(x_array) < 2 or len(y_array) < 2:
            raise ValueError("Both time series must have at least 2 data points")

        return x_array, y_array

    def kolmogorov_smirnov_test(self, x: Union[List, np.ndarray], y: Union[List, np.ndarray],
                               alpha: Optional[float] = None) -> KSTestResult:
        """
        Perform Kolmogorov-Smirnov test on two time series.

        Null Hypothesis (H0): The two samples come from the same continuous distribution.
        Alternative Hypothesis (H1): The two samples come from different distributions.

        Parameters:
        -----------
        x, y : array-like
            Time series data
        alpha : float, optional
            Significance level (default is self.default_alpha)

        Returns:
        --------
        KSTestResult
            Results of the KS test
        """
        alpha = alpha if alpha is not None else self.default_alpha
        x_array, y_array = self._validate_inputs(x, y)

        # Perform KS test
        statistic, p_value = stats.ks_2samp(x_array, y_array)

        # Determine if we reject the null hypothesis
        reject_null = p_value < alpha

        return KSTestResult(
            statistic=statistic,
            p_value=p_value,
            reject_null=reject_null,
            alpha=alpha
        )

    def anderson_darling_test(self, x: Union[List, np.ndarray], y: Union[List, np.ndarray],
                             alpha: Optional[float] = None) -> ADTestResult:
        """
        Perform Anderson-Darling test on two time series.

        Null Hypothesis (H0): The two samples come from the same continuous distribution.
        Alternative Hypothesis (H1): The two samples come from different distributions.

        Note: This implements the k-sample Anderson-Darling test, which tests whether samples
        come from the same distribution without specifying what that distribution is.

        Parameters:
        -----------
        x, y : array-like
            Time series data
        alpha : float, optional
            Significance level (default is self.default_alpha)

        Returns:
        --------
        ADTestResult
            Results of the Anderson-Darling test
        """
        alpha = alpha if alpha is not None else self.default_alpha
        x_array, y_array = self._validate_inputs(x, y)

        # Combine samples and create sample indices
        samples = [x_array, y_array]

        # Perform Anderson-Darling test
        result = stats.anderson_ksamp(samples)

        # Extract results
        statistic = result.statistic
        p_value = result.significance_level
        significance_levels = [0.25, 0.1, 0.05, 0.025, 0.01]
        critical_values = result.critical_values

        # Determine if we reject the null hypothesis
        reject_null = p_value < alpha

        return ADTestResult(
            statistic=statistic,
            p_value=p_value,
            reject_null=reject_null,
            alpha=alpha,
            critical_values=list(critical_values),
            significance_levels=significance_levels
        )

    def mann_whitney_test(self, x: Union[List, np.ndarray], y: Union[List, np.ndarray],
                        alpha: Optional[float] = None, alternative: str = 'two-sided') -> MannWhitneyTestResult:
        """
        Perform Mann-Whitney U test on two time series.

        Null Hypothesis (H0): The distributions of both samples are equal.
        Alternative Hypothesis (H1): The distributions of the two samples are not equal
                                    (or one has a larger median than the other, depending
                                    on the 'alternative' parameter).

        Parameters:
        -----------
        x, y : array-like
            Time series data
        alpha : float, optional
            Significance level (default is self.default_alpha)
        alternative : {'two-sided', 'less', 'greater'}, optional
            Defines the alternative hypothesis:
            - 'two-sided': distributions are different
            - 'less': the distribution of x is stochastically less than the distribution of y
            - 'greater': the distribution of x is stochastically greater than the distribution of y

        Returns:
        --------
        MannWhitneyTestResult
            Results of the Mann-Whitney U test
        """
        alpha = alpha if alpha is not None else self.default_alpha
        x_array, y_array = self._validate_inputs(x, y)

        # Perform Mann-Whitney U test
        statistic, p_value = stats.mannwhitneyu(x_array, y_array, alternative=alternative)

        # Determine if we reject the null hypothesis
        reject_null = p_value < alpha

        return MannWhitneyTestResult(
            statistic=statistic,
            p_value=p_value,
            reject_null=reject_null,
            alpha=alpha
        )

    def kl_divergence(self, x: Union[List, np.ndarray], y: Union[List, np.ndarray],
                     bins: int = 20, smoothing: float = 1e-10) -> KLDivergenceResult:
        """
        Calculate Kullback-Leibler divergence between two time series.

        Note: KL divergence is not a statistical test but a measure of how one probability
        distribution diverges from another. It is not symmetric, meaning KL(x||y) ≠ KL(y||x).
        Higher values indicate distributions that are more different from each other.

        Parameters:
        -----------
        x, y : array-like
            Time series data
        bins : int, optional
            Number of bins to use in histogram (default is 20)
        smoothing : float, optional
            Small value added to probability estimates to avoid zeros (default is 1e-10)

        Returns:
        --------
        KLDivergenceResult
            Results of the KL divergence calculation
        """
        x_array, y_array = self._validate_inputs(x, y)

        # Get the range for the histograms - use the same range for both to ensure comparable results
        min_val = min(np.min(x_array), np.min(y_array))
        max_val = max(np.max(x_array), np.max(y_array))

        # Create histograms
        hist_x, bin_edges = np.histogram(x_array, bins=bins, range=(min_val, max_val), density=True)
        hist_y, _ = np.histogram(y_array, bins=bins, range=(min_val, max_val), density=True)

        # Add smoothing to avoid zeros
        hist_x = hist_x + smoothing
        hist_y = hist_y + smoothing

        # Normalize to ensure proper probability distributions
        hist_x = hist_x / np.sum(hist_x)
        hist_y = hist_y / np.sum(hist_y)

        # Calculate KL divergence in both directions
        kl_xy = entropy(hist_x, hist_y)  # KL(x||y)
        kl_yx = entropy(hist_y, hist_x)  # KL(y||x)

        # Symmetric KL divergence (average of both directions)
        symmetric_kl = (kl_xy + kl_yx) / 2

        return KLDivergenceResult(
            kl_xy=kl_xy,
            kl_yx=kl_yx,
            symmetric_kl=symmetric_kl
        )

    def permutation_test(self, x: Union[List, np.ndarray], y: Union[List, np.ndarray],
                        n_permutations: int = 1000, alpha: Optional[float] = None,
                        statistic: str = 'mean') -> PermutationTestResult:
        """
        Perform a permutation test on two time series.

        Null Hypothesis (H0): The two samples come from the same distribution.
        Alternative Hypothesis (H1): The two samples come from different distributions.

        Parameters:
        -----------
        x, y : array-like
            Time series data
        n_permutations : int, optional
            Number of permutations to perform (default is 1000)
        alpha : float, optional
            Significance level (default is self.default_alpha)
        statistic : str, optional
            Statistic to use for the test ('mean', 'median', 'variance', 'std', or 'ks')

        Returns:
        --------
        PermutationTestResult
            Results of the permutation test
        """
        alpha = alpha if alpha is not None else self.default_alpha
        x_array, y_array = self._validate_inputs(x, y)

        # Define the test statistic function
        if statistic == 'mean':
            def calc_statistic(a, b):
                return np.abs(np.mean(a) - np.mean(b))
        elif statistic == 'median':
            def calc_statistic(a, b):
                return np.abs(np.median(a) - np.median(b))
        elif statistic == 'variance':
            def calc_statistic(a, b):
                return np.abs(np.var(a) - np.var(b))
        elif statistic == 'std':
            def calc_statistic(a, b):
                return np.abs(np.std(a) - np.std(b))
        elif statistic == 'ks':
            def calc_statistic(a, b):
                return stats.ks_2samp(a, b)[0]  # Return the KS statistic
        else:
            raise ValueError("Invalid statistic. Choose from 'mean', 'median', 'variance', 'std', or 'ks'")

        # Calculate observed test statistic
        observed_statistic = calc_statistic(x_array, y_array)

        # Combine data for permutation
        combined = np.concatenate([x_array, y_array])
        n_x = len(x_array)
        n_y = len(y_array)
        n_total = n_x + n_y

        # Perform permutation test
        count = 0
        for _ in range(n_permutations):
            # Shuffle the combined data
            np.random.shuffle(combined)

            # Split into two groups of original sizes
            perm_x = combined[:n_x]
            perm_y = combined[n_x:]

            # Calculate test statistic for permuted data
            perm_statistic = calc_statistic(perm_x, perm_y)

            # Count how many permutations have a test statistic >= observed
            if perm_statistic >= observed_statistic:
                count += 1

        # Calculate p-value
        p_value = count / n_permutations

        # Determine if we reject the null hypothesis
        reject_null = p_value < alpha

        return PermutationTestResult(
            statistic=observed_statistic,
            p_value=p_value,
            reject_null=reject_null,
            alpha=alpha,
            n_permutations=n_permutations,
            test_statistic_name=statistic
        )

    def run_all_tests(self, x: Union[List, np.ndarray], y: Union[List, np.ndarray],
                     alpha: Optional[float] = None, permutation_statistic: str = 'mean',
                     n_permutations: int = 1000, kl_bins: int = 20) -> Dict:
        """
        Run all available tests on two time series.

        Parameters:
        -----------
        x, y : array-like
            Time series data
        alpha : float, optional
            Significance level (default is self.default_alpha)
        permutation_statistic : str, optional
            Statistic to use for the permutation test (default is 'mean')
        n_permutations : int, optional
            Number of permutations for the permutation test (default is 1000)
        kl_bins : int, optional
            Number of bins for KL divergence calculation (default is 20)

        Returns:
        --------
        Dict
            Dictionary containing results of all tests
        """
        alpha = alpha if alpha is not None else self.default_alpha

        results = {
            'ks_test': self.kolmogorov_smirnov_test(x, y, alpha),
            'ad_test': self.anderson_darling_test(x, y, alpha),
            'mw_test': self.mann_whitney_test(x, y, alpha),
            'kl_divergence': self.kl_divergence(x, y, bins=kl_bins),
            'permutation_test': self.permutation_test(x, y, n_permutations=n_permutations,
                                                    alpha=alpha, statistic=permutation_statistic)
        }

        return results

    def plot_distributions(self, x: Union[List, np.ndarray], y: Union[List, np.ndarray],
                         bins: int = 20, figsize: Tuple[int, int] = (10, 6)) -> plt.Figure:
        """
        Plot histograms and kernel density estimates of the two time series.

        Parameters:
        -----------
        x, y : array-like
            Time series data
        bins : int, optional
            Number of bins for histograms (default is 20)
        figsize : tuple, optional
            Figure size (default is (10, 6))

        Returns:
        --------
        plt.Figure
            Matplotlib figure containing the plots
        """
        x_array, y_array = self._validate_inputs(x, y)

        # Create figure with two subplots
        fig, axes = plt.subplots(1, 2, figsize=figsize)

        # Plot histograms
        axes[0].hist(x_array, bins=bins, alpha=0.5, label='Series X')
        axes[0].hist(y_array, bins=bins, alpha=0.5, label='Series Y')
        axes[0].set_title('Histogram Comparison')
        axes[0].set_xlabel('Value')
        axes[0].set_ylabel('Frequency')
        axes[0].legend()

        # Plot kernel density estimates
        from scipy.stats import gaussian_kde
        x_density = gaussian_kde(x_array)
        y_density = gaussian_kde(y_array)

        # Create a common x axis for plotting
        min_val = min(np.min(x_array), np.min(y_array))
        max_val = max(np.max(x_array), np.max(y_array))
        xs = np.linspace(min_val, max_val, 1000)

        axes[1].plot(xs, x_density(xs), label='Series X')
        axes[1].plot(xs, y_density(xs), label='Series Y')
        axes[1].set_title('Kernel Density Estimate')
        axes[1].set_xlabel('Value')
        axes[1].set_ylabel('Density')
        axes[1].legend()

        plt.tight_layout()
        return fig

class SyntheticDataGenerator(BaseModel):
    n_series: int
    pct_true: float  # Between 0 and 1
    min_length: int = 50
    max_length: int = 200
    seed: int = 42

    class Config:
        arbitrary_types_allowed = True

    def _random_distribution(self, name: str, size: int, params: dict) -> np.ndarray:
        dist_map = {
            'normal': lambda: np.random.normal(params.get('loc', 0), params.get('scale', 1), size),
            't': lambda: stats.t.rvs(df=params.get('df', 5), size=size),
            'exponential': lambda: np.random.exponential(params.get('scale', 1), size),
            'binomial': lambda: np.random.binomial(n=params.get('n', 10), p=params.get('p', 0.5), size=size),
        }
        if name not in dist_map:
            raise ValueError(f"Unsupported distribution: {name}")
        return dist_map[name]()

    def _generate_series(self, id_val: int, has_break: bool) -> Tuple[pd.DataFrame, bool]:
        np.random.seed(self.seed + id_val)

        length = np.random.randint(self.min_length, self.max_length + 1)
        breakpoint = np.random.randint(length // 3, length - 10) if has_break else np.random.randint(length // 2, length)

        # Randomly pick a base distribution
        dist_name = np.random.choice(['normal', 't', 'exponential', 'binomial'])
        base_params = {
            'normal': {'loc': 0, 'scale': 1},
            't': {'df': 5},
            'exponential': {'scale': 1},
            'binomial': {'n': 10, 'p': 0.5}
        }[dist_name]

        # Generate values
        pre_values = self._random_distribution(dist_name, breakpoint, base_params)

        if has_break:
            # Change distribution parameters
            changed_params = {
                'normal': {'loc': 1, 'scale': 1.5},
                't': {'df': 2},
                'exponential': {'scale': 2},
                'binomial': {'n': 10, 'p': 0.8}
            }[dist_name]
            post_values = self._random_distribution(dist_name, length - breakpoint, changed_params)
        else:
            post_values = self._random_distribution(dist_name, length - breakpoint, base_params)

        values = np.concatenate([pre_values, post_values])
        period = np.array([0]*breakpoint + [1]*(length - breakpoint))

        df = pd.DataFrame({
            'value': values,
            'period': period
        }, index=pd.MultiIndex.from_product([[id_val], range(length)], names=['id', 'time']))

        return df, has_break

    def generate(self) -> Tuple[pd.DataFrame, pd.Series]:
        true_count = int(self.pct_true * self.n_series)
        false_count = self.n_series - true_count
        labels = [True]*true_count + [False]*false_count
        np.random.shuffle(labels)

        series_list = []
        y_dict = {}

        for id_val, has_break in enumerate(labels):
            df, label = self._generate_series(id_val, has_break)
            series_list.append(df)
            y_dict[id_val] = label

        X = pd.concat(series_list)
        y = pd.Series(y_dict, name='structural_breakpoint')

        return X, y

class ETLPipeline(BaseModel):
    X: pd.DataFrame = pd.DataFrame(dtype='float64')
    # y: pd.DataFrame = pd.DataFrame(dtype='float64')
    y: pd.Series = pd.Series(dtype='float64')

    class Config:
        arbitrary_types_allowed = True  # Allow non-pydantic types like pd.DataFrame

    @field_validator('X')
    def validate_X(cls, v):
        if not isinstance(v.index, pd.MultiIndex):
            raise ValueError("X must have a MultiIndex of ['id', 'time']")
        if 'value' not in v.columns or 'period' not in v.columns:
            raise ValueError("X must contain 'value' and 'period' columns")
        return v

    @field_validator('y')
    def validate_y(cls, v):
        if not isinstance(v, pd.Series):
            raise ValueError("y must be a pandas Series")
        if v.dtype != 'bool':
            raise ValueError("y must be of dtype 'bool'")
        return v

    def get_ids(self) -> list:
        """Returns a list of all unique ids in the X set."""
        return list(self.X.index.get_level_values('id').unique())

    def get_series_by_id(self, id_val: int) -> pd.DataFrame:
        """Returns the time series data for a specific id."""
        if (id_val not in self.y.index) & len(self.y)>0:
            raise ValueError(f"id {id_val} not found in y")
        try:
            return self.X.loc[id_val]
        except KeyError:
            raise ValueError(f"id {id_val} not found in X")

    def get_target_by_id(self, id_val: int) -> bool:
        """Returns the target value for a specific id."""
        return self.y.loc[id_val]

    def get_structural_breakdown(self) -> pd.Series:
        """Returns the proportion of True/False in y"""
        return self.y.value_counts(normalize=True).rename("proportion")

class MLP(nn.Module):
    def __init__(self, input_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, 1)
        )

    def forward(self, x):
        return self.net(x).squeeze(1)

class MLModelPipeline(BaseModel):
    X: pd.DataFrame
    y: pd.Series
    X_train: Optional[pd.DataFrame] = None
    y_train: Optional[pd.Series] = None
    X_test: Optional[pd.DataFrame] = None
    y_test: Optional[pd.Series] = None
    model_name: str = Field(default="logistic_regression")
    model: Optional[Any] = None
    test_size: float = 0.2
    random_state: int = 42
    device: str = Field(default="cuda" if torch.cuda.is_available() else "cpu")

    class Config:
        arbitrary_types_allowed = True

    @field_validator('X')
    def validate_X(cls, v):
        if not isinstance(v, pd.DataFrame):
            raise ValueError("X must be a pandas DataFrame")
        return v

    @field_validator('y')
    def validate_y(cls, v):
        if not isinstance(v, pd.Series):
            raise ValueError("y must be a pandas Series")
        if v.dtype != bool:
            raise ValueError("y must be a boolean Series")
        return v

    def _initialize_model(self, input_dim: int = None):
        if self.model_name == "logistic_regression":
            return LogisticRegression(max_iter=1000)
        elif self.model_name == "random_forest":
            return RandomForestClassifier(n_estimators=100)
        elif self.model_name == "xgboost":
            return xgb.XGBClassifier(use_label_encoder=False, eval_metric='logloss')
        elif self.model_name == "lightgbm":
            return lgb.LGBMClassifier()
        elif self.model_name == "mlp":
            if input_dim is None:
                raise ValueError("input_dim required for MLP model")
            return MLP(input_dim).to(self.device)
        else:
            raise ValueError(f"Unsupported model: {self.model_name}")

    def fit_v0(self):
        # Align feature index with label index
        X_aligned = self.X.loc[self.y.index.intersection(self.X.index)]
        y_aligned = self.y.loc[X_aligned.index]

        X_train, X_test, y_train, y_test = train_test_split(
            X_aligned, y_aligned, test_size=self.test_size, random_state=self.random_state
        )

        self.X_train, self.X_test = X_train, X_test
        self.y_train, self.y_test = y_train, y_test

        if self.model_name == "mlp":
            input_dim = X_train.shape[1]
            self.model = self._initialize_model(input_dim)
            self._train_mlp(X_train, y_train)
        else:
            self.model = self._initialize_model()
            self.model.fit(X_train, y_train)

    def _train_mlp_v0(self, X_train: pd.DataFrame, y_train: pd.Series):
        X_tensor = torch.tensor(X_train.values, dtype=torch.float32).to(self.device)
        y_tensor = torch.tensor(y_train.values.astype(np.float32)).to(self.device)

        dataset = TensorDataset(X_tensor, y_tensor)
        dataloader = DataLoader(dataset, batch_size=64, shuffle=True)

        optimizer = torch.optim.Adam(self.model.parameters(), lr=0.001)
        self.model.train()

        for epoch in range(10):  # small epochs for demo
            for xb, yb in dataloader:
                preds = self.model(xb)
                loss = binary_cross_entropy_with_logits(preds, yb)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

    def predict_v0(self, X: Optional[pd.DataFrame] = None) -> np.ndarray:
        if X is None:
            X = self.X_test

        if self.model_name == "mlp":
            self.model.eval()
            with torch.no_grad():
                X_tensor = torch.tensor(X.values, dtype=torch.float32).to(self.device)
                logits = self.model(X_tensor)
                probs = torch.sigmoid(logits)
                return (probs > 0.5).cpu().numpy()
        else:
            return self.model.predict(X)

    def fit(self):
        # Align feature index with label index
        X_aligned = self.X.loc[self.y.index.intersection(self.X.index)]
        y_aligned = self.y.loc[X_aligned.index]

        X_train, X_test, y_train, y_test = train_test_split(
            X_aligned, y_aligned,
            test_size=self.test_size,
            random_state=self.random_state
        )

        self.X_train, self.X_test = X_train, X_test
        self.y_train, self.y_test = y_train, y_test

        if self.model_name == "mlp":
            input_dim = X_train.shape[1]
            self.model = self._initialize_model(input_dim)
            self._train_mlp(X_train, y_train)
        else:
            self.model = self._initialize_model()
            # wrap the (single) fit call in a tiny tqdm bar
            with tqdm(total=1, desc=f"Fitting {self.model_name}") as pbar:
                self.model.fit(X_train, y_train)
                pbar.update()

    def _train_mlp(self, X_train: pd.DataFrame, y_train: pd.Series):
        X_tensor = torch.tensor(X_train.values, dtype=torch.float32).to(self.device)
        y_tensor = torch.tensor(y_train.values.astype(np.float32)).to(self.device)

        dataset = TensorDataset(X_tensor, y_tensor)
        dataloader = DataLoader(dataset, batch_size=64, shuffle=True)

        optimizer = torch.optim.Adam(self.model.parameters(), lr=0.001)
        self.model.train()

        # outer loop: epochs
        for epoch in tqdm(range(10), desc="Epochs"):
            # inner loop: batches
            for xb, yb in tqdm(dataloader, desc="Batches", leave=False):
                preds = self.model(xb)
                loss = binary_cross_entropy_with_logits(preds, yb)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

    def predict(self, X: Optional[pd.DataFrame] = None) -> np.ndarray:
        if X is None:
            X = self.X_test

        if self.model_name == "mlp":
            self.model.eval()
            # turn X into batches so we can show a bar
            dataset = TensorDataset(
                torch.tensor(X.values, dtype=torch.float32)
            )
            dataloader = DataLoader(dataset, batch_size=64)
            all_preds = []
            with torch.no_grad():
                for (xb,) in tqdm(dataloader, desc="Predicting Batches"):
                    xb = xb.to(self.device)
                    logits = self.model(xb)
                    probs = torch.sigmoid(logits)
                    all_preds.append((probs > 0.5).cpu().numpy())
            return np.concatenate(all_preds)
        else:
            # single-call predict; emulate a bar if you really need it
            with tqdm(total=len(X), desc=f"Predicting {self.model_name}") as pbar:
                preds = self.model.predict(X)
                pbar.update(len(X))
            return preds

    def evaluate(self) -> Dict[str, Any]:
        preds = self.predict()
        if self.model_name == "mlp":
            with torch.no_grad():
                X_tensor = torch.tensor(self.X_test.values, dtype=torch.float32).to(self.device)
                probs = torch.sigmoid(self.model(X_tensor)).cpu().numpy()
        elif hasattr(self.model, "predict_proba"):
            probs = self.model.predict_proba(self.X_test)[:, 1]
        else:
            probs = preds

        report = classification_report(self.y_test, preds, output_dict=True)
        auc = roc_auc_score(self.y_test, probs) if probs is not None else None

        return {
            "classification_report": report,
            "roc_auc": auc,
            "model": self.model_name,
            "probs": probs
        }

class EnhancedFeatureGenerator(BaseModel):
    etl: ETLPipeline

    class Config:
        arbitrary_types_allowed = True

    def extract_basic_statistics(self, before, after):
        """Extract basic statistical features from time series segments"""
        features = {
            "mean_before": before['value'].mean(),
            "std_before": before['value'].std(),
            "skew_before": before['value'].skew(),
            "kurtosis_before": before['value'].kurtosis(),
            "mean_after": after['value'].mean() if after is not None else np.nan,
            "std_after": after['value'].std() if after is not None else np.nan,
            "skew_after": after['value'].skew() if after is not None else np.nan,
            "kurtosis_after": after['value'].kurtosis() if after is not None else np.nan
        }

        # Calculate delta features
        if after is not None:
            features.update({
                "length_pct": len(after) / (len(before) + len(after)),
                "delta_mean": (features["mean_after"] / features["mean_before"]) - 1
                    if features["mean_before"] != 0 else np.nan,
                "delta_std": (features["std_after"] / features["std_before"]) - 1
                    if features["std_before"] != 0 else np.nan,
                "delta_skew": (features["skew_after"] / features["skew_before"]) - 1
                    if features["skew_before"] != 0 else np.nan,
                "delta_kurtosis": (features["kurtosis_after"] / features["kurtosis_before"]) - 1
                    if features["kurtosis_before"] != 0 else np.nan,
                "delta_mean_std": (features["mean_before"] / features["std_before"]) -
                    (features["mean_after"] / features["std_after"])
                    if features["std_before"] != 0 and features["std_after"] != 0 else np.nan
            })
        else:
            features.update({
                "length_pct": np.nan,
                "delta_mean": np.nan,
                "delta_std": np.nan,
                "delta_skew": np.nan,
                "delta_kurtosis": np.nan,
                "delta_mean_std": np.nan
            })

        return features

    def extract_statistical_tests(self, before, after):
        """Extract statistical test features"""
        tsc = TimeSeriesComparison()
        x = before['value']
        y = after['value'] if after is not None else x

        features = {
            'ks_pvalue': tsc.kolmogorov_smirnov_test(x, y).p_value,
            'ad_pvalue': tsc.anderson_darling_test(x, y).p_value,
            'mw_pvalue': tsc.mann_whitney_test(x, y).p_value,
            'permutation_pvalue': tsc.permutation_test(x, y).p_value
        }

        # Add test statistics in addition to p-values
        features.update({
            'ks_statistic': tsc.kolmogorov_smirnov_test(x, y).statistic,
            'ad_statistic': tsc.anderson_darling_test(x, y).statistic
        })

        return features

    def extract_frequency_features(self, before, after):
        """Extract frequency domain features"""
        features = {}

        # Calculate power spectral density for both segments
        def get_psd_features(series, prefix):
            if len(series) < 4:
                return {f"{prefix}_psd_mean": np.nan, f"{prefix}_psd_std": np.nan}

            f, psd = signal.welch(series, fs=1.0, nperseg=min(len(series)//2, 256))
            return {
                f"{prefix}_psd_mean": np.mean(psd),
                f"{prefix}_psd_std": np.std(psd),
                f"{prefix}_psd_max": np.max(psd),
                f"{prefix}_psd_peak_freq": f[np.argmax(psd)]
            }

        features.update(get_psd_features(before['value'], "before"))

        if after is not None:
            features.update(get_psd_features(after['value'], "after"))
            # Calculate spectral distance features
            features["psd_distance"] = np.mean(np.abs(
                signal.welch(before['value'], fs=1.0)[1] -
                signal.welch(after['value'], fs=1.0)[1]
            )) if min(len(before), len(after)) > 4 else np.nan

        return features

    def extract_autocorrelation_features(self, before, after):
        """Extract autocorrelation features"""
        features = {}

        # Calculate autocorrelation at different lags
        def get_acf_features(series, prefix, max_lag=5): # 10
            if len(series) <= max_lag:
                return {f"{prefix}_acf_lag{i}": np.nan for i in range(1, max_lag+1)}

            acf_values = sm.tsa.acf(series, nlags=max_lag, fft=True)
            return {f"{prefix}_acf_lag{i}": acf_values[i] for i in range(1, min(max_lag+1, len(acf_values)))}

        features.update(get_acf_features(before['value'], "before"))

        if after is not None:
            features.update(get_acf_features(after['value'], "after"))

            # Calculate ACF distance features
            before_acf = sm.tsa.acf(before['value'], nlags=min(10, len(before)-1), fft=True)
            after_acf = sm.tsa.acf(after['value'], nlags=min(10, len(after)-1), fft=True)
            min_len = min(len(before_acf), len(after_acf))
            features["acf_distance"] = np.mean(np.abs(before_acf[:min_len] - after_acf[:min_len]))

        return features

    def extract_entropy_features(self, before, after):
        """Extract entropy-based features"""
        features = {}

        # Sample entropy
        def sample_entropy(series, m=2, r=0.2):
            if len(series) < m+2:
                return np.nan
            # Normalize the series
            series = (series - np.mean(series)) / np.std(series)
            r = r * np.std(series)
            return nolds.sampen(series, emb_dim=m, tolerance=r)

        features["before_sample_entropy"] = sample_entropy(before['value'])
        features["after_sample_entropy"] = sample_entropy(after['value']) if after is not None else np.nan

        if after is not None:
            features["delta_entropy"] = features["after_sample_entropy"] - features["before_sample_entropy"]

        return features

    def extract_catch22_features(self, before, after):
        """Extract catch22 features as mentioned in the research paper"""
        try:
            import pycatch22
            features = {}

            # Extract all catch22 features for both segments
            before_catch22 = pycatch22.catch22_all(before['value'])
            features.update({f"before_catch22_{name}": value for name, value in zip(before_catch22['names'], before_catch22['values'])})

            if after is not None:
                after_catch22 = pycatch22.catch22_all(after['value'])
                features.update({f"after_catch22_{name}": value for name, value in zip(after_catch22['names'], after_catch22['values'])})

                # Calculate delta features
                for i, name in enumerate(before_catch22['names']):
                    before_val = before_catch22['values'][i]
                    after_val = after_catch22['values'][i]
                    if before_val != 0:
                        features[f"delta_catch22_{name}"] = (after_val / before_val) - 1

            return features
        except ImportError:
            # Fallback if pycatch22 is not available
            return {}

    def _split_series(self, ts, id_val=None):
        """Split time series into before/after segments based on period change"""
        change_indices = ts.index[ts['period'].diff() == 1].tolist()

        # Determine if this represents a true regime change
        id_change = False
        if id_val is not None and len(self.etl.y) > 0:
            id_change = self.etl.y.get(id_val, False)

        if len(self.etl.y)==0:
            # use this case to split X dataframe if no target is provided
            id_change = True

        # if (not change_indices) or (id_val is not None and not id_change):
        #     before = ts
        #     after = ts
        #     change_point = np.nan
        # else:
        #     change_point = change_indices[0]
        #     before = ts.loc[:change_point]
        #     after = ts.loc[change_point + 1:] if change_point + 1 in ts.index else None

        # test
        change_point = change_indices[0]
        before = ts.loc[:change_point]
        after = ts.loc[change_point + 1:] if change_point + 1 in ts.index else None

        return before, after, change_point

    def extract_features_for_id(self, id_val, is_training=False):
        """Unified feature extraction method for both training and testing"""
        ts = self.etl.get_series_by_id(id_val)
        before, after, change_point = self._split_series(ts, id_val if is_training else None)

        features = {"id": id_val}

        # Extract different types of features
        features.update(self.extract_basic_statistics(before, after))
        features.update(self.extract_autocorrelation_features(before, after))
        features.update(self.extract_statistical_tests(before, after))
        # features.update(self.extract_frequency_features(before, after))
        # features.update(self.extract_entropy_features(before, after))
        # features.update(self.extract_catch22_features(before, after))

        return features

    def generate_feature_dataframe(self):
        """Generate features for all IDs"""
        all_ids = self.etl.get_ids()
        feature_dicts = []

        if len(self.etl.y)==0:
            is_training = False # True
        else:
            is_training = len(self.etl.y) > 0

        for id_val in all_ids:
            try:
                features = self.extract_features_for_id(id_val, is_training)
                feature_dicts.append(features)
            except Exception as e:
                print(f"Skipping id {id_val} due to error: {str(e)}")

        df = pd.DataFrame(feature_dicts).set_index("id")

        # Handle NaN values
        df = df.fillna(df.mean())

        return df

class EnhancedMLP(nn.Module):
    def __init__(self, input_dim, hidden_dims=[64, 32, 16], dropout_rate=0.3):
        super().__init__()

        layers = []
        prev_dim = input_dim

        # Create hidden layers
        for h_dim in hidden_dims:
            layers.extend([
                nn.Linear(prev_dim, h_dim),
                nn.BatchNorm1d(h_dim),
                nn.ReLU(),
                nn.Dropout(dropout_rate)
            ])
            prev_dim = h_dim

        # Output layer
        layers.append(nn.Linear(prev_dim, 1))

        self.net = nn.Sequential(*layers)

    def forward(self, x):
        return self.net(x).squeeze(1)

class EnhancedMLModelPipeline(BaseModel):
    X: pd.DataFrame
    y: pd.Series
    X_train: Optional[pd.DataFrame] = None
    y_train: Optional[pd.Series] = None
    X_test: Optional[pd.DataFrame] = None
    y_test: Optional[pd.Series] = None
    model_name: str = Field(default="random_forest")
    model: Optional[Any] = None
    cv_folds: int = 5
    random_state: int = 42
    device: str = Field(default="cuda" if torch.cuda.is_available() else "cpu")
    return_probabilities: bool = Field(default=True)
    scaler: StandardScaler = StandardScaler()

    class Config:
        arbitrary_types_allowed = True

    def _initialize_model(self, input_dim: int = None):
        if self.model_name == "logistic_regression":
            return LogisticRegression(max_iter=1000, C=0.1)
        elif self.model_name == "random_forest":
            return RandomForestClassifier(
                n_estimators=200,
                max_depth=10,
                min_samples_split=5,
                class_weight='balanced'
            )
        elif self.model_name == "xgboost":
            return xgb.XGBClassifier(
                use_label_encoder=False,
                eval_metric='auc',
                learning_rate=0.05,
                n_estimators=300,
                max_depth=5,
                subsample=0.8,
                colsample_bytree=0.8
            )
        elif self.model_name == "mlp":
            if input_dim is None:
                raise ValueError("input_dim required for MLP model")
            return EnhancedMLP(input_dim).to(self.device)
        elif self.model_name == "ensemble":
            if input_dim is None:
                raise ValueError("input_dim required for ensemble model")
            return VotingClassifier(
                estimators=[
                    ('rf', RandomForestClassifier(n_estimators=200)),
                    ('xgb', xgb.XGBClassifier(use_label_encoder=False, eval_metric='auc')),
                    ('lr', LogisticRegression(max_iter=1000))
                ],
                voting='soft'
            )
        else:
            raise ValueError(f"Unsupported model: {self.model_name}")

    def preprocess_features(self, X):
        """Apply feature preprocessing"""
        # Remove highly correlated features
        correlation_matrix = X.corr().abs()
        upper = correlation_matrix.where(np.triu(np.ones(correlation_matrix.shape), k=1).astype(bool))
        to_drop = [column for column in upper.columns if any(upper[column] > 0.95)]
        X_processed = X.drop(columns=to_drop, errors='ignore')

        # Apply scaling
        X_scaled = pd.DataFrame(
            self.scaler.fit_transform(X_processed),
            index=X_processed.index,
            columns=X_processed.columns
        )

        return X_scaled, self.scaler, to_drop

    def fit(self):
        print(f"Running model: {self.model_name}")

        # Align feature index with label index
        X_aligned = self.X.loc[self.y.index.intersection(self.X.index)]
        y_aligned = self.y.loc[X_aligned.index]

        # Preprocess features
        X_processed, self.scaler, dropped_features = self.preprocess_features(X_aligned)

        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            X_processed, y_aligned, test_size=0.2, random_state=self.random_state
        )

        self.X_train, self.X_test = X_train, X_test
        self.y_train, self.y_test = y_train, y_test

        input_dim = X_train.shape[1]

        if self.model_name == "mlp":
            self.model = self._initialize_model(input_dim)
            self._train_mlp(X_train, y_train)
        else:
            # Use cross-validation for hyperparameter tuning
            if self.model_name == "random_forest":
                param_grid = {
                    'n_estimators': [100, 200, 300],
                    'max_depth': [5, 10, None],
                    'min_samples_split': [2, 5, 10]
                }
                base_model = RandomForestClassifier(class_weight='balanced')
                grid_search = GridSearchCV(
                    base_model, param_grid, cv=self.cv_folds, scoring='roc_auc'
                )
                grid_search.fit(X_train, y_train)
                self.model = grid_search.best_estimator_
            elif self.model_name == "xgboost":
                param_grid = {
                    'learning_rate': [0.01, 0.05, 0.1],
                    'max_depth': [3, 5, 7],
                    'n_estimators': [100, 200, 300]
                }
                base_model = xgb.XGBClassifier(use_label_encoder=False, eval_metric='auc')
                grid_search = GridSearchCV(
                    base_model, param_grid, cv=self.cv_folds, scoring='roc_auc'
                )
                grid_search.fit(X_train, y_train)
                self.model = grid_search.best_estimator_
            else:
                self.model = self._initialize_model(input_dim)
                self.model.fit(X_train, y_train)

    def _train_mlp(self, X_train, y_train):
        X_tensor = torch.tensor(X_train.values, dtype=torch.float32).to(self.device)
        y_tensor = torch.tensor(y_train.values.astype(np.float32)).to(self.device)

        dataset = TensorDataset(X_tensor, y_tensor)
        train_size = int(0.8 * len(dataset))
        val_size = len(dataset) - train_size
        train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

        train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=64)

        criterion = nn.BCEWithLogitsLoss()
        optimizer = torch.optim.Adam(self.model.parameters(), lr=0.001, weight_decay=1e-5)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode='min', factor=0.5, patience=5, verbose=True
        )

        best_val_loss = float('inf')
        best_model_state = None
        patience_counter = 0
        max_patience = 10

        for epoch in range(100):
            # Training phase
            self.model.train()
            train_loss = 0
            for xb, yb in train_loader:
                optimizer.zero_grad()
                preds = self.model(xb)
                loss = criterion(preds, yb)
                loss.backward()
                optimizer.step()
                train_loss += loss.item() * xb.size(0)
            train_loss /= len(train_loader.dataset)

            # Validation phase
            self.model.eval()
            val_loss = 0
            with torch.no_grad():
                for xb, yb in val_loader:
                    preds = self.model(xb)
                    loss = criterion(preds, yb)
                    val_loss += loss.item() * xb.size(0)
            val_loss /= len(val_loader.dataset)

            scheduler.step(val_loss)

            print(f"Epoch {epoch+1}: Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

            # Early stopping
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                best_model_state = copy.deepcopy(self.model.state_dict())
                patience_counter = 0
            else:
                patience_counter += 1

            if patience_counter >= max_patience:
                print(f"Early stopping at epoch {epoch+1}")
                break

        # Load best model
        if best_model_state is not None:
            self.model.load_state_dict(best_model_state)

    def evaluate(self):
        """Evaluate the model and return performance metrics"""
        if self.model_name == "mlp":
            self.model.eval()
            X_test_tensor = torch.tensor(self.X_test.values, dtype=torch.float32).to(self.device)
            with torch.no_grad():
                y_proba = torch.sigmoid(self.model(X_test_tensor)).cpu().numpy()
                y_pred = (y_proba > 0.5).astype(int)
        else:
            y_proba = self.model.predict_proba(self.X_test)[:, 1]
            y_pred = self.model.predict(self.X_test)

        metrics_dict = {
            "accuracy": accuracy_score(self.y_test, y_pred),
            "roc_auc": roc_auc_score(self.y_test, y_proba),
            "precision": precision_score(self.y_test, y_pred),
            "recall": recall_score(self.y_test, y_pred),
            "f1": f1_score(self.y_test, y_pred),
            "confusion_matrix": confusion_matrix(self.y_test, y_pred).tolist()
        }

        return metrics_dict

def list_files_in_directory(dir_path):
  """Lists all files in the specified directory.

  Args:
    dir_path: The path to the directory. If not specified, the current
      working directory is used.

  Returns:
    A list of strings, where each string is the name of a file in the directory.
    Returns an empty list if the directory does not exist or is empty.

  Usage:
    current_dir_files = list_files_in_directory(os.getcwd())
    print("Files in current directory:", current_dir_files)
  """

  try:
    files = os.listdir(dir_path)
    return files
  except FileNotFoundError:
    print(f"Error: Directory '{dir_path}' not found.")
    return []

def extract_values_by_id(df, n):
    """
    Extract all rows for a given id number 'n' from a DataFrame
    with a MultiIndex of ['id', 'time'].

    Parameters:
        df (pd.DataFrame): The input DataFrame with a MultiIndex.
        n (int): The id value to filter on.

    Returns:
        pd.DataFrame: Filtered DataFrame with rows corresponding to id 'n'.

    Usage:
     df = extract_values_by_id(X_train, id=3)
     df['level'] = df[['value']].cumsum()
     df[['level','period']].plot()
     df[['value','period']].plot()
     print(df.groupby('period').describe()['value'])
     print(df.tail())
     print(f"y_train: {y_train.loc[id]}")
    """
    return df.loc[n]

def split_df_by_id(df: pd.DataFrame, id_cut: int = None, n_random: int = None, seed: int = None):
    """
    Splits a MultiIndex DataFrame by 'id'.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame with a MultiIndex ('id', 'time').
    id_cut : int, optional
        If provided, selects all ids <= id_cut for the first split.
    n_random : int, optional
        If provided, randomly selects this many unique ids for the first split.
    seed : int, optional
        Random seed for reproducibility when using n_random.

    Returns
    -------
    df1, df2 : pd.DataFrame
        Tuple of DataFrames:
          - df1 contains rows for the selected ids.
          - df2 contains rows for all other ids.

    Notes
    -----
    Exactly one of `id_cut` or `n_random` must be provided.
    """
    # Extract unique ids
    ids = df.index.get_level_values('id').unique()

    # Determine selected ids based on threshold or random sampling
    if id_cut is not None and n_random is None:
        selected_ids = ids[ids <= id_cut]
    elif n_random is not None and id_cut is None:
        if n_random > len(ids):
            raise ValueError(f"n_random={n_random} exceeds the number of unique ids={len(ids)}")
        rng = np.random.default_rng(seed)
        selected_ids = rng.choice(ids, size=n_random, replace=False)
    else:
        raise ValueError("Provide exactly one of `id_cut` or `n_random`")

    # Split the DataFrame
    df1 = df.loc[selected_ids]
    df2 = df.loc[~df.index.get_level_values('id').isin(selected_ids)]

    return df1, df2

# function definitions for training and inference
def train(
    X_train: pd.DataFrame,
    y_train: pd.Series,
    model_directory_path: str,
    model_name = "xgboost" # # "logistic_regression", "random_forest", "xgboost", "lightgbm", "mlp"
):
    # For our baseline t-test approach, we don't need to train a model
    # This is essentially an unsupervised approach calculated at inference time
    if not isinstance(y_train, pd.Series):
        y_train = y_train.iloc[:, 0]

    """
    # ETL pipeline
    """
    etl = ETLPipeline(X=X_train, y=y_train)

    """
    # Feature generator
    """
    generator = EnhancedFeatureGenerator(etl=etl)
    X_features = generator.generate_feature_dataframe()

    """
    # ML model pipeline
    """
    valid_index = X_features.loc[~X_features.isna().any(axis=1)].index
    X = X_features.loc[valid_index]
    y = etl.y[valid_index]

    model = MLModelPipeline(X=X, y=y, model_name=model_name)
    model.fit()
    # model = None

    joblib.dump(model, os.path.join(model_directory_path, 'model.joblib'))

def infer(
    X_test: typing.Iterable[pd.DataFrame],
    model_directory_path: str):
    """
    Note: This baseline approach uses a t-test to compare the distributions
     before and after the boundary point. A smaller p-value (larger negative number)
     suggests stronger evidence that the distributions are different,
     indicating a potential structural break.
    """
    model = joblib.load(os.path.join(model_directory_path, 'model.joblib'))

    yield  # Mark as ready

    # X_test can only be iterated once.
    # Before getting the next dataset, you must predict the current one.
    for dataset in X_test:
        etl = ETLPipeline(X=X_test)

        generator = EnhancedFeatureGenerator(etl=etl)
        X_features = generator.generate_feature_dataframe()

        valid_index = X_features.loc[~X_features.isna().any(axis=1)].index
        X = X_features.loc[valid_index]
        prediction = model.predict(X)

        yield prediction  # Send the prediction for the current dataset



In [7]:
import crunch

# Load the Crunch Toolings
crunch = crunch.load_notebook()

loaded inline runner with module: <module '__main__'>

cli version: 6.5.0
available ram: 12.67 gb
available cpu: 2 core
----


## Understanding the Data

The dataset consists of univariate time series, each containing ~2,000-5,000 values with a designated boundary point. For each time series, you need to determine whether a structural break occurred at this boundary point.

The data was downloaded when you setup your local environment and is now available in the `data/` directory.

In [8]:
# Load the data simply
X_train, y_train, X_test = crunch.load_data()

data/X_train.parquet: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/data-releases/146/X_train.parquet (204327238 bytes)
data/X_train.parquet: already exists, file length match
data/X_test.reduced.parquet: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/data-releases/146/X_test.reduced.parquet (2380918 bytes)
data/X_test.reduced.parquet: already exists, file length match
data/y_train.parquet: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/data-releases/146/y_train.parquet (61003 bytes)
data/y_train.parquet: already exists, file length match
data/y_test.reduced.parquet: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/data-releases/146/y_test.reduced.parquet (2655 bytes)
data/y_test.reduced.parquet: already exists, file length match


### Understanding `X_train`

The training data is structured as a pandas DataFrame with a MultiIndex:

**Index Levels:**
- `id`: Identifies the unique time series
- `time`: The timestep within each time series

**Columns:**
- `value`: The actual time series value at each timestep
- `period`: A binary indicator where `0` represents the **period before** the boundary point, and `1` represents the **period after** the boundary point

In [9]:
X_train

Unnamed: 0_level_0,Unnamed: 1_level_0,value,period
id,time,Unnamed: 2_level_1,Unnamed: 3_level_1
0,0,-0.005564,0
0,1,0.003705,0
0,2,0.013164,0
0,3,0.007151,0
0,4,-0.009979,0
...,...,...,...
10000,2134,0.001137,1
10000,2135,0.003526,1
10000,2136,0.000687,1
10000,2137,0.001640,1


### Understanding `y_train`

This is a simple `pandas.Series` that tells if a dataset id has a structural breakpoint or not.

**Index:**
- `id`: the ID of the dataset

**Value:**
- `structural_breakpoint`: Boolean indicating whether a structural break occurred (`True`) or not (`False`)

In [10]:
y_train

Unnamed: 0_level_0,structural_breakpoint
id,Unnamed: 1_level_1
0,False
1,False
2,True
3,False
4,False
...,...
9996,False
9997,False
9998,False
9999,False


### Understanding `X_test`

The test data is provided as a **`list` of `pandas.DataFrame`s** with the same format as [`X_train`](#understanding-X_test).

It is structured as a list to encourage processing records one by one, which will be mandatory in the `infer()` function.

In [11]:
print("Number of datasets:", len(X_test))

In [12]:
X_test[0]

Unnamed: 0_level_0,Unnamed: 1_level_0,value,period
id,time,Unnamed: 2_level_1,Unnamed: 3_level_1
10001,0,0.010753,0
10001,1,-0.031915,0
10001,2,-0.010989,0
10001,3,-0.011111,0
10001,4,0.011236,0
10001,...,...,...
10001,2774,-0.013937,1
10001,2775,-0.015649,1
10001,2776,-0.009744,1
10001,2777,0.025375,1


## Strategy Implementation

There are multiple approaches you can take to detect structural breaks:

1. **Statistical Tests**: Compare distributions before and after the boundary point;
2. **Feature Engineering**: Extract features from both segments for comparison;
3. **Time Series Modeling**: Detect deviations from expected patterns;
4. **Machine Learning**: Train models to recognize break patterns from labeled examples.

The baseline implementation below uses a simple statistical approach: a t-test to compare the distributions before and after the boundary point.

### The `train()` Function

In this function, you build and train your model for making inferences on the test data. Your model must be stored in the `model_directory_path`.

The baseline implementation below doesn't require a pre-trained model, as it uses a statistical test that will be computed at inference time.

In [13]:
# def train(
#     X_train: pd.DataFrame,
#     y_train: pd.Series,
#     model_directory_path: str,
# ):
#     # For our baseline t-test approach, we don't need to train a model
#     # This is essentially an unsupervised approach calculated at inference time
#     model = None

#     # You could enhance this by training an actual model, for example:
#     # 1. Extract features from before/after segments of each time series
#     # 2. Train a classifier using these features and y_train labels
#     # 3. Save the trained model

#     joblib.dump(model, os.path.join(model_directory_path, 'model.joblib'))

def train(
    X_train: pd.DataFrame,
    y_train: pd.Series,
    model_directory_path: str,
    model_name = "xgboost" # # "logistic_regression", "random_forest", "xgboost", "lightgbm", "mlp"
):
    # For our baseline t-test approach, we don't need to train a model
    # This is essentially an unsupervised approach calculated at inference time
    if not isinstance(y_train, pd.Series):
        y_train = y_train.iloc[:, 0]

    """
    # ETL pipeline
    """
    etl = ETLPipeline(X=X_train, y=y_train)

    """
    # Feature generator
    """
    generator = EnhancedFeatureGenerator(etl=etl)
    X_features = generator.generate_feature_dataframe()

    """
    # ML model pipeline
    """
    valid_index = X_features.loc[~X_features.isna().any(axis=1)].index
    X = X_features.loc[valid_index]
    y = etl.y[valid_index]

    model = MLModelPipeline(X=X, y=y, model_name=model_name)
    model.fit()
    # model = None

    joblib.dump(model, os.path.join(model_directory_path, 'model.joblib'))


### The `infer()` Function

In the inference function, your trained model (if any) is loaded and used to make predictions on test data.

**Important workflow:**
1. Load your model;
2. Use the `yield` statement to signal readiness to the runner;
3. Process each dataset one by one within the for loop;
4. For each dataset, use `yield prediction` to return your prediction.

**Note:** The datasets can only be iterated once!

In [14]:
# def infer(
#     X_test: typing.Iterable[pd.DataFrame],
#     model_directory_path: str,
# ):
#     model = joblib.load(os.path.join(model_directory_path, 'model.joblib'))

#     yield  # Mark as ready

#     # X_test can only be iterated once.
#     # Before getting the next dataset, you must predict the current one.
#     for dataset in X_test:
#         # Baseline approach: Compute t-test between values before and after boundary point
#         # The negative p-value is used as our score - smaller p-values (larger negative numbers)
#         # indicate more evidence against the null hypothesis that distributions are the same,
#         # suggesting a structural break
#         def t_test(u: pd.DataFrame):
#             return -scipy.stats.ttest_ind(
#                 u["value"][u["period"] == 0],  # Values before boundary point
#                 u["value"][u["period"] == 1],  # Values after boundary point
#             ).pvalue

#         prediction = t_test(dataset)
#         yield prediction  # Send the prediction for the current dataset

#         # Note: This baseline approach uses a t-test to compare the distributions
#         # before and after the boundary point. A smaller p-value (larger negative number)
#         # suggests stronger evidence that the distributions are different,
#         # indicating a potential structural break.

def infer(
    X_test: typing.Iterable[pd.DataFrame],
    model_directory_path: str):
    """
    Note: This baseline approach uses a t-test to compare the distributions
     before and after the boundary point. A smaller p-value (larger negative number)
     suggests stronger evidence that the distributions are different,
     indicating a potential structural break.
    """
    model = joblib.load(os.path.join(model_directory_path, 'model.joblib'))

    yield  # Mark as ready

    # X_test can only be iterated once.
    # Before getting the next dataset, you must predict the current one.
    for dataset in X_test:
        etl = ETLPipeline(X=dataset)

        generator = EnhancedFeatureGenerator(etl=etl)
        X_features = generator.generate_feature_dataframe()

        valid_index = X_features.loc[~X_features.isna().any(axis=1)].index
        X = X_features.loc[valid_index]
        prediction = model.predict(X)

        yield prediction  # Send the prediction for the current dataset

## Local testing

To make sure your `train()` and `infer()` function are working properly, you can call the `crunch.test()` function that will reproduce the cloud environment locally. <br />
Even if it is not perfect, it should give you a quick idea if your model is working properly.

In [15]:
crunch.test(
    # Uncomment to disable the train
    # force_first_train=False,

    # Uncomment to disable the determinism check
    # no_determinism_check=True,
)

19:25:14 no forbidden library found
19:25:14 
19:25:14 started
19:25:14 running local test
19:25:14 internet access isn't restricted, no check will be done
19:25:14 
19:25:16 starting unstructured loop...
19:25:16 executing - command=train


data/X_train.parquet: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/data-releases/146/X_train.parquet (204327238 bytes)
data/X_train.parquet: already exists, file length match
data/X_test.reduced.parquet: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/data-releases/146/X_test.reduced.parquet (2380918 bytes)
data/X_test.reduced.parquet: already exists, file length match
data/y_train.parquet: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/data-releases/146/y_train.parquet (61003 bytes)
data/y_train.parquet: already exists, file length match
data/y_test.reduced.parquet: download from https:crunchdao--competition--production.s3.eu-west-1.amazonaws.com/data-releases/146/y_test.reduced.parquet (2655 bytes)
data/y_test.reduced.parquet: already exists, file length match


  result = stats.anderson_ksamp(samples)
  result = stats.anderson_ksamp(samples)


Fitting xgboost:   0%|          | 0/1 [00:00<?, ?it/s]

Parameters: { "use_label_encoder" } are not used.

19:39:02 executing - command=infer
19:39:02 duration - time=00:13:47
19:39:02 memory - before="1.24 GB" after="1.31 GB" consumed="64.55 MB"


ValidationError: 1 validation error for ETLPipeline
X
  Input should be an instance of DataFrame [type=is_instance_of, input_value=<generator object Generat...inner at 0x7aa798117e60>, input_type=generator]
    For further information visit https://errors.pydantic.dev/2.11/v/is_instance_of

## Results

Once the local tester is done, you can preview the result stored in `data/prediction.parquet`.

In [None]:
prediction = pd.read_parquet("data/prediction.parquet")
prediction

### Local scoring

You can call the function that the system uses to estimate your score locally.

In [None]:
# Load the targets
target = pd.read_parquet("data/y_test.reduced.parquet")["structural_breakpoint"]

# Call the scoring function
sklearn.metrics.roc_auc_score(
    target,
    prediction,
)

# Submit your Notebook

To submit your work, you must:
1. Download your Notebook from Colab
2. Upload it to the platform
3. Create a run to validate it

### >> https://hub.crunchdao.com/competitions/structural-break/submit/notebook

![Download and Submit Notebook](https://raw.githubusercontent.com/crunchdao/competitions/refs/heads/master/documentation/animations/download-and-submit-notebook.gif)