In [None]:
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime
import numpy as np
import time
import random
import calendar

In [None]:
HO = pd.read_csv('HO-5min.asc', index_col = 'Date').iloc[:,:5]
HO.index = pd.to_datetime(HO.index)
HO.iloc[:,1:] = HO.iloc[:,1:].multiply(100)

In [None]:
XB = pd.read_csv('XB-5min.asc', index_col = 'Date').iloc[:,:5]
XB.index = pd.to_datetime(XB.index)
XB.iloc[:,1:] = XB.iloc[:,1:].multiply(100)

In [None]:
def add_months(sourcedate, months):
    month = sourcedate.month - 1 + months
    year = sourcedate.year + month // 12
    month = month % 12 + 1
    day = 1
    return datetime(year, month, day)

In [None]:
class DonchianChannel:
    
    def __init__(self,PV,Slpg):
        self.PV = PV
        self.Slpg = Slpg
        self.Transaction = []
    
    def buy(self):
        self.market_position += 1
        
    def sell(self):
        self.market_position -= 1
            
    def EquityCurve(self,data,ChnLen,StpPct, maxbar = 0): 
        
        ##Equity curve of data start at ChnLen-th point with StopLoss of StpPct
        ##Output: A series of portfolio value
        
        portfolio_value = 100000
        
        HighestHigh = np.array(data.High.rolling(ChnLen).max())
        LowestLow = np.array(data.Low.rolling(ChnLen).min())
        n = len(data)
        val = np.zeros(n)+100000
        
        market_position = 0
        PrevPeak = 0
        PrevTrough = 0
        
        Date = np.array(data.index)
        Time = np.array(data.Time)
        Open = np.array(data.Open)
        High = np.array(data.High)
        Close = np.array(data.Close)
        Low = np.array(data.Low)
        
        if maxbar == 0:
            maxbar = ChnLen
        
        for i in range(maxbar,n):
            
            if market_position == 0:

                if High[i] >= HighestHigh[i]:
                    market_position += 1
                    EntryPrice = HighestHigh[i]
                    PrevPeak = HighestHigh[i]
                    self.Transaction.append(['long',Date[i],Time[i],EntryPrice])
                    val[i:] -= self.Slpg/2  
                    
                elif Low[i] <= LowestLow[i]:
                    market_position -= 1
                    EntryPrice = LowestLow[i]
                    PrevTrough = LowestLow[i]
                    self.Transaction.append(['short',Date[i],Time[i],EntryPrice])
                    val[i:] -= self.Slpg/2
                    
            if market_position > 0:
                if Close[i] >= PrevPeak:
                    PrevPeak = Close[i]
                    val[i] += (Close[i] - EntryPrice)*self.PV
                elif Low[i] <= PrevPeak*(1-StpPct):
                    market_position -= 1
                    self.Transaction.append(['end long',Date[i],Time[i],PrevPeak*(1-StpPct)])
                    val[i] += (PrevPeak*(1-StpPct) - EntryPrice)*self.PV - self.Slpg/2
                    val[i+1:] = val[i]
                else:
                    val[i] += (Close[i] - EntryPrice)*self.PV
                
            elif market_position < 0:
                if Close[i] <= PrevTrough:
                    PrevTrough = Close[i]
                    val[i] += -(Close[i] - EntryPrice)*self.PV
                elif High[i] >= PrevTrough*(1+StpPct):
                    market_position += 1
                    self.Transaction.append(['end short',Date[i],Time[i],PrevTrough*(1+StpPct)])
                    val[i] += -(PrevTrough*(1+StpPct) - EntryPrice)*self.PV - self.Slpg/2
                    val[i+1:] = val[i]
                else:
                    val[i] += -(Close[i] - EntryPrice)*self.PV       
        if market_position > 0:
            self.Transaction.append(['end long',Date[i],Time[i],Close[i]])
            val[-1] += -self.Slpg/2
        elif market_position < 0:
            self.Transaction.append(['end short',Date[i],Time[i],Close[i]])
            val[-1] += -self.Slpg/2
        return val
       
    def performance(self,Open,High,Close,Low,HighestHigh,LowestLow,ChnLen,StpPct,method ='roa'):
        
        ##Input: np.array - Open,High,Close,Low,HighestHigh,LowestLow
        ##            int - ChnLen
        ##          float - StpPct
        ##         method - 'roa' or 'RoMaD'
        ##Output:   score of our strategy
        
        n = len(Open)
        market_position = 0
        EntryPrice = 0
        PrevPeak = 0
        PrevTrough = 0
        val = np.zeros(n)+100000
        
        n = len(Open)
        prev_max = 0
        maximum_drawdown = 0
        
        for i in range(ChnLen,n):
            
            if market_position == 0:

                if High[i] >= HighestHigh[i]:
                    market_position += 1
                    EntryPrice = HighestHigh[i]
                    PrevPeak = HighestHigh[i]
                    val[i:] -= self.Slpg/2  
                    
                elif Low[i] <= LowestLow[i]:
                    market_position -= 1
                    EntryPrice = LowestLow[i]
                    PrevTrough = LowestLow[i]
                    val[i:] -= self.Slpg/2
                    
            if market_position > 0:
                if Close[i] >= PrevPeak:
                    PrevPeak = Close[i]
                    val[i] += (Close[i] - EntryPrice)*self.PV
                elif Low[i] <= PrevPeak*(1-StpPct):
                    market_position -= 1
                    val[i] += (PrevPeak*(1-StpPct) - EntryPrice)*self.PV - self.Slpg/2
                    val[i+1:] = val[i]
                else:
                    val[i] += (Close[i] - EntryPrice)*self.PV
                
            elif market_position < 0:
                if Close[i] <= PrevTrough:
                    PrevTrough = Close[i]
                    val[i] += -(Close[i] - EntryPrice)*self.PV
                elif High[i] >= PrevTrough*(1+StpPct):
                    market_position += 1
                    val[i] += -(PrevTrough*(1+StpPct) - EntryPrice)*self.PV - self.Slpg/2
                    val[i+1:] = val[i]
                else:
                    val[i] += -(Close[i] - EntryPrice)*self.PV
            
            if val[i] > prev_max:
                prev_max = val[i]
            drawdown = max(0,prev_max-val[i])
            if drawdown > maximum_drawdown:
                maximum_drawdown = drawdown
                
        profit = val[-1]-100000
    
        if method == 'roa':
            return profit
        elif method == 'RoMaD':
            return profit/maximum_drawdown
               
    def fit_random_search(self,data,size,iteration,method='roa'):
        
        ## Intput: pd.DataFrame - data
        ##                  int - max_iteration
        ##               string - method
        ## Output: tuple of parameters of Chn and StpPct
        
        Open = np.array(data.Open)
        High = np.array(data.High)
        Close = np.array(data.Close)
        Low = np.array(data.Low)
               
        Chn_start = 5
        Chn_end = 100
        StpPct_start = 5
        StpPct_end = 100
        
        ##First Iteration
        for loop in range(3):
            
            Chn_mid = int((Chn_end + Chn_start)/2)
            StpPct_mid = int((StpPct_end + StpPct_start)/2)             
            
            Chn1 = np.random.randint(Chn_start,Chn_mid,size)*100
            StpPct1 = np.random.randint(StpPct_mid,StpPct_end+1,size)/1000
            
            Chn2 = np.random.randint(Chn_mid,Chn_end+1,size)*100
            StpPct2 = np.random.randint(StpPct_mid,StpPct_end+1,size)/1000
            
            Chn3 = np.random.randint(Chn_start,Chn_mid,size)*100
            StpPct3 = np.random.randint(StpPct_start,StpPct_mid,size)/1000
            
            Chn4 = np.random.randint(Chn_mid,Chn_end+1,size)*100
            StpPct4 = np.random.randint(StpPct_start,StpPct_mid,size)/1000
            
            p1 = 0
            p2 = 0
            p3 = 0
            p4 = 0
            
            for i in range(size):
                
                HighestHigh1 = np.array(data.High.rolling(Chn1[i]).max())
                LowestLow1 = np.array(data.Low.rolling(Chn1[i]).min())
                p1 += self.performance(Open,High,Close,Low,HighestHigh1,LowestLow1,Chn1[i],StpPct1[i],method)
                
                HighestHigh2 = np.array(data.High.rolling(Chn2[i]).max())
                LowestLow2 = np.array(data.Low.rolling(Chn2[i]).min())
                p2 += self.performance(Open,High,Close,Low,HighestHigh2,LowestLow2,Chn2[i],StpPct2[i],method)
                
                HighestHigh3 = np.array(data.High.rolling(Chn3[i]).max())
                LowestLow3 = np.array(data.Low.rolling(Chn3[i]).min())
                p3 += self.performance(Open,High,Close,Low,HighestHigh3,LowestLow3,Chn3[i],StpPct3[i],method)
                
                HighestHigh4 = np.array(data.High.rolling(Chn4[i]).max())
                LowestLow4 = np.array(data.Low.rolling(Chn4[i]).min())
                p4 += self.performance(Open,High,Close,Low,HighestHigh4,LowestLow4,Chn4[i],StpPct4[i],method)
                
            if p1 == max(p1,p2,p3,p4):
                Chn_end = Chn_mid
                StpPct_start = StpPct_mid
            elif p2 == max(p1,p2,p3,p4):
                Chn_start = Chn_mid
                StpPct_start = StpPct_mid
            elif p3 == max(p1,p2,p3,p4):
                Chn_end = Chn_mid
                StpPct_end = StpPct_mid
            else:
                Chn_start = Chn_mid
                StpPct_end = StpPct_mid
        
        Best_Chn = 0
        Best_StpPct = 0
        b_score = 0
        
        for i in range(iteration):
            
            Chn = np.random.randint(Chn_start,Chn_end+1)*100
            StpPct = np.random.randint(StpPct_start,StpPct_end+1)/1000
            HighestHigh = np.array(data.High.rolling(Chn).max())
            LowestLow = np.array(data.Low.rolling(Chn).min())
            score = self.performance(Open,High,Close,Low,HighestHigh,LowestLow,Chn,StpPct,method)
        
            if score > b_score:
                Best_Chn = Chn
                Best_StpPct = StpPct
                b_score = score
            
        return (Best_Chn,Best_StpPct)    

