In [None]:
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import matplotlib.pyplot as plt

In [None]:
# 读取数据
sec_post_1d_df = pd.read_pickle(r'/Users/tiancaixiaohuoban/Desktop/实习/买方实习/中信建投期货（金融工程）/策略设计/dom_cont_post_1d')
# sec_post_1d_df

In [None]:
univ01 = ['AG', 'AL', 'NI', 'ZN',
          'RB', 'HC', 'I', 'J', 'JM', 'FG',
          'BU', 'L', 'MA', 'PP', 'RU', 'V', 'TA', 'FU', 'EG', 'SP',
          'A', 'M', 'P', 'OI', 'Y', 'SR', 'CF', 'C', 'RM']

univ02 = ['AG', 'AL', 'NI', 'ZN',
          'RB', 'HC', 'I', 'J', 'JM', 'FG',
          'BU', 'L', 'MA', 'PP', 'RU', 'V', 'TA', 'FU', 'EG', 'SP',
          'A', 'M', 'P', 'OI', 'Y', 'SR', 'CF', 'C', 'RM', 'CU', 'PB', 'SN', 'SF', 'SM', 'AP', 'SA']

univ_nonferrous = ['CU', 'AL', 'NI', 'ZN', 'PB', 'SN']
univ_black = ['RB', 'I', 'HC', 'J', 'JM', 'FG', 'SF', 'SM', 'SA']
univ_chemic = ['FU', 'BU', 'RU', 'L', 'TA', 'V', 'EG', 'MA', 'PP']
univ_agri = ['A', 'AP', 'C', 'CF', 'CS', 'JD', 'M', 'OI', 'P', 'RM', 'SR', 'Y']
univ_precious = ['AU', 'AG']

In [None]:
def kagi(P, H):
    '''
    create kagi contrusction for process P with parameter H
    
    Args:
        P (numpy.ndarray): input time series 
        H (float): parameter of kagi construction
            
    Returns:
        tau_a (list): time moments when process P has local maximum or minimum
        tau_b (list): time moments when local maximum or minimum is recognized
    '''
    
    tau_a = []
    tau_b = []
    
    # find tau_b[0]
    for u in range(0,len(P)-1):
        if (np.max(P[0:u+1]) - np.min(P[0:u+1])) > H:
            tau_b.append(u)
            break
    
    # if not found, return empty lists
    if len(tau_b)<1:
        return tau_a, tau_b

    # find tau_a[0]
    for u in range(0,tau_b[0]):
        if abs(P[u] - P[tau_b[0]]) > H:
            tau_a.append(u)
            break
    # if not found, return empty lists
    if len(tau_a)<1:
        return tau_a, tau_b

    # determine if tau_a[0] is min or max
    S_0 = np.sign(P[tau_a[0]] - P[tau_b[0]])

    if S_0==1: # if min
        n=1
    else: # if max
        n=0
    
    # process time series P
    finished = False
    while not finished:
        finished = True
        
        # if local minimum
        if n%2==1:
            # find tau_b
            for u in range(tau_a[-1]+1,len(P)):
                if (P[u] - np.min(P[tau_a[-1]:u+1])) > H:
                    if u > tau_b[-1]:
                        tau_b.append(u)
                        finished = False
                        break
                    else:
                        finished = True
            # find tau_a
            if not finished:
                argmin = np.argmin(P[tau_a[-1]:tau_b[-1]+1])
                tau_a.append(tau_a[-1]+argmin)
                n = n+1
        
        # if local maximum
        elif n%2==0:
            # find tau_b
            for u in range(tau_a[-1]+1,len(P)):
                if (np.max(P[tau_a[-1]:u+1]) - P[u]) > H:
                    if u > tau_b[-1]:
                        tau_b.append(u)
                        finished=False
                        break
                    else:
                        finished = True
            # find tau_a
            if not finished:
                argmax = np.argmax(P[tau_a[-1]:tau_b[-1]+1])
                tau_a.append(tau_a[-1]+argmax)
                n = n+1
                
    return tau_a, tau_b

In [None]:
def load_data(univ, sec_post_1d_df):
    # 从原始数据中提取出univ中的合约的收盘价数据
    data = pd.DataFrame()
    for contract in univ:
        df = sec_post_1d_df.loc[contract].T
        data[contract] = df['close']
    
    return data

