# Spark SQL – Analytics or Windowing Functions

As part of this session, we will see advanced operations such as aggregations, ranking, and windowing functions within each group using clauses such as OVER, PARTITION BY etc. We will also build a solution to the problem and run it on a multinode cluster.

* Window Functions – Overview
* Problem Statement – Get top n products per day
* Performing Aggregations
* Using Windowing Functions
* Ranking Functions
* Development Life Cycle

### Window Functions – Overview

Let us understand Functions related to aggregations, ranking and windowing functions.

* We use the functions in **SELECT** clause.
* Specification: **function() OVER (PARTITION BY column [ORDER BY column])**
* **PARTITION BY** is used to group the data based on a column.
* **ORDER BY** is used to sort the data based on a column.
* Example: **rank() OVER (PARTITION BY department_id ORDER BY salary DESC)**
* Aggregations – **sum, avg, min, max** etc
* Ranking – **rank, dense_rank, row_number** etc
* Windowing – **lead, lag** etc
* Window have APIs such as **PARTITION BY, ORDER BY**
* For aggregations, we can define the group by using **PARTITION BY**
* For ranking or windowing, we need to use **PARTITION BY** and then **ORDER BY. PARTITION BY** is to group the data and **ORDER BY** is to sort the data to assign rank.


In [1]:
val orderItems = spark.
  read.
  json("/public/retail_db_json/order_items")

orderItems.createTempView("order_items")

val orderItemsWithRevenue = spark.
  sql(s"""SELECT oi.*, 
            round(sum(oi.order_item_subtotal) OVER (PARTITION BY oi.order_item_order_id), 2) AS order_revenue
          FROM order_items oi""")

orderItemsWithRevenue.printSchema
orderItemsWithRevenue.show

root
 |-- order_item_id: long (nullable = true)
 |-- order_item_order_id: long (nullable = true)
 |-- order_item_product_id: long (nullable = true)
 |-- order_item_product_price: double (nullable = true)
 |-- order_item_quantity: long (nullable = true)
 |-- order_item_subtotal: double (nullable = true)
 |-- order_revenue: double (nullable = true)

+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+-------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_product_price|order_item_quantity|order_item_subtotal|order_revenue|
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+-------------+
|           83|                 29|                 1073|                  199.99|                  1|             199.99|      1109.85|
|           84|                 29|                 1014|                   49.98|                  5|

orderItems = [order_item_id: bigint, order_item_order_id: bigint ... 4 more fields]
orderItemsWithRevenue = [order_item_id: bigint, order_item_order_id: bigint ... 5 more fields]


[order_item_id: bigint, order_item_order_id: bigint ... 5 more fields]

### Problem Statement – Get top n products per day

Let us define the problem statement and see the real usage of the analytics function.

* Problem Statement – Get top N Products Per day
* Get daily product revenue code from the previous topic
* Use ranking functions and get the rank associated based on revenue for each day
* Once we get rank, let us filter for top n products.
* We have already seen how to use join, group by, order by etc to get **Daily Product Revenue**

### Performing aggregations

Let us see how to perform aggregations within each group.

* We have functions such as sum, avg, min, max etc which can be used to aggregate the data.
* We need to invoke **PARTITION BY** to get aggregations within each group.

* Some realistic use cases
    * Get the average salary for each department and get all employee details who earn more than the average salary
    * Get average revenue for each day and get all the orders who earn revenue more than average revenue
    * Get the highest order revenue and get all the orders which have revenue more than 75% of the revenue
    

In [4]:
// val employeesPath = "/Users/itversity/Research/data/hr_db/employees/part-00000"
val employeesPath = "/public/hr_db/employees"

val employeesRaw = spark.
  read.
  text(employeesPath).
  as[String]

val employees = employeesRaw.map(rec => {
  val r = rec.split("\t")
  (r(0).toInt, r(1), r(2), r(3),
   r(4), r(5), r(6), r(7).toFloat,
   r(8), r(9), r(10)
  )
}).toDF("employee_id", "first_name", "last_name", "email",
        "phone_number", "hire_date", "job_id", "salary",
        "commission_pct", "manager_id", "department_id")

spark.conf.set("spark.sql.shuffle.partitions", "2")

employees.createTempView("employees")

employeesPath = /public/hr_db/employees
employeesRaw = [value: string]
employees = [employee_id: int, first_name: string ... 9 more fields]


lastException: Throwable = null


[employee_id: int, first_name: string ... 9 more fields]

