# Inference: Phase 3
This notebook implements the statistical inference procedures planned in Phase 2:
- Confidence intervals for the mean and volatility
- t-tests (one-sample, two-sample, Welch)
- Variance tests (Levene / F)
- Non-parametric alternatives
- Bootstrap
- Multiple-comparison corrections
- CAPM regression with diagnostics and robust standard errors
- Power analysis


In [None]:
# CELL 1

# imports and robust file checks
import os
import warnings
import logging
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# In Jupyter, uncomment the next line for inline plots:
# %matplotlib inline

import seaborn as sns
import scipy.stats as stats

import statsmodels.api as sm
import statsmodels.formula.api as smf
from statsmodels.stats.weightstats import ttest_ind
from statsmodels.stats.multitest import multipletests
from statsmodels.stats.power import TTestIndPower, TTestPower

# Additional tests/diagnostics used later:
from statsmodels.stats.diagnostic import het_breuschpagan, acorr_ljungbox, acorr_breusch_godfrey
from statsmodels.stats.stattools import durbin_watson, jarque_bera
from statsmodels.stats.diagnostic import lilliefors
from statsmodels.stats.api import CompareMeans, DescrStatsW

# scikit-learn for standardization, PCA and clustering
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score

# Reproducibility and plot style
np.random.seed(42)
sns.set(style='whitegrid')
warnings.filterwarnings("ignore")  # optional: silences all warnings during development (this also hides convergence and deprecation warnings)
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

# Paths (more robust with pathlib)
NOTEBOOK_DIR = Path.cwd()
PROJECT_ROOT = NOTEBOOK_DIR.parent
PROCESSED_DIR = PROJECT_ROOT / "data" / "processed"

PANEL_FILE = PROCESSED_DIR / "tech30_panel_monthly_2018_2024.csv"
AGG_FILE   = PROCESSED_DIR / "tech30_aggregated_stats_2018_2024.csv"

# Clear error message if files are missing (includes the cwd for debugging)
missing = [str(p) for p in (PANEL_FILE, AGG_FILE) if not p.exists()]
if missing:
    raise FileNotFoundError(
        "No se encontraron los siguientes archivos:\n  - " + "\n  - ".join(missing)
        + f"\n\nWorking dir: {NOTEBOOK_DIR}\nComprueba que la estructura 'data/processed' está en {PROJECT_ROOT}."
    )

# Load the data
panel_df = pd.read_csv(PANEL_FILE, parse_dates=['Date'])
agg_df = pd.read_csv(AGG_FILE)

logging.info(f"Cargados: panel_df {panel_df.shape}, agg_df {agg_df.shape}")


INFO: Cargados: panel_df (2403, 6), agg_df (30, 7)


Before applying formal tests, we:
- check the distribution of returns per company (normality)
- review the sample size T (number of monthly observations per company)
- decide whether parametric or non-parametric tests are appropriate


In [None]:
# CELL 2

# ------------------------------
# UTIL: t interval for the mean
# ------------------------------
def ci_mean_t(x, alpha=0.05):
    """Return mean, se, df, (ci_low, ci_high). Requires n >= 2."""
    x = np.asarray(x.dropna()) if hasattr(x, "dropna") else np.asarray(x)
    n = len(x)
    if n < 2:
        raise ValueError("ci_mean_t: se requieren al menos 2 observaciones")
    mean = np.mean(x)
    s = np.std(x, ddof=1)
    se = s / np.sqrt(n)
    df = n - 1
    tval = stats.t.ppf(1 - alpha/2, df)
    ci_low = mean - tval * se
    ci_high = mean + tval * se
    return mean, se, df, (ci_low, ci_high)

# ------------------------------
# UTIL: bootstrap CI for the mean
# ------------------------------
def bootstrap_ci_mean(x, n_boot=5000, alpha=0.05, random_state=None, return_boots=False):
    """Percentile bootstrap CI for the mean.
    Returns (sample_mean, (lower, upper)), plus the array of bootstrap means if return_boots=True."""
    rng = np.random.default_rng(random_state)
    x = np.asarray(x.dropna()) if hasattr(x, "dropna") else np.asarray(x)
    n = len(x)
    if n < 1:
        raise ValueError("bootstrap_ci_mean: serie vacía")
    boots = np.empty(n_boot)
    for i in range(n_boot):
        sample = rng.choice(x, size=n, replace=True)
        boots[i] = sample.mean()
    lower = np.percentile(boots, 100*(alpha/2))
    upper = np.percentile(boots, 100*(1-alpha/2))
    if return_boots:
        return np.mean(x), (lower, upper), boots
    return np.mean(x), (lower, upper)

# ------------------------------
# UTIL: one-sample t-test (H0: mean = mu0) with support for one-sided alternatives
# ------------------------------
def one_sample_ttest(x, mu0=0.0, alternative='two-sided'):
    """Return (statistic, pvalue). alternative in {'two-sided','larger','smaller'}.
       One-sided p-values are derived from the two-sided one, for compatibility with older SciPy versions."""
    x = np.asarray(x.dropna()) if hasattr(x, "dropna") else np.asarray(x)
    if len(x) < 2:
        raise ValueError("one_sample_ttest: se requieren al menos 2 observaciones")
    res = stats.ttest_1samp(x, popmean=mu0)
    tstat, p_two = res.statistic, res.pvalue
    if alternative == 'two-sided':
        return tstat, p_two
    # one-sided
    # ttest_1samp returns two-sided p; adjust depending on sign
    if alternative == 'larger':  # H1: mean > mu0
        if tstat > 0:
            p = p_two / 2
        else:
            p = 1 - p_two/2
    elif alternative == 'smaller':  # H1: mean < mu0
        if tstat < 0:
            p = p_two / 2
        else:
            p = 1 - p_two/2
    else:
        raise ValueError("alternative debe ser 'two-sided', 'larger' o 'smaller'")
    return tstat, p

# ------------------------------
# UTIL: Welch two-sample t-test with alternative
# ------------------------------
def welch_ttest(x1, x2, alternative='two-sided'):
    """Return (statistic, pvalue)."""
    x1 = np.asarray(x1.dropna()) if hasattr(x1, "dropna") else np.asarray(x1)
    x2 = np.asarray(x2.dropna()) if hasattr(x2, "dropna") else np.asarray(x2)
    # Welch's test needs a variance estimate from each sample, hence n >= 2 per group
    if len(x1) < 2 or len(x2) < 2:
        raise ValueError("welch_ttest: both samples must have at least 2 observations")
    res = stats.ttest_ind(x1, x2, equal_var=False)
    tstat, p_two = res.statistic, res.pvalue
    if alternative == 'two-sided':
        return tstat, p_two
    if alternative == 'larger':
        if tstat > 0:
            p = p_two / 2
        else:
            p = 1 - p_two/2
    elif alternative == 'smaller':
        if tstat < 0:
            p = p_two / 2
        else:
            p = 1 - p_two/2
    else:
        raise ValueError("alternative debe ser 'two-sided', 'larger' o 'smaller'")
    return tstat, p

# ------------------------------
# UTIL: levene test for equal variances
# ------------------------------
def levene_test(x1, x2, center='median'):
    x1 = np.asarray(x1.dropna()) if hasattr(x1, "dropna") else np.asarray(x1)
    x2 = np.asarray(x2.dropna()) if hasattr(x2, "dropna") else np.asarray(x2)
    if len(x1) < 2 or len(x2) < 2:
        raise ValueError("levene_test: both samples must have at least 2 observations")
    return stats.levene(x1, x2, center=center)

# ------------------------------
# UTIL: mann-whitney U test
# ------------------------------
def mannwhitney_test(x1, x2):
    x1 = np.asarray(x1.dropna()) if hasattr(x1, "dropna") else np.asarray(x1)
    x2 = np.asarray(x2.dropna()) if hasattr(x2, "dropna") else np.asarray(x2)
    if len(x1) < 1 or len(x2) < 1:
        raise ValueError("mannwhitney_test: muestras vacías")
    return stats.mannwhitneyu(x1, x2, alternative='two-sided')

# ------------------------------
# UTIL: permutation test for difference of means (two-sided)
# ------------------------------
def permutation_test_diff_means(x1, x2, n_perm=5000, random_state=None, return_null_dist=False):
    """Monte Carlo (approximate) permutation test for the difference in means, two-sided.
       Returns: obs_diff, p_value [, null_dist]"""
    rng = np.random.default_rng(random_state)
    x1 = np.asarray(x1.dropna()) if hasattr(x1, "dropna") else np.asarray(x1)
    x2 = np.asarray(x2.dropna()) if hasattr(x2, "dropna") else np.asarray(x2)
    n1 = len(x1); n2 = len(x2)
    if n1 < 1 or n2 < 1:
        raise ValueError("permutation_test_diff_means: muestras vacías")
    obs_diff = np.mean(x1) - np.mean(x2)
    pooled = np.concatenate([x1, x2])
    perm_diffs = np.empty(n_perm)
    for i in range(n_perm):
        perm = rng.permutation(pooled)
        perm_diffs[i] = perm[:n1].mean() - perm[n1:].mean()
    p_value = np.mean(np.abs(perm_diffs) >= np.abs(obs_diff))
    if return_null_dist:
        return obs_diff, p_value, perm_diffs
    return obs_diff, p_value

# ------------------------------
# UTIL: apply corrections (FDR/Bonferroni)
# ------------------------------
def apply_multiple_corrections(pvals, alpha=0.05, method='fdr_bh'):
    """
    Applies statsmodels' multipletests.
    Returns: reject_array, pvals_corrected
    method: 'bonferroni', 'fdr_bh', etc.
    """
    pvals = np.asarray(pvals)
    reject, pvals_corrected, _, _ = multipletests(pvals, alpha=alpha, method=method)
    return reject, pvals_corrected

# ------------------------------
# Additional UTIL: CI for a Pearson correlation (Fisher z)
# ------------------------------
def pearson_r_ci(r, n, alpha=0.05):
    """CI for a correlation r (Fisher transformation). Returns (r, (low, high))."""
    if n <= 3:
        raise ValueError("pearson_r_ci: se requieren n>3")
    z = np.arctanh(r)  # fisher z
    se = 1 / np.sqrt(n - 3)
    z_crit = stats.norm.ppf(1 - alpha/2)
    lo, hi = z - z_crit * se, z + z_crit * se
    return r, (np.tanh(lo), np.tanh(hi))


Procedure:
- For each company: Shapiro-Wilk on returns, falling back to Lilliefors if Shapiro fails (Shapiro is preferred for small n)
- If most companies violate normality (p < 0.05), prefer non-parametric tests or the bootstrap
- We store a summary with n, the test's p-value, and the decision


In [None]:
# CELL 3

def normality_summary(panel_df, alpha=0.05, correction_method='fdr_bh'):
    """
    Assesses normality of 'Return' per company in panel_df.
    - Uses Shapiro-Wilk when possible; if it fails, uses Lilliefors.
    - Applies a multiple-comparison correction to the p-values (multipletests).
    
    Returns a DataFrame with columns:
    ['Company','n','test','statistic','p_value','p_value_fdr','normal_reject_fdr']
    """
    rows = []
    companies = sorted(panel_df['Company'].unique())
    for c in companies:
        r = panel_df.loc[panel_df['Company'] == c, 'Return'].dropna()
        n = len(r)
        if n < 3:
            stat = np.nan
            pval = np.nan
            test_name = None
        else:
            try:
                stat, pval = stats.shapiro(r)   # Shapiro-Wilk
                test_name = 'Shapiro'
            except Exception:
                # fall back to Lilliefors (Kolmogorov-Smirnov with estimated parameters)
                stat, pval = lilliefors(r)
                test_name = 'Lilliefors'
        rows.append({
            'Company': c,
            'n': n,
            'test': test_name,
            'statistic': stat,
            'p_value': pval
        })
    df = pd.DataFrame(rows)

    # Multiple-testing correction (only over non-missing p-values)
    mask = df['p_value'].notna()
    if mask.any():
        reject, pvals_corr, _, _ = multipletests(df.loc[mask, 'p_value'].values,
                                                alpha=alpha, method=correction_method)
        df.loc[mask, 'p_value_fdr'] = pvals_corr
        df.loc[mask, 'normal_reject_fdr'] = reject.astype(bool)
    else:
        df['p_value_fdr'] = np.nan
        df['normal_reject_fdr'] = False

    # Sort by p_value_fdr for quick inspection (NaNs last)
    df = df.sort_values(by=['p_value_fdr', 'p_value'], na_position='last').reset_index(drop=True)
    return df

# Usage:
norm_summary = normality_summary(panel_df, alpha=0.05, correction_method='fdr_bh')
norm_summary.head(10)


Unnamed: 0,Company,n,test,statistic,p_value,p_value_fdr,normal_reject_fdr
0,Netflix,83,Shapiro,0.845255,7.423516e-08,2e-06,True
1,SAP,83,Shapiro,0.935816,0.0004464753,0.006697,True
2,Intel,83,Shapiro,0.943477,0.001170462,0.010342,True
3,Palantir,51,Shapiro,0.914936,0.001378993,0.010342,True
4,Meta Platforms,83,Shapiro,0.949261,0.002506951,0.015042,True
5,Broadcom,83,Shapiro,0.963013,0.01734127,0.086706,False
6,Snowflake,51,Shapiro,0.945631,0.02078493,0.089078,False
7,Accenture,83,Shapiro,0.966265,0.02806939,0.10526,False
8,Infosys,83,Shapiro,0.969379,0.04481786,0.149393,False
9,Nvidia,83,Shapiro,0.970199,0.0507345,0.152204,False


We compute:
- a classical t CI for each company's mean return
- a bootstrap (percentile) CI for robustness

We store a table with mean, se, df, ci_t_low, ci_t_high, ci_boot_low, ci_boot_high.
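
As a rough illustration of how the two intervals are built, the sketch below computes both on a synthetic, right-skewed sample. The data, the seed and the names `sample`, `n_boot` and `boot_means` are assumptions made for this example; they are not values from the panel.

```python
import numpy as np
from scipy import stats

rng = np.random.default_rng(0)
# Synthetic monthly "returns": a shifted exponential, hence skewed (illustrative assumption)
sample = rng.exponential(scale=0.05, size=83) - 0.03

n = sample.size
mean = sample.mean()
se = sample.std(ddof=1) / np.sqrt(n)

# Classical t interval: mean +/- t_{0.975, n-1} * se
t_crit = stats.t.ppf(0.975, df=n - 1)
ci_t = (mean - t_crit * se, mean + t_crit * se)

# Percentile bootstrap: resample with replacement, then take the 2.5% and 97.5% quantiles
n_boot = 2000
idx = rng.integers(0, n, size=(n_boot, n))
boot_means = sample[idx].mean(axis=1)
ci_boot = tuple(np.percentile(boot_means, [2.5, 97.5]))

print("t CI:        ", ci_t)
print("bootstrap CI:", ci_boot)
```

The t interval is symmetric around the sample mean by construction, while the percentile interval can be asymmetric when the data are skewed; comparing the two is a quick robustness check.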


In [None]:
# CELL 4

results = []
companies = sorted(panel_df['Company'].unique())

for c in companies:
    r = panel_df.loc[panel_df['Company'] == c, 'Return'].dropna().values
    n = len(r)
    if n < 2:
        # Not enough observations for a t-test
        continue

    # Default values
    mean = np.nan; se = np.nan; df = np.nan
    ci_t_low = np.nan; ci_t_high = np.nan
    boot_mean = np.nan; ci_boot_low = np.nan; ci_boot_high = np.nan
    t_stat = np.nan; p_t = np.nan
    p_boot = np.nan
    boot_bias = np.nan
    includes_zero_t = np.nan
    includes_zero_boot = np.nan

    # 1) t interval (safe: ci_mean_t validates n)
    try:
        mean, se, df, (ci_t_low, ci_t_high) = ci_mean_t(r, alpha=0.05)
    except Exception:
        mean = np.mean(r)
        se = np.std(r, ddof=1) / np.sqrt(max(1, n))
        df = n - 1
        ci_t_low, ci_t_high = (np.nan, np.nan)

    # 2) Bootstrap percentile CI (reproducible seed)
    try:
        # also return the replicates; they are needed for the bias and the p-value
        _, (ci_boot_low, ci_boot_high), boots = bootstrap_ci_mean(r, n_boot=3000, random_state=42, return_boots=True)
        # bootstrap_ci_mean returns the sample mean as its first element, so the bootstrap
        # mean must come from the replicates; otherwise BootBias is identically 0
        boot_mean = boots.mean()
        boot_bias = boot_mean - mean
        # two-sided bootstrap p-value: twice the smaller tail proportion of bootstrap means around zero
        prop_le = np.mean(boots <= 0)
        prop_ge = np.mean(boots >= 0)
        p_boot = 2 * min(prop_le, prop_ge)  # two-sided
        p_boot = min(1.0, p_boot)
    except Exception:
        boots = None

    # 3) One-sample t-test (H0: mean = 0)
    try:
        t_stat, p_t = one_sample_ttest(r, mu0=0.0, alternative='two-sided')
    except Exception:
        # fallback to scipy direct
        tt_res = stats.ttest_1samp(r, popmean=0.0)
        t_stat, p_t = tt_res.statistic, tt_res.pvalue

    # 4) Flags: CI includes zero?
    includes_zero_t = (ci_t_low <= 0 <= ci_t_high) if (not np.isnan(ci_t_low) and not np.isnan(ci_t_high)) else np.nan
    includes_zero_boot = (ci_boot_low <= 0 <= ci_boot_high) if (not np.isnan(ci_boot_low) and not np.isnan(ci_boot_high)) else np.nan

    results.append({
        'Company': c,
        'n': n,
        'MeanReturn': mean,
        'SE': se,
        'df': df,
        'CI_t_low': ci_t_low,
        'CI_t_high': ci_t_high,
        'CI_boot_low': ci_boot_low,
        'CI_boot_high': ci_boot_high,
        'BootMean': boot_mean,
        'BootBias': boot_bias,
        't_stat': t_stat,
        'p_t': p_t,
        'p_boot': p_boot,
        'IncludesZero_t': includes_zero_t,
        'IncludesZero_boot': includes_zero_boot
    })

# DataFrame
ci_df = pd.DataFrame(results).sort_values('MeanReturn', ascending=False).reset_index(drop=True)

# Apply the multiple-testing correction (FDR) to p_t and p_boot separately
mask_t = ci_df['p_t'].notna()
if mask_t.any():
    rej_t, pvals_t_corr, _, _ = multipletests(ci_df.loc[mask_t, 'p_t'].values, alpha=0.05, method='fdr_bh')
    ci_df.loc[mask_t, 'p_t_fdr'] = pvals_t_corr
    ci_df.loc[mask_t, 'signif_t_fdr'] = rej_t.astype(bool)
else:
    ci_df['p_t_fdr'] = np.nan
    ci_df['signif_t_fdr'] = False

mask_b = ci_df['p_boot'].notna()
if mask_b.any():
    rej_b, pvals_b_corr, _, _ = multipletests(ci_df.loc[mask_b, 'p_boot'].values, alpha=0.05, method='fdr_bh')
    ci_df.loc[mask_b, 'p_boot_fdr'] = pvals_b_corr
    ci_df.loc[mask_b, 'signif_boot_fdr'] = rej_b.astype(bool)
else:
    ci_df['p_boot_fdr'] = np.nan
    ci_df['signif_boot_fdr'] = False

# Save results
out_path = os.path.join(PROCESSED_DIR, 'inference_mean_CI_by_company_enhanced.csv')
ci_df.to_csv(out_path, index=False)

# Show summary
ci_df.head()


Unnamed: 0,Company,n,MeanReturn,SE,df,CI_t_low,CI_t_high,CI_boot_low,CI_boot_high,BootMean,BootBias,t_stat,p_t,p_boot,IncludesZero_t,IncludesZero_boot,p_t_fdr,signif_t_fdr,p_boot_fdr,signif_boot_fdr
0,Palantir,51,0.041075,0.034737,50,-0.028696,0.110847,-0.024158,0.114878,0.041075,0.0,1.18246,0.242616,0.206,True,True,0.401937,False,0.38875,False
1,Nvidia,83,0.037576,0.015421,82,0.006899,0.068253,0.006371,0.067632,0.037576,0.0,2.436684,0.016987,0.018,False,False,0.104925,False,0.103333,False
2,Tesla,83,0.034602,0.02075,82,-0.006678,0.075881,-0.003005,0.07565,0.034602,0.0,1.667516,0.099227,0.072667,True,True,0.29768,False,0.242222,False
3,Broadcom,83,0.029832,0.010139,82,0.009662,0.050002,0.009998,0.05004,0.029832,0.0,2.942288,0.004235,0.003333,False,False,0.068911,False,0.04,True
4,Fortinet,83,0.028135,0.011997,82,0.004268,0.052001,0.003607,0.051443,0.028135,0.0,2.345041,0.021442,0.020667,False,False,0.107211,False,0.103333,False


Many finance studies test whether the mean return is significantly different from zero.
We run a one-sample t-test for each company and store the test statistic and p-values.
We then apply multiple-comparison corrections (Bonferroni and FDR).
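
To make the difference between the two corrections concrete, here is a minimal sketch on five toy p-values. The values in `pvals` are hypothetical and were chosen only so that the two methods disagree; they are not results from this notebook.

```python
import numpy as np
from statsmodels.stats.multitest import multipletests

# Toy p-values (hypothetical, not taken from the notebook's results)
pvals = np.array([0.001, 0.008, 0.020, 0.030, 0.300])
m = len(pvals)

rej_bonf, p_bonf, _, _ = multipletests(pvals, alpha=0.05, method='bonferroni')
rej_fdr, p_fdr, _, _ = multipletests(pvals, alpha=0.05, method='fdr_bh')

# Bonferroni multiplies every p-value by the number of tests m (capped at 1).
# Benjamini-Hochberg scales the i-th smallest p-value by m / i and then
# enforces monotonicity, so it is never more conservative than Bonferroni.
for p, pb, pf in zip(pvals, p_bonf, p_fdr):
    print(f"raw={p:.3f}  bonferroni={pb:.3f}  fdr_bh={pf:.3f}")

print("rejections (Bonferroni):", int(rej_bonf.sum()))
print("rejections (FDR-BH):    ", int(rej_fdr.sum()))
```

With these toy values Bonferroni rejects two hypotheses and Benjamini-Hochberg rejects four, which illustrates why FDR control is usually the less conservative choice when many companies are tested at once.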


In [None]:
# CELL 5


# -------------------------------------------------------
# CLASSICAL SUMMARY: one-sample t-test per company (μ = 0)
# Complements the full inference stored in ci_df
# -------------------------------------------------------

tt_results = []

for c in sorted(panel_df['Company'].unique()):
    r = panel_df.loc[panel_df['Company'] == c, 'Return'].dropna().values
    n = len(r)
    if n < 2:
        continue

    mean = r.mean()
    sd = r.std(ddof=1)

    # t-test through the helper, with a direct SciPy call as fallback
    try:
        tstat, pval = one_sample_ttest(r, mu0=0.0, alternative='two-sided')
    except Exception:
        res = stats.ttest_1samp(r, popmean=0.0)
        tstat, pval = res.statistic, res.pvalue

    # Effect size (one-sample Cohen's d = mean / sd)
    cohens_d = mean / sd if sd > 0 else np.nan

    tt_results.append({
        'Company': c,
        'n': n,
        'mean': mean,
        'sd': sd,
        'tstat': tstat,
        'p_t': pval,
        'cohens_d': cohens_d
    })

tt_df = (
    pd.DataFrame(tt_results)
      .sort_values('p_t')
      .reset_index(drop=True)
)

# -------------------------------------------------------
# Multiple-comparison corrections
# -------------------------------------------------------

mask = tt_df['p_t'].notna()

if mask.any():
    rej_bonf, p_bonf, _, _ = multipletests(tt_df.loc[mask, 'p_t'], alpha=0.05, method='bonferroni')
    rej_fdr,  p_fdr,  _, _ = multipletests(tt_df.loc[mask, 'p_t'], alpha=0.05, method='fdr_bh')

    tt_df.loc[mask, 'p_bonf'] = p_bonf
    tt_df.loc[mask, 'reject_bonf'] = rej_bonf.astype(bool)
    tt_df.loc[mask, 'p_fdr'] = p_fdr
    tt_df.loc[mask, 'reject_fdr'] = rej_fdr.astype(bool)
else:
    tt_df['p_bonf'] = np.nan
    tt_df['reject_bonf'] = False
    tt_df['p_fdr'] = np.nan
    tt_df['reject_fdr'] = False

# Save
out_path = os.path.join(PROCESSED_DIR, 'one_sample_ttest_results_classical.csv')
tt_df.to_csv(out_path, index=False)

tt_df.head(10)


Unnamed: 0,Company,n,mean,sd,tstat,p_t,cohens_d,p_bonf,reject_bonf,p_fdr,reject_fdr
0,Broadcom,83,0.029832,0.092371,2.942288,0.004235,0.322958,0.127044,False,0.068911,False
1,Microsoft,83,0.018964,0.059285,2.914284,0.004594,0.319884,0.137822,False,0.068911,False
2,ServiceNow,83,0.023737,0.078987,2.737873,0.007582,0.300521,0.227466,False,0.075822,False
3,Nvidia,83,0.037576,0.14049,2.436684,0.016987,0.267461,0.509623,False,0.104925,False
4,Apple,83,0.022371,0.08403,2.425393,0.017488,0.266222,0.524625,False,0.104925,False
5,Fortinet,83,0.028135,0.109302,2.345041,0.021442,0.257402,0.643266,False,0.107211,False
6,Taiwan Semiconductor,83,0.020067,0.093745,1.950126,0.054578,0.214054,1.0,False,0.233907,False
7,Oracle,83,0.015425,0.07802,1.801174,0.075352,0.197704,1.0,False,0.262313,False
8,Alphabet,83,0.01419,0.072604,1.780526,0.078694,0.195438,1.0,False,0.262313,False
9,Tesla,83,0.034602,0.189046,1.667516,0.099227,0.183034,1.0,False,0.29768,False


- If the adjusted p-value is below alpha, we reject H0: μ = 0 and conclude that the mean return is significantly different from 0.
- Always report: mean, t-stat, p, adjusted p, CI.
- In the discussion: comment on the effect size (mean return and Cohen's d) and its economic relevance, not only on the p-value.
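
One way to put effect sizes in context is a power calculation for the one-sample t-test. The sketch below uses `TTestPower` from statsmodels; the effect size `d = 0.30` is a hypothetical value of the same order as the largest Cohen's d in the table above, and `n_months = 83` matches the sample size of most companies.

```python
import numpy as np
from statsmodels.stats.power import TTestPower

analysis = TTestPower()

# Power of a two-sided one-sample t-test at alpha = 0.05 for a given
# standardized effect (Cohen's d = mean / sd) and number of monthly observations
d = 0.30          # hypothetical effect size (assumption for this example)
n_months = 83     # observations per company for most of the panel
power = analysis.power(effect_size=d, nobs=n_months, alpha=0.05, alternative='two-sided')

# Number of observations needed to reach 80% power for the same effect
n_needed = float(np.squeeze(
    analysis.solve_power(effect_size=d, alpha=0.05, power=0.80, alternative='two-sided')
))

print(f"power at n={n_months}: {power:.2f}")
print(f"n needed for 80% power: {n_needed:.1f}")
```

A low power for realistic effect sizes would suggest that non-rejections after correction reflect limited sample size rather than evidence that mean returns are zero.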


We define groups based on Beta (from the aggregated dataset).  
We run: Levene (variances), Welch t-test (means), Mann-Whitney (non-parametric) and a permutation test (robust).


In [None]:
# CELL 6


# Compare companies by Beta (low vs high) with tests and effect sizes
import numpy as np
import pandas as pd
from statsmodels.stats.multitest import multipletests
from statsmodels.stats.api import DescrStatsW, CompareMeans
import scipy.stats as stats

def hedges_g(x, y):
    """Hedges' g (Cohen's d with a small-sample bias correction)."""
    x = np.asarray(x)
    y = np.asarray(y)
    nx, ny = len(x), len(y)
    if nx < 2 or ny < 2:
        return np.nan
    sx2 = x.var(ddof=1)
    sy2 = y.var(ddof=1)
    pooled_sd = np.sqrt(((nx-1)*sx2 + (ny-1)*sy2) / (nx+ny-2))
    if pooled_sd == 0:
        return 0.0
    d = (x.mean() - y.mean()) / pooled_sd
    # correction factor J
    J = 1 - (3 / (4*(nx+ny) - 9))
    return d * J

def bootstrap_diff_mean_ci(x, y, n_boot=5000, alpha=0.05, random_state=42):
    """Percentile bootstrap CI and two-sided p-value for mean(x) - mean(y)."""
    rng = np.random.default_rng(random_state)
    x = np.asarray(x)
    y = np.asarray(y)
    nx, ny = len(x), len(y)
    obs_diff = np.mean(x) - np.mean(y)
    boots = np.empty(n_boot)
    for i in range(n_boot):
        sx = rng.choice(x, size=nx, replace=True)
        sy = rng.choice(y, size=ny, replace=True)
        boots[i] = sx.mean() - sy.mean()
    lo = np.percentile(boots, 100*(alpha/2))
    hi = np.percentile(boots, 100*(1-alpha/2))
    # The replicates are centred on obs_diff, not on 0, so shift them to
    # approximate the null distribution before comparing against |obs_diff|
    p_two = np.mean(np.abs(boots - obs_diff) >= np.abs(obs_diff))
    return (lo, hi), p_two, boots

# ------------- robust lookup of the beta column (accepts 'beta' or 'Beta')
beta_col = None
for col in ['beta', 'Beta']:
    if col in agg_df.columns:
        beta_col = col
        break
if beta_col is None:
    raise KeyError("No se encontró la columna 'beta' o 'Beta' en agg_df. Revisa nombres de columnas.")

# Coerce to numeric (non-numeric values become NaN and are excluded from both groups below)
agg_df[beta_col] = pd.to_numeric(agg_df[beta_col], errors='coerce')

# Map beta onto the panel
beta_map = agg_df.set_index('Company')[beta_col].to_dict()
panel_df['Beta'] = panel_df['Company'].map(beta_map)

# Create the group (low <= 1, high > 1); the threshold could be replaced by quantiles
panel_df['Beta_group'] = panel_df['Beta'].apply(lambda x: 'low' if pd.notna(x) and x <= 1 else ('high' if pd.notna(x) else np.nan))

# Use the company-level MeanReturn from agg_df (recommended) instead of recomputing it
if 'MeanReturn' not in agg_df.columns:
    raise KeyError("agg_df no tiene columna 'MeanReturn'. Asegúrate de que exista o calcula la media por empresa primero.")

groupA = agg_df.loc[agg_df[beta_col] <= 1, 'MeanReturn'].dropna()
groupB = agg_df.loc[agg_df[beta_col] > 1, 'MeanReturn'].dropna()

res = {}
res['n_groupA'] = len(groupA)
res['n_groupB'] = len(groupB)

# Descriptive statistics
def summary_stats(arr):
    return {
        'n': len(arr),
        'mean': float(np.mean(arr)) if len(arr)>0 else np.nan,
        'sd': float(np.std(arr, ddof=1)) if len(arr)>1 else np.nan,
        'median': float(np.median(arr)) if len(arr)>0 else np.nan
    }

res['desc_groupA'] = summary_stats(groupA.values)
res['desc_groupB'] = summary_stats(groupB.values)

# 1) Levene (variances)
if len(groupA) >= 2 and len(groupB) >= 2:
    levene_stat, levene_p = stats.levene(groupA.values, groupB.values)
else:
    levene_stat, levene_p = np.nan, np.nan
res['levene_stat'] = levene_stat
res['levene_p'] = levene_p

# 2) Normality of the company-level means (Shapiro)
# Used as a diagnostic: apply Shapiro when a group has at least 3 companies; otherwise skip
res['shapiro_groupA'] = (np.nan, np.nan)
res['shapiro_groupB'] = (np.nan, np.nan)
if len(groupA) >= 3:
    try:
        res['shapiro_groupA'] = stats.shapiro(groupA.values)
    except Exception:
        res['shapiro_groupA'] = lilliefors(groupA.values)
if len(groupB) >= 3:
    try:
        res['shapiro_groupB'] = stats.shapiro(groupB.values)
    except Exception:
        res['shapiro_groupB'] = lilliefors(groupB.values)

# 3) Welch t-test (difference in means) + CI via CompareMeans
if len(groupA) >= 2 and len(groupB) >= 2:
    t_res = stats.ttest_ind(groupA.values, groupB.values, equal_var=False)
    res['welch_t_stat'] = float(t_res.statistic)
    res['welch_p'] = float(t_res.pvalue)

    # CI of the difference (Welch) using CompareMeans
    try:
        dsA = DescrStatsW(groupA.values)
        dsB = DescrStatsW(groupB.values)
        cm = CompareMeans(dsA, dsB)
        ci_welch = cm.tconfint_diff(usevar='unequal')  # returns (low, high)
        res['welch_CI'] = (float(ci_welch[0]), float(ci_welch[1]))
    except Exception:
        res['welch_CI'] = (np.nan, np.nan)
else:
    res['welch_t_stat'] = np.nan
    res['welch_p'] = np.nan
    res['welch_CI'] = (np.nan, np.nan)

# 4) Mann-Whitney (rank-based); does not assume normality
if len(groupA) >= 1 and len(groupB) >= 1:
    try:
        mw_stat, mw_p = stats.mannwhitneyu(groupA.values, groupB.values, alternative='two-sided')
        res['mw_stat'] = float(mw_stat)
        res['mw_p'] = float(mw_p)
    except Exception:
        res['mw_stat'], res['mw_p'] = np.nan, np.nan
else:
    res['mw_stat'], res['mw_p'] = np.nan, np.nan

# 5) Permutation test (permutation_test_diff_means is defined in cell 2)
if len(groupA) >= 1 and len(groupB) >= 1:
    try:
        # return_null_dist=True is required to get the third return value; without it
        # the 3-way unpacking raises and the except branch silently stores NaN
        diff_obs, pperm, perm_dist = permutation_test_diff_means(groupA.values, groupB.values, n_perm=5000, random_state=42, return_null_dist=True)
        res['perm_diff_obs'] = float(diff_obs)
        res['perm_p'] = float(pperm)
    except Exception:
        res['perm_diff_obs'], res['perm_p'] = np.nan, np.nan
else:
    res['perm_diff_obs'], res['perm_p'] = np.nan, np.nan

# 6) Bootstrap CI for the difference in means
if len(groupA) >= 1 and len(groupB) >= 1:
    try:
        (boot_lo, boot_hi), p_boot_diff, boots_diff = bootstrap_diff_mean_ci(groupA.values, groupB.values, n_boot=5000, random_state=42)
        res['boot_diff_CI'] = (float(boot_lo), float(boot_hi))
        res['boot_diff_p'] = float(p_boot_diff)
    except Exception:
        res['boot_diff_CI'] = (np.nan, np.nan)
        res['boot_diff_p'] = np.nan
else:
    res['boot_diff_CI'] = (np.nan, np.nan)
    res['boot_diff_p'] = np.nan

# 7) Effect size (Hedges' g)
res['hedges_g'] = float(hedges_g(groupA.values, groupB.values)) if (len(groupA)>=2 and len(groupB)>=2) else np.nan

# 8) Store the results in a DataFrame and a summary CSV
out_df = pd.DataFrame([{
    'n_groupA': res['n_groupA'],
    'n_groupB': res['n_groupB'],
    'mean_A': res['desc_groupA']['mean'],
    'sd_A': res['desc_groupA']['sd'],
    'mean_B': res['desc_groupB']['mean'],
    'sd_B': res['desc_groupB']['sd'],
    'levene_p': res['levene_p'],
    'welch_t': res['welch_t_stat'],
    'welch_p': res['welch_p'],
    'welch_CI_low': res['welch_CI'][0],
    'welch_CI_high': res['welch_CI'][1],
    'mw_p': res['mw_p'],
    'perm_p': res['perm_p'],
    'boot_diff_CI_low': res['boot_diff_CI'][0],
    'boot_diff_CI_high': res['boot_diff_CI'][1],
    'boot_diff_p': res['boot_diff_p'],
    'hedges_g': res['hedges_g']
}])

out_path = os.path.join(PROCESSED_DIR, 'group_comparison_beta_low_high_summary.csv')
out_df.to_csv(out_path, index=False)

# Display the results
out_df.T


Unnamed: 0,0
n_groupA,16.0
n_groupB,14.0
mean_A,0.01094
sd_A,0.01131
mean_B,0.020454
sd_B,0.010674
levene_p,0.917626
welch_t,-2.368676
welch_p,0.025036
welch_CI_low,-0.017744


To compare volatilities between two sets of companies (e.g. established vs growth; below, Beta ≤ 1 vs Beta > 1):
- use Levene (robust to non-normality)
- report the statistic and p-value
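
One caveat worth keeping in mind: when Levene is applied to one volatility value per company, it tests whether the spread of those volatilities differs between groups, not whether one group is more volatile on average. The sketch below illustrates this with made-up numbers (`vol_a` and `vol_b` are hypothetical, built so that both groups have identical spread but different levels) and contrasts Levene with a location test.

```python
import numpy as np
from scipy import stats

# Hypothetical per-company volatilities: group B is group A shifted up by 0.05,
# so the two groups have the same spread but different levels
vol_a = np.linspace(0.04, 0.08, 15)
vol_b = vol_a + 0.05

# Levene with center='median' (Brown-Forsythe) compares dispersion around each group's centre
lev_stat, lev_p = stats.levene(vol_a, vol_b, center='median')

# Mann-Whitney compares location (whether one group tends to have larger values)
mw_stat, mw_p = stats.mannwhitneyu(vol_a, vol_b, alternative='two-sided')

print(f"Levene p = {lev_p:.3f}")        # same spread: no evidence against equal variances
print(f"Mann-Whitney p = {mw_p:.2e}")   # different levels: very small p-value
```

If the question is whether high-beta companies are more volatile, a location test on `Volatility` (Welch or Mann-Whitney) may be the more direct tool, with Levene kept as a check on the dispersion.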


In [None]:
# CELL 7

# =========================
# VOLATILITY COMPARISON BY BETA
# =========================

import numpy as np
from scipy import stats

# Make sure the variables are numeric
agg_df['Beta'] = pd.to_numeric(agg_df['Beta'], errors='coerce')
agg_df['Volatility'] = pd.to_numeric(agg_df['Volatility'], errors='coerce')

# Define the groups by Beta
volA = agg_df.loc[agg_df['Beta'] <= 1, 'Volatility'].dropna()
volB = agg_df.loc[agg_df['Beta'] > 1, 'Volatility'].dropna()

print(f"n volA (Beta ≤ 1): {len(volA)}")
print(f"n volB (Beta > 1): {len(volB)}")

# Levene test (robust to non-normality)
if len(volA) >= 2 and len(volB) >= 2:
    lev_stat_vol, lev_p_vol = stats.levene(
        volA.values,
        volB.values,
        center='median'
    )
else:
    lev_stat_vol, lev_p_vol = np.nan, np.nan

print("Levene statistic:", lev_stat_vol)
print("Levene p-value:", lev_p_vol)


n volA (Beta ≤ 1): 16
n volB (Beta > 1): 14
Levene statistic: 4.369508336217758
Levene p-value: 0.04578807097830151


We implement a bootstrap to estimate the empirical distribution of the difference in means between groups.
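
As an independent cross-check of a hand-written implementation, SciPy ships a generic `scipy.stats.bootstrap` that can resample two samples independently. The sketch below is an illustration on synthetic data: `group_low`, `group_high` and `diff_means` are hypothetical names, and the simulated values are not the notebook's data. No seed is passed to `bootstrap`, so the interval varies slightly between runs.

```python
import numpy as np
from scipy import stats

rng = np.random.default_rng(1)
# Hypothetical company-level mean returns for two groups (synthetic, for illustration)
group_low = rng.normal(loc=0.011, scale=0.011, size=16)
group_high = rng.normal(loc=0.020, scale=0.011, size=14)

def diff_means(x, y, axis=-1):
    """Difference in means along `axis`, as needed for a vectorized statistic."""
    return np.mean(x, axis=axis) - np.mean(y, axis=axis)

res = stats.bootstrap(
    (group_low, group_high),   # each sample is resampled independently
    diff_means,
    n_resamples=2000,
    vectorized=True,
    confidence_level=0.95,
    method='percentile',
)
ci = res.confidence_interval
print("observed diff:", diff_means(group_low, group_high))
print("95% percentile CI:", (ci.low, ci.high))
```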


In [None]:
# CELL 8

import numpy as np
import warnings
from scipy import stats

def _jackknife_theta(x1, x2, func=np.mean):
    """Return the array of leave-one-out jackknife estimates of func(x1) - func(x2)."""
    x1 = np.asarray(x1)
    x2 = np.asarray(x2)
    n1, n2 = len(x1), len(x2)
    thetas = []
    # Leave one observation out of each sample in turn (first x1, then x2) and
    # stack the estimates; they feed the BCa acceleration for the difference
    for i in range(n1):
        th1 = func(np.delete(x1, i))
        th2 = func(x2) if n2>0 else 0.0
        thetas.append(th1 - th2)
    for j in range(n2):
        th1 = func(x1) if n1>0 else 0.0
        th2 = func(np.delete(x2, j))
        thetas.append(th1 - th2)
    return np.asarray(thetas)

def bootstrap_diff_means(x1, x2, n_boot=5000, alpha=0.05, random_state=None,
                         method='percentile', return_dist=False, return_all=False):
    """
    Bootstrap for the difference in means (x1.mean() - x2.mean()).
    
    Parameters:
    - x1, x2: array-like or pandas.Series. dropna() is applied internally.
    - n_boot: number of bootstrap replicates.
    - alpha: significance level (0.05 -> 95% CI).
    - random_state: seed for reproducibility.
    - method: 'percentile' (default) or 'bca' (BCa bootstrap).
    - return_dist: if True, also return the array of bootstrap replicates.
    - return_all: if True, return every computed quantity (see below);
                  if False (default), return (ci, p_boot), where ci depends on `method`.
    
    Returns:
    - if return_all: (boots, ci_percentile, ci_bca_or_none, p_boot, p_boot_alt, obs_diff)
    - elif return_dist: (boots, ci, p_boot)
    - else: (ci, p_boot)
    
    Notes:
    - BCa requires a jackknife computation; it can be unstable with very small samples.
    - If method='bca' and the BCa computation fails, ci falls back to the percentile interval.
    """
    rng = np.random.default_rng(random_state)
    x1 = np.asarray(x1.dropna()) if hasattr(x1, "dropna") else np.asarray(x1)
    x2 = np.asarray(x2.dropna()) if hasattr(x2, "dropna") else np.asarray(x2)
    n1, n2 = len(x1), len(x2)
    if n1 < 1 or n2 < 1:
        raise ValueError("bootstrap_diff_means: ambas muestras deben tener al menos 1 observación")

    # Observed difference
    obs_diff = float(np.mean(x1) - np.mean(x2))

    # Bootstrap distribution of differences
    boots = np.empty(n_boot, dtype=float)
    for i in range(n_boot):
        s1 = rng.choice(x1, size=n1, replace=True)
        s2 = rng.choice(x2, size=n2, replace=True)
        boots[i] = s1.mean() - s2.mean()

    # Percentile CI
    lo_p, hi_p = np.percentile(boots, [100*(alpha/2), 100*(1-alpha/2)])
    ci_percentile = (float(lo_p), float(hi_p))

    # two-sided bootstrap p-value: twice the smaller tail proportion around zero
    prop_le = np.mean(boots <= 0)
    prop_ge = np.mean(boots >= 0)
    p_boot = float(min(1.0, 2 * min(prop_le, prop_ge)))

    # Alternative p-value: shift the replicates by obs_diff so they are centred on 0
    # (approximating the null), then compare against |obs_diff|
    p_boot_alt = float(np.mean(np.abs(boots - obs_diff) >= np.abs(obs_diff)))

    # BCa calculation (optional)
    ci_bca = None
    if method == 'bca':
        # Jackknife estimates for acceleration
        try:
            jack = _jackknife_theta(x1, x2, func=np.mean)
            jack_mean = np.mean(jack)
            numer = np.sum((jack_mean - jack)**3)
            denom = 6.0 * (np.sum((jack_mean - jack)**2) ** 1.5)
            if denom == 0:
                a = 0.0
            else:
                a = numer / denom
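            # Bias-correction z0; the proportion is clipped away from 0 and 1 so ppf stays finite
            prop_below = np.clip(np.mean(boots < obs_diff), 1.0 / (n_boot + 1), n_boot / (n_boot + 1.0))
            z0 = stats.norm.ppf(prop_below)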
            z_lo = stats.norm.ppf(alpha/2)
            z_hi = stats.norm.ppf(1 - alpha/2)
            # adjusted quantiles
            def _adj_quantile(z):
                return stats.norm.cdf(z0 + (z + z0) / (1 - a * (z + z0)))
            ql = _adj_quantile(z_lo)
            qh = _adj_quantile(z_hi)
            # Guards: keep the adjusted quantiles inside [0, 1]
            ql = np.clip(ql, 0.0, 1.0)
            qh = np.clip(qh, 0.0, 1.0)
            lo_bca = np.percentile(boots, 100 * ql)
            hi_bca = np.percentile(boots, 100 * qh)
            ci_bca = (float(lo_bca), float(hi_bca))
        except Exception as e:
            warnings.warn(f"BCa computation failed or is unstable: {e}. Returning None for ci_bca.")
            ci_bca = None

    # Build the output; with method='bca' the percentile CI is the fallback if BCa failed
    ci = ci_bca if (method == 'bca' and ci_bca is not None) else ci_percentile
    if return_all:
        return boots, ci_percentile, ci_bca, p_boot, p_boot_alt, obs_diff
    if return_dist:
        return boots, ci, p_boot
    return ci, p_boot


For each company we estimate:
\(R_{i,t} = \alpha_i + \beta_i R_{m,t} + \varepsilon_{i,t}\)
- We report the beta coefficient, its standard error, t-statistic, and p-value
- Diagnostics: residuals, normality, heteroskedasticity
- In addition: inference on beta with heteroskedasticity-robust (HC) standard errors


In [None]:
# CELDA 8

# Assumes volA / volB (or groupA / groupB) are already defined as Series
x1 = volA.dropna()   # or groupA
x2 = volB.dropna()   # or groupB

# Bootstrap parameters
NBOOT = 5000
ALPHA = 0.05
SEED = 42

# Run: get the bootstrap distribution, the percentile CI, and the p-value
boots, ci_used, p_boot = bootstrap_diff_means(x1, x2, n_boot=NBOOT,
                                             alpha=ALPHA, random_state=SEED,
                                             method='percentile', return_dist=True)

# Alternatively, also request the BCa interval (same seed, so the replicates are identical)
boots_b, ci_percentile, ci_bca, p_boot1, p_boot2, obs_diff = bootstrap_diff_means(
    x1, x2, n_boot=NBOOT, alpha=ALPHA, random_state=SEED, method='bca', return_all=True
)

# Show results (pick whichever you prefer)
print("Observada (x1.mean - x2.mean):", obs_diff)
print("IC 95% (percentile):", ci_percentile)
print("IC 95% (BCa):", ci_bca)
print("Bootstrap p-value (simétrico):", p_boot1)
print("Bootstrap p-value (abs rule):", p_boot2)

# Save the bootstrap distribution (optional) and a summary
import os
np.save(os.path.join(PROCESSED_DIR, 'boots_diff_means.npy'), boots)

summary_boot = {
    'obs_diff': float(obs_diff),
    'ci_percentile_low': float(ci_percentile[0]),
    'ci_percentile_high': float(ci_percentile[1]),
    'ci_bca_low': (float(ci_bca[0]) if ci_bca is not None else None),
    'ci_bca_high': (float(ci_bca[1]) if ci_bca is not None else None),
    'p_boot_sym': float(p_boot1),
    'p_boot_absrule': float(p_boot2),
    'n_boot': int(NBOOT)
}

import json
with open(os.path.join(PROCESSED_DIR, 'bootstrap_diff_summary.json'), 'w') as f:
    json.dump(summary_boot, f, indent=4)

print("✅ Bootstrap realizado y guardado. Archivo: boots_diff_means.npy y bootstrap_diff_summary.json")


Observada (x1.mean - x2.mean): -0.0428639526219806
IC 95% (percentile): (-0.07011851699856504, -0.01934058117727427)
IC 95% (BCa): (-0.0742913165215815, -0.021518099695994664)
Bootstrap p-value (simétrico): 0.0
Bootstrap p-value (abs rule): 0.4874
✅ Bootstrap realizado y guardado. Archivo: boots_diff_means.npy y bootstrap_diff_summary.json


In [None]:
# CELDA 9

import yfinance as yf
import numpy as np
import pandas as pd

def get_monthly_market_returns(
    ticker="QQQ",
    start="2018-01-01",
    end="2024-12-31"
):
    """
    Download market prices and compute monthly log returns.

    Parameters
    ----------
    ticker : str
        ETF used as the market proxy (default: QQQ, a technology-heavy Nasdaq-100 ETF)
    start : str
        Start date (YYYY-MM-DD)
    end : str
        End date (YYYY-MM-DD); yfinance treats it as exclusive

    Returns
    -------
    pd.DataFrame
        DataFrame with columns 'Date' (month-end, as YYYY-MM-DD strings) and 'MarketReturn'
    """

    # Download data
    market = yf.download(
        ticker,
        start=start,
        end=end,
        auto_adjust=True,
        progress=False
    )

    if market.empty:
        raise ValueError("No market data was downloaded.")

    # Explicitly select the closing price
    if "Close" in market.columns:
        price = market["Close"]
    else:
        price = market.iloc[:, 0]

    # Some yfinance versions return MultiIndex columns, so "Close" can be a
    # one-column DataFrame; reduce it to a Series before resampling
    if isinstance(price, pd.DataFrame):
        price = price.iloc[:, 0]

    price.index = pd.to_datetime(price.index)

    # Month-end price (the "ME" alias requires pandas >= 2.2)
    monthly_price = price.resample("ME").last()

    if monthly_price.isna().all():
        raise ValueError("Monthly series is empty after resampling.")

    # Monthly log returns
    monthly_return = np.log(monthly_price / monthly_price.shift(1)).dropna()

    # Build the final DataFrame
    market_df = pd.DataFrame({
        "Date": monthly_return.index,
        "MarketReturn": monthly_return.values
    })

    # Date format consistent with panel_df
    market_df["Date"] = market_df["Date"].dt.strftime("%Y-%m-%d")

    return market_df


We compute post-hoc power for the two-sample comparison of means and, where appropriate, a priori power to detect a minimum effect size of interest.
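
The a priori side can be sketched with `TTestIndPower.solve_power`, which solves for whichever argument is left as `None`. The target effect size (d = 0.5), the 80% power target, and the equal group sizes below are hypothetical choices for illustration, not values taken from the analysis.

```python
import numpy as np
from statsmodels.stats.power import TTestIndPower

analysis = TTestIndPower()
target_d = 0.5        # hypothetical minimum effect size of interest (Cohen's d)
target_power = 0.80   # hypothetical power target

# Leave nobs1 as None so solve_power returns the required size of group 1
n_per_group = float(np.squeeze(analysis.solve_power(
    effect_size=target_d, nobs1=None, alpha=0.05,
    power=target_power, ratio=1.0, alternative="two-sided"
)))

# Sanity check: plugging the solved size back in should recover the target power
achieved = analysis.power(effect_size=target_d, nobs1=n_per_group,
                          alpha=0.05, ratio=1.0)

print(f"Required n per group: {np.ceil(n_per_group):.0f}")
print(f"Power at that n: {achieved:.3f}")
```

Unlike post-hoc power, which is computed from the observed effect, this calculation fixes the effect size before looking at the data.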


In [None]:
# CELDA 10

from statsmodels.stats.power import TTestIndPower
import numpy as np

def compute_power_two_sample(n1, n2, effect_size, alpha=0.05):
    """
    Compute the power of a two-sample independent t-test
    for a given effect size (Cohen's d).

    Parameters
    ----------
    n1, n2 : int
        Sample sizes
    effect_size : float
        Effect size (Cohen's d)
    alpha : float
        Significance level

    Returns
    -------
    power : float
        Estimated power of the test
    """
    analysis = TTestIndPower()
    power = analysis.power(effect_size=effect_size,
                           nobs1=n1,
                           alpha=alpha,
                           ratio=n2/n1)
    return power


# ----------------------------
# Observed Cohen's d
# ----------------------------
if len(groupA) < 2 or len(groupB) < 2:
    raise ValueError("Both groups must have at least 2 observations.")

mean1, mean2 = groupA.mean(), groupB.mean()
sd1, sd2 = groupA.std(ddof=1), groupB.std(ddof=1)
n1, n2 = len(groupA), len(groupB)

# Pooled SD, weighting each group's variance by its degrees of freedom
pooled_sd = np.sqrt(((n1 - 1)*sd1**2 + (n2 - 1)*sd2**2) / (n1 + n2 - 2))
cohen_d = (mean1 - mean2) / pooled_sd

# Observed (post-hoc) power
power_obs = compute_power_two_sample(n1, n2, abs(cohen_d), alpha=0.05)

print(f"Cohen's d observado: {cohen_d:.4f}")
print(f"Potencia estimada del test: {power_obs:.4f}")


Cohen's d observado: -0.8634
Potencia estimada del test: 0.6247


We save the main tables:
- ci_df (confidence intervals)
- tt_df (one-sample t-tests)
- capm_df (CAPM regression)
- group comparison results
An example report text with the interpretation of the results is also included.


In [None]:
# CELDA 11

import statsmodels.api as sm
from statsmodels.stats.diagnostic import het_breuschpagan
from statsmodels.stats.stattools import durbin_watson

# -----------------------------
# 1) Build the market return (equal-weighted cross-sectional mean of the panel)
# -----------------------------
market_returns = (
    panel_df.groupby("Date")["Return"]
    .mean()
    .dropna()
    .to_frame(name="MarketReturn")
)
market_returns.index = market_returns.index.astype(str)

print(f"✓ Retornos de mercado construidos: {market_returns.shape}")

# -----------------------------
# 2) CAPM function with robust standard errors
# -----------------------------
def estimate_capm(company, panel_df, market_returns):
    """
    Estimate the CAPM for a given company by OLS with HC1 robust standard errors.

    Parameters
    ----------
    company : str
        Company name
    panel_df : pd.DataFrame
        Returns panel with columns ['Company', 'Date', 'Return']
    market_returns : pd.DataFrame
        Market returns indexed by date (as strings), with column 'MarketReturn'

    Returns
    -------
    dict or None
        CAPM results with keys Company, n_obs, alpha, beta, se_beta, t_beta,
        p_beta, R2, DW, bp_stat, bp_p.
        None if the company has fewer than 12 valid observations.
    """
    df_i = panel_df[panel_df["Company"] == company].copy()
    df_i["Date"] = df_i["Date"].astype(str)

    df = df_i.merge(
        market_returns,
        left_on="Date",
        right_index=True,
        how="inner"
    ).dropna(subset=["Return", "MarketReturn"])

    if len(df) < 12:  # require at least 1 year of monthly data
        return None

    X = sm.add_constant(df["MarketReturn"])
    y = df["Return"]

    model = sm.OLS(y, X).fit(cov_type="HC1")  # heteroskedasticity-robust standard errors

    resid = model.resid
    dw = durbin_watson(resid)
    bp_stat, bp_p, _, _ = het_breuschpagan(resid, model.model.exog)

    return {
        "Company": company,
        "n_obs": len(df),
        "alpha": float(model.params["const"]),
        "beta": float(model.params["MarketReturn"]),
        "se_beta": float(model.bse["MarketReturn"]),
        "t_beta": float(model.tvalues["MarketReturn"]),
        "p_beta": float(model.pvalues["MarketReturn"]),
        "R2": float(model.rsquared),
        "DW": float(dw),
        "bp_stat": float(bp_stat),
        "bp_p": float(bp_p)
    }

# -----------------------------
# 3) Run the CAPM for every company
# -----------------------------
companies = sorted(panel_df["Company"].unique())
capm_results = []

for company in companies:
    res = estimate_capm(company, panel_df, market_returns)
    if res is not None:
        capm_results.append(res)

capm_df = (
    pd.DataFrame(capm_results)
    .sort_values("beta", ascending=False)
    .reset_index(drop=True)
)

print(f"✓ CAPM estimado para {capm_df.shape[0]} empresas")
capm_df.head()


✓ Retornos de mercado construidos: (83, 1)
✓ CAPM estimado para 30 empresas


Unnamed: 0,Company,n_obs,alpha,beta,se_beta,t_beta,p_beta,R2,DW,bp_stat,bp_p
0,Palantir,51,0.010507,2.273104,0.505728,4.494719,6.966175e-06,0.407508,2.487115,3.766133,0.0523
1,Tesla,83,0.00811,1.738543,0.269782,6.444243,1.161786e-10,0.36447,1.787697,0.002214,0.962471
2,Nvidia,83,0.012835,1.623679,0.158795,10.225022,1.531939e-24,0.575616,1.708348,0.140266,0.708017
3,Cloudflare,63,0.00062,1.507436,0.280202,5.379817,7.456147e-08,0.317665,2.030644,0.127699,0.72083
4,Spotify,80,-0.011249,1.504924,0.14597,10.309825,6.361711e-25,0.559768,2.001168,0.036438,0.848613


In [None]:
# CELDA 12

# =========================
# SAVE FINAL RESULTS
# =========================

import json
from pathlib import Path

processed_path = Path(PROCESSED_DIR)
processed_path.mkdir(parents=True, exist_ok=True)

# ---------- Minimal checks ----------
required_vars = ["ci_df", "tt_df", "capm_df", "out_df"]
missing = [v for v in required_vars if v not in globals()]
if missing:
    raise NameError(f"Faltan variables necesarias: {missing}")

# ---------- Save CSVs ----------
ci_df.to_csv(processed_path / "ci_mean_by_company.csv", index=False)
tt_df.to_csv(processed_path / "one_sample_ttests_by_company.csv", index=False)
capm_df.to_csv(processed_path / "capm_by_company.csv", index=False)
out_df.to_csv(processed_path / "group_beta_comparison_summary.csv", index=False)

# ---------- Save JSON summary ----------
if not out_df.empty:
    row = out_df.iloc[0]

    group_summary = {
        "groupA_n": int(row.get("n_groupA", 0)),
        "groupA_mean": float(row.get("mean_A", np.nan)),
        "groupA_sd": float(row.get("sd_A", np.nan)),

        "groupB_n": int(row.get("n_groupB", 0)),
        "groupB_mean": float(row.get("mean_B", np.nan)),
        "groupB_sd": float(row.get("sd_B", np.nan)),

        "welch_t": float(row.get("welch_t", np.nan)),
        "welch_p": float(row.get("welch_p", np.nan)),
        "welch_CI": [
            float(row.get("welch_CI_low", np.nan)),
            float(row.get("welch_CI_high", np.nan))
        ],

        "mw_p": float(row.get("mw_p", np.nan)),
        "perm_p": float(row.get("perm_p", np.nan)),
        "bootstrap_diff_CI": [
            float(row.get("boot_diff_CI_low", np.nan)),
            float(row.get("boot_diff_CI_high", np.nan))
        ],
        "bootstrap_p": float(row.get("boot_diff_p", np.nan)),

        "hedges_g": float(row.get("hedges_g", np.nan))
    }

    with open(processed_path / "group_beta_comparison_summary.json", "w") as f:
        json.dump(group_summary, f, indent=4)

    print("✅ Todos los resultados finales fueron guardados correctamente en /data/processed")
else:
    print("⚠️ out_df está vacío, no se guardó resumen JSON")


✅ Todos los resultados finales fueron guardados correctamente en /data/processed


### Example report (academic format)

**Confidence intervals for the mean return.** For each company we computed a 95\% confidence interval for the mean return using Student's t distribution (unknown variance) and a percentile bootstrap interval with 3000 replicates. For example, Microsoft has a mean return of \(\hat\mu = 0.0180\), with a 95\% t interval of [0.007, 0.029] and a 95\% bootstrap interval of [0.006, 0.030]. Because both intervals exclude zero, the average monthly return is positive and significantly different from zero at the 5\% level.
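
The two intervals described above can be sketched as follows. The return series is synthetic (its seed, mean, and volatility are assumptions for illustration), so the printed numbers will not match the Microsoft figures quoted in the text.

```python
import numpy as np
from scipy import stats

rng = np.random.default_rng(0)                 # hypothetical seed
returns = rng.normal(0.018, 0.05, size=84)     # synthetic monthly returns

# Student's t interval: mean +/- t_{0.975, n-1} * s / sqrt(n)
n = returns.size
mean = returns.mean()
se = returns.std(ddof=1) / np.sqrt(n)
t_crit = stats.t.ppf(0.975, df=n - 1)
ci_t = (mean - t_crit * se, mean + t_crit * se)

# Percentile bootstrap: resample with replacement, take the 2.5% and 97.5% quantiles
n_boot = 3000
idx = rng.integers(0, n, size=(n_boot, n))     # one row of resampled indices per replicate
boot_means = returns[idx].mean(axis=1)
ci_boot = tuple(np.percentile(boot_means, [2.5, 97.5]))

print("t interval:        ", ci_t)
print("bootstrap interval:", ci_boot)
```

When the returns are roughly symmetric the two intervals are usually close; a visible gap between them is a hint that the normal-theory interval may be unreliable.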

**Hypothesis tests (H0: μ=0).** A one-sample t-test was applied to each company, and the p-values were adjusted with Bonferroni (which controls the family-wise error rate) and Benjamini–Hochberg (which controls the false discovery rate). We report the companies whose FDR-adjusted p-value is below 0.05. For these companies we reject H0 and conclude that their mean return differs significantly from zero.
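
A small sketch of the two adjustments, using `multipletests` from statsmodels. The five raw p-values are hypothetical and chosen only to show how the two procedures can disagree.

```python
import numpy as np
from statsmodels.stats.multitest import multipletests

# Hypothetical raw p-values from five one-sample t-tests
pvals = np.array([0.001, 0.008, 0.020, 0.041, 0.300])

# Bonferroni: multiply each p-value by the number of tests (capped at 1)
rej_bonf, p_bonf, _, _ = multipletests(pvals, alpha=0.05, method="bonferroni")

# Benjamini-Hochberg: step-up procedure on the ranked p-values
rej_bh, p_bh, _, _ = multipletests(pvals, alpha=0.05, method="fdr_bh")

for p, pb, ph in zip(pvals, p_bonf, p_bh):
    print(f"raw={p:.3f}  bonferroni={pb:.3f}  bh={ph:.4f}")
print("Rejected (Bonferroni):", int(rej_bonf.sum()))
print("Rejected (BH):        ", int(rej_bh.sum()))
```

Benjamini–Hochberg adjusted p-values are never larger than the Bonferroni ones, so it rejects at least as many hypotheses at the same nominal level.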

**Group comparison (Beta).** Companies were split into Beta ≤ 1 and Beta > 1. We applied Levene's test for equality of variances, Welch's t-test for the difference in means, and the Mann–Whitney test as a non-parametric alternative. A permutation test was also run as a robustness check. The results show that [insert conclusion based on the results here].
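
The sequence of tests in this paragraph can be sketched on synthetic data as below. The group sizes, means, and spreads are assumptions for illustration, and the permutation test is written by hand with NumPy rather than taken from the notebook's own helper.

```python
import numpy as np
from scipy import stats

rng = np.random.default_rng(0)                   # hypothetical seed
low_beta = rng.normal(0.05, 0.02, size=12)       # synthetic values, group Beta <= 1
high_beta = rng.normal(0.09, 0.04, size=18)      # synthetic values, group Beta > 1

# Equality of variances (median-centred Levene, i.e. Brown-Forsythe)
lev_stat, lev_p = stats.levene(low_beta, high_beta, center="median")

# Difference in means without assuming equal variances (Welch)
welch_t, welch_p = stats.ttest_ind(low_beta, high_beta, equal_var=False)

# Non-parametric alternative
mw_stat, mw_p = stats.mannwhitneyu(low_beta, high_beta, alternative="two-sided")

# Permutation test on the difference in means: shuffle the group labels
obs = low_beta.mean() - high_beta.mean()
pooled = np.concatenate([low_beta, high_beta])
n_perm, count = 5000, 0
for _ in range(n_perm):
    perm = rng.permutation(pooled)
    diff = perm[:low_beta.size].mean() - perm[low_beta.size:].mean()
    count += int(abs(diff) >= abs(obs))
perm_p = (count + 1) / (n_perm + 1)              # add-one rule avoids reporting p = 0

print(f"Levene p={lev_p:.4f} | Welch p={welch_p:.4f} | "
      f"Mann-Whitney p={mw_p:.4f} | permutation p={perm_p:.4f}")
```

The add-one correction in the permutation p-value keeps it strictly positive, which avoids the "p = 0.0" artefact that a raw proportion can produce.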

**CAPM regression.** For each company we estimated \(R_{i,t} = \alpha_i + \beta_i R_{m,t} + \varepsilon_{i,t}\) by OLS with HC1 robust standard errors. We report \(\hat\beta\), its robust standard error, t-statistic, and p-value. For most companies \(\hat\beta\) is significant (p < 0.05), which suggests sensitivity to the market. Diagnostics (Breusch–Pagan, Durbin–Watson) are presented to assess heteroskedasticity and autocorrelation. When heteroskedasticity is detected, inference on β relies on the robust standard errors.

**Robustness.** Bootstrap and non-parametric tests were implemented to check that the conclusions hold when the parametric assumptions are violated.
