In [13]:
import warnings
import os   
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, year, month, countDistinct, avg, stddev, max, min, expr, desc
warnings.filterwarnings("ignore")   

In [15]:
spark = SparkSession.builder \
    .appName("Spark SQL agg") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [5]:
df = spark.read.csv('data.csv', header=True, inferSchema=True)
df= df.sample(fraction=0.20, seed=42) ## solo para pruebas hay que quitar
df.createOrReplaceTempView("data")

                                                                                

In [7]:
## solo para dar un vistazo 
spark.sql("""SELECT * FROM data LIMIT 5""").show()

+--------------------+--------------------+-----------+--------------------+------------------+------+-------------+---------------+--------------------+---------------+--------------------+--------------+--------------+---------+-----------+
|            PRODUCTO|        PRESENTACION|      MARCA|           CATEGORIA|          CATALOGO|PRECIO|FECHAREGISTRO|CADENACOMERCIAL|                GIRO|NOMBRECOMERCIAL|           DIRECCION|        ESTADO|     MUNICIPIO|  LATITUD|   LONGITUD|
+--------------------+--------------------+-----------+--------------------+------------------+------+-------------+---------------+--------------------+---------------+--------------------+--------------+--------------+---------+-----------+
|              ACELGA|              MANOJO|        S/M|  HORTALIZAS FRESCAS|FRUTAS Y LEGUMBRES|   4.9|   2015-03-02| BODEGA AURRERA|SUPERMERCADO / TI...| BODEGA AURRERA|CONVENCION 1914 P...|AGUASCALIENTES|AGUASCALIENTES|21.886051|-102.312284|
|        AGUA SIN GAS|     B

In [9]:
df_agg = df.withColumn(
    "canasta_basica",
    when(
        (col("PRODUCTO") == "ACEITE") & (col("PRESENTACION") == "BOTELLA 946 ML. MIXTO"), 1
    ).when(
        (col("PRODUCTO") == "ARROZ") & (col("PRESENTACION") == "BOLSA 1 KG. SUPER EXTRA"), 1
    ).when(
        (col("PRODUCTO") == "ATUN") & (col("PRESENTACION") == "LATA 140 GR. EN HOJUELAS EN AGUA"), 1
    ).when(
        (col("PRODUCTO") == "AZUCAR") & (col("PRESENTACION") == "BOLSA PLASTICO 2 KG. REFINADA"), 1
    ).when(
        (col("PRODUCTO") == "CARNE RES") & (col("PRESENTACION") == "1 KG. GRANEL. FALDA O PARA DESHEBRAR"), 1
    ).when(
        (col("PRODUCTO") == "CEBOLLA") & (col("PRESENTACION") == "1 KG. BLANCA SIN RABO"), 1
    ).when(
        (col("PRODUCTO") == "CHILE FRESCO") & (col("PRESENTACION") == "1 KG. JALAPEÑO O CUARESMEÑO"), 1
    ).when(
        (col("PRODUCTO") == "CARNE CERDO") & (col("PRESENTACION") == "1 KG. GRANEL. MILANESA O MILANESA DE PIERNA"), 1
    ).when(
        (col("PRODUCTO") == "FRIJOL") & (col("PRESENTACION") == "BOLSA 1 KG. NEGRO"), 1
    ).when(
        (col("PRODUCTO") == "HUEVO") & (col("PRESENTACION") == "PAQUETE CON 12 BLANCO"), 1
    ).when(
        (col("PRODUCTO") == "JABON DE TOCADOR") & (col("PRESENTACION") == "BARRA 180 GR."), 1
    ).when(
        (col("PRODUCTO") == "JITOMATE") & (col("PRESENTACION") == "1 KG. SALADETTE/ HUAJE O TOMATE SALADETTE/ HUAJE"), 1
    ).when(
        (col("PRODUCTO") == "LECHE PASTEURIZADA") & (col("PRESENTACION") == "CAJA 1 LT."), 1
    ).when(
        (col("PRODUCTO") == "LIMON") & (col("PRESENTACION") == "1 KG. AGRIO CON SEMILLA O LIMON COLIMA"), 1
    ).when(
        (col("PRODUCTO") == "MANZANA") & (col("PRESENTACION") == "1 KG. GOLDEN"), 1
    ).when(
        (col("PRODUCTO") == "PIÑA") & (col("PRESENTACION") == "1 KG. O PIÑA ESMERALDA O PIÑA CAYENA"), 1
    ).when(
        (col("PRODUCTO") == "PAN DE CAJA") & (col("PRESENTACION") == "PAQUETE GRANDE 680 GR. REBANADO BLANCO"), 1
    ).when(
        (col("PRODUCTO") == "PAPA") & (col("PRESENTACION") == "1 KG. ALFA/BLANCA"), 1
    ).when(
        (col("PRODUCTO") == "PAPEL HIGIENICO") & (col("PRESENTACION") == "PAQUETE 12 ROLLOS. 200 HOJAS DOBLES"), 1
    ).when(
        (col("PRODUCTO") == "PASTA PARA SOPA") & (col("PRESENTACION") == "PAQUETE 200 GR. SPAGHETTI"), 1
    ).when(
        (col("PRODUCTO") == "CARNE POLLO") & (col("PRESENTACION") == "1 KG. GRANEL. PECHUGA CON HUESO O ANATÓMICA O SIN RABADILLA (CON PIEL)"), 1
    ).when(
        (col("PRODUCTO") == "SARDINA") & (col("PRESENTACION") == "LATA 425 GR. EN TOMATE"), 1
    ).when(
        (col("PRODUCTO") == "TORTILLA DE MAIZ") & (col("PRESENTACION") == "1 KG. GRANEL"), 1
    ).when(
        (col("PRODUCTO") == "ZANAHORIA") & (col("PRESENTACION") == "1 KG. MEDIANA"), 1
    ).otherwise(0)
)

