In [5]:
import pandas as pd
import numpy as np
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import Lasso
from typing import List, Dict, Optional
import warnings

In [6]:
class OFIAnalyzer:
    """
    Enhanced Order Flow Imbalance analyzer with improved efficiency and robustness.
    
    Features:
    - Type hints for better code clarity
    - More efficient memory usage
    - Better handling of edge cases
    - Additional validation checks
    - Optimized calculations
    - More detailed documentation
    """
    
    def __init__(self, data: pd.DataFrame, levels: int = 10, time_window: str = '1min'):
        """
        Initialize the OFI analyzer with order book data.
        
        Parameters:
        -----------
        data : pd.DataFrame
            Order book data containing bid/ask prices and sizes
        levels : int, optional
            Number of order book levels to consider (default: 10)
        time_window : str, optional
            Time window for aggregating OFI (default: '1min')
            
        Raises:
        -------
        ValueError
            If required columns are missing from the input data
        """
        # Validate input data
        required_columns = ['ts_event', 'symbol', 'bid_px_00', 'ask_px_00']
        if not all(col in data.columns for col in required_columns):
            raise ValueError(f"Input data must contain these columns: {required_columns}")
            
        self.data = data.copy()
        self.levels = min(levels, 10)  # Ensure we don't exceed available levels
        self.time_window = time_window
        self.symbols = data['symbol'].unique()
        self._validate_data()
        self._preprocess_data()
        
    def _validate_data(self) -> None:
        """Validate the input data structure."""
        # Check that all required level columns exist
        for level in range(self.levels):
            for prefix in ['bid_px_', 'ask_px_', 'bid_sz_', 'ask_sz_']:
                col = f"{prefix}{level:02d}"
                if col not in self.data.columns:
                    raise ValueError(f"Missing required column: {col}")
    
    def _preprocess_data(self) -> None:
        """Pre-process the order book data for OFI calculations."""
        # Convert timestamps and sort
        self.data['ts_event'] = pd.to_datetime(self.data['ts_event'])
        self.data['ts_recv'] = pd.to_datetime(self.data['ts_recv'])
        self.data = self.data.sort_values('ts_event')
        
        # Calculate mid-price and create time bins
        self.data['mid_price'] = (self.data['bid_px_00'] + self.data['ask_px_00']) / 2
        self.data['time_bin'] = self.data['ts_event'].dt.floor(self.time_window)
        
    def _calculate_single_level_ofi(self, level: int) -> pd.Series:
        """
        Calculate OFI for a single level using vectorized operations.
        
        Parameters:
        -----------
        level : int
            Order book level to calculate OFI for
            
        Returns:
        --------
        pd.Series
            OFI values for the specified level
        """
        # Get column names for this level
        bid_px_col = f'bid_px_{level:02d}'
        ask_px_col = f'ask_px_{level:02d}'
        bid_sz_col = f'bid_sz_{level:02d}'
        ask_sz_col = f'ask_sz_{level:02d}'
        
        # Group by symbol to handle each asset separately
        grouped = self.data.groupby('symbol')
        
        # Calculate price and size changes using vectorized operations
        bid_px_change = grouped[bid_px_col].diff()
        ask_px_change = grouped[ask_px_col].diff()
        bid_sz_change = grouped[bid_sz_col].diff()
        ask_sz_change = grouped[ask_sz_col].diff()
        
        # Calculate bid OFI component
        bid_ofi = (
            (bid_px_change > 0) * self.data[bid_sz_col] - 
            (bid_px_change < 0) * self.data[bid_sz_col] + 
            (bid_px_change == 0) * bid_sz_change.fillna(0)
        )
        
        # Calculate ask OFI component
        ask_ofi = (
            (ask_px_change > 0) * (-self.data[ask_sz_col]) - 
            (ask_px_change < 0) * self.data[ask_sz_col] + 
            (ask_px_change == 0) * ask_sz_change.fillna(0)
        )
        
        return bid_ofi + ask_ofi
    
    def calculate_best_level_ofi(self) -> pd.DataFrame:
        """Calculate Best-Level OFI (level 0) and aggregate by time window."""
        self.data['best_level_ofi'] = self._calculate_single_level_ofi(0)
        
        # Aggregate using sum (as per Cont et al.)
        result = (self.data.groupby(['time_bin', 'symbol'])['best_level_ofi']
                 .sum()
                 .unstack(fill_value=0)
                 .reset_index()
                 .rename(columns={'time_bin': 'timestamp'}))
        
        # For single asset, keep consistent column naming
        if len(self.symbols) == 1:
            result = result.rename(columns={self.symbols[0]: 'OFI_best'})
        
        return result
    
    def calculate_multi_level_ofi(self) -> pd.DataFrame:
        """Calculate Multi-Level OFI by aggregating across all levels."""
        # Calculate OFI for each level and aggregate
        ofi_values = np.zeros(len(self.data))
        
        for level in range(self.levels):
            ofi_values += self._calculate_single_level_ofi(level)
        
        self.data['multi_level_ofi'] = ofi_values
        
        # Calculate average depth for scaling
        depth_values = np.zeros(len(self.data))
        for level in range(self.levels):
            bid_col = f'bid_sz_{level:02d}'
            ask_col = f'ask_sz_{level:02d}'
            depth_values += (self.data[bid_col] + self.data[ask_col]) / 2
        
        avg_depth = depth_values / self.levels
        self.data['multi_level_ofi'] = self.data['multi_level_ofi'] / avg_depth.replace(0, 1)
        
        # Aggregate results
        result = (self.data.groupby(['time_bin', 'symbol'])['multi_level_ofi']
                 .sum()
                 .unstack(fill_value=0)
                 .reset_index()
                 .rename(columns={'time_bin': 'timestamp'}))
        
        # Sum across levels and rename columns
        result['OFI_multi'] = result[self.symbols].sum(axis=1) if len(self.symbols) > 1 else result[self.symbols[0]]
        
        return result[['timestamp', 'OFI_multi'] + list(self.symbols)]
    
    def calculate_integrated_ofi(self) -> pd.DataFrame:
        """Calculate Integrated OFI using PCA on multi-level OFI."""
        # First calculate multi-level OFI for each level
        ofi_matrix = []
        for level in range(self.levels):
            ofi_matrix.append(self._calculate_single_level_ofi(level))
        
        ofi_matrix = pd.concat(ofi_matrix, axis=1)
        ofi_matrix.columns = [f'ofi_{level}' for level in range(self.levels)]
        
        # Standardize and apply PCA
        scaler = StandardScaler()
        pca = PCA(n_components=1)
        
        # Group by time bin and symbol
        grouped = pd.concat([self.data[['time_bin', 'symbol']], ofi_matrix], axis=1)
        grouped = grouped.groupby(['time_bin', 'symbol']).sum()
        
        # Apply PCA to each symbol's data
        results = []
        for symbol in self.symbols:
            symbol_data = grouped.xs(symbol, level='symbol')
            if len(symbol_data) < 2:
                warnings.warn(f"Not enough data points for PCA for symbol {symbol}")
                continue
                
            ofi_scaled = scaler.fit_transform(symbol_data)
            integrated_ofi = pca.fit_transform(ofi_scaled)
            
            # Normalize by L1 norm of weights
            weights = pca.components_[0]
            l1_norm = np.sum(np.abs(weights))
            integrated_ofi = integrated_ofi / l1_norm
            
            result = pd.DataFrame({
                'timestamp': symbol_data.index,
                'symbol': symbol,
                'OFI_integrated': integrated_ofi.flatten()
            })
            results.append(result)
        
        if not results:
            return pd.DataFrame(columns=['timestamp', 'OFI_integrated'])
            
        return pd.concat(results).reset_index(drop=True)
    
    def calculate_cross_asset_ofi(self, alpha: float = 0.1) -> pd.DataFrame:
        """Calculate Cross-Asset OFI using LASSO regression."""
        if len(self.symbols) < 2:
            warnings.warn("Insufficient assets for cross-asset OFI calculation")
            result = self.calculate_best_level_ofi()[['timestamp']].copy()
            result['OFI_cross'] = np.nan
            return result
        
        # Calculate best-level OFI for each asset
        asset_ofis = []
        for symbol in self.symbols:
            symbol_data = self.data[self.data['symbol'] == symbol].copy()
            symbol_data['ofi'] = self._calculate_single_level_ofi(0)
            
            ofi_ts = symbol_data.groupby('time_bin')['ofi'].sum().reset_index()
            ofi_ts = ofi_ts.rename(columns={'ofi': symbol})
            asset_ofis.append(ofi_ts)
        
        # Merge all assets' OFI
        ofi_matrix = asset_ofis[0]
        for df in asset_ofis[1:]:
            ofi_matrix = ofi_matrix.merge(df, on='time_bin', how='outer')
        ofi_matrix = ofi_matrix.fillna(0).sort_values('time_bin')
        
        # Calculate cross-impact using LASSO
        cross_results = []
        for target in self.symbols:
            X = ofi_matrix.drop(columns=['time_bin', target])
            y = ofi_matrix[target]
            
            if len(X) < 10:  # Minimum samples for meaningful regression
                cross_impact = np.zeros(len(X))
            else:
                lasso = Lasso(alpha=alpha, max_iter=10000, random_state=42)
                lasso.fit(X, y)
                cross_impact = X @ lasso.coef_
            
            cross_results.append(pd.DataFrame({
                'timestamp': ofi_matrix['time_bin'],
                'symbol': target,
                'cross_ofi': cross_impact
            }))
        
        # Combine results
        cross_ofi = pd.concat(cross_results)
        cross_ofi = cross_ofi.groupby('timestamp')['cross_ofi'].mean().reset_index()
        cross_ofi = cross_ofi.rename(columns={'cross_ofi': 'OFI_cross'})
        
        return cross_ofi
    
    def calculate_all_features(self) -> pd.DataFrame:
        """Calculate all OFI features and combine into a single DataFrame."""
        features = [
            self.calculate_best_level_ofi(),
            self.calculate_multi_level_ofi()[['timestamp', 'OFI_multi']],
            self.calculate_integrated_ofi(),
            self.calculate_cross_asset_ofi()
        ]
        
        # Merge all features
        result = features[0]
        for df in features[1:]:
            if not df.empty:
                result = result.merge(df, on='timestamp', how='left')
        
        # Fill NaN with 0 (except for cross-asset OFI which might be intentionally NaN)
        for col in result.columns:
            if col not in ['timestamp', 'symbol', 'OFI_cross']:
                result[col] = result[col].fillna(0)
        
        return result





