In [41]:
import pandas as pd
import numpy as np
from typing import (List,Tuple,Dict,Union)
from data_service import (get_shibor_data,get_interpld_shibor,query_china_shibor_all)

from CVIX import (filter_contract,cal_forward_price,cal_forward_price,get_cboe_vix)

本篇算法来源：
>《20180707_东北证券_金融工程_市场波动风险度量：vix与skew指数构建与应用》

In [3]:
# Load the option quotes from a local CSV; the first CSV column becomes the index.
opt_data = pd.read_csv('opt_data.csv',index_col=[0])
# Disabled alternative: read a SHIBOR table from a local CSV (originally from jqdata).
# df_rate = pd.read_csv('shibor_df.csv',index_col=0) # from jqdata

# Fetch SHIBOR data for the date range through the project helper, then pass it
# through the project's interpolation helper.
# NOTE(review): later cells use rate_df.loc[<date>] and then look up an integer
# key on the resulting row, so rate_df is presumably indexed by date with
# integer tenor columns -- confirm against data_service.
shibor_df = get_shibor_data('2015-02-09','2022-05-27')
rate_df = get_interpld_shibor(shibor_df)

In [155]:
def _get_near_or_next_options(df: pd.Series, n: int = 1) -> pd.DataFrame:
    """获取opt_data中近月及次近月

    Args:
        df (pd.Series)
        | idnex | list_date | exercise_date | exercise_price | contract_type | code          |
        | :---- | :-------- | :------------ | :------------- | :------------ | :------------ |
        | 0     | 2021/7/29 | 2022/3/23     | 4.332          | CO            | 10003549.XSHG |

    Returns:
        pd.DataFrame
    """
    cond = (df['maturity'] <= np.sort(df['maturity'].unique())[n])

    return df[cond]

def _build_strike_matrix(df: pd.DataFrame) -> pd.DataFrame:
    """构建期权价差表

    Parameters
    ----------
    df : pd.DataFrame
        | index | date      | exercise_date | close  | contract_type | exercise_price | maturity |
        | :---- | :-------- | :------------ | :----- | :------------ | :------------- | :------- |
        | 0     | 2021/7/29 | 2022/3/23     | 0.5275 | call          | 4.332          | 0.649315 |

    Returns
    -------
    pd.DataFrame
        | contract_type  | call   | put    | diff    |
        | :------------- | :----- | :----- | :------ |
        | exercise_price |        |        |         |
        | 2.2            | 0.1826 | 0.0617 | 0.1209  |
        | 2.25           | 0.146  | 0.0777 | 0.0683  |
        | 2.3            | 0.1225 | 0.0969 | 0.0256  |
        | 2.35           | 0.0942 | 0.1268 | 0.0326 |
        | 2.4            | 0.0735 | 0.1542 | 0.0807 |
    """
    matrix: pd.DataFrame = pd.pivot_table(
        df, index='exercise_price', columns='contract_type', values='close')
    matrix['diff'] = (matrix['call'] - matrix['put'])
    return matrix


def _get_min_strike_diff(strike_matrix: pd.DataFrame) -> pd.Series:
    """获取strike_matrix认购期权和认沽期权价差绝对值最小数据信息

    Parameters
    ----------
    df : pd.DataFrame
        | contract_type  | call   | put    | diff    |
        | :------------- | :----- | :----- | :------ |
        | exercise_price |        |        |         |
        | 2.2            | 0.1826 | 0.0617 | 0.1209  |
        | 2.25           | 0.146  | 0.0777 | 0.0683  |
        | 2.3            | 0.1225 | 0.0969 | 0.0256  |
        | 2.35           | 0.0942 | 0.1268 | 0.0326 |
        | 2.4            | 0.0735 | 0.1542 | 0.0807 |

    Returns
    -------
    pd.Series
        index - exercise_price|call|put|diff| values
    """
    df_ = strike_matrix.reset_index()
    min_idx = df_['diff'].abs().idxmin()

    return df_.loc[min_idx]