In [5]:
val employeesLead = spark.
  sql(s"""SELECT employee_id, salary, department_id,
            lead(salary, 1) OVER (PARTITION BY department_id ORDER BY salary DESC) lead_salary
          FROM employees
          ORDER BY department_id, salary DESC""")

employeesLead.show(200)

+-----------+-------+-------------+-----------+
|employee_id| salary|department_id|lead_salary|
+-----------+-------+-------------+-----------+
|        200| 4400.0|           10|       null|
|        108|12000.0|          100|     9000.0|
|        109| 9000.0|          100|     8200.0|
|        110| 8200.0|          100|     7800.0|
|        112| 7800.0|          100|     7700.0|
|        111| 7700.0|          100|     6900.0|
|        113| 6900.0|          100|       null|
|        205|12000.0|          110|     8300.0|
|        206| 8300.0|          110|       null|
|        201|13000.0|           20|     6000.0|
|        202| 6000.0|           20|       null|
|        114|11000.0|           30|     3100.0|
|        115| 3100.0|           30|     2900.0|
|        116| 2900.0|           30|     2800.0|
|        117| 2800.0|           30|     2600.0|
|        118| 2600.0|           30|     2500.0|
|        119| 2500.0|           30|       null|
|        203| 6500.0|           40|     

employeesLead = [employee_id: int, salary: float ... 2 more fields]


[employee_id: int, salary: float ... 2 more fields]

### Using Windowing Functions

Let us see details about windowing functions within each group 

* We have functions such as lead, lag, first, last etc
* We need to create spec for most of the ranking functions by specifying grouping column under **PARTITION BY** clause and then sorting column under **ORDER BY** clause.
* lead and lag take the column names or expressions using which we need to get required information.
* Some realistic use cases
    * The salary difference between current and next/previous employee within each department

In [1]:
val employeesLead = spark.
  sql(s"""SELECT employee_id, salary, department_id,
            lead(salary, 1) OVER (PARTITION BY department_id ORDER BY salary DESC) lead_salary
          FROM employees
          ORDER BY department_id, salary DESC""")

employeesLead.show(200)

+-----------+--------+-------------+-----------+
|employee_id|  salary|department_id|lead_salary|
+-----------+--------+-------------+-----------+
|        178| 7000.00|         null|    7000.00|
|        178| 7000.00|         null|       null|
|        200| 4400.00|           10|    4400.00|
|        200| 4400.00|           10|       null|
|        201|13000.00|           20|    6000.00|
|        201|13000.00|           20|   13000.00|
|        202| 6000.00|           20|    6000.00|
|        202| 6000.00|           20|       null|
|        114|11000.00|           30|    3100.00|
|        114|11000.00|           30|   11000.00|
|        115| 3100.00|           30|    3100.00|
|        115| 3100.00|           30|    2900.00|
|        116| 2900.00|           30|    2900.00|
|        116| 2900.00|           30|    2800.00|
|        117| 2800.00|           30|    2600.00|
|        117| 2800.00|           30|    2800.00|
|        118| 2600.00|           30|    2600.00|
|        118| 2600.0

employeesLead = [employee_id: int, salary: decimal(8,2) ... 2 more fields]


[employee_id: int, salary: decimal(8,2) ... 2 more fields]

In [6]:
val employeesLag = spark.
  sql(s"""SELECT employee_id, salary, department_id,
            lag(salary, 1) OVER (PARTITION BY department_id ORDER BY salary DESC) lag_salary
          FROM employees
          ORDER BY department_id, salary DESC""")

employeesLag.show(200)

+-----------+-------+-------------+----------+
|employee_id| salary|department_id|lag_salary|
+-----------+-------+-------------+----------+
|        200| 4400.0|           10|      null|
|        108|12000.0|          100|      null|
|        109| 9000.0|          100|   12000.0|
|        110| 8200.0|          100|    9000.0|
|        112| 7800.0|          100|    8200.0|
|        111| 7700.0|          100|    7800.0|
|        113| 6900.0|          100|    7700.0|
|        205|12000.0|          110|      null|
|        206| 8300.0|          110|   12000.0|
|        201|13000.0|           20|      null|
|        202| 6000.0|           20|   13000.0|
|        114|11000.0|           30|      null|
|        115| 3100.0|           30|   11000.0|
|        116| 2900.0|           30|    3100.0|
|        117| 2800.0|           30|    2900.0|
|        118| 2600.0|           30|    2800.0|
|        119| 2500.0|           30|    2600.0|
|        203| 6500.0|           40|      null|
|        121|

