# Empirical Homogeneity Test

## Notebook Setup

In [1]:
import pandas as pd 
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm
import torch
import sys 
# Add the parent directory to the import path so the project-local `utils`
# package imported on the next line can be resolved from this notebook.
sys.path.append('../')
from utils import utils
# Last expression of the cell: echoes the path of the running interpreter.
# NOTE(review): the echoed value is an absolute, machine-specific path;
# consider clearing this output before sharing the notebook.
sys.executable

'/Users/fanghema/Desktop/aaSTAT_5200/STAT_5200_final_project/env/bin/python'

In [2]:
# Read the processed panel; the first CSV column becomes the (date-parsed) index.
data = pd.read_csv('../data/processed/data_extended.csv', parse_dates=True, index_col=0)

factors = ['Mkt-RF', 'SMB', 'HML', 'RMW', 'CMA']

# Every column other than 'RF' and the factor columns is treated as an asset.
# This list is built *before* the 'Quarter' column is attached, so 'Quarter'
# never ends up among the assets.
assets = [name for name in data.columns if name not in ('RF', *factors)]

# Tag each row with its calendar quarter (used later for quarterly aggregation).
data = data.assign(Quarter=data.index.to_period("Q"))

## Set up empirical testing parameters

In [3]:
# Factor specifications to test: each entry is the list of factor columns
# used in one run.
factor_options = [
    ['Mkt-RF'],
    ['Mkt-RF', 'SMB', 'HML'],
    ['Mkt-RF', 'SMB', 'HML', 'RMW', 'CMA'],
]

# Values passed as `R` to the estimation routines in `utils`.
R_options = [1, 2, 5]

# (start, end) date bounds of each sample window, as ISO date strings.
sample_period_options = [
    ('1963-01-01', '2025-12-31'),
    ('1963-01-01', '1983-01-01'),
    ('1973-01-01', '1993-01-01'),
    ('1983-01-01', '2003-01-01'),
    ('1993-01-01', '2013-01-01'),
    ('2003-01-01', '2023-01-01'),
]

# One row per (factor set, R, sample period) combination, one column per test
# statistic. Factor lists are converted to tuples because index labels must be
# hashable. The frame is allocated as float (NaN-filled) so the statistics are
# stored numerically instead of in an object-dtype frame that needs coercion
# before any arithmetic.
results = pd.DataFrame(
    index=pd.MultiIndex.from_product([
        [tuple(option) for option in factor_options],
        R_options,
        sample_period_options,
    ]),
    columns=['gamma_a_lam', 'gamma_a', 'gamma_lam'],
    dtype=float,
)


# Run the three homogeneity tests for every (factor set, R, sample period)
# combination and store the statistics in `results`.
total_runs = results.shape[0]
print(f"Total combinations: {total_runs}")
counter = 0

