In [1]:
import pandas as pd
import pandas_datareader as pdr
import numpy as np
from datetime import datetime

## Step 1: Handle user input for dates

The two functions below take a date string and convert it to YYYY-MM format

We try multiple date formats (YYYY-MM, MM/YYYY, etc.) and returns a datetime object if successful, otherwise an error 

User inputs like "2000-01", "01/2000", "Jan-2000", "January-2000", "01-2000" will produce a datetime object for January 2000

In [2]:
def parse_date(date_string):
    
    date_string = str(date_string).strip()
    
    # List of all the date formats we want to accept
    possible_formats = [
        '%Y-%m',      # Example: 2000-01
        '%m/%Y',      # Example: 01/2000
        '%b-%Y',      # Example: Jan-2000
        '%B-%Y',      # Example: January-2000
        '%m-%Y',      # Example: 01-2000
    ]
    
    # Try each format until one works
    for format_string in possible_formats:
        try:
            # Try to convert the string to a datetime object
            date_object = datetime.strptime(date_string, format_string)
            return date_object
        except ValueError:
            # This format didn't work, try the next one
            continue
    
    # If we get here, none of the formats worked
    error_message = f"Could not understand date '{date_string}'. Please use: YYYY-MM, MM/YYYY, YYYYMM, or Mon-YYYY"
    raise ValueError(error_message)

In [3]:
def get_dates_from_user():

    print("\n" + "="*70)
    print("DATE RANGE SELECTION")
    print("="*70)
    print("You can enter dates in any of these formats:")
    print("  - YYYY-MM (like 2000-01)")
    print("  - MM/YYYY (like 01/2000)")
    print("  - MM-YYYY (like 01-2000)")
    print("  - Mon-YYYY (like Jan-2000)")
    print()
    
    # Keep looping until we get valid dates
    while True:
        try:
            # Ask for start date
            start_input = input("Enter START date: ").strip()
            start_datetime = parse_date(start_input)
            
            # Ask for end date
            end_input = input("Enter END date: ").strip()
            end_datetime = parse_date(end_input)
            
            # Check that start is before end
            if start_datetime >= end_datetime:
                print("\n ERROR: Start date must be before end date. Try again.\n")
                continue
            
            # Calculate how many months between the dates
            months_between = (end_datetime.year - start_datetime.year) * 12
            months_between += (end_datetime.month - start_datetime.month)
            
            
            # Convert to YYYY-MM format for output
            start_date = start_datetime.strftime('%Y-%m')
            end_date = end_datetime.strftime('%Y-%m')
            
            print(f"\n Dates accepted!")
            print(f"  Start: {start_date}")
            print(f"  End: {end_date}")
            print(f"  Total months: {months_between}")
            
            return start_date, end_date
            
        except ValueError as e:
            print(f"\n ERROR: {e}\n")

In [5]:
from datetime import datetime

start_date, end_date = get_dates_from_user()


DATE RANGE SELECTION
You can enter dates in any of these formats:
  - YYYY-MM (like 2000-01)
  - MM/YYYY (like 01/2000)
  - MM-YYYY (like 01-2000)
  - Mon-YYYY (like Jan-2000)


 Dates accepted!
  Start: 1963-01
  End: 1993-12
  Total months: 371


## Step 2: Get model choice from user

Here, we ask the user which model they want to test 

We show three options: CAPM only, FF3F only, or Both and return a model_choice: string ('CAPM', 'FF3F', or 'Both')

In [6]:
def get_model_choice_from_user():
    
    print("\n" + "="*70)
    print("MODEL SELECTION")
    print("="*70)
    print("Which model(s) do you want to test?")
    print()
    print("  1 = CAPM only (Capital Asset Pricing Model)")
    print("  2 = FF3F only (Fama-French Three-Factor Model)")
    print("  3 = Both CAPM and FF3F")
    print()
    
    # Keep looping until we get a valid choice
    while True:
        try:
            choice = input("Enter your choice (1, 2, or 3): ").strip()
            
            if choice == '1':
                print(" You selected: CAPM\n")
                return 'CAPM'
            elif choice == '2':
                print(" You selected: Fama-French Three-Factor (FF3F)\n")
                return 'FF3F'
            elif choice == '3':
                print(" You selected: Both models\n")
                return 'Both'
            else:
                print("Invalid. Please enter 1, 2, or 3.\n")
                
        except KeyboardInterrupt:
            print("\n\nYou cancelled. Exiting...")
            return None

In [7]:
model_choice = get_model_choice_from_user()


MODEL SELECTION
Which model(s) do you want to test?

  1 = CAPM only (Capital Asset Pricing Model)
  2 = FF3F only (Fama-French Three-Factor Model)
  3 = Both CAPM and FF3F

 You selected: Both models



## Step 3: Downloading data from Ken French library

- Connect to Ken French Data Library online
- Download the "25 Portfolios Formed on Size and Book-to-Market" for the mentioned dates 
- Download factors: 
    - MKT-RF (market excess return)
    - SMB (Small Minus Big)
    - HML (High Minus Low)
    - RF (Risk-free rate)

