In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import itertools 
from itertools import product
from scipy.stats import linregress
from datetime import datetime
# matplotlib 3.6 renamed the bundled seaborn style sheets to "seaborn-v0_8*" and
# newer releases no longer ship the old "seaborn" name, so use whichever one
# this installation actually provides.
plt.style.use("seaborn-v0_8" if "seaborn-v0_8" in plt.style.available else "seaborn")
# Show complete dataframes (no row/column truncation) when they are displayed.
pd.set_option("display.max_rows", None, "display.max_columns", None)

In [53]:
class system_two():
    
    def __init__(self, file_name, start, end, timeframe, tc,  p1 = None, p2 = None, p3 = None):#Here we call all the data we're gonna work from the symbol to the 
        self.file_name = file_name #date range.  
           
        self.p1 = p1 #Bars since entry
        
        self.p2 = p2 
        
        self.p3 = p3 
        
        self.start = start#If we want to perform wf analysis, we should initialize the class with the first start in  
        self.end = end                                            #sample date, but with enough data to compute indicators.
        self.timeframe = timeframe
        self.tc = tc #transaction costs in pips (0.0001 notation)
        self.results = None #the self.results dataframe will be "None" until we call self.test_strategy() method
        self.get_data() #When the class is initialized, methods get_data() & prepare_data() are called inmediatelly
        self.prepare_data()

        ###################################### Trading Costs and pip value #################################################
        
        if ((self.file_name == "USDJPY_1h_BID_08_20_utc.csv")      | (self.file_name == "EURJPY_1h_BID_08_20_utc.csv")     |
            (self.file_name == "AUDJPY_1h_BID_08_20_utc.csv")      | (self.file_name =="USDJPY_4h_BID_05_21_EET_GMT.csv")  |
            (self.file_name =="EURJPY_4h_BID_05_21_EET_GMT.csv")   | (self.file_name =="AUDJPY_4h_BID_05_21_EET_GMT.csv")  |
            (self.file_name == "USDJPY_30m_BID_10_20_EET_GMT.csv") | (self.file_name == "EURJPY_30m_BID_10_20_EET_GMT.csv")|
            (self.file_name == "AUDJPY_30m_BID_10_20_EET_GMT.csv") | (self.file_name == "USDJPY_15m_BID_10_20_EET_GMT.csv")|
            (self.file_name == "EURJPY_15m_BID_10_20_EET_GMT.csv") | (self.file_name == "AUDJPY_15m_BID_10_20_EET_GMT.csv")|
            (self.file_name == "USDJPY_5m_BID_10_20_EET_GMT.csv")  | (self.file_name == "EURJPY_5m_BID_10_20_EET_GMT.csv") |
            (self.file_name == "AUDJPY_5m_BID_10_20_EET_GMT.csv") ):
                      
            self.tc = self.tc*100 #here we multiply by 100 to convert to jpy pips
            self.pip_value = 0.01
            
        else:
            self.tc = self.tc 
            self.pip_value = 0.0001
        
    def get_data(self):
        custom_date_parser = lambda x: datetime.strptime(x, "%d.%m.%Y %H:%M:%S.%f") #this function is a parser, this tells python how to recognize the datetime format we have in the csvfile from dukascopy
        raw = pd.read_csv(self.file_name, parse_dates = ["Gmttime"], date_parser=custom_date_parser, index_col = "Gmttime")
        raw = raw[["Open", "Close"]].dropna()              
        raw = raw.loc[self.start:self.end].copy() #We make a copy from the orininal imported df, and slice our date period
        self.data = raw
        
    def prepare_data(self): #Here we compute indicators which should be computed only once. 
        data = self.data.copy() #Make a copy from the df produced previouslly in self.get_data() method. 
        data = atr_percentage (data, atr_bp = 14, max_min_bp = 100)
        self.data = data
        
    def set_parameters(self, p1 = None, p2 = None, p3 = None):
        if p1 is not None:
            self.p1 = p1
            
        if p2 is not None:
            self.p2 = p2 #Parameter (LOOK BACK OF THE sma enveloped.)
            self.data = sma_envelop(dataframe = self.data, sma_period = self.p2, envelop_pips = self.pip_value*20)
            
        if p3 is not None:
            self.p3 = p3
            
    def test_strategy(self, start_test, end_test): #These parameters give us the possibility to perform a test in a determinated
        self.start_test = start_test               #range of time, without losing data computing sma or any other indicators.
        self.end_test = end_test
        
        data = self.data.copy().dropna()
        data = data.loc[self.start_test:self.end_test] #Cut the complete dataframe in the slice we want, after compute indicators
        data["hour"] = data.index.hour
        
        #####################
        bars_since_entry = self.p1 #parameter
        #####################

