In [None]:
import os
import pandas as pd
import numpy as np
from scipy.stats import norm
from scipy.optimize import minimize, differential_evolution
from scipy.interpolate import CubicSpline, UnivariateSpline
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.ensemble import ExtraTreesRegressor
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

In [None]:

# BLACK-SCHOLES-MERTON MODEL 

class BlackScholesMerton:
    """
    Complete implementation of Black-Scholes-Merton option pricing model.
    Includes European option pricing and Greeks calculation.
    """
    
    def __init__(self, S, K, T, r=0.05, q=0.0):
        """
        S: Spot price (underlying)
        K: Strike price
        T: Time to maturity (years)
        r: Risk-free rate
        q: Dividend yield
        """
        self.S = S
        self.K = K
        self.T = T
        self.r = r
        self.q = q
        
    def _d1(self, sigma):
        """Calculate d1 parameter in BSM formula"""
        
        numerator = np.log(self.S / self.K) + (self.r - self.q + 0.5 * sigma**2) * self.T
        denominator = sigma * np.sqrt(self.T)
        return numerator / denominator if denominator != 0 else 0
    
    def _d2(self, sigma):
        """Calculate d2 parameter in BSM formula"""
        
        return self._d1(sigma) - sigma * np.sqrt(self.T)
    
    def call_price(self, sigma):
        """Calculate European call option price"""
        
        if self.T <= 0:
            return max(self.S - self.K, 0)
        
        d1 = self._d1(sigma)
        d2 = self._d2(sigma)
        
        price = (self.S * np.exp(-self.q * self.T) * norm.cdf(d1) - 
                 self.K * np.exp(-self.r * self.T) * norm.cdf(d2))
        return price
    
    def put_price(self, sigma):
        """Calculate European put option price"""
        
        if self.T <= 0:
            return max(self.K - self.S, 0)
        
        d1 = self._d1(sigma)
        d2 = self._d2(sigma)
        
        price = (self.K * np.exp(-self.r * self.T) * norm.cdf(-d2) - 
                 self.S * np.exp(-self.q * self.T) * norm.cdf(-d1))
        return price
    
    def vega(self, sigma):
        """Calculate Vega (sensitivity to volatility changes)"""
        
        if self.T <= 0:
            return 0
        d1 = self._d1(sigma)
        return self.S * np.exp(-self.q * self.T) * norm.pdf(d1) * np.sqrt(self.T)
    
    def delta_call(self, sigma):
        """Calculate Delta for call option"""
        
        if self.T <= 0:
            return 1.0 if self.S > self.K else 0.0
        return np.exp(-self.q * self.T) * norm.cdf(self._d1(sigma))
    
    def delta_put(self, sigma):
        """Calculate Delta for put option"""
        
        if self.T <= 0:
            return -1.0 if self.S < self.K else 0.0
        return -np.exp(-self.q * self.T) * norm.cdf(-self._d1(sigma))
    
    def gamma(self, sigma):
        """Calculate Gamma (second derivative wrt spot)"""
        
        if self.T <= 0:
            return 0
        d1 = self._d1(sigma)
        numerator = np.exp(-self.q * self.T) * norm.pdf(d1)
        denominator = self.S * sigma * np.sqrt(self.T)
        return numerator / denominator if denominator != 0 else 0



In [None]:

# SVI (STOCHASTIC VOLATILITY INSPIRED) VOLATILITY SMILE MODEL

