# covariables

> Métodos para preparar covariables para análisis criminológico

In [None]:
#| default_exp covariables

In [None]:
#| include: false
from nbdev.showdoc import *

In [None]:
#| export
import os
import glob
from pathlib import Path
from fastcore.basics import *
import numpy as np
import pandas as pd
import geopandas as gpd
import seaborn as sns
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from criminologia_cdmx.etl import DATA_PATH, DOWNLOADS_PATH
import requests

## get_diccionario_censo

In [None]:
#| export
def get_diccionario_censo() -> pd.DataFrame:
    """Regresa un DataFrame con el diccionario de variables del censo."""
    fname = 'diccionario_datos_ageb_urbana_09_cpv2020.csv'
    absp = os.path.abspath(os.path.join(DATA_PATH, fname))
    dicionario = pd.read_csv(absp, skiprows=3)
    diccionario = (dicionario
                   .drop(range(0,8))
                   .drop(columns='Núm.')
                   .reset_index(drop=True)
                   .rename({'Mnemónico':'Nombre del Campo'}, axis=1))
    return diccionario

In [None]:
diccionario = get_diccionario_censo()
diccionario.head()

Unnamed: 0,Indicador,Descripción,Nombre del Campo,Rangos,Longitud
0,Población total,Total de personas que residen habitualmente en...,POBTOT,0...999999999,9
1,Población femenina,Total de mujeres que residen habitualmente en ...,POBFEM,0...999999999,9
2,Población masculina,Total de hombres que residen habitualmente en ...,POBMAS,0...999999999,9
3,Población de 0 a 2 años,Personas de 0 a 2 años de edad.,P_0A2,0…999999999,9
4,Población femenina de 0 a 2 años,Mujeres de 0 a 2 años de edad.,P_0A2_F,"0.,.999999999",9


## get_variables_censo

In [None]:
#| export
def get_variables_censo() -> pd.DataFrame:
    """Regresa un DataFrame con las variables del censo a nivel manzana."""
    fname = 'censo_manzanas.zip'
    absp = os.path.abspath(os.path.join(DATA_PATH, fname))
    df = pd.read_csv(absp, dtype={'CVEGEO':str, 'colonia_cve': 'Int64'})
    return df

In [None]:
censo = get_variables_censo()
censo.head()

Unnamed: 0,CVEGEO,AMBITO,TIPOMZA,POBTOT,POBFEM,POBMAS,P_0A2,P_0A2_F,P_0A2_M,P_3YMAS,...,VPH_INTER,VPH_STVP,VPH_SPMVPI,VPH_CVJ,VPH_SINRTV,VPH_SINLTC,VPH_SINCINT,VPH_SINTIC,colonia_cve,cuadrante_id
0,901000010898031,Urbana,Típica,93.0,56.0,37.0,4.0,,3.0,89.0,...,15.0,16.0,6.0,3.0,0.0,0.0,7.0,0.0,1119,17.0
1,901000012269024,Urbana,Típica,6.0,,,,,,,...,,,,,,,,,1082,14.0
2,901000011472068,Urbana,Típica,124.0,66.0,58.0,3.0,3.0,0.0,121.0,...,25.0,22.0,9.0,8.0,0.0,,7.0,0.0,1030,11.0
3,901000011824024,Urbana,Típica,340.0,177.0,163.0,12.0,8.0,4.0,327.0,...,69.0,56.0,29.0,14.0,,,25.0,,1135,110.0
4,901000012377004,Urbana,Típica,82.0,41.0,41.0,,0.0,,80.0,...,13.0,13.0,6.0,3.0,0.0,0.0,9.0,0.0,1081,18.0


## imputa_faltantes_manzana

