Grouping and various aggregate functions
========================================

 Wecan categorise the aggregate functions into 3 types as mentioned below.
 
 
 ● Simple Function
 ● Grouping Function
 ● Windowsfunction
 
 Also there are three types to write the functions in Apache Spark.
 ● Programmatic Style
 ● ColumnExpression Style
 ● Spark Sql Style
 
 Simple Function & Grouping Function:
 This function is used to get a single output out of the entire dataframe. Mostly the
 output will be the final metrics and no further transformation is required
 

In [48]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.shuffle.useOldFetchProtocol','true'). \
config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
enableHiveSupport(). \
master('yarn'). \
getOrCreate()

In [49]:
### Aggregate functions on sales data

df = spark.read.format("csv").option("header","True").option("inferSchema",True) \
.load("/public/trendytech/groceries.csv")

In [50]:
df.show()

+--------+---------+--------+----------+--------+
|order_id| location|    item|order_date|quantity|
+--------+---------+--------+----------+--------+
|      o1|  Seattle| Bananas|01/01/2017|       7|
|      o2|     Kent|  Apples|02/01/2017|      20|
|      o3| Bellevue| Flowers|02/01/2017|      10|
|      o4|  Redmond|    Meat|03/01/2017|      40|
|      o5|  Seattle|Potatoes|04/01/2017|       9|
|      o6| Bellevue|   Bread|04/01/2017|       5|
|      o7|  Redmond|   Bread|05/01/2017|       5|
|      o8| Issaquah|   Onion|05/01/2017|       4|
|      o9|  Redmond|  Cheese|05/01/2017|      15|
|     o10| Issaquah|   Onion|06/01/2017|       4|
|     o11|   Renton|   Bread|05/01/2017|       5|
|     o12| Issaquah|   Onion|07/01/2017|       4|
|     o13|Sammamish|   Bread|07/01/2017|       5|
|     o14| Issaquah|  Tomato|07/01/2017|       6|
|     o15| Issaquah|    Meat|08/01/2017|       3|
|     o16| Issaquah|    Meat|09/01/2017|       5|
|     o17| Issaquah|    Meat|10/01/2017|       6|


use all of the three types of the Simple function such as Count, Count Distinct, Sum, Average and Grouping Aggregation

In [51]:
from pyspark.sql.functions import * 
from pyspark.sql import *

In [52]:
## Simple aggregation using progrmmatic style

sap_df = df.select(count("*").alias("row_count"),countDistinct("item").alias("UniqueItems"), \
              sum("quantity").alias("total quantities"), avg("quantity").alias("Avg quantity"))
sap_df.show()

+---------+-----------+----------------+------------+
|row_count|UniqueItems|total quantities|Avg quantity|
+---------+-----------+----------------+------------+
|       21|          9|             273|        13.0|
+---------+-----------+----------------+------------+



In [53]:
## grouping function progrmmatic style

sum_df = df.groupBy("location","item").agg(sum("quantity").alias("Total Quantity"))
sum_df.show()

+---------+--------+--------------+
| location|    item|Total Quantity|
+---------+--------+--------------+
|     Kent|  Apples|            20|
|Sammamish|   Bread|             5|
| Issaquah|    Meat|            14|
|   Renton|   Bread|             5|
|  Seattle| Bananas|             7|
|  Seattle|Potatoes|             9|
|  Redmond|  Cheese|            15|
| Issaquah|   Onion|            12|
|  Redmond|   Bread|             5|
|  Redmond|    Meat|            40|
| Bellevue| Flowers|            10|
| Issaquah|  Tomato|             6|
| Bellevue|   Bread|           125|
+---------+--------+--------------+



 By using this approach we can call the built in functions like Count, Count Distinct,
 Sum, Average and Agg and write the code in a programmatic way

 Column Expression Style:

In [54]:
col_df = df.selectExpr("count(*) as row_number", "count(Distinct(item)) as unique_items", \
                      "sum(quantity) as total_quantity", "avg(quantity) as avg_quantity")
col_df.show()

+----------+------------+--------------+------------+
|row_number|unique_items|total_quantity|avg_quantity|
+----------+------------+--------------+------------+
|        21|           9|           273|        13.0|
+----------+------------+--------------+------------+



In [55]:
## Grouping function column expression style

In [56]:
sum_df1 = df.groupBy("location","item").agg(expr("sum(quantity) as total_quantity"))
sum_df1.show()