In [8]:
def download_portfolios(start_date, end_date):
    
    print("\n Downloading 25 portfolios (Size x Book-to-Market)...")
    
    try:
        # Using the pandas_datareader to get data by mentioning name of dataset, data source, start and end dates 
        portfolios_raw = pdr.DataReader(
            '25_Portfolios_5x5',      
            'famafrench',              
            start=start_date,          
            end=end_date               
        )
        
        # Extract the monthly returns (index 0 has monthly, index 1 has annual)
        portfolios_df = portfolios_raw[1]
        
        print(f" Downloaded successfully!")
        print(f"  - {portfolios_df.shape[0]} months of data")
        print(f"  - {portfolios_df.shape[1]} portfolios")
        
        return portfolios_df
        
    except Exception as e:
        print(f"ERROR downloading portfolios: {e}")
        return None

In [9]:
def download_factors(start_date, end_date):

    print("\n Downloading Fama-French factors (MKT-RF, SMB, HML, RF)...")
    
    try:
        # Download the factors
        factors_raw = pdr.DataReader(
            'F-F_Research_Data_Factors',  # Name of the dataset
            'famafrench',                  # Data source
            start=start_date,              # Start date
            end=end_date                   # End date
        )
        
        # Extract monthly data
        factors_df = factors_raw[0]
        
        print(f" Downloaded successfully!")
        print(f"  - {factors_df.shape[0]} months of data")
        print(f"  - {factors_df.shape[1]} factors")
        
        return factors_df
        
    except Exception as e:
        print(f" ERROR downloading factors: {e}")
        return None

In [10]:
import warnings
warnings.filterwarnings('ignore', category=FutureWarning)

In [11]:
portfolios = download_portfolios(start_date, end_date)
if portfolios is None:
    print("Failed to download portfolios. Exiting...")


 Downloading 25 portfolios (Size x Book-to-Market)...
 Downloaded successfully!
  - 372 months of data
  - 25 portfolios


In [12]:
portfolios

Unnamed: 0_level_0,SMALL LoBM,ME1 BM2,ME1 BM3,ME1 BM4,SMALL HiBM,ME2 BM1,ME2 BM2,ME2 BM3,ME2 BM4,ME2 BM5,...,ME4 BM1,ME4 BM2,ME4 BM3,ME4 BM4,ME4 BM5,BIG LoBM,ME5 BM2,ME5 BM3,ME5 BM4,BIG HiBM
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1963-01,14.7930,11.3544,11.2021,11.2592,11.8234,8.8123,6.2100,7.8364,8.5090,11.7846,...,5.5652,5.5438,6.0079,9.8474,7.2254,4.5417,4.9076,5.1422,6.8873,7.1022
1963-02,-3.5980,-4.2910,-1.8016,-0.7419,1.3812,-4.1521,-1.6406,-2.4189,-1.8105,-0.1308,...,-3.2457,-1.7448,-0.8770,-1.0166,-0.6858,-2.2546,-1.5875,-2.4510,-1.5110,-1.2464
1963-03,4.8514,-0.2757,-1.2932,1.8373,2.4085,-0.3340,0.9447,1.3256,2.9790,1.3464,...,1.4520,1.6870,2.4627,3.7530,2.9125,2.4889,3.0522,2.7492,4.7918,4.9746
1963-04,2.6006,1.7981,1.2740,2.6987,3.1669,2.5826,5.1031,1.1552,4.2695,5.9303,...,4.0643,3.1447,4.8493,6.0299,4.3777,3.6105,4.4520,5.1453,4.2166,7.9978
1963-05,3.2084,0.8693,3.1159,3.2140,7.0317,1.7595,2.6211,0.8164,5.0585,5.3368,...,1.6017,2.2572,1.5967,4.1879,3.7684,2.0212,0.7722,3.6433,2.2271,4.7486
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1993-08,4.7025,5.2851,3.6241,4.5009,2.1100,3.4439,5.7133,4.9447,5.0320,3.5118,...,5.4572,4.6039,4.1970,3.7722,5.0241,3.3241,4.1589,3.7655,4.1218,4.2549
1993-09,2.6499,2.5508,4.3563,3.6193,3.5219,3.4441,3.3885,2.5416,2.2751,2.7234,...,0.7684,1.1230,0.9021,-0.2932,-1.3077,-0.3504,-1.2390,-0.1650,1.0221,-0.1107
1993-10,4.8205,5.3575,5.9566,6.0946,5.5419,4.3329,3.3634,1.9944,0.9309,1.2764,...,3.2756,0.6670,0.0256,-0.6411,-1.7662,4.6190,1.4826,-1.1746,-1.0230,-1.7415
1993-11,-3.7828,-2.3662,-2.3178,-1.7305,-0.8060,-3.1312,-1.6982,-2.0816,-3.8551,-3.9306,...,-1.4979,-2.6367,-2.8001,-3.0401,-4.8789,0.2926,-0.8323,-1.8443,-3.4257,-1.8454


In [13]:
factors = download_factors(start_date, end_date)
if factors is None:
    print("Failed to download factors. Exiting...")


 Downloading Fama-French factors (MKT-RF, SMB, HML, RF)...
 Downloaded successfully!
  - 372 months of data
  - 4 factors


In [14]:
factors

