##### import

In [31]:
import os
import pandas as pd
import numpy as np
import math
import datetime
import matplotlib.pyplot as plt
from dateutil.relativedelta import relativedelta
from stock_centre import *
import scipy.stats as si
import plotly.express as px
from opt_assistant import *
hq = Stock_Data_Centre()
pro = TuShare().pro
opt_info = Opt_Assistant()

##### set global param

In [32]:
underlying_symbol = '510050'
is_index = False
contract_types = ['CO','PO']
bail_adjust = 0.12
min_multiple = 0.5
trade_logic = 2
open_date = 4
gears = [2, 2] #虚值和实值的档位，虚（实）值为正（负），如虚（实）一时， n=1（-1）
start_date = '2021-07-01'
end_date = '2023-06-30'

#### 一、回测前函数定义

##### 标准化函数

In [34]:
def param_standard(underlying_symbol):

    global start_date
    global end_date

    #  标的范围
    index_list = ['000300', '000852', '000016']
    etf_list = ['510050', '510300', '510500', '588000', '588080', '159915', '159901', '159919', '159922']

    #  标的不同系统中的代码和历史价格和
    if underlying_symbol in index_list:  # 指数的交易所为上交所
        underlying_symbol_for_stock = underlying_symbol + '.SH'
        underlying_symbol_for_opt = underlying_symbol + '.XSHG'
        underlying_prc_his = hq.get_hq(code=underlying_symbol_for_stock, start_date=start_date, end_date=end_date,
                                       index_data=True)
        handling_fee = 15
        commission = 15 * 3
    elif underlying_symbol in etf_list:  # etf的交易所按开头分类
        if underlying_symbol.startswith('5'):
            underlying_symbol_for_stock = underlying_symbol + '.SH'
            underlying_symbol_for_opt = underlying_symbol + '.XSHG'
        elif underlying_symbol.startswith('1'):
            underlying_symbol_for_stock = underlying_symbol + '.SZ'
            underlying_symbol_for_opt = underlying_symbol + '.XSHE'
        underlying_prc_his_Tushare = pro.fund_daily(ts_code=underlying_symbol_for_stock,
                                                    start_date=pd.to_datetime(start_date, format='%Y-%m-%d').strftime(
                                                        '%Y%m%d'),
                                                    end_date=pd.to_datetime(end_date, format='%Y-%m-%d').strftime(
                                                        '%Y%m%d'))
        underlying_prc_his = underlying_prc_his_Tushare.set_index(
            pd.to_datetime(underlying_prc_his_Tushare['trade_date'], format='%Y%m%d').dt.strftime('%Y-%m-%d'))
        underlying_prc_his.sort_index(inplace=True)
        handling_fee = 1.3 + 0.3
        commission = 5

    per_fee = handling_fee + commission

    return {'underlying_symbol_for_stock': underlying_symbol_for_stock, 
            'underlying_symbol_for_opt': underlying_symbol_for_opt, 'per_fee': per_fee, 
            'underlying_prc_his': underlying_prc_his}

# a = param_standard(underlying_symbol)['underlying_prc_his']
# a['trade_date'] = pd.to_datetime(a['trade_date'])
# a[a['trade_date'] == '2021-07-03'].empty
# a.sort_values(['open'], ascending=False).iloc[1,:]['open']
# a[a.index == '2021-07-05']

##### 交易日函数

In [35]:
def get_trade_dates(underlying_symbol, start_date, end_date):
    '''
    获取某标的在指定日期间的交易日序列
    :param underlying_symbol: 标的代码，仅数字，str格式
    :param start_date: 开始日期
    :param end_date: 截止日期
    :return: # list格式，list中形式为：str '2023-06-16'
    '''
    
    ts_prc_his = param_standard(underlying_symbol)['underlying_prc_his']
    
    if pd.to_datetime(start_date).strftime('%Y-%m-%d') <= pd.to_datetime(end_date).strftime('%Y-%m-%d'):
        ts_prc_his = ts_prc_his[(ts_prc_his.index >= pd.to_datetime(start_date).strftime('%Y-%m-%d'))
                            & (ts_prc_his.index <= pd.to_datetime(end_date).strftime('%Y-%m-%d'))]
        dates_info = ts_prc_his.index.tolist()
        return dates_info
    else:
        print('起始日期晚于结束日期')
        return []

