In [1]:
import sys
sys.path.append("../")

In [2]:
import pandas as pd
import datetime as dt
from technical.indicators import rsi
from technical.patterns import apply_patterns

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [3]:
# Signal codes shared by the signal / take-profit / stop-loss helpers and by Trade.
BUY = 1
SELL = -1
NONE = 0
# RSI_14 threshold used by apply_signal: strictly above it for BUY, strictly below it for SELL.
RSI_LIMIT = 50.0

def apply_signal(row):
    """Classify one candle row as BUY, SELL or NONE.

    A signal needs ``row.ENGULFING == True`` plus, for the candle's
    ``direction``:

    * BUY:  ``mid_l`` above ``EMA_200`` and ``RSI_14`` above ``RSI_LIMIT``
    * SELL: ``mid_h`` below ``EMA_200`` and ``RSI_14`` below ``RSI_LIMIT``

    Returns:
        ``BUY``, ``SELL`` or ``NONE``.
    """
    # The explicit equality test is kept on purpose: a truthy value that is
    # not equal to True must not count as an engulfing candle.
    if not (row.ENGULFING == True):  # noqa: E712
        return NONE

    long_setup = (
        row.direction == BUY
        and row.mid_l > row.EMA_200
        and row.RSI_14 > RSI_LIMIT
    )
    if long_setup:
        return BUY

    short_setup = (
        row.direction == SELL
        and row.mid_h < row.EMA_200
        and row.RSI_14 < RSI_LIMIT
    )
    if short_setup:
        return SELL

    return NONE
    
def apply_take_profit(row, PROFIT_FACTOR):
    """Compute the take-profit price for a row that already has a SIGNAL.

    The candle body (close minus open) is scaled by ``PROFIT_FACTOR`` and
    projected from the close when the candle direction agrees with the
    signal, otherwise the reversed body is projected from the open.
    BUY signals use the ask prices; every other non-NONE signal uses the
    bid prices.

    Returns:
        The take-profit price, or ``0.0`` when ``row.SIGNAL`` is ``NONE``.
    """
    if row.SIGNAL == NONE:
        return 0.0

    if row.SIGNAL == BUY:
        candle_open, candle_close = row.ask_o, row.ask_c
        candle_agrees = row.direction == BUY
    else:
        candle_open, candle_close = row.bid_o, row.bid_c
        candle_agrees = row.direction == SELL

    if candle_agrees:
        return (candle_close - candle_open) * PROFIT_FACTOR + candle_close
    return (candle_open - candle_close) * PROFIT_FACTOR + candle_open


def apply_stop_loss(row):
    """Pick the stop-loss price for a row that already has a SIGNAL.

    The stop is the candle open when the candle direction agrees with the
    signal and the candle close otherwise. BUY signals read the ask
    prices; every other non-NONE signal reads the bid prices.

    Returns:
        The stop-loss price, or ``0.0`` when ``row.SIGNAL`` is ``NONE``.
    """
    if row.SIGNAL == NONE:
        return 0.0
    if row.SIGNAL == BUY:
        return row.ask_o if row.direction == BUY else row.ask_c
    return row.bid_o if row.direction == SELL else row.bid_c

In [4]:
class Trade:
    """A single simulated position, opened from a signal row and tracked until TP or SL.

    The instance is a plain bag of attributes; the back-tester turns
    ``vars(trade)`` into a result row, so the order in which attributes
    are first assigned in ``__init__`` is the column order of the results.
    """

    def __init__(self, row, profit_factor, loss_factor):
        """Open the trade from ``row``.

        Args:
            row: signal row providing ``name``, ``SIGNAL``, ``TP``, ``SL``,
                ``time`` and ``start_price_BUY`` / ``start_price_SELL``.
            profit_factor: value stored in ``result`` when TP is reached.
            loss_factor: value stored in ``result`` when SL is reached.
        """
        self.running = True
        self.start_index_m5 = row.name
        self.profit_factor = profit_factor
        self.loss_factor = loss_factor

        # Until the trade closes, trigger_price simply mirrors start_price.
        if row.SIGNAL == BUY:
            self.start_price = self.trigger_price = row.start_price_BUY
        elif row.SIGNAL == SELL:
            self.start_price = self.trigger_price = row.start_price_SELL

        self.SIGNAL = row.SIGNAL
        self.TP = row.TP
        self.SL = row.SL
        self.result = 0.0
        self.end_time = row.time
        self.start_time = row.time

    def close_trade(self, row, result, trigger_price):
        """Mark the trade finished with ``result`` at ``row.time`` / ``trigger_price``."""
        self.running = False
        self.result = result
        self.end_time = row.time
        self.trigger_price = trigger_price

    def update(self, row):
        """Close the trade if ``row`` reaches its take-profit or stop-loss.

        BUY trades are checked against the bid high/low, SELL trades against
        the ask low/high. Take-profit is tested first, so a row that touches
        both levels counts as a win.
        """
        if self.SIGNAL == BUY:
            if row.bid_h >= self.TP:
                self.close_trade(row, result=self.profit_factor, trigger_price=row.bid_h)
            elif row.bid_l <= self.SL:
                self.close_trade(row, result=self.loss_factor, trigger_price=row.bid_l)
        elif self.SIGNAL == SELL:
            if row.ask_l <= self.TP:
                self.close_trade(row, result=self.profit_factor, trigger_price=row.ask_l)
            elif row.ask_h >= self.SL:
                self.close_trade(row, result=self.loss_factor, trigger_price=row.ask_h)

