In [11]:
import yfinance as yf
import pandas as pd
import numpy as np
import backtest as bt

import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller
from itertools import combinations

In [12]:
# Pair under study: lin_reg uses tickers[0] as the regressor and tickers[1] as the regressand.
tickers = ['ABBV', 'CB']
# Look-back period string handed to yf.download(period=...).
timeframe = '5y'
# Starting cash for the backtest.
start_cash = 10_000
# Number of rows in each rolling regression and in the rolling spread mean/std.
window = 200

In [13]:
# Download price history for both tickers over the configured period.
# auto_adjust is passed explicitly so the result does not depend on the
# library's (recently changed) default and the default-change notice is not printed.
df = yf.download(tickers, period=timeframe, auto_adjust=True)


YF.download() has changed argument auto_adjust default to True

[*********************100%***********************]  2 of 2 completed


In [14]:
def log_return(df):
    """Return one-row log returns of the 'Close' prices.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose ``'Close'`` selection is itself a DataFrame (one column
        per instrument), e.g. two-level columns with 'Close' on the top level.

    Returns
    -------
    pandas.DataFrame
        ``log(close[t] / close[t-1])`` per column, with the columns re-labelled
        as a two-level index whose top level is ``'Log Return'``. The first row
        is NaN because it has no previous close.
    """
    closes = df['Close']
    previous_closes = closes.shift(1)
    returns = np.log(closes.div(previous_closes))
    # Put the instrument labels underneath a single 'Log Return' header.
    returns.columns = pd.MultiIndex.from_product([['Log Return'], returns.columns])
    return returns
# Display the log-return table for the downloaded prices (the result is not stored).
log_return(df)

Unnamed: 0_level_0,Log Return,Log Return
Ticker,ABBV,CB
Date,Unnamed: 1_level_2,Unnamed: 2_level_2
2020-08-17,,
2020-08-18,-0.007291,-0.006057
2020-08-19,0.004694,-0.009237
2020-08-20,-0.009410,-0.003962
2020-08-21,-0.003578,-0.012390
...,...,...
2025-08-11,0.002975,-0.000885
2025-08-12,0.000000,-0.004549
2025-08-13,0.014146,0.019347
2025-08-14,0.015807,0.002542


In [15]:
def lin_reg(df, tickers, window):
    """Rolling OLS of ``tickers[1]`` on ``tickers[0]``.

    For every row ``i >= window`` an ordinary-least-squares line
    ``df[tickers[1]] = intercept + slope * df[tickers[0]]`` is fitted on the
    ``window`` rows immediately before ``i`` (rows ``i-window`` .. ``i-1``), so
    the coefficients stored at row ``i`` never use row ``i`` itself.

    Parameters
    ----------
    df : pandas.DataFrame
        Price frame containing a column for each entry of ``tickers``.
    tickers : sequence
        Two column labels; ``tickers[0]`` is the regressor, ``tickers[1]`` the
        regressand.
    window : int
        Number of preceding rows used for each fit.

    Returns
    -------
    pandas.DataFrame
        Columns ``'Slope'`` and ``'Intercept'`` indexed like ``df``. Rows that
        do not have ``window`` preceding rows are NaN; a frame shorter than
        ``window`` yields an all-NaN result instead of raising.
    """
    n_rows = len(df)
    # Size the outputs from the frame, not from `window`, so that inputs shorter
    # than the window still line up with df.index.
    slopes = np.full(n_rows, np.nan)
    intercepts = np.full(n_rows, np.nan)
    for i in range(window, n_rows):
        sample = df.iloc[i - window:i]
        regressor = sm.add_constant(sample[tickers[0]].values)
        regressand = sample[tickers[1]].values
        fit = sm.OLS(regressand, regressor).fit()
        # add_constant prepends the constant column, so params are [intercept, slope].
        intercepts[i] = fit.params[0]
        slopes[i] = fit.params[1]
    return pd.DataFrame({'Slope': slopes, 'Intercept': intercepts}, index=df.index)

