In [26]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import rank, col
from pyspark.sql.window import Window
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Opi") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [2]:
df = spark.read.csv("all_data.csv",header=True)

In [3]:
df.show()

+--------------------+--------------------+--------------------+--------------------+-----------------+------+--------------------+------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------+----------+
|            producto|        presentacion|               marca|           categoria|         catalogo|precio|       fechaRegistro|   cadenaComercial|                giro|     nombreComercial|           direccion|          estado|           municipio| latitud|  longitud|
+--------------------+--------------------+--------------------+--------------------+-----------------+------+--------------------+------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------+----------+
|CUADERNO FORMA IT...|96 HOJAS PASTA DU...|            ESTRELLA|    MATERIAL ESCOLAR| UTILES ESCOLARES|  25.9|2011-05-18 00:00:...|ABASTECEDORA LUMEN|          PAPELERIAS|ABASTECEDORA 

In [4]:
df.printSchema()

root
 |-- producto: string (nullable = true)
 |-- presentacion: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- catalogo: string (nullable = true)
 |-- precio: string (nullable = true)
 |-- fechaRegistro: string (nullable = true)
 |-- cadenaComercial: string (nullable = true)
 |-- giro: string (nullable = true)
 |-- nombreComercial: string (nullable = true)
 |-- direccion: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- latitud: string (nullable = true)
 |-- longitud: string (nullable = true)



In [5]:
df.count()

62530715

In [7]:
df.select("categoria").distinct().show()

+--------------------+
|           categoria|
+--------------------+
|    MATERIAL ESCOLAR|
|ARTS. PARA EL CUI...|
|PESCADOS Y MARISC...|
|UTENSILIOS DOMEST...|
|           categoria|
|DETERGENTES Y PRO...|
|CARNE Y VISCERAS ...|
|PRODUCTOS DE TEMP...|
|GALLETAS PASTAS Y...|
|  HORTALIZAS FRESCAS|
|                null|
|  DERIVADOS DE LECHE|
|TORTILLAS Y DERIV...|
|GRASAS ANIMALES C...|
|APARATOS ELECTRON...|
|     LEGUMBRES SECAS|
|                CAFE|
|   MUEBLES DE COCINA|
|CARNES FRIAS SECA...|
|CHOCOLATES Y GOLO...|
+--------------------+
only showing top 20 rows



In [8]:
df.select("categoria").distinct().count()

42

In [10]:
df.select("cadenaComercial").distinct().count()

706

In [18]:
conteo_productos_estado = df.groupBy("estado","producto").count()

In [19]:
conteo_productos_estado.show()

+--------------------+--------------------+-----+
|              estado|            producto|count|
+--------------------+--------------------+-----+
|              MÉXICO|TINTE PARA EL CAB...|44007|
|              MÉXICO|         TELEVISORES|29702|
|              MÉXICO|              ACELGA| 7691|
|              MÉXICO|       QUESO. COTIJA| 4414|
|    DISTRITO FEDERAL|              AZUCAR|18078|
|              MÉXICO|      DESENFRIOL-ITO|  642|
|             JALISCO|               ARROZ|11735|
|              OAXACA|PEDIALYTE. ELECTR...|  302|
|            TLAXCALA|        AGUA SIN GAS|14505|
|VERACRUZ DE IGNAC...|              TOMATE|  652|
| MICHOACÁN DE OCAMPO|         PAN DE CAJA|13003|
|             YUCATÁN|       FLAGENASE 400|  313|
| MICHOACÁN DE OCAMPO|              ECTIVA|   39|
|             YUCATÁN|        SALSA CATSUP| 6549|
|             YUCATÁN|            CLAVULIN|  183|
|             YUCATÁN|            CAPOTENA|  271|
|             JALISCO|       FLAGENASE 400|  699|


In [29]:
window = Window.partitionBy(conteo_productos_estado['estado']).orderBy(conteo_productos_estado['count'].desc())
conteo_productos_estado.select('*', rank().over(window).alias('rank')).filter(col('rank') <= 2).show() 