def calc_delta_k_table(strike_matrix: pd.DataFrame) -> pd.Series:
    """Compute the strike interval (delta K) for every strike in the index.

    Only the index of ``strike_matrix`` is read, so a Series indexed by
    strike works as well as a DataFrame.

    Parameters
    ----------
    strike_matrix : pd.DataFrame
        Object whose index holds the strike prices. The strikes are used in
        the order given; they are not sorted here, so pass them ascending.

    Returns
    -------
    pd.Series
        index: strike price, values: delta K. For the first and last strike
        this is the distance to the single adjacent strike; for interior
        strikes it is half the distance between the two neighbouring strikes.

    Raises
    ------
    ValueError
        If fewer than two strikes are supplied (no interval can be formed).
    """
    strikes: np.ndarray = strike_matrix.index.to_numpy()
    if strikes.size < 2:
        raise ValueError(
            'calc_delta_k_table needs at least two strikes, got %d' % strikes.size)

    delta_k = np.empty(strikes.size, dtype=float)
    # Boundary strikes: one-sided difference to the only neighbour.
    delta_k[0] = strikes[1] - strikes[0]
    delta_k[-1] = strikes[-1] - strikes[-2]
    # Interior strikes: half the gap between the strikes on either side.
    delta_k[1:-1] = 0.5 * (strikes[2:] - strikes[:-2])

    return pd.Series(index=strikes, data=delta_k)
    
def _get_median_price_table(strike_matrix: pd.DataFrame, F: float) -> Tuple[pd.Series,float]:
    """根据执行价矩阵获取中间报价表

    Parameters
    ----------
    strike_matrix : pd.DataFrame
        | contract_type  | call   | put    | diff    |
        | :------------- | :----- | :----- | :------ |
        | exercise_price |        |        |         |
        | 2.2            | 0.1826 | 0.0617 | 0.1209  |
        | 2.25           | 0.146  | 0.0777 | 0.0683  |
        | 2.3            | 0.1225 | 0.0969 | 0.0256  |
        | 2.35           | 0.0942 | 0.1268 | 0.0326  |
        | 2.4            | 0.0735 | 0.1542 | 0.0807  |
    F : float
        F

    Returns
    -------
    pd.Seroes
        index-exercies_price values-中间价
    """
    # 获取K
    exercise_price: np.ndarray = strike_matrix.index._values
    # 选取比F小，但差值又最小的合约
    K_cond1: np.ndarray = (exercise_price < F)
    # K值
    K_0: float = strike_matrix.loc[K_cond1, 'diff'].idxmin()

    # 根据K构建中间价
    call_ser: pd.Series = strike_matrix.loc[exercise_price > K_0, 'call']
    put_ser: pd.Series = strike_matrix.loc[exercise_price < K_0, 'put']
    median: float = (
        strike_matrix.loc[K_0, 'call'] + strike_matrix.loc[K_0, 'put']) * 0.5

    # 合并call,put
    all_ser: pd.Series = pd.concat((put_ser, call_ser))
    # 添加中间价
    all_ser[K_0] = median

    return all_ser.sort_index(),K_0
    
def _get_free_rate(shibor_ser:pd.Series,near_term:float,next_tern:float)->Tuple:
    """根据near_term,next_tern获取对应的shibor值

    Parameters
    ----------
    shibor_ser : pd.Series
        shibor数据
    near_term : float
        近月
    next_tern : float
        次近月

    Returns
    -------
    Tuple
        近月无风险收益,次近月无风险收益
    """
    near_term_ = int(near_term * 360)
    next_tern_ = int(next_tern * 360)

    return shibor_ser.loc[near_term_],shibor_ser.loc[next_tern_]
    
def calc_F(K: float, R: float, T: float, C: float, P: float) -> float:
    """Compute the forward price level ``K + exp(R * T) * (C - P)``.

    Parameters
    ----------
    K : float
        Strike at which the call/put price difference is smallest.
    R : float
        Risk-free rate.
    T : float
        Time to maturity.
    C : float
        Call price at strike ``K``.
    P : float
        Put price at strike ``K``.

    Returns
    -------
    float
        Forward price level.
    """
    growth = np.exp(R*T)
    spread = C-P
    return K + growth*spread


In [156]:
# Walk through the VIX inputs for one trading day.
watch_date = '2015-02-09'
COND = (opt_data['date'] == watch_date)
filter_df = opt_data[COND]

# Simulate a single day: keep only the near-term and next-term contracts.
tmp_df = filter_df.groupby('date', group_keys=False).apply(
    _get_near_or_next_options)


# Near-term and next-term maturities.
# The unpacking requires exactly two distinct maturities in tmp_df.
near_term, next_term = np.sort(tmp_df['maturity'].unique())

