# B1 QQP

Primero obtuvimos los datos del portal de Profeco.

In [41]:
# import os
# import wget
# import zipfile

# if not os.path.exists('qqp.zip') or not os.path.exists('qqp.csv'):
#     wget.download(
#        'https://drive.google.com/uc?export=download&id=0B-4W2dww7ELNazFfOFVhNG5vckE',
#         'qqp.zip'
#     )

# if not os.path.exists('qqp.csv'):
#     zipfile.ZipFile('qqp.zip').extractall()
#     os.rename('all_data.csv','qqp.csv') 

El archivo `csv` es enorme (~19gb), por lo que lo vamos a convertir a un formato más trabajable. Para ello, primero quisieramos ver qué contiene.

In [27]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

In [3]:
csv_file = 'qqp.csv'
parquet_file = 'pyspark_qqp/qqp.parquet'
chunksize = 100_000

csv_stream = pd.read_csv(csv_file, sep=',', chunksize=chunksize, low_memory=False)

In [4]:
display(
    csv_stream.get_chunk(1)
)

Unnamed: 0,producto,presentacion,marca,categoria,catalogo,precio,fechaRegistro,cadenaComercial,giro,nombreComercial,direccion,estado,municipio,latitud,longitud
0,CUADERNO FORMA ITALIANA,96 HOJAS PASTA DURA. CUADRICULA CHICA,ESTRELLA,MATERIAL ESCOLAR,UTILES ESCOLARES,25.9,2011-05-18 00:00:00.000,ABASTECEDORA LUMEN,PAPELERIAS,ABASTECEDORA LUMEN SUCURSAL VILLA COAPA,CANNES No. 6 ESQ. CANAL DE MIRAMONTES,DISTRITO FEDERAL,TLALPAN,19.29699,-99.125417


In [5]:
csv_stream.get_chunk(1).dtypes

producto            object
presentacion        object
marca               object
categoria           object
catalogo            object
precio             float64
fechaRegistro       object
cadenaComercial     object
giro                object
nombreComercial     object
direccion           object
estado              object
municipio           object
latitud            float64
longitud           float64
dtype: object

Lo vamos a pasar a `parquet`, pero con los tipos correctos.

In [6]:
for i, chunk in enumerate(csv_stream):
    tmp_chunk = chunk.copy()
    tmp_chunk.fechaRegistro = pd.to_datetime(tmp_chunk.fechaRegistro, errors='coerce')
    tmp_chunk.precio = pd.to_numeric(tmp_chunk.precio, errors='coerce')
    tmp_chunk.latitud = pd.to_numeric(tmp_chunk.latitud, errors='coerce')
    tmp_chunk.longitud = pd.to_numeric(tmp_chunk.longitud, errors='coerce')
    print('\rChunk {}'.format(i+1), end="")
    if i == 0:
        parquet_schema = pa.Table.from_pandas(df=tmp_chunk).schema
        parquet_writer = pq.ParquetWriter(parquet_file, parquet_schema, compression='snappy')
    table = pa.Table.from_pandas(tmp_chunk, schema=parquet_schema)
    parquet_writer.write_table(table)

print('\nFinished writing parquet file')
parquet_writer.close()

Chunk 626
Finished writing parquet file


Importamos `pyspark`.

In [1]:
import findspark
findspark.init('/home/radioabava/spark-3.0.0-bin-hadoop3.2/')

In [2]:
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlContext = SQLContext(sc)
df = sqlContext.read.load("pyspark_qqp/qqp.parquet")

### ¿Cuántos registros hay?

In [3]:
print((df.count(), len(df.columns)))

(62530713, 15)


Observamos que hay más de 62 millones de registros.

### ¿Cuántas categorías?

In [4]:
df.select('categoria').distinct().count()

42

Hay 42 categorías de productos.

### ¿Cuántas cadenas comerciales están siendo monitoreadas?

In [5]:
df.select('cadenaComercial').distinct().count()

706

706 cadenas están siendo monitoreadas.

### ¿Cómo podrías determinar la calidad de los datos?

Lo primero que podemos observar es cuántos registros inválidos existen.

In [6]:
{ col: df.filter(df[col].isNull()).count() for col in df.columns }

{'producto': 0,
 'presentacion': 0,
 'marca': 0,
 'categoria': 887338,
 'catalogo': 228,
 'precio': 20,
 'fechaRegistro': 20,
 'cadenaComercial': 1184,
 'giro': 287,
 'nombreComercial': 0,
 'direccion': 0,
 'estado': 15054,
 'municipio': 15054,
 'latitud': 1152743,
 'longitud': 1152743}

Lo que más falta son latitudes y longitudes. Posteriormente las categorías. Una primera idea para completar los faltantes sería tomar la categoría del mismo producto en otro registro. De igual manera, por medio de la dirección, podríamos completar algunos datos de latitudes y longitudes. Sin embargo, para este ejercicio y dado que los datos faltantes representan menos del 3% del total, simplemente los eliminaremos.

In [3]:
df = df.na.drop()

In [8]:
print((df.count(), len(df.columns)))

(60502738, 15)


Ahora eliminamos los duplicados.

In [4]:
df = df.dropDuplicates()

In [5]:
df.count()

58834819

Como se puede observar en el código de traducción del `csv` en `parquet`, hubo que forzar la conversión en todas las variables cuantitativas. Esto porque se detectaron algunas inconsistencias en ciertos registros. En particular, algunos precios, en vez de traer el monto, traían la leyenda `precio`. Al forzar la conversión, automáticamente los volvimos `nan` en el archivo `parquet`. Lo mismo ocurrió con `longitud`, `latitud` y `fechaRegistro`.