+--------------------+--------------------+-----+----+
|              estado|            producto|count|rank|
+--------------------+--------------------+-----+----+
|        QUINTANA ROO|                 FUD|34846|   1|
|        QUINTANA ROO|            REFRESCO|34367|   2|
|          NUEVO LEÓN|   DETERGENTE P/ROPA|50307|   1|
|          NUEVO LEÓN|            REFRESCO|49592|   2|
|             SINALOA|            REFRESCO|33115|   1|
|             SINALOA|   DETERGENTE P/ROPA|27177|   2|
|             TABASCO|            REFRESCO|28754|   1|
|             TABASCO|   DETERGENTE P/ROPA|26431|   2|
|     BAJA CALIFORNIA|            REFRESCO|37243|   1|
|     BAJA CALIFORNIA|   DETERGENTE P/ROPA|23395|   2|
|            TLAXCALA|            REFRESCO|43904|   1|
|            TLAXCALA|   DETERGENTE P/ROPA|41398|   2|
|COAHUILA DE ZARAGOZA|                 FUD|28613|   1|
|COAHUILA DE ZARAGOZA|            REFRESCO|26889|   2|
|                null|LECHE ULTRAPASTEU...|  804|   1|
|         

In [47]:
df.createOrReplaceTempView("all_data")

In [67]:
sqlDF = spark.sql("SELECT cadenaComercial, producto, COUNT(*) AS numberProduct FROM all_data GROUP BY cadenaComercial, producto ORDER BY numberProduct desc")
sqlDF.show()

+--------------------+--------------------+-------------+
|     cadenaComercial|            producto|numberProduct|
+--------------------+--------------------+-------------+
|TORTILLERIAS TRAD...|    TORTILLA DE MAIZ|       206950|
|            WAL-MART|            REFRESCO|       182066|
|      BODEGA AURRERA|            REFRESCO|       173538|
|      BODEGA AURRERA|                 FUD|       136876|
|            WAL-MART|   DETERGENTE P/ROPA|       134237|
|            WAL-MART|                 FUD|       129023|
|             SORIANA|            REFRESCO|       128758|
|             SORIANA|                 FUD|       120610|
|            WAL-MART|LECHE ULTRAPASTEU...|       118766|
|             SORIANA|   DETERGENTE P/ROPA|       116610|
|      BODEGA AURRERA|LECHE ULTRAPASTEU...|       115742|
|            WAL-MART|    JABON DE TOCADOR|       107971|
|            WAL-MART|             YOGHURT|       104072|
|            WAL-MART|             CERVEZA|       102961|
|            W

In [77]:
sqlDF.createOrReplaceTempView("sqlDF")
cadna_conMasProductos = spark.sql('SELECT cadenaComercial, COUNT(DISTINCT(producto)) AS numberProducts FROM sqlDF GROUP BY cadenaComercial ORDER BY numberProducts desc')
cadna_conMasProductos.show()

+--------------------+--------------+
|     cadenaComercial|numberProducts|
+--------------------+--------------+
|             SORIANA|          1059|
|            WAL-MART|          1051|
|MEGA COMERCIAL ME...|          1049|
|  COMERCIAL MEXICANA|          1036|
|            CHEDRAUI|          1026|
|     MERCADO SORIANA|          1024|
|      BODEGA AURRERA|          1012|
|HIPERMERCADO SORIANA|          1006|
|              H.E.B.|          1001|
|        SORIANA PLUS|           999|
|       SORIANA SUPER|           996|
|BODEGA COMERCIAL ...|           979|
|        I.S.S.S.T.E.|           937|
|            SUPERAMA|           936|
|              S MART|           851|
|SUPERMERCADOS SAN...|           849|
|              SUMESA|           848|
|         CITY MARKET|           844|
|FARMACIA GUADALAJARA|           819|
|            CASA LEY|           808|
+--------------------+--------------+
only showing top 20 rows

