In [0]:
#1. Load the dataset into a PySpark DataFrame and display the first 10 rows.
df = spark.read.csv(path = 'dbfs:/FileStore/data/sales.csv', header=True, inferSchema=True)
df.limit(10).show()

+----------+----------+-------------+------+-------------+-------------+
|      date|product_id|sales_channel|region|quantity_sold|total_revenue|
+----------+----------+-------------+------+-------------+-------------+
|2022-02-10|         1|       Online| North|           10|        100.0|
|2022-02-11|         2|      Offline| South|            5|         75.0|
|2022-02-12|         3|       Online|  East|           15|        200.0|
|2022-02-13|         4|      Offline|  West|           20|        300.0|
|2022-02-14|         1|       Online|  West|           12|        120.0|
|2022-02-15|         2|      Offline| North|            8|        120.0|
|2022-02-16|         3|       Online| South|           18|        250.0|
|2022-02-17|         4|      Offline|  East|           22|        330.0|
|2022-02-18|         1|       Online|  East|            9|         90.0|
|2022-02-19|         2|      Offline|  West|            6|         90.0|
+----------+----------+-------------+------+-------

In [0]:
# 2. Check the schema of the DataFrame and make sure all columns are of the correct data type.
df.printSchema()

root
 |-- date: date (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- sales_channel: string (nullable = true)
 |-- region: string (nullable = true)
 |-- quantity_sold: integer (nullable = true)
 |-- total_revenue: double (nullable = true)



In [0]:
# 3. Create a new column called 'year' that extracts the year from the 'date' column.
from pyspark.sql.functions import year
df = df.withColumn("year", year("date"))
df.limit(5).show()

+----------+----------+-------------+------+-------------+-------------+----+
|      date|product_id|sales_channel|region|quantity_sold|total_revenue|year|
+----------+----------+-------------+------+-------------+-------------+----+
|2022-02-10|         1|       Online| North|           10|        100.0|2022|
|2022-02-11|         2|      Offline| South|            5|         75.0|2022|
|2022-02-12|         3|       Online|  East|           15|        200.0|2022|
|2022-02-13|         4|      Offline|  West|           20|        300.0|2022|
|2022-02-14|         1|       Online|  West|           12|        120.0|2022|
+----------+----------+-------------+------+-------------+-------------+----+



In [0]:
# Filter the DataFrame to only include sales from the year 2019.

# modify the year and date column values because all records contains same year 2022. 
from pyspark.sql.functions import when, expr
df = df.withColumn("new_year", when(df.quantity_sold < 10, 2019).when(df.quantity_sold > 10, 2020).otherwise(2022))
df = df.withColumn("new_date", expr("date_add(to_date(date), (365 * (new_year - year)))"))
df = df.drop("year")
df = df.drop("date")
df = df.withColumnRenamed("new_year", "year")
df = df.withColumnRenamed("new_date", "date")
# df.limit(5).show()
df.filter((df.year  == 2019)).show(truncate=False)  

+----------+-------------+------+-------------+-------------+----+----------+
|product_id|sales_channel|region|quantity_sold|total_revenue|year|date      |
+----------+-------------+------+-------------+-------------+----+----------+
|2         |Offline      |South |5            |75.0         |2019|2019-02-12|
|2         |Offline      |North |8            |120.0        |2019|2019-02-16|
|1         |Online       |East  |9            |90.0         |2019|2019-02-19|
|2         |Offline      |West  |6            |90.0         |2019|2019-02-20|
|2         |Offline      |East  |7            |105.0        |2019|2019-02-24|
|1         |Online       |North |8            |80.0         |2019|2019-02-27|
|2         |Offline      |South |4            |60.0         |2019|2019-02-28|
+----------+-------------+------+-------------+-------------+----+----------+



In [0]:
# Calculate the total revenue for each sales channel and display the results in descending order.
total_sales_df = df.groupBy("sales_channel") \
  .sum("total_revenue") \
  .withColumnRenamed("sum(total_revenue)", "total_sales")
total_sales_df.orderBy(col("total_sales").desc()).show()

+-------------+-----------+
|sales_channel|total_sales|
+-------------+-----------+
|      Offline|     2040.0|
|       Online|     1545.0|
+-------------+-----------+



In [0]:
# Calculate the total revenue for each region and display the results in descending order.
total_revenue_df = df.groupby("region") \
    .sum("total_revenue") \
    .withColumnRenamed("sum(total_revenue)", "total_sales")
total_revenue_df.orderBy(col("total_sales").desc()).show()

+------+-----------+
|region|total_sales|
+------+-----------+
|  West|      955.0|
|  East|      945.0|
| South|      920.0|
| North|      765.0|
+------+-----------+



In [0]:
# Calculate the total revenue for each product and display the results in descending order.
total_product_revenue_df = df.groupby("product_id") \
    .sum("total_revenue") \
    .withColumnRenamed("sum(total_revenue)", "total_sales")
total_product_revenue_df.orderBy(col("total_sales").desc()).show()

+----------+-----------+
|product_id|total_sales|
+----------+-----------+
|         4|     1590.0|
|         3|      995.0|
|         1|      550.0|
|         2|      450.0|
+----------+-----------+



In [0]:
# Calculate the total quantity sold for each product and display the results in descending order.
total_quantity_df = df.groupby("product_id") \
    .sum("quantity_sold") \
    .withColumnRenamed("sum(quantity_sold)", "total_sold")
total_quantity_df.orderBy(col("total_sold").desc()).show()

+----------+----------+
|product_id|total_sold|
+----------+----------+
|         4|       106|
|         3|        73|
|         1|        55|
|         2|        30|
+----------+----------+



In [0]:
# Group the DataFrame by both sales channel and region and calculate the total revenue for each group.
region_sales_df = df.groupBy("sales_channel", "region") \
    .sum("total_revenue") \
    .withColumnRenamed("sum(total_revenue)", "region_sales")

region_sales_df.orderBy(col("region_sales").desc()).show()

+-------------+------+------------+
|sales_channel|region|region_sales|
+-------------+------+------------+
|      Offline|  West|       660.0|
|      Offline| South|       510.0|
|       Online|  East|       510.0|
|      Offline| North|       435.0|
|      Offline|  East|       435.0|
|       Online| South|       410.0|
|       Online| North|       330.0|
|       Online|  West|       295.0|
+-------------+------+------------+