In [None]:
#| export
def imputa_faltantes_manzana(censo: pd.DataFrame, # Variables del censo en manzanas `get_variables_censo`,
                             cols: list, # Lista con las columnas en donde se deben imputar faltantes.
                             metodo:str='ceros' # método a usar para la imputación.
    ) -> pd.DataFrame: # Igual a `censo` pero con los datos faltantes imputados
    """ Regresa un df con los datos faltantes imputados a nivel manzana.
    
        params:
        
        cols: list: lista con las columnas en donde se deben imputar faltantes.
        metodo: método a usar para la imputación.
        NOTA: Por lo pronto sólo implementa dos métodos muy simples: llena con ceros 
        o aleatorio entre 0 y 3. En el futuro podríamos implementar mejores formas
    """
    if metodo == 'ceros':
        censo = censo.fillna(0)
    elif metodo == 'random':
        rand = pd.DataFrame(np.random.randint(0, 4, size=(censo.shape[0], len(cols))),
                            columns=cols, 
                            index=censo.index)
        censo.update(rand) 
    else:
        raise ValueError("imputacion debe ser ceros o random")
    return censo

:::{.callout-warning}

Por lo pronto sólo implementa dos métodos muy simples: llena con ceros o aleatorio entre 0 y 3. En el futuro podríamos implementar mejores formas

:::

In [None]:
# Prueba funcionamiento normal
vars_pob = [v for v in diccionario['Nombre del Campo'].unique() 
            if (v.startswith('P') and v != 'PROM_HNV') ]
vars_viv = [v for v in diccionario['Nombre del Campo'].unique() if v.startswith('V')]
vars_viv.append('OCUPVIVPAR')
cols = vars_pob + vars_viv
imputados_ceros = imputa_faltantes_manzana(censo.head().copy(), cols)
imputados_rand = imputa_faltantes_manzana(censo.head().copy(), cols, metodo='random')
assert np.count_nonzero(imputados_rand.loc[:,cols].head().isna()) == 0
assert np.count_nonzero(imputados_ceros.loc[:,cols].head().isna()) == 0
# Prueba que arroje la excepción
# TODO

## agrega_en_unidades

In [None]:
#| export
def agrega_en_unidades(censo: pd.DataFrame, # Variables del censo en manzanas `get_variables_censo`
                       diccionario: pd.DataFrame, # `get_diccionario_censo`
                       agregacion:str='colonias', # colonias/cuadrantes o nombre del campo en `censo`
                       imputacion:str='ceros', # ceros/random. método para rellenar los datos faltantes.
                       umbral_faltantes:float=0.5  # Porcentaje de datos faltantes en una manzana para considerarla en el análisis
    ) -> pd.DataFrame: # Los datos del censo agregados en las unidades requeridas.
    """ Agrega las variables del censo en las unidades espaciales especificadas."""
    vars_pob = [v for v in diccionario['Nombre del Campo'].unique() 
                if (v.startswith('P') and v != 'PROM_HNV') ]
    vars_viv = [v for v in diccionario['Nombre del Campo'].unique() if v.startswith('V')]
    vars_viv.append('OCUPVIVPAR')
    if agregacion == 'colonias':
        columna_agrega = 'colonia_cve'
    elif agregacion == 'cuadrantes':
        columna_agrega = 'cuadrante_id'
    else:
        columna_agrega = agregacion
    try:
        assert columna_agrega in censo.columns
    except AssertionError:
        print("La columna de agregación debe estar en los datos.")
        raise
    censo.dropna(thresh=umbral_faltantes*(len(vars_pob) + len(vars_viv)), inplace=True)
    censo = imputa_faltantes_manzana(censo, vars_pob + vars_viv, metodo=imputacion)
    censo = censo[[columna_agrega] + vars_pob + vars_viv].groupby(columna_agrega).sum()
    # Calculamos las columnas que requieren trato espacial
    censo['PROM_OCUP'] = censo['OCUPVIVPAR'].div(censo['VIVPAR_HAB'])
    censo['PROM_OCUP_C'] = censo['OCUPVIVPAR'].div(censo['VPH_1CUART'] + 2*censo['VPH_2CUART'] + 3*censo['VPH_3YMASC'])
    return censo

:::{.callout-warning}

Las columnas PROM_HNV, GRAPROES(F/M) se pierden porque no hay forma de calcularlas.

:::

In [None]:
agregado = agrega_en_unidades(censo, diccionario, imputacion='random')
agregado = agrega_en_unidades(censo, diccionario)
agregado