In [16]:
def calc_spread(data, tickers, window):
    """Compute the regression spread between two tickers and its rolling Z-score.

    The spread is the residual of ``tickers[1]`` against the rolling regression
    line produced by ``lin_reg``:
    ``close[tickers[1]] - (intercept + slope * close[tickers[0]])``.
    It is then standardised with its own rolling mean and standard deviation
    over ``window`` rows.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame whose ``'Close'`` selection has one column per ticker.
    tickers : sequence
        Two column labels, passed through to ``lin_reg``.
    window : int
        Row count for both the rolling regression and the rolling statistics.

    Returns
    -------
    pandas.DataFrame
        Columns ``'Slope'``, ``'Spread'``, ``'Spread Mean'``, ``'Spread Std'``
        and ``'Z-Score'``, indexed like ``data``. Leading rows are NaN until
        both the regression and the rolling statistics have enough history.
    """
    closes = data['Close']
    # Keep the coefficients in their own frame instead of assigning new columns
    # onto `data['Close']`, which is a selection from the caller's frame.
    coeffs = lin_reg(closes, tickers, window)
    fitted = coeffs['Intercept'] + coeffs['Slope'] * closes[tickers[0]]
    spread = closes[tickers[1]] - fitted
    spread_mean = spread.rolling(window).mean()
    spread_std = spread.rolling(window).std()
    z_score = (spread - spread_mean) / spread_std
    return pd.DataFrame({
        'Slope': coeffs['Slope'],
        'Spread': spread,
        'Spread Mean': spread_mean,
        'Spread Std': spread_std,
        'Z-Score': z_score,
    })

In [17]:
def close(df):
    """Return only the 'Close' column(s) of ``df`` as a DataFrame.

    A list selector is used, so the result is always a DataFrame and the
    original column labels (including the 'Close' level, if the columns are
    multi-level) are kept.
    """
    wanted = ['Close']
    return df.loc[:, wanted]

In [31]:
# Leftover inspection of rows 395-404 of the price frame; the value is discarded
# because this is not the last expression in the cell.
df.iloc[395:405]

class Pairs(bt.Strategy):
    """Pairs strategy that trades both tickers off the spread Z-score.

    Relies on the notebook-level ``tickers`` and ``window`` settings and on the
    ``close`` and ``calc_spread`` indicator functions.
    """

    def __init__(self, data, cash):
        super().__init__(data, cash)
        # Register the indicator functions with the base class; presumably this
        # is what makes 'Z-Score' available on each row passed to run() --
        # confirm against bt.Strategy.
        self.indicator(close, data)
        self.indicator(calc_spread, data, tickers, window)

    def run(self, date_index, row):
        """Handle one row: size orders by the Z-score, or sell both tickers."""
        super().run(date_index, row)
        z_score = row['Z-Score']
        # Written as `not (... >= 1)` so a NaN Z-score takes the sell branch.
        if not (abs(z_score) >= 1):
            self.sell(tickers[1])
            self.sell(tickers[0])
            return
        # Opposite-signed amounts on the two tickers, each 5% of cash per unit
        # of Z-score. self.cash is re-read for every order on purpose: the
        # first buy may already have changed it.
        self.buy(tickers[1], buy_price=self.cash*(-0.05)*z_score)
        self.buy(tickers[0], buy_price=self.cash*0.05*z_score)

# Multiplier for divergence from Z-score


In [29]:
# Build the strategy from the configured starting cash rather than repeating
# the literal, so the config cell stays the single place to change it.
test = Pairs(data=df, cash=start_cash)
testrun = bt.Backtest(df, test)
# Run the backtest with 100 Monte Carlo iterations (argument semantics are
# defined by the backtest module), then render its results.
testrun.run_backtest(monte_carlo_iterations=100, mc_replacement=False, duration=5)
testrun.visualize()

Return:1.11844640673641
Standard Dev:0.01275572437635136
Sharpe Ratio:5.891190850417614


In [30]:
# Gather `price` and `buy_price` from every recorded trade, then show the
# difference of their totals (presumably the net result across all trades --
# confirm against the backtest module's Trade fields).
all_trades = testrun.strategy.all_trades
prices = []
buy_prices = []
for trade in all_trades:
    prices.append(trade.price)
    buy_prices.append(trade.buy_price)
np.sum(prices) - np.sum(buy_prices)

np.float64(7501.525008787517)