# Spark SQL with pySpark

Answer the following questions

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import Row, Window
from pyspark.sql.types import IntegerType

In [16]:
spark = (
    SparkSession
    .builder
    .master("local")
    .config("spark.sql.autoBroadcastJoinThreshold", -1)
    .config("spark.executor.memory", "3g")
    .appName("Exercise1")
    .getOrCreate()
)

In [17]:
# Read the source tables
products_table = spark.read.parquet("./data/products_parquet")
sales_table    = spark.read.parquet("./data/sales_parquet")
sellers_table  = spark.read.parquet("./data/sellers_parquet")

In [4]:
products_table.printSchema()
sales_table.printSchema()
sellers_table.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: string (nullable = true)

root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- num_pieces_sold: string (nullable = true)
 |-- bill_raw_text: string (nullable = true)

root
 |-- seller_id: string (nullable = true)
 |-- seller_name: string (nullable = true)
 |-- daily_target: string (nullable = true)



In [18]:
# Convertir las columnas numéricas a tipos de datos adecuados
sales_table = sales_table.withColumn("num_pieces_sold", col("num_pieces_sold").cast("int"))
products_table = products_table.withColumn("price", col("price").cast("float"))
sellers_table = sellers_table.withColumn("daily_target", col("daily_target").cast("float"))


# Data study

For each seller find the average performance as a % of their daily target amount

In [6]:
# Unir las tablas 
sales_with_price = sales_table.join(products_table, "product_id")

In [7]:
# total vendido por cada vendedor cada día
daily_sales = sales_with_price.withColumn("total_sale", col("num_pieces_sold") * col("price")) \
    .groupBy("seller_id", "date").agg(sum("total_sale").alias("daily_total_sale"))

In [9]:
# rendimiento diario en porcentaje 
daily_performance = daily_sales.join(sellers_table, "seller_id") \
    .withColumn("performance_percent", (col("daily_total_sale") / col("daily_target")) * 100)

In [10]:
# rendimiento promedio 
average_performance = daily_performance.groupBy("seller_id") \
    .agg(round(avg("performance_percent"),2).alias("average_performance_percent"))

average_performance.show()

+---------+---------------------------+
|seller_id|average_performance_percent|
+---------+---------------------------+
|        1|                     212.23|
|        2|                      72.97|
|        3|                     169.68|
|        4|                      35.75|
|        5|                      45.66|
|        6|                      52.91|
|        7|                       28.1|
|        8|                     102.76|
|        9|                      40.81|
+---------+---------------------------+



In [19]:
# Estilo Sol

(
    sales_table
    .join(sellers_table, on='seller_id', how='inner')
    .join(products_table, on='product_id', how='inner')
    .withColumn('total_sale', col('num_pieces_sold') * col('price'))
    .groupby(['seller_id'])
    .agg(
        sum("total_sale").alias("daily_total_sale"),
        first('daily_target').alias('daily_target')
    )
    .withColumn("performance_percent", (col("daily_total_sale") / col("daily_target")) * 10)
    .groupby('seller_id')    
    .agg(round(avg("performance_percent"),2).alias("average_performance_percent"))
    .orderBy('seller_id')
    .show()
)

+---------+---------------------------+
|seller_id|average_performance_percent|
+---------+---------------------------+
|        1|                     212.23|
|        2|                      72.97|
|        3|                     169.68|
|        4|                      35.75|
|        5|                      45.66|
|        6|                      52.91|
|        7|                       28.1|
|        8|                     102.76|
|        9|                      40.81|
+---------+---------------------------+



|seller_id|avg_daily_percentage|
|--------:|--------------------|
|        1|              212.22|
|        2|               72.99|
|        3|              169.68|
|        4|               35.75|
|        5|               45.66|
|        6|               52.91|
|        7|                28.1|
|        8|              102.76|
|        9|               40.81|

Find out how many sellers are not making their daily goal for every day.

In [11]:
# Unir las tablas 
sales_with_price = sales_table.join(products_table, "product_id")

