# Limpieza de datos 'Quien es quien' en los precios PROFECO

En este notebook vamos a limpiar datos de una serie de archivos CSV y los vamos a guardar en archivos Parquet, particionados por catálogo y año. Los datos crudos se encuentran en carpetas de S3. 

Los datos originales no tienen encabezados, por lo que los añadimos manualmente. 

Para facilitar el trabajo, y la detección de errores, bajamos cada conjunto de datos en un dataframe por año, y hacemos las operaciones de limpieza y transformación en cada uno de los dataframes. 

In [2]:
%%info

UsageError: Cell magic `%%info` not found.


## Configuramos el entorno de SPARK

In [None]:
%%configure -f
{ "conf":{
          "spark.pyspark.python": "python",
          "spark.pyspark.virtualenv.enabled": "true",
          "spark.pyspark.virtualenv.type":"native",
          "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
         }
}

UsageError: Cell magic `%%configure` not found.


In [None]:
# Spark
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.window import Window

# Tipos de datos
from pyspark.sql.types import (
    StringType, FloatType, IntegerType, DateType, StructType, StructField
)

# Funciones de PySpark
from pyspark.sql.functions import (
    col, lit, lower, trim, regexp_replace, udf
)

# Otros
import unicodedata
from functools import reduce
import re



Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1746307366755_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
spark = SparkSession.builder \
    .appName("profeco_completo") \
    .getOrCreate()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Ruta base
ruta_base = "s3://itam-analytics-sofia/profeco/raw_descomprimidos"

# Lista de años
anios = list(range(2018, 2025))

# Columnas del dataset
columnas = [
    "producto", "presentacion", "marca", "tipo", "catalogo",
    "precio", "fecha", "tienda", "tipo_tienda", "sucursal",
    "direccion", "estado", "ciudad", "latitud", "longitud"
]

# Crear variables df_2018, df_2019, etc.
for anio in anios:
    ruta = f"{ruta_base}/{anio}/*.csv"
    #Leemos los CSV de cada año, y lo guardamos en un dataframe temporal
    df_tmp = spark.read.csv(ruta, header=False, inferSchema=False)
    df_tmp = df_tmp.toDF(*columnas)
    
    # Cast de tipos
    df_tmp = df_tmp.withColumn("precio", col("precio").cast(FloatType()))
    df_tmp = df_tmp.withColumn("latitud", col("latitud").cast(FloatType()))
    df_tmp = df_tmp.withColumn("longitud", col("longitud").cast(FloatType()))
    df_tmp = df_tmp.withColumn("fecha", col("fecha").cast(DateType()))
    
    # Añadir columna de AÑO
    df_tmp = df_tmp.withColumn("anio", lit(anio))
    
    # Asignar nombre a la variable globalmente (df_2018, df_2019, ...)
    globals()[f"df_{anio}"] = df_tmp




FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Verifica que las variables df_2018, df_2019, ..., df_2024 existan
for anio in range(2018, 2025):
    print(f"df_{anio} existe:", f"df_{anio}" in globals())


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

df_2018 existe: True
df_2019 existe: True
df_2020 existe: True
df_2021 existe: True
df_2022 existe: True
df_2023 existe: True
df_2024 existe: True

In [None]:
for anio in range(2018, 2025):
    print(f"\n--- Esquema de df_{anio} ---")
    globals()[f"df_{anio}"].printSchema()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


--- Esquema de df_2018 ---
root
 |-- producto: string (nullable = true)
 |-- presentacion: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- catalogo: string (nullable = true)
 |-- precio: float (nullable = true)
 |-- fecha: date (nullable = true)
 |-- tienda: string (nullable = true)
 |-- tipo_tienda: string (nullable = true)
 |-- sucursal: string (nullable = true)
 |-- direccion: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- ciudad: string (nullable = true)
 |-- latitud: float (nullable = true)
 |-- longitud: float (nullable = true)
 |-- anio: integer (nullable = false)


--- Esquema de df_2019 ---
root
 |-- producto: string (nullable = true)
 |-- presentacion: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- catalogo: string (nullable = true)
 |-- precio: float (nullable = true)
 |-- fecha: date (nullable = true)
 |-- tienda: string (nullable = true)
 |--