Unnamed: 0_level_0,Mkt-RF,SMB,HML,RF
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
1963-01,4.94,3.01,2.24,0.25
1963-02,-2.40,0.46,2.15,0.23
1963-03,3.08,-2.58,2.10,0.23
1963-04,4.51,-1.26,1.00,0.25
1963-05,1.76,1.08,2.55,0.24
...,...,...,...,...
1993-08,3.72,0.11,0.18,0.25
1993-09,-0.12,3.09,-0.24,0.26
1993-10,1.42,1.99,-2.78,0.22
1993-11,-1.87,-1.27,-0.78,0.25


## Step 4: Data Cleaning

Validating that both the portfolio returns data and the factor data cover the same time period (i.e., have the same number of months) and checking for any missing or incomplete observations in either dataset.


In [15]:
def check_data_quality(portfolios_df, factors_df):
    
    # Verify same number of observations
    if len(portfolios_df) != len(factors_df):
        print(f"WARNING: Different number of months!")
        print(f"Portfolios: {len(portfolios_df)} months")
        print(f"actors: {len(factors_df)} months")
    else:
        print(f"Both datasets have {len(portfolios_df)} months")

    # Check for any missing values (NaN)
    portfolio_missing = portfolios_df.isnull().sum().sum()
    factor_missing = factors_df.isnull().sum().sum()
    
    if portfolio_missing > 0:
        print(f"Found {portfolio_missing} missing values in portfolios")
    if factor_missing > 0:
        print(f"Found {factor_missing} missing values in factors")
    
    if portfolio_missing == 0 and factor_missing == 0:
        print("No missing values detected")
    
    return portfolios_df, factors_df

In [16]:
portfolios_validated, factors_validated = check_data_quality(portfolios, factors)

Both datasets have 372 months
No missing values detected


## Step 5: Excess Returns Calculation

- Takes raw portfolio returns (R_p)
- Subtracts the risk-free rate (R_f)
- Returns excess returns (R_p - R_f)

In [17]:
def calculate_excess_returns(portfolios_df, factors_df):
    
    print("\nCalculating excess returns (R_p - R_f)...")
    
    # Get the risk-free rate column
    risk_free_rate = factors_df['RF']
    
    # Subtract RF from each portfolio's return
    # The .sub() function subtracts the risk-free rate from each portfolio
    # axis=0 means subtract row-by-row (each month)
    excess_returns_df = portfolios_df.sub(risk_free_rate, axis=0)
    
    print(f"Excess returns calculated for {excess_returns_df.shape[1]} portfolios")
    print(f"Period: {excess_returns_df.index[0]} to {excess_returns_df.index[-1]}")
    print(f"Total months: {excess_returns_df.shape[0]}")
    
    return excess_returns_df

In [18]:
excess_returns = calculate_excess_returns(portfolios_validated, factors_validated)


Calculating excess returns (R_p - R_f)...
Excess returns calculated for 25 portfolios
Period: 1963-01 to 1993-12
Total months: 372


In [19]:
excess_returns

Unnamed: 0_level_0,SMALL LoBM,ME1 BM2,ME1 BM3,ME1 BM4,SMALL HiBM,ME2 BM1,ME2 BM2,ME2 BM3,ME2 BM4,ME2 BM5,...,ME4 BM1,ME4 BM2,ME4 BM3,ME4 BM4,ME4 BM5,BIG LoBM,ME5 BM2,ME5 BM3,ME5 BM4,BIG HiBM
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1963-01,14.5430,11.1044,10.9521,11.0092,11.5734,8.5623,5.9600,7.5864,8.2590,11.5346,...,5.3152,5.2938,5.7579,9.5974,6.9754,4.2917,4.6576,4.8922,6.6373,6.8522
1963-02,-3.8280,-4.5210,-2.0316,-0.9719,1.1512,-4.3821,-1.8706,-2.6489,-2.0405,-0.3608,...,-3.4757,-1.9748,-1.1070,-1.2466,-0.9158,-2.4846,-1.8175,-2.6810,-1.7410,-1.4764
1963-03,4.6214,-0.5057,-1.5232,1.6073,2.1785,-0.5640,0.7147,1.0956,2.7490,1.1164,...,1.2220,1.4570,2.2327,3.5230,2.6825,2.2589,2.8222,2.5192,4.5618,4.7446
1963-04,2.3506,1.5481,1.0240,2.4487,2.9169,2.3326,4.8531,0.9052,4.0195,5.6803,...,3.8143,2.8947,4.5993,5.7799,4.1277,3.3605,4.2020,4.8953,3.9666,7.7478
1963-05,2.9684,0.6293,2.8759,2.9740,6.7917,1.5195,2.3811,0.5764,4.8185,5.0968,...,1.3617,2.0172,1.3567,3.9479,3.5284,1.7812,0.5322,3.4033,1.9871,4.5086
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1993-08,4.4525,5.0351,3.3741,4.2509,1.8600,3.1939,5.4633,4.6947,4.7820,3.2618,...,5.2072,4.3539,3.9470,3.5222,4.7741,3.0741,3.9089,3.5155,3.8718,4.0049
1993-09,2.3899,2.2908,4.0963,3.3593,3.2619,3.1841,3.1285,2.2816,2.0151,2.4634,...,0.5084,0.8630,0.6421,-0.5532,-1.5677,-0.6104,-1.4990,-0.4250,0.7621,-0.3707
1993-10,4.6005,5.1375,5.7366,5.8746,5.3219,4.1129,3.1434,1.7744,0.7109,1.0564,...,3.0556,0.4470,-0.1944,-0.8611,-1.9862,4.3990,1.2626,-1.3946,-1.2430,-1.9615
1993-11,-4.0328,-2.6162,-2.5678,-1.9805,-1.0560,-3.3812,-1.9482,-2.3316,-4.1051,-4.1806,...,-1.7479,-2.8867,-3.0501,-3.2901,-5.1289,0.0426,-1.0823,-2.0943,-3.6757,-2.0954


