In [5]:
import numpy as np
import pandas as pd
from scipy import stats
from sklearn.feature_selection import mutual_info_classif, mutual_info_regression
from sklearn.preprocessing import LabelEncoder

In [6]:
df = pd.read_excel("Teste_Matching.xlsx")

In [7]:
df.head()

Unnamed: 0,ID,Tratamento,Convidado,Outcoming_Y-1,Outcoming_Y,Timing1_Y,Timing2_Y,Position_Y-1,Position_Y,State_Y-1,State_Y,Change_dept
0,1,0,0,51,53,9,7,1,1,7,5,0
1,2,1,1,79,95,3,4,2,2,7,7,0
2,3,0,0,80,71,2,9,2,2,5,5,0
3,4,1,1,65,71,9,7,1,2,2,2,0
4,5,0,0,2,6,4,3,2,3,3,3,1


In [8]:
# ===============================================================
# Scanner de associações (T <-> X, Y <-> X) para decisão de inclusão
# ===============================================================
# O que faz:
# - Para cada feature X:
#   * Se X é numérica:
#       - com Tratamento (binário): correlação ponto-biserial (Pearson com binário) + p-valor; SMD opcional
#       - com Outcome:
#           - se Y binário: ponto-biserial
#           - se Y contínuo: Pearson + Spearman
#   * Se X é categórica:
#       - com Tratamento/Y binário: qui-quadrado + Cramer's V (com correção de viés)
#       - com Y contínuo: ANOVA one-way + eta^2
#   * Mutual Information (não-linear): MI(X;T) e MI(X;Y)
# - Classifica:
#     confounder_candidate: associado a T e Y
#     instrument_like     : associado a T e não a Y (evitar, a menos que queira instrumentar)
#     prognostic_only     : associado a Y e não a T (bom p/ precisão)
#     weak/irrelevant     : sem associações fortes
# - Parâmetros de decisão (thresholds) ajustáveis.
#
# Pré-requisitos: pandas, numpy, scipy, scikit-learn, statsmodels (já usa no notebook)
# ===============================================================

# --------- helpers de medidas ---------
def cramers_v_corrected(confusion):
    """Cramer's V com correção de viés (Bergsma, 2013)."""
    chi2 = stats.chi2_contingency(confusion, correction=False)[0]
    n = confusion.sum().sum()
    r, k = confusion.shape
    phi2 = chi2 / n
    phi2corr = max(0, phi2 - ((k-1)*(r-1))/(n-1))
    rcorr = r - ((r-1)**2)/(n-1)
    kcorr = k - ((k-1)**2)/(n-1)
    denom = min((kcorr-1), (rcorr-1))
    return np.sqrt(phi2corr / denom) if denom > 0 else 0.0

def eta_squared_anova(groups):
    """Eta^2 da ANOVA: SSeffect / SStotal."""
    # groups: lista de arrays por nível categórico
    all_vals = np.concatenate(groups)
    grand_mean = np.mean(all_vals)
    ss_total = np.sum((all_vals - grand_mean)**2)
    ss_between = np.sum([len(g)*(np.mean(g)-grand_mean)**2 for g in groups])
    return ss_between / ss_total if ss_total > 0 else 0.0

def is_binary_series(s):
    vals = pd.unique(s.dropna())
    return len(vals) == 2

def _mutual_info_safe(x, y, y_is_binary, discrete_x):
    """MI robusto a NaN, escolhe função apropriada."""
    df_xy = pd.DataFrame({'x': x, 'y': y}).dropna()
    if df_xy.empty:
        return np.nan
    X = df_xy[['x']].values
    yy = df_xy['y'].values
    if y_is_binary or (is_binary_series(pd.Series(yy))):
        # classif
        if discrete_x:
            # scikit assume inteiros para discretos; garantir int
            X = X.astype(int)
        return float(mutual_info_classif(X, yy, discrete_features=discrete_x, random_state=42)[0])
    else:
        # regressão
        if discrete_x:
            X = X.astype(int)
        return float(mutual_info_regression(X, yy, discrete_features=discrete_x, random_state=42)[0])

