# Finance Project III — CAPM & FF3F on Ken French 5×5 (ME × BE/ME) Portfolios (Monthly)

This notebook:
- Downloads **Ken French** 5×5 portfolios (ME × BE/ME, monthly, value-weighted) and **FF3 factors**.
- Lets you set a **date range** and choose whether descriptives use **raw or excess** returns.
- Runs **Time-Series (TS)** tests for **CAPM** and **FF3F**:
  - Per portfolio: α, s.e.(α), t(α), β’s, R²; report **Average R²**; run **GRS** (joint α = 0).
- Runs **Fama–MacBeth (FMB)** cross-sectional pricing for **CAPM** and **FF3F** with rolling betas.
- Shows **clean tables** and **simple Matplotlib visuals** (no seaborn, one chart per figure, no custom colors).

In [1]:
import re
import numpy as np
import pandas as pd
import statsmodels.api as sm
import matplotlib.pyplot as plt
from numpy.linalg import inv
from scipy.stats import f as f_dist
from io import StringIO

In [2]:
# Clean tables
pd.set_option("display.max_rows", 120)
pd.set_option("display.max_columns", 120)
pd.set_option("display.width", 160)
pd.set_option("display.float_format", lambda x: f"{x:,.6f}")

# ---------- small statistical helpers ----------
def monthly_sharpe(excess_series: pd.Series) -> float:
    s = excess_series.dropna()
    if s.shape[0] < 2:
        return np.nan
    mu = s.mean()
    sd = s.std(ddof=1)
    return np.nan if sd == 0 else mu / sd

def tstat_of_mean(series: pd.Series) -> float:
    s = series.dropna()
    T = s.shape[0]
    if T < 2:
        return np.nan
    mu = s.mean()
    sd = s.std(ddof=1)
    if sd == 0:
        return np.nan
    se = sd / np.sqrt(T)
    if se == 0:
        return np.nan
    return mu / se

def ols_with_const(y: pd.Series, X: pd.DataFrame):
    Xc = sm.add_constant(X)
    return sm.OLS(y, Xc, missing="drop").fit()

def grs_test_with_alphas(alphas, factor_means, factor_cov, residual_cov, T, N, L):
    """
    Gibbons–Ross–Shanken (1989) F-stat for joint α = 0 across N assets, L factors.
    Returns (F, pval).
    """
    a = np.asarray(alphas, dtype=float).reshape(-1, 1)
    m = np.asarray(factor_means, dtype=float).reshape(-1, 1)
    Sigma_f_inv = inv(np.asarray(factor_cov, dtype=float))
    Sigma_e_inv = inv(np.asarray(residual_cov, dtype=float))

    term = float(m.T @ Sigma_f_inv @ m)
    numer = (T - N - L) / N
    denom = 1.0 + term

    F = numer * float(a.T @ Sigma_e_inv @ a) / denom
    df1, df2 = N, T - N - L
    if df2 <= 0:
        return np.nan, np.nan
    pval = 1.0 - f_dist.cdf(F, df1, df2)
    return F, pval

In [3]:
URL_FF3_MONTHLY = 'https://mba.tuck.dartmouth.edu/pages/faculty/ken.french/ftp/F-F_Research_Data_Factors_CSV.zip'
URL_KF_25_MONTHLY = 'https://mba.tuck.dartmouth.edu/pages/faculty/ken.french/ftp/25_Portfolios_5x5_CSV.zip'

import requests, zipfile, os
from io import BytesIO

def download_and_save_kf_data(url: str, save_dir: str = "."):
    """
    Download Ken French dataset ZIP file, extract all CSV files, and save to the specified directory.

    Args:
        url (str): URL to the Ken French ZIP file.
        save_dir (str): Directory to save extracted CSVs (default: current folder).
    Returns:
        List of saved file paths.
    """
    os.makedirs(save_dir, exist_ok=True)
    
    resp = requests.get(url)
    resp.raise_for_status()

    with zipfile.ZipFile(BytesIO(resp.content)) as z:
        saved_files = []
        for fname in z.namelist():
            if fname.endswith('.csv'):
                out_path = os.path.join(save_dir, os.path.basename(fname))
                with z.open(fname) as f_in, open(out_path, 'wb') as f_out:
                    f_out.write(f_in.read())
                saved_files.append(out_path)
    
    return saved_files

