# Carhart Factor Model #

### The Model Specification ###

In [3]:
# Import Libraries

# Data Management
import pandas as pd
import numpy as np

# Plots
import matplotlib.pyplot as plt

# Statistics
import statsmodels.api as sm
from statsmodels.stats.outliers_influence import variance_inflation_factor
from scipy.stats import norm

# Manipulate Files
import os

# Pretty Notation
from IPython.display import display, Math

In [4]:
# Create the Weights function
def wexp(N, half_life):
    """Return N exponentially decaying weights that sum to one.

    The k-th raw weight is exp(ln(0.5) * k / half_life), so it halves every
    `half_life` steps. The normalised weights are returned in reverse order:
    the last element carries the largest weight and the first the smallest.
    """
    decay_rate = np.log(0.5) / half_life
    steps = np.array(range(N))
    raw_weights = np.exp(decay_rate * steps)
    normalised = raw_weights / raw_weights.sum()
    # Reverse so the heaviest weight ends up at the end of the array
    return normalised[::-1]

In [5]:
def import_financial_data(
    ticker: str
):
    """Load one stock's price history from the ``stocks`` folder one level up.

    Parameters
    ----------
    ticker : str
        Stock symbol; it is upper-cased before being used as the file name
        (``<TICKER>.csv``).

    Returns
    -------
    pd.DataFrame
        Columns ``open``, ``high``, ``low``, ``close`` and ``adjusted_close``,
        indexed by the parsed ``Date`` column, without rows containing NaN and
        restricted to dates from 2015-01-01 onwards.
    """
    # File names use the upper-case symbol
    ticker = ticker.upper()

    # Build the path with os.path.join so it resolves on every OS
    # (a literal backslash path only works on Windows)
    df = pd.read_csv(os.path.join("..", "stocks", f"{ticker}.csv"))

    # Index the rows by their parsed dates
    df = df.set_index('Date')
    df.index = pd.to_datetime(df.index)

    # Keep only the price columns and give them short lower-case names
    df_useful_data = df[['Open Price', 'High Price', 'Low Price', 'Close Price', 'Adjusted_close']]
    df_useful_data = df_useful_data.rename(columns={
        "Open Price":"open",
        "High Price":"high",
        "Low Price":"low",
        "Close Price":"close",
        "Adjusted_close":"adjusted_close",
    })

    # Drop every row that has a NaN in any of the price columns
    df_useful_data = df_useful_data.dropna()

    return df_useful_data.loc["2015-01-01":]

In [6]:
# Choose the stock to analyse and derive its simple daily returns

ticker = 'META'

stock_price = import_financial_data(ticker)
adjusted_close = stock_price['adjusted_close']
# One-period percentage change; rows without a valid return (such as the first) are dropped
stock_returns = adjusted_close.pct_change(1).dropna()

stock_returns

In [7]:
# Load the risk-free rate, the benchmark index and the factor premia


def _load_additional_data(name, date_column='Date', dayfirst=False):
    """Read ``<name>.csv`` from the ``additional_data`` folder, indexed by date.

    `date_column` is the CSV column holding the dates (it is renamed to
    ``Date`` before becoming the index) and `dayfirst` is forwarded to
    ``pd.to_datetime``.
    """
    # os.path.join keeps the relative path valid on every OS, not only Windows
    frame = pd.read_csv(os.path.join('..', 'additional_data', f'{name}.csv'))
    frame = frame.rename(columns={date_column: 'Date'}).set_index('Date')
    frame.index = pd.to_datetime(frame.index, dayfirst=dayfirst)
    return frame


# Risk-free rate: the only file whose dates are parsed day-first
rfr = _load_additional_data('rfr', dayfirst=True)

# S&P 500 benchmark
sp500 = _load_additional_data('sp500')

# SMB premium
SMB = _load_additional_data('SMB')

# HML premium
HML = _load_additional_data('HML')

# WML premium (its dates live in the unnamed first column)
WML = _load_additional_data('WML', date_column='Unnamed: 0')

# AMD premium (its dates live in the unnamed first column)
AMD = _load_additional_data('AMD', date_column='Unnamed: 0')

In [8]:
# Turn the risk-free rate from percent into a fraction, then into a
# per-day rate by compounding over a 360-day year
annual_rfr = rfr['risk_free_rate'].div(100)
daily_rfr = (1 + annual_rfr) ** (1/360) - 1

# Simple one-period returns of the benchmark
benchmark_returns = sp500['sp_500'].pct_change(1)

# Excess returns over the daily risk-free rate (pandas aligns the series by date)
market_excess_returns = benchmark_returns - daily_rfr
stock_excess_returns = stock_returns - daily_rfr

