In [1]:
!pip install backtrader



DEPRECATION: pytorch-lightning 1.7.7 has a non-standard dependency specifier torch>=1.9.*. pip 23.3 will enforce this behaviour change. A possible replacement is to upgrade to a newer version of pytorch-lightning or contact the author to suggest that they release a version with a conforming dependency specifiers. Discussion can be found at https://github.com/pypa/pip/issues/12063

[notice] A new release of pip is available: 23.2.1 -> 23.3.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
import backtrader as bt
import pmdarima
from pmdarima import auto_arima
import datetime
import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error,r2_score

In [6]:

class AutoARIMAMeanReversionStrategy(bt.Strategy):
    params = (
        ("lookback_period", 20),
        ("buy_threshold", 0.01),
        ("sell_threshold", 0.01),
        ("print_metrics", True),
        ("risk_free_rate", 0.02)
    )

    def __init__(self):
        self.data_close = self.data.close
        self.auto_arima_model = None
        self.orders = []
        self.gross_profit = 0
        self.gross_loss = 0
        self.net_profit = 0
        self.total_closed_trades = 0
        self.winning_trades = 0
        self.max_drawdown = 0
        self.average_winning_trade = 0
        self.average_losing_trade = 0
        self.buy_and_hold_return = 0
        self.largest_losing_trade = 0
        self.largest_winning_trade = 0
        self.sharpe_ratio = 0
        self.sortino_ratio = 0
        self.total_holding_duration = 0
        self.max_dip_in_running_trade = 0
        self.average_dip_in_running_trade = 0
        self.trade_history = []
        self.current_trade_start = None
        self.forecast_values = []
        self.actual_values = []

    def notify_trade(self, trade):
        if trade.isclosed:
            pnl = trade.pnl
            if pnl > 0:
                self.gross_profit += pnl
                self.winning_trades += 1
                self.largest_winning_trade = max(self.largest_winning_trade, pnl)
            else:
                self.gross_loss += abs(pnl)
                self.largest_losing_trade = max(self.largest_losing_trade, abs(pnl))

            self.total_closed_trades += 1

            # Iterate through individual orders and check their status
            for order in self.orders:
                if order.status in [order.Completed, order.Canceled, order.Margin, order.Rejected]:
                    if order.isbuy():
                        self.current_trade_start = self.data.datetime.datetime()

        self.trade_history.append(trade)  # Store trade information for drawdown calculation

    def next(self):
        if len(self) > self.params.lookback_period:
            # Get historical prices for AutoARIMA model
            historical_prices = np.array(self.data_close.get(size=self.params.lookback_period))

            # Fit AutoARIMA model
            self.auto_arima_model = auto_arima(historical_prices, suppress_warnings=True)

            # Forecast next price
            forecast = self.auto_arima_model.predict(n_periods=1)[0]
            print(f"Forecast: {forecast}, Close Price: {self.data_close[0]}")
            
    

            self.forecast_values.append(forecast)
            self.actual_values.append(self.data_close[0])
            
            # Generate signals
            if self.data_close[0] < (1 + self.params.buy_threshold) * forecast:
                order = self.buy()
                self.orders.append(order)  # Append the order object
                self.log_trade("Buy")
            elif self.data_close[0] > (1 + self.params.sell_threshold) * forecast:
                order = self.sell()
                self.orders.append(order)  # Append the order object
                self.log_trade("Sell")

        # Calculate dip metrics during running trades
        current_trade_dip = self.calculate_running_trade_dip()
        self.max_dip_in_running_trade = max(self.max_dip_in_running_trade, current_trade_dip)
        self.average_dip_in_running_trade += current_trade_dip
        if self.current_trade_start is not None:
            self.total_holding_duration += 1

        # Update metrics at the end of each trade
        if self.current_trade_start is not None and not self.position:
            trade_duration = self.data.datetime.datetime() - self.current_trade_start
            self.total_holding_duration += trade_duration.days
            self.current_trade_start = None
        


    def log_trade(self, action):
        dt_value = self.data.datetime[0]

        if isinstance(dt_value, float):
            # Convert float to datetime object
            dt_value = bt.num2date(dt_value)

        date = dt_value.strftime('%Y-%m-%d %H:%M:%S')
        price = self.data_close[0]
        print(f"{date} - {action} at {price:.2f} USDT")

    def calculate_running_trade_dip(self):
        if len(self.trade_history) > 0:
            equity_curve = np.cumsum([trade.pnl for trade in self.trade_history])
            return np.max(np.maximum.accumulate(equity_curve) - equity_curve)
        return 0

    def stop(self):
        if self.params.print_metrics:
            self.print_performance_metrics()
            
    

    def print_performance_metrics(self):
        self.net_profit = self.gross_profit - self.gross_loss

        if self.total_closed_trades > 0:
            self.win_rate = self.winning_trades / self.total_closed_trades
            self.average_winning_trade = self.gross_profit / self.winning_trades if self.winning_trades > 0 else 0
            self.average_losing_trade = self.gross_loss / (self.total_closed_trades - self.winning_trades) if (self.total_closed_trades - self.winning_trades) > 0 else 0
            self.average_holding_duration = self.total_holding_duration / self.total_closed_trades if self.total_closed_trades > 0 else 0
            self.buy_and_hold_return = (self.data_close[-1] - self.data_close[0]) / self.data_close[0] * 100  # Assumes daily data
            self.max_drawdown = self.calculate_max_drawdown()
            self.sharpe_ratio = self.calculate_sharpe_ratio()
            self.sortino_ratio = self.calculate_sortino_ratio()
            self.average_dip_in_running_trade /= self.total_closed_trades  # Calculate average dip
        else:
            self.win_rate = 0
            self.average_winning_trade = 0
            self.average_losing_trade = 0
            self.average_holding_duration = 0
            self.buy_and_hold_return = 0

        # Print or store the calculated metrics as needed
        print(f"Gross Profit: {self.gross_profit}")
        print(f"Net Profit: {self.net_profit}")
        print(f"Total Closed Trades: {self.total_closed_trades}")
        print(f"Win Rate: {self.win_rate * 100:.2f}%")
        print(f"Gross Loss: {self.gross_loss}")
        print(f"Average Winning Trade: {self.average_winning_trade}")
        print(f"Average Losing Trade: {self.average_losing_trade}")
        print(f"Buy and Hold Return: {self.buy_and_hold_return * 100:.2f}%")
        print(f"Average Holding Duration per Trade: {self.average_holding_duration}")
        print(f"Max Drawdown: {self.max_drawdown * 100:.2f}%")
        print(f"Sharpe Ratio: {self.sharpe_ratio}")
        print(f"Sortino Ratio: {self.sortino_ratio}")
        print(f"Largest Losing Trade: {self.largest_losing_trade}")
        print(f"Largest Winning Trade: {self.largest_winning_trade}")
        print(f"Max Dip in Running Trade: {self.max_dip_in_running_trade}")
        print(f"Average Dip in Running Trade: {self.average_dip_in_running_trade}")
        r2 = r2_score(self.actual_values, self.forecast_values)
        print(f'R2 Score: {r2}')

        df_plot = pd.DataFrame({'Actual': self.actual_values, 'Forecast': self.forecast_values})
        df_plot.plot(title='Actual vs Forecast', ylabel='Price', xlabel='Time')


    def calculate_max_drawdown(self):
        equity_curve = np.cumsum([trade.pnl for trade in self.trade_history])
        peaks = np.maximum.accumulate(equity_curve)

        # Replace zero values in peaks with 1 to avoid division by zero
        peaks_nonzero = np.where(peaks == 0, 1, peaks)

        drawdowns = (peaks_nonzero - equity_curve) / peaks_nonzero
        return np.max(drawdowns)
    
    def calculate_sharpe_ratio(self):
        if self.net_profit == 0 or self.max_drawdown == 0:
            return 0

        excess_return = self.net_profit - (self.params.risk_free_rate / 252)  # Assuming daily data
        volatility = np.std([trade.pnl for trade in self.trade_history])

        sharpe_ratio = excess_return / volatility
        return sharpe_ratio

    def calculate_sortino_ratio(self):
        if self.net_profit == 0 or self.average_losing_trade == 0:
            return 0

        downside_risk = self.average_losing_trade
        sortino_ratio = (self.net_profit - self.params.risk_free_rate / 252) / downside_risk  # Assuming daily data
        return sortino_ratio


