<a href="https://colab.research.google.com/github/abxda/COLMEX-ML/blob/main/Semana_07_DENUE_COLMEX.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import time
from zipfile import ZipFile
import requests
from tqdm import tqdm
import duckdb
import geopandas as gpd
import shutil

In [None]:
# --------------------------------------------------
# 1. Configuration
# --------------------------------------------------

# Codes embedded in the archive file names: 01-14, two entries for 15
# ("15_1" and "15_2"), then 16-32.
archive_codes = [f"{n:02d}" for n in range(1, 15)]
archive_codes += ["15_1", "15_2"]
archive_codes += [f"{n:02d}" for n in range(16, 33)]

urls_denue = [
    f"https://www.inegi.org.mx/contenidos/masiva/denue/denue_{code}_shp.zip"
    for code in archive_codes
]

# Working directory layout, all rooted at base_dir.
base_dir = "denue_data"
zip_dir = os.path.join(base_dir, "zips")
shp_dir = os.path.join(base_dir, "shp")
geo_parquet_dir = os.path.join(base_dir, "geoparquet")
geo_parquet_unificado = os.path.join(base_dir, "geoparquet_unificado")

# Only these three directories are created here; zip_dir and geo_parquet_dir
# are defined as paths but not created by this cell.
for needed_dir in (base_dir, shp_dir, geo_parquet_unificado):
    os.makedirs(needed_dir, exist_ok=True)

# Funciones

def download(url, directory, timeout=60, chunk_size=1024 * 1024):
    """Download *url* into *directory* unless the file is already there.

    The local file name is the last path segment of the URL.

    Args:
        url: Address of the file to fetch.
        directory: Existing directory in which the file is stored.
        timeout: Seconds to wait for the server before giving up.
        chunk_size: Number of bytes read from the response per iteration.

    Returns:
        The path of the local file (pre-existing or freshly downloaded).

    Raises:
        requests.RequestException: On connection problems, timeouts, or a
            non-success HTTP status.
    """
    filename = url.split('/')[-1]
    filepath = os.path.join(directory, filename)
    if os.path.exists(filepath):
        print(f"{filename} ya existe.")
        return filepath
    print(f"Descargando {filename} ...")

    # Write to a temporary name first: the existence check above must never
    # mistake a half-written download for a complete file on the next run.
    partial_path = filepath + ".part"
    try:
        with requests.get(url, stream=True, timeout=timeout) as r:
            # Fail loudly instead of saving an HTML error page as a .zip.
            r.raise_for_status()
            with open(partial_path, 'wb') as f:
                for data in r.iter_content(chunk_size):
                    f.write(data)
    except BaseException:
        # Also covers a kernel interrupt: drop the incomplete file, re-raise.
        if os.path.exists(partial_path):
            os.remove(partial_path)
        raise
    os.replace(partial_path, filepath)
    return filepath

def extract_shapefile(zip_path, shp_dir):
    """Unpack every member of the archive at *zip_path* into *shp_dir*.

    Prints a confirmation line once the extraction has finished.
    """
    with ZipFile(zip_path) as archive:
        archive.extractall(path=shp_dir)
    message = f"Extraído {zip_path} en {shp_dir}"
    print(message)

def convert_to_geoparquet(shp_dir, encoding='ISO-8859-1'):
    """Convert every ``.shp`` file under *shp_dir* to a sibling ``.geoparquet``.

    The directory tree is walked recursively. A shapefile whose
    ``.geoparquet`` counterpart already exists next to it is skipped, so the
    function can be re-run without redoing finished conversions.

    Args:
        shp_dir: Root directory to search for shapefiles.
        encoding: Text encoding passed to ``gpd.read_file`` for the
            attribute data.
    """
    for root, _, files in os.walk(shp_dir):
        for file in files:
            if not file.endswith('.shp'):
                continue
            shp_path = os.path.join(root, file)
            # Swap only the file extension. A plain str.replace on the full
            # path would also rewrite any ".shp" inside a directory name and
            # point the output (and the skip check) at the wrong location.
            stem, _ext = os.path.splitext(file)
            parquet_path = os.path.join(root, stem + '.geoparquet')
            if os.path.exists(parquet_path):
                continue
            gdf = gpd.read_file(shp_path, encoding=encoding)
            gdf.to_parquet(parquet_path)

In [None]:
# Processing

# Download and extraction.
# Archives go into zip_dir (defined in the configuration cell) rather than
# directly into base_dir, so the zips stay separate from the extracted and
# converted data. zip_dir is created here because nothing else creates it.
os.makedirs(zip_dir, exist_ok=True)
for url in urls_denue:
    zip_path = download(url, zip_dir)
    extract_shapefile(zip_path, shp_dir)

In [None]:
# Conversion to GeoParquet: process every shapefile found under shp_dir.
convert_to_geoparquet(shp_dir)

In [None]:
# Debug peek at the last destination path used by the unification loop.
# `dst` is only assigned by that loop, which lives in a later cell, so a bare
# `dst` raises NameError on a fresh top-to-bottom run. Look it up defensively:
# this shows the value when it exists and nothing otherwise.
globals().get("dst")

In [None]:
# GeoParquet unification: gather every .geoparquet file found under shp_dir
# into the single flat folder geo_parquet_unificado, keeping the file name.
for root, dirs, files in os.walk(shp_dir):
    for file in files:
        if not file.endswith('.geoparquet'):
            continue
        src = os.path.join(root, file)
        dst = os.path.join(geo_parquet_unificado, file)
        shutil.copy(src, dst)

In [None]:
# Keep only the GeoParquet files: the folder may contain other entries
# (for example a notebook checkpoint directory) that read_parquet cannot
# load. Sorting makes the load order reproducible between runs.
parquet_files = sorted(
    os.path.join(geo_parquet_unificado, f)
    for f in os.listdir(geo_parquet_unificado)
    if f.endswith('.geoparquet')
)

In [None]:
# DuckDB creation
# Build a fresh DuckDB file holding all unified GeoParquet files in one table.
db_nacional_path = "denue_total_4.duckdb"

# Check the input before touching an existing database, so a run with no
# input does not delete the previous result and then fail.
if not parquet_files:
    raise FileNotFoundError("No hay archivos GeoParquet para cargar en DuckDB.")

# Start from an empty database file on every run.
if os.path.exists(db_nacional_path):
    os.remove(db_nacional_path)
con_nacional = duckdb.connect(db_nacional_path)

con_nacional.execute("INSTALL spatial;")
con_nacional.execute("LOAD spatial;")

# Render the paths as SQL string literals with single quotes doubled.
# Interpolating the Python list repr instead breaks on paths containing
# quotes or backslashes, because Python and SQL escape them differently.
parquet_list_sql = ", ".join(
    "'" + str(path).replace("'", "''") + "'" for path in parquet_files
)

con_nacional.execute("DROP TABLE IF EXISTS denue;")
con_nacional.execute(f"""
    CREATE TABLE denue AS
    SELECT
        *
    FROM read_parquet([{parquet_list_sql}]);
""")

# Report the file that was actually written.
print(f"Archivo {db_nacional_path} creado exitosamente.")


In [None]:
# Preview: fetch the first 10 rows of the denue table as a DataFrame.
con_nacional.execute("select * from denue limit 10").df()

In [None]:
# Close the DuckDB connection held in con_nacional.
con_nacional.close()