Find Products with Increasing Sales

In [1]:
from pyspark.sql import SparkSession

In [7]:
# Build (or reuse, via getOrCreate) a Spark session running on the local master.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Find product for Increasing the Sales")
    .getOrCreate()
)

# Bare expression so the notebook displays the session object as the cell result.
spark

In [24]:
# Load the yearly sales CSV; the file's first line supplies the column names.
# No schema is given or inferred, so every column is read as a string.
df_salestable = spark.read.csv("sales_table.csv", header=True)

In [25]:
# Preview the loaded sales rows.
df_salestable.show()

+----------+----+----------------+
|product_id|year|total_year_sales|
+----------+----+----------------+
|         1|2019|            1000|
|         1|2020|            1200|
|         1|2021|            1100|
|         2|2019|             500|
|         2|2020|             600|
|         2|2021|             900|
|         3|2019|             300|
|         3|2020|             450|
|         3|2021|             400|
+----------+----+----------------+



In [26]:
# Load the product lookup CSV; the file's first line supplies the column names.
# No schema is given or inferred, so every column is read as a string.
df_producttable = spark.read.csv("product_table.csv", header=True)

In [27]:
# Preview the loaded product rows.
df_producttable.show()

+----------+------------+---------------+
|product_id|product_name|       category|
+----------+------------+---------------+
|         1|     Laptops|    Electronics|
|         2|       Jeans|       Clothing|
|         3|      Chairs|Home Appliances|
+----------+------------+---------------+



In [198]:
# Print the column names and types of the product table.
df_producttable.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)



In [197]:
# Print the column names and types of the sales table.
# NOTE: the recorded output lists previous_year_revenue because this cell's
# execution count (197) is later than that of the cell that adds the column (69).
df_salestable.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- year: string (nullable = true)
 |-- total_year_sales: string (nullable = true)
 |-- previous_year_revenue: string (nullable = true)



In [95]:
#imports
from pyspark.sql import Window
from pyspark.sql.functions import col, min , lag, max


In [69]:
# One window per product, ordered by year. `year` is a string column, so the
# ordering is lexicographic.
product_year_window = Window.partitionBy("product_id").orderBy("year")

# Attach the preceding year's sales to every row; the first year of each
# product has no predecessor and therefore gets NULL.
df_salestable = df_salestable.withColumn(
    "previous_year_revenue",
    lag(col("total_year_sales")).over(product_year_window),
)

In [72]:
# Show the sales rows together with the previous_year_revenue column.
df_salestable.show()

+----------+----+----------------+---------------------+
|product_id|year|total_year_sales|previous_year_revenue|
+----------+----+----------------+---------------------+
|         1|2019|            1000|                 NULL|
|         1|2020|            1200|                 1000|
|         1|2021|            1100|                 1200|
|         2|2019|             500|                 NULL|
|         2|2020|             600|                  500|
|         2|2021|             900|                  600|
|         3|2019|             300|                 NULL|
|         3|2020|             450|                  300|
|         3|2021|             400|                  450|
+----------+----+----------------+---------------------+



In [73]:
# Year-over-year change in sales for each product.
# Both operands are string columns (the CSV is loaded without a schema), so cast
# them explicitly instead of relying on Spark's implicit string-to-double
# coercion. The result is still a double, and NULL where no previous year exists.
df_differencetable = df_salestable.withColumn(
    "revenue_diffr",
    col("total_year_sales").cast("double")
    - col("previous_year_revenue").cast("double"),
)

In [74]:
# Show the per-year difference; a product's first year has no previous year, so it is NULL.
df_differencetable.show()

+----------+----+----------------+---------------------+-------------+
|product_id|year|total_year_sales|previous_year_revenue|revenue_diffr|
+----------+----+----------------+---------------------+-------------+
|         1|2019|            1000|                 NULL|         NULL|
|         1|2020|            1200|                 1000|        200.0|
|         1|2021|            1100|                 1200|       -100.0|
|         2|2019|             500|                 NULL|         NULL|
|         2|2020|             600|                  500|        100.0|
|         2|2021|             900|                  600|        300.0|
|         3|2019|             300|                 NULL|         NULL|
|         3|2020|             450|                  300|        150.0|
|         3|2021|             400|                  450|        -50.0|
+----------+----+----------------+---------------------+-------------+



In [201]:
# Smallest year-over-year change per product. `min` here is the
# pyspark.sql.functions aggregate (it shadows the builtin); NULL differences
# from each product's first year are ignored by the aggregate.
df_product_increase = (
    df_differencetable
    .groupBy("product_id")
    .agg(min("revenue_diffr").alias("minimum"))
)

In [202]:
# Show each product's smallest year-over-year change.
df_product_increase.show()

+----------+-------+
|product_id|minimum|
+----------+-------+
|         1| -100.0|
|         2|  100.0|
|         3|  -50.0|
+----------+-------+



In [146]:
# Keep only products whose smallest year-over-year change is positive, i.e.
# sales rose in every year that has a previous year to compare against.
df_product_increase_result1 = df_product_increase.where(col("minimum") > 0)

In [167]:
# Show the products that passed the positive-minimum filter.
df_product_increase_result1.show()

+----------+-------+
|product_id|minimum|
+----------+-------+
|         2|  100.0|
+----------+-------+



In [175]:
# Experimenting with the case where the column holds two positive values:
# take the largest "minimum". `max` is the pyspark.sql.functions aggregate.
# Note this aggregates df_product_increase (all products), not the filtered result.
df_product_increase_result2 = df_product_increase.agg(max("minimum"))

In [176]:
# Show the single aggregated value.
df_product_increase_result2.show()

+------------+
|max(minimum)|
+------------+
|       100.0|
+------------+



In [204]:
# Join the qualifying product ids with the product table to get each
# product's name and category.
df_final = (
    df_product_increase_result1
    .join(df_producttable, on="product_id", how="inner")
    .select("product_id", "product_name", "category")
)

In [205]:
# Show the final product details.
df_final.show()

+----------+------------+--------+
|product_id|product_name|category|
+----------+------------+--------+
|         2|       Jeans|Clothing|
+----------+------------+--------+

