# Capital Asset Pricing Model (CAPM) in Factor Investing

In [1]:
# Import libraries
import pandas as pd
import numpy as np
import yfinance as yf
import statsmodels.api as sm
from statsmodels.regression.rolling import RollingOLS
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go
from IPython.display import clear_output

In [2]:
# Plot stock vs market returns with regression values
tickers = ['AAPL', '^GSPC']

prices = yf.download(tickers, start='2015-01-01', end='2022-12-31')['Close']

# Rename the index column by label rather than overwriting `columns`
# positionally, so the labels stay correct whatever order the download
# returns its columns in.
returns = prices.pct_change().dropna().rename(columns={'^GSPC': 'S&P500'})
market_returns = returns['S&P500']
stock_returns = returns['AAPL']

# Single-factor regression: AAPL daily return on S&P500 daily return
model = sm.OLS(stock_returns, sm.add_constant(market_returns)).fit()

# `params` is a label-indexed Series: the intercept added by add_constant is
# named 'const' and the slope carries the regressor's name. Looking them up by
# label avoids the deprecated integer-key positional fallback.
alpha = model.params['const']
beta = model.params[market_returns.name]
returns['Regression'] = alpha + (beta * returns['S&P500'])

# Scatter of actual AAPL returns with the fitted values overlaid
fig = px.scatter(returns,
                 x='S&P500',
                 y=['AAPL', 'Regression'],
                 width=900)

fig.update_layout(title="AAPL Returns vs. S&P500 Returns",
                  xaxis_title="S&P500 Returns",
                  yaxis_title="Returns",
                  legend_title="Legend")

fig.show()

[*********************100%***********************]  2 of 2 completed


In [3]:
def select_tickers_insample(tickers, start_date_insample, end_date_insample):
    ''' Perform in-sample analysis to identify and select tickers with high betas

    Each ticker's daily returns are regressed on the daily returns of the
    market index (^GSPC) over the in-sample window; the slope of that
    regression is the ticker's CAPM beta.

    Parameters
    ----------
    tickers : list of str
        Candidate ticker symbols.
    start_date_insample, end_date_insample : str
        Window bounds, passed to `yf.download` as `start` / `end`.

    Returns
    -------
    list of str
        Tickers whose beta is strictly above the 75th percentile of all
        estimated betas, in the same order as `tickers`. Tickers without a
        usable beta are never selected.
    '''
    market_ticker = '^GSPC'  # Ticker symbol for the market index
    min_observations = 3  # fewer points than this cannot support a 2-parameter fit

    # Fetch historical stock prices and market index using Yahoo Finance API
    prices = yf.download(tickers, start=start_date_insample, end=end_date_insample)['Close']
    market_prices = yf.download(market_ticker, start=start_date_insample, end=end_date_insample)['Close']

    # Normalise shapes: a single-symbol download can yield either a Series or
    # a one-column DataFrame.
    if isinstance(prices, pd.Series):
        prices = prices.to_frame(name=tickers[0])
    if isinstance(market_prices, pd.DataFrame):
        market_prices = market_prices.iloc[:, 0]
    market_prices = market_prices.rename(market_ticker)

    # Calculate returns. The column labels supplied by the download are kept
    # as-is: its column order need not match the order of `tickers`, so
    # relabelling positionally could attribute a beta to the wrong symbol.
    stock_returns = prices.pct_change()
    market_returns = market_prices.pct_change()

    # Calculate CAPM beta for each stock in the in-sample period.
    # Betas default to NaN; NaN entries are ignored by nanpercentile below and
    # compare False against the threshold.
    betas = pd.Series(np.nan, index=tickers, dtype=float)
    for ticker in tickers:
        if ticker not in stock_returns.columns:
            continue  # nothing was downloaded under this label

        # Drop missing rows per ticker so that one symbol with a short price
        # history does not shrink the sample used for every other symbol.
        sample = pd.concat([stock_returns[ticker], market_returns], axis=1).dropna()
        if len(sample) < min_observations:
            continue

        model = sm.OLS(sample[ticker], sm.add_constant(sample[market_ticker])).fit()
        # Look the slope up by the regressor's label instead of by position
        betas[ticker] = model.params[market_ticker]

    # Select tickers for the out-of-sample period based on the in-sample analysis
    beta_threshold = np.nanpercentile(betas, 75)
    selected_tickers = betas[betas > beta_threshold].index.tolist()

    return selected_tickers

