### Window SQL in Spark
Window functions in PySpark SQL compute a value for each row from a group of related rows (the window), for example a rank or a row number. They are a frequent topic in Big Data Engineering interviews and are used in a growing range of transformations.
PySpark has three main types of window function:
1) Analytical Window Function

2) Rank Window Function

3) Aggregate Window Function

To apply a window function to a group of rows, first partition the data, i.e. define the groups of rows, using the Window.partitionBy() function.

=> For the row number and rank functions, the rows within each partition must additionally be ordered using orderBy() (the ORDER BY clause in SQL).


1) Analytical functions
=>>> An analytic function computes a value for each row from a finite set of rows, defined by the PARTITION BY and ORDER BY clauses of the window. It returns one result per input row, so the output has the same number of rows as the input. E.g. lead(), lag(), cume_dist().

2) Ranking Function
=>>> A ranking function returns the rank of each row within its partition. It numbers the rows in the result column according to the order given in the window specification (Window.partitionBy(...).orderBy(...), or the OVER clause in SQL), restarting for each partition. E.g. row_number(), rank(), dense_rank(), etc.

3) Aggregate function
=>>> An aggregate function combines the values of multiple rows into a single summary value. In plain SQL the groups of rows are defined by the GROUP BY clause; when used as a window function, the groups are the window partitions and the summary value is returned on every row of its partition. E.g. avg(), sum(), min(), max(), etc.

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql.session import SparkSession

In [6]:
spark = SparkSession.builder.appName('Nth highestSalary').master("local[*]").getOrCreate()

In [7]:
cols = ["id", "salary"]

data= [(1,2000),
      (2,3000),
      (3,4000),
      (4,3000),
      (5,6000),
      (6,2000)]

In [8]:
df = spark.createDataFrame(data,cols)
display(df)

DataFrame[id: bigint, salary: bigint]

In [9]:
df.show()

+---+------+
| id|salary|
+---+------+
|  1|  2000|
|  2|  3000|
|  3|  4000|
|  4|  3000|
|  5|  6000|
|  6|  2000|
+---+------+



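### row_number Window Function
row_number() assigns a sequential number, starting at 1, to each row within a window partition. Note that windowSpec below partitions by id, and every id in this dataset is unique, so each partition holds a single row and every row gets row_number 1. The same applies to the outputs of the other functions that reuse windowSpec.
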
In [11]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

In [12]:
windowSpec  = Window.partitionBy("id").orderBy("salary")

In [13]:
df.withColumn("row_number",row_number().over(windowSpec)) \
    .show(truncate=False)

+---+------+----------+
|id |salary|row_number|
+---+------+----------+
|6  |2000  |1         |
|5  |6000  |1         |
|1  |2000  |1         |
|3  |4000  |1         |
|2  |3000  |1         |
|4  |3000  |1         |
+---+------+----------+



### Rank Window Function
rank() assigns a rank to each row within a window partition. Rows with equal ordering values receive the same rank, and the ranks that follow are skipped, leaving gaps.

In [14]:
from pyspark.sql.functions import rank
df.withColumn("rank",rank().over(windowSpec)) \
    .show()

+---+------+----+
| id|salary|rank|
+---+------+----+
|  6|  2000|   1|
|  5|  6000|   1|
|  1|  2000|   1|
|  3|  4000|   1|
|  2|  3000|   1|
|  4|  3000|   1|
+---+------+----+



### dense_rank window function
dense_rank() returns the rank of each row within a window partition without leaving gaps after ties.

In [15]:
from pyspark.sql.functions import dense_rank
df.withColumn("dense_rank",dense_rank().over(windowSpec)) \
    .show()

+---+------+----------+
| id|salary|dense_rank|
+---+------+----------+
|  6|  2000|         1|
|  5|  6000|         1|
|  1|  2000|         1|
|  3|  4000|         1|
|  2|  3000|         1|
|  4|  3000|         1|
+---+------+----------+



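### percent_rank Window Function
percent_rank() returns the relative rank of each row within a window partition as a value between 0 and 1.
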
In [16]:
from pyspark.sql.functions import percent_rank
df.withColumn("percent_rank",percent_rank().over(windowSpec)) \
    .show()

+---+------+------------+
| id|salary|percent_rank|
+---+------+------------+
|  6|  2000|         0.0|
|  5|  6000|         0.0|
|  1|  2000|         0.0|
|  3|  4000|         0.0|
|  2|  3000|         0.0|
|  4|  3000|         0.0|
+---+------+------------+
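
The arithmetic behind percent_rank() is (rank - 1) / (rows in partition - 1). The plain-Python sketch below reproduces that formula for one partition ordered by salary; the helper name `percent_rank_of` is made up for illustration and is not a PySpark function. It also shows why a one-row partition always yields 0.0, as in the output above.

```python
def percent_rank_of(values):
    """Return (value, percent_rank) pairs for one partition, ordered ascending."""
    ordered = sorted(values)
    n = len(ordered)
    result = []
    for v in ordered:
        # rank = 1 + number of rows that sort strictly before this value
        rank = ordered.index(v) + 1
        # a single-row partition has nothing to compare against, so it gets 0.0
        pr = 0.0 if n == 1 else (rank - 1) / (n - 1)
        result.append((v, pr))
    return result

salaries = [2000, 3000, 4000, 3000, 6000, 2000]
pr = percent_rank_of(salaries)
single = percent_rank_of([2000])
```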



### ntile Window Function
ntile(n) splits the ordered rows of a window partition into n buckets of roughly equal size and returns the bucket number (from 1 to n) of each row.

In [17]:
from pyspark.sql.functions import ntile
df.withColumn("ntile",ntile(2).over(windowSpec)) \
    .show()


+---+------+-----+
| id|salary|ntile|
+---+------+-----+
|  6|  2000|    1|
|  5|  6000|    1|
|  1|  2000|    1|
|  3|  4000|    1|
|  2|  3000|    1|
|  4|  3000|    1|
+---+------+-----+



### Aggregate function
An aggregate function combines the values of multiple rows into a single summary value.
With the SQL GROUP BY clause, each group collapses to one row; with a window, the groups are defined by Window.partitionBy() and the summary value is attached to every row of the partition. E.g. avg(), sum(), min(), max(), etc.

In [18]:
### PySpark Window Aggregate Functions
data1 = [("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
  ]
 
columns= ["emp_name", "dep", "salary"]
df1 = spark.createDataFrame(data =data1, schema = columns)
df1.printSchema()
df1.show(truncate=False)

root
 |-- emp_name: string (nullable = true)
 |-- dep: string (nullable = true)
 |-- salary: long (nullable = true)

+--------+---------+------+
|emp_name|dep      |salary|
+--------+---------+------+
|James   |Sales    |3000  |
|Michael |Sales    |4600  |
|Robert  |Sales    |4100  |
|Maria   |Finance  |3000  |
|James   |Sales    |3000  |
|Scott   |Finance  |3300  |
|Jen     |Finance  |3900  |
|Jeff    |Marketing|3000  |
|Kumar   |Marketing|2000  |
|Saif    |Sales    |4100  |
+--------+---------+------+



In [25]:
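# Note: this import shadows Python's built-in sum, min and max for the rest of the notebook
from pyspark.sql.functions import col,avg,sum,min,max,row_number
# No orderBy here: each aggregate is computed over the whole "dep" partition
windowSpecAgg  = Window.partitionBy("dep")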

In [27]:
df1.withColumn("avg", avg(col("salary")).over(windowSpecAgg)).show()

+--------+---------+------+------+
|emp_name|      dep|salary|   avg|
+--------+---------+------+------+
|   James|    Sales|  3000|3760.0|
| Michael|    Sales|  4600|3760.0|
|  Robert|    Sales|  4100|3760.0|
|   James|    Sales|  3000|3760.0|
|    Saif|    Sales|  4100|3760.0|
|   Maria|  Finance|  3000|3400.0|
|   Scott|  Finance|  3300|3400.0|
|     Jen|  Finance|  3900|3400.0|
|    Jeff|Marketing|  3000|2500.0|
|   Kumar|Marketing|  2000|2500.0|
+--------+---------+------+------+



In [28]:
df1.withColumn("sum", sum(col("salary")).over(windowSpecAgg)).show()

+--------+---------+------+-----+
|emp_name|      dep|salary|  sum|
+--------+---------+------+-----+
|   James|    Sales|  3000|18800|
| Michael|    Sales|  4600|18800|
|  Robert|    Sales|  4100|18800|
|   James|    Sales|  3000|18800|
|    Saif|    Sales|  4100|18800|
|   Maria|  Finance|  3000|10200|
|   Scott|  Finance|  3300|10200|
|     Jen|  Finance|  3900|10200|
|    Jeff|Marketing|  3000| 5000|
|   Kumar|Marketing|  2000| 5000|
+--------+---------+------+-----+



In [29]:
df1.withColumn("min", min(col("salary")).over(windowSpecAgg)).show()

+--------+---------+------+----+
|emp_name|      dep|salary| min|
+--------+---------+------+----+
|   James|    Sales|  3000|3000|
| Michael|    Sales|  4600|3000|
|  Robert|    Sales|  4100|3000|
|   James|    Sales|  3000|3000|
|    Saif|    Sales|  4100|3000|
|   Maria|  Finance|  3000|3000|
|   Scott|  Finance|  3300|3000|
|     Jen|  Finance|  3900|3000|
|    Jeff|Marketing|  3000|2000|
|   Kumar|Marketing|  2000|2000|
+--------+---------+------+----+



In [30]:
df1.withColumn("max", max(col("salary")).over(windowSpecAgg)).show()

+--------+---------+------+----+
|emp_name|      dep|salary| max|
+--------+---------+------+----+
|   James|    Sales|  3000|4600|
| Michael|    Sales|  4600|4600|
|  Robert|    Sales|  4100|4600|
|   James|    Sales|  3000|4600|
|    Saif|    Sales|  4100|4600|
|   Maria|  Finance|  3000|3900|
|   Scott|  Finance|  3300|3900|
|     Jen|  Finance|  3900|3900|
|    Jeff|Marketing|  3000|3000|
|   Kumar|Marketing|  2000|3000|
+--------+---------+------+----+



### cume_dist
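cume_dist() returns the cumulative distribution of values within a window partition, i.e. the fraction of rows in the partition whose ordering value is less than or equal to that of the current row.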

In [34]:
from pyspark.sql.functions import cume_dist
df.withColumn("cume_dist",cume_dist().over(windowSpec)) \
   .show()


+---+------+---------+
| id|salary|cume_dist|
+---+------+---------+
|  6|  2000|      1.0|
|  5|  6000|      1.0|
|  1|  2000|      1.0|
|  3|  4000|      1.0|
|  2|  3000|      1.0|
|  4|  3000|      1.0|
+---+------+---------+



### Using lag()
lag() returns the value from a previous row, at the given offset before the current row within the partition. It returns null when no such row exists.
This function is equivalent to LAG in SQL.

In [35]:
from pyspark.sql.functions import lag    
df.withColumn("lag",lag("salary",2).over(windowSpec)) \
      .show()

+---+------+----+
| id|salary| lag|
+---+------+----+
|  6|  2000|null|
|  5|  6000|null|
|  1|  2000|null|
|  3|  4000|null|
|  2|  3000|null|
|  4|  3000|null|
+---+------+----+



### Using lead()
lead() returns the value from a following row, at the given offset after the current row within the partition. It returns null when no such row exists.
This function is equivalent to LEAD in SQL and is the opposite of lag().

In [36]:
from pyspark.sql.functions import lead    
df.withColumn("lead",lead("salary",2).over(windowSpec)) \
    .show()

+---+------+----+
| id|salary|lead|
+---+------+----+
|  6|  2000|null|
|  5|  6000|null|
|  1|  2000|null|
|  3|  4000|null|
|  2|  3000|null|
|  4|  3000|null|
+---+------+----+