In [None]:
def parse_pair(pair):
    s1 = pair[:pair.find('-')]
    s2 = pair[pair.find('-')+1:]
    return s1,s2

In [None]:
def h_volatility(P, tau_a, tau_b):
    '''
    Calcuate H-volatility and H-inversion for process P with given times tau_a and tau_b
    
    Args:
        P (numpy.ndarray): input time series 
        tau_a (list): time moments when process P has local maximum or minimum
        tau_b (list): time moments when local maximum or minimum is recognized
            
    Returns:
        h_vol (float): H-volatility of order 1
        h_inv (int): H-inversion
    '''

    h_inv = len(tau_b)
    V = abs(np.diff(P[tau_a])).sum()
    h_vol = V / h_inv

    return h_vol, h_inv

In [None]:
def rolling_generate_signals1(prices, drift, rolling_window, pairs_num, flag):
    # generate signals for rolling window
    train_begin = datetime.strptime('2010-01-01','%Y-%m-%d') + timedelta(days=drift)
    train_begin = train_begin.strftime('%Y-%m-%d')
    train_end = datetime.strptime(train_begin,'%Y-%m-%d') + timedelta(days=rolling_window-1)
    train_end = train_end.strftime('%Y-%m-%d')
    test_begin = datetime.strptime(train_end,'%Y-%m-%d') + timedelta(days=1)
    test_begin = test_begin.strftime('%Y-%m-%d')
    test_end = datetime.strptime(test_begin,'%Y-%m-%d') + timedelta(days=21)
    test_end = test_end.strftime('%Y-%m-%d')
    prices_train = prices.loc[train_begin:train_end]
    prices_train = prices_train.dropna(axis=1,how='any')
    log_prices_train = np.log(prices_train)
    pairs_df = pd.DataFrame(columns=['H', 'H-volatility','H-inversion', 'Last extremum'])
    selected_pairs = []
    selected_stocks = []
    
    # generate pairs_df
    for s1 in log_prices_train.columns:
        for s2 in log_prices_train.columns:
            if (s1 != s2) and (f'{s2}-{s1}' not in pairs_df.index):
                spread = (log_prices_train[s1] - log_prices_train[s2]).values
                H = 0.9*spread.std()
                tau_a, tau_b = kagi(spread, H)
    
                if len(tau_a) < 1:
                    pairs_df.loc[f'{s1}-{s2}'] = -1, -1, -1, -1  # if no local min\max found
                else:
                    h_vol, h_inv = h_volatility(spread, tau_a, tau_b)
    
                    if spread[tau_b[-1]] > spread[tau_a[-1]]:
                        last_extr = 'min'
                    else:
                        last_extr = 'max'
    
                    pairs_df.loc[f'{s1}-{s2}'] = H, h_vol, h_inv, last_extr
    for pair in pairs_df.sort_values(by='H-inversion', ascending=False).index:
        s1,s2 = parse_pair(pair)
        if (s1 not in selected_stocks) and (s2 not in selected_stocks):
            selected_pairs.append(pair)
            selected_stocks.append(s1)
            selected_stocks.append(s2)
        if len(selected_pairs)==pairs_num:
            break
    # print(selected_pairs)
    # generate returns
    prices_test = prices.loc[test_begin:test_end]
    prices_test = prices_test.dropna(axis=1,how='any')
    returns_test = prices_test.pct_change()
    log_prices_test = np.log(prices_test)
    
    # generate signals
    positions = pd.DataFrame(index=returns_test.index, columns=selected_stocks)
    for t in log_prices_test.index:
        log_prices_tmp = log_prices_test.loc[:t]
        log_prices_tmp = log_prices_tmp.iloc[-21:]
    
        for pair in selected_pairs:
            s1, s2 = parse_pair(pair)
            spread = (log_prices_tmp[s1] - log_prices_tmp[s2]).values
            H = pairs_df.loc[pair, 'H']
            h_vol = pairs_df.loc[pair, 'H-volatility']
            tau_a, tau_b = kagi(spread, H)
            if not tau_a:  # no local min\max found
                tau_a = [0]
            if not tau_b:
                tau_b = [0]
    
            if tau_b[-1] == (len(spread) - 1):  # new local extremum detected
                if spread[tau_a[-1]] > spread[tau_b[-1]]:  # spread moves down
                    if h_vol > 1.4 * H:  # trend following
                        positions.loc[t, [s1, s2]] = [-1, 1]
                    else:  # contrarian
                        positions.loc[t, [s1, s2]] = [1, -1]
                else:  # spread moves up
                    if h_vol > 1.4 * H:  # trend following
                        positions.loc[t, [s1, s2]] = [1, -1]
                    else:  # contrarian
                        positions.loc[t, [s1, s2]] = [-1, 1]
    positions.fillna(method='ffill', inplace=True)
    # add positions at the beginning of the trading period
    for pair in selected_pairs:
        s1,s2 = parse_pair(pair)
        H = pairs_df.loc[pair, 'H']
        h_vol = pairs_df.loc[pair, 'H-volatility']
        last_extr = pairs_df.loc[pair, 'Last extremum']
        if h_vol > 1.4*H: # trend following
            if last_extr=='min':
                # short position
                positions[s1].fillna(-1, inplace=True) 
                positions[s2].fillna(1, inplace=True)
            else:
                # long position
                positions[s1].fillna(1, inplace=True) 
                positions[s2].fillna(-1, inplace=True)
        else: # contrarian
            if last_extr=='min':
                # long position
                positions[s1].fillna(1, inplace=True) 
                positions[s2].fillna(-1, inplace=True)
            else:
                # short position
                positions[s1].fillna(-1, inplace=True) 
                positions[s2].fillna(1, inplace=True)
    positions.fillna(method='ffill', inplace=True)
    positions.fillna(0, inplace=True)
    if(flag):
        return positions
    return positions.tail(1)