class SVIVolatilitySmile:
    """
    SVI parameterization for volatility smiles.
    Total variance: w(k) = a + b(ρ(k-m) + sqrt((k-m)^2 + σ^2))
    where k = log(K/S) is log-moneyness
    
    Parameters:
    a: overall level of variance
    b: angle between the two asymptotes
    ρ (rho): orientation (skew)
    m: horizontal shift (ATM level)
    σ (sigma): smoothness of the smile
    """
    
    def __init__(self):
        self.params = None
        
    def total_variance(self, k, a, b, rho, m, sigma):
        """
        Calculate total variance w(k) for given log-moneyness k
        """
        
        sqrt_term = np.sqrt((k - m)**2 + sigma**2)
        return a + b * (rho * (k - m) + sqrt_term)
    
    def fit(self, strikes, spot, ivs, T=30/365):
        """
        Fit SVI parameters to observed implied volatilities
        
        strikes: array of strike prices
        spot: underlying spot price
        ivs: array of implied volatilities
        T: time to maturity in years
        """
        
        # Convert to log-moneyness
        k = np.log(strikes / spot)
        
        # Convert IV to total variance: w = σ^2 * T
        w_market = ivs**2 * T
        
        def objective(params):
            a, b, rho, m, sigma = params
            w_model = self.total_variance(k, a, b, rho, m, sigma)
            return np.sum((w_model - w_market)**2)
        
        # Constraints for no-arbitrage conditions
        # 1. a + b*sigma*sqrt(1-rho^2) >= 0
        # 2. b >= 0
        # 3. |rho| < 1
        # 4. sigma > 0
        
        bounds = [
            (0.001, 1.0),      # a
            (0.001, 1.0),      # b
            (-0.999, 0.999),   # rho
            (-0.5, 0.5),       # m
            (0.01, 1.0)        # sigma
        ]
        
        x0 = [0.04, 0.1, -0.2, 0.0, 0.1]
        
        result = differential_evolution(objective, bounds, seed=42, maxiter=500, 
                                       atol=1e-7, tol=1e-7, workers=-1)
        
        self.params = result.x
        return self.params
    
    def predict(self, strikes, spot, T=30/365):
        """
        Predict implied volatilities for given strikes using fitted SVI model
        """
        
        if self.params is None:
            raise ValueError("Model must be fitted first")
        
        k = np.log(strikes / spot)
        a, b, rho, m, sigma = self.params
        w = self.total_variance(k, a, b, rho, m, sigma)
        
        # Convert total variance back to IV: σ = sqrt(w/T)
        iv = np.sqrt(w / T)
        return iv


In [None]:
# ARBITRAGE-FREE VOLATILITY INTERPOLATION

class ArbitrageFreeInterpolator:
    """
    Implements arbitrage-free interpolation of volatility surface using:
    1. No calendar arbitrage (variance is increasing in time)
    2. No butterfly arbitrage (density is positive)
    3. Monotonic put spreads
    """
    
    def __init__(self, spot, r=0.05, q=0.0):
        self.spot = spot
        self.r = r
        self.q = q
    
    def check_butterfly_arbitrage(self, strikes, ivs, T=30/365):
        """
        Check for butterfly arbitrage by ensuring call prices are convex in strike
        """
        
        prices = []
        for K, sigma in zip(strikes, ivs):
            bsm = BlackScholesMerton(self.spot, K, T, self.r, self.q)
            prices.append(bsm.call_price(sigma))
        
        prices = np.array(prices)
        
        if len(prices) >= 3:
            second_deriv = np.diff(prices, n=2)
            has_arbitrage = np.any(second_deriv < -1e-6)
            return has_arbitrage
        return False
    
    def interpolate_monotonic(self, strikes, ivs):
        """
        Perform monotonic spline interpolation to preserve shape
        """
        
        valid_mask = ~np.isnan(ivs) & (ivs > 0)
        valid_strikes = strikes[valid_mask]
        valid_ivs = ivs[valid_mask]
        
        if len(valid_strikes) < 3:
            return ivs 
        
        spline = UnivariateSpline(valid_strikes, valid_ivs, k=3, s=0.001)
        interpolated = spline(strikes)
        
        interpolated = np.maximum(interpolated, 0.01)
        
        return interpolated


In [None]:

# ADVANCED MISSING DATA IMPUTATION WITH BSM CONSTRAINTS

