In [2]:
import numpy as np
import pandas as pd
from datetime import datetime
from sklearn.linear_model import LinearRegression

In [7]:
INTC = pd.read_csv("INTC.txt")
SMH = pd.read_csv("SMH.txt")

In [8]:
INTC.head()

Unnamed: 0,index,time,bid,bidvol,ask,askvol
0,0,2014-11-05 09:31:00,34.07,100.0,34.1,39500.0
1,1,2014-11-05 09:32:00,33.98,200.0,33.99,1516.0
2,2,2014-11-05 09:33:00,33.98,900.0,33.99,500.0
3,3,2014-11-05 09:34:00,33.9,700.0,33.91,2105.0
4,4,2014-11-05 09:35:00,33.97,1400.0,33.98,800.0


In [9]:
SMH.head()

Unnamed: 0,index,time,bid,bidvol,ask,askvol
0,0,2014-11-05 09:31:00,51.75,1000.0,52.09,1000.0
1,1,2014-11-05 09:32:00,51.85,700.0,51.9,1000.0
2,2,2014-11-05 09:33:00,51.75,1000.0,51.79,400.0
3,3,2014-11-05 09:34:00,51.63,1000.0,51.66,400.0
4,4,2014-11-05 09:35:00,51.63,1400.0,51.67,1200.0


In [None]:
import numpy as np
import pandas as pd
from datetime import datetime
from sklearn.linear_model import LinearRegression

def compute_mid_price(INTC, SMH):
    # compute the mid price series for Stock INTC and ETF SMH respectively
    # mid price is the mean of ask price and bid price
    midINTC = (INTC['bid'] + INTC['ask'])/2
    midSMH = (SMH['bid'] + SMH['ask'])/2
    return midINTC, midSMH

def parameter_estimation(midINTC, midSMH):
    # estimate the parameter kappa (K) using VAR S(t) = A + B*S(t-1) + error
    # in other words, you need to run two linear regressions:
    # 1. midINTC(t) = a1 + b11*midINTC(t-1) + b12*midSMH(t-1) + error
    # 2. midSMH(t) = a1 + b21*midINTC(t-1) + b22*midSMH(t-1) + error
    # the estimation for B is [[b11,b12],[b21,b22]]
    # recover K = I - B
    lr1 = LinearRegression()
    lr2 = LinearRegression() 
    
    X = pd.concat([midINTC.shift().dropna(),midSMH.shift().dropna()],axis=1)
    
    lr1.fit(X,midINTC[1:])
    lr2.fit(X,midSMH[1:])
    
    B = np.array([lr1.coef_,lr2.coef_])
    I = np.eye(2)
    K = I - B
    return K
    

def eigenvalue_decomposition(K):
    # compute the eigenvalues of K and store it in the array lambda_
    # compute the inverse U^(-1) of eigenbasis U 
    lambda_, U = np.linalg.eig(K)
    return lambda_, np.linalg.inv(U)
    
def construct_cointegration_factor(invU, midINTC, midSMH):
    # compare the two eigenvalues
    # set the cointegration factor as the process corresponds to the largest eigenvalue
    arr = np.array([midINTC[:-1], midSMH[:-1]])
    cf = invU @ arr
    return cf[0]

def compute_inner_outer_bands(cf):
    # calculate avg: the mean of cointegration factor 
    # calculate m: the deviation of inner bands from the mean, set as 0.1*std(cf)
    # the inner bands are therefore given by avg±m = avg±0.1*std(cf)
    # calculate M: the deviation of outer bands from the mean, set as 1*std(cf)
    # the outer bands are therefore given by avg±M = avg±1*std(cf)
    avg = np.mean(cf)
    m = .1 * np.std(cf)
    M = np.std(cf)
    return avg, m, M
    

