# 0. Prerequisites

In [1]:
import numpy as np
import pandas as pd
import yfinance as yf
from datetime import datetime
from dateutil.relativedelta import relativedelta
import plotly.graph_objects as go
import math
from tqdm import tqdm
import random

import warnings
warnings.filterwarnings('ignore')

In [2]:
random.seed(42)

In [3]:
startdate = datetime(2022, 1, 1)
enddate = datetime(2024, 11, 9)
datelist = []
mondaylist = []
date = startdate

while date != enddate:
    datelist.append(date)
    date += relativedelta(days=1)

monday = datetime(2022, 1, 3)
while monday <= enddate:
    mondaylist.append(monday)
    monday += relativedelta(weeks=1)

In [4]:
longTickerList = [
    "CEG", "LH", "BA", "CARR", "DOW", "PH", "EMR", "JBL", "SWK", "URI", "BSX", "DLTR", "ORCL", "HUBB", "LYB", "XYL", "HON", "DD", "ROP", "UNH", "IBM", "GRMN", "CMI", "BKR", "GLW", "SYK", 
    "FTV", "ETN", "CHD", "OTIS", "PCAR", "DGX", "AME", "DRI", "APH", "AOS", "HUM", "CLX", "ORLY", "CTAS", "ECL", "TER", "TMUS", "MAS", "TDG", "JNPR", "NSC", "FAST", "PAYX", "ROK", "ITW",
    "CSCO", "CPRT", "TMO", "OKE", "EXC", "EMN", "PWR", "NEM", "DOV", "VTR", "TXT", "TXN", "PG", "AVY", "DTE", "MGM", "BR", "GD", "ADP", "PPL", "NI", "MLM", "IDXX", "HCA", "SHW", "HWM",
    "ZTS", "RCL", "GWW", "CDW", "CAH", "HPE", "HD", "HSY", "RTX", "UNP", "MCK", "AES", "FICO", "INTC", "JCI", "ATO", "HAS", "LOW", "ALLE", "WELL", "ISRG", "VRSN", "TRGP", "LMT"
]
shortTickerList = ["ETSY", "DXCM", "ILMN", "PAYC", "VFC", "ABNB", "APA", "UPS", "EPAM", "CHTR", "MOS", "EXPE", "MPC", "PANW", "VLO", "COR", "BXP", "MRO", "HAL", "MRNA"]

## 1. Acquire Daily Data from Yahoo Finance

In [115]:
for stock in longTickerList:
    ticker = yf.Ticker(stock)
    tickerDf = ticker.history(start=startdate-relativedelta(days=7), end=enddate).reset_index()
    tickerDf["Return"] = tickerDf["Close"] / tickerDf["Close"].shift(1) - 1
    tickerDf["Date"] = tickerDf["Date"].dt.tz_localize(None)
    tickerDf.to_csv(f"./History/Long/{stock}.csv", index=False)

for stock in shortTickerList:
    ticker = yf.Ticker(stock)
    tickerDf = ticker.history(start=startdate-relativedelta(days=7), end=enddate).reset_index()
    tickerDf["Return"] = tickerDf["Close"] / tickerDf["Close"].shift(1) - 1
    tickerDf["Date"] = tickerDf["Date"].dt.tz_localize(None)
    tickerDf.to_csv(f"./History/Short/{stock}.csv", index=False)

Here we compare `S&P500`, `Russell 2000`, `Nasdaq`, `Dow Jones` indexes, and used the index with the highest sharpe ratio as the market portfolio and benchmark.

In [4]:
baseIndexes = {
    "S&P 500": "ES=F",
    "Russell 2000": "RTY=F",
    "Nasdaq": "NQ=F",
    "Dow Jones": "YM=F"
}

In [116]:
for stock, code in baseIndexes.items():
    index = yf.Ticker(code)
    indexDf = index.history(start=startdate-relativedelta(days=7), end=enddate).reset_index()
    indexDf["Return"] = indexDf["Close"] / indexDf["Close"].shift(1) - 1
    indexDf["Date"] = indexDf["Date"].dt.tz_localize(None)
    indexDf.to_csv(f"./History/Indexes/{stock}.csv", index=False)

