In [2]:
# Feature Engineering for Jane Street Market Prediction

import sys
sys.path.append('..')

import polars as pl
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from typing import List, Dict, Tuple

# Custom utilities
from src.data.data_loader import JaneStreetDataLoader

class FeatureEngineer:
    """
    Feature engineering pipeline for market prediction
    """
    def __init__(self, df: pl.DataFrame):
        self.df = df
        self.feature_cols = [col for col in df.columns if col.startswith('feature_')]
        
    def verify_feature_distributions(self) -> Dict:
        """
        Check feature scales and distributions
        """
        stats = {}
        for col in self.feature_cols:
            if self.df[col].null_count() / len(self.df) < 0.5:  # Only analyze features with <50% missing
                stats[col] = {
                    'mean': float(self.df[col].mean()),
                    'std': float(self.df[col].std()),
                    'min': float(self.df[col].min()),
                    'max': float(self.df[col].max()),
                    'null_pct': float(self.df[col].null_count() / len(self.df))
                }
        return stats
    
    def create_temporal_features(self, 
                               lags: List[int] = [1, 2, 3, 5, 10],
                               windows: List[int] = [5, 10, 20]) -> pl.DataFrame:
        """
        Create time-based features
        """
        df = self.df.sort(['symbol_id', 'date_id', 'time_id'])
        
        # 1. Lag features
        for lag in lags:
            # Target lags
            df = df.with_columns([
                pl.col('responder_6')
                  .shift(lag)
                  .over('symbol_id')
                  .alias(f'target_lag_{lag}')
            ])
            
            # Feature lags for important features
            for feat in self.get_important_features():
                df = df.with_columns([
                    pl.col(feat)
                      .shift(lag)
                      .over('symbol_id')
                      .alias(f'{feat}_lag_{lag}')
                ])
        
        # 2. Rolling statistics
        for window in windows:
            df = df.with_columns([
                pl.col('responder_6')
                  .rolling_mean(window)
                  .over('symbol_id')
                  .alias(f'return_ma_{window}'),
                  
                pl.col('responder_6')
                  .rolling_std(window)
                  .over('symbol_id')
                  .alias(f'return_vol_{window}')
            ])
        
        return df
    
    def create_symbol_features(self) -> pl.DataFrame:
        """
        Create symbol-specific features
        """
        df = self.df
        
        # 1. Symbol level statistics
        symbol_stats = (df.group_by('symbol_id')
                         .agg([
                             pl.col('responder_6').mean().alias('symbol_mean_return'),
                             pl.col('responder_6').std().alias('symbol_volatility'),
                             pl.len().alias('symbol_activity')
                         ]))
        
        # Join back to main dataset
        df = df.join(symbol_stats, on='symbol_id')
        
        # 2. Symbol regime indicators
        df = df.with_columns([
            (pl.col('symbol_volatility') > 
             pl.col('symbol_volatility').mean() + pl.col('symbol_volatility').std())
            .cast(pl.Int32)
            .alias('high_vol_symbol'),
            
            (pl.col('symbol_activity') > 
             pl.col('symbol_activity').mean() + pl.col('symbol_activity').std())
            .cast(pl.Int32)
            .alias('high_activity_symbol')
        ])
        
        return df
    
    def create_regime_features(self) -> pl.DataFrame:
        """
        Create market regime indicators
        """
        df = self.df
        
        # 1. Calculate daily volatility
        daily_stats = (df.group_by('date_id')
                        .agg([
                            pl.col('responder_6').std().alias('daily_vol'),
                            pl.col('responder_6').mean().alias('daily_return'),
                            pl.len().alias('daily_trades')
                        ]))
        
        vol_mean = float(daily_stats['daily_vol'].mean())
        vol_std = float(daily_stats['daily_vol'].std())
        
        # 2. Create regime indicators
        daily_stats = daily_stats.with_columns([
            pl.when(pl.col('daily_vol') > vol_mean + vol_std)
              .then(pl.lit('high'))
              .when(pl.col('daily_vol') < vol_mean - vol_std)
              .then(pl.lit('low'))
              .otherwise(pl.lit('normal'))
              .alias('vol_regime')
        ])
        
        # 3. Join back to main dataset
        df = df.join(daily_stats.select(['date_id', 'vol_regime']), on='date_id')
        
        # 4. Create dummy variables for regimes
        df = df.with_columns([
            (pl.col('vol_regime') == 'high').cast(pl.Int32).alias('high_vol_regime'),
            (pl.col('vol_regime') == 'low').cast(pl.Int32).alias('low_vol_regime')
        ])
        
        return df
    
    def handle_missing_values(self) -> pl.DataFrame:
        """
        Apply missing value strategy
        """
        df = self.df
        
        # 1. Identify features by missing value levels
        feature_missing = {col: df[col].null_count() / len(df) 
                         for col in self.feature_cols}
        
        high_missing = [col for col, pct in feature_missing.items() 
                       if pct > 0.5]
        moderate_missing = [col for col, pct in feature_missing.items() 
                          if 0.1 < pct <= 0.5]
        low_missing = [col for col, pct in feature_missing.items() 
                      if 0 < pct <= 0.1]
        
        # 2. Drop high missing features
        df = df.drop(high_missing)
        
        # 3. Forward fill moderate missing (within symbols)
        for col in moderate_missing:
            df = df.with_columns([
                pl.col(col)
                  .forward_fill()
                  .over('symbol_id')
                  .alias(col)
            ])
        
        # 4. Interpolate low missing
        for col in low_missing:
            df = df.with_columns([
                pl.col(col)
                  .interpolate()
                  .alias(col)
            ])
        
        return df
    
    def get_important_features(self) -> List[str]:
        """
        Identify important features based on correlation with target
        """
        important_features = []
        for col in self.feature_cols:
            if self.df[col].null_count() / len(self.df) < 0.5:
                corr = self.df.select(pl.corr(col, 'responder_6')).item()
                if abs(corr) > 0.1:  # Arbitrary threshold
                    important_features.append(col)
        return important_features
    
    def check_multicollinearity(self, threshold: float = 0.8) -> Dict:
        """
        Check for high correlations between features
        """
        high_corr_pairs = {}
        features = self.get_important_features()
        
        for i, feat1 in enumerate(features):
            for feat2 in features[i+1:]:
                corr = abs(self.df.select(pl.corr(feat1, feat2)).item())
                if corr > threshold:
                    high_corr_pairs[(feat1, feat2)] = corr
        
        return high_corr_pairs

