A trading strategy brought to you by trueToastedCode

https://www.youtube.com/trueToastedCode

The "other backtesting library" can be downloaded from here:

https://github.com/trueToastedCode/Backtest.git

---

In [1]:
import pandas as pd
import numpy as np
import pandas_ta as ta
from tqdm.notebook import tqdm
from backtesting import Backtest, Strategy
import warnings
from datetime import datetime
import plotly.offline as pyo
from Backtest.backtest import Backtest as Backtest2
from Backtest.broker import Broker as Broker2



In [2]:
# Suppress every Python warning for the rest of the session.
# NOTE(review): this also hides pandas deprecation warnings that may point at real problems.
warnings.filterwarnings("ignore")
# Initialise plotly's notebook mode so figures can be shown inline.
pyo.init_notebook_mode(connected=True)

In [3]:
# Load the candle data and turn the timestamp strings into real datetimes.
df = pd.read_csv('../data/BTCUSD-5m-2019-2023.csv')
df['Datetime'] = pd.to_datetime(df['Datetime'], format='%Y-%m-%d %H:%M:%S')
# df = df.dropna() (for my data dropna was already applied to csv)

# Technical Indicators

In [4]:
def calc_rsi(df, length):
    """
    Relative Strength Index based on smoothed moving averages of gains and losses.

    The first `length` entries are NaN. The entry at position `length` is seeded
    with the plain average gain / loss of the first `length` price changes; every
    later entry smooths the previous averages recursively.

    Rows are read by position, so the result does not depend on the index labels.

    @param df: DataFrame with a 'Close' column
    @param length: Smoothing length in candles
    @return: List with len(df) values (NaN where the RSI is not defined yet)
    """
    closes = df.Close.to_numpy()
    n = len(closes)
    if n <= length:
        # Not enough candles to seed the averages
        return [np.nan] * n

    def to_rsi(smma_up, smma_down):
        # No losses inside the smoothing window: pin the RSI at 100.
        # This also covers a completely flat series, which would otherwise
        # divide 0.0 by 0.0.
        if smma_down == 0:
            return 100.0
        rs = smma_up / smma_down
        return 100 - (100 / (1 + rs))

    result = [np.nan] * length

    # Seed: sum of gains / losses over the first `length` price changes
    c_up, c_down = 0, 0
    for a, b in zip(closes[:length], closes[1:length + 1]):
        if b > a:
            c_up += b - a
        elif b < a:
            c_down += a - b
    prev_smma_up, prev_smma_down = c_up / length, c_down / length
    result.append(to_rsi(prev_smma_up, prev_smma_down))

    for i in range(length + 1, n):
        a, b = closes[i - 1], closes[i]
        if b > a:
            c_up, c_down = b - a, 0
        elif b < a:
            c_up, c_down = 0, a - b
        else:
            c_up, c_down = 0, 0

        # Recursive smoothing of the current gain / loss
        curr_smma_up = (c_up + (prev_smma_up * (length - 1))) / length
        curr_smma_down = (c_down + (prev_smma_down * (length - 1))) / length

        result.append(to_rsi(curr_smma_up, curr_smma_down))

        prev_smma_up, prev_smma_down = curr_smma_up, curr_smma_down

    return result

In [5]:
# Using pandas_ta to calculate the RSI led to inconsistent pivots when the RSI was
# also updated incrementally with the library. For this notebook pandas_ta would be
# fine, but I want to use the same code in notebooks and in live implementations.
# df['RSI'] = ta.rsi(df.Close, length=14)
df['RSI'] = calc_rsi(df, 14)

In [6]:
def _true_range(high, low, prev_close):
    # Largest of: candle range, gap up from previous close, gap down from previous close
    return max(
        high - low,
        abs(high - prev_close),
        abs(low - prev_close)
    )


def calc_tr(df, i):
    """
    True range of the candle at position i.

    Rows are read by position (iloc), so the result does not depend on the
    index labels of df.

    @param df: DataFrame with 'High', 'Low' and 'Close' columns
    @param i: Position of the candle, must be > 0 because the previous close is needed
    @return: True range of candle i
    """
    assert i > 0
    return _true_range(df.High.iloc[i], df.Low.iloc[i], df.Close.iloc[i-1])