#//////////////////////////////////////////////// STRATEGY CORE //////////////////////////////////////////////////////////////

        position = [0] * len(data.Close)
        self.trades = []
        self.trades_dates = []
        entry_index = 0
        entry_price = 0

        for j in range (1, len(data.Close)):

            if (data.hour[j] == 14): #Entry hour
                
                if (position[j-1] == 0):
                    
                    if(data.Close[j-1] <= data.sma_lower[j-1]):

                        if(data.atr_percentage[j-1] <= 35):

                            position[j] = 1
                            entry_index = j
                            entry_price = data.Open[j]

            if ((position [j-1] == 1) & (j < (entry_index + bars_since_entry))):

                position[j] = 1

            if ((j == (entry_index + bars_since_entry)) & (position[j-1] == 1)):

                self.trades.append((data.Open[j] - entry_price)*-1) #Multiply by -1 if its a bear strategy
                self.trades_dates.append(data.index[j])
                
#/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

        self.trades #list which saves all the trades. 
        data["position"] = position
        data.dropna(inplace=True)
        self.results = data #dataframe which contains test results
        
############################################################################################################################### 
                   
        self.trades_w_costs = [w - self.tc for w in self.trades]#add trading costs       
            
###############################################################################################################################            
        
        if len(self.trades) > 0: #if there are more than 1 trade
            perf = np.cumsum(self.trades_w_costs)[-1] # absolute performance of the strategy
        else:
            perf = -1000
        
        return round(perf, 6)
    
        
    def optimize_parameters(self, opt_start, opt_end, param_1_range = None, param_2_range = None, param_3_range = None):
        self.opt_start = opt_start
        self.opt_end = opt_end     
        
        #param_1_range & param_2_range are tupples -simple arrays of three components- where the first is the start range,the second is the last range member & last is the pase or step
        #using * we can pass the tupples stored in SMA_S_range & SMA_L_range to the range function, if we don't use * we get an error because range function need 3 parameters, * "Unpacks" the tupple.
        
        if ((self.p1 != None) & (self.p2 == None) & (self.p3 == None)):
            combinations = list(range(*param_1_range))
            results = []              
            for comb in combinations: 
                self.set_parameters(comb)
                results.append(self.test_strategy(start_test = self.opt_start, end_test = self.opt_end))
            
        
        if ((self.p1 != None) & (self.p2 != None) & (self.p3 == None)):      
            combinations = list(product(range(*param_1_range), range(*param_2_range)))
            results = []              
            for comb in combinations: 
                self.set_parameters(comb[0], comb[1])
                results.append(self.test_strategy(start_test = self.opt_start, end_test = self.opt_end))
                
                
        if ((self.p1 != None) & (self.p2 != None) & (self.p3 != None)):      
            combinations = list(product(range(*param_1_range), range(*param_2_range), range(*param_3_range)))
            results = []              
            for comb in combinations: 
                self.set_parameters(comb[0], comb[1], comb[2])
                results.append(self.test_strategy(start_test = self.opt_start, end_test = self.opt_end))

        self.best_perf = np.max(results) # best performance
        self.opt = combinations[np.argmax(results)] # optimal parameters, np.argmax() gives me the index of the highest value in the 
                                               # array or list named results, and with this index i can call from the list 
                                               #combinations the parameters with the highest return. 
                   
        return self.opt, self.best_perf
    
    def walk_forward (self, wf_param_1_range = None, wf_param_2_range = None, wf_param_3_range = None):
        
        if (self.timeframe == "4h"):
            
            is_start_dates = ["2008-01-01", "2011-01-01", "2014-01-01"]
            is_end_dates   = ["2011-12-31", "2014-12-31", "2017-12-31"]       
            os_start_dates = ["2012-01-01", "2015-01-01", "2018-01-01"]
            os_end_dates   = ["2014-12-31", "2017-12-31", "2020-12-31"]
            wf_date_ranges = ["2012-2014", "2015-2017", "2018-2020"]        
        
        if (self.timeframe == "1h"):
        
            is_start_dates = ["2011-01-01", "2013-01-01", "2015-01-01", "2017-01-01"]
            is_end_dates   = ["2012-12-31", "2014-12-31", "2016-12-31", "2018-12-31"]       
            os_start_dates = ["2013-01-01", "2015-01-01", "2017-01-01", "2019-01-01"]
            os_end_dates   = ["2014-12-31", "2016-12-31", "2018-12-31", "2020-12-31"]
            wf_date_ranges = ["2013-2014", "2015-2016", "2017-2018", "2019-2020"]
            
        if (self.timeframe == "30m"):
        
            is_start_dates = ["2013-01-01", "2015-01-01", "2017-01-01"]
            is_end_dates   = ["2014-12-31", "2016-12-31", "2018-12-31"]       
            os_start_dates = ["2015-01-01", "2017-01-01", "2019-01-01"]
            os_end_dates   = ["2016-12-31", "2018-12-31", "2020-12-31"]
            wf_date_ranges = ["2015-2016", "2017-2018", "2019-2020"]
            
        if (self.timeframe == "15m"):
            
            is_start_dates = ["2014-01-01", "2016-01-01", "2018-01-01"]
            is_end_dates   = ["2014-12-31", "2016-12-31", "2018-12-31"]       
            os_start_dates = ["2015-01-01", "2017-01-01", "2019-01-01"]
            os_end_dates   = ["2016-12-31", "2018-12-31", "2020-12-31"]
            wf_date_ranges = ["2015-2016", "2017-2018", "2019-2020"]
         
        if (self.timeframe == "5m"):
            
            is_start_dates = ["2014-01-01", "2016-01-01", "2018-01-01"]
            is_end_dates   = ["2014-12-31", "2016-12-31", "2018-12-31"]       
            os_start_dates = ["2015-01-01", "2017-01-01", "2019-01-01"]
            os_end_dates   = ["2016-12-31", "2018-12-31", "2020-12-31"]
            wf_date_ranges = ["2015-2016", "2017-2018", "2019-2020"]
            
        
        self.wf_opt_parameters = []
        wf_performance = []
             
        if ((self.p1 != None) & (self.p2 == None) & (self.p3 == None)):
            for i in range (len(is_start_dates)):
                self.optimize_parameters(is_start_dates[i], is_end_dates[i], wf_param_1_range)
                self.wf_opt_parameters.append(self.opt)                                                                                      
                wf_performance.append(self.best_perf)
                
        if ((self.p1 != None) & (self.p2 != None) & (self.p3 == None)): 
            for i in range (len(is_start_dates)):
                self.optimize_parameters(is_start_dates[i],is_end_dates[i], wf_param_1_range, wf_param_2_range)
                self.wf_opt_parameters.append(self.opt)                                                                                      
                wf_performance.append(self.best_perf)
                
        if ((self.p1 != None) & (self.p2 != None) & (self.p3 != None)):
            for i in range (len(is_start_dates)):
                self.optimize_parameters(is_start_dates[i],is_end_dates[i], wf_param_1_range, wf_param_2_range,
                                                                                                         wf_param_3_range)
                self.wf_opt_parameters.append(self.opt)                                                                                      
                wf_performance.append(self.best_perf)
                
                
        if ((self.p1 != None) & (self.p2 == None) & (self.p3 == None)): 
            self.wf_opt_parameters = pd.DataFrame (data = self.wf_opt_parameters, columns = ["param_1"])
            
        if ((self.p1 != None) & (self.p2 != None) & (self.p3 == None)): 
            self.wf_opt_parameters = pd.DataFrame (data = self.wf_opt_parameters, columns = ["param_1", "param_2"])
            
        if ((self.p1 != None) & (self.p2 != None) & (self.p3 != None)):
            self.wf_opt_parameters = pd.DataFrame (data = self.wf_opt_parameters, columns = ["param_1", "param_2",
                                                                                                        "param_3"])

        self.wf_opt_parameters ["dates"] = wf_date_ranges
        self.wf_opt_parameters = self.wf_opt_parameters.set_index("dates")
        self.wf_opt_parameters ["performance"] = wf_performance 
        
        
        performance_os = [] #Contains the absolute performance of the out of sample periods
        list_of_total_trades = [] #list of lists.
        list_of_df_results = [] #contains the four pd dataframes we get for each test, this is a list which contains 
                                                                                                          #dataframes ¡WTF! xD
        
        if ((self.p1 != None) & (self.p2 == None) & (self.p3 == None)):
            for i in range (len(is_start_dates)):
                self.set_parameters(self.wf_opt_parameters.param_1[i])
                performance_os.append(self.test_strategy(start_test = os_start_dates[i], end_test = os_end_dates[i]))
                list_of_total_trades.append(self.trades)
                list_of_df_results.append(self.results)
        
        if ((self.p1 != None) & (self.p2 != None) & (self.p3 == None)):
            for i in range (len(is_start_dates)):
                self.set_parameters(self.wf_opt_parameters.param_1[i], self.wf_opt_parameters.param_2[i])
                performance_os.append(self.test_strategy(start_test = os_start_dates[i], end_test = os_end_dates[i]))
                list_of_total_trades.append(self.trades)
                list_of_df_results.append(self.results)
        
        if ((self.p1 != None) & (self.p2 != None) & (self.p3 != None)):
            for i in range (len(is_start_dates)):
                self.set_parameters(self.wf_opt_parameters.param_1[i], self.wf_opt_parameters.param_2[i],
                                                                                       self.wf_opt_parameters.param_3[i])
                performance_os.append(self.test_strategy(start_test = os_start_dates[i], end_test = os_end_dates[i]))
                list_of_total_trades.append(self.trades)
                list_of_df_results.append(self.results)
        
        self.df_complete = pd.concat(list_of_df_results) #Here we joint the dataframes generated containing the resoults of the out of sample periods  
        
        self.total_trades=[]
        for i in list_of_total_trades: #with these fors we joint the list of lists into one only list named self.total_trades
            for j in i:
                self.total_trades.append(j)
        
        self.total_trades_with_costs = [w - self.tc for w in self.total_trades] # add trading costs  
         
    #--------------------------------------------------------------------------------------------------------------------------
        if len(self.total_trades_with_costs) > 0: #if there are more than 1 trade
            
            ###################################### Calculate profit factor ##############################################
        
            pos = [w for w in self.total_trades_with_costs if w > 0]
            neg = [w for w in self.total_trades_with_costs if w < 0]

            gain = np.sum(pos)
            lost = abs(np.sum(neg))

            self.inversed_pf = round (lost/gain, 3) #inversed profit factor

            ###################################### Calculate slope and r^2 ##############################################

            trend = linregress (range(0,len(self.total_trades_with_costs), 1), np.cumsum(self.total_trades_with_costs))

            self.slope = trend.slope       #slope of the equity curve
            self.r_sqrt = trend.rvalue**2  #r^2 of the linear regression

            #################################### Calculate profitable index #############################################

            self.index = (self.inversed_pf / self.r_sqrt) #This index must be less than 1 to be profitable and the less the better

            #############################################################################################################
            
        else:
            self.index = -1000
        
        
        
        return self.total_trades
    
    def plot_wf (self):
        
        self.self.total_trades.cumsum().plot(figsize=(12, 8))