In [None]:
import os
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import requests
import datetime as dt
import time

plt.style.use('ggplot')
pd.set_option('display.max_rows', None)

df_in = pd.read_csv('TWF_Futures_Minute_Trade.txt')
#print(df.head())

df_in.index = pd.to_datetime(df_in['Date'] + ' ' + df_in['Time'])
df_in = df_in.drop(['Date','Time','TotalVolume'],axis =1)
df_in.columns = ['open', 'high', 'low', 'close']
df_in['Hour'] = df_in.index.map(lambda x: x.hour)
df_in.head()


In [None]:
df_out = pd.read_csv('TXF_樣本外.csv')
#print(df0.head())
df_out['Date'] = df_out['Unnamed: 0'].str.split().str.get(0)
df_out['Time'] = df_out['Unnamed: 0'].str.split().str.get(1)
df_out.index = pd.to_datetime(df_out['Date'] + ' ' + df_out['Time'])
df_out = df_out.drop(['Unnamed: 0','Date','Time'],axis =1)
df_out.columns = ['open', 'high', 'low', 'close']
df_out['Hour'] = df_out.index.map(lambda x: x.hour)
df_out.tail()

In [None]:
df = pd.concat([df_in, df_out],axis=0)
#print(df.head())
#print(df.tail())

In [None]:
#以60根為一個單位，相當於60分鐘
rule = '60T'

#日盤
Morning = df[(df['Hour'] >= 8) & (df['Hour'] <= 13)]
Morning.index = Morning.index + dt.timedelta(minutes=15)

d1 = Morning.resample(rule=rule, closed='right', label='left').first()[['open']] #只要open的第一個
d2 = Morning.resample(rule=rule, closed='right', label='left').max()[['high']]
d3 = Morning.resample(rule=rule, closed='right', label='left').min()[['low']]
d4 = Morning.resample(rule=rule, closed='right', label='left').last()[['close']]

df_Morning = pd.concat([d1,d2,d3,d4], axis=1)
df_Morning = df_Morning.dropna()
df_Morning.index = df_Morning.index - dt.timedelta(minutes=15)
print(df_Morning.head())
print(df_Morning.tail())

In [None]:
df_Morning['Hour'] = df_Morning.index.map(lambda x: x.hour)

#訓練資料
trainData = df_Morning[(df_Morning.index >= '2010-01-01 00:00:00') & \
                        (df_Morning.index <= '2019-12-31 00:00:00')].copy()
print(trainData.head())
#然後再驗證
testData = df_Morning[(df_Morning.index >= '2020-1-1 00:00:00') & \
                      (df_Morning.index <= '2023-7-1 00:00:00')].copy()
#trainData.head()
print(testData.head())

In [None]:
Data = pd.concat([trainData, testData],axis=0)
print(Data.head())
print(Data.tail())

In [None]:
settlementDate_ = pd.read_csv('settlementDate.csv')
settlementDate_.columns = ['settlementDate', 'futures', 'settlementPrice']

bool_ = [False if 'W' in i else True for i in settlementDate_['futures']]

settlementDate = [i.replace('/','-') for i in list(settlementDate_[bool_]['settlementDate'])]
settlementDate = [pd.to_datetime(i).date() for i in settlementDate]

Donchian Channel
- 進場訊號: 20 天期最高價突破 -> 做多 / 20 天期最低價突破 -> 做空
- 出場訊號: 10天期反向突破最低價
- 上軌: 過去 20 天最高價 
- 下軌: 過去 20 天最低價 
- 趨勢濾網: 25 天 MA > 300 天 MA 時只能做多; 25 天 MA < 300 天 MA 時只能做空
- 無停損停利

回測假設
* 換月轉倉：結算日當天直接平倉
* 進場限制：結算日當天不進場
* 報酬計算：200 * 點數
* 手續費+滑價：單邊 600元

In [None]:
# 參數設定
fund = 1000000
feePaid = 600
K = 0.02

# 指標計算
# 趨勢濾網
open = Data['open']
close = Data['close']

import talib
ma25 = talib.SMA(close, timeperiod = 25)
ma300 = talib.SMA(close, timeperiod = 300)

# 25ma與300ma差距
MA_dif = ma25 - ma300
Data['MA_dif'] = MA_dif

#計算上下軌
high = Data['high']
low = Data['low']

upline = talib.MAX(high, timeperiod = 20)
downline = talib.MIN(low, timeperiod = 20)
max10 = talib.MAX(high, timeperiod= 10)
min10 = talib.MIN(low, timeperiod = 10)
Data['upline'] = upline
Data['downline'] = downline
Data['max10'] = max10
Data['min10'] = min10
#upline.head(100)
#downline.head(100)
print(Data.head())
Data.tail()