def calc_atr(df, length):
    """
    Average true range, smoothed recursively.

    The first `length` entries are NaN. The entry at position `length` is the plain
    mean of the true ranges of candles 1..length; every later entry is
    (previous_atr * (length - 1) + true_range) / length.

    @param df: DataFrame with 'High', 'Low' and 'Close' columns
    @param length: Smoothing length in candles
    @return: List with len(df) values (NaN where the ATR is not defined yet)
    """
    n = len(df)
    if n <= length:
        # Not enough candles to seed the average
        return [np.nan] * n

    # Pull the columns out once; per-element pandas lookups are slow in a loop
    high, low, close = df.High.to_numpy(), df.Low.to_numpy(), df.Close.to_numpy()

    result = [np.nan] * length
    atr = sum([_true_range(high[i], low[i], close[i-1]) for i in range(1, length+1)]) / length
    result.append(atr)
    for i in range(length+1, n):
        atr = (atr * (length-1) + _true_range(high[i], low[i], close[i-1])) / length
        result.append(atr)
    return result

In [7]:
# Hand-rolled for the same reason as the RSI: the same code is meant to be
# shared between notebooks and live implementations.
# df['ATR'] = ta.atr(df.High, df.Low, df.Close)
df['ATR'] = calc_atr(df, 14)

# Custom Indicators

### Edge Pivots

In [8]:
def get_edge_pivots(df, src, n, mode):
    """
    Find Edge Pivots
    Edge Pivots is what I call local pivots that are determined by comparing a
    potential pivot value only to the values on its left side.

    A row is a pivot when its value is not NaN and is strictly lower ('min') or
    strictly higher ('max') than every non-NaN value among the n rows before it.
    Ties are not pivots, because the earlier row already holds the extreme.
    Windows that contain only NaN on the left are handled without calling
    idxmin / idxmax on an all-NaN window.

    @param df: DataFrame holding the source column
    @param src: Column that should serve as source
    @param n: Amount of values before a pivot (>= 0)
    @param mode: 'min' for pivot lows, 'max' for pivot highs
    @return: Series for the rows from position n onwards; the source value at
             pivots, NaN everywhere else
    @raise ValueError: If mode is neither 'min' nor 'max'
    """
    if mode not in ('min', 'max'):
        raise ValueError(f'Unknown mode: {mode}, should either be min or max')

    values = df[src]

    # Extreme of the n rows strictly before each row (NaN if none of them is valid)
    if n > 0:
        left_window = values.shift(1).rolling(n, min_periods=1)
        prev_extreme = left_window.min() if mode == 'min' else left_window.max()
    else:
        prev_extreme = pd.Series(np.nan, index=values.index)

    if mode == 'min':
        beats_window = values < prev_extreme
    else:
        beats_window = values > prev_extreme

    # A valid value with nothing valid to its left is the extreme of its window
    is_pivot = values.notna() & (beats_window | prev_extreme.isna())

    result = values.where(is_pivot).iloc[n:]
    result.name = None
    return result

In [9]:
# Number of candles to the left that an RSI value has to undercut to count as a pivot low
rsi_edge_pivot_n = 5
df['RSIEdgePivotLow'] = get_edge_pivots(df, 'RSI', rsi_edge_pivot_n, 'min')

In [10]:
# Sanity check on one arbitrary pivot (the 11th): show the RSI window that produced it.
# The last value shown must be the lowest of the rsi_edge_pivot_n + 1 values.
dt = df[pd.notnull(df.RSIEdgePivotLow)].iloc[10].name
dt_pos = df.index.get_loc(dt)
# Window size follows rsi_edge_pivot_n instead of a hard-coded 5, so this check
# stays valid when the pivot parameter changes.
df.RSI.loc[df.index[dt_pos - rsi_edge_pivot_n]:df.index[dt_pos]]

44    49.242747
45    48.150566
46    47.368346
47    52.881124
48    48.951763
49    46.627635
Name: RSI, dtype: float64

In [11]:
# Number of candles flagged as an RSI edge-pivot low
len(df[pd.notnull(df.RSIEdgePivotLow)])

118439

### Smoothed Moving Average

