In [1]:
import findspark
findspark.init()

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# Spark configuration object; no settings are applied to it here. It is passed
# to the SparkSession builder in the next cell.
sparkConf = SparkConf()

In [2]:
# Obtain a SparkSession configured from sparkConf (getOrCreate hands back the
# already-running session when there is one) and keep a handle on the
# SparkContext that backs it.
spark = (
    SparkSession.builder
    .config(conf=sparkConf)
    .getOrCreate()
)
sc = spark.sparkContext

In [3]:
# Sample payroll rows, one tuple per employee: (employee_name, department, salary).
# Note that the ("James", "Sales", 3000) row appears twice, and that several
# employees within a department share the same salary.
data = (
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100),
)

# Column names, in the same order as the fields of each row tuple.
columns = ["employee_name", "department", "salary"]

In [4]:
# Build the demo DataFrame from the in-memory rows; only column names are
# supplied, so the column types are inferred from the values.
df = spark.createDataFrame(data, schema=columns)

# Print the resulting schema, then the rows without truncating cell values.
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [None]:
# row_number

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

In [None]:
# Window used by the ranking examples: one partition per department, rows
# ordered by salary (ascending by default).
windowSpec = (
    Window
    .partitionBy("department")
    .orderBy("salary")
)

# Number the rows 1..n inside each department. Several rows share a salary,
# so the relative numbering of those tied rows may vary between runs.
(
    df
    .withColumn("row_number", row_number().over(windowSpec))
    .show(truncate=False)
)

In [None]:
# rank()

from pyspark.sql.functions import rank

In [None]:
# rank(): rows with equal salary in a department receive the same rank, and
# the ranks after a tie skip ahead (gaps appear in the sequence).
(
    df
    .withColumn("rank", rank().over(windowSpec))
    .show()
)

In [None]:
# dense_rank()

from pyspark.sql.functions import dense_rank

In [None]:
# dense_rank(): rows with equal salary share a rank, but unlike rank() the
# next distinct salary gets the very next integer (no gaps).
(
    df
    .withColumn("dense_rank", dense_rank().over(windowSpec))
    .show()
)

In [None]:
# percent_rank

from pyspark.sql.functions import percent_rank

# Relative rank of each row within its department window.
(
    df
    .withColumn("percent_rank", percent_rank().over(windowSpec))
    .show()
)

In [None]:
# ntile

from pyspark.sql.functions import ntile

# Split each department's salary-ordered rows into 2 buckets and label every
# row with the bucket number it falls into.
(
    df
    .withColumn("ntile", ntile(2).over(windowSpec))
    .show()
)

In [None]:
# Aggregate functions

from pyspark.sql.functions import col,avg,sum,min,max,row_number

windowSpecAgg  = Window.partitionBy("department")

In [None]:
df.withColumn("row",row_number().over(windowSpec)) \
  .withColumn("avg", avg(col("salary")).over(windowSpecAgg)) \
  .withColumn("sum", sum(col("salary")).over(windowSpecAgg)) \
  .withColumn("min", min(col("salary")).over(windowSpecAgg)) \
  .withColumn("max", max(col("salary")).over(windowSpecAgg)) \
  .where(col("row")==1).select("department","avg","sum","min","max") \
  .show()

In [None]:
# ranking functions

# Pull in all five ranking functions used by the combined example below.
from pyspark.sql.functions import (
    row_number,
    rank,
    dense_rank,
    percent_rank,
    ntile,
)

# Per-department window ordered by salary, redefined here so this section can
# be read on its own.
windowSpec = (
    Window
    .partitionBy("department")
    .orderBy("salary")
)

In [None]:
# Side-by-side comparison: apply every ranking function over the same
# department/salary window so their outputs can be compared row by row.
(
    df
    .withColumn("row_number", row_number().over(windowSpec))
    .withColumn("rank", rank().over(windowSpec))
    .withColumn("dense_rank", dense_rank().over(windowSpec))
    .withColumn("percent_rank", percent_rank().over(windowSpec))
    .withColumn("ntile", ntile(2).over(windowSpec))
    .show()
)