# total vendido por cada vendedor cada día
daily_sales = sales_with_price.withColumn("total_sale", col("num_pieces_sold") * col("price")) \
    .groupBy("seller_id", "date").agg(round(sum("total_sale"), 2).alias("daily_total_sale"))

# Unir tabla de vendedores + daily_target y vendedor
daily_sales = daily_sales.join(sellers_table, "seller_id")

# Seleccionar las columnas necesarias
daily_sales = daily_sales.select("seller_name", "date", "daily_total_sale", "daily_target")

# Filtrar rendimiento diario es menor que el objetivo diario
underperforming_sellers = daily_sales.filter(col("daily_total_sale") < col("daily_target"))

# Mostrar 
underperforming_sellers.orderBy("seller_name", "date").show()


+-----------+----------+----------------+------------+
|seller_name|      date|daily_total_sale|daily_target|
+-----------+----------+----------------+------------+
|   seller_2|2020-07-01|        518119.0|    754188.0|
|   seller_2|2020-07-02|        512012.0|    754188.0|
|   seller_2|2020-07-03|        525180.0|    754188.0|
|   seller_2|2020-07-04|        617840.0|    754188.0|
|   seller_2|2020-07-05|        508774.0|    754188.0|
|   seller_2|2020-07-06|        589736.0|    754188.0|
|   seller_2|2020-07-07|        635401.0|    754188.0|
|   seller_2|2020-07-08|        604797.0|    754188.0|
|   seller_2|2020-07-09|        503370.0|    754188.0|
|   seller_2|2020-07-10|        487760.0|    754188.0|
|   seller_4|2020-07-01|        549595.0|   1532808.0|
|   seller_4|2020-07-02|        512200.0|   1532808.0|
|   seller_4|2020-07-03|        627174.0|   1532808.0|
|   seller_4|2020-07-04|        519878.0|   1532808.0|
|   seller_4|2020-07-05|        486912.0|   1532808.0|
|   seller

only showing top 20 rows:

|seller_name|      date|sellers_daily_profit|daily_target|
|----------:|----------|--------------------|------------|
|   seller_2|2020-07-01|            518119.0|      754188|
|   seller_2|2020-07-02|            512012.0|      754188|
|   seller_2|2020-07-03|            525180.0|      754188|
|   seller_2|2020-07-04|            617840.0|      754188|
|   seller_2|2020-07-05|            508774.0|      754188|
|   seller_2|2020-07-06|            589736.0|      754188|
|   seller_2|2020-07-07|            635401.0|      754188|
|   seller_2|2020-07-08|            604797.0|      754188|
|   seller_2|2020-07-09|            503370.0|      754188|
|   seller_2|2020-07-10|            487760.0|      754188|
|   seller_4|2020-07-01|            549595.0|     1532808|
|   seller_4|2020-07-02|            512200.0|     1532808|
|   seller_4|2020-07-03|            627174.0|     1532808|
|   seller_4|2020-07-04|            519878.0|     1532808|
|   seller_4|2020-07-05|            486912.0|     1532808|
|   seller_4|2020-07-06|            563439.0|     1532808|
|   seller_4|2020-07-07|            531202.0|     1532808|
|   seller_4|2020-07-08|            584587.0|     1532808|
|   seller_4|2020-07-09|            508752.0|     1532808|
|   seller_4|2020-07-10|            596303.0|     1532808|

In [26]:
# Estilo Sol
(
    sales_table
    .join(sellers_table, on='seller_id', how='inner')
    .join(products_table, on='product_id', how='inner')
    .withColumn('total', (col('num_pieces_sold')*col('price')))
    .groupby(['seller_id','date'])
    .agg(
        sum('total').alias('sellers_daily'),
        first('daily_target').alias('daily_target')
    )
    .filter(col("sellers_daily") < col("daily_target"))
    .orderBy(['seller_id','date'])
    
    .show()
)    