# The loop variable is intentionally NOT named `factors`: that name is the
# module-level factor list used to derive `assets`, and rebinding it here
# would leave it pointing at a partial factor set if the loop is interrupted.
for factor_set in factor_options:
    K = len(factor_set)
    for R in R_options:
        for sample_period in sample_period_options:
            print(f"Processing {counter}/{total_runs}: {factor_set} - {R} - {sample_period}")

            # NOTE(review): both bounds are strict, so rows dated exactly on
            # the start or end date are excluded -- confirm this is intended.
            in_window = (data.index > sample_period[0]) & (data.index < sample_period[1])
            data_slice = data.loc[in_window]

            beta_loading, returns_df, realized_covariance, residuals = utils.calculate_factor_loading(
                data_slice,
                factors=factor_set,
                assets=assets
            )

            # Quarterly sums of the asset columns, transposed so rows follow
            # the order of `assets`.
            excess_returns = returns_df.groupby("Quarter").sum()[assets].T.values

            # `beta_loading` is indexed by (asset, factor); its columns are
            # the T time periods.
            industries = beta_loading.index.get_level_values(0).unique().tolist()
            # NOTE(review): rows of `beta_hat_np` follow `industries`, rows of
            # `excess_returns` follow `assets`; presumably both are in the
            # same order -- verify against utils.calculate_factor_loading.

            N = len(industries)
            T = beta_loading.shape[1]

            # Repack the loadings into a dense (N, K, T) array.
            beta_hat_np = np.zeros((N, K, T))
            for i, asset in enumerate(industries):
                for j, factor in enumerate(factor_set):
                    beta_hat_np[i, j, :] = beta_loading.loc[(asset, factor)].values

            eta, G, beta_star, objective = utils.iterative_convergence(
                beta_hat_np,
                excess_returns,
                N=N,
                K=K,
                R=R,
                T=T,
                n_iter=2000
            )

            avar = utils.estimate_avar(
                beta_hat=beta_hat_np,
                excess_returns=excess_returns,
                eta=eta,
                G=G,
                beta_star=beta_star,
                realized_covariance=realized_covariance,
                residuals=residuals,
                N=N,
                K=K,
                R=R,
                T=T,
            )

            # The three tests share the same inputs.
            test_inputs = dict(eta=eta, avar=avar, N=N, K=K, T=T)
            gamma_a_lambda = utils.full_homogeneity_test(**test_inputs)
            gamma_a = utils.intercept_homogeneity_test(**test_inputs)
            gamma_lambda = utils.slope_homogeneity_test(**test_inputs)

            print("Test statistics")
            print(f"gamma_a_lam: {gamma_a_lambda}")
            print(f"gamma_a: {gamma_a}")
            print(f"gamma_lam: {gamma_lambda}")

            # Column order matches `results.columns`:
            # gamma_a_lam, gamma_a, gamma_lam.
            results.loc[(
                tuple(factor_set), R, sample_period
            )] = np.asarray([
                gamma_a_lambda,
                gamma_a,
                gamma_lambda
            ])
            counter += 1
            print("===========================")




Total combinations: 54
Processing 0/54: ['Mkt-RF'] - 1 - ('1963-01-01', '2025-12-31')
Test statistics
gamma_a_lam: 44.021375176126504
gamma_a: -4.700491211791159
gamma_lam: -4.793114003302496
Processing 1/54: ['Mkt-RF'] - 1 - ('1963-01-01', '1983-01-01')
Test statistics
gamma_a_lam: -19.45507836193608
gamma_a: -5.290005147715778
gamma_lam: -4.82127703591286
Processing 2/54: ['Mkt-RF'] - 1 - ('1973-01-01', '1993-01-01')
Test statistics
gamma_a_lam: 38.51023758818772
gamma_a: -4.780716850584511
gamma_lam: -4.7891599721430245
Processing 3/54: ['Mkt-RF'] - 1 - ('1983-01-01', '2003-01-01')
Test statistics
gamma_a_lam: -29.992553862296607
gamma_a: -4.8616347938768305
gamma_lam: -4.796865405543951
Processing 4/54: ['Mkt-RF'] - 1 - ('1993-01-01', '2013-01-01')
Test statistics
gamma_a_lam: -11.007995129966412
gamma_a: -5.291507502318423
gamma_lam: -4.738953249229063
Processing 5/54: ['Mkt-RF'] - 1 - ('2003-01-01', '2023-01-01')
Test statistics
gamma_a_lam: 24.688723708717475
gamma_a: -3.7499377

In [4]:
# Display the raw grid of test statistics, indexed by
# (factor tuple, R, sample-period tuple).
results