做多

In [None]:
import numpy as np
df_arr = np.array(Data)
time_arr = np.array(Data.index)
date_arr = [pd.to_datetime(i).date() for i in time_arr]


#建立存放資料的單位
BS_Long = None 
buy_Long = [] #買進時間點
sell_Long = [] #賣出時間點
profit_list_Long = [0] #未實現損益
profit_fee_list_Long = [0] #損益-手續費
profit_fee_list_realized_Long = [] #總共賺的錢，每筆已實現損益

#開始交易流程
for i in range(len(df_arr)):

    #回測期間最後一天就跳出這個迴圈
    if i == len(df_arr)-1:
        break

    ## 進場邏輯
    ### 當收盤價突破20天最高價
    entryLong = df_arr[i,3] >= df_arr[i-1,6]
    ### 結算日不進場
    entryCondition = date_arr[i] not in settlementDate

    ## 出場邏輯
    ### 收盤價反向突破10天最低價
    exitShort = df_arr[i,3] <= df_arr[i-1,9]
    #print(exitShort)
    ### 結算日時放到結算
    exitCondition = date_arr[i] in settlementDate and df_arr[i,4] >= 11

    #做多的狀態
    if BS_Long == 'B':
        # 停利停損條件
        stopLoss_Long = df_arr[i,3] <= df_arr[t,0] * (1-K)
        #stopProfit_Long = df_arr[i,3] >= df_arr[t,0] * (1+K)


    #還沒進場不用計算損益
    if BS_Long == None:
        profit_list_Long.append(0)
        profit_fee_list_Long.append(0)

        #確認進場&相關設定
        if entryLong and entryCondition and df_arr[i,5]>0:
            #更改狀態至做多
            BS_Long = 'B'
            #紀錄進場時間
            t = i+1
            buy_Long.append(t)
            print("Buy Price: {}, time: {}".format(df_arr[t,0], time_arr[t]))

    #進場開始計算未實現損益
    elif BS_Long == 'B':

        profit_Long = 200 * (df_arr[i+1,0] - df_arr[i,0])
        profit_list_Long.append(profit_Long)

        #進場條件達成，計算未實現損益-交易成本
        if exitShort or i == len(df_arr)-2 or exitCondition or stopLoss_Long:
            #計算已實現損益
            pl_round_Long = 200 * (df_arr[i+1,0] - df_arr[t,0]) #t=i+1 進場時間
            profit_fee_Long = profit_Long - feePaid*2
            profit_fee_list_Long.append(profit_fee_Long)

            #紀錄出場時間
            sell_Long.append(i+1)

            #重置交易狀態
            BS_Long = None
            print("Sell Price: {}, time: {}".format(df_arr[i+1,0], time_arr[i+1]))
            print("Trade completed")
            print()

            # Realized PnL
            profit_fee_realized_Long = pl_round_Long - feePaid*2
            profit_fee_list_realized_Long.append(profit_fee_realized_Long)

        #出場條件未達成，計算未實現損益
        else:
            profit_fee_Long = profit_Long
            profit_fee_list_Long.append(profit_fee_Long)
        
    
equityLong = pd.DataFrame({'profit':np.cumsum(profit_list_Long), 'profitfee':np.cumsum(profit_fee_list_Long)}, \
                          index=Data.index)
equityLong.plot(grid=True, figsize=(12,6))


In [None]:
print(equityLong)

In [None]:
import seaborn as sns
import matplotlib.pyplot as py
plt.style.use('ggplot')

多單績效

In [None]:
equityLong['equity'] = equityLong['profitfee'] + fund
equityLong['drawdown_percent'] = (equityLong['equity']/equityLong['equity'].cummax()) - 1
equityLong['drawdown'] = equityLong['equity'] - equityLong['equity'].cummax()
print(profit_fee_list_realized_Long)
profit_Long = equityLong['profitfee'].iloc[-1]
ret_Long = equityLong['equity'][-1]/equityLong['equity'][0] - 1
mdd_Long = abs(equityLong['drawdown_percent'].min())
calmarRatio_Long = ret_Long / mdd_Long
tradeTimes_Long = len(buy_Long) + len(sell_Long)
winRate_Long = len([i for i in profit_fee_list_realized_Long if i > 0]) / len(profit_fee_list_realized_Long)
profitFactor_Long = sum([i for i in profit_fee_list_realized_Long if i>0]) /  \
                    abs(sum([i for i in profit_fee_list_realized_Long if i<0]))

