In [19]:
# Importar Bibliotecas
import pyspark.sql.functions as F
import os
import pandas as pd
import time
from geopy.geocoders import Nominatim
from pyspark.sql import SparkSession

In [3]:
"""Crea una sesión de Spark con el nombre de la aplicación "Enriquecimiento de Geolocalización con Coordenadas", 
habilitando el soporte para Hive, y la inicializa o reutiliza si ya existe"""
spark = SparkSession \
    .builder \
    .appName("Enriquecimiento de Geolocalización con Coordenadas") \
    .enableHiveSupport() \
    .getOrCreate()

24/10/22 01:39:30 WARN Utils: Your hostname, felipe-Nitro-AN515-44 resolves to a loopback address: 127.0.1.1; using 192.168.15.10 instead (on interface wlp5s0)
24/10/22 01:39:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/22 01:39:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/10/22 01:39:31 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/10/22 01:39:31 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
24/10/22 01:39:31 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
24/10/22 01:39:31 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
24/10/22 01:39:31 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attemp

In [9]:
# Leer el archivo parquet de geolocalización de Correios
geo = spark.read.parquet('/home/felipe/despliegue_analytica/files_parquet/geolocation_correios.parquet')
# Leer el archivo parquet que contiene coordenadas geográficas
coords_df = spark.read.parquet('/home/felipe/despliegue_analytica/files_parquet/geo_coords.parquet')

In [14]:
"""Se seleccionan las columnas 'cep_prefix', 'city', y 'uf' del DataFrame 'geo' y se eliminan los duplicados.
Luego, se realiza una unión interna (inner join) con el DataFrame 'coords_df' que contiene las coordenadas, 
utilizando la columna 'cep_prefix' como clave de unión para garantizar que solo se mantengan los registros 
que coinciden en ambos DataFrames."""
geo_final = geo.select('cep_prefix','city', 'uf')\
   .dropDuplicates()\
   .join(
        coords_df,
        on=["cep_prefix"],
        how="inner"
    )

In [17]:
geo_final.show()

                                                                                

+----------+-----------------+---+-----------+-----------+
|cep_prefix|             city| uf|        lat|        lon|
+----------+-----------------+---+-----------+-----------+
|     64091|         teresina| PI| -5.0874608|-42.8049571|
|     64250|  domingos mourao| PI| -4.2544732|-41.2722903|
|     64250|  domingos mourao| PI| -4.2544732|-41.2722903|
|     64400|         amarante| PI| -6.2459374|-42.8476238|
|     64420|       palmeirais| PI| -5.9761427|-43.0637158|
|     64607|            picos| PI| -7.0823544|-41.4685053|
|     66912|            belem| PA|   -1.45056|-48.4682453|
|     66912|            belem| PA|   -1.45056|-48.4682453|
|     68627|      paragominas| PA|   -2.99564|-47.3548942|
|     69042|           manaus| AM| -3.1316333|-59.9825041|
|     69928|placido de castro| AC|-10.3239154|-67.1824196|
|     69928|placido de castro| AC|-10.3239154|-67.1824196|
|     70719|         brasilia| DF|-15.7934036|-47.8823172|
|     70719|         brasilia| DF|-15.7934036|-47.882317

In [22]:
"""Se convierten las columnas 'lat' y 'lon' del DataFrame 'geo_final' al tipo de dato 'double' 
para garantizar mayor precisión al trabajar con coordenadas geográficas, asegurando que los valores 
de latitud y longitud se almacenen como números de punto flotante."""
geo_final = geo_final\
.withColumn('lat', F.col('lat').cast('double'))\
.withColumn('lon', F.col('lon').cast('double'))

In [23]:
# Mostrar el esquema del DataFrame final
geo_final.printSchema()

root
 |-- cep_prefix: string (nullable = true)
 |-- city: string (nullable = true)
 |-- uf: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)



In [21]:
# Escribir el DataFrame resultante en un archivo parquet con las coordenadas de geolocalización
geo_final.write.parquet(f'/home/felipe/despliegue_analytica/files_parquet/geolocation_correios_coords.parquet', mode='overwrite')

                                                                                