## Step 6: Calculate Summary Statistics 

Here, we calculate the follwowing for each portfolio:
- Mean Return: Average monthly excess return
- Standard Deviation: Volatility (risk) of returns
- Sharpe Ratio: Return per unit of risk (Mean / Std Dev)
- T: Number of observations (months)

In [20]:
def calculate_summary_statistics(excess_returns_df):

    print("\nCalculating summary statistics...")
    
    # Create empty lists to store results
    mean_returns = []
    std_devs = []
    sharpe_ratios = []
    sample_sizes = []
    
    # Loop through each portfolio (each column)
    for portfolio_name in excess_returns_df.columns:
        # Get the returns for this portfolio
        returns = excess_returns_df[portfolio_name]
        
        # Remove any missing values
        returns_clean = returns.dropna()
        
        # Calculate statistics
        mean_ret = returns_clean.mean()           # Average return
        std_dev = returns_clean.std()             # Standard deviation
        
        # Sharpe ratio = mean / std dev (if std dev > 0)
        if std_dev > 0:
            sharpe = mean_ret / std_dev
        else:
            sharpe = np.nan  # Can't calculate if std dev is zero
        
        sample_size = len(returns_clean)          # Number of months
        
        # Add to our lists
        mean_returns.append(mean_ret)
        std_devs.append(std_dev)
        sharpe_ratios.append(sharpe)
        sample_sizes.append(sample_size)
    
    # Create a DataFrame with all the statistics
    summary_stats_df = pd.DataFrame({
        'Mean_Return': mean_returns,
        'Std_Dev': std_devs,
        'Sharpe_Ratio': sharpe_ratios,
        'T': sample_sizes
    }, index=excess_returns_df.columns)
    
    print(f"Statistics calculated for {len(summary_stats_df)} portfolios")
    print(f"\nAverages across all portfolios:")
    print(f"Mean Return: {summary_stats_df['Mean_Return'].mean():.4f}%")
    print(f"Std Dev: {summary_stats_df['Std_Dev'].mean():.4f}%")
    print(f"Sharpe Ratio: {summary_stats_df['Sharpe_Ratio'].mean():.4f}")
    
    return summary_stats_df

In [21]:
summary_stats = calculate_summary_statistics(excess_returns)


Calculating summary statistics...
Statistics calculated for 25 portfolios

Averages across all portfolios:
Mean Return: 0.7591%
Std Dev: 5.7077%
Sharpe Ratio: 0.1348


In [22]:
summary_stats

Unnamed: 0,Mean_Return,Std_Dev,Sharpe_Ratio,T
SMALL LoBM,0.570428,7.822235,0.072924,372
ME1 BM2,0.890798,6.994026,0.127366,372
ME1 BM3,0.994382,6.520668,0.152497,372
ME1 BM4,1.15906,6.241323,0.185707,372
SMALL HiBM,1.404726,6.478163,0.21684,372
ME2 BM1,0.344651,7.258296,0.047484,372
ME2 BM2,0.692816,6.23414,0.111133,372
ME2 BM3,0.907551,5.689072,0.159525,372
ME2 BM4,0.966682,5.283879,0.182949,372
ME2 BM5,1.085945,6.000923,0.180963,372


## Step 7: Time Series Regression on CAPM & FF3F

Time-series regressions is done per portfolio

CAPM: Rp_t - Rf_t = αp + βM,p * (MKT-RF)_t + εp,t

FF3F: Rp_t - Rf_t = αp + βM,p*(MKT-RF)_t + βSMB,p*SMB_t + βHML,p*HML_t + εp,t

In [23]:
import statsmodels.api as sm

def _design_matrix(factors: pd.DataFrame, model: str) -> pd.DataFrame:
    F = factors.rename(columns={'Mkt-RF': 'MKT_RF'}).copy()
    if model == 'CAPM':
        X = F[['MKT_RF']]
    elif model == 'FF3F':
        X = F[['MKT_RF', 'SMB', 'HML']]
    else:
        raise ValueError("model must be 'CAPM' or 'FF3F'")
    X = sm.add_constant(X)
    return X