print('Profit_Long : ',profit_Long)
print('Return_Long : ',ret_Long)
print('Max DrawDown_Long : ',mdd_Long)
print('Caimar Ratio_Long : ',calmarRatio_Long)
print('Trade Times_Long : ',tradeTimes_Long)
print('Win Rate_Long : ',winRate_Long)
print('Profit Factor_Long : ',profitFactor_Long)

In [None]:
print(equityLong)

做空

In [None]:
#建立存放資料的單位
BS_Short = None 
buy_Short = [] #買進時間點
sell_Short = [] #賣出時間點
profit_list_Short = [0] #未實現損益
profit_fee_list_Short = [0] #損益-手續費
profit_fee_list_realized_Short = [] #總共賺的錢，每筆已實現損益

#開始交易流程
for i in range(len(df_arr)):

    #回測期間最後一天就跳出這個迴圈
    if i == len(df_arr)-1:
        break

    ## 進場邏輯
    ### 當收盤價突破20天最低價
    entryShort = df_arr[i,3] <= df_arr[i-1,7]
    ### 結算日不進場
    entryCondition = date_arr[i] not in settlementDate

    ## 出場邏輯
    ### 收盤價反向突破10天最高價
    exitLong = df_arr[i,3] >= df_arr[i-1,9]
    #print(exitShort)
    ### 結算日時放到結算
    exitCondition = date_arr[i] in settlementDate and df_arr[i,4] >= 11

    #做空的狀態
    if BS_Short == 'B':
        # 停利停損條件
        stopLoss_Short = df_arr[i,3] >= df_arr[t,0] * (1+K) 
        #stopProfit_Short = df_arr[i,3] <= df_arr[t,0] * (1-K) 

    #還沒進場不用計算損益
    if BS_Short == None:
        profit_list_Short.append(0)
        profit_fee_list_Short.append(0)

        #確認進場&相關設定
        if entryShort and entryCondition and df_arr[i,5]<0:
            #更改狀態至做多
            BS_Short = 'B'
            #紀錄進場時間
            t = i+1
            buy_Short.append(t)
            print("Buy Price: {}, time: {}".format(df_arr[t,0], time_arr[t]))

    #進場開始計算未實現損益
    elif BS_Short == 'B':

        profit_Short = 200 * -(df_arr[i+1,0] - df_arr[i,0])
        profit_list_Short.append(profit_Short)
 
        #進場條件達成，計算未實現損益-交易成本
        if exitLong or i == len(df_arr)-2 or exitCondition or stopLoss_Short:
            #計算已實現損益
            pl_round_Short = 200 * -(df_arr[i+1,0] - df_arr[t,0]) #t=i+1 進場時間
            profit_fee_Short = profit_Short - feePaid*2
            profit_fee_list_Short.append(profit_fee_Short)

            #紀錄出場時間
            sell_Short.append(i+1)

            #重置交易狀態
            BS_Short = None
            print("Sell Price: {}, time: {}".format(df_arr[i+1,0], time_arr[i+1]))
            print("Trade completed")
            print()

            # Realized PnL
            profit_fee_realized_Short = pl_round_Short - feePaid*2
            profit_fee_list_realized_Short.append(profit_fee_realized_Short)

        #出場條件未達成，計算未實現損益
        else:
            profit_fee_Short = profit_Short
            profit_fee_list_Short.append(profit_fee_Short)
        
    
equityShort = pd.DataFrame({'profit':np.cumsum(profit_list_Short), 'profitfee':np.cumsum(profit_fee_list_Short)}, \
                           index=Data.index)
equityShort.plot(grid=True, figsize=(12,6));
    
    


In [None]:
print(equityShort)

In [None]:
equityShort['equity'] = equityShort['profitfee'] + fund
equityShort['drawdown_percent'] = (equityShort['equity']/equityShort['equity'].cummax()) - 1
equityShort['drawdown'] = equityShort['equity'] - equityShort['equity'].cummax()
print(profit_fee_list_realized_Short)
profit_Short = equityShort['profitfee'].iloc[-1]
ret_Short = equityShort['equity'][-1]/equityShort['equity'][0] - 1
mdd_Short = abs(equityShort['drawdown_percent'].min())
calmarRatio_Short = ret_Short / mdd_Short
tradeTimes_Short = len(buy_Short) + len(sell_Short)
winRate_Short = len([i for i in profit_fee_list_realized_Short if i > 0]) / len(profit_fee_list_realized_Short)
profitFactor_Short = sum([i for i in profit_fee_list_realized_Short if i>0]) / \
                        abs(sum([i for i in profit_fee_list_realized_Short if i<0]))

