# Setup

The first steps to get started are:
1. Get the setup command
2. Execute it in the cell below

### >> https://hub.crunchdao.com/competitions/structural-break/submit/notebook

![Reveal token](https://raw.githubusercontent.com/crunchdao/competitions/refs/heads/master/documentation/animations/reveal-token.gif)

In [None]:
# Install the Crunch CLI
%pip install --upgrade crunch-cli

# Setup your local environment
!crunch setup --notebook structural-break hello --token aaabbbcccddeeff00112233445566778899

Note: you may need to restart the kernel to use updated packages.
crunch-cli, version 7.5.0

---
Your token seems to have expired or is invalid.

Please follow this link to copy and paste your new setup command:
https://hub.crunchdao.com/competitions/structural-break/submit

If you think that is an error, please contact an administrator.


In [2]:
# Your model
## Setup
import os
import typing

# Import your dependencies
import joblib
import pandas as pd
import scipy
import sklearn.metrics
import pandas as pd
import numpy as np
import scipy.stats
from sklearn.metrics import roc_auc_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from scipy.spatial.distance import jensenshannon
import warnings

In [3]:
import crunch

# Load the Crunch Toolings
crunch = crunch.load_notebook()

loaded inline runner with module: <module '__main__'>

cli version: 7.5.0
available ram: 63.78 gb
available cpu: 32 core
----


## Understanding the Data

The dataset consists of univariate time series, each containing ~2,000-5,000 values with a designated boundary point. For each time series, you need to determine whether a structural break occurred at this boundary point.

The data was downloaded when you setup your local environment and is now available in the `data/` directory.

In [4]:
# Load the data simply
X_train, y_train, X_test = crunch.load_data()

data\X_train.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/X_train.parquet (204327238 bytes)
data\X_train.parquet: already exists, file length match
data\X_test.reduced.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/X_test.reduced.parquet (2380918 bytes)
data\X_test.reduced.parquet: already exists, file length match
data\y_train.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/y_train.parquet (61003 bytes)
data\y_train.parquet: already exists, file length match
data\y_test.reduced.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/y_test.reduced.parquet (2655 bytes)
data\y_test.reduced.parquet: already exists, file length match


### Understanding `X_train`

The training data is structured as a pandas DataFrame with a MultiIndex:

**Index Levels:**
- `id`: Identifies the unique time series
- `time`: The timestep within each time series

**Columns:**
- `value`: The actual time series value at each timestep
- `period`: A binary indicator where `0` represents the **period before** the boundary point, and `1` represents the **period after** the boundary point

In [5]:
X_train

Unnamed: 0_level_0,Unnamed: 1_level_0,value,period
id,time,Unnamed: 2_level_1,Unnamed: 3_level_1
0,0,-0.005564,0
0,1,0.003705,0
0,2,0.013164,0
0,3,0.007151,0
0,4,-0.009979,0
...,...,...,...
10000,2134,0.001137,1
10000,2135,0.003526,1
10000,2136,0.000687,1
10000,2137,0.001640,1


### Understanding `y_train`

This is a simple `pandas.Series` that tells if a dataset id has a structural breakpoint or not.

**Index:**
- `id`: the ID of the dataset

**Value:**
- `structural_breakpoint`: Boolean indicating whether a structural break occurred (`True`) or not (`False`)

In [6]:
y_train

id
0        False
1        False
2         True
3        False
4        False
         ...  
9996     False
9997     False
9998     False
9999     False
10000     True
Name: structural_breakpoint, Length: 10001, dtype: bool

### Understanding `X_test`

The test data is provided as a **`list` of `pandas.DataFrame`s** with the same format as [`X_train`](#understanding-X_test).


It is structured as a list to encourage processing records one by one, which will be mandatory in the `infer()` function.

In [7]:
print("Number of datasets:", len(X_test))

Number of datasets: 101


In [8]:
X_test[0]

Unnamed: 0_level_0,Unnamed: 1_level_0,value,period
id,time,Unnamed: 2_level_1,Unnamed: 3_level_1
10001,0,0.010753,0
10001,1,-0.031915,0
10001,2,-0.010989,0
10001,3,-0.011111,0
10001,4,0.011236,0
10001,...,...,...
10001,2774,-0.013937,1
10001,2775,-0.015649,1
10001,2776,-0.009744,1
10001,2777,0.025375,1


# model 

In [19]:
!pip install xgboost statsmodels numba pywavelets lightgbm



In [None]:
import numpy as np
import pandas as pd
from typing import Dict, List, Any, Optional, Tuple
from sklearn.ensemble import StackingClassifier
from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif
from sklearn.decomposition import PCA
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import VotingClassifier, RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from lightgbm import LGBMClassifier
from sklearn.feature_selection import VarianceThreshold, SelectKBest, mutual_info_classif, RFE
from sklearn.ensemble import StackingClassifier, RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
import xgboost as xgb

# If you want to use LightGBM (optional)
try:
    from lightgbm import LGBMClassifier
    LIGHTGBM_AVAILABLE = True
except ImportError:
    LIGHTGBM_AVAILABLE = False
import warnings
warnings.filterwarnings('ignore')
class FastDGPFeatureExtractor:
    """
    Optimized DGP feature extractor focusing on speed and essential features
    """
    
    def __init__(self):
        # Focus on fast, essential estimators only
        self.fast_estimators = {
            'gaussian': self._estimate_gaussian_fast,
            'ar1': self._estimate_ar1_fast, 
            'volatility': self._estimate_volatility_fast,
            'trend': self._estimate_trend_fast,
            'spectral': self._estimate_spectral_fast,
            'jumps': self._estimate_jumps_fast,
            'autocorr': self._estimate_autocorr_fast,
            'distribution': self._estimate_distribution_fast,
            # New DGP estimators
            'ma_process': self._estimate_ma_process_fast,
            'ar_process': self._estimate_ar_process_fast,
            'arma_process': self._estimate_arma_process_fast,
            'garch_process': self._estimate_garch_process_fast,
            'ornstein_uhlenbeck': self._estimate_ornstein_uhlenbeck_fast,
            'geometric_brownian_motion': self._estimate_geometric_brownian_motion_fast,
            'levy_stable': self._estimate_levy_stable_fast
        }
    
    def extract_features(self, X_train: pd.DataFrame, max_series: Optional[int] = None) -> pd.DataFrame:
        """
        Fast feature extraction optimized for speed
        
        Parameters:
        -----------
        X_train : pd.DataFrame
            MultiIndex DataFrame with levels ['id', 'time'] and columns ['value', 'period']
        max_series : int, optional
            Limit number of series to process (for testing)
            
        Returns:
        --------
        pd.DataFrame
            Features with columns for full, period0, period1, and differences
        """
        
        #print("Starting optimized DGP feature extraction...")
        
        # Get unique IDs
        unique_ids = X_train.index.get_level_values('id').unique()
        if max_series:
            unique_ids = unique_ids[:max_series]
        
        #print(f"Processing {len(unique_ids)} time series with {len(self.fast_estimators)} estimators")
        
        # Pre-allocate results list
        all_features = []
        
        # Batch processing for efficiency
        for i, ts_id in enumerate(unique_ids):
            if i % 50 == 0:  # Less frequent updates
                #print(f"Processed {i}/{len(unique_ids)} series")
            
                try:
                    # Extract data efficiently
                    ts_data = X_train.loc[ts_id]
                    
                    # Vectorized operations
                    values = ts_data['value'].values
                    periods = ts_data['period'].values
                    
                    # Split series efficiently
                    mask_p0 = periods == 0
                    mask_p1 = periods == 1
                    
                    full_series = values
                    period0_series = values[mask_p0] if mask_p0.any() else np.array([])
                    period1_series = values[mask_p1] if mask_p1.any() else np.array([])
                    
                    # Extract features
                    features = self._extract_features_fast(ts_id, full_series, period0_series, period1_series)
                    all_features.append(features)
                    
                except Exception as e:
                    print(f"Error processing {ts_id}: {str(e)}")
                    all_features.append({'id': ts_id, 'error': str(e)})
        
        # Convert to DataFrame efficiently
        features_df = pd.DataFrame(all_features)
        if 'id' in features_df.columns:
            features_df.set_index('id', inplace=True)
        
        #print(f"Feature extraction completed in optimized mode. Shape: {features_df.shape}")
        return features_df
    
    def _extract_features_fast(self, ts_id: str, full_series: np.ndarray, 
                              period0_series: np.ndarray, period1_series: np.ndarray) -> Dict[str, float]:
        """
        Fast feature extraction using vectorized operations
        """
        
        features = {'id': ts_id}
        
        # Minimum length check
        min_len = 5
        series_dict = {
            'full': full_series if len(full_series) >= min_len else np.array([]),
            'period0': period0_series if len(period0_series) >= min_len else np.array([]),
            'period1': period1_series if len(period1_series) >= min_len else np.array([])
        }
        
        # Storage for differences
        p0_params = {}
        p1_params = {}
        
        # Apply fast estimators
        for prefix, series in series_dict.items():
            if len(series) == 0:
                continue
                
            for est_name, est_func in self.fast_estimators.items():
                try:
                    params = est_func(series)
                    
                    # Flatten and store with prefix
                    for param_name, param_value in params.items():
                        if isinstance(param_value, (int, float, np.number)) and np.isfinite(param_value):
                            col_name = f"{prefix}_{est_name}_{param_name}"
                            features[col_name] = float(param_value)
                    
                    # Store for difference calculation
                    if prefix == 'period0':
                        p0_params[est_name] = params
                    elif prefix == 'period1':
                        p1_params[est_name] = params
                        
                except:
                    continue  # Skip failed estimations silently for speed
        
        # Compute differences efficiently
        if p0_params and p1_params:
            diff_features = self._compute_differences_fast(p0_params, p1_params)
            features.update(diff_features)
        
        return features
    
    def _compute_differences_fast(self, p0_params: Dict, p1_params: Dict) -> Dict[str, float]:
        """
        Fast difference computation
        """
        diff_features = {}
        
        for est_name in set(p0_params.keys()) & set(p1_params.keys()):
            p0 = p0_params[est_name]
            p1 = p1_params[est_name]
            
            for param_name in set(p0.keys()) & set(p1.keys()):
                try:
                    val0 = p0[param_name]
                    val1 = p1[param_name]
                    
                    if isinstance(val0, (int, float, np.number)) and isinstance(val1, (int, float, np.number)):
                        if np.isfinite(val0) and np.isfinite(val1):
                            # Absolute difference
                            diff_features[f"diff_{est_name}_{param_name}_abs"] = float(val1 - val0)
                            
                            # Relative difference (safe division)
                            if abs(val0) > 1e-8:
                                diff_features[f"diff_{est_name}_{param_name}_rel"] = float((val1 - val0) / val0)
                except:
                    continue
                    
        return diff_features
    
    # ==================== FAST ESTIMATORS ====================
    
    def _estimate_gaussian_fast(self, ts: np.ndarray) -> Dict[str, float]:
        """Fast Gaussian parameter estimation"""
        return {
            'mean': np.mean(ts),
            'std': np.std(ts, ddof=1),
            'skew': self._skewness_fast(ts),
            'kurt': self._kurtosis_fast(ts)
        }
    
    def _estimate_ar1_fast(self, ts: np.ndarray) -> Dict[str, float]:
        """Fast AR(1) estimation using simple correlation"""
        if len(ts) < 3:
            return {'phi': 0.0, 'sigma': np.std(ts)}
        
        # Simple AR(1) using correlation
        x_lag = ts[:-1]
        x_curr = ts[1:]
        
        # Avoid correlation computation if no variance
        if np.std(x_lag) < 1e-10 or np.std(x_curr) < 1e-10:
            phi = 0.0
        else:
            phi = np.corrcoef(x_lag, x_curr)[0, 1]
            if not np.isfinite(phi):
                phi = 0.0
        
        # Residual variance
        residuals = x_curr - phi * x_lag
        sigma = np.std(residuals, ddof=1) if len(residuals) > 1 else np.std(ts)
        
        return {'phi': phi, 'sigma': sigma}
    
    def _estimate_volatility_fast(self, ts: np.ndarray) -> Dict[str, float]:
        """Fast volatility estimation"""
        if len(ts) < 3:
            return {'vol_mean': np.std(ts), 'vol_std': 0.0}
        
        # Rolling volatility (squared returns)
        returns_sq = np.diff(ts) ** 2
        
        return {
            'vol_mean': np.mean(returns_sq),
            'vol_std': np.std(returns_sq, ddof=1),
            'vol_persistence': np.corrcoef(returns_sq[:-1], returns_sq[1:])[0, 1] if len(returns_sq) > 2 else 0.0
        }
    
    def _estimate_trend_fast(self, ts: np.ndarray) -> Dict[str, float]:
        """Fast trend estimation"""
        n = len(ts)
        if n < 3:
            return {'trend': 0.0, 'trend_pval': 1.0}
        
        # Simple linear trend using least squares
        x = np.arange(n)
        
        # Vectorized least squares
        x_mean = np.mean(x)
        y_mean = np.mean(ts)
        
        numerator = np.sum((x - x_mean) * (ts - y_mean))
        denominator = np.sum((x - x_mean) ** 2)
        
        if denominator < 1e-10:
            return {'trend': 0.0, 'trend_rsq': 0.0}
        
        slope = numerator / denominator
        
        # R-squared
        y_pred = slope * (x - x_mean) + y_mean
        ss_res = np.sum((ts - y_pred) ** 2)
        ss_tot = np.sum((ts - y_mean) ** 2)
        r_squared = 1 - (ss_res / ss_tot) if ss_tot > 1e-10 else 0.0
        
        return {'trend': slope, 'trend_rsq': r_squared}
    
    def _estimate_spectral_fast(self, ts: np.ndarray) -> Dict[str, float]:
        """Fast spectral analysis using FFT"""
        n = len(ts)
        if n < 8:
            return {'dom_freq': 0.0, 'spectral_entropy': 0.0}
        
        # Detrend
        ts_detrend = ts - np.linspace(ts[0], ts[-1], n)
        
        # FFT
        fft_vals = np.fft.fft(ts_detrend)
        power = np.abs(fft_vals[:n//2]) ** 2
        
        # Dominant frequency
        dom_idx = np.argmax(power[1:]) + 1  # Skip DC component
        dom_freq = dom_idx / n
        
        # Spectral entropy
        power_norm = power / np.sum(power)
        power_norm = power_norm[power_norm > 1e-10]  # Avoid log(0)
        spectral_entropy = -np.sum(power_norm * np.log(power_norm))
        
        return {'dom_freq': dom_freq, 'spectral_entropy': spectral_entropy}
    
    def _estimate_jumps_fast(self, ts: np.ndarray) -> Dict[str, float]:
        """Fast jump detection"""
        if len(ts) < 3:
            return {'jump_count': 0, 'jump_intensity': 0.0}
        
        # Simple jump detection using threshold
        returns = np.diff(ts)
        threshold = 3 * np.std(returns, ddof=1)
        
        jumps = np.abs(returns) > threshold
        jump_count = np.sum(jumps)
        jump_intensity = np.sum(np.abs(returns[jumps])) if jump_count > 0 else 0.0
        
        return {'jump_count': float(jump_count), 'jump_intensity': jump_intensity}
    
    def _estimate_autocorr_fast(self, ts: np.ndarray) -> Dict[str, float]:
        """Fast autocorrelation estimation"""
        n = len(ts)
        if n < 4:
            return {'acf_1': 0.0, 'acf_2': 0.0, 'acf_5': 0.0}
        
        # Center the series
        ts_centered = ts - np.mean(ts)
        
        # Compute autocorrelations for lags 1, 2, 5
        autocorrs = {}
        for lag in [1, 2, min(5, n//2)]:
            if lag < n:
                corr = np.corrcoef(ts_centered[:-lag], ts_centered[lag:])[0, 1]
                autocorrs[f'acf_{lag}'] = corr if np.isfinite(corr) else 0.0
            else:
                autocorrs[f'acf_{lag}'] = 0.0
        
        return autocorrs
    
    def _estimate_distribution_fast(self, ts: np.ndarray) -> Dict[str, float]:
        """Fast distributional properties"""
        return {
            'range': np.ptp(ts),  # max - min
            'iqr': np.percentile(ts, 75) - np.percentile(ts, 25),
            'q95_q5': np.percentile(ts, 95) - np.percentile(ts, 5),
            'median_abs_dev': np.median(np.abs(ts - np.median(ts)))
        }
    
    # ==================== NEW FAST ESTIMATORS ====================
    
    def _estimate_ma_process_fast(self, ts: np.ndarray) -> Dict[str, float]:
        """Fast MA process estimation using autocorrelation"""
        if len(ts) < 5:
            return {'ma_coef': 0.0, 'ma_resid_std': np.std(ts)}
        
        # Use first autocorrelation to estimate MA(1) coefficient
        acf_1 = np.corrcoef(ts[:-1], ts[1:])[0, 1]
        
        # Solve for theta in MA(1): ρ(1) = θ/(1+θ²)
        # This gives a quadratic equation: θ²ρ(1) - θ + ρ(1) = 0
        if abs(acf_1) < 0.5:
            # Use the invertible solution
            theta = (1 - np.sqrt(1 - 4 * acf_1**2)) / (2 * acf_1) if abs(acf_1) > 1e-10 else 0.0
        else:
            theta = 0.0  # Not invertible, set to 0
            
        return {'ma_coef': theta, 'ma_resid_std': np.std(ts)}
    
    def _estimate_ar_process_fast(self, ts: np.ndarray) -> Dict[str, float]:
        """Fast AR process estimation (beyond AR1) using Yule-Walker"""
        n = len(ts)
        if n < 6:
            return {'ar2_coef1': 0.0, 'ar2_coef2': 0.0, 'ar_resid_std': np.std(ts)}
        
        # Estimate AR(2) parameters using Yule-Walker equations
        acf_0 = 1.0
        acf_1 = np.corrcoef(ts[:-1], ts[1:])[0, 1]
        acf_2 = np.corrcoef(ts[:-2], ts[2:])[0, 1] if n > 5 else 0
        
        # Solve Yule-Walker equations for AR(2)
        denominator = 1 - acf_1**2
        if denominator > 1e-10:
            phi1 = acf_1 * (1 - acf_2) / denominator
            phi2 = (acf_2 - acf_1**2) / denominator
        else:
            phi1, phi2 = 0.0, 0.0
            
        return {'ar2_coef1': phi1, 'ar2_coef2': phi2, 'ar_resid_std': np.std(ts)}
    
    def _estimate_arma_process_fast(self, ts: np.ndarray) -> Dict[str, float]:
        """Fast ARMA(1,1) estimation using moments"""
        n = len(ts)
        if n < 6:
            return {'arma_phi': 0.0, 'arma_theta': 0.0, 'arma_resid_std': np.std(ts)}
        
        # Use first two autocorrelations to estimate ARMA(1,1)
        acf_1 = np.corrcoef(ts[:-1], ts[1:])[0, 1]
        acf_2 = np.corrcoef(ts[:-2], ts[2:])[0, 1] if n > 5 else 0
        
        # For ARMA(1,1): ρ₂ = φ * ρ₁
        if abs(acf_1) > 1e-10:
            phi = acf_2 / acf_1 if abs(acf_1) > abs(acf_2) else 0.0
            # Solve for theta using ρ₁ = (1+φθ)(φ+θ)/(1+2φθ+θ²)
            # We'll use a simple approximation
            theta = (acf_1 - phi) / (1 - phi * acf_1) if abs(phi) < 0.99 else 0.0
        else:
            phi, theta = 0.0, 0.0
            
        return {'arma_phi': phi, 'arma_theta': theta, 'arma_resid_std': np.std(ts)}
    
    def _estimate_garch_process_fast(self, ts: np.ndarray) -> Dict[str, float]:
        """Fast GARCH(1,1) estimation using moments"""
        if len(ts) < 10:
            return {'garch_alpha': 0.0, 'garch_beta': 0.0, 'garch_omega': np.var(ts)}
        
        returns = np.diff(ts)
        squared_returns = returns ** 2
        
        # Estimate parameters using moment matching
        mean_sq = np.mean(squared_returns)
        var_sq = np.var(squared_returns)
        
        # Simplified GARCH(1,1) estimation
        if var_sq > 1e-10:
            # Use the relationship: E[ε²] = ω/(1-α-β)
            # and Var[ε²] = ω²(1 + (α+β)² + α²(κ-1)) / [(1-α-β)²(1-(α+β)²-α²(κ-1))]
            # For simplicity, we use a heuristic approach
            alpha = min(0.1, 0.9 * (var_sq / (mean_sq**2 + var_sq)))
            beta = min(0.85, 0.9 - alpha)
            omega = mean_sq * (1 - alpha - beta)
        else:
            alpha, beta, omega = 0.0, 0.0, mean_sq
            
        return {'garch_alpha': alpha, 'garch_beta': beta, 'garch_omega': omega}
    
    def _estimate_ornstein_uhlenbeck_fast(self, ts: np.ndarray) -> Dict[str, float]:
        """Fast Ornstein-Uhlenbeck process estimation"""
        if len(ts) < 5:
            return {'ou_theta': 0.0, 'ou_mu': np.mean(ts), 'ou_sigma': np.std(ts)}
        
        # Estimate using linear regression between Δx and x
        x_t = ts[:-1]
        dx = np.diff(ts)
        
        # Linear regression: dx = θ(μ - x)dt + σdW
        # Which can be written as: dx = (θμ)dt - θ x dt + σdW
        x_mean = np.mean(x_t)
        dx_mean = np.mean(dx)
        
        numerator = np.sum((x_t - x_mean) * (dx - dx_mean))
        denominator = np.sum((x_t - x_mean) ** 2)
        
        if denominator > 1e-10:
            theta = -numerator / denominator
            mu = (dx_mean + theta * x_mean) / theta if abs(theta) > 1e-10 else x_mean
        else:
            theta, mu = 0.0, x_mean
            
        # Estimate sigma from residuals
        residuals = dx - (theta * (mu - x_t))
        sigma = np.std(residuals, ddof=1) if len(residuals) > 1 else np.std(ts)
        
        return {'ou_theta': theta, 'ou_mu': mu, 'ou_sigma': sigma}
    
    def _estimate_geometric_brownian_motion_fast(self, ts: np.ndarray) -> Dict[str, float]:
        """Fast Geometric Brownian Motion estimation"""
        if len(ts) < 3:
            return {'gbm_mu': 0.0, 'gbm_sigma': np.std(ts)}
        
        # Calculate log returns
        log_returns = np.diff(np.log(ts + 1e-10))  # Add small constant to avoid log(0)
        
        # Estimate parameters
        mu = np.mean(log_returns) + 0.5 * np.var(log_returns)
        sigma = np.std(log_returns, ddof=1)
        
        return {'gbm_mu': mu, 'gbm_sigma': sigma}
    
    def _estimate_levy_stable_fast(self, ts: np.ndarray) -> Dict[str, float]:
        """Fast Lévy stable distribution estimation using quantile method"""
        if len(ts) < 10:
            return {'levy_alpha': 2.0, 'levy_beta': 0.0, 'levy_scale': np.std(ts)}
        
        # Use quantile method to estimate parameters
        q05, q25, q50, q75, q95 = np.percentile(ts, [5, 25, 50, 75, 95])
        
        # Estimate alpha (characteristic exponent)
        if abs(q95 - q05) > 1e-10:
            alpha_est = (q95 - q05) / (q75 - q25 + 1e-10)
            alpha = np.log2(np.log(alpha_est)) if alpha_est > 0 else 2.0  # Simplified approach
            alpha = max(0.5, min(2.0, alpha))  # Constrain to valid range
        else:
            alpha = 2.0
            
        # Estimate beta (skewness parameter)
        if abs(q95 - q50) > 1e-10 and abs(q50 - q05) > 1e-10:
            beta_est = (q95 - q50) / (q50 - q05) - 1
            beta = max(-1.0, min(1.0, beta_est))  # Constrain to valid range
        else:
            beta = 0.0
            
        # Estimate scale (similar to standard deviation for alpha=2)
        scale = (q75 - q25) / 1.349  # For normal distribution, IQR/1.349 is std
        
        return {'levy_alpha': alpha, 'levy_beta': beta, 'levy_scale': scale}
    
    # ==================== FAST STATISTICAL FUNCTIONS ====================
    
    def _skewness_fast(self, x: np.ndarray) -> float:
        """Fast skewness calculation"""
        n = len(x)
        if n < 3:
            return 0.0
            
        mean = np.mean(x)
        std = np.std(x, ddof=1)
        
        if std < 1e-10:
            return 0.0
            
        skew = np.mean(((x - mean) / std) ** 3)
        return skew if np.isfinite(skew) else 0.0
    
    def _kurtosis_fast(self, x: np.ndarray) -> float:
        """Fast kurtosis calculation"""
        n = len(x)
        if n < 4:
            return 0.0
            
        mean = np.mean(x)
        std = np.std(x, ddof=1)
        
        if std < 1e-10:
            return 0.0
            
        kurt = np.mean(((x - mean) / std) ** 4) - 3  # Excess kurtosis
        return kurt if np.isfinite(kurt) else 0.0



    def _estimate_white_noise_t_fast(self, ts: np.ndarray) -> Dict[str, float]:
        """Fast estimation of t-distributed white noise parameters"""
        if len(ts) < 10:
            return {'df': 2.0, 'mean': np.mean(ts), 'scale': np.std(ts, ddof=1)}
        
        # Use method of moments to estimate degrees of freedom
        kurt = self._kurtosis_fast(ts)
        if kurt > 0:
            df = 6 / kurt + 4
        else:
            df = 2.0  # Minimum reasonable value
            
        return {'df': max(2.0, min(df, 30.0)), 'mean': np.mean(ts), 'scale': np.std(ts, ddof=1)}

    def _estimate_threshold_ar_fast(self, ts: np.ndarray) -> Dict[str, float]:
        """Fast estimation of threshold autoregressive model parameters"""
        if len(ts) < 20:
            return {'threshold': 0.0, 'ar_low': 0.0, 'ar_high': 0.0}
        
        # Simple threshold estimation using median
        threshold = np.median(ts)
        
        # Split series based on threshold
        low_regime = ts[:-1][ts[:-1] <= threshold]
        high_regime = ts[:-1][ts[:-1] > threshold]
        
        # Estimate AR(1) for each regime
        ar_low = self._estimate_ar1_fast(np.concatenate([low_regime, ts[1:][ts[:-1] <= threshold]]))['phi']
        ar_high = self._estimate_ar1_fast(np.concatenate([high_regime, ts[1:][ts[:-1] > threshold]]))['phi']
        
        return {'threshold': threshold, 'ar_low': ar_low, 'ar_high': ar_high}

    def _estimate_sinusoidal_with_noise_fast(self, ts: np.ndarray) -> Dict[str, float]:
        """Fast estimation of sinusoidal process with noise"""
        if len(ts) < 10:
            return {'amplitude': 0.0, 'frequency': 0.0, 'phase': 0.0, 'noise_std': np.std(ts)}
        
        # Use FFT to estimate dominant frequency
        n = len(ts)
        fft_vals = np.fft.fft(ts - np.mean(ts))
        freqs = np.fft.fftfreq(n)
        
        # Find dominant frequency (excluding DC component)
        idx = np.argmax(np.abs(fft_vals[1:n//2])) + 1
        dominant_freq = abs(freqs[idx])
        
        # Estimate amplitude
        amplitude = 2 * np.abs(fft_vals[idx]) / n
        
        # Estimate phase
        phase = np.angle(fft_vals[idx])
        
        # Estimate noise standard deviation
        # Create sinusoidal approximation
        t = np.arange(n)
        sinusoid = amplitude * np.cos(2 * np.pi * dominant_freq * t + phase)
        residuals = ts - sinusoid
        noise_std = np.std(residuals, ddof=1)
        
        return {
            'amplitude': amplitude,
            'frequency': dominant_freq,
            'phase': phase,
            'noise_std': noise_std
        }

    def _estimate_logistic_growth_fast(self, ts: np.ndarray) -> Dict[str, float]:
        """Fast estimation of logistic growth parameters"""
        if len(ts) < 10:
            return {'carrying_capacity': np.max(ts), 'growth_rate': 0.0, 'midpoint': 0.0}
        
        # Simple estimation using first and last values
        initial = ts[0]
        final = ts[-1]
        
        # Estimate carrying capacity as maximum value
        carrying_capacity = np.max(ts)
        
        # Estimate growth rate using simplified approach
        if initial > 0 and final > initial:
            growth_rate = np.log(final / initial) / len(ts)
        else:
            growth_rate = 0.0
            
        # Estimate midpoint as time when growth is fastest
        diffs = np.diff(ts)
        midpoint_idx = np.argmax(diffs)
        midpoint = midpoint_idx / len(ts) if len(ts) > 0 else 0.0
        
        return {
            'carrying_capacity': carrying_capacity,
            'growth_rate': growth_rate,
            'midpoint': midpoint
        }

In [30]:
import pandas as pd
import numpy as np
import os
import joblib
import scipy.stats
import xgboost as xgb
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import roc_auc_score, accuracy_score, f1_score
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.ensemble import RandomForestClassifier
from scipy.spatial.distance import jensenshannon
from statsmodels.tsa.stattools import adfuller, kpss, coint
from statsmodels.tsa.ar_model import AutoReg
from statsmodels.stats.diagnostic import het_breuschpagan
from statsmodels.regression.linear_model import OLS
from tqdm import tqdm
import warnings
from typing import Iterable, Iterator
from numba import jit, prange
import multiprocessing as mp
from functools import partial
import zlib
import gzip

warnings.filterwarnings("ignore")

# Numba-optimized functions for speed
@jit(nopython=True)
def fast_autocorr(x, max_lag=5):
    """Fast autocorrelation calculation using numba"""
    n = len(x)
    if n < max_lag + 1:
        return np.zeros(max_lag)

    x = x - np.mean(x)
    autocorrs = np.zeros(max_lag)

    for lag in range(1, max_lag + 1):
        if lag < n:
            c0 = np.sum(x**2) / n
            c_lag = np.sum(x[:-lag] * x[lag:]) / n
            if c0 > 0:
                autocorrs[lag-1] = c_lag / c0

    return autocorrs

@jit(nopython=True)
def fast_rolling_std(x, window=10):
    """Fast rolling standard deviation"""
    n = len(x)
    if n < window:
        return np.std(x)

    stds = np.zeros(n - window + 1)
    for i in range(n - window + 1):
        stds[i] = np.std(x[i:i+window])

    return np.mean(stds)

def extract_dtw_features(values_before, values_after):
    """Extract Dynamic Time Warping distance features"""
    features = {}

    try:
        if len(values_before) > 0 and len(values_after) > 0:
            # Normalize sequences for DTW
            before_norm = (values_before - values_before.mean()) / (values_before.std() + 1e-10)
            after_norm = (values_after - values_after.mean()) / (values_after.std() + 1e-10)

            # DTW distance
            dtw_distance = dtw.distance(before_norm.values, after_norm.values)

            # Normalize by sequence length
            avg_length = (len(values_before) + len(values_after)) / 2
            features["dtw_distance"] = dtw_distance / avg_length

            # DTW distance with different window sizes
            for window in [0.1, 0.2, 0.5]:
                try:
                    window_size = int(window * avg_length)
                    if window_size > 0:
                        dtw_windowed = dtw.distance(before_norm.values, after_norm.values,
                                                  window=window_size)
                        features[f"dtw_window_{int(window*100)}"] = dtw_windowed / avg_length
                except:
                    features[f"dtw_window_{int(window*100)}"] = 0
        else:
            features["dtw_distance"] = 0
            for window in [10, 20, 50]:
                features[f"dtw_window_{window}"] = 0

    except Exception as e:
        print(f"DTW calculation failed: {e}")
        features["dtw_distance"] = 0
        for window in [10, 20, 50]:
            features[f"dtw_window_{window}"] = 0

    return features

def extract_compression_features(values_before, values_after):
    """Extract Compression-Based Dissimilarity (CBD) features"""
    features = {}

    try:
        # Convert to bytes for compression
        before_bytes = values_before.values.tobytes()
        after_bytes = values_after.values.tobytes()
        combined_bytes = np.concatenate([values_before.values, values_after.values]).tobytes()

        # Compression lengths using different algorithms
        for compressor, name in [(zlib.compress, 'zlib'), (gzip.compress, 'gzip')]:
            try:
                c_before = len(compressor(before_bytes))
                c_after = len(compressor(after_bytes))
                c_combined = len(compressor(combined_bytes))

                # Normalized Compression Distance (NCD)
                ncd = (c_combined - min(c_before, c_after)) / max(c_before, c_after)
                features[f"ncd_{name}"] = ncd

                # Compression ratios
                features[f"compression_ratio_before_{name}"] = c_before / len(before_bytes)
                features[f"compression_ratio_after_{name}"] = c_after / len(after_bytes)

            except Exception as e:
                features[f"ncd_{name}"] = 0
                features[f"compression_ratio_before_{name}"] = 1
                features[f"compression_ratio_after_{name}"] = 1

    except Exception as e:
        print(f"Compression features failed: {e}")
        for name in ['zlib', 'gzip']:
            features[f"ncd_{name}"] = 0
            features[f"compression_ratio_before_{name}"] = 1
            features[f"compression_ratio_after_{name}"] = 1

    return features

def extract_cointegration_features(values_before, values_after):
    """Extract cointegration features"""
    features = {}

    try:
        if len(values_before) > 10 and len(values_after) > 10:
            # Test for cointegration between the two periods
            # Pad shorter series to match lengths for cointegration test
            min_len = min(len(values_before), len(values_after))
            before_trimmed = values_before.iloc[:min_len]
            after_trimmed = values_after.iloc[:min_len]

            if min_len > 12:  # Need sufficient data for cointegration test
                # Engle-Granger cointegration test
                coint_stat, coint_pvalue, _ = coint(before_trimmed, after_trimmed)
                features["cointegration_stat"] = coint_stat
                features["cointegration_pvalue"] = coint_pvalue
                features["cointegration_evidence"] = 1 if coint_pvalue < 0.05 else 0
            else:
                features["cointegration_stat"] = 0
                features["cointegration_pvalue"] = 1
                features["cointegration_evidence"] = 0
        else:
            features["cointegration_stat"] = 0
            features["cointegration_pvalue"] = 1
            features["cointegration_evidence"] = 0

    except Exception as e:
        print(f"Cointegration test failed: {e}")
        features["cointegration_stat"] = 0
        features["cointegration_pvalue"] = 1
        features["cointegration_evidence"] = 0

    return features

def extract_chow_test_features(df):
    """Extract Chow test features for structural break detection"""
    features = {}

    try:
        values = df["value"].values
        time_idx = np.arange(len(values))

        if len(values) > 20:  # Need sufficient data
            # Find potential breakpoint (midpoint for now, but could be optimized)
            breakpoint = len(values) // 2

            # Fit models for full sample and subsamples
            # Full sample regression
            X_full = np.column_stack([np.ones(len(values)), time_idx])
            try:
                beta_full = np.linalg.lstsq(X_full, values, rcond=None)[0]
                residuals_full = values - X_full @ beta_full
                rss_full = np.sum(residuals_full**2)

                # First subsample
                X1 = np.column_stack([np.ones(breakpoint), time_idx[:breakpoint]])
                beta1 = np.linalg.lstsq(X1, values[:breakpoint], rcond=None)[0]
                residuals1 = values[:breakpoint] - X1 @ beta1
                rss1 = np.sum(residuals1**2)

                # Second subsample
                X2 = np.column_stack([np.ones(len(values) - breakpoint),
                                    time_idx[breakpoint:]])
                beta2 = np.linalg.lstsq(X2, values[breakpoint:], rcond=None)[0]
                residuals2 = values[breakpoint:] - X2 @ beta2
                rss2 = np.sum(residuals2**2)

                # Chow test statistic
                k = 2  # number of parameters
                n = len(values)
                chow_stat = ((rss_full - (rss1 + rss2)) / k) / ((rss1 + rss2) / (n - 2*k))

                features["chow_statistic"] = chow_stat
                features["chow_pvalue"] = 1 - scipy.stats.f.cdf(chow_stat, k, n - 2*k)

                # Parameter differences
                features["slope_diff"] = abs(beta1[1] - beta2[1]) if len(beta1) > 1 and len(beta2) > 1 else 0
                features["intercept_diff"] = abs(beta1[0] - beta2[0])

            except np.linalg.LinAlgError:
                features["chow_statistic"] = 0
                features["chow_pvalue"] = 1
                features["slope_diff"] = 0
                features["intercept_diff"] = 0
        else:
            features["chow_statistic"] = 0
            features["chow_pvalue"] = 1
            features["slope_diff"] = 0
            features["intercept_diff"] = 0

    except Exception as e:
        print(f"Chow test failed: {e}")
        features["chow_statistic"] = 0
        features["chow_pvalue"] = 1
        features["slope_diff"] = 0
        features["intercept_diff"] = 0

    return features

def extract_likelihood_ratio_features(df):
    """Extract likelihood ratio test features"""
    features = {}

    try:
        values_before = df["value"][df["period"] == 0]
        values_after = df["value"][df["period"] == 1]

        if len(values_before) > 5 and len(values_after) > 5:
            # Likelihood ratio for normal distributions
            # H0: same parameters, H1: different parameters

            # Combined sample
            all_values = np.concatenate([values_before, values_after])
            mu_combined = np.mean(all_values)
            sigma_combined = np.std(all_values, ddof=1)

            # Separate samples
            mu_before = np.mean(values_before)
            sigma_before = np.std(values_before, ddof=1)
            mu_after = np.mean(values_after)
            sigma_after = np.std(values_after, ddof=1)

            # Log-likelihoods
            n_before = len(values_before)
            n_after = len(values_after)
            n_total = n_before + n_after

            # Combined model likelihood
            if sigma_combined > 0:
                ll_combined = -n_total/2 * np.log(2*np.pi) - n_total * np.log(sigma_combined) - \
                             np.sum((all_values - mu_combined)**2) / (2 * sigma_combined**2)
            else:
                ll_combined = 0

            # Separate models likelihood
            ll_separate = 0
            if sigma_before > 0:
                ll_separate += -n_before/2 * np.log(2*np.pi) - n_before * np.log(sigma_before) - \
                              np.sum((values_before - mu_before)**2) / (2 * sigma_before**2)
            if sigma_after > 0:
                ll_separate += -n_after/2 * np.log(2*np.pi) - n_after * np.log(sigma_after) - \
                              np.sum((values_after - mu_after)**2) / (2 * sigma_after**2)

            # Likelihood ratio statistic
            lr_stat = -2 * (ll_combined - ll_separate)
            features["lr_statistic"] = lr_stat
            features["lr_pvalue"] = 1 - scipy.stats.chi2.cdf(lr_stat, df=2)  # 2 df for mean and variance

        else:
            features["lr_statistic"] = 0
            features["lr_pvalue"] = 1

    except Exception as e:
        print(f"Likelihood ratio test failed: {e}")
        features["lr_statistic"] = 0
        features["lr_pvalue"] = 1

    return features
def extract_difference_features(before_stats, after_stats):
    """
    Calculate differences and ratios between before and after statistics
    """
    diff_features = {}

    # Calculate differences
    for key in before_stats:
        # Absolute difference
        diff_features[f"{key}_diff"] = after_stats[key] - before_stats[key]

        # Percentage change (avoid division by zero)
        if abs(before_stats[key]) > 1e-10:
            diff_features[f"{key}_pct_change"] = (after_stats[key] - before_stats[key]) / abs(before_stats[key])
        else:
            diff_features[f"{key}_pct_change"] = 0

        # Absolute ratio (use log for better scaling)
        if abs(before_stats[key]) > 1e-10 and abs(after_stats[key]) > 1e-10:
            ratio = after_stats[key] / before_stats[key]
            if ratio > 0:  # Ensure log is defined
                diff_features[f"{key}_log_ratio"] = np.log(ratio)
            else:
                diff_features[f"{key}_log_ratio"] = 0
        else:
            diff_features[f"{key}_log_ratio"] = 0

    return diff_features
def extract_enhanced_statistical_features(u: pd.DataFrame):
    """Enhanced version of statistical test features with optimizations"""
    features = {}

    values_before = u["value"][u["period"] == 0]
    values_after = u["value"][u["period"] == 1]

    if values_before.empty or values_after.empty:
        return {f: 0.0 for f in [
            "t_test", "ks_test", "mann_whitney", "anderson_darling",
            "wasserstein", "energy_distance", "levene_test", "chi_square",
            "anova", "jensen_shannon"
        ]}

    # Vectorized statistical tests
    before_vals = values_before.values
    after_vals = values_after.values

    # T-test (optimized)
    try:
        t_stat, t_pvalue = scipy.stats.ttest_ind(before_vals, after_vals, equal_var=False)
        features["t_test"] = -np.log10(max(t_pvalue, 1e-10))
    except:
        features["t_test"] = 0.0

    # Mann-Whitney U test (more robust than t-test)
    try:
        mw_stat, mw_pvalue = scipy.stats.mannwhitneyu(before_vals, after_vals, alternative='two-sided')
        features["mann_whitney"] = -np.log10(max(mw_pvalue, 1e-10))
    except:
        features["mann_whitney"] = 0.0

    # KS-test
    try:
        ks_stat, ks_pvalue = scipy.stats.ks_2samp(before_vals, after_vals)
        features["ks_test"] = -np.log10(max(ks_pvalue, 1e-10))
    except:
        features["ks_test"] = 0.0

    # Wasserstein distance (fast implementation)
    try:
        wasserstein = scipy.stats.wasserstein_distance(before_vals, after_vals)
        data_range = max(u["value"].max() - u["value"].min(), 1e-10)
        features["wasserstein"] = wasserstein / data_range
    except:
        features["wasserstein"] = 0.0

    # Energy distance
    try:
        energy_dist = scipy.stats.energy_distance(before_vals, after_vals)
        data_range = max(u["value"].max() - u["value"].min(), 1e-10)
        features["energy_distance"] = energy_dist / data_range
    except:
        features["energy_distance"] = 0.0

    # Levene's test
    try:
        levene_stat, levene_pvalue = scipy.stats.levene(before_vals, after_vals)
        features["levene_test"] = -np.log10(max(levene_pvalue, 1e-10))
    except:
        features["levene_test"] = 0.0

    # ANOVA
    try:
        f_stat, anova_pvalue = scipy.stats.f_oneway(before_vals, after_vals)
        features["anova"] = -np.log10(max(anova_pvalue, 1e-10))
    except:
        features["anova"] = 0.0

    # Additional robust tests
    # Mood's median test
    try:
        mood_stat, mood_pvalue = scipy.stats.median_test(before_vals, after_vals)
        features["mood_median"] = -np.log10(max(mood_pvalue, 1e-10))
    except:
        features["mood_median"] = 0.0

    # Ansari-Bradley test for scale differences
    try:
        ab_stat, ab_pvalue = scipy.stats.ansari(before_vals, after_vals)
        features["ansari_bradley"] = -np.log10(max(ab_pvalue, 1e-10))
    except:
        features["ansari_bradley"] = 0.0

    return features

def extract_optimized_descriptive_statistics(values):
    """Optimized descriptive statistics extraction"""
    if len(values) == 0:
        return {f: 0 for f in [
            "mean", "median", "std", "min", "max", "iqr", "skew",
            "kurtosis", "range", "entropy", "cv", "mad"
        ]}

    vals = values.values if hasattr(values, 'values') else values

    stats = {}
    stats["mean"] = np.mean(vals)
    stats["median"] = np.median(vals)
    stats["std"] = np.std(vals, ddof=1) if len(vals) > 1 else 0
    stats["min"] = np.min(vals)
    stats["max"] = np.max(vals)

    # Quantiles (vectorized)
    q25, q75 = np.percentile(vals, [25, 75])
    stats["iqr"] = q75 - q25
    stats["range"] = stats["max"] - stats["min"]

    # Higher order moments
    if len(vals) > 2:
        stats["skew"] = scipy.stats.skew(vals)
        stats["kurtosis"] = scipy.stats.kurtosis(vals)
    else:
        stats["skew"] = 0
        stats["kurtosis"] = 0

    # Additional robust statistics
    stats["cv"] = stats["std"] / max(abs(stats["mean"]), 1e-10)  # Coefficient of variation
    stats["mad"] = np.median(np.abs(vals - stats["median"]))  # Median absolute deviation

    return stats

def extract_optimized_time_series_features(values):
    """Optimized time series features using numba acceleration"""
    if len(values) < 10:
        return {"autocorr_lag1": 0, "autocorr_lag2": 0, "autocorr_lag3": 0,
                "trend_slope": 0, "volatility": 0, "hurst_exponent": 0}

    vals = values.values if hasattr(values, 'values') else np.array(values)
    features = {}

    # Fast autocorrelations
    autocorrs = fast_autocorr(vals, max_lag=5)
    for i, lag in enumerate([1, 2, 3, 4, 5]):
        if i < len(autocorrs):
            features[f"autocorr_lag{lag}"] = autocorrs[i]
        else:
            features[f"autocorr_lag{lag}"] = 0

    # Linear trend (optimized)
    x = np.arange(len(vals), dtype=np.float64)
    try:
        slope = np.polyfit(x, vals, 1)[0]
        features["trend_slope"] = slope
    except:
        features["trend_slope"] = 0

    # Volatility measures
    if len(vals) > 1:
        returns = np.diff(vals) / (vals[:-1] + 1e-10)
        features["volatility"] = np.std(returns)
        features["rolling_volatility"] = fast_rolling_std(returns, window=min(10, len(returns)//2))
    else:
        features["volatility"] = 0
        features["rolling_volatility"] = 0

    # Hurst exponent (simplified version)
    try:
        if len(vals) > 20:
            lags = np.arange(2, min(20, len(vals)//4))
            tau = [np.sqrt(np.std(np.subtract(vals[lag:], vals[:-lag]))) for lag in lags]
            poly_coef = np.polyfit(np.log(lags), np.log(tau), 1)
            features["hurst_exponent"] = poly_coef[0]
        else:
            features["hurst_exponent"] = 0.5
    except:
        features["hurst_exponent"] = 0.5

    return features
import numpy as np
import scipy.signal as signal
from scipy.fft import fft, fftfreq, ifft
from scipy.spatial.distance import euclidean
from scipy.stats import wasserstein_distance
import pywt


def extract_fourier_spectrum_features(values_before, values_after):
    """Extract Fourier Spectrum Distance features"""
    features = {}
    
    try:
        # Ensure equal length for comparison by padding with zeros
        max_len = max(len(values_before), len(values_after))
        before_padded = np.pad(values_before.values, (0, max_len - len(values_before)), mode='constant')
        after_padded = np.pad(values_after.values, (0, max_len - len(values_after)), mode='constant')
        
        # Compute FFT for both segments
        fft_before = fft(before_padded)
        fft_after = fft(after_padded)
        
        # Power spectra (magnitude squared)
        power_before = np.abs(fft_before) ** 2
        power_after = np.abs(fft_after) ** 2
        
        # Normalize power spectra
        power_before_norm = power_before / np.sum(power_before)
        power_after_norm = power_after / np.sum(power_after)
        
        # L2 norm distance between power spectra
        features["power_spectrum_l2_distance"] = np.linalg.norm(power_before_norm - power_after_norm)
        
        # Earth Mover's Distance (Wasserstein distance)
        try:
            features["power_spectrum_emd"] = wasserstein_distance(
                range(len(power_before_norm)), range(len(power_after_norm)),
                power_before_norm, power_after_norm
            )
        except:
            features["power_spectrum_emd"] = 0
            
        # Spectral centroid difference
        freqs = np.arange(len(power_before_norm))
        centroid_before = np.sum(freqs * power_before_norm) / np.sum(power_before_norm)
        centroid_after = np.sum(freqs * power_after_norm) / np.sum(power_after_norm)
        features["spectral_centroid_diff"] = abs(centroid_after - centroid_before)
        
    except Exception as e:
        print(f"Fourier spectrum features failed: {e}")
        features["power_spectrum_l2_distance"] = 0
        features["power_spectrum_emd"] = 0
        features["spectral_centroid_diff"] = 0
    
    return features


def extract_dominant_frequency_features(values_before, values_after):
    """Extract Dominant Frequency Shift features"""
    features = {}
    
    try:
        # Sampling frequency (assumed to be 1 for simplicity)
        fs = 1.0
        
        # Compute FFT for both segments
        fft_before = fft(values_before.values)
        fft_after = fft(values_after.values)
        
        # Get frequency arrays
        freqs_before = fftfreq(len(values_before), 1/fs)
        freqs_after = fftfreq(len(values_after), 1/fs)
        
        # Find dominant frequencies (max amplitude)
        dominant_idx_before = np.argmax(np.abs(fft_before[:len(fft_before)//2]))
        dominant_idx_after = np.argmax(np.abs(fft_after[:len(fft_after)//2]))
        
        dominant_freq_before = abs(freqs_before[dominant_idx_before])
        dominant_freq_after = abs(freqs_after[dominant_idx_after])
        
        # Dominant frequency shift
        features["dominant_frequency_shift"] = abs(dominant_freq_after - dominant_freq_before)
        features["dominant_frequency_ratio"] = dominant_freq_after / max(dominant_freq_before, 1e-10)
        
        # Amplitude of dominant frequency
        features["dominant_amplitude_before"] = np.abs(fft_before[dominant_idx_before])
        features["dominant_amplitude_after"] = np.abs(fft_after[dominant_idx_after])
        features["dominant_amplitude_ratio"] = features["dominant_amplitude_after"] / max(features["dominant_amplitude_before"], 1e-10)
        
    except Exception as e:
        print(f"Dominant frequency features failed: {e}")
        features["dominant_frequency_shift"] = 0
        features["dominant_frequency_ratio"] = 1
        features["dominant_amplitude_before"] = 0
        features["dominant_amplitude_after"] = 0
        features["dominant_amplitude_ratio"] = 1
    
    return features


def extract_band_energy_features(values_before, values_after):
    """Extract Frequency Band Energy features"""
    features = {}
    
    try:
        # Define frequency bands (normalized frequencies)
        bands = {
            'low': (0, 0.1),
            'mid': (0.1, 0.3),
            'high': (0.3, 0.5)
        }
        
        for segment_name, values in [('before', values_before), ('after', values_after)]:
            # Compute power spectral density
            freqs, psd = signal.welch(values.values, nperseg=min(len(values), 256))
            
            # Calculate energy in each band
            for band_name, (low_freq, high_freq) in bands.items():
                mask = (freqs >= low_freq) & (freqs <= high_freq)
                band_energy = np.sum(psd[mask])
                features[f"{segment_name}_energy_{band_name}"] = band_energy
        
        # Calculate ratios and differences
        total_energy_before = sum(features[f"before_energy_{band}"] for band in bands.keys())
        total_energy_after = sum(features[f"after_energy_{band}"] for band in bands.keys())
        
        for band_name in bands.keys():
            # Energy ratios within each period
            features[f"before_{band_name}_energy_ratio"] = features[f"before_energy_{band_name}"] / max(total_energy_before, 1e-10)
            features[f"after_{band_name}_energy_ratio"] = features[f"after_energy_{band_name}"] / max(total_energy_after, 1e-10)
            
            # Cross-period comparisons
            features[f"{band_name}_energy_change"] = features[f"after_energy_{band_name}"] - features[f"before_energy_{band_name}"]
            features[f"{band_name}_energy_ratio"] = features[f"after_energy_{band_name}"] / max(features[f"before_energy_{band_name}"], 1e-10)
        
        # Low/High frequency ratio change
        features["low_high_ratio_before"] = features["before_energy_low"] / max(features["before_energy_high"], 1e-10)
        features["low_high_ratio_after"] = features["after_energy_low"] / max(features["after_energy_high"], 1e-10)
        features["low_high_ratio_change"] = features["low_high_ratio_after"] - features["low_high_ratio_before"]
        
    except Exception as e:
        print(f"Band energy features failed: {e}")
        for band in ['low', 'mid', 'high']:
            for period in ['before', 'after']:
                features[f"{period}_energy_{band}"] = 0
                features[f"{period}_{band}_energy_ratio"] = 0
            features[f"{band}_energy_change"] = 0
            features[f"{band}_energy_ratio"] = 1
        features["low_high_ratio_before"] = 1
        features["low_high_ratio_after"] = 1
        features["low_high_ratio_change"] = 0
    
    return features


def extract_wavelet_energy_features(values_before, values_after):
    """Extract Wavelet Energy Shift features"""
    features = {}
    
    try:
        wavelet = 'db4'  # Daubechies wavelet
        max_level = 5
        
        for segment_name, values in [('before', values_before), ('after', values_after)]:
            # Perform wavelet decomposition
            coeffs = pywt.wavedec(values.values, wavelet, level=max_level)
            
            # Calculate energy at each level
            for i, coeff in enumerate(coeffs):
                energy = np.sum(coeff ** 2)
                if i == 0:
                    features[f"{segment_name}_wavelet_approx_energy"] = energy
                else:
                    features[f"{segment_name}_wavelet_detail_{i}_energy"] = energy
        
        # Calculate energy differences and ratios
        features["wavelet_approx_energy_diff"] = features["after_wavelet_approx_energy"] - features["before_wavelet_approx_energy"]
        features["wavelet_approx_energy_ratio"] = features["after_wavelet_approx_energy"] / max(features["before_wavelet_approx_energy"], 1e-10)
        
        for i in range(1, max_level + 1):
            before_key = f"before_wavelet_detail_{i}_energy"
            after_key = f"after_wavelet_detail_{i}_energy"
            
            if before_key in features and after_key in features:
                features[f"wavelet_detail_{i}_energy_diff"] = features[after_key] - features[before_key]
                features[f"wavelet_detail_{i}_energy_ratio"] = features[after_key] / max(features[before_key], 1e-10)
        
        # Total detail energy comparison
        total_detail_before = sum(features.get(f"before_wavelet_detail_{i}_energy", 0) for i in range(1, max_level + 1))
        total_detail_after = sum(features.get(f"after_wavelet_detail_{i}_energy", 0) for i in range(1, max_level + 1))
        
        features["total_detail_energy_diff"] = total_detail_after - total_detail_before
        features["total_detail_energy_ratio"] = total_detail_after / max(total_detail_before, 1e-10)
        
    except Exception as e:
        print(f"Wavelet energy features failed: {e}")
        features["wavelet_approx_energy_diff"] = 0
        features["wavelet_approx_energy_ratio"] = 1
        features["total_detail_energy_diff"] = 0
        features["total_detail_energy_ratio"] = 1
        for i in range(1, 6):
            features[f"before_wavelet_detail_{i}_energy"] = 0
            features[f"after_wavelet_detail_{i}_energy"] = 0
            features[f"wavelet_detail_{i}_energy_diff"] = 0
            features[f"wavelet_detail_{i}_energy_ratio"] = 1
    
    return features


def extract_spectrogram_difference_features(values_before, values_after):
    """Extract Time-Frequency Spectrogram features"""
    features = {}
    
    try:
        # Parameters for STFT
        nperseg = min(32, len(values_before) // 4, len(values_after) // 4)
        nperseg = max(nperseg, 8)  # Minimum window size
        
        # Compute spectrograms
        f_before, t_before, Sxx_before = signal.spectrogram(values_before.values, nperseg=nperseg)
        f_after, t_after, Sxx_after = signal.spectrogram(values_after.values, nperseg=nperseg)
        
        # Normalize spectrograms
        Sxx_before_norm = Sxx_before / (np.sum(Sxx_before) + 1e-10)
        Sxx_after_norm = Sxx_after / (np.sum(Sxx_after) + 1e-10)
        
        # Resize to same dimensions for comparison
        target_shape = (min(Sxx_before.shape[0], Sxx_after.shape[0]), 
                       min(Sxx_before.shape[1], Sxx_after.shape[1]))
        
        if target_shape[0] > 0 and target_shape[1] > 0:
            Sxx_before_resized = Sxx_before_norm[:target_shape[0], :target_shape[1]]
            Sxx_after_resized = Sxx_after_norm[:target_shape[0], :target_shape[1]]
            
            # Mean absolute difference
            features["spectrogram_mean_diff"] = np.mean(np.abs(Sxx_after_resized - Sxx_before_resized))
            
            # Frobenius norm difference
            features["spectrogram_frobenius_diff"] = np.linalg.norm(Sxx_after_resized - Sxx_before_resized, 'fro')
            
            # Energy distribution differences
            features["spectrogram_energy_before"] = np.sum(Sxx_before_resized)
            features["spectrogram_energy_after"] = np.sum(Sxx_after_resized)
            features["spectrogram_energy_ratio"] = features["spectrogram_energy_after"] / max(features["spectrogram_energy_before"], 1e-10)
        else:
            features["spectrogram_mean_diff"] = 0
            features["spectrogram_frobenius_diff"] = 0
            features["spectrogram_energy_before"] = 0
            features["spectrogram_energy_after"] = 0
            features["spectrogram_energy_ratio"] = 1
        
    except Exception as e:
        print(f"Spectrogram features failed: {e}")
        features["spectrogram_mean_diff"] = 0
        features["spectrogram_frobenius_diff"] = 0
        features["spectrogram_energy_before"] = 0
        features["spectrogram_energy_after"] = 0
        features["spectrogram_energy_ratio"] = 1
    
    return features


def extract_phase_spectrum_features(values_before, values_after):
    """Extract Phase Spectrum Discrepancy features"""
    features = {}
    
    try:
        # Compute FFT for both segments
        fft_before = fft(values_before.values)
        fft_after = fft(values_after.values)
        
        # Extract phase angles
        phase_before = np.angle(fft_before)
        phase_after = np.angle(fft_after)
        
        # Handle different lengths by taking minimum
        min_len = min(len(phase_before), len(phase_after))
        phase_before = phase_before[:min_len]
        phase_after = phase_after[:min_len]
        
        # Phase differences
        phase_diff = phase_after - phase_before
        
        # Unwrap phase differences to handle 2π discontinuities
        phase_diff_unwrapped = np.unwrap(phase_diff)
        
        # Mean phase difference
        features["phase_mean_diff"] = np.mean(phase_diff_unwrapped)
        
        # Variance of phase differences
        features["phase_variance_diff"] = np.var(phase_diff_unwrapped)
        
        # Standard deviation of phase differences
        features["phase_std_diff"] = np.std(phase_diff_unwrapped)
        
        # Mean absolute phase difference
        features["phase_mean_abs_diff"] = np.mean(np.abs(phase_diff_unwrapped))
        
        # Phase coherence (circular correlation)
        try:
            phase_coherence = np.abs(np.mean(np.exp(1j * phase_diff)))
            features["phase_coherence"] = phase_coherence
        except:
            features["phase_coherence"] = 0
        
    except Exception as e:
        print(f"Phase spectrum features failed: {e}")
        features["phase_mean_diff"] = 0
        features["phase_variance_diff"] = 0
        features["phase_std_diff"] = 0
        features["phase_mean_abs_diff"] = 0
        features["phase_coherence"] = 0
    
    return features


def extract_hilbert_envelope_features(values_before, values_after):
    """Extract Hilbert Envelope features"""
    features = {}
    
    try:
        # Compute analytic signals using Hilbert transform
        analytic_before = signal.hilbert(values_before.values)
        analytic_after = signal.hilbert(values_after.values)
        
        # Extract envelopes (amplitude)
        envelope_before = np.abs(analytic_before)
        envelope_after = np.abs(analytic_after)
        
        # Mean envelope values
        features["envelope_mean_before"] = np.mean(envelope_before)
        features["envelope_mean_after"] = np.mean(envelope_after)
        features["envelope_mean_diff"] = features["envelope_mean_after"] - features["envelope_mean_before"]
        features["envelope_mean_ratio"] = features["envelope_mean_after"] / max(features["envelope_mean_before"], 1e-10)
        
        # Envelope variance
        features["envelope_var_before"] = np.var(envelope_before)
        features["envelope_var_after"] = np.var(envelope_after)
        features["envelope_var_diff"] = features["envelope_var_after"] - features["envelope_var_before"]
        features["envelope_var_ratio"] = features["envelope_var_after"] / max(features["envelope_var_before"], 1e-10)
        
        # Mean absolute difference between envelopes
        min_len = min(len(envelope_before), len(envelope_after))
        envelope_before_trunc = envelope_before[:min_len]
        envelope_after_trunc = envelope_after[:min_len]
        
        features["envelope_mean_abs_diff"] = np.mean(np.abs(envelope_after_trunc - envelope_before_trunc))
        
        # Envelope shape similarity (correlation)
        try:
            correlation = np.corrcoef(envelope_before_trunc, envelope_after_trunc)[0, 1]
            features["envelope_correlation"] = correlation if not np.isnan(correlation) else 0
        except:
            features["envelope_correlation"] = 0
        
    except Exception as e:
        print(f"Hilbert envelope features failed: {e}")
        features["envelope_mean_before"] = 0
        features["envelope_mean_after"] = 0
        features["envelope_mean_diff"] = 0
        features["envelope_mean_ratio"] = 1
        features["envelope_var_before"] = 0
        features["envelope_var_after"] = 0
        features["envelope_var_diff"] = 0
        features["envelope_var_ratio"] = 1
        features["envelope_mean_abs_diff"] = 0
        features["envelope_correlation"] = 0
    
    return features


def extract_instantaneous_frequency_features(values_before, values_after):
    """Extract Instantaneous Frequency features"""
    features = {}
    
    try:
        # Compute analytic signals
        analytic_before = signal.hilbert(values_before.values)
        analytic_after = signal.hilbert(values_after.values)
        
        # Compute instantaneous phase
        inst_phase_before = np.unwrap(np.angle(analytic_before))
        inst_phase_after = np.unwrap(np.angle(analytic_after))
        
        # Compute instantaneous frequency (derivative of phase)
        inst_freq_before = np.diff(inst_phase_before) / (2 * np.pi)
        inst_freq_after = np.diff(inst_phase_after) / (2 * np.pi)
        
        # Mean instantaneous frequency
        features["inst_freq_mean_before"] = np.mean(inst_freq_before)
        features["inst_freq_mean_after"] = np.mean(inst_freq_after)
        features["inst_freq_mean_diff"] = features["inst_freq_mean_after"] - features["inst_freq_mean_before"]
        
        # Instantaneous frequency variance
        features["inst_freq_var_before"] = np.var(inst_freq_before)
        features["inst_freq_var_after"] = np.var(inst_freq_after)
        features["inst_freq_var_diff"] = features["inst_freq_var_after"] - features["inst_freq_var_before"]
        
        # Instantaneous frequency range
        features["inst_freq_range_before"] = np.max(inst_freq_before) - np.min(inst_freq_before)
        features["inst_freq_range_after"] = np.max(inst_freq_after) - np.min(inst_freq_after)
        features["inst_freq_range_diff"] = features["inst_freq_range_after"] - features["inst_freq_range_before"]
        
        # Compare instantaneous frequency profiles
        min_len = min(len(inst_freq_before), len(inst_freq_after))
        if min_len > 0:
            inst_freq_before_trunc = inst_freq_before[:min_len]
            inst_freq_after_trunc = inst_freq_after[:min_len]
            
            features["inst_freq_profile_diff"] = np.mean(np.abs(inst_freq_after_trunc - inst_freq_before_trunc))
            
            # Correlation between instantaneous frequency profiles
            try:
                correlation = np.corrcoef(inst_freq_before_trunc, inst_freq_after_trunc)[0, 1]
                features["inst_freq_profile_correlation"] = correlation if not np.isnan(correlation) else 0
            except:
                features["inst_freq_profile_correlation"] = 0
        else:
            features["inst_freq_profile_diff"] = 0
            features["inst_freq_profile_correlation"] = 0
        
    except Exception as e:
        print(f"Instantaneous frequency features failed: {e}")
        features["inst_freq_mean_before"] = 0
        features["inst_freq_mean_after"] = 0
        features["inst_freq_mean_diff"] = 0
        features["inst_freq_var_before"] = 0
        features["inst_freq_var_after"] = 0
        features["inst_freq_var_diff"] = 0
        features["inst_freq_range_before"] = 0
        features["inst_freq_range_after"] = 0
        features["inst_freq_range_diff"] = 0
        features["inst_freq_profile_diff"] = 0
        features["inst_freq_profile_correlation"] = 0
    
    return features


def extract_cepstral_coefficient_features(values_before, values_after):
    """Extract Cepstral Coefficient features"""
    features = {}
    
    try:
        # Number of cepstral coefficients to compute
        n_coeffs = 10
        
        for segment_name, values in [('before', values_before), ('after', values_after)]:
            # Compute FFT
            fft_vals = fft(values.values)
            
            # Compute log magnitude spectrum
            log_spectrum = np.log(np.abs(fft_vals) + 1e-10)
            
            # Compute cepstrum (IFFT of log spectrum)
            cepstrum = np.real(ifft(log_spectrum))
            
            # Extract first n_coeffs cepstral coefficients
            for i in range(min(n_coeffs, len(cepstrum))):
                features[f"{segment_name}_cepstral_c{i}"] = cepstrum[i]
        
        # Compare cepstral coefficients
        for i in range(n_coeffs):
            before_key = f"before_cepstral_c{i}"
            after_key = f"after_cepstral_c{i}"
            
            if before_key in features and after_key in features:
                features[f"cepstral_c{i}_diff"] = features[after_key] - features[before_key]
                features[f"cepstral_c{i}_ratio"] = features[after_key] / max(abs(features[before_key]), 1e-10)
        
        # Overall cepstral distance
        before_coeffs = [features.get(f"before_cepstral_c{i}", 0) for i in range(n_coeffs)]
        after_coeffs = [features.get(f"after_cepstral_c{i}", 0) for i in range(n_coeffs)]
        
        features["cepstral_distance"] = np.linalg.norm(np.array(after_coeffs) - np.array(before_coeffs))
        
    except Exception as e:
        print(f"Cepstral coefficient features failed: {e}")
        for i in range(10):
            features[f"before_cepstral_c{i}"] = 0
            features[f"after_cepstral_c{i}"] = 0
            features[f"cepstral_c{i}_diff"] = 0
            features[f"cepstral_c{i}_ratio"] = 1
        features["cepstral_distance"] = 0
    
    return features


def extract_resonant_frequency_features(values_before, values_after):
    """Extract Resonant Frequency (Multi-peak) features"""
    features = {}
    
    try:
        n_peaks = 5  # Number of top peaks to analyze
        
        for segment_name, values in [('before', values_before), ('after', values_after)]:
            # Compute FFT and power spectrum
            fft_vals = fft(values.values)
            power_spectrum = np.abs(fft_vals) ** 2
            freqs = fftfreq(len(values), 1.0)
            
            # Only consider positive frequencies
            pos_freqs = freqs[:len(freqs)//2]
            pos_power = power_spectrum[:len(power_spectrum)//2]
            
            # Find peaks in the power spectrum
            peaks, properties = signal.find_peaks(pos_power, height=0, distance=2)
            
            if len(peaks) > 0:
                # Sort peaks by amplitude (descending)
                peak_amplitudes = pos_power[peaks]
                sorted_indices = np.argsort(peak_amplitudes)[::-1]
                top_peaks = peaks[sorted_indices[:min(n_peaks, len(peaks))]]
                top_amplitudes = peak_amplitudes[sorted_indices[:min(n_peaks, len(peaks))]]
                
                # Store peak frequencies and amplitudes
                for i, (peak_idx, amplitude) in enumerate(zip(top_peaks, top_amplitudes)):
                    features[f"{segment_name}_peak_{i}_freq"] = abs(pos_freqs[peak_idx])
                    features[f"{segment_name}_peak_{i}_amplitude"] = amplitude
            else:
                # No peaks found
                for i in range(n_peaks):
                    features[f"{segment_name}_peak_{i}_freq"] = 0
                    features[f"{segment_name}_peak_{i}_amplitude"] = 0
        
        # Compare peaks between segments
        for i in range(n_peaks):
            before_freq_key = f"before_peak_{i}_freq"
            after_freq_key = f"after_peak_{i}_freq"
            before_amp_key = f"before_peak_{i}_amplitude"
            after_amp_key = f"after_peak_{i}_amplitude"
            
            if all(key in features for key in [before_freq_key, after_freq_key, before_amp_key, after_amp_key]):
                # Frequency shifts
                features[f"peak_{i}_freq_shift"] = abs(features[after_freq_key] - features[before_freq_key])
                
                # Amplitude changes
                features[f"peak_{i}_amplitude_diff"] = features[after_amp_key] - features[before_amp_key]
                features[f"peak_{i}_amplitude_ratio"] = features[after_amp_key] / max(features[before_amp_key], 1e-10)
        
        # Overall resonance shift metrics
        before_freqs = [features.get(f"before_peak_{i}_freq", 0) for i in range(n_peaks)]
        after_freqs = [features.get(f"after_peak_{i}_freq", 0) for i in range(n_peaks)]
        
        features["resonance_centroid_shift"] = abs(np.mean(after_freqs) - np.mean(before_freqs))
        features["resonance_spread_before"] = np.std(before_freqs)
        features["resonance_spread_after"] = np.std(after_freqs)
        features["resonance_spread_change"] = features["resonance_spread_after"] - features["resonance_spread_before"]
        
    except Exception as e:
        print(f"Resonant frequency features failed: {e}")
        for i in range(5):
            features[f"before_peak_{i}_freq"] = 0
            features[f"after_peak_{i}_freq"] = 0
            features[f"before_peak_{i}_amplitude"] = 0
            features[f"after_peak_{i}_amplitude"] = 0
            features[f"peak_{i}_freq_shift"] = 0
            features[f"peak_{i}_amplitude_diff"] = 0
            features[f"peak_{i}_amplitude_ratio"] = 1
        features["resonance_centroid_shift"] = 0
        features["resonance_spread_before"] = 0
        features["resonance_spread_after"] = 0
        features["resonance_spread_change"] = 0
    
    return features


def extract_all_features(df, include_dgp=True):
    """
    Extract all features for structural break detection in time series data.
    
    This function analyzes a time series with 'value' and 'period' columns,
    where period 0 represents the before segment and period 1 represents the after segment.
    
    Returns a comprehensive feature dictionary for structural break analysis.
    """
    # Check if we have data in both periods
    values_before = df["value"][df["period"] == 0]
    values_after = df["value"][df["period"] == 1]

    if values_before.empty or values_after.empty:
        return {}  # Cannot extract features without data in both periods

    features = {}

    # ========================================
    # 1. STATISTICAL TEST FEATURES
    # Formal statistical tests specifically designed to detect structural breaks
    # ========================================
    test_features = extract_enhanced_statistical_features(df)
    features.update(test_features)

    # ========================================
    # 2. DESCRIPTIVE STATISTICS BY PERIOD
    # Basic statistical measures calculated separately for before and after periods
    # ========================================
    before_stats = extract_optimized_descriptive_statistics(values_before)
    after_stats = extract_optimized_descriptive_statistics(values_after)

    # Add period-specific statistics with prefixes
    for key, value in before_stats.items():
        features[f"before_{key}"] = value

    for key, value in after_stats.items():
        features[f"after_{key}"] = value

    # ========================================
    # 3. TIME SERIES CHARACTERISTICS BY PERIOD
    # Advanced time series properties like autocorrelation, trends, and seasonality patterns
    # ========================================
    before_ts = extract_optimized_time_series_features(values_before)
    after_ts = extract_optimized_time_series_features(values_after)

    for key, value in before_ts.items():
        features[f"before_{key}"] = value
    for key, value in after_ts.items():
        features[f"after_{key}"] = value

    # ========================================
    # 4. DIFFERENCE AND CHANGE FEATURES
    # Direct comparisons between periods to quantify the magnitude of structural changes
    # ========================================
    
    # Statistical differences
    diff_features = extract_difference_features(before_stats, after_stats)
    features.update(diff_features)
    
    # Time series feature differences
    ts_diff_features = extract_difference_features(before_ts, after_ts)
    features.update(ts_diff_features)
    
    # Calculate percentage changes for key statistics
    for key in before_stats:
        diff = after_stats[key] - before_stats[key]
        features[f"{key}_diff"] = diff

        if abs(before_stats[key]) > 1e-10:
            features[f"{key}_pct_change"] = diff / abs(before_stats[key])
        else:
            features[f"{key}_pct_change"] = 0

    # ========================================
    # 5. VOLATILITY AND RISK MEASURES
    # Changes in variance, volatility, and risk characteristics between periods
    # ========================================
    before_returns = values_before.pct_change().dropna()
    after_returns = values_after.pct_change().dropna()

    if len(before_returns) > 0 and len(after_returns) > 0:
        features["volatility_ratio"] = after_returns.std() / max(before_returns.std(), 1e-10)
    else:
        features["volatility_ratio"] = 1.0

    # Value range comparison
    features["value_range_ratio"] = (values_after.max() - values_after.min()) / \
                                   max(values_before.max() - values_before.min(), 1e-10)

    # ========================================
    # 6. SEGMENT LENGTH AND STRUCTURAL PROPERTIES
    # Basic properties of the time series segments and their relative characteristics
    # ========================================
    features["before_length"] = len(values_before)
    features["after_length"] = len(values_after)
    features["length_ratio"] = len(values_after) / max(len(values_before), 1)

    # ========================================
    # 7. COMPRESSION-BASED COMPLEXITY FEATURES
    # Information-theoretic measures that capture changes in data complexity and patterns
    # ========================================
    compression_features = extract_compression_features(values_before, values_after)
    features.update(compression_features)

    # ========================================
    # 8. COINTEGRATION AND RELATIONSHIP FEATURES
    # Statistical measures of long-term relationships and equilibrium between periods
    # ========================================
    coint_features = extract_cointegration_features(values_before, values_after)
    features.update(coint_features)

    # ========================================
    # 9. CHOW TEST FEATURES
    # Classical econometric test for structural breaks in regression relationships
    # ========================================
    chow_features = extract_chow_test_features(df)
    features.update(chow_features)

    # ========================================
    # 10. LIKELIHOOD RATIO TEST FEATURES
    # Maximum likelihood-based tests for parameter stability and structural changes
    # ========================================
    lr_features = extract_likelihood_ratio_features(df)
    features.update(lr_features)

    # ========================================
    # 11. FOURIER SPECTRUM ANALYSIS FEATURES
    # Frequency domain comparisons using Fast Fourier Transform to detect spectral shifts
    # ========================================
    fft_features = extract_fourier_spectrum_features(values_before, values_after)
    features.update(fft_features)

    # ========================================
    # 12. DOMINANT FREQUENCY SHIFT FEATURES
    # Changes in primary oscillatory components and peak frequency locations
    # ========================================
    freq_shift_features = extract_dominant_frequency_features(values_before, values_after)
    features.update(freq_shift_features)

    # ========================================
    # 13. FREQUENCY BAND ENERGY FEATURES
    # Distribution of spectral energy across different frequency ranges and their ratios
    # ========================================
    band_energy_features = extract_band_energy_features(values_before, values_after)
    features.update(band_energy_features)

    # ========================================
    # 14. WAVELET DECOMPOSITION FEATURES
    # Multi-resolution analysis using wavelets to capture transient changes and scale-specific shifts
    # ========================================
    wavelet_features = extract_wavelet_energy_features(values_before, values_after)
    features.update(wavelet_features)

    # ========================================
    # 15. TIME-FREQUENCY SPECTROGRAM FEATURES
    # Short-time Fourier transform analysis for time-localized frequency changes
    # ========================================
    spectrogram_features = extract_spectrogram_difference_features(values_before, values_after)
    features.update(spectrogram_features)

    # ========================================
    # 16. PHASE SPECTRUM ANALYSIS FEATURES
    # Phase relationship changes and signal timing shifts between periods
    # ========================================
    phase_features = extract_phase_spectrum_features(values_before, values_after)
    features.update(phase_features)

    # ========================================
    # 17. HILBERT ENVELOPE FEATURES
    # Analytic signal analysis for amplitude modulation and envelope shape changes
    # ========================================
    hilbert_features = extract_hilbert_envelope_features(values_before, values_after)
    features.update(hilbert_features)

    # ========================================
    # 18. INSTANTANEOUS FREQUENCY FEATURES
    # Non-stationary frequency behavior and time-varying spectral characteristics
    # ========================================
    inst_freq_features = extract_instantaneous_frequency_features(values_before, values_after)
    features.update(inst_freq_features)

    # ========================================
    # 19. CEPSTRAL ANALYSIS FEATURES
    # Cepstrum-based features for detecting periodicity, echoes, and filter characteristic changes
    # ========================================
    cepstral_features = extract_cepstral_coefficient_features(values_before, values_after)
    features.update(cepstral_features)

    # ========================================
    # 20. RESONANT FREQUENCY FEATURES
    # Multi-peak spectral analysis for system resonance and modulation pattern detection
    # ========================================
    resonant_features = extract_resonant_frequency_features(values_before, values_after)
    features.update(resonant_features)

    # ========================================
    # 21. DGP (DATA GENERATING PROCESS) FEATURES
    # Features that characterize the underlying data generating process
    # ========================================
    if include_dgp:
            try:
                # Create a proper MultiIndex structure for DGP feature extraction
                if not isinstance(df.index, pd.MultiIndex):
                    # Create a temporary DataFrame with the proper structure
                    df_copy = df.copy()
                    
                    # Add a dummy ID for the DGP extractor
                    df_copy['id'] = 0
                    
                    # Use the existing index as time or create one
                    if df_copy.index.name is None:
                        df_copy = df_copy.reset_index(drop=True)
                        df_copy['time'] = range(len(df_copy))
                        time_col = 'time'
                    else:
                        time_col = df_copy.index.name
                        df_copy = df_copy.reset_index()
                    
                    # Set MultiIndex
                    df_copy = df_copy.set_index(['id', time_col])
                    
                    # Extract DGP features
                    dgp_extractor = FastDGPFeatureExtractor()
                    dgp_features = dgp_extractor.extract_features(df_copy, max_series=1)
                    
                    # Add DGP features to our feature dictionary
                    if not dgp_features.empty:
                        dgp_feature_dict = dgp_features.iloc[0].to_dict()
                        dgp_feature_dict.pop('id', None)  # Remove id to avoid conflicts
                        features.update(dgp_feature_dict)
                else:
                    # Already a MultiIndex, extract directly
                    dgp_extractor = FastDGPFeatureExtractor()
                    dgp_features = dgp_extractor.extract_features(df, max_series=1)
                    
                    if not dgp_features.empty:
                        dgp_feature_dict = dgp_features.iloc[0].to_dict()
                        dgp_feature_dict.pop('id', None)
                        features.update(dgp_feature_dict)
            except Exception as e:
                print(f"DGP feature extraction failed: {e}")
                # Continue without DGP features

    return features

def process_single_timeseries(args):
    """Helper function for parallel processing"""
    ts_id, ts_data = args
    try:
        features = extract_all_features(ts_data)
        if features:
            features['id'] = ts_id
            return features
    except Exception as e:
        print(f"Error processing {ts_id}: {e}")
    return None

def create_feature_matrix(X, output_path=None, verbose=True):
    """
    Create feature matrix from time series DataFrame

    Parameters:
    -----------
    X : pandas.DataFrame
        The input time series data with MultiIndex ['id', 'time']
    output_path : str, optional
        Path to save the extracted features. If None, features won't be saved.
    verbose : bool, default=True
        Whether to show progress bar

    Returns:
    --------
    pandas.DataFrame
        DataFrame containing the extracted features
    """
    # Ensure X is a DataFrame and has the proper multi-index structure
    if not isinstance(X, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")

    # Check if the index is a MultiIndex
    if not isinstance(X.index, pd.MultiIndex):
        raise ValueError("Input DataFrame must have a MultiIndex with levels ['id', 'time']")

    # Group by ID and extract features for each time series
    ids = X.index.get_level_values('id').unique()
    features_list = []

    iterator = tqdm(ids) if verbose else ids
    for ts_id in iterator:
        # Get data for this time series
        try:
            ts_data = X.xs(ts_id, level='id')

            # Extract all features
            ts_features = extract_all_features(ts_data)

            if ts_features:  # Skip if no features could be extracted
                ts_features['id'] = ts_id
                features_list.append(ts_features)
        except Exception as e:
            print(f"Error processing time series ID {ts_id}: {str(e)}")
            continue

    # Convert to DataFrame
    feature_df = pd.DataFrame(features_list)

    if feature_df.empty:
        return pd.DataFrame()

    # Set ID as index
    feature_df = feature_df.set_index('id')

    # Handle missing values
    feature_df = feature_df.fillna(0)

    # Save features if output path is provided
    if output_path is not None:
        # Create directory if it doesn't exist
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        print(f"Saving extracted features to {output_path}")

        # Save features in different formats for flexibility
        # CSV format (human-readable)
        feature_df.to_csv(f"{output_path}.csv")

        # Pickle format (preserves datatypes, faster to load)
        feature_df.to_pickle(f"{output_path}.pkl")

        # Parquet format (efficient storage, good for large datasets)
        try:
            feature_df.to_parquet(f"{output_path}.parquet")
        except Exception as e:
            print(f"Warning: Could not save parquet format. Error: {e}")
            print("Parquet format requires 'pyarrow' or 'fastparquet' package.")

    return feature_df




def train(X_train: pd.DataFrame, y_train: pd.Series, model_directory_path: str, save_features=True):
    """
    Train a model to detect structural breaks in time series

    Parameters:
    -----------
    X_train : pandas.DataFrame
        The input time series data with MultiIndex ['id', 'time']
    y_train : pandas.Series
        The target variable (1 for structural break, 0 for no break)
    model_directory_path : str
        Directory to save the trained model
    save_features : bool, default=True
        Whether to save the extracted features
    """
    print("Training model for structural break detection")
    print(f"X_train type: {type(X_train)}")
    print(f"X_train shape: {X_train.shape}")
    print(f"X_train index type: {type(X_train.index)}")

    # Ensure X_train has the right structure
    if not isinstance(X_train.index, pd.MultiIndex):
        print("Warning: X_train doesn't have a MultiIndex. Checking structure...")
        if 'id' in X_train.columns and 'time' in X_train.columns:
            print("Converting to MultiIndex from columns.")
            X_train = X_train.set_index(['id', 'time'])
        else:
            print("ERROR: Cannot proceed without proper data structure")
            print("Expected MultiIndex with levels ['id', 'time'] or columns named 'id' and 'time'")
            # Save a dummy model that always predicts the majority class
            majority_class = int(y_train.mean() >= 0.5) if len(y_train) > 0 else 0.5
            dummy_model = {"type": "dummy", "majority_class": majority_class}
            os.makedirs(os.path.dirname(model_directory_path), exist_ok=True)
            joblib.dump(dummy_model, os.path.join(model_directory_path, 'model.joblib'))
            return

    # Create directory if it doesn't exist
    os.makedirs(model_directory_path, exist_ok=True)

    # Define feature output path if saving features
    feature_output_path = os.path.join(model_directory_path, 'train_features') if save_features else None

    # print("Extracting features from training data...")
    # X_features = create_feature_matrix(X_train, output_path=feature_output_path)

    # # Ensure we have features and they align with y_train
    # if X_features.empty:
    #     print("Warning: No features could be extracted from training data.")
    #     # Save a dummy model that always predicts the majority class
    #     majority_class = int(y_train.mean() >= 0.5) if len(y_train) > 0 else 0.5
    #     dummy_model = {"type": "dummy", "majority_class": majority_class}
    #     joblib.dump(dummy_model, os.path.join(model_directory_path, 'model.joblib'))
    #     return

    X_features = pd.read_csv(os.path.join(model_directory_path, 'train_features.csv'), index_col='id')

    # Align indices with y_train
    common_indices = X_features.index.intersection(y_train.index)
    X_features = X_features.loc[common_indices]
    y_train = y_train.loc[common_indices]

    # Save aligned target values
    if save_features:
        y_train.to_csv(os.path.join(model_directory_path, 'train_targets.csv'))

    print(f"Extracted {X_features.shape[1]} features for {X_features.shape[0]} time series")

    # Check if we have enough data to train
    if len(X_features) < 10:
        print("Warning: Not enough data to train a model.")
        # Save a dummy model that always predicts the majority class
        majority_class = int(y_train.mean() >= 0.5)
        dummy_model = {"type": "dummy", "majority_class": majority_class}
        joblib.dump(dummy_model, os.path.join(model_directory_path, 'model.joblib'))
        return
    
    # Calculate class weights for unbalanced data
    from sklearn.utils.class_weight import compute_class_weight
    import numpy as np
    from sklearn.feature_selection import SelectFromModel
    from sklearn.ensemble import ExtraTreesClassifier
    from sklearn.feature_selection import RFE

    # Assuming you have y_train available
    classes = np.unique(y_train)
    class_weights = compute_class_weight('balanced', classes=classes, y=y_train)
    scale_pos_weight = class_weights[1] / class_weights[0] if len(class_weights) > 1 else 1

    print(f"Class distribution: {np.bincount(y_train)}")
    print(f"Scale pos weight: {scale_pos_weight}")

    # Create a pipeline with preprocessing and model
    pipeline = Pipeline([
        
        ('scaler', StandardScaler()),
        ('feature_selector', RFE(
        estimator=xgb.XGBClassifier(random_state=42),
        n_features_to_select=500,  # Select top 500 features
        step=15
    )),
        # ('model', xgb.XGBClassifier(
        #         objective='binary:logistic',
        #         n_estimators=997,  # Increase for better performance
        #         max_depth=9,       # Reduce to prevent overfitting on minority class
        #         learning_rate=0.013293258485719994,
        #         subsample=0.942050971691917,
        #         colsample_bylevel=0.8297860865775033,
        #         colsample_bynode=0.8122445811896282,
        #         #colsample_bytree=0.788,
        #         random_state=89,
        #         use_label_encoder=False,
        #         eval_metric='logloss',
                
        #         # Key parameters for unbalanced classification:
        #         #scale_pos_weight=0.4,  # Most important for imbalanced data
        #         min_child_weight=10,                 # Higher values prevent overfitting
        #         gamma= 3.837,                          # Minimum loss reduction for splits
        #         reg_alpha=4.820456984721685,                      # L1 regularization
        #         reg_lambda=0.1881481457256562,                     # L2 regularization
        #         max_delta_step=2,                   # Helps with imbalanced datasets
        # )) # 0.791

        ('model', xgb.XGBClassifier(
                objective='binary:logistic',
                n_estimators=2000,  # Increase for better performance
                max_depth=8,       # Reduce to prevent overfitting on minority class
                learning_rate=0.0117638346084812,
                subsample=0.8880638162211006,
                colsample_bylevel=0.8297860865775033,
                colsample_bynode=0.8122445811896282,
                colsample_bytree=0.6583959588880559,
                random_state=89,
                use_label_encoder=False,
                eval_metric='auc',
                
                # Key parameters for unbalanced classification:
                #scale_pos_weight=1.2002030800473853,  # Most important for imbalanced data
                min_child_weight=15,                 # Higher values prevent overfitting
                gamma= 0.9267499839984471,                          # Minimum loss reduction for splits
                reg_alpha=0.37089061512357124,                      # L1 regularization
                reg_lambda=0.0671185894939356,                     # L2 regularization
                max_delta_step=2,                   # Helps with imbalanced datasets
        )) # 0.803

    ])




            # objective='binary:logistic',
            # n_estimators=550,  # Increase for better performance
            # max_depth=15,       # Reduce to prevent overfitting on minority class
            # learning_rate=0.016,
            # subsample=0.8,
            # colsample_bylevel=0.8,
            # colsample_bynode=0.8,
            # colsample_bytree=0.8,
            # random_state=42,
            # use_label_encoder=False,
            # eval_metric='logloss',
            
            # # Key parameters for unbalanced classification:
            # scale_pos_weight=scale_pos_weight,  # Most important for imbalanced data
            # min_child_weight=5,                 # Higher values prevent overfitting
            # gamma=0.35,                          # Minimum loss reduction for splits
            # reg_alpha=0.1,                      # L1 regularization
            # reg_lambda=1.0,                     # L2 regularization
            # max_delta_step=1,                   # Helps with imbalanced datasets

#     xgb_params = {
#         'objective': 'binary:logistic',
#         'eval_metric': 'auc',
#         'n_estimators': 2000,
#         'learning_rate': 0.01,
#         'max_depth': 7,  # Reduced from 15
#         'subsample': 0.7,
#         'colsample_bytree': 0.7,
#         'gamma': 0.1,
#         'reg_alpha': 0.1,
#         'reg_lambda': 0.1,
#         'n_jobs': -1,
#         'random_state': 42,  # Changed from 'seed' to 'random_state'
#         'scale_pos_weight': (len(y_train) - sum(y_train)) / sum(y_train)
#     }
        # ('model', xgb.XGBClassifier(
        #     objective='binary:logistic',
        #     n_estimators=2000,  # Increase for better performance
        #     max_depth=7,       # Reduce to prevent overfitting on minority class
        #     learning_rate=0.01,
        #     subsample=0.7,
        #     colsample_bytree= 0.7,
        #     random_state=42,
        #     use_label_encoder=False,
        #     eval_metric='logloss',
            
        #     # Key parameters for unbalanced classification:
        #     scale_pos_weight=scale_pos_weight,  # Most important for imbalanced data
        #     gamma=0.1,                          # Minimum loss reduction for splits
        #     reg_alpha= 0.1,                      # L1 regularization
        #     reg_lambda=0.1,                     # L2 regularization
        #     #max_delta_step=1,                   # Helps with imbalanced datasets
        # ))



    # Train the model
    print("Training XGBoost model...")
    pipeline.fit(X_features, y_train)
    # Get selected feature names after RFE
    selected_mask = pipeline.named_steps['feature_selector'].get_support()
    selected_feature_names = X_features.columns[selected_mask]

    # Get feature importances
    feature_importance = pipeline.named_steps['model'].feature_importances_

    # Create a DataFrame of feature importances
    importance_df = pd.DataFrame({
        'Feature': selected_feature_names,
        'Importance': feature_importance
    }).sort_values('Importance', ascending=False)

    # Display top 20 features
    print("\nTop 20 Most Important Features:")
    print(importance_df.head(20))

    # Calculate training metrics
    y_pred_proba = pipeline.predict_proba(X_features)[:, 1]
    #y_pred = pipeline.predict(X_features)

    auc = roc_auc_score(y_train, y_pred_proba)
    # accuracy = accuracy_score(y_train, y_pred_proba)
    # f1 = f1_score(y_train, y_pred_proba)

    print(f"\nTraining Metrics:")
    print(f"AUC-ROC: {auc:.4f}")
    # print(f"Accuracy: {accuracy:.4f}")
    # print(f"F1 Score: {f1:.4f}")

    # Save the model
    print(f"Saving model to {os.path.join(model_directory_path, 'model.joblib')}")
    joblib.dump(pipeline, os.path.join(model_directory_path, 'model.joblib'))

    # Save feature importance
    importance_df.to_csv(os.path.join(model_directory_path, 'feature_importance.csv'), index=False)

    # Save model metrics
    metrics = {
        'auc': auc,
        # 'accuracy': accuracy,
        # 'f1': f1,
        'n_features': X_features.shape[1],
        'n_samples': X_features.shape[0]
    }

    pd.Series(metrics).to_csv(os.path.join(model_directory_path, 'train_metrics.csv'))

def predict(X_test: pd.DataFrame, model_directory_path: str, save_features=True) -> pd.Series:
    """
    Generate predictions for structural breaks in test data

    Parameters:
    -----------
    X_test : pandas.DataFrame
        The input time series data with MultiIndex ['id', 'time']
    model_directory_path : str
        Directory containing the trained model
    save_features : bool, default=True
        Whether to save the extracted features

    Returns:
    --------
    pandas.Series
        Predicted probabilities of structural break for each time series
    """
    model_path = os.path.join(model_directory_path, 'model.joblib')

    # Check if model exists
    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Model file not found at {model_path}")

    # Load the model
    model = joblib.load(model_path)

    # Debug information
    print(f"X_test type: {type(X_test)}")
    print(f"X_test shape: {X_test}")


    if not isinstance(X_test.index, pd.MultiIndex):
        print("Warning: X_test doesn't have a MultiIndex. Attempting to convert...")
        # Try to infer the structure or convert to expected format
        if 'id' in X_test.columns and 'time' in X_test.columns:
            # Set MultiIndex from columns
            X_test = X_test.set_index(['id', 'time'])
            print("Converted to MultiIndex from columns.")
        else:
            print("Could not convert to proper MultiIndex structure.")
            # Get unique IDs if possible, or create a default response
            if 'id' in X_test.columns:
                ids = X_test['id'].unique()
                return pd.Series(0.5, index=ids)
            else:
                # Create arbitrary IDs as a last resort
                dummy_ids = np.arange(len(X_test) // 100 + 1)  # Rough estimate
                return pd.Series(0.5, index=dummy_ids)

    # Define feature output path if saving features
    feature_output_path = os.path.join(model_directory_path, 'test_features') if save_features else None

    # Check if we have a dummy model
    if isinstance(model, dict) and model.get('type') == 'dummy':
        # Return the majority class for all IDs
        ids = X_test.index.get_level_values('id').unique()
        return pd.Series(model['majority_class'], index=ids)

    # Extract features from test data
    print("Extracting features from test data...")
    X_features = create_feature_matrix(X_test, output_path=feature_output_path)

    if X_features.empty:
        print("Warning: No features could be extracted from test data.")
        # Return 0.5 for all IDs (neutral prediction)
        ids = X_test.index.get_level_values('id').unique()
        return pd.Series(0.5, index=ids)

    # Generate predictions
    print("Generating predictions...")
    probabilities = model.predict_proba(X_features)[:, 1]
    #probabilities = model.predict(X_features)
    # Create Series with predictions
    predictions = pd.Series(probabilities, index=X_features.index)

    # Save predictions if requested
    if save_features:
        predictions.to_csv(os.path.join(model_directory_path, 'test_predictions.csv'))
    if save_features:
        print("saved X_features_test")
        X_features.to_csv(os.path.join("C:\\Users\Alon\Desktop\Breakpoint classifiction\data", 'X_features_test.csv'))

    return predictions

In [37]:
import os
import pandas as pd
import numpy as np
import joblib
from sklearn.metrics import roc_auc_score
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.feature_selection import (
    RFE, SelectKBest, mutual_info_classif, VarianceThreshold
)
from sklearn.ensemble import (
    StackingClassifier, RandomForestClassifier, VotingClassifier
)
from sklearn.linear_model import LogisticRegression
from sklearn.utils.class_weight import compute_class_weight
import xgboost as xgb

# Optional: Try to import LightGBM
try:
    from lightgbm import LGBMClassifier
    LIGHTGBM_AVAILABLE = True
except ImportError:
    LIGHTGBM_AVAILABLE = False
    print("LightGBM not available, using XGBoost and RandomForest only")

def train(X_train: pd.DataFrame, y_train: pd.Series, model_directory_path: str, save_features=True):
    """
    Train a model to detect structural breaks in time series

    Parameters:
    -----------
    X_train : pandas.DataFrame
        The input time series data with MultiIndex ['id', 'time']
    y_train : pandas.Series
        The target variable (1 for structural break, 0 for no break)
    model_directory_path : str
        Directory to save the trained model
    save_features : bool, default=True
        Whether to save the extracted features
    """
    print("Training model for structural break detection")
    print(f"X_train type: {type(X_train)}")
    print(f"X_train shape: {X_train.shape}")
    print(f"X_train index type: {type(X_train.index)}")

    # Ensure X_train has the right structure
    if not isinstance(X_train.index, pd.MultiIndex):
        print("Warning: X_train doesn't have a MultiIndex. Checking structure...")
        if 'id' in X_train.columns and 'time' in X_train.columns:
            print("Converting to MultiIndex from columns.")
            X_train = X_train.set_index(['id', 'time'])
        else:
            print("ERROR: Cannot proceed without proper data structure")
            print("Expected MultiIndex with levels ['id', 'time'] or columns named 'id' and 'time'")
            # Save a dummy model that always predicts the majority class
            majority_class = int(y_train.mean() >= 0.5) if len(y_train) > 0 else 0.5
            dummy_model = {"type": "dummy", "majority_class": majority_class}
            os.makedirs(os.path.dirname(model_directory_path), exist_ok=True)
            joblib.dump(dummy_model, os.path.join(model_directory_path, 'model.joblib'))
            return

    # Create directory if it doesn't exist
    os.makedirs(model_directory_path, exist_ok=True)

    # Load pre-extracted features
    X_features = pd.read_csv(os.path.join(model_directory_path, 'train_features.csv'), index_col='id')

    # Align indices with y_train
    common_indices = X_features.index.intersection(y_train.index)
    X_features = X_features.loc[common_indices]
    y_train = y_train.loc[common_indices]

    # Save aligned target values
    if save_features:
        y_train.to_csv(os.path.join(model_directory_path, 'train_targets.csv'))

    print(f"Extracted {X_features.shape[1]} features for {X_features.shape[0]} time series")

    # Check if we have enough data to train
    if len(X_features) < 10:
        print("Warning: Not enough data to train a model.")
        # Save a dummy model that always predicts the majority class
        majority_class = int(y_train.mean() >= 0.5)
        dummy_model = {"type": "dummy", "majority_class": majority_class}
        joblib.dump(dummy_model, os.path.join(model_directory_path, 'model.joblib'))
        return
    
    # Calculate class weights for unbalanced data
    classes = np.unique(y_train)
    class_weights = compute_class_weight('balanced', classes=classes, y=y_train)
    scale_pos_weight = class_weights[1] / class_weights[0] if len(class_weights) > 1 else 1

    print(f"Class distribution: {np.bincount(y_train)}")
    print(f"Scale pos weight: {scale_pos_weight}")

    def create_advanced_pipeline():
        """Create an ensemble pipeline with multiple models"""
        
        # Base models for ensemble
        models = [
            ('xgb_main', xgb.XGBClassifier(
                objective='binary:logistic',
                n_estimators=1500,  # Reduced for ensemble
                max_depth=8,
                learning_rate=0.0117638346084812,
                subsample=0.8880638162211006,
                colsample_bylevel=0.8297860865775033,
                colsample_bynode=0.8122445811896282,
                colsample_bytree=0.6583959588880559,
                random_state=89,
                use_label_encoder=False,
                eval_metric='auc',
                min_child_weight=15,
                gamma=0.9267499839984471,
                reg_alpha=0.37089061512357124,
                reg_lambda=0.0671185894939356,
                max_delta_step=2,
            )),
            
            ('model', xgb.XGBClassifier(
                            objective='binary:logistic',
                            n_estimators=2000,  # Increase for better performance
                            max_depth=8,       # Reduce to prevent overfitting on minority class
                            learning_rate=0.0117638346084812,
                            subsample=0.8880638162211006,
                            colsample_bylevel=0.8297860865775033,
                            colsample_bynode=0.8122445811896282,
                            colsample_bytree=0.6583959588880559,
                            random_state=89,
                            use_label_encoder=False,
                            eval_metric='auc',
                            
                            # Key parameters for unbalanced classification:
                            #scale_pos_weight=1.2002030800473853,  # Most important for imbalanced data
                            min_child_weight=15,                 # Higher values prevent overfitting
                            gamma= 0.9267499839984471,                          # Minimum loss reduction for splits
                            reg_alpha=0.37089061512357124,                      # L1 regularization
                            reg_lambda=0.0671185894939356,                     # L2 regularization
                            max_delta_step=2,                   # Helps with imbalanced datasets
                    )), # 0.803


            ('xgb_alt', xgb.XGBClassifier(
                objective='binary:logistic',
                n_estimators=1200,
                max_depth=6,  # Different depth
                learning_rate=0.02,  # Different learning rate
                subsample=0.9,
                colsample_bytree=0.8,
                random_state=42,  # Different random state
                use_label_encoder=False,
                eval_metric='auc',
                min_child_weight=10,
                gamma=0.5,
                reg_alpha=0.1,
                reg_lambda=0.1,
                scale_pos_weight=scale_pos_weight,
            )),
            
            ('rf', RandomForestClassifier(
                n_estimators=800,
                max_depth=10,
                min_samples_split=20,
                min_samples_leaf=10,
                class_weight='balanced',
                random_state=42,
                n_jobs=-1
            ))
        ]
        
        # Add LightGBM if available
        if LIGHTGBM_AVAILABLE:
            models.append(
                ('lgb', LGBMClassifier(
                    objective='binary',
                    n_estimators=1200,
                    learning_rate=0.05,
                    num_leaves=50,
                    feature_fraction=0.8,
                    bagging_fraction=0.8,
                    class_weight='balanced',
                    random_state=42,
                    verbose=-1
                ))
            )
        
        # Choose ensemble method based on availability
        if len(models) >= 3:  # Use stacking for 3+ models
            print(f"Creating stacking ensemble with {len(models)} models")
            ensemble = StackingClassifier(
                estimators=models,
                final_estimator=LogisticRegression(
                    class_weight='balanced',
                    max_iter=1000,
                    random_state=42
                ),
                cv=3,  # Reduced for faster training
                stack_method='predict_proba',
                n_jobs=-1
            )
        else:  # Use voting for fewer models
            print(f"Creating voting ensemble with {len(models)} models")
            ensemble = VotingClassifier(
                estimators=models,
                voting='soft',
                n_jobs=-1
            )
        
        # Complete pipeline
        return Pipeline([
            ('scaler', StandardScaler()),
            
            # Multi-stage feature selection
            ('variance_filter', VarianceThreshold(0.01)),
            ('univariate_selector', SelectKBest(
                score_func=mutual_info_classif, 
                k=min(600, X_features.shape[1])  # Don't exceed available features
            )),
            ('rfe_selector', RFE(
                estimator=xgb.XGBClassifier(
                    n_estimators=100,  # Fast estimator for RFE
                    random_state=42
                ),
                n_features_to_select=min(400, X_features.shape[1]//2),
                step=20  # Larger step for faster selection
            )),
            
            ('ensemble', ensemble)
        ])

    def create_simple_pipeline():
        """Fallback to simple single model pipeline"""
        return Pipeline([
            ('scaler', StandardScaler()),
            ('feature_selector', RFE(
                estimator=xgb.XGBClassifier(random_state=42),
                n_features_to_select=min(500, X_features.shape[1]),
                step=15
            )),
            ('model', xgb.XGBClassifier(
                objective='binary:logistic',
                n_estimators=2000,
                max_depth=8,
                learning_rate=0.0117638346084812,
                subsample=0.8880638162211006,
                colsample_bylevel=0.8297860865775033,
                colsample_bynode=0.8122445811896282,
                colsample_bytree=0.6583959588880559,
                random_state=89,
                use_label_encoder=False,
                eval_metric='auc',
                min_child_weight=15,
                gamma=0.9267499839984471,
                reg_alpha=0.37089061512357124,
                reg_lambda=0.0671185894939356,
                max_delta_step=2,
            ))
        ])

    # Try advanced pipeline, fall back to simple if needed
    try:
        print("Creating advanced ensemble pipeline...")
        pipeline = create_advanced_pipeline()
    except Exception as e:
        print(f"Failed to create advanced pipeline: {e}")
        print("Falling back to simple pipeline...")
        pipeline = create_simple_pipeline()

    # Train the model
    print("Training model...")
    pipeline.fit(X_features, y_train)
    
    # Get feature selection info
    try:
        if 'rfe_selector' in pipeline.named_steps:
            selected_mask = pipeline.named_steps['rfe_selector'].get_support()
            # Need to account for previous feature selection steps
            if 'univariate_selector' in pipeline.named_steps:
                # Get mask from univariate selector first
                univariate_mask = pipeline.named_steps['univariate_selector'].get_support()
                # Apply variance filter if it exists
                if 'variance_filter' in pipeline.named_steps:
                    variance_mask = pipeline.named_steps['variance_filter'].get_support()
                    # Combine all masks
                    final_mask = np.zeros(len(X_features.columns), dtype=bool)
                    final_mask[variance_mask] = univariate_mask[selected_mask]
                else:
                    final_mask = np.zeros(len(X_features.columns), dtype=bool)
                    final_mask[univariate_mask] = selected_mask
                selected_feature_names = X_features.columns[final_mask]
            else:
                selected_feature_names = X_features.columns[selected_mask]
        else:
            # Fallback for simple pipeline
            selected_mask = pipeline.named_steps['feature_selector'].get_support()
            selected_feature_names = X_features.columns[selected_mask]

        print(f"Selected {len(selected_feature_names)} features")
        
        # Get feature importances (works for ensemble too - uses the final estimator)
        if hasattr(pipeline.named_steps['ensemble' if 'ensemble' in pipeline.named_steps else 'model'], 'feature_importances_'):
            if 'ensemble' in pipeline.named_steps and hasattr(pipeline.named_steps['ensemble'], 'final_estimator_'):
                # For stacking classifier, get importances from meta-learner
                if hasattr(pipeline.named_steps['ensemble'].final_estimator_, 'coef_'):
                    feature_importance = np.abs(pipeline.named_steps['ensemble'].final_estimator_.coef_[0])
                else:
                    feature_importance = np.ones(len(selected_feature_names))  # Fallback
            else:
                # For single model or voting classifier
                feature_importance = pipeline.named_steps['ensemble' if 'ensemble' in pipeline.named_steps else 'model'].feature_importances_
            
            # Create a DataFrame of feature importances
            importance_df = pd.DataFrame({
                'Feature': selected_feature_names,
                'Importance': feature_importance
            }).sort_values('Importance', ascending=False)

            # Display top 20 features
            print("\nTop 20 Most Important Features:")
            print(importance_df.head(20))
            
            # Save feature importance
            importance_df.to_csv(os.path.join(model_directory_path, 'feature_importance.csv'), index=False)
            
    except Exception as e:
        print(f"Could not extract feature importance: {e}")

    # Calculate training metrics
    y_pred_proba = pipeline.predict_proba(X_features)[:, 1]
    auc = roc_auc_score(y_train, y_pred_proba)

    print(f"\nTraining Metrics:")
    print(f"AUC-ROC: {auc:.4f}")

    # Save the model
    print(f"Saving model to {os.path.join(model_directory_path, 'model.joblib')}")
    joblib.dump(pipeline, os.path.join(model_directory_path, 'model.joblib'))

    # Save model metrics
    metrics = {
        'auc': auc,
        'n_features': X_features.shape[1],
        'n_samples': X_features.shape[0],
        'model_type': 'ensemble' if 'ensemble' in pipeline.named_steps else 'single'
    }

    pd.Series(metrics).to_csv(os.path.join(model_directory_path, 'train_metrics.csv'))

### The `infer()` Function

In the inference function, your trained model (if any) is loaded and used to make predictions on test data.

**Important workflow:**
1. Load your model;
2. Use the `yield` statement to signal readiness to the runner;
3. Process each dataset one by one within the for loop;
4. For each dataset, use `yield prediction` to return your prediction.

**Note:** The datasets can only be iterated once!

In [38]:
import pandas as pd
import numpy as np
import os
import joblib
from typing import Iterable, Iterator

def infer(
    X_test: Iterable[pd.DataFrame],
    model_directory_path: str,
) -> Iterator[pd.Series]:
    """
    Generator function that yields predictions for structural breaks in streaming data.

    Parameters:
    -----------
    X_test : Iterable[pandas.DataFrame]
        An iterable of DataFrames, each containing time series data with 'value' and 'period' columns
    model_directory_path : str
        Directory containing the trained model

    Yields:
    -------
    pandas.Series
        Predicted probabilities of structural break for each time series
    """
    # Load the model
    model_path = os.path.join(model_directory_path, 'model.joblib')

    try:
        model = joblib.load(model_path)
        print(f"Successfully loaded model from {model_path}")
    except FileNotFoundError:
        raise FileNotFoundError(f"Model file not found at {model_path}")
    except Exception as e:
        raise RuntimeError(f"Error loading model: {str(e)}")

    # Signal that we're ready to process data
    yield

    # Process each incoming dataset
    for dataset in X_test:
        try:
            # Check if we have a dummy model
            if isinstance(model, dict) and model.get('type') == 'dummy':
                # For dummy model, just return the majority class
                prediction = pd.Series(model['majority_class'])
                yield prediction
                continue

            # Extract features from the dataset
            features = extract_all_features(dataset)

            # Convert features to DataFrame for model input
            if features:
                features_df = pd.DataFrame([features])

                # Ensure we have the same features as the model expects
                if hasattr(model, 'feature_names_in_'):
                    # For scikit-learn pipelines with feature names
                    missing_cols = set(model.feature_names_in_) - set(features_df.columns)
                    for col in missing_cols:
                        features_df[col] = 0  # Fill missing features with zeros

                    # Ensure correct column order
                    features_df = features_df[model.feature_names_in_]

                # Generate prediction
                if hasattr(model, 'predict_proba'):
                    # For classifiers that output probabilities
                    prob = model.predict_proba(features_df)[:, 1]
                    prediction = pd.Series(prob)
                else:
                    # For models that only output class labels
                    pred = model.predict(features_df)
                    prediction = pd.Series(pred)
            else:
                # If no features could be extracted, return neutral prediction
                prediction = pd.Series(0.5)

        except Exception as e:
            # If any error occurs, log it and return neutral prediction
            print(f"Error in prediction: {str(e)}")
            prediction = pd.Series(0.5)

        # Yield the prediction
        yield prediction


In [39]:
def infer(
    X_test: Iterable[pd.DataFrame],
    model_directory_path: str,
) -> Iterator[pd.Series]:
    """
    Generator function that yields predictions for structural breaks in streaming data.

    Parameters:
    -----------
    X_test : Iterable[pandas.DataFrame]
        An iterable of DataFrames, each containing time series data with 'value' and 'period' columns
    model_directory_path : str
        Directory containing the trained model

    Yields:
    -------
    pandas.Series
        Predicted probabilities of structural break for each time series
    """
    # Load the model
    model_path = os.path.join(model_directory_path, 'model.joblib')

    try:
        model = joblib.load(model_path)
        print(f"Successfully loaded model from {model_path}")
    except FileNotFoundError:
        raise FileNotFoundError(f"Model file not found at {model_path}")
    except Exception as e:
        raise RuntimeError(f"Error loading model: {str(e)}")

    # Signal that we're ready to process data
    yield

    # Initialize DGP extractor
    dgp_extractor = FastDGPFeatureExtractor()
    
    # Process each incoming dataset
    for dataset in X_test:
        try:
            # Check if we have a dummy model
            if isinstance(model, dict) and model.get('type') == 'dummy':
                # For dummy model, just return the majority class
                prediction = pd.Series(model['majority_class'])
                yield prediction
                continue

            # Extract features from the dataset
            features = extract_all_features(dataset)
            
            # Extract DGP features specifically for this dataset
            try:
                # Create a proper MultiIndex structure for DGP feature extraction
                # First, ensure we have a time index
                if not isinstance(dataset.index, pd.DatetimeIndex) and 'time' not in dataset.columns:
                    # Create a simple integer index if no time index exists
                    dataset_with_index = dataset.reset_index(drop=True)
                    dataset_with_index['time'] = range(len(dataset))
                else:
                    dataset_with_index = dataset.copy()
                
                # Create MultiIndex with a dummy ID for this single time series
                if 'time' in dataset_with_index.columns:
                    time_col = 'time'
                else:
                    time_col = dataset_with_index.index.name if dataset_with_index.index.name else 'time'
                    dataset_with_index = dataset_with_index.reset_index()
                
                # Create MultiIndex
                dataset_with_index['id'] = 0  # Dummy ID for single series
                dataset_with_index = dataset_with_index.set_index(['id', time_col])
                
                # Extract DGP features
                dgp_features = dgp_extractor.extract_features(dataset_with_index, max_series=1)
                
                # Add DGP features to our feature dictionary
                if not dgp_features.empty:
                    dgp_feature_dict = dgp_features.iloc[0].to_dict()
                    # Remove the 'id' key if it exists to avoid conflicts
                    dgp_feature_dict.pop('id', None)
                    features.update(dgp_feature_dict)
            except Exception as e:
                print(f"DGP feature extraction failed in infer: {e}")
                # Continue without DGP features

            # Convert features to DataFrame for model input
            if features:
                features_df = pd.DataFrame([features])

                # Ensure we have the same features as the model expects
                if hasattr(model, 'feature_names_in_'):
                    # For scikit-learn pipelines with feature names
                    missing_cols = set(model.feature_names_in_) - set(features_df.columns)
                    for col in missing_cols:
                        features_df[col] = 0  # Fill missing features with zeros

                    # Ensure correct column order
                    features_df = features_df[model.feature_names_in_]

                # Generate prediction
                if hasattr(model, 'predict_proba'):
                    # For classifiers that output probabilities
                    prob = model.predict_proba(features_df)[:, 1]
                    prediction = pd.Series(prob)
                else:
                    # For models that only output class labels
                    pred = model.predict(features_df)
                    prediction = pd.Series(pred)
            else:
                # If no features could be extracted, return neutral prediction
                prediction = pd.Series(0.5)

        except Exception as e:
            # If any error occurs, log it and return neutral prediction
            print(f"Error in prediction: {str(e)}")
            prediction = pd.Series(0.5)

        # Yield the prediction
        yield prediction

## Local testing

To make sure your `train()` and `infer()` function are working properly, you can call the `crunch.test()` function that will reproduce the cloud environment locally. <br />
Even if it is not perfect, it should give you a quick idea if your model is working properly.

In [40]:
crunch.test(
    # Uncomment to disable the train
    #force_first_train=False,

    # Uncomment to disable the determinism check
    # no_determinism_check=True,
)

[32m16:14:35[0m [33mno forbidden library found[0m
[32m16:14:35[0m [33m[0m
[32m16:14:35[0m started
[32m16:14:35[0m running local test
[32m16:14:35[0m [33minternet access isn't restricted, no check will be done[0m
[32m16:14:35[0m 
[32m16:14:36[0m starting unstructured loop...
[32m16:14:36[0m executing - command=train


data\X_train.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/X_train.parquet (204327238 bytes)
data\X_train.parquet: already exists, file length match
data\X_test.reduced.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/X_test.reduced.parquet (2380918 bytes)
data\X_test.reduced.parquet: already exists, file length match
data\y_train.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/y_train.parquet (61003 bytes)
data\y_train.parquet: already exists, file length match
data\y_test.reduced.parquet: download from https:crunchdao--competition--production.s3-accelerate.amazonaws.com/data-releases/146/y_test.reduced.parquet (2655 bytes)
data\y_test.reduced.parquet: already exists, file length match
Training model for structural break detection
X_train type: <class 'pandas.core.frame.DataFrame'>
X_train shape: (237

[32m16:17:54[0m executing - command=infer


Successfully loaded model from resources\model.joblib


[32m16:18:15[0m checking determinism by executing the inference again with 30% of the data (tolerance: 1e-08)
[32m16:18:15[0m executing - command=infer


Successfully loaded model from resources\model.joblib


[32m16:18:22[0m determinism check: passed
[32m16:18:22[0m [33msave prediction - path=data\prediction.parquet[0m
[32m16:18:22[0m ended
[32m16:18:22[0m [33mduration - time=00:03:46[0m
[32m16:18:22[0m [33mmemory - before="6.64 GB" after="3.93 GB" consumed="-2709954560 bytes"[0m


## Results

Once the local tester is done, you can preview the result stored in `data/prediction.parquet`.

In [41]:
prediction = pd.read_parquet("data/prediction.parquet")
prediction

Unnamed: 0_level_0,prediction
id,Unnamed: 1_level_1
10001,0.655638
10002,0.307177
10003,0.447560
10004,0.315210
10005,0.497388
...,...
10097,0.416451
10098,0.268131
10099,0.525434
10100,0.325190


### Local scoring

You can call the function that the system uses to estimate your score locally.

In [42]:
# Load the targets
target = pd.read_parquet("data/y_test.reduced.parquet")["structural_breakpoint"]

# Call the scoring function
sklearn.metrics.roc_auc_score(
    target,
    prediction,
)

0.8117370892018779