Unnamed: 0_level_0,POBTOT,POBFEM,POBMAS,P_0A2,P_0A2_F,P_0A2_M,P_3YMAS,P_3YMAS_F,P_3YMAS_M,P_5YMAS,...,VPH_INTER,VPH_STVP,VPH_SPMVPI,VPH_CVJ,VPH_SINRTV,VPH_SINLTC,VPH_SINCINT,VPH_SINTIC,OCUPVIVPAR,PROM_OCUP_C
colonia_cve,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
0,135.0,131.0,139.0,129.0,120.0,118.0,135.0,109.0,117.0,115.0,...,129.0,110.0,112.0,107.0,123.0,127.0,120.0,116.0,103.0,0.137517
1,87.0,79.0,75.0,74.0,84.0,78.0,79.0,74.0,87.0,76.0,...,70.0,76.0,77.0,76.0,84.0,79.0,85.0,84.0,83.0,0.156900
2,94.0,110.0,104.0,114.0,97.0,92.0,106.0,105.0,98.0,107.0,...,114.0,102.0,105.0,108.0,110.0,100.0,92.0,109.0,93.0,0.141123
3,16.0,20.0,23.0,22.0,28.0,21.0,23.0,26.0,19.0,24.0,...,20.0,28.0,23.0,15.0,18.0,22.0,26.0,26.0,16.0,0.155340
4,51.0,40.0,37.0,47.0,44.0,41.0,48.0,29.0,50.0,33.0,...,48.0,55.0,32.0,42.0,42.0,51.0,41.0,49.0,48.0,0.173285
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1820,74.0,101.0,102.0,100.0,98.0,102.0,98.0,89.0,91.0,89.0,...,82.0,66.0,99.0,109.0,106.0,101.0,87.0,73.0,102.0,0.205231
1821,205.0,181.0,214.0,223.0,225.0,209.0,204.0,202.0,220.0,226.0,...,206.0,204.0,219.0,210.0,206.0,206.0,193.0,202.0,193.0,0.156783
1822,3.0,3.0,5.0,7.0,1.0,7.0,5.0,8.0,3.0,4.0,...,11.0,4.0,5.0,11.0,7.0,5.0,4.0,10.0,2.0,0.038462
1823,55.0,77.0,62.0,68.0,60.0,74.0,71.0,62.0,53.0,77.0,...,58.0,74.0,77.0,57.0,77.0,71.0,77.0,75.0,80.0,0.193237


## censo_a_tasas

In [None]:
#| export
def censo_a_tasas(censo: pd.DataFrame, # Puede venir de `agrega_en_unidades` o `imputa_faltantes_manzana`
                  diccionario, # `get_diccionario_censo`
                  umbral_faltantes:float=0.5  # Porcentaje de datos faltantes en una manzana para considerarla en el análisis
    ):
    """ Convierte las variables del censo a tasas en la agregación seleccionada."""
    pob_col = 'POBTOT'
    hog_col = 'TOTHOG'
    viv_col = 'VIVPAR_HAB'
    vars_pob = [v for v in diccionario['Nombre del Campo'].unique() if v.startswith('P')]
    vars_pob_no_tasa = ['POBTOT', 'PROM_HNV']
    vars_pob = [v for v in vars_pob if (v not in vars_pob_no_tasa)]
    vars_viv_no_tasa = ['VIVPAR_HAB', 'PROM_OCUP', 'PRO_OCUP_C'] # No tiene sentido calcular tasas para estas variables
    vars_viv = [v for v in diccionario['Nombre del Campo'].unique() 
                if (v.startswith('V') and v not in vars_viv_no_tasa)]
    censo[vars_pob] = censo[vars_pob].div(censo[pob_col], axis=0)
    censo[vars_viv] = censo[vars_viv].div(censo[viv_col], axis=0)
    censo = censo.dropna(thresh=umbral_faltantes*(len(vars_pob) + len(vars_viv)))
    return censo

In [None]:
agregado_tasas = censo_a_tasas(agregado, diccionario)
manzanas_tasas = censo_a_tasas(imputados_ceros, diccionario)

## get_uso_de_suelo

In [None]:
#| export
def get_uso_de_suelo() -> pd.DataFrame:
    """Regresa un DataFrame con las variables de uso de suelo a nivel manzana."""
    absp = os.path.abspath(os.path.join(DATA_PATH, 'usos_suelo.csv'))
    df = pd.read_csv(absp, dtype={'CVEGEO':str,'colonia_cve': 'Int64','cuadrante_id':str})
    return df