download_and_save_kf_data(URL_FF3_MONTHLY)
download_and_save_kf_data(URL_KF_25_MONTHLY)

['.\\25_Portfolios_5x5.csv']

In [4]:
import pandas as pd
import numpy as np
import re
from io import StringIO

def _first_monthly_index_csv(lines):
    """
    Return the line index where monthly data start (first token YYYYMM), 
    assuming comma-separated rows.
    """
    for i, line in enumerate(lines):
        toks = [t.strip() for t in line.strip().split(",")]
        if len(toks) > 0 and toks[0].isdigit() and len(toks[0]) == 6:
            return i
    return None

def _dedupe(names):
    """
    Make column names unique by appending _1, _2, ... to duplicates.
    """
    out, seen = [], {}
    for n in names:
        n = (n or "").strip()
        if n == "":
            n = "COL"
        if n in seen:
            seen[n] += 1
            out.append(f"{n}_{seen[n]}")
        else:
            seen[n] = 0
            out.append(n)
    return out

def _align_header_to_ncols(header, ncols):
    """
    Ensure header length == ncols; pad or truncate as needed.
    """
    header = list(header)
    if len(header) < ncols:
        pad = [f"COL{j}" for j in range(len(header)+1, ncols+1)]
        header = header + pad
    elif len(header) > ncols:
        header = header[:ncols]
    return header

# ---------- read Ken French 25-portfolios (monthly) from CSV (no Path) ----------
def read_kf_25_csv(filename: str) -> pd.DataFrame:
    with open(filename, "r", encoding="utf-8", errors="ignore") as f:
        lines = f.readlines()

    start_idx = _first_monthly_index_csv(lines)
    if start_idx is None:
        raise ValueError("Could not locate YYYYMM data start in 25_Portfolios_5x5.csv")

    # Header is previous line; parse by commas (NOT whitespace)
    header_line = lines[start_idx - 1]
    header_raw = [t.strip() for t in header_line.strip().split(",")]

    # Detect ncols from the FIRST data row (comma-split)
    first_data_tokens = [t.strip() for t in lines[start_idx].strip().split(",")]
    ncols = len(first_data_tokens)

    # Force first column to 'YYYYMM'
    if not header_raw:
        header_raw = ["YYYYMM"]
    else:
        header_raw[0] = "YYYYMM"

    # align and dedupe
    header = _align_header_to_ncols(header_raw, ncols)
    header = _dedupe(header)

    # read the data block using comma separator
    text = "".join(lines[start_idx:])
    df = pd.read_csv(StringIO(text), sep=",", engine="python", header=None, names=header)

    # keep only proper YYYYMM rows
    df = df[df["YYYYMM"].astype(str).str.isdigit()].copy()
    df["YYYYMM"] = df["YYYYMM"].astype(int)
    df["date"] = pd.to_datetime(df["YYYYMM"].astype(str) + "01", format="%Y%m%d") + pd.offsets.MonthEnd(0)

    # set index and drop YYYYMM
    df = df.set_index("date").drop(columns=["YYYYMM"])

    # keep first 25 portfolio columns (leftmost 25 after YYYYMM)
    keep_cols = list(df.columns)[:25]
    df = df[keep_cols].apply(pd.to_numeric, errors="coerce") / 100.0  # % → decimal
    return df

