In [1]:
import findspark
# Run before the pyspark imports in the next cell: findspark.init() locates the local
# Spark installation and makes the pyspark package importable from this kernel.
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session shared by every cell in this notebook.
spark = (
    SparkSession.builder
    .appName('pyspark-examples')
    .getOrCreate()
)

# Sample rows as (employee_name, department, salary). The repeated James/Sales/3000 row
# and the two 4100 salaries in Sales give the ranking functions below ties to work with.
simpleData = [
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100),
]

columns = ["employee_name", "department", "salary"]

# Only column names are supplied, so Spark infers the column types from the tuples.
df = spark.createDataFrame(simpleData, columns)

df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [3]:
# Window builds the partition/order specification that window functions are evaluated
# over via .over(); the function result is added as a column on every input row.
from pyspark.sql.window import Window
# NOTE(review): the star import rebinds Python's built-in sum, min and max to the Spark
# column functions for the rest of the notebook; the aggregate cell relies on that.
from pyspark.sql.functions import *

In [4]:
# Window shared by the ranking and analytic cells below. Rows are grouped per department
# (similar to GROUP BY, except every input row is kept) and ordered by ascending salary
# inside each group. The order key is spelled exactly like the DataFrame column so it
# still resolves when spark.sql.caseSensitive is enabled.
windowSpec = Window.partitionBy("department").orderBy("salary")

In [5]:
# row_number(): numbers the rows 1..n inside each department by ascending salary;
# tied salaries still receive distinct numbers.
# withColumn returns a new DataFrame and the result is only displayed, not assigned,
# so df itself does not gain a row_number column.
(
    df
    .withColumn("row_number", row_number().over(windowSpec))
    .show(truncate=False)
)

+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|Maria        |Finance   |3000  |1         |
|Scott        |Finance   |3300  |2         |
|Jen          |Finance   |3900  |3         |
|Kumar        |Marketing |2000  |1         |
|Jeff         |Marketing |3000  |2         |
|James        |Sales     |3000  |1         |
|James        |Sales     |3000  |2         |
|Robert       |Sales     |4100  |3         |
|Saif         |Sales     |4100  |4         |
|Michael      |Sales     |4600  |5         |
+-------------+----------+------+----------+



In [6]:
# Display df again: it still has only its three original columns.
df.show()

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar| Marketing|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+



In [7]:
# rank(): position by ascending salary, restarting at 1 in every department.
# Equal salaries share a rank and the ranks after a tie are skipped (Sales: 1, 1, 3, 3, 5).
(
    df
    .withColumn("rank", rank().over(windowSpec))
    .show(truncate=False)
)

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|Maria        |Finance   |3000  |1   |
|Scott        |Finance   |3300  |2   |
|Jen          |Finance   |3900  |3   |
|Kumar        |Marketing |2000  |1   |
|Jeff         |Marketing |3000  |2   |
|James        |Sales     |3000  |1   |
|James        |Sales     |3000  |1   |
|Robert       |Sales     |4100  |3   |
|Saif         |Sales     |4100  |3   |
|Michael      |Sales     |4600  |5   |
+-------------+----------+------+----+



In [8]:
# dense_rank(): like rank(), except no rank values are skipped after a tie
# (Sales: 1, 1, 2, 2, 3 instead of 1, 1, 3, 3, 5).
(
    df
    .withColumn("dense rank", dense_rank().over(windowSpec))
    .show(truncate=False)
)

+-------------+----------+------+----------+
|employee_name|department|salary|dense rank|
+-------------+----------+------+----------+
|Maria        |Finance   |3000  |1         |
|Scott        |Finance   |3300  |2         |
|Jen          |Finance   |3900  |3         |
|Kumar        |Marketing |2000  |1         |
|Jeff         |Marketing |3000  |2         |
|James        |Sales     |3000  |1         |
|James        |Sales     |3000  |1         |
|Robert       |Sales     |4100  |2         |
|Saif         |Sales     |4100  |2         |
|Michael      |Sales     |4600  |3         |
+-------------+----------+------+----------+



In [9]:
# percent_rank(): relative rank of each salary inside its department, computed as
# (rank - 1) / (rows in partition - 1), so values run from 0.0 up to 1.0.
# Tied salaries share a value (Sales: 0.0, 0.0, 0.5, 0.5, 1.0).
(
    df
    .withColumn("percent rank", percent_rank().over(windowSpec))
    .show(truncate=False)
)

+-------------+----------+------+------------+
|employee_name|department|salary|percent rank|
+-------------+----------+------+------------+
|Maria        |Finance   |3000  |0.0         |
|Scott        |Finance   |3300  |0.5         |
|Jen          |Finance   |3900  |1.0         |
|Kumar        |Marketing |2000  |0.0         |
|Jeff         |Marketing |3000  |1.0         |
|James        |Sales     |3000  |0.0         |
|James        |Sales     |3000  |0.0         |
|Robert       |Sales     |4100  |0.5         |
|Saif         |Sales     |4100  |0.5         |
|Michael      |Sales     |4600  |1.0         |
+-------------+----------+------+------------+