In [12]:
def smma(df, src, length):
    """
    Smoothed moving average of a column.

    The first value is the simple mean of the first `length` valid values;
    every later value is (previous_smma * (length - 1) + value) / length.
    Rows are read by position, and the result always has len(df) entries:
    leading NaNs of the source and the warm-up period are returned as NaN, so
    the list can be assigned straight back to df.

    @param df: DataFrame holding the source column
    @param src: Column that should serve as source
    @param length: Smoothing length in candles
    @return: List with len(df) values
    """
    values = df[src].to_numpy()
    n = len(values)

    # Position of the first value that is not NaN
    valid_positions = np.flatnonzero(pd.notnull(values))
    if len(valid_positions) == 0:
        return [np.nan] * n
    start = int(valid_positions[0])
    seed_end = start + length
    if seed_end > n:
        # Not enough values to calculate the first smma value
        return [np.nan] * n

    # Keep the output aligned with df when the source starts with NaNs
    result = [np.nan] * start
    # Calculate the first smma value: (length - 1) NaNs followed by the simple mean.
    # NOTE(review): pandas rolling mean replaces ta.sma here; expected to give the
    # same seed - confirm if pandas_ta is backed by TA-Lib in your environment.
    result += df[src].iloc[start:seed_end].rolling(length).mean().to_list()

    # Calculate all remaining smma values
    previous_smma = result[-1]
    for i in range(seed_end, n):
        current_smma = (previous_smma * (length - 1) + values[i]) / length
        result.append(current_smma)
        previous_smma = current_smma
    return result

In [13]:
# Fast, medium and slow smoothed moving averages of the close
for smma_column, smma_length in (('SMMA1', 21), ('SMMA2', 50), ('SMMA3', 200)):
    df[smma_column] = smma(df, 'Close', smma_length)

### Switch to a Datetime index (everything below works with timestamps and the candle timeframe)

In [14]:
# From here on rows are addressed by their timestamp
df = df.set_index('Datetime')

In [15]:
# Duration of one candle, taken from the first two timestamps
timeframe = df.index[1] - df.index[0]


def candles_since(a, b):
    """
    Number of candles between two timestamps, regardless of their order.
    Fails the assertion when the distance is not a whole multiple of timeframe.
    """
    whole, remainder = divmod(abs((b - a) / timeframe), 1)
    assert remainder == 0
    return int(whole)

# Divergences

In [16]:
class Div:
    """
    Interface for divergences.

    Subclasses name the columns to read and implement is_div().
    """
    # Column that marks the pivot rows
    SRC_PIVOT = None
    # Column holding the oscillator value
    SRC_OSC = 'RSI'
    # Column holding the price that is compared between two pivots
    SRC_PRICE = None

    @staticmethod
    def is_div(osc_before, osc_now, price_before, price_now):
        """
        Decide whether an earlier and the current pivot form a divergence.

        @param osc_before: Oscillator value at the earlier pivot
        @param osc_now: Oscillator value at the current pivot
        @param price_before: Price at the earlier pivot
        @param price_now: Price at the current pivot
        @raise NotImplementedError: Always; subclasses have to override this
        """
        # NotImplemented is a comparison sentinel, not an exception; raising it
        # would fail with a TypeError instead of the intended error.
        raise NotImplementedError


class BullDiv(Div):
    """Bullish divergence: the oscillator makes a higher low while price makes a lower low."""
    SRC_PIVOT = 'RSIEdgePivotLow'
    SRC_PRICE = 'Low'

    @staticmethod
    def is_div(osc_before, osc_now, price_before, price_now):
        osc_higher_low = osc_now > osc_before
        price_lower_low = price_now < price_before
        return osc_higher_low and price_lower_low


class AnotherBearDiv(Div):
    """
    Not a divergence by the typical definition, but useful for spotting some
    bad long signals: the oscillator makes a lower low while price makes a
    higher high.
    """
    SRC_PIVOT = 'RSIEdgePivotLow'
    SRC_PRICE = 'High'

    @staticmethod
    def is_div(osc_before, osc_now, price_before, price_now):
        osc_lower_low = osc_now < osc_before
        price_higher_high = price_now > price_before
        return osc_lower_low and price_higher_high


