# Risco Político em Anos Eleitorais no Brasil (2002–2022)
## Metodologia: Fama-French 3 Fatores + BHAR

Restruturação completa do estudo com:
- Janelas dinâmicas baseadas no HGPE
- Modelo FF3 (NEFIN-USP)
- BHAR (Buy-and-Hold Abnormal Return)
- Ponderação Value-Weighted
- Testes: t-Student, Wilcoxon, Placebo, DiD

In [None]:
import os, warnings, logging, time
import numpy as np
import pandas as pd
import statsmodels.api as sm
from scipy import stats
warnings.filterwarnings("ignore")
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
log = logging.getLogger(__name__)
print("✓ Bibliotecas carregadas")

## 1. Configuração e Constantes

In [None]:
# =============================================================================
# CONSTANTES
# =============================================================================

OUTPUT_DIR = "./output_ff3_bhar"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Datas do HGPE (início do Horário Gratuito de Propaganda Eleitoral)
DATAS_HGPE = {
    2002: pd.Timestamp("2002-08-20"), 2006: pd.Timestamp("2006-08-15"),
    2010: pd.Timestamp("2010-08-17"), 2014: pd.Timestamp("2014-08-19"),
    2018: pd.Timestamp("2018-08-31"), 2022: pd.Timestamp("2022-08-26"),
}

# Datas do 1º turno
DATAS_PRIMEIRO_TURNO = {
    2002: pd.Timestamp("2002-10-06"), 2006: pd.Timestamp("2006-10-01"),
    2010: pd.Timestamp("2010-10-03"), 2014: pd.Timestamp("2014-10-05"),
    2018: pd.Timestamp("2018-10-07"), 2022: pd.Timestamp("2022-10-02"),
}

# Anos e datas de Placebo
ANOS_PLACEBO = [2003, 2007, 2011, 2013, 2017, 2019]
DATAS_HGPE_PLACEBO = {
    2003: pd.Timestamp("2003-08-20"), 2007: pd.Timestamp("2007-08-15"),
    2011: pd.Timestamp("2011-08-17"), 2013: pd.Timestamp("2013-08-19"),
    2017: pd.Timestamp("2017-08-31"), 2019: pd.Timestamp("2019-08-26"),
}
DATAS_PRIMEIRO_TURNO_PLACEBO = {
    2003: pd.Timestamp("2003-10-05"), 2007: pd.Timestamp("2007-10-07"),
    2011: pd.Timestamp("2011-10-02"), 2013: pd.Timestamp("2013-10-06"),
    2017: pd.Timestamp("2017-10-01"), 2019: pd.Timestamp("2019-10-06"),
}

# Classificação para DiD
SETORES_REGULADOS = ["Petróleo, Gás e Biocombustíveis", "Utilidade Pública", "Financeiro"]

# Parâmetros
JANELA_ESTIMACAO_DU = 252
GAP_SEGURANCA_DU = 30
MIN_OBS_REGRESSAO = 60
MIN_EMPRESAS_SETOR = 1
MIN_PREGOES_PCT = 0.40

print("✓ Constantes definidas")

## 2. Carregamento de Dados

**IMPORTANTE:** Você precisa ter os arquivos de preços e volumes. Se ainda não os tem, rode a célula de download via yfinance abaixo.

In [None]:
# =============================================================================
# CARREGAMENTO DOS FATORES NEFIN
# =============================================================================

df_nefin_raw = pd.read_csv("nefin_factors.csv", index_col=0)
df_nefin_raw["Date"] = pd.to_datetime(df_nefin_raw["Date"])
df_nefin = df_nefin_raw.set_index("Date").sort_index()
df_nefin = df_nefin.rename(columns={"Risk_Free": "Rf"})
df_nefin = df_nefin[["Rm_minus_Rf", "SMB", "HML", "Rf"]]

print(f"✓ NEFIN: {len(df_nefin)} obs, {df_nefin.index.min().date()} a {df_nefin.index.max().date()}")
df_nefin.head()

In [None]:
# =============================================================================
# CARREGAMENTO DE EMPRESAS
# =============================================================================

