In [1]:
import pandas as pd
import numpy as np
from scipy.optimize import minimize
import warnings
warnings.filterwarnings('ignore')
# Load the three training inputs; the first CSV column becomes the row index.
# Per the original author's note, the z-score file starts at 2012/01/30.
train_z_score = pd.read_csv("train_z_score.csv", index_col=0)

# Drop the first 18 rows of the spread and spread-base tables so that they
# start at 2012/01/30 as well (row offset taken from the original notebook).
train_spread = pd.read_csv("train_spread.csv", index_col=0).iloc[18:]
train_spread_base = pd.read_csv("train_spread_base.csv", index_col=0).iloc[18:]

In [2]:
def _held_units(entry, exit_, unit):
    """Turn entry/exit signals into a forward-filled position array.

    Parameters
    ----------
    entry, exit_ : pandas.Series of bool
        Row-wise entry and exit signals of equal length.
    unit : float
        Position size held after an entry (``1`` for long, ``-1`` for short).

    Returns
    -------
    numpy.ndarray
        Position per row: ``unit`` from an entry until the next exit, else 0.
    """
    units = np.full(len(entry), np.nan)
    units[entry.to_numpy(dtype=bool)] = unit
    # Exits are applied after entries, so a row flagged as both ends up flat.
    units[exit_.to_numpy(dtype=bool)] = 0.0
    if units.size:
        # Always start flat; set positionally instead of via chained
        # ``frame[col][0] = 0`` assignment, which is unreliable in pandas.
        units[0] = 0.0
    return pd.Series(units).ffill().to_numpy()


def objective(params, spread, z_score, base, pair_name):
    """Negative annualised Sharpe ratio of a z-score threshold strategy.

    A long position (+1) is opened when the z-score crosses below
    ``long_entryZscore`` and closed when it crosses above ``long_exitZscore``.
    A short position (-1) is opened when the z-score crosses above
    ``short_entryZscore`` and closed when it crosses below
    ``short_exitZscore``. The position held on the previous row earns the
    current row's spread change divided by ``base``.

    Parameters
    ----------
    params : sequence of 4 floats
        ``(long_entryZscore, long_exitZscore, short_entryZscore,
        short_exitZscore)``.
    spread : pandas.Series
        Spread values for the pair.
    z_score : pandas.Series
        Z-score of the spread; its index must be parseable by
        ``pd.to_datetime``.
    base : pandas.Series
        Denominator used to convert spread changes into returns.
    pair_name : str
        Label used to name the z-score and spread columns.

    Returns
    -------
    float
        ``-(mean / std) * sqrt(252)`` of the per-row strategy returns, or
        ``0.0`` when the ratio is undefined (no trades, constant returns or
        empty input), so the optimiser never receives NaN/inf.
    """
    long_entryZscore, long_exitZscore, short_entryZscore, short_exitZscore = params

    z_col = f"{pair_name}(Z_score)"
    PnL = pd.DataFrame({z_col: z_score, f"{pair_name}(Spread)": spread})
    PnL.index = pd.to_datetime(z_score.index)

    z_now = PnL[z_col]
    z_prev = z_now.shift(1)

    # A signal fires only on the row where the z-score crosses the threshold.
    long_entry = (z_now < long_entryZscore) & (z_prev > long_entryZscore)
    long_exit = (z_now > long_exitZscore) & (z_prev < long_exitZscore)
    short_entry = (z_now > short_entryZscore) & (z_prev < short_entryZscore)
    short_exit = (z_now < short_exitZscore) & (z_prev > short_exitZscore)

    num_units = (_held_units(long_entry, long_exit, 1.0)
                 + _held_units(short_entry, short_exit, -1.0))

    # Yesterday's position earns today's spread move (avoids look-ahead).
    prev_units = pd.Series(num_units).shift(1).to_numpy()
    spread_pct_change = ((spread - spread.shift(1)) / base).values
    port_rets = pd.Series(spread_pct_change * prev_units)

    mean_ret = port_rets.mean()
    volatility = port_rets.std()
    # Float division by zero yields NaN/inf rather than raising
    # ZeroDivisionError, so the degenerate case must be tested explicitly.
    if not (np.isfinite(mean_ret) and np.isfinite(volatility)) or volatility == 0:
        return 0.0

    sharpe_ratio = mean_ret / volatility * np.sqrt(252)
    return -sharpe_ratio

