# Backtester



### Setup

In [162]:
import logging
from rich.logging import RichHandler
logging.basicConfig(format="[u]%(funcName)s[/](): %(message)s",
                    datefmt="[%x %X]",
                    level=logging.DEBUG,
                    force=True,
                    handlers=[RichHandler(rich_tracebacks=True,
                                          markup=True)])
from secrets import *
from settings import *
from typing import Union
from flatten_json import flatten
from interpreter import interpret
import json
import pymt5adapter as mt5
import pytz
from datetime import datetime, timedelta
import numpy as np
import time
import math
import enum

from rich.progress import (Progress, BarColumn,
                           TimeRemainingColumn, SpinnerColumn)

from rich.console import Console
from rich import box
from rich.table import Table, Column
import plotly.express as px
import pandas as pd
pd.options.mode.chained_assignment = None  # default='warn'

log = logging.getLogger()

In [2]:
mt5_log = logging.getLogger('mt5')
mt5_log.setLevel(logging.WARNING)
conn = mt5.connected(
            path=TERMINAL_PATH,
            portable=True,
            server=server_mt5,
            login=login_mt5,
            password=password_mt5,
            timeout=5000,
            logger=mt5_log, # default is None
            ensure_trade_enabled=False,  # default is False
            enable_real_trading=True,  # default is False
            raise_on_errors=True,  # default is False
            return_as_dict=False, # default is False 
            return_as_native_python_objects=False )

In [3]:
FIRST_PARSE_TF = mt5.TIMEFRAME.M30
SECOND_PARSE_TF = mt5.TIMEFRAME.M1

## Signals Manipulation

### clean_json()

In [161]:
def clean_json(json_path, tz_localize=pytz.timezone('Europe/Rome')):
    '''Takes in a messy json and cleans it to use it for backtesting'''
    
    with open(json_path, 'r', encoding="utf8") as f:
        data = json.loads(f.read())
    
    # use flatten module to unnest values
    raw_data = [flatten(d) for d in data['messages']]
    
    # it's easier to manipulate with pandas
    df = pd.DataFrame(raw_data)
    
    # merges different nesting-level texts
    text_cols = df.columns.intersection([f'text_{i}' for i in range(0,100)] +
                                        [f'text_{i}_text' for i in range(0,100)])
    
    df['agg_text'] = df.text.fillna('').str.cat(df[text_cols].fillna(''), sep=' ').str.strip()
    
    # keeps these cols and cleans the rest
    necessary_cols=['id', 'date', 'agg_text']
    clean_df = df[necessary_cols].copy()
    
    # drops empty rows
    clean_df.drop(clean_df[clean_df['agg_text'].str.strip()==""].index, inplace=True)
    clean_df.rename(columns={'agg_text': 'text'}, inplace=True)
    
    # corrects timestamps
    clean_df['date'] = pd.to_datetime(df['date']).dt.tz_localize(tz_localize)
    
    # resets index
    clean_df.reset_index(inplace=True, drop=True)
    
    # the python dict is a bit faster to iterate over
    return clean_df.to_dict(orient='index')

### Order()

In [198]:
class Order():
    '''The Event class can be anything from a target (SL, TP, BE) 
    or a time-based event (a close signal, for example). The answer that
    the Event class should help answer is "is this something we need to 
    investigate?" If the answer is yes, then you can use an Event to do so'''
    def __init__(self,
                 time: datetime,
                 side: SIDE,
                 type_: TYPE,
                 price: float = None,
                 status: STATUS = None,
                 text: str = None):
        
        self.price = price
        self.time = time
        self.status = STATUS.PENDING if status is None else status
        self.side = side
        self.type_ = type_
        self.text = text
        
class MarketOrder(Order):
    def __init__(self,
                 time: datetime,
                 side: SIDE,
                 price: float = None,
                 text: str = None):
        super().__init__(time, side, TYPE.MARKET, price=price, text=text)
        
class LimitOrder(Order):
    def __init__(self,
                 time: datetime,
                 side: SIDE,
                 price: float,
                 text: str = None):
        super().__init__(time, side, TYPE.LIMIT, price=price, text=text)
        
class StopOrder(Order):
    def __init__(self,
                 time: datetime,
                 side: SIDE,
                 price: float,
                 text: str = None):
        super().__init__(time, side, TYPE.STOP, price=price, text=text)

### Signal()

In [199]:
class Signal():
    
    def __init__(self, 
                 time: datetime,
                 text: str,
                 side: SIDE,
                 sl: float, # when it's initialized there can only be one
                 entry: Union[None, float] = None, 
                 tp: Union[None, float, list] = None):
        
        self.time = time
        self.text = text
        self.side = side
        # Side(-arg) will turn it into the opposite (buy->sell)
        self.sl = [StopOrder(time, SIDE(-side), sl, text)]
        
        self.tp = []
        if tp is not None:
            try:
                self.tp = [LimitOrder(time, SIDE(-side), x, text=text) for x in tp]
            except TypeError:
                self.tp = [LimitOrder(time, SIDE(-side), tp, text=text)]
        
        self.entry = MarketOrder(time, side, text=text)
        if entry is not None:
            self.entry = LimitOrder(time, side, entry, text=text)

### Signals()

In [None]:
class Signals():
    
    def __init__(self):
        pass
    
    def from_json(self, path):
        
        msgs = clean_json(path)
        signals = []
        
        for msg in msgs:
            # calls the interpreter
            parsed = interpreter.interpret(msg['text'], msg['date'])
            # gets the flag
            flag = parsed.get_flag()
            if flag in ['limit', 'market']:
                signals.append(Order())
            
            if flag == 'update-tp':
                for i, r in enumerate(reversed(signals)):
                    if i >= 10: break
                    if r.get_flag() in ['limit', 'market']:
                        signals[len(signals)-i-1].add_tp(parsed.tp)
                        break

            signals.append(parsed)

        return pd.DataFrame([s.to_dict() for s in signals])

### get_signals()

In [6]:
# filter out no-data and incoherent signals
def get_signals(signals_df,
                n=slice(0,1e9),
                override_tp=[],
                override_be=None,
                sl_coefficient=1,
                ignore_close_signals=False):
    
    '''This function is responsible for the manipulation of the signals
    :param override_tp: changes the list of tps parsed by the interpreter to a custom list (in R)
    :param override_be: changes the be parsed by the interpreter to a custom be_set level (in R)
    :param sl_coefficient: this will be multiplied by the sl to manipulate the sl parsed by the interpreter
    :param ignore_close_signals: deletes the close_call column
    Returns a dict'''
    
    log = logging.getLogger('signals_manipulation')
    
    log.debug(f'locals(): { {k:v for k, v in locals().items() if k!="signals_df"} }')
    values = signals_df.iloc[n].to_dict(orient='index').values()
    
    if not override_tp:
        override_tp = []
    if not override_be:
        override_be = -1
    
    ret = []
    for i, sig in enumerate(values):
        
        log.debug(f'manipulating signal n.{i}: {sig}')
        
        # since the tp and be are modified before the sl is modified,
        # the resulting R will be enhanced. E.g.: override_tp=1,
        # sl_coefficient=0.9, resulting_tp=(1 / 0.9)= 1.11 > 1
        sl_delta = sl_coefficient * abs(sig['entry']-sig['sl'])
        be_delta = override_be * abs(sig['entry']-sig['sl'])
        
        tps = []
        for tp in override_tp:
            if sig['side'] == 'buy':
                tps.append(round(sig['entry'] + tp*abs(sig['entry']-sig['sl']), 5))
            else: tps.append(round(sig['entry'] - tp*abs(sig['entry']-sig['sl']), 5))
        
        if sig['side'] == 'buy':
            sl = sig['entry'] - sl_delta
            be = sig['entry'] + be_delta
        else: 
            sl = sig['entry'] + sl_delta
            be = sig['entry'] - be_delta

        sig['sl'] = sl
        sig['tp'] = tps if tps else sig['tp']
        
        if override_be > 0:
            sig['be'] = be
        
        log.debug(f'\result signal n.{i}: tp={sig["tp"]} sl={sig["sl"]} '\
                  f'be={sig["be"] if "be" in sig else "none"}')
        
        ret.append(sig)

    return ret