df_empresas = pd.read_excel("resultados_analise_b3_com_tickers.xlsx",
                             sheet_name="LISTA FINAL (Cont+IPOs-Canc)")
df_empresas = df_empresas.dropna(subset=["TICKER", "SETOR_B3"])
df_empresas["DT_REG"] = pd.to_datetime(df_empresas["DT_REG"], errors="coerce")

print(f"✓ {len(df_empresas)} empresas, {df_empresas['SETOR_B3'].nunique()} setores")
print(df_empresas["SETOR_B3"].value_counts())

In [None]:
# =============================================================================
# DOWNLOAD DE PREÇOS E VOLUMES (yfinance)
# Rode esta célula apenas uma vez; depois use os CSVs salvos.
# =============================================================================

import yfinance as yf

tickers = df_empresas["TICKER"].unique().tolist()
tickers_yf = [t + ".SA" for t in tickers]

print(f"Baixando {len(tickers_yf)} tickers...")
data = yf.download(tickers_yf, start="2000-01-01", end="2023-12-31",
                    group_by="ticker", auto_adjust=True, threads=True)

# Extrair preços e volumes
df_precos = pd.DataFrame()
df_volumes = pd.DataFrame()

for t_yf, t_b3 in zip(tickers_yf, tickers):
    try:
        df_precos[t_b3] = data[t_yf]["Close"]
        df_volumes[t_b3] = data[t_yf]["Volume"] * data[t_yf]["Close"]  # Volume financeiro
    except:
        pass

df_precos.to_csv("precos.csv")
df_volumes.to_csv("volumes.csv")
print(f"✓ Preços: {df_precos.shape}, Volumes: {df_volumes.shape}")
print("  Salvos em precos.csv e volumes.csv")

In [None]:
# =============================================================================
# OU: CARREGUE DE CSVs JÁ SALVOS
# =============================================================================

# df_precos = pd.read_csv("precos.csv", index_col=0, parse_dates=True)
# df_volumes = pd.read_csv("volumes.csv", index_col=0, parse_dates=True)
# print(f"✓ Preços: {df_precos.shape}, Volumes: {df_volumes.shape}")

In [None]:
# =============================================================================
# RETORNOS LOGARÍTMICOS
# =============================================================================

df_retornos = np.log(df_precos / df_precos.shift(1)).dropna(how="all")
print(f"✓ Retornos: {df_retornos.shape}")

## 3. Funções Núcleo: Janelas, FF3, BHAR

In [None]:
# =============================================================================
# DEFINIÇÃO DINÂMICA DE JANELAS
# =============================================================================

def definir_janelas(ano, bdates, datas_hgpe=None, datas_1turno=None):
    """
    Define janelas de estimação e evento para um dado ano.
    Janela de Evento: do HGPE até a véspera do 1º turno.
    Janela de Estimação: 252 DU, encerrando 30 DU antes do HGPE.
    """
    if datas_hgpe is None: datas_hgpe = DATAS_HGPE
    if datas_1turno is None: datas_1turno = DATAS_PRIMEIRO_TURNO

    if ano not in datas_hgpe or ano not in datas_1turno:
        return None

    hgpe = datas_hgpe[ano]
    turno1 = datas_1turno[ano]
    bdays = bdates.sort_values()

    evt_inicio_idx = bdays.searchsorted(hgpe, side="left")
    if evt_inicio_idx >= len(bdays): return None
    evt_inicio = bdays[evt_inicio_idx]

    evt_fim_idx = bdays.searchsorted(turno1, side="left") - 1
    if evt_fim_idx < 0: return None
    evt_fim = bdays[evt_fim_idx]

    est_fim_idx = evt_inicio_idx - GAP_SEGURANCA_DU
    if est_fim_idx < 0: return None
    est_fim = bdays[max(0, est_fim_idx)]

    est_inicio_idx = est_fim_idx - JANELA_ESTIMACAO_DU
    if est_inicio_idx < 0: return None
    est_inicio = bdays[max(0, est_inicio_idx)]

    return {
        "est_inicio": est_inicio, "est_fim": est_fim,
        "evt_inicio": evt_inicio, "evt_fim": evt_fim, "ano": ano,
    }

