## ETL con el cluster

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName('profeco_etl') \
    .getOrCreate()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1746299663530_0002,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
from pyspark.sql.functions import count as count_, lit, sum as sum_, round, row_number, desc, sum as sum_, countDistinct, when, lower, trim, count, avg, first
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Carga el CSV en Spark

In [None]:
schema = StructType([
    StructField('producto', StringType(), True),
    StructField('presentacion', StringType(), True),
    StructField('marca', StringType(), True),
    StructField('categoria', StringType(), True),
    StructField('catalogo', StringType(), True),
    StructField('precio', FloatType(), True),
    StructField('fecharegistro', StringType(), True),
    StructField('cadenacomercial', StringType(), True),
    StructField('giro', StringType(), True),
    StructField('nombrecomercial', StringType(), True),
    StructField('direccion', StringType(), True),
    StructField('estado', StringType(), True),
    StructField('municipio', StringType(), True),
    StructField('latitud', StringType(), True),
    StructField('longitud', StringType(), True),
    StructField('anio', IntegerType(), True)    
])


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Cargamos los datos de S3, el URI del bucket es s3://itam-analytics-grb/profeco/clean/
# anio es int, precio es float. Todo lo demás podemos tratarlo como string
df_profeco = spark.read \
    .option('header', True) \
    .schema(schema) \
    .csv('s3://itam-analytics-grb/profeco/clean/')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Vamos a guardar en parquet particionado por catalogo y anio. Checamos que existen
df_profeco.select("catalogo", "anio").show(5)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+----+
|catalogo|anio|
+--------+----+
|juguetes|2018|
|juguetes|2018|
|juguetes|2018|
|juguetes|2018|
|juguetes|2018|
+--------+----+
only showing top 5 rows

### Guarda el CSV como parquet en S3, particionalo por catalogo. (Utiliza todos los trucos que consideres).

In [None]:
# Guardamos en parquet, comprimido con snappy
df_profeco.write \
    .mode('overwrite') \
    .option('compression','snappy') \
    .partitionBy('catalogo', 'anio') \
    .parquet('s3://itam-analytics-grb/profeco/parquet/')


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Borramos el dataset, ya que lo vamos a cargar con parquet
del df_profeco

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Carga el parquet en Spark

In [None]:
# Cargamos el parquet. 
df_parquet = spark.read.parquet("s3://itam-analytics-grb/profeco/parquet/")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## PREGUNTAS DE TODOS LOS CATÁLOGOS: 
### Contesta las siguientes preguntas utilizando PySpark. Realiza el siguiente análisis (por año) y sobre todos los catálogos.

### 1. ¿Cuántos catálogos diferentes tenemos?

In [None]:
# 1. ¿Cuántos catálogos diferentes tenemos?

conteo_anio_cat = df_parquet.groupBy('anio', 'catalogo').count().orderBy('anio')

# Creamos un dataset que contiene el conteo por año (Esto nos permitirá realizar un conteo de catalogos por año)
catalogos_por_anio = conteo_anio_cat.select("anio", "catalogo") \
    .groupBy("anio") \
    .agg(count_("catalogo").alias("conteo_anio"))

# Hacemos un join
conteo_anio_cat = conteo_anio_cat \
    .join(catalogos_por_anio, on="anio", how="left") \
    .orderBy('anio') \
    .withColumnRenamed('count', 'cont_anio_catalogo')

# Guardamos la salida (Vamos a usarlo más adelante)
conteo_anio_cat.coalesce(1).write \
    .mode("overwrite") \
    .option("header", True) \
    .csv("s3://itam-analytics-grb/profeco/salidas/p1/")


# Imprimimos el output
conteo_anio = conteo_anio_cat \
    .select('anio', 'conteo_anio') \
    .distinct() \
    .orderBy('anio')


print("El numero de catalogos por anio puede observarse en la siguiente tabla:")
conteo_anio.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