## Date Manipulation

### localized_date_to_mt5()

In [7]:
def localized_date_to_mt5(date):
    '''Converts a non-naive date to the format expected by mt5'''
    
    log = logging.getLogger('date_manipulation')
    
    # timezones
    nytz = pytz.timezone('US/Eastern')
    gmt2 = pytz.timezone('Etc/GMT-2')
    gmt3 = pytz.timezone('Etc/GMT-3')
    
    # convert to nytz
    nydate = date.astimezone(nytz)

    log.debug(f'gmt2: {date.astimezone(gmt2)}')
    log.debug(f'gmt3: {date.astimezone(gmt3)}')
    log.debug(f'dst: {bool(nydate.dst())}')
    
    # if date is a Timestamp object, convert to datetime
    if isinstance(date, pd.Timestamp):
        date = date.to_pydatetime()

    # check if it's dst, if it is, assign gmt3
    if bool(nydate.dst()):
        return date.astimezone(gmt3).replace(tzinfo=pytz.UTC)
    else: return date.astimezone(gmt2).replace(tzinfo=pytz.UTC)

### mt5_date_to_utc()

In [118]:
def mt5_date_to_utc(data):
    '''
    Corrects the messy mt5 date policy: takes a naive datetime obj
    as input and localizes it based on whether NY is in DST or not
    '''
    # we use New York timezone as MT5 changes their timezones
    # according to the NYSE. We need to detect NY DST changes
    nytz = pytz.timezone('US/Eastern')
    
    # this is REALLY counterintuitive (would expect GMT+2/+3) but here's the explanation
    # https://stackoverflow.com/questions/54842491/printing-datetime-as-pytz-timezoneetc-gmt-5-yields-incorrect-result
    gmt2 = pytz.timezone('Etc/GMT-2')
    gmt3 = pytz.timezone('Etc/GMT-3')
    
    try:
        
        date = data
        
        # localize mt5 date to gmt2 (might be wrong, we don't know yet)
        gmt2date = gmt2.localize(date)
        gmt3date = gmt3.localize(date)
        #log.debug(f'gmt2date: {gmt2date}')
        #log.debug(f'gmt3date: {gmt3date}')
        
        # turn it into ny time
        nydate = gmt2date.astimezone(nytz)
        #log.debug(f'dst: {bool(nydate.dst())}')
        
        # check if it's dst, if it is, gmt2 is wrong, return utc
        if bool(nydate.dst()):
            return gmt3date.astimezone(pytz.utc)
        
        # if it's not, localize to gmt2 and then return utc
        else: return gmt2date.astimezone(pytz.utc)
        
    except AttributeError:
        
        # iterates over bar/ticks and adjusts the dates
        df = pd.DataFrame(data)
        # if df is empty, return it
        if df.shape[0] == 0:
            return df
        # convert time in seconds/ms into the datetime format
        df['time'] = pd.to_datetime(df['time'], unit='s')
        if 'time_msc' in df.columns:
            df['time'] = pd.to_datetime(df['time_msc'], unit='ms')
        # applies itself to column
        df['time'] = df['time'].map(lambda d: mt5_date_to_utc(d))
        
        return df

In [9]:
logging.getLogger('date_manipulation').setLevel(logging.DEBUG)
date = datetime(2021, 1, 21, 22, 50)
from_me_to_mt5 = pytz.timezone('Europe/Rome').localize(date)
result1 = localized_date_to_mt5(from_me_to_mt5)
from_mt5_to_me = mt5_date_to_utc(date)
print(f'from_me_to_mt5: {from_me_to_mt5} -> {result1}')
print(f'from_mt5_to_utc: {date} -> {from_mt5_to_me}')

from_me_to_mt5: 2021-01-21 22:50:00+01:00 -> 2021-01-21 23:50:00+00:00
from_mt5_to_utc: 2021-01-21 22:50:00 -> 2021-01-21 20:50:00+00:00


## Backtester Engine

### Target()

In [10]:
class Target():
    '''This object is just a struct that we'll use to mantain consistency between the different requests'''
    
    def __init__(self,
                 signal: dict, 
                 category: str,
                 side : str,
                 level: float):
        
        self.time = signal['time']
        self.ticker = signal['ticker']
        self.sl = signal['sl']
        self.entry = signal['entry']
        self.tp = signal['tp']
        self.be = signal['be'] if 'be' in signal else None
        self.category = category
        self.side = side
        self.level = level
        self.date_set = None
        self.activated = None
        
        if 'activated' in signal:
            self.activated = signal['activated']
    
    def __str__(self):
        return str(self.__class__) + ": " + str(self.__dict__)
    
    def set_date(self, date: datetime):
        self.date_set = date
        
    def date(self):
        
        if self.date_set:
            return self.date_set
        
        # this is very important as it makes sure that if the entry time
        # has been found, all of the other methods (mainly find_hit) will use
        # the trade activation time as to check for validity, rather than
        # the signal time
        if self.activated:
            return self.activated
        
        return self.time

### copy_rates()

In [11]:
def copy_rates(symbol: str,
               timeframe=mt5.TIMEFRAME.H1,
               datetime_from: datetime = None,
               datetime_to: datetime = None,
               start_pos: int = None,
               count: int = None, 
               include_start = True,
               include_end = True) -> pd.DataFrame:
    
    '''Generic function to use keywords to automatically call the correct copy rates function depending on the
    keyword args passed in.
    :param symbol: Financial instrument name, for example, "EURUSD".
    :param timeframe: Timeframe the bars are requested for. Set by a value from the TIMEFRAME enumeration.
    :param datetime_from: Date of opening of the first bar from the requested sample. Set by the 'datetime'
        object or as a number of seconds elapsed since 1970.01.01.
    :param datetime_to: Date, up to which the bars are requested. Set by the 'datetime' object or as a number
        of seconds elapsed since 1970.01.01. Bars with the open time <= date_to are returned.
    :param start_pos: Initial index of the bar the data are requested from. The numbering of bars goes from
        present to past. Thus, the zero bar means the current one.
    :param count: Number of bars to receive.
    :include_start: Makes the result include the candle that cointain datetime_from
    :include_end: Makes the result include the candle that cointain datetime_to
    :return: Returns the dataframe generated by mt5_date_to_utc(), which is just the datetime adjusted 
        dataframed based on the nparray that is given by the original copy_rates functions. 
        Returns None in case of an error. The info on the error can be obtained using last_error().'''

    cols = ['time', 'open', 'high', 'low', 'close']
    cols_ticks = ['time', 'bid', 'ask']
    
    try:
        for i in range(20):
            
            if datetime_from is not None:    
                
                # adjusts the date to feed to mt5
                datetime_from_adj = localized_date_to_mt5(datetime_from).timestamp()

                if count is not None:
                    
                    if timeframe == 'ticks':
                        rates = mt5.copy_ticks_from(
                            symbol, datetime_from_adj, count, mt5.COPY_TICKS_INFO)
                    
                    else: rates = mt5.copy_rates_from(
                        symbol, timeframe, datetime_from_adj, count)

                elif datetime_to is not None:
                    
                    if include_start == True:
                        datetime_from -= timedelta(
                            seconds = mt5.period_seconds(timeframe) if timeframe != 'ticks' else 0)
                    
                    if include_end == False: # default behaviour is inclusive
                        datetime_to -= timedelta(
                            seconds = mt5.period_seconds(timeframe) if timeframe != 'ticks' else 0)
                        
                    datetime_from_adj = localized_date_to_mt5(datetime_from).timestamp()
                    
                    # adjusts the date to feed to mt5
                    datetime_to_adj = localized_date_to_mt5(datetime_to).timestamp()

                    if timeframe == 'ticks':
                        rates = mt5.copy_ticks_range(
                            symbol, datetime_from_adj, datetime_to_adj, mt5.COPY_TICKS_INFO)
                    
                    else: rates = mt5.copy_rates_range(
                        symbol, timeframe, datetime_from_adj, datetime_to_adj)
            
            if rates is None:
            # there was an error, but pass for now
                continue
            
            if len(rates) > 0:
                break
            time.sleep(0.1)

        if rates is None:
        # there was an error
            raise ValueError(f'copy_rates() has returned None, mt5.last_error() = {mt5.last_error()}')
            
        candles = len(rates) if timeframe != 'ticks' else 0
        ticks = len(rates) if timeframe == 'ticks' else 0
        
        # adjusts results and outputs
        return ( mt5_date_to_utc(rates)[cols if timeframe != 'ticks' else cols_ticks], 
                 {'candles':candles, 'ticks':ticks} )

        # only count is specified
        if all(x is None for x in [datetime_from, datetime_to, start_pos]):
            start_pos = 0

        for i in range(20):
            
            if timeframe == 'ticks':
                rates = [] # TODO
                log.error('bare (nothing specified) copy_rates with timeframe "ticks" hasn\'t been implemented yet')
                break
            else: rates = mt5.copy_rates_from_pos(symbol, timeframe, start_pos, count)
        
            if rates is None:
            # there was an error, but pass for now
                continue

            if len(rates) > 0:
                break
            time.sleep(0.1)

        if rates is None:
        # there was an error
            raise ValueError(f'copy_rates() has returned None, mt5.last_error() = {mt5.last_error()}')
        
        candles = len(rates) if timeframe != 'ticks' else 0
        ticks = len(rates) if timeframe == 'ticks' else 0
                
        # adjusts results and outputs
        return ( mt5_date_to_utc(rates)[cols if timeframe != 'ticks' else cols_ticks], 
                 {'candles':candles, 'ticks':ticks} )

    
    
    except SystemError:
        return None

