
# Análisis de Evasión — TelecomX (JSON → Pandas → Limpieza → Visualización → Informe)

**Instrucciones rápidas**  
- Ejecuta las celdas de arriba hacia abajo.  
- La notebook descarga el JSON desde la URL pública, limpia los datos, calcula métricas y crea un informe final en Markdown en `informe_final.md`.

**Fuente de datos (puedes cambiarla si lo necesitas):**
```
https://raw.githubusercontent.com/ingridcristh/challenge2-data-science-LATAM/refs/heads/main/TelecomX_Data.json
```

> Nota: Esta notebook intenta detectar automáticamente la columna objetivo relacionada con *evasión / churn*. Si no la encuentra, seguirá con análisis descriptivo general y te indicará qué hacer.


In [None]:

# @title 1) Importaciones y Configuración
import io
import re
import json
import math
import requests
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from typing import List, Tuple, Optional, Dict

# Configuración de pandas
pd.set_option("display.max_columns", 200)
pd.set_option("display.width", 150)

# Semilla para reproducibilidad
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

URL = "https://raw.githubusercontent.com/ingridcristh/challenge2-data-science-LATAM/refs/heads/main/TelecomX_Data.json"  # @param {type:"string"}


In [None]:

# @title 2) Cargar JSON → DataFrame
def load_json_to_df(url: str) -> pd.DataFrame:
    # Intento 1: pandas.read_json directo
    try:
        df = pd.read_json(url, dtype=False)
        if isinstance(df, pd.DataFrame) and df.shape[0] > 0:
            return df
    except Exception as e:
        print("read_json falló:", e)

    # Intento 2: usar requests y json.loads
    try:
        resp = requests.get(url, timeout=60)
        resp.raise_for_status()
        data = resp.json()
        if isinstance(data, list):
            return pd.DataFrame(data)
        elif isinstance(data, dict):
            # Si es un dict con una clave principal con lista
            for v in data.values():
                if isinstance(v, list):
                    return pd.DataFrame(v)
            # Si no, normalizar
            return pd.json_normalize(data)
    except Exception as e:
        print("requests/json falló:", e)

    raise RuntimeError("No se pudo cargar el JSON en un DataFrame. Verifica la URL o el formato.")

df_raw = load_json_to_df(URL)
print("Cargado: filas =", len(df_raw), " | columnas =", df_raw.shape[1])
df_raw.head(3)


In [None]:

# @title 3) Normalizar nombres de columnas
def normalize_colname(c: str) -> str:
    c = c.strip()
    c = re.sub(r"[^\w\s]", "_", c, flags=re.UNICODE)
    c = re.sub(r"\s+", "_", c, flags=re.UNICODE)
    c = c.lower()
    c = re.sub(r"_+", "_", c)
    return c.strip("_")

df = df_raw.copy()
df.columns = [normalize_colname(c) for c in df.columns]
print("Columnas normalizadas:", list(df.columns)[:20])
df.head(3)


In [None]:

# @title 4) Calidad de datos: tipos, nulos, duplicados, formatos
profile = {}

# Tipos iniciales
profile["dtypes_iniciales"] = df.dtypes.astype(str).to_dict()

# Conteo de nulos
nulls = df.isna().sum().sort_values(ascending=False)
profile["nulos_por_columna"] = nulls.to_dict()

# Duplicados
duplicated_rows = df.duplicated().sum()
profile["filas_duplicadas"] = int(duplicated_rows)

# Heurísticas de conversión de tipos
date_like = [c for c in df.columns if re.search(r"(fecha|date|_at|_on)$", c)]
numeric_like = [c for c in df.columns if re.search(r"(monto|charge|importe|total|balance|price|cost|amount|tenure|minutes|calls|gb|data|count|num|porcentaje|pct|rate)", c)]
bool_like = [c for c in df.columns if re.search(r"(si|sí|no|true|false|yes|no|churn|evad|evas|cancel|active|inactive|status)", c)]

# Convertir fechas
for c in date_like:
    try:
        df[c] = pd.to_datetime(df[c], errors="coerce")
    except Exception:
        pass

# Convertir numéricos
for c in numeric_like:
    if df[c].dtype == "object":
        # Remover símbolos comunes y convertir
        df[c] = (
            df[c]
            .astype(str)
            .str.replace(",", "", regex=False)
            .str.replace("$", "", regex=False)
            .str.replace("%", "", regex=False)
        )
        df[c] = pd.to_numeric(df[c], errors="coerce")

# Convertir booleanos (Yes/No, Sí/No, True/False)
def to_bool_series(s: pd.Series) -> Optional[pd.Series]:
    if s.dtype != "object":
        return None
    mapping = {
        "yes": True, "no": False,
        "si": True, "sí": True, "no.": False, "n": False, "s": True,
        "true": True, "false": False,
        "1": True, "0": False
    }
    vals = s.dropna().astype(str).str.strip().str.lower().unique()
    # Si tiene pocos valores y encajan en el mapeo, convertir
    if len(vals) <= 6 and all(v in mapping or v in ["", "nan"] for v in vals):
        return s.astype(str).str.strip().str.lower().map(mapping).astype("boolean")
    return None

for c in bool_like:
    conv = to_bool_series(df[c])
    if conv is not None:
        df[c] = conv

# Eliminar duplicados exactos
if duplicated_rows > 0:
    df = df.drop_duplicates().reset_index(drop=True)

profile["dtypes_finales"] = df.dtypes.astype(str).to_dict()
profile["shape_final"] = df.shape

print("Chequeos completos.")
print("Duplicados eliminados:", duplicated_rows)
print("Nulos por columna (top 10):")
nulls.head(10)


In [None]:

# @title 5) Detección automática de 'evasión' (columna objetivo)
candidate_patterns = [
    r"churn", r"evad", r"evas", r"cancel", r"aband", r"desert", r"default",
    r"fraud", r"attrition", r"baja", r"leav"
]

target_col = None
for c in df.columns:
    if any(re.search(p, c) for p in candidate_patterns):
        # preferir columnas binarias
        nunique = df[c].nunique(dropna=True)
        if nunique <= 5:
            target_col = c
            break

if target_col is None:
    # fallback: buscar columnas booleanas
    for c in df.columns:
        if str(df[c].dtype) in ["bool", "boolean"]:
            if any(re.search(p, c) for p in candidate_patterns) or df[c].nunique(dropna=True) == 2:
                target_col = c
                break

print("Columna objetivo detectada:", target_col)
if target_col is not None:
    print("Distribución de clases:")
    print(df[target_col].value_counts(dropna=False))
else:
    print("No se detectó automáticamente una columna clara de evasión. Se continuará con análisis general.")


In [None]:

# @title 6) Relevancia de columnas vs evasión
relevance_table = pd.DataFrame()

if target_col is not None:
    # Preparación de features
    y = df[target_col].astype("float").fillna(-1)  # -1 para NaN (se excluirá)
    mask = y.isin([0.0, 1.0])
    y = y[mask]
    X = df.loc[mask].drop(columns=[target_col]).copy()

    # Codificación simple: numéricos tal cual; categóricos → códigos
    X_enc = pd.DataFrame(index=X.index)
    for c in X.columns:
        if pd.api.types.is_numeric_dtype(X[c]):
            X_enc[c] = X[c].fillna(X[c].median())
        else:
            X_enc[c] = X[c].astype("category").cat.codes.replace(-1, np.nan)
            X_enc[c] = X_enc[c].fillna(X_enc[c].median())

    # Mutual information (sin scikit, estimador discretizado simple)
    def discretize(s: pd.Series, bins=10):
        try:
            return pd.qcut(s, q=min(bins, s.nunique()), duplicates='drop').astype(str)
        except Exception:
            return s.astype(str)

    def mi_discrete(x: pd.Series, y: pd.Series) -> float:
        # cálculo MI básico con tablas de contingencia
        xy = pd.crosstab(x, y)
        px = xy.sum(axis=1) / xy.values.sum()
        py = xy.sum(axis=0) / xy.values.sum()
        pxy = xy / xy.values.sum()
        mi = 0.0
        for i in pxy.index:
            for j in pxy.columns:
                pij = pxy.loc[i, j]
                if pij > 0:
                    mi += pij * math.log(pij / (px[i] * py[j] + 1e-12) + 1e-12)
        return float(mi)

    scores = []
    for c in X_enc.columns:
        xv = X_enc[c]
        xv_disc = discretize(xv)
        try:
            score = mi_discrete(xv_disc, y)
        except Exception:
            score = np.nan
        scores.append((c, score))

    relevance_table = pd.DataFrame(scores, columns=["columna", "mi_discreta"]).sort_values("mi_discreta", ascending=False)
    display(relevance_table.head(20))
else:
    print("Sin columna objetivo, se omite la tabla de relevancia.")


In [None]:

# @title 7) Crear columna 'Cuentas_Diarias' (valor diario estimado)
def create_cuentas_diarias(df: pd.DataFrame) -> pd.Series:
    # Heurística: si existe monthlycharges/monto_mensual => dividir entre 30
    monthly_cols = [c for c in df.columns if re.search(r"(monthly|mensual)", c)]
    total_cols   = [c for c in df.columns if re.search(r"(total|acumulado)", c)]
    tenure_cols  = [c for c in df.columns if re.search(r"(tenure|meses|months)", c)]
    daily = pd.Series(np.nan, index=df.index, dtype="float")

    # Caso 1: monthly charges
    for c in monthly_cols:
        if pd.api.types.is_numeric_dtype(df[c]):
            daily = df[c] / 30.0
            break

    # Caso 2: total / tenure
    if daily.isna().all() and total_cols and tenure_cols:
        for tc in total_cols:
            for tn in tenure_cols:
                if pd.api.types.is_numeric_dtype(df[tc]) and pd.api.types.is_numeric_dtype(df[tn]):
                    with np.errstate(divide='ignore', invalid='ignore'):
                        est = df[tc] / (df[tn] * 30.0)
                        est.replace([np.inf, -np.inf], np.nan, inplace=True)
                        if est.notna().mean() > daily.notna().mean():
                            daily = est

    return daily

df["cuentas_diarias"] = create_cuentas_diarias(df)
df["cuentas_diarias"].describe()


In [None]:

# @title 8) Estadística descriptiva (media, mediana, desviación estándar)
numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
desc_stats = pd.DataFrame({
    "media": df[numeric_cols].mean(),
    "mediana": df[numeric_cols].median(),
    "desv_std": df[numeric_cols].std(ddof=1),
})
display(desc_stats.head(20))


In [None]:

# @title 9) Visualizaciones de evasión
# NOTA: Reglas solicitadas: usar matplotlib, una figura por gráfico, sin especificar colores.
import os

os.makedirs("figuras", exist_ok=True)

if target_col is not None:
    # 9.1 Distribución de la evasión
    plt.figure()
    df[target_col].value_counts(dropna=False).sort_index().plot(kind="bar")
    plt.title("Distribución de Evasión (objetivo)")
    plt.xlabel(target_col)
    plt.ylabel("Número de registros")
    plt.tight_layout()
    plt.savefig("figuras/distribucion_evasion.png")
    plt.show()

    # 9.2 Evasión por categorías (top 3 variables categóricas más relevantes)
    cat_cols = [c for c in df.columns if c != target_col and not pd.api.types.is_numeric_dtype(df[c])]
    # Seleccionamos hasta 3 columnas categóricas con mayor MI (si existe tabla)
    top_cats = []
    if 'relevance_table' in globals() and isinstance(relevance_table, pd.DataFrame) and not relevance_table.empty:
        for _, row in relevance_table.iterrows():
            c = row["columna"]
            if c in cat_cols:
                top_cats.append(c)
            if len(top_cats) >= 3:
                break
    else:
        top_cats = cat_cols[:3]

    for c in top_cats:
        plt.figure()
        rates = df.groupby(c)[target_col].mean()
        rates.plot(kind="bar")
        plt.title(f"Tasa de evasión por {c}")
        plt.xlabel(c)
        plt.ylabel("Tasa de evasión")
        plt.tight_layout()
        plt.savefig(f"figuras/tasa_evasion_por_{c}.png")
        plt.show()

    # 9.3 Si existe 'cuentas_diarias', ver distribución por estado de evasión
    if "cuentas_diarias" in df.columns and pd.api.types.is_numeric_dtype(df["cuentas_diarias"]):
        plt.figure()
        df.boxplot(column="cuentas_diarias", by=target_col)
        plt.title("Cuentas Diarias por estado de evasión")
        plt.suptitle("")
        plt.xlabel(target_col)
        plt.ylabel("Cuentas Diarias (estimación)")
        plt.tight_layout()
        plt.savefig("figuras/cuentas_diarias_por_evasion.png")
        plt.show()