+---------+--------+--------------+
| location|    item|total_quantity|
+---------+--------+--------------+
|     Kent|  Apples|            20|
|Sammamish|   Bread|             5|
| Issaquah|    Meat|            14|
|   Renton|   Bread|             5|
|  Seattle| Bananas|             7|
|  Seattle|Potatoes|             9|
|  Redmond|  Cheese|            15|
| Issaquah|   Onion|            12|
|  Redmond|   Bread|             5|
|  Redmond|    Meat|            40|
| Bellevue| Flowers|            10|
| Issaquah|  Tomato|             6|
| Bellevue|   Bread|           125|
+---------+--------+--------------+



Spark sql style

In [57]:
df.createOrReplaceTempView("groceries")

In [58]:
spark.sql(""" select count(*) as row_count, count(distinct(item)) as unique_items, 
sum(quantity) as total_quantity, avg(quantity) as avg_quantity from groceries""").show()

+---------+------------+--------------+------------+
|row_count|unique_items|total_quantity|avg_quantity|
+---------+------------+--------------+------------+
|       21|           9|           273|        13.0|
+---------+------------+--------------+------------+



In [59]:
## grouping function in spark sql style

spark.sql(""" select location, item, sum(quantity) as total_quantity
from groceries group by location, item""").show()

+---------+--------+--------------+
| location|    item|total_quantity|
+---------+--------+--------------+
|     Kent|  Apples|            20|
|Sammamish|   Bread|             5|
| Issaquah|    Meat|            14|
|   Renton|   Bread|             5|
|  Seattle| Bananas|             7|
|  Seattle|Potatoes|             9|
|  Redmond|  Cheese|            15|
| Issaquah|   Onion|            12|
|  Redmond|   Bread|             5|
|  Redmond|    Meat|            40|
| Bellevue| Flowers|            10|
| Issaquah|  Tomato|             6|
| Bellevue|   Bread|           125|
+---------+--------+--------------+



Windows Function:
 This type of function is little advanced from the above function. Running Total, Rank,
 Dense Rank, Lead & Lag are some examples of the windows function.
 To use the window function we need to first set the window in which the aggregation
 should work.
 Window properties :
 1. Partition by =>This will help to partition the data by a specified columns
 2. Order By => We can use any Measure to order the partitioned column either
 Ascending or Descending.
 3. Window Size => Specifying the size of the window is very important here, as
 this will play a crucial role to get out function to work as per the requirement.
 ● Current Row => This property will call the measure from the current
 row
 ● Unbounded Preceding => This property will call the measure from the
 first row
 ● Unbounded Following => This property will call the measure from the
 last row
 ● Likewise we can use-1 or 1 to change the direction of the row in which
 the function needs to select the row value

In [60]:
## Running total
window_spec = Window.partitionBy("location").orderBy("item") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

sum_df2 = df.withColumn("running_total", sum("quantity").over(window_spec))
sum_df2.show()

+--------+---------+--------+----------+--------+-------------+
|order_id| location|    item|order_date|quantity|running_total|
+--------+---------+--------+----------+--------+-------------+
|     o15| Issaquah|    Meat|08/01/2017|       3|            3|
|     o16| Issaquah|    Meat|09/01/2017|       5|            8|
|     o17| Issaquah|    Meat|10/01/2017|       6|           14|
|      o8| Issaquah|   Onion|05/01/2017|       4|           18|
|     o10| Issaquah|   Onion|06/01/2017|       4|           22|
|     o12| Issaquah|   Onion|07/01/2017|       4|           26|
|     o14| Issaquah|  Tomato|07/01/2017|       6|           32|
|     o13|Sammamish|   Bread|07/01/2017|       5|            5|
|      o7|  Redmond|   Bread|05/01/2017|       5|            5|
|      o9|  Redmond|  Cheese|05/01/2017|      15|           20|
|      o4|  Redmond|    Meat|03/01/2017|      40|           60|
|      o1|  Seattle| Bananas|01/01/2017|       7|            7|
|      o5|  Seattle|Potatoes|04/01/2017|

We are using the same grocery data to see the running total of the quantity partitioned
by location, and ordered by Item. Window size is called from the current row to the
previous row.
Wecan also use the same Running total in Spark SQL style.


In [61]:
## running total on spark sql

spark.sql("""
    SELECT 
        location, 
        item, 
        order_date, 
        quantity,
        SUM(quantity) OVER(PARTITION BY location ORDER BY item, order_date) AS running_total
    FROM groceries
""").show()

