In [1]:
import pandas as pd
import numpy as np
import statsmodels.api as sm

from sklearn.decomposition import FactorAnalysis
from sklearn.preprocessing import StandardScaler

In [2]:
# Load the income / balance / cashflow feature tables and the P/E table from
# local pickle files.
# NOTE: unpickling can execute arbitrary code -- only load files you produced
# yourself or otherwise trust.
income_after_outlier_drop_standardized = pd.read_pickle(
    "income_after_outlier_drop_standardized.pkl"
)
balance_after_outlier_drop_standardized = pd.read_pickle(
    "balance_after_outlier_drop_standardized.pkl"
)
cashflow_after_outlier_drop_standardized = pd.read_pickle(
    "cashflow_after_outlier_drop_standardized.pkl"
)
pe_aligned = pd.read_pickle("pe_aligned.pkl")

# Quick sanity check on the income table: (rows, columns).
print(income_after_outlier_drop_standardized.shape)

(410, 13)


In [3]:
def compute_latent_factors_with_components_no_standardize(
    df: pd.DataFrame,
    n_factors: int = 3,
    dropna: bool = True,
    check_standardized: bool = True,
    mean_tol: float = 1e-2,
    std_tol: float = 1e-2,
) -> "tuple[pd.DataFrame, np.ndarray, pd.DataFrame, dict, pd.DataFrame]":
    """
    Fit a factor-analysis model on the numeric columns of ``df`` exactly as
    given (no scaling is applied here) and split the data into a common
    ("macro") part explained by the latent factors and a remainder.

    Parameters
    ----------
    df : pd.DataFrame
        Input dataframe containing numeric features (and possibly id cols).
        Only numeric-dtype columns are used; note that a numeric id column
        would therefore be treated as a feature.
    n_factors : int
        Number of latent factors.
    dropna : bool
        If True, drop rows with any NaNs in numeric columns.
    check_standardized : bool
        If True, prints warnings if data doesn't look standardized (mean~0, std~1).
    mean_tol, std_tol : float
        Tolerances for the standardized check.

    Returns
    -------
    F_df : pd.DataFrame
        Latent factor scores (n x k).
    Lambda : np.ndarray
        Loadings matrix (p x k).
    macro_total_df : pd.DataFrame
        Total macro component in original feature space (n x p).
    macro_by_factor_df : dict[str, pd.DataFrame]
        Each factor's macro component in original feature space, keyed
        ``"LF1_macro"``, ``"LF2_macro"``, ...
    idio_df : pd.DataFrame
        Idiosyncratic component (X - macro_total).

    Raises
    ------
    ValueError
        If ``df`` has no numeric columns, if no rows remain to fit on, or if
        the numeric data still contains NaN / infinite values (e.g. NaNs with
        ``dropna=False``).

    Notes
    -----
    ``FactorAnalysis`` centers the data on its fitted per-feature mean before
    computing scores, so the macro component lives in centered space. For
    input that is not already centered, the column means therefore remain in
    ``idio_df`` (which is defined as ``X - macro_total``).
    """
    numeric_df = df.select_dtypes(include=["number"])
    if dropna:
        numeric_df = numeric_df.dropna()

    # Fail early with an actionable message instead of a generic error raised
    # from inside the estimator.
    if numeric_df.shape[1] == 0:
        raise ValueError("No numeric columns found in df; cannot fit a factor model.")
    if numeric_df.shape[0] == 0:
        raise ValueError(
            "No rows available to fit the factor model "
            "(input is empty or every row contained a NaN)."
        )

    # Convert to numpy array without scaling
    X = numeric_df.to_numpy(dtype=float)

    if not np.isfinite(X).all():
        raise ValueError(
            "Numeric data contains NaN or infinite values; "
            "use dropna=True or clean the data upstream."
        )

    if check_standardized:
        col_means = np.nanmean(X, axis=0)
        col_stds = np.nanstd(X, axis=0, ddof=0)

        max_abs_mean = float(np.max(np.abs(col_means))) if col_means.size else 0.0
        max_abs_std1 = float(np.max(np.abs(col_stds - 1.0))) if col_stds.size else 0.0

        if max_abs_mean > mean_tol or max_abs_std1 > std_tol:
            print(
                f"[warn] Data may NOT be standardized: "
                f"max|mean|={max_abs_mean:.4g}, max|std-1|={max_abs_std1:.4g}. "
                f"(If you intended correlation-based FA, standardize upstream once.)"
            )

    # Factor Analysis on X as-is; fixed random_state keeps the fit reproducible.
    fa = FactorAnalysis(n_components=n_factors, random_state=0)
    F = fa.fit_transform(X)          # scores, (n x k)
    Lambda = fa.components_.T        # loadings, (p x k)

    # Total macro component: sum over factors of score_j * loading_j.
    X_macro_total = F @ Lambda.T     # (n x p)

    # Per-factor macro component: outer product of one score column with the
    # matching loading column, i.e. that factor's share of X_macro_total.
    macro_by_factor_df = {
        f"LF{j+1}_macro": pd.DataFrame(
            np.outer(F[:, j], Lambda[:, j]),
            index=numeric_df.index,
            columns=numeric_df.columns,
        )
        for j in range(n_factors)
    }

    # Idiosyncratic component in same space as X
    X_idio = X - X_macro_total

    # Outputs as DataFrames; all share the (possibly NaN-filtered) row index so
    # they can be joined back to the source frame.
    F_df = pd.DataFrame(
        F, index=numeric_df.index, columns=[f"LF{i+1}" for i in range(n_factors)]
    )
    macro_total_df = pd.DataFrame(
        X_macro_total, index=numeric_df.index, columns=numeric_df.columns
    )
    idio_df = pd.DataFrame(
        X_idio, index=numeric_df.index, columns=numeric_df.columns
    )

    return F_df, Lambda, macro_total_df, macro_by_factor_df, idio_df


