# Business Queries with the INEGI database

In [1]:
import os
import time

import findspark # Activates Apache Spark, an engine for large-scale data processing.
findspark.init()
import shapely # manipulation and analysis of planar geometric objects
import pandas as pd
import geopandas as gpd # working with geospatial data
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf
from sedona.register import SedonaRegistrator # Sedona is a cluster computing system for processing large-scale spatial data
from sedona.utils import SedonaKryoRegistrator, KryoSerializer

In [2]:
# Create (or reuse) the Spark session shared by every cell in this notebook.
# Serialization uses Kryo with Sedona's Kryo registrator, and the Sedona
# Python adapter plus the GeoTools wrapper are resolved as Maven packages
# through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName('appName')
    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.executor.memory", "5g")
    .config("spark.driver.memory", "10g")
    .config('spark.driver.maxResultSize', '5g')
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
    .config('spark.jars.packages',
            'org.apache.sedona:sedona-python-adapter-3.0_2.12:1.2.0-incubating,org.datasyslab:geotools-wrapper:1.1.0-25.2')
    .getOrCreate()
)

# Register Sedona's SQL functions/types on the session; its return value is the cell's displayed output.
SedonaRegistrator.registerAll(spark)

True

#### We will only use the information and data from Nuevo León

In [None]:
# Generate a readable database for Nuevo León's census blocks (SCINCE 2020).
# For each state, the block geometries (cartografia/manzana.shp) are joined on CVEGEO
# with every per-block census table under tablas/, and the result is written as one
# Parquet dataset per state.
# Only needs to be run the first time: states whose Parquet output already exists are
# skipped, so re-running the notebook does not fail on Spark's "path already exists" error.

start_time = time.time()

path_scince = "../SCINCE 2020/"
Estados_SCINCE = ['19_NL']

# Per-block census tables joined onto the geometries; the list is the same for every state.
tablas_secundarias = ['caracteristicas_economicas','discapacidad','educacion','etnicidad','fecundidad','hogares_censales','migracion','mortalidad','religion','servicios_de_salud','situacion_conyugal','vivienda']

for estado in Estados_SCINCE:
    salida_scince = f"../DB_NL_Reto_v1/{estado}.parquet"
    if os.path.exists(salida_scince):
        print(f"Ya existe {salida_scince}; se omite el estado: {estado}")
        continue

    print(f"Procesando el estado: {estado}")
    tabla_principal_poblacion = f"{path_scince}{estado}/cartografia/manzana.shp"

    TPP = gpd.read_file(tabla_principal_poblacion)
    Pandas_TPP = pd.DataFrame(TPP)
    Spark_TPP = spark.createDataFrame(Pandas_TPP)

    # Inner joins: a block survives only if its CVEGEO is present in every secondary table.
    df = Spark_TPP
    for tabla_sec in tablas_secundarias:
        tabla_secundaria = f"{path_scince}{estado}/tablas/cpv2020_manzana_{tabla_sec}.dbf"
        SEC = gpd.read_file(tabla_secundaria)
        # The .dbf tables have no shapes of their own (presumably an all-None geometry
        # column); drop it so the only geometry in the result comes from manzana.shp.
        Pandas_SEC = pd.DataFrame(SEC).drop(['geometry'], axis=1)
        Spark_SEC = spark.createDataFrame(Pandas_SEC)
        df = df.join(Spark_SEC, on=['CVEGEO'], how='inner')

    print(f"Guardando el estado: {estado}")
    df.write.parquet(salida_scince)
    print(f"Concluido estado: {estado}")

print("--- %s segundos ---" % (time.time() - start_time))

In [3]:
# Load the block-level census database written by the SCINCE cell (one Parquet dataset per state).
db_nl = spark.read.parquet("../DB_NL_Reto_v1/*.parquet")
db_nl.printSchema()

root
 |-- CVEGEO: string (nullable = true)
 |-- POB1: double (nullable = true)
 |-- POB2: double (nullable = true)
 |-- POB2_R: double (nullable = true)
 |-- POB4: double (nullable = true)
 |-- POB4_R: double (nullable = true)
 |-- POB5: double (nullable = true)
 |-- POB5_R: double (nullable = true)
 |-- POB6: double (nullable = true)
 |-- POB6_R: double (nullable = true)
 |-- POB7: double (nullable = true)
 |-- POB7_R: double (nullable = true)
 |-- POB8: double (nullable = true)
 |-- POB8_R: double (nullable = true)
 |-- POB9: double (nullable = true)
 |-- POB9_R: double (nullable = true)
 |-- POB10: double (nullable = true)
 |-- POB10_R: double (nullable = true)
 |-- POB11: double (nullable = true)
 |-- POB11_R: double (nullable = true)
 |-- POB12: double (nullable = true)
 |-- POB12_R: double (nullable = true)
 |-- POB13: double (nullable = true)
 |-- POB13_R: double (nullable = true)
 |-- POB14: double (nullable = true)
 |-- POB14_R: double (nullable = true)
 |-- POB15: double (nul

In [None]:
# Generate a readable database for Nuevo León's businesses (DENUE 05/2022 shapefiles),
# reprojected to the CRS of the Mex-Hex-5k layer.
# Only needs to be run the first time: states whose Parquet output already exists are
# skipped, so re-running the notebook does not fail on Spark's "path already exists" error.

start_time = time.time()

# Mex-Hex-5k layer (presumably a hexagon grid); in this cell only its CRS is used,
# as the target CRS for the businesses.
HX = gpd.read_file("../Mex-Hex-5k/Mex-Hex-5k.shp")

Estados_DENUE = ["19"]
for estado in Estados_DENUE:
    salida_denue = f"../DB_NL_DENUE_Reto_v1/{estado}.parquet"
    if os.path.exists(salida_denue):
        print(f"Ya existe {salida_denue}; se omite el estado: {estado}")
        continue

    print(f"Procesando el estado: {estado}")
    # The "15_1"/"15_2" shapefiles are named without the trailing underscore used for other states.
    sufijo = "" if estado in ["15_1", "15_2"] else "_"
    tabla_denue = f"../DENUE_05_2022/denue_{estado}_shp/conjunto_de_datos/denue_inegi_{estado}{sufijo}.shp"

    TD = gpd.read_file(tabla_denue)
    # NOTE(review): allow_override=True discards whatever CRS the shapefile declares and
    # labels the coordinates as EPSG:4326 (lon/lat) - confirm against the DENUE metadata.
    TD = TD.set_crs('epsg:4326', allow_override=True)
    Spark_TD = spark.createDataFrame(TD.to_crs(HX.crs))

    print(f"Guardando el estado: {estado}")
    Spark_TD.write.parquet(salida_denue)
    print(f"Concluido estado: {estado}")

print("--- %s segundos ---" % (time.time() - start_time))

In [4]:
# Load the business (DENUE) database written by the DENUE cell (one Parquet dataset per state).
db_nl_denue = spark.read.parquet("../DB_NL_DENUE_Reto_v1/*.parquet")
db_nl_denue.printSchema()

root
 |-- id: long (nullable = true)
 |-- clee: string (nullable = true)
 |-- nom_estab: string (nullable = true)
 |-- raz_social: string (nullable = true)
 |-- codigo_act: string (nullable = true)
 |-- nombre_act: string (nullable = true)
 |-- per_ocu: string (nullable = true)
 |-- tipo_vial: string (nullable = true)
 |-- nom_vial: string (nullable = true)
 |-- tipo_v_e_1: string (nullable = true)
 |-- nom_v_e_1: string (nullable = true)
 |-- tipo_v_e_2: string (nullable = true)
 |-- nom_v_e_2: string (nullable = true)
 |-- tipo_v_e_3: string (nullable = true)
 |-- nom_v_e_3: string (nullable = true)
 |-- numero_ext: string (nullable = true)
 |-- letra_ext: string (nullable = true)
 |-- edificio: string (nullable = true)
 |-- edificio_e: string (nullable = true)
 |-- numero_int: string (nullable = true)
 |-- letra_int: string (nullable = true)
 |-- tipo_asent: string (nullable = true)
 |-- nomb_asent: string (nullable = true)
 |-- tipoCenCom: string (nullable = true)
 |-- nom_CenCom: 

In [5]:
# Expose the census DataFrame to Spark SQL under the view name "db_nl",
# so the following cells can query it with spark.sql(...).
db_nl.createOrReplaceTempView(name="db_nl")

In [6]:
# Census variables (SCINCE column codes) kept for the analysis.
# NOTE(review): the sample output contains negative values (-6, -8) that look like
# special codes rather than counts - confirm their meaning in the SCINCE data
# dictionary before aggregating these columns.
columnas_analisis = [
    "POB11", "POB14", "POB21", "POB23",
    "DISC1", "DISC7", "DISC8", "DISC9", "DISC10", "DISC11", "DISC12", "DISC13",
]

x = spark.sql("select " + ",".join(columnas_analisis) + " from db_nl")

In [7]:
x.show(n=30)  # preview the first 30 rows of the selected variables

+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+
|POB11|POB14|POB21|POB23|DISC1|DISC7|DISC8|DISC9|DISC10|DISC11|DISC12|DISC13|
+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+
|  5.0|  3.0| 15.0|  5.0|  0.0|  0.0|  0.0|  0.0|   0.0|   0.0|   0.0|   0.0|
|  5.0|  4.0| 15.0|  5.0|  0.0|  0.0|  0.0|  0.0|   0.0|   0.0|   0.0|  -6.0|
| -8.0| -8.0| -8.0| -8.0| -8.0| -8.0| -8.0| -8.0|  -8.0|  -8.0|  -8.0|  -8.0|
| 28.0| 23.0| 59.0|  5.0| -6.0|  0.0| -6.0|  0.0|   0.0|   0.0|   0.0|   0.0|
| -6.0| -6.0| -6.0| -6.0| -6.0| -6.0| -6.0| -6.0|  -6.0|  -6.0|  -6.0|  -6.0|
|  7.0|  4.0| 23.0|  9.0| -6.0| -6.0|  0.0|  0.0|   0.0|   0.0|   0.0|   0.0|
|  6.0|  4.0| 12.0|  0.0|  0.0|  0.0|  0.0|  0.0|   0.0|   0.0|   0.0|   0.0|
|  5.0|  3.0| 16.0|  6.0| -6.0| -6.0| -6.0|  0.0|   0.0|  -6.0|   0.0|   0.0|
| 13.0| 14.0| 30.0|  3.0| -6.0| -6.0| -6.0|  0.0|   0.0|   0.0|  -6.0|  -6.0|
| -6.0|  4.0|  7.0|  0.0|  0.0|  0.0|  0.0|  0.0|   0.0|   0.0| 

In [8]:
# Number of blocks in the selection (displayed as the cell output).
n_filas = x.count()
n_filas

76995

In [None]:
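# ### INCOMPLETE - abandoned attempt to load the Parquet data into pandas through SQLAlchemy; kept for reference, not executed.
# NOTE(review): `pd.read_sql` expects a SQL query or table name, not the Spark DataFrame `x`; `x.toPandas()` is the direct route.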

# #!pip install flask_sqlalchemy

# import pandas as pd

# from flask_sqlalchemy import SQLAlchemy

# from sqlalchemy import create_engine

# engine=create_engine("parquet:///?URI=C:/SCINCE_Parquets/*.parquet")
# #x1=pd.read_parquet(f"../SCINCE_Parquets/*.parquet")
# x1=pd.read_sql(x,engine)
# x1.head()

In [11]:
# Export the selected variables as CSV with a header row.
# Spark always writes a directory of part files; coalesce(1) makes it contain a single
# part-*.csv. mode("overwrite") lets the cell be re-run without a "path already exists"
# error. The output is placed next to the other project outputs (relative "../" path)
# instead of the filesystem root.
(
    x.coalesce(1)
    .write.mode("overwrite")
    .option("header", "true")
    .csv("../INEGI_NL_csv")
)