In [1]:
############################ I RUN THIS TO HAVE SPARK ON GOOGLE COLAB  #############################

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

import findspark
findspark.init()
#######################################################################

**NOTA:** Debido a razones de límites en mi equipo, cada query toma alrededor de 30 minutos (!), por lo que en la mayoria de los casos sólo esbozaré el procedimiento necesario para responder las preguntas.

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('OPI').getOrCreate()

In [3]:
path = '/content/drive/MyDrive/all_data.csv'

df = spark.read.csv(path, header=True, inferSchema=True)
df.createOrReplaceTempView("profeco")

In [None]:
df.show()

+--------------------+--------------------+--------------------+--------------------+-----------------+------+--------------------+------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------+----------+
|            producto|        presentacion|               marca|           categoria|         catalogo|precio|       fechaRegistro|   cadenaComercial|                giro|     nombreComercial|           direccion|          estado|           municipio| latitud|  longitud|
+--------------------+--------------------+--------------------+--------------------+-----------------+------+--------------------+------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------+----------+
|CUADERNO FORMA IT...|96 HOJAS PASTA DU...|            ESTRELLA|    MATERIAL ESCOLAR| UTILES ESCOLARES|  25.9|2011-05-18 00:00:...|ABASTECEDORA LUMEN|          PAPELERIAS|ABASTECEDORA 

In [None]:
df.printSchema()

root
 |-- producto: string (nullable = true)
 |-- presentacion: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- catalogo: string (nullable = true)
 |-- precio: double (nullable = true)
 |-- fechaRegistro: string (nullable = true)
 |-- cadenaComercial: string (nullable = true)
 |-- giro: string (nullable = true)
 |-- nombreComercial: string (nullable = true)
 |-- direccion: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- latitud: string (nullable = true)
 |-- longitud: string (nullable = true)



**1a - ¿Cuántos registros hay?**

In [None]:
df.count()

62530715

**1b - ¿Cuántas categorías?**

In [None]:
from pyspark.sql.functions import countDistinct, count

df.select(countDistinct('categoria')).show()

+-------------------------+
|count(DISTINCT categoria)|
+-------------------------+
|                       41|
+-------------------------+



**1c - ¿Cuántas cadenas comerciales están siendo monitoreadas?**

In [None]:
df.select(countDistinct('cadenaComercial')).show()

+-------------------------------+
|count(DISTINCT cadenaComercial)|
+-------------------------------+
|                            705|
+-------------------------------+



Se puede obtener lo mismo sólo con SQL, dependiendo de qué es más cómodo:

In [None]:
spark.sql('SELECT COUNT(DISTINCT cadenaComercial) FROM profeco').show()

+-------------------------------+
|count(DISTINCT cadenaComercial)|
+-------------------------------+
|                            705|
+-------------------------------+



**1d - ¿Cómo podrías determinar la calidad de los datos? ¿Detectaste algún tipo de
inconsistencia o error en la fuente?**


Revisar el número de valores nulos por corlumna. También revisar que datos numericos tengan tipo int. o float. Revisar las entradas como fechas que tengan tipo timestamp o similar. etc. 

**1e - ¿Cuáles son los productos más monitoreados en cada entidad?**

Se debe hacer una lista con cada uno de los estados y hacer una rutina que corra el siguiente comando estado por estado.

En este caso lo hago manualmente, cómo ejemplo, para CDMX:

In [None]:
spark.sql('SELECT producto,  count(producto) AS numero_prod FROM profeco WHERE estado = "DISTRITO FEDERAL" GROUP BY producto ORDER BY numero_prod DESC').show(5)

+--------------------+-----------+
|            producto|numero_prod|
+--------------------+-----------+
|            REFRESCO|     287463|
|                 FUD|     207569|
|LECHE ULTRAPASTEU...|     175640|
|   DETERGENTE P/ROPA|     173452|
|             YOGHURT|     136720|
+--------------------+-----------+
only showing top 5 rows



También para EDOMEX:

In [None]:
spark.sql('SELECT producto,  count(producto) AS numero_prod FROM profeco WHERE estado = "MÉXICO" GROUP BY producto ORDER BY numero_prod DESC').show(5)

+--------------------+-----------+
|            producto|numero_prod|
+--------------------+-----------+
|            REFRESCO|     194939|
|                 FUD|     149141|
|   DETERGENTE P/ROPA|     132862|
|LECHE ULTRAPASTEU...|     116522|
|    JABON DE TOCADOR|      97330|
+--------------------+-----------+
only showing top 5 rows



etc.

**1f. ¿Cuál es la cadena comercial con mayor variedad de productos monitoreados?**