+---------+----------+-------------+------------+
|seller_id|      date|sellers_daily|daily_target|
+---------+----------+-------------+------------+
|        2|2020-07-01|     518119.0|    754188.0|
|        2|2020-07-02|     512012.0|    754188.0|
|        2|2020-07-03|     525180.0|    754188.0|
|        2|2020-07-04|     617840.0|    754188.0|
|        2|2020-07-05|     508774.0|    754188.0|
|        2|2020-07-06|     589736.0|    754188.0|
|        2|2020-07-07|     635401.0|    754188.0|
|        2|2020-07-08|     604797.0|    754188.0|
|        2|2020-07-09|     503370.0|    754188.0|
|        2|2020-07-10|     487760.0|    754188.0|
|        4|2020-07-01|     549595.0|   1532808.0|
|        4|2020-07-02|     512200.0|   1532808.0|
|        4|2020-07-03|     627174.0|   1532808.0|
|        4|2020-07-04|     519878.0|   1532808.0|
|        4|2020-07-05|     486912.0|   1532808.0|
|        4|2020-07-06|     563439.0|   1532808.0|
|        4|2020-07-07|     531202.0|   1532808.0|


What sellers have the biggest diffference between the day they performed the best and the day they performed the worst?

In [27]:
# Calcular el rendimiento máximo y mínimo para cada vendedor
performance_stats = daily_performance.groupBy("seller_id") \
    .agg(
        round(expr("min(performance_percent)"),2).alias("min_performance"),
        round(expr("max(performance_percent)"),2).alias("max_performance")
        
    )

In [28]:
# Calcular la diferencia entre el mejor y el peor día de rendimiento
performance_diff = performance_stats.withColumn("performance_diff", 
    round(col("max_performance") - col("min_performance"), 2))

In [29]:
# Encontrar los vendedores con la mayor diferencia
max_performance = performance_diff.orderBy(col("performance_diff").asc())

max_performance.show()

+---------+---------------+---------------+----------------+
|seller_id|min_performance|max_performance|performance_diff|
+---------+---------------+---------------+----------------+
|        7|          23.43|           31.3|            7.87|
|        4|          31.77|          40.92|            9.15|
|        6|          46.13|          57.49|           11.36|
|        9|          32.52|          46.83|           14.31|
|        5|          37.56|          52.02|           14.46|
|        2|          64.67|          84.25|           19.58|
|        8|          87.55|         114.94|           27.39|
|        1|         193.52|          227.6|           34.08|
|        3|         145.56|         197.62|           52.06|
+---------+---------------+---------------+----------------+



|seller_id|min_daily_percentage|max_daily_percentage|min_max_difference|
|--------:|--------------------|--------------------|------------------|
|        7|               23.43|                31.3|              7.87|
|        4|               31.77|               40.92|              9.15|
|        6|               46.13|               57.49|             11.36|
|        9|               32.52|               46.83|             14.31|
|        5|               37.56|               52.02|             14.46|
|        2|               64.67|               84.25|             19.58|
|        8|               87.55|              114.94|             27.39|
|        1|              193.52|               227.6|             34.08|
|        3|              145.56|              197.62|             52.06|

In [73]:
# Estilo Sol
(
    sales_table
    .join(sellers_table, on='seller_id', how='inner')
    .join(products_table, on='product_id', how='inner')
    .withColumn('total', (col('num_pieces_sold')*col('price')))
    .groupby(['seller_id','date'])
    
    .agg(
        sum("total").alias("daily_total_sale"),
        first('daily_target').alias('daily_target')
    )
    
    .withColumn("performance_percent", (col("daily_total_sale") / col("daily_target")) * 100)
    .groupby('seller_id')    
    .agg(
        round(expr("min(performance_percent)"),2).alias("min_performance"),
        round(expr("max(performance_percent)"),2).alias("max_performance"),
        round((max("performance_percent")-min("performance_percent")),2).alias("min_max_difference")
        
    )
    
    .show()
)    