# ---------- read FF factors (monthly only) from CSV (no Path) ----------
def read_ff3_monthly(filename: str) -> pd.DataFrame:
    with open(filename, "r", encoding="utf-8", errors="ignore") as f:
        lines = f.readlines()

    # find where Annual section starts (truncate monthly section there)
    stop_idx = None
    for i, line in enumerate(lines):
        if "Annual" in line or "ANNUAL" in line:
            stop_idx = i
            break
    monthly_lines = lines if stop_idx is None else lines[:stop_idx]

    start_idx = _first_monthly_index_csv(monthly_lines)
    if start_idx is None:
        raise ValueError("Could not locate YYYYMM monthly start in F-F_Research_Data_Factors.csv")

    header_line = monthly_lines[start_idx - 1]
    header_raw = [t.strip() for t in header_line.strip().split(",")]
    first_data_tokens = [t.strip() for t in monthly_lines[start_idx].strip().split(",")]
    ncols = len(first_data_tokens)

    # first column
    if not header_raw:
        header_raw = ["YYYYMM"]
    else:
        header_raw[0] = "YYYYMM"

    header = _align_header_to_ncols(header_raw, ncols)
    header = _dedupe(header)

    text = "".join(monthly_lines[start_idx:])
    df = pd.read_csv(StringIO(text), sep=",", engine="python", header=None, names=header)

    # keep only monthly numeric rows
    df = df[df["YYYYMM"].astype(str).str.isdigit()].copy()
    df["YYYYMM"] = df["YYYYMM"].astype(int)
    df["date"] = pd.to_datetime(df["YYYYMM"].astype(str) + "01", format="%Y%m%d") + pd.offsets.MonthEnd(0)

    # rename factors to RMRF/SMB/HML/RF
    rename_map = {}
    for c in df.columns:
        cu = c.strip().upper()
        if "MKT" in cu: rename_map[c] = "RMRF"
        elif cu == "SMB": rename_map[c] = "SMB"
        elif cu == "HML": rename_map[c] = "HML"
        elif cu == "RF" : rename_map[c] = "RF"
    df = df.rename(columns=rename_map)

    keep = [c for c in ["date","RMRF","SMB","HML","RF"] if c in (["date"] + list(df.columns))]
    df = df[keep].set_index("date").sort_index()

    # numeric & % → decimal
    for c in [col for col in ["RMRF","SMB","HML","RF"] if col in df.columns]:
        df[c] = pd.to_numeric(df[c], errors="coerce") / 100.0
    return df

# ---------- read both files (filenames only) ----------
ret_5x5 = read_kf_25_csv("25_Portfolios_5x5.csv")
ff3     = read_ff3_monthly("F-F_Research_Data_Factors.csv")

# align on common months
data = ret_5x5.join(ff3, how="inner")

print("Portfolios:", ret_5x5.shape, "| Factors:", ff3.shape)
print("Merged data range:", data.index.min().date(), "→", data.index.max().date())
data.head()

Portfolios: (8632, 25) | Factors: (1190, 4)
Merged data range: 1926-07-31 → 2025-08-31


Unnamed: 0_level_0,SMALL LoBM,ME1 BM2,ME1 BM3,ME1 BM4,SMALL HiBM,ME2 BM1,ME2 BM2,ME2 BM3,ME2 BM4,ME2 BM5,ME3 BM1,ME3 BM2,ME3 BM3,ME3 BM4,ME3 BM5,ME4 BM1,ME4 BM2,ME4 BM3,ME4 BM4,ME4 BM5,BIG LoBM,ME5 BM2,ME5 BM3,ME5 BM4,BIG HiBM,RMRF,SMB,HML,RF
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1
1926-07-31,0.058276,-0.017006,0.005118,-0.021477,0.019583,0.012118,0.024107,0.006056,-0.026082,-0.004527,0.019071,0.024491,-0.004399,0.037289,-0.012629,0.015376,0.01546,0.013389,0.002765,0.024678,0.033248,0.060909,0.020285,0.031263,0.005623,0.0289,-0.0255,-0.0239,0.0022
1926-08-31,-0.020206,-0.080282,0.013968,0.021483,0.085104,0.02362,-0.007525,0.038984,0.002299,0.062937,-0.009386,0.023875,0.019916,0.043179,0.081142,0.013858,0.038587,0.019738,0.021336,0.053422,0.010169,0.041975,0.019769,0.054924,0.077576,0.0264,-0.0114,0.0381,0.0025
1926-09-30,-0.048291,-0.026806,-0.043417,-0.032683,0.008586,-0.026849,-0.005252,0.010789,-0.032877,-0.009419,-0.003406,0.001689,-0.019453,0.026961,-0.036215,0.016897,-0.005246,-0.017724,0.014806,0.00873,-0.012951,0.03661,0.001384,-0.007497,-0.024284,0.0038,-0.0136,0.0005,0.0023
1926-10-31,-0.093633,-0.035519,-0.035024,0.034413,-0.025452,-0.028014,-0.044191,-0.050767,-0.080271,-0.013213,-0.051925,-0.025577,0.004354,-0.021992,-0.031373,-0.039136,-0.026528,-0.021058,-0.032532,-0.053525,-0.027382,-0.030061,-0.022467,-0.046725,-0.058129,-0.0327,-0.0014,0.0082,0.0032
1926-11-30,0.055888,0.041877,0.024384,-0.044495,0.00511,0.031023,-0.017317,0.030425,0.049538,0.027292,0.01911,0.045488,0.00479,0.049524,0.037374,0.034492,0.023823,0.037315,0.051102,0.018213,0.044331,0.025355,0.01528,0.036596,0.025636,0.0254,-0.0011,-0.0061,0.0031


