In [1]:
from pyspark.sql import SparkSession
import pyspark.sql
import time

In [2]:
import pandas as pd
import numpy as np
import sys
pd.set_option('display.max_columns', 500)
np.set_printoptions(threshold=sys.maxsize)
pd.options.display.max_rows = 999
pd.options.mode.chained_assignment = None

In [3]:
import pyspark.sql.functions as func
from pyspark.sql.window import Window
from pyspark.sql.functions import countDistinct

In [4]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [13]:
df = spark.read.csv("/home/bebeto/Escritorio/OPI/Examen DS/data/all_data.csv", header=True)

In [14]:
df.printSchema()

root
 |-- producto: string (nullable = true)
 |-- presentacion: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- catalogo: string (nullable = true)
 |-- precio: string (nullable = true)
 |-- fechaRegistro: string (nullable = true)
 |-- cadenaComercial: string (nullable = true)
 |-- giro: string (nullable = true)
 |-- nombreComercial: string (nullable = true)
 |-- direccion: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- latitud: string (nullable = true)
 |-- longitud: string (nullable = true)



# 1 Procesamiento de los datos

In [5]:
#1. ¿Cuántos registros hay?
print('Numero de registros: {}'.format(df.count()))
# 62530715

In [16]:
#2. ¿Cuántas categorías?
start_time = time.time()

gr = df.select(df['categoria']).distinct().count()
print('Numero de categorias: {}'.format(gr))

print("--- segundos %s ---" % (time.time() - start_time))

Numero de categorias: 42
--- segundos 433.9514808654785 ---


In [17]:
#3. ¿Cuántas cadenas comerciales están siendo monitoreadas (y, por lo tanto, reportadas en esa base de datos)?
start_time = time.time()

gr = df.select(df['nombreComercial']).distinct().count()
print('Numero de cadenas comerciales: {}'.format(gr))

print("--- segundos %s ---" % (time.time() - start_time))

Numero de cadenas comerciales: 3895
--- segundos 419.03385519981384 ---


In [163]:
#4 ¿Cómo podrías determinar la calidad de los datos? 

#- a) Primero que nada que la fuente de donde se obtiene la información es confiable
#- b) Verificar que no haya valores perdidos (missing values)
#- c) Verificar que el tipo de dato de las columnas sea consistente con el nombre (ejemplo edad, que entre los valores no existan valores con punto flotante o cadenas de caracteres)
#- d) Si hay columnas que contengan nombres (personas, empresas) o direcciones, verificar que no se tengan duplicados, si el caso lo requiere.
#- e) Columnas numericas verificar existencia de outliers
#- f) Inconsistencia de codificacion en valores binarios ( H-hombre, M-mujer ) y que existan valores no propios con la columna a menos que se coloque un valor para los missing values (ej. L)

#  ¿Detectaste algún tipo de inconsistencia o error en la fuente?

# Si, hay missing values, en algunos casos solo esta el campo vacio, en otros aparece la cadena NA

In [13]:
#5. ¿Cuáles son los productos más monitoreados en cada estado de la república?
start_time = time.time()

gr = df.groupBy("estado", "producto").agg(func.count("producto").alias("num_productos"))
maxWindowSpec = Window().partitionBy("estado")
maxColumn = func.max("num_productos").over(maxWindowSpec)
dfWithMax = gr.withColumn("max_num_productos",maxColumn)
result = dfWithMax.filter("max_num_productos - num_productos == 0").orderBy('max_num_productos', ascending=False).drop("max_num_productos","num_productos")
result.show()

print("--- segundos %s ---" % (time.time() - start_time))

+--------------------+--------------------+-------------+-----------------+
|              estado|            producto|num_productos|max_num_productos|
+--------------------+--------------------+-------------+-----------------+
|    DISTRITO FEDERAL|            REFRESCO|       287463|           287463|
|              MÉXICO|            REFRESCO|       194939|           194939|
|             JALISCO|            REFRESCO|        81718|            81718|
|          NUEVO LEÓN|   DETERGENTE P/ROPA|        50307|            50307|
|          GUANAJUATO|            REFRESCO|        49441|            49441|
|            TLAXCALA|            REFRESCO|        43904|            43904|
| MICHOACÁN DE OCAMPO|   DETERGENTE P/ROPA|        40144|            40144|
|     BAJA CALIFORNIA|            REFRESCO|        37243|            37243|
|             YUCATÁN|LECHE ULTRAPASTEU...|        35991|            35991|
|        QUINTANA ROO|                 FUD|        34846|            34846|
|           

In [8]:
#6. ¿Cuál es la cadena comercial con mayor variedad de productos monitoreados?
start_time = time.time()
gr = df.select("nombreComercial", "producto").groupBy("nombreComercial").agg(countDistinct("producto").alias("num_productos_distintos")).orderBy('num_productos_distintos', ascending=False)
gr.show(1)
print("--- segundos %s ---" % (time.time() - start_time))

