In [None]:
import numpy as np
import pandas as pd
#pd.set_option('display.max_rows', None)
import time
from datetime import datetime
from scipy.stats import norm
from scipy.integrate import quad
from sko.PSO import PSO
from sko.GA import GA
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
import xgboost as xgb

import warnings
warnings.filterwarnings("ignore")

#上证50
df1=pd.read_csv('./上证50ETF.csv',index_col=0)
#df1['returns']=df1['CLOSE'].pct_change()
df1['returns']=np.log(df1['CLOSE']).diff()
df1['history_volatility']=df1['returns'].rolling(5).std()*np.sqrt(250)
df1=df1['2021/1/29':'2021/9/17']
df1=df1[['CLOSE','returns','history_volatility']]

#期权
df2=pd.read_csv('./50ETF购2021年9月.csv',index_col=0)

#国债利率
df3=pd.read_csv('./国债利率.csv',index_col=0)
df3=df3.fillna(method='pad')/100
df3=df3['2021/1/29':'2021/9/17']

def get_data(name):
    df=pd.DataFrame(df2[name])
    df.columns=['price']
    df['K']=float(name[-4:])
    T=datetime.strptime('2021-9-22', '%Y-%m-%d')
    df['T']=((T-pd.DatetimeIndex(df2.index)).days.values+1)/365
    df['S']=df1['CLOSE'].values
    df['r']=df3['TB1Y.WI'].values
    return df

for i in range(5):
    if i==0:
        data=get_data(df2.columns[i])
    else:
        newdata=get_data(df2.columns[i])
        data=pd.concat([data,newdata])
data=data.dropna(axis=0)

train_data,test_data= train_test_split(data,test_size = 0.2,random_state = 0)

In [None]:
train_data=pd.read_excel('./stacking_train_data.xlsx',index_col=0)
test_data=pd.read_excel('./stacking_test_data.xlsx',index_col=0)
train_x=train_data[train_data.columns[1:]]
train_y=train_data['price']
test_x=test_data[test_data.columns[1:]]
test_y=test_data['price']

In [None]:
print(len(train_data))
print(len(test_data))
print(len(train_data)+len(test_data))

In [None]:
def my_heston_call(S,K,T,r,params):
    
    def characteristic_function(S,K,T,r,params,phi,type_j):
        
        v0,kappa, theta, sigma, rho=params
        
        if type_j==1:
            u = 0.5
            b = kappa - rho*sigma
        else: 
            u = -0.5
            b = kappa
        
        a = kappa*theta
        x = np.log(S)
        d = np.sqrt((rho*sigma*phi*1j-b)**2 - sigma**2*(2*u*phi*1j-phi**2))
        g = (b-rho*sigma*phi*1j+d)/(b-rho*sigma*phi*1j-d)
        D = r*phi*1j*T + (a/sigma**2)*((b-rho*sigma*phi*1j+d)*T - 2*np.log((1-g*np.exp(d*T))/(1-g)))
        E = ((b-rho*sigma*phi*1j+d)/sigma**2)*(1-np.exp(d*T))/(1-g*np.exp(d*T))
        
        return np.exp(D + E*v0 + 1j*phi*x)
    
    def integral_1(S,K,T,r,params,phi):
        integral_1 = (np.exp(-1*1j*phi*np.log(K))*characteristic_function(S,K,T,r,params,phi,1))/(1j*phi)    
        return integral_1.real
    
    def integral_2(S,K,T,r,params,phi):
        integral_2 = (np.exp(-1*1j*phi*np.log(K))*characteristic_function(S,K,T,r,params,phi,2))/(1j*phi)    
        return integral_2.real
    
    ifun_1 = lambda phi: integral_1(S,K,T,r,params,phi)
    p1 = 0.5 + (1 / np.pi) * quad(ifun_1, 0, 100)[0]
    ifun_2 = lambda phi: integral_2(S,K,T,r,params,phi)
    p2 = 0.5 + (1 / np.pi) * quad(ifun_2, 0, 100)[0]
    
    return S* p1 - np.exp(-r * T) * K * p2


def my_func_train(params):
    heston_train=[]
    for i in range(len(train_x)):
        S=train_x['S'].iloc[i]
        K=train_x['K'].iloc[i]
        T=train_x['T'].iloc[i]
        r=train_x['r'].iloc[i]
        result=my_heston_call(S,K,T,r,params)
        heston_train.append(result)
    heston_train=np.array(heston_train)
    se=mean_squared_error(heston_train,train_y.values)
    return se


def my_func_test(params):
    heston_test=[]
    for i in range(len(test_x)):
        S=test_x['S'].iloc[i]
        K=test_x['K'].iloc[i]
        T=test_x['T'].iloc[i]
        r=test_x['r'].iloc[i]
        result=my_heston_call(S,K,T,r,params)
        heston_test.append(result)
    heston_test=np.array(heston_test)
    se=mean_squared_error(heston_test,test_y.values)
    return se

# GA-Heston

In [None]:
time_list=[]
se_list=[]

In [None]:
for i in range(5):
    print(f'iteration:{i}')
    start =time.time()
    ga = GA(func=my_func_train, n_dim=5, size_pop=10, max_iter=100, lb=[0.0001,0.0001,0.0001,0.0001,0.0001], ub=[2,20,2,2,1], precision=1e-7)
    best_x, best_y = ga.run()
    end = time.time()
    time_list.append((end-start)/3600)
    se_list.append(my_func_test(best_x))

In [None]:
ga_result=pd.DataFrame(time_list,columns=['time'])
ga_result['test se']=se_list