In [0]:
# Display the active SparkSession. `spark` is not created in any visible cell;
# presumably it is injected by the notebook runtime -- TODO confirm.
spark

In [0]:
# Product dimension: one row per product.
# Column names for the tuples below, in positional order.
product_schema = ['product_id', 'product_name', 'category']

# Each tuple is (product_id, product_name, category).
product_data = [
    (1, 'Laptops', 'Electronics'),
    (2, 'Jeans', 'Clothing'),
    (3, 'Chair', 'Home Appliance'),
]

# Build the DataFrame from the in-memory rows; column types are left to Spark.
product_df = spark.createDataFrame(product_data, product_schema)

product_df.show()

+----------+------------+--------------+
|product_id|product_name|      category|
+----------+------------+--------------+
|         1|     Laptops|   Electronics|
|         2|       Jeans|      Clothing|
|         3|       Chair|Home Appliance|
+----------+------------+--------------+



In [0]:
# Yearly sales facts: one row per (product, year).
# Column names for the tuples below, in positional order.
sales_schema = ['product_id', 'sale_year', 'sales_revenue']

# Each tuple is (product_id, sale_year, sales_revenue); rows are grouped by product.
sales_data = [
    # product 1
    (1, 2019, 1000),
    (1, 2020, 1200),
    (1, 2021, 1100),
    # product 2
    (2, 2019, 500),
    (2, 2020, 600),
    (2, 2021, 900),
    # product 3
    (3, 2019, 300),
    (3, 2020, 450),
    (3, 2021, 400),
]

# Build the DataFrame from the in-memory rows; column types are left to Spark.
sales_df = spark.createDataFrame(sales_data, sales_schema)

sales_df.show()

+----------+---------+-------------+
|product_id|sale_year|sales_revenue|
+----------+---------+-------------+
|         1|     2019|         1000|
|         1|     2020|         1200|
|         1|     2021|         1100|
|         2|     2019|          500|
|         2|     2020|          600|
|         2|     2021|          900|
|         3|     2019|          300|
|         3|     2020|          450|
|         3|     2021|          400|
+----------+---------+-------------+



In [0]:
# pyspark imports
from pyspark.sql import Window
from pyspark.sql.functions import col, min, lag

In [0]:
# Attach each row's preceding-year revenue for the same product.
# The window groups rows by product and orders them by year, so lag(..., 1)
# reads the row immediately before the current one within that product.
# The earliest row of each product has no predecessor and gets null.
product_year_window = Window.partitionBy("product_id").orderBy("sale_year")

df = sales_df.withColumn(
    "previous_year_revenue",
    lag("sales_revenue", 1).over(product_year_window),
)

df.show()

+----------+---------+-------------+---------------------+
|product_id|sale_year|sales_revenue|previous_year_revenue|
+----------+---------+-------------+---------------------+
|         1|     2019|         1000|                 null|
|         1|     2020|         1200|                 1000|
|         1|     2021|         1100|                 1200|
|         2|     2019|          500|                 null|
|         2|     2020|          600|                  500|
|         2|     2021|          900|                  600|
|         3|     2019|          300|                 null|
|         3|     2020|          450|                  300|
|         3|     2021|          400|                  450|
+----------+---------+-------------+---------------------+



In [0]:
# Year-over-year change: current revenue minus the previous row's revenue.
# Rows whose previous_year_revenue is null yield a null difference.
yoy_change = col("sales_revenue") - col("previous_year_revenue")

revdiff_df = df.withColumn("differenceYOY", yoy_change)

revdiff_df.show()

+----------+---------+-------------+---------------------+-------------+
|product_id|sale_year|sales_revenue|previous_year_revenue|differenceYOY|
+----------+---------+-------------+---------------------+-------------+
|         1|     2019|         1000|                 null|         null|
|         1|     2020|         1200|                 1000|          200|
|         1|     2021|         1100|                 1200|         -100|
|         2|     2019|          500|                 null|         null|
|         2|     2020|          600|                  500|          100|
|         2|     2021|          900|                  600|          300|
|         3|     2019|          300|                 null|         null|
|         3|     2020|          450|                  300|          150|
|         3|     2021|          400|                  450|          -50|
+----------+---------+-------------+---------------------+-------------+



In [0]:
# For every product, take the smallest year-over-year change.
# NOTE: `min` here is pyspark.sql.functions.min (imported above), which
# shadows Python's builtin min in this notebook.
finaldf = (
    revdiff_df
    .groupBy("product_id")
    .agg(min("differenceYOY").alias("min_sales"))
)

finaldf.show()

+----------+---------+
|product_id|min_sales|
+----------+---------+
|         1|     -100|
|         2|      100|
|         3|      -50|
+----------+---------+



In [0]:
# Keep only products whose smallest year-over-year change is strictly positive,
# i.e. revenue rose in every recorded year. A minimum of zero or below
# (flat or declining in at least one year) is dropped.
finaldf = finaldf.where(col("min_sales") > 0)
finaldf.show()

+----------+---------+
|product_id|min_sales|
+----------+---------+
|         2|      100|
+----------+---------+



In [0]:
# Look up product details for the surviving product_ids.
# The inner join on product_id keeps only products present in finaldf;
# the helper column min_sales is left out of the final projection.
joindf = (
    finaldf
    .join(product_df, on='product_id', how='inner')
    .select("product_id", "product_name", "category")
)
joindf.show()

+----------+------------+--------+
|product_id|product_name|category|
+----------+------------+--------+
|         2|       Jeans|Clothing|
+----------+------------+--------+

