In [42]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql import types as t
from pyspark.sql.window import Window

In [3]:
# Create (or reuse) the notebook's Spark session: local mode using all available
# cores, with adaptive query execution switched on.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Electronic Sales Data")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)

24/05/04 17:53:17 WARN Utils: Your hostname, codespaces-0d4183 resolves to a loopback address: 127.0.0.1; using 172.16.5.4 instead (on interface eth0)
24/05/04 17:53:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/04 17:53:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/05/04 17:53:30 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [4]:
# Load the raw sales CSV, using its first line as the column names. No schema is
# supplied, so every column arrives as a string and is cast later on.
electronic_sales_df = spark.read.csv("../input_data/sales.csv", header=True)
electronic_sales_df.show(n=5)

+----------+---+-----------+-----+-----+----------+------------+
|      date| id|category_id|sales|views|price_cost|price_retail|
+----------+---+-----------+-----+-----+----------+------------+
|2022-02-24|  1|          3|    0|    0|         0|           0|
|2022-02-25|  1|          3|    0|    0|         0|           0|
|2022-02-26|  1|          3|    0|    0|         0|           0|
|2022-02-27|  1|          3|    0|    0|         0|           0|
|2022-02-28|  1|          3|    0|    0|         0|           0|
+----------+---+-----------+-----+-----+----------+------------+
only showing top 5 rows



In [6]:
# Print the column names and types of the freshly loaded (still untyped) frame.
electronic_sales_df.printSchema()

root
 |-- date: string (nullable = true)
 |-- id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- sales: string (nullable = true)
 |-- views: string (nullable = true)
 |-- price_cost: string (nullable = true)
 |-- price_retail: string (nullable = true)



In [8]:
# Row count of the raw dataset (an action, so Spark runs a job over the file).
electronic_sales_df.count()

                                                                                

2548824

## Easy Level

### Basic Aggregation:

In [13]:
# Per-category summary: average views and total sales.
# `views` and `sales` are still string columns at this point; Spark casts them
# implicitly for the aggregation, which is why both results come back as doubles.
total_sales_per_category_df = electronic_sales_df.groupBy('category_id').agg(
    f.avg('views'),
    f.sum('sales'),
)
total_sales_per_category_df.show()

[Stage 14:>                                                         (0 + 2) / 2]

+-----------+------------------+----------+
|category_id|        avg(views)|sum(sales)|
+-----------+------------------+----------+
|          3| 42.06166696912871|   73233.0|
|          1| 42.66659622361947|  127308.0|
|          4| 36.96256830601093|     656.0|
|          2|32.377772201776885|  743416.0|
+-----------+------------------+----------+





In [20]:
# What is the maximum retail price among all products?
# Computed per category and sorted from highest to lowest.
# NOTE(review): the question asks for a single overall maximum, while this groups
# by category; the overall maximum is the first row of the sorted result.
max_retail_price_df = (
    electronic_sales_df
    .withColumn('price_retail', f.col('price_retail').cast('float'))
    .groupBy('category_id')
    .agg(f.max('price_retail'))
    .orderBy(f.col('max(price_retail)').desc())
)
max_retail_price_df.show(n=5)



+-----------+-----------------+
|category_id|max(price_retail)|
+-----------+-----------------+
|          4|         759077.0|
|          1|         300957.0|
|          2|          62401.0|
|          3|          10878.0|
+-----------+-----------------+



                                                                                

### Data Cleaning:

In [41]:
# Check for null values in each column.
#
# All per-column null counts are computed in one aggregation (a single pass over
# the data) rather than running a separate filter-and-count job for every column.

columns = electronic_sales_df.columns

# when() without otherwise() yields null for rows that do not match, and count()
# skips nulls, so each aggregate counts exactly the rows where that column is null.
null_counts_row = electronic_sales_df.agg(
    *[f.count(f.when(f.col(c).isNull(), f.lit(1))).alias(c) for c in columns]
).first()

# (column, null_count) pairs, in the frame's column order.
null_columns_comprehensive_list = [(c, null_counts_row[c]) for c in columns]

