In [0]:
# Employee fixture used by the ranking examples below.
# Column order of every tuple in `data` follows `schema`.
schema = ["id", "name", "salary", "department", "gender"]

data = [
    (1, "manish", 50000, "IT", "m"),
    (2, "vikash", 60000, "sales", "m"),
    (3, "raushan", 70000, "marketing", "m"),
    (4, "mukesh", 80000, "IT", "m"),
    (5, "priti", 90000, "sales", "f"),
    (6, "nikita", 45000, "marketing", "f"),
    (7, "ragini", 55000, "marketing", "f"),
    (8, "rashi", 100000, "IT", "f"),
    (9, "aditya", 65000, "IT", "m"),
    (10, "rahul", 50000, "marketing", "m"),
    (11, "rakhi", 50000, "IT", "f"),
    (12, "akhilesh", 90000, "sales", "m"),
]

# `spark` is not created in this notebook's visible cells; it is expected to be
# supplied by the runtime.
df = spark.createDataFrame(data, schema)
df.show()

+---+--------+------+----------+------+
| id|    name|salary|department|gender|
+---+--------+------+----------+------+
|  1|  manish| 50000|        IT|     m|
|  2|  vikash| 60000|     sales|     m|
|  3| raushan| 70000| marketing|     m|
|  4|  mukesh| 80000|        IT|     m|
|  5|   priti| 90000|     sales|     f|
|  6|  nikita| 45000| marketing|     f|
|  7|  ragini| 55000| marketing|     f|
|  8|   rashi|100000|        IT|     f|
|  9|  aditya| 65000|        IT|     m|
| 10|   rahul| 50000| marketing|     m|
| 11|   rakhi| 50000|        IT|     f|
| 12|akhilesh| 90000|     sales|     m|
+---+--------+------+----------+------+



In [0]:
from pyspark.sql.functions import (
    col,
    dense_rank,
    first,
    lag,
    last,
    lead,
    rank,
    round,
    row_number,
    to_date,
)
from pyspark.sql.window import Window

In [0]:
# Rank employees by ascending salary within each department.
# The three ranking functions share one window spec so their handling of equal
# salaries can be compared side by side: row_number keeps counting, rank repeats
# the value and then skips, dense_rank repeats the value without skipping.
window = Window.partitionBy("department").orderBy("salary")

(
    df.withColumn("Row Number", row_number().over(window))
    .withColumn("Rank", rank().over(window))
    .withColumn("Dense Rank", dense_rank().over(window))
    .show(truncate=False)
)

+---+--------+------+----------+------+----------+----+----------+
|id |name    |salary|department|gender|Row Number|Rank|Dense Rank|
+---+--------+------+----------+------+----------+----+----------+
|1  |manish  |50000 |IT        |m     |1         |1   |1         |
|11 |rakhi   |50000 |IT        |f     |2         |1   |1         |
|9  |aditya  |65000 |IT        |m     |3         |3   |2         |
|4  |mukesh  |80000 |IT        |m     |4         |4   |3         |
|8  |rashi   |100000|IT        |f     |5         |5   |4         |
|6  |nikita  |45000 |marketing |f     |1         |1   |1         |
|10 |rahul   |50000 |marketing |m     |2         |2   |2         |
|7  |ragini  |55000 |marketing |f     |3         |3   |3         |
|3  |raushan |70000 |marketing |m     |4         |4   |4         |
|2  |vikash  |60000 |sales     |m     |1         |1   |1         |
|5  |priti   |90000 |sales     |f     |2         |2   |2         |
|12 |akhilesh|90000 |sales     |m     |3         |2   |2      

In [0]:
# Same ranking idea, but with a finer partition: salaries are ordered ascending
# inside every (department, gender) group instead of per department.
window = Window.partitionBy("department", "gender").orderBy("salary")

(
    df.withColumn("Row Number", row_number().over(window))
    .withColumn("Rank", rank().over(window))
    .show(truncate=False)
)

+---+--------+------+----------+------+----------+----+
|id |name    |salary|department|gender|Row Number|Rank|
+---+--------+------+----------+------+----------+----+
|11 |rakhi   |50000 |IT        |f     |1         |1   |
|8  |rashi   |100000|IT        |f     |2         |2   |
|1  |manish  |50000 |IT        |m     |1         |1   |
|9  |aditya  |65000 |IT        |m     |2         |2   |
|4  |mukesh  |80000 |IT        |m     |3         |3   |
|6  |nikita  |45000 |marketing |f     |1         |1   |
|7  |ragini  |55000 |marketing |f     |2         |2   |
|10 |rahul   |50000 |marketing |m     |1         |1   |
|3  |raushan |70000 |marketing |m     |2         |2   |
|5  |priti   |90000 |sales     |f     |1         |1   |
|2  |vikash  |60000 |sales     |m     |1         |1   |
|12 |akhilesh|90000 |sales     |m     |2         |2   |
+---+--------+------+----------+------+----------+----+



