In [0]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import row_number
# Obtain the Spark session (builder -> appName -> getOrCreate) and bind it to
# `spark`, the handle the rest of the notebook uses to build DataFrames.
spark = (
    SparkSession.builder
    .appName('SparkByExamples.com')
    .getOrCreate()
)

In [0]:
# Sample employee records as (employee_name, department, salary) tuples.
# ("James", "Sales", 3000) appears twice and 4100 repeats within Sales, so the
# data contains ties when ordered by salary inside a department.
simpleData = (
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100),
)

# Column names, listed in the same order as the tuple fields above.
columns = ["employee_name", "department", "salary"]

# Build the DataFrame from the records, then print its schema and every row
# without truncating cell values.
df = spark.createDataFrame(simpleData, columns)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank

In [0]:
# Window shared by the ranking and lag cells below: rows are grouped by
# department and ordered by salary (ascending) inside each group.
windowSpec = (
    Window
    .partitionBy("department")
    .orderBy("salary")
)

In [0]:
# Add a 1-based sequential position within each department, following the
# salary ordering of windowSpec. Rows with the same salary still receive
# different numbers (see the two 3000 rows in Sales in the output).
(
    df
    .withColumn("row_number", row_number().over(windowSpec))
    .show(truncate=False)
)




+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|Maria        |Finance   |3000  |1         |
|Scott        |Finance   |3300  |2         |
|Jen          |Finance   |3900  |3         |
|Kumar        |Marketing |2000  |1         |
|Jeff         |Marketing |3000  |2         |
|James        |Sales     |3000  |1         |
|James        |Sales     |3000  |2         |
|Robert       |Sales     |4100  |3         |
|Saif         |Sales     |4100  |4         |
|Michael      |Sales     |4600  |5         |
+-------------+----------+------+----------+



In [0]:
# Add the rank of each row within its department by salary. Equal salaries
# share a rank and the following rank is skipped (Sales shows 1, 1, 3, 3, 5).
(
    df
    .withColumn("rank", rank().over(windowSpec))
    .show(truncate=False)
)

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|Maria        |Finance   |3000  |1   |
|Scott        |Finance   |3300  |2   |
|Jen          |Finance   |3900  |3   |
|Kumar        |Marketing |2000  |1   |
|Jeff         |Marketing |3000  |2   |
|James        |Sales     |3000  |1   |
|James        |Sales     |3000  |1   |
|Robert       |Sales     |4100  |3   |
|Saif         |Sales     |4100  |3   |
|Michael      |Sales     |4600  |5   |
+-------------+----------+------+----+



In [0]:
from pyspark.sql.functions import dense_rank

# Add the dense rank of each row within its department by salary. Equal
# salaries share a rank and no rank values are skipped (Sales shows
# 1, 1, 2, 2, 3).
(
    df
    .withColumn("dense_rank", dense_rank().over(windowSpec))
    .show()
)

+-------------+----------+------+----------+
|employee_name|department|salary|dense_rank|
+-------------+----------+------+----------+
|        Maria|   Finance|  3000|         1|
|        Scott|   Finance|  3300|         2|
|          Jen|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|         Jeff| Marketing|  3000|         2|
|        James|     Sales|  3000|         1|
|        James|     Sales|  3000|         1|
|       Robert|     Sales|  4100|         2|
|         Saif|     Sales|  4100|         2|
|      Michael|     Sales|  4600|         3|
+-------------+----------+------+----------+



In [0]:
from pyspark.sql.functions import lag

# For every row, fetch the salary found two rows earlier in the same
# department window; the first two rows of each department have no such row
# and show null.
(
    df
    .withColumn("lag", lag("salary", 2).over(windowSpec))
    .show(truncate=False)
)

+-------------+----------+------+----+
|employee_name|department|salary|lag |
+-------------+----------+------+----+
|Maria        |Finance   |3000  |null|
|Scott        |Finance   |3300  |null|
|Jen          |Finance   |3900  |3000|
|Kumar        |Marketing |2000  |null|
|Jeff         |Marketing |3000  |null|
|James        |Sales     |3000  |null|
|James        |Sales     |3000  |null|
|Robert       |Sales     |4100  |3000|
|Saif         |Sales     |4100  |3000|
|Michael      |Sales     |4600  |4100|
+-------------+----------+------+----+



In [0]:
# Window for the aggregate columns in the next cell: grouped by department and
# ordered by salary.
# NOTE(review): this is the same specification as windowSpec defined earlier.
windowSpecAgg = (
    Window
    .partitionBy("department")
    .orderBy("salary")
)

# NOTE(review): sum, min and max imported here replace the Python builtins of
# the same names in the notebook namespace from this cell onward.
from pyspark.sql.functions import (
    col,
    avg,
    sum,
    min,
    max,
    row_number,
)

In [0]:
# Compute avg / sum / min / max of salary over windowSpecAgg and display them
# next to the department, one output row per input row.
#
# windowSpecAgg is ordered by salary, so each aggregate is cumulative within
# the department up to the current row's salary; rows with an equal salary get
# the same values (both 3000 rows in Sales show sum 6000).
#
# avg, sum, min and max are the pyspark.sql.functions column functions imported
# earlier in the notebook, not the Python builtins.
#
# No helper "row" column is added: nothing filters on it and the select below
# keeps only the department and the four aggregates.
(
    df
    .withColumn("avg", avg(col("salary")).over(windowSpecAgg))
    .withColumn("sum", sum(col("salary")).over(windowSpecAgg))
    .withColumn("min", min(col("salary")).over(windowSpecAgg))
    .withColumn("max", max(col("salary")).over(windowSpecAgg))
    .select("department", "avg", "sum", "min", "max")
    .show()
)


+----------+------+-----+----+----+
|department|   avg|  sum| min| max|
+----------+------+-----+----+----+
|   Finance|3000.0| 3000|3000|3000|
|   Finance|3150.0| 6300|3000|3300|
|   Finance|3400.0|10200|3000|3900|
| Marketing|2000.0| 2000|2000|2000|
| Marketing|2500.0| 5000|2000|3000|
|     Sales|3000.0| 6000|3000|3000|
|     Sales|3000.0| 6000|3000|3000|
|     Sales|3550.0|14200|3000|4100|
|     Sales|3550.0|14200|3000|4100|
|     Sales|3760.0|18800|3000|4600|
+----------+------+-----+----+----+

