# In [22]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
from itertools import product
from binance.client import Client
import warnings
plt.style.use('seaborn')
warnings.filterwarnings('ignore')
import seaborn as sns

# In [28]:
class Backtester():
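    '''
    Backtester for a return/volume-change percentile strategy on Binance candle data.
    
    Attributes
    ===========
    
    symbol: str
        instrument requested from Binance.
    
    start: str
        start of the requested data window.
    
    end: str
        end of the requested data window.
    
    interval: str
        candle interval requested from Binance.
    
    tc: float
        proportional trading costs per trade. 
        
    Methods
    ========
    
    get_data: imports the data
    
    test_strategy: runs the backtest on the current positions and reports the results
    
    prepare_data: prepares the data for backtesting
    
    setup_thresholds: converts percentile settings into long/short positions
    
    mean_matrix: plots mean returns by lagged return and volume-change decile
    
    run_backtest: runs the strategy backtest
    
    plot_results: plots cumulative performance
    
    optimize_strategy: backtests with different parameters
    
    find_best_strategy: finds optimal strategy
    
    print_performance: calculates and prints various performance metrics
    
    export_excel: writes the results table to an Excel file'''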
    
    def __init__(self, symbol, start, end, interval):
        
        self.symbol = symbol
        self.start = start
        self.end = end
        self.interval = interval
        self.api = 'bDbPPyfDDlt2S4PGQJnvGITtKMY82JsQyDlWUzYzNP59pOlAm0m7S2DmGkiAKrnT'
        self.secret = 'xHP8Vp6obdqETUCoVvkGIBZsnTvK6mgNWKQI5cP132OkNPjCkCbEUIc8XzwctVBh'
#         self.available_intervals = ['1m', '3m', '5m', '15m', '30m', '1h', '2h', '6h', '8h', '10h', '12h', '1d', '3d', '1w', '1M']
        self.results = None
        self.tc = -0.00085
        self.return_thresholds = []
        self.volume_thresholds = []
        self.get_data()
        self.tp_year = (self.data.Close.count() / ((self.data.index[-1] - self.data.index[0]).days / 365.25))
        
        #Setting up the condition variables
        self.cond1 = None
        self.cond2 = None
        self.cond3 = None
        self.cond4 = None
        
    def __repr__(self):
        return 'Backtester(symbol = {}, start = {}, end = {})'.format(self.symbol, self.start, self.end)
    
    def get_data(self):
        ''' Downloads the candles from Binance and builds the base price/volume table.
        
        Stores the table in self.data (indexed by candle open time, with Close, Volume,
        log returns and cumulative returns), stores the buy-and-hold multiple of the
        whole sample in self.multiple, and finally calls prepare_data().
        
        Raises
        ======
        ValueError
            if Binance returns no candles for the requested symbol, interval and window.
        '''
        
        client = Client(api_key = self.api, api_secret = self.secret, tld = "com")
        
        bars = client.get_historical_klines(symbol = self.symbol, interval = self.interval, 
                                       start_str = self.start, end_str = self.end, limit = 1000)
        
        # Without this guard an empty response fails further down with an opaque
        # positional-indexing error on the empty frame.
        if not bars:
            raise ValueError('No candles returned for symbol = {}, interval = {}, start = {}, end = {}'.format(
                self.symbol, self.interval, self.start, self.end))
        
        df = pd.DataFrame(bars)
        # Column 0 holds the candle open time; it is parsed as epoch milliseconds.
        df['Date'] = pd.to_datetime(df.iloc[:,0], unit = 'ms')
        df.columns = ['Open Time', 'Open', 'High', 'Low', 'Close',
               'Volume', 'Close Time', 'Quote Asset Volume', 
               'Number of Trades', 'Tracker Buy Base Asset Volume',
               'Taker Buy Quote Asset Volume', 'Ignore', 'Date']
        df = df[['Date','Close', 'Volume']].copy()
        df.set_index("Date", inplace = True)
        # Values that cannot be parsed as numbers become NaN instead of raising.
        for column in df.columns:
            df[column] = pd.to_numeric(df[column], errors = 'coerce')
            
        # Log return of each bar relative to the previous close.
        df['returns'] = np.log(df.Close / df.Close.shift(1))
        
        # Buy-and-hold multiple over the whole sample.
        self.multiple = np.exp(df.returns.sum())
        
        df['creturns'] = np.exp(df.returns.cumsum())
        self.data = df
        self.prepare_data()
        
    def test_strategy(self, percentiles = None, thresh = None):
        ''' 
        Prepares the data and backtests the trading strategy incl. reporting (Wrapper)
        
        Parameters
        ==========
        percentiles: tuple (return_low_perc, return_high_perc ,vol_low_perc, vol_high_perc)
            return and volume percentiles to be considered for the strategy.
            
        thresh: tuple (return_low_thresh, return_high_thresh,vol_low_thresh, vol_high_perc)
            return and volume thresholds to be considered for the strategy'''
        
        self.run_backtest()
        
        data = self.results.copy()
        data['creturns'] = data['returns'].cumsum().apply(np.exp)
        data['cstrategy'] = data['strategy'].cumsum().apply(np.exp)
        self.results = data
        
        self.print_performance()
        
        
        
    def prepare_data(self):
        ''' Prepares the Data for Backtesting
        '''
        ###########################Strategy Specific#################################
        data = self.data[['Close', 'Volume', 'returns']].copy()
        data['vol_ch'] = np.log(data.Volume.div(data.Volume.shift(1)))
        data.loc[data.vol_ch > 3, 'vol_ch'] = np.nan
        data.loc[data.vol_ch < -3, 'vol_ch'] = np.nan
        
        self.data = data
        
        
    def setup_thresholds(self, return_thresh, volume_thresh, optimize = False):
        '''
        Use this method like indicated below:
            4 arguments: [10,20,80,90] = where the first two are related to either return or volume for SHORT position. 
            The values in the 3 and 4 position are LONG position for either return or volume. 
            
            2 arguments: [10,90] = where the first argument is SHORT position for either return or volume. 
            The second argument is LONG position for either return or volume.
        '''
        data = self.data
        
        self.return_thresh = return_thresh
        self.volume_thresh = volume_thresh
        
        if(len(return_thresh) == 4 and len(volume_thresh) == 4):
            #Assigning percentil values to the integers provided
            self.return_thresholds = np.percentile(data.returns.dropna(), [return_thresh[0], return_thresh[1], return_thresh[2], return_thresh[3]])
            self.volume_thresholds = np.percentile(data.vol_ch.dropna(), [volume_thresh[0], volume_thresh[1], volume_thresh[2], volume_thresh[3]])
            
            #Assigning return conditions
            self.cond1 = data.returns.between(self.return_thresholds[0], self.return_thresholds[1])
            self.cond3 = data.returns.between(self.return_thresholds[2], self.return_thresholds[3])
            
            #Assigning volume conditions
            self.cond2 = data.vol_ch.between(self.volume_thresholds[0], self.volume_thresholds[1])
            self.cond4 = data.vol_ch.between(self.volume_thresholds[2], self.volume_thresholds[3])
            
        elif(len(return_thresh) == 4 and len(volume_thresh) == 2):
            #Assigning percentil values to the integers provided
            self.return_thresholds = np.percentile(data.returns.dropna(), [return_thresh[0], return_thresh[1], return_thresh[2], return_thresh[3]])
            self.volume_thresholds = np.percentile(data.vol_ch.dropna(), [volume_thresh[0], volume_thresh[1]])
            
            #Assigning volume conditions
            if(volume_thresh[0] > 50):
                self.cond2 = data.vol_ch >= self.volume_thresholds[0]
            else:
                self.cond2 = data.vol_ch <= self.volume_thresholds[0]
                
            if(volume_thresh[1] > 50):
                self.cond4 = data.vol_ch >= self.volume_thresholds[1]
            else:
                self.cond4 = data.vol_ch <= self.volume_thresholds[1]
                
            #Assigning return conditions    
            self.cond1 = data.returns.between(self.return_thresholds[0], self.return_thresholds[1])
            self.cond3 = data.returns.between(self.return_thresholds[2], self.return_thresholds[3])
            
        elif(len(return_thresh) == 2 and len(volume_thresh) == 4):
            #Assigning percentil values to the integers provided
            self.return_thresholds = np.percentile(data.returns.dropna(), [return_thresh[0], return_thresh[1]])
            self.volume_thresholds = np.percentile(data.vol_ch.dropna(), [volume_thresh[0], volume_thresh[1], volume_thresh[2], volume_thresh[3]])
            
            #Assigning return conditions
            if(return_thresh[0] > 50):
                self.cond1 = data.returns >= self.return_thresholds[0]
            else:
                self.cond1 = data.returns <= self.return_thresholds[0]
                
            if(return_thresh[1] > 50):
                self.cond3 = data.returns >= self.return_thresholds[1]
            else:
                self.cond3 = data.returns <= self.return_thresholds[1]
            
            #Assigning volume conditions
            self.cond2 = data.vol_ch.between(self.volume_thresholds[0], self.volume_thresholds[1])
            self.cond4 = data.vol_ch.between(self.volume_thresholds[2], self.volume_thresholds[3])
            
        elif(len(return_thresh) == 2 and len(volume_thresh) == 2):
            #Assigning percentil values to the integers provided
            self.return_thresholds = np.percentile(data.returns.dropna(), [return_thresh[0], return_thresh[1]])
            self.volume_thresholds = np.percentile(data.vol_ch.dropna(), [volume_thresh[0], volume_thresh[1]])
            
            #Assigning return conditions
            if(return_thresh[0] > 50):
                self.cond1 = data.returns >= self.return_thresholds[0]
            else:
                self.cond1 = data.returns <= self.return_thresholds[0]
                
            if(return_thresh[1] > 50):
                self.cond3 = data.returns >= self.return_thresholds[1]
            else:
                self.cond3 = data.returns <= self.return_thresholds[1]
            
            #Assigning volume conditions
            if(volume_thresh[0] > 50):
                self.cond2 = data.vol_ch >= self.volume_thresholds[0]
            else:
                self.cond2 = data.vol_ch <= self.volume_thresholds[0]
                
            if(volume_thresh[1] > 50):
                self.cond4 = data.vol_ch >= self.volume_thresholds[1]
            else:
                self.cond4 = data.vol_ch <= self.volume_thresholds[1]
        
        data['position'] = 0
        data.loc[self.cond1 & self.cond2, 'position'] = -1
        data.loc[self.cond3 & self.cond4, 'position'] = 1
        
        self.results = data
        
        if(not optimize):
            self.test_strategy()
        
    def mean_matrix(self):
        ''' Plots the mean return by previous-bar volume-change decile and return decile.
        
        Adds the decile columns return_category and volume_category to self.data, builds a
        table of mean returns with the lagged volume decile as rows and the lagged return
        decile as columns, shows it as a heatmap and stores it in self.matrix.
        '''
        frame = self.data
        decile_labels = [-5, -4, -3, -2, -1, 1, 2, 3, 4, 5]
        frame['return_category'] = pd.qcut(frame.returns, q = 10, labels = decile_labels)
        frame['volume_category'] = pd.qcut(frame.vol_ch, q = 10, labels = decile_labels)
        
        # Shift both categories by one bar so each return is grouped by the deciles of the bar before it.
        lagged_volume = frame.volume_category.shift()
        lagged_return = frame.return_category.shift()
        matrix = pd.crosstab(lagged_volume, lagged_return, values = frame.returns, aggfunc = np.mean )
        
        plt.figure(figsize = (12,8))
        sns.set(font_scale = 1)
        sns.heatmap(matrix, cmap = 'RdYlBu', annot = True, robust = True, fmt = '.5f')
        plt.show()
        
        self.matrix = matrix
        self.data = frame
        
    def run_backtest(self):
        '''Runs the strategy backtest
        '''
        
        data = self.data.copy()
        data['strategy'] = data['position'].shift(1) * data['returns']
        data['trades'] = data.position.diff().fillna(0).abs()
        data.strategy = data.strategy + data.trades * self.tc
        
        self.results = data
        
    def plot_results(self):
        ''' Plots the cumulative performance of the trading strategy compared to buy and hold
        '''
        
        if self.results is None:
            print('Run test_strategy() first.')
        else:
            title = '{} | TC = {}'.format(self.symbol, self.tc)
            self.results[['creturns', 'cstrategy']].plot(title = title, figsize=(12,8))
            
    def optimize_strategy(self, metric = 'Multiple'):
        ''' 
        Backtests strategy for different parameter values incl. Optimization and Reporting (Wrapper).
        
        Parameters
        ==========
        
        return_low_range: tuple
            tuples of the form (start, end, step_size)
            
        return_high_range: tuple
            tuples of the form (start, end, step_size)
            
        vol_low_range: tuple
            tuples of the form (start, end, step_size)
            
        vol_high_range: tuple
            tuples of the form (start, end, step_size)
            
        metric: str
            performance metric to be optimized (can be "Multiple" or "Sharpe")
            '''
        width = 4
        width2 = 3
        width3 = 2
        sequence = 1
        self.metric = metric
        return_thresh = self.return_thresh
        volume_thresh = self.volume_thresh
        
        if metric == 'Multiple':
            performance_function = self.calculate_multiple
        elif metric == 'Sharpe':
            performance_function = self.calculate_sharpe
            
        if(len(self.return_thresh) == 4 and len(self.volume_thresh) == 4):
            #calculating the range of returns
            if(np.max(return_thresh) < 98 and np.min(return_thresh) > 2 and np.max(volume_thresh) < 98 and np.min(volume_thresh) > 2):
                return_low_short = range(*(return_thresh[0] - width3, return_thresh[0] + width3, sequence))
                return_high_short = range(*(return_thresh[1] - width3, return_thresh[1] + width3, sequence))
                return_low_long = range(*(return_thresh[2] - width3, return_thresh[2] + width3, sequence))
                return_high_long = range(*(return_thresh[3] - width3, return_thresh[3] + width3, sequence))
                #calculating the range of volume
                vol_low_short = range(*(volume_thresh[0] - width3, volume_thresh[0] + width3, sequence))
                vol_high_short = range(*(volume_thresh[1] - width3, volume_thresh[1] + width3, sequence))
                vol_low_long = range(*(volume_thresh[2] - width3, volume_thresh[2] + width3, sequence))
                vol_high_long = range(*(volume_thresh[3] - width3, volume_thresh[3] + width3, sequence))
                #calculating the combinations of the above ranges
                combinations = list(product(return_low_short, return_high_short,return_low_long, return_high_long,vol_low_short, vol_high_short, vol_low_long, vol_high_long))
                #create performance array to store the individual multiple or result of each combination
                performance = []
                for comb in combinations:
                    self.setup_thresholds( [comb[0], comb[1], comb[2], comb[3]], [comb[4], comb[5], comb[6], comb[7]], True)
                    self.run_backtest()
                    performance.append(performance_function(self.results.strategy))
                #store the result of each full combination to determine at a later moment the best one
                self.results_overview = pd.DataFrame(data = np.array(combinations), columns = ['return_low_short', 'return_high_short','return_low_long', 'return_high_long','vol_low_short', 'vol_high_short','vol_low_long', 'vol_high_long'])
                self.results_overview['performance'] = performance
                self.find_best_strategy()
            else:
                print('Los percentiles brindados se salen de los parametros de optimizacion debe de seleccionar parametros menos a 98 y mayores 2')
        elif(len(self.return_thresh) == 4 and len(self.volume_thresh) == 2):
            #calculating the range of returns
            if(np.max(return_thresh) < 94 and np.min(return_thresh) > 6 and np.max(volume_thresh) < 94 and np.min(volume_thresh) > 6):
                return_low_short = range(*(return_thresh[0] - width2, return_thresh[0] + width2, sequence))
                return_high_short = range(*(return_thresh[1] - width2, return_thresh[1] + width2, sequence))
                return_low_long = range(*(return_thresh[2] - width2, return_thresh[2] + width2, sequence))
                return_high_long = range(*(return_thresh[3] - width2, return_thresh[3] + width2, sequence))
                #calculating the range of volume
                vol_short = range(*(volume_thresh[0] - width, volume_thresh[0] + width, sequence))
                vol_long = range(*(volume_thresh[1] - width, volume_thresh[1] + width, sequence))
                #calculating the combinations of the above ranges
                combinations = list(product(return_low_short, return_high_short,return_low_long, return_high_long, vol_short, vol_long))
                #create performance array to store the individual multiple or result of each combination
                performance = []
                for comb in combinations:
                    self.setup_thresholds( [comb[0], comb[1], comb[2], comb[3]], [comb[4], comb[5]], True)
                    self.run_backtest()
                    performance.append(performance_function(self.results.strategy))
                #store the result of each full combination to determine at a later moment the best one
                self.results_overview = pd.DataFrame(data = np.array(combinations), columns = ['return_low_short', 'return_high_short','return_low_long', 'return_high_long','vol_short','vol_long'])
                self.results_overview['performance'] = performance
                self.find_best_strategy()
            else:
                print('Los percentiles brindados se salen de los parametros de optimizacion debe de seleccionar parametros menos a 94 y mayores 6')
        elif(len(self.return_thresh) == 2 and len(self.volume_thresh) == 4):
            #calculating the range of returns
            if(np.max(return_thresh) < 94 and np.min(return_thresh) > 6 and np.max(volume_thresh) < 94 and np.min(volume_thresh) > 6):
                return_low_short = range(*(return_thresh[0] - width, return_thresh[0] + width, sequence))
                return_high_long = range(*(return_thresh[1] - width, return_thresh[1] + width, sequence))
                #calculating the range of volume
                vol_low_short = range(*(volume_thresh[0] - width2, volume_thresh[0] + width2, sequence))
                vol_high_short = range(*(volume_thresh[1] - width2, volume_thresh[1] + width2, sequence))
                vol_low_long = range(*(volume_thresh[2] - width2, volume_thresh[2] + width2, sequence))
                vol_high_long = range(*(volume_thresh[3] - width2, volume_thresh[3] + width2, sequence))
                #calculating the combinations of the above ranges
                combinations = list(product(return_low_short, return_high_long,vol_low_short, vol_high_short, vol_low_long, vol_high_long))
                #create performance array to store the individual multiple or result of each combination
                performance = []
                for comb in combinations:
                    self.setup_thresholds( [comb[0], comb[1]], [comb[2], comb[3], comb[4], comb[5]], True)
                    self.run_backtest()
                    performance.append(performance_function(self.results.strategy))
                #store the result of each full combination to determine at a later moment the best one
                self.results_overview = pd.DataFrame(data = np.array(combinations), columns = ['return_short', 'return_long','vol_low_short', 'vol_high_short','vol_low_long', 'vol_high_long'])
                self.results_overview['performance'] = performance
                self.find_best_strategy()
            else:
                print('Los percentiles brindados se salen de los parametros de optimizacion debe de seleccionar parametros menos a 94 y mayores 6')
            
        elif(len(self.return_thresh) == 2 and len(self.volume_thresh) == 2):
            #calculating the range of returns
            if(np.max(return_thresh) < 94 and np.min(return_thresh) > 6 and np.max(volume_thresh) < 94 and np.min(volume_thresh) > 6):
                return_low_short = range(*(return_thresh[0] - width, return_thresh[0] + width, sequence))
                return_high_long = range(*(return_thresh[1] - width, return_thresh[1] + width, sequence))
                #calculating the range of volume
                vol_low_short = range(*(volume_thresh[0] - width, volume_thresh[0] + width, sequence))
                vol_high_long = range(*(volume_thresh[1] - width, volume_thresh[1] + width, sequence))
                #calculating the combinations of the above ranges
                combinations = list(product(return_low_short, return_high_long,vol_low_short, vol_high_long))
                #create performance array to store the individual multiple or result of each combination
                performance = []
                for comb in combinations:
                    self.setup_thresholds( [comb[0], comb[1]], [comb[2], comb[3]], True)
                    self.run_backtest()
                    performance.append(performance_function(self.results.strategy))
                #store the result of each full combination to determine at a later moment the best one
                self.results_overview = pd.DataFrame(data = np.array(combinations), columns = ['return_short', 'return_long','vol_short', 'vol_long'])
                self.results_overview['performance'] = performance
                self.find_best_strategy()
            else:
                print('Los percentiles brindados se salen de los parametros de optimizacion debe de seleccionar parametros menos a 94 y mayores 6')
        
    def find_best_strategy(self):
        ''' Finds the optimal strategy (global maximum).
        '''
        
        best = self.results_overview.nlargest(1, 'performance')
        if(len(self.return_thresh)==4):
            return_perc = [best.return_low_short.iloc[0], best.return_high_short.iloc[0],best.return_low_long.iloc[0], best.return_high_long.iloc[0] ]
        else:
            return_perc = [best.return_short.iloc[0], best.return_long.iloc[0]]
            
        if(len(self.volume_thresh)==4):
            vol_perc = [best.vol_low_short.iloc[0], best.vol_high_short.iloc[0], best.vol_low_long.iloc[0], best.vol_high_long.iloc[0]]
        else:
            vol_perc = [best.vol_short.iloc[0], best.vol_long.iloc[0]]
        
        perf = best.performance.iloc[0]
        print('Return_Perc: {} | Volume_Perc: {} : {}'.format(return_perc, vol_perc, perf))
        self.setup_thresholds(return_perc, vol_perc, False)
        
        ##########################PERFORMANCE#################################
        
    def print_performance(self):
        ''' Calculates and prints various Performance Metrics.
        '''
        
        data = self.results.copy()
        strategy_multiple = round(self.calculate_multiple(data.strategy), 6)
        bh_multiple = round(self.calculate_multiple(data.returns), 6)
        outperf = round(strategy_multiple - bh_multiple, 6)
        cagr = round(self.calculate_cagr(data.strategy), 6)
        ann_mean = round(self.calculate_annualized_mean(data.strategy), 6)
        ann_std = round(self.calculate_annualized_std(data.strategy), 6)
        sharpe = round(self.calculate_sharpe(data.strategy), 6)
        
        print(100 * '=')
        print('SIMPLE PRICE & VOLUME STRATEGY | INSTRUMENT = {} | THRESHOLDS = {} | {}'.format(self.symbol, self.return_thresholds ,self.volume_thresholds))
        print(100 * '=')
        print('PERFORMANCE MEASURES:')
        print('\n')
        print('Multiple (Strategy):             {}'.format(strategy_multiple))
        print('Multiple (Buy-and-Hold):         {}'.format(bh_multiple))
        print(38 * '-')
        print('Out-/ Underperformance:          {}'.format(outperf))
        print('\n')
        print('CAGR:                            {}'.format(cagr))
        print('Annualized Mean:                 {}'.format(ann_mean))
        print('Annualized Std:                  {}'.format(ann_std))
        print('Sharpe Ratio:                    {}'.format(sharpe))
        
        print(100 * '=')
        
    def export_excel(self):
        self.results.to_excel('data_table.xls')
        
    def calculate_multiple(self , series):
        return np.exp(series.sum())
    
    def calculate_cagr(self, series):
        return np.exp(series.sum()) ** (1/((series.index[-1] - series.index[0]).days / 365.25)) - 1
    
    def calculate_annualized_mean(self, series):
        return series.mean() * self.tp_year
    
    def calculate_annualized_std(self, series):
        return series.std() * np.sqrt(self.tp_year)
    
    def calculate_sharpe(self, series):
        if series.std() == 0:
            return np.nan
        else:
            return self.calculate_cagr(series) / self.calculate_annualized_std(series)