It's simple to buy any product with a click and have it delivered to your door. Online shopping has been rapidly evolving over the last few years, making our lives easier. But behind the scenes, e-commerce companies face a complex challenge that needs to be addressed. 

Uncertainty plays a big role in how supply chains plan and organize their operations to ensure that products are delivered on time. These uncertainties can lead to challenges such as stockouts, delayed deliveries, and increased operational costs.

You work for the Sales & Operations Planning (S&OP) team at a multinational e-commerce company. The team needs your help planning for the upcoming end-of-year sales. They will use your insights to plan promotional opportunities and manage inventory, so that the right products are in stock when needed and customers receive prompt delivery to their doorstep.


## The Data

You are provided with a sales dataset to use. A summary and preview are provided below.

# Online Retail.csv

| Column     | Description              |
|------------|--------------------------|
| `'InvoiceNo'` | A 6-digit number uniquely assigned to each transaction |
| `'StockCode'` | A 5-digit number uniquely assigned to each distinct product |
| `'Description'` | The product name |
| `'Quantity'` | The quantity of each product (item) per transaction |
| `'UnitPrice'` | Product price per unit |
| `'CustomerID'` | A 5-digit number uniquely assigned to each customer |
| `'Country'` | The name of the country where each customer resides |
| `'InvoiceDate'` | The date and time when each transaction was generated `"MM/DD/YYYY"` |
| `'Year'` | The year when each transaction was generated |
| `'Month'` | The month when each transaction was generated |
| `'Week'` | The week when each transaction was generated (`1`-`52`) |
| `'Day'` | The day of the month when each transaction was generated (`1`-`31`) |
| `'DayOfWeek'` | The day of the week when each transaction was generated <br>(`0` = Monday, `6` = Sunday) |
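The calendar columns can be recomputed from a parsed `InvoiceDate` as a sanity check. The sketch below assumes that `Week` is the ISO 8601 week number and that `DayOfWeek` follows Python's `weekday()` convention (0 = Monday); both assumptions agree with the sample rows shown later in this notebook (for example, 2011-10-01 has `Week` 39 and `DayOfWeek` 5). `calendar_features` is a hypothetical helper, not part of the dataset or of Spark.

```python
from datetime import date


def calendar_features(d: date) -> dict:
    """Derive the calendar columns for one date (hypothetical helper).

    Assumes Week is the ISO 8601 week number and DayOfWeek uses
    Python's weekday() convention (0 = Monday, 6 = Sunday).
    """
    return {
        "Year": d.year,
        "Month": d.month,
        "Week": d.isocalendar()[1],  # ISO week number (assumption)
        "Day": d.day,
        "DayOfWeek": d.weekday(),
    }


print(calendar_features(date(2011, 10, 1)))
# {'Year': 2011, 'Month': 10, 'Week': 39, 'Day': 1, 'DayOfWeek': 5}
```

Under ISO numbering, a few days around New Year belong to a week of the neighboring ISO year, and some years have a week 53, so the calendar `Year` and the ISO week do not always line up.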

# Instructions

### Analyze the Online Retail.csv dataset and build a forecasting model to predict the 'Quantity' of products sold.

### 1 - Split the data into two sets based on the split date, "2011-09-25". All data up to and including this date should be in the training set, while data after this date should be in the test set. Return a pandas DataFrame, pd_daily_train_data, containing at least the columns "Country", "StockCode", "InvoiceDate", "Quantity".

### 2 - Using your test set, calculate the Mean Absolute Error (MAE) of your forecast model for the 'Quantity' sold. Return a double (float) named mae.

### 3 - How many units are expected to be sold during week 39 of 2011? Store the answer as an integer variable called quantity_sold_w39.
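Before answering the third question, it helps to know which calendar dates week 39 covers and how they relate to the split date. This minimal sketch assumes the `Week` column uses ISO 8601 numbering (Monday to Sunday weeks), which matches the sample rows in this notebook; `date.fromisocalendar` requires Python 3.8 or later.

```python
from datetime import date, timedelta

SPLIT_DATE = date(2011, 9, 25)  # last day of the training set

# Monday and Sunday of ISO week 39, 2011
week_start = date.fromisocalendar(2011, 39, 1)
week_end = week_start + timedelta(days=6)

print(week_start, week_end)     # 2011-09-26 2011-10-02
print(week_start > SPLIT_DATE)  # True: week 39 lies entirely in the test set
```

Since 25 September 2011 is a Sunday, the split date closes week 38, so the week-39 total comes only from test-set rows.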

In [167]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, to_timestamp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Start (or reuse) a Spark session
my_spark = SparkSession.builder.appName("SalesForecast").getOrCreate()

# Load the CSV, letting Spark infer the column types
sales_data = my_spark.read.csv("Online Retail.csv", header=True, inferSchema=True, sep=",")

# Parse the raw "d/M/yyyy H:mm" string (day first) and keep only the calendar date
sales_data = sales_data.withColumn("InvoiceDate", to_date(to_timestamp(col("InvoiceDate"), "d/M/yyyy H:mm")))
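The Spark pattern `d/M/yyyy H:mm` reads the day first, so an ambiguous string such as `12/1/2010` becomes 12 January 2010. That is consistent with the aggregated output below, where 2010-01-12 has `Month` 1 and `Day` 12. The stdlib sketch below is a rough counterpart for checking how a given string will be read; `parse_invoice_date` and the sample string are hypothetical, and unlike Spark, `strptime` raises `ValueError` on strings that do not match.

```python
from datetime import datetime

# Rough stdlib counterpart of the Spark pattern "d/M/yyyy H:mm": day first, then month.
# strptime's %d, %m and %H also accept single-digit values such as "1" or "8".
INVOICE_DATE_FORMAT = "%d/%m/%Y %H:%M"


def parse_invoice_date(raw: str):
    """Parse a raw InvoiceDate string and keep only the date, like to_date(to_timestamp(...))."""
    return datetime.strptime(raw, INVOICE_DATE_FORMAT).date()


# Hypothetical raw value: read day first, "12/1/2010" means 12 January 2010
print(parse_invoice_date("12/1/2010 8:26"))  # 2010-01-12
```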


## Some data analysis and aggregation first...

In [168]:
sales_data.count()

384721

In [169]:
df = sales_data.groupBy('Country','StockCode','InvoiceDate','Year','Month','Day','Week','DayOfWeek')

In [170]:
df

<pyspark.sql.group.GroupedData at 0x7f38910557c0>

In [171]:
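# Daily totals per group: sum the quantities sold and average the unit price
df1 = df.agg({
    'Quantity':'sum',
    'UnitPrice':'avg'
})
# Give the aggregated columns their original names back
df2 = df1.withColumnRenamed('sum(Quantity)','Quantity').withColumnRenamed('avg(UnitPrice)','UnitPrice')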
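The aggregation above collapses all invoice lines for the same country, product and day into one row. As a plain-Python illustration of what it computes, here is a minimal sketch; `aggregate_daily` and the input rows are hypothetical, and the grouping keys are shortened to three columns for readability.

```python
from collections import defaultdict


def aggregate_daily(rows, keys):
    """Group rows by `keys`, summing Quantity and taking the plain mean of UnitPrice."""
    qty = defaultdict(int)
    price_total = defaultdict(float)
    n_lines = defaultdict(int)
    for row in rows:
        key = tuple(row[k] for k in keys)
        qty[key] += row["Quantity"]
        price_total[key] += row["UnitPrice"]
        n_lines[key] += 1
    return {
        key: {"Quantity": qty[key], "UnitPrice": price_total[key] / n_lines[key]}
        for key in qty
    }


# Hypothetical invoice lines for one product, one country and one day
rows = [
    {"Country": "France", "StockCode": "P1", "InvoiceDate": "2011-10-01", "Quantity": 10, "UnitPrice": 2.0},
    {"Country": "France", "StockCode": "P1", "InvoiceDate": "2011-10-01", "Quantity": 30, "UnitPrice": 1.0},
]
daily = aggregate_daily(rows, ["Country", "StockCode", "InvoiceDate"])
print(daily[("France", "P1", "2011-10-01")])  # {'Quantity': 40, 'UnitPrice': 1.5}
```

Note that `avg('UnitPrice')` is a simple mean over invoice lines (1.5 here), not a quantity-weighted price (which would be 1.25).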

In [172]:
df2.count()


239976

In [173]:
df2.show(5)



+--------------+---------+-----------+----+-----+---+----+---------+---------+--------+
|       Country|StockCode|InvoiceDate|Year|Month|Day|Week|DayOfWeek|UnitPrice|Quantity|
+--------------+---------+-----------+----+-----+---+----+---------+---------+--------+
|United Kingdom|    22912| 2010-01-12|2010|    1| 12|   2|        1|     4.95|       3|
|        France|    22659| 2010-01-12|2010|    1| 12|   2|        1|     1.95|      24|
|United Kingdom|    21544| 2010-01-12|2010|    1| 12|   2|        1|     0.85|      12|
|United Kingdom|    21098| 2010-01-12|2010|    1| 12|   2|        1|     1.25|      16|
|        Norway|    85150| 2010-01-12|2010|    1| 12|   2|        1|     2.55|      12|
+--------------+---------+-----------+----+-----+---+----+---------+---------+--------+
only showing top 5 rows




### With all data aggregated into daily sales, we can now split the dataset into training and test sets, following the instructions: all data up to and including 25 September 2011 goes into the training set, and the rest into the test set:

In [174]:
df2.createOrReplaceTempView('df2')

In [176]:
test_df2 = my_spark.sql("""
SELECT *
FROM df2
WHERE InvoiceDate > CAST('2011-09-25' AS DATE)""")

In [177]:
train_df2 = my_spark.sql("""
SELECT *
FROM df2
WHERE InvoiceDate <= CAST('2011-09-25' AS DATE)""")
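The two queries use complementary conditions (`<=` and `>`), so every row with a valid date lands in exactly one set and the split date itself stays in training. The minimal sketch below shows the same rule on hypothetical rows; `split_by_date` is an illustrative helper. One difference worth knowing: in SQL, a row whose `InvoiceDate` is NULL (for instance, a string that failed to parse) satisfies neither condition and silently drops out of both sets, whereas this Python version would raise an error on `None`.

```python
from datetime import date

SPLIT_DATE = date(2011, 9, 25)


def split_by_date(rows, split_date=SPLIT_DATE):
    """Rows dated on or before split_date go to train, later rows to test."""
    train = [r for r in rows if r["InvoiceDate"] <= split_date]
    test = [r for r in rows if r["InvoiceDate"] > split_date]
    return train, test


rows = [{"InvoiceDate": date(2011, 9, d)} for d in (24, 25, 26)]
train, test = split_by_date(rows)
print(len(train), len(test))  # 2 1  (the split date itself stays in train)
```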

In [178]:
pd_daily_train_data = train_df2.toPandas()
pd_daily_test_data = test_df2.toPandas()


## This completes the first task: the training and test sets are now available as pandas DataFrames

In [180]:
pd_daily_test_data.head()

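|   | Country | StockCode | InvoiceDate | Year | Month | Day | Week | DayOfWeek | UnitPrice | Quantity |
|---|---------|-----------|-------------|------|-------|-----|------|-----------|-----------|----------|
| 0 | United Kingdom | 22414 | 2011-10-01 | 2011 | 10 | 1 | 39 | 5 | 7.95 | 1 |
| 1 | United Kingdom | 22773 | 2011-10-01 | 2011 | 10 | 1 | 39 | 5 | 1.25 | 12 |
| 2 | United Kingdom | 22180 | 2011-10-01 | 2011 | 10 | 1 | 39 | 5 | 9.95 | 1 |
| 3 | United Kingdom | 20686 | 2011-10-01 | 2011 | 10 | 1 | 39 | 5 | 3.25 | 2 |
| 4 | United Kingdom | 82580 | 2011-11-01 | 2011 | 11 | 1 | 44 | 1 | 0.73 | 12 |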


In [181]:
pd_daily_test_data.dtypes

Country         object
StockCode       object
InvoiceDate     object
Year             int32
Month            int32
Day              int32
Week             int32
DayOfWeek        int32
UnitPrice      float64
Quantity         int64
dtype: object

### Now we build a model to forecast the quantity of items sold, for use by the S&OP team

### To keep things tidy, we chain all data preparation steps and the model itself in a single pipeline

### The model writes its forecasts to a 'prediction' column, which we compare with the actual 'Quantity' column using the Mean Absolute Error (MAE), as requested
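For reference, the MAE is the average absolute difference between actual and predicted values, MAE = (1/n) Σ |yᵢ − ŷᵢ|. Here each row is one country, product and day, so the metric is an average error per daily row. `RegressionEvaluator` with `metricName='mae'` computes this quantity; the plain-Python function below is only an illustration, and the toy numbers are made up.

```python
def mean_absolute_error(actual, predicted):
    """MAE = (1/n) * sum(|y_i - yhat_i|) over paired actual/predicted values."""
    if not actual or len(actual) != len(predicted):
        raise ValueError("expected two non-empty sequences of equal length")
    return sum(abs(y - p) for y, p in zip(actual, predicted)) / len(actual)


# Toy values: errors of 2, 4 and 0 units average to 2
print(mean_absolute_error([3, 24, 12], [5.0, 20.0, 12.0]))  # 2.0
```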

In [182]:
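# Encode the string columns as numeric indices; 'keep' maps labels unseen
# during training (e.g. products that first appear in the test set) to an extra index
country_indexer = StringIndexer(inputCol='Country', outputCol='Country_idx').setHandleInvalid('keep')

stock_code_indexer = StringIndexer(inputCol='StockCode', outputCol='StockCode_idx').setHandleInvalid('keep')

# Combine the indexed and numeric columns into a single feature vector
assembler = VectorAssembler(inputCols=['Country_idx', 'StockCode_idx', 'Year', 'Month', 'Day', 'Week', 'DayOfWeek', 'UnitPrice'], outputCol='features')

# The indexed columns carry categorical metadata, and Spark requires maxBins to be at least
# the number of categories of every categorical feature, which is why maxBins must be large here
rf = RandomForestRegressor(featuresCol='features', labelCol='Quantity', maxBins=4000)

# Indexing, assembling and model fitting run together as one pipeline
pipeline = Pipeline(stages=[country_indexer, stock_code_indexer, assembler, rf])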
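To see roughly what the two `StringIndexer` stages do, here is a plain-Python sketch. It assumes the default `stringOrderType='frequencyDesc'` (most frequent label gets index 0, ties broken alphabetically) and mimics `handleInvalid='keep'` by giving unseen labels the next free index. `fit_string_index` and `transform_keep` are hypothetical helpers, not Spark APIs.

```python
from collections import Counter


def fit_string_index(values):
    """Map labels to indices by descending frequency, ties broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def transform_keep(mapping, value):
    """Labels unseen at fit time get index len(mapping), similar to handleInvalid='keep'."""
    return mapping.get(value, float(len(mapping)))


idx = fit_string_index(["United Kingdom", "United Kingdom", "France", "Norway", "France", "United Kingdom"])
print(idx)                            # {'United Kingdom': 0.0, 'France': 1.0, 'Norway': 2.0}
print(transform_keep(idx, "Brazil"))  # 3.0
```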

In [183]:
model = pipeline.fit(train_df2)


24/04/21 19:46:59 WARN DAGScheduler: Broadcasting large task binary with size 1094.8 KiB



24/04/21 19:47:01 WARN DAGScheduler: Broadcasting large task binary with size 1696.0 KiB



24/04/21 19:47:02 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB



In [184]:
predictions = model.transform(test_df2)

In [185]:
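# RandomForestRegressor already outputs 'prediction' as a double; this cast is an explicit safeguard
predictions = predictions.withColumn('prediction', col('prediction').cast('double'))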

In [186]:
mae_evaluator = RegressionEvaluator(metricName='mae',labelCol='Quantity',predictionCol='prediction')

In [187]:
mae = mae_evaluator.evaluate(predictions)

In [188]:
print(mae)

9.134251757431372


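## Our MAE is about 9.13 units: on average, the forecast for a single country, product and day misses the actual quantity by roughly 9 units. This is an error per daily row, not a global total, so whether it is small depends on how many units a product typically sells per day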
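One way to judge whether an MAE of about 9 units is good is to compare it with a naive baseline on the same test rows. The sketch below assumes a hypothetical baseline that always predicts the median training quantity (the constant that minimizes absolute error over the training values); the function names and toy numbers are illustrative only, and in the notebook the inputs would be the `Quantity` columns of `pd_daily_train_data` and `pd_daily_test_data`.

```python
from statistics import median


def mean_absolute_error(actual, predicted):
    """Average absolute difference between paired actual and predicted values."""
    return sum(abs(y - p) for y, p in zip(actual, predicted)) / len(actual)


def median_baseline_mae(train_quantities, test_quantities):
    """MAE of always predicting the training median (hypothetical baseline)."""
    m = median(train_quantities)
    return mean_absolute_error(test_quantities, [m] * len(test_quantities))


# Toy numbers only
print(median_baseline_mae([2, 4, 6], [1, 4, 10]))  # 3.0
```

If the model's MAE is not clearly below such a baseline, the features are adding little beyond the typical order size.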

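### Lastly, we group our predictions by year and week to find how many items are expected to be sold in week 39. Since 25 September 2011 was a Sunday, the training set ends with week 38 and week 39 falls entirely in the test set: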

In [189]:
from pyspark.sql import functions as F

# Keep only week 39 of 2011, then add up the daily forecasts
week39 = (predictions
          .filter((col('Year') == 2011) & (col('Week') == 39))
          .groupBy('Year', 'Week')
          .agg(F.sum('prediction').alias('weekly sales')))
week39.show()

+----+----+-----------------+
|Year|Week|     weekly sales|
+----+----+-----------------+
|2011|  39|87134.50790900599|
+----+----+-----------------+
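The weekly figure is simply the sum of the daily forecasts that fall in the week. A minimal plain-Python sketch of the same roll-up, on hypothetical forecasts, is shown below. It keys on the ISO year rather than the calendar year: for week 39 the two coincide, but late-December days can belong to ISO week 1 of the following year, and grouping by calendar `Year` and `Week` would then mix them with early-January days.

```python
from collections import defaultdict
from datetime import date


def weekly_totals(daily_forecasts):
    """Sum forecasts per (ISO year, ISO week) from (date, predicted_quantity) pairs."""
    totals = defaultdict(float)
    for day, predicted in daily_forecasts:
        iso_year, iso_week = day.isocalendar()[:2]
        totals[(iso_year, iso_week)] += predicted
    return dict(totals)


# Hypothetical daily forecasts: 26 Sep and 2 Oct 2011 are in week 39, 3 Oct in week 40
forecasts = [(date(2011, 9, 26), 10.5), (date(2011, 10, 2), 4.5), (date(2011, 10, 3), 7.0)]
print(weekly_totals(forecasts))  # {(2011, 39): 15.0, (2011, 40): 7.0}
```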



In [190]:
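# Pull the single aggregated value; int() truncates toward zero (87134.51 -> 87134), while round() would give 87135
r = int(week39.collect()[0]['weekly sales'])
print(r)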

87134


In [191]:
quantity_sold_w39 = r

## Finally, the model forecasts a total of 87134 items sold across all countries in week 39 of 2011, which runs from Monday 26 September to Sunday 2 October