In [None]:
spark.sql("SELECT cadenaComercial, COUNT(DISTINCT producto) AS num_prods FROM profeco GROUP BY cadenaComercial ORDER BY num_prods DESC").show()

+--------------------+---------+
|     cadenaComercial|num_prods|
+--------------------+---------+
|             SORIANA|     1059|
|            WAL-MART|     1051|
|MEGA COMERCIAL ME...|     1049|
|  COMERCIAL MEXICANA|     1036|
|            CHEDRAUI|     1026|
|     MERCADO SORIANA|     1024|
|      BODEGA AURRERA|     1012|
|HIPERMERCADO SORIANA|     1006|
|              H.E.B.|     1001|
|        SORIANA PLUS|      999|
|       SORIANA SUPER|      996|
|BODEGA COMERCIAL ...|      979|
|        I.S.S.S.T.E.|      937|
|            SUPERAMA|      936|
|              S MART|      851|
|SUPERMERCADOS SAN...|      849|
|              SUMESA|      848|
|         CITY MARKET|      844|
|FARMACIA GUADALAJARA|      819|
|            CASA LEY|      808|
+--------------------+---------+
only showing top 20 rows



Ganador:  Soriana.

**2a - Genera una canasta de productos básicos que te permita comparar los precios
geográfica y temporalmente. Justifica tu elección y procedimiento**

Podemos escoger la canasta básica establecida por el gobierno: maiz, frijol, azucar, harina, atún, etc.

Sin embargo, creo que puede ser interesante comparar esto con los productos más comprados en todo el pais. 

In [None]:
spark.sql("SELECT producto, COUNT(producto) AS num_prods FROM profeco GROUP BY producto ORDER BY num_prods DESC").show(10)

+--------------------+---------+
|            producto|num_prods|
+--------------------+---------+
|            REFRESCO|  1247981|
|   DETERGENTE P/ROPA|   990122|
|                 FUD|   933410|
|LECHE ULTRAPASTEU...|   886716|
|             SHAMPOO|   745467|
|    JABON DE TOCADOR|   744914|
|      CHILES EN LATA|   724862|
|            MAYONESA|   697586|
|             YOGHURT|   632362|
|         DESODORANTE|   623684|
+--------------------+---------+
only showing top 10 rows



Claramente la canasta básica del covierno no coincide en todo. La razón es que algunos de los productos de la canasta básica no se compran en supermercados. En nuestro caso, podemos tomar los primeros 5 productos más vendidos: Refresco, Detergente, Fud, Leche, Shampoo.

**¿Cuál es la ciudad más cara del país? ¿Cuál es la más barata?**

In [7]:
# Hacemos lo siguiente: hacemos un query con el precio promedio de los productos de nuestra canasta, por municipio
## Despues hacemos la suma de dichos promedios por municipio y ordenamos de mayor a menor

cara = spark.sql("SELECT DISTINCT municipio, SUM(precio_caro) OVER (PARTITION BY municipio) AS compra_cara FROM (SELECT DISTINCT municipio, producto, AVG(precio) OVER (PARTITION BY municipio, producto) AS precio_caro FROM profeco WHERE producto IN ('REFRESCO', 'DETERGENTE P/ROPA', 'FUD', 'LECHE ULTRAPASTEURIZADA', 'SHAMPOO') ORDER BY precio_caro DESC) ORDER BY compra_cara DESC")
cara.show()

+--------------------+------------------+
|           municipio|       compra_cara|
+--------------------+------------------+
|SAN PEDRO GARZA G...|217.39785649066005|
|        HUIXQUILUCAN| 216.8869899942785|
|     PACHUCA DE SOTO| 215.2043456909919|
|         TLAQUEPAQUE|  213.349432981075|
|          CUAUTITLÁN| 211.8357091652402|
|        AZCAPOTZALCO|210.04420565264616|
|        BOCA DEL RÍO| 208.6811400978044|
|             REYNOSA| 208.0587701920812|
|             URUAPAN|207.49227857892078|
|      ÁLVARO OBREGÓN|207.10170419030254|
|SAN NICOLÁS DE LO...|207.02418927644106|
|            COACALCO|205.97422111516224|
|             TLALPAN| 205.7586059002221|
|               LERMA|205.45531735989022|
|         SOLIDARIDAD|205.15808339685364|
|     SAN LUIS POTOSÍ|205.11077973728177|
|           NAUCALPAN|204.44441321458214|
|              LA PAZ|204.04773687834506|
|       BENITO JUÁREZ| 203.6132885391588|
|             ZAPOPAN|203.31718787938536|
+--------------------+------------