In [5]:
def apply_signals(df, PROFIT_FACTOR, sig):
    """Add ``SIGNAL``, ``TP`` and ``SL`` columns to ``df`` in place.

    Args:
        df: candle frame; it is mutated and nothing is returned.
        PROFIT_FACTOR: forwarded to ``apply_take_profit``.
        sig: row-wise callable producing the signal code for each row.
    """
    # Order matters: SIGNAL must exist before TP and SL, both of which read row.SIGNAL.
    column_builders = (
        ("SIGNAL", sig, {}),
        ("TP", apply_take_profit, {"PROFIT_FACTOR": PROFIT_FACTOR}),
        ("SL", apply_stop_loss, {}),
    )
    for column, row_func, extra_kwargs in column_builders:
        df[column] = df.apply(row_func, axis=1, **extra_kwargs)


def create_signals(df, time_d=1):
    """Extract the signal rows of ``df`` and re-key them for the M5 merge.

    Rows whose ``SIGNAL`` is ``NONE`` are discarded. Each remaining row gets
    a new ``time`` equal to its original ``time`` plus ``60 * time_d``
    minutes, the candle columns that are no longer needed are dropped, and
    ``bid_c`` / ``ask_c`` are renamed to ``start_price_BUY`` /
    ``start_price_SELL``.

    Args:
        df: frame that already carries the ``SIGNAL`` column; not modified.
        time_d: multiplier for the 60-minute offset added to ``time``.

    Returns:
        A new DataFrame with one row per signal.
    """
    entry_offset = dt.timedelta(minutes=60 * time_d)

    signal_rows = df.loc[df.SIGNAL != NONE].copy()
    signal_rows['m5_start'] = [candle_time + entry_offset for candle_time in signal_rows.time]

    no_longer_needed = [
        'time', 'mid_o', 'mid_h', 'mid_l',
        'bid_o', 'bid_h', 'bid_l',
        'ask_o', 'ask_h', 'ask_l',
        'direction',
    ]
    new_names = {
        'bid_c': 'start_price_BUY',   # for tracking the trade
        'ask_c': 'start_price_SELL',
        'm5_start': 'time',
    }
    return signal_rows.drop(columns=no_longer_needed).rename(columns=new_names)