# Load and prepare data
print("Loading data...")
loader = JaneStreetDataLoader()
sample_data = loader.load_training_sample(n_partitions=2)

# Create feature engineer
print("\nInitializing feature engineering...")
engineer = FeatureEngineer(sample_data)

# 1. Verify feature distributions
print("\nVerifying feature distributions...")
feature_stats = engineer.verify_feature_distributions()

# 2. Create features
print("\nCreating temporal features...")
df = engineer.create_temporal_features()

print("Creating symbol features...")
df = engineer.create_symbol_features()

print("Creating regime features...")
df = engineer.create_regime_features()

# 3. Handle missing values
print("\nHandling missing values...")
df = engineer.handle_missing_values()

# 4. Check for multicollinearity
print("\nChecking for multicollinearity...")
high_corr_features = engineer.check_multicollinearity()

# Display summary
print("\n=== Feature Engineering Summary ===")
print(f"Original features: {len(engineer.feature_cols)}")
print(f"Engineered features: {len(df.columns) - len(sample_data.columns)}")
print(f"High correlation pairs: {len(high_corr_features)}")

# Save processed dataset
print("\nSaving processed dataset...")
df.write_parquet("../data/processed/engineered_features.parquet")

Loading data...

Initializing feature engineering...

Verifying feature distributions...

Creating temporal features...
Creating symbol features...
Creating regime features...

Handling missing values...

Checking for multicollinearity...

=== Feature Engineering Summary ===
Original features: 79
Engineered features: -9
High correlation pairs: 0

