In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from itertools import product

In [6]:
class MeanReversionBacktester():
    '''
    Class for the vectorised backtesting of mean-reversion strategies based on Bollinger Bands.

    The strategy goes long (+1) when the price closes below the lower band, short (-1) when
    it closes above the upper band, and flat (0) when the price crosses the SMA.
    Between signals the previous position is held.
    '''

    # Data file used when no other source is supplied.
    DEFAULT_DATA_SOURCE = 'intraday_pairs.csv'

    def __init__(self, symbol, sma, deviations, start, end, spread=None, tcost=None,
                 data_source=None):
        '''
        Parameters
        ----------
        symbol: str
            The ticker symbol of the instrument that will be backtested.
            Must be a column of the data source.
        sma: int
            The window for the simple moving average.
        deviations:
            The distance of the bollinger bands from the SMA.
            Calculated in standard deviations from the mean (i.e. the SMA).
        start: str
            The start date for imported data.
        end: str
            The end date for imported data.
        spread:
            The spread between Buy and Sell prices. If you do not know the tcost,
            input this parameter and the tcost is derived from it (see get_tcost).
        tcost: float
            The proportional transaction/trading costs per trade.
            If neither spread nor tcost is given, zero costs are assumed.
        data_source: str, path or file-like object, optional
            Anything accepted by pandas.read_csv. Must contain a 'time' column and one
            column per symbol. Defaults to intraday_pairs.csv.

        Note: For intraday_pairs.csv, data starts at 2018-01-01, and ends at 2019-12-30.
              Symbols on the file are EURUSD, GBPUSD and EURAUD.
        '''
        self.symbol = symbol
        self.SMA = sma
        self.dev = deviations
        self.start = start
        self.end = end
        self.spread = spread
        self.tc = tcost
        if data_source is not None:
            self.df = data_source
        else:
            # Respect a `df` attribute provided by a subclass; otherwise use the default file.
            self.df = getattr(self, 'df', self.DEFAULT_DATA_SOURCE)
        self.results = None
        self.results_overview = None
        self.get_data()
        if self.tc is None:
            self.get_tcost()

    def __repr__(self):
        return f'{type(self).__name__}(ticker = {self.symbol}, start = {self.start}, end = {self.end}, transactioncost = {self.tc}, SMA = {self.SMA}, deviations = {self.dev})'

    def get_data(self):
        '''
        Imports the data from the configured source (self.df). Default is intraday_pairs.csv.

        Stores a DataFrame indexed by time in self.data with the columns
        'Price' (the symbol's price) and 'Returns' (log returns; NaN on the first row).
        '''
        raw = pd.read_csv(self.df, parse_dates=['time'], index_col='time')
        raw = raw[self.symbol].to_frame().dropna()
        raw = raw.loc[self.start:self.end].copy()
        raw.rename(columns={self.symbol: 'Price'}, inplace=True)
        raw['Returns'] = np.log(raw['Price'] / raw['Price'].shift(1))
        self.data = raw

    def get_tcost(self):
        '''
        Derives the proportional transaction cost per trade (self.tc) from the spread.

        The spread is scaled by 0.0001, halved, and divided by the mean price.
        If no spread was supplied, zero transaction costs are assumed.
        '''
        if self.spread is None:
            self.tc = 0.0
            return
        # Only the base columns are considered, so indicator columns added later
        # (with their NaN warm-up rows) cannot change the mean price.
        prices = self.data[['Price', 'Returns']].dropna()['Price']
        halfspread = (self.spread * 0.0001) / 2
        self.tc = halfspread / prices.mean()

    @staticmethod
    def _add_bands(data, window, deviations):
        '''
        Adds the 'SMA', 'Lower Band' and 'Upper Band' columns to data (in place).
        '''
        rolling = data['Price'].rolling(window)
        data['SMA'] = rolling.mean()
        band_width = rolling.std() * deviations
        data['Lower Band'] = data['SMA'] - band_width
        data['Upper Band'] = data['SMA'] + band_width

    def set_parameters(self, sma=None, dev=None):
        '''
        Updates SMA and deviations parameters, and updates the prepared dataset to reflect the new parameters.

        Parameters
        ----------
        sma: int, optional
            New SMA window. Left unchanged if None.
        dev: optional
            New band distance in standard deviations. Left unchanged if None.
        '''
        if sma is None and dev is None:
            return
        if sma is not None:
            self.SMA = sma
        if dev is not None:
            self.dev = dev
        # Both parameters are updated first so the bands are computed once, with the
        # final values, and so that changing only `dev` does not need an existing SMA column.
        self._add_bands(self.data, self.SMA, self.dev)

    def test_strategy(self):
        '''
        Backtests the strategy with the current SMA / deviations and stores the
        detailed result DataFrame in self.results.

        Returns
        -------
        tuple (performance, outperformance)
            performance: final cumulative net strategy return (after costs), rounded to 6 digits.
            outperformance: performance minus the final buy-and-hold cumulative return,
            rounded to 6 digits.
        '''
        # Start from the base columns only: indicator columns that set_parameters may
        # have added to self.data carry NaN warm-up rows, and dropping those here
        # would shorten the sample before the rolling window is (re)computed.
        data = self.data[['Price', 'Returns']].dropna().copy()
        self._add_bands(data, self.SMA, self.dev)
        data['Distance'] = data['Price'] - data['SMA']

        # Later rules override earlier ones; NaN means "keep the previous position".
        position = np.where(data['Price'] < data['Lower Band'], 1, np.nan)
        position = np.where(data['Price'] > data['Upper Band'], -1, position)
        crossed_sma = data['Distance'] * data['Distance'].shift(1) < 0
        position = np.where(crossed_sma, 0, position)
        data['Position'] = position
        data['Position'] = data['Position'].ffill().fillna(0)

        # The position taken on a bar earns the return of the following bar.
        data['Strategy Returns'] = data['Position'].shift(1) * data['Returns']
        # Trades are counted before the warm-up rows are dropped, so that a position
        # opened on the first valid bar (from flat) is charged its transaction cost.
        data['Trades'] = data['Position'].diff().fillna(0).abs()
        data.dropna(inplace=True)

        data['Cumulative Returns'] = data['Returns'].cumsum().apply(np.exp)
        data['Cumulative Strat Returns'] = data['Strategy Returns'].cumsum().apply(np.exp)
        data['Strategy Net Return'] = data['Strategy Returns'] - (data['Trades'] * self.tc)
        data['Cumulative Net Strat Return'] = data['Strategy Net Return'].cumsum().apply(np.exp)
        self.results = data

        performance = data['Cumulative Net Strat Return'].iloc[-1]
        outperform = performance - data['Cumulative Returns'].iloc[-1]
        return round(performance, 6), round(outperform, 6)

    def plot_results(self):
        '''
        Plots the performance of the trading strategy, and compares to the benchmark
        i.e. if you had simply bought and held the instrument.

        Prints a hint instead if test_strategy() has not been run yet.
        '''
        if self.results is None:
            print("Run test_strategy() first.")
        else:
            title = f'{self.symbol} | SMA = {self.SMA} | Band Deviations = {self.dev} | Transaction Cost = {self.tc}'
            self.results[['Cumulative Returns',
                          'Cumulative Strat Returns',
                          'Cumulative Net Strat Return']].plot(title=title, figsize=(11, 7))

    def optimise_parameters(self, SMArange, devrange):
        '''
        Finds the optimal strategy by testing every combination of SMA window and band
        width in the given ranges and keeping the one with the best net performance.

        After the call, the instance is set to the optimal parameters, self.results holds
        the optimal backtest, and self.results_overview holds one row per tested combination
        (columns 'SMA', 'Deviations', 'Performance').

        Parameters
        ----------
        SMArange: tuple
            Tuple of the form (start, end, step size), indicating the range of possible
            SMA window values. To be plugged into a range function that is used to calculate all
            combinations of possible SMA and deviation values.
        devrange: tuple
            Tuple of the form (start, end, step size), indicating the range of possible
            bollinger band widths. To be plugged into a range function that is used to calculate
            all combinations of possible SMA and deviation values.

        Returns
        -------
        tuple (optimum, best_performance)
            optimum: the (SMA, deviations) pair with the best performance.
            best_performance: the net performance achieved by that pair.

        Raises
        ------
        ValueError
            If the ranges produce no combinations to test.
        '''
        combinations = list(product(range(*SMArange), range(*devrange)))
        if not combinations:
            raise ValueError('SMArange and devrange must each contain at least one value.')

        # Test every combination.
        results = []
        for window, deviations in combinations:
            self.set_parameters(window, deviations)
            results.append(self.test_strategy()[0])

        best_performance = np.max(results)
        optimum = combinations[np.argmax(results)]

        # Re-run the optimal strategy so self.results reflects it.
        self.set_parameters(optimum[0], optimum[1])
        self.test_strategy()

        # Overview of all tested combinations.
        resultsframe = pd.DataFrame(data=combinations, columns=['SMA', 'Deviations'])
        resultsframe['Performance'] = results
        self.results_overview = resultsframe

        return optimum, best_performance
    