In [None]:
for anio in range(2018, 2025):
    print(f"\n--- Datos de df_{anio} ---")
    globals()[f"df_{anio}"].show(5, truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


--- Datos de df_2018 ---
+---------+--------------------------------------------------------------------------+--------------+-------------------+-----------------+------+----------+---------+----------------------+---------------------------------+------------------------------------------------------------------+--------------+--------------+--------+----------+----+
|producto |presentacion                                                              |marca         |tipo               |catalogo         |precio|fecha     |tienda   |tipo_tienda           |sucursal                         |direccion                                                         |estado        |ciudad        |latitud |longitud  |anio|
+---------+--------------------------------------------------------------------------+--------------+-------------------+-----------------+------+----------+---------+----------------------+---------------------------------+--------------------------------------------------------

## Limpieza de los datos: 

- Eliminamos las columnas que no nos interesan.
- Estandarizamos las columnas tipo string: 
    - Convertimos a minúsculas
    - Eliminamos acentos y caracteres especiales
    - Eliminamos dobles espacios

In [None]:
columnas_a_eliminar = ['latitud', 'longitud', 'direccion', 'presentacion', 'tienda', 'tipo_tienda', 'sucursal']

df_2018_limpio = df_2018.drop(*columnas_a_eliminar)
df_2019_limpio = df_2019.drop(*columnas_a_eliminar)
df_2020_limpio = df_2020.drop(*columnas_a_eliminar)
df_2021_limpio = df_2021.drop(*columnas_a_eliminar)
df_2022_limpio = df_2022.drop(*columnas_a_eliminar)
df_2023_limpio = df_2023.drop(*columnas_a_eliminar)
df_2024_limpio = df_2024.drop(*columnas_a_eliminar)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
df_2024.printSchema()
df_2024.show(3, truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- producto: string (nullable = true)
 |-- presentacion: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- catalogo: string (nullable = true)
 |-- precio: float (nullable = true)
 |-- fecha: date (nullable = true)
 |-- tienda: string (nullable = true)
 |-- tipo_tienda: string (nullable = true)
 |-- sucursal: string (nullable = true)
 |-- direccion: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- ciudad: string (nullable = true)
 |-- latitud: float (nullable = true)
 |-- longitud: float (nullable = true)
 |-- anio: integer (nullable = false)

+---------------+---------------------------------------------------+----------------------------------------+---------------------+-----------------+------+----------+------+---------------------------+--------------------------------------+------------------------------------------------------------------------------------------+--------------+--------------+--------+

In [None]:
import unicodedata
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# Función para limpiar texto: quitar acentos, pasar a minúsculas y eliminar dobles espacios
def limpiar_texto(texto):
    if texto is None:
        return None
    texto = unicodedata.normalize('NFKD', texto)  # Normaliza acentos
    texto = texto.encode('ASCII', 'ignore').decode('utf-8')  # Elimina acentos
    texto = texto.lower()  # Minúsculas
    texto = ' '.join(texto.split())  # Quita espacios múltiples
    return texto

# Registrar como UDF en Spark
limpiar_udf = udf(limpiar_texto, StringType())


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Columnas de texto a limpiar
columnas_texto = ['producto', 'marca', 'tipo', 'catalogo', 'estado', 'ciudad']

# Aplica la limpieza a cada DataFrame por año
for anio in range(2018, 2025):
    df = globals()[f"df_{anio}_limpio"]
    for columna in columnas_texto:
        df = df.withColumn(columna, limpiar_udf(col(columna)))
    globals()[f"df_{anio}_limpio"] = df  # actualiza el DataFrame en el entorno global


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Limpieza de columnas ESTADO y CIUDAD

- Nos aseguramos que en la columna 'estado' sólo estén los nombres de los 32 estados de la República Mexicana. 
- Nos aseguramos que en la columna 'ciudad' sólo estén los nombres de ciudades Mexicanas. 

In [None]:
for anio in range(2018, 2025):
    print(f"\n--- ESTADOS ÚNICOS en {anio} ---")
    df = globals()[f"df_{anio}_limpio"]
    estados_unicos = df.select("estado").distinct().orderBy("estado").collect()
    for fila in estados_unicos:
        print(fila["estado"])
    
    print(f"\n--- CIUDADES ÚNICAS en {anio} ---")
    ciudades_unicas = df.select("ciudad").distinct().orderBy("ciudad").collect()
    for fila in ciudades_unicas:
        print(fila["ciudad"])



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


--- ESTADOS ?NICOS en 2018 ---
aguascalientes
baja california
baja california sur
campeche
chiapas
chihuahua
ciudad de mexico
coahuila de zaragoza
col. eduardo guerra
colima
durango
estado de mexico
guanajuato
guerrero
hidalgo
jalisco
michoacan de ocampo
morelos
mz. 54
nayarit
nuevo leon
oaxaca
puebla
queretaro
quintana roo
san luis potosi
sinaloa
sonora
tabasco
tamaulipas
tlaxcala
veracruz
yucatan
zacatecas

--- CIUDADES ?NICAS en 2018 ---
acapulco de juarez
agua prieta
aguascalientes
alvaro obregon
apizaco
apodaca
atizapan
atizapan de zaragoza
azcapotzalco
benito juarez
boca del rio
campeche
centro
chihuahua
coacalco
colima
coyoacan
cp. 27280"
cuauhtemoc
cuautitlan
cuautitlan izcalli
cuernavaca
culiacan
durango
ecatepec
ensenada
general escobedo
gomez palacio
guadalajara
guadalupe
guasave
gustavo a. madero
hermosillo
huixquilucan
iztacalco
iztapalapa
juarez
la paz
la piedad
leon
lerma
los cabos
lt. 1
magdalena contreras
matamoros
mazatlan
merida
metepec
mexicali
miguel hidalgo
miner

In [None]:
# Lista de valores inválidos
estados_invalidos = [
    "mz. 54", 
    "col. eduardo guerra", 
    "col. centro", 
    "entre jose maria bustamante y ricardo flores magon", 
    "esq. 19 norte"
]

ciudades_invalidas = [
    "lt. 1", 
    "col. centro", 
    "col. jesus garcia", 
    'cp. 27280"', 
    'cp. 72000"'
]

# Aplicar filtro a todos los DataFrames de 2018 a 2024
for anio in range(2018, 2025):
    df = globals()[f"df_{anio}_limpio"]
    
    df = df.filter(~col("estado").isin(estados_invalidos))
    df = df.filter(~col("ciudad").isin(ciudades_invalidas))
    
    globals()[f"df_{anio}_limpio"] = df


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Verificarlo

for anio in range(2018, 2025):
    df = globals()[f"df_{anio}_limpio"]
    
    print(f"\n--- ESTADOS ÚNICOS en {anio} ---")
    estados = df.select("estado").distinct().orderBy("estado").collect()
    for fila in estados:
        print(fila["estado"])
    
    print(f"\n--- CIUDADES ÚNICAS en {anio} ---")
    ciudades = df.select("ciudad").distinct().orderBy("ciudad").collect()
    for fila in ciudades:
        print(fila["ciudad"])


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


--- ESTADOS ?NICOS en 2018 ---
aguascalientes
baja california
baja california sur
campeche
chiapas
chihuahua
ciudad de mexico
coahuila de zaragoza
colima
durango
estado de mexico
guanajuato
guerrero
hidalgo
jalisco
michoacan de ocampo
morelos
nayarit
nuevo leon
oaxaca
puebla
queretaro
quintana roo
san luis potosi
sinaloa
sonora
tabasco
tamaulipas
tlaxcala
veracruz
yucatan
zacatecas

--- CIUDADES ?NICAS en 2018 ---
acapulco de juarez
agua prieta
aguascalientes
alvaro obregon
apizaco
apodaca
atizapan
atizapan de zaragoza
azcapotzalco
benito juarez
boca del rio
campeche
centro
chihuahua
coacalco
colima
coyoacan
cuauhtemoc
cuautitlan
cuautitlan izcalli
cuernavaca
culiacan
durango
ecatepec
ensenada
general escobedo
gomez palacio
guadalajara
guadalupe
guasave
gustavo a. madero
hermosillo
huixquilucan
iztacalco
iztapalapa
juarez
la paz
la piedad
leon
lerma
los cabos
magdalena contreras
matamoros
mazatlan
merida
metepec
mexicali
miguel hidalgo
mineral de la reforma
monterrey
morelia
naucalpan

In [None]:
from pyspark.sql.functions import lit

#Añadir la columna "anio" a cada DataFrame
df_2018_limpio = df_2018_limpio.withColumn("anio", lit(2018))
df_2019_limpio = df_2019_limpio.withColumn("anio", lit(2019))
df_2020_limpio = df_2020_limpio.withColumn("anio", lit(2020))
df_2021_limpio = df_2021_limpio.withColumn("anio", lit(2021))
df_2022_limpio = df_2022_limpio.withColumn("anio", lit(2022))
df_2023_limpio = df_2023_limpio.withColumn("anio", lit(2023))
df_2024_limpio = df_2024_limpio.withColumn("anio", lit(2024))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Subir datos a S3
- Guardamos los dataframes como uno sólo
- Escribimos el dataframe en formato parquet particionado por año y catálogo. 

In [None]:
df_todo = df_2018_limpio.unionByName(df_2019_limpio)\
                        .unionByName(df_2020_limpio)\
                        .unionByName(df_2021_limpio)\
                        .unionByName(df_2022_limpio)\
                        .unionByName(df_2023_limpio)\
                        .unionByName(df_2024_limpio)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
df_todo.write.mode("overwrite") \
    .partitionBy("catalogo", "anio") \
    .parquet("s3://itam-analytics-sofia/profeco/parquet/", compression="snappy")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
#Verificamos que se haya guardado correctamente. 
df_verificacion = spark.read.parquet("s3://itam-analytics-sofia/profeco/parquet/")
df_verificacion.printSchema()
df_verificacion.select("catalogo", "anio").distinct().show()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- producto: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- precio: float (nullable = true)
 |-- fecha: date (nullable = true)
 |-- estado: string (nullable = true)
 |-- ciudad: string (nullable = true)
 |-- catalogo: string (nullable = true)
 |-- anio: integer (nullable = true)

+-----------------+----+
|         catalogo|anio|
+-----------------+----+
|          basicos|2024|
|          basicos|2018|
|          basicos|2020|
|          basicos|2021|
|          basicos|2019|
|          basicos|2022|
|          basicos|2023|
|      aeropuertos|2019|
|     medicamentos|2020|
|     medicamentos|2024|
|     medicamentos|2023|
|     medicamentos|2019|
|     medicamentos|2018|
|     medicamentos|2021|
|     medicamentos|2022|
|electrodomesticos|2023|
| utiles escolares|2019|
| utiles escolares|2018|
|electrodomesticos|2022|
|electrodomesticos|2021|
+-----------------+----+
only showing top 20 rows