In [1]:
import pandas as pd
import statsmodels.api as sm
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import numpy as np
from statsmodels.graphics.regressionplots import plot_partregress_grid
from matplotlib.colors import ListedColormap, BoundaryNorm
import tempfile, os

In [2]:
# Load the input table; later cells select the XS*, XS*_W and factor columns from it.
df = pd.read_csv("data_W.csv")

In [3]:
# Folder that receives every figure saved by this notebook. Keep the trailing
# slash: file names are built as f"{out_path}<name>.png".
out_path = "./img2/"
# Create the folder up front so the savefig calls do not fail on a fresh checkout.
os.makedirs(out_path, exist_ok=True)

In [4]:
# Column names used by the regressions below.
factor_cols = ['rmrf', 'smb', 'hml', 'umd']          # explanatory variables
xs_cols = ['XS' + str(i) for i in range(1, 11)]       # XS1 ... XS10
xs_w_cols = [name + '_W' for name in xs_cols]         # XS1_W ... XS10_W

In [5]:
# Keep only the columns the regressions need and drop every row containing a NaN.
use = df.loc[:, xs_cols + factor_cols].dropna().copy()

# Design matrix shared by all regressions: the factor columns plus an intercept.
X = sm.add_constant(use.loc[:, factor_cols])

# One OLS fit per XS column, keyed by column name so later cells can reuse the fits.
results = {xs: sm.OLS(use.loc[:, xs], X).fit() for xs in xs_cols}

In [6]:
# Same regressions as above, this time on the winsorized (_W) columns.
# Keep only the columns the regressions need and drop every row containing a NaN.
use_w = df.loc[:, xs_w_cols + factor_cols].dropna().copy()

# Design matrix: the factor columns plus an intercept.
X = sm.add_constant(use_w.loc[:, factor_cols])

# One OLS fit per winsorized column. The fit is stored under the name with the
# '_W' marker removed, so the keys line up with those of the raw-data fits.
results_w = {
    xs_w.replace('_W', ''): sm.OLS(use_w.loc[:, xs_w], X).fit()
    for xs_w in xs_w_cols
}

In [None]:
def plot_betas_grouped(results, group_gap=1.5, palette='tab10'):
    """
    Draw a grouped bar chart of the estimated regression betas and save it.

    Each group on the x-axis is one explanatory variable (the intercept
    ``const`` is skipped) and each bar inside a group is one portfolio.

    Parameters
    ----------
    results : dict
        Mapping {portfolio name: fitted result}. Only the ``params`` Series of
        each value is read; the variable list is taken from the first entry.
    group_gap : float
        Empty space, in bar slots, left between two consecutive groups.
    palette : str
        Name of the matplotlib colormap sampled to colour the portfolios.

    Returns
    -------
    matplotlib.figure.Figure
        The figure, which is also written to
        ``f"{out_path}estimated_betas.png"`` (``out_path`` is a notebook global).

    Raises
    ------
    ValueError
        If ``results`` is empty.
    """
    if not results:
        raise ValueError("results is empty: there are no betas to plot")

    # Variables come from the first result; the intercept is not plotted.
    first_res = next(iter(results.values()))
    vars_all = [v for v in first_res.params.index if v.lower() != 'const']
    pf_names = list(results.keys())
    # rows = variables, columns = portfolios
    betas = pd.DataFrame({pf: res.params.reindex(vars_all) for pf, res in results.items()})

    n_vars = len(vars_all)
    n_pf = len(pf_names)

    # Each group occupies n_pf unit-wide slots followed by group_gap empty slots.
    group_stride = n_pf + group_gap
    group_starts = np.arange(n_vars) * group_stride
    colors = plt.get_cmap(palette)(np.linspace(0, 1, n_pf))

    fig, ax = plt.subplots(figsize=(1.3*n_vars+3, 6))

    # One bar() call per portfolio: bar j of every group shares colour and legend entry.
    for j, pf in enumerate(pf_names):
        ax.bar(group_starts + j, betas.loc[:, pf].values,
               color=colors[j], width=0.8, label=pf)

    # Bars of a group sit at start .. start + n_pf - 1, so the group centre is
    # start + (n_pf - 1) / 2.
    ax.set_xticks(group_starts + (n_pf - 1) / 2)
    ax.set_xticklabels(vars_all, rotation=45, ha='right')
    ax.axhline(0, color='black', linewidth=1)

    ax.set_ylabel('Coefficient (β)')
    ax.set_title('Estimated Betas Across Portfolios')
    ax.legend(title='Portfolio', bbox_to_anchor=(1.02, 1), loc='upper left')

    # Same half-slot margin on both sides of the first and last bar.
    if n_vars and n_pf:
        ax.set_xlim(-0.5, (n_vars - 1) * group_stride + n_pf - 0.5)
    fig.tight_layout()
    # Save through the figure object rather than pyplot's implicit current figure.
    fig.savefig(f"{out_path}estimated_betas.png", dpi=300, bbox_inches="tight")
    return fig

