[PySpark Window Functions](https://sparkbyexamples.com/pyspark/pyspark-window-functions/)

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master("local[5]") \
        .appName("Window Functions") \
        .getOrCreate()

spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/16 21:34:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
simpleData = (("James", "Sales", 3000), \
    ("Michael", "Sales", 4600),  \
    ("Robert", "Sales", 4100),   \
    ("Maria", "Finance", 3000),  \
    ("James", "Sales", 3000),    \
    ("Scott", "Finance", 3300),  \
    ("Jen", "Finance", 3900),    \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000),\
    ("Saif", "Sales", 4100) \
  )
 
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)



                                                                                

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [27]:
df.createOrReplaceTempView("DF")

In [3]:
# Defining the window
from pyspark.sql.window import Window

windowSpec = Window.partitionBy("department").orderBy("salary")

## Ranking Functions

In [30]:
from pyspark.sql import functions
# row number() - return a number from 1 within the partition

df.withColumn("row number", functions.row_number().over(windowSpec)) \
.show(truncate=False)

spark.sql('''
        SELECT employee_name, department, 
          salary, ROW_NUMBER() OVER(PARTITION BY department ORDER BY salary) AS ROW_NUM
        FROM DF
        ''').show()

+-------------+----------+------+----------+
|employee_name|department|salary|row number|
+-------------+----------+------+----------+
|Maria        |Finance   |3000  |1         |
|Scott        |Finance   |3300  |2         |
|Jen          |Finance   |3900  |3         |
|Kumar        |Marketing |2000  |1         |
|Jeff         |Marketing |3000  |2         |
|James        |Sales     |3000  |1         |
|James        |Sales     |3000  |2         |
|Robert       |Sales     |4100  |3         |
|Saif         |Sales     |4100  |4         |
|Michael      |Sales     |4600  |5         |
+-------------+----------+------+----------+

+-------------+----------+------+-------+
|employee_name|department|salary|ROW_NUM|
+-------------+----------+------+-------+
|        Maria|   Finance|  3000|      1|
|        Scott|   Finance|  3300|      2|
|          Jen|   Finance|  3900|      3|
|        Kumar| Marketing|  2000|      1|
|         Jeff| Marketing|  3000|      2|
|        James|     Sales|  3000|

In [32]:
# rank() - ranks the result within a partition
df.withColumn("rank", functions.rank().over(windowSpec)) \
    .show()

spark.sql('''
        SELECT employee_name, department, 
          salary, RANK() OVER(PARTITION BY department ORDER BY salary) AS Rank
        FROM DF
        ''').show()

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|   1|
|        Scott|   Finance|  3300|   2|
|          Jen|   Finance|  3900|   3|
|        Kumar| Marketing|  2000|   1|
|         Jeff| Marketing|  3000|   2|
|        James|     Sales|  3000|   1|
|        James|     Sales|  3000|   1|
|       Robert|     Sales|  4100|   3|
|         Saif|     Sales|  4100|   3|
|      Michael|     Sales|  4600|   5|
+-------------+----------+------+----+

+-------------+----------+------+----+
|employee_name|department|salary|Rank|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|   1|
|        Scott|   Finance|  3300|   2|
|          Jen|   Finance|  3900|   3|
|        Kumar| Marketing|  2000|   1|
|         Jeff| Marketing|  3000|   2|
|        James|     Sales|  3000|   1|
|        James|     Sales|  3000|   1|
|       Robert|     Sales|  4100|   3|
|         Saif|     Sale

In [33]:
# dense_rank() - ranks without gaps
df.withColumn("dense rank", functions.dense_rank().over(windowSpec)) \
    .show()

spark.sql('''
        SELECT employee_name, department, 
          salary, DENSE_RANK() OVER(PARTITION BY department ORDER BY salary) AS Dense_Rank
        FROM DF
        ''').show()

+-------------+----------+------+----------+
|employee_name|department|salary|dense rank|
+-------------+----------+------+----------+
|        Maria|   Finance|  3000|         1|
|        Scott|   Finance|  3300|         2|
|          Jen|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|         Jeff| Marketing|  3000|         2|
|        James|     Sales|  3000|         1|
|        James|     Sales|  3000|         1|
|       Robert|     Sales|  4100|         2|
|         Saif|     Sales|  4100|         2|
|      Michael|     Sales|  4600|         3|
+-------------+----------+------+----------+