def run_time_series_regressions(excess_returns: pd.DataFrame,
                                factors: pd.DataFrame,
                                model_choice: str):

    models = ['CAPM', 'FF3F'] if model_choice == 'Both' else [model_choice]
    outputs = {}

    for model in models:
        X = _design_matrix(factors, model)
        # align with returns:
        common_idx = excess_returns.index.intersection(X.index)
        X = X.loc[common_idx]
        ER = excess_returns.loc[common_idx]

        rows = []
        resid_cols = {}
        beta_rows = []

        for p in ER.columns:
            y = ER[p].dropna()
            Xi = X.loc[y.index]

            res = sm.OLS(y, Xi).fit()

            # Collect core stats
            row = {
                'Portfolio': p,
                'alpha': res.params.get('const', np.nan),
                'alpha_se': res.bse.get('const', np.nan),
                'alpha_t': res.tvalues.get('const', np.nan),
                'R2': res.rsquared
            }

            # Betas and t-stats (exclude constant)
            for k in Xi.columns:
                if k == 'const':
                    continue
                row[k] = res.params.get(k, np.nan)
                row[f'{k}_se'] = res.bse.get(k, np.nan)
                row[f'{k}_t'] = res.tvalues.get(k, np.nan)

            rows.append(row)
            resid_cols[p] = res.resid.reindex(common_idx)  # keep T index

            # Save betas (exclude const) for FM step
            beta_rows.append(pd.Series(
                {k: res.params[k] for k in Xi.columns if k != 'const'},
                name=p
            ))

        results_table = pd.DataFrame(rows).set_index('Portfolio')
        residuals = pd.DataFrame(resid_cols).loc[common_idx]  # T×N
        betas = pd.DataFrame(beta_rows)                       # N×K

        outputs[model] = {
            'results_table': results_table,  
            'residuals': residuals,
            'betas': betas  
        }

    return outputs

In [24]:
ts_outputs = run_time_series_regressions(
    excess_returns=excess_returns,
    factors=factors,
    model_choice=model_choice
)

In [25]:
ts_outputs['FF3F']['results_table']

Unnamed: 0_level_0,alpha,alpha_se,alpha_t,R2,MKT_RF,MKT_RF_se,MKT_RF_t,SMB,SMB_se,SMB_t,HML,HML_se,HML_t
Portfolio,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
SMALL LoBM,-0.282211,0.142279,-1.983509,0.885733,0.981689,0.035245,27.853639,1.598866,0.052068,30.707106,-0.038221,0.057512,-0.664577
ME1 BM2,-0.025111,0.107015,-0.234647,0.919139,0.907305,0.026509,34.225912,1.500304,0.039163,38.308908,0.22138,0.043258,5.117678
ME1 BM3,0.045191,0.092679,0.487613,0.930228,0.886816,0.022958,38.62785,1.382003,0.033917,40.746886,0.378834,0.037463,10.112245
ME1 BM4,0.195346,0.084018,2.325053,0.937412,0.843302,0.020812,40.51908,1.352124,0.030747,43.975594,0.467263,0.033962,13.758483
SMALL HiBM,0.325272,0.10542,3.085493,0.908537,0.853207,0.026114,32.672298,1.401615,0.038579,36.330594,0.673633,0.042613,15.808132
ME2 BM1,-0.25179,0.082654,-3.046315,0.955212,1.11027,0.020475,54.226596,1.095276,0.030248,36.209788,-0.408415,0.033411,-12.224118
ME2 BM2,-0.059553,0.069156,-0.861146,0.957498,1.036998,0.017131,60.533689,0.964492,0.025308,38.109767,0.064144,0.027954,2.294617
ME2 BM3,0.113192,0.065118,1.73827,0.95475,0.974573,0.016131,60.417617,0.880217,0.02383,36.93665,0.259477,0.026322,9.857791
ME2 BM4,0.097506,0.05962,1.635471,0.956028,0.968055,0.014769,65.547956,0.739239,0.021818,33.881512,0.504572,0.0241,20.937022
ME2 BM5,0.012777,0.064624,0.197717,0.959945,1.07992,0.016008,67.460008,0.893708,0.02365,37.789323,0.741518,0.026122,28.386297


In [26]:
ts_outputs['CAPM']['results_table']

Unnamed: 0_level_0,alpha,alpha_se,alpha_t,R2,MKT_RF,MKT_RF_se,MKT_RF_t
Portfolio,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
SMALL LoBM,-0.028127,0.260397,-0.108016,0.592899,1.353653,0.058313,23.213497
ME1 BM2,0.358032,0.234347,1.527781,0.587563,1.204871,0.05248,22.958806
ME1 BM3,0.496524,0.217765,2.280093,0.590285,1.125924,0.048766,23.08824
ME1 BM4,0.691277,0.213827,3.232875,0.568814,1.057908,0.047884,22.092966
SMALL HiBM,0.945833,0.237052,3.98999,0.508104,1.037801,0.053085,19.549714
ME2 BM1,-0.292771,0.177242,-1.651814,0.780944,1.441553,0.039692,36.318898
ME2 BM2,0.142777,0.14967,0.953946,0.788259,1.243933,0.033517,37.113574
ME2 BM3,0.410887,0.141807,2.897499,0.771753,1.123225,0.031756,35.370169
ME2 BM4,0.508787,0.134934,3.770629,0.760431,1.035545,0.030217,34.27012
ME2 BM5,0.583985,0.169042,3.454665,0.708495,1.1352,0.037855,29.987899


## Step 8: GRS joint-alpha test

In [27]:
import numpy.linalg as la
from scipy import stats

def _factor_block_for_model(factors: pd.DataFrame, model: str) -> pd.DataFrame:
    F = factors.rename(columns={'Mkt-RF':'MKT_RF'})
    if model == 'CAPM':
        return F[['MKT_RF']].dropna()
    elif model == 'FF3F':
        return F[['MKT_RF','SMB','HML']].dropna()
    else:
        raise ValueError("model must be 'CAPM' or 'FF3F'")

