In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
product_data = [
(1,"iphone","01-01-2023",1500000),
(2,"samsung","01-01-2023",1100000),
(3,"oneplus","01-01-2023",1100000),
(1,"iphone","01-02-2023",1300000),
(2,"samsung","01-02-2023",1120000),
(3,"oneplus","01-02-2023",1120000),
(1,"iphone","01-03-2023",1600000),
(2,"samsung","01-03-2023",1080000),
(3,"oneplus","01-03-2023",1160000),
(1,"iphone","01-04-2023",1700000),
(2,"samsung","01-04-2023",1800000),
(3,"oneplus","01-04-2023",1170000),
(1,"iphone","01-05-2023",1200000),
(2,"samsung","01-05-2023",980000),
(3,"oneplus","01-05-2023",1175000),
(1,"iphone","01-06-2023",1100000),
(2,"samsung","01-06-2023",1100000),
(3,"oneplus","01-06-2023",1200000)
]
prod_df = spark.createDataFrame(data = product_data , schema = ['cmp_id', 'name', 'date', 'sales'])
display(prod_df)

cmp_id,name,date,sales
1,iphone,01-01-2023,1500000
2,samsung,01-01-2023,1100000
3,oneplus,01-01-2023,1100000
1,iphone,01-02-2023,1300000
2,samsung,01-02-2023,1120000
3,oneplus,01-02-2023,1120000
1,iphone,01-03-2023,1600000
2,samsung,01-03-2023,1080000
3,oneplus,01-03-2023,1160000
1,iphone,01-04-2023,1700000


In [0]:
window = Window.partitionBy('cmp_id').orderBy('date')
#LAG FUNCTION
last_month_df = prod_df.withColumn('last_mon_sale', lag(col('sales'),1).over(window))
display(last_month_df)

cmp_id,name,date,sales,last_mon_sale
1,iphone,01-01-2023,1500000,
1,iphone,01-02-2023,1300000,1500000.0
1,iphone,01-03-2023,1600000,1300000.0
1,iphone,01-04-2023,1700000,1600000.0
1,iphone,01-05-2023,1200000,1700000.0
1,iphone,01-06-2023,1100000,1200000.0
2,samsung,01-01-2023,1100000,
2,samsung,01-02-2023,1120000,1100000.0
2,samsung,01-03-2023,1080000,1120000.0
2,samsung,01-04-2023,1800000,1080000.0


In [0]:
#LEAD FUNCTION
next_month_df = prod_df.withColumn('next_mon_sale', lead(col('sales'), 1).over(window))
display(next_month_df)

cmp_id,name,date,sales,next_mon_sale
1,iphone,01-01-2023,1500000,1300000.0
1,iphone,01-02-2023,1300000,1600000.0
1,iphone,01-03-2023,1600000,1700000.0
1,iphone,01-04-2023,1700000,1200000.0
1,iphone,01-05-2023,1200000,1100000.0
1,iphone,01-06-2023,1100000,
2,samsung,01-01-2023,1100000,1120000.0
2,samsung,01-02-2023,1120000,1080000.0
2,samsung,01-03-2023,1080000,1800000.0
2,samsung,01-04-2023,1800000,980000.0


In [0]:
#%AGE OF LOSS OR GAIN BASED ON PREVIOUS MONTH SALES
res_df = last_month_df.withColumn(
    "prof_or_loss",
    round(((col("sales") - col("last_mon_sale")) / col("sales")) * 100, 2),
)
display(res_df)

cmp_id,name,date,sales,last_mon_sale,prof_or_loss
1,iphone,01-01-2023,1500000,,
1,iphone,01-02-2023,1300000,1500000.0,-15.38
1,iphone,01-03-2023,1600000,1300000.0,18.75
1,iphone,01-04-2023,1700000,1600000.0,5.88
1,iphone,01-05-2023,1200000,1700000.0,-41.67
1,iphone,01-06-2023,1100000,1200000.0,-9.09
2,samsung,01-01-2023,1100000,,
2,samsung,01-02-2023,1120000,1100000.0,1.79
2,samsung,01-03-2023,1080000,1120000.0,-3.7
2,samsung,01-04-2023,1800000,1080000.0,40.0


In [0]:
# WHAT IS THE %AGE OF SALES EACH MONTH BASED ON LAST 6 MONTH SALES
ans_df = prod_df.withColumn(
    "tot_sale", sum("sales").over(Window.partitionBy("cmp_id"))
).withColumn("%sale_per_month", round((col("sales") / col("tot_sale")) * 100, 2))
display(ans_df)

cmp_id,name,date,sales,tot_sale,%sale_per_month
1,iphone,01-01-2023,1500000,8400000,17.86
1,iphone,01-02-2023,1300000,8400000,15.48
1,iphone,01-03-2023,1600000,8400000,19.05
1,iphone,01-04-2023,1700000,8400000,20.24
1,iphone,01-05-2023,1200000,8400000,14.29
1,iphone,01-06-2023,1100000,8400000,13.1
2,samsung,01-01-2023,1100000,7180000,15.32
2,samsung,01-02-2023,1120000,7180000,15.6
2,samsung,01-03-2023,1080000,7180000,15.04
2,samsung,01-04-2023,1800000,7180000,25.07
