In [0]:
 #row_number(),rank,dense_rank,lead,lag
sampleData = [
('Tom Prescott', 'Furniture', 'Chairs', 4000),
('Quincy Jones', 'Furniture', 'Bookcases', 4000),
('Joseph Holt', 'Furniture', 'Tables', 5000),
('Alejandro Grove', 'Furniture', 'Furnishings', 8000),
('Adrian Barton', 'Office Supplies', 'Binders', 3000),
('Ken Lonsdale', 'Office Supplies', 'Supplies', 9000),
('Greg Guthrie', 'Office Supplies', 'Fasteners', 3000),
('Yoseph Carroll', 'Office Supplies', 'Storage', 3000),
('Sean Miller', 'Technology', 'Machines', 22000),
('Tamara Chand', 'Technology', 'Copiers', 18000),
('John Murray', 'Technology', 'Phones', 5000),
('Kelly Collister', 'Technology', 'Accessories', 3000)
]

#column names for dataframe
columns = ["Customer Name","Category","Sub Category","Sales"]

#creating the dataframe df

df = spark.createDataFrame(data=sampleData, schema=columns )
df.show()

+---------------+---------------+------------+-----+
|  Customer Name|       Category|Sub Category|Sales|
+---------------+---------------+------------+-----+
|   Tom Prescott|      Furniture|      Chairs| 4000|
|   Quincy Jones|      Furniture|   Bookcases| 4000|
|    Joseph Holt|      Furniture|      Tables| 5000|
|Alejandro Grove|      Furniture| Furnishings| 8000|
|  Adrian Barton|Office Supplies|     Binders| 3000|
|   Ken Lonsdale|Office Supplies|    Supplies| 9000|
|   Greg Guthrie|Office Supplies|   Fasteners| 3000|
| Yoseph Carroll|Office Supplies|     Storage| 3000|
|    Sean Miller|     Technology|    Machines|22000|
|   Tamara Chand|     Technology|     Copiers|18000|
|    John Murray|     Technology|      Phones| 5000|
|Kelly Collister|     Technology| Accessories| 3000|
+---------------+---------------+------------+-----+



In [0]:
from pyspark.sql.window import *  # this has to used when ever you are working with window functions
from pyspark.sql.functions import * # this has to used when you re trying to use transformation funcation
from pyspark.sql.types import * # this has to be used when you are trying to assign datatypes and struct field and struct type
from pyspark.sql.column import * # this has to be used when you are working with col in dataframe

In [0]:
# to create a window function in pyspark. there are 2 steps
# step1: assign a partition and order level, either table or specific column partition levels
# step2: write any function based on above partition created


In [0]:
"""Example for SQL table level :  over(order by sales des)
pysark code for table level: Window.orderBy(col("Sales").desc())

Exaple for SQL partition level: over(partition by category order by sales desc)
pyspark code for partiton level: Window.partitionBy("Category").orderB(col("Sales").desc())"""

In [0]:
# create a row_number() for above table based on sales descending

#Step1: create a window partition at table level
table_partition = Window.orderBy(col("Sales").desc())
#step2: Creat a Row_number function
df2 = df.withColumn("row_num",row_number().over(table_partition))
df2.show()



+---------------+---------------+------------+-----+-------+
|  Customer Name|       Category|Sub Category|Sales|row_num|
+---------------+---------------+------------+-----+-------+
|    Sean Miller|     Technology|    Machines|22000|      1|
|   Tamara Chand|     Technology|     Copiers|18000|      2|
|   Ken Lonsdale|Office Supplies|    Supplies| 9000|      3|
|Alejandro Grove|      Furniture| Furnishings| 8000|      4|
|    Joseph Holt|      Furniture|      Tables| 5000|      5|
|    John Murray|     Technology|      Phones| 5000|      6|
|   Tom Prescott|      Furniture|      Chairs| 4000|      7|
|   Quincy Jones|      Furniture|   Bookcases| 4000|      8|
|  Adrian Barton|Office Supplies|     Binders| 3000|      9|
|   Greg Guthrie|Office Supplies|   Fasteners| 3000|     10|
| Yoseph Carroll|Office Supplies|     Storage| 3000|     11|
|Kelly Collister|     Technology| Accessories| 3000|     12|
+---------------+---------------+------------+-----+-------+



In [0]:
#craete a row_number() for above table based on category partition and sales descending
# step: Create a windows partiton at table level

window_cat = Window.partitionBy("Category").orderBy(col("Sales").desc())

#step2: create a row_number function
df2 = df.withColumn("R_num",row_number().over(window_cat))\
        .withColumn("Rank0",rank().over(window_cat))\
        .withColumn("Dns_rak",dense_rank().over(window_cat))    
df2.show()        