def grs_test(ts_outputs: dict, factors: pd.DataFrame, model_choice: str):
    
    # Handle both models or single model
    models = ['CAPM', 'FF3F'] if model_choice == 'Both' else [model_choice]
    
    results = {}
    
    for model in models:
        assert model in ts_outputs, f"{model} results not found in ts_outputs"

        # Residuals and alpha vector aligned by portfolio order
        resids = ts_outputs[model]['residuals'].dropna()
        N = resids.shape[1]  # number of portfolios

        # Ensure alphas are ordered like residual columns
        alpha_series = ts_outputs[model]['results_table'].loc[resids.columns, 'alpha']
        alpha = alpha_series.values.reshape(-1, 1)  # N×1

        # T periods after alignment
        T = resids.shape[0]

        # Factor block for this model aligned to residuals index
        F = _factor_block_for_model(factors, model).loc[resids.index]
        K = F.shape[1]

        # Safety checks for degrees of freedom (required for F)
        if not (T - N - K) > 0 or not (T - K - 1) > 0:
            raise ValueError(f"Insufficient T relative to N and K for GRS: T={T}, N={N}, K={K}. "
                             f"Need T > N + K and T > K + 1.")

        # Residual covariance Σ_e (compute if not present)
        Sigma_e = ts_outputs[model].get('Sigma_e', None)
        if Sigma_e is None:
            Sigma_e = resids.cov()
            Sigma_e =Sigma_e*(T-1)/(T-K)

        # Factor mean vector and covariance
        f_bar = F.mean().values.reshape(-1, 1)     # K×1
        Sigma_f = F.cov().values                   # K×K
        Sigma_f = Sigma_f*(T-1)/(T-K)

        # Convert Σ_e to numpy array (N×N)
        Sig_e = Sigma_e.values

        # Inversions
        Sig_e_inv = la.inv(Sig_e)
        Sig_f_inv = la.inv(Sigma_f)

        # Numerator and denominator of GRS
        num = float((alpha.T @ Sig_e_inv @ alpha).item())
        den = float(1.0 + (f_bar.T @ Sig_f_inv @ f_bar).item())

        F_grs = (T / N) * ((T - N - K) / (T - K - 1)) * (num / den)
        df1 = N
        df2 = T - N - K
        pval = 1.0 - stats.f.cdf(F_grs, dfn=df1, dfd=df2)

        # Persist useful components for later use
        ts_outputs[model]['Sigma_e'] = Sigma_e
        ts_outputs[model]['Sigma_f'] = pd.DataFrame(Sigma_f, index=F.columns, columns=F.columns)
        ts_outputs[model]['f_bar']   = f_bar

        results[model] = {
            'model': model,
            'F': F_grs,
            'df1': df1,
            'df2': df2,
            'pval': pval,
            'T': T, 'N': N, 'K': K,
            'den_component': den,    # 1 + f̄' Σ_f^{-1} f̄
            'num_component': num     # α' Σ_e^{-1} α
        }
    
    # Return single result if only one model, otherwise return dict
    if len(results) == 1:
        return results[models[0]]
    else:
        return results

In [28]:
grs_test_res = grs_test(ts_outputs, factors, model_choice)

In [29]:
grs_test_res

{'CAPM': {'model': 'CAPM',
  'F': 2.8398268567121607,
  'df1': 25,
  'df2': 346,
  'pval': np.float64(1.1847485830163151e-05),
  'T': 372,
  'N': 25,
  'K': 1,
  'den_component': 1.0098756417033512,
  'num_component': 0.2061021172505251},
 'FF3F': {'model': 'FF3F',
  'F': 1.990093324130466,
  'df1': 25,
  'df2': 344,
  'pval': np.float64(0.003736014601148252),
  'T': 372,
  'N': 25,
  'K': 3,
  'den_component': 1.0677987602468255,
  'num_component': 0.15277394735710767}}

## Step 9: Fama–MacBeth (with Shanken correction)

- Use the β̂’s from the time-series regressions (for the chosen model).

- For each month (t): Regress cross-section of R_{p,t} on those β̂’s → get λ_t (factor premia time series).

Report mean λ, standard error (time-series SE of λ_t), t-stat.

Shanken correction to adjust SEs for using estimated betas:

In [30]:
import numpy.linalg as la

def _fm_factor_block(factors: pd.DataFrame, model: str) -> pd.DataFrame:
    F = factors.rename(columns={'Mkt-RF':'MKT_RF'}).copy()
    if model == 'CAPM':
        return F[['MKT_RF']]
    elif model == 'FF3F':
        return F[['MKT_RF','SMB','HML']]
    else:
        raise ValueError("model must be 'CAPM' or 'FF3F'")

