In [1]:
# Display the kernel's current working directory; the relative %cd in the next cell is resolved from it.
%pwd

'/Users/IvanTang/quant/IMC_Prosperity3_GraniteFlow/src/analysis/round3'

In [2]:
# Change the kernel's working directory to the round-3 per-day data folder.
# The CSV file names used in later cells are relative and resolve against this directory.
%cd ../../../data/round3/days/

/Users/IvanTang/quant/IMC_Prosperity3_GraniteFlow/data/round3/days


In [3]:
import pandas as pd
import numpy as np

In [4]:
# Read the three per-day CSV files (day 0, 1 and 2) from the current working directory.
df_mid_0, df_mid_1, df_mid_2 = (
    pd.read_csv(csv_name)
    for csv_name in ('mid_price_day0.csv', 'mid_price_day1.csv', 'mid_price_day2.csv')
)


In [None]:
# Add the time column 'T' to every day's frame (calculate_iv passes it on as the
# Black-Scholes time argument).
# Presumably 7/6/5 are the days left until expiry when day 0/1/2 starts and
# timestamp * 0.0001 / 100 is the elapsed fraction of that day -- TODO confirm.
# The final division by 365 rescales the value (days -> years under that reading).
for _frame, _days_at_open in ((df_mid_0, 7), (df_mid_1, 6), (df_mid_2, 5)):
    _frame['T'] = (_days_at_open - _frame['timestamp'] * 0.0001 / 100) / 365

In [None]:
from scipy.stats import norm
from scipy.optimize import minimize_scalar
import numpy as np

def bs_call_price(S, K, T, r, sigma):
    """Black-Scholes price of a call option.

    Args:
        S: Underlying price.
        K: Strike price.
        T: Time to expiry, in the same time unit as ``r`` and ``sigma``.
        r: Continuously compounded risk-free rate.
        sigma: Volatility.

    Returns:
        ``S * N(d1) - K * exp(-r * T) * N(d2)``. Scalars and NumPy arrays are
        both accepted because only NumPy/SciPy element-wise functions are used.
    """
    # Volatility accumulated over the option's life; shared by d1 and d2.
    total_vol = sigma * np.sqrt(T)
    drift = (r + 0.5 * sigma**2) * T

    d1 = (np.log(S / K) + drift) / total_vol
    d2 = d1 - total_vol

    discount = np.exp(-r * T)
    underlying_leg = S * norm.cdf(d1)
    strike_leg = K * discount * norm.cdf(d2)
    return underlying_leg - strike_leg

def implied_volatility(S, K, T, r, market_price):
    """Back out the Black-Scholes volatility that reproduces a call's market price.

    The volatility is searched on the interval [1e-4, 3.0] by minimising the
    squared difference between ``bs_call_price`` and ``market_price``.

    Args:
        S: Underlying price.
        K: Strike price.
        T: Time to expiry (same convention as ``bs_call_price``).
        r: Risk-free rate.
        market_price: Observed call price.

    Returns:
        The fitted volatility, or ``nan`` when no volatility can explain the
        quote: a non-finite input, a non-positive ``S``/``K``/``T``, or a price
        at or below the discounted intrinsic value or at or above ``S``.
    """
    if not np.all(np.isfinite([S, K, T, r, market_price])):
        return np.nan
    if S <= 0 or K <= 0 or T <= 0:
        return np.nan

    # No-arbitrage bounds for a call: max(S - K*exp(-rT), 0) < price < S.
    # Outside them no sigma reproduces the price, so the bounded search would
    # only return a boundary-driven artefact that looks like a (tiny or huge)
    # volatility to downstream filters.
    intrinsic = max(S - K * np.exp(-r * T), 0.0)
    if market_price <= intrinsic or market_price >= S:
        return np.nan

    loss_fn = lambda sigma: (bs_call_price(S, K, T, r, sigma) - market_price)**2
    result = minimize_scalar(loss_fn, bounds=(1e-4, 3.0), method='bounded')
    return result.x

def calculate_iv(df):
    """Append one implied-volatility column per voucher strike.

    For every strike K in (9500, 9750, 10000, 10250, 10500) a column ``iv_K``
    is computed row by row with ``implied_volatility`` from the columns
    ``VOLCANIC_ROCK`` (underlying), ``T`` and ``VOLCANIC_ROCK_VOUCHER_K``
    (option price), using a risk-free rate of 0.

    Args:
        df: Frame containing the columns listed above.

    Returns:
        A copy of ``df`` with the ``iv_K`` columns added; ``df`` itself is
        left untouched.
    """
    enriched = df.copy()
    for strike in (9500, 9750, 10000, 10250, 10500):
        voucher_column = 'VOLCANIC_ROCK_VOUCHER_' + str(strike)

        def row_iv(row):
            # The fourth argument is the risk-free rate, fixed at 0.
            return implied_volatility(
                row['VOLCANIC_ROCK'], strike, row['T'], 0, row[voucher_column])

        enriched['iv_' + str(strike)] = enriched.apply(row_iv, axis=1)
    return enriched