+---------+--------+----------+--------+-------------+
| location|    item|order_date|quantity|running_total|
+---------+--------+----------+--------+-------------+
| Issaquah|    Meat|08/01/2017|       3|            3|
| Issaquah|    Meat|09/01/2017|       5|            8|
| Issaquah|    Meat|10/01/2017|       6|           14|
| Issaquah|   Onion|05/01/2017|       4|           18|
| Issaquah|   Onion|06/01/2017|       4|           22|
| Issaquah|   Onion|07/01/2017|       4|           26|
| Issaquah|  Tomato|07/01/2017|       6|           32|
|Sammamish|   Bread|07/01/2017|       5|            5|
|  Redmond|   Bread|05/01/2017|       5|            5|
|  Redmond|  Cheese|05/01/2017|      15|           20|
|  Redmond|    Meat|03/01/2017|      40|           60|
|  Seattle| Bananas|01/01/2017|       7|            7|
|  Seattle|Potatoes|04/01/2017|       9|           16|
|     Kent|  Apples|02/01/2017|      20|           20|
| Bellevue|   Bread|04/01/2017|       5|            5|
| Bellevue

Rank, Dense Rank & RowNumber:

 Wecan use the same window properties to work the Rank, Dense Rank & Row
 number. But we need to call the specific function when creating the columns and use
 them accordingly.


In [62]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank

In [63]:
window_spec = Window.partitionBy("location").orderBy("Quantity")

In [64]:
df_with_ranks = df.select("order_id","location","item","order_date","quantity") \
                .withColumn("row_number", row_number().over(window_spec)) \
                .withColumn("rank", rank().over(window_spec))
df_with_ranks.show()

+--------+---------+--------+----------+--------+----------+----+
|order_id| location|    item|order_date|quantity|row_number|rank|
+--------+---------+--------+----------+--------+----------+----+
|     o15| Issaquah|    Meat|08/01/2017|       3|         1|   1|
|      o8| Issaquah|   Onion|05/01/2017|       4|         2|   2|
|     o10| Issaquah|   Onion|06/01/2017|       4|         3|   2|
|     o12| Issaquah|   Onion|07/01/2017|       4|         4|   2|
|     o16| Issaquah|    Meat|09/01/2017|       5|         5|   5|
|     o14| Issaquah|  Tomato|07/01/2017|       6|         6|   6|
|     o17| Issaquah|    Meat|10/01/2017|       6|         7|   6|
|     o13|Sammamish|   Bread|07/01/2017|       5|         1|   1|
|      o7|  Redmond|   Bread|05/01/2017|       5|         1|   1|
|      o9|  Redmond|  Cheese|05/01/2017|      15|         2|   2|
|      o4|  Redmond|    Meat|03/01/2017|      40|         3|   3|
|      o1|  Seattle| Bananas|01/01/2017|       7|         1|   1|
|      o5|

In [66]:
## same logic but in spark sql

In [67]:
spark.sql("""
    SELECT 
        order_id,
        location,
        item,
        order_date,
        quantity,
        ROW_NUMBER() OVER (PARTITION BY location ORDER BY quantity) AS row_number,
        RANK()       OVER (PARTITION BY location ORDER BY quantity) AS rank,
        DENSE_RANK() OVER (PARTITION BY location ORDER BY quantity) AS dense_rank
    FROM groceries
""").show(truncate=False)

+--------+---------+--------+----------+--------+----------+----+----------+
|order_id|location |item    |order_date|quantity|row_number|rank|dense_rank|
+--------+---------+--------+----------+--------+----------+----+----------+
|o15     |Issaquah |Meat    |08/01/2017|3       |1         |1   |1         |
|o8      |Issaquah |Onion   |05/01/2017|4       |2         |2   |2         |
|o10     |Issaquah |Onion   |06/01/2017|4       |3         |2   |2         |
|o12     |Issaquah |Onion   |07/01/2017|4       |4         |2   |2         |
|o16     |Issaquah |Meat    |09/01/2017|5       |5         |5   |3         |
|o14     |Issaquah |Tomato  |07/01/2017|6       |6         |6   |4         |
|o17     |Issaquah |Meat    |10/01/2017|6       |7         |6   |4         |
|o13     |Sammamish|Bread   |07/01/2017|5       |1         |1   |1         |
|o7      |Redmond  |Bread   |05/01/2017|5       |1         |1   |1         |
|o9      |Redmond  |Cheese  |05/01/2017|15      |2         |2   |2         |

Rank:
 Rank function is used when there is same values for two categories and we want to
 rank them with the same number, but skip the following Rank. This kind of method is
 used in Races.
 
 Dense Rank:
 Dense Rank also works similar to the Rank function, but this function will not skip the
 next rank and assign the same rank to repeating values.
 
 RowNumber:
 RowNumber is widely used inorder to find the Top N values of a specific partitioned
 value. This will however give unique value to each row.

In [68]:
window_spec = Window.partitionBy("location").orderBy("order_date")

df_with_lead_lag = df.withColumn("prev_quantity", lag("quantity").over(window_spec)) \
                     .withColumn("next_quantity", lead("quantity").over(window_spec)) \
                     .withColumn("change_from_prev", col("quantity") - lag("quantity").over(window_spec)) \
                     .withColumn("change_to_next", lead("quantity", 1).over(window_spec) - col("quantity"))