Unnamed: 0,Unnamed: 1,Unnamed: 2,gamma_a_lam,gamma_a,gamma_lam
"(Mkt-RF,)",1,"(1963-01-01, 2025-12-31)",44.021375,-4.700491,-4.793114
"(Mkt-RF,)",1,"(1963-01-01, 1983-01-01)",-19.455078,-5.290005,-4.821277
"(Mkt-RF,)",1,"(1973-01-01, 1993-01-01)",38.510238,-4.780717,-4.78916
"(Mkt-RF,)",1,"(1983-01-01, 2003-01-01)",-29.992554,-4.861635,-4.796865
"(Mkt-RF,)",1,"(1993-01-01, 2013-01-01)",-11.007995,-5.291508,-4.738953
"(Mkt-RF,)",1,"(2003-01-01, 2023-01-01)",24.688724,-3.749938,-4.302826
"(Mkt-RF,)",2,"(1963-01-01, 2025-12-31)",-2.673267,-5.339213,-4.798138
"(Mkt-RF,)",2,"(1963-01-01, 1983-01-01)",88.124487,0.485877,-2.312996
"(Mkt-RF,)",2,"(1973-01-01, 1993-01-01)",-42.373443,-4.861106,-4.798697
"(Mkt-RF,)",2,"(1983-01-01, 2003-01-01)",-66.679785,-4.304673,-5.388708


In [5]:
def clean_results_index(results):
    """Return a copy of `results` with a compact, display-friendly index.

    The three index levels are relabelled as follows:

    * level 0 (factor tuple)        -> its length, named "K"
    * level 1                       -> kept as is, named "R"
    * level 2 ((start, end) pair)   -> "<first 4 chars of start>–<first 4
      chars of end>", named "Period"

    The input frame is not modified.
    """
    source_index = results.index

    factor_counts = list(map(len, source_index.get_level_values(0)))
    r_values = source_index.get_level_values(1)

    period_labels = []
    for window in source_index.get_level_values(2):
        first, last = window
        period_labels.append(f"{str(first)[:4]}–{str(last)[:4]}")

    relabelled = results.copy()
    relabelled.index = pd.MultiIndex.from_arrays(
        [factor_counts, r_values, period_labels],
        names=["K", "R", "Period"],
    )
    return relabelled

# Relabel the index of `results` as (K, R, Period) and display the relabelled
# copy; `results` itself is left unchanged.
cleaned_results = clean_results_index(results)
cleaned_results

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,gamma_a_lam,gamma_a,gamma_lam
K,R,Period,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
1,1,1963–2025,44.021375,-4.700491,-4.793114
1,1,1963–1983,-19.455078,-5.290005,-4.821277
1,1,1973–1993,38.510238,-4.780717,-4.78916
1,1,1983–2003,-29.992554,-4.861635,-4.796865
1,1,1993–2013,-11.007995,-5.291508,-4.738953
1,1,2003–2023,24.688724,-3.749938,-4.302826
1,2,1963–2025,-2.673267,-5.339213,-4.798138
1,2,1963–1983,88.124487,0.485877,-2.312996
1,2,1973–1993,-42.373443,-4.861106,-4.798697
1,2,1983–2003,-66.679785,-4.304673,-5.388708


In [6]:
# Pivot each statistic into a Period x (K, R) table: index levels 0 and 1
# (K and R) move to the columns, leaving Period as the row index.
table_gamma_a_lam, table_gamma_a, table_gamma_lam = (
    cleaned_results[statistic].unstack(level=[0, 1])
    for statistic in ("gamma_a_lam", "gamma_a", "gamma_lam")
)

In [7]:
def add_p_values(table):
    r"""Interleave every row of test statistics with its two-sided normal p-values.

    Parameters
    ----------
    table : pandas.DataFrame
        One row per period label, one column per specification. Entries are
        converted with ``pd.to_numeric(errors="coerce")``, so anything that
        cannot be parsed as a number becomes NaN.

    Returns
    -------
    pandas.DataFrame
        Same columns as ``table``. Rows are indexed by a ``("Period", "Type")``
        MultiIndex: each input row yields a ``"$\gamma$"`` row holding the
        statistics followed by a ``"$p$"`` row holding ``2 * P(Z > |stat|)``
        for a standard normal ``Z``.
    """
    table_numeric = table.apply(pd.to_numeric, errors="coerce")
    stat_matrix = table_numeric.to_numpy(dtype=float)

    rows = []
    period_labels = []
    type_labels = []

    # Iterate positionally so each input row is processed exactly once,
    # independent of how its label would resolve under .loc.
    for period, stats in zip(table_numeric.index, stat_matrix):
        rows.append(stats)
        period_labels.append(period)
        # Raw string: "\g" is not a valid escape sequence in a normal literal.
        type_labels.append(r"$\gamma$")

        # The survival function keeps precision in the far tail, where
        # 1 - cdf(x) cancels to exactly 0.0.
        rows.append(2 * norm.sf(np.abs(stats)))
        period_labels.append(period)
        type_labels.append("$p$")

    multi_index = pd.MultiIndex.from_arrays(
        [period_labels, type_labels],
        names=["Period", "Type"],
    )
    return pd.DataFrame(rows, index=multi_index, columns=table_numeric.columns)

