In [163]:
import pandas as pd 
import numpy as np
from scipy import stats

###### 数据读取和预处理

In [3]:
df = pd.read_csv("../data/ag2302.csv")

In [9]:
df["time_gap"] = df["ts"] - df["ts"].shift(1)

In [20]:
temp_list = df["time_gap"]
signal_list = []
for i in range(len(temp_list)):
    if temp_list[i] > 5000:
        signal_list.append(i)
        
df_list = []

for i in range(1,len(signal_list)):
    if signal_list[i] - signal_list[i-1] > 1000 :
        temp_df = df.iloc[signal_list[i-1]+1:signal_list[i]]
        temp_df = temp_df.reset_index(drop = True)
        if len(temp_df) > 3000:
            df_list.append(temp_df)

print(len(df_list))

23


# ----------------------------------------------------------------------------------------------------------------

<center><font size=5>Intro<font><center>
<font size=3> -该脚本用于计算在不同止盈止损价格下，在指定样本中的胜率，和自定义敞口价差下的收益率,以及结果的二项检验p值<font><br/>
<font size=3> -可以先计算胜率或收益矩阵，再依据策略在样本中进行挑选，在通过矩阵做差检验策略的有效性和合理的止损止盈价格<font><br/>
<font size=3> -该计算过程建议在小波动止盈止损下使用，因为当止盈止损价差时，可能导致大量回测样本在同一点平仓，影响统计显著性<font>

# ----------------------------------------------------------------------------------------------------------------

###### 计算信息矩阵

<font size=4 face="黑体">止盈止损标记矩阵</font>
<br/>
<font size=3 face="黑体">（用于标记成交对的止盈止损情况）</font>
<center>$\begin{pmatrix}
  A_{p1}B_{p1} & A_{p1}B_{p2} & A_{p1}B_{p3} & A_{p1}B_{p4} & A_{p1}B_{p5}\\   
  A_{p2}B_{p1} & A_{p2}B_{p2} & A_{p2}B_{p3} & A_{p2}B_{p4} & A_{p2}B_{p5}\\
  A_{p3}B_{p1} & A_{p3}B_{p2} & A_{p3}B_{p3} & A_{p3}B_{p4} & A_{p3}B_{p5}\\
  A_{p4}B_{p1} & A_{p4}B_{p2} & A_{p4}B_{p3} & A_{p4}B_{p4} & A_{p4}B_{p5}\\
  A_{p5}B_{p1} & A_{p5}B_{p2} & A_{p5}B_{p3} & A_{p5}B_{p4} & A_{p5}B_{p5}
 \end{pmatrix}$<center>
 <br/>
 <center>$A_{pi}B_{pj}$表示以$p_{i}$为止盈，$p_{j}$为止损的交易对的成交情况<center>
 <br/>
 <center>具体的，1为止盈，-1为止损，0为未发生成交<center>
 <br/>
 
 <font size=4 face="黑体">成交标记矩阵</font>
 <br/>
<font size=3 face="黑体">（用于标记成交对的是否有成交情况）</font>
<center>$\begin{pmatrix}
  Mark(A_{p1}B_{p1}) & Mark(A_{p1}B_{p2}) & Mark(A_{p1}B_{p3}) & Mark(A_{p1}B_{p4} & A_{p1}B_{p5})\\   
  Mark(A_{p2}B_{p1}) & Mark(A_{p2}B_{p2}) & Mark(A_{p2}B_{p3}) & Mark(A_{p2}B_{p4} & A_{p2}B_{p5})\\
  Mark(A_{p3}B_{p1}) & Mark(A_{p3}B_{p2}) & Mark(A_{p3}B_{p3}) & Mark(A_{p3}B_{p4} & A_{p3}B_{p5})\\
  Mark(A_{p4}B_{p1}) & Mark(A_{p4}B_{p2}) & Mark(A_{p4}B_{p3}) & Mark(A_{p4}B_{p4} & A_{p4}B_{p5})\\
  Mark(A_{p5}B_{p1}) & Mark(A_{p5}B_{p2}) & Mark(A_{p5}B_{p3}) & Mark(A_{p5}B_{p4} & A_{p5}B_{p5})
 \end{pmatrix}$<center>
 <br/>
 <center>$A_{pi}B_{pj}$数据流结束前是否有成交<center>
 <br/>
 <center>具体的，1为有成交，0为在当前数据流中未发生该交易对的成交<center>
 <br/>
    

# ----------------------------------------------------------------------------------------------------------------

快速计算session中每个矩阵指标的方法：
<br/>
1、生成 每个价格的位置序列 的对应hashmap<br/>
2、对每i时间节点，对矩阵中的每个量价对，寻找对应价格的第一个id，判断并填充矩阵<br/>
3、对自身量价节点的对应序列进行更新（删除对应第一个id）<br/>
4、重复步骤2<br/>