In [5]:
def normalize_port_names(cols):
    out = []
    for c in cols:
        s = str(c).strip()
        s = s.replace("SMALL", "ME1").replace("BIG", "ME5")
        s = s.replace("LoBM", "BM1").replace("HiBM", "BM5")
        s = re.sub(r"\s+", " ", s).upper()
        s = s.replace(" ", "_")
        out.append(s)
    return out

factor_cols = [c for c in ["RMRF","SMB","HML","RF"] if c in data.columns]
port_cols   = [c for c in data.columns if c not in factor_cols]

new_port_cols = normalize_port_names(port_cols)
data = data.rename(columns={old:new for old,new in zip(port_cols, new_port_cols)})

factor_cols = [c for c in ["RMRF","SMB","HML","RF"] if c in data.columns]
port_cols   = [c for c in data.columns if c not in factor_cols]
port_cols[:10]

['ME1_BM1',
 'ME1_BM2',
 'ME1_BM3',
 'ME1_BM4',
 'ME1_BM5',
 'ME2_BM1',
 'ME2_BM2',
 'ME2_BM3',
 'ME2_BM4',
 'ME2_BM5']

## User Inputs

- `start`, `end` — inclusive sample window (YYYY-MM).
- `use_raw_for_descriptives` — True ⇒ descriptives on raw returns (TS/FMB always use **excess**).
- `fmb_beta_window` — rolling months for FMB betas.

In [None]:
import re
from datetime import datetime

def prompt_date_range(dates):
    """
    Prompt user for a start and end date, normalize to YYYY-MM endpoints, and validate against the given date index.
    Keeps looping until valid input is given for both.
    """
    min_date = dates.min()
    max_date = dates.max()

    def normalize_date(s):
        """Try to convert user input to 'YYYY-MM' or Timestamp."""
        s = s.strip()
        patterns = [
            ("%Y-%m", r"^\d{4}-\d{2}$"),
            ("%m/%Y", r"^\d{2}/\d{4}$"),
            ("%Y%m",  r"^\d{6}$"),
            ("%b-%Y", r"^[A-Za-z]{3}-\d{4}$"),
            ("%B-%Y", r"^[A-Za-z]+-\d{4}$"),
        ]
        for fmt, pat in patterns:
            if re.match(pat, s):
                try:
                    return pd.to_datetime(datetime.strptime(s, fmt)), None
                except Exception:
                    return None, f"Could not parse '{s}'"
        try:
            return pd.to_datetime(s), None
        except Exception:
            return None, f"Could not parse '{s}'"

    while True:
        start_input = input(f"Enter start date on or after {min_date.strftime('%Y-%m')} (e.g., 2010-01): ").strip()
        start, err_start = normalize_date(start_input)
        if err_start:
            print(err_start)
            continue  # Invalid start date, retry

        end_input = input(f"Enter end date on or before {max_date.strftime('%Y-%m')} (e.g., 2023-12): ").strip()
        end, err_end = normalize_date(end_input)
        if err_end:
            print(err_end)
            continue  # Invalid end date, retry

        if end < start:
            print("End date must be after or equal to start date.")
            continue  # Retry on inverted range

        if start < min_date or end > max_date:
            print(f"Date range must be within {min_date.strftime('%Y-%m')} and {max_date.strftime('%Y-%m')}.")
            continue  # Retry on out of range

        # Normalize to month end
        start = pd.to_datetime(start) + pd.offsets.MonthEnd(0)
        end = pd.to_datetime(end) + pd.offsets.MonthEnd(0)

        print(f"Validated date range: {start.date()} to {end.date()}")
        return start, end
        

def prompt_model_choice():
    """
    Prompt user for model choice. Options: CAPM, FF3F, Both (case-insensitive).
    Returns:
        One of: 'CAPM', 'FF3F', 'Both'
    """
    valid = {"capm": "CAPM", "ff3f": "FF3F", "both": "Both"}
    while True:
        inp = input("Choose model (CAPM, FF3F, Both): ").strip().lower()
        if inp in valid:
            print(f"Selected model: {valid[inp]}")
            return valid[inp]
        print("Invalid option. Please type CAPM, FF3F, or Both.")