In [9]:
# Assemble the regression table on the dates of the stock's excess returns
regression_df = pd.DataFrame(index = stock_excess_returns.index)

# Column name -> data, inserted in this order
regression_inputs = {
    'stock_excess_returns': stock_excess_returns,
    'MKT': market_excess_returns,
    'SMB': SMB,
    'HML': HML,
    'WML': WML,
    'AMD': AMD,
}
for column_name, column_values in regression_inputs.items():
    regression_df[column_name] = column_values

# Keep only the dates on which every column has a value
regression_df.dropna(inplace = True)

regression_df

In [17]:
# Dependent variable: the stock's excess returns
y = regression_df.loc[:, 'stock_excess_returns']

# Regressors: the five factor columns
x = regression_df.loc[:, ['MKT', 'SMB', 'HML', 'WML', 'AMD']]

In [19]:
# Correlations
# Pairwise correlation matrix of the factor columns in x (DataFrame.corr with
# its default settings), shown as the cell output

x.corr()

In [21]:
# Exponentially decaying observation weights: the half-life is half the sample,
# and scaling by the sample size makes the weights average to one
window = len(y)
weights = wexp(window, window/2) * window

# Weighted least squares of the stock excess returns on an intercept plus the factors
design_matrix = sm.add_constant(x)
model = sm.WLS(endog=y, exog=design_matrix, weights=weights, missing='drop')

# Fit the model
results = model.fit()

# Print the regression summary
print(results.summary())

### Checking for Collinearity ###

In [23]:
# Variance inflation factor of every regressor in x.
# variance_inflation_factor does not add an intercept on its own, so the
# auxiliary regressions run on a design matrix that includes one; without it
# the VIFs are based on uncentred R-squared values.
vif_exog = sm.add_constant(x, has_constant='add')

vif_data = pd.DataFrame()
vif_data['vars'] = x.columns
vif_data['VIF'] = [
    variance_inflation_factor(vif_exog.values, i)
    for i in range(1, vif_exog.shape[1])  # position 0 is the constant, not reported
]

vif_data

In [25]:
# Recover the R-squared of each auxiliary regression from its VIF:
# VIF = 1 / (1 - R^2)  =>  R^2 = 1 - 1 / VIF
inverse_vif = vif_data['VIF'].rdiv(1)

r_squared_df = pd.DataFrame()
r_squared_df['vars'] = x.columns
r_squared_df['r_squared'] = inverse_vif.rsub(1)

r_squared_df

A VIF below 5 is conventionally regarded as a tolerable level of collinearity, so we are going to keep all 5 factors.

### Orthogonalization ###

In [45]:
# Same exponentially decaying weights as the main regression
# (half-life of half the sample, scaled to average one)
window = len(y)
weights = wexp(window, window/2) * window

# Regress the market premium on the AMD premium alone; no intercept is added here
model_mktvsamd = sm.WLS(
    endog=regression_df['MKT'],
    exog=regression_df['AMD'],
    weights=weights,
    missing='drop',
)

# Fit the model
results_mktvsamd = model_mktvsamd.fit()

# Print the regression summary
print(results_mktvsamd.summary())

In [61]:
# Orthogonalized market premium: the residuals of the MKT-on-AMD regression

OMKT = results_mktvsamd.resid
regression_df['OMKT'] = OMKT

# Plot the cumulative sum of each premium
plt.figure(figsize=(10, 6))
premium_lines = (
    ('MKT', 'Non-Orthogonalized MKT'),
    ('OMKT', 'Orthogonalized MKT'),
    ('AMD', 'AMD Premium'),
)
for column_name, legend_label in premium_lines:
    plt.plot(regression_df[column_name].cumsum(), label=legend_label, alpha=1)

# Titles, axis labels, legend and grid
plt.title('Premium Time Series')
plt.xlabel('Time')
plt.ylabel('Returns')
plt.legend()
plt.grid()

# Render the figure
plt.show()

In [57]:
# Regressors with the orthogonalized market premium in place of MKT
x_alt = regression_df.loc[:, ['OMKT', 'SMB', 'HML', 'WML', 'AMD']]

# Pairwise correlations of the alternative regressors
x_alt.corr()

In [59]:
# Exponentially decaying observation weights (half-life of half the sample,
# scaled by the sample size so they average one)
window = len(y)
weights = wexp(window, window/2) * window

# Weighted least squares on an intercept plus the regressors in x_alt;
# this rebinds `model` and `results` from the earlier fit
design_matrix_alt = sm.add_constant(x_alt)
model = sm.WLS(endog=y, exog=design_matrix_alt, weights=weights, missing='drop')

# Fit the model
results = model.fit()

# Print the regression summary
print(results.summary())