def _label_encode_safe(s):
    """LabelEncode em categórica preservando NaN (removidos nas análises)."""
    le = LabelEncoder()
    # filtra NaN e ajusta
    non_na = s.dropna().astype(str)
    le.fit(non_na)
    out = pd.Series(index=s.index, dtype=float)
    out.loc[non_na.index] = le.transform(non_na).astype(float)
    return out

# --------- função principal ---------
def scan_associations(
    df,
    treatment_col='Tratamento',
    outcome_col='Outcoming_Y',
    candidate_features=None,
    alpha=0.05,
    thr_corr_T=0.10,       # |r| ou V >= 0.10 ~ efeito pequeno
    thr_corr_Y=0.10,       # |r|/V para Y binário; para ANOVA usar thr_eta2
    thr_eta2=0.02,         # eta^2 ~ 0.01 pequeno, 0.06 médio; aqui 0.02 conservador
    compute_smd=True,
    drop_const_dummies=True # remove dummies constantes (0/1) sem variação
):
    """
    Retorna DataFrame com:
      feature, type, n_T, assoc_T_metric, p_T, n_Y, assoc_Y_metric, p_Y,
      MI_T, MI_Y, classification
    """
    T = df[treatment_col]
    Y = df[outcome_col]
    y_is_binary = is_binary_series(Y)

    # Se candidato não veio, use todas as colunas exceto T e Y
    if candidate_features is None:
        candidate_features = [c for c in df.columns if c not in [treatment_col, outcome_col]]

    results = []

    for col in candidate_features:
        if col in [treatment_col, outcome_col]:
            continue

        s = df[col]
        # descartar dummies constantes (evita erros e p-valores bizarros)
        if drop_const_dummies and s.nunique(dropna=True) <= 1:
            continue

        is_numeric = pd.api.types.is_numeric_dtype(s)
        # Para categóricas, vamos criar versão codificada para MI e ANOVA
        s_enc = s if is_numeric else _label_encode_safe(s)

        # ---------- Associação com Tratamento (T binário) ----------
        # Métrica + p-valor
        if is_numeric:
            # ponto-biserial (Pearson com T binário)
            df_tx = pd.DataFrame({'x': s, 't': T}).dropna()
            n_T = len(df_tx)
            if n_T > 2 and df_tx['x'].var() > 0 and df_tx['t'].var() > 0:
                r_T, p_T = stats.pointbiserialr(df_tx['t'], df_tx['x'])
                assoc_T = abs(r_T)
            else:
                r_T, p_T, assoc_T, n_T = np.nan, np.nan, np.nan, n_T
            # SMD opcional (tratado vs controle)
            if compute_smd and n_T > 2 and df_tx['t'].nunique() == 2:
                x1 = df_tx.loc[df_tx['t']==1, 'x']; x0 = df_tx.loc[df_tx['t']==0, 'x']
                v1, v0 = x1.var(ddof=1), x0.var(ddof=1)
                smd = (x1.mean() - x0.mean()) / np.sqrt(0.5*(v1+v0) + 1e-12) if v1>0 and v0>0 else np.nan
            else:
                smd = np.nan
        else:
            # qui-quadrado + Cramer's V
            ct = pd.crosstab(T, s, dropna=True)
            n_T = ct.values.sum()
            try:
                chi2, p_T, _, _ = stats.chi2_contingency(ct, correction=False)
                assoc_T = cramers_v_corrected(ct)
            except Exception:
                chi2, p_T, assoc_T = np.nan, np.nan, np.nan
            smd = np.nan

        # Mutual information com T
        try:
            MI_T = _mutual_info_safe(s_enc, T, y_is_binary=True, discrete_x=not is_numeric)
        except Exception:
            MI_T = np.nan

        # ---------- Associação com Y (contínuo ou binário) ----------
        if y_is_binary:
            # Y binário
            if is_numeric:
                df_yx = pd.DataFrame({'x': s, 'y': Y}).dropna()
                n_Y = len(df_yx)
                if n_Y > 2 and df_yx['x'].var() > 0 and df_yx['y'].var() > 0:
                    r_Y, p_Y = stats.pointbiserialr(df_yx['y'], df_yx['x'])
                    assoc_Y = abs(r_Y)
                else:
                    r_Y, p_Y, assoc_Y, n_Y = np.nan, np.nan, np.nan, n_Y
            else:
                ct = pd.crosstab(Y, s, dropna=True)
                n_Y = ct.values.sum()
                try:
                    chi2, p_Y, _, _ = stats.chi2_contingency(ct, correction=False)
                    assoc_Y = cramers_v_corrected(ct)
                except Exception:
                    p_Y, assoc_Y = np.nan, np.nan
        else:
            # Y contínuo
            if is_numeric:
                df_yx = pd.DataFrame({'x': s, 'y': Y}).dropna()
                n_Y = len(df_yx)
                if n_Y > 2 and df_yx['x'].var() > 0 and df_yx['y'].var() > 0:
                    pear_r, pear_p = stats.pearsonr(df_yx['x'], df_yx['y'])
                    spear_r, spear_p = stats.spearmanr(df_yx['x'], df_yx['y'])
                    # usaremos |Pearson| como métrica principal; Spearman como apoio
                    assoc_Y, p_Y = abs(pear_r), pear_p
                else:
                    assoc_Y, p_Y, n_Y = np.nan, np.nan, n_Y
            else:
                # ANOVA: Y ~ X_cat
                df_yx = pd.DataFrame({'x': s, 'y': Y}).dropna()
                n_Y = len(df_yx)
                groups = [g.values for _, g in df_yx.groupby('x')['y']]
                if len(groups) >= 2 and all(len(g) > 1 for g in groups):
                    try:
                        F, p_Y = stats.f_oneway(*groups)
                    except Exception:
                        # f_oneway pode falhar com variâncias muito desiguais; caia fora educadamente
                        F, p_Y = np.nan, np.nan
                    eta2 = eta_squared_anova(groups) if all(len(g)>1 for g in groups) else np.nan
                    assoc_Y = eta2  # para categórica vs contínuo, use eta^2 como força
                else:
                    assoc_Y, p_Y = np.nan, np.nan

        # Mutual information com Y
        try:
            MI_Y = _mutual_info_safe(s_enc, Y, y_is_binary=y_is_binary, discrete_x=not is_numeric)
        except Exception:
            MI_Y = np.nan

        # ---------- Regras de decisão ----------
        # Sinaliza associação "forte" com T e Y conforme tipo de teste
        assoc_T_strong = (assoc_T >= thr_corr_T) and (p_T < alpha) if not np.isnan(assoc_T) and not np.isnan(p_T) else False

        if y_is_binary:
            assoc_Y_strong = (assoc_Y >= thr_corr_Y) and (p_Y < alpha) if not np.isnan(assoc_Y) and not np.isnan(p_Y) else False
        else:
            # se categórica vs contínuo usamos eta^2; para numérica vs contínuo usamos |r|
            if not is_numeric:
                assoc_Y_strong = (assoc_Y >= thr_eta2) and (p_Y < alpha) if not np.isnan(assoc_Y) and not np.isnan(p_Y) else False
            else:
                assoc_Y_strong = (assoc_Y >= thr_corr_Y) and (p_Y < alpha) if not np.isnan(assoc_Y) and not np.isnan(p_Y) else False

        if assoc_T_strong and assoc_Y_strong:
            label = 'confounder_candidate (INCLUIR)'
        elif assoc_T_strong and not assoc_Y_strong:
            label = 'instrument_like (EVITAR na modelagem causal; pode servir como IV)'
        elif (not assoc_T_strong) and assoc_Y_strong:
            label = 'prognostic_only (BOM p/ precisão)'
        else:
            label = 'weak/irrelevant (OPCIONAL)'

        results.append({
            'feature'        : col,
            'type'           : ('numeric' if is_numeric else 'categorical'),
            'n_T'            : int(n_T),
            'assoc_T_metric' : assoc_T,    # |r| (num) ou Cramer's V (cat)
            'p_T'            : p_T,
            'SMD_pre'        : smd if compute_smd else np.nan,
            'n_Y'            : int(n_Y) if 'n_Y' in locals() else np.nan,
            'assoc_Y_metric' : assoc_Y,    # |r| (num) ou eta^2 (cat vs cont) / V (cat vs bin)
            'p_Y'            : p_Y,
            'MI_T'           : MI_T,
            'MI_Y'           : MI_Y,
            'classification' : label
        })

    out = pd.DataFrame(results)

    # Ordena: 1) confounders, 2) prognósticas, 3) instrument-like, 4) fracas
    cat_order = pd.CategoricalDtype(
        ['confounder_candidate (INCLUIR)',
         'prognostic_only (BOM p/ precisão)',
         'instrument_like (EVITAR na modelagem causal; pode servir como IV)',
         'weak/irrelevant (OPCIONAL)'],
        ordered=True
    )
    out['classification'] = out['classification'].astype(cat_order)
    out = out.sort_values(['classification', 'assoc_Y_metric', 'assoc_T_metric'], ascending=[True, False, False]).reset_index(drop=True)
    return out