In [4]:
# Fit a 2-factor model per statement table. Each result is the tuple
# (F_df, Lambda, macro_total_df, macro_by_factor_df, idio_df); later cells use
# element [0], the factor scores.
income_latent = compute_latent_factors_with_components_no_standardize(
    income_after_outlier_drop_standardized,
    n_factors=2,
)
balance_latent = compute_latent_factors_with_components_no_standardize(
    balance_after_outlier_drop_standardized,
    n_factors=2,
)
cashflow_latent = compute_latent_factors_with_components_no_standardize(
    cashflow_after_outlier_drop_standardized,
    n_factors=2,
)

In [5]:
# Pull symbol_date in the exact order used to compute the latent factors
# (the factor-score index is the set of rows that survived NaN filtering).
ids_from_statement = (
    income_after_outlier_drop_standardized
    .loc[income_latent[0].index, "symbol_date"]
    .reset_index(drop=True)
)

# symbol_date from the dependent dataframe
ids_from_pe = pe_aligned["symbol_date"].reset_index(drop=True)

same_length = len(ids_from_statement) == len(ids_from_pe)

# Comparing two Series whose indexes differ (which is the case when their
# lengths differ) raises ValueError, so only compare element-wise when the
# lengths match; otherwise the rows cannot be identical anyway.
rows_identical = same_length and bool((ids_from_statement == ids_from_pe).all())

print("Same length:", same_length)
print("Row-for-row identical:", rows_identical)


Same length: True
Row-for-row identical: True


In [6]:
def regress_each_latent_factor(F_df, y):
    """
    Runs separate univariate regressions:
        log_PE ~ LF_i
    one per column of ``F_df``.

    Parameters
    ----------
    F_df : pd.DataFrame
        One column per latent factor.
    y : array-like
        Dependent variable; passed to ``run_ols`` unchanged.

    Returns
    -------
    pd.DataFrame
        One row per factor with columns ``factor, coef, t_value, p_value, r2,
        adj_r2``, sorted ascending by ``p_value``. If ``F_df`` has no columns
        the result is an empty frame with these columns.

    Notes
    -----
    Depends on ``run_ols(y, X)``, which is not defined in the visible part of
    this notebook. From its use here it must return ``(model, summary)`` where
    ``summary.loc[factor, <"coef" | "t_value" | "p_value">]`` is valid and
    ``model`` exposes ``rsquared`` and ``rsquared_adj``.
    ``regress_each_latent_factor_univariate`` in this notebook performs the
    same kind of per-factor regression directly with statsmodels.
    """
    # Fixed schema so that an empty factor frame yields an empty table rather
    # than a KeyError when sorting on a column that does not exist.
    result_columns = ["factor", "coef", "t_value", "p_value", "r2", "adj_r2"]

    results = []
    for lf in F_df.columns:
        # Double brackets keep the regressor a one-column DataFrame.
        model, summary = run_ols(y, F_df[[lf]])

        results.append({
            "factor": lf,
            "coef": summary.loc[lf, "coef"],
            "t_value": summary.loc[lf, "t_value"],
            "p_value": summary.loc[lf, "p_value"],
            "r2": model.rsquared,
            "adj_r2": model.rsquared_adj
        })

    return pd.DataFrame(results, columns=result_columns).sort_values("p_value")