def calculate_portfolio_returns_outsample(selected_tickers, start_date_outsample, end_date_outsample, start_value, verbose=False):
    ''' Perform vectorized out of sample test with the tickers selected in the in-sample analysis

    The selected tickers are held long with equal weights for the whole
    out-of-sample window and the resulting daily portfolio returns are
    compounded starting from `start_value`.

    Parameters
    ----------
    selected_tickers : list of str
        Tickers to hold; must not be empty.
    start_date_outsample, end_date_outsample : str
        Window bounds, passed to `yf.download` as `start` / `end`.
    start_value : float
        Portfolio value at the beginning of the window.
    verbose : bool, default False
        When true, plot the equity curve and print performance metrics.

    Returns
    -------
    pandas.Series
        Portfolio value per date (`start_value` compounded by daily returns).

    Raises
    ------
    ValueError
        If `selected_tickers` is empty.
    '''
    if len(selected_tickers) == 0:
        raise ValueError("selected_tickers must contain at least one ticker")

    # Fetch historical stock prices using Yahoo Finance API
    prices = yf.download(selected_tickers, start=start_date_outsample, end=end_date_outsample)['Adj Close']

    # A single-symbol download can come back as a Series; keep the labels the
    # download supplied otherwise (no positional relabelling).
    if isinstance(prices, pd.Series):
        prices = prices.to_frame(name=selected_tickers[0])

    # Calculate returns
    returns = prices.pct_change().dropna()

    # Calculate factor signal (high beta) for the out-of-sample period:
    # every selected ticker is held long on every date. Built as floats so the
    # arithmetic below stays numeric instead of falling back to object dtype.
    factor_signal = pd.DataFrame(1.0, index=returns.index, columns=returns.columns)

    # Calculate daily factor returns for the out-of-sample period.
    # The signal is lagged by one row, so the first row has no signal and
    # contributes a 0 return once the row is summed below.
    factor_returns = factor_signal.shift(1) * returns

    # Define the portfolio weights for the out-of-sample period (equal weight)
    portfolio_weights = factor_signal.div(factor_signal.sum(axis=1, skipna=True), axis=0)

    # Calculate daily portfolio returns for the out-of-sample period
    portfolio_returns = (portfolio_weights * factor_returns).sum(axis=1)

    # Calculate cumulative returns for the out-of-sample period
    cumulative_returns = start_value * ((1 + portfolio_returns).cumprod())

    if verbose:
        # Plot cumulative returns for the out-of-sample period
        plt.figure(figsize=(10, 6))
        cumulative_returns.plot(label='Factor Model Strategy')
        plt.title('Factor Model Strategy - Cumulative Returns (Out-of-Sample)')
        plt.xlabel('Date')
        plt.ylabel('Cumulative Returns')
        plt.legend()
        plt.show()

        # Performance metrics for the out-of-sample period.
        # The total return is measured relative to `start_value`, so it stays
        # correct when the window does not start from a value of 1.
        compounded_total_return = cumulative_returns.iloc[-1] / start_value - 1
        annualized_return = (1 + compounded_total_return) ** (252 / len(portfolio_returns)) - 1
        annualized_volatility = portfolio_returns.std() * np.sqrt(252)
        # Sharpe ratio is undefined without any volatility
        if annualized_volatility > 0:
            sharpe_ratio = annualized_return / annualized_volatility
        else:
            sharpe_ratio = np.nan

        print("Compounded Total Return (Out-of-Sample): {:.2%}".format(compounded_total_return))
        print("Annualized Return (Out-of-Sample): {:.2%}".format(annualized_return))
        print("Annualized Volatility (Out-of-Sample): {:.2%}".format(annualized_volatility))
        print("Sharpe Ratio (Out-of-Sample): {:.2f}".format(sharpe_ratio))

    return cumulative_returns