print('Profit_Short : ',profit_Short)
print('Return_Short : ',ret_Short)
print('Max DrawDown_Short : ',mdd_Short)
print('Caimar Ratio_Short : ',calmarRatio_Short)
print('Trade Times_Short : ',tradeTimes_Short)
print('Win Rate_Short : ',winRate_Short)
print('Profit Factor_Short : ',profitFactor_Short)

In [None]:
equity = equityLong + equityShort
print(equity.head())
print(equity.tail())
#equity.plot(grid=True, figsize=(12,6))
equity['profit'].plot(grid=True, figsize=(12,6))
equity['profitfee'].plot(grid=True, figsize=(12,6))
equity['drawdown'].plot(grid=True, figsize=(12,6))
equity['drawdown_percent'].plot(grid=True, figsize=(12,6))

In [None]:
profit_fee_list_realized = profit_fee_list_realized_Long + profit_fee_list_realized_Short
print(profit_fee_list_realized)
profit = equity['profitfee'].iloc[-1]
ret = equity['equity'][-1]/equity['equity'][0] - 1
mdd = abs(equity['drawdown_percent'].min())
calmarRatio = ret / mdd
tradeTimes = tradeTimes_Long + tradeTimes_Short
winRate = len([i for i in profit_fee_list_realized if i > 0]) / len(profit_fee_list_realized)
profitFactor = sum([i for i in profit_fee_list_realized if i>0]) / \
    abs(sum([i for i in profit_fee_list_realized if i<0]))

print('Profit : ',profit)
print('Return : ',ret)
print('Max DrawDown : ',mdd)
print('Caimar Ratio : ',calmarRatio)
print('Trade Times : ',tradeTimes)
print('Win Rate : ',winRate)
print('Profit Factor : ',profitFactor)

In [None]:
# 時間損益(年)
equity.index = pd.to_datetime(equity.index) #確保索引是datetime型態
print(equity.index)
years = ['2011', '2012', '2013', '2014', '2015', '2016', '2017', '2018', '2019','2020','2021','2022','2023']
print(years)
year_ret = []
for i in years:
    year_ret.append(equity.loc[i]['equity'].iloc[-1]/equity.loc[i]['equity'].iloc[0] - 1)

df = pd.DataFrame({'Return':year_ret},index = years)

# heatmap函式
py.figure(figsize=(10,1))
sns.heatmap(df.transpose(), annot=True, cmap='OrRd')
py.title('Return by year')
py.show()
print('')

In [None]:
# 時間損益(年)
equity.index = pd.to_datetime(equity.index) #確保索引是datetime型態
print(equity.index)
years = ['2011', '2012', '2013', '2014', '2015', '2016', '2017', '2018', '2019','2020','2021','2022','2023']
print(years)
year_MDD = []
for i in years:
    year_MDD.append(equity.loc[i]['drawdown_percent'].min())

df = pd.DataFrame({'MDD':year_MDD},index = years)

# heatmap函式
py.figure(figsize=(10,1))
sns.heatmap(df.transpose(), annot=True, cmap='summer')
py.title('MDD by year')
py.show()
print('')

In [None]:
# quantstats回測報表
import quantstats
ret = equityLong['equity'].pct_change(periods=1).dropna()
print(ret)
quantstats.reports.full(ret)
quantstats.reports.html(ret,output='stats.html',title='backtest result',download_filename='result.html')

In [None]:
# quantstats回測報表
import quantstats
ret = equityShort['equity'].pct_change(periods=1).dropna()
print(ret)
quantstats.reports.full(ret)
quantstats.reports.html(ret,output='stats.html',title='backtest result',download_filename='result.html')

In [None]:
# quantstats回測報表
import quantstats
ret = equity['equity'].pct_change(periods=1).dropna()
print(ret)
quantstats.reports.full(ret)
quantstats.reports.html(ret,output='stats.html',title='backtest result',download_filename='result.html')

In [None]:
IRR = (1 + 0.0411)**(1/14)-1
IRR

In [None]:
IRR = (1 + 0.3296)**(1/14)-1
IRR

In [None]:
IRR = (1 - 0.2474)**(1/14)-1
IRR

策略參數最佳化

In [None]:
optimizationList = []
fund = 1000000
feePaid = 600

