In [66]:
# ======================================================
# PIPELINE "CONTRALORIA" — LECTURA, NORMALIZACIÓN, QA
# ======================================================
from pathlib import Path
import pandas as pd
import numpy as np
import io

# ----------------- CONFIG -----------------
ROOT      = Path.cwd()
DATA_DIR  = ROOT                   # o Path("data")
OUT_DIR   = ROOT / "outputs"
OUT_DIR.mkdir(exist_ok=True)

SECOP_FILE     = "SECOP.csv"
SIIF_FILE      = "SIFF.xlsx"
DIVIPOLA_FILE  = "DIVIPOLA.csv"

# ----------------- HELPERS -----------------
def hascol(df: pd.DataFrame, col: str) -> bool:
    return col in df.columns

def _clean_str(s):
    if pd.isna(s): return None
    s = str(s).strip().lower()
    for a,b in [("á","a"),("é","e"),("í","i"),("ó","o"),("ú","u")]:
        s = s.replace(a,b)
    return " ".join(s.split())

def clean_colnames(df: pd.DataFrame) -> pd.DataFrame:
    return df.rename(columns=lambda c: _clean_str(c).replace(" ", "_"))

def normalize_money(val):
    if pd.isna(val): return np.nan
    s = str(val).replace(" ", "").replace("\xa0","")
    s = s.replace(".", "").replace(",", ".")
    s = "".join(ch for ch in s if ch.isdigit() or ch in ".-")
    try:
        x = float(s)
        return np.nan if (x < 0 or x > 1e14) else x
    except:
        return np.nan

def normalize_dane(code, length=5):
    if pd.isna(code): return None
    s = "".join(ch for ch in str(code) if ch.isdigit())
    return s.zfill(length) if s else None

def normalize_nit(x):
    if pd.isna(x): return None
    s = "".join(ch for ch in str(x) if ch.isdigit())
    return s if s else None

def parse_date(x):
    if pd.isna(x): return pd.NaT
    x = str(x).strip()
    for fmt in ("%Y-%m-%d", "%d/%m/%Y", "%d-%m-%Y", "%m/%d/%Y"):
        try: return pd.to_datetime(x, format=fmt, errors="raise")
        except: pass
    return pd.to_datetime(x, errors="coerce")

def map_modalidad(x):
    mapa = {
        "licitacion":"licitacion","licitación":"licitacion",
        "seleccion abreviada":"seleccion_abreviada",
        "contratacion directa":"contratacion_directa",
        "minima cuantia":"minima_cuantia",
        "regimen especial":"regimen_especial",
        "concurso de meritos":"concurso_meritos","concurso de méritos":"concurso_meritos",
    }
    x = _clean_str(x)
    if x is None: return None
    return mapa.get(x, x.replace(" ","_"))
