In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [4]:
appName = "opi"
spark = SparkSession \
    .builder \
    .appName(appName) \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

dfSchema = StructType([
  StructField("producto", StringType(), False),
  StructField("presentacion", StringType(), False),
  StructField("marca", StringType(), False),
  StructField("categoria", StringType(), False),
  StructField("catalogo", StringType(), False),
  StructField("precio", FloatType(), False),
  StructField("fechaRegistro", StringType(), False),
  StructField("cadenaComercial", StringType(), False),
  StructField("giro", StringType(), False),
  StructField("nombreComercial", StringType(), False),
  StructField("direccion", StringType(), False),
  StructField("estado", StringType(), False),
  StructField("municipio", StringType(), False),
  StructField("latitud", FloatType(), False),
  StructField("longitud", FloatType(), False)
])

In [5]:
#import pandas as pd
#df = pd.read_csv('all_data.csv')
#df.to_parquet('output.parquet')

In [6]:
df = spark.read.csv('all_data.csv',schema=dfSchema, header=True)

In [None]:
df.write.mode('overwrite').parquet("all_data.parquet")

In [None]:
df.show(5)

Procesamiento de los datos

a. ¿Cuántos registros hay?

In [5]:
#Cuenta el número de filas de nuestra dataframe
num_rows = df.count()
print("número de registros: ", num_rows)

número de registros:  62530715


b. ¿Cuántas categorías?

In [7]:
lst_1=df.select('categoria').distinct().collect()

In [8]:
lst_1