In [None]:
def backtest(data,training_period,test_period):
    
    ##Input: int - training_period(months) 
    ##       int - test_period(months)
    
    start_train_date = add_months(data.index[0],0)
    start_test_date = add_months(start_train_date,training_period)
    end_date = data.index[-1]
    
    total_months = (end_date.year - start_test_date.year) * 12 + (end_date.month - start_test_date.month) + 1
    
    number_of_test = int(total_months/test_period)
    
    EquityC = np.array([100000])
    Parameters = []
    for index in range(number_of_test):
        print(index)
        data_train = data[(data.index >= add_months(start_train_date,test_period*index)) 
                          & (data.index < add_months(start_test_date,(index)*test_period))]
        data_test = data[(data.index >= add_months(start_test_date,test_period*index)) 
                         & (data.index < add_months(start_test_date,(index+1)*test_period))]
        
        param = a.fit_random_search(data_train, size = 35, iteration = 100, method = 'RoMaD')
        
        Parameters.append(param)
        
        new_data = pd.concat([data_train.iloc[-param[0]:],data_test])
        
        test_result = a.EquityCurve(new_data,param[0],param[1]) 
        test_append = test_result[param[0]:]+EquityC[-1]-100000
        EquityC = np.append(EquityC,test_append)
        
    return EquityC[1:],Parameters

