In [5]:
# -*- coding: utf-8 -*-
import pandas as pd
import numpy as np
from pathlib import Path
pd.options.display.float_format = "{:,.2f}".format

# =========================================================
# Config mapeos / alias de columnas y nombres de hojas
# (ajusta aquí si tu archivo usa encabezados distintos)
# =========================================================
XLSX_PATH = Path("db/vulnerabilidad.xlsx")

SHEET_PERSONAS = "Personas"
SHEET_UNIVERSO = "Universo Familiares"
SHEET_INGRESOS = "Ingresos"
SHEET_DEUDAS = "Deudas"

# Dtype for IDs → todo texto
DTYPE_DICT = {
    "identificacion": str,
    "ruc_empleador": str,
    "ced_padre": str,
    "ced_madre": str,
}

# Columnas esperadas (puedes mapear si difieren en tu xlsx)
COL_PERIODO = "periodo"
COL_IDENT = "identificacion"
COL_TIPO = "tipo"  # 'A' (Afluentes) o 'E' (Enrollment)

COL_CED_PADRE = "ced_padre"
COL_CED_MADRE = "ced_madre"

COL_ANIO = "anio"
COL_MES = "mes"
COL_SALARIO = "salario"  # ingreso mensual
COL_DEUDA = "valor"  # deuda

# Qué meses usar para ingresos y deudas (por defecto 2025-06)
ANIO_FILTRO = 2025
MES_FILTRO = 6

# Multiplicador para llevar salario mensual a anual
SALARIO_MESES_ANO = 14


# =========================================================
# Helpers
# =========================================================


def parse_monto(val) -> float:
    if pd.isna(val):
        return float("nan")
    s = str(val).strip()
    if s == "":
        return float("nan")
    # quita símbolos y cambia coma→punto
    s = s.replace(" ", "").replace("$", "").replace(",", ".")
    try:
        return float(s)
    except Exception:
        return float("nan")


