In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd


### Se crea Session en SPARK

In [2]:
spark = SparkSession \
   .builder \
   .config("spark.some.config.option", "some-values") \
   .getOrCreate()

### Se carga el archivo all_data.csv

In [3]:
df_all_data = spark.read.format("csv").option("header", "true").load("data/all_data.csv")

In [11]:
df_all_data.printSchema()

root
 |-- producto: string (nullable = true)
 |-- presentacion: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- catalogo: string (nullable = true)
 |-- precio: string (nullable = true)
 |-- fechaRegistro: string (nullable = true)
 |-- cadenaComercial: string (nullable = true)
 |-- giro: string (nullable = true)
 |-- nombreComercial: string (nullable = true)
 |-- direccion: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- latitud: string (nullable = true)
 |-- longitud: string (nullable = true)



### 1. ¿Cuántos registros hay?

In [12]:
df_all_data.count()

62530715

### 2. ¿Cuántas categorías?

In [14]:
categorias = df_all_data.select("categoria")\
                        .filter("categoria is not NULL")\
                        .distinct()

In [15]:
categorias.count()

41

### 3. ¿Cuántas cadenas comerciales están siendo monitoreadas (y, por lo tanto, reportadas en esa base de datos)?

In [4]:
cadenasComerciales = df_all_data.select("cadenaComercial")\
                        .filter("cadenaComercial is not NULL")\
                        .distinct()

In [5]:
cadenasComerciales.count()

705

### 4. ¿Cuáles son los productos más monitoreados en cada estado de la república?

In [9]:
productos = df_all_data.where((col('estado').isNotNull()) & (col('producto').isNotNull()))\
                    .groupBy("estado","producto")\
                    .agg(count("producto").alias("total")) \
                    .select("estado","producto","total")

In [10]:
from pyspark.sql.window import Window

w = Window().partitionBy("estado").orderBy(col("total").desc())

productosMasMonitoreados = productos.withColumn("rn", row_number().over(w))\
                           .where(col("rn") == 1)\
                           .select("estado", "producto","total")\
                           .orderBy(col("total").desc())

In [11]:
productosMasMonitoreados.show(5)

+----------------+-----------------+------+
|          estado|         producto| total|
+----------------+-----------------+------+
|DISTRITO FEDERAL|         REFRESCO|287463|
|          MÉXICO|         REFRESCO|194939|
|         JALISCO|         REFRESCO| 81718|
|      NUEVO LEÓN|DETERGENTE P/ROPA| 50307|
|      GUANAJUATO|         REFRESCO| 49441|
+----------------+-----------------+------+
only showing top 5 rows



In [12]:
df_productosMonitoreados = productosMasMonitoreados.toPandas()

In [13]:
df_productosMonitoreados

Unnamed: 0,estado,producto,total
0,DISTRITO FEDERAL,REFRESCO,287463
1,MÉXICO,REFRESCO,194939
2,JALISCO,REFRESCO,81718
3,NUEVO LEÓN,DETERGENTE P/ROPA,50307
4,GUANAJUATO,REFRESCO,49441
5,TLAXCALA,REFRESCO,43904
6,MICHOACÁN DE OCAMPO,DETERGENTE P/ROPA,40144
7,BAJA CALIFORNIA,REFRESCO,37243
8,YUCATÁN,LECHE ULTRAPASTEURIZADA,35991
9,QUINTANA ROO,FUD,34846


### 5. ¿Cuál es la cadena comercial con mayor variedad de productos monitoreados?

In [74]:
cadenasComercialesPorProducto= df_all_data.where(col('producto').isNotNull())\
                              .groupBy('cadenaComercial') \
                              .agg(countDistinct('producto').alias('totalProductos'))\
                              .orderBy(desc('totalProductos')).limit(5)


In [75]:
cadenasComercialesPorProducto.show()

+--------------------+--------------+
|     cadenaComercial|totalProductos|
+--------------------+--------------+
|             SORIANA|          1059|
|            WAL-MART|          1051|
|MEGA COMERCIAL ME...|          1049|
|  COMERCIAL MEXICANA|          1036|
|            CHEDRAUI|          1026|
+--------------------+--------------+



### Parar session de SPARK

In [None]:
spark.stop()