In [None]:
## Example Code

data = HO[(HO.index > datetime(1996,1,1)) & (PL.index < datetime(2020,1,1))]

In [None]:
a = DonchianChannel(420,70)
start = time.time()
z = backtest(data,48,6)
end = time.time()
end - start

In [None]:
n = len(a.Transaction)
df = pd.DataFrame(columns=['StartPos','StartDate','StartTime','EntryPrice','EndPos','EndDate','EndTime','ClosePrice'])
for i in range(int(n/2)):
    df.loc[i] = a.Transaction[2*i]+a.Transaction[2*i+1]
df['Slpg'] = [91]*int(n/2)
profit = np.zeros(int(n/2))
for i in range(int(n/2)):
    if df.StartPos[i] == 'long':
        profit[i] = (df.ClosePrice[i]-df.EntryPrice[i])*420 - 91
    else:
        profit[i] = (df.EntryPrice[i]-df.ClosePrice[i])*420 - 91
df['Profit'] = profit
        
n_param = len(z[1])
df_param = pd.DataFrame(columns=['StartQuarter','ChannelLength','Stop Percent'])
start_q = data.index[0]
for i in range(n_param):
    df_param.loc[i] = [add_months(start_q,3*i+24),z[1][i][0],z[1][i][1]]
    
n_e = len(z[0])
df_EC = pd.DataFrame(columns=['Time','Equity'])
df_EC['Time']= data[data.index>=datetime(2012,1,1)].Time
df_EC['Equity'] = z[0]

In [None]:
df_EC.to_csv("HO_20yrs_EC_4Ytrain_1Qtest_1.csv")
df.to_csv("HO_20yrs_TradeTables_4Ytrain_1Qtest_1.csv")
df_param.to_csv("HO_20yrs_Parameters_4Ytrain_1Qtest_1.csv")