# Capital Asset Pricing Model & Fama-French 3-Factor Regression in Python

The following code performs several financial analyses, mainly focusing on regression models and descriptive statistics for ETF and mutual fund data. 

First, it loads the relevant sheets from Excel files and prepares the data by calculating log returns and aligning with a risk-free rate. The code then conducts CAPM and Fama-French 3-Factor regressions, generating outputs such as coefficients, p-values, and R-squared values etc. 
Additionally, it performs diagnostic tests (e.g., Durbin-Watson, Breusch-Pagan, White test) and calculates descriptive statistics. Finally, visualizations of residuals and fitted values are created to assess model assumptions and results.

### Models used in this analysis:

#### • Capital Asset Pricing Model (CAPM)

#### $$ln⁡Δ R_p-ln⁡Δ R_f=α_i+β_1 ⋅ {ln⁡Δ R_m-ln⁡Δ R_f }+ϵ_i$$

| Expression | Description |
|------------|-------------|
| $$ \ln \Delta R_p - \ln \Delta R_f $$ | The natural logarithm of the excess return of the fund. |
| $$ \alpha_i $$ | The alpha value of the portfolio. |
| $$ \beta_1 $$ | The beta coefficient of the portfolio. |
| $$ \ln \Delta R_m - \ln \Delta R_f $$  | The natural logarithm of the change in return of the market portfolio. |
| $$ \epsilon_i $$ | The residual value for the portfolio, that cannot be explained by market movements. |

#### • Fama-French 3 Factor Model

#### $$ ln⁡Δ R_p-ln⁡Δ R_f=α_i+β_1 ⋅ {ln⁡Δ R_m-ln⁡Δ R_f }+β_2 ⋅ {SMB} + β_3 ⋅ {HML}+ϵ_i $$

| Expression | Description |
|------------|-------------|
| $$ \ln \Delta R_p - \ln \Delta R_f $$ | The natural logarithm of the excess return of the fund. |
| $$ \alpha_i $$ | The alpha value of the portfolio. |
| $$ \beta_1 $$ | The beta coefficient of the portfolio. |
| $$ \ln \Delta R_m - \ln \Delta R_f $$  | The natural logarithm of the change in return of the market portfolio. |
| $$ \beta_2 \, ⋅ \text{SMB} $$ | The Small Minus Big factor, which explains the return difference between small and large companies. |
| $$ \beta_3 \,⋅  \text{HML} $$ | The High Minus Low factor, which explains the return difference between portfolios with high and low book-to-market ratios. |
| $$ \epsilon_i $$ | The residual value for the portfolio that cannot be explained by market movements.|

### The first step is to load the excel-files containing the data

In [252]:
etf_file_path_excel = r'C:\Users\...\Data_ETF.xlsm'
mutual_file_path_excel = r'C:\Users\...\Data_Mutual.xlsm'

#### Importing all relevant packages

In [235]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import scipy.stats as stats
import seaborn as sns
import statsmodels.api as sm
import os
import glob
from openpyxl import Workbook
from openpyxl.drawing.image import Image
import warnings
from statsmodels.stats.stattools import durbin_watson
warnings.filterwarnings('ignore', category=FutureWarning)
from scipy.stats import skew, kurtosis, jarque_bera
from openpyxl import load_workbook
from statsmodels.stats.diagnostic import het_breuschpagan, het_white
import statsmodels.stats.api as sms
from statsmodels.stats.stattools import jarque_bera
from statsmodels.stats.api import het_breuschpagan
import statsmodels.stats.api as smd
from statsmodels.stats.outliers_influence import variance_inflation_factor
import matplotlib.dates as mdates

#### Loading the excel files and relevant sheets

In [None]:
df_closing_mutual = pd.read_excel(mutual_file_path_excel, sheet_name='Closing_Prices').iloc[:73]
df_closing_etf = pd.read_excel(etf_file_path_excel, sheet_name='Closing_Prices', skiprows=1).iloc[:73]
df_data_capm_fama = pd.read_excel(etf_file_path_excel, sheet_name='Data_CAPM_FAMA', skiprows=11).iloc[:72]
df_riskfree = df_data_capm_fama[['Dates', 'ln riskfree']].iloc[:72]
df_regressor_capm = df_data_capm_fama[['Dates', 'ln excess']].iloc[:72]
df_regressor_capm.set_index('Dates', inplace=True)
df_regressor_fama = df_data_capm_fama[['Dates', 'ln excess', 'SMB', 'HML']].iloc[:72]
df_regressor_fama.set_index('Dates', inplace=True)