In [0]:
# Sales fixture for the lag / lead / first / last examples.
# One row per product per date; `date` is stored as a plain string.
schema = ["id", "product", "date", "sales"]

product_data = [
    (1, "iphone", "01-01-2023", 1500000),
    (2, "samsung", "01-01-2023", 1100000),
    (3, "oneplus", "01-01-2023", 1100000),
    (1, "iphone", "01-02-2023", 1300000),
    (2, "samsung", "01-02-2023", 1120000),
    (3, "oneplus", "01-02-2023", 1120000),
    (1, "iphone", "01-03-2023", 1600000),
    (2, "samsung", "01-03-2023", 1080000),
    (3, "oneplus", "01-03-2023", 1160000),
    (1, "iphone", "01-04-2023", 1700000),
    (2, "samsung", "01-04-2023", 1800000),
    (3, "oneplus", "01-04-2023", 1170000),
    (1, "iphone", "01-05-2023", 1200000),
    (2, "samsung", "01-05-2023", 980000),
    (3, "oneplus", "01-05-2023", 1175000),
    (1, "iphone", "01-06-2023", 1100000),
    (2, "samsung", "01-06-2023", 1100000),
    (3, "oneplus", "01-06-2023", 1200000),
]

sales_df = spark.createDataFrame(product_data, schema)
sales_df.show()

+---+-------+----------+-------+
| id|product|      date|  sales|
+---+-------+----------+-------+
|  1| iphone|01-01-2023|1500000|
|  2|samsung|01-01-2023|1100000|
|  3|oneplus|01-01-2023|1100000|
|  1| iphone|01-02-2023|1300000|
|  2|samsung|01-02-2023|1120000|
|  3|oneplus|01-02-2023|1120000|
|  1| iphone|01-03-2023|1600000|
|  2|samsung|01-03-2023|1080000|
|  3|oneplus|01-03-2023|1160000|
|  1| iphone|01-04-2023|1700000|
|  2|samsung|01-04-2023|1800000|
|  3|oneplus|01-04-2023|1170000|
|  1| iphone|01-05-2023|1200000|
|  2|samsung|01-05-2023| 980000|
|  3|oneplus|01-05-2023|1175000|
|  1| iphone|01-06-2023|1100000|
|  2|samsung|01-06-2023|1100000|
|  3|oneplus|01-06-2023|1200000|
+---+-------+----------+-------+



In [0]:
# Attach to every row the sales value of the preceding row of the same product
# (partition key: "id"). The first row of each product has no predecessor, so
# lag() yields null there.
#
# "date" is a string column, so ordering on it directly sorts the text
# lexicographically (e.g. "01-12-2022" would sort after "01-02-2023"). Parse it
# into a real date so the window is ordered chronologically.
# NOTE(review): the pattern assumes dd-MM-yyyy (the second field is the one that
# changes from one "month" row to the next) - confirm against the data source.
window = Window.partitionBy("id").orderBy(to_date(col("date"), "dd-MM-yyyy"))

previous_months_df = sales_df.withColumn(
    "Previous Month's sales", lag(col("sales"), 1).over(window)
)
previous_months_df.show()

+---+-------+----------+-------+----------------------+
| id|product|      date|  sales|Previous Month's sales|
+---+-------+----------+-------+----------------------+
|  1| iphone|01-01-2023|1500000|                  null|
|  1| iphone|01-02-2023|1300000|               1500000|
|  1| iphone|01-03-2023|1600000|               1300000|
|  1| iphone|01-04-2023|1700000|               1600000|
|  1| iphone|01-05-2023|1200000|               1700000|
|  1| iphone|01-06-2023|1100000|               1200000|
|  2|samsung|01-01-2023|1100000|                  null|
|  2|samsung|01-02-2023|1120000|               1100000|
|  2|samsung|01-03-2023|1080000|               1120000|
|  2|samsung|01-04-2023|1800000|               1080000|
|  2|samsung|01-05-2023| 980000|               1800000|
|  2|samsung|01-06-2023|1100000|                980000|
|  3|oneplus|01-01-2023|1100000|                  null|
|  3|oneplus|01-02-2023|1120000|               1100000|
|  3|oneplus|01-03-2023|1160000|               1

In [0]:
# Change in sales versus the previous row, expressed as a percentage and rounded
# to two decimals. `round` is the pyspark.sql.functions import, not the builtin.
# Rows whose "Previous Month's sales" is null produce a null percentage.
# NOTE(review): the denominator is the *current* month's sales; a conventional
# month-over-month change divides by the previous month's sales - confirm which
# one is intended before relying on these figures.
sales_delta = col("sales") - col("Previous Month's sales")
percentage_change = round(sales_delta / col("sales") * 100, 2)