# Teste rápido
bdates_test = df_retornos.index.sort_values()
for ano in sorted(DATAS_HGPE.keys()):
    j = definir_janelas(ano, bdates_test)
    if j:
        print(f"{ano}: Estimação [{j['est_inicio'].date()} → {j['est_fim'].date()}]  "
              f"Evento [{j['evt_inicio'].date()} → {j['evt_fim'].date()}]")

In [None]:
# =============================================================================
# CALCULAR BHAR POR ATIVO (FF3)
# =============================================================================

def calcular_bhar_ativo(ticker, ano, df_retornos, df_nefin, bdates,
                        datas_hgpe=None, datas_1turno=None):
    """
    Calcula BHAR de um ativo:
      1. Define janelas dinâmicas (HGPE)
      2. OLS(FF3) na janela de estimação
      3. Prediz retorno esperado na janela de evento
      4. BHAR = prod(1+Ri) - prod(1+E[Ri])
    """
    if datas_hgpe is None: datas_hgpe = DATAS_HGPE
    if datas_1turno is None: datas_1turno = DATAS_PRIMEIRO_TURNO

    janelas = definir_janelas(ano, bdates, datas_hgpe, datas_1turno)
    if janelas is None: return None
    if ticker not in df_retornos.columns: return None

    ret_ativo = df_retornos[ticker].dropna()

    # --- Janela de Estimação ---
    mask_est = (ret_ativo.index >= janelas["est_inicio"]) & (ret_ativo.index <= janelas["est_fim"])
    ret_est = ret_ativo.loc[mask_est]
    n_esperado = len(bdates[(bdates >= janelas["est_inicio"]) & (bdates <= janelas["est_fim"])])
    if len(ret_est) < max(MIN_OBS_REGRESSAO, int(n_esperado * MIN_PREGOES_PCT)):
        return None

    fac_est = df_nefin.loc[df_nefin.index.isin(ret_est.index)]
    common_est = ret_est.index.intersection(fac_est.index)
    if len(common_est) < MIN_OBS_REGRESSAO: return None

    ret_est = ret_est.loc[common_est]
    fac_est = fac_est.loc[common_est]

    # Regressão: Ri - Rf = α + β1(Rm-Rf) + β2·SMB + β3·HML + ε
    y = ret_est - fac_est["Rf"]
    X = sm.add_constant(fac_est[["Rm_minus_Rf", "SMB", "HML"]])
    try:
        modelo = sm.OLS(y, X, missing="drop").fit(cov_type="HAC", cov_kwds={"maxlags": 5})
    except:
        return None
    betas = modelo.params

    # --- Janela de Evento ---
    mask_evt = (ret_ativo.index >= janelas["evt_inicio"]) & (ret_ativo.index <= janelas["evt_fim"])
    ret_evt = ret_ativo.loc[mask_evt]
    fac_evt = df_nefin.loc[df_nefin.index.isin(ret_evt.index)]
    common_evt = ret_evt.index.intersection(fac_evt.index)
    if len(common_evt) < 5: return None

    ret_evt = ret_evt.loc[common_evt]
    fac_evt = fac_evt.loc[common_evt]

    # E[Ri,t] = α̂ + β̂·X_t + Rf_t
    X_evt = sm.add_constant(fac_evt[["Rm_minus_Rf", "SMB", "HML"]])
    ret_esperado = X_evt.dot(betas) + fac_evt["Rf"]

    # BHAR
    bhar_real = (1 + ret_evt).prod()
    bhar_esp = (1 + ret_esperado).prod()
    if np.isnan(bhar_real) or np.isnan(bhar_esp) or bhar_esp == 0: return None

    return {
        "ticker": ticker, "ano": ano,
        "bhar": bhar_real - bhar_esp,
        "n_obs_est": len(common_est), "n_obs_evt": len(common_evt),
        "r2": modelo.rsquared,
        "alpha": betas.get("const", np.nan),
        "beta_mkt": betas.get("Rm_minus_Rf", np.nan),
        "beta_smb": betas.get("SMB", np.nan),
        "beta_hml": betas.get("HML", np.nan),
    }