### Overview of Function-based code

#### Calculating Log returns and replacing of missing values with the mean

In [None]:
def calculate_log_returns(df):
    '''
    Calculates the ln return 
    '''
    date_column = df.iloc[:, 0]
    log_returns = np.log(df.iloc[:, 1:].shift(1) / df.iloc[:, 1:])
    log_returns.insert(0, 'Date', date_column)
    return log_returns

def replace_nans_and_zeros_with_mean(series):
    '''
    Replaces NaN and 0 values between the first and last valid value with the mean value of the entire time series.
    '''
    # Calculate the mean value of the valid values (without NaN and 0 values)
    valid_values = series[(series != 0) & series.notna()]
    overall_mean = valid_values.mean()
    if valid_values.empty:
        # If there are no valid values, the unchanged series is returned
        return series
    # Indices of valid values
    valid_indices = valid_values.index
    # Determining the first and last valid index
    first_valid_index = valid_indices.min()  # newest values 
    last_valid_index = valid_indices.max()   # oldest values
    # Mask for values between the first and last valid value that are NaN or 0
    mask = ((series == 0) | series.isna()) & (series.index >= first_valid_index) & (series.index <= last_valid_index)
    # Replacing these values with the mean value
    series.loc[mask] = overall_mean
    return series

#### CAPM & Fama-French Regression

In [None]:
def regression_capm(log_excess_return, df_regressor_capm, writer):
    '''
    Performs the CAPM regression and stores the resulting data, residuals, and fitted values in data frames.
    '''
    results_list = []
    residuals_dict = {}
    fitted_values_dict = {}
    
    # Only numeric regressor column
    X = df_regressor_capm['ln excess'].astype(float)  # Convert to float if necessary
    
    # Loop through the columns of log_excess_return
    for column in log_excess_return.select_dtypes(include=['float64', 'int64']).columns:
        y = log_excess_return[column].astype(float)  # Convert to float if necessary
        # Combine dependent variable and regressor, removing NaN rows and performing regression
        regression_data = pd.concat([y, X], axis=1).dropna()
        y_clean = regression_data.iloc[:, 0]
        X_clean = sm.add_constant(regression_data.iloc[:, 1])
        model = sm.OLS(y_clean, X_clean)
        results = model.fit()     
        # Temporarily saving regression results
        result_dict = {
            'Fund': column,
            'Alpha': results.params[0],
            'Beta': results.params[1],
            'P-Values Alpha': results.pvalues[0],
            'P-Values Beta': results.pvalues[1],
            'R-Squared': results.rsquared,
            'Confidence Interval Alpha': results.conf_int().iloc[0].tolist(),
            'Confidence Interval Beta': results.conf_int().iloc[1].tolist()
        }
        results_list.append(result_dict)
        # Temporarily saving residuals and fitted values
        residuals_dict[column] = pd.Series(results.resid).reindex(log_excess_return.index).tolist()
        fitted_values_dict[column] = pd.Series(results.fittedvalues).reindex(log_excess_return.index).tolist()

    # Converting results, residuals, and fitted values to dataframes
    results_df = pd.DataFrame(results_list)
    residuals_df = pd.DataFrame(residuals_dict, index=log_excess_return.index)
    fitted_values_df = pd.DataFrame(fitted_values_dict, index=log_excess_return.index)
    # Saving to Excel
    results_df.to_excel(writer, sheet_name='CAPM Results', index=False)
    residuals_df.to_excel(writer, sheet_name='Residuals', index=True)
    fitted_values_df.to_excel(writer, sheet_name='Fitted Values', index=True)
    return results_df, residuals_df, fitted_values_df

