# 6. Correlação e Seleção de Variáveis

Este bloco calcula a correlação entre variáveis numéricas e o alvo binário (point-biserial), exibe um ranking e plota um gráfico com as Top-N.

In [None]:
# Bibliotecas utilizadas no projeto
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import pointbiserialr

from sklearn.model_selection import train_test_split, StratifiedKFold, cross_validate
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score,
    f1_score, roc_auc_score, confusion_matrix, classification_report
)

from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline as ImbPipeline

import joblib
from collections import OrderedDict
from IPython.display import Markdown, display
import warnings
warnings.filterwarnings("ignore")


In [None]:

# === Correlação (alvo binário x numéricas) ===

# 1) Descobrir o DataFrame base
def _guess_df(globs):
    candidates = ["df", "df_base", "dados", "data", "df_analise", "df_tratado"]
    for c in candidates:
        if c in globs and isinstance(globs[c], pd.DataFrame):
            return globs[c], c
    paths = []
    for pat in ["data/*.csv", "*.csv", "datasets/*.csv"]:
        paths.extend(glob.glob(pat))
    pref = [p for p in paths if any(k in os.path.basename(p).lower() for k in ["telecom", "churn", "evas", "cliente"])]
    candidate_paths = pref + paths
    for p in candidate_paths:
        try:
            _df = pd.read_csv(p)
            return _df, f"loaded_from_{p}"
        except Exception:
            continue
    raise RuntimeError("Não foi possível localizar o DataFrame. Garanta que uma variável do tipo DataFrame esteja definida (ex.: df).")

df, df_name = _guess_df(globals())

# 2) Identificar coluna-alvo (binária)
def _guess_target(_df):
    target_names = ["Evasao","Evasão","Churn","churn","evadiu","cancelou","saida","Exited","Attrition"]
    cols_lower = {c.lower(): c for c in _df.columns}
    for name in target_names:
        if name in _df.columns:
            return name
        if name.lower() in cols_lower:
            return cols_lower[name.lower()]
    bin_candidates = [c for c in _df.columns if _df[c].dropna().nunique() == 2]
    if len(bin_candidates) == 1:
        return bin_candidates[0]
    raise RuntimeError("Não foi possível identificar a coluna alvo. Renomeie/defina explicitamente a variável alvo (ex.: 'Evasao').")

target_col = _guess_target(df)

# 3) Garantir que o alvo esteja binário {0,1}
y_raw = df[target_col].copy()
if y_raw.dtype == bool:
    y = y_raw.astype(int)
elif y_raw.dtype.kind in "biu":
    vals = sorted(pd.Series(y_raw.dropna().unique()).tolist())
    if set(vals).issubset({0,1}):
        y = y_raw.astype(int)
    else:
        most, least = pd.Series(vals).value_counts().index[:2].tolist()
        y = (y_raw == least).astype(int)
else:
    map_yes = {"yes","sim","true","churn","evadiu","cancelou","s","1"}
    y = y_raw.astype(str).str.strip().str.lower().map(lambda v: 1 if v in map_yes else 0)

# 4) Selecionar numéricas (antes de encoding, se disponível)
num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
if target_col in num_cols:
    num_cols.remove(target_col)

if len(num_cols) == 0:
    raise RuntimeError("Não há colunas numéricas detectadas para calcular correlação.")

# 5) Correlação point-biserial alvo (binário) vs cada numérica
rows = []
for c in num_cols:
    x = pd.to_numeric(df[c], errors="coerce")
    mask = (~pd.isna(x)) & (~pd.isna(y))
    if mask.sum() < 10:
        continue
    r, p = pointbiserialr(y[mask], x[mask])
    rows.append({"feature": c, "pointbiserial_r": r, "p_value": p, "abs_r": abs(r)})

corr_df = pd.DataFrame(rows).sort_values("abs_r", ascending=False).reset_index(drop=True)

display(corr_df.head(20))

# 6) Plot Top-N correlações (|r|)
TOPN = 15
top = corr_df.head(TOPN)
plt.figure(figsize=(10, max(4, TOPN*0.35)))
plt.barh(top["feature"][::-1], top["abs_r"][::-1])
plt.title(f"Top-{TOPN} | Correlação (|Point-Biserial r|) com {target_col}")
plt.xlabel("|r|")
plt.tight_layout()
plt.show()