El numero de catalogos por anio puede observarse en la siguiente tabla:
+----+-----------+
|anio|conteo_anio|
+----+-----------+
|2018|         10|
|2019|         11|
|2020|         10|
|2021|         10|
|2022|         12|
|2023|         11|
|2024|         10|
+----+-----------+

### 2. ¿Cuáles son las 20 categorías por catálogo con más observaciones?

In [None]:
# 2. ¿Cuáles son las 20 categorías por catálogo con más observaciones?
# Guarda la salida de este query en tu bucket de S3, lo necesitaremos más adelante."

# Generamos dataframe con conteo por categoria
conteo_categoria_anio = df_parquet.groupBy('anio', 'catalogo', 'categoria') \
    .count() \
    .withColumnRenamed('count', 'cont_anio_categoria')

# Unimos con dataframe 
conteo_categoria_anio = conteo_categoria_anio \
    .join(conteo_anio_cat, on=['anio', 'catalogo'], how = 'left') \
    .select('anio', 'catalogo', 'categoria', 'cont_anio_catalogo', 'cont_anio_categoria') \
    .orderBy('anio', 'catalogo', 'cont_anio_categoria')

# Guardamos dataframe completo
conteo_categoria_anio.coalesce(1).write \
    .mode("overwrite") \
    .option("header", True) \
    .csv("s3://itam-analytics-grb/profeco/salidas/p2/completo")

# Creamoslas dataframe con 20 categórias solamente

# Creamos ventana para realizar un orden de mayor a menor en cont_anio_categoria 
ventana = Window.partitionBy("anio", "catalogo").orderBy(desc("cont_anio_categoria"))

# Aplicamos ventana, nos quedamos con el top20 de categorías por catalogo y anio
conteo_top_20 = conteo_categoria_anio \
    .withColumn('orden_categoria', row_number().over(ventana)) \
    .filter(col('orden_categoria') <= 20) \
    .orderBy('anio','catalogo', 'orden_categoria')

# Guardamos dataframe con el top 20
conteo_categoria_anio.coalesce(1).write \
    .mode("overwrite") \
    .option("header", True) \
    .csv("s3://itam-analytics-grb/profeco/salidas/p2/top_20") 

conteo_top_20.show(25)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+------------------+--------------------+------------------+-------------------+---------------+
|anio|          catalogo|           categoria|cont_anio_catalogo|cont_anio_categoria|orden_categoria|
+----+------------------+--------------------+------------------+-------------------+---------------+
|2018|           basicos|arts. para el cui...|           8804761|            1159575|              1|
|2018|           basicos|detergentes y pro...|           8804761|             669682|              2|
|2018|           basicos|  derivados de leche|           8804761|             668873|              3|
|2018|           basicos| refrescos envasados|           8804761|             636955|              4|
|2018|           basicos|carnes frias seca...|           8804761|             593642|              5|
|2018|           basicos|         condimentos|           8804761|             526847|              6|
|2018|           basicos|frutas y legumbre...|           8804761|             4792

### 3. ¿Tenemos datos de todos los estados del país? De no ser así, ¿cuáles faltan?

In [None]:
# 3. ¿Tenemos datos de todos los estados del país? De no ser así, ¿cuáles faltan?
# No existe Distrito Federal, ya se llama Ciudad de México en todas las bases. Hay 32 valores distintos de estado

# Creamos un dataframe con el conteo de observaciones por estado y anio. 
obs_estado = df_parquet \
    .groupBy('estado', 'anio') \
    .count() \
    .orderBy('estado', 'anio') \
    .withColumnRenamed('count','cont_est_anio')

# Contamos en cuantos años aparece un estado
obs_anios_est = obs_estado \
    .select('estado', 'anio') \
    .distinct() \
    .groupBy('estado') \
    .agg(countDistinct('anio').alias('num_anios_est')) \
    .orderBy('estado') 