for column, count in null_columns_comprehensive_list:
    print(f"Column {column} \t null_count:{count} ")


[Stage 148:>                                                        (0 + 2) / 2]

Column date 	 null_count:0 
Column id 	 null_count:0 
Column category_id 	 null_count:0 
Column sales 	 null_count:0 
Column views 	 null_count:0 
Column price_cost 	 null_count:0 
Column price_retail 	 null_count:0 




In [25]:
# Give the raw string columns proper types.
# - The parsed date goes into a new `new_date` column; the original `date` string
#   column is kept alongside it.
# - `sales` and `views` become integers, the two price columns become floats.
# NOTE(review): with Spark's default (non-ANSI) cast behaviour, values that cannot
# be converted become null rather than raising — confirm the session settings.
cleaned_df = (
    electronic_sales_df
    .withColumn('new_date', f.col('date').cast('date'))
    .withColumn('sales', f.col('sales').cast('int'))
    .withColumn('views', f.col('views').cast('int'))
    .withColumn('price_cost', f.col('price_cost').cast('float'))
    .withColumn('price_retail', f.col('price_retail').cast('float'))
)
cleaned_df.show(n=5)

+----------+---+-----------+-----+-----+----------+------------+----------+
|      date| id|category_id|sales|views|price_cost|price_retail|  new_date|
+----------+---+-----------+-----+-----+----------+------------+----------+
|2022-02-24|  1|          3|    0|    0|       0.0|         0.0|2022-02-24|
|2022-02-25|  1|          3|    0|    0|       0.0|         0.0|2022-02-25|
|2022-02-26|  1|          3|    0|    0|       0.0|         0.0|2022-02-26|
|2022-02-27|  1|          3|    0|    0|       0.0|         0.0|2022-02-27|
|2022-02-28|  1|          3|    0|    0|       0.0|         0.0|2022-02-28|
+----------+---+-----------+-----+-----+----------+------------+----------+
only showing top 5 rows



In [26]:
# Confirm the casts took effect: numeric columns typed, `new_date` a date.
cleaned_df.printSchema()

root
 |-- date: string (nullable = true)
 |-- id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- sales: integer (nullable = true)
 |-- views: integer (nullable = true)
 |-- price_cost: float (nullable = true)
 |-- price_retail: float (nullable = true)
 |-- new_date: date (nullable = true)



### Filtering:

In [33]:
# Keep only rows that have a views value and recorded at least one sale.
sales_greater_than_0_df = (
    cleaned_df
    .where(f.col('views').isNotNull())
    .where(f.col('sales') > 0)
)
sales_greater_than_0_df.count()



194358

                                                                                

## Medium Level

### Window Functions:

In [43]:
# Running total of sales within each category, ordered by date.
# The frame below is the one Spark applies by default to an ordered window, written
# out explicitly. It is a RANGE frame, so all rows sharing a date are peers and get
# the same value: the category's total sales up to and including that date.
window_spec = (
    Window
    .partitionBy('category_id')
    .orderBy('new_date')
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

cumulative_sales_df = cleaned_df.withColumn(
    'cumulative_sum',
    f.sum('sales').over(window_spec),
)
cumulative_sales_df.show()

[Stage 153:>                                                        (0 + 1) / 1]

+----------+---+-----------+-----+-----+----------+------------+----------+--------------+
|      date| id|category_id|sales|views|price_cost|price_retail|  new_date|cumulative_sum|
+----------+---+-----------+-----+-----+----------+------------+----------+--------------+
|2022-02-24|  1|          3|    0|    0|       0.0|         0.0|2022-02-24|            33|
|2022-02-24|  5|          3|    0|    0|       0.0|         0.0|2022-02-24|            33|
|2022-02-24| 16|          3|    0|    0|       0.0|         0.0|2022-02-24|            33|
|2022-02-24| 34|          3|    4|  307|     859.0|      1647.0|2022-02-24|            33|
|2022-02-24| 37|          3|    0|    0|       0.0|         0.0|2022-02-24|            33|
|2022-02-24| 49|          3|    0|    0|       0.0|         0.0|2022-02-24|            33|
|2022-02-24| 58|          3|    0|    0|       0.0|         0.0|2022-02-24|            33|
|2022-02-24| 61|          3|    0|    0|       0.0|         0.0|2022-02-24|            33|

                                                                                

In [45]:
# Rank rows by sales within each category, highest first; rows with equal sales
# are ordered by the more recent date.
# NOTE(review): this ranks individual (product, date) rows, so one product can hold
# several ranks. Ranking products themselves would need sales aggregated per product first.
window_spec = (
    Window
    .partitionBy('category_id')
    .orderBy(f.col('sales').desc(), f.col('new_date').desc())
)
product_rank_df = cleaned_df.withColumn('rank', f.rank().over(window_spec))
product_rank_df.show()

[Stage 159:>                                                        (0 + 1) / 1]

+----------+----+-----------+-----+-----+----------+------------+----------+----+
|      date|  id|category_id|sales|views|price_cost|price_retail|  new_date|rank|
+----------+----+-----------+-----+-----+----------+------------+----------+----+
|2023-10-03|2267|          3|  154| 9338|     958.0|      1245.0|2023-10-03|   1|
|2023-04-14|1661|          3|  125| 1631|    1273.0|      1654.0|2023-04-14|   2|
|2023-09-13|2267|          3|  119| 1800|     958.0|      1245.0|2023-09-13|   3|
|2023-09-05|2267|          3|  115| 1392|     958.0|      1245.0|2023-09-05|   4|
|2023-04-02|1661|          3|  114| 2477|    1273.0|      1654.0|2023-04-02|   5|
|2023-05-24|1661|          3|  112| 1006|    1273.0|      1654.0|2023-05-24|   6|
|2023-04-13|2267|          3|  105| 1294|     958.0|      1245.0|2023-04-13|   7|
|2023-10-17|2267|          3|  104|  930|     958.0|      1245.0|2023-10-17|   8|
|2023-03-30|1661|          3|  103|23091|    1273.0|      1654.0|2023-03-30|   9|
|2023-02-13|1661

                                                                                

In [47]:
# Day-over-day change in sales for each product.
# `previous_sales` is the sales value of the product's preceding row by date; it is
# null on a product's first row, which makes `sales_difference` null there too.
window_spec = Window.partitionBy('id').orderBy('new_date')
sales_diff_df = (
    cleaned_df
    .withColumn('previous_sales', f.lag(f.col('sales'), 1).over(window_spec))
    .withColumn('sales_difference', f.col('sales') - f.col('previous_sales'))
)

sales_diff_df.show()

[Stage 162:>                                                        (0 + 1) / 1]

+----------+---+-----------+-----+-----+----------+------------+----------+--------------+----------------+
|      date| id|category_id|sales|views|price_cost|price_retail|  new_date|previous_sales|sales_difference|
+----------+---+-----------+-----+-----+----------+------------+----------+--------------+----------------+
|2022-02-24|100|          1|    0|    6|   88080.0|    151300.0|2022-02-24|          NULL|            NULL|
|2022-02-25|100|          1|    0|    6|   88080.0|    146200.0|2022-02-25|             0|               0|
|2022-02-26|100|          1|    0|   11|   88080.0|    146200.0|2022-02-26|             0|               0|
|2022-02-27|100|          1|    0|    9|   88080.0|    146200.0|2022-02-27|             0|               0|
|2022-02-28|100|          1|    0|    5|   88080.0|    146200.0|2022-02-28|             0|               0|
|2022-03-01|100|          1|    0|    5|   88080.0|    146200.0|2022-03-01|             0|               0|
|2022-03-02|100|          1|

                                                                                

### UDF:

In [49]:
# Implement a UDF to categorize products based on their profit margin into low, medium, and high-profit categories.
# Step 1: net margin per product, i.e. the sum over all of the product's rows of
# (retail price - cost price).
profit_margin_df = (
    cleaned_df
    .groupBy('id')
    .agg(
        f.sum(f.col('price_retail') - f.col('price_cost'))
        .alias('net_profit_margin')
    )
)
profit_margin_df.show()



+----+-----------------+
|  id|net_profit_margin|
+----+-----------------+
| 296|        1966136.0|
| 467|        8695091.0|
| 675|        4787571.0|
| 691|         369209.0|
| 829|          32685.0|
|1090|        1828133.0|
|1159|         334063.0|
|1436|          55648.0|
|1512|        3667760.0|
|1572|          43472.0|
| 125|          19497.0|
| 451|        7225119.0|
| 800|        1298541.0|
| 853|          87542.0|
| 944|      1.2749405E7|
|1372|        6637614.0|
|1394|          35305.0|
|1669|        2497860.0|
| 666|         844213.0|
| 870|           8244.0|
+----+-----------------+
only showing top 20 rows



                                                                                

In [51]:
# Step 2: approximate 33rd and 66th percentiles of the net margin (1% relative
# error); these become the low/mid and mid/high cut-offs.
percentiles = profit_margin_df.approxQuantile(
    col='net_profit_margin',
    probabilities=[0.33, 0.66],
    relativeError=0.01,
)
print(percentiles)

[Stage 166:>                                                        (0 + 2) / 2]

[386936.0, 3647584.0]


                                                                                

In [60]:
def profit_category(net_profit_margin, percentiles):
    """Bucket a net profit margin into 'low', 'mid' or 'high'.

    Args:
        net_profit_margin: The margin to classify, or None when it is unknown.
        percentiles: Sequence holding the two cut-offs, expected in ascending
            order: ``[low/mid boundary, mid/high boundary]``.

    Returns:
        'low' when the margin is below the first cut-off, 'mid' when it is at
        least the first cut-off but below the second, 'high' otherwise.
        None when the margin itself is None.
    """
    # A null column value reaches a Python UDF as None; comparing None with a
    # number raises TypeError, so return a null category instead of failing.
    if net_profit_margin is None:
        return None
    if net_profit_margin < percentiles[0]:
        return 'low'
    if net_profit_margin < percentiles[1]:
        return 'mid'
    return 'high'

# Wrap the plain Python classifier as a Spark UDF that returns a string column.
profit_category_udf = f.udf(profit_category, returnType=t.StringType())

In [61]:
# Step 3: label every product by passing its net margin and the two percentile
# cut-offs (packed into a literal array column) to the UDF.
profit_margin_category_df = profit_margin_df.withColumn(
    'profit_category',
    profit_category_udf(
        f.col('net_profit_margin'),
        f.array(*(f.lit(cutoff) for cutoff in percentiles)),
    ),
)
profit_margin_category_df.show()



+----+-----------------+---------------+
|  id|net_profit_margin|profit_category|
+----+-----------------+---------------+
| 296|        1966136.0|            mid|
| 467|        8695091.0|           high|
| 675|        4787571.0|           high|
| 691|         369209.0|            low|
| 829|          32685.0|            low|
|1090|        1828133.0|            mid|
|1159|         334063.0|            low|
|1436|          55648.0|            low|
|1512|        3667760.0|           high|
|1572|          43472.0|            low|
| 125|          19497.0|            low|
| 451|        7225119.0|           high|
| 800|        1298541.0|            mid|
| 853|          87542.0|            low|
| 944|      1.2749405E7|           high|
|1372|        6637614.0|           high|
|1394|          35305.0|            low|
|1669|        2497860.0|            mid|
| 666|         844213.0|            mid|
| 870|           8244.0|            low|
+----+-----------------+---------------+
only showing top

                                                                                

## Hard Level

### Complex Window Functions:

In [63]:
# For each product, calculate the average sales in the last 3 months.
#
# "3 months" is approximated as a 90-day calendar window that ends on, and
# includes, the current row's date. The frame is a RANGE over a day number, not a
# ROWS frame: counting rows only lines up with calendar days when a product has
# exactly one row per day with no gaps, and a frame of 90 preceding rows plus the
# current one would cover 91 rows rather than 90.
LAST_3_MONTHS_IN_DAYS = 90

window_spec = (
    Window
    .partitionBy('id')
    # rangeBetween needs a numeric ordering key, so order by days since the epoch.
    .orderBy(f.datediff(f.col('new_date'), f.lit('1970-01-01').cast('date')))
    .rangeBetween(-(LAST_3_MONTHS_IN_DAYS - 1), Window.currentRow)
)

avg_sales_in_last_3_month_df = (
    cleaned_df
    .withColumn("avg_sales_in_last_3_month", f.avg("sales").over(window_spec))
)
avg_sales_in_last_3_month_df.show()

[Stage 183:>                                                        (0 + 1) / 1]

+----------+---+-----------+-----+-----+----------+------------+----------+------------------------+
|      date| id|category_id|sales|views|price_cost|price_retail|  new_date|vg_sales_in_last_3_month|
+----------+---+-----------+-----+-----+----------+------------+----------+------------------------+
|2022-02-24|100|          1|    0|    6|   88080.0|    151300.0|2022-02-24|                     0.0|
|2022-02-25|100|          1|    0|    6|   88080.0|    146200.0|2022-02-25|                     0.0|
|2022-02-26|100|          1|    0|   11|   88080.0|    146200.0|2022-02-26|                     0.0|
|2022-02-27|100|          1|    0|    9|   88080.0|    146200.0|2022-02-27|                     0.0|
|2022-02-28|100|          1|    0|    5|   88080.0|    146200.0|2022-02-28|                     0.0|
|2022-03-01|100|          1|    0|    5|   88080.0|    146200.0|2022-03-01|                     0.0|
|2022-03-02|100|          1|    0|    7|   88080.0|    146200.0|2022-03-02|                

                                                                                

In [66]:
# Implement a sliding window of 30 days to calculate the moving average of views for each product.
#
# The window covers the current row's date and the 29 days before it. It is a RANGE
# frame over a day number, so it stays 30 calendar days wide even when a product
# has missing dates; a ROWS frame would count rows instead of days.
MOVING_AVG_WINDOW_DAYS = 30

window_spec = (
    Window
    .partitionBy('id')
    # rangeBetween needs a numeric ordering key, so order by days since the epoch.
    .orderBy(f.datediff(f.col('new_date'), f.lit('1970-01-01').cast('date')))
    .rangeBetween(-(MOVING_AVG_WINDOW_DAYS - 1), Window.currentRow)
)

# NOTE(review): the output column name contains a space ('moving_avg_of views');
# it is kept as is so anything referring to it by name keeps working.
moving_avg_views_30_days_df = (
    cleaned_df
    .withColumn('moving_avg_of views', f.avg('views').over(window_spec))
)
moving_avg_views_30_days_df.show()

[Stage 186:>                                                        (0 + 1) / 1]

+----------+---+-----------+-----+-----+----------+------------+----------+-------------------+
|      date| id|category_id|sales|views|price_cost|price_retail|  new_date|moving_avg_of views|
+----------+---+-----------+-----+-----+----------+------------+----------+-------------------+
|2022-02-24|100|          1|    0|    6|   88080.0|    151300.0|2022-02-24|                6.0|
|2022-02-25|100|          1|    0|    6|   88080.0|    146200.0|2022-02-25|                6.0|
|2022-02-26|100|          1|    0|   11|   88080.0|    146200.0|2022-02-26|  7.666666666666667|
|2022-02-27|100|          1|    0|    9|   88080.0|    146200.0|2022-02-27|                8.0|
|2022-02-28|100|          1|    0|    5|   88080.0|    146200.0|2022-02-28|                7.4|
|2022-03-01|100|          1|    0|    5|   88080.0|    146200.0|2022-03-01|                7.0|
|2022-03-02|100|          1|    0|    7|   88080.0|    146200.0|2022-03-02|                7.0|
|2022-03-03|100|          1|    0|    6|

                                                                                