In [7]:
def regress_each_latent_factor_univariate(
    latent_F_df,
    pe_aligned,
    statement_label=""
):
    """
    Runs univariate regressions:
        log_PE ~ LF_i
    for a given latent factor dataframe.

    Assumes latent_F_df rows are aligned with pe_aligned rows. Only the row
    counts are verified here; making sure row i of both frames refers to the
    same observation is the caller's responsibility.

    Parameters
    ----------
    latent_F_df : pd.DataFrame
        One column per latent factor; cast to float before fitting.
    pe_aligned : pd.DataFrame
        Must contain a ``"log_PE"`` column (the dependent variable).
    statement_label : str
        Free-text tag copied into the ``statement`` column of every row.

    Returns
    -------
    pd.DataFrame
        One row per factor with columns ``statement, factor, coef, t_value,
        p_value, r2, adj_r2, n_obs``, sorted ascending by ``p_value``. If
        ``latent_F_df`` has no columns the result is an empty frame with
        these columns.

    Raises
    ------
    ValueError
        If ``latent_F_df`` and ``pe_aligned`` have different numbers of rows.
    """
    result_columns = [
        "statement", "factor", "coef", "t_value",
        "p_value", "r2", "adj_r2", "n_obs",
    ]

    y = pe_aligned["log_PE"].astype(float)
    F_df = latent_F_df.astype(float)

    # A row-count mismatch can never be a valid one-to-one alignment; report it
    # up front instead of failing inside the model fit.
    if len(F_df) != len(y):
        raise ValueError(
            f"latent_F_df has {len(F_df)} rows but pe_aligned has {len(y)}; "
            "rows must be aligned one-to-one."
        )

    rows = []
    for lf in F_df.columns:
        # Intercept + single factor; rows with missing values are dropped by
        # statsmodels, so n_obs is recorded per regression.
        X = sm.add_constant(F_df[[lf]])
        m = sm.OLS(y, X, missing="drop").fit()

        rows.append({
            "statement": statement_label,
            "factor": lf,
            "coef": m.params[lf],
            "t_value": m.tvalues[lf],
            "p_value": m.pvalues[lf],
            "r2": m.rsquared,
            "adj_r2": m.rsquared_adj,
            "n_obs": int(m.nobs)
        })

    # Fixed schema keeps the sort valid even when no factor columns were given.
    return pd.DataFrame(rows, columns=result_columns).sort_values("p_value")


In [8]:
# Factor scores are element [0] of each latent-factor result tuple.
income_F, balance_F, cashflow_F = (
    income_latent[0],
    balance_latent[0],
    cashflow_latent[0],
)

# Row alignment with pe_aligned is assumed from here on; the symbol_date
# comparison cell earlier in the notebook checks it for the income table.

uni_income_lf = regress_each_latent_factor_univariate(
    income_F, pe_aligned, statement_label="Income"
)
uni_balance_lf = regress_each_latent_factor_univariate(
    balance_F, pe_aligned, statement_label="Balance"
)
uni_cashflow_lf = regress_each_latent_factor_univariate(
    cashflow_F, pe_aligned, statement_label="Cashflow"
)

# Stack the three per-statement tables into one. Each table is already sorted
# by p-value, so the combined table is ordered by p-value within a statement,
# not globally.
per_statement_tables = [uni_income_lf, uni_balance_lf, uni_cashflow_lf]
uni_all_lf = pd.concat(per_statement_tables, ignore_index=True)

print(uni_all_lf)


  statement factor      coef   t_value   p_value        r2    adj_r2  n_obs
0    Income    LF2  0.110243  2.995077  0.002911  0.021513  0.019115    410
1    Income    LF1  0.089037  2.409785  0.016404  0.014033  0.011617    410
2   Balance    LF1 -0.061351 -1.654298  0.098836  0.006663  0.004228    410
3   Balance    LF2 -0.029386 -0.783663  0.433693  0.001503 -0.000944    410
4  Cashflow    LF1 -0.015522 -0.344216  0.730861  0.000290 -0.002160    410
5  Cashflow    LF2  0.013710  0.284423  0.776231  0.000198 -0.002252    410


In [9]:
def regress_all_latent_factors(F_df, y):
    """
    Fit one joint regression of the target on every latent factor:
        log_PE ~ LF1 + LF2 + ... + LFk

    The fit itself is delegated to ``run_ols(y, F_df)``, which is expected to
    hand back a ``(model, summary)`` pair.

    Returns
    -------
    tuple
        ``(model, summary, model_stats)`` where ``model`` and ``summary`` are
        passed through from ``run_ols`` and ``model_stats`` is a dict with
        keys ``r2``, ``adj_r2``, ``f_stat``, ``f_pvalue`` and ``n_obs``
        (``n_obs`` cast to int).
    """
    fitted, coef_table = run_ols(y, F_df)

    # Headline fit statistics pulled off the fitted model for easy tabulation.
    model_stats = dict(
        r2=fitted.rsquared,
        adj_r2=fitted.rsquared_adj,
        f_stat=fitted.fvalue,
        f_pvalue=fitted.f_pvalue,
        n_obs=int(fitted.nobs),
    )

    return fitted, coef_table, model_stats