# Agregamos columna que diga "completo" o "faltante" dependiendo de si hay observaciones en los 7 años o no.
obs_anios_est = obs_anios_est.withColumn('estatus', 
                                        when(col('num_anios_est')==7, 'completo').otherwise('faltante')
                                        )

# Guardamos dataframes
obs_estado.coalesce(1).write \
    .mode("overwrite") \
    .option("header", True) \
    .csv("s3://itam-analytics-grb/profeco/salidas/p3/obs_estado") 

obs_anios_est.coalesce(1).write \
    .mode("overwrite") \
    .option("header", True) \
    .csv("s3://itam-analytics-grb/profeco/salidas/p3/obs_anios_est") 

print("Existen 32 entidades federativas y existe informacion de 7 anios (2018-2024). En la siguiente tabla se muestra el numero de anios en los que existe informacion de cada entidad federativa.\nEn ella se observa que falta informacion de Colima y Nayarit en cuatro anios.")
obs_anios_est.show(35)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Existen 32 entidades federativas y existe informacion de 7 anios (2018-2024). En la siguiente tabla se muestra el numero de anios en los que existe informacion de cada entidad federativa.
En ella se observa que falta informacion de Colima y Nayarit en cuatro anios.
+--------------------+-------------+--------+
|              estado|num_anios_est| estatus|
+--------------------+-------------+--------+
|      aguascalientes|            7|completo|
|     baja california|            7|completo|
| baja california sur|            7|completo|
|            campeche|            7|completo|
|             chiapas|            7|completo|
|           chihuahua|            7|completo|
|    ciudad de méxico|            7|completo|
|coahuila de zaragoza|            7|completo|
|              colima|            3|faltante|
|             durango|            7|completo|
|    estado de méxico|            7|completo|
|          guanajuato|            7|completo|
|            guerrero|            7|completo

In [None]:
# 4. ¿Cuántas observaciones tenemos por estado?
# Ya tenemos este dataframe.


# Guardamos de nuevo por sanidad mental
obs_estado.coalesce(1).write \
    .mode("overwrite") \
    .option("header", True) \
    .csv("s3://itam-analytics-grb/profeco/salidas/p4/obs_estado")

# Generamos dataframe por estado con suma de cont_est_anio
obs_estado_tota = obs_estado \
    .select('estado','cont_est_anio') \
    .groupBy('estado') \
    .agg(sum_('cont_est_anio').alias('total_estado'))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+----+-------------+
|             estado|anio|cont_est_anio|
+-------------------+----+-------------+
|     aguascalientes|2018|       235847|
|     aguascalientes|2019|       289973|
|     aguascalientes|2020|       216598|
|     aguascalientes|2021|       240772|
|     aguascalientes|2022|       291820|
|     aguascalientes|2023|       313809|
|     aguascalientes|2024|        79726|
|    baja california|2018|       619093|
|    baja california|2019|       165344|
|    baja california|2020|       195002|
|    baja california|2021|       104216|
|    baja california|2022|       160203|
|    baja california|2023|       209715|
|    baja california|2024|        60414|
|baja california sur|2018|       163771|
|baja california sur|2019|       135558|
|baja california sur|2020|       244874|
|baja california sur|2021|       283821|
|baja california sur|2022|       282548|
|baja california sur|2023|       291569|
+-------------------+----+-------------+
only showing top

In [None]:
# 5. De cada estado obten: el número de catalogos diferentes por año, ¿ha aumentado el número de catálogos con el tiempo? 
# Vamos a contar también el número de categorias, para reralizar un análisis más extenso

# Primero creamos dataframe de numero de catalogos por estado y anio
num_catalogo_estado = df_parquet \
    .select('anio', 'estado', 'catalogo') \
    .groupBy('anio', 'estado') \
    .agg(countDistinct('catalogo').alias('num_catalogos')) \
    .orderBy('estado', 'anio')