**Test in order to check whether copy_rates is behaving as supposed**

In [12]:
loc = lambda date: pytz.utc.localize(date) 

check_time  = [loc(datetime(2021, 10, 21, 11)),
               loc(datetime(2021, 9, 23, 11)),
               loc(datetime(2021, 5, 10, 6)),
               loc(datetime(2021, 1, 26, 15)),
               loc(datetime(2020, 10, 6, 15)),
               loc(datetime(2022, 2, 3, 12))]

check_open  = [157.382, 150.303, 152.826, 142.170, 136.993, 155.775]
check_high  = [157.721, 150.960, 153.315, 142.456, 137.054, 156.331]
check_low   = [157.310, 150.154, 152.816, 142.135, 136.705, 155.538]
check_close = [157.688, 150.940, 153.264, 142.429, 136.740, 156.180]

H1 = mt5.TIMEFRAME.H1
H4 = mt5.TIMEFRAME.H4
M5 = mt5.TIMEFRAME.M5

tf = [H1, H1, H1, H1, H1, M5]
dst = [1, 1, 1, 0, 1, 0]

frame = pd.DataFrame({'time': check_time,
                      'open': check_open,
                      'high': check_high,
                      'low': check_low,
                      'close': check_close,
                      'timeframe': tf,
                      'dst': dst})
frame

Unnamed: 0,time,open,high,low,close,timeframe,dst
0,2021-10-21 11:00:00+00:00,157.382,157.721,157.31,157.688,16385,1
1,2021-09-23 11:00:00+00:00,150.303,150.96,150.154,150.94,16385,1
2,2021-05-10 06:00:00+00:00,152.826,153.315,152.816,153.264,16385,1
3,2021-01-26 15:00:00+00:00,142.17,142.456,142.135,142.429,16385,0
4,2020-10-06 15:00:00+00:00,136.993,137.054,136.705,136.74,16385,1
5,2022-02-03 12:00:00+00:00,155.775,156.331,155.538,156.18,5,0


In [13]:
with conn:
    result = []
    for row in frame.itertuples():
        # count (backwards) THIS IS CORRECT
        # datetime_to THIS IS ALSO CORRECT
        # TICKS
        r, tracker = copy_rates('GBPJPY', mt5.TIMEFRAME.M1, datetime_from=row.time, 
                                datetime_to=row.time+timedelta(seconds=mt5.period_seconds(row.timeframe)),
                                include_start=False, include_end=False)
        
        print(tracker)
        '''
        r = mt5.copy_ticks_range('GBPJPY', datetime_from=row.time.timestamp(), 
                                 datetime_to=(
                                     row.time+timedelta(
                                         seconds=mt5.period_seconds(
                                             row.timeframe))).timestamp(), flags=mt5.COPY_TICKS_INFO)
        
        r = pd.DataFrame(r)'''
        f = {}
        try:
            f['time']  = [r['time'].iloc[0]]
            f['open']  = [r['open'].iloc[0]]
            f['high']  = [r['high'].max()]
            f['low']   = [r['low'].min()]
            f['close'] = [r['close'].iloc[-1]]
            f['dst'] = bool(row.dst)
            f['var'] = ( (f['open'][0]-row.open) +
                         (f['high'][0]-row.high) +
                         (f['low'][0]-row.low)   +
                         (f['close'][0]-row.close) ) * 100 / 4
            
            result.append(pd.DataFrame(f))
        except IndexError: # there no tick data yet
            pass

pd.concat(result)

{'candles': 60, 'ticks': 0}


{'candles': 60, 'ticks': 0}


{'candles': 60, 'ticks': 0}


{'candles': 60, 'ticks': 0}


{'candles': 60, 'ticks': 0}


{'candles': 5, 'ticks': 0}


Unnamed: 0,time,open,high,low,close,dst,var
0,2021-10-21 11:00:00+00:00,157.384,157.719,157.308,157.686,True,-0.1
0,2021-09-23 11:00:00+00:00,150.291,150.959,150.16,150.936,True,-0.275
0,2021-05-10 06:00:00+00:00,152.826,153.315,152.816,153.26,True,-0.1
0,2021-01-26 15:00:00+00:00,142.17,142.456,142.131,142.428,False,-0.125
0,2020-10-06 15:00:00+00:00,136.993,137.054,136.705,136.74,True,0.0
0,2022-02-03 12:00:00+00:00,155.766,156.327,155.552,156.184,False,0.125


### Rates()

In [14]:
class Rates():
    '''This object is used to pass around rates yielded from copy_rates()'''
    
    def __init__(self, symbol: str, blueberry: bool = False):
        self.history = {}
        self.loaded_candles = 0
        self.loaded_ticks = 0
        self.symbol = symbol
        if blueberry:
            if symbol not in ['BTCUSD', 'XAUUSD']:
                self.symbol += '.i' # blueberry
    
    def get(self,
            timeframe = mt5.TIMEFRAME.H1,
            datetime_from: datetime = None,
            datetime_to: datetime = None,
            start_pos: int = None,
            count: int = None, 
            include_start = True,
            include_end = True,
            og = None) -> pd.DataFrame:
        
        self.last_timeframe = timeframe
        self.last_datetime_from = datetime_from
        self.last_datetime_to = datetime_from
        self.last_start_pos = start_pos
        self.last_count = count
        
        rates, tracker = copy_rates(self.symbol,
                                    timeframe=timeframe,
                                    datetime_from=datetime_from,
                                    datetime_to=datetime_to,
                                    start_pos=start_pos,
                                    count=count, 
                                    include_start=include_start,
                                    include_end=include_end)
        
        self.loaded_candles += tracker['candles']
        self.loaded_ticks += tracker['ticks']
        
        if og: self.og_view = rates
        self.current_view = rates
        
        return rates

### has_candle_hit()

In [15]:
def has_candle_hit(target: Target, h: float, l: float, spread: float = 0.0002):
    '''
    Determines whether a candle has hit a level. You need high and low 
    because those are the extremes of the range. You also need to now
    what kind of order it is and which side (buy, sell) you're acting on.
    Supported categories are: limit, stop
    Spread works by multiplying the 'spread' times the last level value
    '''
    
    cat = target.category
    side = target.side
    level = target.level
    spread = spread*level
        
    if cat == 'limit':
            
        # no spread on limits, it will be checked
        # by the ticks check anyway
        
        if side == 'buy':
            if l+spread*0 <= level:
                return True
        else: 
            if h-spread*0 >= level:
                return True
    
    elif cat == 'stop':
        
        # on stops we need spreads in order
        # to make sure that sl aren't actually hit
        
        if side == 'buy':
            if h+spread >= level:
                return True
        else: 
            if l-spread <= level:
                return True
    
    return False

### find_hit()