# print(get_trade_dates(underlying_symbol, start_date, end_date))

def get_trade_date_gap(underlying_symbol, start_date, end_date):
    '''
    获取某标的在指定日期间间隔的交易日天数
    :param underlying_symbol: 标的代码，仅数字，str格式
    :param start_date: 开始日期
    :param end_date: 截止日期
    :return: int格式
    '''
    dates_gap = len(get_trade_dates(underlying_symbol, start_date, end_date))
    return dates_gap

# print(get_trade_date_gap(underlying_symbol, '2023-01-12', '2023-01-30'))

def get_trade_date(underlying_symbol, spec_date):
    '''
    获取某标的距离指定日最近的交易日
    :param underlying_symbol: 指定标的
    :param spec_date: 指定日期，形式'%Y-%m-%d'
    :return:
    '''
    trade_date = get_trade_date(underlying_symbol, spec_date, spec_date)
    while len(trade_date) == 0: # 说明该日非交易日
        spec_date = pd.to_datetime(spec_date)
        spec_date  += relativedelta(days=1)
        trade_date = get_trade_date(underlying_symbol, spec_date, spec_date)

    return spec_date

##### 保证金计算函数

In [36]:
def calculate_init_margin(code, date):
    '''
    计算指定合约在某日的开仓保证金水平
    :param code: 拟交易的期权合约代码，形式‘10004405.XSHG'
    :param date: 指定日期, str '2023-06-16'
    :param bail_adjust: 保证金调整系数
    :param min_multiple: 最低保障倍数
    :return:
    '''

    global hq
    global opt_info
    global underlying_symbol
    global bail_adjust
    global min_multiple
    opt_quotation_info = pd.read_feather(f"{hq.data_path}/quotation/opt/1d/{pd.to_datetime(date).strftime('%Y-%m-%d')}.fea")
    temp_opt_prc_info = opt_quotation_info[opt_quotation_info['code']==code]
    temp_opt_info = opt_info.get_contract_info(code)
    ts_prc_info = param_standard(underlying_symbol)['underlying_prc_his']
    temp_ts_prc_info = ts_prc_info[ts_prc_info.index == pd.to_datetime(date).strftime('%Y-%m-%d')]

    contract_type = temp_opt_info['contract_type'].tolist()[0]
    contract_type = (contract_type=='CO') - (contract_type=='PO') # CO=1,PO=-1
    contract_unit = temp_opt_info['contract_unit'].tolist()[0]
    exercise_price = temp_opt_info['exercise_price'].tolist()[0]
    expire_date = temp_opt_info['expire_date'].tolist()[0]

    opt_pre_settle = temp_opt_prc_info['pre_settle'].tolist()[0]
    ts_pre_close = temp_ts_prc_info['pre_close'].tolist()[0]

    # 期权虚值额 = max(行权价-标的前收盘价, 0) * opt_type
    virtual_pre_value = np.max(exercise_price - ts_pre_close, 0) * contract_type

    # 每手看涨期权保证金=合约乘数*(合约前结算价+max(标的前收盘价×合约保证金调整系数-前虚值额， 最低保障系数×标的前收盘价×合约保证金调整系数))
    # 每手看跌期权保证金=合约乘数*(合约前结算价+max(标的前收盘价×合约保证金调整系数-前虚值额，最低保障系数×合约行权价格×合约保证金调整系数）
    init_margin = contract_unit * (opt_pre_settle + bail_adjust * 
                                   max(ts_pre_close - virtual_pre_value,
                                       min_multiple * (ts_pre_close*(contract_type==1)
                                                       +exercise_price*(contract_type==-1))
                                       )
                                   )
    return init_margin

