Please run the setup cells below before running the rest of the notebook!

As those plotting settings are standard throughout the book, we do not show them in the book every time we plot something.

In [None]:
# FIX: Force a clean reinstall of numpy and then install TA-Lib using conda
!pip install --upgrade --force-reinstall numpy
!conda install -c conda-forge ta-lib --yes

In [None]:
# FIX: Install a version of numpy that is compatible with the numba library
!pip install numpy==1.26.4

In [None]:
# FIX: Install the numpy library, which is a requirement for matplotlib
!pip install numpy

# Now your original imports will work
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# render matplotlib figures inline in the notebook output
%matplotlib inline
# request the "retina" (high-resolution) figure format from the inline backend
%config InlineBackend.figure_format = "retina"

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

import warnings
# SettingWithCopyWarning is imported from its public location, pandas.errors
from pandas.errors import SettingWithCopyWarning

# silence FutureWarning and pandas' SettingWithCopyWarning for the whole notebook
warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=SettingWithCopyWarning)

# plotting defaults used by every figure below;
# feel free to modify, for example, change the context to "notebook"
sns.set_theme(context="talk", style="whitegrid",
              palette="colorblind", color_codes=True,
              rc={"figure.figsize": [12, 8]})

# Chapter 12 - Backtesting Trading Strategies

## 12.1 Vectorized backtesting with `pandas`

### How to do it...

1. Import the libraries:

In [None]:
# TA-Lib is needed by `import talib` in the next cell; it is installed from
# conda-forge (same command as in the setup section at the top of the notebook)
!conda install -c conda-forge ta-lib --yes

In [None]:
import pandas as pd 
import yfinance as yf
import numpy as np
import talib

2. Download Apple's stock prices from the years 2016-2021 and keep only the adjusted close price:

In [None]:
# Download AAPL prices for 2016-2021.
# `auto_adjust=False` is passed explicitly so that the "Adj Close" column
# exists regardless of the default of the installed yfinance version.
df = yf.download("AAPL",
                 start="2016-01-01",
                 end="2021-12-31",
                 auto_adjust=False,
                 progress=False)

# Some yfinance versions return two-level (field, ticker) columns even for a
# single ticker; keep only the field level so that df["Adj Close"] is a Series.
if isinstance(df.columns, pd.MultiIndex):
    df.columns = df.columns.get_level_values(0)

# Keep only the adjusted close. The explicit copy makes `df` an independent
# frame, so the columns added in the following cells are not written to a
# slice of the downloaded data.
df = df[["Adj Close"]].copy()


3. Calculate the log returns and the 20-day SMA of the close prices:

In [None]:
# log return = difference between consecutive log prices
df["log_rtn"] = np.log(df["Adj Close"]).diff()
# 20-day simple moving average of the adjusted close
# (TA-Lib alternative: talib.SMA(df["Adj Close"], timeperiod=20))
df["sma_20"] = df["Adj Close"].rolling(20).mean()
df

4. Create a position indicator:

In [None]:
# 1 while the price is above its 20-day SMA (long), 0 otherwise (flat)
df["position"] = df["Adj Close"].gt(df["sma_20"]).astype(int)

Using the following snippet, we count how many times we entered a long position:

In [None]:
# an entry is a day with position 1 that follows a day with position 0
((df["position"] == 1) & (df["position"].shift(1) == 0)).sum()

5. Visualize the strategy over 2021:

In [None]:
# two stacked panels sharing the x-axis: price with its SMA on top,
# the 0/1 position indicator below
fig, ax = plt.subplots(2, sharex=True)
df.loc["2021", ["Adj Close", "sma_20"]].plot(ax=ax[0])
df.loc["2021", "position"].plot(ax=ax[1])
ax[0].set_title("Preview of our strategy in 2021")

sns.despine()
plt.tight_layout()
# uncomment to save the figure to disk
# plt.savefig("images/figure_12_1", dpi=200)

6. Calculate the strategy's daily and cumulative returns:

In [None]:
# the position decided on the previous day earns the current day's log return
df["strategy_rtn"] = df["log_rtn"] * df["position"].shift(1)
# cumulative log returns are summed and then converted back with exp
df["strategy_rtn_cum"] = np.exp(df["strategy_rtn"].cumsum())
df

7. Add the buy-and-hold strategy for comparison:

In [None]:
# buy-and-hold benchmark: cumulative sum of all log returns, converted with exp
df["bh_rtn_cum"] = np.exp(df["log_rtn"].cumsum())

8. Plot the strategies' cumulative returns:

In [None]:
# buy-and-hold vs. the SMA strategy on the same axes
df[["bh_rtn_cum", "strategy_rtn_cum"]].plot(title="Cumulative returns")

sns.despine()
plt.tight_layout()
# uncomment to save the figure to disk
# plt.savefig("images/figure_12_2", dpi=200)

### There's more

1. Calculate daily transaction costs:

In [None]:
# cost applied per unit change in position; it is subtracted from the
# strategy's log return in the next cell
TRANSACTION_COST = 0.01 
# the absolute day-over-day change of the position is non-zero only on the
# days the position changes (the first row is NaN because of diff)
df["tc"] = df["position"].diff(1).abs() * TRANSACTION_COST

2. Calculate the strategy's performance accounting for transaction costs:

In [None]:
# subtract the daily cost from the daily log return before accumulating
df["strategy_rtn_cum_tc"] = np.exp(
    (df["strategy_rtn"] - df["tc"]).cumsum()
)

3. Plot the cumulative returns of all the strategies:

In [None]:
# benchmark, strategy, and strategy net of transaction costs on one chart
df[["bh_rtn_cum", "strategy_rtn_cum", "strategy_rtn_cum_tc"]].plot(
    title="Cumulative returns"
)

sns.despine()
plt.tight_layout()
# uncomment to save the figure to disk
# plt.savefig("images/figure_12_3", dpi=200)

## 12.2 Event-driven backtesting with `backtrader`

### How to do it...

1. Import the libraries:

In [None]:
from datetime import datetime
import backtrader as bt
from backtrader_strategies.strategy_utils import *

2. Download data from Yahoo Finance:

In [None]:
# AAPL data feed for 2021, used as the only feed of the backtests below
# NOTE(review): YahooFinanceData presumably fetches from Yahoo when the
# backtest runs — confirm the installed backtrader can still reach it
data = bt.feeds.YahooFinanceData(dataname="AAPL", 
                                 fromdate=datetime(2021, 1, 1),
                                 todate=datetime(2021, 12, 31))

3. Define the strategy:

In [None]:
class SmaStrategy(bt.Strategy):
    """
    Long-only strategy on a single data feed: buy when the close is above
    its simple moving average, sell when the close is below it.

    Parameters
    ----------
    ma_period : int
        Number of bars used by the simple moving average (default 20).
    """
    params = (("ma_period", 20), )

    def __init__(self):
        feed = self.datas[0]

        # no order is pending when the backtest starts
        self.order = None

        # close price line of the first data feed
        self.data_close = feed.close

        # simple moving average over `ma_period` bars
        self.sma = bt.ind.SMA(feed, period=self.p.ma_period)

    def log(self, txt):
        """Print `txt` prefixed with the ISO date of the current bar."""
        bar_date = self.datas[0].datetime.date(0)
        print(f"{bar_date.isoformat()}: {txt}")

    def notify_order(self, order):
        """Log executed or failed orders and clear the pending-order marker."""
        status = order.status

        # submitted/accepted orders are still in flight - nothing to report
        if status in (order.Submitted, order.Accepted):
            return

        if status == order.Completed:
            # executed order: report side, price, size, value and commission
            executed = order.executed
            self.log(
                get_action_log_string(
                    dir="b" if order.isbuy() else "s",
                    action="e",
                    price=executed.price,
                    size=executed.size,
                    cost=executed.value,
                    commission=executed.comm,
                )
            )
        elif status in (order.Canceled, order.Margin, order.Rejected):
            self.log("Order Failed")

        # whatever the outcome, there is no pending order any more
        self.order = None

    def notify_trade(self, trade):
        """Log gross and net profit once a trade has been closed."""
        if trade.isclosed:
            self.log(
                get_result_log_string(gross=trade.pnl, net=trade.pnlcomm)
            )

    def next(self):
        """Create a buy or sell order depending on close vs. SMA."""
        # wait for the pending order to be processed first
        if self.order:
            return

        in_market = bool(self.position)

        if not in_market and self.data_close[0] > self.sma[0]:
            # flat and the close is above the average -> buy
            self.log(
                get_action_log_string("b", "c", self.data_close[0], 1)
            )
            self.order = self.buy()
        elif in_market and self.data_close[0] < self.sma[0]:
            # holding and the close is below the average -> sell
            self.log(
                get_action_log_string("s", "c", self.data_close[0], 1)
            )
            self.order = self.sell()

    def start(self):
        """Print the portfolio value before the first bar."""
        value = self.broker.get_value()
        print(f"Initial Portfolio Value: {value:.2f}")

    def stop(self):
        """Print the portfolio value after the last bar."""
        value = self.broker.get_value()
        print(f"Final Portfolio Value: {value:.2f}")

4. Set up the backtest:

In [None]:
# stdstats=False: the observers we want are added explicitly below
cerebro = bt.Cerebro(stdstats = False)

cerebro.adddata(data)
# starting cash of the simulated broker
cerebro.broker.setcash(1000.0)
cerebro.addstrategy(SmaStrategy)
# MyBuySell is presumably provided by the star import from
# backtrader_strategies.strategy_utils — it is not defined in this notebook
cerebro.addobserver(MyBuySell)
cerebro.addobserver(bt.observers.Value)

5. Run the backtest:

In [None]:
# run the backtest; SmaStrategy prints its log lines and the initial/final
# portfolio value
cerebro.run()

6. Plot the results:

In [None]:
# plot the backtest with the volume panel switched off
cerebro.plot(iplot=True, volume=False)