In [None]:
# Grouped bar chart of the betas estimated on the non-winsorized data.
fig = plot_betas_grouped(
    results,
    group_gap=2,
    palette="coolwarm",
)

In [None]:
def plot_betas_grouped_delta(results_before, results_after,
                             group_gap=1.8, palette='coolwarm',
                             abs_tol=1e-3, rel_tol=1e-2):
    """
    Grouped bar chart of betas highlighting how they changed between two fits.

    The ``results_before`` betas are drawn as grey bars. For every beta whose
    change exceeds the tolerance, a coloured overlay covers the span between
    the "before" and the "after" value. Only portfolios with at least one
    changed beta appear in the legend.

    Parameters
    ----------
    results_before, results_after : dict
        Mappings {portfolio name: fitted result}; only ``params`` is read.
        Keys of ``results_after`` may carry a ``_W`` marker, which is removed.
        Portfolios are plotted in ``results_before`` order, restricted to
        those also present in ``results_after``.
    group_gap : float
        Empty space, in bar slots, between two consecutive variable groups.
    palette : str
        Name of the matplotlib colormap sampled to colour the portfolios.
    abs_tol, rel_tol : float
        A beta counts as changed when
        ``|after - before| > max(abs_tol, rel_tol * |before|)``.

    Returns
    -------
    matplotlib.figure.Figure
        The figure, which is also written to
        ``f"{out_path}estimated_betas_after_winsorization.png"``
        (``out_path`` is a notebook global).

    Raises
    ------
    ValueError
        If ``results_after`` is empty.
    """
    if not results_after:
        raise ValueError("results_after is empty: there are no betas to compare")

    # Strip the _W marker so both dictionaries share the same keys.
    results_after = {k.replace('_W', ''): v for k, v in results_after.items()}

    # Portfolio order follows results_before, keeping only those present in both.
    pfs = [pf for pf in results_before if pf in results_after]
    first_res = next(iter(results_after.values()))
    vars_all = [v for v in first_res.params.index if v.lower() != 'const']

    # rows = variables, columns = portfolios
    betas_b = pd.DataFrame({pf: results_before[pf].params.reindex(vars_all) for pf in pfs})
    betas_a = pd.DataFrame({pf: results_after[pf].params.reindex(vars_all) for pf in pfs})

    diff = (betas_a - betas_b).astype(float)
    tol = np.maximum(abs_tol, (rel_tol * betas_b.abs()).astype(float)).reindex_like(diff)
    changed = diff.abs() > tol  # NaN differences compare False, i.e. "unchanged"

    n_vars, n_pf = len(vars_all), len(pfs)
    colors = plt.get_cmap(palette)(np.linspace(0, 1, n_pf))
    # Each group occupies n_pf unit-wide slots followed by group_gap empty slots.
    group_stride = n_pf + group_gap
    group_starts = np.arange(n_vars) * group_stride

    fig, ax = plt.subplots(figsize=(1.3*n_vars+3, 6))

    changed_pfs = []  # portfolios that received an overlay (and a legend entry)
    for j, pf in enumerate(pfs):
        xs = group_starts + j
        b_before = betas_b.loc[:, pf].to_numpy(dtype=float)
        b_after = betas_a.loc[:, pf].to_numpy(dtype=float)

        # Grey base layer = "before" betas; zorder keeps it under the overlay.
        ax.bar(xs, b_before, color="#d9d9d9", width=0.8, zorder=1)

        ch_mask = changed.loc[:, pf].to_numpy(dtype=bool)
        if not ch_mask.any():
            continue  # nothing changed: no overlay and no legend entry

        # The overlay spans [min(before, after), max(before, after)]. This
        # covers increases and decreases alike, so a single bar() call yields
        # exactly one legend entry per portfolio.
        lower = np.minimum(b_before, b_after)[ch_mask]
        height = np.abs(b_after - b_before)[ch_mask]
        ax.bar(xs[ch_mask], height, bottom=lower,
               color=colors[j], width=0.8, alpha=0.95, zorder=2, label=pf)
        changed_pfs.append(pf)

    # Bars of a group sit at start .. start + n_pf - 1, so the group centre is
    # start + (n_pf - 1) / 2.
    ax.set_xticks(group_starts + (n_pf - 1) / 2)
    ax.set_xticklabels(vars_all, rotation=45, ha='right')

    ax.axhline(0, color='black', linewidth=1)
    ax.set_ylabel('Coefficient (β)')
    ax.set_title('Betas After Winsorization (colored = changed portion)')
    ax.yaxis.grid(True, linestyle=':', linewidth=0.7, alpha=0.6)
    ax.set_axisbelow(True)

    # Legend only when at least one portfolio changed.
    if changed_pfs:
        ax.legend(title='Changed portfolios', bbox_to_anchor=(1.02, 1), loc='upper left')

    # Same half-slot margin on both sides of the first and last bar.
    if n_vars and n_pf:
        ax.set_xlim(-0.5, (n_vars - 1) * group_stride + n_pf - 0.5)
    fig.tight_layout()
    # Save through the figure object rather than pyplot's implicit current figure.
    fig.savefig(f"{out_path}estimated_betas_after_winsorization.png", dpi=300, bbox_inches="tight")
    return fig