In [66]:
fig = go.Figure()
dailyReturnDf = pd.DataFrame()
for stock, code in baseIndexes.items():
    tickerDf = pd.read_csv(f"./History/Indexes/{stock}.csv", parse_dates=["Date"])
    dailyReturn = tickerDf["Return"]
    dailyReturnDf[stock] = dailyReturn
    fig.add_trace(go.Scatter(
        x=tickerDf["Date"],
        y=(dailyReturn),
        name=stock
    ))
    
fig.update_layout(
    title = "Daily Changes of Indexes",
    xaxis = dict(title="Date"),
    yaxis = dict(title="Daily Change Rate")
)
fig.show()

Here we use iShares Short Treasury Bond ETF (SHV), which tracks U.S. Treasury securities with maturities of one year or less, as risk-free asset to calculate those 4 indexes' sharpe ratio.

According to the Capital Asset Pricing Model, the sharpe ratio represents the efficiency of the portfolio. The one that has the highest sharpe ratio is always considered as the best investment.

In [64]:
SHV = yf.Ticker("SHV")
bondRateDf = SHV.history(start=startdate, end=enddate, interval="1d").reset_index()
bondRateDf.head()

Unnamed: 0,Date,Open,High,Low,Close,Volume,Dividends,Stock Splits,Capital Gains
0,2022-01-03 00:00:00-05:00,99.9485,99.957547,99.939445,99.939445,1072800,0.0,0.0,0.0
1,2022-01-04 00:00:00-05:00,99.939445,99.9485,99.939445,99.939445,1245800,0.0,0.0,0.0
2,2022-01-05 00:00:00-05:00,99.948504,99.948504,99.921341,99.921341,1332700,0.0,0.0,0.0
3,2022-01-06 00:00:00-05:00,99.930389,99.939444,99.921335,99.930389,1735300,0.0,0.0,0.0
4,2022-01-07 00:00:00-05:00,99.930391,99.939445,99.921337,99.939445,2089900,0.0,0.0,0.0


In [68]:
indexReturnsMean = dailyReturnDf.mean()
indexReturnsStd = dailyReturnDf.std()
mean = ((bondRateDf["Close"] / bondRateDf["Open"]) - 1).mean()

From the CAPM theory, we can calculate the Capital Market Line by formula below:

$$
E(R_p) = R_f + \frac{E(R_m)-R_f}{\sigma_m}\sigma_R
$$

Where **m** refers to the already known asset (here is the index data). And the slope of this line is the definition of sharpe ratio:

$$
\frac{R_m-R_f}{\sigma_m}
$$

Thus we can use the slope of the CML to show the sharpe ratio of different portfolio.

In [71]:
fig = go.Figure()

for i in range(len(indexReturnsMean)):
    fig.add_trace(go.Scatter(
        x=[indexReturnsStd[i], 0],
        y=[indexReturnsMean[i], mean],
        name=indexReturnsMean.index[i]
    ))
    
fig.update_layout(
    title = "Capital Market Lines",
    xaxis = dict(title="Standard Deviations"),
    yaxis = dict(title="Expected Returns")
)
fig.show()

Since S&P500 Index has the largest sharpe ratio in these 4 indexes, we will use it as the benchmark index.

## 2. Beta-constrained Long-short Strategy

To implement this beta-constrained long-short strategy, we can use weekly beta to maintain the portfolio's beta value into a certain level when comparing to the benchmark index (S&P500).

In [5]:
SP500 = pd.read_csv("./History/Indexes/S&P 500.csv", parse_dates=["Date"], infer_datetime_format=False)