+---------------+---------------+------------+-----+-----+-----+-------+
|  Customer Name|       Category|Sub Category|Sales|R_num|Rank0|Dns_rak|
+---------------+---------------+------------+-----+-----+-----+-------+
|Alejandro Grove|      Furniture| Furnishings| 8000|    1|    1|      1|
|    Joseph Holt|      Furniture|      Tables| 5000|    2|    2|      2|
|   Tom Prescott|      Furniture|      Chairs| 4000|    3|    3|      3|
|   Quincy Jones|      Furniture|   Bookcases| 4000|    4|    3|      3|
|   Ken Lonsdale|Office Supplies|    Supplies| 9000|    1|    1|      1|
|  Adrian Barton|Office Supplies|     Binders| 3000|    2|    2|      2|
|   Greg Guthrie|Office Supplies|   Fasteners| 3000|    3|    2|      2|
| Yoseph Carroll|Office Supplies|     Storage| 3000|    4|    2|      2|
|    Sean Miller|     Technology|    Machines|22000|    1|    1|      1|
|   Tamara Chand|     Technology|     Copiers|18000|    2|    2|      2|
|    John Murray|     Technology|      Phones| 5000

In [0]:
#lead: get the next row value into current row
# syntax: lead("COLNAME",<no_of_nextcol>).over(<partition level>)
#lag: get the last row value into current row
# syntax: lag("COLNAME",<no_of_lastcol>).over(<partition level>)

In [0]:
#Get the previous Row Sales into current row
#step1: create a window partition at table level

table_partition = Window.orderBy(col("Sales").desc())

#step2: create lag function
df2 = df.withColumn("lag1",lag("Sales",1).over(table_partition))\
        .withColumn("lag2",lag("Sales",2).over(table_partition))
df2.show()        



+---------------+---------------+------------+-----+-----+-----+
|  Customer Name|       Category|Sub Category|Sales| lag1| lag2|
+---------------+---------------+------------+-----+-----+-----+
|    Sean Miller|     Technology|    Machines|22000| NULL| NULL|
|   Tamara Chand|     Technology|     Copiers|18000|22000| NULL|
|   Ken Lonsdale|Office Supplies|    Supplies| 9000|18000|22000|
|Alejandro Grove|      Furniture| Furnishings| 8000| 9000|18000|
|    Joseph Holt|      Furniture|      Tables| 5000| 8000| 9000|
|    John Murray|     Technology|      Phones| 5000| 5000| 8000|
|   Tom Prescott|      Furniture|      Chairs| 4000| 5000| 5000|
|   Quincy Jones|      Furniture|   Bookcases| 4000| 4000| 5000|
|  Adrian Barton|Office Supplies|     Binders| 3000| 4000| 4000|
|   Greg Guthrie|Office Supplies|   Fasteners| 3000| 3000| 4000|
| Yoseph Carroll|Office Supplies|     Storage| 3000| 3000| 3000|
|Kelly Collister|     Technology| Accessories| 3000| 3000| 3000|
+---------------+--------

In [0]:
# Get the Next row sales into current row

#step1: create a windo partition at table level
table_partition = Window.orderBy(col("SAles").desc())

#step2: create a lead function

df2 = df.withColumn("lead1",lead("Sales",1).over(table_partition))\
        .withColumn("lead2",lead("Sales",2).over(table_partition))
df2.show()        



+---------------+---------------+------------+-----+-----+-----+
|  Customer Name|       Category|Sub Category|Sales|lead1|lead2|
+---------------+---------------+------------+-----+-----+-----+
|    Sean Miller|     Technology|    Machines|22000|18000| 9000|
|   Tamara Chand|     Technology|     Copiers|18000| 9000| 8000|
|   Ken Lonsdale|Office Supplies|    Supplies| 9000| 8000| 5000|
|Alejandro Grove|      Furniture| Furnishings| 8000| 5000| 5000|
|    Joseph Holt|      Furniture|      Tables| 5000| 5000| 4000|
|    John Murray|     Technology|      Phones| 5000| 4000| 4000|
|   Tom Prescott|      Furniture|      Chairs| 4000| 4000| 3000|
|   Quincy Jones|      Furniture|   Bookcases| 4000| 3000| 3000|
|  Adrian Barton|Office Supplies|     Binders| 3000| 3000| 3000|
|   Greg Guthrie|Office Supplies|   Fasteners| 3000| 3000| 3000|
| Yoseph Carroll|Office Supplies|     Storage| 3000| 3000| NULL|
|Kelly Collister|     Technology| Accessories| 3000| NULL| NULL|
+---------------+--------

In [0]:
#get the top 2 sales from each category

from pyspark.sql.functions import *
from pyspark.sql.window import *
pat_cat = Window.partitionBy("Category").orderBy(col("Sales").desc())
df2 = df.withColumn("R_N",row_number().over(pat_cat))
df2 = df2.filter("R_N <=2")
df2.show()