In [None]:
# Compare the betas from the raw fits with those from the winsorized fits.
fig = plot_betas_grouped_delta(
    results,
    results_w,
    group_gap=2.0,
    abs_tol=1e-3,
    rel_tol=0.02,
)

In [None]:
def _collect_tables(results):
    """
    Gather the p-values of every fit into one table.

    Parameters
    ----------
    results : dict
        Mapping {portfolio name: fitted result}. The ``params`` index of the
        first entry defines the variables (``const`` is skipped, compared
        case-insensitively); ``pvalues`` is read from every entry.

    Returns
    -------
    pvals : pandas.DataFrame
        Float table with one row per portfolio and one column per variable.
        A variable missing from a fit yields NaN.
    vars_all : list
        Variable names, in the order of the first fit's ``params``.
    pf_names : list
        Portfolio names, in dictionary order.

    Raises
    ------
    ValueError
        If ``results`` is empty.
    """
    if not results:
        raise ValueError("results is empty: there are no p-values to collect")

    first_res = next(iter(results.values()))
    vars_all = [v for v in first_res.params.index if v.lower() != 'const']
    pf_names = list(results.keys())

    # Pre-allocate the float table, then fill one row per portfolio.
    pvals = pd.DataFrame(index=pf_names, columns=vars_all, dtype=float)
    for pf, res in results.items():
        pvals.loc[pf] = res.pvalues.reindex(vars_all).values
    return pvals, vars_all, pf_names

def _format_p(p):
    """Render a p-value: three decimals from 0.001 upward, scientific notation below."""
    if p >= 1e-3:
        return f"{p:.3f}"
    return f"{p:.1e}"