# Después creamos dataframe de número de categorias por estado y anio
num_categoria_estado = df_parquet \
    .select('anio', 'estado', 'categoria') \
    .groupBy('anio', 'estado') \
    .agg(countDistinct('categoria').alias('num_categorias')) \
    .orderBy('estado', 'anio')

# Realizamos join
df_completo =  num_categoria_estado \
    .join(num_catalogo_estado, on=['anio', 'estado'], how='inner') \
    .orderBy('estado', 'anio')

# Guardamos el dataframe
df_completo.coalesce(1).write \
    .mode("overwrite") \
    .option("header", True) \
    .csv("s3://itam-analytics-grb/profeco/salidas/p5/metricas_estado") 

df_completo.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-------------------+--------------+-------------+
|anio|             estado|num_categorias|num_catalogos|
+----+-------------------+--------------+-------------+
|2018|     aguascalientes|            41|            9|
|2019|     aguascalientes|            41|           11|
|2020|     aguascalientes|            41|            9|
|2021|     aguascalientes|            41|           10|
|2022|     aguascalientes|            41|           12|
|2023|     aguascalientes|            41|           11|
|2024|     aguascalientes|            42|            9|
|2018|    baja california|            41|           10|
|2019|    baja california|            41|           10|
|2020|    baja california|            41|            9|
|2021|    baja california|            41|            9|
|2022|    baja california|            41|           11|
|2023|    baja california|            41|           11|
|2024|    baja california|            42|           10|
|2018|baja california sur|            41|       

## PREGUNTAS CON DATAFRAME DE MEDICAMENTOS
### Utilizando Spark contesta las siguientes preguntas a partir del catálogo que le tocó a tu equipo. Recuerda trabajar en el archivo con los datos particionados de otra manera tus queries van a tardar mucho.

El catálogo asignado a nuestro equipo es "Medicamentos". Este catálogo muestra un bajo número de valores distintos en el campo 'marca' debido a que más del 90% de las observaciones tiene como valor 's/m', que es probable que correspondan a medicamentos genéricos. Debido a lo anterior, en su lugar las preguntas planteadas originalmente se harán en función del campo producto.

La tabla de abajo muestra nuestra justificación.

In [None]:
df_parquet_meds = spark.read.parquet('s3://itam-analytics-grb/profeco/parquet/catalogo=medicamentos/')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
df_justificacion = df_parquet_meds \
    .groupBy('marca') \
    .count()

num_obs= df_parquet_meds \
    .select('marca') \
    .count()

df_justificacion = df_justificacion.withColumn(
    "porcentaje", round(col("count") / lit(num_obs) * 100, 2)
)

df_justificacion.show(40)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------+----------+
|               marca|   count|porcentaje|
+--------------------+--------+----------+
|               dalux|   68765|      0.36|
|inhala care  ó ho...|   21623|      0.11|
|           sin marca|  236323|      1.23|
|               hands|    2529|      0.01|
|                zuum|  105307|      0.55|
|          zuum. klin|   18959|       0.1|
|               aurax|      72|       0.0|
|            farmacom|   11732|      0.06|
|             curitas|   29303|      0.15|
|            vitascom|   30272|      0.16|
|     cualquier marca|  189570|      0.99|
|         quality day|   39406|      0.21|
|           supra med|    9055|      0.05|
|              jumper|     256|       0.0|
|              conair|   11117|      0.06|
|         safety mask|       1|       0.0|
|            dr. simi|      36|       0.0|
|              fregón|      35|       0.0|
|        alfa medical|    8062|      0.04|
|             soriana|   26029|      0.14|
|          

### 1. ¿Cuántos productos diferentes tiene tu categoria?

In [None]:
# Número de productos por total
productos_total = df_parquet_meds \
    .select('producto') \
    .distinct() \
    .count()

print(f"En el dataset de medicamentos existen {productos_total} productos distintos, sin distincion de anio.")


# Número de productos por anio
df_marcas_anio = df_parquet_meds \
    .groupBy('anio') \
    .agg(countDistinct('producto').alias('num_producto')) \
    .orderBy('anio')