# print(calculate_init_margin('10004405.XSHG', '2023-03-21'))

def calculate_maintainance_margin(code, date):
    '''
    计算指定合约在某日的开仓保证金水平
    :param code: 拟交易的期权合约代码，形式‘10004405.XSHG'
    :param date: 指定日期, str '2023-06-16'
    :param bail_adjust: 保证金调整系数
    :param min_multiple: 最低保障倍数
    :return:
    '''

    global hq
    global opt_info
    global underlying_symbol
    global bail_adjust
    global min_multiple
    opt_quotation_info = pd.read_feather(f"{hq.data_path}/quotation/opt/1d/{pd.to_datetime(date).strftime('%Y-%m-%d')}.fea")
    temp_opt_prc_info = opt_quotation_info[opt_quotation_info['code']==code]
    temp_opt_info = opt_info.get_contract_info(code)
    ts_prc_info = param_standard(underlying_symbol)['underlying_prc_his']
    temp_ts_prc_info = ts_prc_info[ts_prc_info.index == pd.to_datetime(date).strftime('%Y-%m-%d')]

    contract_type = temp_opt_info['contract_type'].tolist()[0]
    contract_type = (contract_type=='CO') - (contract_type=='PO') # CO=1,PO=-1
    contract_unit = temp_opt_info['contract_unit'].tolist()[0]
    exercise_price = temp_opt_info['exercise_price'].tolist()[0]
    expire_date = temp_opt_info['expire_date'].tolist()[0]

    opt_settle = temp_opt_prc_info['settle_price'].tolist()[0]
    ts_close = temp_ts_prc_info['close'].tolist()[0]

    # 期权虚值额 = max(行权价-标的收盘价, 0) * opt_type
    virtual_pre_value = np.max(exercise_price - ts_close, 0) * contract_type

    # 每手看涨期权保证金=合约乘数*(合约结算价+max(标的收盘价×合约保证金调整系数-虚值额， 最低保障系数×标的收盘价×合约保证金调整系数))
    # 每手看跌期权保证金=合约乘数*(合约结算价+max(标的收盘价×合约保证金调整系数-虚值额，最低保障系数×合约行权价格×合约保证金调整系数))
    maintainance_margin = contract_unit * (opt_settle + bail_adjust *
                                   max(ts_close - virtual_pre_value,
                                       min_multiple * (ts_close*(contract_type==1)
                                                       +exercise_price*(contract_type==-1))
                                       )
                                   )
    return maintainance_margin

# print(calculate_maintainance_margin('10004405.XSHG', '2023-03-21'))

##### 希腊字母计算函数

