In [2]:
import pandas as pd
import math
from scipy import stats
import time
import numpy as np
import os
import plotly.express as px
import matplotlib.pyplot as plt


In [7]:
def get_specific_data(df, is_call, is_Amercian):
    """return specific dataframe according to is_call and is_Amercian

    Args:
        df (dataframe): option data
        is_call (bool): True if call, False if put
        is_Amercian (bool): True if Amercian, False if European
    
    Returns:
        dataframe: specific dataframe
    """
    
    if is_Amercian:
        if is_call:
            return df.loc[(df.call_put == 'C') & (df['style'] == 'A')]
        elif is_call:
            return df.loc[(df.call_put == 'P') & (df['style'] == 'A')]
    else:
        if is_call:
            return df.loc[(df.call_put == 'C') & (df['style'] == 'E')]
        else:
            return df.loc[(df.call_put == 'P') & (df['style'] == 'E')]
        
def statistic(df):
    mean = (df['values'] - df.settlement).mean()
    std = (df['values'] - df.settlement).std()
    sigma_na = df.sigma.isna().sum()
    imp_na = df.imp_v.isna().sum()
    print(f'The mean of the difference is {mean}, the std is {std}')
    print(f"The number Nan of sigma is {sigma_na}, the number Nan of imp_v is {imp_na}")
    return mean, std

In [4]:
df = pd.read_csv('/Users/dylan/DylanLi/XJTLU/期权项目/Data/Output_data/option/200512总表.csv')

In [6]:
option_C_A = get_specific_data(df, True, True)
option_P_A = get_specific_data(df, False, True)
option_C_E = get_specific_data(df, True, False)
option_P_E = get_specific_data(df, False, False)
option_C = df.loc[(df.call_put == 'C')]
option_P = df.loc[(df.call_put == 'P')]
option_A = df.loc[(df['style'] == 'A')]
option_E = df.loc[(df['style'] == 'E')]

In [None]:
# 计算理论值
def cal_value(df,r,it, is_call):
    """适用于 Groupby 对象, 用于计算 BS 模型理论值, Vega 函数, 理论波动率, 和隐含波动率
    
    Inputs:
        df (DataFrame): 期权表
        r (float): 年化收益率
        it (int): 迭代次数
        is_call (bool): 是否是看涨期权

    Returns:
        df (DataFrame): 期权表
    """    
    def cal_imp(df, S0, T, K, it):
        sigma_imp = df['sigma'].copy()
        close = df['settlement']
        for i in range(it):
            #print(f'K:{K}')
            d1 = ((np.log(S0 / K)) + (r + 0.5 * sigma_imp ** 2) * T) / (sigma_imp * np.sqrt(T))
            #print(f'd1:{d1}')
            d2 = ((np.log(S0 / K)) + (r - 0.5 * sigma_imp ** 2) * T) / (sigma_imp * np.sqrt(T))
            #print(f'd2:{d2}')
            C = np.exp(-r * T) * (S0 * stats.norm.cdf(d1, 0.0, 1.0) - K * stats.norm.cdf(d2, 0.0, 1.0))
            #print(f'C:{C}')
            # vega
            vega = S0 * np.sqrt(T) * stats.norm.pdf(d1, 0.0, 1.0)
            #print(f'vega:{vega}')
            sigma_imp -= (C - close)/vega
            # debug print
            count = sigma_imp.isna().sum()
            #print(f'na_count:{count}')
        return sigma_imp
        
    df_time = df.drop_duplicates(subset=['Date'], keep='first', inplace=False)
    df_time['sigma'] = np.std(df_time.futures_close.pct_change(1))
    dict_ = df_time[['Date', 'sigma']].set_index('Date').to_dict()['sigma']
    
    df['sigma'] = df['Date'].map(dict_)
    
    # 理论 sigma 列
    sigma = df['sigma']
    # print(sigma)
    S0 = df['futures_close']
    df.Date = pd.to_datetime(df.Date)
    df.opt_expiration_date = pd.to_datetime(df.opt_expiration_date)
    T = (df.opt_expiration_date - df.Date).dt.days/365
    K = df['strike']
    d1 = ((np.log(S0 / K)) + (r + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))
    d2 = ((np.log(S0 / K)) + (r - 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))
    
    if is_call:
        # 理论值列
        df['values'] = np.exp(-r * T) * (S0 * stats.norm.cdf(d1, 0.0, 1.0) - K * stats.norm.cdf(d2, 0.0, 1.0))
        #隐含波动
        sigma_imp = cal_imp(df, S0, T, K, it)
        df['imp_v'] = sigma_imp
        new_d1 = ((np.log(S0 / K)) + (r + 0.5 * sigma_imp ** 2) * T) / (sigma_imp * np.sqrt(T))
        new_d2 = ((np.log(S0 / K)) + (r - 0.5 * sigma_imp ** 2) * T) / (sigma_imp * np.sqrt(T))
        df['imp_values'] = np.exp(-r * T) * (S0 * stats.norm.cdf(new_d1, 0.0, 1.0) - K * stats.norm.cdf(new_d1, 0.0, 1.0))
    else:
        df['values'] = np.exp(-r * T) * (K * (1 - stats.norm.cdf(d2, 0.0, 1.0)) - S0 * (1 - stats.norm.cdf(d1, 0.0, 1.0)))
        cal_imp(df, S0, T, K, it)
    

    return df

In [None]:
# CONFIG
R = 0.015
IT = 100
IS_CALL = True