employeesLag = [employee_id: int, salary: float ... 2 more fields]


[employee_id: int, salary: float ... 2 more fields]

In [7]:
val employeesFirst = spark.
  sql(s"""SELECT employee_id, salary, department_id,
            first_value(salary) OVER (PARTITION BY department_id ORDER BY salary DESC) first_salary
          FROM employees
          ORDER BY department_id, salary DESC""")

employeesFirst.show(200)

+-----------+-------+-------------+------------+
|employee_id| salary|department_id|first_salary|
+-----------+-------+-------------+------------+
|        200| 4400.0|           10|      4400.0|
|        108|12000.0|          100|     12000.0|
|        109| 9000.0|          100|     12000.0|
|        110| 8200.0|          100|     12000.0|
|        112| 7800.0|          100|     12000.0|
|        111| 7700.0|          100|     12000.0|
|        113| 6900.0|          100|     12000.0|
|        205|12000.0|          110|     12000.0|
|        206| 8300.0|          110|     12000.0|
|        201|13000.0|           20|     13000.0|
|        202| 6000.0|           20|     13000.0|
|        114|11000.0|           30|     11000.0|
|        115| 3100.0|           30|     11000.0|
|        116| 2900.0|           30|     11000.0|
|        117| 2800.0|           30|     11000.0|
|        118| 2600.0|           30|     11000.0|
|        119| 2500.0|           30|     11000.0|
|        203| 6500.0

employeesFirst = [employee_id: int, salary: float ... 2 more fields]


[employee_id: int, salary: float ... 2 more fields]

In [8]:
val employeesLast = spark.
  sql(s"""SELECT employee_id, salary, department_id,
            last_value(salary) OVER (
              PARTITION BY department_id ORDER BY salary DESC
              ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
            ) last_salary
          FROM employees
          ORDER BY department_id, salary DESC""")

employeesLast.show(200)

+-----------+-------+-------------+-----------+
|employee_id| salary|department_id|last_salary|
+-----------+-------+-------------+-----------+
|        200| 4400.0|           10|     4400.0|
|        108|12000.0|          100|     6900.0|
|        109| 9000.0|          100|     6900.0|
|        110| 8200.0|          100|     6900.0|
|        112| 7800.0|          100|     6900.0|
|        111| 7700.0|          100|     6900.0|
|        113| 6900.0|          100|     6900.0|
|        205|12000.0|          110|     8300.0|
|        206| 8300.0|          110|     8300.0|
|        201|13000.0|           20|     6000.0|
|        202| 6000.0|           20|     6000.0|
|        114|11000.0|           30|     2500.0|
|        115| 3100.0|           30|     2500.0|
|        116| 2900.0|           30|     2500.0|
|        117| 2800.0|           30|     2500.0|
|        118| 2600.0|           30|     2500.0|
|        119| 2500.0|           30|     2500.0|
|        203| 6500.0|           40|     

employeesLast = [employee_id: int, salary: float ... 2 more fields]


[employee_id: int, salary: float ... 2 more fields]

### Ranking Functions

Let us talk about ranking functions within each group.

* We have functions like rank, dense_rank, row_number, etc
* We need to create a spec for most of the ranking functions by specifying grouping column under **PARTITION BY** clause and then sorting column under **ORDER BY** clause.
* Some realistic use cases
    * Assign rank to employees based on salary within each department
    * Assign ranks to products based on revenue each day or month
    


In [2]:
val employeesPath = "/public/hr_db/employees"

val employeesRaw = spark.
  read.
  text(employeesPath).
  as[String]

val employees = employeesRaw.map(rec => {
  val r = rec.split("\t")
  (r(0).toInt, r(1), r(2), r(3),
   r(4), r(5), r(6), r(7).toFloat,
   r(8), r(9), r(10)
  )
}).toDF("employee_id", "first_name", "last_name", "email",
        "phone_number", "hire_date", "job_id", "salary",
        "commission_pct", "manager_id", "department_id")

spark.conf.set("spark.sql.shuffle.partitions", "2")

employeesPath = /public/hr_db/employees
employeesRaw = [value: string]
employees = [employee_id: int, first_name: string ... 9 more fields]


[employee_id: int, first_name: string ... 9 more fields]

In [3]:
val employeesRanked = spark.
  sql(s"""SELECT employee_id, salary, department_id,
            rank() OVER (PARTITION BY department_id ORDER BY salary DESC) AS rank
          FROM employees
          ORDER BY department_id, salary DESC""")