def backtest(cf,m,M):
    # When cf goes above upper outer band avg+M, open a short position on cf
    # close the short position when cf reverts and hits upper inner band avg+m
    # When cf goes below lower outer band avg-M, open a long position on cf
    # close the long position when cf reverts and hits lower inner band avg-m 
    # initial pnl is 0, assume you can long or short at most 1 contract of cf at any time
    df = pd.DataFrame(cf, columns=['cf'])
    df["uob"] = np.mean(cf) + M
    df["lob"] = np.mean(cf) - M
    df["uib"] = np.mean(cf) + m
    df["lib"] = np.mean(cf) - m
    
    pnl = 0
    position = 0
    
    for i in range(len(df)):
        if position == 0 and df['cf'][i] > df.uob[i]:
            sell = df['cf'][i]
            position = -1
            
        if position < 0 and df['cf'][i] <= df.uib[i]:
            buy = df.cf[i]
            position = 0
            pnl += sell - buy
            
        if position == 0 and df['cf'][i] < df.lob[i]:
            buy = df['cf'][i]
            position = 1
            
        if position > 0 and df['cf'][i] >= df.lib[i]:
            sell = df.cf[i]
            position = 0
            pnl += sell - buy 
    
    return pnl
def test_0(INTC,SMH):
    midINTC, midSMH = compute_mid_price(INTC,SMH)
    print(midINTC)
    print(midSMH)
    
def test_1(INTC,SMH):
    midINTC, midSMH = compute_mid_price(INTC,SMH)
    K = parameter_estimation(midINTC, midSMH)
    print(K)
    
def test_2(INTC,SMH):
    midINTC, midSMH = compute_mid_price(INTC,SMH)
    K = parameter_estimation(midINTC, midSMH)
    lambda_, invU = eigenvalue_decomposition(K)
    for i in range(2):
        print(f"eigenvalue {np.round(lambda_[i],8)} corresponds to factor ({np.round(invU[i,0],8)})xINTC+({np.round(invU[i,1],8)})xSMH ")
    
def test_3(INTC,SMH):
    midINTC, midSMH = compute_mid_price(INTC,SMH)
    K = parameter_estimation(midINTC, midSMH)
    lambda_, invU = eigenvalue_decomposition(K)
    cf = construct_cointegration_factor(invU, midINTC, midSMH)
    print(pd.Series(cf))
    
def test_4(INTC,SMH):
    midINTC, midSMH = compute_mid_price(INTC,SMH)
    K = parameter_estimation(midINTC, midSMH)
    lambda_, invU = eigenvalue_decomposition(K)
    cf = construct_cointegration_factor(invU, midINTC, midSMH)
    avg, m, M = compute_inner_outer_bands(cf)
    print(f"inner band: {np.round(avg,6)}±{np.round(m,6)}, outer band: {np.round(avg,6)}±{np.round(M,6)}")
          
def test_5(INTC,SMH):
    midINTC, midSMH = compute_mid_price(INTC,SMH)
    K = parameter_estimation(midINTC, midSMH)
    lambda_, invU = eigenvalue_decomposition(K)
    cf = construct_cointegration_factor(invU, midINTC, midSMH)
    _, m, M = compute_inner_outer_bands(cf)
    pnl = backtest(cf,m,M)
    print(np.round(pnl,6))


if __name__ == '__main__':
    test_id = int(input().strip())
    row_num = int(input().strip())
    
    Data = []
    col_names = list(map(str, input().split(',')))
    for i in range(row_num):
        line=list(map(str, input().split(',')))
        line[1] = datetime.strptime(line[1],'%Y-%m-%d %H:%M:%S')
        for j in range(2,6):
            if line[j] == '':
                line[j] = np.nan
            else:
                line[j] = np.round(float(line[j]),2)
        Data.append(line[1:])
    
    INTC = pd.DataFrame(Data, columns= col_names[1:])
    
    Data = []
    col_names = list(map(str, input().split(',')))
    for i in range(row_num):
        line=list(map(str, input().split(',')))
        line[1] = datetime.strptime(line[1],'%Y-%m-%d %H:%M:%S')
        for j in range(2,6):
            if line[j] == '':
                line[j] = np.nan
            else:
                line[j] = np.round(float(line[j]),2)
        Data.append(line[1:])
        
    SMH = pd.DataFrame(Data, columns= col_names[1:])
    
    if test_id == 0:
        test_0(INTC,SMH)
    if test_id == 1:
        test_1(INTC,SMH)
    if test_id == 2:
        test_2(INTC,SMH)
    if test_id == 3:
        test_3(INTC,SMH)
    if test_id == 4:
        test_4(INTC,SMH)
    if test_id == 5:
        test_5(INTC,SMH)