else:
    # Visualización general si no hay target
    plt.figure()
    df.isna().sum().sort_values(ascending=False).head(20).plot(kind="bar")
    plt.title("Valores nulos por columna (Top 20)")
    plt.xlabel("Columna")
    plt.ylabel("Nulos")
    plt.tight_layout()
    plt.savefig("figuras/nulos_top20.png")
    plt.show()


In [None]:

# @title 10) Informe final (Markdown)
lines = []
lines.append("# Informe Final — Análisis de Evasión (TelecomX)\n")
lines.append("## 1. Resumen de la carga y estructura\n")
lines.append(f"- Filas: **{df.shape[0]}**, Columnas: **{df.shape[1]}**\n")
lines.append("### Columnas (primeras 30)\n")
cols_list = ", ".join(list(df.columns)[:30])
lines.append(f"{cols_list}\n")

lines.append("## 2. Calidad de datos\n")
lines.append(f"- Filas duplicadas eliminadas: **{profile.get('filas_duplicadas', 0)}**\n")
lines.append("### Nulos por columna (Top 10)\n")
lines.append(df.isna().sum().sort_values(ascending=False).head(10).to_markdown())

lines.append("\n## 3. Tipos de datos (antes → después)\n")
dtypes_i = profile.get("dtypes_iniciales", {})
dtypes_f = profile.get("dtypes_finales", {})
pairs = []
for c in df.columns:
    di = dtypes_i.get(c, "?")
    dfinal = dtypes_f.get(c, "?")
    if di != dfinal:
        pairs.append(f"- `{c}`: {di} → {dfinal}")
if not pairs:
    lines.append("- No hubo cambios sustanciales en los tipos inferidos.\n")
else:
    lines.extend(pairs)

lines.append("\n## 4. Columna objetivo (evasión)\n")
if 'target_col' in globals() and target_col is not None:
    lines.append(f"- Objetivo detectado: **`{target_col}`**\n")
    dist = df[target_col].value_counts(dropna=False).to_frame("conteo")
    lines.append("### Distribución de clases\n")
    lines.append(dist.to_markdown())
else:
    lines.append("- **No se detectó** automáticamente una columna de evasión. Revisa posibles columnas como `churn`, `evasión`, `cancelado`, etc.\n")

lines.append("\n## 5. Variables relevantes\n")
if 'relevance_table' in globals() and isinstance(relevance_table, pd.DataFrame) and not relevance_table.empty:
    lines.append(relevance_table.head(15).to_markdown(index=False))
else:
    lines.append("- Sin variable objetivo, no se calcularon relevancias.")

lines.append("\n## 6. Cuentas_Diarias\n")
if "cuentas_diarias" in df.columns:
    desc_cd = df["cuentas_diarias"].describe().to_frame().to_markdown()
    lines.append("- Columna creada: `cuentas_diarias` (estimación)")
    lines.append(desc_cd)
else:
    lines.append("- No fue posible crear `cuentas_diarias` por falta de columnas monetarias/tenencia.")

lines.append("\n## 7. Estadística descriptiva\n")
lines.append(desc_stats.head(30).to_markdown())

lines.append("\n## 8. Visualizaciones generadas\n")
figs = []
if os.path.exists("figuras"):
    for f in sorted(os.listdir("figuras")):
        if f.lower().endswith((".png", ".jpg", ".jpeg", ".webp")):
            figs.append(f"- `figuras/{f}`")
if figs:
    lines.extend(figs)
else:
    lines.append("- No se generaron figuras (posiblemente por falta de columna objetivo).")

lines.append("\n## 9. Recomendaciones siguientes\n")
lines.append("- Validar manualmente la columna objetivo de evasión si no fue detectada.")
lines.append("- Revisar categorías con altas tasas de evasión para diseñar retención.")
lines.append("- Evaluar modelos predictivos (árboles, logística) si el objetivo está claro.")
lines.append("- Añadir variables de negocio (descuentos, quejas, antigüedad) para mejor señal.")

with open("/mnt/data/informe_final.md", "w", encoding="utf-8") as f:
    f.write("\n".join(lines))

print("Informe generado en /mnt/data/informe_final.md")