In [12]:
# 1) Escolha as colunas candidatas
candidate_cols = ['Timing1_Y','Position_Y-1','State_Y-1','State_Y-1','Outcoming_Y-1']

# 2) Rode o scanner
assoc_table = scan_associations(
    df=df,
    treatment_col='Tratamento',
    outcome_col='Outcoming_Y',
    candidate_features=candidate_cols,
    alpha=0.05,
    thr_corr_T=0.10,
    thr_corr_Y=0.10,
    thr_eta2=0.02
)

# 3) Veja a tabela/resultados
print(assoc_table.head(20))

         feature     type   n_T  assoc_T_metric           p_T   SMD_pre   n_Y  \
0  Outcoming_Y-1  numeric  1999        0.042574  5.701722e-02 -0.092194  1999   
1      Timing1_Y  numeric  1999        0.114716  2.707722e-07  0.249778  1999   
2      State_Y-1  numeric  1999        0.010674  6.333819e-01  0.022996  1999   
3      State_Y-1  numeric  1999        0.010674  6.333819e-01  0.022996  1999   
4   Position_Y-1  numeric  1999        0.001542  9.450813e-01 -0.003349  1999   

   assoc_Y_metric       p_Y      MI_T      MI_Y  \
0        0.964235  0.000000  0.000000  1.338826   
1        0.013398  0.549383  0.007308  0.004418   
2        0.035922  0.108362  0.006689  0.000000   
3        0.035922  0.108362  0.006689  0.000000   
4        0.007910  0.723767  0.000000  0.000000   

                                      classification  
0                  prognostic_only (BOM p/ precisão)  
1  instrument_like (EVITAR na modelagem causal; p...  
2                         weak/irrelevant

In [None]:
import numpy as np
import pandas as pd

def nearest_neighbor_match(
    df,
    k=2,
    with_replacement=False,                 # << escolha: True = com reposição | False = sem reposição
    caliper_factor=0.25,                    # largura = fator * DP(score) nos CONTROLES
    strata_cols=('Position_Y-1', 'State_Y-1'),  # colunas para matching EXATO por estrato
    treatment_col='Tratamento',
    ps_col='ps',                            # caso queira usar PS direto
    logit_ps_col='logit_ps',                # coluna com logit(PS); será criada se não existir e houver 'ps'
    use_logit_score=True,                   # True = usar logit(PS) no cálculo de distância/caliper
    verbose=True
):
    """
    Matching k:1 com/sem reposição + caliper + exato por estrato.
    Retorna um DataFrame com colunas: ['t_idx', 'c_idx', 'w'].
    'w' é o peso DENTRO do tratado (soma=1 por tratado emparelhado).
    """
    df = df.copy()

    # 1) Score a ser usado (logit(PS) por padrão)
    if use_logit_score:
        if logit_ps_col not in df.columns:
            if ps_col not in df.columns:
                raise ValueError("Informe 'logit_ps' ou 'ps' na base.")
            p = np.clip(df[ps_col].to_numpy(), 1e-6, 1-1e-6)
            df[logit_ps_col] = np.log(p/(1-p))
        score_col = logit_ps_col
    else:
        if ps_col not in df.columns:
            raise ValueError("use_logit_score=False exige coluna 'ps'.")
        score_col = ps_col

    # 2) Estratos para matching exato (ou 'all' se não quiser estratificar)
    if strata_cols and len(strata_cols) > 0:
        df['strata'] = df.loc[:, list(strata_cols)].astype(str).agg('||'.join, axis=1)
    else:
        df['strata'] = 'all'

    treated_idx = df.index[df[treatment_col] == 1]
    control_idx = df.index[df[treatment_col] == 0]

    # 3) Caliper (DP do score calculada nos controles)
    caliper = caliper_factor * np.std(df.loc[control_idx, score_col])
    if np.isnan(caliper) or caliper <= 0:
        raise ValueError("Caliper inválido (verifique score nos controles).")

    # 4) Índices de controles por estrato (para matching exato)
    strata_to_ctrl = {}
    for i in control_idx:
        strata_to_ctrl.setdefault(df.at[i, 'strata'], []).append(i)

    pairs = []            # (t_idx, c_idx, w)
    dists = []            # diagnósticos de distância
    used_controls = set() # impede reuso quando with_replacement=False

    # 5) Loop nos tratados
    for ti in treated_idx:
        key = df.at[ti, 'strata']
        pool_all = strata_to_ctrl.get(key, [])
        if not pool_all:
            continue

        # pool elegível (considera reuso ou não)
        if with_replacement:
            pool = pool_all
        else:
            pool = [j for j in pool_all if j not in used_controls]
            if not pool:
                continue

        cand = np.array(pool)
        d = np.abs(df.loc[cand, score_col].values - df.at[ti, score_col])

        # aplica caliper
        in_caliper = d <= caliper
        if not np.any(in_caliper):
            continue

        cand = cand[in_caliper]
        d = d[in_caliper]

        # escolhe até k mais próximos
        order = np.argsort(d)
        chosen = cand[order][:k]

        if len(chosen) == 0:
            continue

        w = 1.0 / len(chosen)  # pesos iguais dentro do tratado
        for cj in chosen:
            pairs.append((ti, cj, w))
            dists.append(abs(df.at[cj, score_col] - df.at[ti, score_col]))
            if not with_replacement:
                used_controls.add(cj)

    pairs = pd.DataFrame(pairs, columns=['t_idx', 'c_idx', 'w'])

    # 6) Logs úteis
    if verbose:
        n_trat_total    = int((df[treatment_col] == 1).sum())
        n_trat_pareados = pairs['t_idx'].nunique() if not pairs.empty else 0
        n_ctrl_unicos   = pairs['c_idx'].nunique() if not pairs.empty else 0
        tag = "com REUSO" if with_replacement else "sem reuso"

        print(f"\nPareamento ({tag}): {n_trat_pareados} tratados pareados de {n_trat_total} "
              f"({n_trat_pareados/max(1,n_trat_total):.1%}). Links: {len(pairs)}")
        print(f"Controles únicos usados: {n_ctrl_unicos}" + ("" if with_replacement else " (reuso proibido)"))

        if len(dists) > 0:
            dists = np.array(dists)
            print("Distância no score: mediana={:.4f} | P90={:.4f} | máx={:.4f}"
                  .format(np.median(dists), np.quantile(dists, 0.90), np.max(dists)))

    return pairs

# -------------------------
# EXEMPLOS DE USO:
# -------------------------
# 1) Sem reposição (equivalente ao seu código original, mas parametrizado)
# pairs_no_reuse = nearest_neighbor_match(df, k=2, with_replacement=False,
#                                         caliper_factor=0.25,
#                                         strata_cols=('Position_Y-1','State_Y-1'),
#                                         treatment_col='Tratamento',
#                                         logit_ps_col='logit_ps', use_logit_score=True)

# 2) Com reposição (permite que um mesmo controle sirva a múltiplos tratados)
# pairs_with_reuse = nearest_neighbor_match(df, k=2, with_replacement=True,
#                                           caliper_factor=0.25,
#                                           strata_cols=('Position_Y-1','State_Y-1'),
#                                           treatment_col='Tratamento',
#                                           logit_ps_col='logit_ps', use_logit_score=True)