In [None]:
uso_suelo = get_uso_de_suelo()
uso_suelo

Unnamed: 0,CVEGEO,colonia_cve,cuadrante_id,Industria,Comercio,Servicios
0,0901000010898031,1119,017,1,4,5
1,0901000012269024,1082,014,0,0,0
2,0901000011472068,1030,011,1,1,0
3,0901000011824024,1135,0110,0,3,2
4,0901000012377004,1081,018,0,0,0
...,...,...,...,...,...,...
66379,0900700015376020,1442,019,0,0,0
66380,0900700015376021,1442,019,1,4,4
66381,0900700013045056,1419,0113,0,0,0
66382,0900700013045032,1419,0113,2,13,14


## agrega_uso_suelo

In [None]:
#| export
def agrega_uso_suelo(usos:pd.DataFrame, # `get_uso_de_suelo`
                     unidades:str='colonias' # colonias/cuadrantes
    ):
    """ Regresa un DataFrame con los usos agregados en las unidades espaciales."""
    if unidades == 'colonias':
        columna_agrega = 'colonia_cve'
    elif unidades == 'cuadrantes':
        columna_agrega = 'cuadrante_id'
    else:
        raise ValueError("unidades debe ser 'colonias' o 'cuadrantes'")
    usos = usos.groupby(columna_agrega).sum(numeric_only=True)
    usos['Intensidad'] = usos.sum(axis=1)
    usos['Entropía'] = (np.log(usos[['Industria', 'Comercio', 'Servicios']]
                               .div(usos['Intensidad'], axis=0))
                        .sum(axis=1) / np.log(3))
    return usos

In [None]:
us = agrega_uso_suelo(uso_suelo, unidades='colonias')
assert len(set(['Intensidad', 'Entropía']).intersection(set(us.columns))) == 2
us = agrega_uso_suelo(uso_suelo, unidades='cuadrantes')
assert len(set(['Intensidad', 'Entropía']).intersection(set(us.columns))) == 2
# TODO probar que arroja la excepción

  usos = usos.groupby(columna_agrega).sum()
  result = func(self.values, **kwargs)
  usos = usos.groupby(columna_agrega).sum()


## IndicePCA

In [None]:
#| export
class IndicePCA(object):
    """ Clase para crear indices basados en PCA."""
    def __init__(self, 
                covariables:pd.DataFrame, # `agrega_en_unidades` El índice del DataFrame debe ser el id de la unidad de agregación
                vars_indice:list # ista de las columnas con las que calculamos el índice
        ):
        self.datos = covariables
        self.vars_indice = vars_indice
        self.varianza_explicada = None
        self.indice = None
        


### Calcular el índice

Para calcular el índice sólo lammamos al método.

In [None]:
#| export
@patch
def calcula_indice(self:IndicePCA):
    """Calcula el índice y lo guarda en self.indice"""
    pca = PCA(n_components=1, svd_solver='full', random_state=1)
    indicadores = self.datos.replace([np.inf, -np.inf], np.nan)
    indicadores = indicadores[self.vars_indice].dropna()
    X = StandardScaler().fit_transform(indicadores.values)
    pca.fit(X)
    self.varianza_explicada = pca.explained_variance_ratio_
    indice = pca.fit_transform(X)[:,:1]
    id_var = indicadores.index.name
    df = indicadores.reset_index()[[id_var]]
    # df['Índice'] = abs(indice)
    df['Índice'] = indice
    self.indice = df

In [None]:
diccionario = get_diccionario_censo()
censo = get_variables_censo()
agregado = agrega_en_unidades(censo, diccionario, imputacion='random')
agregado = censo_a_tasas(agregado, diccionario)
vars_indice = ['P5_HLI', 'POB_AFRO', 'PCON_DISC', 'P3A5_NOA', 
               'P6A11_NOA', 'P12A14NOA', 'P15YM_AN', 'PSINDER', 'PDESOCUP']
# indice = get_indice_pca(agregado, vars_indice)
# indice
indice = IndicePCA(agregado, vars_indice)
indice.calcula_indice()
print(indice.varianza_explicada)

[0.67916195]


In [None]:
#| hide
import nbdev; nbdev.nbdev_export()