In [None]:
import numpy as np
import pandas as pd
import statsmodels.api as sm
from scipy.stats import chi2

In [None]:
EXPORT_DIR = "data/facteurs_systemiq"
res_cons = pd.read_csv(f"{EXPORT_DIR}/resid_construction.csv")
res_ind = pd.read_csv(f"{EXPORT_DIR}/industrie.csv")
res_ext = pd.read_csv(f"{EXPORT_DIR}/resid_extraction_primaire.csv")
res_fin = pd.read_csv(f"{EXPORT_DIR}/resid_finance.csv")
res_info = pd.read_csv(f"{EXPORT_DIR}/resid_info_com.csv")
res_public = pd.read_csv(f"{EXPORT_DIR}/resid_public.csv")
res_serv_pro = pd.read_csv(f"{EXPORT_DIR}/resid_services_pro.csv")
res_serv = pd.read_csv(f"{EXPORT_DIR}/resid_services.csv")

In [None]:
def breusch_pagan_contemp(resid_df):
    """
    resid_df : DataFrame (T x m) de résidus par secteur (alignés sur un index temps).
    Le test est fait sur l'intersection des dates (lignes complètes).
    """
    E = resid_df.dropna(axis=0, how="any")  # intersection
    T, m = E.shape
    if m < 2:
        raise ValueError("Il faut au moins 2 secteurs (m>=2).")
    if T < 3:
        raise ValueError("Pas assez d'observations communes pour calculer les corrélations.")

    R = E.corr()

    # Somme des r_ij^2 pour i<j
    r2_sum = 0.0
    cols = list(R.columns)
    for i in range(m):
        for j in range(i + 1, m):
            rij = R.loc[cols[i], cols[j]]
            r2_sum += float(rij**2)

    BP = T * r2_sum
    df_chi = m * (m - 1) / 2
    pval = 1 - chi2.cdf(BP, df_chi)

    # Top corrélations (utile pour diagnostiquer)
    R_off = R.copy()
    np.fill_diagonal(R_off.values, np.nan)
    top_pairs = (
        R_off.abs()
             .stack()
             .sort_values(ascending=False)
             .head(15)
    )

    return {
        "T_common": T,
        "m": m,
        "BP": BP,
        "df": df_chi,
        "pvalue": pval,
        "R": R,
        "top_abs_corr_pairs": top_pairs,
        "E_common": E
    }