def plot_pvalue_heatmap(results, thresholds=(0.10, 0.05, 0.03, 0.01)):
    """
    Draw and save a categorical heatmap of regression p-values.

    Rows are portfolios, columns are explanatory variables. Each cell is
    coloured by the significance bin its p-value falls into and annotated
    with the formatted p-value. Cells are drawn with ``pcolormesh`` so that
    their edges stay crisp.

    Parameters
    ----------
    results : dict
        Mapping {portfolio name: fitted result}, as accepted by
        ``_collect_tables``.
    thresholds : sequence of 4 floats
        Cut points separating the five significance bins. They may be given
        in any order, must be distinct and must lie strictly between 0 and 1.
        The colorbar labels are generated from these values.

    Returns
    -------
    matplotlib.figure.Figure
        The figure, which is also written to
        ``f"{out_path}p-values_heatmap.png"`` (``out_path`` is a notebook global).

    Raises
    ------
    ValueError
        If ``thresholds`` is not four distinct values in (0, 1), or if
        ``results`` is empty.
    """
    # Blue palette, light -> dark. Colours are assigned in bin order, so the
    # lightest shade marks the smallest p-values.
    colors = ["#f2f5fa", "#c9d8f2", "#9cbce8", "#669dd6", "#2c7bb6"]
    significance = ["highly significant", "very significant", "significant",
                    "marginally significant", "not significant"]

    # Validate the inputs before any drawing happens.
    cuts = sorted(float(t) for t in thresholds)
    if len(cuts) != len(colors) - 1:
        raise ValueError(
            f"thresholds must contain exactly {len(colors) - 1} values, got {len(cuts)}"
        )
    if cuts[0] <= 0.0 or cuts[-1] >= 1.0 or len(set(cuts)) != len(cuts):
        raise ValueError("thresholds must be distinct values strictly between 0 and 1")
    if not results:
        raise ValueError("results is empty: there are no p-values to plot")

    def _fmt_cut(t):
        # Two decimals when that is exact (0.1 -> "0.10"), else the shortest form.
        return f"{t:.2f}" if round(t, 2) == t else f"{t:g}"

    # Colorbar labels derived from the cut points, so they always match the bins.
    bin_labels = [f"< {_fmt_cut(cuts[0])} ({significance[0]})"]
    bin_labels += [
        f"{_fmt_cut(lo)}–{_fmt_cut(hi)} ({name})"
        for lo, hi, name in zip(cuts[:-1], cuts[1:], significance[1:-1])
    ]
    bin_labels.append(f"> {_fmt_cut(cuts[-1])} ({significance[-1]})")

    pvals, vars_all, pf_names = _collect_tables(results)
    P = pvals[vars_all].astype(float).values

    # Bin edges in increasing order; clip=True keeps p == 1.0 inside the last bin.
    bins = [0.0, *cuts, 1.0]
    cmap = ListedColormap(colors)
    norm = BoundaryNorm(bins, cmap.N, clip=True)

    fig, ax = plt.subplots(figsize=(1.2*len(vars_all)+3, 0.7*len(pf_names)+2))

    # Cell-edge coordinates (rows x columns).
    ny, nx = P.shape
    X, Y = np.meshgrid(np.arange(nx+1), np.arange(ny+1))

    # Flat-shaded cells, no edge lines and no antialiasing.
    qm = ax.pcolormesh(X, Y, P, cmap=cmap, norm=norm, shading='flat',
                       edgecolors='face', linewidth=0, antialiased=False)

    # Major ticks at the cell centres.
    ax.set_xticks(np.arange(nx) + 0.5)
    ax.set_xticklabels(vars_all, rotation=45, ha='right')
    ax.set_yticks(np.arange(ny) + 0.5)
    ax.set_yticklabels(pf_names)

    # Numeric annotation in the middle of every cell.
    for i in range(ny):
        for j in range(nx):
            ax.text(j+0.5, i+0.5, _format_p(P[i, j]),
                    ha='center', va='center', fontsize=9, color='black')

    # Thin white grid on the cell borders, drawn via the minor ticks.
    ax.set_xticks(np.arange(nx+1), minor=True)
    ax.set_yticks(np.arange(ny+1), minor=True)
    ax.grid(which='minor', color='white', linewidth=0.8)
    ax.tick_params(which='minor', bottom=False, left=False)

    # Categorical colorbar: one tick in the middle of each bin.
    cbar = fig.colorbar(qm, ax=ax,
                        ticks=[(bins[k]+bins[k+1])/2 for k in range(len(bins)-1)])
    cbar.ax.set_yticklabels(bin_labels)
    cbar.set_label("p-value", rotation=90)

    ax.set_xlim(0, nx)
    ax.set_ylim(ny, 0)  # first portfolio at the top
    ax.set_title("P-Values Across Portfolios", fontsize=13)
    fig.tight_layout()
    # Save through the figure object rather than pyplot's implicit current figure.
    fig.savefig(f"{out_path}p-values_heatmap.png", dpi=300, bbox_inches="tight")
    return fig

