# Window Functions

In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, sum, avg

In [3]:
spark = SparkSession.builder.appName('Window Functions').getOrCreate()

25/01/08 16:48:02 WARN Utils: Your hostname, Ks-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.9 instead (on interface en0)
25/01/08 16:48:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/08 16:48:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/01/08 16:48:03 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/01/08 16:48:03 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/01/08 16:48:03 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


In [4]:
data = [("Alice", "Sales", 3000),
        ("Bob", "Sales", 4000),
        ("Alice", "HR", 2000),
        ("Bob", "HR", 2500),
        ("Charlie", "Sales", 3000),
        ("Charlie", "HR", 2200)]
columns = ["Name", "Department", "Salary"]

df = spark.createDataFrame(data, schema=columns)

In [5]:
windowSpec = Window.partitionBy("Department").orderBy("Salary")

# Row Number

In [6]:
from pyspark.sql.functions import row_number
df.withColumn("Row_Number", row_number().over(windowSpec)).show()

                                                                                

+-------+----------+------+----------+
|   Name|Department|Salary|Row_Number|
+-------+----------+------+----------+
|  Alice|        HR|  2000|         1|
|Charlie|        HR|  2200|         2|
|    Bob|        HR|  2500|         3|
|  Alice|     Sales|  3000|         1|
|Charlie|     Sales|  3000|         2|
|    Bob|     Sales|  4000|         3|
+-------+----------+------+----------+



In [7]:
from pyspark.sql.functions import rank # type: ignore
df.withColumn("Rank", rank().over(windowSpec)).show()

+-------+----------+------+----+
|   Name|Department|Salary|Rank|
+-------+----------+------+----+
|  Alice|        HR|  2000|   1|
|Charlie|        HR|  2200|   2|
|    Bob|        HR|  2500|   3|
|  Alice|     Sales|  3000|   1|
|Charlie|     Sales|  3000|   1|
|    Bob|     Sales|  4000|   3|
+-------+----------+------+----+



25/01/08 16:48:18 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [8]:
from pyspark.sql.functions import dense_rank

df.withColumn("Dense_Rank", dense_rank().over(windowSpec)).show()


+-------+----------+------+----------+
|   Name|Department|Salary|Dense_Rank|
+-------+----------+------+----------+
|  Alice|        HR|  2000|         1|
|Charlie|        HR|  2200|         2|
|    Bob|        HR|  2500|         3|
|  Alice|     Sales|  3000|         1|
|Charlie|     Sales|  3000|         1|
|    Bob|     Sales|  4000|         2|
+-------+----------+------+----------+



In [9]:
df.withColumn("Running_Total", sum("Salary").over(windowSpec)).show()


+-------+----------+------+-------------+
|   Name|Department|Salary|Running_Total|
+-------+----------+------+-------------+
|  Alice|        HR|  2000|         2000|
|Charlie|        HR|  2200|         4200|
|    Bob|        HR|  2500|         6700|
|  Alice|     Sales|  3000|         6000|
|Charlie|     Sales|  3000|         6000|
|    Bob|     Sales|  4000|        10000|
+-------+----------+------+-------------+