def regression_fama(log_excess_return, df_regressor_fama, writer):
    '''
    Perform the Fama-French three-factor regression and store the resulting data, residuals, and adjusted values in data frames
    '''
    results_list = []
    residuals_dict = {}
    fitted_values_dict = {}
    # only numeric Regressor columns
    X = df_regressor_fama.select_dtypes(include=['float64', 'int64'])
    # Loop through the columns of log_excess_return
    for column in log_excess_return.select_dtypes(include=['float64', 'int64']).columns:
        y = log_excess_return[column].astype(float)  # Convert to float if necessary
        # Combine dependent variable and regressors, removing NaN rows
        regression_data = pd.concat([y, X], axis=1).dropna()
        y_clean = regression_data.iloc[:, 0]
        X_clean = sm.add_constant(regression_data.iloc[:, 1:])
        model = sm.OLS(y_clean, X_clean)
        results = model.fit()
        # Temporarily saving regression results 
        result_dict = {
            'Fund': column, 
            'Alpha': results.params[0], 
            'P-Values Alpha': results.pvalues[0], 
            'R-Squared': results.rsquared, 
            'Model P-Value': results.f_pvalue, 
            'Confidence Interval Alpha': results.conf_int().iloc[0].tolist()
        }
        # Adding Betas, p-values and confidence intervals
        for i, regressor in enumerate(X.columns):
            result_dict[f'Beta {regressor}'] = results.params[i + 1]
            result_dict[f'P-Value Beta {regressor}'] = results.pvalues[i + 1]
            result_dict[f'Confidence Interval Beta {regressor}'] = results.conf_int().iloc[i + 1].tolist()
        results_list.append(result_dict)
        # Temporarily saving residuals and fitted values
        residuals_dict[column] = pd.Series(results.resid).reindex(log_excess_return.index).tolist()
        fitted_values_dict[column] = pd.Series(results.fittedvalues).reindex(log_excess_return.index).tolist()

    # Converting results, residuals and fitted values to dataframes
    results_df = pd.DataFrame(results_list)
    residuals_df = pd.DataFrame(residuals_dict, index=log_excess_return.index)
    fitted_values_df = pd.DataFrame(fitted_values_dict, index=log_excess_return.index)
    # Saving to Excel
    results_df.to_excel(writer, sheet_name='Fama Results', index=False)
    residuals_df.to_excel(writer, sheet_name='Residuals', index=True)
    fitted_values_df.to_excel(writer, sheet_name='Fitted Values', index=True)
    return results_df, residuals_df, fitted_values_df

#### Descriptive Analysis of the funds

In [240]:
def descriptive_statistics(log_return, writer):
    '''
    Saves the descriptive statistics of the logarithmized excess returns in an Excel spreadsheet
    '''
    # Filter only numeric columns
    numeric_df = log_return.select_dtypes(include=['float64', 'int64'])
    # Calculate the descriptive statistics
    descriptive_stats = numeric_df.describe().transpose()
    # Calculate the kurtosis and skewness
    descriptive_stats['Skewness'] = numeric_df.skew()
    descriptive_stats['Excess-Kurtosis'] = numeric_df.kurt()
    # Saving descriptive statistics in Excel
    descriptive_stats.to_excel(writer, sheet_name='Descriptive Statistics', index=True)

### Statistical Analysis

#### Check for Normal Distribution & Homoskedasticity

In [244]:
def combined_tests(residuals_df, fitted_values_df, regressor, excess_returns_df, output_dir, writer):
    '''
    Performs Breusch-Pagan, White, Shapiro-Wilk, and Jarque-Bera tests, and creates combined plots for each column in residuals_df.
    '''
    # Create empty DataFrames and lists for the results
    results_bp_df = pd.DataFrame(columns=['Fund', 'LM stat', 'p-value', 'f-value', 'f p-value'])
    results_white_df = pd.DataFrame(columns=['Fund', 'LM stat', 'p-value', 'f-stat', 'f p-value'])
    shapiro_results = []
    jarque_bera_results = []
    # Loop through each column in residuals_df
    for column in residuals_df.columns:
        residuals = residuals_df[column].dropna()
        matching_indices = residuals.index
        # Ensure indices are sorted and are datetime objects
        if not pd.api.types.is_datetime64_any_dtype(matching_indices):
            try:
                matching_indices = pd.to_datetime(matching_indices)
            except Exception as e:
                print(f'Error converting indices to datetime for column "{column}": {e}')
                continue
        residuals = residuals.sort_index()
        matching_indices = residuals.index
        # Ensure fitted_values_df, regressor, and excess_returns_df have matching indices
        fitted_values = fitted_values_df[column].loc[matching_indices].sort_index()
        excess_returns = excess_returns_df[column].loc[matching_indices].sort_index()
        regressor_filtered = regressor.loc[matching_indices].sort_index()
        # Add constant to regressors
        exog_with_const = sm.add_constant(regressor_filtered)

        # Perform Breusch-Pagan test
        bp_test_result = het_breuschpagan(residuals, exog_with_const)
        bp_new_row = pd.DataFrame({
            'Fund': [column],
            'LM stat': [bp_test_result[0]],
            'p-value': [bp_test_result[1]],
            'f-value': [bp_test_result[2]],
            'f p-value': [bp_test_result[3]]
        })
        results_bp_df = pd.concat([results_bp_df, bp_new_row], ignore_index=True)

        # Perform White test
        white_test_result = het_white(residuals, exog_with_const)
        white_new_row = pd.DataFrame({
            'Fund': [column],
            'LM stat': [white_test_result[0]],
            'p-value': [white_test_result[1]],
            'f-stat': [white_test_result[2]],
            'f p-value': [white_test_result[3]]
        })
        results_white_df = pd.concat([results_white_df, white_new_row], ignore_index=True)

        # Perform Shapiro-Wilk test
        shapiro_test = stats.shapiro(residuals)
        shapiro_results.append({
            'Fund': column,
            'Shapiro-Wilk Statistic': shapiro_test.statistic,
            'p-value': shapiro_test.pvalue
        })

        # Perform Jarque-Bera test
        jb_test = stats.jarque_bera(residuals)
        skewness = stats.skew(residuals)
        kurtosis = stats.kurtosis(residuals, fisher=False)  # Pearson's definition
        jarque_bera_results.append({
            'Fund': column,
            'Jarque-Bera Statistic': jb_test.statistic,
            'p-value': jb_test.pvalue,
            'Skewness': skewness,
            'Kurtosis': kurtosis
        })

        # Calculation of standardized residuals and fitted values
        std_residuals = np.std(residuals, ddof=2)
        std_fitted_values = np.std(fitted_values, ddof=1)
        standardized_fitted_values = (fitted_values - np.mean(fitted_values)) / std_fitted_values
        standardized_residuals = (residuals - np.mean(residuals)) / std_residuals

        # Plot 1: Residuals vs. Fitted Values
        fig, axs1 = plt.subplots(2, 2, figsize=(14, 10))
        axs1[0, 0].scatter(fitted_values, residuals, edgecolor='k', alpha=0.7)
        axs1[0, 0].set_title(f'Residuals vs. Fitted Values for {column}')
        axs1[0, 0].set_xlabel('Fitted Values')
        axs1[0, 0].set_ylabel('Residuals')
        axs1[0, 0].axhline(0, color='red', linestyle='-', linewidth=1)
        
        # Plot 2: Residuals vs. Fitted Values
        axs1[0, 1].scatter(standardized_fitted_values, standardized_residuals, edgecolor='k', alpha=0.7)
        axs1[0, 1].set_title(f'Stand. Residuals vs. Stand. Fitted Values für {column}')
        axs1[0, 1].set_xlabel('Standardized Fitted Values')
        axs1[0, 1].set_ylabel('Standardized Residuals')
        axs1[0, 1].axhline(0, color='red', linestyle='-', linewidth=1)

        # Plot 3: QQ Plot
        sm.qqplot(residuals, line='s', ax=axs1[1, 0])
        axs1[1, 0].set_title(f'QQ-Plot of Residuals for {column}')

        # Plot 4: Histogram
        axs1[1, 1].hist(residuals, bins=30, edgecolor='k', alpha=0.7)
        axs1[1, 1].set_title(f'Histogram of Residuals for {column}')
        axs1[1, 1].set_xlabel('Residuals')
        axs1[1, 1].set_ylabel('Frequency')

        # Saving Plots
        plt.tight_layout()
        plt.savefig(f"{output_dir}/residual_tests_{column}.png")
        plt.close(fig)

    # After the loop, save results to Excel
    shapiro_df = pd.DataFrame(shapiro_results)
    shapiro_df.to_excel(writer, sheet_name='Shapiro-Wilk Test', index=False)
    jb_df = pd.DataFrame(jarque_bera_results)
    jb_df.to_excel(writer, sheet_name='Jarque-Bera Test', index=False)
    results_bp_df.to_excel(writer, sheet_name='Breusch-Pagan Test', index=False)
    results_white_df.to_excel(writer, sheet_name='White Test', index=False)
    return results_bp_df, results_white_df, shapiro_df, jb_df

#### Check for Autocorrelation

In [59]:
def durbin_watson_capm(residuals_df, writer):
    '''
    Performs the Durbin-Watson test and saves the results in an Excel file.
    '''
    # Calculate Durbin-Watson statistics for each fund's residuals
    dw_results = {column: durbin_watson(residuals_df[column].dropna()) for column in residuals_df.columns}
    # Convert the results into a DataFrame
    dw_df = pd.DataFrame(list(dw_results.items()), columns=['Fund', 'Durbin-Watson Statistic'])
    # Save the Durbin-Watson test results to an Excel file
    dw_df.to_excel(writer, sheet_name='Durbin-Watson Test', index=False)
    return dw_df

#### Check for Multicollinearity

In [68]:
def regressor_variation(df_regressor_capm, writer):
    '''
    Checks the variation of the independent variables (regressors) and saves the results in an Excel file
    '''
    std_regressors = df_regressor_capm.std()
    if isinstance(std_regressors, pd.Series):
        std_regressors_df = std_regressors.to_frame(name='Standard Deviation')
    else:
        std_regressors_df = pd.DataFrame({'Standard Deviation': [std_regressors]})
    std_regressors_df.to_excel(writer, sheet_name='Regressor Variation', index=True)

def regressor_variation_fama(df_regressor_fama, writer):
    '''
    Checks the variation of the independent variables (regressors) and saves the results in an Excel file
    '''
    # Calculate the standard deviation of the regressors
    std_regressors = df_regressor_fama.std()
    # Convert the Series to a DataFrame
    std_regressors_df = std_regressors.to_frame(name='Standard Deviation')
    # Save the standard deviation data to an Excel file
    std_regressors_df.to_excel(writer, sheet_name='Fama Regressor Variation', index=True)
    return std_regressors_df

def vif_fama(df_regressor_fama):
    '''
    Calculates the Variance Inflation Factor (VIF) for each variable in the Fama-French model's regressor data.
    '''
    # Add a constant to the DataFrame for the intercept
    df_with_constant = sm.add_constant(df_regressor_fama)
    # Create a DataFrame to hold VIF results
    vif_data = pd.DataFrame()
    vif_data['Variable'] = df_with_constant.columns
    vif_data['VIF'] = [variance_inflation_factor(df_with_constant.values, i) for i in range(df_with_constant.shape[1])]
    return vif_data

### Beginning the analysis

In [None]:
# Performing the calculate_log_returns function on the closing prices
df_raw_log_etf_returns = calculate_log_returns(df_closing_etf)
df_raw_log_mutual_returns = calculate_log_returns(df_closing_mutual)
df1 = pd.DataFrame(df_raw_log_etf_returns)
df2 = pd.DataFrame(df_raw_log_mutual_returns)
# Removing the first line (01.01.2024)
df1 = df1[1:]
df2 = df2[1:]

# Applying the replace_nans_and_zeros_with_mean function
for col in df1.columns:
    if col != 'Date':  # Ignoring the date column
        df1[col] = replace_nans_and_zeros_with_mean(df1[col].copy())
for col in df2.columns:
    if col != 'Date':  # Ignoring the date column
        df2[col] = replace_nans_and_zeros_with_mean(df2[col].copy())

# Ensuring df1 and df_riskfree have the same index
df1.set_index('Date', inplace=True)
df_riskfree.set_index('Dates', inplace=True)

# Align df_riskfree with df1
df_riskfree_aligned = df_riskfree.reindex(df1.index)

# Subtract risk-free rate only where df is non-zero
df_log_excess_return_etf = df1.where(df1 == 0, df1.subtract(df_riskfree_aligned['ln riskfree'], axis=0))

# For df2
df2.set_index('Date', inplace=True)
df_log_excess_return_mutual = df2.where(df2 == 0, df2.subtract(df_riskfree_aligned['ln riskfree'], axis=0))
df1.replace(0.00, np.nan, inplace=True)
df_log_excess_return_etf.replace(0.00, np.nan, inplace=True)
df2.replace(0.00, np.nan, inplace=True)
df_log_excess_return_mutual.replace(0.00, np.nan, inplace=True)

#### CAPM Regression for ETFs and Mutual Funds

In [None]:
# CAPM Regression for ETFs
output_file_path = r'C:\Users\...\CAPM_Results_ETF.xlsx'
output_file_path_pictures = r'C:\Users\...\Pictures\CAPM_ETF'

with pd.ExcelWriter(output_file_path, engine='openpyxl') as writer:
    results_df, residuals_df, fitted_values_df = regression_capm(df_log_excess_return_etf, df_regressor_capm ,writer)
    descriptive_statistics(df1, writer)
    combined_tests(residuals_df, fitted_values_df, df_regressor_capm, df_log_excess_return_etf, output_file_path_pictures, writer)
    dw_df = durbin_watson_capm(residuals_df, writer)
    df_regressor_capm.to_excel(writer, sheet_name='Regressor')
    regressor_variation(df_regressor_capm['ln excess'], writer)
    df1.to_excel(writer, sheet_name='Log Returns ETF')
    df_log_excess_return_etf.to_excel(writer, sheet_name='Log Excess Returns ETF')

# CAPM Regression for Mutual Funds
output_file_path = r'C:\Users\...\CAPM_Results_Mutual.xlsx'
output_file_path_pictures = r'C:\Users\...\Pictures\CAPM_Mutual'

with pd.ExcelWriter(output_file_path, engine='openpyxl') as writer:
    results_df, residuals_df, fitted_values_df = regression_capm(df_log_excess_return_mutual, df_regressor_capm ,writer)
    descriptive_statistics(df2, writer)
    combined_tests(residuals_df, fitted_values_df, df_regressor_capm, df_log_excess_return_mutual, output_file_path_pictures, writer)
    dw_df = durbin_watson_capm(residuals_df, writer)
    df_regressor_capm.to_excel(writer, sheet_name='Regressor')
    regressor_variation(df_regressor_capm['ln excess'], writer)
    df2.to_excel(writer, sheet_name='Log Returns ETF')
    df_log_excess_return_mutual.to_excel(writer, sheet_name='Log Excess Returns ETF')

#### Fama Regression for ETFs and Mutual Funds

In [None]:
# FAMA Regression for ETFs
output_file_path = r'C:\Users\...\FAMA_Results_ETF.xlsx'
output_file_path_pictures = r'C:\Users\...\Pictures\FAMA_ETF'

with pd.ExcelWriter(output_file_path, engine='openpyxl') as writer:
    results_df, residuals_df, fitted_values_df = regression_fama(df_log_excess_return_etf, df_regressor_fama, writer=writer)
    combined_tests(residuals_df, fitted_values_df, df_regressor_fama, df_log_excess_return_etf, output_file_path_pictures, writer)
    dw_df = durbin_watson_capm(residuals_df, writer)
    vif_df = vif_fama(df_regressor_fama)
    vif_df.to_excel(writer, sheet_name='VIF Results', index=False)
    df_regressor_fama.to_excel(writer, sheet_name='Regressor') 
    regressor_variation_fama(df_regressor_fama, writer)

# FAMA Regression for Mutual Funds
output_file_path = r'C:\Users\...\FAMA_Results_Mutual.xlsx'
output_file_path_pictures = r'C:\Users\...\Pictures\FAMA_Mutual'

with pd.ExcelWriter(output_file_path, engine='openpyxl') as writer:
    results_df, residuals_df, fitted_values_df = regression_fama(df_log_excess_return_mutual, df_regressor_fama, writer=writer)
    descriptive_statistics(df2, writer)
    combined_tests(residuals_df, fitted_values_df, df_regressor_fama, df_log_excess_return_mutual, output_file_path_pictures, writer)
    dw_df = durbin_watson_capm(residuals_df, writer)
    vif_df = vif_fama(df_regressor_fama)
    vif_df.to_excel(writer, sheet_name='VIF Results', index=False)
    df_regressor_fama.to_excel(writer, sheet_name='Regressor') 
    regressor_variation_fama(df_regressor_fama, writer)