In [None]:
import dask
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
import os

# Pipeline settings shared by the cells below.
BLOCKSIZE = "64MB"                      # chunk size passed to dd.read_csv
TEMP_DIR = "./.dask-tmp"                # dask temporary directory
PARQUET_DIR = "persona_parquet_by_mun"  # intermediate parquet dataset
CSV_OUT_DIR = "municipio"               # destination of the per-municipality CSVs

# Create the working directories up front; existing ones are left alone.
for _workdir in (TEMP_DIR, CSV_OUT_DIR):
    os.makedirs(_workdir, exist_ok=True)

In [None]:
# Point dask's temporary directory at TEMP_DIR.
dask.config.set({"temporary_directory": TEMP_DIR})

# ---- Streaming read ----
# Every column is read as text and the file is split into BLOCKSIZE-sized
# chunks; the options are collected first so the call itself stays short.
csv_options = dict(
    dtype=str,
    sep=";",
    blocksize=BLOCKSIZE,
    assume_missing=True,
    low_memory=False,
)
ddf = dd.read_csv("Persona_CPV-2024.csv", **csv_options)

In [None]:
# Cast the geographic code columns to categoricals when they are present.
code_columns = [name for name in ("idep", "iprov", "imun") if name in ddf.columns]
for name in code_columns:
    ddf[name] = ddf[name].astype("category")

# PROV is the text concatenation idep + iprov; MUN appends imun to PROV.
prov_code = ddf["idep"].astype(str) + ddf["iprov"].astype(str)
ddf = ddf.assign(PROV=prov_code)
mun_code = ddf["PROV"].astype(str) + ddf["imun"].astype(str)
ddf = ddf.assign(MUN=mun_code)

# Upper-case every column label, then give two of them their final names.
final_names = {
    "IDEP": "DEPAR",
    "I00": "N_VIV",
}
ddf = ddf.rename(columns=str.upper).rename(columns=final_names)

# IMUN and IPROV are dropped now that PROV and MUN have been derived from them.
leftover = [name for name in ("IMUN", "IPROV") if name in ddf.columns]
if leftover:
    ddf = ddf.drop(columns=leftover)

def _reorder_columns(cols):
    cols = list(cols)
    if "PROV" in cols and "DEPAR" in cols:
        cols.remove("PROV")
        idx = cols.index("DEPAR") + 1
        cols.insert(idx, "PROV")
    # mover MUN después de PROV
    if "MUN" in cols and "PROV" in cols:
        cols.remove("MUN")
        idx = cols.index("PROV") + 1
        cols.insert(idx, "MUN")
    return cols

# Select the columns in the order computed by _reorder_columns.
ordered_columns = _reorder_columns(ddf.columns)
ddf = ddf[ordered_columns]

[########################################] | 100% Completed | 204.94 s


In [5]:
# Write the frame as a snappy-compressed parquet dataset partitioned on MUN.
with ProgressBar():
    ddf.to_parquet(
        PARQUET_DIR,
        write_index=False,
        engine="pyarrow",
        compression="snappy",
        partition_on=["MUN"],
        # Clear PARQUET_DIR first so that re-running this cell cannot leave
        # partitions from an earlier run mixed into the dataset.
        overwrite=True,
    )

# Collect the sorted, non-null municipality codes present in the dataset.
# No `dtype_backend` is passed: the documented values are "numpy_nullable"
# and "pyarrow", and the builtin `str` is not one of them.
pq = dd.read_parquet(PARQUET_DIR, columns=["MUN"])
unique_muns = sorted(pq["MUN"].dropna().unique().compute())

[########################################] | 100% Completed | 486.09 s


In [None]:
# Build one lazy CSV export per municipality, then run them all together so
# the progress bar below covers the actual writing.
writes = []
for mun in unique_muns:
    # Restrict the read to the rows of this municipality.
    # No `dtype_backend` is passed: `str` is not a documented value for it.
    ddf_mun = dd.read_parquet(PARQUET_DIR, filters=[("MUN", "==", mun)])
    # NOTE(review): partition columns may come back at the end of the frame;
    # re-apply the DEPAR/PROV/MUN ordering (no-op if it is already in place).
    ddf_mun = ddf_mun[_reorder_columns(ddf_mun.columns)]
    writes.append(
        ddf_mun.to_csv(
            os.path.join(CSV_OUT_DIR, f"persona-municipio-{mun}.csv"),
            single_file=True,
            index=False,
            header=True,
            # Defer execution: without this, to_csv writes the file right
            # here and the dask.compute call below has nothing left to do.
            compute=False,
        )
    )

with ProgressBar():
    dask.compute(*writes)


[########################################] | 100% Completed | 204.94 s
