In [1]:
import os
import sys
# Resolve the project root (two directory levels above the directory the kernel
# starts in) and make it the working directory.
# The guard keeps this cell idempotent: the path is relative, so without it every
# re-run would be resolved against the already-changed working directory and climb
# two more levels.
if 'root_dir' not in globals():
    root_dir = os.path.abspath(os.path.join(os.path.dirname('../pruebillas.ipynb'), '..'))
    os.chdir(root_dir)

# Put <root>/src at the front of the import path (presumably where the `backbone`
# package lives); skip the insert when it is already first so re-runs do not
# accumulate duplicate entries.
src_dir = os.path.join(root_dir, 'src')
if sys.path[:1] != [src_dir]:
    sys.path.insert(0, src_dir)

In [2]:
from backtesting import Backtest, Strategy
from backtesting.lib import crossover, plot_heatmaps, resample_apply, barssince
import pandas as pd
import talib as ta
import pandas_ta as pandas_ta
import numpy as np

from backbone.utils.general_purpose import diff_pips

In [4]:
# symbols_path = './backbone/data/backtest/symbols/USDCAD.csv'
# df = pd.read_csv(symbols_path)
# df = df[['Date','Open','High','Low','Close', 'Volume']]


from datetime import datetime
import MetaTrader5 as mt5
import pytz

print("MetaTrader5 package author: ", mt5.__author__)
print("MetaTrader5 package version: ", mt5.__version__)

# Establish the connection to the MetaTrader 5 terminal.
if not mt5.initialize():
    raise Exception("initialize() failed, error code =",mt5.last_error())

try:
    # Build the requested window with an explicit UTC tzinfo so that no local
    # time zone offset is applied to the boundaries.
    timezone = pytz.timezone("Etc/UTC")
    utc_from = datetime(2023, 5, 1, tzinfo=timezone)
    utc_to = datetime(2024, 1, 1, tzinfo=timezone)
    rates = mt5.copy_rates_range('USDCAD', mt5.TIMEFRAME_M5, utc_from, utc_to)
    # Capture the error state while the connection is still open.
    fetch_error = mt5.last_error()
finally:
    # Always release the terminal connection, even if the request raised.
    mt5.shutdown()

# Fail here with the terminal's error code instead of later with a KeyError on
# the missing 'time' column.
if rates is None or len(rates) == 0:
    raise RuntimeError(f"copy_rates_range() returned no data, error code = {fetch_error}")

# Create a DataFrame out of the obtained data.
df = pd.DataFrame(rates)

# Convert the epoch-seconds 'time' field into datetimes.
df['time'] = pd.to_datetime(df['time'], unit='s')

# Rename to the capitalised OHLCV column names used by the rest of the notebook.
df = df.rename(columns={
  'time':'Date',
  'open':'Open',
  'high':'High',
  'low':'Low',
  'close':'Close',
  'tick_volume':'Volume'
})

df

MetaTrader5 package author:  MetaQuotes Ltd.
MetaTrader5 package version:  5.0.4288


Unnamed: 0,Date,Open,High,Low,Close,Volume,spread,real_volume
0,2023-05-01 00:00:00,1.35321,1.35326,1.35321,1.35326,4,166,0
1,2023-05-01 00:05:00,1.35330,1.35345,1.35329,1.35345,6,154,0
2,2023-05-01 00:10:00,1.35345,1.35366,1.35345,1.35366,17,128,0
3,2023-05-01 00:15:00,1.35396,1.35405,1.35358,1.35367,41,78,0
4,2023-05-01 00:20:00,1.35373,1.35424,1.35365,1.35389,85,55,0
...,...,...,...,...,...,...,...,...
50042,2023-12-29 23:35:00,1.32527,1.32527,1.32499,1.32517,163,9,0
50043,2023-12-29 23:40:00,1.32517,1.32520,1.32505,1.32513,72,12,0
50044,2023-12-29 23:45:00,1.32511,1.32537,1.32501,1.32504,191,13,0
50045,2023-12-29 23:50:00,1.32502,1.32527,1.32496,1.32525,255,14,0


In [5]:
import numpy as np
import pandas as pd