Para los outliers, podríamos usar la regla del [rango intercuartil](https://en.wikipedia.org/wiki/Interquartile_range). En nuestro caso, por simplicidad vamos a despreciar todo lo que esté más allá de 3 desviaciones estándar de la media.

In [6]:
import pyspark.sql.functions as F

df_stats = df.select(
    F.mean(F.col('precio')).alias('precio_mean'),
    F.stddev(F.col('precio')).alias('precio_std'),
    F.min(F.col('precio')).alias('precio_min'),
    F.max(F.col('precio')).alias('precio_max'),
    F.mean(F.col('latitud')).alias('latitud_mean'),
    F.stddev(F.col('latitud')).alias('latitud_std'),
    F.min(F.col('latitud')).alias('latitud_min'),
    F.max(F.col('latitud')).alias('latitud_max'),
    F.mean(F.col('longitud')).alias('longitud_mean'),
    F.stddev(F.col('longitud')).alias('longitud_std'),
    F.min(F.col('longitud')).alias('longitud_min'),
    F.max(F.col('longitud')).alias('longitud_max'),
).collect()[0]

In [7]:
display(df_stats)

Row(precio_mean=521.0713249850005, precio_std=2001.017809523212, precio_min=0.1, precio_max=299999.0, latitud_mean=23.814449363067286, latitud_std=270.1015986824846, latitud_min=0.0, latitud_max=28745.542, longitud_mean=-100.05063097301948, longitud_std=10.955574335990459, longitud_min=-1061.61231, longitud_max=115.47893700000002)

In [8]:
filtered_df = df.filter(df['precio'] <= df_stats['precio_mean'] + 3 * df_stats['precio_std']) \
                .filter(df['latitud'] <= df_stats['latitud_mean'] + 3 * df_stats['latitud_std']) \
                .filter(df['latitud'] >= df_stats['latitud_mean'] - 3 * df_stats['latitud_std']) \
                .filter(df['longitud'] <= df_stats['longitud_mean'] + 3 * df_stats['longitud_std']) \
                .filter(df['longitud'] >= df_stats['longitud_mean'] - 3 * df_stats['longitud_std'])

In [54]:
display(filtered_df.count())

57452976

Hemos eliminado duplicados, faltantes atípicos y algunos que identificamos como mal capturados.

### ¿Cuáles son los productos más monitoreados en cada entidad?

In [9]:
prod_estado_df = filtered_df[['estado', 'producto']].groupby('estado','producto').count()

In [10]:
from pyspark.sql.window import Window

window = Window.partitionBy(
    prod_estado_df['estado']
).orderBy(
    prod_estado_df['count'].desc()
)

top_prod_estado_df = prod_estado_df.select(
    '*',
    F.rank().over(window).alias('posicion')
).filter(F.col('posicion') <= 2) \
 .orderBy(F.col('estado'), F.col('count').desc())

In [11]:
top_prod_estado_df.show(64)

+--------------------+--------------------+------+--------+
|              estado|            producto| count|posicion|
+--------------------+--------------------+------+--------+
|      AGUASCALIENTES|                 FUD| 10018|       1|
|      AGUASCALIENTES|            REFRESCO|  8073|       2|
|     BAJA CALIFORNIA|            REFRESCO| 35360|       1|
|     BAJA CALIFORNIA|   DETERGENTE P/ROPA| 21017|       2|
| BAJA CALIFORNIA SUR|            REFRESCO| 26290|       1|
| BAJA CALIFORNIA SUR|                 FUD| 16181|       2|
|            CAMPECHE|                 FUD| 12011|       1|
|            CAMPECHE|            REFRESCO| 10639|       2|
|             CHIAPAS|            REFRESCO| 13776|       1|
|             CHIAPAS|                 FUD|  9726|       2|
|           CHIHUAHUA|   DETERGENTE P/ROPA| 14333|       1|
|           CHIHUAHUA|      CHILES EN LATA| 13224|       2|
|COAHUILA DE ZARAGOZA|                 FUD| 26720|       1|
|COAHUILA DE ZARAGOZA|            REFRES

### ¿Cuál es la cadena comercial con mayor variedad de productos monitoreados?

In [12]:
categorias_cadena_df = filtered_df[
    ['cadenaComercial', 'producto']
].groupby('cadenaComercial') \
 .agg(F.countDistinct('producto'))

In [15]:
categorias_cadena_df.orderBy(F.col('count(producto)').desc()).show()

+--------------------+---------------+
|     cadenaComercial|count(producto)|
+--------------------+---------------+
|             SORIANA|           1046|
|            WAL-MART|           1038|
|MEGA COMERCIAL ME...|           1036|
|  COMERCIAL MEXICANA|           1023|
|            CHEDRAUI|           1013|
|     MERCADO SORIANA|           1010|
|      BODEGA AURRERA|            999|
|HIPERMERCADO SORIANA|            993|
|              H.E.B.|            988|
|        SORIANA PLUS|            986|
|       SORIANA SUPER|            983|
|BODEGA COMERCIAL ...|            963|
|        I.S.S.S.T.E.|            925|
|            SUPERAMA|            923|
|              S MART|            837|
|         CITY MARKET|            834|
|              SUMESA|            834|
|SUPERMERCADOS SAN...|            828|
|FARMACIA GUADALAJARA|            816|
|           SUPER CHE|            793|
+--------------------+---------------+
only showing top 20 rows