In [8]:
# Display the pivoted joint-test statistics (rows: Period, columns: K and R).
table_gamma_a_lam

K,1,1,1,3,3,3,5,5,5
R,1,2,5,1,2,5,1,2,5
Period,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2,Unnamed: 9_level_2
1963–1983,-19.455078,88.124487,-54.073769,-5.314649,-5.4852,80.827827,-318.541687,-7.20782,1.33291
1963–2025,44.021375,-2.673267,2438.235755,-8.923047,-6.179767,25.315847,-6.989102,-8.436649,-9.916105
1973–1993,38.510238,-42.373443,54.923418,-10.300183,-8.93029,-8.791554,-10.77598,-11.08305,-7.920071
1983–2003,-29.992554,-66.679785,-110.746221,-2.406316,28.525922,160.972484,-11.416337,-9.906017,3.275619
1993–2013,-11.007995,-52.799651,23007.892324,-0.419906,-5.057214,20.582393,-5.13662,-1.924427,39.095191
2003–2023,24.688724,-4.784644,2327.368624,-3.493712,6.245208,190.731657,8.601293,0.336696,1.240625


In [9]:
# Build each table directly from `cleaned_results` instead of re-assigning
# `table = add_p_values(table)`: the self-referential form is not idempotent,
# so running the cell twice would feed an already augmented table (with
# p-value rows) back into `add_p_values`. Rebuilding from the source makes the
# cell safe to re-run and yields the same tables on a fresh run.
table_gamma_a_lam = add_p_values(cleaned_results["gamma_a_lam"].unstack(level=[0, 1]))
table_gamma_a = add_p_values(cleaned_results["gamma_a"].unstack(level=[0, 1]))
table_gamma_lam = add_p_values(cleaned_results["gamma_lam"].unstack(level=[0, 1]))

In [None]:
def _render_latex_table(table, caption, label):
    """Render `table` as a LaTeX table with two-decimal numbers.

    The frame is passed to `to_latex` unrounded: `float_format` alone controls
    the printed precision. Rounding to 3 decimals first and then formatting
    with "%.2f" rounds twice and can shift the last printed digit
    (e.g. 0.3749 -> 0.375 -> "0.38" instead of "0.37").

    `escape=False` keeps the math-mode labels in the index and the caption
    as raw LaTeX.
    """
    return table.to_latex(
        multirow=True,
        multicolumn=True,
        index=True,
        escape=False,
        caption=caption,
        label=label,
        float_format="%.2f",
    )


latex_a_lam = _render_latex_table(
    table_gamma_a_lam,
    caption="Joint Homogeneity Test ($\\Gamma_{\\alpha,\\lambda}$) with p-values",
    label="tab:gamma_a_lam_with_p",
)

latex_a = _render_latex_table(
    table_gamma_a,
    caption="Intercept Homogeneity Test ($\\Gamma_{\\alpha}$) with p-values",
    label="tab:gamma_a",
)

latex_lam = _render_latex_table(
    table_gamma_lam,
    caption="Slope Homogeneity Test ($\\Gamma_{\\lambda}$) with p-values",
    label="tab:gamma_lam",
)

print(latex_a_lam)
print(latex_a)
print(latex_lam)