class GuruTester:
    """Back-test a signal function: signals come from ``df_big``, outcomes from M5 candles.

    The constructor copies both input frames and immediately calls
    ``prepare_data``; ``run_test`` then walks the merged M5 rows and stores
    one row per closed trade in ``self.df_results``.
    """

    def __init__(self, df_big,
                 apply_signal,
                 df_m5,
                 LOSS_FACTOR=-1.0,
                 PROFIT_FACTOR=1.5,
                 time_d=1):
        """
        Args:
            df_big: candle frame the signals are computed on (copied).
            apply_signal: row-wise callable returning the signal code.
            df_m5: M5 candle frame used to resolve the trades (copied).
            LOSS_FACTOR: result recorded for a trade that hits its stop-loss.
            PROFIT_FACTOR: result recorded for a trade that hits its
                take-profit; also forwarded to the take-profit calculation.
            time_d: forwarded to ``create_signals``.
        """
        self.df_big = df_big.copy()
        self.apply_signal = apply_signal
        self.df_m5 = df_m5.copy()
        self.LOSS_FACTOR = LOSS_FACTOR
        self.PROFIT_FACTOR = PROFIT_FACTOR
        self.time_d = time_d

        self.prepare_data()

    def prepare_data(self):
        """Compute the signals and left-merge them onto the M5 candles by ``time``.

        Because the M5 frame is the left side of the merge, a signal whose
        ``time`` has no matching M5 row is dropped. M5 rows without a signal
        get 0 in every signal column, so their ``SIGNAL`` equals ``NONE``.
        """
        print("prepare_data...")

        apply_signals(self.df_big,
                      self.PROFIT_FACTOR,
                      self.apply_signal)

        df_m5_slim = self.df_m5[['time', 'bid_h', 'bid_l', 'ask_h', 'ask_l']].copy()
        df_signals = create_signals(self.df_big, time_d=self.time_d)

        self.merged = pd.merge(left=df_m5_slim, right=df_signals, on='time', how='left')
        self.merged.fillna(0, inplace=True)
        # The merge introduces NaN, which turns SIGNAL into float; restore the int codes.
        self.merged.SIGNAL = self.merged.SIGNAL.astype(int)

    def run_test(self):
        """Simulate every signal over the merged M5 rows and collect the closed trades.

        A trade is opened on its signal row and is evaluated against that
        same row straight away. Trades that are still open after the last
        row are not included in ``self.df_results``.
        """
        print("run_test...")
        open_trades_m5 = []
        closed_trades_m5 = []

        for _, row in self.merged.iterrows():
            if row.SIGNAL != NONE:
                open_trades_m5.append(Trade(row, self.PROFIT_FACTOR, self.LOSS_FACTOR))

            still_open = []
            for trade in open_trades_m5:
                trade.update(row)
                if trade.running:
                    still_open.append(trade)
                else:
                    closed_trades_m5.append(trade)
            open_trades_m5 = still_open

        records = [vars(trade) for trade in closed_trades_m5]
        if records:
            self.df_results = pd.DataFrame(records)
        else:
            # No trade closed: keep an (empty) `result` column so that summing
            # the results yields 0.0 instead of raising AttributeError.
            self.df_results = pd.DataFrame({"result": pd.Series(dtype="float64")})
        print("Result:", self.df_results.result.sum())

In [6]:
def run_pair(pair):
    """Load the H1 and M5 candles for ``pair``, build the indicators and back-test it.

    Args:
        pair: instrument name used in the pickle file names, e.g. ``"EUR_USD"``.

    Returns:
        The ``df_results`` frame produced by ``GuruTester.run_test``.
    """
    candles_h1 = pd.read_pickle(f"../data/candles/{pair}_H1.pkl")
    candles_m5 = pd.read_pickle(f"../data/candles/{pair}_M5.pkl")
    candles_h1 = candles_h1.reset_index(drop=True)
    candles_m5 = candles_m5.reset_index(drop=True)

    # rsi / apply_patterns are project helpers; presumably they add the RSI_14,
    # ENGULFING and direction columns selected below - verify against `technical`.
    candles_h1 = apply_patterns(rsi(candles_h1))
    candles_h1['EMA_200'] = candles_h1.mid_c.ewm(span=200, min_periods=200).mean()

    needed_cols = [
        'time',
        'mid_o', 'mid_h', 'mid_l', 'mid_c',
        'bid_o', 'bid_h', 'bid_l', 'bid_c',
        'ask_o', 'ask_h', 'ask_l', 'ask_c',
        'ENGULFING', 'direction', 'EMA_200', 'RSI_14',
    ]
    # Rows with any NaN (e.g. the EMA warm-up, which needs 200 periods) are discarded.
    signal_input = candles_h1[needed_cols].dropna().reset_index(drop=True)

    tester = GuruTester(signal_input, apply_signal, candles_m5)
    tester.run_test()
    return tester.df_results
    

In [7]:
# Baseline run: back-test every listed pair and keep its closed-trade frame under 'res'.
res = []
for p in ["EUR_USD"]:
    res.append(dict(pair=p,res=run_pair(p)))

prepare_data...
run_test...
Result: -1016.0


In [8]:
# Total per pair: the sum of the per-trade `result` column.
for r in res:
    print(r['pair'], r['res'].result.sum())

EUR_USD -1016.0


In [9]:
# EUR_USD -1016.0
# GBP_JPY -1057.5