In [10]:
df_agg = df_agg.filter((col("canasta_basica") == 1) & (col("PRECIO").between(5, 1000)))

In [11]:
df_agg = df_agg.groupBy(
    col("FECHAREGISTRO").alias("fecha"),
    year(col("FECHAREGISTRO")).alias("fecha_year"),
    month(col("FECHAREGISTRO")).alias("fecha_month"),
    col("PRODUCTO").alias("producto"),
    col("PRESENTACION").alias("descripcion")
).agg(
    countDistinct("MARCA").alias("count_marca"),
    countDistinct("NOMBRECOMERCIAL").alias("count_tiendas"),
    countDistinct("ESTADO").alias("count_estados"),
    countDistinct("MUNICIPIO").alias("count_municipios"),
    avg("PRECIO").alias("precio_avg"),
    stddev("PRECIO").alias("precio_desv_std"),
    expr("percentile_approx(PRECIO, 0.5)").alias("precio_median"),
    max("PRECIO").alias("precio_max"),
    min("PRECIO").alias("precio_min")
)

In [12]:
df_agg.show()

[Stage 7:>                                                        (0 + 12) / 12]

+----------+----------+-----------+------------------+--------------------+-----------+-------------+-------------+----------------+------------------+-------------------+-------------+----------+----------+
|     fecha|fecha_year|fecha_month|          producto|         descripcion|count_marca|count_tiendas|count_estados|count_municipios|        precio_avg|    precio_desv_std|precio_median|precio_max|precio_min|
+----------+----------+-----------+------------------+--------------------+-----------+-------------+-------------+----------------+------------------+-------------------+-------------+----------+----------+
|2015-01-29|      2015|          1|  JABON DE TOCADOR|       BARRA 180 GR.|          3|           32|           17|              23|10.301764705882354|  1.035135683064274|         10.4|     12.55|       8.5|
|2015-02-16|      2015|          2|  TORTILLA DE MAIZ|        1 KG. GRANEL|          1|           55|           19|              30|11.583432835820894| 2.24466947842891

                                                                                

In [14]:
value_counts = df_agg.groupBy("producto").count()
value_counts_sorted = value_counts.orderBy(desc("count"))
value_counts_sorted.show(25)



+------------------+-----+
|          producto|count|
+------------------+-----+
|              ATUN|  246|
|  JABON DE TOCADOR|  246|
|           SARDINA|  246|
|       PAN DE CAJA|  246|
|             HUEVO|  246|
|           CEBOLLA|  245|
|             LIMON|  245|
|LECHE PASTEURIZADA|  245|
|   PAPEL HIGIENICO|  245|
|       CARNE CERDO|  245|
|       CARNE POLLO|  245|
|      CHILE FRESCO|  245|
|              PAPA|  245|
|  TORTILLA DE MAIZ|  245|
|          JITOMATE|  245|
|         ZANAHORIA|  245|
|              PIÑA|  245|
|         CARNE RES|  245|
|             ARROZ|  245|
|           MANZANA|  245|
|            AZUCAR|  245|
|            FRIJOL|  244|
|   PASTA PARA SOPA|  193|
|            ACEITE|   90|
+------------------+-----+



                                                                                

In [16]:
spark.stop()