In [None]:
# Heatmap of the p-values from the fits on the non-winsorized data.
fig1 = plot_pvalue_heatmap(
    results,
)

In [None]:
def plot_all_actual_vs_fitted(results, y_df, save_path="actual_vs_fitted_all.png"):
    """
    Save a grid of actual-vs-fitted scatter plots, one panel per portfolio.

    Parameters
    ----------
    results : dict
        Mapping {portfolio name: fitted result}; ``fittedvalues`` is read
        from every entry.
    y_df : pandas.DataFrame
        Table holding the observed values. For a portfolio ``pf`` the column
        ``pf`` is used if present, otherwise ``pf + "_W"``; a portfolio with
        neither column gets a hidden panel.
    save_path : str
        File name, appended to the notebook global ``out_path``.

    Notes
    -----
    The grid has two columns and at least five rows. It grows with the number
    of portfolios, so none is dropped. The figure is closed after saving and
    nothing is returned.
    """
    n_cols = 2
    # At least 5x2, taller when more than 10 portfolios are passed.
    n_rows = max(5, (len(results) + n_cols - 1) // n_cols)
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(8*n_cols, 5*n_rows))
    axes = axes.flatten()

    for ax, (pf, res) in zip(axes, results.items()):
        # Locate the observed series: plain name first, then the _W variant.
        col = next((c for c in (pf, pf + "_W") if c in y_df.columns), None)
        if col is None:
            ax.set_visible(False)
            continue

        # Align the observed values on the index of the fitted values.
        fitted = res.fittedvalues
        y = y_df[col].reindex(fitted.index)

        # Keep only the observations where both series are available.
        mask = fitted.notna().to_numpy() & y.notna().to_numpy()
        x = fitted.to_numpy()[mask]
        yy = y.to_numpy()[mask]

        if x.size == 0:
            ax.set_visible(False)
            continue

        ax.scatter(x, yy, s=12)
        # 45-degree reference line spanning the joint range of both series.
        m = np.nanmin([x.min(), yy.min()])
        M = np.nanmax([x.max(), yy.max()])
        ax.plot([m, M], [m, M], color='red', lw=1)

        ax.set_xlabel('Fitted')
        ax.set_ylabel('Actual')
        ax.set_title(f"{pf} — Actual vs Fitted")

    # Hide the panels left over after the last portfolio.
    for ax in axes[len(results):]:
        ax.set_visible(False)

    target = f"{out_path}{save_path}"
    try:
        fig.tight_layout()
        fig.savefig(target, dpi=300, bbox_inches='tight')
    finally:
        # Release the figure even if saving fails.
        plt.close(fig)
    # Report the path that was actually written, including out_path.
    print(f"✅ Saved all plots to {target}")

In [None]:
# Actual-vs-fitted panels for the fits on the winsorized data.
plot_all_actual_vs_fitted(
    results_w,
    use_w,
    save_path="actual_vs_fitted_all.png",
)