print("\nPor anio, la evolucion de productos distintos se muestra en la tabla de abajo.")
df_marcas_anio.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

En el dataset de medicamentos existen 382 productos distintos, sin distincion de anio.

Por anio, la evolucion de productos distintos se muestra en la tabla de abajo.
+----+------------+
|anio|num_producto|
+----+------------+
|2018|         355|
|2019|         356|
|2020|         382|
|2021|         380|
|2022|         380|
|2023|         380|
|2024|         379|
+----+------------+

### 2. ¿Cuál es el producto con mayor precio? ¿En qué estado?

In [None]:
df_mayor_precio = df_parquet_meds \
    .select('estado', 'producto', 'precio', 'anio') \
    .orderBy(desc('precio'))

print("El producto con el mayor precio es Arava, el cual tiene un precio de 5272.0")
print("Este producto se vendió a ese precio en Baja California, Guerrero y Chihuahua en 2024")
df_mayor_precio.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

El producto con el mayor precio es Arava, el cual tiene un precio de 5272.0
Este producto se vendi? a ese precio en Baja California, Guerrero y Chihuahua en 2024
+--------------------+--------+------+----+
|              estado|producto|precio|anio|
+--------------------+--------+------+----+
|     baja california|   arava|5275.0|2024|
|            guerrero|   arava|5275.0|2024|
|           chihuahua|   arava|5275.0|2024|
|     baja california|   arava|5275.0|2024|
|     baja california|   arava|5275.0|2024|
|     baja california|   arava|5275.0|2024|
|            guerrero|   arava|5275.0|2024|
|           chihuahua|   arava|5275.0|2024|
|            guerrero|   arava|5275.0|2024|
|           chihuahua|   arava|5275.0|2024|
|           chihuahua|   arava|5275.0|2024|
|           querétaro|   arava|5040.0|2024|
|           chihuahua|   arava|5040.0|2024|
|          tamaulipas|   arava|5040.0|2024|
|      aguascalientes|   arava|5040.0|2024|
|          nuevo león|   arava|5040.0|2024|
|c

### 3. ¿Cuál es la marca con menor precio en CDMX?

In [None]:

# Creamoslas ventana para asignar un número a cada estado
ventana = Window.orderBy('estado')

# Cramos dataframe con una observacion por estado y un número asignado a cada estado (ciudad de méxico ews 7)
df_ents = df_parquet_meds.select('estado').distinct() \
    .withColumn('ent', row_number().over(ventana))

# Agregamos la columna ent (que es un número que identifica al estado) al df original
df_parquet_meds = df_parquet_meds.join(ents, on='estado', how='left')


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Ordenamos de menor a mayor
df_menor_precio_cdmx = df_parquet_meds \
    .filter(col('ent') == 7) \
    .select('estado', 'producto', 'precio', 'anio') \
    .orderBy('precio')
# Resultado
print("El articulo con menor precio en Ciudad de México es 'Ampliron Duo' con un precio de 5 pesos en 2023")
df_menor_precio_cdmx.show(1)

# Por anio
ventana = Window.partitionBy('anio').orderBy('precio')

df_menor_precio_cdmx = df_parquet_meds \
    .filter(col('ent') == 7) \
    .select('estado', 'producto', 'precio', 'anio') \
    .withColumn('orden', row_number().over(ventana)) \
    .filter(col('orden') == 1) \
    .orderBy('anio')

print("\nPor anio podemos observar el producto con menor precio, tambien")
df_menor_precio_cdmx.show(7)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

El articulo con menor precio en Ciudad de M?xico es 'Ampliron Duo' con un precio de 5 pesos en 2023
+----------------+------------+------+----+
|          estado|    producto|precio|anio|
+----------------+------------+------+----+
|ciudad de méxico|ampliron duo|   5.0|2023|
+----------------+------------+------+----+
only showing top 1 row


