In [1]:
import findspark
findspark.init()
import shapely
import pandas as pd
import geopandas as gpd
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from shapely import wkt


In [2]:
spark = SparkSession. \
builder. \
appName('appName'). \
config("spark.serializer", KryoSerializer.getName). \
config("spark.executor.memory", "5g"). \
config("spark.driver.memory", "10g"). \
config('spark.driver.maxResultSize', '5g'). \
config("spark.kryo.registrator", SedonaKryoRegistrator.getName). \
config('spark.jars.packages',
           'org.apache.sedona:sedona-python-adapter-3.0_2.12:1.2.0-incubating,org.datasyslab:geotools-wrapper:1.1.0-25.2'). \
getOrCreate()
SedonaRegistrator.registerAll(spark)

True

In [3]:
### informacion de vivienda

In [4]:
BD_MANZANAS = spark.read.parquet(f"../SCINCE_Parquets/*.parquet")
BD_MANZANAS_EEVVV = BD_MANZANAS.select('CVEGEO', 'ECO1_R', 'VIV82_R', 'VIV83_R', 'VIV29_R','VIV36_R','VIV37_R','VIV35_R','VIV34_R','VIV33_R', 'geometry')
BD_MANZANAS_EEVVV.cache()
BD_MANZANAS_EEVVV.printSchema()
BD_MANZANAS_EEVVV.show()

root
 |-- CVEGEO: string (nullable = true)
 |-- ECO1_R: double (nullable = true)
 |-- VIV82_R: double (nullable = true)
 |-- VIV83_R: double (nullable = true)
 |-- VIV29_R: double (nullable = true)
 |-- VIV36_R: double (nullable = true)
 |-- VIV37_R: double (nullable = true)
 |-- VIV35_R: double (nullable = true)
 |-- VIV34_R: double (nullable = true)
 |-- VIV33_R: double (nullable = true)
 |-- geometry: string (nullable = true)