def get_div_to_latest_pivot(pivot_rows, div, max_backpivots, backcandles_min, backcandles_max):
    """
    Look for a divergence between the last row of pivot_rows and an earlier pivot.

    The search starts at the most recent pivot that is at least backcandles_min
    candles old and walks backwards in time. It stops at the first match, as soon
    as a pivot is more than backcandles_max candles old, or after
    max_backpivots + 1 pivots have been checked (no limit when max_backpivots is None).

    @param pivot_rows: Rows containing the pivots
    @param div: Implementation of Div (Divergence)
    @param max_backpivots: Allowed distance of pivots starting after min candles
    @param backcandles_min: Minimum Candles between past and current
    @param backcandles_max: Maximum Candles between past and current
    @return: Tuple (index of the earlier pivot, index of the current pivot), or None
    """
    if len(pivot_rows) < 2:
        return None
    latest = pivot_rows.iloc[-1]

    # most recent pivot that is old enough
    candles_back = (latest.name - pivot_rows.index) / timeframe
    newest_eligible = pivot_rows[candles_back >= backcandles_min].index.max()
    if pd.isnull(newest_eligible):
        return None
    start = pivot_rows.index.get_loc(newest_eligible)

    # exclusive lower bound of the backwards walk
    if max_backpivots is None:
        stop = -1
    else:
        stop = max(start - max_backpivots - 1, -1)

    # first divergence found while walking back from start
    for pos in range(start, stop, -1):
        earlier = pivot_rows.iloc[pos]
        if candles_since(earlier.name, latest.name) > backcandles_max:
            return None
        found = div.is_div(
            earlier[div.SRC_OSC], latest[div.SRC_OSC],
            earlier[div.SRC_PRICE], latest[div.SRC_PRICE])
        if found:
            return earlier.name, latest.name
    return None

In [17]:
rows = df[df.RSIEdgePivotLow.notna()]

df['BullDiv'] = np.nan
df['AnotherBearDiv'] = np.nan

for i in tqdm(range(len(rows))):
    row = rows.iloc[i]
    # Gather the pivots before and at the current index (at most 3000 of them)
    current_rows = rows.iloc[max(i - 3000 + 1, 0):i + 1]

    # Check for a Bullish Divergence, then for Another Bearish Divergence;
    # the column stores the timestamp of the earlier pivot
    for div_column, div_cls in (('BullDiv', BullDiv), ('AnotherBearDiv', AnotherBearDiv)):
        div = get_div_to_latest_pivot(current_rows, div_cls, 4, 5, 55)
        if div:
            df.loc[row.name, div_column] = div[0]

  0%|          | 0/118439 [00:00<?, ?it/s]

In [39]:
# Number of candles with a divergence of each kind
for div_column in ('BullDiv', 'AnotherBearDiv'):
    print(df[div_column].notna().sum())

26504
33810


# Total Signal

In [93]:
# Value stored in df.TotalSignal for candles that should open a long position
LONG_SIGNAL = 1

### Long Signals

In [94]:
# Start from a clean column so that re-running this cell (e.g. after changing the
# thresholds) cannot leave signals from an earlier run behind.
df['TotalSignal'] = np.nan

has_bull_div = pd.notnull(df.BullDiv)

# if rsi above x, 2 more confirmations:
# - price above third smma
# - smma's have to line up above each other
trend_not_confirmed = (
    (df.Low <= df.SMMA3)
    | (df.SMMA2 <= df.SMMA3)
    | (df.SMMA1 <= df.SMMA2)
)
rejected = (
    ((df.RSI >= 70) & trend_not_confirmed)
    | (df.RSI >= 90)  # rsi too high
)

# Comparisons against NaN are False, exactly as in a row-by-row check
df.loc[has_bull_div & ~rejected, 'TotalSignal'] = LONG_SIGNAL

  0%|          | 0/26504 [00:00<?, ?it/s]

In [95]:
# Long signals that passed the RSI / SMMA filter
len(df[df.TotalSignal == LONG_SIGNAL])

26504

# Conflicting Long Signals

In [96]:
has_bear_div = pd.notnull(df.AnotherBearDiv)

# if rsi below x, 2 more confirmations:
# - price below third smma
# - smma's have to line up under each other
trend_not_confirmed = (
    (df.High >= df.SMMA3)
    | (df.SMMA2 >= df.SMMA3)
    | (df.SMMA1 >= df.SMMA2)
)
ignored = (
    ((df.RSI <= 30) & trend_not_confirmed)
    | (df.RSI <= 10)  # rsi too low
)

# if there was a long signal, it gets deleted
# actually this could also be used as a short signal
# (but my focus is on long - due to real market fees)
df.loc[has_bear_div & ~ignored, 'TotalSignal'] = np.nan

  0%|          | 0/33810 [00:00<?, ?it/s]

In [97]:
# Long signals left after removing the conflicting ones
len(df[df.TotalSignal == LONG_SIGNAL])

25200

# Backtest

In [98]:
# Stop-loss distance below the entry close, as a multiple of the ATR
SL_FACTOR = 1.2
# Take-profit distance above the entry close, as a multiple of the stop-loss distance
TP_FACTOR = 2.71

In [99]:
def CLOSE():
    """Close column of the global frame, wrapped as a function for Strategy.I."""
    return df['Close']