Por anio podemos observar el producto con menor precio, tambien
+----------------+--------------------+------+----+-----+
|          estado|            producto|precio|anio|orden|
+----------------+--------------------+------+----+-----+
|ciudad de méxico|         paracetamol|   7.0|2018|    1|
|ciudad de méxico|        acido folico|   7.0|2019|    1|
|ciudad de méxico|    guantes de látex|  5.21|2020|    1|
|ciudad de méxico|   gel antibacterial|   5.4|2021|    1|
|ciudad de méxico|        acido folico|  7.05|2022|    1|
|ciudad de méxico|        ampliron duo|   5.0|2023|    1|
|ciudad de méxico|clorfenamina comp...|  10.0|2024|    1|
+----------

### 4. ¿Cuál es el producto con mayor número de observaciones?

In [None]:
df_conteo = df_parquet_meds \
    .groupBy('producto', 'anio') \
    .agg(count('*').alias('num_obs'))

ventana = Window.partitionBy('anio').orderBy(col('num_obs').desc())

df_mas_obs = df_conteo \
    .withColumn('orden', row_number().over(ventana)) \
    .filter(col('orden') == 1) \
    .orderBy('anio')


print('Por anio el producto con mayor numero de observaciones se muestra en la siguente tabla')
print('\nLa pandemia de Covid-19 tuvo un efecto importante en este ranking.')
df_mas_obs.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Por anio el producto con mayor numero de observaciones se muestra en la siguente tabla

La pandemia de Covid-19 tuvo un efecto importante en este ranking.
+-----------------+----+-------+-----+
|         producto|anio|num_obs|orden|
+-----------------+----+-------+-----+
|           cialis|2018|  33544|    1|
|         micardis|2019|  25812|    1|
|           tabcin|2020|  31549|    1|
|gel antibacterial|2021|  45671|    1|
|gel antibacterial|2022|  52479|    1|
|gel antibacterial|2023|  55492|    1|
|gel antibacterial|2024|  14015|    1|
+-----------------+----+-------+-----+

### 5. ¿Cuáles son el top 5 de marcas con mayor precio en cada estado? ¿Son diferentes?

In [None]:
# Calculamos el precio promedio por producto, estado y anio (para no repetir productos)
precio_prom = df_parquet_meds.groupBy('estado', 'anio', 'producto') \
    .agg(avg('precio').alias('precio_prom'))

# ventana
ventana = Window.partitionBy('estado', 'anio').orderBy(col('precio_prom').desc())

# Creamos dataframe de top 5 productos más caros por estado, año 
df_top5_estado_anio = precio_prom \
    .withColumn('orden', row_number().over(ventana)) \
    .filter(col('orden') <= 5) \
    .orderBy('estado', 'anio', 'orden')

df_top5_estado_anio.show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+----+--------+------------------+-----+
|        estado|anio|producto|       precio_prom|orden|
+--------------+----+--------+------------------+-----+
|aguascalientes|2018| cytotec|2789.0102689560144|    1|
|aguascalientes|2018|   arava| 2153.623006982741|    2|
|aguascalientes|2018|  keppra| 2071.391358928916|    3|
|aguascalientes|2018| asenlix|1628.6417380386258|    4|
|aguascalientes|2018| lipitor| 1523.565196600486|    5|
|aguascalientes|2019| cytotec| 2789.412213740458|    1|
|aguascalientes|2019|   arava|2266.4246575342468|    2|
|aguascalientes|2019|  keppra|2140.8033777185387|    3|
|aguascalientes|2019| lipitor| 1598.637640449438|    4|
|aguascalientes|2019| asenlix|1582.5726116204717|    5|
+--------------+----+--------+------------------+-----+
only showing top 10 rows

In [None]:
# Guardamos en un diccionario 7 dataframes pivoteados, uno por anio
anios = [row["anio"] for row in df_top5_estado_anio.select("anio").distinct().collect()]