class BSMConstrainedImputer:
    """
    Custom imputer that respects Black-Scholes-Merton constraints:
    1. Put-Call Parity
    2. Monotonicity constraints
    3. Volatility smile shape preservation
    """
    
    def __init__(self, underlying_prices, call_strikes, put_strikes, T=30/365, r=0.05, q=0.0):
        self.S = underlying_prices
        self.call_strikes = np.array(call_strikes)
        self.put_strikes = np.array(put_strikes)
        self.T = T
        self.r = r
        self.q = q
        
    def enforce_put_call_parity(self, call_ivs, put_ivs, S):
        """
        Enforce put-call parity: C - P = S*e^(-qT) - K*e^(-rT)
        Adjust IVs to maintain this relationship
        """
        
        adjusted_call_ivs = call_ivs.copy()
        adjusted_put_ivs = put_ivs.copy()
        
        for i, (K_call, K_put) in enumerate(zip(self.call_strikes, self.put_strikes)):
            if K_call == K_put: 
                K = K_call
                
                if not np.isnan(call_ivs[i]) and not np.isnan(put_ivs[i]):
                    bsm = BlackScholesMerton(S, K, self.T, self.r, self.q)
                    C = bsm.call_price(call_ivs[i])
                    P = bsm.put_price(put_ivs[i])
                    
                    theoretical_diff = S * np.exp(-self.q * self.T) - K * np.exp(-self.r * self.T)
                    actual_diff = C - P
                    
                    if abs(actual_diff - theoretical_diff) > 0.01:
        
                        avg_iv = (call_ivs[i] + put_ivs[i]) / 2
                        adjusted_call_ivs[i] = avg_iv
                        adjusted_put_ivs[i] = avg_iv
        
        return adjusted_call_ivs, adjusted_put_ivs
    
    def impute_with_smile(self, ivs, strikes, S):
        """
        Impute missing IVs using SVI volatility smile model
        """
        
        valid_mask = ~np.isnan(ivs) & (ivs > 0)
        
        if np.sum(valid_mask) < 5:  
            valid_strikes = strikes[valid_mask]
            valid_ivs = ivs[valid_mask]
            
            if len(valid_strikes) >= 2:
                interpolator = np.interp
                imputed = interpolator(strikes, valid_strikes, valid_ivs)
            else:
                imputed = np.full_like(ivs, np.nanmedian(ivs))
            
            return imputed
        
        try:
            svi = SVIVolatilitySmile()
            valid_strikes = strikes[valid_mask]
            valid_ivs = ivs[valid_mask]
            
            svi.fit(valid_strikes, S, valid_ivs, self.T)
            imputed = svi.predict(strikes, S, self.T)
            
            result = ivs.copy()
            result[~valid_mask] = imputed[~valid_mask]
            
            return result
        except:
            
            valid_strikes = strikes[valid_mask]
            valid_ivs = ivs[valid_mask]
            imputed = np.interp(strikes, valid_strikes, valid_ivs)
            result = ivs.copy()
            result[~valid_mask] = imputed[~valid_mask]
            return result


In [None]:

testData = "/kaggle/input/nk-iv-prediction/test_data.parquet"
df = pd.read_parquet(testData, engine="pyarrow")

callCols = sorted([c for c in df.columns if c.startswith("call_iv_")],
                  key=lambda s: int(s.split("_")[-1]))
putCols = sorted([c for c in df.columns if c.startswith("put_iv_")],
                 key=lambda s: int(s.split("_")[-1]))
ivCols = callCols + putCols

call_strikes = np.array([int(c.split("_")[-1]) for c in callCols])
put_strikes = np.array([int(c.split("_")[-1]) for c in putCols])

print(f"\n Data loaded: {len(df)} rows")
print(f" Call strikes: {len(call_strikes)}, Put strikes: {len(put_strikes)}")

In [None]:
# STEP 1: Initial Imputation using Machine Learning

print("\n[1/5] Running ML-based iterative imputation...")

featureCols = ["underlying"] + ivCols
X = df[featureCols].astype(float)

etRegressor = ExtraTreesRegressor(
    n_estimators=150,
    max_depth=20,
    min_samples_split=5,
    max_features=0.8,
    random_state=42,
    n_jobs=-1
)

imputer = IterativeImputer(
    estimator=etRegressor,
    max_iter=15,
    tol=1e-4,
    initial_strategy="median",
    verbose=1,
    random_state=42,
    imputation_order="random"
)

X_imputed = imputer.fit_transform(X.values)
imputed_df = pd.DataFrame(X_imputed[:, 1:], columns=ivCols, index=df.index)


In [None]:
# STEP 2: Apply BSM Constraints and Volatility Smile Modeling

print("\n[2/5] Applying Black-Scholes-Merton constraints...")

final_ivs = []

for idx in range(len(df)):
    S = df.iloc[idx]["underlying"]
    
    call_ivs = imputed_df.iloc[idx][callCols].values
    put_ivs = imputed_df.iloc[idx][putCols].values
    
    bsm_imputer = BSMConstrainedImputer(S, call_strikes, put_strikes)
    
    # Impute with volatility smile
    call_ivs_smooth = bsm_imputer.impute_with_smile(call_ivs, call_strikes, S)
    put_ivs_smooth = bsm_imputer.impute_with_smile(put_ivs, put_strikes, S)
    
    # Enforce put-call parity
    call_ivs_final, put_ivs_final = bsm_imputer.enforce_put_call_parity(
        call_ivs_smooth, put_ivs_smooth, S
    )
    
    row_ivs = np.concatenate([call_ivs_final, put_ivs_final])
    final_ivs.append(row_ivs)
    
    if (idx + 1) % 1000 == 0:
        print(f"  Processed {idx + 1}/{len(df)} rows...")

final_ivs_df = pd.DataFrame(final_ivs, columns=ivCols, index=df.index)