### There's more

Instead of running the cell below, we recommend running the script `sma_strategy_optimization.py`, as it contains the variant of the strategy without that much logging. The outputs are the same.

In [None]:
# create a Cerebro entity
cerebro = bt.Cerebro(stdstats = False)

# set up the backtest
cerebro.adddata(data)
# optstrategy (instead of addstrategy) takes an iterable of values:
# ma_period is tried for every integer from 10 to 30
cerebro.optstrategy(SmaStrategy, ma_period=range(10, 31))
cerebro.broker.setcash(1000.0)
# maxcpus=1 limits the optimization run to a single CPU
cerebro.run(maxcpus=1)


## 12.3 Backtesting a long/short strategy based on the RSI

### How to do it...

1. Import the libraries:

In [None]:
from datetime import datetime
import backtrader as bt
from backtrader_strategies.strategy_utils import *

2. Define the signal strategy based on `bt.SignalStrategy`:

In [None]:
class RsiSignalStrategy(bt.SignalStrategy):
    """
    Long/short signal strategy driven by the RSI.

    Parameters
    ----------
    rsi_periods : int
        Lookback of the RSI (default 14).
    rsi_upper : int
        Upper band; crossing it downwards opens a short (default 70).
    rsi_lower : int
        Lower band; crossing it upwards opens a long (default 30).
    rsi_mid : int
        Middle level used to exit both longs and shorts (default 50).
    """
    params = dict(rsi_periods=14, rsi_upper=70, 
                  rsi_lower=30, rsi_mid=50)

    def __init__(self):
        
        # add RSI indicator
        rsi = bt.indicators.RSI(period=self.p.rsi_periods,
                                upperband=self.p.rsi_upper,
                                lowerband=self.p.rsi_lower)

        # add RSI from TA-lib just for reference; it is given the same
        # lookback as the indicator above so that both stay comparable
        # when `rsi_periods` is changed from its default
        bt.talib.RSI(self.data,
                     timeperiod=self.p.rsi_periods,
                     plotname="TA_RSI")
    
        # long condition (with exit): enter when the RSI crosses the lower
        # band upwards; the exit signal is negative while RSI > rsi_mid
        rsi_signal_long = bt.ind.CrossUp(rsi, self.p.rsi_lower, plot=False)
        self.signal_add(bt.SIGNAL_LONG, rsi_signal_long)
        self.signal_add(bt.SIGNAL_LONGEXIT, -(rsi > self.p.rsi_mid))

        # short condition (with exit): enter (negative signal) when the RSI
        # crosses the upper band downwards; the exit signal is positive
        # while RSI < rsi_mid
        rsi_signal_short = -bt.ind.CrossDown(rsi, self.p.rsi_upper, plot=False)
        self.signal_add(bt.SIGNAL_SHORT, rsi_signal_short)
        self.signal_add(bt.SIGNAL_SHORTEXIT, rsi < self.p.rsi_mid)

3. Download data:

In [None]:
# META data feed for 2021, used by the RSI backtests below
# NOTE(review): YahooFinanceData presumably fetches from Yahoo when the
# backtest runs — confirm the installed backtrader can still reach it
data = bt.feeds.YahooFinanceData(dataname="META", 
                                 fromdate=datetime(2021, 1, 1),
                                 todate=datetime(2021, 12, 31))

4. Set up and run the backtest:

In [None]:
# stdstats=False: the observers we want are added explicitly below
cerebro = bt.Cerebro(stdstats = False)

cerebro.addstrategy(RsiSignalStrategy)
cerebro.adddata(data)
# every order uses a fixed stake of 1
cerebro.addsizer(bt.sizers.SizerFix, stake=1)
cerebro.broker.setcash(1000.0)
cerebro.broker.setcommission(commission=0.001)
cerebro.addobserver(MyBuySell)
cerebro.addobserver(bt.observers.Value)

# the strategy itself prints nothing, so report the value before and after
print(f"Starting Portfolio Value: {cerebro.broker.getvalue():.2f}")
cerebro.run()
print(f"Final Portfolio Value: {cerebro.broker.getvalue():.2f}")


5. Plot the results:

In [None]:
# plot the RSI backtest with the volume panel switched off
cerebro.plot(iplot=True, volume=False)

### There's more

1. Go "all-in" with the RSI strategy:

In [None]:
# same RSI backtest, but sized with AllInSizer instead of a fixed stake
cerebro = bt.Cerebro(stdstats = False)

cerebro.addstrategy(RsiSignalStrategy)
cerebro.adddata(data)
cerebro.addsizer(bt.sizers.AllInSizer)
cerebro.broker.setcash(1000.0)
cerebro.broker.setcommission(commission=0.001)
cerebro.addobserver(bt.observers.Value)

# report the portfolio value before and after the run
print(f"Starting Portfolio Value: {cerebro.broker.getvalue():.2f}")
cerebro.run()
print(f"Final Portfolio Value: {cerebro.broker.getvalue():.2f}")

2. Define a fixed commission scheme per share and run the backtest:

In [None]:
class FixedCommisionShare(bt.CommInfoBase):
    """
    Commission scheme that charges a fixed amount for every share traded.
    """
    params = (
        ("commission", 0.03),
        ("stocklike", True),
        ("commtype", bt.CommInfoBase.COMM_FIXED),
    )

    def _getcommission(self, size, price, pseudoexec):
        """Return the absolute number of shares times the per-share fee."""
        n_shares = abs(size)
        return n_shares * self.p.commission

In [None]:
# all-in RSI backtest using the fixed per-share commission scheme
cerebro = bt.Cerebro(stdstats = False)

cerebro.addstrategy(RsiSignalStrategy)
cerebro.adddata(data)
cerebro.addsizer(bt.sizers.AllInSizer)
cerebro.broker.setcash(1000.0)
# replaces the percentage commission used in the previous runs
cerebro.broker.addcommissioninfo(FixedCommisionShare())
cerebro.addobserver(bt.observers.Value)

# report the portfolio value before and after the run
print(f"Starting Portfolio Value: {cerebro.broker.getvalue():.2f}")
cerebro.run()
print(f"Final Portfolio Value: {cerebro.broker.getvalue():.2f}")


3. Define a fixed commission scheme per order and run the backtest:

In [None]:
class FixedCommisionOrder(bt.CommInfoBase):
    """
    Commission scheme that charges the same fixed amount for every order.
    """
    params = (
        ("commission", 2.5),
        ("stocklike", True),
        ("commtype", bt.CommInfoBase.COMM_FIXED),
    )

    def _getcommission(self, size, price, pseudoexec):
        """Return the flat fee; order size and price are not used."""
        flat_fee = self.p.commission
        return flat_fee

In [None]:
# all-in RSI backtest using the fixed per-order commission scheme
cerebro = bt.Cerebro(stdstats = False)

cerebro.addstrategy(RsiSignalStrategy)
cerebro.adddata(data)
cerebro.addsizer(bt.sizers.AllInSizer)
cerebro.broker.setcash(1000.0)
# replaces the percentage commission used in the earlier runs
cerebro.broker.addcommissioninfo(FixedCommisionOrder())
cerebro.addobserver(bt.observers.Value)

# report the portfolio value before and after the run
print(f"Starting Portfolio Value: {cerebro.broker.getvalue():.2f}")
cerebro.run()
print(f"Final Portfolio Value: {cerebro.broker.getvalue():.2f}")


## 12.4 Backtesting a buy/sell strategy based on Bollinger bands

### How to do it...

1. Import the libraries:

In [None]:
import backtrader as bt
import datetime
import pandas as pd
from backtrader_strategies.strategy_utils import *

2. Define the strategy based on the Bollinger Bands:

In [None]:
class BollingerBandStrategy(bt.Strategy):
    """
    All-in buy/sell strategy based on Bollinger Bands. Orders are created
    in `next_open`.

    Parameters
    ----------
    period : int
        Lookback of the Bollinger Bands (default 20).
    devfactor : float
        Band width in standard deviations (default 2.0).
    """
    params = (("period", 20),
              ("devfactor", 2.0),)

    def __init__(self):
        feed = self.datas[0]

        # price lines used for sizing and logging
        self.data_close = feed.close
        self.data_open = feed.open

        # no order is pending when the backtest starts
        self.order = None

        # Bollinger Bands plus the crossover of the price with each band
        self.b_band = bt.ind.BollingerBands(feed,
                                            period=self.p.period,
                                            devfactor=self.p.devfactor)
        bands = self.b_band.lines
        self.buy_signal = bt.ind.CrossOver(feed, bands.bot,
                                           plotname="buy_signal")
        self.sell_signal = bt.ind.CrossOver(feed, bands.top,
                                            plotname="sell_signal")

    def log(self, txt):
        """Print `txt` prefixed with the ISO date of the current bar."""
        bar_date = self.datas[0].datetime.date(0)
        print(f"{bar_date.isoformat()}: {txt}")

    def notify_order(self, order):
        """Log executed or failed orders and clear the pending-order marker."""
        status = order.status

        # submitted/accepted orders are still in flight - nothing to report
        if status in (order.Submitted, order.Accepted):
            return

        if status == order.Completed:
            # executed order: report side, price, size, value and commission
            executed = order.executed
            self.log(
                get_action_log_string(
                    dir="b" if order.isbuy() else "s",
                    action="e",
                    price=executed.price,
                    size=executed.size,
                    cost=executed.value,
                    commission=executed.comm,
                )
            )
        elif status in (order.Canceled, order.Margin, order.Rejected):
            self.log("Order Failed")

        # whatever the outcome, there is no pending order any more
        self.order = None

    def notify_trade(self, trade):
        """Log gross and net profit once a trade has been closed."""
        if trade.isclosed:
            self.log(
                get_result_log_string(gross=trade.pnl, net=trade.pnlcomm)
            )

    def next_open(self):
        """Buy all-in on a positive buy signal, sell everything on a negative sell signal."""
        if self.position:
            if self.sell_signal < 0:
                # sell the whole position
                held = self.position.size
                self.log(
                    get_action_log_string("s", "c", self.data_close[0], held)
                )
                self.order = self.sell(size=held)
            return

        if self.buy_signal > 0:
            # whole number of shares the available cash buys at the open
            cash = self.broker.getcash()
            size = int(cash / self.datas[0].open)
            self.log(
                get_action_log_string("b", "c",
                                      price=self.data_close[0],
                                      size=size,
                                      cash=cash,
                                      open=self.data_open[0],
                                      close=self.data_close[0])
            )
            self.order = self.buy(size=size)

    def start(self):
        """Print the portfolio value before the first bar."""
        value = self.broker.get_value()
        print(f"Initial Portfolio Value: {value:.2f}")

    def stop(self):
        """Print the portfolio value after the last bar."""
        value = self.broker.get_value()
        print(f"Final Portfolio Value: {value:.2f}")

