In [1]:
import findspark
findspark.init()
import pyspark

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
spark = SparkSession.builder.appName('kppysparkwindowfunc').getOrCreate()

In [8]:
sampleData = (("James", "Sales", 4000),
              ("Michael", "Sales", 4500),
              ("Robert", "Marketing", 4100),
              ("Maria", "Finance", 5600),
              ("James", "Sales", 3000),
              ("Scott", "Finance", 4500))
columns = ["employee_name","department", "salary"]
df = spark.createDataFrame(data = sampleData, schema=columns)
df.printSchema()

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)



In [9]:
df.show(truncate=False)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |4000  |
|Michael      |Sales     |4500  |
|Robert       |Marketing |4100  |
|Maria        |Finance   |5600  |
|James        |Sales     |3000  |
|Scott        |Finance   |4500  |
+-------------+----------+------+



In [10]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec = Window.partitionBy("department").orderBy("salary")
df.withColumn("row_number", row_number().over(windowSpec)).show(truncate=False)

+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|Scott        |Finance   |4500  |1         |
|Maria        |Finance   |5600  |2         |
|Robert       |Marketing |4100  |1         |
|James        |Sales     |3000  |1         |
|James        |Sales     |4000  |2         |
|Michael      |Sales     |4500  |3         |
+-------------+----------+------+----------+



In [11]:
from pyspark.sql.functions import rank
df.withColumn("rank", rank().over(windowSpec)).show()

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|        Scott|   Finance|  4500|   1|
|        Maria|   Finance|  5600|   2|
|       Robert| Marketing|  4100|   1|
|        James|     Sales|  3000|   1|
|        James|     Sales|  4000|   2|
|      Michael|     Sales|  4500|   3|
+-------------+----------+------+----+



In [12]:
from pyspark.sql.functions import dense_rank 
df.withColumn("dense_rank", dense_rank().over(windowSpec)).show()

+-------------+----------+------+----------+
|employee_name|department|salary|dense_rank|
+-------------+----------+------+----------+
|        Scott|   Finance|  4500|         1|
|        Maria|   Finance|  5600|         2|
|       Robert| Marketing|  4100|         1|
|        James|     Sales|  3000|         1|
|        James|     Sales|  4000|         2|
|      Michael|     Sales|  4500|         3|
+-------------+----------+------+----------+



In [13]:
from pyspark.sql.functions import percent_rank
df.withColumn("percent_rank", percent_rank().over(windowSpec)).show()

+-------------+----------+------+------------+
|employee_name|department|salary|percent_rank|
+-------------+----------+------+------------+
|        Scott|   Finance|  4500|         0.0|
|        Maria|   Finance|  5600|         1.0|
|       Robert| Marketing|  4100|         0.0|
|        James|     Sales|  3000|         0.0|
|        James|     Sales|  4000|         0.5|
|      Michael|     Sales|  4500|         1.0|
+-------------+----------+------+------------+



In [14]:
# ntile() is a window function which distributes rows into a ordered partition if a pre-defined number of roughly equal groups. 
# it assigns each group a pre-defined number of roughly equal group with a number expression ranging from 1. 
# ntile() assigns a number_expression for every row in a group in which the row belongs. 

from pyspark.sql.functions import ntile
df.withColumn("ntile", ntile(2).over(windowSpec)).show()


+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
|        Scott|   Finance|  4500|    1|
|        Maria|   Finance|  5600|    2|
|       Robert| Marketing|  4100|    1|
|        James|     Sales|  3000|    1|
|        James|     Sales|  4000|    1|
|      Michael|     Sales|  4500|    2|
+-------------+----------+------+-----+



In [15]:
# the cume_dist() function returns in SQL the cumulative distribution of a value within a group of values. It calculates
# the relative position of a value in a group of values. 

from pyspark.sql.functions import cume_dist
df.withColumn("cume_dist", cume_dist().over(windowSpec)).show()

+-------------+----------+------+------------------+
|employee_name|department|salary|         cume_dist|
+-------------+----------+------+------------------+
|        Scott|   Finance|  4500|               0.5|
|        Maria|   Finance|  5600|               1.0|
|       Robert| Marketing|  4100|               1.0|
|        James|     Sales|  3000|0.3333333333333333|
|        James|     Sales|  4000|0.6666666666666666|
|      Michael|     Sales|  4500|               1.0|
+-------------+----------+------+------------------+



In [16]:
# Aggregate functions in PySpark SQL 

# row_number(), min(), max(), avg(), sum() etc. 

In [23]:
windowSpecAgg = Window.partitionBy("department")
from pyspark.sql.functions import col, avg, sum, min, max, row_number
df.withColumn("row", row_number().over(windowSpec)) \
  .withColumn("avg",avg(col("salary")).over(windowSpecAgg)) \
  .withColumn("sum",avg(col("salary")).over(windowSpecAgg)) \
  .withColumn("min",min(col("salary")).over(windowSpecAgg)) \
  .withColumn("max", max(col("salary")).over(windowSpecAgg)) \
  .where(col("row")==1).select("department","avg","sum","min", "max").show()
   

+----------+------------------+------------------+----+----+
|department|               avg|               sum| min| max|
+----------+------------------+------------------+----+----+
|   Finance|            5050.0|            5050.0|4500|5600|
| Marketing|            4100.0|            4100.0|4100|4100|
|     Sales|3833.3333333333335|3833.3333333333335|3000|4500|
+----------+------------------+------------------+----+----+

