In [1]:
%load_ext line_profiler
import pandas as pd
import json
import numpy as np
import warnings
from typing import Optional, List, Union, Callable
from datetime import datetime, timedelta
import pytz
warnings.filterwarnings("ignore")
import cufflinks as cf
cf.set_config_file(offline=True, offline_show_link=False, theme='white', sharing='public', colorscale='ggplot')

In [2]:
with open('../input/main-test-daniel-dataset-json/test.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

In [3]:
def plot_pos(p, sl=None, tp=None, hline=None):
    if tp is None:
        tp = get_position_tps(p)[0]
    if sl is None:
        sl = p['sl']['price']
        
    open_ = pd.to_datetime(p['entry']['execution']['time'])
    close_ = open_ + timedelta(hours=1)
    
    sl_shape = cf.tools.get_shape(x0=open_, x1=close_, yref='close', y0=sl, y1=p['entry']['price'], kind='rect', fill=True, opacity=0.3, color='red', width=0.1)
    tp_shape = cf.tools.get_shape(x0=open_, x1=close_, yref='close', y0=tp, y1=p['entry']['price'], kind='rect', fill=True, opacity=0.3, color='green', width=0.1)
    shapes = [s for s in [sl_shape, tp_shape, hline] if s is not None]
    return p['rates'].set_index(pd.to_datetime(p['rates']['time'])).iplot(kind='candle', shapes=shapes, width=0.1)

In [4]:
def _candle_mean(candle: Union[pd.Series, dict]) -> float:
    avg = candle["low"] + (candle["high"] - candle["low"]) / 2
    return round(avg, 6)

In [5]:
def _get_executed_orders(p: dict, ignore: List[str] = None):
    
    if ignore is None:
        ignore = list()
        
    executed = [o for o in p['orders'] if o.get('execution') != dict()]
    sorted_executed = sorted(executed, key=lambda x: pd.to_datetime(x['execution']['time']))
    return [e for e in sorted_executed if e['name'] not in ignore]

In [6]:
def _determine_position_result(p: dict, partials: List[float], ignore: List[str]):
    
    SIDE = 1 if p['side'].upper() == 'BUY' else -1
    
    # get events (orders) in execution order
    events = _get_executed_orders(p, ignore=ignore)
    
    # makes the rates attr back into a df
    p['rates'] = pd.DataFrame(p['rates'])

    if len(events) == 0:
        last_candle = p['rates'].iloc[-1]
        r = SIDE * (_candle_mean(last_candle) - p['entry']['execution']['price']) / p['sl_delta']
        return (pd.to_datetime(last_candle.time, utc=True), r, 'EOP')

    result = 0
    for i, partial in enumerate(partials):

        if i > len(events) - 1:
            break

        if i == 0:
            result_type = events[i]['name']

        close = pd.to_datetime(events[i]['execution']['time'], utc=True)
        r = SIDE * (events[i]['execution']['price'] - p['entry']['execution']['price']) / p['sl_delta']

        # Closing events
        if events[i]['name'].upper() in ['SL', 'SL_TO_BE']:
            # if it's the first event record the loss,
            # otherwise i'm assuming the second losing 
            # partial is always at breakeven
            if i == 0:
                result = r
            break
        elif events[i]['name'].upper() == ['CLOSE']:
            result = r * (1 - sum(partials[0:i]))
            break

        else: result += r * partial

    return (close, result, result_type)

In [7]:
def get_position_tps(p):
    return sorted(set([o['price'] for o in p['orders'] if o['name'].upper() == 'TP']))

In [8]:
def find_best_sl(p, for_tp=0, tp=None):
    '''Tries to find the SL price at which "for_tp" wouldve been hit.
    Returns None if the direction was wrong (tp wouldve never been hit)
    Returns the new SL price expressed as % of the size of the current one'''
    
    if not isinstance(p['rates'], pd.DataFrame):
        p['rates'] = pd.DataFrame(p['rates'])

    if tp is None:
        try:
            tp = get_position_tps(p)[for_tp]
        except AttributeError:
            # if for_tp is not there, pick last tp
            tp = get_position_tps(p)[-1]
        
    bound = p['rates']
    SIDE = 1 if p['side'].upper() == 'BUY' else -1
    spread = 20 * p['symbol']['info']['trade_tick_size'] # let's just say 2 pips
    entry_hit = p['entry']['execution']['time']
    
    if SIDE == 1:
        tp_hit = (bound.high >= tp)
        if tp_hit.any():
            window = ~tp_hit.cumsum().astype(bool)
            new_sl = bound.loc[(bound.time > entry_hit) & window, 'low'].min() - spread
    
    elif SIDE == -1:
        tp_hit = (bound.low <= tp)
        if tp_hit.any():
            window = ~tp_hit.cumsum().astype(bool)
            new_sl = bound.loc[(bound.time > entry_hit) & window, 'high'].max() + spread
    
    
    # if there's no tp in rates that hit
    # there's no sl that could've saved the trade
    # the direction was just wrong
    try:
        if not pd.isnull(new_sl):
            return new_sl
    except UnboundLocalError:
        pass
    return None

In [9]:
def find_best_tp(p, sl=None):
    '''Tries to find the TP price at which "sl" wouldve never been hit.
    Returns None if the trade went straight into loss
    Returns the new TP price expressed in R compared to for_sl'''
    
    if not isinstance(p['rates'], pd.DataFrame):
        p['rates'] = pd.DataFrame(p['rates'])
        
    if sl is None:
        sl = p['sl']['price']
    
    bound = p['rates']
    SIDE = 1 if p['side'].upper() == 'BUY' else -1
    spread = 20 * p['symbol']['info']['trade_tick_size'] # let's just say 2 pips
    entry_hit = p['entry']['execution']['time']
    
    if SIDE == 1:      
        sl_hit_cond = (bound.low <= sl + spread/2)
        if len(sl_hit_cond) == 0:
            sl_hit_cond = True
        # this means: take the sl_hit series, which is gonna look
        # something like 0,0,0,0,1,1,1,0,1,0,0,0,1,0 and make the
        # cumsum and then turn it into bools, which is gonna make
        # it look like False,False,False,False,True,True,True,True etc.
        # then take the inverse, which is what we want (all of this
        # works because 2 is considere True after booleanization)
        window = ~sl_hit_cond.cumsum().astype(bool)
        new_tp = bound.loc[(bound.time > entry_hit) & window, 'high'].max() - spread
        
    elif SIDE == -1:
        sl_hit_cond = (bound.high >= sl - spread/2)
        if len(sl_hit_cond) == 0:
            sl_hit_cond = True
        window = ~sl_hit_cond.cumsum().astype(bool)
        new_tp = bound.loc[(bound.time > entry_hit) & window, 'low'].min() + spread
        
    # if the tp is lower than the entry that means that
    # the trade went straight to sl, so return none
    if SIDE * (new_tp - p['entry']['execution']['price']) < 0:
        return None
    return new_tp

In [10]:
def find_absolute_best_params(p):
    vec = np.arange(0.5, 2, 0.5)
    SIDE = 1 if p['side'].upper() == 'BUY' else -1
    entry = p['entry']['execution']['price']
    pairs = []
    
    better_sl = find_best_sl(p)
    for i in range(100):
        if better_sl is not None:
            better_tp = find_best_tp(p, sl=better_sl)
            if better_tp is not None:
                commit = (better_sl, better_tp)
                pairs.append(commit)
                # when the results are equal, stop iteration
                if (i != 0 and
                    abs(commit[0] - pairs[-2][0]) < 1e-7 and
                    abs(commit[1] - pairs[-2][1]) < 1e-7):
                        break
                better_sl = find_best_sl(p, tp=better_tp)
    
    # cleanup: we don't want new sl over 2x the original or less than 0.5
    pairs = [pair for pair in pairs if (abs(pair[0]-entry)/p['sl_delta'] >= 0.5)]
    if len(pairs) == 0:
        return (None, None)
    return sorted(pairs, key=lambda x: abs(x[0] - entry) / abs(x[1] - entry))[0]

In [11]:
def make_results(given: List['Position'] = None,
                 partials: List[float] = None,
                 ignore: List[str] = None,
                 cleanse: Optional[Callable] = True):

    if given is None:
        given = data[:]
    
    if partials is None:
        partials = [1]

    if ignore is None:
        ignore = list()
        
    if cleanse is True:
        def base_filter(df: pd.DataFrame) -> pd.DataFrame:
            df = df.drop(df[df.result>4].index, axis=0)
            df = df.drop(df[df.sl_pips>50].index, axis=0)
            df = df.drop(df[df.sl_pips<5].index, axis=0)
            df = df.drop(df.loc['2021-12-22'].index, axis=0) # outlier
            return df
        cleanse = base_filter

    results = list()
    for p in given:
        res = _determine_position_result(p, partials=partials, ignore=ignore)
        entry_time = p['entry']['execution']['time']
        entry = p['entry']['execution']['price']
        if not pd.api.types.is_datetime64_any_dtype(p['rates']['time'].dtype):
            p['rates']['time'] = pd.to_datetime(p['rates']['time'])
        if not isinstance(p['entry']['execution']['time'], datetime):
            p['entry']['execution']['time'] = datetime.fromisoformat(entry_time)
        open_ = pd.to_datetime(p['time'], utc=True)
        close_ = pd.to_datetime(res[0], utc=True)
        # finds better sl for tp0
        better_sl = find_best_sl(p, for_tp=0)
        # finds better tp for current sl
        better_tp = find_best_tp(p)
        # absolute best R possible
        x = find_absolute_best_params(p)
        try: # find_absolute_best_params can return (None, None)
            best_possible_R = abs(x[0] - entry) / abs(x[1] - entry)
        except TypeError:
            best_possible_R = None
        results.append(dict(open=open_,
                            close=close_,
                            symbol=p['symbol']['name'],
                            side=p['side'],
                            sl_pips=p['sl_delta']/p['symbol']['info']['trade_tick_size']/10,
                            result=float(res[1]),
                            type=str(res[2]),
                            better_sl=abs(better_sl-entry)/p['sl_delta'] if better_sl is not None else None,
                            better_tp=abs(better_tp-entry)/p['sl_delta'] if better_tp is not None else None,
                            best_possible_R=best_possible_R))

    df = pd.DataFrame(results).set_index('open', drop=True).sort_index()
    if cleanse is not None:
        df = cleanse(df)
    
    return df

In [12]:
i = 9
display(_determine_position_result(data[i], partials=[1], ignore=[]))
display(f'best_sl={find_best_sl(data[i])}')
display(f'best_tp={find_best_tp(data[i])}')
display(f'best_params={find_absolute_best_params(data[i])}')
plot_pos(data[i])

(Timestamp('2020-12-01 13:46:00+0000', tz='UTC'), 1.5324675324674675, 'TP')

'best_sl=None'

'best_tp=nan'

'best_params=(None, None)'

In [13]:
%lprun -f find_absolute_best_params make_results()