def fama_macbeth(excess_returns: pd.DataFrame,
                          factors: pd.DataFrame,
                          ts_outputs: dict,
                          model: str,
                          include_intercept: bool = False,
                          shanken: bool = True):

    # 1) Collect first-pass betas (N×K)
    assert model in ts_outputs, f"{model} not in ts_outputs"
    B = ts_outputs[model]['betas'].copy()     # index=portfolios, columns=K factor names (MKT_RF, SMB, HML)
    if not include_intercept and 'const' in B.columns:
        B = B.drop(columns=['const'], errors='ignore')

    
    
    # 2) Build the factor matrix for the chosen model and align with data
    F = _fm_factor_block(factors, model)
    T_all = excess_returns.index.intersection(F.index)
    ER = excess_returns.loc[T_all, B.index].dropna(how='any', axis=1)  # T×N (excess returns panel)
    B  = B.loc[ER.columns]                                            # N×K, aligned to portfolios in ER
    F  = F.loc[ER.index]     
    
    # print('\n\n\n\n B value\n\n')                                         # T×K
    # print(B)
    T, N, K = ER.shape[0], ER.shape[1], B.shape[1]
    if N < K:
        raise ValueError(f"Not enough portfolios for cross-section: N={N} < K={K}")

    # 3) Second-pass: for each month t, regress R_{:,t} on betas (with or without intercept)
    #    y_t (N×1) = [intercept 1] + B (N×K) @ lambda_t  (+ residual)
    if include_intercept:
        X = np.column_stack([np.ones(N), B.values])   # N×(1+K)
        xcols = ['const'] + list(B.columns)
    else:
        X = B.values                                  # N×K
        xcols = list(B.columns)

    # print('\n\n\n\n X value\n\n')                                         # T×K
    # print(X)


    lambdas_t = []
    for t in ER.index:
        y = ER.loc[t].values  # N portfolio returns at time t
        
        # Regress returns on betas (X already has intercept if include_intercept=True)
        reg = sm.OLS(y, X).fit()
        lambdas_t.append(reg.params)  # Extract coefficients (K or K+1)

    L = np.vstack(lambdas_t)         # T×(K or K+1)
    L_df = pd.DataFrame(L, index=ER.index, columns=xcols)


    # print('\n\n\n\n lambas \n\n')  
    # print(L_df)

    # 4) Time-series mean and standard errors of lambda
    lam_mean = L_df.mean(axis=0)                     # (K or K+1)
    lam_cov  = L_df.cov()                            # (K or K+1) × (K or K+1)
    lam_se   = np.sqrt(np.diag(lam_cov) / T)         # SE of time-series mean = sqrt(Var/T)
    lam_t    = lam_mean.values / lam_se

    # print(lam_mean)
    # print(lam_se)

    # 5) Shanken (1992) correction
    shanken_out = None
    if shanken:
        F_block = _fm_factor_block(factors, model).loc[ER.index]
        Sigma_f = F_block.cov().values                    # K×K
        
        # Get lambda means (exclude intercept if present)
        if include_intercept:
            lam_mean_factors = lam_mean.values[1:].reshape(-1, 1)  # K×1
        else:
            lam_mean_factors = lam_mean.values.reshape(-1, 1)      # K×1

        # print(lam_mean_factors)

        Sigma_f = Sigma_f*(T-1)/(T-K)

        # print(Sigma_f)
        
        # Shanken adjustment: c = 1 + λ̄' Σ_f^{-1} λ̄
        c = float(1.0 + (lam_mean_factors.T @ la.inv(Sigma_f) @ lam_mean_factors).item())

        # print(c)
        
        # Adjusted SE: sqrt(c) * original SE
        lam_se_sh = np.sqrt(c) * lam_se
        lam_t_sh = lam_mean.values / lam_se_sh

        shanken_out = {
            'c_scalar': c,
            'se_shanken': lam_se_sh,
            't_shanken': lam_t_sh
        }
    
    # print(c)

    # Package results
    out = {
        'L_t': L_df,                                # time series of monthly λ_t
        'lambda_mean': lam_mean,                    # mean λ
        'lambda_se': pd.Series(lam_se, index=L_df.columns),
        'lambda_t': pd.Series(lam_t, index=L_df.columns),
        'T': T, 'N': N, 'K': K,
        'include_intercept': include_intercept,
        'shanken': shanken,
        'shanken_details': shanken_out
    }
    return out

In [31]:
models = ['CAPM','FF3F'] if model_choice == 'Both' else [model_choice]

In [32]:
fm_results = {}
for m in models:
    fm_out = fama_macbeth(
        excess_returns=excess_returns,
        factors=factors,
        ts_outputs=ts_outputs,
        model=m,
        include_intercept=True,  # set True if you want a zero-beta/intercept estimate
        shanken=True
    )
    fm_results[m] = fm_out

In [33]:
fm_results

