# MACD + RSI + BBW Strategy for BTC/USDT (1hr)

In [2]:
!pip3 install backtesting

Collecting backtesting
  Downloading Backtesting-0.3.3.tar.gz (175 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/175.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.4/175.5 kB[0m [31m1.7 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m175.5/175.5 kB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: backtesting
  Building wheel for backtesting (setup.py) ... [?25l[?25hdone
  Created wheel for backtesting: filename=Backtesting-0.3.3-py3-none-any.whl size=173916 sha256=fd3750f504e1dbbfb208b9bdc8e5f284179ddb61f82992a727ad50cf48077efa
  Stored in directory: /root/.cache/pip/wheels/e2/30/7f/19cbe31987c6ebdb47f1f510343249066711609e3da2d57176
Successfully built backtesting
Installing collected packages: backtesting
Successfully installed backtesting-0.3.3

In [3]:
!pip3 install pandas_ta

Collecting pandas_ta
  Downloading pandas_ta-0.3.14b.tar.gz (115 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/115.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━[0m [32m71.7/115.1 kB[0m [31m2.0 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m115.1/115.1 kB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pandas_ta
  Building wheel for pandas_ta (setup.py) ... [?25l[?25hdone
  Created wheel for pandas_ta: filename=pandas_ta-0.3.14b0-py3-none-any.whl size=218907 sha256=08da52569ea59fd45015104260964e2bf6f27be01303835f8150eeb6aa01ad24
  Stored in directory: /root/.cache/pip/wheels/69/00/ac/f7fa862c34b0e2ef320175100c233377b4c558944f12474cf0
Successfully built pandas_ta
Installing collected packages: pandas_ta
Successfully installed pandas_ta-0.3.14b0


## Import Modules and Data

In [4]:
# To run this code, you will have to install pandas, pandas_ta, backtesting, plotly, scipy
# if you are using pip or pip3, you can run the following commands in your terminal:
# !pip3 install backtesting
# !pip3 install pandas_ta
# !pip3 install plotly
# !pip3 install scipy
# !pip3 install matplotlib

import pandas as pd
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px
import pandas_ta as ta
from scipy import stats
import numpy as np
from datetime import datetime
from backtesting import Strategy
from backtesting import Backtest
from scipy.stats import linregress



In [34]:
#Preprocessing data
def preprocess_data(file_path:str):
    """Load an OHLCV CSV and return it with capitalised column names.

    The file's unnamed first column is used as the index and parsed as dates.
    Only the lower-case ``high``/``low``/``close``/``open``/``volume`` columns
    are kept (in that order) and renamed to ``High``/``Low``/``Close``/``Open``/
    ``Volume``; every other column is dropped.
    """
    column_names = {
        'high': 'High',
        'low': 'Low',
        'close': 'Close',
        'open': 'Open',
        'volume': 'Volume',
    }
    raw = pd.read_csv(file_path, index_col='Unnamed: 0', parse_dates=True)
    return raw[list(column_names)].rename(columns=column_names)

# The current strategy uses 1hr time frame data
# NOTE(review): absolute, environment-specific path; the CSV must exist here before this cell runs.
file_path = '/content/BTCUSDT-hourly-historical-price.csv' #Replace this with path of file containing btcusdt_1h data
# Hourly OHLCV frame; the cells below add indicator and signal columns to it.
btc_1h_data = preprocess_data(file_path)

## Functions to generate Indicators

In [35]:
#Function to calculate exponential moving average
def calculate_exponential_moving_average(df:pd.DataFrame, window:int, column_name:str='Close'):
    """Return the exponential moving average of one column.

    Parameters:
    - df (pd.DataFrame): Frame holding ``column_name``; it is not modified.
    - window (int): EMA span (passed to ``ewm(span=...)`` with ``adjust=False``).
    - column_name (str): Column to smooth. Default is 'Close'.

    Returns:
    - pd.Series: The EMA, named ``ema_<window>``.
    """
    smoothed = df[column_name].ewm(span=window, adjust=False).mean()
    return smoothed.rename(f'ema_{window}')

#Function to calculate Moving Average Convergence Divergence Indicator
def calculate_moving_average_convergence_divergence(df: pd.DataFrame, slow_window: int = 16, fast_window: int = 8, signal_window: int = 4):
    """Return the MACD line and its signal line for the 'Close' column.

    Parameters:
    - df (pd.DataFrame): Frame holding a 'Close' column; it is not modified.
    - slow_window (int): Span of the slow EMA. Default is 16.
    - fast_window (int): Span of the fast EMA. Default is 8.
    - signal_window (int): Span of the EMA applied to the MACD line. Default is 4.

    Returns:
    - tuple[pd.Series, pd.Series]: ``(macd, signal_line)`` where
      ``macd = EMA(fast) - EMA(slow)`` and ``signal_line = EMA(macd)``.
    """
    fast_ema = calculate_exponential_moving_average(df, fast_window)
    slow_ema = calculate_exponential_moving_average(df, slow_window)
    macd_line = (fast_ema - slow_ema).rename('macd')
    # The EMA helper works on a named column, so wrap the MACD line in a frame.
    signal_line = calculate_exponential_moving_average(
        macd_line.to_frame(), signal_window, column_name='macd'
    ).rename('signal_line')
    return macd_line, signal_line

#Function to calculate Volume Condition
def calculate_volume_condition(df: pd.DataFrame,slow_window: int = 50, fast_window: int = 10  ):
    """
    Calculates the "volume condition" flag for a given DataFrame.

    The flag is True on bars where the fast simple moving average of 'Close' is
    strictly above the slow one, and False otherwise (including the warm-up
    bars where either average is still NaN). Note that, despite the name, the
    averages are taken over the 'Close' column, not 'Volume'.
    Used for confirmation of the upward or the downward trend of the price action.

    Parameters:
    - df (pd.DataFrame): Frame holding a 'Close' column; it is not modified.
    - slow_window (int): Window of the slow SMA. Default is 50.
    - fast_window (int): Window of the fast SMA. Default is 10.

    Returns:
    - pd.Series: Boolean series named 'vol_condition', indexed like ``df``.
    """
    close = df['Close']
    fast_sma = close.rolling(fast_window).mean()
    slow_sma = close.rolling(slow_window).mean()
    # Vectorised comparison: no per-row Series[i] lookups, so it behaves the same
    # for datetime, default and arbitrary integer indexes. NaN > x is False.
    return (fast_sma > slow_sma).rename('vol_condition')

#Function to calculate RSI
def calculate_relative_strength_index(df:pd.DataFrame, window:int=14):
    """Return the RSI of the 'Close' column as computed by ``pandas_ta.rsi``.

    Parameters:
    - df (pd.DataFrame): Frame holding a 'Close' column.
    - window (int): RSI length passed to ``ta.rsi``. Default is 14.
    """
    closing_prices = df['Close']
    return ta.rsi(closing_prices, length=window)

#Function to calculate the Bollinger Band Width (BBW) condition
def calculate_bollinger_bands_width_condition(df: pd.DataFrame, window: int = 20, num_std: float = 2, fast_width_window: int = 10, slow_width_window: int = 65):
    """
    Flags bars where the Bollinger Band width is expanding.

    The band width is ``(upper - lower) / middle`` where the middle band is the
    ``window``-period SMA of 'Close' and the outer bands sit ``num_std`` rolling
    standard deviations away from it. The flag is True when the
    ``fast_width_window`` average of the width is strictly above its
    ``slow_width_window`` average, and False otherwise (including warm-up bars
    where either average is NaN).

    Parameters:
    - df (pd.DataFrame): Frame holding a 'Close' column; it is not modified.
    - window (int): Window of the bands' SMA and standard deviation. Default is 20.
    - num_std (float): Band distance in standard deviations. Default is 2.
    - fast_width_window (int): Short averaging window for the width. Default is 10.
    - slow_width_window (int): Long averaging window for the width. Default is 65.

    Returns:
    - pd.Series: Boolean series named 'bbw_condition', indexed like ``df``.
    """
    close = df['Close']
    middle_band = close.rolling(window).mean()
    deviation = close.rolling(window).std()
    upper_band = middle_band + num_std * deviation
    lower_band = middle_band - num_std * deviation
    band_width = (upper_band - lower_band) / middle_band
    fast_width = band_width.rolling(fast_width_window).mean()
    slow_width = band_width.rolling(slow_width_window).mean()
    return (fast_width > slow_width).rename('bbw_condition')

#Function to calculate weighted moving average
def calculate_weighted_moving_average(df: pd.DataFrame, window: int = 14):
    """
    Smooths the input with an exponentially weighted mean using
    ``alpha = 1 / window`` and ``adjust=False``.

    Parameters:
    - df (pd.Series or single-column pd.DataFrame): Values to smooth. The other
      indicator helpers pass a Series. The input is not modified.
    - window (int): Smoothing length; the decay factor is ``1 / window``. Default is 14.

    Returns:
    - pd.Series: The smoothed values, indexed like the input. A Series input
      keeps its name; a single-column DataFrame yields a Series named 'wma'.

    Raises:
    - ValueError: If a DataFrame with more than one column is passed.
    """
    smoothed = df.ewm(alpha=1/window, adjust=False).mean()
    if isinstance(smoothed, pd.Series):
        # Return the result directly instead of storing it under a 'wma' label
        # of a copied Series, which enlarged the Series and coerced it to object dtype.
        return smoothed
    if smoothed.shape[1] != 1:
        raise ValueError(
            "calculate_weighted_moving_average expects a Series or a single-column DataFrame"
        )
    return smoothed.iloc[:, 0].rename('wma')

#Function to calculate Average True Range
def calculate_average_true_range(df: pd.DataFrame, window: int = 14):
    """
    Calculates the Average True Range (ATR) for a given DataFrame.

    The true range of a bar is the largest of ``|High - Low|``,
    ``|High - previous Close|`` and ``|Low - previous Close|``; on the first bar
    (no previous close) only ``|High - Low|`` is available. The true range is
    then smoothed with ``calculate_weighted_moving_average``.

    Parameters:
    - df (pd.DataFrame): The DataFrame containing 'High', 'Low' and 'Close'.
    - window (int): The smoothing window passed on to the averaging helper. Default is 14.

    Returns:
    - pd.Series: The smoothed true range, named 'average_true_range'.
    """
    previous_close = df['Close'].shift()
    range_candidates = pd.concat(
        [
            (df['High'] - df['Low']).abs(),
            (df['High'] - previous_close).abs(),
            (df['Low'] - previous_close).abs(),
        ],
        axis=1,
    )
    # max(axis=1) skips NaN, so the first bar falls back to High - Low.
    true_range = range_candidates.max(axis=1)
    return calculate_weighted_moving_average(true_range, window).rename('average_true_range')

def calculate_average_directional_index(df:pd.DataFrame , window:int = 14):
    """
    Calculates a directional-movement strength value from High/Low/Close.

    Steps:
    1. Up move = High - previous High; down move = previous Low - Low.
    2. +DM keeps the up move when it is positive and larger than the down move
       (else 0); -DM is the mirror image.
    3. +DM, -DM and the true range are smoothed over ``window``.
    4. +DI / -DI = 100 * smoothed DM / ATR.
    5. Result = 100 * |+DI - -DI| / (+DI + -DI).

    NOTE(review): step 5 is returned as-is; it is not smoothed a second time,
    so the value corresponds to what is usually called DX rather than ADX.
    Left unchanged because the strategy thresholds were tuned against it.

    Parameters:
    - df (pd.DataFrame): Frame containing 'High', 'Low' and 'Close'; not modified.
    - window (int): Smoothing window for +DM, -DM and the ATR. Default is 14.

    Returns:
    - pd.Series: The index values, named 'adx'. Bars where both DI values are
      zero (e.g. the first bar) are NaN.
    """
    up_move = df['High'] - df['High'].shift()
    down_move = df['Low'].shift() - df['Low']
    plus_dm = pd.Series(
        np.where((up_move > down_move) & (up_move > 0), up_move, 0), index=df.index
    )
    minus_dm = pd.Series(
        np.where((down_move > up_move) & (down_move > 0), down_move, 0), index=df.index
    )
    plus_dm_avg = calculate_weighted_moving_average(plus_dm, window)
    minus_dm_avg = calculate_weighted_moving_average(minus_dm, window)
    # Forward `window` so the ATR uses the same smoothing length as the DM
    # averages (it previously always fell back to the ATR default of 14).
    atr = calculate_average_true_range(df, window)
    plus_di = 100 * (plus_dm_avg / atr)
    minus_di = 100 * (minus_dm_avg / atr)
    directional_index = 100 * (abs(plus_di - minus_di) / (plus_di + minus_di))
    return directional_index.rename('adx')



## Initialize data frame with indicators


In [36]:
# Attach every indicator used by the strategy as a new column of btc_1h_data.
# MACD line and its signal line (function defaults: 16/8/4 spans).
btc_1h_data['MACD'] , btc_1h_data['Signal_line'] = calculate_moving_average_convergence_divergence(btc_1h_data)
# 200-period EMA of Close, used later as the trend filter.
btc_1h_data['EMA_200'] = calculate_exponential_moving_average(btc_1h_data ,window = 200 )
# True where the 10-period SMA of Close is above the 50-period SMA.
btc_1h_data['Volume_Condition'] = calculate_volume_condition(btc_1h_data)
# 14-period RSI from pandas_ta.
btc_1h_data['RSI'] = calculate_relative_strength_index(btc_1h_data)
# True where the short-term average Bollinger Band width exceeds the long-term one.
btc_1h_data['BBW_Condition'] = calculate_bollinger_bands_width_condition(btc_1h_data)
# Smoothed true range (14-period default).
btc_1h_data['Average_True_Range'] = calculate_average_true_range(btc_1h_data)
# Directional index value (14-period default).
btc_1h_data['Average_Directional_Index'] = calculate_average_directional_index(btc_1h_data)

In [38]:
btc_1h_data.isna().sum()

High                          0
Low                           0
Close                         0
Open                          0
Volume                        0
MACD                          0
Signal_line                   0
EMA_200                       0
Volume_Condition              0
RSI                          14
BBW_Condition                 0
Average_True_Range            0
Average_Directional_Index     1
dtype: int64

## Creating preliminary signals (without stop loss and take profit)

In [39]:
#Checking the crossover of MACD line and Signal Line from below Signal line
def check_macd_crossover(df:pd.DataFrame):
    """Flag bars where MACD crosses above its signal line while below zero.

    A bar is flagged when all of the following hold:
    - MACD is negative on this bar,
    - MACD is strictly above the signal line on this bar,
    - MACD was strictly below the signal line on the previous bar.
    The first bar has no predecessor and is always False.

    Parameters:
    - df (pd.DataFrame): Frame holding 'MACD' and 'Signal_line'; not modified.

    Returns:
    - pd.Series: Boolean series named 'cross_over', indexed like ``df``.
    """
    macd = df['MACD']
    signal_line = df['Signal_line']
    # shift(1) lines up each bar with its predecessor; the NaN it introduces on
    # the first bar compares False, matching the "no previous bar" case.
    was_below = macd.shift(1) < signal_line.shift(1)
    is_above = macd > signal_line
    return ((macd < 0) & is_above & was_below).rename('cross_over')

#Checking the crossover of MACD line and Signal Line from above the Signal line
def check_macd_crossdown(df:pd.DataFrame):
    """Flag bars where MACD crosses below its signal line while above zero.

    A bar is flagged when all of the following hold:
    - MACD is positive on this bar,
    - MACD is strictly below the signal line on this bar,
    - MACD was strictly above the signal line on the previous bar.
    The first bar has no predecessor and is always False.

    Parameters:
    - df (pd.DataFrame): Frame holding 'MACD' and 'Signal_line'; not modified.

    Returns:
    - pd.Series: Boolean series named 'cross_down', indexed like ``df``.
    """
    macd = df['MACD']
    signal_line = df['Signal_line']
    # shift(1) lines up each bar with its predecessor; the NaN it introduces on
    # the first bar compares False, matching the "no previous bar" case.
    was_above = macd.shift(1) > signal_line.shift(1)
    is_below = macd < signal_line
    return ((macd > 0) & is_below & was_above).rename('cross_down')

#Checking the range of RSI indicator
def check_RSI_condition(df:pd.DataFrame, lower: float = 35, upper: float = 70):
    """Flag bars whose RSI lies strictly between ``lower`` and ``upper``.

    Parameters:
    - df (pd.DataFrame): Frame holding an 'RSI' column; not modified.
    - lower (float): Exclusive lower bound. Default is 35.
    - upper (float): Exclusive upper bound. Default is 70.

    Returns:
    - pd.Series: Boolean series named 'RSI_Condtion', indexed like ``df``.
      Bars with a NaN RSI (the indicator's warm-up) are False.
    """
    rsi = df['RSI']
    return ((rsi > lower) & (rsi < upper)).rename('RSI_Condtion')

#Initialize data frame with above conditions
# Each helper reads the 'MACD' / 'Signal_line' / 'RSI' columns added earlier
# and returns a boolean series that is stored as a new column.
btc_1h_data['macd_crossover'] = check_macd_crossover(btc_1h_data)
btc_1h_data['macd_crossdown'] = check_macd_crossdown(btc_1h_data)
btc_1h_data['check_RSI'] = check_RSI_condition(btc_1h_data)

In [44]:
btc_1h_data['macd_crossover'].describe()

count     41854
unique        2
top       False
freq      40124
Name: macd_crossover, dtype: object

In [40]:
#Generate Signals
# 1  -> long entry : MACD cross-over, Close above EMA_200, RSI in range, BBW condition False
# -1 -> short entry: MACD cross-down, Close below EMA_200, RSI in range, BBW condition False
# 0  -> no signal (always the case for the first `warmup_bars` rows)
warmup_bars = 200

# Built column-wise instead of looking rows up one at a time with
# btc_1h_data[col][i], which is slow and depends on integer keys being
# treated as positions on a datetime index.
past_warmup = np.arange(len(btc_1h_data)) >= warmup_bars
rsi_in_range = btc_1h_data['check_RSI'].to_numpy(dtype=bool)
bands_not_expanding = ~btc_1h_data['BBW_Condition'].to_numpy(dtype=bool)
tradable = past_warmup & rsi_in_range & bands_not_expanding

close_above_ema = (btc_1h_data['EMA_200'] < btc_1h_data['Close']).to_numpy()
close_below_ema = (btc_1h_data['EMA_200'] > btc_1h_data['Close']).to_numpy()

long_entry = tradable & btc_1h_data['macd_crossover'].to_numpy(dtype=bool) & close_above_ema
short_entry = tradable & btc_1h_data['macd_crossdown'].to_numpy(dtype=bool) & close_below_ema

# np.select takes the first matching condition, mirroring the original if/elif order.
signal = np.select([long_entry, short_entry], [1, -1], default=0)

btc_1h_data['signal'] = signal

In [48]:
btc_1h_data['signal'].value_counts()

 0    40963
 1      465
-1      426
Name: signal, dtype: int64

## Initial Backtest using stop loss and take profit

In [41]:
#Function to calculate the stoploss and takeprofit
#fs implies future signal
def get_stoploss_takeprofit(fs:int, vol_cond:bool):
    """Look up the (stop-loss, take-profit) multipliers for a future signal.

    Parameters:
    - fs (int): Future-signal code; one of 1, 2, 3, 4, -1, -2, -3.
    - vol_cond (bool): Volume condition on the current bar; selects between the
      two multiplier pairs of a code (codes 3 and 4 use the same pair either way).

    Returns:
    - tuple: ``(stoploss_multiplier, takeprofit_multiplier)``, or None when
      ``fs`` is not one of the known codes.
    """
    # fs -> (pair when vol_cond is truthy, pair when it is falsy)
    multipliers = {
        1: ((2, 1.4), (3, 1.4)),
        2: ((1, 1.75), (1, 1.5)),
        3: ((2, 1.8), (2, 1.8)),
        4: ((2, 1.3), (2, 1.3)),
        -1: ((3, 1.3), (2, 1.4)),
        -2: ((1, 0.6), (1, 0.7)),
        -3: ((3, 0.7), (2, 0.8)),
    }
    pair = multipliers.get(fs)
    if pair is None:
        return None
    with_volume, without_volume = pair
    return with_volume if vol_cond else without_volume


In [19]:
btc_1h_data

Unnamed: 0,Open,High,Low,Close,Volume,close_time,qav,num_trades,taker_base_vol,taker_quote_vol,...,RSI,BBW_Condition,Average_True_Range,Average_Directional_Index,macd_crossover,macd_crossdown,check_RSI,signal,trade,signals
0,,,,,,,,,,,...,,,,,False,False,False,0,0,0


In [42]:
# Generating the final signals based on the initial signals, involving stop loss and take profit
#fs implies future signal
data = btc_1h_data  # alias, not a copy: columns added to `data` also appear on btc_1h_data
class MyStrat(Strategy):
    """Strategy that trades the precomputed `signal` column with manual exits.

    Entries come from `signal` (1 = long, -1 = short). Exits are handled inside
    `next()` by comparing the stored `self.sl` / `self.tp` levels and the RSI
    against the latest bar; the levels are never passed to `buy()` / `sell()`.
    After some exits a follow-up code is stored in `self.fs` and acted on by
    the next call to `next()`.
    """
    # Order size handed to buy()/sell().
    # NOTE(review): presumably "just under 100% of equity" so the library treats
    # it as a fraction rather than a unit count - confirm against backtesting docs.
    mysize = 1 - 1e-010
    # These two assignments run once, when the class body is executed, and add
    # zero-filled columns to the module-level `data` frame (a side effect of
    # defining the class). 'signals' is filled in by a later cell.
    data['trade'] = 0
    data['signals'] = 0

    def init(self):
        """Reset the follow-up state before the run."""
        # fs ("future signal"): 0 = nothing pending, > 0 = handled by the
        # `self.fs > 0` branch of next(), < 0 = handled by the `self.fs < 0` branch.
        self.fs = 0
        super().init()

    def next(self):
        """Process the latest bar: manage open trades, then act on signals or on a pending `fs`."""
        if self.fs == 0 :
            # --- Exit management for every open trade -------------------------
            for trade in self.trades:
                # Long trade and RSI below 20: close; queue fs = -3 if the volume condition is False.
                if trade.is_long and self.data.RSI[-1] < 20:
                    self.position.close()
                    if not self.data.Volume_Condition[-1]:
                        self.fs = -3
                # Short trade and RSI above 70: close, no follow-up.
                elif trade.is_short and self.data.RSI[-1] > 70:
                    self.position.close()
                # Long trade: stored stop level is above Low + 3*ATR.
                elif trade.is_long and self.sl > self.data.Low[-1] + 3*self.data.Average_True_Range[-1]:
                    self.position.close()
                    if not self.data.Volume_Condition[-1] and self.data.BBW_Condition[-1] and self.data.Average_Directional_Index[-1] < 35:
                        self.fs = -2
                # Long trade: stored target level is below High + 3*ATR.
                elif trade.is_long and self.tp < self.data.High[-1] + 3*self.data.Average_True_Range[-1] :
                    self.position.close()
                    if not self.data.Volume_Condition[-1] and self.data.BBW_Condition[-1] and self.data.Average_Directional_Index[-1] < 35:
                        self.fs = -3
                # Short trade: stored stop level is below High - 3*ATR.
                elif trade.is_short and self.sl < self.data.High[-1] - 3*self.data.Average_True_Range[-1] :
                    self.position.close()
                    if self.data.Volume_Condition[-1] and self.data.BBW_Condition[-1] and self.data.Average_Directional_Index[-1] < 35:
                        self.fs = 2
                # Short trade: stored target level is above Low - 3*ATR.
                elif trade.is_short and self.tp > self.data.Low[-1] - 3*self.data.Average_True_Range[-1] :
                    self.position.close()
                    if self.data.Volume_Condition[-1] and self.data.BBW_Condition[-1] and self.data.Average_Directional_Index[-1] < 35:
                        self.fs = 4

            # --- Entries from the precomputed signals ---------------------------
            # here we use the strong signals generated by the initial signals to generate the final signals
            # NOTE(review): `self.data.signal == 1` compares the whole column array,
            # not `[-1]`; this appears to rely on the library's array wrapper
            # evaluating truthiness on the latest bar - confirm.
            if self.data.signal == 1 :
                if not self.position.is_long and not self.position.is_short :
                    # Flat: record ATR-based levels (depending on the volume condition) and go long.
                    if not self.data.Volume_Condition[-1]:
                        sl = self.data.Close[-1] - 2.5*self.data.Average_True_Range[-1]
                        tp = self.data.Close[-1] + 6*self.data.Average_True_Range[-1]
                    else :
                        sl = self.data.Close[-1] - 2*self.data.Average_True_Range[-1]
                        tp = self.data.Close[-1]*1.5
                    self.sl = sl
                    self.tp = tp
                    self.buy(size = self.mysize)
                # Long signal while short: close the short and queue fs = 3.
                if self.position.is_short :
                    self.position.close()
                    self.fs = 3

            elif self.data.signal == -1 :
                if not self.position.is_long and not self.position.is_short:
                    # Flat: record ATR-based levels and go short.
                    if not self.data.Volume_Condition[-1]:
                        sl = self.data.Close[-1] + 2.5*self.data.Average_True_Range[-1]
                        tp = self.data.Close[-1] - 4.5*self.data.Average_True_Range[-1]
                    else :
                        sl = self.data.Close[-1] + 1.5*self.data.Average_True_Range[-1]
                        tp = self.data.Close[-1] - 4.5*self.data.Average_True_Range[-1]
                    self.sl = sl
                    self.tp = tp
                    self.sell(size = self.mysize)
                # Short signal while long: close the long and queue fs = -3.
                if self.position.is_long :
                    self.position.close()
                    self.fs = -3

        elif self.fs > 0:
            # Pending positive follow-up: a -1 signal takes priority, otherwise
            # consider a long entry. fs is cleared at the end either way.
            if self.data.signal == -1 :
                if not self.data.Volume_Condition[-1]:
                    sl = self.data.Close[-1] + 2*self.data.Average_True_Range[-1]
                    tp = self.data.Close[-1]*0.5
                else :
                    sl = self.data.Close[-1] + 1*self.data.Average_True_Range[-1]
                    tp = self.data.Close[-1]*0.6
                    # NOTE(review): the next three lines are inside the `else`
                    # branch, so when Volume_Condition is False the sl/tp computed
                    # above are discarded and no short is opened. The mirrored
                    # `self.fs < 0` branch places them after the if/else - confirm
                    # whether this indentation is intentional.
                    self.sl = sl
                    self.tp = tp
                    self.sell(size = self.mysize)
            else:
                # Go long only when the directional index is above 40; the
                # multipliers depend on the fs code and the volume condition.
                if self.data.Average_Directional_Index[-1] > 40:
                    sl_multiplier, tp_multiplier = get_stoploss_takeprofit(self.fs, self.data.Volume_Condition[-1])
                    self.sl = self.data.Close[-1] - sl_multiplier*self.data.Average_True_Range[-1]
                    self.tp = self.data.Close[-1] * tp_multiplier
                    self.buy(size = self.mysize)
            self.fs = 0
        elif self.fs < 0:
            # Pending negative follow-up: a +1 signal takes priority (long entry
            # if flat), otherwise consider a short entry. fs is cleared at the end.
            if self.data.signal == 1 :
                if not self.position.is_long and not self.position.is_short :
                    if not self.data.Volume_Condition[-1]:
                        sl = self.data.Close[-1] - 2*self.data.Average_True_Range[-1]
                        tp = self.data.Close[-1]*1.4
                    else :
                        sl = self.data.Close[-1] - 1*self.data.Average_True_Range[-1]
                        tp = self.data.Close[-1]*1.5
                    self.sl = sl
                    self.tp = tp
                    self.buy(size = self.mysize)
            else:
                # Go short only when the directional index is above 40.
                if self.data.Average_Directional_Index[-1] > 40:
                    sl_multiplier, tp_multiplier = get_stoploss_takeprofit(self.fs, self.data.Volume_Condition[-1])
                    self.sl = self.data.Close[-1] + sl_multiplier*self.data.Average_True_Range[-1]
                    self.tp = self.data.Close[-1] * tp_multiplier
                    self.sell(size = self.mysize)
            self.fs = 0
# Run the strategy on the full frame with 100000 starting cash and 0.1% commission.
bt = Backtest(data, MyStrat, cash=100000, commission= 0.001)
stat = bt.run()
stat

Start                     2017-08-17 04:00:00
End                       2022-05-31 23:00:00
Duration                   1748 days 19:00:00
Exposure Time [%]                   55.296985
Equity Final [$]               13648374.49715
Equity Peak [$]                14632959.55056
Return [%]                       13548.374497
Buy & Hold Return [%]                7.208453
Return (Ann.) [%]                  178.978986
Volatility (Ann.) [%]               163.46254
Sharpe Ratio                         1.094924
Sortino Ratio                        5.523588
Calmar Ratio                         5.776717
Max. Drawdown [%]                  -30.982819
Avg. Drawdown [%]                   -2.656928
Max. Drawdown Duration      221 days 00:00:00
Avg. Drawdown Duration        4 days 08:00:00
# Trades                                  342
Win Rate [%]                        62.865497
Best Trade [%]                      48.480858
Worst Trade [%]                    -15.132735
Avg. Trade [%]                    

## Create final signals and store in csv

In [12]:
#Creating the final signals using the trade logs from above
#Create a deepcopy of the dataframe 'data' to avoid any changes in the original dataframe
final = data.copy(deep=True)
trades=stat._trades

# Write through .iloc with an explicit (row position, column position) pair.
# The chained form final['signals'][pos] = value assigns into an intermediate
# Series, which is not guaranteed to reach `final` and is a no-op under pandas
# copy-on-write.
signals_col = final.columns.get_loc('signals')

# Trades are replayed in log order, so a later trade overwrites an earlier mark
# on the same bar. Marks are placed on the bar before EntryBar / ExitBar.
# NOTE(review): presumably because orders placed on one bar are filled on the
# next one - confirm against the backtesting docs.
for size, entry_bar, exit_bar in zip(trades['Size'], trades['EntryBar'], trades['ExitBar']):
    if size > 0:
        direction = 1      # long trade: enter with 1, exit with -1
    elif size < 0:
        direction = -1     # short trade: enter with -1, exit with 1
    else:
        continue
    final.iloc[entry_bar - 1, signals_col] = direction
    final.iloc[exit_bar - 1, signals_col] = -direction

print(len(final[final['signals'] == 1]), len(final[final['signals'] == -1]))

NameError: ignored

In [None]:
# Making a final data frame
# Turn the timestamp index into a 'datetime' column. The index is named first
# because reset_index() names the new column after the index ('index' when the
# index is unnamed), which would make the 'datetime' selection below fail.
# The guard keeps the cell re-runnable.
if 'datetime' not in final.columns:
    final = final.rename_axis('datetime').reset_index()

export_columns = ['Open' , 'Close' , 'High' , 'Low' , 'Volume' , 'signals','datetime' ]
# rename() returns a new frame, so the column subset is never modified in place.
df_final = final[export_columns].rename(
    columns={'Open' : "open" , 'Close' : 'close' , 'High' : 'high' ,'Low' : 'low' , 'Volume' : 'volume' }
)
df_final

In [None]:
# Storing the dataframe in a csv file
# Written relative to the current working directory; with to_csv's defaults the
# row index is stored as an unnamed first column.
df_final.to_csv('test.csv')

## Checking the final results

In [None]:
#Backtesting on final csv file
# Reads the file written by df_final.to_csv('test.csv') above. preprocess_data
# is not used here: it keeps only the OHLCV columns (dropping 'signals', which
# the replay strategy needs) and indexes by the unnamed row-number column
# instead of the timestamps.
file_path = 'test.csv' #Replace this with path of file containing test data
test_data = (
    pd.read_csv(file_path, index_col='datetime', parse_dates=True)
    [['high', 'low', 'close', 'open', 'volume', 'signals']]
    .rename(columns={'open' : "Open" , 'close' : 'Close' , 'high' : 'High' ,'low' : 'Low' , 'volume' : 'Volume' })
)

In [None]:
# Backtesting on the final csv file created
class check_signals(Strategy):
    """Replays the exported `signals` column.

    A 1 opens a long when flat or closes an open short; a -1 opens a short when
    flat or closes an open long. Any other value does nothing.
    """
    mysize = 0.99999999

    def init(self):
        super().init()

    def next(self):
        latest_signal = self.data.signals
        position = self.position
        in_market = position.is_long or position.is_short

        if latest_signal == 1:
            if not in_market:
                self.buy(size=self.mysize)
            elif position.is_short:
                position.close()

        elif latest_signal == -1:
            if not in_market:
                self.sell(size=self.mysize)
            elif position.is_long:
                position.close()

# Replay the exported signals with the same cash and commission as the first backtest.
bt_check = Backtest(test_data, check_signals, cash=100000, commission= 0.001)
stats = bt_check.run()
stats

## Visualisation using graphs

In [None]:
# Visualisation of log values of close and portfolio to compare them
fig1 = go.Figure()

# Add trace for log(close)
fig1.add_trace(go.Scatter(x=test_data.index, y=np.log(test_data.Close), name='close_log', mode='lines'))

# Add trace for log(portfolio) with actual values on the y-axis
equity_log = np.log(stats._equity_curve.Equity)
fig1.add_trace(go.Scatter(x=stats._equity_curve.Equity.index, y=equity_log, name='portfolio_log', mode='lines'))

# Ticks are positioned at log(value) but labelled with the plain value. One
# list of integers drives both, so every label is formatted the same way
# (mixing ints and floats produced '10000' next to '1000000.0').
equity_ticks = [10_000, 100_000, 1_000_000, 10_000_000, 100_000_000]
fig1.update_layout(
    title='Close price vs. portfolio equity (log scale)',
    xaxis_title='Time',
    yaxis=dict(
        title='Value',
        tickvals=[np.log(value) for value in equity_ticks],
        ticktext=[f'{value:,}' for value in equity_ticks],  )
)

fig1.show()


In [None]:
# The portfolio graph
# Equity curve of the replay backtest (`stats`) on a linear axis.
fig2 =go.Figure()
fig2.add_trace(go.Scatter(x = stats._equity_curve.Equity.index , y =stats._equity_curve.Equity , name='portfolio', mode ='lines'))
fig2.show()

In [None]:
# detailed visualisation of all the trades
# Profit -- Green
# Loss -- Red

log_c = stats._trades
fig3 = go.Figure()
# Background: the open price of every bar.
fig3.add_trace(go.Scatter(x=test_data.index, y=test_data.Open, name='Open Values', mode='lines', showlegend=False))

# One segment per trade, from (entry time, entry price) to (exit time, exit price).
for trade in log_c.itertuples(index=False):
    outcome_color = "green" if trade.PnL > 0 else "red"
    fig3.add_scatter(
        x=[trade.EntryTime, trade.ExitTime],
        y=[trade.EntryPrice, trade.ExitPrice],
        mode="lines",
        marker=dict(size=3, color=outcome_color),
        showlegend=False,
    )

fig3.show()