In [7]:
def calculate_moneyness(df):
    """Add a time-scaled log-moneyness column for every voucher strike.

    For each strike K in (9500, 9750, 10000, 10250, 10500) the column ``m_K``
    is ``log(K / VOLCANIC_ROCK) / sqrt(T)``.

    Args:
        df: Frame with the columns ``VOLCANIC_ROCK`` and ``T``.

    Returns:
        A copy of ``df`` with the ``m_K`` columns added; ``df`` itself is
        left untouched.
    """
    df = df.copy()
    strike_prices = [9500, 9750, 10000, 10250, 10500]

    # Whole-column arithmetic instead of a per-row apply: same formula, no
    # Python-level loop over rows, and an empty frame is handled naturally.
    spot = df['VOLCANIC_ROCK']
    sqrt_T = np.sqrt(df['T'])
    for strike in strike_prices:
        df[f'm_{strike}'] = np.log(strike / spot) / sqrt_T
    return df

def fit_iv_curve(df):
    """Fit a quadratic implied-volatility curve per row and store its coefficients.

    For every row (time point) the model ``iv = b0 + b1 * m + b2 * m**2`` is
    fitted by least squares over the five strikes, using the columns ``m_K``
    and ``iv_K`` for K in (9500, 9750, 10000, 10250, 10500).

    A strike is used only when ``0 < iv < 2`` (extreme values are excluded)
    and its moneyness is finite. Rows with fewer than three usable strikes get
    NaN coefficients.

    Args:
        df: Frame containing the ``m_K`` and ``iv_K`` columns. It is modified
            in place.

    Returns:
        The same ``df`` object with the columns ``beta_0``, ``beta_1`` and
        ``beta_2`` added.
    """
    strike_prices = [9500, 9750, 10000, 10250, 10500]
    beta_columns = ['beta_0', 'beta_1', 'beta_2']

    # Pull the inputs out once as plain arrays; iterating NumPy rows avoids
    # building a Series per row as iterrows() does.
    m_matrix = df[[f'm_{strike}' for strike in strike_prices]].to_numpy(dtype=float)
    iv_matrix = df[[f'iv_{strike}' for strike in strike_prices]].to_numpy(dtype=float)

    # Start from all-NaN so rows without enough usable strikes need no special case.
    betas = np.full((len(df), 3), np.nan)
    for i, (m_row, iv_row) in enumerate(zip(m_matrix, iv_matrix)):
        with np.errstate(invalid='ignore'):
            # NaN IVs fail both comparisons; non-finite moneyness is dropped
            # as well because it would make the least-squares system invalid.
            usable = (iv_row > 0) & (iv_row < 2) & np.isfinite(m_row)
        if usable.sum() >= 3:
            m_values = m_row[usable]
            X = np.column_stack([np.ones(m_values.size), m_values, np.square(m_values)])
            betas[i] = np.linalg.lstsq(X, iv_row[usable], rcond=None)[0]

    # Explicit column labels keep the three-column shape even for an empty frame.
    df[beta_columns] = pd.DataFrame(betas, index=df.index, columns=beta_columns)
    return df


def calculate_base_iv(df):
    """Store the fitted curve's value at zero moneyness as ``base_iv``.

    Args:
        df: Frame with a ``beta_0`` column. It is modified in place.

    Returns:
        The same ``df`` object with the ``base_iv`` column set.
    """
    # At m = 0 both beta_1 * m and beta_2 * m**2 vanish, so the curve value
    # there is just the intercept.
    intercept = df['beta_0']
    df['base_iv'] = intercept
    return df

In [8]:
def preprocess(df):
    """Run the full feature pipeline on one day's frame.

    The stages are applied in order: ``calculate_iv``, ``calculate_moneyness``,
    ``fit_iv_curve`` and ``calculate_base_iv``.

    Args:
        df: Raw per-day frame; it is copied first, so the caller's object is
            not modified.

    Returns:
        A new frame carrying every column added by the stages.
    """
    stages = (calculate_iv, calculate_moneyness, fit_iv_curve, calculate_base_iv)
    enriched = df.copy()
    for stage in stages:
        # Each stage returns the frame to hand to the next one.
        enriched = stage(enriched)
    return enriched

In [10]:
# Enrich each day's frame with the derived columns and write it to the current
# working directory, one day at a time.

# Day 0
df_mid_0 = preprocess(df_mid_0)
df_mid_0.to_csv('day0.csv', index=False)

# Day 1
df_mid_1 = preprocess(df_mid_1)
df_mid_1.to_csv('day1.csv', index=False)

# Day 2
df_mid_2 = preprocess(df_mid_2)
df_mid_2.to_csv('day2.csv', index=False)