+-------------+----------+------+----------+
|employee_name|department|salary|Dense_Rank|
+-------------+----------+------+----------+
|        Maria|   Finance|  3000|         1|
|        Scott|   Finance|  3300|         2|
|          Jen|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|         Jeff| Marketing|  3000|         2|
|        

In [34]:
# percent_rank() - returns the relative rank over a partition
df.withColumn("percent rank", functions.percent_rank().over(windowSpec)) \
    .show()

spark.sql('''
        SELECT employee_name, department, 
          salary, PERCENT_RANK() OVER(PARTITION BY department ORDER BY salary) AS Percent_Rank
        FROM DF
        ''').show()

+-------------+----------+------+------------+
|employee_name|department|salary|percent rank|
+-------------+----------+------+------------+
|        Maria|   Finance|  3000|         0.0|
|        Scott|   Finance|  3300|         0.5|
|          Jen|   Finance|  3900|         1.0|
|        Kumar| Marketing|  2000|         0.0|
|         Jeff| Marketing|  3000|         1.0|
|        James|     Sales|  3000|         0.0|
|        James|     Sales|  3000|         0.0|
|       Robert|     Sales|  4100|         0.5|
|         Saif|     Sales|  4100|         0.5|
|      Michael|     Sales|  4600|         1.0|
+-------------+----------+------+------------+

+-------------+----------+------+------------+
|employee_name|department|salary|Percent_Rank|
+-------------+----------+------+------------+
|        Maria|   Finance|  3000|         0.0|
|        Scott|   Finance|  3300|         0.5|
|          Jen|   Finance|  3900|         1.0|
|        Kumar| Marketing|  2000|         0.0|
|         Je

In [35]:
# ntile(2) - first 50 percentile data get 1 next 50 percentile data get 2
df.withColumn("Ntile", functions.ntile(2).over(windowSpec)) \
    .show()

spark.sql('''
        SELECT employee_name, department, 
          salary, NTILE() OVER(PARTITION BY department ORDER BY salary) AS Ntile
        FROM DF
        ''').show()

+-------------+----------+------+-----+
|employee_name|department|salary|Ntile|
+-------------+----------+------+-----+
|        Maria|   Finance|  3000|    1|
|        Scott|   Finance|  3300|    1|
|          Jen|   Finance|  3900|    2|
|        Kumar| Marketing|  2000|    1|
|         Jeff| Marketing|  3000|    2|
|        James|     Sales|  3000|    1|
|        James|     Sales|  3000|    1|
|       Robert|     Sales|  4100|    1|
|         Saif|     Sales|  4100|    2|
|      Michael|     Sales|  4600|    2|
+-------------+----------+------+-----+

+-------------+----------+------+-----+
|employee_name|department|salary|Ntile|
+-------------+----------+------+-----+
|        Maria|   Finance|  3000|    1|
|        Scott|   Finance|  3300|    1|
|          Jen|   Finance|  3900|    1|
|        Kumar| Marketing|  2000|    1|
|         Jeff| Marketing|  3000|    1|
|        James|     Sales|  3000|    1|
|        James|     Sales|  3000|    1|
|       Robert|     Sales|  4100|    1|

## Analytical Functions
- cume_dist()
- lag()
- lead()

In [36]:
# cume_dist() - get the cumulative distribution of values within a partition
df.withColumn("cumulative Distribution", functions.cume_dist() \
              .over(windowSpec)).show()
# like dense_rank()

spark.sql('''
        SELECT employee_name, department, 
          salary, CUME_DIST() OVER(PARTITION BY department ORDER BY salary) AS Cum_dist
        FROM DF
        ''').show()

+-------------+----------+------+-----------------------+
|employee_name|department|salary|cumulative Distribution|
+-------------+----------+------+-----------------------+
|        Maria|   Finance|  3000|     0.3333333333333333|
|        Scott|   Finance|  3300|     0.6666666666666666|
|          Jen|   Finance|  3900|                    1.0|
|        Kumar| Marketing|  2000|                    0.5|
|         Jeff| Marketing|  3000|                    1.0|
|        James|     Sales|  3000|                    0.4|
|        James|     Sales|  3000|                    0.4|
|       Robert|     Sales|  4100|                    0.8|
|         Saif|     Sales|  4100|                    0.8|
|      Michael|     Sales|  4600|                    1.0|
+-------------+----------+------+-----------------------+

+-------------+----------+------+------------------+
|employee_name|department|salary|          Cum_dist|
+-------------+----------+------+------------------+
|        Maria|   Finance|  