{'CAPM': {'L_t':             const     MKT_RF
  Date                        
  1963-01  3.649141   3.810183
  1963-02  4.549010  -5.846981
  1963-03  9.799032  -7.174874
  1963-04  8.999539  -4.845151
  1963-05  8.038301  -4.668355
  ...           ...        ...
  1993-08  1.935540   1.778380
  1993-09 -5.435268   6.078708
  1993-10 -9.609625  10.257703
  1993-11 -1.500873  -0.970812
  1993-12  3.243501  -0.646190
  
  [372 rows x 2 columns],
  'lambda_mean': const     1.560956
  MKT_RF   -0.720654
  dtype: float64,
  'lambda_se': const     0.401448
  MKT_RF    0.457670
  dtype: float64,
  'lambda_t': const     3.888315
  MKT_RF   -1.574615
  dtype: float64,
  'T': 372,
  'N': 25,
  'K': 1,
  'include_intercept': True,
  'shanken': True,
  'shanken_details': {'c_scalar': 1.0262316305626236,
   'se_shanken': array([0.40667922, 0.46363358]),
   't_shanken': array([ 3.83829879, -1.55436015])}},
 'FF3F': {'L_t':              const    MKT_RF       SMB       HML
  Date                       

## Step 10: Excel Report

In [34]:
import pandas as pd

def export_asset_pricing_results(file_path: str,
                                 ts_outputs: dict,
                                 fm_results: dict,
                                 model_choice: str,
                                 summary_stats: pd.DataFrame = None):

    models = ['CAPM', 'FF3F'] if model_choice == 'Both' else [model_choice]

    with pd.ExcelWriter(file_path, engine='xlsxwriter') as writer:

        # 1️⃣ Portfolio summary
        if summary_stats is not None:
            summary_stats.to_excel(writer, sheet_name='Portfolio Summary')

        for m in models:
            print(f"\nExporting {m} results ...")

            # --- Time-series regression outputs
            reg = ts_outputs[m]['results_table'].copy()
            reg.to_excel(writer, sheet_name=f'{m}_Regressions')

            # --- Residual covariance Σₑ
            Sigma_e = ts_outputs[m].get('Sigma_e')
            if Sigma_e is not None:
                Sigma_e.to_excel(writer, sheet_name=f'{m}_Sigma_e')

            # --- Factor covariance Σ_f
            Sigma_f = ts_outputs[m].get('Sigma_f')
            if Sigma_f is not None:
                Sigma_f.to_excel(writer, sheet_name=f'{m}_Sigma_f')

            # --- GRS test summary (if present)
            grs = ts_outputs[m].get('GRS_Result')
            if grs is not None:
                pd.DataFrame([grs]).to_excel(writer, sheet_name=f'{m}_GRS')

            # --- Fama–MacBeth results
            if m in fm_results:
                fm_out = fm_results[m]
                # λₜ time series
                fm_out['L_t'].to_excel(writer, sheet_name=f'{m}_FM_Lambdas')

                # Summary table (with/without Shanken)
                lam = fm_out['lambda_mean']
                se  = fm_out['lambda_se']
                t   = fm_out['lambda_t']
                fm_summary = pd.DataFrame({'lambda': lam, 'SE': se, 't': t})

                if fm_out['shanken'] and fm_out['shanken_details']:
                    fm_summary['SE_Shanken'] = fm_out['shanken_details']['se_shanken']
                    fm_summary['t_Shanken']  = fm_out['shanken_details']['t_shanken']

                fm_summary.to_excel(writer, sheet_name=f'{m}_FM_Summary')

        # Optional formatting
        workbook = writer.book
        for sheet in writer.sheets.values():
            worksheet = sheet
            worksheet.set_column('A:Z', 18)
        print(f"\n✅ Excel export completed: {file_path}")

In [35]:
# Example path (you can change the filename)
output_path = "Asset_Pricing_Results.xlsx"

# Store the GRS results in ts_outputs first (optional, if not already stored)
for m in ts_outputs.keys():
    try:
        ts_outputs[m]['GRS_Result'] = grs_test(ts_outputs, factors, m)
    except Exception as e:
        print(f"Skipping GRS for {m}: {e}")

# Export everything
export_asset_pricing_results(
    file_path=output_path,
    ts_outputs=ts_outputs,
    fm_results=fm_results,
    model_choice=model_choice,
    summary_stats=summary_stats
)


Exporting CAPM results ...

Exporting FF3F results ...

✅ Excel export completed: Asset_Pricing_Results.xlsx


In [38]:
def debug_shanken(fm_out):
    print("\n--- Shanken Diagnostics ---")
    if not (fm_out['shanken'] and fm_out['shanken_details']):
        print("Shanken correction not present.")
        return

    c = fm_out['shanken_details']['c_scalar']
    print(f"c = 1 + lambda' Σ_f^(-1) lambda = {c:.6f}")
    print("Columns:", list(fm_out['L_t'].columns))

    # Uncorrected
    se = fm_out['lambda_se'].values
    print("Uncorrected SE:", np.round(se, 6))

    # Corrected
    se_sh = fm_out['shanken_details']['se_shanken']
    print("Shanken SE    :", np.round(se_sh, 6))

    # Compare ratios (should be ≈ sqrt(c) for factor rows)
    ratio = se_sh / se
    print("SE ratio (Shanken/Uncorr):", np.round(ratio, 6))

    # If intercept is present, first element is intercept; others are factors
    if fm_out['include_intercept']:
        print("Note: Intercept SE is not Shanken-scaled; factors are.")

In [39]:
for m in fm_results:
    debug_shanken(fm_results[m])
    


--- Shanken Diagnostics ---
c = 1 + lambda' Σ_f^(-1) lambda = 1.026232
Columns: ['const', 'MKT_RF']
Uncorrected SE: [0.401448 0.45767 ]
Shanken SE    : [0.406679 0.463634]
SE ratio (Shanken/Uncorr): [1.013031 1.013031]
Note: Intercept SE is not Shanken-scaled; factors are.

--- Shanken Diagnostics ---
c = 1 + lambda' Σ_f^(-1) lambda = 1.064822
Columns: ['const', 'MKT_RF', 'SMB', 'HML']
Uncorrected SE: [0.339078 0.406942 0.156764 0.138527]
Shanken SE    : [0.349895 0.419925 0.161765 0.142946]
SE ratio (Shanken/Uncorr): [1.031902 1.031902 1.031902 1.031902]
Note: Intercept SE is not Shanken-scaled; factors are.
