![Shopping trolley in front of a laptop](./iStock-1249219777.jpg)

It's simple to buy any product with a click and have it delivered to your door. Online shopping has been rapidly evolving over the last few years, making our lives easier. But behind the scenes, e-commerce companies face a complex challenge that needs to be addressed. 

Uncertainty plays a big role in how supply chains plan and organize their operations to ensure that products are delivered on time. It can lead to stockouts, delayed deliveries, and increased operational costs.

**Problem Statement:**

You work for the Sales & Operations Planning (S&OP) team at a multinational e-commerce company. The team needs your help planning for the upcoming end-of-year sales. They want to use your insights to plan promotions and manage inventory, so that the right products are in stock when needed and customers receive prompt delivery to their doorstep.


## The Data

You are provided with a sales dataset to use. A summary and preview are provided below.

### Online Retail.csv

| Column     | Description              |
|------------|--------------------------|
| `'InvoiceNo'` | A 6-digit number uniquely assigned to each transaction |
| `'StockCode'` | A 5-digit number uniquely assigned to each distinct product |
| `'Description'` | The product name |
| `'Quantity'` | The quantity of each product (item) per transaction |
| `'UnitPrice'` | Product price per unit |
| `'CustomerID'` | A 5-digit number uniquely assigned to each customer |
| `'Country'` | The name of the country where each customer resides |
| `'InvoiceDate'` | The day and time when each transaction was generated `"MM/DD/YYYY"` |
| `'Year'` | The year when each transaction was generated |
| `'Month'` | The month when each transaction was generated |
| `'Week'` | The week when each transaction was generated (`1`-`52`) |
| `'Day'` | The day of the month when each transaction was generated (`1`-`31`) |
| `'DayOfWeek'` | The day of the week when each transaction was generated <br>(`0` = Monday, `6` = Sunday) |

## Install pyspark

PySpark is a Python library that provides an interface for Apache Spark, a powerful distributed computing framework. Installing PySpark via pip fetches its Python dependencies; Spark itself runs on the JVM, so a Java runtime must also be available on your system.

Without installing PySpark, we won't be able to use its functionalities in our Python environment. Installing it using pip makes it accessible for importing into the notebook.
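As a minimal sketch of the installation check described above, the snippet below tests whether `pyspark` can be imported and prints the pip command if it cannot. The helper name `is_installed` is hypothetical and only used for illustration.

```python
import importlib.util


def is_installed(package_name: str) -> bool:
    """Return True if the package can be found in this Python environment."""
    return importlib.util.find_spec(package_name) is not None


if is_installed("pyspark"):
    print("pyspark is available")
else:
    # In a notebook cell this is usually run as: !pip install pyspark
    print("pyspark is missing; install it with: pip install pyspark")
```

Running the check first avoids reinstalling the package every time the notebook is executed.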

## Import Important Libraries

In [17]:
# Import required libraries 

#Importing SparkSession for Spark functionality:
#This line imports the SparkSession class from the pyspark.sql module. 
#SparkSession is the entry point to Spark functionality in PySpark applications
from pyspark.sql import SparkSession

# Importing VectorAssembler for assembling feature vectors
#VectorAssembler is used to assemble vectors of features, which are required for machine learning algorithms in PySpark.
from pyspark.ml.feature import VectorAssembler

# Importing Pipeline for building data processing pipelines
#Pipelines are a sequence of stages used to process and transform data in PySpark, 
#commonly used for building machine learning workflows.
from pyspark.ml import Pipeline

# Importing  DecisionTreeRegressor for regression modeling
#DecisionTreeRegressor is a machine learning algorithm used for regression tasks, such as demand forecasting
from pyspark.ml.regression import DecisionTreeRegressor

# Importing various functions for data manipulation
#These functions are commonly used for data manipulation and feature engineering tasks in PySpark.
from pyspark.sql.functions import col, dayofmonth, month, year,  to_date, to_timestamp, weekofyear, dayofweek

# Importing StringIndexer for converting categorical variables
#StringIndexer is used to convert categorical variables into numerical format.
from pyspark.ml.feature import StringIndexer

# Importing RegressionEvaluator for evaluating regression model performance
#RegressionEvaluator is used to evaluate the performance of regression models, 
#providing metrics such as RMSE (Root Mean Squared Error) or R^2 (Coefficient of Determination).
from pyspark.ml.evaluation import RegressionEvaluator

## Initialize Spark session

- **Initialize Spark session**: This line initializes a SparkSession object named my_spark. SparkSession is the entry point to Spark functionality in PySpark applications. It provides a way to interact with Spark and allows you to create DataFrames, execute SQL queries, and work with machine learning algorithms.

- **SparkSession.builder**: This starts the process of creating a SparkSession. The builder attribute returns a Builder object which is used to configure various SparkSession options.

- **appName("SalesForecast")**: This sets the name of the Spark application to "SalesForecast". The name is used to identify your application on the Spark cluster's UI (User Interface) and in logs.

- **getOrCreate()**: This method tries to reuse an existing SparkSession if it exists or creates a new one if none exists. This ensures that you have a SparkSession available for your application to use, promoting efficient resource utilization.

In [2]:
# Initialize Spark session
my_spark = SparkSession.builder.appName("SalesForecast").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/25 15:26:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Import the Data

- **my_spark.read.csv()**: my_spark.read returns a DataFrameReader, which can load data from various sources. Its csv() method loads a CSV file into a DataFrame.

- **"Online Retail.csv"**: This is the path to the CSV file containing the sales data. Spark will look for this file in the current working directory unless an absolute path is provided.

- **header=True**: This parameter indicates that the first row of the CSV file contains the column names. Setting header=True ensures that Spark uses the first row as the header, assigning column names accordingly.

- **inferSchema=True**: This parameter instructs Spark to infer the data types of each column in the CSV file. Spark makes an extra pass over the data to determine the appropriate data types (e.g., integer, string, double) for each column.

- **sep=","**: This parameter specifies the delimiter used in the CSV file. In this case, a comma (",") is used as the delimiter. Spark will split the data in each row based on this delimiter to create columns in the DataFrame.

In [3]:
# Importing sales data
#This line imports sales data from a CSV file named "Online Retail.csv" into a DataFrame named sales_data.

sales_data = my_spark.read.csv(
    "Online Retail.csv", header=True, inferSchema=True, sep=",")

# Show the first row 
sales_data.head()

                                                                                

Row(InvoiceNo=536365, StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity=6, UnitPrice=2.55, CustomerID=17850, Country='United Kingdom', InvoiceDate=datetime.datetime(2010, 1, 12, 8, 26), Year=2010, Month=1, Week=2, Day=12, DayOfWeek=1)

### Convert the InvoiceDate to datetime

**sales_data.withColumn("InvoiceDate", ...)**: This is a DataFrame method in PySpark used to add or replace a column named "InvoiceDate" in the sales_data DataFrame.

**to_timestamp(col("InvoiceDate"), "d/M/yyyy H:mm")**: This part of the code converts the "InvoiceDate" column from string format to a timestamp format. It uses the to_timestamp() function, which converts a string to a timestamp, with the format specified as "d/M/yyyy H:mm". This format indicates that the date is in day/month/year hour:minute format.

**to_date(...)**: This function converts the timestamp obtained from to_timestamp() to a date format, discarding the time information. It ensures that only the date part remains in the "InvoiceDate" column.

**col("InvoiceDate")**: This is used to reference the "InvoiceDate" column in the DataFrame.

In [4]:
# Convert InvoiceDate to datetime 
sales_data = sales_data.withColumn("InvoiceDate", to_date(
    to_timestamp(col("InvoiceDate"), "d/M/yyyy H:mm")))

# shows the first row of the DataFrame sales_data
sales_data.head()

Row(InvoiceNo=536365, StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity=6, UnitPrice=2.55, CustomerID=17850, Country='United Kingdom', InvoiceDate=datetime.date(2010, 1, 12), Year=2010, Month=1, Week=2, Day=12, DayOfWeek=1)
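The day-first pattern `"d/M/yyyy H:mm"` matters because many dates are also valid when read month-first. The sketch below uses Python's standard `datetime.strptime` rather than Spark, and the raw string is a hypothetical value chosen to match the sample row above. Note that the data table documents the column as `"MM/DD/YYYY"`, so it is worth confirming which reading the source file really intends.

```python
from datetime import datetime

# Hypothetical raw value, chosen to match the sample row shown above.
raw = "12/1/2010 8:26"

# Spark's "d/M/yyyy H:mm" corresponds to "%d/%m/%Y %H:%M" in strptime.
day_first = datetime.strptime(raw, "%d/%m/%Y %H:%M")

# Month-first reading of the same string, for comparison.
month_first = datetime.strptime(raw, "%m/%d/%Y %H:%M")

print(day_first.date())    # 2010-01-12
print(month_first.date())  # 2010-12-01
```

The two readings are almost eleven months apart, which would change the `Month`, `Week`, and `Day` features used later.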

### Aggregate the Data into Daily Intervals

- **sales_data.groupBy(...)**: This groups the data in the sales_data DataFrame based on specified columns, which include "Country", "StockCode", "InvoiceDate", "Year", "Month", "Day", "Week", and "DayOfWeek".

- **.agg({"Quantity": "sum", "UnitPrice": "avg"})**: This performs aggregation functions on selected columns after grouping. In this case:

  - **"Quantity": "sum"** calculates the sum of the "Quantity" column for each group, aggregating the total quantity of items sold.

  - **"UnitPrice": "avg"** calculates the average of the "UnitPrice" column for each group, aggregating the average unit price of items sold.

- **daily_sales_data**: This assigns the result of the aggregation operation to a new DataFrame named daily_sales_data.

This aggregated data can be useful for further analysis and modeling, particularly for understanding sales trends over time and in different regions or for forecasting purposes.

In [5]:
# Aggregate data into daily intervals
daily_sales_data = sales_data.groupBy("Country", "StockCode", "InvoiceDate", "Year", "Month", "Day", "Week", "DayOfWeek").agg({"Quantity": "sum", "UnitPrice": "avg"})

daily_sales_data.head()

                                                                                

Row(Country='United Kingdom', StockCode='22912', InvoiceDate=datetime.date(2010, 1, 12), Year=2010, Month=1, Day=12, Week=2, DayOfWeek=1, avg(UnitPrice)=4.95, sum(Quantity)=3)

In [6]:
# Rename the target column
daily_sales_data = daily_sales_data.withColumnRenamed(
    "sum(Quantity)", "Quantity")

daily_sales_data.head()

Row(Country='United Kingdom', StockCode='22912', InvoiceDate=datetime.date(2010, 1, 12), Year=2010, Month=1, Day=12, Week=2, DayOfWeek=1, avg(UnitPrice)=4.95, Quantity=3)
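To make the grouping step concrete, here is a plain-Python sketch of the same sum and average aggregation, without Spark. The rows are hypothetical, and the grouping key is shortened to three columns for readability.

```python
from collections import defaultdict

# Hypothetical transaction rows: (country, stock_code, date, quantity, unit_price)
rows = [
    ("United Kingdom", "22912", "2010-01-12", 1, 4.95),
    ("United Kingdom", "22912", "2010-01-12", 2, 4.95),
    ("France", "22659", "2010-01-12", 24, 1.95),
]

# Collect the total quantity and the list of prices for each group.
groups = defaultdict(lambda: {"quantity": 0, "prices": []})
for country, stock_code, date, quantity, unit_price in rows:
    key = (country, stock_code, date)
    groups[key]["quantity"] += quantity
    groups[key]["prices"].append(unit_price)

# One output row per group: summed quantity and average unit price.
daily = {
    key: {
        "Quantity": g["quantity"],
        "avg(UnitPrice)": sum(g["prices"]) / len(g["prices"]),
    }
    for key, g in groups.items()
}

for key, values in daily.items():
    print(key, values)
```

The two United Kingdom transactions collapse into one daily row with a quantity of 3, which mirrors the shape of the aggregated output above.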

## Splitting the Dataset

Split the data into two sets based on the splitting date, "2011-09-25". All data up to and including this date should be in the training set, while data after this date should be in the testing set. Return a pandas DataFrame, pd_daily_train_data, containing at least the columns ["Country", "StockCode", "InvoiceDate", "Quantity"].

These operations split the data into two datasets: one for training (up to and including the splitting date) and one for testing (after the splitting date), allowing for the development and evaluation of predictive models.

In [8]:
# initiate the date for splitting
split_date_train_test = "2011-09-25"

# Creating the train and test datasets
train_data = daily_sales_data.filter(
    col("InvoiceDate") <= split_date_train_test)

test_data = daily_sales_data.filter(
    col("InvoiceDate") > split_date_train_test)
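The boundary handling of the split can be sketched in plain Python. The rows below are hypothetical; the point is that `<=` and `>` together assign every row to exactly one set, with the split date itself going to training.

```python
from datetime import date

split_date = date(2011, 9, 25)

# Hypothetical (invoice_date, quantity) rows around the boundary
rows = [
    (date(2011, 9, 24), 10),
    (date(2011, 9, 25), 7),   # the boundary day goes to training
    (date(2011, 9, 26), 12),
]

train = [row for row in rows if row[0] <= split_date]
test = [row for row in rows if row[0] > split_date]

print(len(train), len(test))  # 2 1
```

Splitting by date rather than at random keeps all test rows later than the training rows, which matches how a forecast is used in practice.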

In [9]:
# converts the PySpark DataFrame train_data into a pandas DataFrame pd_daily_train_data
pd_daily_train_data = train_data.toPandas()

#Check the number of columns and rows for the train data
print(pd_daily_train_data.shape)

#displays the first few rows of the pandas DataFrame
pd_daily_train_data.head()

(175452, 10)


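| | Country | StockCode | InvoiceDate | Year | Month | Day | Week | DayOfWeek | avg(UnitPrice) | Quantity |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | United Kingdom | 22912 | 2010-01-12 | 2010 | 1 | 12 | 2 | 1 | 4.95 | 3 |
| 1 | France | 22659 | 2010-01-12 | 2010 | 1 | 12 | 2 | 1 | 1.95 | 24 |
| 2 | United Kingdom | 21544 | 2010-01-12 | 2010 | 1 | 12 | 2 | 1 | 0.85 | 12 |
| 3 | United Kingdom | 21098 | 2010-01-12 | 2010 | 1 | 12 | 2 | 1 | 1.25 | 16 |
| 4 | Norway | 85150 | 2010-01-12 | 2010 | 1 | 12 | 2 | 1 | 2.55 | 12 |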


In [10]:
#Checking data types

daily_sales_data

DataFrame[Country: string, StockCode: string, InvoiceDate: date, Year: int, Month: int, Day: int, Week: int, DayOfWeek: int, avg(UnitPrice): double, Quantity: bigint]

## Feature Engineering

- ## Creating indexer for categorical columns

**StringIndexer(inputCol="Country", outputCol="CountryIndex")**: This creates a StringIndexer instance for the "Country" column. The inputCol parameter specifies the input column name, and the outputCol parameter specifies the name of the output column that will contain the indexed values. The indexed values are typically numeric representations of the original categorical values.

**.setHandleInvalid("keep")**: This sets the strategy to handle invalid input values during indexing. With the "keep" strategy, a label that was not present during fitting does not raise an error. Instead, all unseen labels are placed in one additional bucket whose index equals the number of labels seen during fitting.

**StringIndexer(inputCol="StockCode", outputCol="StockCodeIndex")**: Similarly, this creates a StringIndexer instance for the "StockCode" column with the output column named "StockCodeIndex"

These indexers are used to convert categorical columns into numerical representations, which is often necessary for machine learning algorithms that require numeric input. By indexing categorical columns, we ensure that the model can process these columns effectively during training and prediction phases.
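The indexing behaviour can be sketched in plain Python. This is an illustration of the idea, not Spark's implementation: labels are ordered by descending frequency (StringIndexer's default ordering), ties are broken alphabetically as a simplifying assumption, and unseen labels share one extra index as with "keep". The function names and sample countries are hypothetical.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each label to an index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending count, then alphabetically to make ties deterministic.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def transform_keep(mapping, values):
    """Index values; every unseen label gets the single extra index len(mapping)."""
    unseen_index = float(len(mapping))
    return [mapping.get(value, unseen_index) for value in values]


train_countries = ["United Kingdom", "France", "United Kingdom",
                   "Norway", "France", "United Kingdom"]
mapping = fit_string_indexer(train_countries)

print(mapping)  # {'United Kingdom': 0.0, 'France': 1.0, 'Norway': 2.0}
print(transform_keep(mapping, ["France", "Japan"]))  # [1.0, 3.0]
```

Because the index only reflects frequency rank, the numeric order carries no business meaning, which is one reason tree models are a common choice for features encoded this way.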

- ## Selecting features columns

These features are chosen based on domain knowledge and/or feature importance analysis to capture relevant information for predicting sales quantity.

In [11]:
# Creating indexer for categorical columns
country_indexer = StringIndexer(
    inputCol="Country", outputCol="CountryIndex").setHandleInvalid("keep")
stock_code_indexer = StringIndexer(
    inputCol="StockCode", outputCol="StockCodeIndex").setHandleInvalid("keep")

In [12]:
# Selecting feature columns
feature_cols = ["CountryIndex", "StockCodeIndex", "Month", "Year",
                "DayOfWeek", "Day", "Week"]

- ## Using Vector assembler to combine features

**assembler**: This variable represents a VectorAssembler instance, which is used to assemble the selected feature columns into a single vector column.

**VectorAssembler**: This is a transformer provided by PySpark's MLlib library. It combines multiple columns of features into a single vector column.

**inputCols=feature_cols**: This parameter specifies the names of the input feature columns that should be combined. In this case, feature_cols is a list of column names containing the selected features.

**outputCol="features"**: This parameter specifies the name of the output vector column that will contain the assembled feature vectors. The column name "features" is a common convention used in PySpark.

By using the VectorAssembler, the individual feature columns specified in feature_cols are combined into a single vector column named "features", which can then be used as input for machine learning algorithms in PySpark. This consolidated format is often required by machine learning algorithms to process the data efficiently.

In [13]:
# Using vector assembler to combine features
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

## Building The Regression Model

Based on the model selection test, I concluded that the decision tree was the best-performing model.

- ## Initializing a Decision Tree model

**featuresCol="features"**: Specifies the name of the input column containing the features used for training the decision tree. In this case, it's set to "features", which is a common convention for the column name produced by the VectorAssembler when assembling feature vectors.

**labelCol="Quantity"**: Specifies the name of the column containing the target variable (the quantity being predicted). In this case, it's set to "Quantity", indicating that the decision tree should predict the quantity based on the features.

**maxBins=4000**: Specifies the maximum number of bins used for discretizing continuous features and for choosing how to split on features at each node. Spark also requires maxBins to be at least as large as the number of categories in any categorical feature, and indexed columns such as "StockCodeIndex" are typically treated as categorical. With many distinct stock codes, the default of 32 would be too small, which is the likely reason for a value as large as 4000 here. More bins allow finer-grained split candidates at the cost of more computation.

By initializing the DecisionTreeRegressor object with these parameters, you're configuring it to train a decision tree regression model using the specified features and target variable, and controlling the binning process with the specified maximum number of bins. This initialized object (dt) can then be used to train the decision tree model on your dataset.

In [18]:
# Initializing a Decision Tree model
dt = DecisionTreeRegressor(
    featuresCol="features",
    labelCol="Quantity",
    maxBins=4000
)
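To illustrate how a regression tree chooses a split, here is a minimal plain-Python sketch for a single numeric feature. It picks the threshold that minimises the summed squared error of the two sides, which is equivalent to the variance criterion Spark uses for regression trees. Spark evaluates a limited set of binned candidate thresholds (bounded by maxBins) rather than every distinct value, so this is a simplification. The data and function names are hypothetical.

```python
def sse(values):
    """Sum of squared deviations from the mean (0.0 for an empty list)."""
    if not values:
        return 0.0
    mean = sum(values) / len(values)
    return sum((v - mean) ** 2 for v in values)


def best_split(xs, ys):
    """Return (threshold, total_sse) for the split x <= threshold with the lowest SSE."""
    best = (None, float("inf"))
    # Skip the largest value: splitting there would leave the right side empty.
    for threshold in sorted(set(xs))[:-1]:
        left = [y for x, y in zip(xs, ys) if x <= threshold]
        right = [y for x, y in zip(xs, ys) if x > threshold]
        total = sse(left) + sse(right)
        if total < best[1]:
            best = (threshold, total)
    return best


# Hypothetical data: quantity sold by month
months = [1, 2, 3, 10, 11, 12]
quantities = [5, 6, 7, 20, 22, 21]

print(best_split(months, quantities))  # (3, 4.0)
```

The split at month 3 separates the low-volume months from the high-volume ones, and each side would then predict its own mean quantity.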

- ## Create a pipeline for staging the processes

**Pipeline**: This initializes a pipeline, which is a sequence of stages used to process and transform data. Stages can include transformations (like indexing and assembling features) and an estimator (like the DecisionTreeRegressor).

**stages=[country_indexer, stock_code_indexer, assembler, dt]**: This parameter specifies the sequence of stages to be executed in the pipeline. It includes the StringIndexer transformations for categorical columns, the VectorAssembler to assemble features, and the DecisionTreeRegressor as the estimator.

In [19]:
# Create a pipeline for staging the processes
pipeline = Pipeline(stages=[country_indexer, stock_code_indexer, assembler, dt])

- ## Train the Model

**pipeline.fit(train_data)**: This method trains the model using the training data. It executes the stages of the pipeline sequentially, transforming the input data and training the DecisionTreeRegressor model. The resulting model contains the fitted indexers and the trained decision tree regression model, ready for prediction.

In [20]:
# Training the model
model = pipeline.fit(train_data)
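The sequential fit described above can be sketched with a toy pipeline in plain Python. All class names here are hypothetical, and unlike Spark, which returns a separate fitted PipelineModel, this simplified version stores what it learns on the stage objects themselves.

```python
class AddOne:
    """Hypothetical transformer-only stage: there is nothing to learn."""

    def fit(self, data):
        return self

    def transform(self, data):
        return [x + 1 for x in data]


class CenterOnMean:
    """Hypothetical estimator stage: learns the mean during fit."""

    def fit(self, data):
        self.mean = sum(data) / len(data)
        return self

    def transform(self, data):
        return [x - self.mean for x in data]


class SimplePipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        for stage in self.stages:
            stage.fit(data)
            # Each later stage is fitted on the output of the earlier ones.
            data = stage.transform(data)
        return self

    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data


model = SimplePipeline([AddOne(), CenterOnMean()]).fit([1, 2, 3])
print(model.transform([4]))  # [2.0]
```

Fitting everything through one pipeline means the test data later passes through exactly the same learned transformations as the training data.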

## Evaluating the Model

**model.transform(test_data)**: This method applies the trained model to the test dataset (test_data) to make predictions. It uses the pipeline created earlier, which includes all the preprocessing steps (such as feature indexing and vector assembling) and the trained DecisionTreeRegressor model.

**.withColumn("prediction", col("prediction").cast("double"))**: This line casts the "prediction" column from its original data type to double type. This step ensures consistency in data types, especially if the prediction column was stored as a different data type during the model prediction phase.

**test_predictions**: This DataFrame contains the original test dataset along with a new column named "prediction", which contains the predicted values generated by the model.

In [22]:
# Getting test predictions
test_predictions = model.transform(test_data)
test_predictions = test_predictions.withColumn(
    "prediction", col("prediction").cast("double"))

# Provide the Mean Absolute Error (MAE) for the forecast. Return a double/float `mae`.

# Initializing the evaluator
mae_evaluator = RegressionEvaluator(
    labelCol="Quantity", predictionCol="prediction", metricName="mae")

# Obtaining MAE
mae = mae_evaluator.evaluate(test_predictions)

mae

8.940367659308437

## Evaluate with MAE Metric

**RegressionEvaluator()**: This initializes a RegressionEvaluator object used to evaluate regression models. The labelCol parameter specifies the name of the target variable column ("Quantity" in this case), the predictionCol parameter specifies the name of the column containing the predicted values, and the metricName parameter specifies the evaluation metric to be computed ("mae" for Mean Absolute Error).

**mae_evaluator.evaluate(test_predictions)**: This method computes the Mean Absolute Error (MAE) between the "Quantity" column (actual values) and the "prediction" column (predicted values) in the test_predictions DataFrame. The resulting mae variable holds the calculated MAE value.

_**The evaluation procedure can be seen in the [model selection](Selecting the best Model.ipynb) notebook**_

The MAE value of `8.94` provides insight into the performance of the regression model for forecasting sales quantity. Here are some interpretations of the MAE value and its implications:

- Accuracy: On average, the model's predictions are off by approximately 8.94 units of quantity for each combination of country, stock code, and day. Whether that is accurate enough depends on the typical daily quantities being forecast.

- Error Magnitude: MAE averages the absolute difference between the actual and predicted values, so it is expressed in the same units as Quantity, and a larger MAE indicates larger errors. An MAE of approximately 8.94 suggests some variability in the predictions.

- Model Performance: The MAE value serves as a benchmark for comparing models. Lower MAE values indicate better predictive accuracy, while higher values suggest poorer performance. An MAE of 8.94 is best read relative to the alternatives tried during model selection, and it leaves room for improvement.
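The MAE metric discussed above can also be computed by hand, which makes its meaning easy to verify. The sketch below is plain Python with hypothetical actual and predicted quantities; it is not the RegressionEvaluator implementation.

```python
def mean_absolute_error(actual, predicted):
    """Average of the absolute differences between actual and predicted values."""
    if len(actual) != len(predicted):
        raise ValueError("actual and predicted must have the same length")
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)


# Hypothetical actual and predicted daily quantities
actual = [3, 24, 12, 16]
predicted = [5.0, 20.0, 12.0, 10.0]

# Absolute errors are 2, 4, 0 and 6, so the mean is 12 / 4.
print(mean_absolute_error(actual, predicted))  # 3.0
```

Because each error contributes in proportion to its size, a few very large misses raise MAE less than they would raise RMSE.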

## Identifying the Quantity sold at specific week (Inference)

In [23]:
# How many units will be sold during week 39 of 2011? Return an integer `quantity_sold_w39`.

# Getting the weekly sales of all countries
weekly_test_predictions = test_predictions.groupBy("Year", "Week").agg({"prediction": "sum"})

# Finding the quantity sold in week 39
promotion_week = weekly_test_predictions.filter(col('Week')==39)

# Storing the prediction as quantity_sold_w39 (int() drops the decimal part)
quantity_sold_w39 = int(promotion_week.select("sum(prediction)").collect()[0][0])

**quantity_sold_w39**: This variable stores the quantity sold during week 39. It does this by selecting the sum of predictions from the promotion_week DataFrame, collecting the result as a list, and then accessing the first element of the first row (assuming there's only one row) to get the sum of predictions for week 39. This sum is converted to an integer.

In [24]:
quantity_sold_w39

86052

Overall, this code segment effectively extracts the quantity sold during week 39 of 2011 from the predictions made by the regression model. The result is stored in the variable quantity_sold_w39.
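The weekly roll-up can be sketched in plain Python with hypothetical prediction rows. It also shows that `int()` truncates the summed forecast rather than rounding it, which is worth keeping in mind when the result feeds inventory planning.

```python
from collections import defaultdict

# Hypothetical (year, week, prediction) rows
predictions = [
    (2011, 39, 10.5),
    (2011, 39, 5.25),
    (2011, 40, 7.0),
]

# Sum the predicted quantities for each (year, week) pair.
weekly = defaultdict(float)
for year, week, prediction in predictions:
    weekly[(year, week)] += prediction

total_w39 = weekly[(2011, 39)]

print(total_w39)         # 15.75
print(int(total_w39))    # 15 (decimal part dropped)
print(round(total_w39))  # 16 (nearest integer)
```

Keying on both year and week, as done here, avoids mixing week 39 of different years if the prediction set ever spans more than one year.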

In [25]:
# Stop the Spark session

my_spark.stop()