In [None]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PySpark Window Functions").getOrCreate()

simpleData = (("James", "Sales", 3000), \
    ("Michael", "Sales", 4600),  \
    ("Robert", "Sales", 4100),   \
    ("Maria", "Finance", 3000),  \
    ("James", "Sales", 3000),    \
    ("Scott", "Finance", 3300),  \
    ("Jen", "Finance", 3900),    \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000),\
    ("Saif", "Sales", 4100) \
  )

columns = ["Employee Name", "Department", "Salary"]

df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate = False)


root
 |-- Employee Name: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: long (nullable = true)

+-------------+----------+------+
|Employee Name|Department|Salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [None]:
# Row Number
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec = Window.partitionBy("Department").orderBy("Salary")

df.withColumn("row_number",row_number().over(windowSpec))\
.show(truncate = False)


+-------------+----------+------+----------+
|Employee Name|Department|Salary|row_number|
+-------------+----------+------+----------+
|Maria        |Finance   |3000  |1         |
|Scott        |Finance   |3300  |2         |
|Jen          |Finance   |3900  |3         |
|Kumar        |Marketing |2000  |1         |
|Jeff         |Marketing |3000  |2         |
|James        |Sales     |3000  |1         |
|James        |Sales     |3000  |2         |
|Robert       |Sales     |4100  |3         |
|Saif         |Sales     |4100  |4         |
|Michael      |Sales     |4600  |5         |
+-------------+----------+------+----------+



In [None]:
# Rank
from pyspark.sql.functions import rank
df.withColumn("rank", rank().over(windowSpec))\
.show()

+-------------+----------+------+----+
|Employee Name|Department|Salary|rank|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|   1|
|        Scott|   Finance|  3300|   2|
|          Jen|   Finance|  3900|   3|
|        Kumar| Marketing|  2000|   1|
|         Jeff| Marketing|  3000|   2|
|        James|     Sales|  3000|   1|
|        James|     Sales|  3000|   1|
|       Robert|     Sales|  4100|   3|
|         Saif|     Sales|  4100|   3|
|      Michael|     Sales|  4600|   5|
+-------------+----------+------+----+



In [None]:
# Dense Rank
from pyspark.sql.functions import dense_rank
df.withColumn("dense_rank", dense_rank().over(windowSpec))\
.show()

+-------------+----------+------+----------+
|Employee Name|Department|Salary|dense_rank|
+-------------+----------+------+----------+
|        Maria|   Finance|  3000|         1|
|        Scott|   Finance|  3300|         2|
|          Jen|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|         Jeff| Marketing|  3000|         2|
|        James|     Sales|  3000|         1|
|        James|     Sales|  3000|         1|
|       Robert|     Sales|  4100|         2|
|         Saif|     Sales|  4100|         2|
|      Michael|     Sales|  4600|         3|
+-------------+----------+------+----------+



In [None]:
# Percent Rank
from pyspark.sql.functions import percent_rank
df.withColumn("Percent Rank", percent_rank().over(windowSpec))\
.show()

+-------------+----------+------+------------+
|Employee Name|Department|Salary|Percent Rank|
+-------------+----------+------+------------+
|        Maria|   Finance|  3000|         0.0|
|        Scott|   Finance|  3300|         0.5|
|          Jen|   Finance|  3900|         1.0|
|        Kumar| Marketing|  2000|         0.0|
|         Jeff| Marketing|  3000|         1.0|
|        James|     Sales|  3000|         0.0|
|        James|     Sales|  3000|         0.0|
|       Robert|     Sales|  4100|         0.5|
|         Saif|     Sales|  4100|         0.5|
|      Michael|     Sales|  4600|         1.0|
+-------------+----------+------+------------+



In [None]:
# ntile
from pyspark.sql.functions import ntile
df.withColumn("ntile",ntile(2).over(windowSpec))\
.show()

+-------------+----------+------+-----+
|Employee Name|Department|Salary|ntile|
+-------------+----------+------+-----+
|        Maria|   Finance|  3000|    1|
|        Scott|   Finance|  3300|    1|
|          Jen|   Finance|  3900|    2|
|        Kumar| Marketing|  2000|    1|
|         Jeff| Marketing|  3000|    2|
|        James|     Sales|  3000|    1|
|        James|     Sales|  3000|    1|
|       Robert|     Sales|  4100|    1|
|         Saif|     Sales|  4100|    2|
|      Michael|     Sales|  4600|    2|
+-------------+----------+------+-----+



In [None]:
# Cume dist - CUmulative distribution
from pyspark.sql.functions import cume_dist
df.withColumn("cume_dist", cume_dist().over(windowSpec))\
.show()

+-------------+----------+------+------------------+
|Employee Name|Department|Salary|         cume_dist|
+-------------+----------+------+------------------+
|        Maria|   Finance|  3000|0.3333333333333333|
|        Scott|   Finance|  3300|0.6666666666666666|
|          Jen|   Finance|  3900|               1.0|
|        Kumar| Marketing|  2000|               0.5|
|         Jeff| Marketing|  3000|               1.0|
|        James|     Sales|  3000|               0.4|
|        James|     Sales|  3000|               0.4|
|       Robert|     Sales|  4100|               0.8|
|         Saif|     Sales|  4100|               0.8|
|      Michael|     Sales|  4600|               1.0|
+-------------+----------+------+------------------+



In [None]:
# Lag
from pyspark.sql.functions import lag
df.withColumn("lag", lag("salary",2).over(windowSpec))\
.show()

+-------------+----------+------+----+
|Employee Name|Department|Salary| lag|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|NULL|
|        Scott|   Finance|  3300|NULL|
|          Jen|   Finance|  3900|3000|
|        Kumar| Marketing|  2000|NULL|
|         Jeff| Marketing|  3000|NULL|
|        James|     Sales|  3000|NULL|
|        James|     Sales|  3000|NULL|
|       Robert|     Sales|  4100|3000|
|         Saif|     Sales|  4100|3000|
|      Michael|     Sales|  4600|4100|
+-------------+----------+------+----+



In [None]:
# Lead
from pyspark.sql.functions import lead
df.withColumn("lead", lead("salary",2).over(windowSpec))\
.show()

+-------------+----------+------+----+
|Employee Name|Department|Salary|lead|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|3900|
|        Scott|   Finance|  3300|NULL|
|          Jen|   Finance|  3900|NULL|
|        Kumar| Marketing|  2000|NULL|
|         Jeff| Marketing|  3000|NULL|
|        James|     Sales|  3000|4100|
|        James|     Sales|  3000|4100|
|       Robert|     Sales|  4100|4600|
|         Saif|     Sales|  4100|NULL|
|      Michael|     Sales|  4600|NULL|
+-------------+----------+------+----+



In [None]:
# WindowsSpecAggregation

windowSpecAgg = Window.partitionBy("Department")
from pyspark.sql.functions import col, avg, sum, min, max, row_number
df.withColumn("Row", row_number().over(windowSpec))\
.withColumn("Avg", avg(col("Salary")).over(windowSpec))\
.withColumn("Sum", sum(col("Salary")).over(windowSpec))\
.withColumn("Min", min(col("Salary")).over(windowSpec))\
.withColumn("Max", max(col("Salary")).over(windowSpec))\
.where(col("Row") == 1).select("Department", "Avg", "Sum", "Min", "Max")\
.show()

+----------+------+----+----+----+
|Department|   Avg| Sum| Min| Max|
+----------+------+----+----+----+
|   Finance|3000.0|3000|3000|3000|
| Marketing|2000.0|2000|2000|2000|
|     Sales|3000.0|6000|3000|3000|
+----------+------+----+----+----+