class RMITrendSniper:
    """Momentum signal built from the average of an RSI-style and an MFI-style oscillator.

    Parameters
    ----------
    length : int
        Rolling window (in bars) used for both oscillators.
    positive_above : float
        Level the blended oscillator must cross upwards to emit a +1 signal.
    negative_below : float
        Level the blended oscillator must be below to emit a -1 signal.
    ema_length : int
        Span of the EMA of the close whose bar-to-bar change confirms direction.
    """

    def __init__(self, length=14, positive_above=66, negative_below=30, ema_length=5):
        self.length = length
        self.positive_above = positive_above
        self.negative_below = negative_below
        self.ema_length = ema_length

    def calculate_rmi(self, close, high, low, volume):
        """Return the blended oscillator ``(rsi + mfi) / 2`` as a Series on ``close.index``.

        All inputs are pandas Series sharing the same index. Rolling windows use
        ``min_periods=1``, so values are produced from the earliest bars onward.
        Where both sides of a ratio are zero (0/0) the result is NaN.
        """
        # RSI part: rolling simple means of upward and downward close changes.
        # Series.where keeps the caller's index; wrapping np.where output in a new
        # Series would reset it to 0..n-1 and misalign every later comparison for
        # inputs with any other index (e.g. a DatetimeIndex).
        delta = close.diff()
        gain = delta.where(delta > 0, 0.0)
        loss = (-delta).where(delta < 0, 0.0)
        avg_gain = gain.rolling(window=self.length, min_periods=1).mean()
        avg_loss = loss.rolling(window=self.length, min_periods=1).mean()
        rs = avg_gain / avg_loss
        rsi = 100 - (100 / (1 + rs))

        # MFI (Money Flow Index) part: rolling sums of typical_price * volume, split
        # by whether the typical price rose or fell versus the previous bar.
        typical_price = (high + low + close) / 3
        money_flow = typical_price * volume
        previous_typical = typical_price.shift(1)
        pos_flow = money_flow.where(typical_price > previous_typical, 0.0)
        neg_flow = money_flow.where(typical_price < previous_typical, 0.0)
        pos_mf = pos_flow.rolling(window=self.length, min_periods=1).sum()
        neg_mf = neg_flow.rolling(window=self.length, min_periods=1).sum()
        mfi = 100 - (100 / (1 + pos_mf / neg_mf))

        # Blend both oscillators with equal weight.
        rmi_mfi = (rsi + mfi) / 2

        return rmi_mfi

    def calculate_signals(self, close, high, low, volume):
        """Return ``(signal, rmi_mfi)``, both indexed like ``close``.

        ``signal`` is +1 where the oscillator crosses above ``positive_above`` while
        the EMA of the close is rising, -1 where the oscillator is below
        ``negative_below`` while the EMA is falling, and 0 elsewhere.
        """
        rmi_mfi = self.calculate_rmi(close, high, low, volume)
        ema = close.ewm(span=self.ema_length, adjust=False).mean()
        ema_change = ema.diff()

        # Positive / negative momentum conditions (NaN comparisons evaluate to False).
        p_mom = (rmi_mfi.shift(1) < self.positive_above) & (rmi_mfi > self.positive_above) & (rmi_mfi > self.negative_below) & (ema_change > 0)
        n_mom = (rmi_mfi < self.negative_below) & (ema_change < 0)

        signal = pd.Series(0, index=close.index)
        signal.loc[p_mom] = 1
        signal.loc[n_mom] = -1

        return signal, rmi_mfi

    def apply(self, close, high, low, volume):
        """Convenience entry point; returns the same ``(signal, rmi_mfi)`` pair as ``calculate_signals``."""
        signals, rmi_mfi = self.calculate_signals(close, high, low, volume)
        return signals, rmi_mfi


In [9]:
# Run the sniper with its default parameters over the OHLCV columns and store both
# outputs on the frame: the discrete signal and the oscillator it is derived from.
rmi_trend_sniper = RMITrendSniper()
signal_series, oscillator_series = rmi_trend_sniper.apply(df.Close, df.High, df.Low, df.Volume)
df['rmi_signal'] = signal_series
df['rmi_fmi'] = oscillator_series

# Show how often each signal value occurs.
df['rmi_signal'].value_counts()

