In [1]:
import geopandas as gpd
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from shapely.geometry import Point, Polygon

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import os
import sys
module_path = os.path.abspath(os.path.join('../../'))
if module_path not in sys.path:
    sys.path.append(module_path)
    import aup

ImportError: cannot import name 'GEOSException' from 'shapely.errors' (/opt/conda/envs/gds/lib/python3.9/site-packages/shapely/errors.py)

## Downloading network

In [None]:
USOS_ZONA_7 = gpd.read_file("Distrito_Zona7.geojson")

In [None]:
print(USOS_ZONA_7.crs)

In [None]:
USOS_ZONA_7.shape

In [None]:
USOS_ZONA_7.plot()

In [None]:
# Convertir la geometría del polígono en WKT (para usar SQL en la query)
poly_wkt = USOS_ZONA_7.dissolve().geometry.iloc[0].wkt
# print(poly_wkt)  # Verificar si genera un WKT válido

In [None]:
schema = "denue"
table = "denue_2022"

manzana = "manzana"
entidad = "cve_ent"
localidad = "cve_loc"
municipio = "cve_mun"
id = "id"
latitud = "latitud"
longitud = "longitud"
codigo_act = "codigo_act"
ageb = "ageb"
per_ocu = "per_ocu"


# Crear la consulta manteniendo el mismo CRS
query_censo = f"""
SELECT 
  "{id}", 
  "{entidad}",
  "{localidad}",
  "{manzana}",
  "{municipio}",
  "{latitud}",
  "{longitud}",
  "{codigo_act}",
  "{ageb}",
  "{per_ocu}",
  "geometry"
FROM {schema}.{table}
WHERE ST_Intersects(
  ST_Transform(geometry, 32613),  -- Transformar geometría de DENUE a 32613 
  ST_GeomFromText('{poly_wkt}', 32613)  -- Usar el polígono en 32613
)
"""

# Ejecutar la consulta
Denue = aup.gdf_from_query(query_censo, geometry_col='geometry')

# Asegurar que el GeoDataFrame resultante tenga el CRS correcto
Denue = Denue.to_crs("EPSG:32613")


print(Denue.shape)
Denue.head()


In [None]:
print(Denue.is_valid.sum())  # Geometrías válidas
print(Denue.geometry.is_empty.sum())  # Geometrías vacías

In [None]:
# Verificar coordenadas extremas
print("\nRango de coordenadas USOS_ZONA_7:")
print(USOS_ZONA_7.total_bounds)
print("\nRango de coordenadas Denue:")
print(Denue.total_bounds)

## Clasificar las distintas actividades económicas con base en su terminación en "codigo_act" basándonos en el Directorio Estadístico Nacional de Unidades Económicas

In [None]:
# Función para clasificar según la terminación de codigo_act
def asignar_tipo(codigo):
    if pd.isna(codigo):  # Si está vacío
        return 'Sin código'
    
    # Asegurar que sea string para evaluar el código
    codigo_str = str(codigo).strip()

    if not codigo_str.isdigit():
        return 'Código inválido'
    
    # Define tus conjuntos de códigos
    industria = {'11', '21', '23', '31', '32', '33', '55'}
    
    servicios = {'22', '48', '49', '52', '53', '54', '56', '72',
                 '81'}
    
    comercio = {'43', '46'}
    
    cultural_recreativo = {'51','71'}
    
    educacion = {'61'}
    
    salud = {'62'}

    gobierno = {'93'}
    
    # Verificar si el código está en alguno de los conjuntos
    if codigo_str[:2] in industria:
        return 'Industria'
    elif codigo_str[:2] in servicios:
        return 'Servicios'
    elif codigo_str[:2] in comercio:
        return 'Comercio'
    elif codigo_str[:2] in cultural_recreativo:
        return 'Cultural/Recreativo'
    elif codigo_str[:2] in educacion:
        return 'Educación'
    elif codigo_str[:2] in salud:
        return 'Salud'
    elif codigo_str[:2] in gobierno:
        return 'Gobierno'
    else:
        return 'Desconocido'

# Aplica la función al DataFrame
Denue['tipo_act'] = Denue['codigo_act'].apply(asignar_tipo)

In [None]:
Denue.head(3)

In [None]:
Denue["tipo_act"].unique()

In [None]:
desconocidos = Denue[Denue['tipo_act'] == 'Desconocido']
desconocidos

### Relación espacio -> espacio

#### Mostrar de manera completa las relaciones y jerarquías entre los distintos espacios

In [None]:
jerarquia_completa = Denue[['cve_ent', 'cve_mun', 'cve_loc', 'ageb', 'manzana']].sort_values(by=['cve_ent', 'cve_mun', 'cve_loc', 'ageb', 'manzana'])
jerarquia_completa.head(10)

In [None]:
jerarquia_completa.shape

In [None]:
conteo_manzanas = jerarquia_completa.groupby(['cve_ent', 'cve_mun', 'cve_loc', 'ageb', 'manzana']) \
    .size().reset_index(name='cont_manzanas') #  Contar cuántas manzanas hay por agrupación y convertimos a string

In [None]:
conteo_manzanas.head(5)

In [None]:
conteo_manzanas["cont_manzanas"].unique()

In [None]:
jerarquia_completa['cod_16d'] = (
    jerarquia_completa['cve_ent'].astype(str).str[:2] + 
    jerarquia_completa['cve_mun'].astype(str).str[:3] + 
    jerarquia_completa['cve_loc'].astype(str).str[:4] + 
    jerarquia_completa['ageb'].astype(str).str[:4] + 
    jerarquia_completa['manzana'].astype(str).str[:3]
)

In [None]:
jerarquia_completa

In [None]:
Denue_completo = Denue.merge(
    jerarquia_completa)

Denue_completo.head(3)

In [None]:
Denue_completo.shape

## Importar las manzanas de censo

In [None]:
schema = "sociodemografico"
table = "censo_inegi_20_mza"

entidad = "cve_ent"
localidad = "cve_loc"
municipio = "cve_mun"
manzana = "cvegeo_mza"
ageb = "cve_ageb"
ambito = "ambito"
tipomza = "tipomza"
pobtot = "pobtot"

query_censo = f"""
SELECT 
"{entidad}",
"{manzana}",
"{localidad}",
"{municipio}",
"{ageb}",
"{ambito}",
"{tipomza}",
"{pobtot}",
"geometry"
FROM {schema}.{table}
WHERE ST_Intersects(
  ST_Transform(geometry, 32613),  -- Transformar geometría de DENUE a 32613 
  ST_GeomFromText('{poly_wkt}', 32613)  -- Usar el polígono en 32613
)
"""

# Ejecutar la consulta y cargar los datos en un GeoDataFrame
censo_denue = aup.gdf_from_query(query_censo, geometry_col='geometry')
censo_denue = censo_denue.to_crs("EPSG:32613")

print(censo_denue.shape)
censo_denue.head()

### Contabilizar cuántas personas hay ejerciendo cada actividad económica

In [None]:
def number_of_jobs(per_ocu):
    jobs_dict = {'0 a 5 personas':3,
                '6 a 10 personas':8,
                '11 a 30 personas':20,
                '31 a 50 personas':40,
                '51 a 100 personas':75,
                '101 a 250 personas':175,
                '251 y más personas':325}
    per_ocu_num = jobs_dict[per_ocu]
    return per_ocu_num

In [None]:
Denue_completo['per_ocu_num'] = Denue_completo.per_ocu.apply(lambda per_ocu: number_of_jobs(per_ocu))
Denue_completo.head(2)

### Hacer el match entre las manzanas del denue y las manzanas del censo

In [None]:
censo_denue.rename(columns = {"cve_ageb": "ageb"} , inplace = True)
censo_denue.rename(columns = {"cvegeo_mza": "manzana"} , inplace = True)