# Usage example:
# start, end = prompt_date_range(data.index)
# model = prompt_model_choice()


Selected model: CAPM


In [20]:
# sample window
start = "1963-07"
end   = "1993-12"

# descriptive stats mode
use_raw_for_descriptives = False   # True to show Mean/StdDev on raw returns

# FMB rolling window
fmb_beta_window = 60

# subset and rebuild lists
sample = data.loc[
    (data.index >= pd.to_datetime(start) + pd.offsets.MonthEnd(0)) &
    (data.index <= pd.to_datetime(end)   + pd.offsets.MonthEnd(0))
].copy()

factor_cols = [c for c in ["RMRF","SMB","HML","RF"] if c in sample.columns]
port_cols   = [c for c in sample.columns if c not in factor_cols]

# EXCESS returns for 25 portfolios
excess = sample[port_cols].sub(sample["RF"], axis=0)

print("Sample window:", sample.index.min().date(), "→", sample.index.max().date())
print("# portfolios:", len(port_cols), "| factors:", factor_cols)

Sample window: 1963-07-31 → 1993-12-31
# portfolios: 25 | factors: ['RMRF', 'SMB', 'HML', 'RF']


## Descriptive Statistics (per portfolio)

For each portfolio *i* we report:

- Mean return $E[r_i]$  
- Standard deviation $\sigma_i$  
- Sharpe ratio $SR_i = \frac{E[r_i - r_f]}{\sigma_i}$  
- *t-statistic of mean excess return* $t(E[r_i - r_f]) = \frac{E[r_i - r_f]}{s(r_i - r_f)/\sqrt{T}}$

In [21]:
base_for_stats = sample[port_cols] if use_raw_for_descriptives else excess

rows = []
for p in port_cols:
    rows.append({
        "Portfolio": p,
        "Mean": base_for_stats[p].mean(),
        "StdDev": base_for_stats[p].std(ddof=1),
        "Sharpe (monthly)": monthly_sharpe(excess[p]),
        "t(mean excess)": tstat_of_mean(excess[p]),
    })
desc_table = pd.DataFrame(rows).set_index("Portfolio")
desc_table

Unnamed: 0_level_0,Mean,StdDev,Sharpe (monthly),t(mean excess)
Portfolio,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
ME1_BM1,0.63044,1.95828,0.321936,17.420251
ME1_BM2,0.396942,1.078638,0.368003,19.913029
ME1_BM3,0.371372,0.991197,0.37467,20.27379
ME1_BM4,0.411751,1.086151,0.379092,20.513047
ME1_BM5,0.695491,1.981758,0.350947,18.990069
ME2_BM1,0.301626,0.6161,0.489574,26.491318
ME2_BM2,0.2573,0.511573,0.502958,27.215549
ME2_BM3,0.256355,0.510105,0.502553,27.193627
ME2_BM4,0.246951,0.488262,0.505776,27.368017
ME2_BM5,0.229995,0.453861,0.506752,27.420877


## Time-Series CAPM

We estimate for each portfolio *i*:

$$
r_{i,t} - r_{f,t} = \alpha_i + \beta_i (R_{M,t} - r_{f,t}) + \varepsilon_{i,t}
$$

Reported statistics:

- $\alpha_i$, $SE(\alpha_i)$, $t(\alpha_i)$  
- $\beta_i$, $SE(\beta_i)$, $t(\beta_i)$  
- $R_i^2$, Average $R^2$ across portfolios  
- GRS test for joint $\alpha_i = 0$ across all assets

In [22]:
capm_rows, alphas_capm, resids_capm = [], [], []
X_capm = sample[["RMRF"]]

for p in port_cols:
    y = excess[p]
    m = ols_with_const(y, X_capm)

    a    = m.params.get("const", np.nan)
    b    = m.params.get("RMRF", np.nan)
    se_a = m.bse.get("const", np.nan)
    se_b = m.bse.get("RMRF", np.nan)
    t_a  = m.tvalues.get("const", np.nan)
    t_b  = m.tvalues.get("RMRF", np.nan)
    r2   = m.rsquared

    capm_rows.append({
        "Portfolio": p,
        "Alpha": a, "SE(Alpha)": se_a, "t(Alpha)": t_a,
        "Beta_MKT": b, "SE(Beta_MKT)": se_b, "t(Beta_MKT)": t_b,
        "R2": r2
    })
    alphas_capm.append(a)
    resids_capm.append(m.resid)