def _norm_cols(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df.columns = [str(c).strip() for c in df.columns]
    return df


def make_hogar_id(ced_padre: str, ced_madre: str) -> str:
    """
    Genera un ID de hogar estable:
    - Si ambos '0' -> ""
    - Si solo uno válido -> ese
    - Si ambos válidos -> concatenación ordenada "menor-mayor"
    """
    a = str(ced_padre or "").strip()
    b = str(ced_madre or "").strip()
    a = "0" if a == "" else a
    b = "0" if b == "" else b
    if a == "0" and b == "0":
        return ""
    if a == "0":
        return b
    if b == "0":
        return a
    return "-".join(sorted([a, b]))


def _stats_from_series(s: pd.Series) -> dict:
    """Devuelve min, max, mediana, media, std y n para una serie numérica."""
    s = pd.to_numeric(s, errors="coerce")
    s = s.replace([np.inf, -np.inf], np.nan).dropna()
    return {
        "min": float(s.min()) if not s.empty else np.nan,
        "max": float(s.max()) if not s.empty else np.nan,
        "mediana": float(s.median()) if not s.empty else np.nan,
        "media": float(s.mean()) if not s.empty else np.nan,
        "desviacion": float(s.std(ddof=1)) if len(s) > 1 else 0.0,
        "n": int(s.size),
    }


def _remove_outliers_iqr_series(s: pd.Series, k: float = 1.5) -> pd.Series:
    """Filtra outliers de una serie usando Tukey (IQR)."""
    s = pd.to_numeric(s, errors="coerce").dropna()
    if s.empty:
        return s
    q1, q3 = s.quantile(0.25), s.quantile(0.75)
    iqr = q3 - q1
    low, high = q1 - k * iqr, q3 + k * iqr
    return s[(s >= low) & (s <= high)]


# =========================================================
# Carga de datos desde el Excel
# =========================================================
def cargar_vulnerabilidad_xlsx(path: Path) -> dict:
    # leemos todas las hojas, forzando columnas de ID a str
    wb = pd.read_excel(
        path,
        sheet_name=None,
        engine="openpyxl",
        dtype=DTYPE_DICT,  # 👈 aquí la normalización
    )
    # Normalizamos encabezados
    data = {name: _norm_cols(df) for name, df in wb.items()}
    return data


# =========================================================
# ETL para familias, salarios, deudas (replicando tu lógica)
# =========================================================
def obtener_datos_familias(
    dfs: dict, periodo: str, grupo_seleccionado: str
) -> pd.DataFrame:
    """
    Une Personas (filtrado por periodo y tipo) con Universo Familiares (padre/madre).
    Filtra filas donde al menos un padre/madre sea válido (distinto de '0').
    """
    df_personas = dfs.get(SHEET_PERSONAS, pd.DataFrame()).copy()
    df_universo = dfs.get(SHEET_UNIVERSO, pd.DataFrame()).copy()
    if df_personas.empty or df_universo.empty:
        return pd.DataFrame()

    # Normaliza algunas columnas clave a string
    for c in [COL_PERIODO, COL_IDENT, COL_TIPO]:
        if c in df_personas.columns:
            df_personas[c] = df_personas[c].astype(str).str.strip()

    # Filtra por periodo y grupo (tipo)
    personas_periodo = df_personas[
        (df_personas[COL_PERIODO] == str(periodo))
        & (df_personas[COL_TIPO] == str(grupo_seleccionado))
    ].copy()
    if personas_periodo.empty:
        return pd.DataFrame()

    # Normaliza universo familiares
    for c in [COL_IDENT, COL_CED_PADRE, COL_CED_MADRE]:
        if c in df_universo.columns:
            df_universo[c] = df_universo[c].astype(str).str.strip().replace({"": "0"})

    familias = personas_periodo.merge(df_universo, on=COL_IDENT, how="left")
    familias = familias[
        (familias[COL_CED_PADRE].fillna("0") != "0")
        | (familias[COL_CED_MADRE].fillna("0") != "0")
    ].copy()
    return familias


def obtener_datos_salario_deuda_familia(
    dfs: dict, familias_df: pd.DataFrame
) -> pd.DataFrame:
    if familias_df.empty:
        return familias_df

    out = familias_df.copy()
    out["salario_familiar"] = 0.0
    out["deuda_familiar"] = 0.0

    # --- Ingresos (vectorizado) ---
    df_ingresos = dfs.get("Ingresos", pd.DataFrame()).copy()
    if not df_ingresos.empty:
        if "salario" in df_ingresos.columns:
            df_ingresos["salario"] = df_ingresos["salario"].apply(parse_monto).fillna(0.0)
        ingresos_mes = df_ingresos[
            (df_ingresos["anio"] == ANIO_FILTRO) & (df_ingresos["mes"] == MES_FILTRO)
        ].copy()

        if not ingresos_mes.empty:
            ing_por_id = ingresos_mes.groupby("identificacion", as_index=False)[
                "salario"
            ].sum()
            ing_map = dict(zip(ing_por_id["identificacion"], ing_por_id["salario"]))

            s_padre = out["ced_padre"].map(ing_map).fillna(0.0)
            s_madre = out["ced_madre"].map(ing_map).fillna(0.0)
            out.loc[:, "salario_familiar"] = (s_padre + s_madre) * SALARIO_MESES_ANO

    # --- Deudas (vectorizado) ---
    df_deudas = dfs.get("Deudas", pd.DataFrame()).copy()
    if not df_deudas.empty:
        if "valor" in df_deudas.columns:
            df_deudas["valor"] = df_deudas["valor"].apply(parse_monto).fillna(0.0)
        deudas_mes = df_deudas[
            (df_deudas["anio"] == ANIO_FILTRO) & (df_deudas["mes"] == MES_FILTRO)
        ].copy()

        if not deudas_mes.empty:
            deuda_por_id = deudas_mes.groupby("identificacion", as_index=False)[
                "valor"
            ].sum()
            deuda_map = dict(zip(deuda_por_id["identificacion"], deuda_por_id["valor"]))

            d_padre = out["ced_padre"].map(deuda_map).fillna(0.0)
            d_madre = out["ced_madre"].map(deuda_map).fillna(0.0)
            out.loc[:, "deuda_familiar"] = d_padre + d_madre

    return out


def _hogares_unicos_salario_deuda(familias_df: pd.DataFrame) -> pd.DataFrame:
    """
    Condensa a 1 registro por hogar: hogar_id, salario_familiar, deuda_familiar.
    """
    if familias_df.empty:
        return pd.DataFrame(columns=["hogar_id", "salario_familiar", "deuda_familiar"])

    tmp = familias_df.copy()
    tmp[COL_CED_PADRE] = tmp[COL_CED_PADRE].astype(str).str.strip().replace({"": "0"})
    tmp[COL_CED_MADRE] = tmp[COL_CED_MADRE].astype(str).str.strip().replace({"": "0"})

    tmp["hogar_id"] = tmp.apply(
        lambda r: make_hogar_id(r[COL_CED_PADRE], r[COL_CED_MADRE]), axis=1
    )
    tmp = tmp[tmp["hogar_id"] != ""]

    hogares = (
        tmp.groupby("hogar_id", as_index=False)[["salario_familiar", "deuda_familiar"]]
        .first()
        .copy()
    )

    hogares["salario_familiar"] = pd.to_numeric(
        hogares["salario_familiar"], errors="coerce"
    ).fillna(0.0)
    hogares["deuda_familiar"] = pd.to_numeric(
        hogares["deuda_familiar"], errors="coerce"
    ).fillna(0.0)
    hogares = hogares[
        (hogares["salario_familiar"] >= 0) & (hogares["deuda_familiar"] >= 0)
    ]
    return hogares


def calcular_estadisticas_ratios(
    df_hogares: pd.DataFrame, remover_outliers: bool = False, k_iqr: float = 1.5
) -> pd.DataFrame:
    """
    Espera un DF con columnas salario_familiar (anual) y deuda_familiar (total).
    Calcula stats para:
      1) ingreso_anual / deuda_total  (denominador > 0)
      2) deuda_total / ingreso_anual  (denominador > 0)
    """
    idx = ["min", "max", "mediana", "media", "desviacion", "n"]
    if df_hogares.empty:
        return pd.DataFrame(
            index=idx, columns=["ingreso_sobre_deuda", "deuda_sobre_ingreso"]
        )

    r1_mask = df_hogares["deuda_familiar"] > 0
    r2_mask = df_hogares["salario_familiar"] > 0

    r1 = (
        df_hogares.loc[r1_mask, "salario_familiar"]
        / df_hogares.loc[r1_mask, "deuda_familiar"]
    )
    r2 = (
        df_hogares.loc[r2_mask, "deuda_familiar"]
        / df_hogares.loc[r2_mask, "salario_familiar"]
    )

    if remover_outliers:
        r1 = _remove_outliers_iqr_series(r1, k=k_iqr)
        r2 = _remove_outliers_iqr_series(r2, k=k_iqr)

    stats_r1 = _stats_from_series(r1)
    stats_r2 = _stats_from_series(r2)

    out = pd.DataFrame(
        {
            "ingreso_sobre_deuda": stats_r1,
            "deuda_sobre_ingreso": stats_r2,
        }
    )
    return out


# =========================================================
# Ejecución: por grupo (A/E) y por periodo(s)
# =========================================================
def ejecutar_consola(
    grupo_seleccionado: str, remover_outliers: bool = True, k_iqr: float = 1.5
):
    """
    Imprime en consola las estadísticas para cada periodo disponible del grupo indicado.
    grupo_seleccionado: 'A' (Afluentes) o 'E' (Enrollment)
    """
    dfs = cargar_vulnerabilidad_xlsx(XLSX_PATH)
    df_personas = dfs.get(SHEET_PERSONAS, pd.DataFrame()).copy()
    if df_personas.empty:
        print("No se encontró la hoja 'Personas' o está vacía.")
        return

    # Periodos disponibles para el grupo
    mask_grupo = df_personas[COL_TIPO].astype(str).str.strip() == str(
        grupo_seleccionado
    )
    periodos = (
        df_personas.loc[mask_grupo, COL_PERIODO]
        .dropna()
        .astype(str)
        .drop_duplicates()
        .sort_values()
        .tolist()
    )
    if not periodos:
        print(f"No hay periodos para el grupo {grupo_seleccionado}.")
        return

    print(f"\n=== Grupo: {grupo_seleccionado} | Periodos: {', '.join(periodos)} ===")
    print(
        f"Parámetros ingresos/deudas usados: año={ANIO_FILTRO}, mes={MES_FILTRO}, salario_anual={SALARIO_MESES_ANO}x\n"
    )

    for periodo in periodos:
        familias = obtener_datos_familias(dfs, periodo, grupo_seleccionado)
        if familias.empty:
            print(f"[{periodo}] Sin familias.")
            continue

        familias_sd = obtener_datos_salario_deuda_familia(dfs, familias)
        hogares = _hogares_unicos_salario_deuda(familias_sd)

        stats = calcular_estadisticas_ratios(
            hogares, remover_outliers=remover_outliers, k_iqr=k_iqr
        )

        print(f"--- Periodo {periodo} ---")
        print(f"Total hogares únicos: {len(hogares):,}")
        print(stats.to_string())
        print()


# =========================================================
# MAIN (ajusta grupo según necesites)
# =========================================================
if __name__ == "__main__":
    # Ejecuta para Enrollment (E) y Afluentes (A) si quieres
    ejecutar_consola(grupo_seleccionado="E", remover_outliers=False, k_iqr=1.5)
    # ejecutar_consola(grupo_seleccionado="A", remover_outliers=True, k_iqr=1.5)


=== Grupo: E | Periodos: 202520 ===
Parámetros ingresos/deudas usados: año=2025, mes=6, salario_anual=14x

--- Periodo 202520 ---
Total hogares únicos: 12,731
            ingreso_sobre_deuda  deuda_sobre_ingreso
min                        0.00                 0.00
max                2,038,400.00               664.93
mediana                    0.73                 0.67
media                    356.22                 2.90
desviacion            23,680.17                12.23
n                     10,969.00             9,797.00



In [7]:
import pandas as pd
from pathlib import Path
from datetime import datetime

# ==========================
# Config (ajusta si quieres)
# ==========================
DB_DIR = Path("db")
SHEET_UNIVERSO = "Universo Familiares"
SHEET_PERSONAS = "Personas"  # opcional
SHEET_INGRESOS = "Ingresos"  # requerido para 'trabajando'
FILE_NAME = None  # si conoces el nombre, ponlo: "vulnerabilidad.xlsx"
PERIODO = None  # ejemplo: "2024-1" o None para no filtrar por periodo
ANIO = 2025
MES = 6
EMPLEOS_VALIDOS = ["Relacion de Dependencia", "Afiliacion Voluntaria"]

# Filtros que quieres replicar del caso:
CANT_PAPAS = 2  # 1, 2, o None (para Todos)
CANT_PAPAS_TRAB = 1  # 0, 1, 2, o None (para Todos)
TIPO_EMPLEO = "Todos"  # "Todos" | "Relacion de Dependencia" | "Afiliacion Voluntaria" | "Desconocido"


# ==========================
# Helpers
# ==========================
def make_hogar_id(ced_padre: str, ced_madre: str) -> str:
    """
    Replica la idea de hogar único, independiente del orden.
    Ignora "0" como no-informativo.
    """
    a = str(ced_padre).strip()
    b = str(ced_madre).strip()
    ids = [x for x in (a, b) if x and x != "0"]
    if not ids:
        return ""
    ids = sorted(ids)
    return "|".join(ids)


def load_excel_auto(db_dir: Path, file_name: str | None):
    if file_name:
        fp = db_dir / file_name
        if not fp.exists():
            raise FileNotFoundError(f"No se encontró: {fp}")
        return fp
    cands = list(db_dir.glob("*.xls*"))
    if not cands:
        raise FileNotFoundError("No se encontró ningún Excel en db/")
    return cands[0]


# ==========================
# 1) Cargar hojas
# ==========================
file_path = load_excel_auto(DB_DIR, FILE_NAME)
print(f"Usando archivo: {file_path}")

xl = pd.ExcelFile(file_path)
if SHEET_UNIVERSO not in xl.sheet_names:
    raise KeyError(f"No existe la hoja '{SHEET_UNIVERSO}' en el archivo.")

df_univ = pd.read_excel(
    file_path, sheet_name=SHEET_UNIVERSO, dtype=str, engine="openpyxl"
)
df_univ.columns = [c.strip() for c in df_univ.columns]

# PERSONAS (opcional, para filtrar por periodo)
df_personas = None
if SHEET_PERSONAS in xl.sheet_names:
    df_personas = pd.read_excel(
        file_path, sheet_name=SHEET_PERSONAS, dtype=str, engine="openpyxl"
    )
    df_personas.columns = [c.strip() for c in df_personas.columns]

# INGRESOS (requerido para n_trab y tipo)
if SHEET_INGRESOS not in xl.sheet_names:
    raise KeyError(f"No existe la hoja '{SHEET_INGRESOS}' en el archivo.")
df_ing = pd.read_excel(
    file_path, sheet_name=SHEET_INGRESOS, dtype=str, engine="openpyxl"
)
df_ing.columns = [c.strip() for c in df_ing.columns]

# ==========================
# 2) Normalización mínima
# ==========================
# Universo
req_cols_univ = {"identificacion", "ced_padre", "ced_madre"}
lower_map = {c: c.lower() for c in df_univ.columns}
df_univ = df_univ.rename(columns=lower_map)
faltan = req_cols_univ - set(df_univ.columns)
if faltan:
    raise KeyError(f"En '{SHEET_UNIVERSO}' faltan columnas: {faltan}")

df_univ["ced_padre"] = df_univ["ced_padre"].astype(str).str.strip()
df_univ["ced_madre"] = df_univ["ced_madre"].astype(str).str.strip()
df_univ["hogar_id"] = df_univ.apply(
    lambda r: make_hogar_id(r["ced_padre"], r["ced_madre"]), axis=1
)
df_univ = df_univ[
    (df_univ["hogar_id"] != "")
    & ((df_univ["ced_padre"] != "0") | (df_univ["ced_madre"] != "0"))
].copy()

# Personas (opcional: filtrar por periodo)
if df_personas is not None:
    df_personas = df_personas.rename(
        columns={c: c.lower() for c in df_personas.columns}
    )
    if PERIODO is not None:
        if (
            "periodo" not in df_personas.columns
            or "identificacion" not in df_personas.columns
        ):
            raise KeyError(
                f"En '{SHEET_PERSONAS}' faltan 'periodo' y/o 'identificacion' para filtrar."
            )
        ids_periodo = (
            df_personas.loc[df_personas["periodo"] == str(PERIODO), "identificacion"]
            .dropna()
            .unique()
        )
        df_univ = df_univ[df_univ["identificacion"].isin(ids_periodo)].copy()

# Ingresos JUN/2025
df_ing = df_ing.rename(columns={c: c.lower() for c in df_ing.columns})
req_cols_ing = {"identificacion", "anio", "mes", "tipo_empleo", "salario"}
faltan_ing = req_cols_ing - set(df_ing.columns)
if faltan_ing:
    raise KeyError(f"En '{SHEET_INGRESOS}' faltan columnas: {faltan_ing}")

df_ing = df_ing[
    (df_ing["anio"].astype(str) == str(ANIO)) & (df_ing["mes"].astype(str) == str(MES))
].copy()
df_ing["tipo_empleo"] = df_ing["tipo_empleo"].astype(str).str.strip()
df_ing["salario"] = pd.to_numeric(df_ing["salario"], errors="coerce")


# ==========================
# 3) Aplicar filtros como en tu app (Enrollment)
# ==========================
# 3.1 Contar papás DISTINCT (ignora "0")
def count_distinct_parents(row):
    s = {row["ced_padre"], row["ced_madre"]}
    s.discard("0")
    return len(s)


df_univ["n_papas"] = df_univ.apply(count_distinct_parents, axis=1)
if CANT_PAPAS in (1, 2):
    df_univ = df_univ[df_univ["n_papas"] == CANT_PAPAS].copy()

# 3.2 Mapa hogar_id -> fam_id (padre/madre, sin "0")
pairs = []
for _, r in df_univ.iterrows():
    if r["ced_padre"] != "0":
        pairs.append((r["hogar_id"], r["ced_padre"]))
    if r["ced_madre"] != "0":
        pairs.append((r["hogar_id"], r["ced_madre"]))
df_mapa = pd.DataFrame(pairs, columns=["hogar_id", "fam_id"]).drop_duplicates()

# 3.3 Merge con ingresos mes6
df_emp = df_mapa.merge(
    df_ing[["identificacion", "tipo_empleo", "salario"]],
    left_on="fam_id",
    right_on="identificacion",
    how="left",
)
df_emp["tipo_empleo_mes6"] = df_emp["tipo_empleo"].where(
    df_emp["tipo_empleo"].isin(EMPLEOS_VALIDOS), "Desconocido"
)
df_emp["trabaja_mes6"] = df_emp["tipo_empleo_mes6"].isin(EMPLEOS_VALIDOS)

# 3.4 Filtro por tipo de empleo (si no es "Todos")
if TIPO_EMPLEO != "Todos":
    df_emp = df_emp[df_emp["tipo_empleo_mes6"] == TIPO_EMPLEO].copy()

# 3.5 Filtro por cantidad de papás trabajando (0/1/2)
agg = df_emp.groupby("hogar_id", as_index=False).agg(n_trab=("trabaja_mes6", "sum"))
if CANT_PAPAS_TRAB in (0, 1, 2):
    agg = agg[agg["n_trab"] == CANT_PAPAS_TRAB].copy()

hogares_filtrados = set(agg["hogar_id"])
df_emp = df_emp[df_emp["hogar_id"].isin(hogares_filtrados)].copy()

# ==========================
# 4) Detección de hogares "raros"
#    (esperaríamos 2 personas únicas; encontramos < 2)
# ==========================
by_hogar = (
    df_emp.groupby("hogar_id")["fam_id"].nunique().reset_index(name="n_personas_unicas")
)
hogares_raros = by_hogar[by_hogar["n_personas_unicas"] < 2].copy()

print(f"Total hogares filtrados: {len(hogares_filtrados)}")
print(f"Hogares 'raros' (n_personas_unicas < 2): {len(hogares_raros)}")

# Detalle por hogar raro: qué fam_id trae, con qué tipo_empleo y salario
detalle_raros = df_emp[df_emp["hogar_id"].isin(set(hogares_raros["hogar_id"]))].copy()
# agrega info original del universo (ced_padre/ced_madre) para diagnósticos
detalle_raros = detalle_raros.merge(
    df_univ[["hogar_id", "identificacion", "ced_padre", "ced_madre"]].drop_duplicates(),
    on="hogar_id",
    how="left",
    suffixes=("", "_est"),
)

# ==========================
# 5) Exportar a Excel (2 hojas)
# ==========================
out_name = f"hogares_raros_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
out_path = file_path.parent / out_name

with pd.ExcelWriter(out_path, engine="openpyxl") as xw:
    hogares_raros.to_excel(xw, index=False, sheet_name="hogares_raros_resumen")
    detalle_raros.to_excel(xw, index=False, sheet_name="hogares_raros_detalle")

print(f"Archivo generado: {out_path}")
display(hogares_raros.head(10))
display(detalle_raros.head(10))

Usando archivo: db\vulnerabilidad.xlsx
Total hogares filtrados: 8492
Hogares 'raros' (n_personas_unicas < 2): 0
Archivo generado: db\hogares_raros_20250923_123733.xlsx


Unnamed: 0,hogar_id,n_personas_unicas


Unnamed: 0,hogar_id,fam_id,identificacion,tipo_empleo,salario,tipo_empleo_mes6,trabaja_mes6,identificacion_est,ced_padre,ced_madre


In [9]:
# -------------------------------------------------
# 3D) orden y columnas útiles (robusto a columnas faltantes/acentos)
# -------------------------------------------------
# Normaliza posibles variantes con acento
def _normalize_cols(df):
    ren = {}
    for c in df.columns:
        cl = c.lower().strip()
        # mapea variantes comunes
        if cl in {"identificación", "identificacion"}:
            ren[c] = "identificacion"
        elif cl in {"cedula_padre", "cédula_padre"}:
            ren[c] = "ced_padre"
        elif cl in {"cedula_madre", "cédula_madre"}:
            ren[c] = "ced_madre"
        else:
            ren[c] = c  # no cambiar
    return df.rename(columns=ren)


detalle_multi = _normalize_cols(detalle_multi)
df_univ = _normalize_cols(df_univ)

cols_show = [
    "fam_id",
    "hogar_id",
    "rol_en_hogar",
    "trabaja_mes6",
    "tipo_empleo_mes6",
    "salario",
    "identificacion",
    "ced_padre",
    "ced_madre",
]

# Qué columnas realmente están
existentes = [c for c in cols_show if c in detalle_multi.columns]
faltantes = [c for c in cols_show if c not in detalle_multi.columns]
if faltantes:
    print("⚠️ Columnas no presentes en detalle_multi y serán omitidas:", faltantes)

detalle_multi = detalle_multi[existentes].sort_values(["fam_id", "hogar_id"])

display(detalle_multi.head(20))

# -------------------------------------------------
# 4) Exportar a Excel: resumen y detalle
# -------------------------------------------------
out_name = f"familiares_multi_hogar_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
with pd.ExcelWriter(out_name, engine="openpyxl") as xw:
    multi_hogar.to_excel(xw, index=False, sheet_name="resumen_fam_multi_hogar")
    detalle_multi.to_excel(xw, index=False, sheet_name="detalle_fam_multi_hogar")

print(f"Archivo generado: {out_name}")

⚠️ Columnas no presentes en detalle_multi y serán omitidas: ['identificacion']


Unnamed: 0,fam_id,hogar_id,rol_en_hogar,trabaja_mes6,tipo_empleo_mes6,salario,ced_padre,ced_madre
0,502367485,0502367485|0502506215,madre,True,Afiliacion Voluntaria,470.0,502506215,502367485
1,502367485,0502367485|1803303641,madre,True,Afiliacion Voluntaria,470.0,1803303641,502367485
44,603142324,0603142324|1001987864,madre,True,Relacion de Dependencia,550.0,1001987864,603142324
43,603142324,0603142324|1714146741,madre,True,Relacion de Dependencia,550.0,1714146741,603142324
7,1001660545,1001660545|1002120432,padre,False,Desconocido,,1001660545,1002120432
5,1001660545,1001660545|1709880262,padre,False,Desconocido,,1001660545,1709880262
8,1002177341,1002177341|1002524872,padre,False,Desconocido,,1002177341,1002524872
6,1002177341,1002177341|1002698551,padre,False,Desconocido,,1002177341,1002698551
11,1002527750,1001986254|1002527750,madre,True,Relacion de Dependencia,1000.0,1001986254,1002527750
46,1002527750,1002527750|1705318283,madre,True,Relacion de Dependencia,1000.0,1705318283,1002527750


Archivo generado: familiares_multi_hogar_20250923_124310.xlsx


In [11]:
import pandas as pd
from pathlib import Path
from datetime import datetime

# ==========================
# Config
# ==========================
DB_DIR = Path("db")
FILE_NAME ="vulnerabilidad.xlsx"
SHEET_UNIVERSO = "Universo Familiares"
SHEET_PERSONAS = "Personas"
SHEET_INGRESOS = "Ingresos"
FACULTAD_NAME = "FACULTAD DE CIENCIAS DE LA SALUD"

ANIO = 2025
MES = 6
EMPLEOS_VALIDOS = ["Relacion de Dependencia", "Afiliacion Voluntaria"]

# Filtros tal como los tienes:
CANT_PAPAS = 2  # 1, 2 o None ("Todos")
CANT_PAPAS_TRAB = 1  # 0, 1, 2 o None ("Todos")
TIPO_EMPLEO = "Todos"  # "Todos" | "Relacion de Dependencia" | "Afiliacion Voluntaria" | "Desconocido"


# ==========================
# Helpers
# ==========================
def load_excel_auto(db_dir: Path, file_name: str | None) -> Path:
    if file_name:
        fp = db_dir / file_name
        if not fp.exists():
            raise FileNotFoundError(f"No se encontró: {fp}")
        return fp
    cands = list(db_dir.glob("*.xls*"))
    if not cands:
        raise FileNotFoundError("No se encontró ningún Excel en db/")
    return cands[0]


def normalize_cols(df: pd.DataFrame) -> pd.DataFrame:
    # normaliza a minúsculas y mapea variantes con acento
    ren = {}
    for c in df.columns:
        cl = c.strip().lower()
        if cl in {"identificación", "identificacion"}:
            ren[c] = "identificacion"
        elif cl in {"facultad"}:
            ren[c] = "facultad"
        elif cl in {"cédula_padre", "cedula_padre"}:
            ren[c] = "ced_padre"
        elif cl in {"cédula_madre", "cedula_madre"}:
            ren[c] = "ced_madre"
        elif cl in {"año"}:
            ren[c] = "anio"
        else:
            ren[c] = cl
    return df.rename(columns=ren)


def make_hogar_id(ced_padre: str, ced_madre: str) -> str:
    a = str(ced_padre).strip()
    b = str(ced_madre).strip()
    ids = [x for x in (a, b) if x and x != "0"]
    if not ids:
        return ""
    return "|".join(sorted(ids))


def count_distinct_parents(row) -> int:
    s = {row["ced_padre"], row["ced_madre"]}
    s.discard("0")
    return len(s)


# ==========================
# 1) Cargar hojas
# ==========================
file_path = load_excel_auto(DB_DIR, FILE_NAME)
print(f"Usando archivo: {file_path}")

xl = pd.ExcelFile(file_path)
for sh in (SHEET_UNIVERSO, SHEET_PERSONAS, SHEET_INGRESOS):
    if sh not in xl.sheet_names:
        raise KeyError(f"No existe la hoja '{sh}' en el archivo.")

df_univ = pd.read_excel(
    file_path, sheet_name=SHEET_UNIVERSO, dtype=str, engine="openpyxl"
)
df_per = pd.read_excel(
    file_path, sheet_name=SHEET_PERSONAS, dtype=str, engine="openpyxl"
)
df_ing = pd.read_excel(
    file_path, sheet_name=SHEET_INGRESOS, dtype=str, engine="openpyxl"
)

df_univ = normalize_cols(df_univ)
df_per = normalize_cols(df_per)
df_ing = normalize_cols(df_ing)

# Validación mínima
for req, df_ in (
    ({"identificacion", "ced_padre", "ced_madre"}, df_univ),
    ({"identificacion", "facultad"}, df_per),
    ({"identificacion", "anio", "mes", "tipo_empleo", "salario"}, df_ing),
):
    faltan = req - set(df_.columns)
    if faltan:
        raise KeyError(
            f"Faltan columnas {faltan} en una de las hojas correspondientes."
        )

# ==========================
# 2) Filtrar Personas por facultad
# ==========================
ids_facultad = (
    df_per.loc[
        df_per["facultad"].astype(str).str.strip().str.lower() == FACULTAD_NAME.lower(),
        "identificacion",
    ]
    .dropna()
    .unique()
)

print(f"Identificaciones en facultad '{FACULTAD_NAME}': {len(ids_facultad)}")

# ==========================
# 3) Universo filtrado por esos estudiantes
# ==========================
u = df_univ[df_univ["identificacion"].isin(ids_facultad)].copy()
u["ced_padre"] = u["ced_padre"].astype(str).str.strip()
u["ced_madre"] = u["ced_madre"].astype(str).str.strip()
u["hogar_id"] = u.apply(lambda r: make_hogar_id(r["ced_padre"], r["ced_madre"]), axis=1)
u = u[
    (u["hogar_id"] != "") & ((u["ced_padre"] != "0") | (u["ced_madre"] != "0"))
].copy()

# contar papás DISTINCT y filtrar 2
u["n_papas"] = u.apply(count_distinct_parents, axis=1)
if CANT_PAPAS in (1, 2):
    u = u[u["n_papas"] == CANT_PAPAS].copy()

print("Hogares (tras filtro 2 papás):", u["hogar_id"].nunique())

# ==========================
# 4) Ingresos JUN/2025 y merge a familiares
# ==========================
ing6 = df_ing[
    (df_ing["anio"].astype(str) == str(ANIO)) & (df_ing["mes"].astype(str) == str(MES))
].copy()
ing6["tipo_empleo"] = ing6["tipo_empleo"].astype(str).str.strip()
ing6["salario"] = pd.to_numeric(ing6["salario"], errors="coerce")

pairs = []
for _, r in u.iterrows():
    if r["ced_padre"] != "0":
        pairs.append((r["hogar_id"], r["ced_padre"]))
    if r["ced_madre"] != "0":
        pairs.append((r["hogar_id"], r["ced_madre"]))
df_mapa = pd.DataFrame(pairs, columns=["hogar_id", "fam_id"]).drop_duplicates()

df_emp = df_mapa.merge(
    ing6[["identificacion", "tipo_empleo", "salario"]],
    left_on="fam_id",
    right_on="identificacion",
    how="left",
)
df_emp["tipo_empleo_mes6"] = df_emp["tipo_empleo"].where(
    df_emp["tipo_empleo"].isin(EMPLEOS_VALIDOS), "Desconocido"
)
df_emp["trabaja_mes6"] = df_emp["tipo_empleo_mes6"].isin(EMPLEOS_VALIDOS)

# filtro por tipo empleo (si aplica)
if TIPO_EMPLEO != "Todos":
    df_emp = df_emp[df_emp["tipo_empleo_mes6"] == TIPO_EMPLEO].copy()

# hogares con exactamente 1 trabajando
agg = df_emp.groupby("hogar_id", as_index=False).agg(n_trab=("trabaja_mes6", "sum"))
if CANT_PAPAS_TRAB in (0, 1, 2):
    agg = agg[agg["n_trab"] == CANT_PAPAS_TRAB].copy()

hogares_filtrados = set(agg["hogar_id"])
df_emp = df_emp[df_emp["hogar_id"].isin(hogares_filtrados)].copy()

print("Hogares filtrados finales:", len(hogares_filtrados))

# ==========================
# 5) Ver la asimetría y detectar al/los causantes (TRABAJANDO)
# ==========================
# Esperado (si no hay solapes): #hogares trabajando únicos = #hogares
trab = df_emp[df_emp["trabaja_mes6"]].copy()
no_trab = df_emp[~df_emp["trabaja_mes6"]].copy()

trab_unicos = trab["fam_id"].nunique()
no_trab_unicos = no_trab["fam_id"].nunique()

print("Familiares ÚNICOS - Trabajando:", trab_unicos)
print("Familiares ÚNICOS - No Trabajando:", no_trab_unicos)
print("Total ÚNICOS:", trab_unicos + no_trab_unicos)
print("Esperado (= hogares × 2):", len(hogares_filtrados) * 2)

# fam_id en >1 hogar dentro del set TRABAJANDO (estos causan que 842 -> 841)
solapes_trab = trab.groupby("fam_id")["hogar_id"].nunique() > 1
fam_multi_trab = solapes_trab[solapes_trab].index.tolist()

print("Familiares TRABAJANDO en >1 hogar:", len(fam_multi_trab))
print(fam_multi_trab[:10])

# ==========================
# 6) Detalle del/los fam_id problemáticos
# ==========================
detalle = trab[trab["fam_id"].isin(fam_multi_trab)].copy()
# añade datos del universo para ver qué estudiante(s) los vincularon
detalle = detalle.merge(
    u[["hogar_id", "identificacion", "ced_padre", "ced_madre"]].drop_duplicates(),
    on="hogar_id",
    how="left",
)


# rol en el hogar para mayor claridad
def rol_en_hogar(row):
    if row["fam_id"] == row["ced_padre"]:
        return "padre"
    if row["fam_id"] == row["ced_madre"]:
        return "madre"
    return "desconocido"


detalle["rol_en_hogar"] = detalle.apply(rol_en_hogar, axis=1)

cols_show = [
    "fam_id",
    "hogar_id",
    "rol_en_hogar",
    "tipo_empleo_mes6",
    "salario",
    "identificacion",
    "ced_padre",
    "ced_madre",
]
detalle = detalle[cols_show].sort_values(["fam_id", "hogar_id"])

print("\n=== DETALLE DE PERSONAS TRABAJANDO MULTI-HOGAR ===")
display(detalle.head(50))

# ==========================
# 7) Exportar a Excel
# ==========================
out_name = f"salud_personas_trabajando_multi_hogar_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
out_path = file_path.parent / out_name
with pd.ExcelWriter(out_path, engine="openpyxl") as xw:
    pd.DataFrame(
        {
            "hogares_filtrados": [len(hogares_filtrados)],
            "trabajando_unicos": [trab_unicos],
            "no_trabajando_unicos": [no_trab_unicos],
            "total_unicos": [trab_unicos + no_trab_unicos],
            "esperado_hogares_x2": [len(hogares_filtrados) * 2],
        }
    ).to_excel(xw, index=False, sheet_name="resumen")
    pd.DataFrame({"fam_id_multi_trab": fam_multi_trab}).to_excel(
        xw, index=False, sheet_name="ids_multi_trab"
    )
    detalle.to_excel(xw, index=False, sheet_name="detalle_multi_trab")

print(f"\nArchivo generado: {out_path}")

Usando archivo: db\vulnerabilidad.xlsx
Identificaciones en facultad 'FACULTAD DE CIENCIAS DE LA SALUD': 2145
Hogares (tras filtro 2 papás): 1988
Hogares filtrados finales: 842
Familiares ÚNICOS - Trabajando: 841
Familiares ÚNICOS - No Trabajando: 842
Total ÚNICOS: 1683
Esperado (= hogares × 2): 1684
Familiares TRABAJANDO en >1 hogar: 1
['1716691843']


KeyError: "['identificacion'] not in index"