In [16]:
def find_hit(target: Target,
             rates: Rates,
             timeframe = 'ticks'):
    '''
    This function iterates over the view until it finds the candle that fills all of the criterias
    If timeframe='ticks', it gets down to the ticks to find the true ask/bid values of the hit
    Returns None if it doesn't find one or corresponding the pd.Series row if it does
    '''
    
    log = logging.getLogger('find_hit')
    debug = True if log.level <= logging.DEBUG else False
    
    og_view = rates.og_view
    og_view['hit'] = og_view.apply(lambda x: has_candle_hit(target, x['high'], x['low']), axis=1)
    hits = og_view[og_view['hit']] # this are the candles where there is a hit
    hits = hits[hits['time'] > (target.date() - timedelta(minutes=int(FIRST_PARSE_TF)))]
    
    log.debug(f'L1 ({int(FIRST_PARSE_TF)}m)')
    if debug: display(og_view)
    
    # iterating over true_rates
    for i, row in enumerate(hits.itertuples()):
        
        t_from = row.time # it's a namedtuple
        
        # gets view
        view = rates.get(timeframe=SECOND_PARSE_TF,
                         datetime_from=t_from,
                         datetime_to=t_from+timedelta(seconds=mt5.period_seconds(FIRST_PARSE_TF)),
                         include_start=False)
            
        # finds which of the smaller candles has a hit on it
        view['hit'] = view.apply(lambda x: has_candle_hit(target, x['high'], x['low']), axis=1)
        hits2 = view[view['hit']]
        hits2 = hits2[hits2['time'] > (target.date()-timedelta(seconds=mt5.period_seconds(SECOND_PARSE_TF)))]
        
        log.debug(f'L2 ({int(SECOND_PARSE_TF)}m): candle({i}) ')
        if debug: display(view)
        
        # ITERATES OVER THE SMALLER CANDLES
        for j, row2 in enumerate(hits2.itertuples()):
         
            if timeframe == 'ticks':
                
                ticks = rates.get(timeframe='ticks',
                                  datetime_from=row2.time,
                                  datetime_to=row2.time+timedelta(seconds=mt5.period_seconds(SECOND_PARSE_TF)))
                
                time_mask = (ticks['time']>=(target.date()+timedelta(seconds=1))) # 1 second to operate

                
                try:
                    # behaviour is different depending target category
                    if target.category == 'limit':
                        if target.side == 'buy':
                            hit_tick = ticks[time_mask & (ticks['ask']<=target.level)].iloc[0]
                        else: hit_tick = ticks[time_mask & (ticks['bid']>=target.level)].iloc[0]

                    elif target.category == 'stop':
                        if target.side == 'buy':
                            hit_tick = ticks[time_mask & (ticks['ask']>=target.level)].iloc[0]
                        else: hit_tick = ticks[time_mask & (ticks['bid']<=target.level)].iloc[0]

                except IndexError:
                    log.debug(f'L3 (ticks): candle({i};{j}) no-hits')
                    continue
                    
                log.debug(f'L3 (ticks): candle({i};{j})')
                if debug: 
                    i_hit = ticks.index.get_loc(hit_tick.name)
                    display(ticks[time_mask], ticks.iloc[i_hit-1:i_hit+2])
                
                return hit_tick


            else: # timeframe is not 'ticks', i.e. mt5.TIMEFRAME.M1

                if mt5.period_seconds(timeframe) < mt5.period_seconds(SECOND_PARSE_TF):

                    view = rates.get(timeframe=timeframe,
                                     datetime_from=row2.time,
                                     datetime_to=row2.time,
                                     include_start=False)
                    

            # finds which of the 'timeframe' candle has a hit on it
            view['hit'] = view.apply(lambda x: has_candle_hit(target, x['high'], x['low']), axis=1)

            try: 
                hits = view[view['hit']]
                hits = hits[hits['time']>target.date()-timedelta(seconds=mt5.period_seconds(timeframe))]
                candle = hits.iloc[0]
            # theres no candle after signal['date'] where the order could've been filled
            except IndexError:
                continue                
                
            return candle

    return None

### determine_R()

In [17]:
def determine_R(signal: dict, rates, before: datetime = None, mode: str = 'max', recursion=True):
    '''Determines max or min R before datetime specified'''
    
    view = rates.current_view
    if recursion:
        view = rates.og_view
    
    # rates must be before the time specified and after the entry else return None
    must_be_active = ( view['time'] > 
                       signal['activated']-timedelta(minutes=FIRST_PARSE_TF if recursion else 1) )
    
    if before is not None:
        view = view[(view['time']<before) & (must_be_active)]
    else: view = view[must_be_active]
    
    if view.shape[0] == 0: # tp was before the entry
        return None
    
    sl = abs(signal['true_entry']-signal['sl'])

    if signal['side'] == 'buy':
        if mode == 'max':
            view.loc[:, 'running_R'] = view.apply(lambda x: (x['high']-signal['true_entry'])/sl, axis=1)
        else: view.loc[:, 'running_R'] = view.apply(lambda x: (x['low']-signal['true_entry'])/sl, axis=1)
    else: 
        if mode == 'max':
            view.loc[:, 'running_R'] = view.apply(lambda x: (signal['true_entry']-x['low'])/sl, axis=1)
        else: view.loc[:, 'running_R'] = view.apply(lambda x: (signal['true_entry']-x['high'])/sl, axis=1)

    # resets the index so that it can be easily compared to length
    view = view.reset_index()
    
    max_condition = ( (view['running_R'].idxmax() == view.shape[0]-1) or
                      (view['running_R'].idxmax() == 0) )
    
    min_condition = ( (view['running_R'].idxmin() == view.shape[0]-1) or
                      (view['running_R'].idxmin() == 0) )
    
    # if it's the first or the last candle it zooms down to the
    # one minute to find the more trustworthy value
    if ( ((mode == 'max' and max_condition) or
          (mode == 'min' and min_condition)) and 
        
          (int(FIRST_PARSE_TF) != 1) and recursion ):
    
        t_from = ( view.loc[view['running_R'].idxmax(), 'time'] if mode=='max' else
                   view.loc[view['running_R'].idxmin(), 'time'] )
        
        t_to = t_from + timedelta(minutes=int(FIRST_PARSE_TF))

        rates.get(timeframe=mt5.TIMEFRAME.M1,
                  datetime_from=t_from,
                  datetime_to=t_to)
        
        return determine_R(signal, rates, before, mode=mode, recursion=False) # recursive
    
    return view['running_R'].max() if mode=='max' else view['running_R'].min()

### before()

In [18]:
def before(event, than: list, df):
    
    r = pd.notna(df[event])
    for t in than:
        if t not in df.columns:
            continue
        r = r & (pd.isnull(df[t]) | (df[event] < df[t]))
    return r

### backtest()