+---------------+---------------+------------+-----+---+
|  Customer Name|       Category|Sub Category|Sales|R_N|
+---------------+---------------+------------+-----+---+
|Alejandro Grove|      Furniture| Furnishings| 8000|  1|
|    Joseph Holt|      Furniture|      Tables| 5000|  2|
|   Ken Lonsdale|Office Supplies|    Supplies| 9000|  1|
|  Adrian Barton|Office Supplies|     Binders| 3000|  2|
|    Sean Miller|     Technology|    Machines|22000|  1|
|   Tamara Chand|     Technology|     Copiers|18000|  2|
+---------------+---------------+------------+-----+---+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# initialize Spark Session

spark = SparkSession.builder.appName("Remove_Duplicate").getOrCreate()

#Sample data
data = [
   (1,'Ram','2025-01-01', 'A'),(1,'Ram','2025-01-02', 'B'),(1,'Ram', '2025-01-01', 'C'),
   (1,'Ram', '2025-01-04', 'D'),(2,'Shankar', '2025-02-01', 'E'),(2,'Shankar', '2025-02-02', 'F'),
   (2,'Shankar', '2025-02-3', 'G'),(2,'Shankar', '2025-02-04', 'H'),(3,'Raj', '2025-03-01','I'),
   (3,'Raj', '2025-03-01','J'),(3,'Raj', '2025-03-02','K'),(3,'Raj', '2025-03-03','L'),
    (3,'Raj', '2025-03-04','M'),(4,'RaN', '2025-04-01','O'),(4,'RaN', '2025-04-02','P'),
    (4,'RaN', '2025-04-03','Q'),(4,'RaN', '2025-04-04','R')

] 
# CREATE dataframe
df = spark.createDataFrame(data,["Cus_ID","Cus_Nmae","Date","Value"])
df.show()

+------+--------+----------+-----+
|Cus_ID|Cus_Nmae|      Date|Value|
+------+--------+----------+-----+
|     1|     Ram|2025-01-01|    A|
|     1|     Ram|2025-01-02|    B|
|     1|     Ram|2025-01-01|    C|
|     1|     Ram|2025-01-04|    D|
|     2| Shankar|2025-02-01|    E|
|     2| Shankar|2025-02-02|    F|
|     2| Shankar| 2025-02-3|    G|
|     2| Shankar|2025-02-04|    H|
|     3|     Raj|2025-03-01|    I|
|     3|     Raj|2025-03-01|    J|
|     3|     Raj|2025-03-02|    K|
|     3|     Raj|2025-03-03|    L|
|     3|     Raj|2025-03-04|    M|
|     4|     RaN|2025-04-01|    O|
|     4|     RaN|2025-04-02|    P|
|     4|     RaN|2025-04-03|    Q|
|     4|     RaN|2025-04-04|    R|
+------+--------+----------+-----+



In [0]:
#Get last date transformation details for each customer

part_cust = Window.partitionBy("Cus_Nmae").orderBy(col("Date").desc())
df_ld = df.withColumn("RN",row_number().over(part_cust))
df_ld = df_ld.filter("RN=1")
df_ld.show()

+------+--------+----------+-----+---+
|Cus_ID|Cus_Nmae|      Date|Value| RN|
+------+--------+----------+-----+---+
|     4|     RaN|2025-04-04|    R|  1|
|     3|     Raj|2025-03-04|    M|  1|
|     1|     Ram|2025-01-04|    D|  1|
|     2| Shankar| 2025-02-3|    G|  1|
+------+--------+----------+-----+---+



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

#Initialize Spark session
spark = SparkSession.builder.appName("CreateDataFrame").getOrCreate()

# Sample list

lsits1 = [
    (10000, 'Ram', '2025-01-01','A'),
    (10000, 'Ram', '2025-01-02','B'),
    (8000, 'Sam', '2025-02-01','D'),
    (8000, 'Sam', '2025-02-02','E'),
    (8000, 'Sam', '2025-02-03','F'),
    (8000, 'Sam', '2025-02-04','G'),
    (2000, 'Tom', '2025-03-01','L')
]

# CRAETE DATAFRAME
df = spark.createDataFrame(lsits1,["c_id","c_nm","date","value"])
df = df.select(df.c_id.alias("Salesamt"))
# show the dataframe
df.show()

+--------+
|Salesamt|
+--------+
|   10000|
|   10000|
|    8000|
|    8000|
|    8000|
|    8000|
|    2000|
+--------+



In [0]:
part_rank = Window.orderBy(col("Salesamt").desc())
df2 = df.withColumn("R_N",row_number().over(part_rank)).withColumn("rank",rank().over(part_rank)).withColumn("Dns_rank",dense_rank().over(part_rank))
df2.show()



+--------+---+----+--------+
|Salesamt|R_N|rank|Dns_rank|
+--------+---+----+--------+
|   10000|  1|   1|       1|
|   10000|  2|   1|       1|
|    8000|  3|   3|       2|
|    8000|  4|   3|       2|
|    8000|  5|   3|       2|
|    8000|  6|   3|       2|
|    2000|  7|   7|       3|
+--------+---+----+--------+