+----------------+------+-------+-------+-------+-------+-------+-------+-------+-------+--------------------+
|          CVEGEO|ECO1_R|VIV82_R|VIV83_R|VIV29_R|VIV36_R|VIV37_R|VIV35_R|VIV34_R|VIV33_R|            geometry|
+----------------+------+-------+-------+-------+-------+-------+-------+-------+-------+--------------------+
|1900100010021028|  -6.0|   -6.0|    0.0|   60.0|   80.0|   -6.0|   60.0|   -6.0|   80.0|POLYGON ((2659951...|
|1900100010021031|  37.5|   -6.0|    0.0|  100.0|   75.0|   -6.0|   -6.0|   75.0|  100.0|POLYGON ((2659516...|
|1900100010

In [5]:
BD_MANZANAS_EEVVV.createOrReplaceTempView("manzanas")

In [6]:
BD_MZA_EEVVV_CORREGIDO = spark.sql("""SELECT 
                                      CVEGEO,
                                      IF((ISNULL(ECO1_R) OR ECO1_R < 0), 0, ECO1_R) AS ECO1_R,
                                      IF((ISNULL(VIV82_R) OR VIV82_R < 0), 0, VIV82_R) AS VIV82_R,
                                      IF((ISNULL(VIV83_R) OR VIV83_R < 0), 0, VIV83_R) AS VIV83_R,
                                      IF((ISNULL(VIV29_R) OR VIV29_R < 0), 0, VIV29_R) AS VIV29_R,
                                      IF((ISNULL(VIV36_R) OR VIV36_R < 0), 0, VIV36_R) AS VIV36_R,
                                      IF((ISNULL(VIV37_R) OR VIV37_R < 0), 0, VIV37_R) AS VIV37_R,
                                      IF((ISNULL(VIV35_R) OR VIV35_R < 0), 0, VIV35_R) AS VIV35_R,
                                      IF((ISNULL(VIV34_R) OR VIV34_R < 0), 0, VIV34_R) AS VIV34_R,
                                      IF((ISNULL(VIV33_R) OR VIV33_R < 0), 0, VIV33_R) AS VIV33_R,
                                      geometry
                                     FROM manzanas""")

BD_MZA_EEVVV_CORREGIDO.cache()
BD_MZA_EEVVV_CORREGIDO.show()

+----------------+------+-------+-------+-------+-------+-------+-------+-------+-------+--------------------+
|          CVEGEO|ECO1_R|VIV82_R|VIV83_R|VIV29_R|VIV36_R|VIV37_R|VIV35_R|VIV34_R|VIV33_R|            geometry|
+----------------+------+-------+-------+-------+-------+-------+-------+-------+-------+--------------------+
|1900100010021028|   0.0|    0.0|    0.0|   60.0|   80.0|    0.0|   60.0|    0.0|   80.0|POLYGON ((2659951...|
|1900100010021031|  37.5|    0.0|    0.0|  100.0|   75.0|    0.0|    0.0|   75.0|  100.0|POLYGON ((2659516...|
|1900100010021036|   0.0|    0.0|    0.0|    0.0|    0.0|    0.0|    0.0|    0.0|    0.0|POLYGON ((2659395...|
|1900100010036005|  47.9|   54.5|    0.0|   72.7|  100.0|   54.5|   22.7|   36.4|   95.5|POLYGON ((2660345...|
|1900100010036019|   0.0|    0.0|    0.0|    0.0|    0.0|    0.0|    0.0|    0.0|    0.0|POLYGON ((2660117...|
|1900100010036028|  38.5|   58.3|    0.0|   66.7|   91.7|   25.0|   25.0|    0.0|  100.0|POLYGON ((2660120...|
|

In [7]:
### INFORMACIÓN DE POBLACIÓN

In [8]:
from pyspark.sql.functions import col, greatest, when, lit

# Agregar las variables al DataFrame
# POBLACION_MTY = spark.read.parquet(f"../SCINCE_Parquets/*.parquet")
POBLACION_MTY = BD_MANZANAS.select("CVEGEO", "POB1", "POB8", "POB9", "POB13", "POB30", "POB14","POB15", "POB16", "EDU34", "EDU37", "EDU43", "EDU46")

POBLACION_MTY.cache()
POBLACION_MTY.createOrReplaceTempView("poblacion")

# Aplicar las mismas transformaciones que hiciste anteriormente
POBLACION_MTY = spark.sql("""
    SELECT 
        CVEGEO,
        IF((ISNULL(POB1) OR POB1 < 0), 0, POB1) AS POB1,
        IF((ISNULL(POB8) OR POB8 < 0), 0, POB8) AS POB8,
        IF((ISNULL(POB9) OR POB9 < 0), 0, POB9) AS POB9,
        IF((ISNULL(POB13) OR POB13 < 0), 0, POB13) AS POB13,
        IF((ISNULL(POB30) OR POB30 < 0), 0, POB30) AS POB30,
        IF((ISNULL(POB14) OR POB14 < 0), 0, POB14) AS POB14,
        IF((ISNULL(POB15) OR POB15 < 0), 0, POB15) AS POB15,
        IF((ISNULL(POB16) OR POB16 < 0), 0, POB16) AS POB16,
        IF((ISNULL(EDU34) OR EDU34 < 0), 0, EDU34) AS EDU34,
        IF((ISNULL(EDU37) OR EDU37 < 0), 0, EDU37) AS EDU37,
        IF((ISNULL(EDU43) OR EDU43 < 0), 0, EDU43) AS EDU43,
        IF((ISNULL(EDU46) OR EDU46 < 0), 0, EDU46) AS EDU46
        
    FROM poblacion
""")

POBLACION_MTY.cache()

# Crear las columnas de población
POBLACION_MTY = POBLACION_MTY.withColumn("Menores_Edad", col("POB8") + col("POB9"))
POBLACION_MTY = POBLACION_MTY.withColumn("Adulto", col("POB13") + col("POB30"))
POBLACION_MTY = POBLACION_MTY.withColumn("Adulto_x", col("POB14"))
POBLACION_MTY = POBLACION_MTY.withColumn("Adulto_Mayor", col("POB15") + col("POB16"))
POBLACION_MTY = POBLACION_MTY.withColumn("Estudios_inc", col("EDU34") + col("EDU37") + col("EDU43"))
POBLACION_MTY = POBLACION_MTY.withColumn("Estudios_fin", col("EDU46"))
columns_to_drop = ["POB1", "POB8", "POB9", "POB13", "POB30", "POB31", "POB32", "POB33", "POB34", "POB15", "POB16", "EDU34", "EDU37", "EDU43", "EDU46"]
# Eliminar las columnas
POBLACION_MTY = POBLACION_MTY.drop(*columns_to_drop)

POBLACION_MTY.show()

+----------------+-----+------------+------+--------+------------+------------+------------+
|          CVEGEO|POB14|Menores_Edad|Adulto|Adulto_x|Adulto_Mayor|Estudios_inc|Estudios_fin|
+----------------+-----+------------+------+--------+------------+------------+------------+
|1900100010021028|  3.0|         8.0|   4.0|     3.0|         0.0|        15.0|         0.0|
|1900100010021031|  4.0|         0.0|   4.0|     4.0|         0.0|        12.0|         3.0|
|1900100010021036|  0.0|         0.0|   0.0|     0.0|         0.0|         0.0|         0.0|
|1900100010036005| 23.0|        29.0|  21.0|    23.0|        10.0|        52.0|         5.0|
|1900100010036019|  0.0|         0.0|   0.0|     0.0|         0.0|         0.0|         0.0|
|1900100010036028|  4.0|         9.0|   4.0|     4.0|         4.0|        19.0|         4.0|
|1900100010036029|  4.0|         0.0|   5.0|     4.0|         0.0|         8.0|         0.0|
|1900100010036030|  3.0|         4.0|   3.0|     3.0|         3.0|    

In [9]:
BD_MZA_EEVVV_CORREGIDO = BD_MZA_EEVVV_CORREGIDO.join(POBLACION_MTY, 'CVEGEO', 'inner')

# Mostrar el DataFrame resultante
BD_MANZANAS_EEVVV.show()

+----------------+------+-------+-------+-------+-------+-------+-------+-------+-------+--------------------+
|          CVEGEO|ECO1_R|VIV82_R|VIV83_R|VIV29_R|VIV36_R|VIV37_R|VIV35_R|VIV34_R|VIV33_R|            geometry|
+----------------+------+-------+-------+-------+-------+-------+-------+-------+-------+--------------------+
|1900100010021028|  -6.0|   -6.0|    0.0|   60.0|   80.0|   -6.0|   60.0|   -6.0|   80.0|POLYGON ((2659951...|
|1900100010021031|  37.5|   -6.0|    0.0|  100.0|   75.0|   -6.0|   -6.0|   75.0|  100.0|POLYGON ((2659516...|
|1900100010021036|  -8.0|   -8.0|   -8.0|   -8.0|   -8.0|   -8.0|   -8.0|   -8.0|   -8.0|POLYGON ((2659395...|
|1900100010036005|  47.9|   54.5|   -6.0|   72.7|  100.0|   54.5|   22.7|   36.4|   95.5|POLYGON ((2660345...|
|1900100010036019|  -6.0|   -6.0|   -6.0|   -6.0|   -6.0|   -6.0|   -6.0|   -6.0|   -6.0|POLYGON ((2660117...|
|1900100010036028|  38.5|   58.3|   -6.0|   66.7|   91.7|   25.0|   25.0|   -6.0|  100.0|POLYGON ((2660120...|
|

In [10]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

vecAssembler = VectorAssembler(inputCols=["ECO1_R", "VIV82_R", "VIV83_R", "VIV29_R", "VIV36_R", "VIV37_R", "VIV35_R", "VIV34_R", "VIV33_R"], outputCol="features")
BD_MZA_EEVVV_CORREGIDO_VEC = vecAssembler.transform(BD_MZA_EEVVV_CORREGIDO)
# BD_MZA_EEVVV_CORREGIDO_VEC.cache()
# BD_MZA_EEVVV_CORREGIDO_VEC.show()

In [11]:
kmeans = KMeans(k=5, seed=1, maxIter=250) 
model = kmeans.fit(BD_MZA_EEVVV_CORREGIDO_VEC.select('features'))
transformed = model.transform(BD_MZA_EEVVV_CORREGIDO_VEC)
transformed.show()

+----------------+------+-------+-------+-------+-------+-------+-------+-------+-------+--------------------+-----+------------+------+--------+------------+------------+------------+--------------------+----------+
|          CVEGEO|ECO1_R|VIV82_R|VIV83_R|VIV29_R|VIV36_R|VIV37_R|VIV35_R|VIV34_R|VIV33_R|            geometry|POB14|Menores_Edad|Adulto|Adulto_x|Adulto_Mayor|Estudios_inc|Estudios_fin|            features|prediction|
+----------------+------+-------+-------+-------+-------+-------+-------+-------+-------+--------------------+-----+------------+------+--------+------------+------------+------------+--------------------+----------+
|1900100010021028|   0.0|    0.0|    0.0|   60.0|   80.0|    0.0|   60.0|    0.0|   80.0|POLYGON ((2659951...|  3.0|         8.0|   4.0|     3.0|         0.0|        15.0|         0.0|(9,[3,4,6,8],[60....|         0|
|1900100010021031|  37.5|    0.0|    0.0|  100.0|   75.0|    0.0|    0.0|   75.0|  100.0|POLYGON ((2659516...|  4.0|         0.0|   

In [12]:
from pyspark.sql.functions import expr

# Convierte la columna 'geometry' de WKT a geometría
transformed.createOrReplaceTempView("result_kmeans")
# clusters = spark.sql(""" select prediction, 
#     mean(ECO1_R) as mean_econ_activa, 
#     mean(EDU46_R) as mean_educ_superior,
#     mean(VIV82_R) as mean_tv_paga,
#     mean(VIV83_R) as mean_streaming,
#     mean(VIV84_R) as mean_videojuegos,
#     count(*) as conteo
    # MEAN(Menores_Edad) as mean_Menor_Edad,
    # MEAN(Adulto) AS mean_Adulto,
    # MEAN(Adulto_x) AS mean_Adulto_x,

    # MEAN(Adulto_Mayor) AS mean_Adulto_Mayor,
    # MEAN(Estudios_inc) AS mean_Estudios_inc,
    # MEAN(Estudios_fin) AS mean_Estudios_fin,

# from result_kmeans group by prediction order by mean_econ_activa""")

clusters = spark.sql("""SELECT prediction, 
    MEAN(ECO1_R) AS mean_econ_activa, 
    MEAN(VIV82_R) AS mean_tv_paga,
    MEAN(VIV83_R) AS mean_streaming,
    MEAN(VIV29_R) AS mean_variable_29,
    MEAN(VIV36_R) AS mean_variable_36,
    MEAN(VIV37_R) AS mean_variable_37,
    MEAN(VIV35_R) AS mean_variable_35,
    MEAN(VIV34_R) AS mean_variable_34,
    MEAN(VIV33_R) AS mean_variable_33,

    COUNT(*) AS conteo
FROM result_kmeans 
GROUP BY prediction 
ORDER BY mean_econ_activa""")


# clusters.show()

In [13]:
df_manzanas = spark.sql("""SELECT ECO1_R as eco_act,
                                  VIV82_R as tv_paga,
                                  VIV83_R as stream,
                                  VIV29_R as var_29,
                                  VIV36_R as var_36,
                                  VIV37_R as var_37,
                                  VIV35_R as var_35,
                                  VIV34_R as var_34,
                                  VIV33_R as var_33,
                                  Menores_Edad,
                                  Adulto,
                                  Adulto_x,
                                  Adulto_Mayor,
                                  Estudios_fin,
                                  Estudios_inc,
                                  
                                  
                                  geometry,
                                  prediction 
                           FROM result_kmeans""").toPandas()

df_manzanas['geometry'] = df_manzanas['geometry'].apply(wkt.loads)

geopandas_df_grid = gpd.GeoDataFrame(df_manzanas, geometry="geometry")

archivo = 'manzanas_kmeans'
geopandas_df_grid.to_file(f"./resultados/{archivo}.shp")
crs='PROJCS["Mexico_ITRF2008_LCC",GEOGCS["Mexico_ITRF2008",DATUM["Mexico_ITRF2008",SPHEROID["GRS_1980",6378137,298.257222101],TOWGS84[0,0,0,0,0,0,0]],PRIMEM["Greenwich",0],UNIT["Degree",0.0174532925199433]],PROJECTION["Lambert_Conformal_Conic_2SP",AUTHORITY["EPSG","9802"]],PARAMETER["Central_Meridian",-102],PARAMETER["Latitude_Of_Origin",12],PARAMETER["False_Easting",2500000],PARAMETER["False_Northing",0],PARAMETER["Standard_Parallel_1",17.5],PARAMETER["Standard_Parallel_2",29.5],PARAMETER["Scale_Factor",1],UNIT["Meter",1,AUTHORITY["EPSG","9001"]],AUTHORITY["EPSG","6372"]]'
PRJ_file = open(f"./resultados/{archivo}.prj","w")
PRJ_file.write(crs)
PRJ_file.close()

  geopandas_df_grid.to_file(f"./resultados/{archivo}.shp")


In [14]:
BD_DENUE = spark.read.parquet(f"../DENUE_Parquets/*.parquet")
BD_DENUE.createOrReplaceTempView("denue")
BD_DENUE.cache()
BD_DENUE.count()


186092

In [15]:
transformed = transformed.withColumn('geometry', expr("ST_GeomFromWKT(geometry)"))
transformed.createOrReplaceTempView("result_kmeans")


In [16]:
!pip install pyproj



In [17]:
# from shapely.geometry import Point
# cfe = pd.read_csv('Localizaciondecentrosdeatencion.csv')
# #cfe['geometry'] = cfe.apply(lambda row: Point(row['Latitud'], row['Longitud']), axis=1)
# # Convertir el DataFrame a un GeoDataFrame
# cfe_spark = spark.createDataFrame(cfe)
# # Registrar el DataFrame de Spark como una vista temporal en Spark SQL
# cfe_spark.createOrReplaceTempView("cfe")
# cfe_spark = cfe_spark.withColumn("geometry", STPOINT(col("Latitud"),col("Longitud")))

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.functions import col
import pyspark.sql.functions as F
import pyproj

cfe = spark.read.csv('Localizaciondecentrosdeatencion (2).csv', header=True, inferSchema=True)

# Define el CRS de entrada (WGS 84) y el CRS de salida (por ejemplo, UTM Zone 14N)
crs_entrada = pyproj.CRS("EPSG:4326")  # WGS 84 (Latitud/Longitud)
crs_salida = 'PROJCS["Mexico_ITRF2008_LCC",GEOGCS["Mexico_ITRF2008",DATUM["Mexico_ITRF2008",SPHEROID["GRS_1980",6378137,298.257222101],TOWGS84[0,0,0,0,0,0,0]],PRIMEM["Greenwich",0],UNIT["Degree",0.0174532925199433]],PROJECTION["Lambert_Conformal_Conic_2SP",AUTHORITY["EPSG","9802"]],PARAMETER["Central_Meridian",-102],PARAMETER["Latitude_Of_Origin",12],PARAMETER["False_Easting",2500000],PARAMETER["False_Northing",0],PARAMETER["Standard_Parallel_1",17.5],PARAMETER["Standard_Parallel_2",29.5],PARAMETER["Scale_Factor",1],UNIT["Meter",1,AUTHORITY["EPSG","9001"]],AUTHORITY["EPSG","6372"]]'

# Crea un transformador de coordenadas
transformador = pyproj.Transformer.from_crs(crs_entrada, crs_salida, always_xy=True)

# Define una función UDF (User Defined Function) para aplicar la transformación
def transformar_coordenadas(latitud, longitud):
    x, y = transformador.transform(longitud, latitud)
    return f"POINT ({x:.8f} {y:.8f})"

# Agrega una nueva columna 'geometry' con las coordenadas transformadas
cfe = cfe.withColumn("geometry", F.udf(transformar_coordenadas)(col("Latitud"), col("Longitud")))
from pyspark.sql.functions import expr

# Convierte la columna 'geometry' en objetos geométricos
cfe = cfe.withColumn("geometry", expr("ST_GeomFromText(geometry)"))
# Muestra el DataFrame resultante
cfe.show()

# Registrar el DataFrame de Spark como una vista temporal en Spark SQL
cfe.createOrReplaceTempView("cfe")

+----------+--------------------+--------------------+--------------------------------------------------+-----------------------------------+-----------+------------+--------------------+
|   Estado |          Municipio |     Nombre del CAC |Dirección (Calle, Numero, Colonia, Codigo Postal) |Cuenta con CFEmatico 24hrs (Si -No)|    Latitud|    Longitud|            geometry|
+----------+--------------------+--------------------+--------------------------------------------------+-----------------------------------+-----------+------------+--------------------+
|Nuevo León|               China|               CHINA|                              Calle Aldama no. ...|                                 SI|25.70518112|-99.23457336|POINT (2776222.16...|
|Nuevo León|        Montemorelos|        MONTEMORELOS|                              Calle Hidalgo no....|                                 SI|25.18851089|-99.82679749|POINT (2717936.54...|
|Nuevo León|             Linares|             LINARES|      

In [18]:
Influencia_Gasolineras = spark.sql("""SELECT 
    CVEGEO,
    ECO1_R as eco_act,
    VIV82_R as tv_paga,
    VIV83_R as stream,
    VIV29_R as var_29,
    VIV36_R as var_36,
    VIV37_R as var_37,
    VIV35_R as var_35,
    VIV34_R as var_34,
    VIV33_R as var_33,
    Menores_Edad as men_ed,
    Adulto as ad,
    Adulto_x as adx,
    Adulto_Mayor as ad_m,
    Estudios_fin as e_fin,
    Estudios_inc as e_inc,
    result_kmeans.geometry,
    prediction,
    COUNT(*) as n_gas
FROM result_kmeans, denue
WHERE (denue.codigo_act IN ('468411') AND ST_Intersects(result_kmeans.geometry, ST_Buffer(denue.geometry, 1500)))
GROUP BY CVEGEO, eco_act, tv_paga, stream, var_29, var_36, var_37, var_35, var_34, var_33, men_ed,
                                  ad, adx,
                                  ad_m,
                                  e_fin,
                                  e_inc,result_kmeans.geometry, prediction;
""").toPandas()


Influencia_Gasolineras_SHP = gpd.GeoDataFrame(Influencia_Gasolineras, geometry="geometry")
archivo = 'manzanas_kmeans_gasolineras'
Influencia_Gasolineras_SHP.to_file(f"./resultados/{archivo}.shp")
crs='PROJCS["Mexico_ITRF2008_LCC",GEOGCS["Mexico_ITRF2008",DATUM["Mexico_ITRF2008",SPHEROID["GRS_1980",6378137,298.257222101],TOWGS84[0,0,0,0,0,0,0]],PRIMEM["Greenwich",0],UNIT["Degree",0.0174532925199433]],PROJECTION["Lambert_Conformal_Conic_2SP",AUTHORITY["EPSG","9802"]],PARAMETER["Central_Meridian",-102],PARAMETER["Latitude_Of_Origin",12],PARAMETER["False_Easting",2500000],PARAMETER["False_Northing",0],PARAMETER["Standard_Parallel_1",17.5],PARAMETER["Standard_Parallel_2",29.5],PARAMETER["Scale_Factor",1],UNIT["Meter",1,AUTHORITY["EPSG","9001"]],AUTHORITY["EPSG","6372"]]'
PRJ_file = open(f"./resultados/{archivo}.prj","w")
PRJ_file.write(crs)
PRJ_file.close()

In [19]:
Influencia_Oxxos = spark.sql("""SELECT 
    CVEGEO,
    ECO1_R as eco_act,
    VIV82_R as tv_paga,
    VIV83_R as stream,
    VIV29_R as var_29,
    VIV36_R as var_36,
    VIV37_R as var_37,
    VIV35_R as var_35,
    VIV34_R as var_34,
    VIV33_R as var_33,
    Menores_Edad as men_ed,
    Adulto as ad,
    Adulto_x as adx,
    Adulto_Mayor as ad_m,
    Estudios_fin as e_fin,
    Estudios_inc as e_inc,
    result_kmeans.geometry,
    prediction,
    COUNT(*) as n_oxxos
FROM result_kmeans, denue
WHERE (denue.codigo_act in ('462112') and
                            LOWER(nom_estab) like '%oxxo%' and
                            LOWER(nom_estab) not like '%distribuc%' and ST_Intersects(result_kmeans.geometry, ST_Buffer(denue.geometry, 1500)))
GROUP BY CVEGEO, CVEGEO, eco_act, tv_paga, stream, var_29, var_36, var_37, var_35, var_34, var_33, men_ed,
                                  ad, adx,
                                  ad_m,
                                  e_fin,
                                  e_inc,result_kmeans.geometry, prediction;
""").toPandas()

Influencia_oxxos_SHP = gpd.GeoDataFrame(Influencia_Oxxos, geometry="geometry")
archivo = 'manzanas_kmeans_oxxos'
Influencia_Gasolineras_SHP.to_file(f"./resultados/{archivo}.shp")
crs='PROJCS["Mexico_ITRF2008_LCC",GEOGCS["Mexico_ITRF2008",DATUM["Mexico_ITRF2008",SPHEROID["GRS_1980",6378137,298.257222101],TOWGS84[0,0,0,0,0,0,0]],PRIMEM["Greenwich",0],UNIT["Degree",0.0174532925199433]],PROJECTION["Lambert_Conformal_Conic_2SP",AUTHORITY["EPSG","9802"]],PARAMETER["Central_Meridian",-102],PARAMETER["Latitude_Of_Origin",12],PARAMETER["False_Easting",2500000],PARAMETER["False_Northing",0],PARAMETER["Standard_Parallel_1",17.5],PARAMETER["Standard_Parallel_2",29.5],PARAMETER["Scale_Factor",1],UNIT["Meter",1,AUTHORITY["EPSG","9001"]],AUTHORITY["EPSG","6372"]]'
PRJ_file = open(f"./resultados/{archivo}.prj","w")
PRJ_file.write(crs)
PRJ_file.close()

In [20]:
consulta_cfe = spark.sql("""
    SELECT 
        CVEGEO,
        ECO1_R as eco_act,
        VIV82_R as tv_paga,
        VIV83_R as stream,
        VIV29_R as var_29,
        VIV36_R as var_36,
        VIV37_R as var_37,
        VIV35_R as var_35,
        VIV34_R as var_34,
        VIV33_R as var_33,
        Menores_Edad as men_ed,
        Adulto as ad,
        Adulto_x as ad_x,
        Adulto_Mayor as ad_m,
        Estudios_fin as e_fin,
        Estudios_inc as e_inc,
        result_kmeans.geometry,
        prediction,
        COUNT(*) as n_cfe
    FROM cfe,result_kmeans
    WHERE (ST_Intersects(result_kmeans.geometry, ST_Buffer(cfe.geometry, 1500)))
    GROUP BY CVEGEO, eco_act, tv_paga, stream, var_29, var_36, var_37, var_35, var_34, var_33, men_ed,
                                  ad,ad_x,
                                  ad_m,
                                  e_fin,
                                  e_inc,result_kmeans.geometry, prediction
""").toPandas()

Influencia_cfe_SHP = gpd.GeoDataFrame(consulta_cfe, geometry="geometry")
archivo = 'manzanas_kmeans_cfe'
Influencia_Gasolineras_SHP.to_file(f"./resultados/{archivo}.shp")
crs='PROJCS["Mexico_ITRF2008_LCC",GEOGCS["Mexico_ITRF2008",DATUM["Mexico_ITRF2008",SPHEROID["GRS_1980",6378137,298.257222101],TOWGS84[0,0,0,0,0,0,0]],PRIMEM["Greenwich",0],UNIT["Degree",0.0174532925199433]],PROJECTION["Lambert_Conformal_Conic_2SP",AUTHORITY["EPSG","9802"]],PARAMETER["Central_Meridian",-102],PARAMETER["Latitude_Of_Origin",12],PARAMETER["False_Easting",2500000],PARAMETER["False_Northing",0],PARAMETER["Standard_Parallel_1",17.5],PARAMETER["Standard_Parallel_2",29.5],PARAMETER["Scale_Factor",1],UNIT["Meter",1,AUTHORITY["EPSG","9001"]],AUTHORITY["EPSG","6372"]]'
PRJ_file = open(f"./resultados/{archivo}.prj","w")
PRJ_file.write(crs)
PRJ_file.close()


In [21]:
consulta_escuelas = spark.sql("""SELECT 
    CVEGEO,
    ECO1_R as eco_act,
    VIV82_R as tv_paga,
    VIV83_R as stream,
    VIV29_R as var_29,
    VIV36_R as var_36,
    VIV37_R as var_37,
    VIV35_R as var_35,
    VIV34_R as var_34,
    VIV33_R as var_33,
    Menores_Edad as men_ed,
    Adulto as ad,
    Adulto_x as adx,
    Adulto_Mayor as ad_m,
    Estudios_fin as e_fin,
    Estudios_inc as e_inc,
    result_kmeans.geometry,
    prediction,
    COUNT(*) as n_escuelas
FROM result_kmeans, denue
WHERE ((denue.codigo_act in ('611111') or denue.codigo_act in ('611112') or denue.codigo_act in ('611121') or denue.codigo_act in ('611122') or
        denue.codigo_act in ('611161') or denue.codigo_act in ('611162') or denue.codigo_act in ('611131') or denue.codigo_act in ('611132') or
        denue.codigo_act in ('611311') or denue.codigo_act in ('611312')) and
                           ST_Intersects(result_kmeans.geometry, ST_Buffer(denue.geometry, 1500)))
GROUP BY CVEGEO, CVEGEO, eco_act, tv_paga, stream, var_29, var_36, var_37, var_35, var_34, var_33, men_ed,
                                  ad, adx,
                                  ad_m,
                                  e_fin,
                                  e_inc,result_kmeans.geometry, prediction;
""").toPandas()

Influencia_escuelas_SHP = gpd.GeoDataFrame(consulta_escuelas, geometry="geometry")
archivo = 'manzanas_kmeans_escuelas'
Influencia_escuelas_SHP.to_file(f"./resultados/{archivo}.shp")
crs='PROJCS["Mexico_ITRF2008_LCC",GEOGCS["Mexico_ITRF2008",DATUM["Mexico_ITRF2008",SPHEROID["GRS_1980",6378137,298.257222101],TOWGS84[0,0,0,0,0,0,0]],PRIMEM["Greenwich",0],UNIT["Degree",0.0174532925199433]],PROJECTION["Lambert_Conformal_Conic_2SP",AUTHORITY["EPSG","9802"]],PARAMETER["Central_Meridian",-102],PARAMETER["Latitude_Of_Origin",12],PARAMETER["False_Easting",2500000],PARAMETER["False_Northing",0],PARAMETER["Standard_Parallel_1",17.5],PARAMETER["Standard_Parallel_2",29.5],PARAMETER["Scale_Factor",1],UNIT["Meter",1,AUTHORITY["EPSG","9001"]],AUTHORITY["EPSG","6372"]]'
PRJ_file = open(f"./resultados/{archivo}.prj","w")
PRJ_file.write(crs)
PRJ_file.close()

In [22]:
Influencia_super = spark.sql("""SELECT 
    CVEGEO,
    ECO1_R as eco_act,
    VIV82_R as tv_paga,
    VIV83_R as stream,
    VIV29_R as var_29,
    VIV36_R as var_36,
    VIV37_R as var_37,
    VIV35_R as var_35,
    VIV34_R as var_34,
    VIV33_R as var_33,
    Menores_Edad as men_ed,
    Adulto as ad,
    Adulto_x as adx,
    Adulto_Mayor as ad_m,
    Estudios_fin as e_fin,
    Estudios_inc as e_inc,
    result_kmeans.geometry,
    prediction,
    COUNT(*) as n_super_merc
FROM result_kmeans, denue
WHERE (denue.codigo_act in ('462111') and
                            (LOWER(nom_estab) like '%wal mart%' or LOWER(nom_estab) like '%walmart%' or LOWER(nom_estab) like '%soriana%' or LOWER(nom_estab) like '%aurrera%' or LOWER(nom_estab) like '%heb%' )and
                            LOWER(nom_estab) not like '%operad%' and ST_Intersects(result_kmeans.geometry, ST_Buffer(denue.geometry, 700)))
GROUP BY CVEGEO, CVEGEO, eco_act, tv_paga, stream, var_29, var_36, var_37, var_35, var_34, var_33, men_ed,
                                  ad, adx,
                                  ad_m,
                                  e_fin,
                                  e_inc,result_kmeans.geometry, prediction;
""").toPandas()

Influencia_super_SHP = gpd.GeoDataFrame(Influencia_super, geometry="geometry")
archivo = 'manzanas_kmeans_merc'
Influencia_super_SHP.to_file(f"./resultados/{archivo}.shp")
crs='PROJCS["Mexico_ITRF2008_LCC",GEOGCS["Mexico_ITRF2008",DATUM["Mexico_ITRF2008",SPHEROID["GRS_1980",6378137,298.257222101],TOWGS84[0,0,0,0,0,0,0]],PRIMEM["Greenwich",0],UNIT["Degree",0.0174532925199433]],PROJECTION["Lambert_Conformal_Conic_2SP",AUTHORITY["EPSG","9802"]],PARAMETER["Central_Meridian",-102],PARAMETER["Latitude_Of_Origin",12],PARAMETER["False_Easting",2500000],PARAMETER["False_Northing",0],PARAMETER["Standard_Parallel_1",17.5],PARAMETER["Standard_Parallel_2",29.5],PARAMETER["Scale_Factor",1],UNIT["Meter",1,AUTHORITY["EPSG","9001"]],AUTHORITY["EPSG","6372"]]'
PRJ_file = open(f"./resultados/{archivo}.prj","w")
PRJ_file.write(crs)
PRJ_file.close()



  Influencia_super_SHP.to_file(f"./resultados/{archivo}.shp")


In [23]:
Influencia_Gasolineras = spark.createDataFrame(Influencia_Gasolineras) 
Influencia_Gasolineras.createOrReplaceTempView("Influencia_Gasolineras")
merged_df = spark.sql("""
    SELECT r.*, COALESCE(f.n_gas, 0) as n_gas
    FROM result_kmeans r
    LEFT JOIN (
        SELECT CVEGEO, n_gas
        FROM Influencia_Gasolineras
    ) f
    ON r.CVEGEO = f.CVEGEO
""")


In [24]:
# Realiza el merge con todas las columnas de merged_df y solo dos columnas de otro_df
Influencia_Oxxos = spark.createDataFrame(Influencia_Oxxos) 
Influencia_Oxxos.createOrReplaceTempView("Influencia_Oxxos")
# Influencia_Oxxos.select("CVGEO", "n_oxxos")
merged_df2 = merged_df.join(Influencia_Oxxos.select("CVEGEO", "n_oxxos"), on="CVEGEO", how="left")


In [25]:
consulta_cfe = spark.createDataFrame(consulta_cfe) 
consulta_cfe.createOrReplaceTempView("consulta_cfe")

In [26]:
merged_df3 = merged_df2.join(consulta_cfe.select("CVEGEO", "n_cfe"), on="CVEGEO", how="left")


In [27]:
consulta_escuelas = spark.createDataFrame(consulta_escuelas) 
consulta_escuelas.createOrReplaceTempView("consulta_escuelas")
merged_df4 = merged_df3.join(consulta_escuelas.select("CVEGEO", "n_escuelas"), on="CVEGEO", how="left")

In [28]:
Influencia_super = spark.createDataFrame(Influencia_super) 
Influencia_super.createOrReplaceTempView("Influencia_super")
merged_df5 = merged_df4.join(Influencia_super.select("CVEGEO", "n_super_merc"), on="CVEGEO", how="left")
merged_df5.show()

+----------------+------+-------+-------+-------+-------+-------+-------+-------+-------+--------------------+-----+------------+------+--------+------------+------------+------------+--------------------+----------+-----+-------+-----+----------+------------+
|          CVEGEO|ECO1_R|VIV82_R|VIV83_R|VIV29_R|VIV36_R|VIV37_R|VIV35_R|VIV34_R|VIV33_R|            geometry|POB14|Menores_Edad|Adulto|Adulto_x|Adulto_Mayor|Estudios_inc|Estudios_fin|            features|prediction|n_gas|n_oxxos|n_cfe|n_escuelas|n_super_merc|
+----------------+------+-------+-------+-------+-------+-------+-------+-------+-------+--------------------+-----+------------+------+--------+------------+------------+------------+--------------------+----------+-----+-------+-----+----------+------------+
|1900100010021028|   0.0|    0.0|    0.0|   60.0|   80.0|    0.0|   60.0|    0.0|   80.0|POLYGON ((2659951...|  3.0|         8.0|   4.0|     3.0|         0.0|        15.0|         0.0|(9,[3,4,6,8],[60....|         0| 

In [29]:
merged_df5_pd = merged_df5.drop("features").toPandas()

In [30]:
merged_df5_pd.describe()

Unnamed: 0,ECO1_R,VIV82_R,VIV83_R,VIV29_R,VIV36_R,VIV37_R,VIV35_R,VIV34_R,VIV33_R,POB14,...,Adulto_x,Adulto_Mayor,Estudios_inc,Estudios_fin,prediction,n_gas,n_oxxos,n_cfe,n_escuelas,n_super_merc
count,76995.0,76995.0,76995.0,76995.0,76995.0,76995.0,76995.0,76995.0,76995.0,76995.0,...,76995.0,76995.0,76995.0,76995.0,76995.0,76995.0,70713.0,11519.0,74721.0,18458.0
mean,48.563283,37.393876,21.134904,46.838831,73.659784,53.894941,43.586551,36.532107,76.161529,20.861108,...,20.861108,9.518683,36.691629,11.916034,2.395078,4.723904,10.766846,1.00026,33.288125,1.178134
std,26.865603,28.628429,25.562076,34.059008,38.689089,36.451919,33.856667,32.165729,39.252741,25.452192,...,25.452192,12.262504,44.074716,21.559161,1.324758,4.591915,9.370031,0.016137,23.905205,0.416924
min,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,1.0,1.0
25%,43.1,0.0,0.0,0.0,75.0,0.0,0.0,0.0,84.6,3.0,...,3.0,0.0,4.0,0.0,1.0,1.0,4.0,1.0,13.0,1.0
50%,58.1,41.0,12.3,50.0,92.9,64.0,50.0,34.0,96.3,15.0,...,15.0,6.0,24.0,4.0,3.0,3.0,9.0,1.0,30.0,1.0
75%,66.2,58.6,36.4,75.0,100.0,84.6,72.7,61.5,100.0,31.0,...,31.0,14.0,56.0,15.0,4.0,8.0,15.0,1.0,49.0,1.0
max,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,2803.0,...,2803.0,310.0,4105.0,910.0,4.0,33.0,86.0,2.0,148.0,4.0


In [31]:
for i in ['n_oxxos', 'n_cfe',
       'n_escuelas', 'n_super_merc']:
    merged_df5_pd[i] = merged_df5_pd[i].fillna(0)

In [32]:
merged_df5_pd.describe()

Unnamed: 0,ECO1_R,VIV82_R,VIV83_R,VIV29_R,VIV36_R,VIV37_R,VIV35_R,VIV34_R,VIV33_R,POB14,...,Adulto_x,Adulto_Mayor,Estudios_inc,Estudios_fin,prediction,n_gas,n_oxxos,n_cfe,n_escuelas,n_super_merc
count,76995.0,76995.0,76995.0,76995.0,76995.0,76995.0,76995.0,76995.0,76995.0,76995.0,...,76995.0,76995.0,76995.0,76995.0,76995.0,76995.0,76995.0,76995.0,76995.0,76995.0
mean,48.563283,37.393876,21.134904,46.838831,73.659784,53.894941,43.586551,36.532107,76.161529,20.861108,...,20.861108,9.518683,36.691629,11.916034,2.395078,4.723904,9.888382,0.149646,32.304981,0.282434
std,26.865603,28.628429,25.562076,34.059008,38.689089,36.451919,33.856667,32.165729,39.252741,25.452192,...,25.452192,12.262504,44.074716,21.559161,1.324758,4.591915,9.450963,0.356836,24.214495,0.542816
min,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,43.1,0.0,0.0,0.0,75.0,0.0,0.0,0.0,84.6,3.0,...,3.0,0.0,4.0,0.0,1.0,1.0,3.0,0.0,11.0,0.0
50%,58.1,41.0,12.3,50.0,92.9,64.0,50.0,34.0,96.3,15.0,...,15.0,6.0,24.0,4.0,3.0,3.0,8.0,0.0,29.0,0.0
75%,66.2,58.6,36.4,75.0,100.0,84.6,72.7,61.5,100.0,31.0,...,31.0,14.0,56.0,15.0,4.0,8.0,14.0,0.0,48.0,0.0
max,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,2803.0,...,2803.0,310.0,4105.0,910.0,4.0,33.0,86.0,2.0,148.0,4.0


In [33]:
merged_df5_pd['Service'] = merged_df5_pd['n_super_merc']
merged_df5_pd.loc[merged_df5_pd['Service'] >1,'Service'] = 1

In [34]:
Influencia_final_SHP = gpd.GeoDataFrame(merged_df5_pd, geometry="geometry")
Influencia_final_SHP.dtypes

archivo = 'manzanas_kmeans_final_v2'
Influencia_final_SHP.to_file(f"./resultados/{archivo}.shp")
crs='PROJCS["Mexico_ITRF2008_LCC",GEOGCS["Mexico_ITRF2008",DATUM["Mexico_ITRF2008",SPHEROID["GRS_1980",6378137,298.257222101],TOWGS84[0,0,0,0,0,0,0]],PRIMEM["Greenwich",0],UNIT["Degree",0.0174532925199433]],PROJECTION["Lambert_Conformal_Conic_2SP",AUTHORITY["EPSG","9802"]],PARAMETER["Central_Meridian",-102],PARAMETER["Latitude_Of_Origin",12],PARAMETER["False_Easting",2500000],PARAMETER["False_Northing",0],PARAMETER["Standard_Parallel_1",17.5],PARAMETER["Standard_Parallel_2",29.5],PARAMETER["Scale_Factor",1],UNIT["Meter",1,AUTHORITY["EPSG","9001"]],AUTHORITY["EPSG","6372"]]'
PRJ_file = open(f"./resultados/{archivo}.prj","w")
PRJ_file.write(crs)
PRJ_file.close()


  Influencia_final_SHP.to_file(f"./resultados/{archivo}.shp")


In [35]:
merged_df5_pd.to_csv('merged5.csv')

In [37]:
merged_df5_pd[merged_df5_pd['n_super_merc']=0]

SyntaxError: invalid syntax (2080469731.py, line 1)