In [27]:
def backtest(signals_df,
             n=slice(0,1e9),
             override_tp=[],
             partials_array=[1],
             override_be=-100,
             sl_coefficient=1,
             end_of_period='day',
             end_of_day='18:30',
             precision='ticks'):

    '''
    The backtesting main routine. For each signal, this algorithm will first determine whether
    it's a market execution or a limit. If it's a limit, it will fetch the 30m candles up until
    5:30PM Europe/Rome time and see if it activated. If it did, it will go on the same way as for
    the market execution type: it will fetch the 5 minutes candles up until, again, 5:30PM Europe/Rome
    time and check for 1) when was sl hit, 2) when was tp hit, 3) the max R gained by the trade before
    closing. When it finds a hit, it will zoom in on the 1 minute candle where it happened and find the
    exact minute it did for increased accuracy (it could even go to ticks but we don't need that much
    detail). Returns a dataframe containing info on the time the signal was sent, the time it activated
    and all of the other three things it checked out.
    '''
    
    log = logging.getLogger('backtest')
    debug = True if log.level <= logging.DEBUG else False
    log.debug(f'locals(): { {k:v for k, v in locals().items() if k!="signals_df"} }')
    
    signals = get_signals(signals_df,
                          n=n, # filters n rows
                          override_tp=override_tp,
                          override_be=override_be,
                          sl_coefficient=sl_coefficient)
    
    with Progress("[i][progress.description]{task.description}",
                  SpinnerColumn(spinner_name='betaWave', finished_text="✓", style="black"),
                  BarColumn(complete_style='cyan'),
                  "{task.completed} of {task.total}", "•",
                  "c: {task.fields[counter][candles]:,.0f}", "•",
                  "t: [b]{task.fields[counter][ticks]:,.0f}", "•",
                  "[progress.percentage]{task.percentage:>3.0f}%", "•",
                  TimeRemainingColumn(), expand=False, disable=debug) as progress:
        
        counter = {'candles': 0, 'ticks': 0}
        main_task = progress.add_task('Analysing',
                                      total=len(signals),
                                      counter=counter,
                                      name='main_task')
        
        symbol_info = {}
        for n, signal in enumerate(signals):
            
            log.debug(f'======== SIGNAL n. {n} ========')
            
            # updates progress bar
            progress.update(main_task,
                            advance=1,
                            counter=counter,
                            refresh=True)
            
            # time of signal
            t_from = signal['time']
            # end of day
            hour_end, minute_end = end_of_day.split(':')
            day = signal['time'].replace(hour=int(hour_end), minute=int(minute_end)-1)
            # end of week
            
            start_of_week = day - timedelta(days=day.weekday())
            week = start_of_week + timedelta(days=4) # that week's friday
            # set end of period that the backtester will look at 
            eop = locals()[end_of_period]
            
            # signal was too late, skip it
            if t_from >= eop:
                log.debug(f's::{n} signal was too late, skip it')
                continue
                
            # get the point value for this symbol
            if signal['ticker'] not in symbol_info:
                log.debug(f's::{n} signal["ticker"]={signal["ticker"]} not in symbol_info')
                info = mt5.symbol_info(signal['ticker'])
                symbol_info[signal['ticker']] = info
            
            # adds tick value to final dict
            signals[n]['tick_size'] = symbol_info[signal['ticker']].trade_tick_size
            signals[n]['tick_digits'] = int(math.log10(signals[n]['tick_size']))
            
            rates = Rates(signal['ticker'], blueberry=False)
            og_rates = rates.get(timeframe=FIRST_PARSE_TF,
                                 datetime_from=t_from,
                                 datetime_to=eop, og=True)
                                
            ##### ENTRY #####
            entrytrgt = Target(signals[n], 'limit', signal['side'], signal['entry'])
            assert entrytrgt.level == signals[n]['entry']
              
            log.debug(f'Looking for entry, date={entrytrgt.date()} level={entrytrgt.level} '\
                      f'side={entrytrgt.side} mode={entrytrgt.category}')
            
            if signal['flag'] == 'limit': # limit order
              
                entry_hit = find_hit(entrytrgt, rates, timeframe=precision)
                # if no hits were found, skip ahead
                if entry_hit is None:
                    continue

                signals[n]['activated'] = entry_hit['time']
                # limit orders are always filled at the limit price, but since the signal can be sent
                # when the price is already better, it could be executed at market 
                
                if 'ask' in entry_hit: # precision=ticks
                    signals[n]['true_entry'] = ( entry_hit['ask'] if entrytrgt.side == 'buy' 
                                                                  else entry_hit['bid'] )
                else: 
                    signals[n]['true_entry'] = round(
                        entry_hit['low'] + (entry_hit['high'] - entry_hit['low']) / 2, 
                        signals[n]['tick_digits'])
                    
                
            elif signal['flag'] in ['instant', 'market']: # market order

                # get first ticks after signal['date']
                entry_hit = rates.get(timeframe=precision,
                                      datetime_from=signal['date'],
                                      count=1).iloc[0]
                
                signals[n]['activated'] = entry_hit['time']
                
                if 'ask' in entry_hit: # precision=ticks
                    signals[n]['true_entry'] = ( entry_hit['ask'] if entrytrgt.side == 'buy' 
                                                                  else entry_hit['bid'] )
                else: 
                    signals[n]['true_entry'] = round(
                        entry_hit['low'] + (entry_hit['high'] - entry_hit['low']) / 2, 
                        signals[n]['tick_digits'])
                
            log.debug(f'Entry found @{signals[n]["true_entry"]} on {signals[n]["activated"]}') 
           
            # there was one case
            if signals[n]['true_entry'] == signal['sl']:
                log.debug('true_entry==signal["sl"], for some reason')
                continue
                    
            # this makes it so that all of the next find_hit() will
            # start from when the trade has actually been activated
            
            
            ##### SL #####
            # look for sl hit, reusing the same frame
            opp_side = 'sell' if signal['side'] == 'buy' else 'buy'
            sltrgt = Target(signals[n], 'stop', opp_side, signal['sl'])
            
            log.debug(f'Looking for SL, date={sltrgt.date()} level={sltrgt.level} '\
                      f'side={sltrgt.side} mode={sltrgt.category}')           
            sl_hit = find_hit(sltrgt, rates, timeframe=precision)

            if sl_hit is not None:
                signals[n]['sl_hit'] = sl_hit['time']
                
                if 'ask' in sl_hit: # precision=ticks
                    signals[n]['true_sl'] = ( sl_hit['ask'] if sltrgt.side == 'buy' 
                                                                  else sl_hit['bid'] )
                else: 
                    signals[n]['true_sl'] = round(
                        sl_hit['low'] + (sl_hit['high'] - sl_hit['low']) / 2, 
                        signals[n]['tick_digits'])
                    
                log.debug(f'SL found @{signals[n]["true_sl"]} on {signals[n]["sl_hit"]}') 
           

            ##### BE #####
            be_set = None
            be_hit = None # otherwise next loop will use last one
            
            if 'be' in signal:
                
                # look for be_set and be_hit, reusing the same frame
                betrgt = Target(signals[n], 'limit', opp_side, signal['be'])
            
                log.debug(f'Looking for BE set, date={betrgt.date()} level={betrgt.level} '\
                      f'side={betrgt.side} mode={betrgt.category}') 
                be_set = find_hit(betrgt, rates, timeframe=precision)

                if be_set is not None:
                    # assigns be_hit to signal
                    signals[n]['be_set'] = be_set['time']
                    
                    if 'ask' in be_set: # precision=ticks
                        signals[n]['true_be_set'] = ( be_set['ask'] if betrgt.side == 'buy' 
                                                                    else be_set['bid'] )
                    else: 
                        signals[n]['true_be_set'] = round(
                            be_set['low'] + (be_set['high'] - be_set['low']) / 2, 
                            signals[n]['tick_digits'])
                    
                   
                    log.debug(f'BE (set) found @{signals[n]["true_be_set"]} on {signals[n]["be_set"]}') 

                    betrgt.set_date(be_set['time']) # updates the date of be target
                    betrgt.level = signals[n]['true_entry']
                    betrgt.category = 'stop' # ofc
                    
                    log.debug(f'Looking for BE hit, date={betrgt.date()} level={betrgt.level} '\
                      f'side={betrgt.side} mode={betrgt.category}') 
                    be_hit = find_hit(betrgt, rates, timeframe=precision)
                    
                    if be_hit is not None:
                        signals[n]['be_hit'] = be_hit['time']
                        
                        if 'ask' in be_hit: # precision=ticks
                            signals[n]['true_be_hit'] = ( be_hit['ask'] if betrgt.side == 'buy' 
                                                                          else be_hit['bid'] )
                        else: 
                            signals[n]['true_be_hit'] = round(
                                be_hit['low'] + (be_hit['high'] - be_hit['low']) / 2, 
                                signals[n]['tick_digits'])
                    
                       
                        log.debug(f'BE (hit) found @{signals[n]["true_be_hit"]} on {signals[n]["be_hit"]}')


            ##### TP #####
            signals[n]['tp_hit'] = []
            signals[n]['true_tp'] = []
            signals[n]['minR'] = []
            
            # look for tp hit, reusing the same frame
            for i, tp in enumerate(signal['tp']):
                
               # check if it's a reasonable multiple of entry
                if not (signals[n]['true_entry']*0.975 < tp < 1.025*signals[n]['true_entry']):
                    removed = signals[n]['tp'].pop(i)
                    assert removed == tp
                    log.debug(f'tp{i}={tp} has been removed due to being unreasonable')
                    continue
                
                tptrgt = Target(signals[n], 'limit', opp_side, tp)
                
                log.debug(f'Looking for TP{i}, date={tptrgt.date()} level={tptrgt.level} '\
                      f'side={tptrgt.side} mode={tptrgt.category}') 
                tp_hit = find_hit(tptrgt, rates, timeframe=precision)
                
                if tp_hit is None:
                    for attr in ('minR', 'tp_hit', 'true_tp'):
                        signals[n][attr].append(None)
                else:
                    minR = determine_R(signals[n], rates, before=tp_hit['time'], mode='min')
                    signals[n]['minR'].append(minR)
                    signals[n]['tp_hit'].append(tp_hit['time'])
                    
                    if 'ask' in tp_hit: # precision=ticks
                        signals[n]['true_tp'].append( tp_hit['ask'] if tptrgt.side == 'buy' 
                                                                else tp_hit['bid'] )
                    else: 
                        signals[n]['true_tp'].append(round(
                            tp_hit['low'] + (tp_hit['high'] - tp_hit['low']) / 2, 
                            signals[n]['tick_digits']))
                    
   
                    log.debug(f'TP{i} found @{signals[n][f"true_tp"][i]} on {signals[n][f"tp_hit"][i]}')
                    
         
            ##### R_MAX #####
            # look for maxR (before sl is hit) using the same frame
            maxR = determine_R(signals[n], 
                               rates, 
                               before=sl_hit['time'] if sl_hit is not None else None,
                               mode='max')

            signals[n]['maxR'] = maxR

                
            ##### R_EOP #####
            # get R at the end of period
            if signal['side'] == 'buy':
                eopR = ( (og_rates.loc[og_rates.index[-1], 'close'] - signals[n]['true_entry']) /
                          abs(signals[n]['true_entry'] - signal['sl']) )
            else:
                eopR = ( (signals[n]['true_entry'] - og_rates.loc[og_rates.index[-1], 'close']) /
                          abs(signals[n]['true_entry'] - signal['sl']) )
            
            signals[n]['eopR'] = eopR
            
            
            # ##### RESULTS #####
            sl_delta = abs(signals[n]['true_entry']-signals[n]['sl'])
            tp_hit = signals[n]['tp_hit']
            sl_hit = sl_hit['time'] if sl_hit is not None else None
            be_hit = be_hit['time'] if be_hit is not None else None
            
            #trade close @SL (before tp0_hit and be_hit)
            if ( (sl_hit is not None) and 
                  all([sl_hit < x if x is not None else True for x in (tp_hit[0], be_hit)]) ):
                
                true_sl_delta = abs(signals[n]['true_entry'] - signals[n]['true_sl'])
                signals[n]['result_R'] = - true_sl_delta / sl_delta
                signals[n]['result_pips'] = - true_sl_delta / (signals[n]['tick_size']*10)
                signals[n]['closed_at'] = 'sl'
                
            #trade close @BE (before tp0_hit and sl_hit)
            elif ( all([x is not None for x in (be_set, be_hit)]) and
                   all([be_hit < x if x is not None else True for x in (tp_hit[0], sl_hit)]) ):
                
                true_be_delta = signals[n]['true_be_hit'] - signals[n]['true_entry']
                side_coeff = 1 if signal['side']=='buy' else -1
                signals[n]['result_R'] = (side_coeff*true_be_delta) / sl_delta
                signals[n]['result_pips'] = (side_coeff*true_be_delta) / (signals[n]['tick_size']*10)
                signals[n]['closed_at'] = 'be'
            
            # trade closed @TP
            partials_array += [0]*20
            result_R = 0
            result_pips = 0
            for i, tp in enumerate(tp_hit):
                
                if ( (tp is not None) and
                      all([tp < x if x is not None else True for x in (be_hit, sl_hit)]) ):
                
                    true_tp_delta = signals[n]['true_tp'][i] - signals[n]['true_entry']
                    side_coeff = 1 if signal['side']=='buy' else -1
                    result_R += partials_array[i] * ((side_coeff*true_tp_delta) / sl_delta)
                    signals[n]['result_R'] = result_R
                    result_pips += partials_array[i] * ( (side_coeff*true_tp_delta) / 
                                                         (signals[n]['tick_size']*10) )
                    signals[n]['result_pips'] = result_pips
                    signals[n]['closed_at'] = f'tp{i}'

                
            # update tracker & progress bar
            signals[n]['loaded_candles'] = rates.loaded_candles
            signals[n]['loaded_ticks'] = rates.loaded_ticks
            counter['candles'] += rates.loaded_candles
            counter['ticks'] += rates.loaded_ticks
            
        log.info('Analysis completed')
        return pd.DataFrame(signals)