+---------+---------------+---------------+------------------+
|seller_id|min_performance|max_performance|min_max_difference|
+---------+---------------+---------------+------------------+
|        7|          23.43|           31.3|              7.87|
|        3|         145.56|         197.62|             52.06|
|        8|          87.55|         114.94|             27.39|
|        5|          37.56|          52.02|             14.46|
|        6|          46.13|          57.49|             11.36|
|        9|          32.52|          46.83|             14.31|
|        1|         193.52|          227.6|             34.08|
|        4|          31.77|          40.92|              9.15|
|        2|          64.67|          84.25|             19.58|
+---------+---------------+---------------+------------------+



# Pivot tables

Let's imagine that there is a "product category", based on the first number of the product id. To create this column, you can use the functions [lit](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.lit.html#) and [concat](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.concat.html), and the property [substr](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.substr.html) of a column

Find out about whether or not it is more common to sell more units during the weekend, depending on product category. Use a [pivot table](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.GroupedData.pivot.html).

TIP: what metric should you use to know whether is it more common to sell during the weekend or not?

In [7]:


products_table = products_table.withColumn("product_category", concat(lit("category_"), col("product_id").substr(1, 1)))


sales_with_details = sales_table.join(products_table, "product_id")


sales_with_details = sales_with_details.withColumn("weekend", when(dayofweek(col("date")).isin(1, 7), True).otherwise(False))


category_sales = sales_with_details.groupBy("product_category", "weekend").agg(avg("num_pieces_sold").alias("total_pieces_sold"))


pivot_table = category_sales.groupBy("weekend").pivot("product_category").agg(round(sum("total_pieces_sold"),2))


pivot_table = pivot_table.orderBy("weekend")


pivot_table.show()


+-------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
|weekend|category_1|category_2|category_3|category_4|category_5|category_6|category_7|category_8|category_9|
+-------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
|  false|     51.87|     50.81|     51.01|      50.2|     50.49|     50.08|     49.82|     50.04|     50.42|
|   true|     52.11|     52.29|     49.08|     49.12|      47.4|     48.09|     49.21|     52.64|     54.09|
+-------+----------+----------+----------+----------+----------+----------+----------+----------+----------+



I'm not telling you the results! But it should look like this:

|weekend|category_1|category_2|category_3|category_4|category_5|category_6|category_7|category_8|category_9|
|------:|-----|-----|-----|-----|-----|-----|-----|-----|-----|
|   true|  -  |  -  |  -  |  -  |  -  |  -  |  -  |  -  |  -  |
|  false|  -  |  -  |  -  |  -  |  -  |  -  |  -  |  -  |  -  |

Go back to the exercise of "which sellers are not making their daily goal" and study the percentage of days they make it, depending on the day of week. Can you find out any interesting information?

In [15]:


sales_table = sales_table.withColumn("num_pieces_sold", col("num_pieces_sold").cast("int"))
products_table = products_table.withColumn("price", col("price").cast("float"))


sales_with_price = sales_table.join(products_table, "product_id")


daily_sales = sales_with_price.withColumn("total_sale", col("num_pieces_sold") * col("price")) \
    .groupBy("seller_id", "date").agg(round(sum("total_sale"), 2).alias("daily_total_sale"))


sales_with_target = daily_sales.join(sellers_table, "seller_id")


sales_with_target = sales_with_target.withColumn("met_target", col("daily_total_sale") >= col("daily_target").cast("float"))


sales_with_target = sales_with_target.withColumn("day_of_week", dayofweek(col("date")))


performance_by_day = sales_with_target.groupBy("seller_name", "day_of_week") \
    .agg((round(sum(when(col("met_target"), 1).otherwise(0)) / sum(lit(1)), 2) * 100).alias("percentage_met"))


pivot_table = performance_by_day.groupBy("seller_name").pivot("day_of_week").agg(first("percentage_met")).orderBy("seller_name")


pivot_table.show()