In [7]:
def main():
    """Main execution function."""
    try:
        # Load and process data
        data = pd.read_csv('first_25000_rows.csv')
        analyzer = OFIAnalyzer(data, time_window='1min')
        
        # Calculate features
        features = analyzer.calculate_all_features()
        
        # Save and display results
        features.to_csv('enhanced_ofi_features-task-1.csv', index=False)
        print("Enhanced OFI features calculated and saved to enhanced_ofi_features.csv")
        print("\nSample results:")
        print(features.head())
        
        return features
        
    except Exception as e:
        print(f"Error processing data: {str(e)}")
        return None

In [8]:
if __name__ == '__main__':
    main()

Enhanced OFI features calculated and saved to enhanced_ofi_features.csv

Sample results:
                  timestamp  OFI_best   OFI_multi symbol  OFI_integrated  \
0 2024-10-21 11:54:00+00:00    -533.0  -34.960958   AAPL       -0.640427   
1 2024-10-21 11:55:00+00:00   -1621.0  -84.327239   AAPL       -0.493726   
2 2024-10-21 11:56:00+00:00    -468.0  -67.501703   AAPL       -0.508024   
3 2024-10-21 11:57:00+00:00    -482.0 -133.390723   AAPL       -0.344030   
4 2024-10-21 11:58:00+00:00    -224.0 -223.095515   AAPL       -0.065161   

   OFI_cross  
0        NaN  
1        NaN  
2        NaN  
3        NaN  
4        NaN  


