In [None]:
import pandas as pd
# Read the result CSV; its first column is parsed as dates and used as the index.
csv_path = "RESULT/cu/cu_final.csv"
audata = pd.read_csv(csv_path, index_col=[0], parse_dates=[0])
audata

In [None]:
# Distinct values of the id_A column, to see which identifiers are present.
audata["id_A"].unique()

In [None]:
# Keep only the rows whose id_A equals 'cu2103', restricted to the
# last_A / last_B columns used by the rest of the analysis.
is_target = audata["id_A"] == 'cu2103'
data = audata.loc[is_target, ['last_A', 'last_B']]
data

#### 相关性检验 

In [None]:
import numpy as np
from scipy import stats
# Spearman rank correlation between the two level series.
correlation, pvalue = stats.spearmanr(data["last_A"], data["last_B"])
print("coef:", round(correlation, 3), ",p-value:", pvalue)
print("------")

# Same test on one-step relative changes: first difference divided by the
# previous value. shift()[1:] drops the leading NaN so the denominator has the
# same len(data) - 1 entries as np.diff.
prev_a = data["last_A"].shift()[1:]
prev_b = data["last_B"].shift()[1:]
rel_change_a = np.diff(data["last_A"]) / prev_a
rel_change_b = np.diff(data["last_B"]) / prev_b
correlation, pvalue = stats.spearmanr(rel_change_a, rel_change_b)
print("coef:", round(correlation, 3), ",p-value:", pvalue)


#### ADF检验 

In [None]:
import statsmodels.tsa.stattools as ts
# Augmented Dickey-Fuller test on the first differences of each series.
adf1, adf2 = (ts.adfuller(np.diff(data[column])) for column in ("last_A", "last_B"))
for adf_result in (adf1, adf2):
    print(adf_result)

In [None]:
# Augmented Dickey-Fuller test on the undifferenced (level) series.
adf3, adf4 = (ts.adfuller(data[column]) for column in ("last_A", "last_B"))
for adf_result in (adf3, adf4):
    print(adf_result)

#### OLS回归

In [None]:
import statsmodels.api as sm
# Regress last_A on last_B. No constant column is added to the regressor,
# so the fit has no intercept term.
model = sm.OLS(endog=data["last_A"], exog=data["last_B"]).fit()
print(model.params)

# Slope coefficient on last_B.
beta = model.params["last_B"]

#### 残差单位根检验

In [None]:
# Regression residual (last_A minus beta times last_B), followed by an ADF
# unit-root test on that residual.
eps = data["last_A"] - beta * data["last_B"]
adf_eps = ts.adfuller(eps)
print(adf_eps)

In [None]:
import matplotlib.pyplot as plt
# Plot the residual against its bar position (0 .. n-1) instead of the index.
bar_positions = list(range(len(eps)))
plt.figure()
plt.plot(bar_positions, eps.values)
plt.show()

#### 序列去中心化 

In [None]:
# Raw spread: element-wise difference last_A - last_B.
Spread = data["last_A"] - data["last_B"]

In [None]:
# Overlay the regression residual and the raw spread, both drawn against
# bar position.
plt.figure(figsize=(20, 8))
for curve in (eps.values, Spread):
    plt.plot(list(range(len(curve))), curve)
plt.show()

In [None]:
# De-mean the spread by subtracting its sample mean.
spread_mean = Spread.mean()
MSpread = Spread - spread_mean
MSpread

#### 样本内回测


In [None]:
# In-sample backtest of a threshold rule on the de-meaned spread.
#   * While flat: enter 'b-a' when the spread is above +K * std, enter 'a-b'
#     when it is below -K * std.
#   * 'a-b' is closed once the spread rises above +exit_level; 'b-a' is closed
#     once it falls below -exit_level.
#   * A position still open after the last bar is force-closed at the last bar.
holding = False
direction = None  # "a-b", "b-a"
enter_record = []  # (bar position, spread value) of every entry
out_record = []    # (bar position, spread value) of every rule-based exit

pure_pnl = []      # per-trade spread PnL before costs
trading_cost = []  # per-trade cost
pnl = []           # per-trade PnL net of cost
cost_rate = 0.5 / 10000


K = 0.31
exit_level = 0.1   # spread level that triggers an exit (was an inline 0.1 / -0.1)

up_threshold = K * np.std(MSpread)
down_threshold = -up_threshold
print(up_threshold, down_threshold)

# The loop counter below is a bar *position* (from enumerate), not an index
# label, so every lookup goes through positional arrays. Integer keys passed to
# Series.__getitem__ on a non-integer index rely on a deprecated positional
# fallback.
spread_values = MSpread.values
price_a = data.last_A.values
price_b = data.last_B.values


def _book_trade(side, exit_pos, exit_spread):
    """Record the PnL and cost of closing the most recent entry.

    Parameters
    ----------
    side : str
        'a-b' (profits when the spread rises) or 'b-a' (profits when it falls).
    exit_pos : int
        Bar position at which the position is closed.
    exit_spread : float
        De-meaned spread value at ``exit_pos``.

    Appends one element to each of ``pure_pnl``, ``trading_cost`` and ``pnl``.
    """
    entry_pos, entry_spread = enter_record[-1]
    if side == 'a-b':
        single_ret = exit_spread - entry_spread
    else:
        single_ret = entry_spread - exit_spread
    # Cost is charged on both prices at entry and at exit.
    single_cost = cost_rate * (price_a[exit_pos] + price_b[exit_pos]
                               + price_a[entry_pos] + price_b[entry_pos])
    pure_pnl.append(single_ret)
    trading_cost.append(single_cost)
    pnl.append(single_ret - single_cost)