+-----------+-----+-----+-----+-----+-----+-----+-----+
|seller_name|    1|    2|    3|    4|    5|    6|    7|
+-----------+-----+-----+-----+-----+-----+-----+-----+
|   seller_1|100.0|100.0|100.0|100.0|100.0|100.0|100.0|
|   seller_2|  0.0|  0.0|  0.0|  0.0|  0.0|  0.0|  0.0|
|   seller_3|100.0|100.0|100.0|100.0|100.0|100.0|100.0|
|   seller_4|  0.0|  0.0|  0.0|  0.0|  0.0|  0.0|  0.0|
|   seller_5|  0.0|  0.0|  0.0|  0.0|  0.0|  0.0|  0.0|
|   seller_6|  0.0|  0.0|  0.0|  0.0|  0.0|  0.0|  0.0|
|   seller_7|  0.0|  0.0|  0.0|  0.0|  0.0|  0.0|  0.0|
|   seller_8|100.0|100.0|100.0|100.0|100.0|  0.0|  0.0|
|   seller_9|  0.0|  0.0|  0.0|  0.0|  0.0|  0.0|  0.0|
+-----------+-----+-----+-----+-----+-----+-----+-----+



Again, I'm not telling you the results.

|seller_name|    1|    2|    3|    4|    5|    6|    7|
|----------:|-----|-----|-----|-----|-----|-----|-----|
|   seller_1|  -  |  -  |  -  |  -  |  -  |  -  |  -  |
|   seller_2|  -  |  -  |  -  |  -  |  -  |  -  |  -  |
|   seller_3|  -  |  -  |  -  |  -  |  -  |  -  |  -  |
|   seller_4|  -  |  -  |  -  |  -  |  -  |  -  |  -  |
|   seller_5|  -  |  -  |  -  |  -  |  -  |  -  |  -  |
|   seller_6|  -  |  -  |  -  |  -  |  -  |  -  |  -  |
|   seller_7|  -  |  -  |  -  |  -  |  -  |  -  |  -  |
|   seller_8|  -  |  -  |  -  |  -  |  -  |  -  |  -  |
|   seller_9|  -  |  -  |  -  |  -  |  -  |  -  |  -  |


# Window functions