def getWeeklyBeta(stock: str, monday: datetime, friday: datetime, long: bool = True) -> float:
    if long:
        stockDf = pd.read_csv(f"./History/Long/{stock}.csv", parse_dates=["Date"])
    else:
        stockDf = pd.read_csv(f"./History/Short/{stock}.csv", parse_dates=["Date"])
    curDf = stockDf[(stockDf["Date"] >= monday) & (stockDf["Date"] <= friday)]
    curSP500 = SP500[(SP500["Date"] >= monday) & (SP500["Date"] <= friday)]
    var_market = np.var(curSP500["Return"])
    merged = pd.merge(curDf, curSP500, on="Date")
    covariance = np.cov(merged["Return_x"][1:], merged["Return_y"][1:])[0, 1]
    beta = covariance / var_market
    if np.isnan(beta):
        beta = 0
    return float(beta)

In [8]:
class Portfolio():
    def __init__(
            self, 
            price: float = 5e6, 
            portfolio: dict = None, 
            capital: float = 5e6
        ) -> None:
        """
        Build a class that represents the portfolio and the capital we possess at a certain moment.

        Args:
            price (float): Total price of the portfolio.
            portfolio (dict): The shares of stocks and indexes which are contained in the portfolio, {Stock/Index name: Shares}
            capital (float): The capital that left in the portfolio.
        """
        self.Price = price
        if portfolio is None:
            portfolio = dict(zip(longTickerList+shortTickerList, [0]*len(longTickerList+shortTickerList)))
        self.Portfolio = portfolio
        self.Capital = capital
        self.LongPrice = 0
        self.ShortPrice = 0
        
    def purchase(self, stockname: str, tradedate: datetime, shares: int, tradetime: str, price: float = np.nan) -> None:
        long = stock in longTickerList
        if long:
            stockDf = pd.read_csv(f"./History/Long/{stock}.csv", parse_dates=["Date"])
        else:
            stockDf = pd.read_csv(f"./History/Short/{stock}.csv", parse_dates=["Date"])
        if len(stockDf[stockDf["Date"]==tradedate]) == 0:
            return
        if np.isnan(price):
            price = stockDf[stockDf["Date"]==tradedate][tradetime].iloc[0]
        self.Capital -= (price * shares)
        if stockname not in self.Portfolio.keys():
            self.Portfolio[stockname] = 0
        self.Portfolio[stockname] += shares

    def sell(self, stockname: str, tradedate: datetime, shares: int, tradetime: str, price: float = np.nan) -> None:
        long = stock in longTickerList
        if long:
            stockDf = pd.read_csv(f"./History/Long/{stock}.csv", parse_dates=["Date"])
        else:
            stockDf = pd.read_csv(f"./History/Short/{stock}.csv", parse_dates=["Date"])
        if len(stockDf[stockDf["Date"]==tradedate]) == 0:
            return
        if np.isnan(price):
            price = stockDf[stockDf["Date"]==tradedate][tradetime].iloc[0]
        self.Capital += (price * shares)
        if stockname not in self.Portfolio.keys():
            self.Portfolio[stockname] = 0
        self.Portfolio[stockname] -= shares
        
    def shortsell(self, stockname: str, tradedate: datetime, shares: int, tradetime: str, price: float = np.nan) -> None:
        if stockname not in self.Portfolio.keys():
            self.Portfolio[stockname] = 0
        self.Portfolio[stockname] += shares
        long = stock in longTickerList
        if long:
            stockDf = pd.read_csv(f"./History/Long/{stock}.csv", parse_dates=["Date"])
        else:
            stockDf = pd.read_csv(f"./History/Short/{stock}.csv", parse_dates=["Date"])
        if len(stockDf[stockDf["Date"]==tradedate]) == 0:
            return
        if np.isnan(price):
            price = stockDf[stockDf["Date"]==tradedate][tradetime].iloc[0]
        self.Capital += (price * shares)
        if stockname not in self.Portfolio.keys():
            self.Portfolio[stockname] = 0
        
    def shortpurchase(self, stockname: str, tradedate: datetime, shares: int, tradetime: str, price: float = np.nan) -> None:
        if stockname not in self.Portfolio.keys():
            self.Portfolio[stockname] = 0
        long = stock in longTickerList
        if long:
            stockDf = pd.read_csv(f"./History/Long/{stock}.csv", parse_dates=["Date"])
        else:
            stockDf = pd.read_csv(f"./History/Short/{stock}.csv", parse_dates=["Date"])
        if len(stockDf[stockDf["Date"]==tradedate]) == 0:
            return
        if np.isnan(price):
            price = stockDf[stockDf["Date"]==tradedate][tradetime].iloc[0]
        self.Capital -= (price * shares)
        if stockname not in self.Portfolio.keys():
            self.Portfolio[stockname] = 0
        self.Portfolio[stockname] -= shares

    def currentPrice(self, curdate: datetime) -> float:
        price = self.Capital
        self.LongPrice = 0
        self.ShortPrice = 0
        for stock, share in self.Portfolio.items():
            if share == 0:
                continue
            long = stock in longTickerList
            if long:
                stockDf = pd.read_csv(f"./History/Long/{stock}.csv", parse_dates=["Date"])
                if len(stockDf[stockDf["Date"]==curdate]) == 0:
                    continue
                index = int(np.where(stockDf["Date"]==curdate)[0])
                curprice = stockDf["Close"].iloc[index]
                self.LongPrice += (share * curprice)
            else:
                stockDf = pd.read_csv(f"./History/Short/{stock}.csv", parse_dates=["Date"])
                if len(stockDf[stockDf["Date"]==curdate]) == 0:
                    continue
                index = int(np.where(stockDf["Date"]==curdate)[0])
                curprice = stockDf["Close"].iloc[index]
                self.ShortPrice += (share * curprice)
        price += self.LongPrice
        price -= self.ShortPrice
        self.Price = price
        return float(price)

To be specific, here we use weekly beta to adjust the weights of the constituent stocks in the portfolio, which ensures that the portfolio's beta value is equal to the benchmark index's beta value.

Here we rebalance the weights of the constituents in the portfolio every week using `minimize` function in scipy.optimize with constraints:

+ $\sum{weight_i} = 1$
+ $\sum{weight_i*\beta_i} = 0$

In [9]:
from scipy.optimize import minimize

port = Portfolio(price=5e6, capital=5e6)
pricelist = []
weeklybetalist = []
capitallist = []
portlist = []
shortlist = []
longlist = []

stocklist = longTickerList + shortTickerList
capitalAlloDict = dict(zip(stocklist, np.zeros(len(stocklist))))
purchasepricedict = dict(zip(stocklist, np.zeros(len(stocklist))))

for i in tqdm(range(len(datelist))):
    date = datelist[i]
    ### Rebalance the weights on every mondays
    if date in mondaylist:
        # Assume we sell/short-purchase all the stocks so that we can rebalance the weights easily
        for stock, share in port.Portfolio.items():
            if share == 0:
                continue
            long = stock in longTickerList
            if long:
                port.sell(stock, date, share, "Open")
            else:
                port.shortpurchase(stock, date, share, "Open")
        longbetaDict = {}
        shortbetaDict = {}
        lastmonday = date-relativedelta(weeks=1)
        lastfriday = lastmonday+relativedelta(days=4)
        for stock in longTickerList:
            beta = getWeeklyBeta(stock, monday=lastmonday, friday=lastfriday)
            longbetaDict[stock] = beta
        for stock in shortTickerList:
            beta = getWeeklyBeta(stock, monday=lastmonday, friday=lastfriday, long=False)
            shortbetaDict[stock] = beta
        longbetalist = list(longbetaDict.values())
        shortbetalist = list(shortbetaDict.values())
        targetbeta = 0.0
        betas = np.concatenate([longbetalist, shortbetalist])
        def objective(weights):
            return np.sum(weights * betas) - targetbeta
        constraints = ({'type': 'eq', 'fun': lambda weights: np.sum(weights) - 1})
        bounds = [(0, 1)] * len(longbetalist) + [(-1, 0)] * len(shortbetalist)
        initial_weights = np.ones(len(betas)) / len(betas)
        result = minimize(
            lambda weights: abs(objective(weights)), 
            initial_weights, 
            bounds=bounds, 
            constraints=constraints
        )
        optimal_weights = result.x
        capitals = port.Capital * optimal_weights
        capitalAlloDict = dict(zip(stocklist, capitals))
        weeklybetalist.append(sum(betas * result.x))
        for i in range(len(stocklist)):
            capital = capitals[i]
            if capital == 0:
                continue
            stock = stocklist[i]
            if stock in longTickerList:
                stockDf = pd.read_csv(f"./History/Long/{stock}.csv", parse_dates=["Date"])
                if len(stockDf[stockDf["Date"]==date]) == 0:
                    continue
                price = stockDf[stockDf["Date"]==date]["Open"].iloc[0]
                share = math.floor(capital / price)
                port.purchase(stock, date, share, "Open")
                purchasepricedict[stock] = price
            else:
                stockDf = pd.read_csv(f"./History/Short/{stock}.csv", parse_dates=["Date"])
                if len(stockDf[stockDf["Date"]==date]) == 0:
                    continue
                price = stockDf[stockDf["Date"]==date]["Open"].iloc[0]
                share = math.floor(-capital / price)
                port.purchase(stock, date, share, "Open")
                purchasepricedict[stock] = price
    
    ### Check if current date is trade day
    if date in SP500["Date"].unique():
        # print(date, port.Portfolio)
        for stock, share in port.Portfolio.items():
            long = stock in longTickerList
            if long:
                stockDf = pd.read_csv(f"./History/Long/{stock}.csv", parse_dates=["Date"])
            else:
                stockDf = pd.read_csv(f"./History/Short/{stock}.csv", parse_dates=["Date"])
            if share == 0 and capitalAlloDict[stock] != 0: # For those constituent stocks which suffer loss on last day
                capital = capitalAlloDict[stock]
                if long:
                    if len(stockDf[stockDf["Date"]==date]) == 0:
                        continue
                    share = math.floor(capital / stockDf[stockDf["Date"]==date]["Open"].iloc[0])
                    port.purchase(stock, date, share, "Open")
                    purchasepricedict[stock] = price
                else:
                    if len(stockDf[stockDf["Date"]==date]) == 0:
                        continue
                    share = math.floor((-1 * capital) / stockDf[stockDf["Date"]==date]["Open"].iloc[0])
                    port.shortsell(stock, date, share, "Open")
                    purchasepricedict[stock] = price
                    
            index = np.where(stockDf["Date"]==date)[0]
            if index.size == 0:
                continue
            index = int(index)
            if long:
                if (stockDf["Low"].iloc[index] / purchasepricedict[stock]) <= 0.97:
                    port.sell(stock, date, share, "Low", purchasepricedict[stock]*0.97)
            else:
                if (stockDf["High"].iloc[index] / purchasepricedict[stock]) >= 1.03:
                    port.shortpurchase(stock, date, share, "High", purchasepricedict[stock]*1.03)
        
        pricelist.append(port.currentPrice(curdate=date))
        capitallist.append(port.Capital)
        portlist.append(port.Portfolio)
        shortlist.append(port.ShortPrice)
        longlist.append(port.LongPrice)
    else:
        pricelist.append(float(port.Price))
        capitallist.append(port.Capital)
        portlist.append(port.Portfolio)
        shortlist.append(port.ShortPrice)
        longlist.append(port.LongPrice)

100%|██████████| 1043/1043 [17:17<00:00,  1.01it/s]


In [10]:
fig = go.Figure()

fig.add_trace(go.Scatter(
    x=datelist,
    y=[pricelist[i] / 5e6 for i in range(len(pricelist))],
    name="Current Portfolio Value"
))

fig.add_trace(go.Scatter(
    x=datelist,
    y=SP500["Return"],
    name="SP500 Return"
))
    
fig.update_layout(
    title = "Portfolio Return",
    xaxis = dict(title="Dates"),
    yaxis = dict(title="Return")
)
fig.show()

In [11]:
fig = go.Figure()

fig.add_trace(go.Scatter(
    x=mondaylist,
    y=weeklybetalist,
    name="Beta Value"
))
    
fig.update_layout(
    title = "Portfolio Beta Value",
    xaxis = dict(title="Dates"),
    yaxis = dict(title="Beta Value")
)
fig.show()