In [None]:
def rolling_generate_signals2(prices, drift, rolling_window, pairs_num,flag):
    # generate signals for rolling window
    train_begin = datetime.strptime('2010-01-01','%Y-%m-%d') + timedelta(days=drift)
    train_begin = train_begin.strftime('%Y-%m-%d')
    train_end = datetime.strptime(train_begin,'%Y-%m-%d') + timedelta(days=rolling_window-1)
    train_end = train_end.strftime('%Y-%m-%d')
    test_begin = datetime.strptime(train_end,'%Y-%m-%d') + timedelta(days=1)
    test_begin = test_begin.strftime('%Y-%m-%d')
    test_end = datetime.strptime(test_begin,'%Y-%m-%d') + timedelta(days=21)
    test_end = test_end.strftime('%Y-%m-%d')
    prices_train = prices.loc[train_begin:train_end]
    prices_train = prices_train.dropna(axis=1,how='any')
    log_prices_train = np.log(prices_train)
    pairs_df = pd.DataFrame(columns=['H', 'H-volatility','H-inversion', 'Last extremum'])
    selected_pairs = []
    selected_stocks = []
    
    # generate pairs_df
    for s1 in log_prices_train.columns:
        for s2 in log_prices_train.columns:
            if (s1 != s2) and (f'{s2}-{s1}' not in pairs_df.index):
                spread = (log_prices_train[s1] - log_prices_train[s2]).values
                H = 0.9*spread.std()
                tau_a, tau_b = kagi(spread, H)
    
                if len(tau_a) < 1:
                    pairs_df.loc[f'{s1}-{s2}'] = -1, -1, -1, -1  # if no local min\max found
                else:
                    h_vol, h_inv = h_volatility(spread, tau_a, tau_b)
    
                    if spread[tau_b[-1]] > spread[tau_a[-1]]:
                        last_extr = 'min'
                    else:
                        last_extr = 'max'
    
                    pairs_df.loc[f'{s1}-{s2}'] = H, h_vol, h_inv, last_extr
    for pair in pairs_df.sort_values(by='H-inversion', ascending=False).index:
        s1,s2 = parse_pair(pair)
        if (s1 not in selected_stocks) and (s2 not in selected_stocks):
            selected_pairs.append(pair)
            selected_stocks.append(s1)
            selected_stocks.append(s2)
        if len(selected_pairs)==pairs_num:
            break
    # print(selected_pairs)
    # generate returns
    prices_test = prices.loc[test_begin:test_end]
    prices_test = prices_test.dropna(axis=1,how='any')
    returns_test = prices_test.pct_change()
    log_prices_test = np.log(prices_test)
    
    # generate signals
    positions = pd.DataFrame(index=returns_test.index, columns=selected_stocks)
    for t in log_prices_test.index:
        log_prices_tmp = log_prices_test.loc[:t]
        log_prices_tmp = log_prices_tmp.iloc[-21:]
    
        for pair in selected_pairs:
            # 提取pair中的两个合约
            s1,s2 = parse_pair(pair)
            # 计算spread
            spread = (log_prices_tmp[s1] - log_prices_tmp[s2]).values
            #H = pairs_df.loc[pair, 'H']
            #h_vol = pairs_df.loc[pair, 'H-volatility']
            H = 0.9*spread.std()
            tau_a,tau_b = kagi(spread, H)
            if not tau_a:  # no local min\max found
                tau_a = [0]
            if not tau_b:
                tau_b = [0]
            h_vol, h_inv = h_volatility(spread, tau_a, tau_b)
            
            if tau_b[-1] == (len(spread)-1):# 若发现新的极值点
                if spread[tau_a[-1]] > spread[tau_b[-1]]: # 价差增大
                    if h_vol > 1.4*H: # 趋势跟踪
                        positions.loc[t, [s1,s2]] = [-1,1]
                    else: # contrarian
                        positions.loc[t, [s1,s2]] = [1,-1]
                else: # 价差减少
                    if h_vol > 1.4*H: # 趋势跟踪
                        positions.loc[t, [s1,s2]] = [1,-1]
                    else: # contrarian
                        positions.loc[t, [s1,s2]] = [-1,1]
    positions.fillna(method='ffill', inplace=True)
    positions.fillna(0, inplace=True)
    if(flag):
        return positions
    # 返回positions的最后一行
    return positions.tail(1)