In [37]:
# lag()
df.withColumn("Lag", functions.lag("salary", 2).over(windowSpec)).show()

spark.sql('''
        SELECT employee_name, department, 
          salary, LAG(salary, 2) OVER(PARTITION BY department ORDER BY salary) AS Lag
        FROM DF
        ''').show()

+-------------+----------+------+----+
|employee_name|department|salary| Lag|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|NULL|
|        Scott|   Finance|  3300|NULL|
|          Jen|   Finance|  3900|3000|
|        Kumar| Marketing|  2000|NULL|
|         Jeff| Marketing|  3000|NULL|
|        James|     Sales|  3000|NULL|
|        James|     Sales|  3000|NULL|
|       Robert|     Sales|  4100|3000|
|         Saif|     Sales|  4100|3000|
|      Michael|     Sales|  4600|4100|
+-------------+----------+------+----+

+-------------+----------+------+----+
|employee_name|department|salary| Lag|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|NULL|
|        Scott|   Finance|  3300|NULL|
|          Jen|   Finance|  3900|3000|
|        Kumar| Marketing|  2000|NULL|
|         Jeff| Marketing|  3000|NULL|
|        James|     Sales|  3000|NULL|
|        James|     Sales|  3000|NULL|
|       Robert|     Sales|  4100|3000|
|         Saif|     Sale

In [38]:
# lead()
df.withColumn("Lead", functions.lead("salary", 2).over(windowSpec)).show()

spark.sql('''
        SELECT employee_name, department, 
          salary, LEAD(salary, 2) OVER(PARTITION BY department ORDER BY salary) AS Lead
        FROM DF
        ''').show()

+-------------+----------+------+----+
|employee_name|department|salary|Lead|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|3900|
|        Scott|   Finance|  3300|NULL|
|          Jen|   Finance|  3900|NULL|
|        Kumar| Marketing|  2000|NULL|
|         Jeff| Marketing|  3000|NULL|
|        James|     Sales|  3000|4100|
|        James|     Sales|  3000|4100|
|       Robert|     Sales|  4100|4600|
|         Saif|     Sales|  4100|NULL|
|      Michael|     Sales|  4600|NULL|
+-------------+----------+------+----+

+-------------+----------+------+----+
|employee_name|department|salary|Lead|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|3900|
|        Scott|   Finance|  3300|NULL|
|          Jen|   Finance|  3900|NULL|
|        Kumar| Marketing|  2000|NULL|
|         Jeff| Marketing|  3000|NULL|
|        James|     Sales|  3000|4100|
|        James|     Sales|  3000|4100|
|       Robert|     Sales|  4100|4600|
|         Saif|     Sale

## Aggregate Functions

In [23]:
windowSpecAgg = Window.partitionBy("department")
df.withColumn("row_num", functions.row_number().over(windowSpec)) \
    .withColumn("AVG", functions.avg("salary").over(windowSpecAgg)) \
    .withColumn("SUM", functions.sum("salary").over(windowSpecAgg)) \
    .withColumn("MIN", functions.min("salary").over(windowSpecAgg)) \
    .withColumn("MAX", functions.max("salary").over(windowSpecAgg)) \
    .where("row_num == 1") \
    .select("department", "AVG", "SUM", "MIN", "MAX") \
    .show()

+----------+------+-----+----+----+
|department|   AVG|  SUM| MIN| MAX|
+----------+------+-----+----+----+
|   Finance|3400.0|10200|3000|3900|
| Marketing|2500.0| 5000|2000|3000|
|     Sales|3760.0|18800|3000|4600|
+----------+------+-----+----+----+



In [57]:
spark.sql('''
        SELECT department, avg, sum, min, max
          FROM (
          SELECT department,
          ROW_NUMBER() OVER(PARTITION BY department ORDER BY salary) as row,
          AVG(salary) OVER(PARTITION BY department) as avg,
          SUM(salary) OVER(PARTITION BY department) as sum,
          MIN(salary) OVER(PARTITION BY department) as min,
          MAX(salary) OVER(PARTITION BY department) as max
          FROM DF) as temp
          WHERE temp.row = 1
        ''').show()

+----------+------+-----+----+----+
|department|   avg|  sum| min| max|
+----------+------+-----+----+----+
|   Finance|3400.0|10200|3000|3900|
| Marketing|2500.0| 5000|2000|3000|
|     Sales|3760.0|18800|3000|4600|
+----------+------+-----+----+----+



In [58]:
spark.stop()