Saving processed dataset...


In [1]:
import polars as pl
import numpy as np
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass

@dataclass
class MarketRegime:
    """Market regime configuration"""
    name: str
    volatility_threshold: float
    correlation_threshold: float

class EnhancedFeatureEngineer:
    """
    Enhanced feature engineering with regime awareness and robust calculations
    """
    def __init__(self, df: pl.DataFrame):
        """Initialize with dataframe and detect feature columns"""
        self.df = df
        self.feature_cols = [col for col in df.columns if col.startswith('feature_')]
        self.regimes = None
        self._set_dynamic_thresholds()
    
    def _set_dynamic_thresholds(self) -> None:
        """Set regime thresholds using historical data distribution"""
        try:
            # Calculate daily volatility
            vol_stats = (self.df
                        .group_by('date_id')
                        .agg(pl.col('responder_6').std().alias('daily_vol')))
            
            # Get distribution quantiles
            vol_quantiles = vol_stats.select([
                pl.col('daily_vol').quantile(0.33).alias('low_vol'),
                pl.col('daily_vol').quantile(0.67).alias('high_vol')
            ])
            
            # Set thresholds
            low_vol = float(vol_quantiles.get_column('low_vol')[0])
            high_vol = float(vol_quantiles.get_column('high_vol')[0])
            
            self.regimes = {
                'low_vol': MarketRegime('low_volatility', low_vol, 0.3),
                'normal': MarketRegime('normal', high_vol, 0.5),
                'high_vol': MarketRegime('high_volatility', float('inf'), 0.7)
            }
            
        except Exception as e:
            print(f"Error setting thresholds: {e}")
            # Set fallback thresholds
            self.regimes = {
                'low_vol': MarketRegime('low_volatility', 0.5, 0.3),
                'normal': MarketRegime('normal', 1.0, 0.5),
                'high_vol': MarketRegime('high_volatility', float('inf'), 0.7)
            }
    
    def _validate_feature_set(self, df: pl.DataFrame, stage: str) -> None:
        """Validate feature quality after each stage"""
        null_counts = df.null_count()
        high_nulls = []
        for col in df.columns:
            null_count = null_counts.get_column(col)[0]
            if null_count/len(df) > 0.1:
                high_nulls.append((col, null_count))
        
        if high_nulls:
            print(f"\nWarning: High null counts after {stage}:")
            for col, count in high_nulls:
                print(f"- {col}: {count} nulls ({count/len(df)*100:.1f}%)")
    
    def create_base_features(self, 
                        windows: List[int] = [5, 10, 20],
                        min_window_fraction: float = 0.5) -> pl.DataFrame:
        """Create foundational technical indicators"""
        df = self.df.sort(['symbol_id', 'date_id', 'time_id'])
        
        for window in windows:
            min_periods = max(int(window * min_window_fraction), 1)
            
            try:
                # Fix: Use simpler rolling operations first
                df = df.with_columns([
                    # Price changes
                    pl.col('responder_6')
                    .diff()
                    .over(['symbol_id'])
                    .alias(f'price_change_{window}'),
                    
                    # Simple moving average
                    pl.col('responder_6')
                    .rolling_mean(window, min_periods=min_periods)
                    .over(['symbol_id'])
                    .alias(f'sma_{window}'),
                    
                    # Volatility
                    pl.col('responder_6')
                    .rolling_std(window, min_periods=min_periods)
                    .over(['symbol_id'])
                    .alias(f'volatility_{window}')
                ])
                
                # Add lag features
                df = df.with_columns([
                    pl.col('responder_6')
                    .shift(i)
                    .over(['symbol_id'])
                    .alias(f'lag_{i}')
                    for i in range(1, 6)
                ])
                
                # Calculate momentum using the SMA
                df = df.with_columns([
                    (pl.col('responder_6') - pl.col(f'sma_{window}'))
                    .alias(f'momentum_{window}')
                ])
                
                # Calculate returns safely
                df = df.with_columns([
                    (pl.col('responder_6') / 
                    pl.col('responder_6').shift(window)
                    .over(['symbol_id'])
                    .fill_null(pl.col('responder_6')) - 1)
                    .alias(f'return_{window}')
                ])
                
                self._validate_feature_set(df, f"base features window {window}")
                
            except Exception as e:
                print(f"Error creating base features for window {window}: {e}")
                import traceback
                traceback.print_exc()
        
        return df
    
    def detect_regimes(self, 
                  df: pl.DataFrame, 
                  window: int = 20,
                  min_window_fraction: float = 0.5) -> pl.DataFrame:
        """Detect market regimes using multiple indicators"""
        try:
            min_periods = max(int(window * min_window_fraction), 1)
            
            # Calculate market metrics
            market_metrics = (df.group_by(['date_id'])  # Removed time_id for stability
                            .agg([
                                pl.col('responder_6').std().alias('market_vol'),
                                pl.col('responder_6').mean().alias('market_return'),
                                pl.col('responder_6').count().alias('active_symbols')
                            ]))
            
            # Add rolling metrics
            market_metrics = market_metrics.with_columns([
                pl.col('market_vol')
                .rolling_mean(window, min_periods=min_periods)
                .alias('rolling_vol')
            ])
            
            # Join and detect regimes
            df = df.join(market_metrics, on=['date_id'])
            
            # Create regime column
            df = df.with_columns([
                pl.when(pl.col('rolling_vol') > self.regimes['normal'].volatility_threshold)
                .then(pl.lit('high_volatility'))
                .when(pl.col('rolling_vol') < self.regimes['low_vol'].volatility_threshold)
                .then(pl.lit('low_volatility'))
                .otherwise(pl.lit('normal'))
                .alias('volatility_regime')
            ])
            
            return df
            
        except Exception as e:
            print(f"Error in regime detection: {e}")
            import traceback
            traceback.print_exc()
            return df.with_columns(pl.lit('normal').alias('volatility_regime'))
    
    def create_regime_features(self, 
                         df: pl.DataFrame, 
                         windows: List[int] = [5, 10, 20]) -> pl.DataFrame:
        """Create regime-aware and adaptive features"""
        try:
            # Create regime indicators first
            df = df.with_columns([
                (pl.col('volatility_regime') == 'high_volatility')
                .cast(pl.Int32)
                .alias('high_vol_regime'),
                (pl.col('volatility_regime') == 'low_volatility')
                .cast(pl.Int32)
                .alias('low_vol_regime')
            ])
            
            for window in windows:
                # Create regime-weighted features
                df = df.with_columns([
                    # Regime-weighted momentum
                    pl.col(f'momentum_{window}')
                    .mul(pl.when(pl.col('high_vol_regime') == 1)
                            .then(0.7)
                            .when(pl.col('low_vol_regime') == 1)
                            .then(1.3)
                            .otherwise(1.0))
                    .alias(f'regime_momentum_{window}'),
                    
                    # Excess returns
                    (pl.col(f'return_{window}') - 
                    pl.col(f'return_{window}').mean().over('date_id'))
                    .alias(f'excess_return_{window}')
                ])
            
            return df
            
        except Exception as e:
            print(f"Error creating regime features: {e}")
            import traceback
            traceback.print_exc()
            return df
    
    def create_symbol_features(self, 
                         df: pl.DataFrame,
                         window: int = 20,
                         min_periods: int = 10) -> pl.DataFrame:
        """Create symbol-specific features"""
        try:
            # Basic symbol metrics
            symbol_metrics = (df.group_by('symbol_id')
                            .agg([
                                pl.col('responder_6').std().alias('symbol_vol'),
                                pl.col('responder_6').mean().alias('symbol_return'),
                                pl.col('responder_6').count().alias('symbol_liquidity')
                            ]))
            
            # Join and create relative metrics
            df = df.join(symbol_metrics, on='symbol_id')
            
            # Add rankings
            df = df.with_columns([
                pl.col('symbol_vol')
                .rank()
                .over('date_id')
                .alias('vol_rank'),
                pl.col('symbol_liquidity')
                .rank()
                .over('date_id')
                .alias('liquidity_rank')
            ])
            
            return df
            
        except Exception as e:
            print(f"Error creating symbol features: {e}")
            import traceback
            traceback.print_exc()
            return df
    
    def analyze_importance(self, 
                         df: pl.DataFrame, 
                         features: List[str],
                         min_samples: int = 100) -> Dict:
        """Analyze feature importance with robust statistics"""
        importance_stats = {}
        
        for feature in features:
            try:
                # Filter valid data using null check
                valid_data = df.filter(
                    ~pl.col(feature).is_null() & ~pl.col('responder_6').is_null()
                )
                
                if len(valid_data) < min_samples:
                    print(f"Warning: Insufficient samples for {feature}")
                    continue
                
                # Calculate correlations
                overall_corr = float(valid_data.select(
                    pl.corr(feature, 'responder_6')
                ).item())
                
                # Get regime-specific correlations
                regime_corrs = valid_data.group_by('volatility_regime').agg([
                    pl.corr(feature, 'responder_6').alias('correlation')
                ])
                
                regime_dict = {
                    str(row[0]): float(row[1]) 
                    for row in regime_corrs.iter_rows()
                }
                
                if abs(overall_corr) > 0.05:  # Store only meaningful correlations
                    importance_stats[feature] = {
                        'overall_correlation': overall_corr,
                        'regime_correlations': regime_dict
                    }
                
            except Exception as e:
                print(f"Error analyzing {feature}: {e}")
        
        return importance_stats
    
    def create_all_features(self) -> Tuple[pl.DataFrame, Dict]:
        """Create all enhanced features with staged execution"""
        try:
            print("\nStage 1: Creating base features...")
            df = self.create_base_features()
            print("Base features created successfully")
            
            print("\nStage 2: Detecting market regimes...")
            df = self.detect_regimes(df)
            print("Regime detection completed")
            
            print("\nStage 3: Creating regime-aware features...")
            df = self.create_regime_features(df)
            print("Regime features created")
            
            print("\nStage 4: Creating symbol-specific features...")
            df = self.create_symbol_features(df)
            print("Symbol features created")
            
            # Analyze new features
            original_cols = set(self.df.columns)
            new_features = [col for col in df.columns if col not in original_cols]
            
            print(f"\nCreated {len(new_features)} new features.")
            importance_stats = self.analyze_importance(df, new_features)
            
            return df, importance_stats
            
        except Exception as e:
            print(f"Error in feature creation pipeline: {e}")
            import traceback
            traceback.print_exc()
            raise