for i, v in enumerate(spread_values):
    if not holding:
        # Entry conditions
        if v > up_threshold:
            holding = True
            direction = 'b-a'
            print('b-a进场', i, v)
            enter_record.append((i, v))
        elif v < down_threshold:
            holding = True
            direction = 'a-b'
            print('a-b进场', i, v)
            enter_record.append((i, v))
    else:
        # Exit conditions
        if direction == 'a-b' and v > exit_level:
            holding = False
            print('a-b出场', i, v)
            out_record.append((i, v))
            _book_trade(direction, i, v)
        elif direction == 'b-a' and v < -exit_level:
            holding = False
            print('b-a出场', i, v)
            out_record.append((i, v))
            _book_trade(direction, i, v)

if holding:
    # Force-close at the last bar. The trade is appended to the per-trade lists
    # with the sign implied by its direction; the previous code added a float
    # to a list (TypeError), replaced the trading_cost list with a scalar and
    # always used the 'a-b' sign.
    print('还有持仓，强平')
    _book_trade(direction, len(spread_values) - 1, spread_values[-1])
    

In [None]:
# plt.figure(figsize=(20,8))
# plt.plot([i for i in range(len(Spread))], MSpread)
# plt.scatter(np.array(enter_record)[:, 0], np.array(enter_record)[:, 1], color = 'r')
# plt.scatter(np.array(out_record)[:, 0], np.array(out_record)[:, 1], color = 'b')

In [None]:
# Cumulative per-trade curves: net PnL, PnL before costs, and trading cost.
plt.figure(figsize=(10, 6))
for curve_label, per_trade in (('pnl', pnl),
                               ('pure_pnl', pure_pnl),
                               ('trading_cost', trading_cost)):
    plt.plot(np.array(per_trade).cumsum(), label=curve_label)
plt.legend()

#### 最佳参数 

In [None]:
# Grid search over the entry multiplier K: run the same threshold backtest for
# every candidate and keep the K with the highest total net PnL.
cost_rate = 0.5 / 10000
K_options = np.arange(0.01, 2.5, 0.01)


def _total_pnl_for_K(spread_values, price_a, price_b, K, spread_std,
                     cost_rate, exit_level=0.1):
    """Total net PnL of the threshold strategy for one entry multiplier.

    Parameters
    ----------
    spread_values : sequence of float
        De-meaned spread, one value per bar.
    price_a, price_b : sequence of float
        Prices of the two legs, positionally aligned with ``spread_values``.
    K : float
        Entry threshold as a multiple of ``spread_std``.
    spread_std : float
        Standard deviation of the spread.
    cost_rate : float
        Cost charged per unit of price, applied at entry and at exit.
    exit_level : float
        'a-b' exits when the spread exceeds ``exit_level``; 'b-a' exits when it
        drops below ``-exit_level``.

    Returns
    -------
    float
        Sum of per-trade net PnL; 0.0 when no trade was opened. A position
        still open after the last bar is closed at the last bar.
    """
    up_threshold = K * spread_std
    down_threshold = -up_threshold

    holding = False
    direction = None  # "a-b", "b-a"
    entry_pos = None
    entry_spread = None
    total = 0.0

    for i, v in enumerate(spread_values):
        if not holding:
            # Entry conditions
            if v > up_threshold:
                holding, direction = True, 'b-a'
                entry_pos, entry_spread = i, v
            elif v < down_threshold:
                holding, direction = True, 'a-b'
                entry_pos, entry_spread = i, v
            continue

        # Exit conditions
        if direction == 'a-b' and v > exit_level:
            single_ret = v - entry_spread
        elif direction == 'b-a' and v < -exit_level:
            single_ret = entry_spread - v
        else:
            continue
        holding = False
        single_cost = cost_rate * (price_a[i] + price_b[i]
                                   + price_a[entry_pos] + price_b[entry_pos])
        total += single_ret - single_cost

    if holding:
        # Force-close at the last bar, with the sign implied by the direction.
        # No message is printed here so the search does not emit one line per K.
        last_pos = len(spread_values) - 1
        last_spread = spread_values[last_pos]
        if direction == 'a-b':
            single_ret = last_spread - entry_spread
        else:
            single_ret = entry_spread - last_spread
        single_cost = cost_rate * (price_a[last_pos] + price_b[last_pos]
                                   + price_a[entry_pos] + price_b[entry_pos])
        total += single_ret - single_cost

    return total


# Positional arrays: the backtest indexes bars by position, not by index label.
spread_values = MSpread.values
price_a = data.last_A.values
price_b = data.last_B.values
spread_std = np.std(MSpread)  # loop-invariant, computed once

best_K = None
best_K_res = None

for K in K_options:
    final_pnl = _total_pnl_for_K(spread_values, price_a, price_b, K,
                                 spread_std, cost_rate)
    if best_K_res is None or final_pnl > best_K_res:
        best_K_res = final_pnl
        best_K = K

# `k_res` is the name read by the following cell. It was previously assigned
# under two different spellings (K_res / k_res), so it could be stale or
# undefined.
k_res = best_K

In [None]:
# Show the best K together with its total net PnL. A bare expression that is
# not the last line of a cell is evaluated but never displayed, so both values
# are returned as one tuple.
k_res, best_K_res

In [None]:
# Best total PnL divided by a fixed capital figure.
# NOTE(review): the denominator looks like 2 legs x 9% margin x a price of
# 70000 -- confirm what these constants represent.
capital_base = 2 * 9/100 * 70000
print("收益率：")
best_K_res / capital_base