In [10]:
def apply_signals(df, PROFIT_FACTOR, sig):
    """Add ``SIGNAL``, ``TP`` and ``SL`` columns to ``df`` in place.

    Args:
        df: candle frame; it is mutated and nothing is returned.
        PROFIT_FACTOR: forwarded to ``apply_take_profit``.
        sig: row-wise callable producing the signal code for each row.
    """
    # Order matters: SIGNAL must exist before TP and SL, both of which read row.SIGNAL.
    column_builders = (
        ("SIGNAL", sig, {}),
        ("TP", apply_take_profit, {"PROFIT_FACTOR": PROFIT_FACTOR}),
        ("SL", apply_stop_loss, {}),
    )
    for column, row_func, extra_kwargs in column_builders:
        df[column] = df.apply(row_func, axis=1, **extra_kwargs)


def create_signals(df, time_d=1):
    """Extract the signal rows of ``df`` and re-key them for the M5 merge.

    Rows whose ``SIGNAL`` is ``NONE`` are discarded. Each remaining row gets
    a new ``time`` equal to its original ``time`` plus ``60 * time_d - 5``
    minutes, i.e. one M5 step before ``time + time_d`` hours (55 minutes
    for the default ``time_d=1``). The candle columns that are no longer
    needed are dropped and ``bid_c`` / ``ask_c`` are renamed to
    ``start_price_BUY`` / ``start_price_SELL``.

    ``time_d`` used to be accepted but ignored (the offset was a fixed 55
    minutes); it now scales the offset while the default is unchanged.

    Args:
        df: frame that already carries the ``SIGNAL`` column; not modified.
        time_d: length of one signal candle in hours.
            NOTE(review): assumes ``time`` marks the start of the signal
            candle, so the target is its last M5 candle - confirm.

    Returns:
        A new DataFrame with one row per signal.
    """
    m5_minutes = 5
    entry_offset = dt.timedelta(minutes=60 * time_d - m5_minutes)

    df_signals = df[df.SIGNAL != NONE].copy()
    df_signals['m5_start'] = [x + entry_offset for x in df_signals.time]
    df_signals.drop(['time', 'mid_o', 'mid_h', 'mid_l', 'bid_o', 'bid_h', 'bid_l',
                     'ask_o', 'ask_h', 'ask_l', 'direction'], axis=1, inplace=True)
    df_signals.rename(columns={
        'bid_c': 'start_price_BUY',  # for tracking the trade
        'ask_c': 'start_price_SELL',
        'm5_start': 'time'
    }, inplace=True)
    return df_signals


class GuruTester:
    """Back-test a signal function: signals come from ``df_big``, outcomes from M5 candles.

    The constructor copies both input frames and immediately calls
    ``prepare_data``; ``run_test`` then walks the merged M5 rows and stores
    one row per closed trade in ``self.df_results``.
    """

    def __init__(self, df_big,
                 apply_signal,
                 df_m5,
                 LOSS_FACTOR=-1.0,
                 PROFIT_FACTOR=1.5,
                 time_d=1):
        """
        Args:
            df_big: candle frame the signals are computed on (copied).
            apply_signal: row-wise callable returning the signal code.
            df_m5: M5 candle frame used to resolve the trades (copied).
            LOSS_FACTOR: result recorded for a trade that hits its stop-loss.
            PROFIT_FACTOR: result recorded for a trade that hits its
                take-profit; also forwarded to the take-profit calculation.
            time_d: forwarded to ``create_signals``.
        """
        self.df_big = df_big.copy()
        self.apply_signal = apply_signal
        self.df_m5 = df_m5.copy()
        self.LOSS_FACTOR = LOSS_FACTOR
        self.PROFIT_FACTOR = PROFIT_FACTOR
        self.time_d = time_d

        self.prepare_data()

    def prepare_data(self):
        """Compute the signals and left-merge them onto the M5 candles by ``time``.

        Because the M5 frame is the left side of the merge, a signal whose
        ``time`` has no matching M5 row is dropped. M5 rows without a signal
        get 0 in every signal column, so their ``SIGNAL`` equals ``NONE``.
        """
        print("prepare_data...")

        apply_signals(self.df_big,
                      self.PROFIT_FACTOR,
                      self.apply_signal)

        df_m5_slim = self.df_m5[['time', 'bid_h', 'bid_l', 'ask_h', 'ask_l']].copy()
        df_signals = create_signals(self.df_big, time_d=self.time_d)

        self.merged = pd.merge(left=df_m5_slim, right=df_signals, on='time', how='left')
        self.merged.fillna(0, inplace=True)
        # The merge introduces NaN, which turns SIGNAL into float; restore the int codes.
        self.merged.SIGNAL = self.merged.SIGNAL.astype(int)

    def run_test(self):
        """Simulate every signal over the merged M5 rows and collect the closed trades.

        Open trades are evaluated against the current row before any new
        trade is opened on it, so a trade is first checked on the row after
        its signal row. Trades that are still open after the last row are
        not included in ``self.df_results``.
        """
        print("run_test...")
        open_trades_m5 = []
        closed_trades_m5 = []

        for _, row in self.merged.iterrows():
            still_open = []
            for trade in open_trades_m5:
                trade.update(row)
                if trade.running:
                    still_open.append(trade)
                else:
                    closed_trades_m5.append(trade)
            open_trades_m5 = still_open

            if row.SIGNAL != NONE:
                open_trades_m5.append(Trade(row, self.PROFIT_FACTOR, self.LOSS_FACTOR))

        records = [vars(trade) for trade in closed_trades_m5]
        if records:
            self.df_results = pd.DataFrame(records)
        else:
            # No trade closed: keep an (empty) `result` column so that summing
            # the results yields 0.0 instead of raising AttributeError.
            self.df_results = pd.DataFrame({"result": pd.Series(dtype="float64")})
        print("Result:", self.df_results.result.sum())

