
In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=ebc56aff744af91c761228e73175d67382801137289cf3adf8c4b6fb4c1d730f
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [2]:
import pyspark
from pyspark.sql import SparkSession
  
spark = SparkSession.builder.appName("Spark Window Functions").getOrCreate()
spark

In [11]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
data1 = [("James","","Smith","36636","M", 1000, "Sales", 2020),
    ("Michael","Rose","","40288","M", 2000, "Operations",2020),
    ("Robert","","Williams","42114","M", 3000, "Sales",2020),
    ("Maria","Anne","Jones","39192","F", 4000, "Operations",2020),
    ("James","","Smith","36636","M", 1050, "Sales", 2021),
    ("Michael","Rose","","40288","M", 1950, "Operations",2021),
    ("Robert","","Williams","42114","M", 3100, "Sales",2021),
    ("Maria","Anne","Jones","39192","F", 4200, "Operations",2021),
    ("James","","Smith","36636","M", 1050, "Sales", 2017),
    ("Michael","Rose","","40288","M", 1950, "Operations",2017),
    ("Robert","","Williams","42114","M", 3100, "Sales",2017),
    ("Maria","Anne","Jones","39192","F", 4200, "Operations",2017),
    ("James","","Smith","36636","M", 990, "Sales", 2019),
    ("Michael","Rose","","40288","M", 2200, "Operations",2019),
    ("Robert","","Williams","42114","M", 2900, "Sales",2019),
    ("Maria","Anne","Jones","39192","F", 4500, "Operations",2019)
  ]

# Line continuations are not needed inside brackets
schema1 = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("annualsalary", IntegerType(), True),
    StructField("work", StringType(), True),
    StructField("year", IntegerType(), True),
  ])
 
df1 = spark.createDataFrame(data=data1,schema=schema1)
df1.printSchema()
df1.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- annualsalary: integer (nullable = true)
 |-- work: string (nullable = true)
 |-- year: integer (nullable = true)

+---------+----------+--------+-----+------+------------+----------+----+
|firstname|middlename|lastname|id   |gender|annualsalary|work      |year|
+---------+----------+--------+-----+------+------------+----------+----+
|James    |          |Smith   |36636|M     |1000        |Sales     |2020|
|Michael  |Rose      |        |40288|M     |2000        |Operations|2020|
|Robert   |          |Williams|42114|M     |3000        |Sales     |2020|
|Maria    |Anne      |Jones   |39192|F     |4000        |Operations|2020|
|James    |          |Smith   |36636|M     |1050        |Sales     |2021|
|Michael  |Rose      |        |40288|M     |1950        |Operations|2021|
|Robert   |    

In [13]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec  = Window.partitionBy("work").orderBy("annualsalary")

# Rank Function 

The `rank()` window function assigns a rank to each row within a window partition, following the window's ordering. It leaves gaps in the ranking when there are ties.

The difference between **rank** and **dense_rank** is that `dense_rank` leaves no gaps in the ranking sequence when there are ties. For example, if three people tie for second place in a competition, `dense_rank` puts all three in second place and the next person in third. `rank` also puts all three in second place, but it skips the ranks the tie used up, so the next person registers as coming in fifth.
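
To make the tie handling concrete, here is a minimal plain-Python sketch (no Spark needed) of the two numbering schemes. The helper names `rank_values` and `dense_rank_values` are hypothetical and only illustrate the semantics; they are not part of PySpark. The input is assumed to be already sorted, as it would be by the window's `orderBy`.

```python
# Plain-Python illustration of rank vs dense_rank on an already sorted list.
# These helpers are hypothetical; Spark computes the same numbers per partition.

def rank_values(sorted_values):
    """Tied values share a rank; the next distinct value gets its row position."""
    ranks = []
    for index, value in enumerate(sorted_values):
        if index > 0 and value == sorted_values[index - 1]:
            ranks.append(ranks[-1])      # tie: reuse the previous rank
        else:
            ranks.append(index + 1)      # new value: rank is the 1-based position
    return ranks

def dense_rank_values(sorted_values):
    """Tied values share a rank; the next distinct value gets previous rank + 1."""
    ranks = []
    for index, value in enumerate(sorted_values):
        if index == 0:
            ranks.append(1)
        elif value == sorted_values[index - 1]:
            ranks.append(ranks[-1])      # tie: reuse the previous rank
        else:
            ranks.append(ranks[-1] + 1)  # no gap after a tie
    return ranks

# Competition example: three scores tie for second place.
scores = [10, 20, 20, 20, 30]
print(rank_values(scores))        # [1, 2, 2, 2, 5]
print(dense_rank_values(scores))  # [1, 2, 2, 2, 3]

# Salaries of the "Operations" partition from df1, sorted ascending.
operations_salaries = [1950, 1950, 2000, 2200, 4000, 4200, 4200, 4500]
print(rank_values(operations_salaries))        # [1, 1, 3, 4, 5, 6, 6, 8]
print(dense_rank_values(operations_salaries))  # [1, 1, 2, 3, 4, 5, 5, 6]
```

The last two lists line up with the `rank` and `dense_rank` columns that the cells below show for the Operations rows.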

In [14]:
from pyspark.sql.functions import rank
df1.withColumn("rank",rank().over(windowSpec)).show()

+---------+----------+--------+-----+------+------------+----------+----+----+
|firstname|middlename|lastname|   id|gender|annualsalary|      work|year|rank|
+---------+----------+--------+-----+------+------------+----------+----+----+
|  Michael|      Rose|        |40288|     M|        1950|Operations|2021|   1|
|  Michael|      Rose|        |40288|     M|        1950|Operations|2017|   1|
|  Michael|      Rose|        |40288|     M|        2000|Operations|2020|   3|
|  Michael|      Rose|        |40288|     M|        2200|Operations|2019|   4|
|    Maria|      Anne|   Jones|39192|     F|        4000|Operations|2020|   5|
|    Maria|      Anne|   Jones|39192|     F|        4200|Operations|2021|   6|
|    Maria|      Anne|   Jones|39192|     F|        4200|Operations|2017|   6|
|    Maria|      Anne|   Jones|39192|     F|        4500|Operations|2019|   8|
|    James|          |   Smith|36636|     M|         990|     Sales|2019|   1|
|    James|          |   Smith|36636|     M|        

In [16]:
from pyspark.sql.functions import dense_rank
df1.withColumn("dense_rank",dense_rank().over(windowSpec)).show()

+---------+----------+--------+-----+------+------------+----------+----+----------+
|firstname|middlename|lastname|   id|gender|annualsalary|      work|year|dense_rank|
+---------+----------+--------+-----+------+------------+----------+----+----------+
|  Michael|      Rose|        |40288|     M|        1950|Operations|2021|         1|
|  Michael|      Rose|        |40288|     M|        1950|Operations|2017|         1|
|  Michael|      Rose|        |40288|     M|        2000|Operations|2020|         2|
|  Michael|      Rose|        |40288|     M|        2200|Operations|2019|         3|
|    Maria|      Anne|   Jones|39192|     F|        4000|Operations|2020|         4|
|    Maria|      Anne|   Jones|39192|     F|        4200|Operations|2021|         5|
|    Maria|      Anne|   Jones|39192|     F|        4200|Operations|2017|         5|
|    Maria|      Anne|   Jones|39192|     F|        4500|Operations|2019|         6|
|    James|          |   Smith|36636|     M|         990|     Sal

# Percent Rank
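
`percent_rank()` returns the relative rank of a row within its partition, computed as `(rank - 1) / (rows in partition - 1)`, so the first row gets 0.0 and the last distinct value gets 1.0. The sketch below reproduces that arithmetic in plain Python; `percent_rank_values` is a hypothetical helper for illustration, not a PySpark function, and it assumes the input is already sorted.

```python
# Plain-Python illustration of percent_rank = (rank - 1) / (n - 1).
# Hypothetical helper; input must already be sorted like the window's orderBy.

def percent_rank_values(sorted_values):
    n = len(sorted_values)
    result = []
    rank = 0
    for index, value in enumerate(sorted_values):
        # rank only advances when the value changes, so ties share a rank
        if index == 0 or value != sorted_values[index - 1]:
            rank = index + 1
        # a single-row partition has no spread, so its percent rank is 0.0
        result.append(0.0 if n == 1 else (rank - 1) / (n - 1))
    return result

operations_salaries = [1950, 1950, 2000, 2200, 4000, 4200, 4200, 4500]
print([round(p, 4) for p in percent_rank_values(operations_salaries)])
# [0.0, 0.0, 0.2857, 0.4286, 0.5714, 0.7143, 0.7143, 1.0]
```

With eight rows in the Operations partition the denominator is 7, which is why the salary of 2000 (rank 3) shows up as 2/7 in the output below.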

In [15]:
from pyspark.sql.functions import percent_rank
df1.withColumn("percent_rank",percent_rank().over(windowSpec)) \
    .show()

+---------+----------+--------+-----+------+------------+----------+----+-------------------+
|firstname|middlename|lastname|   id|gender|annualsalary|      work|year|       percent_rank|
+---------+----------+--------+-----+------+------------+----------+----+-------------------+
|  Michael|      Rose|        |40288|     M|        1950|Operations|2021|                0.0|
|  Michael|      Rose|        |40288|     M|        1950|Operations|2017|                0.0|
|  Michael|      Rose|        |40288|     M|        2000|Operations|2020| 0.2857142857142857|
|  Michael|      Rose|        |40288|     M|        2200|Operations|2019|0.42857142857142855|
|    Maria|      Anne|   Jones|39192|     F|        4000|Operations|2020| 0.5714285714285714|
|    Maria|      Anne|   Jones|39192|     F|        4200|Operations|2021| 0.7142857142857143|
|    Maria|      Anne|   Jones|39192|     F|        4200|Operations|2017| 0.7142857142857143|
|    Maria|      Anne|   Jones|39192|     F|        4500|Ope

# row_number Window Function

`row_number()` returns a sequential number starting at 1 within a window partition. Unlike `rank()`, tied rows still receive distinct numbers.
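
Because `row_number` gives tied rows different numbers, the order among ties matters, and Spark does not guarantee it when the ordering columns are equal. A common remedy is to add a tiebreaker column to the ordering. The plain-Python sketch below illustrates the idea on a few `(firstname, year, annualsalary)` tuples taken from `df1`; the helper `number_rows` is hypothetical, and using `year` as the tiebreaker is an assumption made for this example.

```python
# Plain-Python illustration of row_number with a tiebreaker.
# number_rows is a hypothetical helper, not a PySpark function.

def number_rows(rows, key):
    """Sort rows by key and prefix each with a 1-based sequential number."""
    ordered = sorted(rows, key=key)
    return [(number,) + row for number, row in enumerate(ordered, start=1)]

# (firstname, year, annualsalary) for a few Operations rows
rows = [
    ("Michael", 2021, 1950),
    ("Michael", 2017, 1950),
    ("Michael", 2020, 2000),
    ("Maria", 2021, 4200),
    ("Maria", 2017, 4200),
]

# Order by salary, then by year, so tied salaries get a repeatable order.
for numbered in number_rows(rows, key=lambda r: (r[2], r[1])):
    print(numbered)
# (1, 'Michael', 2017, 1950)
# (2, 'Michael', 2021, 1950)
# (3, 'Michael', 2020, 2000)
# (4, 'Maria', 2017, 4200)
# (5, 'Maria', 2021, 4200)
```

In PySpark the corresponding change would be to order the window by both columns, for example `Window.partitionBy("work").orderBy("annualsalary", "year")`.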


In [12]:


df1.withColumn("row_number",row_number().over(windowSpec)) \
    .show(truncate=False)

+---------+----------+--------+-----+------+------------+----------+----+----------+
|firstname|middlename|lastname|id   |gender|annualsalary|work      |year|row_number|
+---------+----------+--------+-----+------+------------+----------+----+----------+
|Michael  |Rose      |        |40288|M     |1950        |Operations|2021|1         |
|Michael  |Rose      |        |40288|M     |1950        |Operations|2017|2         |
|Michael  |Rose      |        |40288|M     |2000        |Operations|2020|3         |
|Michael  |Rose      |        |40288|M     |2200        |Operations|2019|4         |
|Maria    |Anne      |Jones   |39192|F     |4000        |Operations|2020|5         |
|Maria    |Anne      |Jones   |39192|F     |4200        |Operations|2021|6         |
|Maria    |Anne      |Jones   |39192|F     |4200        |Operations|2017|7         |
|Maria    |Anne      |Jones   |39192|F     |4500        |Operations|2019|8         |
|James    |          |Smith   |36636|M     |990         |Sales   

# Analytical Window Functions - Lag & Lead

**Lag** window function: returns the value that is `offset` rows before the current row, and `null` if there are fewer than `offset` rows before the current row. For example, an `offset` of one will return the previous row at any given point in the window partition.
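
As a rough illustration of the rule above, the following plain-Python sketch shifts a list the way `lag` shifts a column inside one partition. `lag_values` is a hypothetical helper (not PySpark), it assumes a non-negative `offset`, and Python's `None` stands in for Spark's `null`.

```python
# Plain-Python illustration of lag within a single, already ordered partition.
# lag_values is a hypothetical helper; it assumes offset >= 0.

def lag_values(values, offset=1):
    """Return, for each position, the value `offset` rows earlier, or None."""
    return [
        values[i - offset] if i - offset >= 0 else None
        for i in range(len(values))
    ]

# James's annual salary ordered by year: 2017, 2019, 2020, 2021
james_salaries = [1050, 990, 1000, 1050]
print(lag_values(james_salaries))      # [None, 1050, 990, 1000]
print(lag_values(james_salaries, 2))   # [None, None, 1050, 990]
```

The first result corresponds to the `PreviousYearSalary` column for James in the cell output below.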

In [21]:
from pyspark.sql.functions import lag
windowYear  = Window.partitionBy("id").orderBy("year")
 
df1 = df1.withColumn("PreviousYearSalary", lag("annualsalary", 1).over(windowYear))
df1.select("firstname","year","annualsalary","PreviousYearSalary").show()

+---------+----+------------+------------------+
|firstname|year|annualsalary|PreviousYearSalary|
+---------+----+------------+------------------+
|    James|2017|        1050|              null|
|    James|2019|         990|              1050|
|    James|2020|        1000|               990|
|    James|2021|        1050|              1000|
|    Maria|2017|        4200|              null|
|    Maria|2019|        4500|              4200|
|    Maria|2020|        4000|              4500|
|    Maria|2021|        4200|              4000|
|  Michael|2017|        1950|              null|
|  Michael|2019|        2200|              1950|
|  Michael|2020|        2000|              2200|
|  Michael|2021|        1950|              2000|
|   Robert|2017|        3100|              null|
|   Robert|2019|        2900|              3100|
|   Robert|2020|        3000|              2900|
|   Robert|2021|        3100|              3000|
+---------+----+------------+------------------+



**Lead** window function: returns the value that is `offset` rows after the current row, and `null` if there are fewer than `offset` rows after the current row. For example, an `offset` of one will return the next row at any given point in the window partition.
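
The same idea can be sketched for `lead` in plain Python. PySpark's `lead` (like `lag`) also accepts an optional third `default` argument that replaces `null` when no row exists at the offset; the hypothetical helper `lead_values` below mimics that behaviour, assumes a non-negative `offset`, and is not part of PySpark.

```python
# Plain-Python illustration of lead within a single, already ordered partition.
# lead_values is a hypothetical helper; it assumes offset >= 0.

def lead_values(values, offset=1, default=None):
    """Return, for each position, the value `offset` rows later, or `default`."""
    n = len(values)
    return [
        values[i + offset] if i + offset < n else default
        for i in range(n)
    ]

# Maria's annual salary ordered by year: 2017, 2019, 2020, 2021
maria_salaries = [4200, 4500, 4000, 4200]
print(lead_values(maria_salaries))             # [4500, 4000, 4200, None]
print(lead_values(maria_salaries, default=0))  # [4500, 4000, 4200, 0]
```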

In [22]:
from pyspark.sql.functions import lead
windowYear  = Window.partitionBy("id").orderBy("year")
 
df1 = df1.withColumn("NextYearSalary", lead("annualsalary", 1).over(windowYear))
df1.select("firstname","year","PreviousYearSalary","annualsalary","NextYearSalary").show()

+---------+----+------------------+------------+--------------+
|firstname|year|PreviousYearSalary|annualsalary|NextYearSalary|
+---------+----+------------------+------------+--------------+
|    James|2017|              null|        1050|           990|
|    James|2019|              1050|         990|          1000|
|    James|2020|               990|        1000|          1050|
|    James|2021|              1000|        1050|          null|
|    Maria|2017|              null|        4200|          4500|
|    Maria|2019|              4200|        4500|          4000|
|    Maria|2020|              4500|        4000|          4200|
|    Maria|2021|              4000|        4200|          null|
|  Michael|2017|              null|        1950|          2200|
|  Michael|2019|              1950|        2200|          2000|
|  Michael|2020|              2200|        2000|          1950|
|  Michael|2021|              2000|        1950|          null|
|   Robert|2017|              null|     

**Lag and Lead with negative offsets**
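
A negative offset reverses the direction: `lead` with an offset of -1 looks one row back, like `lag` with 1, and `lag` with -1 looks one row ahead, like `lead` with 1. The plain-Python sketch below shows why, by expressing both in terms of one signed shift. The helpers `shift_values`, `lag_signed`, and `lead_signed` are hypothetical names for illustration only.

```python
# Plain-Python illustration: lag and lead are the same shift with opposite sign.
# All three helpers are hypothetical and operate on one ordered partition.

def shift_values(values, offset):
    """Value at position i + offset, or None when that position is out of range."""
    n = len(values)
    return [
        values[i + offset] if 0 <= i + offset < n else None
        for i in range(n)
    ]

def lead_signed(values, offset):
    return shift_values(values, offset)    # positive offset looks ahead

def lag_signed(values, offset):
    return shift_values(values, -offset)   # positive offset looks back

# Michael's annual salary ordered by year: 2017, 2019, 2020, 2021
michael_salaries = [1950, 2200, 2000, 1950]

print(lead_signed(michael_salaries, -1))  # [None, 1950, 2200, 2000]
print(lag_signed(michael_salaries, 1))    # [None, 1950, 2200, 2000]
print(lag_signed(michael_salaries, -1))   # [2200, 2000, 1950, None]
print(lead_signed(michael_salaries, 1))   # [2200, 2000, 1950, None]
```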



In [24]:
from pyspark.sql.functions import lag, lead
windowYear  = Window.partitionBy("id").orderBy("year")
 
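# lead with offset -1 looks one row back, so salary2 matches PreviousYearSalary
df1 = df1.withColumn("salary2", lead("annualsalary", -1).over(windowYear))
# lag with offset -1 looks one row ahead, so salary1 matches NextYearSalary
df1 = df1.withColumn("salary1", lag("annualsalary", -1).over(windowYear))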
df1.select("firstname","year","annualsalary","salary2", "PreviousYearSalary","salary1", "NextYearSalary").show()

+---------+----+------------+-------+------------------+-------+--------------+
|firstname|year|annualsalary|salary2|PreviousYearSalary|salary1|NextYearSalary|
+---------+----+------------+-------+------------------+-------+--------------+
|    James|2017|        1050|   null|              null|    990|           990|
|    James|2019|         990|   1050|              1050|   1000|          1000|
|    James|2020|        1000|    990|               990|   1050|          1050|
|    James|2021|        1050|   1000|              1000|   null|          null|
|    Maria|2017|        4200|   null|              null|   4500|          4500|
|    Maria|2019|        4500|   4200|              4200|   4000|          4000|
|    Maria|2020|        4000|   4500|              4500|   4200|          4200|
|    Maria|2021|        4200|   4000|              4000|   null|          null|
|  Michael|2017|        1950|   null|              null|   2200|          2200|
|  Michael|2019|        2200|   1950|   

Source code for reference:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala