In [75]:
import os
from pathlib import Path

import geopandas as gpd
import numpy as np
import osmnx as ox
import pandana as pdna
import pandas as pd
import pca
import itertools

In [2]:
data_path = Path(os.environ["DATA_PATH"])
population_grids_path = Path(os.environ["POPULATION_GRIDS_PATH"])
segregation_path = Path(os.environ["SEGREGATION_PATH"])
census_path = Path(os.environ["CENSUS_PATH"])
geostatistical_framework_path = Path(os.environ["GEOSTATISTICAL_FRAMEWORK_PATH"])

# DataFrames

## Census

In [54]:
df_census_base = pd.read_csv(
    census_path / "2020" / "08.csv",
    usecols=[
        "ENTIDAD",
        "MUN",
        "LOC",
        "AGEB",
        "MZA",
        "NOM_LOC",
        "POBTOT",
        "P_0A2",
        "P_3A5",
        "P_60YMAS",
        "P18YM_PB",
        "P_18YMAS",
        "GRAPROES",
        "TVIVPARHAB",
        "PRO_OCUP_C",
        "VPH_PISODT",
        "VPH_C_ELEC",
        "VPH_REFRI",
        "VPH_LAVAD",
        "VPH_HMICRO",
        "VPH_NDACMM",
    ],
)

## AGEBs

In [55]:
df_geom_agebs = (
    gpd.read_file(
        population_grids_path
        / "final"
        / "zone_agebs"
        / "shaped"
        / "2020"
        / "08.2.03.gpkg",
    )
    .drop(columns=["POBTOT"])
    .set_index("CVEGEO")
)

df_census_agebs = (
    df_census_base.query("NOM_LOC == 'Total AGEB urbana'")
    .assign(
        CVEGEO=lambda df: (
            df["ENTIDAD"].astype(str).str.zfill(2)
            + df["MUN"].astype(str).str.zfill(3)
            + df["LOC"].astype(str).str.zfill(4)
            + df["AGEB"].astype(str).str.zfill(4)
        ),
    )
    .drop(columns=["ENTIDAD", "MUN", "LOC", "AGEB", "NOM_LOC"])
    .set_index("CVEGEO")
    .replace("*", np.nan)
    .astype(float)
)

df = df_geom_agebs.join(df_census_agebs, how="inner")

## Blocks

In [5]:
df_geom_blocks = (
    gpd.read_file(geostatistical_framework_path / "2020" / "08_chihuahua" / "08m.shp")
    .assign(CVEGEO_AGEB=lambda x: x.CVEGEO.str[:13])
    .query("CVEGEO_AGEB in @df_geom_agebs.index")
    .set_index("CVEGEO")
    .to_crs("EPSG:6372")
)


df_census_blocks = (
    df_census_base.assign(
        CVEGEO=lambda df: (
            df["ENTIDAD"].astype(str).str.zfill(2)
            + df["MUN"].astype(str).str.zfill(3)
            + df["LOC"].astype(str).str.zfill(4)
            + df["AGEB"].astype(str).str.zfill(4)
            + df["MZA"].astype(str).str.zfill(3)
        ),
    )
    .drop(columns=["ENTIDAD", "MUN", "LOC", "AGEB", "MZA", "NOM_LOC"])
    .set_index("CVEGEO")
    .replace(["*", "N/D"], np.nan)
    .astype(float)
)

df_blocks = df_geom_blocks.join(df_census_blocks, how="inner")

# Gráfica

In [6]:
bounds = tuple(float(x) for x in df.to_crs("EPSG:4326").total_bounds)

g = ox.graph_from_bbox(bounds, network_type="drive")
g = ox.project_graph(g, to_crs="EPSG:6372")
g = ox.add_edge_speeds(g)
g = ox.add_edge_travel_times(g)

In [7]:
coords = df_blocks.centroid.get_coordinates()
df_blocks["node_id"] = ox.nearest_nodes(g, coords["x"], coords["y"])

In [8]:
df_nodes, df_edges = ox.graph_to_gdfs(g, nodes=True, edges=True)
df_edges = df_edges.reset_index()

In [9]:
net = pdna.Network(
    df_nodes["x"],
    df_nodes["y"],
    df_edges["u"],
    df_edges["v"],
    df_edges[["travel_time"]],
)
net.precompute(3600)

Generating contraction hierarchies with 1 threads.
Setting CH node vector of size 54312
Setting CH edge vector of size 146959
Range graph removed 130488 edges of 293918
. 10% . 20% . 30% . 40% . 50% . 60% . 70% . 80% . 90% . 100%


# Stats

In [10]:
rows = {}

## Porcentaje población 0-5 años

In [11]:
rows["porcentaje_pob_0a5"] = (df["P_0A2"] + df["P_3A5"]) / df["POBTOT"]

## Porcentaje de población >60

In [12]:
rows["porcentaje_pob_60"] = df["P_60YMAS"] / df["POBTOT"]

## Porcentaje de población con menos de preparatoria terminada

In [13]:
rows["porcentaje_menos_prepa_terminada"] = 1 - df["P18YM_PB"] / df["P_18YMAS"]

## Ingreso

In [14]:
rows["ingreso"] = (
    gpd.read_file(segregation_path / "incomes" / "M08.04.gpkg")
    .rename(columns={"cvegeo": "CVEGEO"})
    .set_index("CVEGEO")["income_pc"]
)

In [15]:
columns = []
for key, value in rows.items():
    columns.append(value.rename(key))
out = pd.concat(columns, axis=1).join(df[["geometry"]])
out = gpd.GeoDataFrame(out, crs=df.crs, geometry="geometry")

# Accesibilidad

In [16]:
def calculate_weighted_accessibility(
    net: pdna.Network,
    coords: pd.DataFrame,
    poi_type: str,
    df_blocks: gpd.GeoDataFrame,
    *,
    weight_col: str = "POBTOT",
) -> pd.Series:
    net.set_pois(poi_type, 3600, 1, coords["x"], coords["y"])
    res = (
        net.nearest_pois(3600, poi_type)
        .rename(columns={1: "travel_time"})
        .reset_index(names="node_id")
    )
    return (
        df_blocks.merge(res, on="node_id", how="left")
        .assign(num=lambda df: df[weight_col] * df["travel_time"])
        .groupby("CVEGEO_AGEB")
        .agg(
            {
                "num": "sum",
                weight_col: "sum",
            },
        )
        .assign(**{f"{poi_type}_travel_time": lambda df: df["num"] / df[weight_col]})[
            f"{poi_type}_travel_time"
        ]
        .rename_axis("CVEGEO")
    )

## Hospitales

In [17]:
df_hospitals = (
    gpd.read_file(data_path / "datos" / "Unidad_Medica_Wgs84")
    .reset_index(drop=True)
    .filter(["geometry"])
    .to_crs("EPSG:6372")
)

rows["tiempo_viaje_hospitales"] = (
    calculate_weighted_accessibility(
        net,
        df_hospitals.get_coordinates(),
        "hospital",
        df_blocks,
    )
    / 60
)

## Preparatorias

In [18]:
df_highschools = (
    gpd.read_file(data_path / "datos" / "Preparatorias_Wgs84")
    .reset_index(drop=True)
    .filter(["geometry"])
    .to_crs("EPSG:6372")
)

rows["tiempo_viaje_preparatorias"] = (
    calculate_weighted_accessibility(
        net,
        df_highschools.get_coordinates(),
        "highschool",
        df_blocks,
    )
    / 60
)

## Espacios recreativos

# Desigualdad social

In [62]:
df_uneq = (
    df.assign(
        prom_ocupantes=lambda df: df["PRO_OCUP_C"],
        frac_sin_piso_tierra=lambda df: df["VPH_PISODT"] / df["TVIVPARHAB"],
        frac_con_electrica=lambda df: df["VPH_C_ELEC"] / df["TVIVPARHAB"],
        frac_sin_vehiculo=lambda df: df["VPH_NDACMM"] / df["TVIVPARHAB"],
        frac_con_refrigerador=lambda df: df["VPH_REFRI"] / df["TVIVPARHAB"],
        frac_con_lavadora=lambda df: df["VPH_LAVAD"] / df["TVIVPARHAB"],
        frac_con_microondas=lambda df: df["VPH_HMICRO"] / df["TVIVPARHAB"],
        grado_prom_escolaridad=lambda df: df["GRAPROES"],
    )
    .filter(regex="frac|prom")
    .dropna()
)

