In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
from pandas import DataFrame
import numpy as np
import statsmodels.api as sm
import matplotlib.pyplot as plt

In [3]:
from pyspark.sql import SparkSession
spark= SparkSession.builder.appName('Data_Analysis').getOrCreate()

In [4]:
df=spark.read.csv('all_data.csv')

In [5]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)



In [6]:
df=spark.read.format('csv').options(header='true').load('all_data.csv')

In [7]:
df.printSchema()

root
 |-- producto: string (nullable = true)
 |-- presentacion: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- catalogo: string (nullable = true)
 |-- precio: string (nullable = true)
 |-- fechaRegistro: string (nullable = true)
 |-- cadenaComercial: string (nullable = true)
 |-- giro: string (nullable = true)
 |-- nombreComercial: string (nullable = true)
 |-- direccion: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- latitud: string (nullable = true)
 |-- longitud: string (nullable = true)



In [8]:
df.show(10)

+--------------------+--------------------+------------------+----------------+----------------+------+--------------------+------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------+----------+
|            producto|        presentacion|             marca|       categoria|        catalogo|precio|       fechaRegistro|   cadenaComercial|                giro|     nombreComercial|           direccion|          estado|           municipio| latitud|  longitud|
+--------------------+--------------------+------------------+----------------+----------------+------+--------------------+------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------+----------+
|CUADERNO FORMA IT...|96 HOJAS PASTA DU...|          ESTRELLA|MATERIAL ESCOLAR|UTILES ESCOLARES|  25.9|2011-05-18 00:00:...|ABASTECEDORA LUMEN|          PAPELERIAS|ABASTECEDORA LUME...|CANNES No. 6 ESQ....

## Procesamiento de los datos
### ¿Cuantos registros hay?

In [9]:
df.cache()
print('Numero de registros', df.count())

Numero de registros 62530715


## ¿Cuantas categorias?

In [10]:
from pyspark.sql.functions import col, countDistinct
import pyspark.sql.functions as fn
df.agg(countDistinct(col("categoria")).alias('count')).show()

+-----+
|count|
+-----+
|   41|
+-----+



## ¿Cuantas cadenas comerciales estan siendo monitoreadas?

In [11]:
df.agg(countDistinct(col("cadenaComercial")).alias('count')).show()

+-----+
|count|
+-----+
|  705|
+-----+



## ¿Como podrias determinar la calidad de los datos?¿Detectaste ualgun tipo de inconsistencia o error en la fuente?

Podria usar la libreria PySpark check aunque aun no es familiarizado con ella. Para poder detectar missing values, inconsistencias, etc.

## ¿Cuales son los productos mas monitoreados en cada entidad?

In [21]:
df.createOrReplaceTempView('productos')

sqldf=spark.sql('select \
                    estado,\
                    MAX(Pcount) as Mascontado\
                    from (select\
                            productos.estado,\
                            productos.producto,\
                            count(productos.producto) as Pcount\
                            from productos\
                            group by productos.estado,productos.producto\
                         )group by estado')


In [22]:
sqldf.show(10)

+--------------------+----------+
|              estado|Mascontado|
+--------------------+----------+
|        QUINTANA ROO|     34846|
|          NUEVO LEÓN|     50307|
|             SINALOA|     33115|
|             TABASCO|     28754|
|     BAJA CALIFORNIA|     37243|
|            TLAXCALA|     43904|
|COAHUILA DE ZARAGOZA|     28613|
|                null|       804|
|       ESQ. SUR 125"|       130|
| COL. EDUARDO GUERRA|       275|
+--------------------+----------+
only showing top 10 rows



## ¿Cual es la cadena comercial con mayor variedad de productos monitoreados?


In [14]:
sqldf1=spark.sql('select cadenacomercial,\
                            SUM(Pcount) as conteoproducto\
                            from(\
                                 select productos.cadenacomercial,\
                                 productos.producto,\
                                 count(productos.producto) as Pcount\
                                 from productos group by productos.cadenacomercial,productos.producto\
                                 )group by cadenacomercial order by SUM(Pcount) DESC' )

In [17]:
 sqldf1.show(10)

+--------------------+--------------+
|     cadenacomercial|conteoproducto|
+--------------------+--------------+
|            WAL-MART|       8643133|
|      BODEGA AURRERA|       6765453|
|             SORIANA|       6546211|
|MEGA COMERCIAL ME...|       4899509|
|            CHEDRAUI|       4221625|
|  COMERCIAL MEXICANA|       2598903|
|            SUPERAMA|       2590459|
|HIPERMERCADO SORIANA|       1706956|
|     MERCADO SORIANA|       1533080|
|       SORIANA SUPER|       1389901|
+--------------------+--------------+
only showing top 10 rows



# Analisis exploratorio de los datos

## genera una canasta de productos basicos que te permita comparar precios geografica y temporalmente

In [15]:
sqldf2=spark.sql('select \
                    productos.producto,\
                    productos.presentacion,\
                    productos.marca,\
                    productos.categoria,\
                    productos.precio,\
                    productos.fechaRegistro,\
                    productos.cadenaComercial,\
                    productos.estado,\
                    productos.municipio,\
                    productos.latitud,\
                    productos.longitud\
                    from productos where\
                    productos.producto IN (\
                    "PASTA","HUEVO","LECHE","QUESO","POLLO")'
                )


In [18]:
sqldf2.show(3)

+--------+--------------------+--------+---------+------+--------------------+------------------+----------------+--------------------+---------+----------+
|producto|        presentacion|   marca|categoria|precio|       fechaRegistro|   cadenaComercial|          estado|           municipio|  latitud|  longitud|
+--------+--------------------+--------+---------+------+--------------------+------------------+----------------+--------------------+---------+----------+
|   HUEVO| PAQUETE CON 12 ROJO|SAN JUAN|    HUEVO| 17.57|2011-01-10 00:00:...|COMERCIAL MEXICANA|          MÉXICO|ATIZAPAN         ...|       NA|        NA|
|   HUEVO|PAQUETE CON 12 BL...|SAN JUAN|    HUEVO| 17.57|2011-01-10 00:00:...|COMERCIAL MEXICANA|          MÉXICO|ATIZAPAN         ...|       NA|        NA|
|   HUEVO|PAQUETE CON 12 BL...| BACHOCO|    HUEVO|  18.2|2011-01-10 00:00:...|COMERCIAL MEXICANA|DISTRITO FEDERAL|BENITO JUAREZ    ...|19.366903|-99.181344|
+--------+--------------------+--------+---------+------+-

In [25]:
sqldf2.createOrReplaceTempView('canasta')

ciudad=spark.sql('select canasta.municipio,\
                        SUM(canasta.precio) as CostoCanasta\
                        from\
                        canasta\
                        group by canasta.municipio,canasta.precio\
                        order by CostoCanasta DESC')

In [26]:
ciudad.show(5)

+--------------------+------------------+
|           municipio|      CostoCanasta|
+--------------------+------------------+
|IZTAPALAPA       ...|           17964.6|
|ECATEPEC         ...|16183.300000000003|
|GUADALAJARA      ...|           14809.0|
|GUADALAJARA      ...|           13516.0|
|ECATEPEC         ...|12917.700000000003|
+--------------------+------------------+
only showing top 5 rows