# Risk-free rates for that day matching the two maturities.
near_term_rate, next_term_rate = _get_free_rate(
    rate_df.loc[watch_date], near_term, next_term)

# Per-strike call/put tables and the strike with the smallest |call - put|.
## Near term
near_strike_matrix: pd.DataFrame = _build_strike_matrix(
    tmp_df[tmp_df['maturity'] == near_term])

near_strike_ser = _get_min_strike_diff(near_strike_matrix)

## Next term
next_strike_matrix: pd.DataFrame = _build_strike_matrix(
    tmp_df[tmp_df['maturity'] == next_term])

next_strike_ser = _get_min_strike_diff(next_strike_matrix)

# Forward price level for each term, from the minimum-spread strike.
F_1 = calc_F(near_strike_ser['exercise_price'], near_term_rate,
             near_term, near_strike_ser['call'], near_strike_ser['put'])

F_2 = calc_F(next_strike_ser['exercise_price'], next_term_rate,
             next_term, next_strike_ser['call'], next_strike_ser['put'])

# Quote table Q(K) and the strike K_0 for each term.
near_median_table,near_K_0 = _get_median_price_table(near_strike_matrix,F_1)
next_median_table,next_K_0 = _get_median_price_table(next_strike_matrix,F_2)

# Strike intervals (delta K); only the strike index of each table is used.
near_delta_k = calc_delta_k_table(near_median_table)
next_delta_k = calc_delta_k_table(next_median_table)

In [158]:
# VIX计算中间参数变量表
tmp_variable = pd.concat((near_median_table,near_delta_k),axis=1).reset_index()
tmp_variable.columns = ['K','Q(K)','Delta_K']
tmp_variable['F'] = F_1
tmp_variable['T'] = near_term
tmp_variable['R'] = near_term_rate

In [163]:
def calc_sigma(K_0: float, K: np.ndarray, delta_K: np.ndarray, Q_K: np.ndarray, F: float, R: float, T: float) -> float:
    """Compute the variance term for one maturity.

    Evaluates
    ``(2 / T) * sum(delta_K / K**2 * exp(R * T) * Q_K) - (1 / T) * (F / K_0 - 1)**2``.

    Parameters
    ----------
    K_0 : float
        Reference strike used in the forward adjustment term.
    K : np.ndarray
        Strike prices.
    delta_K : np.ndarray
        Strike interval for each entry of ``K``.
    Q_K : np.ndarray
        Option quote for each entry of ``K``.
    F : float
        Forward price level.
    R : float
        Risk-free rate.
    T : float
        Time to maturity.

    Returns
    -------
    float
        The variance term for this maturity.
    """
    # Contribution of each strike to the option strip.
    weighted_quotes = delta_K / np.power(K, 2) * np.exp(R * T) * Q_K
    strip_term = (2 / T) * np.sum(weighted_quotes)
    # Correction for the gap between the forward level and K_0.
    forward_adjustment = (1 / T) * np.power(F/K_0-1, 2)

    return strip_term - forward_adjustment


In [164]:
# Near-term variance term, fed from the intermediate-variable table.
calc_sigma(
    K_0=near_K_0,
    K=tmp_variable['K'].values,
    delta_K=tmp_variable['Delta_K'].values,
    Q_K=tmp_variable['Q(K)'].values,
    F=F_1,
    R=near_term_rate,
    T=near_term,
)

0.06460378055274148

In [None]:
# Cross-check of the near-term variance term, taking delta K straight from
# near_delta_k instead of the tmp_variable column. (The previous argument,
# near_median_table.values.values, raised AttributeError and held quotes,
# not strike intervals.)
calc_sigma(near_K_0,tmp_variable['K'].values,near_delta_k.values,tmp_variable['Q(K)'].values,F_1,near_term_rate,near_term)

In [162]:
# Inspect the strikes and strike intervals that feed calc_sigma.
# NOTE(review): the saved output for this cell lists Delta_K values that are not
# the 0.05 strike spacing calc_delta_k_table would give for these strikes --
# the output looks stale; re-run on a fresh kernel to confirm.
tmp_variable['K'].values,tmp_variable['Delta_K'].values

(array([2.2 , 2.25, 2.3 , 2.35, 2.4 ]),
 array([0.0617, 0.0777, 0.1097, 0.0942, 0.0735]))

In [None]:
# near_term_opt 近期
# next_term_opt 下期