## Backtester Engine v2

### Constants

In [210]:
class SIDE(enum.IntEnum):
    BUY = 1
    SELL = -1
    
class TYPE(enum.IntEnum):
    LIMIT = 0
    STOP = 1
    MARKET = 2
    SLTP = 3
    
class STATUS(enum.IntEnum):
    PENDING = 0
    EXECUTED = 1

class TIMEFRAME(enum.IntEnum):
    
    TICKS = 0
    M1 = 1
    M2 = 2
    M3 = 3
    M4 = 4
    M5 = 5
    M6 = 6
    M10 = 10
    M12 = 12
    M15 = 15
    M20 = 20
    M30 = 30
    H1 = 60
    H2 = 60*2
    H3 = 60*3
    H4 = 60*4
    H6 = 60*6
    H8 = 60*8
    H12 = 60*12
    D1 = 60*24
    W1 = 60*24*7
    MN1 = 60*24*30
    
    # this is used in get_rates()
    def mt5(timeframe: Union[str, 'TIMEFRAME']):
        return getattr(mt5.TIMEFRAME, str(timeframe).split('.')[-1])
    
class FLAG(enum.IntEnum):
    MESSAGE = 0
    ORDER = 1
    PARSING_ERROR = 2
    PARTIALS = 3
    SLTP_UPDATE = 4
    CLOSE = 5

### Rates()

In [107]:
class Rates():
    
    def __init__(self,
                 symbol: str,
                 rates: pd.DataFrame,
                 timeframe: TIMEFRAME,
                 length: int):
        
        # df is already ok thx to mt5_date_to_utc()
        self.df = rates
        self.timeframe = timeframe
        self.timeframe_seconds = mt5.period_seconds(self.timeframe)
        self.length = length
        
    def __len__(self):
        return self.length
    
    def where_event_happened(self, event: Event):
        '''Returns the filtered dataframe based on the event passed'''
        if event.price:
            pass

### get_rates()

In [157]:
def get_rates(symbol: str,
              timeframe=mt5.TIMEFRAME.H1,
              from_: datetime = None,
              to_: datetime = None,
              start_pos: int = None,
              count: int = None, 
              include_first = True,
              include_last = True,
              n_tries: int = 10) -> Rates:
    
    log = logging.getLogger('get_rates')
    
    for i in range(n_tries):
        
        if from_:
            
            # adjusts the date to feed to mt5
            datetime_from_adj = localized_date_to_mt5(from_)
            
            if count: # number of candles to retrieve
                if timeframe == TIMEFRAME.TICKS:
                    rates = mt5.copy_ticks_from(symbol, datetime_from_adj,
                                                count, mt5.COPY_TICKS_INFO)
                else: 
                    rates = mt5.copy_rates_from(symbol, TIMEFRAME.mt5(timeframe),
                                                datetime_from_adj.timestamp(), count)
                    
            # we can only do this now as _from is inclusive & backwards    
            correction = timedelta(minutes = int(timeframe))
            if timeframe == TIMEFRAME.TICKS: correction = 0
            if include_first: datetime_from_adj -= correction
          
            if to_: # to_ with no from_ param is not supported for now

                # adjusts the date to feed to mt5
                datetime_to_adj = localized_date_to_mt5(to_)
                
                # default behaviour is inclusive
                if not include_last: datetime_to_adj -= correction

                if timeframe == TIMEFRAME.TICKS:
                    rates = mt5.copy_ticks_range(
                        symbol, datetime_from_adj.timestamp(),
                        datetime_to_adj.timestamp(), mt5.COPY_TICKS_INFO)
                else: 
                    rates = mt5.copy_rates_range(
                        symbol, TIMEFRAME.mt5(timeframe), datetime_from_adj.timestamp(),
                        datetime_to_adj.timestamp())
        
        # only count is specified
        if all(x is None for x in [from_, to_]):
            
            if start_pos is None:
                start_pos = 0
        
            # does both start_pos & count and only count
            if timeframe == TIMEFRAME.TICKS:
                log.warning('This method can\'t work with these parameters yet')
                rates = None
                break
            else:
                rates = mt5.copy_rates_from_pos(symbol, TIMEFRAME.mt5(timeframe),
                                                start_pos, count)

        # some error occurred
        if rates is not None:
            log.debug(f'mt5.copy_rates() returned None. {mt5.last_error()=}')
            continue

        if len(rates) > 0:
            break
        time.sleep(0.1)
    
    cols_ticks = ['time', 'bid', 'ask']
    cols = ['time', 'open', 'high', 'low', 'close']
    right_cols = [cols if timeframe != TIMEFRAME.TICKS else cols_ticks]
    
    if rates is not None and len(rates) > 0: log.debug(f'Successfully loaded {len(rates)=}')
    else: log.warning('no rates found, returning Rates obj with rates param equal to None')
    
    return Rates(symbol, mt5_date_to_utc(rates), timeframe, 
                 len(rates) if rates is not None else 0)