In [3]:
def backtest(spread, z_score, base, pair_name,
             x0=(-0.7, 0.05, 0.7, -0.05),
             bounds=((-1.1, 1.1), (-1.1, 1.1), (-1.1, 1.1), (-1.1, 1.1))):
    """Search for the z-score thresholds that maximise the Sharpe ratio.

    Minimises ``objective`` (the negative Sharpe ratio) with SciPy's
    Nelder-Mead method, starting from ``x0`` and passing ``bounds`` through
    to ``minimize``.

    Parameters
    ----------
    spread, z_score, base : pandas.Series
        Forwarded unchanged to ``objective``.
    pair_name : str
        Forwarded unchanged to ``objective``.
    x0 : sequence of 4 floats, optional
        Starting thresholds in the order ``(long entry, long exit,
        short entry, short exit)``. Defaults to the notebook's original
        starting point.
    bounds : sequence of 4 ``(min, max)`` pairs, optional
        Search bounds for each threshold. Defaults to ``(-1.1, 1.1)`` for
        every threshold, as in the original notebook.

    Returns
    -------
    tuple
        ``(best_sharpe, long_entryZscore, long_exitZscore,
        short_entryZscore, short_exitZscore)`` where ``best_sharpe`` is the
        negated minimum of ``objective``.
    """
    result = minimize(
        objective,
        list(x0),
        args=(spread, z_score, base, pair_name),
        bounds=list(bounds),
        method='Nelder-Mead',
    )
    long_entryZscore, long_exitZscore, short_entryZscore, short_exitZscore = result.x
    # ``objective`` returns the negative Sharpe ratio, so flip the sign back.
    best_sharpe = -result.fun
    return best_sharpe, long_entryZscore, long_exitZscore, short_entryZscore, short_exitZscore

In [4]:
# Optimise the thresholds for every pair and collect one result row per pair.
# Rows are gathered in a list and the frame is built once, so the result
# columns are numeric instead of the object dtype produced by filling a
# pre-allocated empty frame row by row.
result_columns = ["MaxSharpe", "long_entryZscore", "long_exitZscore", "short_entryZscore", "short_exitZscore"]

optim_rows = []
for pair, pair_name in enumerate(train_z_score.columns):
    z_score = train_z_score[pair_name]
    # Spread and base are matched to the z-score column by position, so the
    # three tables are expected to list the pairs in the same column order.
    spread = train_spread.iloc[:, pair]
    base = train_spread_base.iloc[:, pair]
    optim_rows.append(backtest(spread, z_score, base, pair_name))

optim_table = pd.DataFrame(optim_rows, index=train_z_score.columns, columns=result_columns)

optim_table

Unnamed: 0,MaxSharpe,long_entryZscore,long_exitZscore,short_entryZscore,short_exitZscore
ORCL GOOGL,2.142829,-0.665175,0.050439,0.745646,-0.048219
USLV GLD,1.959753,-0.82835,0.05173,0.483646,-0.052275
USLV AAPL,2.125589,-0.551336,0.053983,0.728412,-0.053249
USLV GOOGL,2.032549,-0.831308,0.051105,0.415312,-0.056196
USLV AMD,1.185272,-0.664329,0.046875,0.763904,-0.053986
...,...,...,...,...,...
CMCSA VDC,2.817447,-0.589789,0.058925,0.597373,-0.049333
CMCSA KXI,2.872766,-0.564648,0.053713,0.758276,-0.052278
CMCSA VHT,2.215425,-0.653663,0.05334,0.691434,-0.050305
CMCSA VNQ,2.624765,-0.690233,0.048605,0.748836,-0.048964


In [5]:
# Persist the per-pair optimisation results; the pair names (row index) are
# written as the first CSV column, which is to_csv's default behaviour.
optim_table.to_csv('z_score_optim(Sharpe).csv')