[Window functions](https://medium.com/@uzzaman.ahmed/pyspark-window-functions-a-comprehensive-guide-dc9bdad8c7ae) allow us to query subsets of the data without the need to do things like creating another group by table and joining the two.

See [the documentation](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/window.html) for everything you can do with window functions.

Which seller, and on which day, has made the most money compared to their previous day? Which one has made the least?

In [33]:

daily_sales = (
    sales_table
    .groupBy('seller_id')
    .agg(sum_col('num_pieces_sold').alias('daily_total_sale'))
)


sales_table_with_daily_total = (
    sales_table
    .join(daily_sales, on='seller_id', how='inner')
)


window = Window.orderBy('date')


result = (
    sales_table_with_daily_total
    .withColumn('prev_day_sales', lag(col('daily_total_sale')).over(window))
    .withColumn('profit_percent_of_prev_day', round((col('daily_total_sale') * 100) / col('prev_day_sales'), 1))
    .filter(col('profit_percent_of_prev_day').isNotNull())
    .drop('daily_total_sale', 'prev_day_sales','order_id','product_id','bill_raw_text')
)

In [34]:
min = result.orderBy(col('profit_percent_of_prev_day')).limit(1)
max = result.orderBy(col('profit_percent_of_prev_day').desc()).limit(1)
(
    min
    .union(max)
    .show()
)

+---------+----------+---------------+--------------------------+
|seller_id|      date|num_pieces_sold|profit_percent_of_prev_day|
+---------+----------+---------------+--------------------------+
|        1|2020-07-02|             65|                      98.9|
|        5|2020-07-01|             85|                     102.3|
+---------+----------+---------------+--------------------------+



|seller_name|      date|profit_percent_of_prev_day|
|----------:|----------|--------------------------|
|   seller_5|2020-07-09|                      77.0|
|   seller_9|2020-07-03|                     143.0|

Coming back to the "category" we set before, find out, for each category, which salesman is the best, the second best and the worst at making profit from it.

In [37]:
(
    sales_table
    .join(sellers_table, on='seller_id', how='inner')
    .join(products_table, on='product_id', how='inner')
    .withColumn('total', (col('num_pieces_sold')*col('price')))
    .withColumn('product_category', concat(lit('Category'), col('product_id').substr(1,1)))
    .groupby('product_category')
    .pivot('seller_name')    
    .agg(sum("total").alias("daily_total_sale"))
    .show()
)    

+----------------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
|product_category|seller_1|seller_2|seller_3|seller_4|seller_5|seller_6|seller_7|seller_8|seller_9|
+----------------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
|       Category2|556522.0|710244.0|536541.0|588812.0|645581.0|584572.0|633007.0|692742.0|608536.0|
|       Category7|581579.0|568232.0|652735.0|672607.0|636005.0|702608.0|648622.0|593523.0|512249.0|
|       Category8|640234.0|577545.0|501397.0|711303.0|687903.0|651014.0|540931.0|591161.0|782179.0|
|       Category9|376654.0|471396.0|369138.0|362044.0|421683.0|351043.0|419635.0|407165.0|395138.0|
|       Category1|695834.0|754037.0|650204.0|707027.0|650765.0|766770.0|654051.0|638202.0|695829.0|
|       Category5|597671.0|535015.0|575381.0|634780.0|595126.0|555131.0|580592.0|691991.0|591207.0|
|       Category6|604616.0|653195.0|684348.0|668271.0|518444.0|738343.0|578859.0|599273.0|550348.0|


In [39]:
per_category = (
      sales_table
    .join(sellers_table, on='seller_id', how='inner')
    .join(products_table, on='product_id', how='inner')
    .withColumn('total', (col('num_pieces_sold')*col('price')))
    .withColumn('product_category', concat(lit('Category'), col('product_id').substr(1,1)))
    .groupby(['product_category','seller_name'])
    .agg(sum("total").alias("daily_total_sale"))    
)

In [40]:
window = Window.partitionBy('product_category').orderBy(desc('daily_total_sale'))

In [41]:
best = (
    per_category
    .withColumn('rank', rank().over(window))
    .withColumn('best',(when(col('rank')==1,col('seller_name'))))
    .dropna()
    .select('product_category','best')
)

In [42]:
second_best = (
    per_category
    .withColumn('rank', rank().over(window))
    .withColumn('best',(when(col('rank')==2,col('seller_name'))))
    .dropna()
    .select('product_category','best')
)

In [43]:
worst = (
    per_category
    .withColumn('rank', rank().over(window))
    .withColumn('best',(when(col('rank')==9,col('seller_name'))))
    .dropna()
    .select('product_category','best')
)

In [44]:
(
    best
    .join(second_best, on='product_category')
    .join(worst, on='product_category')
    .show()
)

+----------------+--------+--------+--------+
|product_category|    best|    best|    best|
+----------------+--------+--------+--------+
|       Category1|seller_6|seller_2|seller_8|
|       Category2|seller_2|seller_8|seller_3|
|       Category3|seller_8|seller_1|seller_6|
|       Category4|seller_7|seller_1|seller_4|
|       Category5|seller_8|seller_4|seller_2|
|       Category6|seller_6|seller_3|seller_5|
|       Category7|seller_6|seller_4|seller_9|
|       Category8|seller_9|seller_4|seller_3|
|       Category9|seller_2|seller_5|seller_6|
+----------------+--------+--------+--------+



Tip: you can use a pivot table if you assign a value for each seller, according to their position in the category (look at the documentation for window functions!) and then rename using the property [withColumnRenamed](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withColumnRenamed.html).

|  category|    best|second_best|   worst|
|---------:|--------|-----------|--------|
|category_1|seller_6|   seller_2|seller_8|
|category_2|seller_2|   seller_8|seller_3|
|category_3|seller_8|   seller_1|seller_6|
|category_4|seller_7|   seller_1|seller_4|
|category_5|seller_8|   seller_4|seller_2|
|category_6|seller_6|   seller_3|seller_5|
|category_7|seller_6|   seller_4|seller_9|
|category_8|seller_9|   seller_4|seller_3|
|category_9|seller_2|   seller_5|seller_6|