#### testing get_rates()

In [158]:
loc = lambda date: pytz.utc.localize(date) 

check_time  = [loc(datetime(2021, 10, 21, 11)),
               loc(datetime(2021, 9, 23, 11)),
               loc(datetime(2021, 5, 10, 6)),
               loc(datetime(2021, 1, 26, 15)),
               loc(datetime(2020, 10, 6, 15)),
               loc(datetime(2022, 2, 3, 12))]

check_open  = [157.382, 150.303, 152.826, 142.170, 136.993, 155.775]
check_high  = [157.721, 150.960, 153.315, 142.456, 137.054, 156.331]
check_low   = [157.310, 150.154, 152.816, 142.135, 136.705, 155.538]
check_close = [157.688, 150.940, 153.264, 142.429, 136.740, 156.180]

H1 = TIMEFRAME.H1
H4 = TIMEFRAME.H4
M5 = TIMEFRAME.M5

tf = [H1, H1, H1, H1, H1, M5]
dst = [1, 1, 1, 0, 1, 0]

frame = pd.DataFrame({'time': check_time,
                      'open': check_open,
                      'high': check_high,
                      'low': check_low,
                      'close': check_close,
                      'timeframe': tf,
                      'dst': dst})
frame

Unnamed: 0,time,open,high,low,close,timeframe,dst
0,2021-10-21 11:00:00+00:00,157.382,157.721,157.31,157.688,60,1
1,2021-09-23 11:00:00+00:00,150.303,150.96,150.154,150.94,60,1
2,2021-05-10 06:00:00+00:00,152.826,153.315,152.816,153.264,60,1
3,2021-01-26 15:00:00+00:00,142.17,142.456,142.135,142.429,60,0
4,2020-10-06 15:00:00+00:00,136.993,137.054,136.705,136.74,60,1
5,2022-02-03 12:00:00+00:00,155.775,156.331,155.538,156.18,5,0


In [159]:
with conn:
    result = []
    for row in frame.itertuples():
        # count (backwards) THIS IS CORRECT
        # datetime_to THIS IS ALSO CORRECT
        # TICKS
        r = get_rates('GBPJPY', TIMEFRAME.M1, from_=row.time, 
                      to_=row.time+timedelta(minutes=int(row.timeframe)),
                      include_first=True, include_last=False)
        
        '''
        r = mt5.copy_ticks_range('GBPJPY', datetime_from=row.time.timestamp(), 
                                 datetime_to=(
                                     row.time+timedelta(
                                         seconds=mt5.period_seconds(
                                             row.timeframe))).timestamp(), flags=mt5.COPY_TICKS_INFO)
        
        r = pd.DataFrame(r)'''
        r = r.df
        f = {}
        try:
            f['time']  = [r['time'].iloc[0]]
            f['open']  = [r['open'].iloc[0]]
            f['high']  = [r['high'].max()]
            f['low']   = [r['low'].min()]
            f['close'] = [r['close'].iloc[-1]]
            f['dst'] = bool(row.dst)
            f['var'] = ( (f['open'][0]-row.open) +
                         (f['high'][0]-row.high) +
                         (f['low'][0]-row.low)   +
                         (f['close'][0]-row.close) ) * 100 / 4
            
            result.append(pd.DataFrame(f))
        except IndexError: # there no tick data yet
            pass

pd.concat(result)

Unnamed: 0,time,open,high,low,close,dst,var
0,2021-10-21 10:59:00+00:00,157.37,157.719,157.308,157.686,True,-0.45
0,2021-09-23 10:59:00+00:00,150.297,150.959,150.16,150.936,True,-0.125
0,2021-05-10 05:59:00+00:00,152.826,153.315,152.816,153.26,True,-0.1
0,2021-01-26 14:59:00+00:00,142.148,142.456,142.131,142.428,False,-0.675
0,2020-10-06 14:59:00+00:00,136.981,137.054,136.705,136.74,True,-0.3
0,2022-02-03 11:59:00+00:00,155.788,156.327,155.552,156.184,False,0.675


### Backtest()

In [29]:
class Backtest():
    
    def __init__(self,
                 signals_df: pd.DataFrame,
                 n: slice, # 
                 override_tp=None,
                 override_be=None,
                 sl_coefficient=1,
                 partials_array=[1],
                 precision=mt5.TIMEFRAME.M1,
                 end_of_period='day',
                 end_of_day='18:30'):
        

IndentationError: expected an indented block (2450419085.py, line 13)

# Research

### Setup

In [20]:
msg = clean_json('results_daniel.json')
signals_all = get_signals_from_msgs(msg)

# only keep market orders and limits
signals_df = signals_all[((signals_all['flag'] == 'limit') | (signals_all['flag'] == 'market'))]

## Backtest

In [28]:
logging.getLogger('signals_manipulation').setLevel(logging.INFO)
logging.getLogger('date_manipulation').setLevel(logging.INFO)
logging.getLogger('find_hit').setLevel(logging.INFO)
logging.getLogger('backtest').setLevel(logging.INFO)

with conn:
    df = backtest(signals_df,
                  n=slice(0,300),
                  override_tp=None,
                  override_be=None,
                  sl_coefficient=1,
                  partials_array=[1],
                  precision=mt5.TIMEFRAME.M1,
                  end_of_period='day',
                  end_of_day='18:30')

Output()

IndexError: list index out of range

In [None]:
df

In [None]:
df['cumR'] = df['result_R'].cumsum()
fig = px.scatter(df, x='activated', y=['result_R'])
fig.show()

## Analysis
**What do we need to consider?**

<u>Overview
- Trades (trades/signals)
     - di cui limiti (limits/trades)
         - di cui eseguiti a mercato (mkt. limits/limits)
- Winrate (wins/trades)
- Vincita media (pips/R)
- Perdita media (pips/R)
- Longs vinti (won/longs)
- Shorts vinti (won/shorts)
- Breakeven (be/trades)
    - che avrebbero vinto (h. wins/be)
    - che avrebbero perso (h. losses/be)
- Saldo/mese (pips/R)
- Miglior trade (pips/R)
- Peggior trade (pips/R)
- Durata media trade (hh:mm)
- Coefficiente Profitto
- Dev. std. vittorie (pips/R)
- Z-Score
- Expectancy (pips/R)
- AHPR
- GHPR
- Maximum drawdown

<u>Per TP (colonne)
- Winrate
- Risultato medio (pips/R)
- Coefficiente (wr*R)
    
<u>Altri
- distribution of R_min in winning trades by tp
- distribution of R_max in losing trades
- distribution winrate per weekday
- streak analysis
- trade matrix?

### get_results()

In [None]:
def determine_trade_length(row):
    
    start = row['activated']
    
    if row['tp0_close']:
        return row['tp0_hit'] - start
    if row['sl_close']:
        return row['sl_hit'] - start
    if row['be_close']:
        return row['be_hit'] - start
    return start.replace(hour=18, minute=30) - start

    