# 7. Análises Direcionadas

Visualizações focadas na relação com a evasão: boxplots das variáveis mais correlacionadas e dispersões `tenure x MonthlyCharges`. Se não houver TotalCharges, cria-se `TotalCharges_calc = MonthlyCharges * tenure` (apenas para análise).

In [None]:


assert 'df' in globals() and 'target_col' in globals()

y_plot = df[target_col].copy()
if y_plot.dtype != 'int64' and y_plot.dtype != 'int32':
    map_yes = {"yes","sim","true","churn","evadiu","cancelou","s","1"}
    y_plot = y_plot.astype(str).str.strip().str.lower().map(lambda v: 1 if v in map_yes else 0).fillna(0).astype(int)

df_plot = df.copy()
df_plot["_alvo_bin"] = y_plot

num_top = []
if 'corr_df' in globals() and not corr_df.empty:
    num_top = corr_df['feature'].tolist()[:2]
else:
    num_cols = df_plot.select_dtypes(include=[np.number]).columns.tolist()
    if target_col in num_cols: 
        num_cols.remove(target_col)
    var_order = df_plot[num_cols].var().sort_values(ascending=False).index.tolist()
    num_top = var_order[:2]

for c in num_top:
    plt.figure(figsize=(7,4))
    data0 = df_plot.loc[df_plot["_alvo_bin"]==0, c].dropna().values
    data1 = df_plot.loc[df_plot["_alvo_bin"]==1, c].dropna().values
    plt.boxplot([data0, data1], labels=["Sem Evasão (0)","Com Evasão (1)"], showmeans=True)
    plt.title(f"Boxplot de {c} por Evasão")
    plt.ylabel(c)
    plt.tight_layout()
    plt.show()

x_name, y_name = None, None
for cand in ["tenure","TempoContrato","tempo_contrato","meses"]:
    if cand in df_plot.columns:
        x_name = cand
        break
for cand in ["MonthlyCharges","CobrancaMensal","mensalidade","mensal"]:
    if cand in df_plot.columns:
        y_name = cand
        break

if x_name and y_name:
    plt.figure(figsize=(7,5))
    m0 = df_plot["_alvo_bin"] == 0
    m1 = df_plot["_alvo_bin"] == 1
    plt.scatter(df_plot.loc[m0, x_name], df_plot.loc[m0, y_name], alpha=0.4, label="Sem Evasão (0)")
    plt.scatter(df_plot.loc[m1, x_name], df_plot.loc[m1, y_name], alpha=0.4, label="Com Evasão (1)")
    plt.xlabel(x_name)
    plt.ylabel(y_name)
    plt.title(f"Dispersão: {x_name} x {y_name} por Evasão")
    plt.legend()
    plt.tight_layout()
    plt.show()

total_name = None
for cand in ["TotalCharges","TotalGasto","GastoTotal","Fatura_Total"]:
    if cand in df_plot.columns:
        total_name = cand
        break

if total_name is None and x_name and y_name:
    df_plot["TotalCharges_calc"] = pd.to_numeric(df_plot[y_name], errors="coerce") * pd.to_numeric(df_plot[x_name], errors="coerce")
    total_name = "TotalCharges_calc"

if total_name:
    plt.figure(figsize=(7,4))
    data0 = df_plot.loc[df_plot["_alvo_bin"]==0, total_name].dropna().values
    data1 = df_plot.loc[df_plot["_alvo_bin"]==1, total_name].dropna().values
    plt.boxplot([data0, data1], labels=["Sem Evasão (0)","Com Evasão (1)"], showmeans=True)
    plt.title(f"Boxplot de {total_name} por Evasão")
    plt.ylabel(total_name)
    plt.tight_layout()
    plt.show()

# 9. Diagnóstico de Overfitting / Underfitting

Compara validação cruzada (treino) x desempenho no teste do modelo final, com deltas e interpretação automática.

In [None]:



warnings.filterwarnings("ignore")

best_pipe = None
X_train = X_test = y_train = y_test = None

