In [None]:
```python
# -*- coding: utf-8 -*-
"""
CLASE: Experimentos controlados — Script de Python
Datos: data.dta  (columnas: id, resultado, grupo, edad, genero, programa, libros)

Objetivos:
  1) Cargar y preparar datos (y, D, X)
  2) (Opcional) Aleatorización en Python
  3) Balance tipo "difmedias" (tabla T/C/diff/t/p/se)
  4) ¿Asignación correcta?  D ~ X  (LPM / Logit / Probit)
  5) Efecto del tratamiento (y~D) con y sin controles (SE robustas HC1)
  6) Efectos heterogéneos (mujer, libros, q_edad) + gráfico
  7) Exportar tablas y gráficos

Requisitos (si faltan):
    pip install pandas numpy statsmodels scipy openpyxl matplotlib
"""

import os
import warnings
warnings.filterwarnings("ignore")

from typing import Optional, Dict, List, Tuple

import numpy as np
import pandas as pd

import statsmodels.api as sm
import statsmodels.formula.api as smf

# SciPy opcional (para t-test exacto). Si no está, usamos aproximación normal.
try:
    from scipy import stats as sps
    SCIPY_OK = True
except Exception:
    SCIPY_OK = False

import matplotlib.pyplot as plt

RNG_DEFAULT_SEED = 20250813


# ---------- 1) Cargar y preparar ----------

def load_and_prepare(path: str) -> pd.DataFrame:
    """
    Lee data.dta y construye variables derivadas:
      y, D (1=B), mujer, pregrado, maestria
    """
    df = pd.read_stata(path)
    df = df.copy()
    df["y"] = pd.to_numeric(df["resultado"], errors="coerce")
    df["D"] = (df["grupo"] == "B").astype(int)
    df["mujer"] = (df["genero"] == "Mujer").astype(int)
    df["pregrado"] = (df["programa"] == "Pregrado").astype(int)
    df["maestria"] = (df["programa"] == "Maestría").astype(int)
    return df


# ---------- 2) (Opcional) Aleatorización ----------

def randomize_bernoulli(n: int, p: float = 0.5, seed: Optional[int] = None) -> np.ndarray:
    rng = np.random.default_rng(seed if seed is not None else RNG_DEFAULT_SEED)
    return (rng.random(n) < p).astype(int)

def randomize_exact(n: int, prop: float, seed: Optional[int] = None) -> np.ndarray:
    """
    Asigna exactamente round(n*prop) tratados.
    """
    rng = np.random.default_rng(seed if seed is not None else RNG_DEFAULT_SEED)
    idx = np.arange(n)
    rng.shuffle(idx)
    ntr = int(np.round(n * prop))
    D = np.zeros(n, dtype=int)
    D[idx[:ntr]] = 1
    return D

def randomize_stratified(df: pd.DataFrame, strata_cols: List[str], prop: float = 0.5,
                         seed: Optional[int] = None, outcol: str = "D_strata") -> pd.DataFrame:
    """
    Asignación estratificada con conteo exacto por estrato (round(n*prop)).
    """
    rng = np.random.default_rng(seed if seed is not None else RNG_DEFAULT_SEED)
    df = df.copy()
    df[outcol] = 0
    for _, g in df.groupby(strata_cols, dropna=False):
        idx = g.index.to_numpy()
        n = len(idx)
        ntr = int(np.round(n * prop))
        shuffled = idx.copy()
        rng.shuffle(shuffled)
        treated = shuffled[:ntr]
        df.loc[treated, outcol] = 1
    return df


# ---------- 3) Balance tipo "difmedias" ----------

def ttest_welch(x1: np.ndarray, x0: np.ndarray) -> Tuple[float, float]:
    """
    Welch's t-test (two-sided). Devuelve (tstat, pval).
    Usa SciPy si disponible; si no, aproximación normal (fallback).
    """
    x1 = x1[~np.isnan(x1)]
    x0 = x0[~np.isnan(x0)]
    if SCIPY_OK:
        res = sps.ttest_ind(x1, x0, equal_var=False, alternative="two-sided")
        return float(res.statistic), float(res.pvalue)
    # Fallback normal approx
    m1, m0 = x1.mean(), x0.mean()
    v1, v0 = x1.var(ddof=1), x0.var(ddof=1)
    n1, n0 = x1.size, x0.size
    se = np.sqrt(v1/n1 + v0/n0)
    tstat = (m1 - m0) / se if se > 0 else 0.0
    # p-valor bicaudal aproximado (normal estándar)
    pval = 2 * (1 - 0.5 * (1 + np.math.erf(abs(tstat) / np.sqrt(2))))
    return tstat, pval

def balance_table(df: pd.DataFrame, vars_list: List[str], dcol: str = "D") -> pd.DataFrame:
    rows = []
    for v in vars_list:
        g1 = df.loc[df[dcol] == 1, v].to_numpy(dtype=float)
        g0 = df.loc[df[dcol] == 0, v].to_numpy(dtype=float)
        mean_T, mean_C = np.nanmean(g1), np.nanmean(g0)
        sd_T, sd_C = np.nanstd(g1, ddof=1), np.nanstd(g0, ddof=1)
        N_T, N_C = np.sum(~np.isnan(g1)), np.sum(~np.isnan(g0))
        tstat, pval = ttest_welch(g1, g0)
        diff = mean_T - mean_C
        se = (diff / tstat) if (tstat not in [0, np.inf, -np.inf]) else np.nan
        rows.append(dict(variable=v, mean_T=mean_T, mean_C=mean_C, diff=diff,
                         tstat=tstat, pval=pval, se=se, sd_T=sd_T, sd_C=sd_C, N_T=N_T, N_C=N_C))
    out = pd.DataFrame(rows)
    return out[["variable","mean_T","mean_C","diff","tstat","pval","se","sd_T","sd_C","N_T","N_C"]]


# ---------- 4) ¿Asignación correcta? D ~ X ----------

def assignment_checks(df: pd.DataFrame, X: List[str]) -> Dict[str, object]:
    out: Dict[str, object] = {}
    # LPM con SE robustas HC1
    f_lpm = "D ~ " + " + ".join(X)
    lpm = smf.ols(f_lpm, data=df).fit(cov_type="HC1")
    out["LPM"] = lpm

    # Logit y Probit
    logit = smf.logit(f_lpm, data=df).fit(disp=False)
    probit = smf.probit(f_lpm, data=df).fit(disp=False)
    out["logit"] = logit
    out["probit"] = probit
    return out


# ---------- 5) Tratamiento: y ~ D (+X) ----------

def treatment_models(df: pd.DataFrame, X: List[str]) -> Dict[str, object]:
    m1 = smf.ols("y ~ D", data=df).fit(cov_type="HC1")
    f2 = "y ~ D + " + " + ".join(X)
    m2 = smf.ols(f2, data=df).fit(cov_type="HC1")
    return {"y~D": m1, "y~D+X": m2}


# ---------- 6) Heterogeneidad ----------

def heterogeneity_mujer(df: pd.DataFrame, controls: Optional[List[str]] = None):
    controls = controls or ["edad", "libros", "pregrado", "maestria"]
    f = "y ~ D * mujer + " + " + ".join(controls)
    return smf.ols(f, data=df).fit(cov_type="HC1")

def heterogeneity_libros(df: pd.DataFrame, controls: Optional[List[str]] = None) -> Tuple[pd.DataFrame, object]:
    controls = controls or ["edad", "mujer", "pregrado", "maestria"]
    f = "y ~ D * libros + " + " + ".join(controls)
    m = smf.ols(f, data=df).fit(cov_type="HC1")

    # Grid para predicciones con controles en su media
    Lmax = int(np.nanmax(df["libros"].to_numpy()))
    grid = pd.DataFrame({"libros": np.arange(0, Lmax + 1, 1)})
    avg = {c: float(np.nanmean(df[c].to_numpy())) for c in controls}
    for c, val in avg.items():
        grid[c] = val

    # Dos grupos: D=0 y D=1
    grid0 = grid.copy(); grid0["D"] = 0
    grid1 = grid.copy(); grid1["D"] = 1

    grid0["yhat"] = m.predict(grid0)
    grid1["yhat"] = m.predict(grid1)

    g0 = grid0[["libros","yhat"]].rename(columns={"yhat":"yhat_C"})
    g1 = grid1[["libros","yhat"]].rename(columns={"yhat":"yhat_T"})
    prof = g0.merge(g1, on="libros", how="inner")
    prof["diff_T_minus_C"] = prof["yhat_T"] - prof["yhat_C"]
    return prof, m

def plot_libros_profile(profile_df: pd.DataFrame, outpath: str = "margins_libros.png") -> None:
    plt.figure(figsize=(7, 4.5))
    # Una figura, dos líneas (Control/Tratado); sin fijar colores
    plt.plot(profile_df["libros"], profile_df["yhat_C"], label="Control")
    plt.plot(profile_df["libros"], profile_df["yhat_T"], label="Tratado")
    plt.xlabel("Libros")
    plt.ylabel("Predicción de y")
    plt.title("Predicción y vs. libros por grupo (controles al promedio)")
    plt.legend()
    plt.tight_layout()
    plt.savefig(outpath, dpi=300)
    plt.close()


# ---------- 7) Export helpers ----------

def to_excel(df: pd.DataFrame, path: str) -> None:
    ext = os.path.splitext(path)[1].lower()
    if ext not in [".xlsx", ".xls"]:
        path = os.path.splitext(path)[0] + ".xlsx"
    df.to_excel(path, index=False)


# ---------- 8) Main pipeline ----------

def main(data_path: str = "data.dta") -> None:
    print(">> Cargando datos...")
    df = load_and_prepare(data_path)
    print(df.head())

    X = ["edad", "mujer", "libros", "pregrado", "maestria"]

    # Balance en X e Y
    print(">> Balance en X...")
    bal_X = balance_table(df, X, dcol="D")
    to_excel(bal_X, "Table_Balance_raw.xlsx")

    print(">> Balance en y...")
    bal_y = balance_table(df, ["y"], dcol="D")
    to_excel(bal_y, "Diff_y_raw.xlsx")

    # ¿Asignación correcta?
    print(">> Chequeos de asignación D ~ X...")
    checks = assignment_checks(df, X)
    with open("table_balance_models.txt", "w", encoding="utf-8") as f:
        for name, model in checks.items():
            f.write(f"\n==== {name} ====\n")
            f.write(model.summary().as_text())

    # Tratamiento con y sin controles
    print(">> Modelos y ~ D (+X)...")
    tmods = treatment_models(df, X)
    with open("table_treatment.txt", "w", encoding="utf-8") as f:
        for name, model in tmods.items():
            f.write(f"\n==== {name} ====\n")
            f.write(model.summary().as_text())

    # Heterogeneidad mujer
    print(">> Heterogeneidad por mujer...")
    m_mujer = heterogeneity_mujer(df)
    with open("margins_mujer.txt", "w", encoding="utf-8") as f:
        f.write(m_mujer.summary().as_text())

    # Heterogeneidad por libros + gráfico
    print(">> Heterogeneidad por libros...")
    prof_lib, m_lib = heterogeneity_libros(df)
    to_excel(prof_lib, "margins_libros.xlsx")
    plot_libros_profile(prof_lib, "margins_libros.png")
    with open("margins_libros_model.txt", "w", encoding="utf-8") as f:
        f.write(m_lib.summary().as_text())

    # Heterogeneidad por cuartiles de edad
    print(">> Heterogeneidad por cuartiles de edad...")
    df_q = df.copy()
    # cuartiles 1..4
    df_q["q_edad"] = pd.qcut(df_q["edad"].rank(method="first"), 4, labels=False) + 1
    m_q = smf.ols("y ~ D * C(q_edad) + mujer + libros + pregrado + maestria", data=df_q).fit(cov_type="HC1")

    # Predicciones D=0/1 por cuartil (controles al promedio)
    avg_controls = {
        "mujer": float(df_q["mujer"].mean()),
        "libros": float(df_q["libros"].mean()),
        "pregrado": float(df_q["pregrado"].mean()),
        "maestria": float(df_q["maestria"].mean())
    }
    rows = []
    for q in sorted(df_q["q_edad"].unique()):
        for D in [0, 1]:
            ex = dict(D=D, q_edad=q, **avg_controls)
            yhat = m_q.predict(pd.DataFrame([ex]))[0]
            rows.append(dict(q_edad=int(q), D=D, yhat=float(yhat)))
    pred_q = pd.DataFrame(rows).pivot(index="q_edad", columns="D", values="yhat").rename(columns={0:"C",1:"T"})
    pred_q["ATE_hat"] = pred_q["T"] - pred_q["C"]
    pred_q = pred_q.reset_index()
    to_excel(pred_q, "margins_qedad.xlsx")
    with open("margins_qedad_model.txt", "w", encoding="utf-8") as f:
        f.write(m_q.summary().as_text())

    # Medias T/C/diff para y y X juntos
    print(">> Medias para y + X...")
    bal_yX = balance_table(df, ["y"] + X, dcol="D")
    to_excel(bal_yX, "means_yX.xlsx")

    print("\nListo ✅")
    print("- Balance (X): Table_Balance_raw.xlsx")
    print("- Diff(y):     Diff_y_raw.xlsx")
    print("- D~X modelos: table_balance_models.txt")
    print("- Tratamiento: table_treatment.txt")
    print("- Margins mujer/libros/q_edad: margins_mujer.txt, margins_libros.xlsx, margins_qedad.xlsx")
    print("- Gráfico:     margins_libros.png")
    print("- Medias y+X:  means_yX.xlsx")


if __name__ == "__main__":
    # Cambia la ruta si tu data.dta está en otra carpeta
    main("data.dta")
```