PS：所有操作基于单一列表进行

# ----------------------------------------------------------------------------------------------------------------

 <font size=4 face="黑体">测试止盈止损tick量价选择(单位：合约最小单位)</font>
     
 | p1 | p2 | p3 | p2 | p3 |
| :-----:| :----: | :----: |:----: | :----: |
| 2 | 3 | 4 |5 | 6 |

In [139]:
TICK_UNIT = 1

In [140]:
# price_map = {0:2*TICK_UNIT,1:3*TICK_UNIT,2:4*TICK_UNIT,3:5*TICK_UNIT,4:6*TICK_UNIT,5:7*TICK_UNIT,6:8*TICK_UNIT,7:9*TICK_UNIT,8:10*TICK_UNIT,9:11*TICK_UNIT}
price_map = {0:2*TICK_UNIT,1:3*TICK_UNIT,2:4*TICK_UNIT,3:5*TICK_UNIT,4:6*TICK_UNIT}
DISTANCE = float('+inf') # 自定义距离，若交易节点和当前节点的距离大于该值则不成交
# DISTANCE = 1000

In [141]:
# 1、生成对应hashmap
def GetHash(target_list):
    answer_map = {}
    for i in range(len(target_list)):
        if target_list[i] in answer_map:
            answer_map[target_list[i]].append(i) 
        else:
            answer_map[target_list[i]] = [i]
    return answer_map

# 2、多头矩阵生成
def GetMatrixLong(price):
    winlose_matrix = np.zeros((len(price_map),len(price_map)))
    complete_matrix = np.zeros((len(price_map),len(price_map)))
    for i in range(len(price_map)):
        for j in range(len(price_map)):
            winprice = price + price_map[i]
            loseprice = price - price_map[j]
            winid = float('+inf')
            loseid = float('+inf')
            if winprice in tool_map:
                if tool_map[winprice][0]-tool_map[price][0] < DISTANCE:
                    winid = tool_map[winprice][0]
            if loseprice in tool_map:
                if tool_map[loseprice][0]-tool_map[price][0] < DISTANCE:
                    loseid = tool_map[loseprice][0]
            if winid < loseid:
                winlose_matrix[i][j] = 1
                complete_matrix[i][j] = 1
            elif winid > loseid:
                winlose_matrix[i][j] = -1
                complete_matrix[i][j] = 1
    # 更新toolmap
    if len(tool_map[price]) == 1:
        del tool_map[price]
    else:
        tool_map[price] = tool_map[price][1:]
    return winlose_matrix,complete_matrix

# 空头矩阵生成
def GetMatrixShort(price):
    winlose_matrix = np.zeros((len(price_map),len(price_map)))
    complete_matrix = np.zeros((len(price_map),len(price_map)))
    for i in range(len(price_map)):
        for j in range(len(price_map)):
            winprice = price - price_map[i]
            loseprice = price + price_map[j]
            winid = float('+inf')
            loseid = float('+inf')
            if winprice in tool_map:
                if tool_map[winprice][0]-tool_map[price][0] < DISTANCE:
                    winid = tool_map[winprice][0]
            if loseprice in tool_map:
                if tool_map[loseprice][0]-tool_map[price][0] < DISTANCE:
                    loseid = tool_map[loseprice][0]
            if winid < loseid:
                winlose_matrix[i][j] = 1
                complete_matrix[i][j] = 1
            elif winid > loseid:
                winlose_matrix[i][j] = -1
                complete_matrix[i][j] = 1
    # 更新toolmap
    if len(tool_map[price]) == 1:
        del tool_map[price]
    else:
        tool_map[price] = tool_map[price][1:]
    return winlose_matrix,complete_matrix

# 计算对应列表信息矩阵
def get_matrix_info(target_list,LongShort):
    p_matrix_list = []
    c_matrix_list = []
    if LongShort == "long":
        for id_ in range(len(target_list)):
            price = target_list[id_]
            p,c = GetMatrixLong(price)
            p_matrix_list.append(p)
            c_matrix_list.append(c)
    elif LongShort == "short":
        for id_ in range(len(target_list)):
            price = target_list[id_]
            p,c = GetMatrixLong(price)
            p_matrix_list.append(p)
            c_matrix_list.append(c)
    return p_matrix_list,c_matrix_list

In [150]:
# test
target_list = df_list[4]["Bp"]
print("sample_length:",len(target_list))
tool_map = GetHash(target_list)
p,c = get_matrix_info(target_list,"long")

sample_length: 36629


In [151]:
p[0]

array([[-1., -1., -1., -1., -1.],
       [-1., -1., -1., -1., -1.],
       [-1., -1., -1., -1., -1.],
       [-1., -1., -1., -1., -1.],
       [-1., -1., -1., -1., -1.]])

###### 统计信息生成