possible_model_paths = [
    "artefatos/modelo_final.pkl",
    "artefatos/pipeline_final.pkl",
    "artefatos/modelo_rf.pkl",
    "artefatos/best_model.pkl",
    "model/pipeline_final.pkl",
]
possible_data_paths = [
    ("artefatos/X_train.pkl","artefatos/y_train.pkl","artefatos/X_test.pkl","artefatos/y_test.pkl"),
]

try:
    for p in possible_model_paths:
        if os.path.exists(p):
            best_pipe = joblib.load(p)
            break
    if best_pipe is None:
        if "best_model" in globals():
            best_pipe = globals()["best_model"]
        elif "final_model" in globals():
            best_pipe = globals()["final_model"]
except Exception:
    pass

try:
    for (a,b,c,d) in possible_data_paths:
        if a and os.path.exists(a):
            X_train = joblib.load(a)
            y_train = joblib.load(b) if b and os.path.exists(b) else None
            X_test  = joblib.load(c) if c and os.path.exists(c) else None
            y_test  = joblib.load(d) if d and os.path.exists(d) else None
            break
except Exception:
    pass

if X_train is None or y_train is None:
    if 'df' in globals() and 'target_col' in globals():
        y_all = globals()['df'][target_col]
        X_all = globals()['df'].drop(columns=[target_col])
        X_train, X_test, y_train, y_test = train_test_split(
            X_all, y_all, test_size=0.2, random_state=42, stratify=y_all
        )

if best_pipe is None:
    raise RuntimeError("Não foi possível localizar o pipeline/modelo final. Ex.: 'best_model' ou .pkl em artefatos/.")

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scoring = {"acc":"accuracy","prec":"precision","rec":"recall","f1":"f1","roc":"roc_auc"}
cv_res = cross_validate(best_pipe, X_train, y_train, scoring=scoring, cv=cv, n_jobs=-1, return_train_score=False)

cv_mean = {k: float(np.mean(v)) for k,v in cv_res.items() if k.startswith("test_")}
cv_tbl = pd.DataFrame({k.replace("test_","CV_"+k.split("_",1)[1]): [v] for k,v in cv_mean.items()})

best_pipe.fit(X_train, y_train)
y_pred = best_pipe.predict(X_test)
y_proba = None
if hasattr(best_pipe, "predict_proba"):
    y_proba = best_pipe.predict_proba(X_test)[:,1]
else:
    try:
        y_proba = best_pipe.decision_function(X_test)
    except Exception:
        pass

test_metrics = {
    "TEST_acc": accuracy_score(y_test, y_pred),
    "TEST_prec": precision_score(y_test, y_pred, zero_division=0),
    "TEST_rec": recall_score(y_test, y_pred, zero_division=0),
    "TEST_f1": f1_score(y_test, y_pred, zero_division=0),
}
if y_proba is not None:
    try:
        test_metrics["TEST_roc"] = roc_auc_score(y_test, y_proba)
    except Exception:
        pass
test_tbl = pd.DataFrame([test_metrics])

comp = pd.concat([cv_tbl, test_tbl], axis=1)
display(comp.T)

def _interp(row):
    msgs = []
    for met in ["acc","prec","rec","f1","roc"]:
        cv_key = "CV_"+met
        t_key = "TEST_"+met
        if cv_key in row and t_key in row and pd.notna(row[cv_key]) and pd.notna(row[t_key]):
            delta = row[t_key] - row[cv_key]
            msgs.append(f"{met.upper()}: TEST - CV = {delta:.4f}")
    if msgs:
        print("\n".join(msgs))
    if "CV_f1" in row and "TEST_f1" in row:
        diff = row["TEST_f1"] - row["CV_f1"]
        if diff < -0.03:
            print("\nPossível OVERFITTING: teste abaixo da média de CV.")
        elif diff > 0.03:
            print("\nPossível UNDERFITTING (ou CV pessimista): teste acima da média de CV.")
        else:
            print("\nGeneralização consistente: CV ~ Teste.")
            
_interp(comp.iloc[0])

# 10. Conclusão Estratégica

Gera um sumário auto-preenchido com os principais fatores (Top-10) e sugestões de retenção heurísticas, além de recomendações operacionais.
- 6. Correlação e Seleção de Variáveis
- 7. Análises Direcionadas
- 9. Diagnóstico de Overfitting / Underfitting
- 10. Conclusão Estratégica