capm_table = pd.DataFrame(capm_rows).set_index("Portfolio")
avg_r2_capm = capm_table["R2"].mean()

# residual covariance (N×N)
resid_mat_capm = pd.DataFrame({p: resids_capm[i] for i, p in enumerate(port_cols)}).dropna()
Sigma_e_capm   = resid_mat_capm.cov()

# factor means/cov
f_means_capm = [sample["RMRF"].mean()]
f_cov_capm   = sample[["RMRF"]].cov()

T_capm = resid_mat_capm.shape[0]
N_capm = len(port_cols)
L_capm = 1

GRS_capm, p_capm = grs_test_with_alphas(alphas_capm, f_means_capm, f_cov_capm, Sigma_e_capm, T_capm, N_capm, L_capm)

capm_summary = pd.DataFrame({
    "Average R2": [avg_r2_capm],
    "GRS (CAPM)": [GRS_capm],
    "p-value": [p_capm],
    "T": [T_capm],
    "N": [N_capm]
})
capm_table, capm_summary

  term = float(m.T @ Sigma_f_inv @ m)
  F = numer * float(a.T @ Sigma_e_inv @ a) / denom


(             Alpha  SE(Alpha)  t(Alpha)  Beta_MKT  SE(Beta_MKT)  t(Beta_MKT)       R2
 Portfolio                                                                            
 ME1_BM1   0.627071   0.036352 17.249919  0.797622      0.811152     0.983320 0.000330
 ME1_BM2   0.395125   0.020023 19.733363  0.430276      0.446793     0.963033 0.000317
 ME1_BM3   0.369724   0.018400 20.093608  0.390274      0.410575     0.950554 0.000309
 ME1_BM4   0.410126   0.020163 20.340185  0.384758      0.449920     0.855170 0.000250
 ME1_BM5   0.694022   0.036793 18.862956  0.347731      0.820988     0.423552 0.000061
 ME2_BM1   0.299822   0.011433 26.223684  0.427183      0.255119     1.674448 0.000957
 ME2_BM2   0.255622   0.009492 26.929288  0.397188      0.211810     1.875211 0.001200
 ME2_BM3   0.254923   0.009467 26.928611  0.338938      0.211236     1.604544 0.000879
 ME2_BM4   0.245734   0.009062 27.116663  0.288147      0.202210     1.424992 0.000694
 ME2_BM5   0.228759   0.008423 27.158630  0

## Time-Series FF3F

We estimate:

$$
r_{i,t} - r_{f,t} = \alpha_i + b_{MKT} RMRF_t + b_{SMB} SMB_t + b_{HML} HML_t + \varepsilon_{i,t}
$$

Reported:

- $\alpha_i$, $t(\alpha_i)$, and $R_i^2$  
- Three factor loadings ($b_{MKT}$, $b_{SMB}$, $b_{HML}$) with standard errors and *t*-values  
- Average $R^2$ and GRS for joint $\alpha=0$

In [23]:
ff3_rows, alphas_ff3, resids_ff3 = [], [], []
X_ff3 = sample[["RMRF","SMB","HML"]]

for p in port_cols:
    y = excess[p]
    m = ols_with_const(y, X_ff3)

    a     = m.params.get("const", np.nan)
    b_mkt = m.params.get("RMRF", np.nan)
    b_smb = m.params.get("SMB",  np.nan)
    b_hml = m.params.get("HML",  np.nan)

    se_a  = m.bse.get("const", np.nan)
    se_m  = m.bse.get("RMRF",  np.nan)
    se_s  = m.bse.get("SMB",   np.nan)
    se_h  = m.bse.get("HML",   np.nan)

    t_a   = m.tvalues.get("const", np.nan)
    t_m   = m.tvalues.get("RMRF",  np.nan)
    t_s   = m.tvalues.get("SMB",   np.nan)
    t_h   = m.tvalues.get("HML",   np.nan)

    r2    = m.rsquared

    ff3_rows.append({
        "Portfolio": p,
        "Alpha": a, "SE(Alpha)": se_a, "t(Alpha)": t_a,
        "Beta_MKT": b_mkt, "SE(Beta_MKT)": se_m, "t(Beta_MKT)": t_m,
        "Beta_SMB": b_smb, "SE(Beta_SMB)": se_s, "t(Beta_SMB)": t_s,
        "Beta_HML": b_hml, "SE(Beta_HML)": se_h, "t(Beta_HML)": t_h,
        "R2": r2
    })
    alphas_ff3.append(a)
    resids_ff3.append(m.resid)

ff3_table = pd.DataFrame(ff3_rows).set_index("Portfolio")
avg_r2_ff3 = ff3_table["R2"].mean()

# residual covariance
resid_mat_ff3 = pd.DataFrame({p: resids_ff3[i] for i, p in enumerate(port_cols)}).dropna()
Sigma_e_ff3   = resid_mat_ff3.cov()

# factor moments
f_means_ff3 = sample[["RMRF","SMB","HML"]].mean().values
f_cov_ff3   = sample[["RMRF","SMB","HML"]].cov()

T_ff3 = resid_mat_ff3.shape[0]
N_ff3 = len(port_cols)
L_ff3 = 3

GRS_ff3, p_ff3 = grs_test_with_alphas(alphas_ff3, f_means_ff3, f_cov_ff3, Sigma_e_ff3, T_ff3, N_ff3, L_ff3)

ff3_summary = pd.DataFrame({
    "Average R2": [avg_r2_ff3],
    "GRS (FF3F)": [GRS_ff3],
    "p-value": [p_ff3],
    "T": [T_ff3],
    "N": [N_ff3]
})
ff3_table, ff3_summary

  term = float(m.T @ Sigma_f_inv @ m)
  F = numer * float(a.T @ Sigma_e_inv @ a) / denom


(             Alpha  SE(Alpha)  t(Alpha)  Beta_MKT  SE(Beta_MKT)  t(Beta_MKT)   Beta_SMB  SE(Beta_SMB)  t(Beta_SMB)  Beta_HML  SE(Beta_HML)  t(Beta_HML)  \
 Portfolio                                                                                                                                                 
 ME1_BM1   0.628078   0.037318 16.830530  1.054412      0.925978     1.138701  -0.991180      1.364043    -0.726649  0.143376      1.507616     0.095101   
 ME1_BM2   0.394367   0.020556 19.184711  0.537044      0.510070     1.052883  -0.262690      0.751376    -0.349611  0.228348      0.830463     0.274965   
 ME1_BM3   0.368358   0.018890 19.500091  0.470233      0.468726     1.003216  -0.095467      0.690472    -0.138264  0.285365      0.763148     0.373931   
 ME1_BM4   0.407723   0.020700 19.697145  0.383384      0.513626     0.746425   0.309631      0.756615     0.409231  0.342892      0.836253     0.410034   
 ME1_BM5   0.690784   0.037771 18.288599  0.240577      0.937231

## Cross-Sectional Fama–MacBeth (FMB)

1. Estimate rolling time-series betas for each portfolio *i* using a window of *fmb_beta_window* months.

2. At each month *t* after the initial window, run a cross-sectional regression:

**CAPM pricing regression**
$$
r_{i,t}^{excess} = \lambda_{0,t} + \lambda_{MKT,t}\,\beta_{i,MKT} + \epsilon_{i,t}
$$

**FF3F pricing regression**
$$
r_{i,t}^{excess} = \lambda_{0,t} + \lambda_{MKT,t}\,\beta_{i,MKT} + \lambda_{SMB,t}\,\beta_{i,SMB} + \lambda_{HML,t}\,\beta_{i,HML} + \epsilon_{i,t}
$$

3. Average each $\lambda$ over time and report its mean, standard error, and *t-statistic*:

$$
t(\bar{\lambda}_k) = \frac{\bar{\lambda}_k}{s(\lambda_{k,t}) / \sqrt{T}}
$$

In [26]:
def rolling_betas(excess_df: pd.DataFrame, X_df: pd.DataFrame, window: int):
    betas = {}
    T = excess_df.shape[0]
    for p in excess_df.columns:
        rows = []
        for t in range(window, T + 1):
            y = excess_df[p].iloc[t-window:t]
            X = X_df.iloc[t-window:t]
            m = ols_with_const(y, X)
            params = m.params.drop("const", errors="ignore")
            rows.append(params)
        idx = excess_df.index[window-1:]
        betas[p] = pd.DataFrame(rows, index=idx)
    return betas

def summarize_lambdas(lambda_df: pd.DataFrame) -> pd.DataFrame:
    out = []
    for c in lambda_df.columns:
        s = lambda_df[c].dropna()
        n = s.shape[0]
        if n <= 1:
            mean = se = tval = np.nan
        else:
            mean = s.mean()
            sd   = s.std(ddof=1)
            se   = sd / np.sqrt(n) if sd > 0 else np.nan
            tval = mean / se if (se is not np.nan and se != 0) else np.nan
        out.append({"Price": c, "Mean": mean, "StdErr": se, "t(Mean)": tval})
    return pd.DataFrame(out).set_index("Price")

# ---- CAPM FMB ----
X_capm = sample[["RMRF"]]
betas_capm = rolling_betas(excess, X_capm, fmb_beta_window)

dates_fmb = excess.index[fmb_beta_window-1:]
lambda_rows_capm = []

for dt in dates_fmb:
    Yi = excess.loc[dt, port_cols].dropna()
    bmat, names = [], []
    for p in port_cols:
        if dt in betas_capm[p].index:
            bmat.append(betas_capm[p].loc[dt].values)  # one: B_MKT
            names.append(p)
    if len(names) == 0:
        continue
    Xcs = pd.DataFrame(bmat, index=names, columns=["B_MKT"])
    Ycs = Yi.reindex(names)
    mod = ols_with_const(Ycs, Xcs)
    lambda_rows_capm.append({
        "date": dt,
        "lambda_0": mod.params.get("const", np.nan),
        "lambda_MKT": mod.params.get("B_MKT", np.nan)
    })

lambda_capm = pd.DataFrame(lambda_rows_capm).set_index("date")
lambda_capm_summary = summarize_lambdas(lambda_capm[["lambda_0","lambda_MKT"]])

lambda_capm.head(5), lambda_capm_summary

ValueError: Shape of passed values is (25, 8), indices imply (25, 1)

## Visuals (MPL defaults)

- **Alpha bars:** display $\alpha_i$ for CAPM and FF3F.  
- **R² scatter:** compare $R_i^2$(CAPM) vs $R_i^2$(FF3F).  
- **FMB λ plots:** plot $\bar{\lambda}_k$ with ±1 standard error bars.

In [None]:
# 1) CAPM alpha bars
plt.figure()
capm_table["Alpha"].sort_values().plot(kind="barh")
plt.title("CAPM Alphas by Portfolio (monthly)")
plt.xlabel("Alpha")
plt.tight_layout()
plt.show()

# 2) FF3F alpha bars
plt.figure()
ff3_table["Alpha"].sort_values().plot(kind="barh")
plt.title("FF3F Alphas by Portfolio (monthly)")
plt.xlabel("Alpha")
plt.tight_layout()
plt.show()

# 3) R² scatter: CAPM vs FF3F
plt.figure()
x = capm_table["R2"]; y = ff3_table["R2"]
plt.scatter(x, y)
minv = float(min(x.min(), y.min()))
maxv = float(max(x.max(), y.max()))
plt.plot([minv, maxv], [minv, maxv])  # 45-degree line
plt.title("R²: CAPM vs FF3F (per portfolio)")
plt.xlabel("R² CAPM")
plt.ylabel("R² FF3F")
plt.tight_layout()
plt.show()

# 4) FMB CAPM lambdas (±1 s.e.)
plt.figure()
s = lambda_capm_summary.loc[["lambda_0","lambda_MKT"]]
plt.errorbar(range(s.shape[0]), s["Mean"], yerr=s["StdErr"])
plt.xticks(range(s.shape[0]), s.index, rotation=0)
plt.title("FMB CAPM — Lambda Means with StdErr")
plt.tight_layout()
plt.show()

# 5) FMB FF3F lambdas (±1 s.e.)
plt.figure()
s = lambda_ff3_summary.loc[["lambda_0","lambda_MKT","lambda_SMB","lambda_HML"]]
plt.errorbar(range(s.shape[0]), s["Mean"], yerr=s["StdErr"])
plt.xticks(range(s.shape[0]), s.index, rotation=0)
plt.title("FMB FF3F — Lambda Means with StdErr")
plt.tight_layout()
plt.show()