In [None]:
def calculate_metrics(cumret):
    '''
    calculate performance metrics from cumulative returns
    '''
    total_return = (cumret[-1] - cumret[0])/cumret[0]
    apr = (1+total_return)**(252/len(cumret)) - 1
    rets = pd.DataFrame(cumret).pct_change()
    sharpe = np.sqrt(21) * np.nanmean(rets) / np.nanstd(rets)
    
    # maxdd and maxddd
    highwatermark=np.zeros(cumret.shape)
    drawdown=np.zeros(cumret.shape)
    drawdownduration=np.zeros(cumret.shape)
    for t in np.arange(1, cumret.shape[0]):
        highwatermark[t]=np.maximum(highwatermark[t-1], cumret[t])
        drawdown[t]=cumret[t]/highwatermark[t]-1
        if drawdown[t]==0:
            drawdownduration[t]=0
        else:
            drawdownduration[t]=drawdownduration[t-1]+1
    maxDD=np.min(drawdown)
    maxDDD=np.max(drawdownduration)
    
    return total_return, apr, sharpe, maxDD, maxDDD

In [None]:
# # 定义两个日期
# date1 = datetime(2010, 1, 1)
# date2 = datetime(2024, 1, 25)
# # 计算间隔天数
# interval = (date2 - date1).days
# interval

## strategy 1.3 pairs

In [None]:
from tqdm import tqdm
prices = load_data(univ02, sec_post_1d_df)
# prices = prices.dropna()
for drift in tqdm(range(0, 5011)):
    # merge positions
    if drift == 0:
        positions = rolling_generate_signals1(prices, drift, 126, 3, 1)
        positions_all = positions
        continue
    else:
        positions = rolling_generate_signals1(prices, drift, 126, 3, 0)
    positions_all = pd.concat([positions_all, positions], axis=0)

In [None]:
positions_all

In [None]:
positions_all.fillna(0)
# 找出重复的索引
duplicated_index = positions_all.index.duplicated(keep='first')
# 删除具有重复索引的列
positions_all = positions_all[~duplicated_index]

In [None]:
# 将position_all中的int类型转换为float类型
for col in positions_all.columns:
    positions_all[col] = positions_all[col].astype(float)
test_signal_df = positions_all.T
test_signal_df = test_signal_df.fillna(0)
test_signal_df