In [37]:
def calculate_greeks(code, date, interest_rate_type='1y', his_vol_len=20):
    '''
    计算一张期权的希腊字母（已乘乘数）
    :param code: 期权代码
    :param date: 需要求希腊值的日期
    :param interest_rate_type: Shibor的类型，范围：1w, 2w, 1m, 3m, 6m, 9m, 1y
    :param his_vol_len: int, 历史波动率的时间跨度（天）
    :return: 理论价格，delta，gamma，vega，theta，rho, 杠杆率
    '''

    global underlying_symbol

    period_end_date = pd.to_datetime(date, format='%Y-%m-%d')

    period_start_date = period_end_date.replace(year=period_end_date.year - 1)

    contract_info = opt_info.get_contract_info(code=code)
    
    daily_info = opt_info.get_daily_info(code=code, date=date)

    underlying_symbol_for_stock = param_standard(underlying_symbol=underlying_symbol)['underlying_symbol_for_stock']
    #  标的历史价格
    if contract_info['underlying_type'].iloc[0] == 'INDEX':
        underlying_prc_his = hq.get_hq(code=underlying_symbol_for_stock, start_date=period_start_date,
                                       end_date=period_end_date,
                                       index_data=True)
    elif contract_info['underlying_type'].iloc[0] == 'ETF':
        underlying_prc_his_Tushare = pro.fund_daily(ts_code=underlying_symbol_for_stock,
                                                    start_date=period_start_date.strftime('%Y%m%d'),
                                                    end_date=period_end_date.strftime('%Y%m%d'))
        underlying_prc_his = underlying_prc_his_Tushare.set_index(
            pd.to_datetime(underlying_prc_his_Tushare['trade_date'], format='%Y%m%d').dt.strftime('%Y-%m-%d'))
        underlying_prc_his.sort_index(ascending=False, inplace=True)
    
    contract_unit = contract_info['contract_unit'].iloc[0]
    
    s = underlying_prc_his['close'].iloc[0]

    k = contract_info['exercise_price'].iloc[0]
    
    r = pro.shibor(start_date=period_end_date.strftime('%Y%m%d'), end_date=period_end_date.strftime('%Y%m%d'))[
            interest_rate_type].iloc[0] / 100

    exercise_date = contract_info['exercise_date'].iloc[0]

    t = len(hq.get_trade_date(start_date=period_end_date, end_date=exercise_date, only_list=True)) / 365

    sigma = underlying_prc_his.head(his_vol_len)['close'].pct_change().std() * np.sqrt(252)

    if contract_info['contract_type'].iloc[0] == 'CO':
        n = 1
    elif contract_info['contract_type'].iloc[0] == 'PO':
        n = -1

    d1 = (np.log(s / k) + (r + 0.5 * sigma ** 2) * t) / (sigma * np.sqrt(t))
    d2 = d1 - sigma * np.sqrt(t)

    theo_price = round(contract_unit * n * (s * si.norm.cdf(n * d1) - k * np.exp(-r * t) * si.norm.cdf(n * d2)), 2)

    delta = round(contract_unit * n * si.norm.cdf(n * d1), 2)

    gamma = round(contract_unit * si.norm.pdf(d1) / (s * sigma * np.sqrt(t)), 2)

    vega = round(contract_unit * (s * si.norm.pdf(d1) * np.sqrt(t)) / 100, 2)

    theta = round(contract_unit * (-1 * (s * si.norm.pdf(d1) * sigma) / (2 * np.sqrt(t)) - n * r * k * np.exp(-r * t) * si.norm.cdf(n * d2)) / 365, 2)

    rho = round(contract_unit * n * (t * k * np.exp(-r * t) * si.norm.cdf(d2) * 0.01), 2)

    leverage = round(s * delta / daily_info['settle_price'].iloc[0], 2)

    return {'theo_price': theo_price, 'delta': delta, 'gamma': gamma, 'vega': vega, 'theta': theta, 'rho': rho,
            'leverage': leverage}


# print(calculate_greeks('10004405.XSHG', '2023-03-21', interest_rate_type='1y', his_vol_len=20))

##### 逻辑2选择交易期权函数

In [38]:
def get_trade_opt_for_logic2(date, contract_type, gear):
    '''
    选择理想的期权
    :param date: 指定日期, str '2023-06-16'
    :return:
    '''

    global underlying_symbol
    global open_date
    global opt_info

    underlying_symbol_for_opt = param_standard(underlying_symbol)['underlying_symbol_for_opt']
    ts_prc_info = param_standard(underlying_symbol)['underlying_prc_his']
    temp_ts_prc_info = ts_prc_info[ts_prc_info.index == pd.to_datetime(date).strftime('%Y-%m-%d')]
    
    date = pd.to_datetime(date)
    def get_month_opt(date):
        y_m = date.strftime("%Y-%m") # 当月合约
        flag = False

        while not flag:
            opts = opt_info.get_gear_info(underlying_symbol_for_opt, contract_type,
                                         exercise_ym=y_m, ascending=True, date=date)
            if opts.empty == False:
                expire_date = pd.to_datetime(opts['expire_date'].iloc[0])
                duration = get_trade_date_gap(underlying_symbol, date.strftime("%Y-%m-%d"), expire_date.strftime("%Y-%m-%d"))
                if duration > open_date:
                    flag = True
                else:
                    y_m = (date + relativedelta(months=1)).strftime("%Y-%m")

        return opts

    month_opts = get_month_opt(date)

    def get_exercise_opt(month_opts):
        ts_pre_close = temp_ts_prc_info['pre_close'].tolist()[0]
        if gear * ((contract_type=='CO') - (contract_type=='PO')) > 0: # 看涨期权的虚值、看跌期权的实值往上取
            exercise_opt = month_opts[month_opts['exercise_price'] > ts_pre_close].iloc[abs(gear)-1]
        elif gear * ((contract_type=='CO') - (contract_type=='PO')) < 0: # 看涨期权的虚值、看跌期权的实值往下取
            exercise_opt = month_opts[month_opts['exercise_price'] < ts_pre_close].iloc[-abs(gear)]

        exercise_opt_code = exercise_opt['code']

        return exercise_opt_code

    exercise_opt_code = get_exercise_opt(month_opts)
    return exercise_opt_code