In [None]:
# Claves únicas del DENUE (para verificar coincidencias)
claves_denue = Denue_completo[
    ['cve_ent', 'cve_mun', 'cve_loc', 'ageb', 'manzana', 'cod_16d]]

In [None]:
claves_denue

In [None]:
# Añadir columna binaria (1 si existe en DENUE, 0 si no)
censo_denue['tiene_denue'] = censo_denue.apply(
    lambda row: 1 if (
        (claves_denue['cod_16d'] == row['manzana'])
    ).any() else 0,
    axis=1
)

In [None]:
censo_denue.head(2)

In [None]:
(censo_denue["tiene_denue"] == 0).sum() # Manzanas sin denue

In [None]:
censo_con_denue = censo_denue[censo_denue['tiene_denue'] != 0] 
# Nos quedamos únicamente con aquellas manzanas que sí encontraron coincidencia con Denue
print(censo_con_denue.shape)
censo_con_denue.head(2)

## Crear centroides

In [None]:
print("Columnas en Denue_completo:", Denue_completo.columns.tolist())

In [None]:
censo_con_denue.columns

In [None]:
print(censo_con_denue[['cve_ent']].isnull().sum())

In [None]:
censo_con_denue.info()

In [None]:
print(censo_denue.columns[censo_denue.columns.duplicated()])

In [None]:
# Calcular centroides de manzanas
manzanas_gdf = (
    censo_con_denue[['cve_ent', 'cve_mun', 'cve_loc', 'ageb', 'manzana', 'geometry']]
    .drop_duplicates()
    .assign(centroide=lambda df: df.geometry.centroid)
)

# Calcular distancias entre establecimientos y centroides
denue_con_distancias = (
    Denue_completo[['cve_ent', 'cve_mun', 'cve_loc', 'ageb', 'cod_16d', 'geometry', 'codigo_act', 'per_ocu_num']]
    .merge(manzanas_gdf[['cve_ent', 'cve_mun', 'cve_loc', 'ageb', 'manzana', 'centroide']],
           on=['cve_ent', 'cve_mun', 'cve_loc', 'ageb'])
    .assign(distancia=lambda df: df.geometry.distance(df.centroide))
)

# Calcular d_mean directamente
d_mean_por_manzana = denue_con_distancias.groupby( # distancia promedio entre los establecimientos del DENUE y el centroide de su manzana.
    ['cve_ent', 'cve_mun', 'cve_loc', 'ageb', 'manzana']
)['distancia'].mean().reset_index(name = 'd_mean')


# Asegurar que d_mean esté en manzanas_gdf
manzanas_gdf = manzanas_gdf.merge(d_mean_por_manzana, on=['cve_ent', 'cve_mun', 'cve_loc', 'ageb', 'manzana'], how='left')

resultados = []
for _, manzana in manzanas_gdf.iterrows():
    dentro_manzana = Denue_completo[
        Denue_completo['geometry'].within(manzana['geometry'])
    ]
    
    resultados.append({
        'cve_ent': manzana['cve_ent'],
        'cve_mun': manzana['cve_mun'],
        'cve_loc': manzana['cve_loc'],
        'ageb': manzana['ageb'],
        'manzana': manzana['manzana'],
        'd_mean': manzana['d_mean'], 
        'total_actividades': len(dentro_manzana),
        'actividades_diferentes': dentro_manzana['codigo_act'].nunique(), # Conteo de actividades por mza
        'per_ocu_total': dentro_manzana['per_ocu_num'].sum(),
        'geometry': manzana['geometry']
    })

# Confirmamos que d_mean no tiene valores nulos
# print(analisis_final[['d_mean']].isna().sum())  
#  Resumen
print("Resumen del análisis:")
print(f"- Manzanas analizadas: {len(analisis_final)}")
print(f"- Distancia media promedio: {analisis_final['d_mean'].mean():.2f} metros")
print(f"- Número de actividades promedio por manzana: {analisis_final['total_actividades'].mean():.2f}")
print(f"- Actividades económicas únicas por manzana: {analisis_final['actividades_diferentes'].mean():.2f}")


In [None]:
analisis_final.shape

In [None]:
analisis_final.head(2)

In [None]:
fig, ax = plt.subplots(figsize=(10, 8))
manzanas_gdf.plot(ax=ax, color='none', edgecolor='gray', label="Manzanas")
Denue_completo.plot(ax=ax, color='red', markersize=5, label="Actividad")
manzanas_gdf['centroide'].plot(ax=ax, color='blue', markersize=10, label="Centroides")

plt.title("Centroides de Manzanas y Establecimientos DENUE")
plt.legend()
plt.show()

In [None]:
plt.figure(figsize=(8, 5))
plt.hist(d_mean_por_manzana['d_mean'], bins=30, color='orchid', alpha=0.7, edgecolor='black')
plt.xlabel("Distancia Media (d_mean)")
plt.ylabel("Frecuencia")
plt.title("Distribución de Distancia Media a Centroides")
plt.grid()
plt.show()

# Spatial-kde

In [None]:
!pip install spatial-kde

In [None]:
from typing import Optional
from spatial_kde import spatial_kernel_density


spatial_kernel_density(
    points: gpd.GeoDataFrame = gdf,
    radius: float = 1.0,
    output_path: str = "/output/path.tif",
    output_pixel_size: float = 1.0,
    output_driver: str = "GTiff",
    weight_col: Optional[str] = None,
    scaled: bool = False,
)

    """Calculate Kernel Density / heatmap from ``points``

    .. note:: Distance calculations are planar so care should be taken with data
              that is in geographic coordinate systems

    Parameters
    ----------
    points : gpd.GeoDataFrame
        Input GeoDataFrame of points to generate a KDE from
    radius : float
        Radius of KDE, same units as the coordinate reference system of ``points``
        Sometimes referred to as search radius or bandwidth
    output_path : str
        Path to write output raster to
    output_pixel_size : float
        Output cell/pixel size of the created array. Same units as the coordinate
        reference system of ``points``
    output_driver : str
        Output format (driver) used to create image. See also
        https://rasterio.readthedocs.io/en/latest/api/rasterio.drivers.html
    weight_col : Optional[str], optional
        A column in ``points`` to weight the kernel density by, any points that
        are NaN in this field will not contribute to the KDE.
        If None, the all points will have uniform weight of 1.
    scaled : bool
        If True will output mathematically scaled values, else will output raw
        values.
    """