In [None]:
def save_partial_plots_grid(
    results,
    save_path="partials_all.png",
    single_dpi=220,
    final_dpi=300,
    title_y=0.97,
    subplots_top=0.95,
    hspace=0.03,
    wspace=0.02
):
    """
    Save a grid of partial regression plots, one tile per portfolio.

    Each portfolio's ``plot_partregress_grid`` figure is rendered to a
    temporary PNG, and the PNGs are then tiled into one final image. The
    grid has five columns and at least two rows, and it grows when more than
    ten portfolios are passed.

    Parameters
    ----------
    results : dict
        Mapping {portfolio name: fitted OLS result}.
    save_path : str
        Path of the final PNG file.
    single_dpi : int
        DPI used for the intermediate per-portfolio images.
    final_dpi : int
        DPI used for the final grid.
    title_y : float
        Vertical position of each per-portfolio title (0..1; lower values put
        it closer to the panels).
    subplots_top : float
        Upper limit of the panel area in each per-portfolio figure (lower it
        if the title overlaps the panels).
    hspace : float
        Vertical spacing between rows of the final grid.
    wspace : float
        Horizontal spacing between columns of the final grid.
    """
    n_cols = 5
    # At least 2x5, taller when more than 10 portfolios are passed.
    n_rows = max(2, (len(results) + n_cols - 1) // n_cols)

    # A private temporary directory avoids name clashes with other runs and is
    # removed on exit from the block, even when an error occurs.
    with tempfile.TemporaryDirectory() as tmpdir:
        image_paths = []

        # 1) Render and save every per-portfolio figure.
        for idx, (pf, res) in enumerate(results.items()):
            fig = plot_partregress_grid(res)
            try:
                # One panel per regressor: label the axes from the model's own
                # regressor names instead of a hard-coded list.
                for ax, lab in zip(fig.axes, res.model.exog_names):
                    ax.set_xlabel(f"{lab} (residuals)")
                    ax.set_ylabel("XS (residuals)")

                # Inner spacing and title position.
                fig.subplots_adjust(
                    left=0.06, right=0.995, bottom=0.07, top=subplots_top,
                    wspace=0.35, hspace=0.60
                )
                fig.suptitle(f"{pf} — Partial Regression Plots", fontsize=12, y=title_y)

                # Index-based file name, so the portfolio name never has to be
                # a valid file name.
                path = os.path.join(tmpdir, f"partial_{idx}.png")
                fig.savefig(path, dpi=single_dpi, bbox_inches="tight", pad_inches=0.02)
            finally:
                plt.close(fig)
            image_paths.append(path)

        # 2) Tile the saved images into the final grid.
        fig, axes = plt.subplots(
            n_rows, n_cols, figsize=(20, 4*n_rows),
            constrained_layout=False,
            gridspec_kw={'hspace': hspace, 'wspace': wspace}
        )
        axes = axes.flatten()
        try:
            for i, ax in enumerate(axes):
                if i < len(image_paths):
                    img = mpimg.imread(image_paths[i])
                    ax.imshow(img, aspect='auto')
                    ax.axis("off")
                else:
                    ax.set_visible(False)

            fig.subplots_adjust(hspace=hspace, wspace=wspace)
            fig.savefig(save_path, dpi=final_dpi, bbox_inches="tight")
        finally:
            plt.close(fig)

    print(f"Saved {n_rows}x{n_cols} grid of partial regression plots to {save_path}")

In [None]:
# Partial regression plots for the fits on the winsorized data.
save_partial_plots_grid(
    results_w,
    save_path=f"{out_path}partials_regression_plots.png",
    title_y=0.96,
    subplots_top=0.91,
    hspace=0.06,
    wspace=0.07,
)

In [None]:
# Adjusted R² of every fit, ordered by the number that follows the two-letter
# prefix of the portfolio name (XS1, XS2, ..., XS10).
r2 = (
    pd.Series({name: fit.rsquared_adj for name, fit in results.items()})
    .pipe(lambda s: s.reindex(sorted(s.index, key=lambda name: int(name[2:]))))
)

fig, ax = plt.subplots(figsize=(8, 5))
bars = r2.plot.bar(ax=ax, color='skyblue')

ax.set_title('Adjusted R² by portfolio (XS1–XS10)')
ax.set_ylabel('Adjusted R²')

# Write the value just above the top of every bar.
for pos, height in enumerate(r2.to_numpy()):
    ax.text(pos, height + 0.002, f"{height:.3f}", ha='center', va='bottom', fontsize=9)

plt.tight_layout()
plt.savefig(f"{out_path}r-squared_by_pf.png", dpi=300, bbox_inches="tight")
plt.show()