In [2]:
import pandas as pd
import vectorbtpro as vbt
from datetime import datetime, timedelta
import pytz
import numpy as np
import pandas_ta as ta
import logging
from backtesting import Strategy, Backtest
from backtesting.lib import resample_apply
logging.basicConfig(level=logging.WARNING, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
import multiprocessing as mp
mp.set_start_method('fork')




In [3]:

start = "2018-01-01"
# Enter your parameters here
metric = 'total_return'

start_date = datetime(2020, 1, 1, tzinfo=pytz.utc)  # time period for analysis, must be timezone-aware
end_date = datetime.now(pytz.utc)
# end_date = datetime(2020, 1, 1, tzinfo=pytz.utc)

# The following is the number of days to look back for the analysis
time_buffer = timedelta(days=100)  # buffer before to pre-calculate SMA/EMA, best to set to max window
freq = '1h'

vbt.settings.portfolio['init_cash'] = 100_000.  # 100,000$
# vbt.settings.portfolio['fees'] = 0.0025  # 0.25%
# vbt.settings.portfolio['slippage'] = 0.0025  # 0.25%

# get binance data doing it this way allows for you to update your data rather than re-downloading it
# binance_data = vbt.BinanceData.fetch(symbols,timeframe=freq, start=start_date,end="now UTC")
# binance_data.save("binance_data.pkl")

# If you already have the data downloaded, you can load it
# binance_data = vbt.BinanceData.load("binance_data.pkl")
# binance_data = binance_data.update() if you want to update it.
futures_path = '/Users/ericervin/Documents/Coding/data-repository/data/BTCUSDT_1m_futures.pkl'
# You can grab the necessary file from this google drive link. https://drive.google.com/drive/folders/1jKy2DMbBow-J5jTvPutw-j17m7Ss8R3i?usp=drive_link

If you don't have the datafile yet you can grab it from this [link](https://drive.google.com/drive/folders/1jKy2DMbBow-J5jTvPutw-j17m7Ss8R3i?usp=drive_link)

In [4]:
btc = vbt.BinanceData.load(futures_path)
df = btc.get()

In [5]:
# create a signal array the same length as the dataframe
signal = np.zeros(len(df))
signal = pd.Series(signal, index=df.index)
# set the signal to 1 at midnight UTC
signal[df.at_time('00:00').index] = 1
# Create a -1 right before the signal
signal[df.at_time('12:00').index] = -1 # random short signals at noon
df['signal'] = signal
# Convert to int
df['signal'] = df['signal'].astype(int)
df['signal'].sum()

0

In [26]:
class LONG_SHORT_Underwater_w_decay_and_deleverage(Strategy):
    # All of the following variables can be used during optimization
    initial_position_size = 0.3
    percent_invested_threshold = 0.3
    atr_length = 14  # 14 days
    atr_multiplier = 0.5
    add_size = 0.1
    delay_period = 1000
    delta_time = 1000
    upper_bound_profit_target = 0.05
    lower_bound_loss_threshold = 0.00
    take_profit_loss_reduction = -0.1 # amount take profit is reduced by if the position is highly leveraged and we wish to trim
    deleverage_pct = 0.30  # This is the amount that the position is reduced by if the position is highly leveraged and we wish to trim
    bounce_multiplier = 1.5  # multiply the ATR Treshold by this amount to get the bounce threshold we are looking to bounce off a low "max pain" point before we reduce our position
    max_loss_threshold = -0.05 # if the position is down this much, and we are fully invested we will reduce our positio
    def SIGNAL(self):
        return self.data.signal

    def ATR(self, df, length):
        return df.ta.atr(length=length)

    def bars_since_first_trade(self, use_for_indexing=False):
        """
        Calculate the number of bars since the first trade was entered.
        If use for indexing is true then it will return 1 if there are no trades.
        This way you can retrieve the last element of a list by applying a - to the return value.
        eg. self.equity[-self.bars_since_first_trade(use_for_indexing=True):] gives you the array of your account's equity since you entered the first trade that is currently open
        """
        if len(self.trades) > 0:
            self.first_trade_entry_bar = self.trades[0].entry_bar
            bars_since_first_trade = len(self.data.Close) - self.first_trade_entry_bar
            return bars_since_first_trade
        elif use_for_indexing:
            return 1
        else:
            return 0

    def custom_decay_func(
        self,
        x,
        delay_period,
        upper_bound_profit_target,
        lower_bound_loss_threshold,
        delta_time,
    ):
        """
        This function is used to calculate the decayed take profit x represents the number of bars since the first trade was entered
        x is the number of bars since the first trade was entered
        delay_period is the number of bars to wait before starting to decay
        upper_bound_profit_target is the upper bound of the take profit
        lower_bound_loss_threshold is the lower bound of the take profit
        delta_time is the number of bars over which to transition from the upper bound to the lower bound
        """
        if x <= delay_period:
            return upper_bound_profit_target
        elif delay_period < x < delay_period + delta_time:
            # Calculate the x value for the cos function
            transition_x = (x - delay_period) / delta_time * np.pi
            # Calculate the decayed take profit
            return (-np.cos(transition_x) + 1) / 2 * (
                lower_bound_loss_threshold - upper_bound_profit_target
            ) + upper_bound_profit_target
        else:
            return lower_bound_loss_threshold

    def init(self):
        super().init()
        self.signal = self.I(self.SIGNAL)
        self.atr = self.I(self.ATR, self.data.df, self.atr_length)
        self.daily_atr = resample_apply("1D", self.ATR, self.data.df, length=14)
        count_NaN = len(self.atr) - len(
            pd.Series(self.atr).dropna()
        )  # This is used for the progress bar
        self.length_of_data = (
            len(self.data.Close) - count_NaN
        )  # This is used for the progress bar
        self.equity_during_trade = []  # Keeps a list for the equity during the trade
        self.long_short_flag = (
            None  # This is used to keep track of whether we are long or short
        )
        self.price_at_last_trim = (
            0  # This is used to keep track of the price at the last trim
        )

    def next(self):
        super().next()

        price = self.data.Close[-1]
        position_value = self.position.size * price
        percent_invested = (
            position_value / self.equity
        )  # this will come in handy if we decide to change behavior once XX% is invested
        atr_threshold_pct = (
            self.atr_multiplier * self.daily_atr[-1] / price
        )  # This is the ATR threshold times a multiplier calculated as a percentage of price
        bars_since_trade_open = self.bars_since_first_trade(use_for_indexing=True)
        highly_leveraged = abs(percent_invested) > self.percent_invested_threshold
        # Calculate the decayed take profit
        decayed_take_profit = self.custom_decay_func(
            bars_since_trade_open,
            self.delay_period,
            self.upper_bound_profit_target,
            self.lower_bound_loss_threshold,
            self.delta_time,
        )

        
        # Keep a running list of the self.equity values while we have a trade open
        if self.position:  # TODO work on this for long versus short
            self.equity_during_trade.append(self.equity)
        else:
            self.equity_during_trade = []

        low_point_in_trade = min(self.equity_during_trade, default=0)
        # Calculate the percentage bounce from the low point in the trade this is based on equity so works for long or short
        bounce_from_low_pct = (lambda x: (self.equity - x) / x if x != 0 else 0)(
            low_point_in_trade
        )  # The lambda function is used to avoid a divide by zero error

        # Opening a Trade on a signal from the LSTM
        if not self.position:
            if self.signal == 1:
                self.buy(size=self.initial_position_size)
                self.long_short_flag = 1
            elif self.signal == -1:
                self.sell(size=self.initial_position_size)
                self.long_short_flag = -1
        
        # If we are in a short trade and the account is fully invested and the loss is greater than the max loss threshold then close the trade
        elif self.long_short_flag == -1 and (abs(percent_invested) > 1 and self.position.pl_pct < self.max_loss_threshold):
            self.position.close()
        
        # If we are in a trade and it meets our profit criteria then close the trade
        elif self.position.pl_pct > decayed_take_profit:
            self.position.close()
            self.price_at_last_trim = 0
            # print(f'Closing at {price} at {self.data.index[-1]} Position PNL is {self.position.pl}')

        # If we are in a trade and it meets our loss criteria then close a portion of the trade on a bounce from the low point
        elif self.position and self.position.pl_pct < -atr_threshold_pct:
            # Check to see if we are also down on our last trade
            if self.trades[-1].pl_pct < -atr_threshold_pct:
                if self.long_short_flag == 1:
                    self.buy(size=self.add_size)
                elif self.long_short_flag == -1:
                    self.sell(size=self.add_size)
        
        # If we are totally upside down on a short then reduce the position size
        elif self.position and self.position.pl_pct < decayed_take_profit + self.take_profit_loss_reduction:
            if self.trades[-1].pl_pct < -atr_threshold_pct: # if we are down more than 1 ATR threshold then reduce the position size and take a tiny loss
                self.position.close(portion=self.deleverage_pct)
                print(f'Closing at {price} at {self.data.index[-1]} Position PNL is {self.position.pl}')
        
        # Deleverage if we are over a certain percent invested and the market is recovering from the low point
        elif (
            highly_leveraged
            and bounce_from_low_pct > self.bounce_multiplier * atr_threshold_pct
        ):

            if (self.position.pl_pct > decayed_take_profit + self.take_profit_loss_reduction):
                if (self.long_short_flag == 1) and (self.price_at_last_trim == 0 or price > self.price_at_last_trim *(1 + 2 * atr_threshold_pct)):
                    self.position.close(portion=self.deleverage_pct)
                    # Keep track of the price to avoid trimming too often
                    self.price_at_last_trim = price 
                    # print(f'Deleveraging at {price} at {self.data.index[-1]} Position PNL is {self.position.pl}')
                elif (self.long_short_flag) == -1 and (self.price_at_last_trim == 0 or price < self.price_at_last_trim *(1 - 0.5 * atr_threshold_pct)):
                    self.position.close(portion=self.deleverage_pct)
                    # Keep track of the price to avoid trimming too often
                    self.price_at_last_trim = price 
                    # print(f'Deleveraging at {price} at {self.data.index[-1]} Position PNL is {self.position.pl}')



start = "2022-12-01"  # Note the strategy requires a warmup period for the ATR to calculate first trades begin after 14 days
end = "2023-06-03"  # It will always close any open trades at the end of the backtest
bt = Backtest(
    df.loc[start:end],
    LONG_SHORT_Underwater_w_decay_and_deleverage,
    cash=100_000_000,
    exclusive_orders=False,
    trade_on_close=True,
    margin=1,
)
stats = bt.run()
print(stats)
bt.plot(resample=False)

Start                     2022-12-01 00:00...
End                       2023-06-03 23:12...
Duration                    184 days 23:12:00
Exposure Time [%]                   84.503272
Equity Final [$]                   82914418.1
Equity Peak [$]                   102082152.2
Return [%]                         -17.085582
Buy & Hold Return [%]               57.637052
Return (Ann.) [%]                  -30.902982
Volatility (Ann.) [%]                18.21221
Sharpe Ratio                        -1.696828
Sortino Ratio                       -1.472269
Calmar Ratio                        -1.181394
Max. Drawdown [%]                  -26.158069
Avg. Drawdown [%]                    -0.41092
Max. Drawdown Duration      150 days 04:12:00
Avg. Drawdown Duration        2 days 00:20:00
# Trades                                  168
Win Rate [%]                             62.5
Best Trade [%]                       13.17805
Worst Trade [%]                    -35.557495
Avg. Trade [%]                    

In [7]:
bt.plot(resample=False)

## Now let's optimize the long short strategy
Note, this will take a while to run depending on how many parameters you want to optimize on.

I'm copying the class down here to remove the progress bars they conflict with the optimizer progress bars

In [None]:
stats, heatmap = bt.optimize(
    # initial_position_size = np.arange(0.3, 0.6, 0.1).tolist(),
    # percent_invested_threshold = 0.7 
    # atr_length = np.arange(7,37,10).to_list(), # 14 days
    # atr_multiplier = np.arange(0.6, 0.8, 0.1).tolist(),
    add_size = np.arange(0.10,0.30, 0.10).tolist(),
    # delay_period = np.arange(250,1500,500).tolist(),
    delta_time = np.arange(1000,5000,1000).tolist(),
    upper_bound_profit_target = np.arange(0.05, 0.10, 0.05).tolist(),
    lower_bound_loss_threshold = np.arange(0.05, -0.10, -0.05).tolist(),
    take_profit_loss_reduction = np.arange(-0.15, -0.05, 0.05) # This is the amount that the take profit is reduced by if the position is highly leveraged and we wish to trim
    # deleverage_pct = 0.5 # This is the amount that the position is reduced by if the position is highly leveraged and we wish to trim
    
    maximize='Equity Final [$]', # this can be any of the column names from the stats table the output of the backtest
    max_tries=200,
    random_state=0,
    return_heatmap=True)
best_params = stats._strategy.__dict__["_params"] # This will print out all the parameters used for the best backtest
print(f'Best Parameters: {best_params}')
heatmap.sort_values(ascending=False).iloc[:-5] # print the top 5 parameter sets

In [None]:
# Plot the heatmap
from backtesting.lib import plot_heatmaps
plot_heatmaps(heatmap, agg='mean')


### Rerun the stats for the best strategy


In [None]:
bt.run(**best_params)
bt.plot(resample=False)