print("✓ calcular_bhar_ativo definida")

In [None]:
# =============================================================================
# PROCESSAR SETOR (AGREGAÇÃO VALUE-WEIGHTED)
# =============================================================================

def processar_setor(setor, ano, tickers_setor, df_retornos, df_volumes,
                    df_nefin, bdates, datas_hgpe=None, datas_1turno=None):
    """
    Agrega BHARs de um setor com pesos VW (volume na janela de estimação).
    Retorna ABHAR_VW, ABHAR_EW, testes t e Wilcoxon.
    """
    if datas_hgpe is None: datas_hgpe = DATAS_HGPE
    if datas_1turno is None: datas_1turno = DATAS_PRIMEIRO_TURNO

    resultados_ativos = []
    for ticker in tickers_setor:
        res = calcular_bhar_ativo(ticker, ano, df_retornos, df_nefin, bdates,
                                  datas_hgpe, datas_1turno)
        if res is not None:
            resultados_ativos.append(res)

    if len(resultados_ativos) < MIN_EMPRESAS_SETOR:
        return None

    df_res = pd.DataFrame(resultados_ativos)
    bhars = df_res["bhar"].values

    # Pesos VW
    janelas = definir_janelas(ano, bdates, datas_hgpe, datas_1turno)
    if janelas is None: return None

    pesos = []
    for _, row in df_res.iterrows():
        tk = row["ticker"]
        if tk in df_volumes.columns:
            vol_est = df_volumes.loc[
                (df_volumes.index >= janelas["est_inicio"]) &
                (df_volumes.index <= janelas["est_fim"]), tk
            ]
            pesos.append(max(vol_est.mean() if len(vol_est) > 0 else 0, 0))
        else:
            pesos.append(0)

    pesos = np.array(pesos, dtype=float)
    if pesos.sum() == 0: pesos = np.ones(len(pesos))
    pesos_norm = pesos / pesos.sum()

    abhar_vw = np.dot(pesos_norm, bhars)
    abhar_ew = np.mean(bhars)

    # Testes estatísticos
    p_ttest = p_wilcoxon = np.nan
    if len(bhars) >= 2:
        try: _, p_ttest = stats.ttest_1samp(bhars, 0)
        except: pass
    if len(bhars) >= 5:
        try: _, p_wilcoxon = stats.wilcoxon(bhars, alternative="two-sided")
        except: pass

    return {
        "setor": setor, "ano": ano,
        "abhar_vw": abhar_vw, "abhar_ew": abhar_ew,
        "n_ativos": len(bhars),
        "bhar_medio": np.mean(bhars), "bhar_mediana": np.median(bhars),
        "bhar_std": np.std(bhars, ddof=1) if len(bhars) > 1 else np.nan,
        "p_ttest": p_ttest, "p_wilcoxon": p_wilcoxon,
        "tickers_validos": df_res["ticker"].tolist(),
        "r2_medio": df_res["r2"].mean(),
    }

print("✓ processar_setor definida")

## 4. Execução: Mapa de Risco Eleitoral

In [None]:
# =============================================================================
# GERAR MAPA DE RISCO
# =============================================================================

def gerar_mapa_risco(df_retornos, df_volumes, df_nefin, df_empresas,
                     anos=None, datas_hgpe=None, datas_1turno=None, label="Eleitoral"):
    if anos is None: anos = sorted(DATAS_HGPE.keys())
    if datas_hgpe is None: datas_hgpe = DATAS_HGPE
    if datas_1turno is None: datas_1turno = DATAS_PRIMEIRO_TURNO

    bdates = df_retornos.index.sort_values()
    setores = df_empresas.groupby("SETOR_B3")["TICKER"].apply(list).to_dict()

    resultados = []
    for ano in anos:
        if ano not in datas_hgpe:
            print(f"  ⚠ Ano {ano} sem HGPE, pulando.")
            continue
        print(f"  Processando {ano}...")
        for setor, tickers in setores.items():
            res = processar_setor(setor, ano, tickers, df_retornos, df_volumes,
                                  df_nefin, bdates, datas_hgpe, datas_1turno)
            if res is not None:
                res["tipo"] = label
                resultados.append(res)
                sig = "***" if res["p_ttest"]<0.01 else ("**" if res["p_ttest"]<0.05 else ("*" if res["p_ttest"]<0.10 else ""))
                print(f"    {setor}: ABHAR_VW={res['abhar_vw']:+.4f}, N={res['n_ativos']}, p={res['p_ttest']:.4f} {sig}")

    return pd.DataFrame(resultados)