def get_results(sig_df: pd.DataFrame):
    
    # setting up the table
    overview = Table(Column(header="Descrizione", justify="left", ratio=3),
                     Column(header="Valore", justify="right", no_wrap=True, ratio=2), 
                     box=box.SIMPLE, expand=True)
    
    '''the number of trades is calculating taking the n. of trades that activated and subtracting 
    those that activated after any closing condition was hit'''
    
    # trades
    close_conditions = ['sl_hit', 'tp0_hit', 'be_hit']
    entry_hit = pd.notna(sig_df['true_entry'])
    df = sig_df[entry_hit & before('activated', than=close_conditions, df=sig_df)]
    trades = df.shape[0]
    value = f'({trades}/{sig_df.shape[0]}) {trades/sig_df.shape[0]:.1%}'
    overview.add_row('1.   Trades', value)
    
    # fixes keyerror
    if 'be_hit' not in df.columns:
        df['be_hit'] = pd.NA
        
    # add key info to dataframe to make the rest of the calulations easier
    df['tp0_close'] = before('tp0_hit', than=['sl_hit', 'be_hit'], df=df)
    df['sl_close'] = before('sl_hit', than=['tp0_hit', 'be_hit'], df=df)
    df['eop_close'] = pd.isna(df['sl_hit']) & pd.isna(df['tp0_hit']) & pd.isna(df['be_hit'])
    df['be_close'] = before('be_hit', than=['tp0_hit', 'sl_hit'], df=df)
    df['tp0_pips'] = abs(df['tp'].str[0] - df['true_entry']) / (df['trade_tick_size']*10)
    df['sl_pips'] = -abs(df['sl'] - df['true_entry']) / (df['trade_tick_size']*10)
    df['eop_pips'] = df['eopR']*abs(df['true_entry']-df['sl']) / (df['trade_tick_size']*10)
    df['won'] = ( df['tp0_close'] | (df['eop_close'] & df['eopR']>0) )
    
    # di cui limiti
    only_lims = df[df['flag']=='limit']
    lims = only_lims.shape[0]
    value = f'({lims}/{trades}) {lims/trades:.1%}'
    overview.add_row('1.1  Limiti', value)
    
    # di cui a mercato
    only_lims = df[df['flag']=='limit']
    lims = only_lims.shape[0]
    value = f'({lims}/{trades}) {lims/trades:.1%}'
    overview.add_row('1.2  Limiti eseguiti a mercato (TO-DO)', value)
    
    # winrate (excluding bes)
    wins = df['won'].sum()
    bes = df['be_close'].sum()
    value = f'({wins}/{trades-bes}) [b][u]{(wins)/(trades-bes):.1%}'
    overview.add_row('2.   [u b]Winrate', value)
    
    # longs vinti
    longs_won = df[df['won'] & (df['side']=='buy')].shape[0]
    longs = df[(df['side']=='buy')].shape[0]
    value = f'({longs_won}/{longs}) {longs_won/longs:.1%}'
    overview.add_row('3.   Longs vinti', value)
    
    # shorts vinti
    sells_won = df[df['won'] & (df['side']=='sell')].shape[0]
    sells = df[(df['side']=='sell')].shape[0]
    try: value = f'({sells_won}/{sells}) {sells_won/sells:.1%}'
    except ZeroDivisionError: value = f'({sells_won}/{sells}) -%'
    overview.add_row('4.   Shorts vinti', value)
    
    # breakeven
    value = f'({bes}/{trades}) {bes/trades:.1%}'
    overview.add_row('5.   Breakeven', value)
    
    # che avrebbero vinto TO-CHECK
    bes_won = df[df['be_close'] & before('tp0_hit', than='sl_hit', df=df)].shape[0]
    if bes==0: value = f'({bes_won}/{bes}) -%'
    else: value = f'({bes_won}/{bes}) {bes_won/bes:.1%}' 
    overview.add_row('5.1  Breakeven che avrebbero vinto', value)
    
    # che avrebbero perso TO-CHECK
    bes_lost = df[df['be_close'] & before('sl_hit', than='tp0_hit', df=df)].shape[0]
    if bes==0: value = f'({bes_won}/{bes}) -%'
    else: value = f'({bes_lost}/{bes}) {bes_lost/bes:.1%}'
    overview.add_row('5.2  Breakeven che avrebbero perso', value)
    
    # vincita media
    pips_won_tot = ( df.loc[df['tp0_close'], 'tp0_pips'].sum() +
                     df.loc[df['eop_close'] & df['eopR']>0, 'eop_pips'].sum() )
    
    R_won_tot = ( df.loc[df['tp0_close'], 'R_tp0'].sum() +
                  df.loc[df['eop_close'] & df['eopR']>0, 'eopR'].sum() )
    
    value = f'{pips_won_tot/wins:.1f} pips / [b u]{R_won_tot/wins:.2f}R'
    overview.add_row('6.   [b u]Vincita media', value)
    
    # perdita media
    losses = trades-wins
    pips_lost_tot = ( df.loc[df['sl_close'], 'sl_pips'].sum() +
                      df.loc[df['eop_close'] & df['eopR']<0, 'eop_pips'].sum() )
    
    R_lost_tot = ( -df[df['sl_close']].shape[0] -
                    df.loc[df['eop_close'] & df['eopR']<0, 'eopR'].sum() )
    
    value = f'(-) {-pips_lost_tot/losses:.1f} pips / {-R_lost_tot/losses:.2f}R'
    overview.add_row('7.   Perdita media', value)
    
    # saldo totale
    balance_pips = pips_won_tot + pips_lost_tot
    balance_R = R_won_tot + R_lost_tot
    value = f'{balance_pips:.1f} pips / {balance_R:.2f}R'
    overview.add_row('8.   Saldo totale', value)
    
    # saldo mensilet
    timespan = df['time'].iloc[-1] - df['time'].iloc[0]
    mo_factor = 3600*24*30/timespan.total_seconds()
    value = f'{balance_pips*mo_factor:.1f} pips / [b u]{balance_R*mo_factor:.2f}R'
    overview.add_row('8.1  [b u]Saldo mensile', value)
    
    # miglior trade (tp0)
    best_trade_pips = df.loc[df['tp0_close'], 'tp0_pips'].max()
    best_trade_R = df.loc[df['tp0_close'], 'R_tp0'].max()
    value = f'{best_trade_pips:.1f} pips / {best_trade_R:.2f}R'
    overview.add_row('9.   Miglior trade', value)
    
    # peggior trade
    worst_trade_pips = -df.loc[df['sl_close'], 'sl_pips'].min()
    worst_trade_R = 1 # might wanna fix this
    value = f'(-) {worst_trade_pips:.1f} pips / {worst_trade_R:.2f}R'
    overview.add_row('10.  Peggior trade', value)
    
    # expectancy
    exp_pips = ((pips_won_tot/wins)*(wins/(trades-bes)))+((pips_lost_tot/losses)*(1-(wins/(trades-bes))))
    exp_R = ((R_won_tot/wins)*(wins/(trades-bes)))+((R_lost_tot/losses)*(1-(wins/(trades-bes))))
    value = f'{exp_pips:.1f} pips / [b u]{exp_R:.2f}R'
    overview.add_row('11.  [b u]Expectancy', value)
    
    # durata media trade
    df['trade_length'] = df.apply(lambda x: determine_trade_length(x), axis=1)
    avg_trade_length = df['trade_length'].mean()
    value = f'{str(timedelta(seconds=int(avg_trade_length.total_seconds())))}'
    overview.add_row('12.  Durata media trade', value)
    
    #display(df)
    console.print(overview)

In [None]:
get_results(df)

### Results

In [None]:
get_results(df)

In [None]:
fig = px.bar(df, y='loaded_ticks')
fig.show()

In [None]:
data = df[(df['R_min_tp0']>-1) & (df['R_min_tp0']<0)]
fig = px.histogram(df['R_min_tp0'], x="R_min_tp0", nbins=50, marginal="box")
fig.show()

In [None]:
data = df[pd.isnull(df['sl_hit']) & pd.isnull(df['tp0_hit'])]
fig = px.histogram(data, x="eopR", nbins=50, marginal="box", histnorm='percent')
fig.show()