In [None]:
# Definimos una funcion que realice la limpieza y persistencia.

def limpiar_y_persistir(df_raw, entity, con, upsert_sql_file, valid_fn, cols_clean):
    """
    df_raw: DataFrame con datos crudos (raw)
    entity: nombre de la entidad (ventas, clientes, productos)
    con: conexión sqlite3
    upsert_sql_file: ruta al fichero SQL con el UPSERT
    valid_fn: función que recibe df y devuelve máscara booleana de filas válidas
    cols_clean: columnas que se deben conservar en clean
    """
    if df_raw.empty:
        print(f"No hay datos para {entity}")
        return

    # Copia para trabajar
    df = df_raw.copy()

    # Validación
    valid_mask = valid_fn(df)
    quarantine = df.loc[~valid_mask].copy()
    clean = df.loc[valid_mask].copy()

    # Guardar quarantine en CSV
    quarantine.to_csv(OUT / "quality" / f"{entity}_invalidos.csv", index=False)

    # Deduplicar por claves (ejemplo simple: todas las cols_clean)
    if not clean.empty:
        clean = (clean.sort_values("_ingest_ts")
                      .drop_duplicates(subset=cols_clean, keep="last"))

        # Persistencia en Parquet
        parquet_file = OUT / "parquet" / f"clean_{entity}.parquet"
        clean.to_parquet(parquet_file, index=False)

        # Persistencia en SQLite vía UPSERT
        upsert_sql = Path(upsert_sql_file).read_text(encoding="utf-8")
        for _, r in clean.iterrows():
            con.execute(upsert_sql, {k: r[k] for k in cols_clean + ["_ingest_ts"]})
        con.commit()

        print(f"Persistidos {len(clean)} registros limpios en {entity} (SQLite + Parquet)")