def ATR():
    """ATR column of the global frame, wrapped as a function for Strategy.I."""
    return df['ATR']

def SIGNAL():
    """TotalSignal column of the global frame, wrapped as a function for Strategy.I."""
    return df['TotalSignal']

class MyStrategy(Strategy):
    """
    Long-only strategy: buys on LONG_SIGNAL with an ATR based stop-loss and
    take-profit, and holds at most one trade at a time.
    """
    def init(self):
        super().init()
        self.close = self.I(CLOSE)
        self.atr = self.I(ATR)
        self.signal = self.I(SIGNAL)

    def next(self):
        super().next()
        # pbar is the module level progress bar created right before the run
        pbar.update(1)

        # only one open trade at a time
        if self.trades:
            return

        # Read the current candle explicitly with [-1] instead of comparing and
        # computing on the whole indicator array and relying on it being reduced
        # to its last element implicitly.
        if self.signal[-1] == LONG_SIGNAL:
            close, atr = self.close[-1], self.atr[-1]
            sl = close - atr * SL_FACTOR
            tp = close + atr * SL_FACTOR * TP_FACTOR

            self.buy(sl=sl, tp=tp, size=0.99)

In [100]:
# Progress bar that MyStrategy.next() advances by one on every call
pbar = tqdm(total=len(df))
# Backtest without leverage (margin 1/1) and without commission
bt = Backtest(df, MyStrategy, cash=100_000, margin=1/1, commission=0.0)
stats = bt.run()
print(stats) 

  0%|          | 0/499197 [00:00<?, ?it/s]

Start                     2017-12-28 17:05:00
End                       2023-03-01 09:25:00
Duration                   1888 days 16:20:00
Exposure Time [%]                   26.606129
Equity Final [$]               4580343.574032
Equity Peak [$]                4918297.657404
Return [%]                        4480.343574
Buy & Hold Return [%]               68.686991
Return (Ann.) [%]                  114.244838
Volatility (Ann.) [%]               71.102095
Sharpe Ratio                         1.606772
Sortino Ratio                        5.176356
Calmar Ratio                         3.172122
Max. Drawdown [%]                  -36.015267
Avg. Drawdown [%]                   -0.729576
Max. Drawdown Duration      284 days 14:15:00
Avg. Drawdown Duration        1 days 03:13:00
# Trades                                12513
Win Rate [%]                        30.072724
Best Trade [%]                      12.229156
Worst Trade [%]                     -6.028036
Avg. Trade [%]                    

In [101]:
# Plot the result of the backtest run above
bt.plot()

# Another Backtest

In [102]:
class MyStrategy2(Backtest2):
    """Same long-only rules as MyStrategy, written for the second backtesting library."""

    def next(self):
        # advance the module level progress bar
        pbar.update(1)

        # only one open position at a time
        if self.broker.positions:
            return

        candle = self.row
        if not (candle.TotalSignal == LONG_SIGNAL):
            return

        # stop-loss / take-profit around the close, scaled by the ATR
        stop_distance = candle.ATR * SL_FACTOR
        sl = candle.Close - stop_distance
        tp = candle.Close + stop_distance * TP_FACTOR

        self.broker.open_long(
            price=candle.Close,
            # alternative for testing with a fixed amount: total=100
            total=self.broker.equity * 0.99,
            open_dt=candle.name,
            fee=self.broker.taker_fee,
            leverage=1,
            stop_loss=sl,
            take_profit=tp
        )

In [103]:
# Fresh progress bar; MyStrategy2.next() advances it by one on every call
pbar = tqdm(total=len(df))
# Broker without maker / taker fees
broker = Broker2(maker_fee=0, taker_fee=0)
my_strategy2 = MyStrategy2(df, broker)
my_strategy2.run()
print(my_strategy2.stats)

  0%|          | 0/499197 [00:00<?, ?it/s]

Period                       1888 days 16:20:00
Final Equity                           56504.08
Min Equity                               953.89
Max Equity                             60522.03
Return [%]                              5550.41
Win rate [%]                              30.07
Avg. Trade Duration             0 days 00:57:09
Min Trade Duration              0 days 00:05:00
Max Trade Duration             14 days 02:25:00
Trades                                    12513
Avg. Profit Per Trade                      4.44


In [104]:
# Show the figure that the stats object exposes as equity_trades_profit_fig
my_strategy2.stats.equity_trades_profit_fig.show()

In [105]:
# Write the frame, including every column added in this notebook, to dump.csv
df.to_csv('dump.csv')