[Row(categoria='MATERIAL ESCOLAR'),
 Row(categoria='ARTS. PARA EL CUIDADO PERSONAL'),
 Row(categoria='PESCADOS Y MARISCOS EN CONSERVA'),
 Row(categoria='UTENSILIOS DOMESTICOS'),
 Row(categoria='categoria'),
 Row(categoria='DETERGENTES Y PRODUCTOS SIMILARES'),
 Row(categoria='CARNE Y VISCERAS DE RES'),
 Row(categoria='PRODUCTOS DE TEMPORADA (NAVIDEÐOS)'),
 Row(categoria='GALLETAS PASTAS Y HARINAS DE TRIGO'),
 Row(categoria='HORTALIZAS FRESCAS'),
 Row(categoria=None),
 Row(categoria='DERIVADOS DE LECHE'),
 Row(categoria='TORTILLAS Y DERIVADOS DEL MAIZ'),
 Row(categoria='GRASAS ANIMALES COMESTIBLES'),
 Row(categoria='APARATOS ELECTRONICOS'),
 Row(categoria='LEGUMBRES SECAS'),
 Row(categoria='CAFE'),
 Row(categoria='MUEBLES DE COCINA'),
 Row(categoria='CARNES FRIAS SECAS Y EMBUTIDOS'),
 Row(categoria='CHOCOLATES Y GOLOSINAS'),
 Row(categoria='PAN'),
 Row(categoria='CERVEZA'),
 Row(categoria='MEDICAMENTOS'),
 Row(categoria='CONDIMENTOS'),
 Row(categoria='REFRESCOS ENVASADOS'),
 Row(categori

In [9]:
len(lst_1)

42

Hay 40 categorías porque None y categoria no son categorías

c. ¿Cuántas cadenas comerciales están siendo monitoreadas?

In [10]:
lst_2=df.select('cadenaComercial').distinct().collect()

In [12]:
lst_2

[Row(cadenaComercial='JULIO CEPEDA JUGUETERIAS'),
 Row(cadenaComercial='WOOLWORTH'),
 Row(cadenaComercial='MARISCOS LA SEPTIMA'),
 Row(cadenaComercial='PAPELERIA EL TINTERO'),
 Row(cadenaComercial='LIBRERIA DE SANCHO PANZA'),
 Row(cadenaComercial='PAPELERIA Y LIBRERIA SAN MIGUEL'),
 Row(cadenaComercial='PLAZA DEL LIBRO'),
 Row(cadenaComercial='MERCADO ALONSO FELIPE DE ANDRADE'),
 Row(cadenaComercial='MERCADO LA CRUZ'),
 Row(cadenaComercial='ZAPATERIA PAKAR'),
 Row(cadenaComercial='SIGUBA'),
 Row(cadenaComercial='MEGA TOYS'),
 Row(cadenaComercial='MARISCOS VILLA AZUL'),
 Row(cadenaComercial='BODEGAS VICTORIA'),
 Row(cadenaComercial='MINISUPER INALA'),
 Row(cadenaComercial='S MART'),
 Row(cadenaComercial='LIBRERIA ATHENAS'),
 Row(cadenaComercial='FARMACIAS M+'),
 Row(cadenaComercial='MUEBLERIA CREDILAND'),
 Row(cadenaComercial='LOYEP (UNIFORMES)'),
 Row(cadenaComercial='FARMACIAS SAN FRANCISCO DE ASIS'),
 Row(cadenaComercial='PESCADERIA TAMPIQUEÑOS'),
 Row(cadenaComercial='PESCADERIA GUA

In [13]:
len(lst_2) #cadenaComercial, None

706

704 cadenas comerciales están siendo monitoreadas porque cadenaComercial y None no son cadenas comerciales

d. ¿Cómo podrías determinar la calidad de los datos? ¿Detectaste algún tipo de
inconsistencia o error en la fuente?

In [49]:
df.printSchema()

root
 |-- producto: string (nullable = true)
 |-- presentacion: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- catalogo: string (nullable = true)
 |-- precio: float (nullable = true)
 |-- fechaRegistro: string (nullable = true)
 |-- cadenaComercial: string (nullable = true)
 |-- giro: string (nullable = true)
 |-- nombreComercial: string (nullable = true)
 |-- direccion: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- latitud: float (nullable = true)
 |-- longitud: float (nullable = true)



In [47]:
from pyspark.sql.functions import isnan, when, count, col

df.select([count(when(col(c).isNull(), c)).alias(c) for c in 
           df.columns]).toPandas().T

Unnamed: 0,0
producto,0
presentacion,0
marca,0
categoria,887338
catalogo,228
precio,20
fechaRegistro,0
cadenaComercial,1184
giro,287
nombreComercial,0


In [54]:
df = df.filter((df.estado.isNotNull()) | (df.cadenaComercial.isNotNull()))

In [55]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in 
           df.columns]).toPandas().T

Unnamed: 0,0
producto,0
presentacion,0
marca,0
categoria,887338
catalogo,228
precio,20
fechaRegistro,0
cadenaComercial,0
giro,287
nombreComercial,0


Hay registros en la columna de categoría que no son categorías al igual que en la de cadena comercial. Hay nullos en 9 de las 15 columnas.

e. ¿Cuáles son los productos más monitoreados en cada entidad?

In [56]:
lst_3 = df.groupBy("estado").count().orderBy("count").select("estado").collect()

In [57]:
lst_3

[Row(estado='estado'),
 Row(estado=' ESQ. SUR 125"'),
 Row(estado=' COL. EDUARDO GUERRA'),
 Row(estado='MORELOS'),
 Row(estado='NAYARIT'),
 Row(estado='GUERRERO'),
 Row(estado='CHIAPAS'),
 Row(estado='SAN LUIS POTOSÍ'),
 Row(estado='DURANGO'),
 Row(estado='CAMPECHE'),
 Row(estado='AGUASCALIENTES'),
 Row(estado='VERACRUZ DE IGNACIO DE LA LLAVE'),
 Row(estado='CHIHUAHUA'),
 Row(estado='BAJA CALIFORNIA SUR'),
 Row(estado='HIDALGO'),
 Row(estado='OAXACA'),
 Row(estado='COLIMA'),
 Row(estado='TAMAULIPAS'),
 Row(estado='BAJA CALIFORNIA'),
 Row(estado='ZACATECAS'),
 Row(estado='COAHUILA DE ZARAGOZA'),
 Row(estado='QUERÉTARO'),
 Row(estado='SONORA'),
 Row(estado='SINALOA'),
 Row(estado='TABASCO'),
 Row(estado='PUEBLA'),
 Row(estado='QUINTANA ROO'),
 Row(estado='TLAXCALA'),
 Row(estado='MICHOACÁN DE OCAMPO'),
 Row(estado='YUCATÁN'),
 Row(estado='GUANAJUATO'),
 Row(estado='NUEVO LEÓN'),
 Row(estado='JALISCO'),
 Row(estado='MÉXICO'),
 Row(estado='DISTRITO FEDERAL')]

In [58]:
lst_3=lst_3[3:]

In [59]:
lst_3

[Row(estado='MORELOS'),
 Row(estado='NAYARIT'),
 Row(estado='GUERRERO'),
 Row(estado='CHIAPAS'),
 Row(estado='SAN LUIS POTOSÍ'),
 Row(estado='DURANGO'),
 Row(estado='CAMPECHE'),
 Row(estado='AGUASCALIENTES'),
 Row(estado='VERACRUZ DE IGNACIO DE LA LLAVE'),
 Row(estado='CHIHUAHUA'),
 Row(estado='BAJA CALIFORNIA SUR'),
 Row(estado='HIDALGO'),
 Row(estado='OAXACA'),
 Row(estado='COLIMA'),
 Row(estado='TAMAULIPAS'),
 Row(estado='BAJA CALIFORNIA'),
 Row(estado='ZACATECAS'),
 Row(estado='COAHUILA DE ZARAGOZA'),
 Row(estado='QUERÉTARO'),
 Row(estado='SONORA'),
 Row(estado='SINALOA'),
 Row(estado='TABASCO'),
 Row(estado='PUEBLA'),
 Row(estado='QUINTANA ROO'),
 Row(estado='TLAXCALA'),
 Row(estado='MICHOACÁN DE OCAMPO'),
 Row(estado='YUCATÁN'),
 Row(estado='GUANAJUATO'),
 Row(estado='NUEVO LEÓN'),
 Row(estado='JALISCO'),
 Row(estado='MÉXICO'),
 Row(estado='DISTRITO FEDERAL')]

In [60]:
lst_4=[]
for i in range(len(lst_3)):
    lst_4.append(str(lst_3[i]).split("'")[1])

In [61]:
len(lst_4)

32

In [35]:
df_corregida = df.filter(col('estado').isin(lst_4))

In [44]:
print(df.count(),df_corregida.count()

62530715 62501145


In [39]:
lst_5=df_corregida.groupBy("estado").count().orderBy("count").select("estado").collect()

In [40]:
len(lst_5)

32

In [41]:
df4=df_corregida.groupBy("estado","producto").count().sort('count',ascending=False)

In [43]:
for i in range(len(lst_5)):
    df4.filter(df4.estado == str(lst_5[i]).split("'")[1]).show(3)

+-------+--------+-----+
| estado|producto|count|
+-------+--------+-----+
|MORELOS|REFRESCO| 9632|
|MORELOS|     FUD| 6909|
+-------+--------+-----+
only showing top 2 rows

+-------+---------+-----+
| estado| producto|count|
+-------+---------+-----+
|NAYARIT| REFRESCO| 8003|
|NAYARIT|PANTALLAS| 7083|
+-------+---------+-----+
only showing top 2 rows

+--------+--------+-----+
|  estado|producto|count|
+--------+--------+-----+
|GUERRERO|REFRESCO| 8932|
|GUERRERO|     FUD| 8846|
+--------+--------+-----+
only showing top 2 rows

+-------+--------+-----+
| estado|producto|count|
+-------+--------+-----+
|CHIAPAS|REFRESCO|14452|
|CHIAPAS|     FUD|10417|
+-------+--------+-----+
only showing top 2 rows

+---------------+---------+-----+
|         estado| producto|count|
+---------------+---------+-----+
|SAN LUIS POTOSÍ|      FUD|10164|
|SAN LUIS POTOSÍ|PANTALLAS| 8662|
+---------------+---------+-----+
only showing top 2 rows

+-------+-----------------+-----+
| estado|         product

En la tabla se observa los productos más monitoreados en cada entidad. Por ejemplo, en el DISTRITO FEDERAL el refresco y fud son los más monitoreados.

f. ¿Cuál es la cadena comercial con mayor variedad de productos monitoreados?

In [45]:
df5=df_corregida.groupBy("cadenaComercial").agg(countDistinct("producto")).sort("count(producto)",ascending=False)

In [46]:
df5.show()

+--------------------+---------------+
|     cadenaComercial|count(producto)|
+--------------------+---------------+
|             SORIANA|           1059|
|            WAL-MART|           1051|
|MEGA COMERCIAL ME...|           1049|
|  COMERCIAL MEXICANA|           1036|
|            CHEDRAUI|           1026|
|     MERCADO SORIANA|           1024|
|      BODEGA AURRERA|           1012|
|HIPERMERCADO SORIANA|           1006|
|              H.E.B.|           1001|
|        SORIANA PLUS|            999|
|       SORIANA SUPER|            996|
|BODEGA COMERCIAL ...|            979|
|        I.S.S.S.T.E.|            937|
|            SUPERAMA|            936|
|              S MART|            851|
|SUPERMERCADOS SAN...|            849|
|              SUMESA|            848|
|         CITY MARKET|            844|
|FARMACIA GUADALAJARA|            819|
|            CASA LEY|            808|
+--------------------+---------------+
only showing top 20 rows



La cadena comercial con mayor variedad de productos monitoreados es SORIANA