In [10]:
# ntile(2): splits the salary-ordered rows of each department into 2 buckets that are as
# equal in size as possible; when the rows do not divide evenly the first bucket gets the
# extra row (Finance: 1, 1, 2 and Sales: 1, 1, 1, 2, 2).
(
    df
    .withColumn("ntile", ntile(2).over(windowSpec))
    .show(truncate=False)
)

+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
|Maria        |Finance   |3000  |1    |
|Scott        |Finance   |3300  |1    |
|Jen          |Finance   |3900  |2    |
|Kumar        |Marketing |2000  |1    |
|Jeff         |Marketing |3000  |2    |
|James        |Sales     |3000  |1    |
|James        |Sales     |3000  |1    |
|Robert       |Sales     |4100  |1    |
|Saif         |Sales     |4100  |2    |
|Michael      |Sales     |4600  |2    |
+-------------+----------+------+-----+



In [11]:
# cume_dist(): fraction of rows in the department whose salary is less than or equal to
# the current row's salary (Sales: 2/5, 2/5, 4/5, 4/5, 5/5).
# Unlike percent_rank it never returns 0.0, because a row always counts itself.
(
    df
    .withColumn("cume_dist", cume_dist().over(windowSpec))
    .show(truncate=False)
)

+-------------+----------+------+------------------+
|employee_name|department|salary|cume_dist         |
+-------------+----------+------+------------------+
|Maria        |Finance   |3000  |0.3333333333333333|
|Scott        |Finance   |3300  |0.6666666666666666|
|Jen          |Finance   |3900  |1.0               |
|Kumar        |Marketing |2000  |0.5               |
|Jeff         |Marketing |3000  |1.0               |
|James        |Sales     |3000  |0.4               |
|James        |Sales     |3000  |0.4               |
|Robert       |Sales     |4100  |0.8               |
|Saif         |Sales     |4100  |0.8               |
|Michael      |Sales     |4600  |1.0               |
+-------------+----------+------+------------------+



In [12]:
# lag('salary', 2): salary of the row two positions earlier in the same department
# (by the window's salary ordering); null when fewer than two rows precede the current one.
(
    df
    .withColumn("lag", lag('salary', 2).over(windowSpec))
    .show(truncate=False)
)

+-------------+----------+------+----+
|employee_name|department|salary|lag |
+-------------+----------+------+----+
|Maria        |Finance   |3000  |null|
|Scott        |Finance   |3300  |null|
|Jen          |Finance   |3900  |3000|
|Kumar        |Marketing |2000  |null|
|Jeff         |Marketing |3000  |null|
|James        |Sales     |3000  |null|
|James        |Sales     |3000  |null|
|Robert       |Sales     |4100  |3000|
|Saif         |Sales     |4100  |3000|
|Michael      |Sales     |4600  |4100|
+-------------+----------+------+----+



In [13]:
# lead('salary', 2): salary of the row two positions later in the same department
# (by the window's salary ordering); null when fewer than two rows follow the current one.
(
    df
    .withColumn("lead", lead('salary', 2).over(windowSpec))
    .show(truncate=False)
)

+-------------+----------+------+----+
|employee_name|department|salary|lead|
+-------------+----------+------+----+
|Maria        |Finance   |3000  |3900|
|Scott        |Finance   |3300  |null|
|Jen          |Finance   |3900  |null|
|Kumar        |Marketing |2000  |null|
|Jeff         |Marketing |3000  |null|
|James        |Sales     |3000  |4100|
|James        |Sales     |3000  |4100|
|Robert       |Sales     |4100  |4600|
|Saif         |Sales     |4100  |null|
|Michael      |Sales     |4600  |null|
+-------------+----------+------+----+



In [14]:
# Unordered window over each department: an aggregate applied over it is computed from
# every row of the department and repeated on each of those rows.
windowSpecAgg = Window.partitionBy("department")

# Aggregate functions:
# avg/sum/min/max below are the Spark column functions brought in by the star import,
# not the Python built-ins.
# Every row of a department carries the same aggregate values, so keeping only the row
# numbered 1 by windowSpec leaves exactly one summary row per department.
# The chain is wrapped in parentheses instead of being joined with backslashes: a comment
# placed after a line-continuation backslash is a SyntaxError.
(
    df.withColumn("row", row_number().over(windowSpec))
    .withColumn("avg", avg(col("salary")).over(windowSpecAgg))
    .withColumn("sum", sum(col("salary")).over(windowSpecAgg))
    .withColumn("min", min(col("salary")).over(windowSpecAgg))
    .withColumn("max", max(col("salary")).over(windowSpecAgg))
    .where(col("row") == 1)
    .select("department", "avg", "sum", "min", "max")
    .show()
)

+----------+------+-----+----+----+
|department|   avg|  sum| min| max|
+----------+------+-----+----+----+
|   Finance|3400.0|10200|3000|3900|
| Marketing|2500.0| 5000|2000|3000|
|     Sales|3760.0|18800|3000|4600|
+----------+------+-----+----+----+