pivot_anio = {}

for anio in anios:
    df_anio = df_top5_estado_anio.filter(col('anio') == anio)
    
    pivot_df = df_anio.groupBy('estado') \
        .pivot('orden', [1, 2, 3, 4, 5]) \
        .agg(first('producto'))
    
    pivot_anio[anio] = pivot_df

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 2018
 


In [None]:
pivot_anio[2018].show(32)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------+------+------+-------+-------+
|              estado|      1|     2|     3|      4|      5|
+--------------------+-------+------+------+-------+-------+
|      aguascalientes|cytotec| arava|keppra|asenlix|lipitor|
|     baja california|cytotec| arava|keppra|asenlix|lipitor|
| baja california sur|cytotec| arava|keppra|asenlix|lipitor|
|            campeche|cytotec| arava|keppra|asenlix|lipitor|
|             chiapas|cytotec| arava|keppra|asenlix|lipitor|
|           chihuahua|cytotec| arava|keppra|asenlix|lipitor|
|    ciudad de méxico|cytotec| arava|keppra|asenlix|lipitor|
|coahuila de zaragoza|cytotec| arava|keppra|asenlix|lipitor|
|              colima|cytotec| arava|keppra|asenlix|lipitor|
|             durango|cytotec| arava|keppra|asenlix|bonviva|
|    estado de méxico|cytotec| arava|keppra|asenlix|lipitor|
|          guanajuato|cytotec| arava|keppra|asenlix|lipitor|
|            guerrero|cytotec|keppra| arava|asenlix|lipitor|
|             hidalgo|cy

#### 2019

In [None]:
pivot_anio[2019].show(32)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------+-----+------+-------+-------+
|              estado|      1|    2|     3|      4|      5|
+--------------------+-------+-----+------+-------+-------+
|      aguascalientes|cytotec|arava|keppra|lipitor|asenlix|
|     baja california|cytotec|arava|keppra|asenlix|lipitor|
| baja california sur|cytotec|arava|keppra|asenlix|lipitor|
|            campeche|cytotec|arava|keppra|asenlix|lipitor|
|             chiapas|cytotec|arava|keppra|lipitor| evista|
|           chihuahua|cytotec|arava|keppra|asenlix|lipitor|
|    ciudad de méxico|cytotec|arava|keppra|asenlix|lipitor|
|coahuila de zaragoza|cytotec|arava|keppra|asenlix|lipitor|
|              colima|cytotec|arava|keppra|lipitor|asenlix|
|             durango|cytotec|arava|keppra|asenlix|lipitor|
|    estado de méxico|cytotec|arava|keppra|asenlix|lipitor|
|          guanajuato|cytotec|arava|keppra|lipitor|asenlix|
|            guerrero|cytotec|arava|keppra|asenlix|lipitor|
|             hidalgo|cytotec|arava|kepp

#### 2020

In [None]:
pivot_anio[2020].show(32)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-----------+-----------+-----------+------------+---------------+
|              estado|          1|          2|          3|           4|              5|
+--------------------+-----------+-----------+-----------+------------+---------------+
|      aguascalientes|    cytotec|      arava|     keppra|fosamax plus|        lipitor|
|     baja california|    cytotec|      arava|     keppra|     asenlix|        lipitor|
| baja california sur|    cytotec|      arava|     keppra|     asenlix|        lipitor|
|            campeche|    cytotec|      arava|     keppra|     lipitor|        asenlix|
|             chiapas|    cytotec|      arava|     keppra|     lipitor|   fosamax plus|
|           chihuahua|nebulizador| termómetro|aderogyl 15| antiflu-des|antiflu-des jr.|
|    ciudad de méxico|    cytotec|      arava|     keppra|     lipitor|   fosamax plus|
|coahuila de zaragoza|    cytotec|      arava|     keppra|     lexapro|        lipitor|
|              colima|      arav

#### 2021