In [69]:
# Variance inflation factor of every regressor in x_alt.
# variance_inflation_factor does not add an intercept on its own, so the
# auxiliary regressions run on a design matrix that includes one; without it
# the VIFs are based on uncentred R-squared values.
vif_exog_alt = sm.add_constant(x_alt, has_constant='add')

vif_data_alt = pd.DataFrame()
vif_data_alt['vars'] = x_alt.columns
vif_data_alt['VIF'] = [
    variance_inflation_factor(vif_exog_alt.values, i)
    for i in range(1, vif_exog_alt.shape[1])  # position 0 is the constant, not reported
]

vif_data_alt

### Obtaining the Coefficients for a single stock ###

In [83]:
# Define the Function
def CarhartFactors(
    stock_returns: pd.Series, 
    market_returns: pd.Series = OMKT, 
    small_minus_big_series: pd.Series = SMB,
    high_minus_low_series: pd.Series = HML,
    winners_minus_losers_series: pd.Series = WML,
    aggressive_minus_defensive_series: pd.Series = AMD,
    window: int = 252
) -> pd.DataFrame:
    """Estimate rolling factor betas for one stock with weighted least squares.

    For every position ``i >= window`` the `window` observations before
    position ``i`` (position ``i`` itself excluded) are regressed on an
    intercept plus the five factors, using exponentially decaying weights
    with a half-life of ``window / 2``. The slopes are stored on row ``i``.

    Parameters
    ----------
    stock_returns : pd.Series
        Dependent variable.
    market_returns, small_minus_big_series, high_minus_low_series,
    winners_minus_losers_series, aggressive_minus_defensive_series
        Factor data, used in this order as regressors. The defaults are the
        module-level objects as they exist when this function is defined;
        rebinding those names later does not change the defaults.
    window : int
        Number of observations in each regression.

    Returns
    -------
    pd.DataFrame
        Columns ``mkt_beta``, ``smb_beta``, ``hml_beta``, ``wml_beta`` and
        ``amd_beta`` on the dates shared by all inputs. Rows before the first
        full window, and rows whose window contains NaN, stay NaN.
    """
    factors = [
        market_returns,
        small_minus_big_series,
        high_minus_low_series,
        winners_minus_losers_series,
        aggressive_minus_defensive_series,
    ]

    # Restrict every input to the dates they all share
    common_index = stock_returns.index
    for factor in factors:
        common_index = common_index.intersection(factor.index)

    stock_returns = stock_returns.loc[common_index]

    # Build the regressor matrix; the intercept is forced in (has_constant='add')
    # so the factor slopes always sit at positions 1..5 of the fitted parameters
    X_all = pd.concat([factor.loc[common_index] for factor in factors], axis=1)
    X_all = sm.add_constant(X_all, has_constant='add')

    # Output table, NaN until a regression fills the row
    betas = pd.DataFrame(index=stock_returns.index, columns=["mkt_beta", "smb_beta", "hml_beta", "wml_beta", "amd_beta"], dtype=float)

    # The weights depend only on the window length, so compute them once
    weights = window * wexp(window, window/2)

    for i in range(window, len(stock_returns)):
        y_window = stock_returns.iloc[i-window:i]
        X_window = X_all.iloc[i-window:i]

        # Skip windows with missing data so the weights stay aligned with the rows
        if y_window.isna().any() or X_window.isna().any().any():
            continue

        params = sm.WLS(y_window, X_window, weights=weights, missing='drop').fit().params

        # Position 0 is the intercept; positions 1..5 are the factor slopes
        betas.iloc[i] = params.iloc[1:6].to_numpy()

    return betas

In [85]:
# Check that the function works (using the stock selected earlier)

rolling_betas = CarhartFactors(regression_df['stock_excess_returns'])

# Drop the rows for which no betas were estimated
parameters = rolling_betas.dropna()

parameters

In [86]:
# Plot the betas after smoothing them with an exponentially weighted mean (span 21)

smoothed_betas = parameters.ewm(span=21, adjust = False).mean()

plt.figure(figsize=(10, 6))
plt.plot(smoothed_betas, label=parameters.columns, alpha=1)

# Titles, axis labels, legend and grid
plt.title(f'{ticker} Betas Time Series')
plt.xlabel('Time')
plt.ylabel('Betas')
plt.legend()
plt.grid()

# Render the figure
plt.show()

### Obtaining the Coefficients for all the Stocks ###

In [73]:
# Load the adjusted close series of every stock with enough history.
# The folder path is built with os.path.join so it resolves on every OS.
folder_path = os.path.join("..", "stocks")

# Ticker -> adjusted close series
dataframes = {} 

# os.listdir returns names in arbitrary order; sorting makes the order of the
# dictionary (and of everything built from it) reproducible
for file in sorted(os.listdir(folder_path)):
    if not file.endswith(".csv"):
        continue

    # Full path to the file
    file_path = os.path.join(folder_path, file)

    # Read the CSV file and index it by parsed dates
    df = pd.read_csv(file_path)
    df = df.set_index("Date")
    df.index = pd.to_datetime(df.index)

    # Keep only the adjusted close, under a shorter name
    df = df['Adjusted_close']
    df = df.rename("adj_close")

    # Fill gaps by time-weighted interpolation
    df = df.interpolate(method='time')

    # Keep 2015 onwards and drop whatever is still missing
    df = df.loc['2015-01-01':].dropna()

    if len(df) >= 2000:
        # File name without extension
        file_name = os.path.splitext(file)[0]

        # Store in the dictionary
        dataframes[file_name] = df
        print(f"File loaded: {file_name} ({len(df)} rows)")
    else:
        print(f"File skipped (less than 2000 rows after cleaning): {file}")

print(f"\nTotal files loaded: {len(dataframes)}")
print("Files loaded:", list(dataframes.keys()))

In [75]:
# Build one table with the excess returns of every loaded stock


def _fill_with_column_mean(column):
    # Replace the missing values of a column with that column's own mean
    return column.fillna(column.mean())


rets_series = []

for stock, df in dataframes.items():
    # One-period returns named after the stock, without the leading row
    series = df.pct_change(1).iloc[1:].rename(stock)
    rets_series.append(series)

# One column per stock
returns_df = pd.concat(rets_series, axis=1)
returns_df = returns_df.apply(_fill_with_column_mean, axis=0)

# Subtract the daily risk-free rate from every column, aligned by date
returns_df = returns_df.sub(daily_rfr, axis=0)

# Drop the dates on which any column is still missing
returns_df.dropna(inplace = True)

returns_df

In [77]:
# One dictionary per factor, mapping ticker -> rolling beta series
betas_mkt_dict, betas_smb_dict, betas_hml_dict = {}, {}, {}
betas_wml_dict, betas_amd_dict = {}, {}

# Column of the CarhartFactors output -> dictionary that collects it
beta_stores = {
    'mkt_beta': betas_mkt_dict,
    'smb_beta': betas_smb_dict,
    'hml_beta': betas_hml_dict,
    'wml_beta': betas_wml_dict,
    'amd_beta': betas_amd_dict,
}

# Estimate the rolling betas stock by stock
for ticker, df in dataframes.items():
    stock_returns = returns_df[ticker]

    # Rolling betas of this stock
    parameters = CarhartFactors(stock_returns)

    # File each beta column under its factor
    for beta_column, store in beta_stores.items():
        store[ticker] = parameters[beta_column]

    print(f'{ticker} is ready.')

# One table per factor, with one column per ticker
betas_mkt = pd.concat(betas_mkt_dict, axis=1)
betas_smb = pd.concat(betas_smb_dict, axis=1)
betas_hml = pd.concat(betas_hml_dict, axis=1)
betas_wml = pd.concat(betas_wml_dict, axis=1)
betas_amd = pd.concat(betas_amd_dict, axis=1)

In [89]:
# See Market Betas
# dropna with its default arguments removes, in place, every row (date) on
# which at least one ticker column is NaN
betas_mkt.dropna(inplace = True)

# Show the cleaned table as the cell output
betas_mkt

In [91]:
# See SMB Betas
# dropna with its default arguments removes, in place, every row (date) on
# which at least one ticker column is NaN
betas_smb.dropna(inplace = True)

# Show the cleaned table as the cell output
betas_smb

In [93]:
# See HML Betas
# dropna with its default arguments removes, in place, every row (date) on
# which at least one ticker column is NaN
betas_hml.dropna(inplace = True)

# Show the cleaned table as the cell output
betas_hml

In [95]:
# See WML Betas
# dropna with its default arguments removes, in place, every row (date) on
# which at least one ticker column is NaN

betas_wml.dropna(inplace = True)

# Show the cleaned table as the cell output
betas_wml

In [97]:
# See AMD Betas
# dropna with its default arguments removes, in place, every row (date) on
# which at least one ticker column is NaN
betas_amd.dropna(inplace = True)

# Show the cleaned table as the cell output
betas_amd

In [99]:
# Save each beta table as carhart_<factor>_betas.csv in the additional_data folder.
# The paths are built with os.path.join so they resolve on every OS, not only Windows.

betas_by_factor = {
    'mkt': betas_mkt,
    'smb': betas_smb,
    'hml': betas_hml,
    'wml': betas_wml,
    'amd': betas_amd,
}

for factor_name, beta_table in betas_by_factor.items():
    beta_table.to_csv(os.path.join('..', 'additional_data', f'carhart_{factor_name}_betas.csv'))