In [None]:
def backtest(positions, fee_rate, univ, start_date, end_date, pair_num, strategy_name):
    prices = load_data(univ, sec_post_1d_df)
    # calculate returns
    prices_test = prices.loc[start_date:end_date]
    returns_test = prices_test.pct_change()
    positions = positions.loc[start_date:end_date]
    # calculate fee
    fee = positions.diff().abs() * fee_rate
    ret = ((positions.shift() * returns_test[univ])-fee).sum(axis=1) / pair_num * 2
    cumret = np.nancumprod(ret+1)
    cumret = pd.Series(cumret, index=ret.index)
    columns = ['Total return', 'APR', 'Sharpe ratio', 'Max Drawdown', 'Max Drawdown Duration']
    performance_df = pd.DataFrame(columns=columns)
    performance_df.loc[strategy_name] = calculate_metrics(cumret)
    print(performance_df)
    plt.figure(figsize=(18,4))
    plt.plot(cumret, label='strategy cumulative returns')
    plt.legend()

In [None]:
backtest(positions_all,0.0001,univ02,'2010-05-07','2024-01-25',3,'strategy 1.3 pairs')

## strategy 2.3 pairs

In [None]:
from tqdm import tqdm
prices = load_data(univ02, sec_post_1d_df)
# prices = prices.dropna()
for drift in tqdm(range(0, 5011)):
    # merge positions
    if drift == 0:
        positions = rolling_generate_signals2(prices, drift, 126, 3, 1)
        positions_all = positions
        continue
    else:
        positions = rolling_generate_signals2(prices, drift, 126, 3, 0)
    positions_all = pd.concat([positions_all, positions], axis=0)

In [None]:
positions_all

In [None]:
positions_all.fillna(0)
# 找出重复的索引
duplicated_index = positions_all.index.duplicated(keep='first')
# 删除具有重复索引的列
positions_all = positions_all[~duplicated_index]

In [None]:
# 将position_all中的int类型转换为float类型
for col in positions_all.columns:
    positions_all[col] = positions_all[col].astype(float)
test_signal_df = positions_all.T
test_signal_df

In [None]:
backtest(positions_all,0.0001,univ02,'2010-05-07','2024-01-25',3,'strategy 2.3 pairs')

## strategy 1.10 pairs

In [None]:
from tqdm import tqdm
prices = load_data(univ02, sec_post_1d_df)
# prices = prices.dropna()
for drift in tqdm(range(0, 5011)):
    # merge positions
    if drift == 0:
        positions = rolling_generate_signals1(prices, drift, 126, 10, 1)
        positions_all = positions
        continue
    else:
        positions = rolling_generate_signals1(prices, drift, 126, 10, 0)
    positions_all = pd.concat([positions_all, positions], axis=0)

In [None]:
positions_all

In [None]:
positions_all.fillna(0)
# 找出重复的索引
duplicated_index = positions_all.index.duplicated(keep='first')
# 删除具有重复索引的列
positions_all = positions_all[~duplicated_index]

In [None]:
# 将position_all中的int类型转换为float类型
for col in positions_all.columns:
    positions_all[col] = positions_all[col].astype(float)
test_signal_df = positions_all.T
test_signal_df

In [None]:
backtest(positions_all,0.0001,univ02,'2010-05-07','2024-01-25',10,'strategy 1.10 pairs')

## strategy 2.10 pairs

In [None]:
from tqdm import tqdm
prices = load_data(univ02, sec_post_1d_df)
# prices = prices.dropna()
for drift in tqdm(range(0, 5011)):
    # merge positions
    if drift == 0:
        positions = rolling_generate_signals2(prices, drift, 126, 10, 1)
        positions_all = positions
        continue
    else:
        positions = rolling_generate_signals2(prices, drift, 126, 10, 0)
    positions_all = pd.concat([positions_all, positions], axis=0)

In [None]:
positions_all

In [None]:
positions_all.fillna(0)
# 找出重复的索引
duplicated_index = positions_all.index.duplicated(keep='first')
# 删除具有重复索引的列
positions_all = positions_all[~duplicated_index]

In [None]:
# 将position_all中的int类型转换为float类型
for col in positions_all.columns:
    positions_all[col] = positions_all[col].astype(float)
test_signal_df = positions_all.T
test_signal_df

In [None]:
backtest(positions_all,0.0001,univ02,'2010-05-07','2024-01-25',10,'strategy 2.10 pairs')