def run_pair(pair):
    """Load the H1 and M5 candles for ``pair``, build the indicators and back-test it.

    Args:
        pair: instrument name used in the pickle file names, e.g. ``"EUR_USD"``.

    Returns:
        The ``df_results`` frame produced by ``GuruTester.run_test``.
    """
    candles_h1 = pd.read_pickle(f"../data/candles/{pair}_H1.pkl")
    candles_m5 = pd.read_pickle(f"../data/candles/{pair}_M5.pkl")
    candles_h1 = candles_h1.reset_index(drop=True)
    candles_m5 = candles_m5.reset_index(drop=True)

    # rsi / apply_patterns are project helpers; presumably they add the RSI_14,
    # ENGULFING and direction columns selected below - verify against `technical`.
    candles_h1 = apply_patterns(rsi(candles_h1))
    candles_h1['EMA_200'] = candles_h1.mid_c.ewm(span=200, min_periods=200).mean()

    needed_cols = [
        'time',
        'mid_o', 'mid_h', 'mid_l', 'mid_c',
        'bid_o', 'bid_h', 'bid_l', 'bid_c',
        'ask_o', 'ask_h', 'ask_l', 'ask_c',
        'ENGULFING', 'direction', 'EMA_200', 'RSI_14',
    ]
    # Rows with any NaN (e.g. the EMA warm-up, which needs 200 periods) are discarded.
    signal_input = candles_h1[needed_cols].dropna().reset_index(drop=True)

    tester = GuruTester(signal_input, apply_signal, candles_m5)
    tester.run_test()
    return tester.df_results


# Second run: same pairs, but using the create_signals / GuruTester redefined in this cell.
sec_res = []
for p in ["EUR_USD" ]:
    sec_res.append(dict(pair=p,res=run_pair(p)))


prepare_data...
run_test...
Result: -1030.0


In [11]:
# First two closed trades of the second run.
sec_res[0]['res'].head(2)

Unnamed: 0,running,start_index_m5,profit_factor,loss_factor,start_price,trigger_price,SIGNAL,TP,SL,result,end_time,start_time
0,False,2479,1.5,-1.0,1.21375,1.2166,1,1.216565,1.21209,1.5,2018-01-12 18:55:00+00:00,2018-01-12 14:55:00+00:00
1,False,2635,1.5,-1.0,1.21994,1.22106,1,1.220965,1.21949,1.5,2018-01-15 04:55:00+00:00,2018-01-15 03:55:00+00:00


In [12]:
# First two closed trades of the baseline run, for a side-by-side comparison with the second run.
res[0]['res'].head(2)

Unnamed: 0,running,start_index_m5,profit_factor,loss_factor,start_price,trigger_price,SIGNAL,TP,SL,result,end_time,start_time
0,False,2480,1.5,-1.0,1.21375,1.2166,1,1.216565,1.21209,1.5,2018-01-12 18:55:00+00:00,2018-01-12 15:00:00+00:00
1,False,2636,1.5,-1.0,1.21994,1.22106,1,1.220965,1.21949,1.5,2018-01-15 04:55:00+00:00,2018-01-15 04:00:00+00:00


In [13]:
# (rows, columns) of the closed-trade frames: baseline run first, second run second.
print(res[0]['res'].shape,  sec_res[0]['res'].shape)