In [4]:
# Define input variables
tickers = ["AAPL", "MSFT", "AMZN", "GOOGL", "GOOG", "TSLA", "BRK-B",
           "JNJ", "NVDA", "JPM", "V", "HD", "PG", "MA", "DIS", "BAC", "UNH",
           "PYPL", "CMCSA", "VZ", "INTC", "NFLX", "T", "ADBE", "PEP",
           "CSCO", "ABT", "KO", "CRM", "ABBV", "XOM", "MRK", "TMO", "AVGO",
           "WMT", "ACN", "CVX", "MCD", "BMY", "LLY", "COST", "NKE", "MDT",
           "DHR", "LIN", "PM", "UNP", "NEE", "AMGN"]

market_ticker = '^GSPC'  # Ticker symbol for the market index
start_value = 1
years_list = [2016, 2017, 2018, 2019, 2020, 2021, 2022]

# One equity-curve segment per out-of-sample year, joined after the loop
yearly_curves = []

# Loop through years: select on year-1, trade on year
for year in years_list:
    # yfinance treats `end` as exclusive, so each window ends on 1 January of
    # the following year; ending on 31 December would drop that day's session.
    start_date_insample = str(year - 1) + '-01-01'
    end_date_insample = str(year) + '-01-01'
    start_date_outsample = str(year) + '-01-01'
    end_date_outsample = str(year + 1) + '-01-01'

    # Run walk-forward test functions
    selected_tickers = select_tickers_insample(tickers, start_date_insample, end_date_insample)
    cumulative_returns = calculate_portfolio_returns_outsample(selected_tickers, start_date_outsample, end_date_outsample, start_value)
    yearly_curves.append(cumulative_returns)

    # Carry the final portfolio value into the next year. `.iloc` is used
    # because the series is indexed by date, not by integer position.
    start_value = cumulative_returns.iloc[-1]
    clear_output()

total_cumulative_returns = pd.concat(yearly_curves).to_frame(name='Returns')

# Calculate benchmark performance over the same span, normalised to start at 1
benchmark = pd.DataFrame(yf.download(market_ticker, start=str(years_list[0]) + '-01-01', end=str(years_list[-1] + 1) + '-01-01')['Adj Close'])
benchmark.columns = ['Benchmark Returns']
benchmark['Benchmark Returns'] = benchmark['Benchmark Returns'] / benchmark['Benchmark Returns'].iloc[0]

[*********************100%***********************]  1 of 1 completed


In [13]:
# Plot figure: strategy equity curve against the normalised benchmark
fig = go.Figure()

# (x values, y values, legend label) for each line, in drawing order
curve_specs = [
    (total_cumulative_returns.index, total_cumulative_returns['Returns'], 'CAPM (Out of Sample)'),
    (benchmark.index, benchmark['Benchmark Returns'], 'S&P500'),
]
for x_values, y_values, label in curve_specs:
    fig.add_trace(go.Scatter(x=x_values, y=y_values, mode='lines', name=label))

# Horizontal legend whose bottom-right corner sits at x=1, y=1.02
legend_style = {
    'orientation': 'h',
    'yanchor': 'bottom',
    'y': 1.02,
    'xanchor': 'right',
    'x': 1,
}
fig.update_layout(
    title="Walk Forward Test - Compounded Normalized Returns",
    xaxis_title="Date",
    yaxis_title="Returns",
    legend_title="Legend",
    legend=legend_style,
    width=900,
    height=550,
)

fig.show()

In [15]:
# Calculate daily returns: turn each normalised level series into
# day-over-day percentage changes, stored as a new column on the same frame
for frame, level_col, return_col in (
    (total_cumulative_returns, 'Returns', 'Daily Returns'),
    (benchmark, 'Benchmark Returns', 'Daily Returns - Benchmark'),
):
    frame[return_col] = frame[level_col].pct_change()

# Combine dataframes side by side, aligned on their index
data = pd.concat([total_cumulative_returns, benchmark], axis=1)

In [16]:
# Calculate running beta: regress strategy daily returns on benchmark daily
# returns (plus an intercept) over a moving window
strategy_daily = data['Daily Returns']
benchmark_exog = sm.add_constant(data['Daily Returns - Benchmark'])