# ---- EXECUTAR ----
print("=" * 70)
print("MAPA DE RISCO — ANOS ELEITORAIS (2002–2022)")
print("=" * 70)
df_eleitoral = gerar_mapa_risco(df_retornos, df_volumes, df_nefin, df_empresas)
print(f"\n✓ {len(df_eleitoral)} observações setor×ano")

## 5. Heatmap Eleitoral

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style("whitegrid")

hm_data = df_eleitoral.pivot_table(index="setor", columns="ano", values="abhar_vw", aggfunc="first")

fig, ax = plt.subplots(figsize=(14, 8))
sns.heatmap(hm_data * 100, annot=True, fmt=".2f", cmap="RdYlGn_r",
            center=0, linewidths=0.5, ax=ax, cbar_kws={"label": "ABHAR (%)"})
ax.set_title("Mapa de Risco Político Setorial — ABHAR (FF3, VW)\nAnos Eleitorais 2002–2022", fontsize=14)
ax.set_ylabel("Setor B3")
ax.set_xlabel("Ano Eleitoral")
plt.tight_layout()
plt.savefig(os.path.join(OUTPUT_DIR, "heatmap_eleitoral.png"), dpi=300)
plt.show()

## 6. Teste Placebo (Anos Não-Eleitorais)

In [None]:
print("=" * 70)
print("TESTE PLACEBO — ANOS NÃO-ELEITORAIS")
print("=" * 70)
df_placebo = gerar_mapa_risco(
    df_retornos, df_volumes, df_nefin, df_empresas,
    anos=ANOS_PLACEBO,
    datas_hgpe=DATAS_HGPE_PLACEBO,
    datas_1turno=DATAS_PRIMEIRO_TURNO_PLACEBO,
    label="Placebo",
)
print(f"\n✓ {len(df_placebo)} observações placebo")

# Heatmap Placebo
hm_placebo = df_placebo.pivot_table(index="setor", columns="ano", values="abhar_vw", aggfunc="first")
if not hm_placebo.empty:
    fig, ax = plt.subplots(figsize=(14, 8))
    sns.heatmap(hm_placebo * 100, annot=True, fmt=".2f", cmap="RdYlGn_r",
                center=0, linewidths=0.5, ax=ax, cbar_kws={"label": "ABHAR (%)"})
    ax.set_title("Teste Placebo — ABHAR (FF3, VW)\nAnos Não-Eleitorais", fontsize=14)
    plt.tight_layout()
    plt.savefig(os.path.join(OUTPUT_DIR, "heatmap_placebo.png"), dpi=300)
    plt.show()

## 7. Difference-in-Differences (Regulados vs Não Regulados)

