# Volatility Curve Predictor

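This notebook fills in missing implied-volatility (IV) values on an option chain. For each row it copies IVs between the call and the put at the same strike, interpolates the remaining gaps across strikes (cubic spline, with linear and nearest-strike fallbacks), lightly blends call and put IVs, and clips the result to the range [0.01, 1.0]. It also computes rolling statistics of the underlying's log returns.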

## 1. Import Libraries and Setup

In [1]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from scipy.interpolate import interp1d, splrep, splev
import warnings
warnings.filterwarnings('ignore')

print("Libraries imported successfully")

Libraries imported successfully


## 2. Load and Prepare Data

In [2]:
# Read the training frame, the test frame and the submission template from disk.
print("Loading data...")
train = pd.read_parquet('train_data.parquet')
test = pd.read_parquet('test_data.parquet')
sample_sub = pd.read_csv('sample_submission.csv')

print(f"\nTrain data shape: {train.shape}")
print(f"Test data shape: {test.shape}")
print(f"Sample submission shape: {sample_sub.shape}")

# The IV columns are taken from the TEST frame: every column whose name starts
# with 'call_iv_' or 'put_iv_'.
iv_columns = [name for name in test.columns if name.startswith(('call_iv_', 'put_iv_'))]
print(f"\nNumber of IV columns: {len(iv_columns)}")

# Map each strike (the text after the last underscore) to its call/put column
# names. A side with no column in the test frame stays None.
strike_dict = {}
for name in iv_columns:
    side = 'call' if name.startswith('call_iv_') else 'put'
    entry = strike_dict.setdefault(name.split('_')[-1], {'call': None, 'put': None})
    entry[side] = name

print(f"\nNumber of unique strikes: {len(strike_dict)}")
print("\nStrike dictionary:")
print(strike_dict)

Loading data...

Train data shape: (178340, 97)
Test data shape: (12065, 96)
Sample submission shape: (12065, 53)

Number of IV columns: 52

Number of unique strikes: 36

Strike dictionary:
{'24000': {'call': 'call_iv_24000', 'put': 'put_iv_24000'}, '24100': {'call': 'call_iv_24100', 'put': 'put_iv_24100'}, '24200': {'call': 'call_iv_24200', 'put': 'put_iv_24200'}, '24300': {'call': 'call_iv_24300', 'put': 'put_iv_24300'}, '24400': {'call': 'call_iv_24400', 'put': 'put_iv_24400'}, '24500': {'call': 'call_iv_24500', 'put': 'put_iv_24500'}, '24600': {'call': 'call_iv_24600', 'put': 'put_iv_24600'}, '24700': {'call': 'call_iv_24700', 'put': 'put_iv_24700'}, '24800': {'call': 'call_iv_24800', 'put': 'put_iv_24800'}, '24900': {'call': 'call_iv_24900', 'put': 'put_iv_24900'}, '25000': {'call': 'call_iv_25000', 'put': 'put_iv_25000'}, '25100': {'call': 'call_iv_25100', 'put': 'put_iv_25100'}, '25200': {'call': 'call_iv_25200', 'put': 'put_iv_25200'}, '25300': {'call': 'call_iv_25300', 'put': 

## 3. Advanced Statistical Calculations

In [None]:
def calculate_advanced_statistics(df):
    """Add rolling statistics of the underlying's log returns to a copy of ``df``.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain an ``'underlying'`` column. Rows are used in their
        existing order for the ``shift``/``rolling`` computations.
        # NOTE(review): the rolling features only make sense if rows are in
        # time order -- confirm callers do not pass shuffled frames.

    Returns
    -------
    pandas.DataFrame
        A copy of ``df`` with these extra columns: ``underlying_returns``,
        ``vol_of_vol``, ``realized_vol``, ``vol_skew``, ``vol_kurt``,
        ``vol_term_{5,10,20,60}`` and the categorical ``vol_regime``.
        Every numeric column (including those already present in ``df``) is
        forward-filled, then back-filled, then filled with 0.

    Raises
    ------
    KeyError
        If ``df`` has no ``'underlying'`` column.
    """
    print("Calculating advanced statistics...")
    stats = df.copy()

    returns = np.log(stats['underlying'] / stats['underlying'].shift(1))
    stats['underlying_returns'] = returns

    # NOTE(review): sqrt(252) is the usual daily-to-annual scaling -- confirm
    # it matches the bar frequency of `underlying`.
    annualisation = np.sqrt(252)
    rolling_20 = returns.rolling(window=20)
    rolling_std_20 = rolling_20.std()

    # Volatility of volatility: rolling std of the 20-period rolling std
    stats['vol_of_vol'] = rolling_std_20.rolling(window=20).std()

    # Realized volatility
    stats['realized_vol'] = rolling_std_20 * annualisation

    # Skewness and kurtosis of the returns over the same 20-period window
    stats['vol_skew'] = rolling_20.skew()
    stats['vol_kurt'] = rolling_20.kurt()

    # Volatility term structure
    for window in [5, 10, 20, 60]:
        stats[f'vol_term_{window}'] = returns.rolling(window=window).std() * annualisation

    # Fill numeric gaps first. `.ffill()` / `.bfill()` replace the deprecated
    # `fillna(method=...)` spelling and produce the same values.
    numerical_cols = stats.select_dtypes(include=[np.number]).columns
    stats[numerical_cols] = stats[numerical_cols].ffill().bfill().fillna(0)

    # Volatility regime indicator, built after the numeric fill so that
    # `realized_vol` has no NaNs.
    regime_labels = ['very_low', 'low', 'medium', 'high', 'very_high']
    try:
        stats['vol_regime'] = pd.qcut(stats['realized_vol'], q=5, labels=regime_labels)
    except ValueError:
        # qcut raises when the quantile edges are not unique (too few distinct
        # values). Fall back to a constant regime, using the same ordered
        # categories so the column dtype does not depend on the data.
        stats['vol_regime'] = pd.Categorical(
            ['medium'] * len(stats), categories=regime_labels, ordered=True
        )

    # Fill gaps in categorical columns with their mode. The mode is always an
    # existing category, so no new category is introduced.
    for col in stats.select_dtypes(include=['category']).columns:
        if not stats[col].isna().any():
            continue
        modes = stats[col].mode()
        if modes.empty:
            # Column is entirely missing: there is nothing sensible to fill with.
            continue
        stats[col] = stats[col].fillna(modes.iloc[0])

    print(f"Created {len(stats.columns)} statistical features")
    return stats

# Smoke-test the feature builder on the first 100 training rows.
train_preview = train.iloc[:100]
sample_stats = calculate_advanced_statistics(train_preview)
print(f"\nSample statistics shape: {sample_stats.shape}")
print("\nStatistical features:")
print(list(sample_stats.columns))

Calculating advanced statistics...


TypeError: Cannot setitem on a Categorical with a new category (0), set the categories first

## 4. Curve Interpolation Functions

In [None]:
def interpolate_curve(x, y, x_new, method='spline'):
    """Interpolate a volatility curve at ``x_new`` from the points ``(x, y)``.

    The input points may be given in any order and may contain repeated ``x``
    values: they are sorted by ``x`` and the ``y`` values of repeated ``x``
    are averaged before fitting.

    Parameters
    ----------
    x, y : array-like of float
        Known abscissae (e.g. strikes) and ordinates (e.g. IVs), same length,
        at least one point.
    x_new : float or array-like of float
        Where to evaluate the curve.
    method : {'spline', 'linear'}
        ``'spline'`` fits an interpolating B-spline of degree
        ``min(3, n_unique - 1)`` and extrapolates beyond the data range.
        ``'linear'`` interpolates linearly and holds the first/last value
        constant outside the data range.

    Returns
    -------
    numpy.ndarray
        Interpolated values with the shape of ``x_new`` (0-d for a scalar).

    Raises
    ------
    ValueError
        If ``method`` is unknown, ``x`` and ``y`` differ in length, or no
        points are supplied.
    """
    if method not in ('spline', 'linear'):
        raise ValueError(f"Unknown interpolation method: {method}")

    x_arr = np.asarray(x, dtype=float).ravel()
    y_arr = np.asarray(y, dtype=float).ravel()
    if x_arr.size != y_arr.size:
        raise ValueError("x and y must have the same length")
    if x_arr.size == 0:
        raise ValueError("At least one data point is required")

    # splrep needs strictly increasing x for an interpolating fit, so sort and
    # collapse repeated x values to the mean of their y values.
    x_unique, inverse = np.unique(x_arr, return_inverse=True)
    inverse = inverse.ravel()
    y_unique = np.bincount(inverse, weights=y_arr) / np.bincount(inverse)

    if x_unique.size == 1:
        # A single distinct point defines a flat curve.
        return np.full(np.shape(x_new), y_unique[0])

    if method == 'spline':
        # Cubic when possible; lower the degree when there are too few points
        # (splrep requires more points than the spline degree).
        degree = min(3, x_unique.size - 1)
        tck = splrep(x_unique, y_unique, k=degree)
        return splev(x_new, tck)

    # Linear interpolation. The edge fill values come from the sorted data,
    # so the left/right ends are the lowest/highest x.
    f = interp1d(
        x_unique,
        y_unique,
        kind='linear',
        bounds_error=False,
        fill_value=(y_unique[0], y_unique[-1]),
    )
    return f(x_new)

print("Curve interpolation functions defined successfully")

## 5. Prediction Function

In [None]:
def _merge_duplicate_strikes(strikes, ivs):
    """Return the strikes sorted and unique, averaging IVs quoted at the same strike."""
    unique_strikes, inverse = np.unique(strikes, return_inverse=True)
    inverse = inverse.ravel()
    mean_ivs = np.bincount(inverse, weights=ivs) / np.bincount(inverse)
    return unique_strikes, mean_ivs


def _interpolate_missing(known_strikes, known_ivs, target_strikes):
    """Predict IVs at ``target_strikes`` from a sorted, duplicate-free curve.

    Tries a cubic spline (needs at least 4 points), then linear interpolation,
    then the IV of the nearest known strike. Targets outside the known strike
    range get the IV of the nearest edge rather than an extrapolated value.
    """
    if known_strikes.size == 1:
        return np.full(target_strikes.shape, known_ivs[0])

    methods = ('spline', 'linear') if known_strikes.size >= 4 else ('linear',)
    for method in methods:
        try:
            preds = np.asarray(
                interpolate_curve(known_strikes, known_ivs, target_strikes, method=method),
                dtype=float,
            )
        except Exception:
            # Fitting failed for this method; try the next, simpler one.
            continue
        if preds.shape == target_strikes.shape and np.all(np.isfinite(preds)):
            # Hold the edge IVs flat outside the observed strike range.
            preds = np.where(target_strikes < known_strikes[0], known_ivs[0], preds)
            preds = np.where(target_strikes > known_strikes[-1], known_ivs[-1], preds)
            return preds

    # Last resort: IV of the nearest known strike.
    nearest = np.abs(known_strikes[None, :] - target_strikes[:, None]).argmin(axis=1)
    return known_ivs[nearest]


def predict_iv(data):
    """Fill missing implied-volatility values in a copy of ``data``.

    Reads the module-level ``strike_dict`` (strike -> call/put column names,
    either of which may be ``None``) and ``iv_columns``.

    Steps:
      1. For strikes that have both a call and a put column, copy the IV from
         whichever side is present to the side that is missing.
      2. Run ``calculate_advanced_statistics`` (its result is not used for the
         prediction).
      3. For each row, interpolate every still-missing IV across strikes from
         that row's known IVs. Rows with no known IV at all are left as NaN.
      4. For strikes with both columns, blend each side 90/10 with the
         call/put average, then clip every IV column to [0.01, 1.0].

    Steps 4 also modifies values that were present in the input.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame containing the IV columns and the columns required by
        ``calculate_advanced_statistics``.

    Returns
    -------
    pandas.DataFrame
        A modified copy of ``data``; the input frame is not changed.
    """
    print("Starting volatility curve prediction...")
    data = data.copy()

    # Strikes for which both sides exist as columns. `None in data.columns`
    # is False, so strikes with a missing side are excluded here.
    paired_cols = [
        (cols['call'], cols['put'])
        for cols in strike_dict.values()
        if cols['call'] in data.columns and cols['put'] in data.columns
    ]

    # Phase 1: Put-call parity
    print("\nPhase 1: Applying put-call parity...")
    for call_col, put_col in paired_cols:
        call_mask = data[call_col].isna() & data[put_col].notna()
        data.loc[call_mask, call_col] = data.loc[call_mask, put_col]

        put_mask = data[put_col].isna() & data[call_col].notna()
        data.loc[put_mask, put_col] = data.loc[put_mask, call_col]

    # Phase 2: Calculate advanced statistics
    print("\nPhase 2: Calculating advanced statistics...")
    # The returned frame is not consumed by the phases below; the call is kept
    # so the function still exercises the feature builder as before.
    calculate_advanced_statistics(data)

    # Phase 3: Curve interpolation
    print("\nPhase 3: Performing curve interpolation...")
    # Every IV column that exists in `data`, including strikes quoted on one
    # side only, paired with its numeric strike.
    curve = [
        (float(strike), col)
        for strike, cols in strike_dict.items()
        for col in (cols['call'], cols['put'])
        if col is not None and col in data.columns
    ]
    if curve:
        strikes = np.array([strike for strike, _ in curve], dtype=float)
        curve_cols = [col for _, col in curve]
        # Work on a positional array so duplicate index labels cannot break
        # the per-row updates.
        ivs = data[curve_cols].to_numpy(dtype=float, copy=True)

        rows_without_quotes = 0
        for row in ivs:
            missing = np.isnan(row)
            if not missing.any():
                continue
            if missing.all():
                rows_without_quotes += 1
                continue
            # The known points are taken once per row from the values present
            # before interpolation, so predictions never feed back into the fit.
            known_strikes, known_ivs = _merge_duplicate_strikes(strikes[~missing], row[~missing])
            row[missing] = _interpolate_missing(known_strikes, known_ivs, strikes[missing])

        data[curve_cols] = ivs
        if rows_without_quotes:
            print(f"Warning: {rows_without_quotes} rows have no known IV and were left unfilled")

    # Phase 4: Smoothing and consistency
    print("\nPhase 4: Applying smoothing and consistency checks...")
    for call_col, put_col in paired_cols:
        avg_iv = (data[call_col] + data[put_col]) / 2
        data[call_col] = 0.9 * data[call_col] + 0.1 * avg_iv
        data[put_col] = 0.9 * data[put_col] + 0.1 * avg_iv

    # Ensure all values are within reasonable bounds
    bounded_cols = [col for col in iv_columns if col in data.columns]
    if bounded_cols:
        data[bounded_cols] = data[bounded_cols].clip(lower=0.01, upper=1.0)

    print("\nPrediction completed successfully")
    return data

## 6. Validation and Testing

In [None]:
# Create validation split
print("Creating validation split...")
# train_test_split shuffles rows by default. The interpolation in predict_iv
# works row by row, so row order does not affect the score below.
train_df, val_df = train_test_split(train, test_size=0.2, random_state=42)
print(f"Training set size: {len(train_df)}")
print(f"Validation set size: {len(val_df)}")

# A prediction can only be scored where the true IV is known. Hide a seeded
# random subset of the KNOWN validation IVs, predict them, and compare against
# the hidden truth. (Scoring cells that are NaN in val_df has no ground truth.)
holdout_fraction = 0.2
holdout_rng = np.random.default_rng(42)

val_iv_columns = [col for col in iv_columns if col in val_df.columns]
assert val_iv_columns, "No IV columns found in the validation frame"

val_truth = val_df[val_iv_columns].to_numpy(dtype=float)
holdout_mask = (holdout_rng.random(val_truth.shape) < holdout_fraction) & ~np.isnan(val_truth)

val_masked = val_df.copy()
val_masked[val_iv_columns] = np.where(holdout_mask, np.nan, val_truth)

# Apply to validation set
print("\nRunning validation...")
val_pred = predict_iv(val_masked)
val_pred_values = val_pred[val_iv_columns].to_numpy(dtype=float)

# Calculate MSE only on the held-out points that received a prediction.
# The final figure is the mean of the per-column MSEs.
scored = holdout_mask & ~np.isnan(val_pred_values)
squared_error = (val_truth - val_pred_values) ** 2
mse_vals = [
    squared_error[scored[:, j], j].mean()
    for j in range(len(val_iv_columns))
    if scored[:, j].any()
]

# NaN (not 0) when nothing could be scored, so an empty evaluation cannot be
# mistaken for a perfect one.
validation_mse = float(np.mean(mse_vals)) if mse_vals else float('nan')
print(f"Held-out points scored: {int(scored.sum())}")
print(f"\nValidation MSE (masked points only): {validation_mse:.12f}")

## 7. Generate Final Predictions

In [None]:
# Apply to test set
print("Generating final predictions...")
test_pred = predict_iv(test)

# Prepare submission
expected_columns = list(sample_sub.columns)
if set(expected_columns).issubset(test_pred.columns):
    # The template's column names exist in the predictions: select by name so
    # each column is labelled correctly whatever the template's order is.
    submission = test_pred[expected_columns].copy()
else:
    # Names differ from the template: fall back to positional relabelling,
    # after checking that the widths agree.
    source_columns = ['timestamp'] + iv_columns
    assert len(source_columns) == len(expected_columns), (
        f"Submission has {len(source_columns)} columns but the template has {len(expected_columns)}"
    )
    submission = test_pred[source_columns].copy()
    submission.columns = expected_columns

# Verify row count and that no missing values remain
assert len(submission) == len(sample_sub), "Row count differs from the sample submission"
assert submission.isna().sum().sum() == 0, "Missing values detected"
submission.to_csv('submission.csv', index=False)

print("\nFinal Submission Preview:")
print(submission.head())
print(f"\nSubmission shape: {submission.shape}")
print(f"Validation MSE: {validation_mse:.12f}")