previous_months_df.withColumn("percentage_loss_profit", percentage_change).show()

+---+-------+----------+-------+----------------------+----------------------+
| id|product|      date|  sales|Previous Month's sales|percentage_loss_profit|
+---+-------+----------+-------+----------------------+----------------------+
|  1| iphone|01-01-2023|1500000|                  null|                  null|
|  1| iphone|01-02-2023|1300000|               1500000|                -15.38|
|  1| iphone|01-03-2023|1600000|               1300000|                 18.75|
|  1| iphone|01-04-2023|1700000|               1600000|                  5.88|
|  1| iphone|01-05-2023|1200000|               1700000|                -41.67|
|  1| iphone|01-06-2023|1100000|               1200000|                 -9.09|
|  2|samsung|01-01-2023|1100000|                  null|                  null|
|  2|samsung|01-02-2023|1120000|               1100000|                  1.79|
|  2|samsung|01-03-2023|1080000|               1120000|                  -3.7|
|  2|samsung|01-04-2023|1800000|               10800

In [0]:
# Counterpart of the lag example: pair every row with the sales value of the
# following row in the same partition. Uses the `window` spec that is already
# in scope from the lag cell; the last row of each product gets null.
next_month_sales = lead(col("sales"), 1).over(window)

next_months_df = sales_df.withColumn("Next Month's sales", next_month_sales)
next_months_df.show()

+---+-------+----------+-------+------------------+
| id|product|      date|  sales|Next Month's sales|
+---+-------+----------+-------+------------------+
|  1| iphone|01-01-2023|1500000|           1300000|
|  1| iphone|01-02-2023|1300000|           1600000|
|  1| iphone|01-03-2023|1600000|           1700000|
|  1| iphone|01-04-2023|1700000|           1200000|
|  1| iphone|01-05-2023|1200000|           1100000|
|  1| iphone|01-06-2023|1100000|              null|
|  2|samsung|01-01-2023|1100000|           1120000|
|  2|samsung|01-02-2023|1120000|           1080000|
|  2|samsung|01-03-2023|1080000|           1800000|
|  2|samsung|01-04-2023|1800000|            980000|
|  2|samsung|01-05-2023| 980000|           1100000|
|  2|samsung|01-06-2023|1100000|              null|
|  3|oneplus|01-01-2023|1100000|           1120000|
|  3|oneplus|01-02-2023|1120000|           1160000|
|  3|oneplus|01-03-2023|1160000|           1170000|
|  3|oneplus|01-04-2023|1170000|           1175000|
|  3|oneplus

In [0]:
# Show, on every row, the earliest and the most recent sales value of the same
# product (partition key: "id").
#
# Ordering: "date" is a string column, so it is parsed into a real date first;
# ordering on the raw text would sort lexicographically rather than
# chronologically once days or years differ.
# NOTE(review): the pattern assumes dd-MM-yyyy - confirm against the data source.
#
# Frame: once a window has an ordering, the default frame ends at the current
# row, which would make last() return the current row's own value. Spanning the
# whole partition explicitly lets first()/last() see every row of the product.
window = (
    Window.partitionBy("id")
    .orderBy(to_date(col("date"), "dd-MM-yyyy"))
    .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

(
    sales_df.withColumn("first", first("sales").over(window))
    .withColumn("latest", last("sales").over(window))
    .show()
)

+---+-------+----------+-------+-------+-------+
| id|product|      date|  sales|  first| latest|
+---+-------+----------+-------+-------+-------+
|  1| iphone|01-01-2023|1500000|1500000|1100000|
|  1| iphone|01-02-2023|1300000|1500000|1100000|
|  1| iphone|01-03-2023|1600000|1500000|1100000|
|  1| iphone|01-04-2023|1700000|1500000|1100000|
|  1| iphone|01-05-2023|1200000|1500000|1100000|
|  1| iphone|01-06-2023|1100000|1500000|1100000|
|  2|samsung|01-01-2023|1100000|1100000|1100000|
|  2|samsung|01-02-2023|1120000|1100000|1100000|
|  2|samsung|01-03-2023|1080000|1100000|1100000|
|  2|samsung|01-04-2023|1800000|1100000|1100000|
|  2|samsung|01-05-2023| 980000|1100000|1100000|
|  2|samsung|01-06-2023|1100000|1100000|1100000|
|  3|oneplus|01-01-2023|1100000|1100000|1200000|
|  3|oneplus|01-02-2023|1120000|1100000|1200000|
|  3|oneplus|01-03-2023|1160000|1100000|1200000|
|  3|oneplus|01-04-2023|1170000|1100000|1200000|
|  3|oneplus|01-05-2023|1175000|1100000|1200000|
|  3|oneplus|01-06-2