In [78]:
model = pca.pca(normalize=True, n_components=1)

pca_results = {}
for i in range(1, len(df_uneq.columns) + 1):
    for comb in itertools.combinations(df_uneq.columns, i):
        temp = df_uneq.filter(comb)
        res = model.fit_transform(temp)
        pca_results[tuple(comb)] = res["explained_var"][0]

[06-08-2025 17:44:58] [pca.pca] [INFO] Extracting column labels from dataframe.
[06-08-2025 17:44:58] [pca.pca] [INFO] Extracting row labels from dataframe.
[06-08-2025 17:44:58] [pca.pca] [INFO] Normalizing input data per feature (zero mean and unit variance)..
[06-08-2025 17:44:58] [pca.pca] [INFO] The PCA reduction is performed on the 1 columns of the input dataframe.
[06-08-2025 17:44:58] [pca.pca] [INFO] Fit using PCA.
[06-08-2025 17:44:58] [pca.pca] [INFO] Compute loadings and PCs.
[06-08-2025 17:44:58] [pca.pca] [INFO] Compute explained variance.
[06-08-2025 17:44:58] [pca.pca] [INFO] Outlier detection using Hotelling T2 test with alpha=[0.05] and n_components=[1]
[06-08-2025 17:44:58] [pca.pca] [INFO] Multiple test correction applied for Hotelling T2 test: [fdr_bh]
[06-08-2025 17:44:58] [pca.pca] [INFO] Outlier detection using SPE/DmodX with n_std=[3]
[06-08-2025 17:44:58] [pca.pca] [INFO] Cleaning previous fitted model results...
[06-08-2025 17:44:58] [pca.pca] [INFO] Extracti

In [81]:
pca_results

{('prom_ocupantes',): 1.0,
 ('frac_sin_piso_tierra',): 1.0,
 ('frac_con_electrica',): 1.0,
 ('frac_sin_vehiculo',): 1.0,
 ('frac_con_refrigerador',): 1.0,
 ('frac_con_lavadora',): 1.0,
 ('frac_con_microondas',): 1.0,
 ('grado_prom_escolaridad',): 1.0,
 ('prom_ocupantes', 'frac_sin_piso_tierra'): 0.6631914253981327,
 ('prom_ocupantes', 'frac_con_electrica'): 0.5537501223549237,
 ('prom_ocupantes', 'frac_sin_vehiculo'): 0.8014416649193914,
 ('prom_ocupantes', 'frac_con_refrigerador'): 0.7268089902130876,
 ('prom_ocupantes', 'frac_con_lavadora'): 0.7741558474147308,
 ('prom_ocupantes', 'frac_con_microondas'): 0.9138046383870159,
 ('prom_ocupantes', 'grado_prom_escolaridad'): 0.8504100555141073,
 ('frac_sin_piso_tierra', 'frac_con_electrica'): 0.8561059453321719,
 ('frac_sin_piso_tierra', 'frac_sin_vehiculo'): 0.5522990937321621,
 ('frac_sin_piso_tierra', 'frac_con_refrigerador'): 0.8560699671730383,
 ('frac_sin_piso_tierra', 'frac_con_lavadora'): 0.7744439792914147,
 ('frac_sin_piso_tierr

array([0.57921927, 0.81397394, 0.87155097, 0.91299994, 0.94965561,
       0.9734597 , 0.9868635 , 1.        ])

# Out

In [21]:
out = gpd.GeoDataFrame(
    pd.DataFrame.from_dict(rows, orient="index").transpose().join(df[["geometry"]]),
    crs=df.crs,
    geometry="geometry",
).to_crs("EPSG:4326")
out.to_file("./test.gpkg")