In [None]:


if 'best_pipe' not in globals():
    raise RuntimeError("O modelo final 'best_pipe' não está disponível. Rode o bloco anterior (over/underfitting).")

feature_names = None
try:
    set_config(transform_output="default")
    if hasattr(best_pipe, "named_steps"):
        pre = best_pipe.named_steps.get("preprocess") or best_pipe.named_steps.get("preprocessor")
        if pre is not None and hasattr(pre, "get_feature_names_out"):
            feature_names = pre.get_feature_names_out()
except Exception:
    pass

importances = None
model = best_pipe
if hasattr(model, "named_steps"):
    for k in ("clf","model","classifier","final_estimator"):
        if k in model.named_steps:
            est = model.named_steps[k]
            break
    else:
        est = None
else:
    est = model

if est is not None:
    if hasattr(est, "feature_importances_"):
        importances = np.array(est.feature_importances_)
    elif hasattr(est, "coef_"):
        coef = est.coef_
        if hasattr(coef, "toarray"):
            coef = coef.toarray()
        importances = np.mean(np.abs(coef), axis=0)

ranking = []
if importances is not None and feature_names is not None and len(importances) == len(feature_names):
    for n, v in zip(feature_names, importances):
        ranking.append((n, float(v)))
elif 'corr_df' in globals() and not corr_df.empty:
    ranking = list(zip(corr_df['feature'].tolist(), corr_df['abs_r'].tolist()))
else:
    X_try = globals().get('X_train', None)
    if X_try is not None and isinstance(X_try, pd.DataFrame):
        variances = X_try.var().sort_values(ascending=False)
        ranking = list(zip(variances.index.tolist(), variances.values.tolist()))

TOPN = 10
ranking = sorted(ranking, key=lambda x: x[1], reverse=True)[:TOPN]

suggestions = OrderedDict([
    ("contract|month", "Clientes com contrato mensal tendem a evadir. Ofereça **migração para anual** com benefícios."),
    ("tenure|tempo", "Clientes com **pouco tempo de casa** exigem **onboarding reforçado** e contatos proativos (90 dias)."),
    ("charges|mensal", "Cobrança mensal alta -> ofereça **planos mais adequados** ou **bundles** com melhor custo-benefício."),
    ("internet|fiber|dsl", "Tipo de Internet influencia satisfação. **Monitore qualidade** e **oriente upgrade** quando viável."),
    ("techsupport|support|suporte", "Ausência de **suporte técnico** liga-se à evasão. Proponha **canal prioritário**."),
    ("payment|automatic|cartao|debito", "Incentive **débito automático**/cartão para reduzir fricção e atrasos."),
    ("dependents|partner|senior", "Perfis família/idade podem demandar **planos familiares** e comunicação segmentada."),
    ("phoneservice|multiplelines", "Reveja **pacotes de telefonia** e tarifas; alinhe oferta ao uso real."),
])

def suggest_for_feature(name):
    n = str(name).lower()
    for k, msg in suggestions.items():
        keys = k.split("|")
        if any(kk in n for kk in keys):
            return msg
    return "Monitore em campanhas de **retenção segmentada** e teste ofertas A/B."

lines = []
lines.append("## 10.1 Principais fatores (Top-10)")
for i, (feat, score) in enumerate(ranking, 1):
    lines.append(f"- **{i:02d}. {feat}** — relevância: {score:.4f}")

lines.append("\n## 10.2 Estratégias de retenção sugeridas (heurísticas)")
already = set()
for feat, _ in ranking:
    s = suggest_for_feature(feat)
    if s not in already:
        lines.append(f"- **{feat}:** {s}")
        already.add(s)

lines.append("""
## 10.3 Recomendações operacionais
- **Threshold:** se o custo de perder um cliente é alto, reduza o limiar para aumentar *recall* e acione **playbooks**.
- **Playbook 7–14 dias:** contato proativo + oferta personalizada (upgrade, desconto temporário, bônus de fidelidade).
- **Monitoração:** reentreinar o modelo trimestralmente; acompanhar *drift*; comparar **CV x produção**.
""")

display(Markdown("\n".join(lines)))