# Example usage
if __name__ == "__main__":
    print("Loading data...")
    df = pl.read_parquet("../data/processed/engineered_features.parquet")

    print("\nInitializing feature engineering...")
    engineer = EnhancedFeatureEngineer(df)
    enhanced_df, importance_stats = engineer.create_all_features()

    print("\nFeature Importance Analysis:")
    for feature, stats in importance_stats.items():
        print(f"\n{feature}:")
        print(f"- Overall correlation: {stats['overall_correlation']:.4f}")
        print("- Regime-specific correlations:")
        for regime, corr in stats['regime_correlations'].items():
            print(f"  * {regime}: {corr:.4f}")

    print("\nSaving enhanced features...")
    enhanced_df.write_parquet("../data/processed/enhanced_features.parquet")

Loading data...

Initializing feature engineering...

Stage 1: Creating base features...
Error creating base features for window 5: argument 'interpolation': 'float' object cannot be converted to 'PyString'
Error creating base features for window 10: argument 'interpolation': 'float' object cannot be converted to 'PyString'
Error creating base features for window 20: argument 'interpolation': 'float' object cannot be converted to 'PyString'

Stage 2: Detecting market regimes...

Stage 3: Creating regime-aware features...
Error creating regime features: sub operation not supported for dtypes `str` and `str`

Stage 4: Creating symbol-specific features...


: 