# get_trade_opt_for_logic2('2023-01-12', 'CO', 2)

#### 二、盘前函数定义(logic2, 回测起始日多腿开仓已验证)

In [40]:
def before_trade(date):
    '''
    盘前函数，给出交易信号
    :param date: 指定日期，str %Y-%m-%d
    :return:
    '''
    global trade_logic
    global contract_types
    global gears
    global end_date
    global open_date
    global underlying_symbol
    
    if os.path.exists('position.csv') == False: # 尚未开始回测, 开仓
        if trade_logic == 2:
            trade_opt_code = []
            for i, contract_type in enumerate(contract_types):
                gear = gears[i]
                trade_opt_code.append(get_trade_opt_for_logic2(date, contract_type, gear))
            return {'is_open':True, 'is_close':False,
                    'trade_opt_code':trade_opt_code, 'close_opt_code':[]}

    else: # 读取前一日结果并给出交易信号
        backtest_remain_term = get_trade_date_gap(underlying_symbol, date, end_date)
        position = pd.read_csv('position.csv')
        last_date = pd.to_datetime(position.sort_values(['datetime'], ascending=True).iloc[-1,:]['datetime']).strftime('%Y-%m-%d')
        latest_position = position[pd.to_datetime(position['datetime']).strftime('%Y-%m-%d') == last_date]
        
        if trade_logic == 2:
            need_close_opt = latest_position[(latest_position['expire_days'] == open_date) & (latest_position['res_num'] > 0)] # 取出需要展期的合约
            need_hold_opt = latest_position[latest_position['expire_days'] > open_date & (latest_position['res_num'] > 0)] # 取出可以继续持有的合约
            if (need_close_opt.empty == False) and (backtest_remain_term > open_date): # 有需要展期(含平仓/开仓）的合约
                close_opt_code = need_close_opt['code'].tolist()
                trade_opt_code = []
                for i, contract_type in enumerate(contract_types):
                    gear = gears[i]
                    trade_opt_code.append(get_trade_opt_for_logic2(date,contract_type, gear))
                return {'is_open':True, 'is_close':True,
                    'trade_opt_code':trade_opt_code, 'close_opt_code':close_opt_code}
            elif ((need_close_opt.empty == False) and (backtest_remain_term <= open_date)) or ((need_hold_opt.empty == False) and (backtest_remain_term <= 1)): 
                # 平仓且不再展期（无新开仓）
                if need_close_opt.empty == False:
                    close_opt_code = need_close_opt['code'].tolist()
                elif need_hold_opt.empty == False:
                    close_opt_code = need_hold_opt['code'].tolist()
                return {'is_open': False, 'is_close': True,
                        'trade_opt_code': [], 'close_opt_code': close_opt_code}
            else:
                return {'is_open': False, 'is_close': False,
                        'trade_opt_code': [], 'close_opt_code': []}
        
before_trade(start_date)

{'is_open': True,
 'is_close': False,
 'trade_opt_code': ['10003420.XSHG', '10003426.XSHG'],
 'close_opt_code': []}