df_with_lead_lag.show(truncate=False)

+--------+---------+--------+----------+--------+-------------+-------------+----------------+--------------+
|order_id|location |item    |order_date|quantity|prev_quantity|next_quantity|change_from_prev|change_to_next|
+--------+---------+--------+----------+--------+-------------+-------------+----------------+--------------+
|o8      |Issaquah |Onion   |05/01/2017|4       |null         |4            |null            |0             |
|o10     |Issaquah |Onion   |06/01/2017|4       |4            |4            |0               |0             |
|o12     |Issaquah |Onion   |07/01/2017|4       |4            |6            |0               |2             |
|o14     |Issaquah |Tomato  |07/01/2017|6       |4            |3            |2               |-3            |
|o15     |Issaquah |Meat    |08/01/2017|3       |6            |5            |-3              |2             |
|o16     |Issaquah |Meat    |09/01/2017|5       |3            |6            |2               |1             |
|o17     |

Lead &LagFunctions:
These two functions are used to play with measures and find some insights.
Lead => Lead helps to compare and get the next value in a new column of a
partitioned data.

Lag => likewise lead, lag also helps us to get the previous value of the partitioned
data.

We also need to use the Window properties with partition and order by in order to
work with Lead & La

Dealing With Null values in Apache Spark:

LikeWise in any form of data we have the same problem of NULL and nan values in
 Apache spark Data frames. Identifying and dealing with them wisely would help us to
 maintain the data.
 Let's look at the sales data below,

In [70]:
sales_schema = "store_id long, product String, quantity int, revenue double"
sales_df = spark.read.format("json").schema(sales_schema).option("header","True")\
.load("/public/trendytech/datasets/sales_data.json")
sales_df.show()

+--------+----------+--------+-------+
|store_id|   product|quantity|revenue|
+--------+----------+--------+-------+
|       1|     Apple|      10|  100.0|
|       2|    Banana|      15|   75.0|
|       3|    Orange|      12|   90.0|
|       4|     Mango|       8|  120.0|
|       5|     Grape|      20|  150.0|
|       6|Watermelon|       5|   50.0|
|       7|Strawberry|      18|  108.0|
|       8| Pineapple|      14|  140.0|
|       9|    Cherry|       7|  105.0|
|      10|      Pear|       9|   81.0|
|      11| Blueberry|      11|   88.0|
|      12|      Kiwi|      16|  128.0|
|      13|     Peach|      13|   91.0|
|      14|      Plum|       6|   54.0|
|      15|     Lemon|      10|   70.0|
|      16| Raspberry|      17|  136.0|
|      17|   Coconut|       4|   80.0|
|      18|   Avocado|      11|   99.0|
|      19|Blackberry|       8|   64.0|
|      20|         G|    null|    NaN|
+--------+----------+--------+-------+
only showing top 20 rows



The sales data has Null values in all of the columns. And it is present in both String
and Integer data types.
 
Type 1:
We can change the null values in the integer data type by replacing it with any value.

Here the replacement value majorly would be 0, and in some scenarios it can be a
mean of a particular category. This will be dependent on the requirements

In [71]:
null_df = sales_df.fillna(0)
null_df.show()

+--------+----------+--------+-------+
|store_id|   product|quantity|revenue|
+--------+----------+--------+-------+
|       1|     Apple|      10|  100.0|
|       2|    Banana|      15|   75.0|
|       3|    Orange|      12|   90.0|
|       4|     Mango|       8|  120.0|
|       5|     Grape|      20|  150.0|
|       6|Watermelon|       5|   50.0|
|       7|Strawberry|      18|  108.0|
|       8| Pineapple|      14|  140.0|
|       9|    Cherry|       7|  105.0|
|      10|      Pear|       9|   81.0|
|      11| Blueberry|      11|   88.0|
|      12|      Kiwi|      16|  128.0|
|      13|     Peach|      13|   91.0|
|      14|      Plum|       6|   54.0|
|      15|     Lemon|      10|   70.0|
|      16| Raspberry|      17|  136.0|
|      17|   Coconut|       4|   80.0|
|      18|   Avocado|      11|   99.0|
|      19|Blackberry|       8|   64.0|
|      20|         G|       0|    0.0|
+--------+----------+--------+-------+
only showing top 20 rows



Here we have filled the null values with 0.

 Type 2:
 If we find any null value is not helping or providing any insights to us, then we can
 simply delete them by dropping it

In [None]:
sales_df.filter(isnull("quantity")).show()

In [47]:
spark.stop()