+---------------+-----------------------+
|nombreComercial|num_productos_distintos|
+---------------+-----------------------+
|        SORIANA|                   1000|
+---------------+-----------------------+
only showing top 1 row

--- segundos 486.1200981140137 ---


# 3. Visualización
#### a. Genera un mapa que nos permita identificar la oferta de categorías en la zona metropolitana de León Guanajuato y el nivel de precios en cada una de ellas. Se darán puntos extra si el mapa es interactivo

In [159]:
import pandas as pd
import geopandas as gpd
import numpy as np
from geopandas.tools import sjoin
import folium
from folium.plugins import MarkerCluster
from folium import IFrame
import shapely
from shapely.geometry import Point
import branca

In [6]:
tiendas_df = spark.read.csv("./data/all_data.csv", header=True)

In [10]:
df_guanajuato = tiendas_df.filter((func.col("estado") == 'GUANAJUATO'))

In [12]:
df_guanajuato.count()

2638456

In [31]:
#solo veremos una fraccion para no saturar
frac = df_guanajuato.sample(withReplacement=False, fraction=0.00009)

In [32]:
frac.count()

192

In [33]:
df_pd_guanajuato = frac.toPandas()

In [46]:
df_pd_guanajuato = df_pd_guanajuato[(df_pd_guanajuato['latitud'] != 'NA') & (df_pd_guanajuato['longitud'] != 'NA')]

In [48]:
df_pd_guanajuato['latitud'] = df_pd_guanajuato['latitud'].astype(float)
df_pd_guanajuato['longitud'] = df_pd_guanajuato['longitud'].astype(float)

In [49]:
#Se crea una GeoSerie de las locaciones convertidas a Shapely geometry objects, colocamos el crs a 4326
tiendas_geo = gpd.GeoSeries(df_pd_guanajuato.apply(lambda z: Point( z['longitud'], z['latitud']), 1),crs={'init': 'epsg:4326'})

#Creacion de un Geodataframe
tiendas = gpd.GeoDataFrame(df_pd_guanajuato.drop(['longitud', 'latitud'], 1), geometry=tiendas_geo)
tiendas.head()

               producto                                       presentacion  \
0         DVD / BLU RAY  DVD-C350. 1 CD; DOLBY DIGITAL; DTS DIGITAL OUT...   
1                TEMPRA              CAJA CON 10 TABLETAS DE 500 MG ADULTO   
2      LECHE CONDENSADA                         BOTELLA 450 GR. (PLASTICO)   
3  COMPONENTES DE AUDIO  CMT-EH15. CD'S 1 MP3. CASETERAS 1 MECANICA. PO...   
4           TELEVISORES  50 PT 350. COLOR 50 PLGS. PANTALLA PLASMA. C/ ...   

        marca              categoria           catalogo precio  \
0     SAMSUNG  APARATOS ELECTRONICOS  ELECTRODOMESTICOS    578   
1   SIN MARCA           MEDICAMENTOS       MEDICAMENTOS   31.4   
2  LA LECHERA        LECHE PROCESADA            BASICOS     28   
3        SONY  APARATOS ELECTRONICOS  ELECTRODOMESTICOS   1623   
4          LG  APARATOS ELECTRONICOS  ELECTRODOMESTICOS  10995   

             fechaRegistro            cadenaComercial  \
0  2011-11-10 00:00:00.000    MEGA COMERCIAL MEXICANA   
1  2011-02-01 00:00:

In [140]:
def popupHTML(lista):
    html="""
    <p>Tienda: {} </p>
    <p>Producto: {} </p>
    <p>Marca: {} </p>
    <b><p>Precio: $ {} </p></b>
    """.format(lista[0], lista[1], lista[2], lista[3])
    return html

In [153]:
def anadir_puntos_cluster(mapobj, gdf, lista_popups):
    
    coords, popups = [], [] 
    
    for i, row in gdf.iterrows():
        
        coords.append([row.geometry.y, row.geometry.x])
        
        label = popupHTML([row[lista_popups[0]],row[lista_popups[1]], row[lista_popups[2]], row[lista_popups[3]]])
        
        iframe = branca.element.IFrame(html=label, width=500, height=150)
        popup  = folium.Popup(iframe, max_width=500)
        popups.append(popup)
        
    tiendas_layer = folium.FeatureGroup(name = 'tiendas_layer')

    tiendas_layer.add_child(MarkerCluster(locations = coords, popups = popups))
    
    mapobj.add_child(tiendas_layer)
    
    return mapobj

In [154]:
mapa_tiendas = folium.Map([21.09516, -101.62957 ], zoom_start = 12)
mapa_tiendas = anadir_puntos_cluster(mapa_tiendas, tiendas, ['cadenaComercial','producto', 'marca', 'precio'])

In [160]:
folium.LayerControl().add_to(mapa_tiendas)
mapa_tiendas.save('mapa_tiendas.html')
mapa_tiendas