rmi_signal
 0    43884
-1     4556
 1     1607
Name: count, dtype: int64

In [10]:
# Date windows; both bounds are exclusive in the masks below.
train_start = '2023-05-01'
train_end = '2023-06-01'

# NOTE(review): this window starts before train_start and ends after train_end, so
# it contains the whole training window. Confirm whether test_start should be
# train_end before using the two sets for optimisation and validation.
test_start = '2011-01-01'
test_end = '2023-08-01'

train_mask = (df.Date > train_start) & (df.Date < train_end)
test_mask = (df.Date > test_start) & (df.Date < test_end)

# `.assign` builds a new frame, so the Date conversion never writes into a slice of
# `df` (no SettingWithCopyWarning) and the converted dtype is always the one that
# ends up in the index, also when Date arrives as strings (e.g. from a CSV).
train_data = (
    df.loc[train_mask]
    .assign(Date=lambda frame: pd.to_datetime(frame.Date))
    .set_index('Date')
)
test_data = (
    df.loc[test_mask]
    .assign(Date=lambda frame: pd.to_datetime(frame.Date))
    .set_index('Date')
)


In [15]:
from backtesting import Strategy
from functools import reduce

class RsiBBands(Strategy):
    """Trade the precomputed ``rmi_signal`` column of the backtest data.

    While flat, a signal of 1 opens a long and -1 opens a short. While in a
    position, the opposite signal closes it; a new position is only considered on
    a later bar, once the strategy is flat again.
    """

    # Size passed to buy()/sell() for every entry.
    position_size = 3500

    def init(self):
        # Keep references to the precomputed columns on the strategy instance.
        self.rmi_signal = self.data.rmi_signal
        self.rmi_fmi = self.data.rmi_fmi

    def next(self):
        # Read the signal of the most recent bar explicitly instead of comparing
        # the whole array and relying on how the array converts to a boolean.
        latest_signal = self.data.rmi_signal[-1]

        if self.position:
            # Close on the opposite signal only; same-direction signals are ignored.
            exit_long = self.position.is_long and latest_signal == -1
            exit_short = self.position.is_short and latest_signal == 1
            if exit_long or exit_short:
                self.position.close()
        else:
            if latest_signal == 1:
                self.buy(size=self.position_size)
            elif latest_signal == -1:
                self.sell(size=self.position_size)

In [16]:

# Backtest RsiBBands with a 15000 cash account, 0.0002 commission and 1/30 margin.
# NOTE(review): the object is called bt_train but it is fed test_data — confirm
# which window is intended.
backtest_settings = dict(cash=15000, commission=0.0002, margin=1/30)
bt_train = Backtest(test_data, RsiBBands, **backtest_settings)

# Single run with the strategy's class-level defaults. To search parameters
# instead, swap run() for:
#     bt_train.optimize(maximize='Return [%]', max_tries=500)
stats = bt_train.run()

# Write the interactive chart next to the notebook without resampling the bars.
bt_train.plot(filename='./RsiBBands.html', resample=False)

