In [7]:
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
from scipy.optimize import minimize_scalar
import matplotlib.pyplot as plt
import math

# Load and concatenate historical price data
files = ["data3/prices_round_3_day_0.csv", "data3/prices_round_3_day_1.csv", "data3/prices_round_3_day_2.csv"]
dfs = [pd.read_csv(f, delimiter=";") for f in files]
df = pd.concat(dfs, ignore_index=True)

# --- Step 1: Filter for VOLCANIC_ROCK_VOUCHER only ---
df = df[df["product"].str.startswith("VOLCANIC_ROCK_VOUCHER")].copy()
df["strike"] = df["product"].str.extract(r"(\d+)$").astype(int)

# Mid prices
df["underlying"] = (df["bid_price_1"] + df["ask_price_1"]) / 2
df["voucher_price"] = df["mid_price"] = (df["bid_price_1"] + df["ask_price_1"]) / 2  # if voucher price not separate, use this

# Time to expiry
df["tte"] = 8/365 - (df["timestamp"] / 1_000_000) / 365
df = df[df["tte"] > 0]  # Ensure time-to-expiry is valid

# Moneyness
df["m"] = np.log(df["strike"] / df["underlying"]) / np.sqrt(df["tte"])

# --- Step 2: Black-Scholes Helper Functions ---

def norm_cdf(x):
    return 0.5 * (1 + math.erf(x / np.sqrt(2)))

def bs_call(S, K, T, sigma):
    if T <= 0 or sigma <= 0:
        return max(0, S - K)
    d1 = (np.log(S/K) + 0.5*sigma**2*T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)
    return S * norm_cdf(d1) - K * norm_cdf(d2)

def find_iv(S, K, T, V_obs):
    if T <= 0 or V_obs < max(0, S - K):
        return np.nan
    def loss(sigma):
        return (bs_call(S, K, T, sigma) - V_obs) ** 2
    result = minimize_scalar(loss, bounds=(1e-4, 2.0), method="bounded")
    return result.x if result.success else np.nan

# --- Step 3: Calculate Implied Volatility ---
ivs = []
for _, row in df.iterrows():
    iv = find_iv(row["underlying"], row["strike"], row["tte"], row["voucher_price"])
    ivs.append(iv)
df["iv"] = ivs

df = df.dropna(subset=["iv", "m"])  # drop rows with failed IV calculation
if df.empty:
    raise ValueError("No valid rows with computed implied volatility.")

# --- Step 4: Fit IV Curve as a function of moneyness ---
X = df["m"].values.reshape(-1, 1)
X_poly = np.hstack([X**2, X, np.ones_like(X)])
y = df["iv"].values

model = LinearRegression().fit(X_poly, y)
print(f"\nFitted IV Curve:\nIV(m) = {model.coef_[0]:.4f} * m^2 + {model.coef_[1]:.4f} * m + {model.coef_[2]:.4f}")

# --- Step 5: Plot Fitted Curve ---
m_vals = np.linspace(-2, 2, 300).reshape(-1, 1)
m_poly = np.hstack([m_vals**2, m_vals, np.ones_like(m_vals)])
iv_pred = model.predict(m_poly)

plt.figure(figsize=(10, 6))
plt.scatter(df["m"], df["iv"], alpha=0.3, s=8, label="Observed IV")
plt.plot(m_vals, iv_pred, color="red", label="Fitted IV Curve")
plt.title("Implied Volatility vs Moneyness")
plt.xlabel("Moneyness m = log(K/S) / sqrt(T)")
plt.ylabel("Implied Volatility")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