In [None]:
## El municipio más caro es San Pedro Garza Garcia

In [9]:
# Hacemos algo similar: hacemos un query con el precio promedio de los productos de nuestra canasta, por municipio
## Despues hacemos la suma de dichos promedios por municipio y ordenamos de menor a mayor 

barato = spark.sql("SELECT DISTINCT municipio, SUM(precio_barato) OVER (PARTITION BY municipio) AS compra_barata FROM (SELECT DISTINCT municipio, producto, AVG(precio) OVER (PARTITION BY municipio, producto) AS precio_barato FROM profeco WHERE producto IN ('REFRESCO', 'DETERGENTE P/ROPA', 'FUD', 'LECHE ULTRAPASTEURIZADA', 'SHAMPOO') ORDER BY precio_barato ASC) ORDER BY compra_barata ASC")
barato.show()

+--------------------+------------------+
|           municipio|     compra_barata|
+--------------------+------------------+
|TLAJOMULCO DE ZUÑ...|12.799999999999969|
|CUAJIMALPA DE MOR...|13.565953654188949|
|TLAHUAC          ...|13.679775280898877|
|MILPA ALTA       ...| 13.81283422459893|
|             TLÁHUAC|              15.0|
|          MILPA ALTA|              15.0|
|    GENERAL ESCOBEDO|              15.0|
|CUAJIMALPA DE MOR...|              15.0|
|GENERAL ESCOBEDO ...| 15.46583850931677|
|ATIZAPAN DE ZARAG...|           25.8975|
|ATIZAPÁN DE ZARAGOZA| 75.31979999999999|
|TLAQUEPAQUE      ...| 129.9642952325469|
|SANTA CRUZ XOXOCO...|158.79924516322956|
|GUASAVE          ...|159.57206585367652|
|ZACATELCO        ...|160.49334334824488|
|CHIAUTEMPAN      ...|160.99891856834864|
|TULTITLAN        ...|162.36614092956552|
|SAN PEDRO GARZA G...|163.74611607171408|
|HUAMANTLA        ...| 165.2192413225726|
|APODACA          ...| 165.5379930745749|
+--------------------+------------

**2d. ¿Cuál es el estado más caro y en qué mes?**

In [12]:
## Puedo calcular el estado más caro, reutilizando el query para municipios.

estado_caro = spark.sql("SELECT DISTINCT estado, SUM(precio_caro) OVER (PARTITION BY estado) AS compra_cara FROM (SELECT DISTINCT estado, producto, AVG(precio) OVER (PARTITION BY estado, producto) AS precio_caro FROM profeco WHERE producto IN ('REFRESCO', 'DETERGENTE P/ROPA', 'FUD', 'LECHE ULTRAPASTEURIZADA', 'SHAMPOO') ORDER BY precio_caro DESC) ORDER BY compra_cara DESC")
estado_caro.show()

+--------------------+------------------+
|              estado|       compra_cara|
+--------------------+------------------+
|     SAN LUIS POTOSÍ|202.71557213130023|
| BAJA CALIFORNIA SUR|200.38483590506513|
|          TAMAULIPAS|199.36316616219798|
|            GUERRERO|199.13012274323754|
|             NAYARIT|198.05648438773568|
|             MORELOS|196.62800199664963|
|             HIDALGO|196.13185267678838|
|            CAMPECHE|194.99959180966994|
|     BAJA CALIFORNIA| 194.9383263865298|
|        QUINTANA ROO|193.50680877742104|
|           CHIHUAHUA|192.68913390406604|
|COAHUILA DE ZARAGOZA|191.87903424202457|
|VERACRUZ DE IGNAC...|189.39515602898933|
|      AGUASCALIENTES|189.29881047063415|
|             DURANGO|189.05349423951793|
|             CHIAPAS|188.92518332641737|
|              SONORA|187.06202509086233|
|    DISTRITO FEDERAL| 184.3700186455659|
| COL. EDUARDO GUERRA|184.19481490138622|
|              MÉXICO| 183.5369834958451|
+--------------------+------------

Suena extraño: yo hubiera pensado que la ciudad más cara era CDMX, pero igual depende de la elección de productos en la canasta.

**2e - ¿Cuáles son los principales riesgos de hacer análisis de series de tiempo con
estos datos?**

Un problema importante siempre es que no hay registro de cierto días, así que hay que pensar en la mejor forma de hacer un fill para dichos casos. O grandes diferencias en ventas en distintos estados/ municipios/ tiendas para una fecha dada, lo cual implicaría diseñar una forma de normalizar datos. 

Por razones de tiempo y capacidad de cómputo, me detengo aquí. 