employeesRanked.show(200)

+-----------+--------+-------------+----+
|employee_id|  salary|department_id|rank|
+-----------+--------+-------------+----+
|        178| 7000.00|         null|   1|
|        178| 7000.00|         null|   1|
|        200| 4400.00|           10|   1|
|        200| 4400.00|           10|   1|
|        201|13000.00|           20|   1|
|        201|13000.00|           20|   1|
|        202| 6000.00|           20|   3|
|        202| 6000.00|           20|   3|
|        114|11000.00|           30|   1|
|        114|11000.00|           30|   1|
|        115| 3100.00|           30|   3|
|        115| 3100.00|           30|   3|
|        116| 2900.00|           30|   5|
|        116| 2900.00|           30|   5|
|        117| 2800.00|           30|   7|
|        117| 2800.00|           30|   7|
|        118| 2600.00|           30|   9|
|        118| 2600.00|           30|   9|
|        119| 2500.00|           30|  11|
|        119| 2500.00|           30|  11|
|        203| 6500.00|           4

employeesRanked = [employee_id: int, salary: decimal(8,2) ... 2 more fields]


[employee_id: int, salary: decimal(8,2) ... 2 more fields]

In [4]:
val employeesDenseRanked = spark.
  sql(s"""SELECT employee_id, salary, department_id,
            dense_rank() OVER (PARTITION BY department_id ORDER BY salary DESC) AS rank
          FROM employees
          ORDER BY department_id, salary DESC;""")

employeesDenseRanked.show(200)

Name: org.apache.spark.sql.catalyst.parser.ParseException
Message: 
extraneous input ';' expecting <EOF>(line 4, pos 45)

== SQL ==
SELECT employee_id, salary, department_id,
            dense_rank() OVER (PARTITION BY department_id ORDER BY salary DESC) AS rank
          FROM employees
          ORDER BY department_id, salary DESC;
---------------------------------------------^^^

StackTrace: extraneous input ';' expecting <EOF>(line 4, pos 45)
== SQL ==
SELECT employee_id, salary, department_id,
            dense_rank() OVER (PARTITION BY department_id ORDER BY salary DESC) AS rank
          FROM employees
          ORDER BY department_id, salary DESC;
---------------------------------------------^^^
  at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:239)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:115)
  at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48)
  at org.apache.spark.sql

In [5]:
val employeesRowNumbered = spark.
  sql(s"""SELECT employee_id, salary, department_id,
            row_number() OVER (PARTITION BY department_id ORDER BY salary DESC) AS rn
          FROM employees
          ORDER BY department_id, salary DESC""")

employeesRowNumbered.show(200)

+-----------+--------+-------------+---+
|employee_id|  salary|department_id| rn|
+-----------+--------+-------------+---+
|        178| 7000.00|         null|  2|
|        178| 7000.00|         null|  1|
|        200| 4400.00|           10|  1|
|        200| 4400.00|           10|  2|
|        201|13000.00|           20|  2|
|        201|13000.00|           20|  1|
|        202| 6000.00|           20|  3|
|        202| 6000.00|           20|  4|
|        114|11000.00|           30|  1|
|        114|11000.00|           30|  2|
|        115| 3100.00|           30|  4|
|        115| 3100.00|           30|  3|
|        116| 2900.00|           30|  5|
|        116| 2900.00|           30|  6|
|        117| 2800.00|           30|  7|
|        117| 2800.00|           30|  8|
|        118| 2600.00|           30| 10|
|        118| 2600.00|           30|  9|
|        119| 2500.00|           30| 11|
|        119| 2500.00|           30| 12|
|        203| 6500.00|           40|  2|
|        203| 65

employeesRowNumbered = [employee_id: int, salary: decimal(8,2) ... 2 more fields]


lastException: Throwable = null


[employee_id: int, salary: decimal(8,2) ... 2 more fields]

### Development Life Cycle

Let us talk about the development lifecycle. 

* Take the GetDailyProductRevenueSQL code which gives us order_date, order_item_product_id, and revenue (dailyProductRevenue)
* Register dailyProductRevenue as temp view daily_product_revenue
* Write a query against daily_product_revenue **PARTITION BY** date and **ORDER BY** revenue in descending order and assign rank using the rank function.
* Make the query nested and assign the name as inner.
* Filter data where rank is less than or equal to topN passed as an argument to the program
* Drop rank field as we do not want to save the data and then sort in ascending order by date and descending order by revenue
* Save the data frame into a file