def coalesce_duplicate_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Si hay columnas duplicadas (mismo nombre), combina por fila tomando el primer no nulo."""
    dup_names = df.columns[df.columns.duplicated()].unique()
    for name in dup_names:
        sub = df.loc[:, df.columns == name]            # todas las columnas con ese nombre
        df[name] = sub.bfill(axis=1).iloc[:, 0]        # primer valor no nulo por fila
        df.drop(columns=sub.columns[1:], inplace=True) # elimina duplicadas, deja la 1ª
    return df



In [67]:
# ----------------- LECTURA -----------------
def load_divipola() -> pd.DataFrame:
    """
    Tu DIVIPOLA.csv viene con toda la línea envuelta en comillas y comillas dobles internas.
    Se limpia línea a línea y luego se parsea como CSV con comas.
    """
    raw = Path(DATA_DIR / DIVIPOLA_FILE).read_text(encoding="utf-8", errors="ignore")
    lines = raw.splitlines()
    fixed = []
    for line in lines:
        s = line.strip()
        if s.startswith('"'): s = s[1:]
        if s.endswith('"'):   s = s[:-1]
        s = s.replace('""','"')
        fixed.append(s)
    text = "\n".join(fixed)
    df = pd.read_csv(io.StringIO(text), sep=",", dtype=str)

    # columnas reales vistas en tu archivo:
    # 'Código Departamento','Nombre Departamento','cod_dane','Nombre Municipio', ...
    df = clean_colnames(df)
    rename = {
        "codigo_departamento": "cod_depto",
        "nombre_departamento": "departamento_std",
        "cod_dane": "cod_dane",
        "nombre_municipio": "municipio_std",
    }
    df = df.rename(columns={k:v for k,v in rename.items() if k in df.columns})

    if not hascol(df, "cod_dane"):
        raise ValueError("DIVIPOLA.csv no trae 'cod_dane' tras la normalización.")

    df["cod_dane"] = df["cod_dane"].apply(normalize_dane)
    if hascol(df, "departamento_std"): df["departamento_std"] = df["departamento_std"].apply(_clean_str)
    if hascol(df, "municipio_std"):    df["municipio_std"]    = df["municipio_std"].apply(_clean_str)

    df = df.dropna(subset=["cod_dane"]).drop_duplicates("cod_dane")
    keep = [c for c in ["cod_dane","departamento_std","municipio_std"] if hascol(df, c)]
    return df[keep]

def load_secop() -> pd.DataFrame:
    df = pd.read_csv(DATA_DIR / SECOP_FILE, sep=";", dtype=str, low_memory=False)
    df = clean_colnames(df)

    rename = {
        "nombre_de_la_entidad": "entidad",
        "nit_de_la_entidad": "nit",
        "departamento_entidad": "departamento",
        "municipio_entidad": "municipio",
        "modalidad_de_contratacion": "modalidad",
        "objeto_del_contrato": "objeto",
        "objeto_del_proceso": "objeto",
        "fecha_de_firma_del_contrato": "fecha_firma",
        "id_proceso": "id_proceso",
        "id_contrato": "numero_contrato",
        "valor_contrato": "valor_contrato",
        "codigo_dane_municipio": "cod_dane",
    }
    df = df.rename(columns={k: v for k, v in rename.items() if k in df.columns})

    # 🔧 COALESCE de columnas duplicadas (causa del error)
    df = coalesce_duplicate_columns(df)

    # ---- normalizaciones (todas verificando df.columns) ----
    if "valor_contrato" in df.columns: df["valor_contrato"] = df["valor_contrato"].apply(normalize_money)
    if "nit"            in df.columns: df["nit"]            = df["nit"].apply(normalize_nit)
    if "fecha_firma"    in df.columns: df["fecha_firma"]    = df["fecha_firma"].apply(parse_date)
    if "modalidad"      in df.columns: df["modalidad"]      = df["modalidad"].apply(map_modalidad)
    if "cod_dane"       in df.columns: df["cod_dane"]       = df["cod_dane"].apply(normalize_dane)

    # ---- strings limpios ----
    for c in ["entidad","objeto","departamento","municipio","id_proceso","numero_contrato"]:
        if c in df.columns:
            df[c] = df[c].map(_clean_str)   # map sobre Serie (más seguro que apply)

    # ---- vigencia derivada ----
    if "fecha_firma" in df.columns:
        df["vigencia"] = df["fecha_firma"].dt.year.astype("Int64")

    return df


def load_siif() -> pd.DataFrame:
    """SIFF.xlsx tiene 2 filas de metadata (Año Fiscal, etc.)."""
    df = pd.read_excel(DATA_DIR / SIIF_FILE, dtype=str, skiprows=2)
    df = clean_colnames(df)

    possible = {
        "vigencia":  ["vigencia","anio","año","ano"],
        "cod_dane":  ["cod_dane","codigo_dane","codigo_municipio","cod_municipio"],
        "asignado":  ["asignado","apropiacion_inicial","presupuesto_asignado","apropiacion"],
        "ejecutado": ["ejecutado","obligaciones","compromisos","devengado"],
    }
    for std, cands in possible.items():
        for c in cands:
            if hascol(df, c):
                df.rename(columns={c: std}, inplace=True)
                break

    for c in ["asignado","ejecutado"]:
        if hascol(df, c): df[c] = df[c].apply(normalize_money)
    if hascol(df, "cod_dane"): df["cod_dane"] = df["cod_dane"].apply(normalize_dane)
    if hascol(df, "vigencia"): df["vigencia"] = pd.to_numeric(df["vigencia"], errors="coerce").astype("Int64")

    if {"asignado","ejecutado"}.issubset(df.columns):
        df["pct_ejec"] = np.where(df["asignado"]>0, df["ejecutado"]/df["asignado"], np.nan)

    return df

In [68]:
# ----------------- QA ENGINE -----------------
def qa_rules(df, dataset_name) -> pd.DataFrame:
    issues = []
    y = pd.Timestamp.today().year

    if hascol(df, "valor_contrato"):
        bad = df[df["valor_contrato"].isna() | (df["valor_contrato"]<=0)]
        for i,row in bad.iterrows():
            issues.append((dataset_name,i,"valor_contrato",row.get("valor_contrato"),">0 y no nulo"))

    if hascol(df, "vigencia"):
        bad = df[df["vigencia"].isna() | (df["vigencia"]<2008) | (df["vigencia"]>y+1)]
        for i,row in bad.iterrows():
            issues.append((dataset_name,i,"vigencia",row.get("vigencia"),f"2008..{y+1}"))

    if hascol(df, "cod_dane"):
        bad = df[df["cod_dane"].isna() | (df["cod_dane"].astype(str).str.len()!=5)]
        for i,row in bad.iterrows():
            issues.append((dataset_name,i,"cod_dane",row.get("cod_dane"),"5 dígitos"))

    if {"vigencia","fecha_firma"}.issubset(df.columns):
        bad = df[(~df["fecha_firma"].isna()) & (~df["vigencia"].isna())
                 & ((df["fecha_firma"].dt.year < (df["vigencia"]-1)) |
                    (df["fecha_firma"].dt.year > (df["vigencia"]+1)))]
        for i,row in bad.iterrows():
            issues.append((dataset_name,i,"fecha_firma",str(row.get("fecha_firma")),"Año ≈ vigencia (±1)"))

    return pd.DataFrame(issues, columns=["dataset","row_index","columna","valor","regla"])

def dedup_with_report(df: pd.DataFrame, key_cols: list, dataset_name: str):
    if not all(c in df.columns for c in key_cols):
        return df.copy(), pd.DataFrame({"dataset":[dataset_name],"cantidad":[0],"detalle":[f"Sin claves {key_cols}"]})
    before = len(df)
    mask = df.duplicated(subset=key_cols, keep="first")
    df2  = df.drop_duplicates(subset=key_cols, keep="first").copy()
    rep  = pd.DataFrame({
        "dataset":[dataset_name],
        "cantidad":[int(mask.sum())],
        "detalle":[f"Claves {key_cols}. Registros antes={before}, despues={len(df2)}"]
    })
    return df2, rep

In [69]:
# ----------------- PIPELINE -----------------
if __name__ == "__main__":
    # 1) Cargar
    divi  = load_divipola()
    secop = load_secop()
    siif  = load_siif()

    # 2) QA + deduplicación
    qa_secop = qa_rules(secop, "SECOP")
    qa_siif  = qa_rules(siif,  "SIIF")

    secop, rep_dups_secop = dedup_with_report(
        secop,
        key_cols=[c for c in ["id_proceso","numero_contrato","vigencia","nit","valor_contrato","fecha_firma"] if hascol(secop, c)],
        dataset_name="SECOP"
    )
    siif = siif.drop_duplicates().copy()

    # 3) Enriquecer con DIVIPOLA
    if hascol(secop, "cod_dane") and secop["cod_dane"].notna().any():
        secop = secop.merge(divi, on="cod_dane", how="left")
    elif {"departamento","municipio"}.issubset(secop.columns):
        secop["departamento_std"] = secop["departamento"].apply(_clean_str)
        secop["municipio_std"]    = secop["municipio"].apply(_clean_str)
        secop = secop.merge(divi, on=["departamento_std","municipio_std"], how="left")

    if hascol(siif, "cod_dane"):
        siif = siif.merge(divi, on="cod_dane", how="left")

    # 4) Agregados SECOP + integración con SIIF
    if {"cod_dane","vigencia","valor_contrato"}.issubset(secop.columns):
        secop_agg = (secop
                     .groupby(["cod_dane","vigencia"], as_index=False)["valor_contrato"]
                     .sum()
                     .rename(columns={"valor_contrato":"valor_secop"}))
    else:
        secop_agg = pd.DataFrame()

    resumen = None
    if not secop_agg.empty and {"cod_dane","vigencia","asignado"}.issubset(siif.columns):
        resumen = (siif.merge(secop_agg, on=["cod_dane","vigencia"], how="left")
                        .assign(gap=lambda d: np.where(d["asignado"]>0, d["valor_secop"]/d["asignado"], np.nan)))
        orden = ["vigencia","cod_dane","departamento_std","municipio_std",
                 "asignado","ejecutado","pct_ejec","valor_secop","gap"]
        resumen = resumen[[c for c in orden if c in resumen.columns]]

    # 5) Exportes
    secop.to_csv(OUT_DIR/"01_secop_limpio.csv", index=False, encoding="utf-8")
    siif.to_csv(OUT_DIR/"02_siif_limpio.csv",  index=False, encoding="utf-8")
    if resumen is not None:
        resumen.to_csv(OUT_DIR/"03_resumen_municipal.csv", index=False, encoding="utf-8")

    qa_all = pd.concat([d for d in [qa_secop, qa_siif] if not d.empty], ignore_index=True) \
             if (not qa_secop.empty or not qa_siif.empty) else \
             pd.DataFrame(columns=["dataset","row_index","columna","valor","regla"])
    qa_all.to_csv(OUT_DIR/"qa_errores.csv", index=False, encoding="utf-8")
    rep_dups_secop.to_csv(OUT_DIR/"qa_posibles_duplicados.csv", index=False, encoding="utf-8")

    with open(OUT_DIR/"README_proceso.txt","w",encoding="utf-8") as f:
        f.write("== Normalización Contraloría ==\n")
        f.write(f"SECOP (filas): {len(secop)}\n")
        f.write(f"SIIF  (filas): {len(siif)}\n")
        if resumen is not None: f.write(f"RESUMEN municipal (filas): {len(resumen)}\n")
        f.write(f"QA errores: {len(qa_all)}\n")
        f.write(f"Duplicados SECOP eliminados: {int(rep_dups_secop['cantidad'].iloc[0])}\n")

    print("✅ Proceso completo. Revisa carpeta outputs/")

✅ Proceso completo. Revisa carpeta outputs/