# 最佳化參數：Dayin & Dayout
for Dayin in range(20,110,10):
    for Dayout in range(10, 110, 10):

        print('----------')
        print(f'Dayin: {Dayin}')
        print(f'Dayout: {Dayout}')

        open = trainData['open']
        close = trainData['close']

        import talib
        ma25 = talib.SMA(close, timeperiod = 25)
        ma300 = talib.SMA(close, timeperiod = 300)

        # 25ma與300ma差距
        MA_dif = ma25 - ma300
        trainData['MA_dif'] = MA_dif

        #計算上下軌
        high = trainData['high']
        low = trainData['low']

        upline = talib.MAX(high, timeperiod = N)
        downline = talib.MIN(low, timeperiod = N)
        max10 = talib.MAX(high, timeperiod= X)
        min10 = talib.MIN(low, timeperiod = X)
        trainData['upline'] = upline
        trainData['downline'] = downline
        trainData['max10'] = max10
        trainData['min10'] = min10

        df_arr = np.array(trainData)
        time_arr = np.array(trainData.index)
        date_arr = [pd.to_datetime(i).date() for i in time_arr]

        BS = None
        buy = []
        sell = []
        sellshort = []
        buytocover = []
        profit_list = [0]
        profit_fee_list = [0]
        profit_fee_list_realized = []
        rets = []

        for i in range(len(df_arr)):

            if i == len(df_arr)-1:
                break

            ## 進場邏輯
            entryLong = df_arr[i,3] >= df_arr[i-1,6]
            entrySellShort = df_arr[i,3] <= df_arr[i-1,7]
            entryCondition = date_arr[i] not in settlementDate

            ## 出場邏輯
            exitShort = df_arr[i,3] <= df_arr[i-1,9]
            exitBuyToCover = df_arr[i,3] >= df_arr[i-1,8]
            exitCondition = date_arr[i] in settlementDate and df_arr[i,4] >= 11

            ## 停利停損邏輯
            if BS == 'B':
                stopLoss = df_arr[i,3] <= df_arr[t,0] * (1-K)

            elif BS == 'S':
                stopLoss = df_arr[i,3] >= df_arr[t,0] * (1+K)
                

            #     if exitCondition == True:
            #         print(f'{time_arr[i]}')

            if BS == None:
                profit_list.append(0)
                profit_fee_list.append(0)

                if entryLong and entryCondition and df_arr[i,5]>0:
                    BS = 'B'
                    t = i+1
                    buy.append(t)

                elif entrySellShort and entryCondition and df_arr[i,5]<0:
                    BS = 'S'
                    t = i+1
                    sellshort.append(t)

            elif BS == 'B':
                profit = 200 * (df_arr[i+1,0] - df_arr[i,0])
                profit_list.append(profit) #紀錄損益

                if exitShort or i == len(df_arr)-2 or exitCondition or stopLoss:
                    pl_round = 200 * (df_arr[i+1,0] - df_arr[t,0])
                    profit_fee = profit - feePaid*2
                    profit_fee_list.append(profit_fee)
                    sell.append(i+1)
                    BS=None

                    # Realized PnL
                    profit_fee_realized = pl_round - feePaid*2
                    profit_fee_list_realized.append(profit_fee_realized)
                    rets.append(profit_fee_realized/(200*df_arr[t,0]))

                else:
                    profit_fee = profit
                    profit_fee_list.append(profit_fee)

            elif BS == 'S':
                profit = 200 * (df_arr[i,0] - df_arr[i+1,0])
                profit_list.append(profit)

                if exitBuyToCover or i == len(df_arr)-2 or exitCondition or stopLoss:
                    pl_round = 200 * (df_arr[t,0] - df_arr[i+1,0])
                    profit_fee = profit - feePaid*2
                    profit_fee_list.append(profit_fee)
                    buytocover.append(i+1)
                    BS=None

                    # Realized PnL
                    profit_fee_realized = pl_round - feePaid*2
                    profit_fee_list_realized.append(profit_fee_realized)
                    rets.append(profit_fee_realized/(200*df_arr[t,0]))

                else:
                    profit_fee = profit
                    profit_fee_list.append(profit_fee)

        equity = pd.DataFrame({'profit':np.cumsum(profit_list), 'profitfee':np.cumsum(profit_fee_list)}, index=trainData.index)
        equity['equity'] = equity['profitfee'] + fund
        equity['drawdown_percent'] = (equity['equity'] / equity['equity'].cummax()) - 1
        equity['drawdown'] = equity['equity'] - equity['equity'].cummax()
        ret = equity['equity'][-1]/equity['equity'][0] - 1
        mdd = abs(equity['drawdown_percent'].min())
        calmarRatio = ret / mdd

        optimizationList.append([Dayin, Dayout, ret, calmarRatio])

print(optimizationList)