![Shopping trolley in front of a laptop](./iStock-1249219777.jpg)

It's simple to buy any product with a click and have it delivered to your door. Online shopping has evolved rapidly over the last few years, making our lives easier. Behind the scenes, however, e-commerce companies face a complex planning challenge.

Uncertainty plays a big role in how supply chains plan and organize their operations to deliver products on time. These uncertainties can lead to stockouts, delayed deliveries, and increased operational costs.

You work for the Sales & Operations Planning (S&OP) team at a multinational e-commerce company. The team needs your help planning for the upcoming end-of-year sales. They want to use your insights to plan for promotional opportunities and manage inventory, so that the right products are in stock when needed and customers are satisfied with prompt delivery to their doorstep.


## The Data

You are provided with a sales dataset. Its columns are summarized below.

### Online Retail.csv

| Column     | Description              |
|------------|--------------------------|
| `'InvoiceNo'` | A 6-digit number uniquely assigned to each transaction |
| `'StockCode'` | A 5-digit number uniquely assigned to each distinct product |
| `'Description'` | The product name |
| `'Quantity'` | The quantity of each product (item) per transaction |
| `'UnitPrice'` | Product price per unit |
| `'CustomerID'` | A 5-digit number uniquely assigned to each customer |
| `'Country'` | The name of the country where each customer resides |
| `'InvoiceDate'` | The day and time when each transaction was generated `"MM/DD/YYYY"` |
| `'Year'` | The year when each transaction was generated |
| `'Month'` | The month when each transaction was generated |
| `'Week'` | The week when each transaction was generated (`1`-`52`) |
| `'Day'` | The day of the month when each transaction was generated (`1`-`31`) |
| `'DayOfWeek'` | The day of the week when each transaction was generated <br>(`0` = Monday, `6` = Sunday) |

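The calendar columns above follow specific conventions. The notebook recomputes them with Spark, and Spark's `dayofweek()` numbers days differently: 1 = Sunday through 7 = Saturday. The sketch below uses plain Python rather than Spark. It derives the columns from a timestamp, assuming a month-first `MM/DD/YYYY H:mm` string, and maps Spark's day numbering onto the table's. `calendar_features` and `spark_to_table_dayofweek` are hypothetical helper names. Week numbers here follow ISO 8601, which is how Spark documents `weekofyear()`. ISO years occasionally have a week 53.

```python
from datetime import datetime


def calendar_features(invoice_date: str) -> dict:
    """Derive Year/Month/Week/Day/DayOfWeek from a month-first 'MM/DD/YYYY H:mm' string."""
    ts = datetime.strptime(invoice_date, "%m/%d/%Y %H:%M")
    return {
        "Year": ts.year,
        "Month": ts.month,
        "Week": ts.isocalendar()[1],  # ISO 8601 week number (weeks start on Monday)
        "Day": ts.day,
        "DayOfWeek": ts.weekday(),    # 0 = Monday ... 6 = Sunday, as in the table
    }


def spark_to_table_dayofweek(spark_dow: int) -> int:
    """Map Spark's dayofweek() value (1 = Sunday ... 7 = Saturday) to 0 = Monday ... 6 = Sunday."""
    return (spark_dow + 5) % 7


print(calendar_features("07/15/2023 10:30"))
# {'Year': 2023, 'Month': 7, 'Week': 28, 'Day': 15, 'DayOfWeek': 5}
print(spark_to_table_dayofweek(7))  # Saturday -> 5
```

2023-07-15 is a Saturday. Spark reports its `DayOfWeek` as 7, which corresponds to 5 in the table's encoding.
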
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.functions import col, dayofmonth, month, year, to_date, to_timestamp, weekofyear, dayofweek
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize Spark session
my_spark = SparkSession.builder.appName("SalesForecast").getOrCreate()

# Importing sales data
sales_data = my_spark.read.csv(
    "Online Retail.csv", header=True, inferSchema=True, sep=",")

# Parse InvoiceDate (month-first, as documented in the table) and keep only the date part
sales_data = sales_data.withColumn("InvoiceDate", to_date(
    to_timestamp(col("InvoiceDate"), "MM/dd/yyyy H:mm")))

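`InvoiceDate` is documented as month-first, so the field order in the parse pattern matters. A day-first pattern reads an ambiguous string as a different date instead of failing. The illustration below uses the standard library. Python's `strptime` directives stand in for Spark's `M`/`d` pattern letters. The timestamps are made-up examples, and `parse_or_none` is a hypothetical helper.

```python
from datetime import datetime

MONTH_FIRST = "%m/%d/%Y %H:%M"  # matches the documented MM/DD/YYYY layout
DAY_FIRST = "%d/%m/%Y %H:%M"    # the opposite field order


def parse_or_none(text: str, pattern: str):
    """Return the parsed datetime, or None when the string does not fit the pattern."""
    try:
        return datetime.strptime(text, pattern)
    except ValueError:
        return None


ambiguous = "12/1/2010 8:26"
print(parse_or_none(ambiguous, MONTH_FIRST).date())  # 2010-12-01
print(parse_or_none(ambiguous, DAY_FIRST).date())    # 2010-01-12 (silently a different date)

impossible_day_first = "12/13/2010 9:05"
print(parse_or_none(impossible_day_first, DAY_FIRST))  # None: there is no month 13
```

After parsing, spot-checking a few rows whose dates you know is a cheap way to catch the wrong field order.
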
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.functions import col, dayofmonth, month, year, to_date, to_timestamp, weekofyear, dayofweek
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize Spark session
spark = SparkSession.builder.appName("SalesForecast").getOrCreate()

# Importing sales data
sales_data = spark.read.csv("Online Retail.csv", header=True, inferSchema=True, sep=",")

# Parse InvoiceDate and keep only the date part (the time of day is discarded)
sales_data = sales_data.withColumn("InvoiceDate", to_date(to_timestamp(col("InvoiceDate"), "MM/dd/yyyy H:mm")))

# Extract year, month, week, day, and day of week from InvoiceDate
sales_data = sales_data.withColumn("Year", year(col("InvoiceDate")))
sales_data = sales_data.withColumn("Month", month(col("InvoiceDate")))
sales_data = sales_data.withColumn("Week", weekofyear(col("InvoiceDate")))
sales_data = sales_data.withColumn("Day", dayofmonth(col("InvoiceDate")))
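# Note: Spark's dayofweek() returns 1 = Sunday ... 7 = Saturday, which differs from the
# 0 = Monday ... 6 = Sunday encoding described for the CSV column it overwrites
sales_data = sales_data.withColumn("DayOfWeek", dayofweek(col("InvoiceDate")))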

# Remove rows with missing CustomerID (optional, depending on the use case)
sales_data = sales_data.na.drop(subset=["CustomerID"])

# Drop existing indexed columns if they exist
columns_to_drop = ["CountryIndex", "DescriptionIndex"]
for column in columns_to_drop:
    if column in sales_data.columns:
        sales_data = sales_data.drop(column)

# Convert categorical variables to numerical
indexer = StringIndexer(inputCols=["Country", "Description"], outputCols=["CountryIndex", "DescriptionIndex"])
sales_data = indexer.fit(sales_data).transform(sales_data)

# Select features and label
assembler = VectorAssembler(
    inputCols=["CountryIndex", "DescriptionIndex", "Year", "Month", "Week", "Day", "DayOfWeek", "UnitPrice"],
    outputCol="features"
)
sales_data = assembler.transform(sales_data)

# Use the quantity on each invoice line as the label for demand forecasting
sales_data = sales_data.withColumnRenamed("Quantity", "label")

# Split data into training and test sets
train_data, test_data = sales_data.randomSplit([0.8, 0.2], seed=42)

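# Define the RandomForestRegressor model. StringIndexer marks CountryIndex and DescriptionIndex
# as categorical, and Spark's tree models require maxBins to be at least the number of
# categories in every categorical feature, hence the large maxBins value
rf = RandomForestRegressor(featuresCol="features", labelCol="label", maxBins=5000)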

# Create a pipeline
pipeline = Pipeline(stages=[rf])

# Train the model
model = pipeline.fit(train_data)

# Make predictions
predictions = model.transform(test_data)

# Evaluate the model
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

# Save the model (optional); overwrite() lets the cell be re-run when the path already exists
model.write().overwrite().save("demand_forecasting_model")

Root Mean Squared Error (RMSE): 7.698109107655913
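
This RMSE is in the same units as `Quantity`: items per invoice line. It is easier to interpret next to a naive baseline evaluated on the same split. RMSE is the square root of the mean squared difference between labels and predictions. The plain-Python sketch below computes it alongside the RMSE of always predicting the training mean. The quantities are hypothetical, not taken from the dataset, and `rmse` and `mean_baseline_rmse` are illustrative helper names.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error: sqrt(mean((y - y_hat) ** 2))."""
    if not labels or len(labels) != len(predictions):
        raise ValueError("labels and predictions must be non-empty and of equal length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


def mean_baseline_rmse(train_labels, test_labels):
    """RMSE of a naive forecast that always predicts the training-set mean."""
    mean = sum(train_labels) / len(train_labels)
    return rmse(test_labels, [mean] * len(test_labels))


# Hypothetical quantities, purely for illustration
train_quantities = [1, 2, 4, 6, 12]
test_quantities = [2, 3, 10]
model_predictions = [2.5, 3.5, 8.0]

print(round(rmse(test_quantities, model_predictions), 3))               # 1.225
print(round(mean_baseline_rmse(train_quantities, test_quantities), 3))  # 3.559
```

A model whose RMSE is not clearly below the baseline on the same test rows adds little over simply stocking for the average.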

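A trained model only makes sense on new rows if `Country` and `Description` are encoded exactly as they were during training. By default, `StringIndexer` assigns indices by label frequency (`stringOrderType="frequencyDesc"`). The most frequent label gets 0, and recent Spark versions break ties alphabetically. An indexer fitted on a handful of new rows therefore produces a different mapping. The plain-Python sketch below mimics that ordering rule to show the effect. It illustrates the rule and is not Spark's implementation. The country list is hypothetical.

```python
from collections import Counter


def fit_frequency_index(values):
    """Map each distinct non-null label to an index: most frequent first,
    ties broken alphabetically (mimics stringOrderType="frequencyDesc")."""
    counts = Counter(v for v in values if v is not None)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}


def transform(mapping, values, unseen_index=None):
    """Look up each label; labels missing from the mapping get unseen_index."""
    return [mapping.get(v, unseen_index) for v in values]


# Hypothetical training labels
training_countries = ["United Kingdom", "United Kingdom", "France", "Germany", "Germany"]
train_map = fit_frequency_index(training_countries)
print(train_map)  # {'Germany': 0, 'United Kingdom': 1, 'France': 2}

new_rows = ["United Kingdom"]
print(transform(train_map, new_rows))                      # [1]: the index the model learned
print(transform(fit_frequency_index(new_rows), new_rows))  # [0]: refitting on new data changes it
print(transform(train_map, ["Japan"], unseen_index=len(train_map)))  # [3]: unseen label, extra index
```

The last call imitates Spark's `handleInvalid="keep"`, which places labels unseen during fitting at index `numLabels` instead of raising an error.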
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel
from pyspark.sql.functions import col, to_date, to_timestamp, year, month, weekofyear, dayofmonth, dayofweek
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Initialize Spark session
spark = SparkSession.builder.appName("SalesForecast").getOrCreate()

# Load the saved model
model = PipelineModel.load("demand_forecasting_model")

# Prepare new data for prediction
# Here we assume new_data is a DataFrame with the same structure as the training data
# For demonstration, let's create a new DataFrame with the same structure
new_data = spark.createDataFrame([
    (0, "United Kingdom", "WHITE HANGING HEART T-LIGHT HOLDER", "2023-07-15", 2.55, 1)
], ["CustomerID", "Country", "Description", "InvoiceDate", "UnitPrice", "Quantity"])

# Parse InvoiceDate and keep only the date part
new_data = new_data.withColumn("InvoiceDate", to_date(to_timestamp(col("InvoiceDate"), "yyyy-MM-dd")))

# Extract year, month, week, day, and day of week from InvoiceDate
new_data = new_data.withColumn("Year", year(col("InvoiceDate")))
new_data = new_data.withColumn("Month", month(col("InvoiceDate")))
new_data = new_data.withColumn("Week", weekofyear(col("InvoiceDate")))
new_data = new_data.withColumn("Day", dayofmonth(col("InvoiceDate")))
new_data = new_data.withColumn("DayOfWeek", dayofweek(col("InvoiceDate")))

# Show the new data after transformations
new_data.show()

# Check for null values in the new data
new_data.select([col(c).isNull().alias(c) for c in new_data.columns]).show()

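# Convert categorical variables to numerical using StringIndexer.
# Fit the indexer on the same rows used for training, not on new_data: fitting on
# new_data would give index 0 to its own most frequent labels, which need not match
# the indices the model was trained on. handleInvalid="keep" maps labels never seen
# in training to an extra index instead of raising an error.
training_rows = spark.read.csv(
    "Online Retail.csv", header=True, inferSchema=True, sep=",").na.drop(subset=["CustomerID"])
indexer = StringIndexer(inputCols=["Country", "Description"], outputCols=["CountryIndex", "DescriptionIndex"],
                        handleInvalid="keep")
indexed_data = indexer.fit(training_rows).transform(new_data)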

# Show the indexed data
indexed_data.show()

# Assemble features; handleInvalid="skip" drops rows that contain null feature values
assembler = VectorAssembler(
    inputCols=["CountryIndex", "DescriptionIndex", "Year", "Month", "Week", "Day", "DayOfWeek", "UnitPrice"],
    outputCol="features",
    handleInvalid="skip"
)
assembled_data = assembler.transform(indexed_data)

# Show the assembled data
assembled_data.select("features", "CountryIndex", "DescriptionIndex", "Year", "Month", "Week", "Day", "DayOfWeek", "UnitPrice").show()

# Make predictions using the loaded model
predictions = model.transform(assembled_data)

# Show predictions
predictions.select("features", "prediction").show()


+----------+--------------+--------------------+-----------+---------+--------+----+-----+----+---+---------+
|CustomerID|       Country|         Description|InvoiceDate|UnitPrice|Quantity|Year|Month|Week|Day|DayOfWeek|
+----------+--------------+--------------------+-----------+---------+--------+----+-----+----+---+---------+
|         0|United Kingdom|WHITE HANGING HEA...| 2023-07-15|     2.55|       1|2023|    7|  28| 15|        7|
+----------+--------------+--------------------+-----------+---------+--------+----+-----+----+---+---------+

+----------+-------+-----------+-----------+---------+--------+-----+-----+-----+-----+---------+
|CustomerID|Country|Description|InvoiceDate|UnitPrice|Quantity| Year|Month| Week|  Day|DayOfWeek|
+----------+-------+-----------+-----------+---------+--------+-----+-----+-----+-----+---------+
|     false|  false|      false|      false|    false|   false|false|false|false|false|    false|
+----------+-------+-----------+-----------+---------+---