# Merge por coordenadas con DuckDB

- Lee inmuebles con columnas latitud/longitud y colonia.
- Lee AGEB desde CSVs y asegura latitud/longitud en DOUBLE.
- Extrae y estandariza neighbourhood de AGEB.
- Join por colonia (estandarizada) y elige el AGEB más cercano dentro de la misma colonia.
- Devuelve df_merge y guarda parquet.

In [1]:
import os
import re
import duckdb
import pandas as pd
from unidecode import unidecode
from pyproj import Transformer
from datetime import datetime

# Parámetros de entrada/salida (dinámicos por fecha)
# "2025-08-31", "2025-09-14", "2025-09-20", "2025-09-27", "2025-10-05", "2025-10-11"
run_date = "2025-09-14"
run_date = datetime.strptime(run_date, "%Y-%m-%d").strftime("%Y-%m-%d")

inmuebles_path = f"../../data/processed/inmuebles24_departamentos_coordenadas_{run_date}.parquet"  # requiere latitud/longitud y colonia
ageb_input = "../../data/processed/INEGI/colonia/*.csv"  # CSVs de INEGI (glob) o ruta única
output_path = f"../../data/processed/merged_inmuebles24_departamentos_duckdb_{run_date}.parquet"

# Función para estandarizar texto
def estandarizar_texto(texto):
    if pd.isna(texto):
        return ""
    texto = texto.replace("\n", " ")  # Reemplazar saltos de línea por espacios
    texto = texto.replace(",", "")  # Quitar comas
    return unidecode(texto.strip().lower())

In [2]:
# Conexión DuckDB + extensión spatial
con = duckdb.connect(database=":memory:")
try:
    con.execute("INSTALL spatial;")
except Exception:
    pass
con.execute("LOAD spatial;")

# Cargar AGEB desde CSVs a pandas y transformar coordenadas (INEGI EPSG:6372 -> WGS84 EPSG:4326)
df_ageb = con.execute(f"""
SELECT *
FROM read_csv_auto('{ageb_input}', ignore_errors=True, all_varchar=True)
""").fetchdf()

if {'lon','lat'}.issubset(df_ageb.columns):
    # Convertir a float y transformar con pyproj
    df_ageb[['lon','lat']] = df_ageb[['lon','lat']].astype(float)
    transformer = Transformer.from_crs("epsg:6372", "epsg:4326", always_xy=True)
    lon_wgs, lat_wgs = transformer.transform(df_ageb['lon'].values, df_ageb['lat'].values)
    df_ageb['longitud'] = lon_wgs
    df_ageb['latitud'] = lat_wgs
elif {'longitud','latitud'}.issubset(df_ageb.columns):
    # Ya vienen en WGS84, asegurar tipo float
    df_ageb[['longitud','latitud']] = df_ageb[['longitud','latitud']].astype(float)
else:
    raise ValueError("AGEB necesita columnas lon/lat o longitud/latitud para la transformación.")

# Extraer y estandarizar neighbourhood de address
pat = re.compile(r"'neighbourhood':\s*'([^']+)'")
df_ageb['neighbourhood'] = df_ageb['address'].str.extract(pat)
df_ageb['neighbourhood'] = df_ageb['neighbourhood'].apply(estandarizar_texto)
df_ageb['neighbourhood'] = df_ageb['neighbourhood'].str.replace('colonia ', '').str.replace('Colonia ', '')

# Registrar tabla ageb en DuckDB
con.register("ageb_df", df_ageb)
con.execute("CREATE OR REPLACE TABLE ageb AS SELECT * FROM ageb_df")

# Cargar inmuebles y estandarizar colonia
df_inmuebles = pd.read_parquet(inmuebles_path)
df_inmuebles['colonia_std'] = df_inmuebles['colonia'].apply(estandarizar_texto)
con.register("inmuebles_df", df_inmuebles)
con.execute("CREATE OR REPLACE TABLE inmuebles AS SELECT * FROM inmuebles_df")

# Validar columnas de coordenadas en inmuebles
inm_cols = [r[1] for r in con.execute("PRAGMA table_info('inmuebles')").fetchall()]
if not {'latitud','longitud','colonia_std'}.issubset(set(inm_cols)):
    raise ValueError("El parquet de inmuebles debe incluir columnas 'latitud', 'longitud' y 'colonia'.")
con.execute("ALTER TABLE inmuebles ALTER COLUMN latitud TYPE DOUBLE")
con.execute("ALTER TABLE inmuebles ALTER COLUMN longitud TYPE DOUBLE")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

<_duckdb.DuckDBPyConnection at 0x103d8c430>

In [3]:
# Join por colonia estandarizada: toma el AGEB más cercano dentro de la misma colonia
query = """
SELECT 
  i.*, 
  a.*, 
  ST_Distance(ST_Point(i.longitud, i.latitud), ST_Point(a.longitud, a.latitud)) AS distancia
FROM inmuebles i
LEFT JOIN LATERAL (
  SELECT a.*
  FROM ageb a
  WHERE a.neighbourhood = i.colonia_std
  ORDER BY ST_Distance(ST_Point(i.longitud, i.latitud), ST_Point(a.longitud, a.latitud)) ASC
  LIMIT 1
) a ON TRUE
"""

df_merge = con.execute(query).fetchdf()
print("df_merge shape:", df_merge.shape)
df_merge.head(3)

df_merge shape: (6040, 154)


Unnamed: 0,precio_mxn,lote_m2,recamaras,baños,estacionamiento,es_amueblado,es_penthouse,direccion,colonia,cp,...,municipio_1,address,road,quarter,borough,postcode,longitud_1,latitud_1,neighbourhood,distancia
0,16000.0,30,1.0,1.0,1.0,1,0,av. tamaulipas 257 alvaro obregon ciudad de m...,alvaro obregon,9230,...,venustiano_carranza,"{'road': 'Avenida Congreso de la Unión', 'neig...",Avenida Congreso de la Unión,,Venustiano Carranza,15990,-99.12147,19.412086,alvaro obregon,0.153109
1,50000.0,167,3.0,2.0,3.0,0,0,av bernardo quintana alvaro obregon ciudad de...,alvaro obregon,9230,...,venustiano_carranza,"{'road': 'Avenida Congreso de la Unión', 'neig...",Avenida Congreso de la Unión,,Venustiano Carranza,15990,-99.12147,19.412086,alvaro obregon,0.137192
2,38000.0,80,2.0,1.0,1.0,0,0,zamora 70 - condesa cuauhtemoc,condesa,6140,...,cuauhtemoc,"{'house_number': '70', 'road': 'Calle Zamora',...",Calle Zamora,,Cuauhtémoc,6140,-99.177612,19.416966,condesa,0.000157


In [4]:
df_merge.isnull().sum()

precio_mxn            0
lote_m2              50
recamaras             0
baños                 0
estacionamiento       0
                   ... 
postcode            741
longitud_1          741
latitud_1           741
neighbourhood       741
distancia          2130
Length: 154, dtype: int64

In [6]:
df_merge.AGEB.isnull().sum()

741

In [5]:
# Guardar parquet final
os.makedirs(os.path.dirname(output_path), exist_ok=True)
df_merge.to_parquet(output_path, index=False)
print("Parquet escrito en:", output_path)

Parquet escrito en: ../../data/processed/merged_inmuebles24_departamentos_duckdb_2025-09-14.parquet