<center><font size=4 face="黑体">计算指定矩阵集合不同交易对下的胜率和收益期望（自定义敞口价差）</font><center>
<br/>
<center>对于选择出的节点的信息矩阵<center>
<center><font size=3>$\{M_{pi}\},i=0,2...N ;\ \{M_{ci}\},i=0,2...N$</font><center>
<br/>
<center>进行累加求和：<center>
<center><font size=3>$M_{sump}=\sum_{i=0}^NM_{pi};\ M_{sumc}=\sum_{i=0}^NM_{ci}$</font><center>
<br/>
<center>计算止盈频率：<center>
<center><font size=4>$M_{winprob}=\frac{M_{sump}+M_{sumc}}{2 M_{sumc}}$</font><center>
<center>(此处矩阵的除法为对应元素相除)<center>
<br/>
<center>计算频率收益：<center>
<center><font size=3>$M_{E[profit]}[i][j]=P[i]*M_{winprob}[i][j]-P[j]*(1-M_{winprob}[i][j])$</font><center>
 

In [152]:
def GetStatisticInfo(p,c,gap_price):
    # 矩阵求和
    Msump = np.zeros((len(price_map),len(price_map)))
    Msumc = np.zeros((len(price_map),len(price_map)))
    for i in range(len(p)):
        Msump += p[i]
        Msumc += c[i]
    # 计算止盈频率
    M_winprob = (Msump+Msumc)/(2*Msumc)
    # 计算期望收益（自定义敞口价差）
    M_Eprofit = np.zeros((len(price_map),len(price_map)))
    for i in range(len(price_map)):
        for j in range(len(price_map)):
            M_Eprofit[i][j] = price_map[i]*M_winprob[i][j] - price_map[j]*(1-M_winprob[i][j])-1
    return M_winprob,M_Eprofit,Msumc

In [153]:
M_winprob,M_Eprofit,Msumc = GetStatisticInfo(p,c,1)

In [154]:
M_winprob

array([[0.53105955, 0.62479845, 0.68372004, 0.72244541, 0.75155093],
       [0.45261201, 0.55334737, 0.6105902 , 0.65118504, 0.67209258],
       [0.41539976, 0.5061249 , 0.56187794, 0.58967257, 0.61844777],
       [0.36240044, 0.44616863, 0.49046022, 0.52241646, 0.54762693],
       [0.31755603, 0.39285319, 0.43593007, 0.46312635, 0.48880991]])

In [155]:
M_Eprofit

array([[-0.8757618 , -0.87600776, -0.89767976, -0.94288213, -0.98759258],
       [-0.73693994, -0.6799158 , -0.72586862, -0.79051967, -0.9511668 ],
       [-0.50760144, -0.45712567, -0.50497648, -0.69294685, -0.81552226],
       [-0.46319692, -0.43065092, -0.58585803, -0.77583545, -0.97610375],
       [-0.45955174, -0.46432125, -0.64069935, -0.90561012, -1.13428111]])

In [156]:
Msumc

array([[36591., 36591., 36591., 36591., 36591.],
       [36581., 36581., 36581., 36581., 36553.],
       [36572., 36572., 36572., 36466., 36438.],
       [36410., 36410., 36374., 36268., 36240.],
       [36318., 36184., 36148., 36042., 36014.]])

###### 二项分布假设检验p值

<font size=4 face="黑体">计算结果置信水平：<font>
<br/>
<font size=3>假设每一组交易对在样本内进行二项分布检验，计算对应p值<font>
<br/>
<font size=3>PS:使用外部样本评价，或子样本抽样评价<font>

In [168]:
def get_p_value_matrix(p_sample,M_winprob):
    p_value_matrix = np.zeros((len(price_map),len(price_map)))

    sample_list = []
    for i in range(len(price_map)):
        temp = []
        for j in range(len(price_map)):
            temp.append([])
        sample_list.append(temp)

    for i in range(len(p_sample)):
        for a in range(len(price_map)):
            for b in range(len(price_map)):
                sample_list[a][b].append(p[i][a][b])

    for i in range(len(price_map)):
        for j in range(len(price_map)):
            sample_temp = sample_list[i][j]
            x = int((np.sum(sample_temp) + len(sample_temp))/2)
            p_value_matrix[i][j] = stats.binom_test(x, n=len(sample_temp), p=M_winprob[i][j], alternative='greater')
    
    return p_value_matrix
        
    

In [169]:
p_value_matrix = get_p_value_matrix(p_sample,M_winprob)

In [170]:
p_value_matrix

array([[1.        , 1.        , 1.        , 1.        , 1.        ],
       [1.        , 0.99999997, 0.99999997, 0.99979017, 0.97233061],
       [1.        , 1.        , 0.99999983, 0.78326599, 0.3719954 ],
       [1.        , 1.        , 0.99999962, 0.81087613, 0.96288167],
       [1.        , 0.99999724, 0.98371688, 0.98819324, 0.9992819 ]])