In [None]:
# STEP 3: Arbitrage-Free Interpolation

print("\n[3/5] Applying arbitrage-free interpolation...")

for idx in range(len(df)):
    S = df.iloc[idx]["underlying"]
    af_interpolator = ArbitrageFreeInterpolator(S)
    
    # Interpolate calls
    call_ivs = final_ivs_df.iloc[idx][callCols].values
    call_ivs_interp = af_interpolator.interpolate_monotonic(call_strikes, call_ivs)
    final_ivs_df.loc[df.index[idx], callCols] = call_ivs_interp
    
    # Interpolate puts
    put_ivs = final_ivs_df.iloc[idx][putCols].values
    put_ivs_interp = af_interpolator.interpolate_monotonic(put_strikes, put_ivs)
    final_ivs_df.loc[df.index[idx], putCols] = put_ivs_interp


In [None]:
# STEP 4: Final Constraints and Bounds

print("\n[4/5] Applying final constraints...")

# Clip to reasonable IV bounds (0.5% to 200%)
final_ivs_df = final_ivs_df.clip(lower=0.005, upper=2.0)


In [None]:
# STEP 5: Calculate Greeks for Validation

print("\n[5/5] Calculating Greeks for validation...")

# calculating Greeks for first row
sample_idx = 0
S_sample = df.iloc[sample_idx]["underlying"]

print(f"\n Sample Validation (Row {sample_idx}, Spot={S_sample:.2f}):")
print("Strike | Call IV | Call Delta | Call Gamma | Vega")
print("-" * 60)

for i in range(min(5, len(call_strikes))):
    K = call_strikes[i]
    sigma = final_ivs_df.iloc[sample_idx][callCols[i]]
    
    bsm = BlackScholesMerton(S_sample, K, 30/365)
    delta = bsm.delta_call(sigma)
    gamma = bsm.gamma(sigma)
    vega = bsm.vega(sigma)
    
    print(f"{K:6d} | {sigma:7.4f} | {delta:10.4f} | {gamma:10.6f} | {vega:8.4f}")

In [None]:
# STEP 6: Prepare Submission
print("\n Preparing submission file...")

result_df = pd.concat([
    df[["timestamp", "underlying"]].reset_index(drop=True),
    final_ivs_df.reset_index(drop=True)
], axis=1)

submission_df = result_df[["timestamp"] + ivCols]
submission_df.to_csv("submission_full.csv", index=False)

print(" Saved final results to submission_full.csv")
print(f" Total rows: {len(submission_df)}")
print(f" Total columns: {len(submission_df.columns)}")

In [None]:
#  Volatility Smile


plot_idx = 0
row = result_df.iloc[plot_idx]
S = row["underlying"]

call_ivs_plot = row[callCols].values
put_ivs_plot = row[putCols].values

# Calculate moneyness
call_moneyness = call_strikes / S
put_moneyness = put_strikes / S

plt.figure(figsize=(14, 6))

# Subplot 1: IV vs Strike
plt.subplot(1, 2, 1)
plt.plot(call_strikes, call_ivs_plot, 'b-o', label='Call IV', markersize=4, linewidth=2)
plt.plot(put_strikes, put_ivs_plot, 'r-x', label='Put IV', markersize=4, linewidth=2)
plt.axvline(S, color='gray', linestyle='--', alpha=0.7, label=f'ATM (S={S:.0f})')
plt.xlabel('Strike Price', fontsize=11, fontweight='bold')
plt.ylabel('Implied Volatility', fontsize=11, fontweight='bold')
plt.title('Volatility Surface: IV vs Strike', fontsize=12, fontweight='bold')
plt.legend(fontsize=10)
plt.grid(True, alpha=0.3)

# Subplot 2: Volatility Smile (IV vs Moneyness)
plt.subplot(1, 2, 2)
plt.plot(call_moneyness, call_ivs_plot, 'b-o', label='Call IV', markersize=4, linewidth=2)
plt.plot(put_moneyness, put_ivs_plot, 'r-x', label='Put IV', markersize=4, linewidth=2)
plt.axvline(1.0, color='gray', linestyle='--', alpha=0.7, label='ATM (K/S=1)')
plt.xlabel('Moneyness (K/S)', fontsize=11, fontweight='bold')
plt.ylabel('Implied Volatility', fontsize=11, fontweight='bold')
plt.title('Volatility Smile: IV vs Moneyness', fontsize=12, fontweight='bold')
plt.legend(fontsize=10)
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('volatility_smile.png', dpi=150, bbox_inches='tight')
print(" Saved volatility smile plot to volatility_smile.png")