stats

  formatter=DatetimeTickFormatter(days=['%d %b', '%a %d'],
  formatter=DatetimeTickFormatter(days=['%d %b', '%a %d'],


Start                     2023-05-01 00:00:00
End                       2023-07-31 23:55:00
Duration                     91 days 23:55:00
Exposure Time [%]                   77.861629
Equity Final [$]                 14863.236541
Equity Peak [$]                  15006.170976
Return [%]                          -0.911756
Buy & Hold Return [%]               -2.550877
Return (Ann.) [%]                   -3.437159
Volatility (Ann.) [%]                1.623512
Sharpe Ratio                              0.0
Sortino Ratio                             0.0
Calmar Ratio                              0.0
Max. Drawdown [%]                   -1.154223
Avg. Drawdown [%]                   -0.140243
Max. Drawdown Duration       88 days 04:50:00
Avg. Drawdown Duration        8 days 08:12:00
# Trades                                  335
Win Rate [%]                        62.089552
Best Trade [%]                       0.436773
Worst Trade [%]                     -0.888904
Avg. Trade [%]                    

In [None]:
# Print the parameters of the strategy instance behind `stats`.
# Attributes are looked up defensively because not every strategy declares all of
# them (RsiBBands, for example, only defines position_size); a missing attribute
# is reported instead of aborting the cell with AttributeError.
strategy_instance = stats._strategy
for label, attribute in (
    ('n', 'n'),
    ('percent_profit', 'percent_profit'),
    ('percent_loss', 'percent_loss'),
    ('grid_size', 'grid_size'),
    ('position_size', 'position_size'),
    ('rsi up', 'rsi_up_threshold'),
    ('rsi down', 'rsi_down_threshold'),
):
    value = getattr(strategy_instance, attribute, '<not defined>')
    print(f'{label}: {value}')

In [None]:
# Total PnL and number of trades per exit timestamp.
(
    stats._trades
    .groupby(by=['ExitTime'])
    .agg({'PnL': ['sum', 'count']})
)

In [None]:
# test_data['Date'] = pd.to_datetime(test_data['Date'])
# test_data = test_data.set_index('Date')
# Backtest ConsecutiveCandlesGrid on the test window with the same account
# settings used for the other run.
bt_test = Backtest(test_data, ConsecutiveCandlesGrid, cash=15000, commission=0.0002, margin=1/30)

# Fixed parameter set for this run. The values are hard-coded here; the matching
# attributes on stats._strategy are the alternative source once an optimisation
# has been run.
grid_test_params = dict(
    n=6,
    percent_profit=0.0005,
    percent_loss=0.01,
    grid_size=10,
    rsi_up_threshold=65,
    rsi_down_threshold=40,
    position_size=3500,
)

test_stats = bt_test.run(**grid_test_params)


bt_test.plot(filename='./ConsecutiveCandlesGridTest.html', resample=False)

test_stats

In [None]:

import plotly.express as px

# Line chart of the test run's equity (y) against its time index (x).
equity_curve = test_stats._equity_curve
x = equity_curve.index
y = equity_curve.Equity

fig = px.line(x=x, y=y)
fig.update_traces(textposition="bottom right")
fig.show()

In [None]:
# One row per exit timestamp: summed PnL, number of trades closed at that time,
# and the longest 'Duration' among them.
# NOTE(review): 'minutes_in_trade' holds the raw max of 'Duration' (presumably a
# Timedelta), not a number of minutes — confirm before using it numerically.
trades = (
    test_stats._trades
    .groupby(by=['ExitTime'])
    .agg(
        profit=('PnL', 'sum'),
        ammount_trades=('PnL', 'count'),
        minutes_in_trade=('Duration', 'max'),
    )
    .reset_index()
)
trades

In [None]:
# Summary statistics of the longest trade duration per exit timestamp.
trades['minutes_in_trade'].describe()


In [None]:
# plot_heatmaps(heatmap, agg='mean')

In [None]:
class ConsecutiveCandles(Strategy):
    """Enter with fixed pip-based stop-loss and take-profit after a run of candles.

    A long is opened when the most recent bar with Close < Open was exactly ``n``
    bars ago; otherwise a short is opened when the most recent bar with
    Close > Open was exactly ``n`` bars ago.
    """

    n = 7                   # bars since the last opposite-colour candle
    stop_loss_pips = 10     # stop distance from the entry close, in pips
    take_profit_pips = 10   # target distance from the entry close, in pips
    pip_value = 0.0001      # price change represented by one pip
    position_size = 0.001   # size passed to buy()/sell()

    def init(self):
        # No indicators are precomputed.
        pass

    def next(self):
        closes = self.data.Close
        opens = self.data.Open
        last_close = closes[-1]

        # Price distances of the protective orders from the latest close.
        stop_distance = self.stop_loss_pips * self.pip_value
        target_distance = self.take_profit_pips * self.pip_value

        if barssince(closes < opens) == self.n:
            # Long: stop below the close, target above it.
            self.buy(
                sl=round(last_close - stop_distance, 6),
                tp=round(last_close + target_distance, 6),
                size=self.position_size,
            )
            return

        if barssince(closes > opens) == self.n:
            # Short: stop above the close, target below it.
            self.sell(
                sl=round(last_close + stop_distance, 6),
                tp=round(last_close - target_distance, 6),
                size=self.position_size,
            )