In [None]:
def gerar_diff_in_diff(df_resultados):
    df = df_resultados.copy()
    df["grupo"] = df["setor"].apply(lambda s: "Regulado" if s in SETORES_REGULADOS else "Não Regulado")

    resultados_did = []
    for ano in sorted(df["ano"].unique()):
        df_ano = df[df["ano"] == ano]
        reg = df_ano[df_ano["grupo"] == "Regulado"]["abhar_vw"]
        nreg = df_ano[df_ano["grupo"] == "Não Regulado"]["abhar_vw"]
        if len(reg) == 0 or len(nreg) == 0: continue

        diff = reg.mean() - nreg.mean()
        p_val = np.nan
        if len(reg) >= 2 and len(nreg) >= 2:
            try: _, p_val = stats.ttest_ind(reg, nreg, equal_var=False)
            except: pass

        resultados_did.append({
            "ano": ano, "abhar_regulados": reg.mean(),
            "abhar_nao_regulados": nreg.mean(), "diff": diff,
            "n_reg": len(reg), "n_nreg": len(nreg), "p_value": p_val,
        })

    df_did = pd.DataFrame(resultados_did)
    if not df_did.empty:
        media = {"ano": "Média", "abhar_regulados": df_did["abhar_regulados"].mean(),
                 "abhar_nao_regulados": df_did["abhar_nao_regulados"].mean(),
                 "diff": df_did["diff"].mean(), "n_reg": "", "n_nreg": "", "p_value": np.nan}
        df_did = pd.concat([df_did, pd.DataFrame([media])], ignore_index=True)
    return df_did

df_did = gerar_diff_in_diff(df_eleitoral)
print("\nDifference-in-Differences:")
print(df_did.to_string(index=False))

# Gráfico DiD
if not df_did.empty:
    df_plot = df_did[df_did["ano"] != "Média"].copy()
    df_plot["ano"] = df_plot["ano"].astype(int)
    fig, ax = plt.subplots(figsize=(10, 6))
    x = np.arange(len(df_plot))
    w = 0.35
    ax.bar(x - w/2, df_plot["abhar_regulados"]*100, w, label="Regulados", color="#d62728")
    ax.bar(x + w/2, df_plot["abhar_nao_regulados"]*100, w, label="Não Regulados", color="#1f77b4")
    ax.set_xticks(x); ax.set_xticklabels(df_plot["ano"])
    ax.set_ylabel("ABHAR Médio (%)"); ax.set_title("DiD: Regulados vs Não Regulados")
    ax.legend(); ax.axhline(0, color="black", lw=0.8)
    plt.tight_layout()
    plt.savefig(os.path.join(OUTPUT_DIR, "did_barras.png"), dpi=300)
    plt.show()

## 8. Exportação dos Resultados

In [None]:
# Exportar tudo para Excel
with pd.ExcelWriter(os.path.join(OUTPUT_DIR, "resultados_ff3_bhar.xlsx"), engine="openpyxl") as writer:
    df_eleitoral.to_excel(writer, sheet_name="Eleitoral_Detalhado", index=False)
    hm_data.to_excel(writer, sheet_name="Heatmap_Eleitoral")

    sig = df_eleitoral[["setor","ano","abhar_vw","p_ttest","p_wilcoxon","n_ativos"]].copy()
    sig["sig_ttest"] = sig["p_ttest"].apply(
        lambda p: "***" if p<0.01 else ("**" if p<0.05 else ("*" if p<0.10 else "")))
    sig.to_excel(writer, sheet_name="Significancia", index=False)

    if not df_placebo.empty:
        df_placebo.to_excel(writer, sheet_name="Placebo_Detalhado", index=False)
        hm_placebo.to_excel(writer, sheet_name="Heatmap_Placebo")

    df_did.to_excel(writer, sheet_name="DiD", index=False)

print(f"✓ Resultados salvos em {OUTPUT_DIR}/resultados_ff3_bhar.xlsx")

## 9. Resumo da Significância Estatística

In [None]:
# Tabela resumo
if not df_eleitoral.empty:
    resumo = df_eleitoral.groupby("setor").agg(
        ABHAR_medio=("abhar_vw", "mean"),
        ABHAR_mediano=("abhar_vw", "median"),
        N_obs=("n_ativos", "sum"),
        p_ttest_medio=("p_ttest", "mean"),
        anos_sig_5pct=("p_ttest", lambda x: (x < 0.05).sum()),
    ).sort_values("ABHAR_medio")

    resumo["ABHAR_medio"] = resumo["ABHAR_medio"].map("{:+.4f}".format)
    resumo["ABHAR_mediano"] = resumo["ABHAR_mediano"].map("{:+.4f}".format)
    print("\n" + "=" * 70)
    print("RESUMO POR SETOR (Média ao longo dos anos eleitorais)")
    print("=" * 70)
    print(resumo.to_string())