In [0]:
# Window functions in PySpark: row_number, rank, and dense_rank

In [0]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F, types as T

In [0]:
# Demo employees as (EmpName, Department, Salary) tuples, grouped by department.
sales_rows = [
    ("Balaji", "sales", 3000),
    ("Harsha", "sales", 4000),
    ("Farooq", "sales", 3400),
]
hr_rows = [
    ("Govardan", "HR", 3500),
    ("Bhanu", "HR", 4500),
    ("Naveen", "HR", 4500),  # same salary as Bhanu
    ("Manoj", "HR", 4000),
]
data = sales_rows + hr_rows

# Column names only; Spark infers the column types from the tuples.
df = spark.createDataFrame(data, schema=["EmpName", "Department", "Salary"])
display(df)

In [0]:
# row_number: number the rows of each department 1, 2, 3, ... from highest to
# lowest salary. Every row gets its own number, even when salaries are equal.
window_spec = Window.partitionBy("Department").orderBy(F.desc("Salary"))
df_row = df.select("*", F.row_number().over(window_spec).alias("rn"))
display(df_row)

In [0]:
# rank: rows with equal salary share a rank, and the ranks that the tie
# "used up" are skipped afterwards (e.g. 1, 1, 3).
window_spec = (
    Window
    .partitionBy("Department")
    .orderBy(F.desc("Salary"))
)
rank_within_dept = F.rank().over(window_spec)
df_rank = df.withColumn("rk", rank_within_dept)
display(df_rank)

In [0]:
# dense_rank: rows with equal salary share a rank, but no rank values are
# skipped after a tie (e.g. 1, 1, 2).
window_spec = Window.partitionBy("Department").orderBy(df["Salary"].desc())
df_drk = df.select("*", F.dense_rank().over(window_spec).alias("drn"))
display(df_drk)

In [0]:
# Explicit schema: every column is nullable; joining_date is loaded as a plain
# string here.
schema = T.StructType([
    T.StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("empid", T.IntegerType()),
        ("joining_date", T.StringType()),
        ("salary", T.FloatType()),
    )
])

# Records as (empid, joining_date, salary).
data = [
    # empid 101: the 2021-06-15 record appears twice (exact duplicate)
    (101, "2020-05-10", 50000.0),
    (101, "2021-06-15", 55000.0),
    (101, "2021-06-15", 55000.0),
    # empid 102
    (102, "2019-03-20", 40000.0),
    (102, "2022-07-25", 60000.0),
    # empid 103
    (103, "2021-01-05", 45000.0),
    (103, "2023-08-30", 70000.0),
    # empid 104: single record, no duplicate
    (104, "2022-09-17", 48000.0),
]

# Build the DataFrame with the explicit schema and show it.
df = spark.createDataFrame(data, schema=schema)
display(df)


In [0]:
# Replace the string joining_date column with a date column parsed using the
# yyyy-MM-dd pattern.
df = df.withColumn(
    "joining_date",
    F.to_date(df["joining_date"], "yyyy-MM-dd"),
)
display(df)

In [0]:
# Find the latest record per employee.
# row_number() gives every row in an empid partition a distinct number, so
# filtering on rn == 1 keeps exactly one row per employee.
#
# salary (descending) is a secondary sort key: ordering by joining_date alone
# leaves the relative order of same-date rows unspecified (empid 101 has two
# rows on 2021-06-15), so the row that receives rn == 1 could differ between
# runs whenever such rows are not identical.
# NOTE(review): "highest salary wins" is an assumed tie-break rule; swap in
# another column if a different one should decide between same-date records.
window_spec = Window.partitionBy("empid").orderBy(
    F.col("joining_date").desc(),
    F.col("salary").desc(),
)
df_rn = df.withColumn("rn", F.row_number().over(window_spec))
df_rn.display()

In [0]:
# Keep only the row numbered 1 in each empid partition.
df_rn.where(F.col("rn") == 1).display()

In [0]:
# Latest record per employee using rank() instead of row_number().
# Not ideal here: rows with the same joining_date share a rank, so more than
# one row per empid can end up with rk == 1 (see the duplicated record of
# empid 101).
window_spec = Window.partitionBy("empid").orderBy(F.desc("joining_date"))
df_rk = df.select("*", F.rank().over(window_spec).alias("rk"))
display(df_rk)

In [0]:
# Keep the rows ranked 1 in each empid partition (tied rows are all kept).
df_rk.where(df_rk["rk"] == 1).display()

In [0]:
# Latest record per employee using dense_rank() instead of row_number().
# Not ideal here: like rank(), dense_rank() gives rows with the same
# joining_date the same value, so more than one row per empid can end up with
# drk == 1.
window_spec = (
    Window
    .partitionBy("empid")
    .orderBy(df["joining_date"].desc())
)
dense_rank_by_date = F.dense_rank().over(window_spec)
df_drk = df.withColumn("drk", dense_rank_by_date)
display(df_drk)

In [0]:
# Keep the rows with dense rank 1 in each empid partition (tied rows are all kept).
df_drk.where(F.col("drk") == 1).display()