3. Download data:

In [None]:
# MSFT data feed for 2021; note that `datetime` is the module here
# (imported with `import datetime` in this section)
# NOTE(review): YahooFinanceData presumably fetches from Yahoo when the
# backtest runs — confirm the installed backtrader can still reach it
data = bt.feeds.YahooFinanceData(
    dataname="MSFT",
    fromdate=datetime.datetime(2021, 1, 1),
    todate=datetime.datetime(2021, 12, 31)
)

4. Set up the backtest:

In [None]:
# cheat_on_open=True goes together with BollingerBandStrategy.next_open,
# where the strategy creates its orders
cerebro = bt.Cerebro(stdstats = False, cheat_on_open=True)

cerebro.addstrategy(BollingerBandStrategy)
cerebro.adddata(data)
cerebro.broker.setcash(10000.0)
cerebro.broker.setcommission(commission=0.001)
cerebro.addobserver(MyBuySell)
cerebro.addobserver(bt.observers.Value)
# analyzers are read back by their _name after the run
cerebro.addanalyzer(bt.analyzers.Returns, _name="returns")
cerebro.addanalyzer(bt.analyzers.TimeReturn, _name="time_return")

5. Run the backtest:

In [None]:
# keep the result: its first element gives access to the analyzers below
backtest_result = cerebro.run()

6. Plot the results:

In [None]:
# plot the Bollinger Bands backtest with the volume panel switched off
cerebro.plot(iplot=True, volume=False)

7. Investigate different returns metrics:

In [None]:
# output of the analyzer registered under the name "returns"
backtest_result[0].analyzers.returns.get_analysis()

8. Extract daily portfolio returns and plot them:

In [None]:
# output of the analyzer registered under the name "time_return";
# its (key, value) pairs become the "date" and "return" columns
returns_dict = backtest_result[0].analyzers.time_return.get_analysis()
returns_df = (
    pd.DataFrame(list(returns_dict.items()), 
                 columns = ["date", "return"])
    .set_index("date")
)
returns_df.plot(title="Strategy's daily returns")

sns.despine()
plt.tight_layout()
# uncomment to save the figure to disk
# plt.savefig("images/figure_12_7", dpi=200)

### There's more

Compare the performance of our strategy to a "buy-and-hold" strategy:

In [None]:
import quantstats as qs
# performance metrics of the strategy's returns against MSFT as benchmark
# NOTE(review): passing a ticker string as `benchmark` presumably makes
# quantstats download the benchmark prices — confirm network access
qs.reports.metrics(returns_df, 
                   benchmark="MSFT", 
                   mode="basic")

## 12.5 Backtesting a moving average crossover strategy using crypto data

### How to do it...

1. Import the libraries:

In [None]:
import backtrader as bt
import datetime
import pandas as pd
from backtrader_strategies.strategy_utils import *

2. Define the commission scheme allowing for fractional trades:

In [None]:
class FractionalTradesCommission(bt.CommissionInfo):
    """Commission scheme whose order sizing is not rounded to whole units."""

    def getsize(self, price, cash):
        """Return the leverage-scaled, fractional size that `cash` buys at `price`."""
        units = cash / price
        return self.p.leverage * units

3. Define the SMA crossover strategy:

In [None]:
class SMACrossoverStrategy(bt.Strategy):
    """
    Moving average crossover strategy: allocate `target_perc` of the
    portfolio when the fast SMA crosses above the slow SMA and close the
    position when it crosses below.

    Parameters
    ----------
    ma_fast : int
        Lookback of the fast SMA (default 20).
    ma_slow : int
        Lookback of the slow SMA (default 50).
    target_perc : float
        Fraction of the portfolio value to allocate on a buy (default 0.7).
    """
    params = (
        ("ma_fast", 20),
        ("ma_slow", 50),
        ("target_perc", 0.7)
    )

    def __init__(self):
        # keep track of the close and open prices in the series; the open
        # is only used for logging and is defined explicitly here, in the
        # same way as in BollingerBandStrategy
        self.data_close = self.datas[0].close
        self.data_open = self.datas[0].open
        
        # keep track of pending orders
        self.order = None

        # calculate the SMAs and get the crossover signal        
        self.fast_ma = bt.indicators.MovingAverageSimple(
            self.datas[0], 
            period=self.params.ma_fast
        )
        self.slow_ma = bt.indicators.MovingAverageSimple(
            self.datas[0], 
            period=self.params.ma_slow
        )
        self.ma_crossover = bt.indicators.CrossOver(self.fast_ma, 
                                                    self.slow_ma)
        
    def log(self, txt):
        """Print `txt` prefixed with the ISO date of the current bar."""
        dt = self.datas[0].datetime.date(0).isoformat()
        print(f"{dt}: {txt}")

    def notify_order(self, order):
        """Log executed or failed orders and clear the pending-order marker."""
        if order.status in [order.Submitted, order.Accepted]:
            # order already submitted/accepted - no action required
            return

        # report executed order
        if order.status in [order.Completed]:

            direction = "b" if order.isbuy() else "s"
            log_str = get_action_log_string(
                    dir=direction, 
                    action="e", 
                    price=order.executed.price,
                    size=order.executed.size,
                    cost=order.executed.value, 
                    commission=order.executed.comm
                )
            self.log(log_str)

        # report failed order
        elif order.status in [order.Canceled, order.Margin, 
                              order.Rejected]:
            self.log("Order Failed")

        # reset order -> no pending order
        self.order = None

    def notify_trade(self, trade):
        """Log gross and net profit once a trade has been closed."""
        if not trade.isclosed:
            return

        self.log(get_result_log_string(gross=trade.pnl, net=trade.pnlcomm))

    def next(self):
        """Open or close the position depending on the crossover signal."""

        if self.order:
            return  # pending order execution. Waiting in orderbook

        if not self.position:
            if self.ma_crossover > 0:
                order = self.order_target_percent(
                    target=self.params.target_perc
                )
                self.order = order

                # order_target_percent returns None when it issues no order
                # (for example when the commission scheme sizes the order
                # to zero); there is nothing to log in that case
                if order is None:
                    return

                log_str = get_action_log_string("b", "c", 
                                                price=self.data_close[0], 
                                                size=order.size,
                                                cash=self.broker.getcash(),
                                                open=self.data_open[0],
                                                close=self.data_close[0])
                self.log(log_str)

        else:
            if self.ma_crossover < 0:
                # sell order: a target of 0 closes the whole position
                log_str = get_action_log_string("s", "c", self.data_close[0], 
                                                self.position.size)
                self.log(log_str)
                self.order = (
                    self.order_target_percent(target=0)
                )

    def start(self):
        """Print the portfolio value before the first bar."""
        print(f"Initial Portfolio Value: {self.broker.get_value():.2f}")

    def stop(self):
        """Print the portfolio value after the last bar."""
        print(f"Final Portfolio Value: {self.broker.get_value():.2f}")


4. Download the `BTC-USD` data:

In [None]:
# BTC-USD data feed for 2020-2021
# NOTE(review): YahooFinanceData presumably fetches from Yahoo when the
# backtest runs — confirm the installed backtrader can still reach it
data = bt.feeds.YahooFinanceData(
    dataname="BTC-USD",
    fromdate=datetime.datetime(2020, 1, 1),
    todate=datetime.datetime(2021, 12, 31)
)

5. Set up the backtest:

In [None]:
# SMACrossoverStrategy creates its orders in next(), so cheat_on_open is off
cerebro = bt.Cerebro(stdstats = False, cheat_on_open=False)

cerebro.addstrategy(SMACrossoverStrategy)
cerebro.adddata(data)
cerebro.broker.setcash(10000.0)
# commission scheme whose getsize returns fractional sizes
cerebro.broker.addcommissioninfo(
    FractionalTradesCommission(commission=0.001)
)
cerebro.addobserver(MyBuySell)
cerebro.addobserver(bt.observers.Value)
# read back by its _name after the run
cerebro.addanalyzer(
    bt.analyzers.TimeReturn, _name="time_return"
)

6. Run the backtest:

In [None]:
# keep the result: its first element gives access to the analyzers below
backtest_result = cerebro.run()

7. Plot the results:

In [None]:
# plot the crossover backtest with the volume panel switched off
cerebro.plot(iplot=True, volume=False)

Get the strategy's performance as compared to a buy-and-hold strategy:

In [None]:
import quantstats as qs

# get the returns of the strategy as a DataFrame indexed by date
returns_dict = backtest_result[0].analyzers.time_return.get_analysis()
returns_df = pd.DataFrame(list(returns_dict.items()), 
                          columns = ["date", "return"]) \
               .set_index("date")

# calculate the perf as compared to buy-and-hold
# NOTE(review): passing a ticker string as `benchmark` presumably makes
# quantstats download the benchmark prices — confirm network access
qs.reports.metrics(returns_df, 
                   benchmark="BTC-USD", 
                   mode="basic")

### There's more

1. Define the same strategy, this time, manually calculating the amount of BTC to buy:

In [None]:
class SMACrossoverStrategyAlt(bt.Strategy):
    """
    Moving average crossover strategy that computes the size of the buy
    order itself instead of using `order_target_percent` for the entry.

    Parameters
    ----------
    ma_fast : int
        Lookback of the fast SMA (default 20).
    ma_slow : int
        Lookback of the slow SMA (default 50).
    target_perc : float
        Fraction of the available cash to spend on a buy (default 0.7).
    """
    params = (
        ("ma_fast", 20),
        ("ma_slow", 50),
        ("target_perc", 0.7)
    )

    def __init__(self):
        feed = self.datas[0]

        # close price line of the first data feed
        self.data_close = feed.close

        # no order is pending when the backtest starts
        self.order = None

        # fast and slow SMA and their crossover signal
        sma = bt.indicators.MovingAverageSimple
        self.fast_ma = sma(feed, period=self.p.ma_fast)
        self.slow_ma = sma(feed, period=self.p.ma_slow)
        self.ma_crossover = bt.indicators.CrossOver(self.fast_ma,
                                                    self.slow_ma)

    def log(self, txt):
        """Print `txt` prefixed with the ISO date of the current bar."""
        bar_date = self.datas[0].datetime.date(0)
        print(f"{bar_date.isoformat()}: {txt}")

    def notify_order(self, order):
        """Log executed or failed orders and clear the pending-order marker."""
        status = order.status

        # submitted/accepted orders are still in flight - nothing to report
        if status in (order.Submitted, order.Accepted):
            return

        if status == order.Completed:
            # executed order: report side, price, size, value and commission
            executed = order.executed
            self.log(
                get_action_log_string(
                    dir="b" if order.isbuy() else "s",
                    action="e",
                    price=executed.price,
                    size=executed.size,
                    cost=executed.value,
                    commission=executed.comm,
                )
            )
        elif status in (order.Canceled, order.Margin, order.Rejected):
            self.log("Order Failed")

        # whatever the outcome, there is no pending order any more
        self.order = None

    def notify_trade(self, trade):
        """Log gross and net profit once a trade has been closed."""
        if trade.isclosed:
            self.log(
                get_result_log_string(gross=trade.pnl, net=trade.pnlcomm)
            )

    def next(self):
        """Open or close the position depending on the crossover signal."""
        # wait for the pending order to be processed first
        if self.order:
            return

        if self.position:
            if self.ma_crossover < 0:
                # a target of 0 closes the whole position
                self.log(
                    get_action_log_string("s", "c", self.data_close[0],
                                          self.position.size)
                )
                self.order = self.order_target_percent(target=0)
            return

        if self.ma_crossover > 0:
            # spend `target_perc` of the available cash at the current close
            cash = self.broker.getcash()
            size = cash / self.datas[0].close * self.p.target_perc
            self.log(
                get_action_log_string("b", "c",
                                      price=self.data_close[0],
                                      size=size,
                                      cash=cash,
                                      open=self.data_open[0],
                                      close=self.data_close[0])
            )
            self.order = self.buy(size=size)

    def start(self):
        """Print the portfolio value before the first bar."""
        value = self.broker.get_value()
        print(f"Initial Portfolio Value: {value:.2f}")

    def stop(self):
        """Print the portfolio value after the last bar."""
        value = self.broker.get_value()
        print(f"Final Portfolio Value: {value:.2f}")


2. Set up the backtest, this time without the fractional commission scheme:

In [None]:
# NOTE(review): cheat_on_open=True is set here, but SMACrossoverStrategyAlt
# only defines next(), not next_open() — confirm the flag is intended
cerebro = bt.Cerebro(stdstats = False, cheat_on_open=True)

cerebro.addstrategy(SMACrossoverStrategyAlt)
cerebro.adddata(data)
cerebro.broker.setcash(10000.0)
# plain percentage commission instead of FractionalTradesCommission
cerebro.broker.setcommission(commission=0.001)
cerebro.addobserver(MyBuySell)
cerebro.addobserver(bt.observers.Value)

3. Run the backtest:

In [None]:
# run the backtest of the alternative strategy
backtest_result = cerebro.run()

## 12.6 Backtesting a mean-variance portfolio optimization

### Getting ready

In [None]:
import backtrader as bt

# same definition as in section 12.5, repeated here
class FractionalTradesCommission(bt.CommissionInfo):
    """Commission scheme whose order sizing is not rounded to whole units."""
    def getsize(self, price, cash):
        """Returns the fractional size"""
        return self.p.leverage * (cash / price)

### How to do it...

1. Import the libraries:

In [None]:
from datetime import datetime
import backtrader as bt
import pandas as pd
from pypfopt.expected_returns import mean_historical_return
from pypfopt.risk_models import CovarianceShrinkage
from pypfopt.efficient_frontier import EfficientFrontier
from backtrader_strategies.strategy_utils import *

2. Define the strategy:

In [None]:
class MeanVariancePortfStrategy(bt.Strategy):
    """
    Rebalance every Friday to the maximum Sharpe ratio portfolio estimated
    from the most recent prices of all data feeds.

    Parameters
    ----------
    n_periods : int
        Number of past periods used for the estimation (default 252). The
        optimization reads `n_periods + 1` closing prices per asset.
    """
    params = (("n_periods", 252), )

    def __init__(self):  
        # track number of days
        self.day_counter = 0

        # notify_order resets this attribute, so define it up front
        self.order = None
               
    def log(self, txt):
        """Print `txt` prefixed with the ISO date of the current bar."""
        dt = self.datas[0].datetime.date(0).isoformat()
        print(f"{dt}: {txt}")

    def notify_order(self, order):
        """Log executed or failed orders together with the asset name."""
        if order.status in [order.Submitted, order.Accepted]:
            # order already submitted/accepted - no action required
            return

        # report executed order
        if order.status in [order.Completed]:

            direction = "b" if order.isbuy() else "s"
            log_str = get_action_log_string(
                    asset=order.data._name,
                    dir=direction, 
                    action="e", 
                    price=order.executed.price,
                    size=order.executed.size,
                    cost=order.executed.value, 
                    commission=order.executed.comm
                )
            self.log(log_str)

        # report failed order
        elif order.status in [order.Canceled, order.Margin, 
                              order.Rejected]:
            self.log(f"Order Failed: {order.data._name}")

        # reset order -> no pending order
        self.order = None

    def notify_trade(self, trade):
        """Log gross and net profit once a trade has been closed."""
        if not trade.isclosed:
            return

        self.log(get_result_log_string(gross=trade.pnl, net=trade.pnlcomm))

    def next(self):
        """Print the current allocation and rebalance to the optimal weights."""

        # check if we have enough data points: n_periods + 1 prices are
        # read below, so at least that many bars must have been seen
        self.day_counter += 1
        if self.day_counter <= self.p.n_periods:
            return

        # check if the date is a Friday
        today = self.datas[0].datetime.date()
        if today.weekday() != 4: 
            return

        # find and print the current allocation
        current_portf = {}
        for data in self.datas:
            current_portf[data._name] = (
                self.positions[data].size * data.close[0]
            )
        portf_df = pd.DataFrame(current_portf, index=[0])
        print(f"Current allocation as of {today}")
        print(portf_df / portf_df.sum(axis=1).squeeze())

        # extract the past price data for each asset
        price_dict = {}
        for data in self.datas:
            price_dict[data._name] = data.close.get(0, self.p.n_periods+1)
        prices_df = pd.DataFrame(price_dict)

        # find the optimal portfolio weights; max_sharpe runs the
        # optimization, the cleaned weights are read from `ef` afterwards
        mu = mean_historical_return(prices_df)
        S = CovarianceShrinkage(prices_df).ledoit_wolf()
        ef = EfficientFrontier(mu, S)
        ef.max_sharpe(risk_free_rate=0)
        cleaned_weights = ef.clean_weights()
        print(f"Optimal allocation identified on {today}")
        print(pd.DataFrame(cleaned_weights, index=[0]))

        # create orders: one target-percent order per asset name
        for asset_name, target_weight in cleaned_weights.items():
            self.order_target_percent(data=asset_name,
                                      target=target_weight)

    def start(self):
        """Print the portfolio value before the first bar."""
        print(f"Initial Portfolio Value: {self.broker.get_value():.2f}")

    def stop(self):
        """Print the portfolio value after the last bar."""
        print(f"Final Portfolio Value: {self.broker.get_value():.2f}")

3. Download the prices of the FAANG stocks and store the data feeds in a list:

In [None]:
# tickers of the portfolio; the order is reused when the feeds are added
# to Cerebro
TICKERS = ["META", "AMZN", "AAPL", "NFLX", "GOOG"]
data_list = []

# one data feed per ticker, all covering 2020-2021
# NOTE(review): YahooFinanceData presumably fetches from Yahoo when the
# backtest runs — confirm the installed backtrader can still reach it
for ticker in TICKERS:
    data = bt.feeds.YahooFinanceData(
        dataname=ticker,
        fromdate=datetime(2020, 1, 1),
        todate=datetime(2021, 12, 31)
    )
    data_list.append(data)

4. Set up the backtest:

In [None]:
# stdstats=False: the observers we want are added explicitly below
cerebro = bt.Cerebro(stdstats = False)

cerebro.addstrategy(MeanVariancePortfStrategy)

# register every feed under its ticker; the strategy reads these names
# through data._name
for ind, ticker in enumerate(TICKERS):
    cerebro.adddata(data_list[ind], name=ticker)

cerebro.broker.setcash(1000.0)
# fractional sizing and zero commission
cerebro.broker.addcommissioninfo(
    FractionalTradesCommission(commission=0)
)
cerebro.addobserver(MyBuySell)
cerebro.addobserver(bt.observers.Value)

5. Run the backtest:

In [None]:
# run the portfolio backtest; the strategy prints the allocations as it goes
backtest_result = cerebro.run()