(3391, 12) (3410, 12)


In [16]:
# Look at the raw M5 candles around positional index 126269, a start_index_m5 that shows up
# in the comparison of the two runs.
# NOTE(review): in the recorded output the candle times jump from 21:50 to 22:00, i.e. there is
# no 21:55 row - confirm whether such gaps explain the trades that differ between the runs.
df_m5 = pd.read_pickle(f"../data/candles/EUR_USD_M5.pkl")
df_m5.iloc[126268: 126271]

Unnamed: 0,time,volume,mid_o,mid_h,mid_l,mid_c,bid_o,bid_h,bid_l,bid_c,ask_o,ask_h,ask_l,ask_c
126268,2019-09-11 21:50:00+00:00,2,1.10092,1.10092,1.1009,1.1009,1.10083,1.10083,1.1008,1.1008,1.10102,1.10102,1.10101,1.10101
126269,2019-09-11 22:00:00+00:00,10,1.10092,1.101,1.10092,1.101,1.10083,1.10093,1.10083,1.10091,1.10102,1.10109,1.10102,1.10109
126270,2019-09-11 22:05:00+00:00,9,1.10098,1.10105,1.10096,1.10098,1.1009,1.10096,1.10087,1.10089,1.10106,1.10114,1.10104,1.10107


In [15]:
# Trades that appear in only one of the two runs: stack both result frames and, comparing on every
# column except start_index_m5 / start_time / end_time, drop all rows that have a duplicate
# (keep=False removes every member of a duplicate group).
pd.concat([res[0]['res'], sec_res[0]['res']]).drop_duplicates(keep=False, subset=res[0]['res'].columns.difference(['start_index_m5', 'start_time', 'end_time']))

Unnamed: 0,running,start_index_m5,profit_factor,loss_factor,start_price,trigger_price,SIGNAL,TP,SL,result,end_time,start_time
886,False,126269,1.5,-1.0,1.10101,1.10109,-1,1.10071,1.10086,-1.0,2019-09-11 22:00:00+00:00,2019-09-11 22:00:00+00:00
1540,False,210689,1.5,-1.0,1.16458,1.16465,-1,1.163785,1.16461,-1.0,2020-11-01 23:00:00+00:00,2020-11-01 23:00:00+00:00
2069,False,281503,1.5,-1.0,1.15964,1.15961,1,1.16027,1.15987,-1.0,2021-10-14 22:00:00+00:00,2021-10-14 22:00:00+00:00
255,False,38513,1.5,-1.0,1.17427,1.17395,1,1.175755,1.17428,-1.0,2018-07-08 21:00:00+00:00,2018-07-06 20:55:00+00:00
383,False,55581,1.5,-1.0,1.16447,1.1648,-1,1.163755,1.16473,-1.0,2018-09-28 05:35:00+00:00,2018-09-28 04:55:00+00:00
660,False,94010,1.5,-1.0,1.12212,1.12232,-1,1.12016,1.12176,-1.0,2019-04-07 21:00:00+00:00,2019-04-05 20:55:00+00:00
672,False,96642,1.5,-1.0,1.12373,1.12417,-1,1.12285,1.12405,-1.0,2019-04-19 05:00:00+00:00,2019-04-19 02:55:00+00:00
708,False,102567,1.5,-1.0,1.116,1.11649,-1,1.114745,1.11597,-1.0,2019-05-19 21:00:00+00:00,2019-05-17 20:55:00+00:00
720,False,104074,1.5,-1.0,1.12134,1.1211,1,1.12188,1.12118,-1.0,2019-05-27 04:35:00+00:00,2019-05-27 03:55:00+00:00
990,False,141067,1.5,-1.0,1.10236,1.10228,-1,1.10063,1.10223,-1.0,2019-11-24 22:20:00+00:00,2019-11-22 21:55:00+00:00


In [None]:
# res[0]['res'].drop(columns=['start_index_m5', 'start_time', 'end_time'], axis=1, inplace=True)
# sec_res[0]['res'].drop(columns=['start_index_m5', 'start_time', 'end_time'], axis=1, inplace=True)

In [17]:
# (rows, columns) of the closed-trade frames: baseline run first, second run second.
print(res[0]['res'].shape,  sec_res[0]['res'].shape)

(3391, 12) (3410, 12)


In [17]:
# NOTE(review): 3391 is the baseline row count printed above, but 3413 does not match any recorded
# output (the second run has 3410 rows) - confirm where 3413 comes from before relying on this.
3413 - 3391

22