![Shopping trolley in front of a laptop](./iStock-1249219777.jpg)

It's simple to buy any product with a click and have it delivered to your door. Online shopping has been rapidly evolving over the last few years, making our lives easier. But behind the scenes, e-commerce companies face a complex challenge that needs to be addressed. 

Uncertainty plays a big role in how supply chains plan and organize their operations to ensure that products are delivered on time. It can lead to stockouts, delayed deliveries, and increased operational costs.

You work for the Sales & Operations Planning (S&OP) team at a multinational e-commerce company. The team needs your help planning for the upcoming end-of-year sales. They want to use your insights to plan promotional opportunities and manage inventory, so that the right products are in stock when needed and customers are satisfied with prompt delivery to their doorstep.


## The Data

You are provided with a sales dataset to use. A summary and preview are provided below.

# Online Retail.csv

| Column     | Description              |
|------------|--------------------------|
| `'InvoiceNo'` | A 6-digit number uniquely assigned to each transaction |
| `'StockCode'` | A 5-digit code uniquely assigned to each distinct product (some codes carry a letter suffix, e.g. `85123A`) |
| `'Description'` | The product name |
| `'Quantity'` | The quantity of each product (item) per transaction |
| `'UnitPrice'` | Product price per unit |
| `'CustomerID'` | A 5-digit number uniquely assigned to each customer |
| `'Country'` | The name of the country where each customer resides |
| `'InvoiceDate'` | The day and time when each transaction was generated `"MM/DD/YYYY"` |
| `'Year'` | The year when each transaction was generated |
| `'Month'` | The month when each transaction was generated |
| `'Week'` | The week when each transaction was generated (`1`-`52`) |
| `'Day'` | The day of the month when each transaction was generated (`1`-`31`) |
| `'DayOfWeek'` | The day of the week when each transaction was generated <br>(`0` = Monday, `6` = Sunday) |

In [43]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.functions import col, dayofmonth, month, year, to_date, to_timestamp, weekofyear, dayofweek
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize Spark session
my_spark = SparkSession.builder.appName("SalesForecast").getOrCreate()

# Importing sales data
sales_data = my_spark.read.csv(
    "Online Retail.csv", header=True, inferSchema=True, sep=",")

# Convert InvoiceDate to datetime 
sales_data = sales_data.withColumn("InvoiceDate", to_date(
    to_timestamp(col("InvoiceDate"), "d/M/yyyy H:mm")))
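
The pattern `"d/M/yyyy H:mm"` reads the day before the month. As a rough illustration of why that matters, here is a standard-library sketch (not Spark itself) that parses one string both ways. The raw value `"12/1/2010 8:26"` is a hypothetical example, since the unparsed strings from the CSV are not shown here.

```python
from datetime import datetime

# Hypothetical raw value; the real strings in "Online Retail.csv" are not shown in this notebook.
raw = "12/1/2010 8:26"

# Day-first reading, mirroring the Spark pattern "d/M/yyyy H:mm" used above.
day_first = datetime.strptime(raw, "%d/%m/%Y %H:%M").date()

# Month-first reading, for comparison only.
month_first = datetime.strptime(raw, "%m/%d/%Y %H:%M").date()

print("day-first:  ", day_first)
print("month-first:", month_first)
```

The two readings land eleven months apart, so it is worth checking the parsed `InvoiceDate` against the `Year`, `Month`, and `Day` columns in the preview.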

In [44]:
# Insert the code necessary to solve the assigned problems. Use as many code cells as you need.
sales_data.show(5)

+---------+---------+--------------------+--------+---------+----------+--------------+-----------+----+-----+----+---+---------+
|InvoiceNo|StockCode|         Description|Quantity|UnitPrice|CustomerID|       Country|InvoiceDate|Year|Month|Week|Day|DayOfWeek|
+---------+---------+--------------------+--------+---------+----------+--------------+-----------+----+-----+----+---+---------+
|   536365|   85123A|WHITE HANGING HEA...|       6|     2.55|     17850|United Kingdom| 2010-01-12|2010|    1|   2| 12|        1|
|   536365|    71053| WHITE METAL LANTERN|       6|     3.39|     17850|United Kingdom| 2010-01-12|2010|    1|   2| 12|        1|
|   536365|   84406B|CREAM CUPID HEART...|       8|     2.75|     17850|United Kingdom| 2010-01-12|2010|    1|   2| 12|        1|
|   536365|   84029G|KNITTED UNION FLA...|       6|     3.39|     17850|United Kingdom| 2010-01-12|2010|    1|   2| 12|        1|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|     3.39|     17850|United Kingdom| 20

In [45]:
sales_data.columns

['InvoiceNo',
 'StockCode',
 'Description',
 'Quantity',
 'UnitPrice',
 'CustomerID',
 'Country',
 'InvoiceDate',
 'Year',
 'Month',
 'Week',
 'Day',
 'DayOfWeek']

In [46]:
sales_data.printSchema()

root
 |-- InvoiceNo: integer (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- InvoiceDate: date (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Week: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)



In [47]:
# Count nulls per column (loop variable named `c` so it does not shadow the imported `col`)
Dict_Null = {c: sales_data.filter(sales_data[c].isNull()).count() for c in sales_data.columns}
Dict_Null

{'InvoiceNo': 0,
 'StockCode': 0,
 'Description': 0,
 'Quantity': 0,
 'UnitPrice': 0,
 'CustomerID': 0,
 'Country': 0,
 'InvoiceDate': 0,
 'Year': 0,
 'Month': 0,
 'Week': 0,
 'Day': 0,
 'DayOfWeek': 0}

# Aggregate the data

In [48]:
from pyspark.sql import functions as F

sales_data_grp = sales_data.groupBy(
    "Country", "StockCode", "InvoiceDate", "Year", 
    "Month", "Day", "Week", "DayOfWeek"
).agg(
    F.sum("Quantity"), 
    F.avg("UnitPrice")
)
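
To make the aggregation concrete, the following plain-Python sketch reproduces the same idea on a few made-up transaction lines: quantities are summed and unit prices averaged per (country, product, date) group. The rows are hypothetical, and the sketch assumes that `Year`, `Month`, `Day`, `Week`, and `DayOfWeek` are all derived from `InvoiceDate`, so they do not create extra groups.

```python
from collections import defaultdict

# Hypothetical transaction lines: (Country, StockCode, InvoiceDate, Quantity, UnitPrice)
rows = [
    ("United Kingdom", "22912", "2010-01-12", 1, 4.95),
    ("United Kingdom", "22912", "2010-01-12", 2, 4.95),
    ("France", "22659", "2010-01-12", 24, 1.95),
]

# Accumulate the total quantity, the price sum, and the row count per group.
groups = defaultdict(lambda: {"qty": 0, "price_sum": 0.0, "n": 0})
for country, stock_code, invoice_date, quantity, unit_price in rows:
    group = groups[(country, stock_code, invoice_date)]
    group["qty"] += quantity
    group["price_sum"] += unit_price
    group["n"] += 1

# One record per group: (sum of Quantity, average UnitPrice).
daily = {
    key: (group["qty"], group["price_sum"] / group["n"])
    for key, group in groups.items()
}

for key, value in daily.items():
    print(key, value)
```

After this step each row describes one product in one country on one day, which is the granularity the model is trained on.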

In [49]:
sales_data_grp.show(5)

+--------------+---------+-----------+----+-----+---+----+---------+-------------+--------------+
|       Country|StockCode|InvoiceDate|Year|Month|Day|Week|DayOfWeek|sum(Quantity)|avg(UnitPrice)|
+--------------+---------+-----------+----+-----+---+----+---------+-------------+--------------+
|United Kingdom|    22912| 2010-01-12|2010|    1| 12|   2|        1|            3|          4.95|
|        France|    22659| 2010-01-12|2010|    1| 12|   2|        1|           24|          1.95|
|United Kingdom|    21544| 2010-01-12|2010|    1| 12|   2|        1|           12|          0.85|
|United Kingdom|    21098| 2010-01-12|2010|    1| 12|   2|        1|           16|          1.25|
|        Norway|    85150| 2010-01-12|2010|    1| 12|   2|        1|           12|          2.55|
+--------------+---------+-----------+----+-----+---+----+---------+-------------+--------------+
only showing top 5 rows



                                                                                

In [50]:
renamedSales_data = sales_data_grp.withColumnRenamed("sum(Quantity)", "Quantity") \
              .withColumnRenamed("avg(UnitPrice)", "avg_UnitPrice")

renamedSales_data.show(5)

+--------------+---------+-----------+----+-----+---+----+---------+--------+-------------+
|       Country|StockCode|InvoiceDate|Year|Month|Day|Week|DayOfWeek|Quantity|avg_UnitPrice|
+--------------+---------+-----------+----+-----+---+----+---------+--------+-------------+
|United Kingdom|    22912| 2010-01-12|2010|    1| 12|   2|        1|       3|         4.95|
|        France|    22659| 2010-01-12|2010|    1| 12|   2|        1|      24|         1.95|
|United Kingdom|    21544| 2010-01-12|2010|    1| 12|   2|        1|      12|         0.85|
|United Kingdom|    21098| 2010-01-12|2010|    1| 12|   2|        1|      16|         1.25|
|        Norway|    85150| 2010-01-12|2010|    1| 12|   2|        1|      12|         2.55|
+--------------+---------+-----------+----+-----+---+----+---------+--------+-------------+
only showing top 5 rows



                                                                                

# Data Split

In [51]:
training_data = renamedSales_data.filter(renamedSales_data["InvoiceDate"] <= "2011-09-25")
test_data = renamedSales_data.filter(renamedSales_data["InvoiceDate"] > "2011-09-25")
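
The cutoff date lines up with a week boundary. Spark's `weekofyear` follows ISO 8601 weeks (Monday start), and Python's `date.isocalendar()` uses the same convention, so a quick standard-library check shows where the split falls relative to the week-39 filter used later in the notebook.

```python
from datetime import date, timedelta

cutoff = date(2011, 9, 25)                    # last day included in the training data
first_test_day = cutoff + timedelta(days=1)   # first day that can appear in the test data

# isocalendar() returns (ISO year, ISO week number, ISO weekday with Monday = 1).
cutoff_year, cutoff_week, cutoff_isoday = cutoff.isocalendar()
test_year, test_week, test_isoday = first_test_day.isocalendar()

print("cutoff:        ", cutoff, "-> ISO week", cutoff_week, "weekday", cutoff_isoday)
print("first test day:", first_test_day, "-> ISO week", test_week, "weekday", test_isoday)
```

The training data therefore ends on the Sunday of week 38, and the test data starts on the Monday of week 39.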

In [52]:
pd_daily_train_data = training_data.toPandas()

                                                                                

In [53]:
pd_daily_train_data.head()

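|   | Country | StockCode | InvoiceDate | Year | Month | Day | Week | DayOfWeek | Quantity | avg_UnitPrice |
|---|---------|-----------|-------------|------|-------|-----|------|-----------|----------|---------------|
| 0 | United Kingdom | 22912 | 2010-01-12 | 2010 | 1 | 12 | 2 | 1 | 3 | 4.95 |
| 1 | France | 22659 | 2010-01-12 | 2010 | 1 | 12 | 2 | 1 | 24 | 1.95 |
| 2 | United Kingdom | 21544 | 2010-01-12 | 2010 | 1 | 12 | 2 | 1 | 12 | 0.85 |
| 3 | United Kingdom | 21098 | 2010-01-12 | 2010 | 1 | 12 | 2 | 1 | 16 | 1.25 |
| 4 | Norway | 85150 | 2010-01-12 | 2010 | 1 | 12 | 2 | 1 | 12 | 2.55 |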


# Model

In [54]:
country_indexer = StringIndexer(
        inputCol="Country",
        outputCol="CountryIndex").setHandleInvalid("keep")

StockCode_indexer = StringIndexer(
        inputCol="StockCode",
        outputCol="StockCodeIndex").setHandleInvalid("keep")
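
As a rough mental model of what the indexers do, the sketch below imitates `StringIndexer` in plain Python, assuming its default ordering (most frequent label gets index `0.0`, ties broken alphabetically) and the `"keep"` policy, which places labels unseen during fitting in one extra bucket after the known ones. The helper names `fit_index` and `transform_keep` and the sample countries are hypothetical; this is not the Spark implementation.

```python
from collections import Counter

def fit_index(labels):
    """Map each label to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def transform_keep(mapping, label):
    """Look up a label; unseen labels share one extra index after the known ones."""
    return mapping.get(label, float(len(mapping)))

# Hypothetical training column.
train_countries = [
    "United Kingdom", "France", "United Kingdom",
    "Norway", "France", "United Kingdom",
]
mapping = fit_index(train_countries)

print(mapping)
print(transform_keep(mapping, "Iceland"))  # a country that never appeared in training
```

This matches the prediction preview further down, where `United Kingdom` maps to a `CountryIndex` of `0.0`. With `"keep"`, a product or country that first appears in the test period still gets a feature value instead of raising an error.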

In [55]:
assemblingCol = VectorAssembler(
    inputCols=[country_indexer.getOutputCol(), StockCode_indexer.getOutputCol(), "Year", "Month", "Day", 
               "Week", "DayOfWeek", "avg_UnitPrice"], outputCol="features"
)

In [56]:
rf = RandomForestRegressor(featuresCol="features", labelCol="Quantity", maxBins=4000)

In [57]:
pipeline = Pipeline(stages = [country_indexer, StockCode_indexer, assemblingCol, rf])
forest = pipeline.fit(training_data)
prediction = forest.transform(test_data)

                                                                                

In [58]:
prediction.show()

                                                                                

+--------------+---------+-----------+----+-----+---+----+---------+--------+------------------+------------+--------------+--------------------+------------------+
|       Country|StockCode|InvoiceDate|Year|Month|Day|Week|DayOfWeek|Quantity|     avg_UnitPrice|CountryIndex|StockCodeIndex|            features|        prediction|
+--------------+---------+-----------+----+-----+---+----+---------+--------+------------------+------------+--------------+--------------------+------------------+
|United Kingdom|    22414| 2011-10-01|2011|   10|  1|  39|        5|       1|              7.95|         0.0|        1326.0|[0.0,1326.0,2011....|5.8449879904374455|
|United Kingdom|    22773| 2011-10-01|2011|   10|  1|  39|        5|      12|              1.25|         0.0|         592.0|[0.0,592.0,2011.0...|16.056126643665372|
|United Kingdom|    22180| 2011-10-01|2011|   10|  1|  39|        5|       1|              9.95|         0.0|         623.0|[0.0,623.0,2011.0...|3.5408185865045416|
|United Ki

In [59]:
test_predictions = prediction.withColumn("prediction", col("prediction").cast("double"))
test_predictions.show(5)

+--------------+---------+-----------+----+-----+---+----+---------+--------+-------------+------------+--------------+--------------------+------------------+
|       Country|StockCode|InvoiceDate|Year|Month|Day|Week|DayOfWeek|Quantity|avg_UnitPrice|CountryIndex|StockCodeIndex|            features|        prediction|
+--------------+---------+-----------+----+-----+---+----+---------+--------+-------------+------------+--------------+--------------------+------------------+
|United Kingdom|    22414| 2011-10-01|2011|   10|  1|  39|        5|       1|         7.95|         0.0|        1326.0|[0.0,1326.0,2011....|5.8449879904374455|
|United Kingdom|    22773| 2011-10-01|2011|   10|  1|  39|        5|      12|         1.25|         0.0|         592.0|[0.0,592.0,2011.0...|16.056126643665372|
|United Kingdom|    22180| 2011-10-01|2011|   10|  1|  39|        5|       1|         9.95|         0.0|         623.0|[0.0,623.0,2011.0...|3.5408185865045416|
|United Kingdom|    20686| 2011-10-01|20

In [60]:
evaluator = RegressionEvaluator(labelCol="Quantity", 
                                predictionCol="prediction", 
                                metricName="mae")

# Evaluate the model on the test data
mae = evaluator.evaluate(test_predictions)

                                                                                

In [61]:
print("Mean Absolute Error (MAE):", mae)

Mean Absolute Error (MAE): 9.107169968436338
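
For reference, MAE is the average of the absolute differences between actual and predicted quantities. The sketch below applies that definition to the three complete rows visible in the prediction preview above. It only illustrates the metric; the reported value comes from the full test set, and the helper name `mean_absolute_error` is an assumption for this example.

```python
# Quantity and prediction values copied from the first three rows of the preview above.
actual = [1, 12, 1]
predicted = [5.8449879904374455, 16.056126643665372, 3.5408185865045416]

def mean_absolute_error(y_true, y_pred):
    """Average absolute difference between paired actual and predicted values."""
    if len(y_true) != len(y_pred) or not y_true:
        raise ValueError("inputs must be non-empty and of equal length")
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)

sample_mae = mean_absolute_error(actual, predicted)
print("MAE on the three preview rows:", sample_mae)
```

Because MAE is expressed in units sold, a value of about 9 means the forecast for a given product, country, and day is off by roughly nine units on average.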


In [62]:
from pyspark.sql.functions import year, weekofyear, sum as spark_sum, col

filtered_df = test_predictions.filter(
    (year(col("InvoiceDate")) == 2011) & (weekofyear(col("InvoiceDate")) == 39)
)

# Cast each prediction to an integer before aggregation (the cast truncates the fractional part)
filtered_df = filtered_df.withColumn("prediction_int", col("prediction").cast("integer"))

# Calculate total units sold
quantity_sold_w39 = filtered_df.agg(spark_sum("prediction_int")).collect()[0][0]

print("Expected units sold in week 39 of 2011:", quantity_sold_w39)

Expected units sold in week 39 of 2011: 85954
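
Casting each prediction to an integer drops its fractional part before the sum is taken, so the total is generally lower than the rounded sum of the raw predictions. The sketch below shows the size of that effect on the three complete rows from the prediction preview. It is only an illustration and does not recompute the week-39 figure.

```python
# Prediction values copied from the first three rows of the preview above.
predictions = [5.8449879904374455, 16.056126643665372, 3.5408185865045416]

# Truncate each value first, then sum (mirrors casting to integer before aggregating).
truncated_total = sum(int(p) for p in predictions)

# Sum the raw values first, then round once at the end.
rounded_total = round(sum(predictions))

print("truncate then sum:", truncated_total)
print("sum then round:   ", rounded_total)
```

Which convention is appropriate depends on how the planning team wants the forecast reported. Over thousands of rows the gap between the two can become noticeable.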