In [None]:
pivot_anio[2021].show(32)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------+-------+-------+--------------------+--------------------+
|              estado|      1|      2|      3|                   4|                   5|
+--------------------+-------+-------+-------+--------------------+--------------------+
|      aguascalientes|cytotec|  arava| keppra|             lexapro|        fosamax plus|
|     baja california|cytotec|  arava| keppra|        fosamax plus|             lipitor|
| baja california sur|  arava|cytotec| keppra|             lipitor|        fosamax plus|
|            campeche|  arava|cytotec| keppra|             lipitor|        fosamax plus|
|             chiapas|cytotec|  arava| keppra|             lipitor|        fosamax plus|
|           chihuahua|cytotec|  arava| keppra|        fosamax plus|             lipitor|
|    ciudad de méxico|cytotec|  arava| keppra|             lipitor|lantus. insulina ...|
|coahuila de zaragoza|cytotec|  arava| keppra|             lexapro|        fosamax plus|
|             durango

#### 2022

In [None]:
pivot_anio[2022].show(32)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------+----------+--------+--------------------+--------------------+
|              estado|      1|         2|       3|                   4|                   5|
+--------------------+-------+----------+--------+--------------------+--------------------+
|      aguascalientes|cytotec|     arava|  keppra|lantus. insulina ...|             lipitor|
|     baja california|cytotec|     arava|  keppra|             lipitor|lantus. insulina ...|
| baja california sur|  arava|   cytotec|  keppra|             lipitor|        fosamax plus|
|            campeche|cytotec|     arava|  keppra|             lipitor|lantus. insulina ...|
|             chiapas|cytotec|     arava|  keppra|             lipitor|        fosamax plus|
|           chihuahua|  arava|   cytotec|  keppra|lantus. insulina ...|        fosamax plus|
|    ciudad de méxico|cytotec|     arava|  keppra|             lipitor|lantus. insulina ...|
|coahuila de zaragoza|  arava|   cytotec|  keppra|             lexapro

#### 2023

In [None]:
pivot_anio[2023].show(32)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------+------------+--------+--------------------+--------------------+
|              estado|       1|           2|       3|                   4|                   5|
+--------------------+--------+------------+--------+--------------------+--------------------+
|      aguascalientes|   arava|     cytotec|  keppra|           xatral-od|             lipitor|
|     baja california|   arava|     cytotec|  keppra|           xatral-od|             asenlix|
| baja california sur|   arava|     cytotec|  keppra|             lipitor|           xatral-od|
|            campeche|   arava|     cytotec|  keppra|             lipitor|           xatral-od|
|             chiapas| cytotec|       arava|  keppra|             lipitor|             asenlix|
|           chihuahua|   arava|     cytotec|  keppra|           xatral-od|             lipitor|
|    ciudad de méxico|   arava|     cytotec|  keppra|             lipitor|           xatral-od|
|coahuila de zaragoza|   arava|     cyto

#### 2024

In [None]:
pivot_anio[2024].show(32)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------+-------+------+--------------------+--------------------+
|              estado|      1|      2|     3|                   4|                   5|
+--------------------+-------+-------+------+--------------------+--------------------+
|      aguascalientes|  arava|cytotec|keppra|           xatral-od|lantus. insulina ...|
|     baja california|  arava|cytotec|keppra|           xatral-od|        secotex ocas|
| baja california sur|  arava|cytotec|keppra|           xatral-od|lantus. insulina ...|
|            campeche|  arava|cytotec|keppra|           xatral-od|lantus. insulina ...|
|             chiapas|  arava|cytotec|keppra|        secotex ocas|             asenlix|
|           chihuahua|  arava|cytotec|keppra|           xatral-od|lantus. insulina ...|
|    ciudad de méxico|  arava|cytotec|keppra|           xatral-od|             lipitor|
|coahuila de zaragoza|  arava|cytotec|keppra|           xatral-od|lantus. insulina ...|
|             durango|  arava|cy