In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 38 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 55.7 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=8eb9dd6af8666e481941bfdfebb797c25e63a5e43febdb7f81e8549a0b2bf9d9
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
# Get (or reuse, if one already exists) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName('SparkWindowFunction')
    .getOrCreate()
)

In [4]:
# Sample employees as (employee_name, department, salary) tuples.
# Some salaries tie within a department (Sales: 3000/3000 and 4100/4100), which is
# what makes row_number, rank and dense_rank produce different results below.
# NOTE(review): "Pnadey" looks like a typo for "Pandey"; kept unchanged so the
# recorded outputs in this notebook still match.
simpleData = (
    ("James", "Sales", 3000),
    ("Madhav", "Sales", 4600),
    ("Rohit", "Sales", 4100),
    ("Mitesh", "Finance", 3000),
    ("Gursheen", "Sales", 3000),
    ("Yash", "Finance", 3300),
    ("Ben", "Finance", 3900),
    ("Pnadey", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Shah", "Sales", 4100),
)

In [5]:
# Column names, in the same order as the fields of each simpleData tuple.
columns = [
    "employee_name",
    "department",
    "salary",
]

In [6]:
# Build the employee DataFrame; column types are inferred (salary becomes long).
df = spark.createDataFrame(simpleData, schema=columns)

In [7]:
# Inspect the inferred schema first, then every row with full (untruncated) values.
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Madhav       |Sales     |4600  |
|Rohit        |Sales     |4100  |
|Mitesh       |Finance   |3000  |
|Gursheen     |Sales     |3000  |
|Yash         |Finance   |3300  |
|Ben          |Finance   |3900  |
|Pnadey       |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Shah         |Sales     |4100  |
+-------------+----------+------+



In [8]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

In [9]:
# One window per department, with rows ordered by ascending salary.
# Shared by all of the ranking / analytic examples below.
windowSpec = (
    Window
    .partitionBy("department")
    .orderBy("salary")
)

In [10]:
# row_number(): a sequential 1, 2, 3, ... within each department.
# Rows with equal salary (e.g. James/Gursheen at 3000) still get distinct numbers;
# the orderBy key alone does not decide which of the tied rows comes first.
(
    df
    .withColumn("row_number", row_number().over(windowSpec))
    .show(truncate=False)
)

+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|Mitesh       |Finance   |3000  |1         |
|Yash         |Finance   |3300  |2         |
|Ben          |Finance   |3900  |3         |
|Kumar        |Marketing |2000  |1         |
|Pnadey       |Marketing |3000  |2         |
|James        |Sales     |3000  |1         |
|Gursheen     |Sales     |3000  |2         |
|Rohit        |Sales     |4100  |3         |
|Shah         |Sales     |4100  |4         |
|Madhav       |Sales     |4600  |5         |
+-------------+----------+------+----------+



rank() assigns each row its rank within its window partition. Rows that tie on the ordering column receive the same rank, and the ranks that follow skip ahead, leaving gaps (for example 1, 1, 3).

In [11]:
from pyspark.sql.functions import rank

# rank(): tied salaries share a rank, and the next rank skips ahead (1, 1, 3, ...).
(
    df
    .withColumn("rank", rank().over(windowSpec))
    .show()
)

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|       Mitesh|   Finance|  3000|   1|
|         Yash|   Finance|  3300|   2|
|          Ben|   Finance|  3900|   3|
|        Kumar| Marketing|  2000|   1|
|       Pnadey| Marketing|  3000|   2|
|        James|     Sales|  3000|   1|
|     Gursheen|     Sales|  3000|   1|
|        Rohit|     Sales|  4100|   3|
|         Shah|     Sales|  4100|   3|
|       Madhav|     Sales|  4600|   5|
+-------------+----------+------+----+



dense_rank() ranks the rows within a window partition without leaving gaps: tied rows share a rank, and the next distinct value gets the next consecutive rank (for example 1, 1, 2). It differs from rank() only in how it handles ties, since rank() skips ranks after a tie.

In [12]:
from pyspark.sql.functions import dense_rank

# dense_rank(): tied salaries share a rank, and the next rank follows with no gap (1, 1, 2, ...).
(
    df
    .withColumn("dense_rank", dense_rank().over(windowSpec))
    .show()
)

+-------------+----------+------+----------+
|employee_name|department|salary|dense_rank|
+-------------+----------+------+----------+
|       Mitesh|   Finance|  3000|         1|
|         Yash|   Finance|  3300|         2|
|          Ben|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|       Pnadey| Marketing|  3000|         2|
|        James|     Sales|  3000|         1|
|     Gursheen|     Sales|  3000|         1|
|        Rohit|     Sales|  4100|         2|
|         Shah|     Sales|  4100|         2|
|       Madhav|     Sales|  4600|         3|
+-------------+----------+------+----------+



In [13]:
from pyspark.sql.functions import percent_rank

# percent_rank(): (rank - 1) / (rows in partition - 1), so it runs from 0.0 to 1.0
# within each department (e.g. Rohit in Sales: (3 - 1) / (5 - 1) = 0.5).
(
    df
    .withColumn("percent_rank", percent_rank().over(windowSpec))
    .show()
)

+-------------+----------+------+------------+
|employee_name|department|salary|percent_rank|
+-------------+----------+------+------------+
|       Mitesh|   Finance|  3000|         0.0|
|         Yash|   Finance|  3300|         0.5|
|          Ben|   Finance|  3900|         1.0|
|        Kumar| Marketing|  2000|         0.0|
|       Pnadey| Marketing|  3000|         1.0|
|        James|     Sales|  3000|         0.0|
|     Gursheen|     Sales|  3000|         0.0|
|        Rohit|     Sales|  4100|         0.5|
|         Shah|     Sales|  4100|         0.5|
|       Madhav|     Sales|  4600|         1.0|
+-------------+----------+------+------------+



ntile(n) splits the ordered rows of each window partition into n buckets of as equal a size as possible and returns the bucket number (1 to n) for each row. The example below passes 2 as the argument, so every row is labelled 1 or 2.

In [14]:
from pyspark.sql.functions import ntile

# ntile(2): split each department's salary-ordered rows into 2 buckets and label
# each row 1 or 2. With an odd row count (Sales has 5), bucket 1 gets the extra row.
(
    df
    .withColumn("ntile", ntile(2).over(windowSpec))
    .show()
)

+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
|       Mitesh|   Finance|  3000|    1|
|         Yash|   Finance|  3300|    1|
|          Ben|   Finance|  3900|    2|
|        Kumar| Marketing|  2000|    1|
|       Pnadey| Marketing|  3000|    2|
|        James|     Sales|  3000|    1|
|     Gursheen|     Sales|  3000|    1|
|        Rohit|     Sales|  4100|    1|
|         Shah|     Sales|  4100|    2|
|       Madhav|     Sales|  4600|    2|
+-------------+----------+------+-----+



PySpark Window Analytic functions

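cume_dist() returns the cumulative distribution of values within a window partition: for each row, the fraction of rows in the partition whose ordering value is less than or equal to the current row's value.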

In [15]:
from pyspark.sql.functions import cume_dist

# cume_dist(): fraction of the department's rows whose salary is <= the current row's
# (e.g. James in Sales: 2 of 5 rows earn <= 3000, giving 0.4).
(
    df
    .withColumn("cume_dist", cume_dist().over(windowSpec))
    .show()
)

+-------------+----------+------+------------------+
|employee_name|department|salary|         cume_dist|
+-------------+----------+------+------------------+
|       Mitesh|   Finance|  3000|0.3333333333333333|
|         Yash|   Finance|  3300|0.6666666666666666|
|          Ben|   Finance|  3900|               1.0|
|        Kumar| Marketing|  2000|               0.5|
|       Pnadey| Marketing|  3000|               1.0|
|        James|     Sales|  3000|               0.4|
|     Gursheen|     Sales|  3000|               0.4|
|        Rohit|     Sales|  4100|               0.8|
|         Shah|     Sales|  4100|               0.8|
|       Madhav|     Sales|  4600|               1.0|
+-------------+----------+------+------------------+



In [18]:
# Import the functions module under a namespace. The original
# `from pyspark.sql.functions import sum, min, max` silently replaced Python's
# built-in sum/min/max for every later cell in this kernel.
from pyspark.sql import functions as F

# Partition-only window (no orderBy). When a window has an orderBy, Spark's default
# frame only reaches up to the current row, which would turn these aggregates into
# running totals. Without it, each aggregate sees the whole department.
windowSpecAgg = Window.partitionBy("department")

# Attach per-department aggregates to every row, then keep a single row per
# department (row_number 1 under the salary-ordered windowSpec) to collapse the
# result to one line each. The same table could be produced with
# df.groupBy("department").agg(...); this cell shows the window-based route.
(
    df
    .withColumn("row", F.row_number().over(windowSpec))
    .withColumn("avg", F.avg(F.col("salary")).over(windowSpecAgg))
    .withColumn("sum", F.sum(F.col("salary")).over(windowSpecAgg))
    .withColumn("min", F.min(F.col("salary")).over(windowSpecAgg))
    .withColumn("max", F.max(F.col("salary")).over(windowSpecAgg))
    .where(F.col("row") == 1)
    .select("department", "avg", "sum", "min", "max")
    .show()
)

+----------+------+-----+----+----+
|department|   avg|  sum| min| max|
+----------+------+-----+----+----+
|   Finance|3400.0|10200|3000|3900|
| Marketing|2500.0| 5000|2000|3000|
|     Sales|3760.0|18800|3000|4600|
+----------+------+-----+----+----+