# Short window: 41 observations
mod_short = RollingOLS(strategy_daily, benchmark_exog, window=41)
# Long window: 252 observations, expanding from 41 until the window is full
mod_long = RollingOLS(strategy_daily, benchmark_exog, window=252, min_nobs=41, expanding=True)

rolling_params_short = mod_short.fit()
rolling_params_long = mod_long.fit()

# Plot figure: the slope on the benchmark column is the rolling beta
fig = go.Figure()
beta_specs = [
    (rolling_params_short.params, '60D Rolling Beta (Out of Sample)'),
    (rolling_params_long.params, '360D Rolling Beta (Out of Sample)'),
]
for fitted_params, label in beta_specs:
    fig.add_trace(go.Scatter(x=fitted_params.index,
                             y=fitted_params['Daily Returns - Benchmark'],
                             mode='lines',
                             name=label))

# Horizontal legend whose bottom-right corner sits at x=1, y=1.02
legend_style = {
    'orientation': 'h',
    'yanchor': 'bottom',
    'y': 1.02,
    'xanchor': 'right',
    'x': 1,
}
fig.update_layout(
    title="Walk Forward Test - Rolling Beta",
    xaxis_title="Date",
    yaxis_title="Beta",
    legend_title="Legend",
    legend=legend_style,
    width=900,
    height=550,
)

fig.show()

In [17]:
# Calculate rolling volatility: standard deviation of daily returns over a
# trailing 60-day time window, scaled by 100 to read as a percentage
volatility_columns = (
    ('Daily Returns', '60D Rolling Volatility'),
    ('Daily Returns - Benchmark', '60D Rolling Volatility - Benchmark'),
)
for return_col, vol_col in volatility_columns:
    data[vol_col] = data[return_col].rolling('60D').std()*100

# Plot figure: strategy volatility against benchmark volatility
fig = go.Figure()
vol_specs = [
    ('60D Rolling Volatility', 'CAPM (Out of Sample)'),
    ('60D Rolling Volatility - Benchmark', 'S&P500'),
]
for vol_col, label in vol_specs:
    fig.add_trace(go.Scatter(x=data.index, y=data[vol_col], mode='lines', name=label))

# Horizontal legend whose bottom-right corner sits at x=1, y=1.02
legend_style = {
    'orientation': 'h',
    'yanchor': 'bottom',
    'y': 1.02,
    'xanchor': 'right',
    'x': 1,
}
fig.update_layout(
    title="Walk Forward Test - 60-Day Rolling Volatility",
    xaxis_title="Date",
    yaxis_title="Volatility (%)",
    legend_title="Legend",
    legend=legend_style,
    width=900,
    height=550,
)

fig.show()

In [18]:
# Calculate drawdowns: percentage distance of each level series below its
# own running maximum (0 at a new high, negative otherwise)
drawdown_columns = (
    ('Returns', 'Global Max', 'Drawdown'),
    ('Benchmark Returns', 'Global Max - Benchmark', 'Drawdown - Benchmark'),
)
for level_col, peak_col, drawdown_col in drawdown_columns:
    running_peak = data[level_col].cummax()
    data[peak_col] = running_peak
    data[drawdown_col] = (data[level_col] - running_peak)*100/running_peak

# Plot figure: strategy drawdown against benchmark drawdown
fig = go.Figure()
drawdown_specs = [
    ('Drawdown', 'CAPM (Out of Sample)'),
    ('Drawdown - Benchmark', 'S&P500'),
]
for drawdown_col, label in drawdown_specs:
    fig.add_trace(go.Scatter(x=data.index, y=data[drawdown_col], mode='lines', name=label))

# Horizontal legend whose bottom-right corner sits at x=1, y=1.02
legend_style = {
    'orientation': 'h',
    'yanchor': 'bottom',
    'y': 1.02,
    'xanchor': 'right',
    'x': 1,
}
fig.update_layout(
    title="Walk Forward Test - Drawdown",
    xaxis_title="Date",
    yaxis_title="Drawdown (%)",
    legend_title="Legend",
    legend=legend_style,
    width=900,
    height=550,
)

fig.show()