# Demand Forecasting Lab
In this notebook, you will implement an end-to-end demand forecasting solution, including data processing, feature computation, model training, and validation. Along the way, you will learn how to use the PySpark ML objects on large datasets.

## Environment

In [17]:
import gdown
gdown.download_folder('https://drive.google.com/drive/folders/1Yv66Ng1IHcGlafsT96wWPpdRRs-sRo6F?usp=sharing', quiet=True)
gdown.download_folder('https://drive.google.com/drive/folders/1TnTEFGYWpmj6qqYJId8LHT_ow6bfavdC?usp=drive_link', quiet=True)


['/content/Product_Demand_Validation_All/_committed_7611924776083353318',
 '/content/Product_Demand_Validation_All/_started_7611924776083353318',
 '/content/Product_Demand_Validation_All/_SUCCESS (1)',
 '/content/Product_Demand_Validation_All/part-00000-tid-7611924776083353318-639b66f7-dd8e-4fb0-8013-b7aa1757ed46-247-1-c000.snappy.parquet']

In [18]:
# Install a headless Java 8 JDK (Spark runs on the JVM)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download the .tgz distribution of Apache Spark
# (if this release is no longer served from dlcdn.apache.org, older releases are kept under archive.apache.org/dist/spark/)
!wget -q https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
# Unpack Apache Spark from the downloaded archive
!tar xf spark-3.5.0-bin-hadoop3.tgz
# Install the findspark library
!pip install -q findspark
import os
import findspark
# Set the paths for the JDK and Spark
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"
# Initialise findspark so that pyspark can be imported
findspark.init()

In [3]:
from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql import types as T
from pyspark.sql import SparkSession

from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

## Load data
Read the data with past daily demand for individual products. The dataset contains the historical product demand of a sales company that offers thousands of products in dozens of product categories. Delivery normally takes up to one month after ordering, so monthly demand forecasts would benefit the company in multiple ways.

### Data format
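- Product_Code (string): Product identification number
- Product_Category (string): Product category identifier
- Date (date): Day to which the demand record belongs
- Demand (numeric): Quantity of products; the values are whole numbers, but they are displayed as floating-point values (e.g. `18000.0`) in the sample below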

In [19]:
spark = SparkSession.builder.appName("SparkDayThree").getOrCreate()

In [20]:
df_raw = spark.read.parquet("/content/Product_Demand_Train_Test_All")

# Explore data format and sample values
df_raw.show(5)

+----------------+------------+----------+-------+
|Product_Category|Product_Code|      Date| Demand|
+----------------+------------+----------+-------+
|    Category_005|Product_1581|2012-02-29|18000.0|
|    Category_005|Product_1581|2012-07-09| 1000.0|
|    Category_011|Product_0664|2012-03-01| 1296.0|
|    Category_007|Product_1023|2012-05-24|10600.0|
|    Category_006|Product_0929|2012-06-27| 1000.0|
+----------------+------------+----------+-------+
only showing top 5 rows



# Milestone #1: Data analysis & Features
In this section, you start analysing the raw data and prepare the final dataset for your model.

## Objectives
You should
- Compute some descriptive statistics
- Clean the raw data
- Analyse the target (Demand) and create a target variable for demand forecasting on a monthly basis
- Analyse category distribution and other related characteristics
- Create at least two features

## Milestone presentation outline
1. Show at least one interesting finding from data analysis
2. Explain target variable definition and its characteristics
3. Prepare your modelling features (and modelling dataset)

In [21]:
# Derive calendar columns from Date (F is pyspark.sql.functions, imported above)
df_processed = (
    df_raw
    .withColumn("Year", F.year("Date"))
    .withColumn("Month", F.month("Date"))
    .withColumn("Day", F.dayofmonth("Date"))
    .withColumn("Day_of_week", F.dayofweek("Date"))
    .withColumn("Quarter", F.quarter("Date"))
)

# Show the resulting dataframe
df_processed.show(5)

+----------------+------------+----------+-------+----+-----+---+-----------+-------+
|Product_Category|Product_Code|      Date| Demand|Year|Month|Day|Day_of_week|Quarter|
+----------------+------------+----------+-------+----+-----+---+-----------+-------+
|    Category_005|Product_1581|2012-02-29|18000.0|2012|    2| 29|          4|      1|
|    Category_005|Product_1581|2012-07-09| 1000.0|2012|    7|  9|          2|      3|
|    Category_011|Product_0664|2012-03-01| 1296.0|2012|    3|  1|          5|      1|
|    Category_007|Product_1023|2012-05-24|10600.0|2012|    5| 24|          5|      2|
|    Category_006|Product_0929|2012-06-27| 1000.0|2012|    6| 27|          4|      2|
+----------------+------------+----------+-------+----+-----+---+-----------+-------+
only showing top 5 rows



AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `01` cannot be resolved. Did you mean one of the following? [`year`, `month`, `avg_demand`].;
'Project [year#460, month#470, avg_demand#490, concat_ws(-, year#460, month#470, '01) AS date_str#494]
+- Aggregate [year#460, month#470], [year#460, month#470, avg(Demand#414) AS avg_demand#490]
   +- Project [Product_Category#411, Product_Code#412, Date#413, Demand#414, year#460, month(Date#413) AS month#470, day#433, day_of_week#441, quarter#450]
      +- Project [Product_Category#411, Product_Code#412, Date#413, Demand#414, year(Date#413) AS year#460, month#426, day#433, day_of_week#441, quarter#450]
         +- Project [Product_Category#411, Product_Code#412, Date#413, Demand#414, year#419, month#426, day#433, day_of_week#441, quarter(Date#413) AS quarter#450]
            +- Project [Product_Category#411, Product_Code#412, Date#413, Demand#414, year#419, month#426, day#433, dayofweek(Date#413) AS day_of_week#441]
               +- Project [Product_Category#411, Product_Code#412, Date#413, Demand#414, year#419, month#426, dayofmonth(Date#413) AS day#433]
                  +- Project [Product_Category#411, Product_Code#412, Date#413, Demand#414, year#419, month(Date#413) AS month#426]
                     +- Project [Product_Category#411, Product_Code#412, Date#413, Demand#414, year(Date#413) AS year#419]
                        +- Relation [Product_Category#411,Product_Code#412,Date#413,Demand#414] parquet


## Expected Results
You should end up with a cleaned dataset with a well-defined target variable and at least two features for your demand forecasting model.

# Milestone #2: Modelling & Experiment design
Create a first model on the dataset from the previous section.
## Objectives
You should
- Propose a train/test split of the data
- Create a modelling pipeline using Spark ML and [GBTRegressor](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.GBTRegressor.html)
- Train a first model
- Obtain first predictions on the test set
## Milestone presentation outline
1. Describe your train and test datasets
2. Show your modelling pipeline and the code for training your model
3. Provide first predictions on the test set

In [None]:
# your code

# Milestone #3: Tuning & Evaluation
This section is dedicated to improving your model performance and evaluation.
## Objectives
You should
- Perform hyperparameter tuning (by hand or using [CrossValidator](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.CrossValidator.html))
- Evaluate feature importance
- Calculate standard evaluation metrics on your test set and on the provided validation set
## Milestone presentation outline
1. Explain your choice of hyperparameters and your approach to tuning
2. Provide feature importance
3. Summarize overall model performance using the evaluation metrics on your test set and the provided validation set

In [None]:
# your code for model tuning and feature importance

## Expected Results
A final model for monthly demand forecasting, ready for out-of-sample predictions.

## Out-of-sample validation
Test your model on a hold-out sample that was not previously available in the dataset. As the sample below shows, you will provide demand predictions for December 2016.

In [None]:
# Get Demand for validation data
# (stored under a separate name so that the training data in df_raw is not overwritten)
df_val_raw = spark.read.parquet("/content/Product_Demand_Validation_All")
df_val_raw.show(5)

+------------+----+-----+--------+
|Product_Code|Year|Month|  Demand|
+------------+----+-----+--------+
|Product_1383|2016|   12| 18354.0|
|Product_0767|2016|   12|  3857.0|
|Product_1439|2016|   12|217582.0|
|Product_1762|2016|   12|    14.0|
|Product_0346|2016|   12|    11.0|
+------------+----+-----+--------+
only showing top 5 rows



Transform the validation data in the same way as your original dataset and obtain predictions using your final model. The result should be a PySpark DataFrame `predictions_val` with the following columns:
- Product_Code
- Demand
- Prediction

In [None]:
# your code to obtain predictions and compose a final dataset called "predictions_val"

The code below computes two standard evaluation metrics, Mean Absolute Error (MAE) and Root Mean Squared Error (RMSE), as the final characteristics of your model.

In [None]:
# Calculate final metrics for comparison
predictions_val = (
    predictions_val
        .withColumn("Error", F.col("Demand") - F.col("Prediction"))
        .withColumn("AbsError", F.abs(F.col("Demand") - F.col("Prediction")))
        .withColumn("SqrError", F.pow(F.col("Demand") - F.col("Prediction"), F.lit(2)))
)

mae = predictions_val.agg(F.avg(F.col("AbsError"))).collect()[0][0]
print("Mean Absolute Error (MAE) on validation data = %g" % mae)
rmse = predictions_val.agg(F.sqrt(F.avg(F.col("SqrError")))).collect()[0][0]
print("Root Mean Squared Error (RMSE) on validation data = %g" % rmse)
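To make the two formulas concrete, here is the same arithmetic in plain Python on four made-up (actual, predicted) pairs. The numbers are purely illustrative and unrelated to the lab data.

```python
import math

# Made-up (actual, predicted) monthly demand pairs
pairs = [(100.0, 90.0), (50.0, 70.0), (10.0, 10.0), (200.0, 230.0)]

# Errors: 10, -20, 0, -30
errors = [actual - predicted for actual, predicted in pairs]

# MAE: mean of the absolute errors = (10 + 20 + 0 + 30) / 4 = 15
mae = sum(abs(e) for e in errors) / len(errors)

# RMSE: square root of the mean squared error = sqrt((100 + 400 + 0 + 900) / 4) = sqrt(350)
rmse = math.sqrt(sum(e ** 2 for e in errors) / len(errors))

print("MAE  = %g" % mae)
print("RMSE = %g" % rmse)
```

RMSE is never smaller than MAE and grows faster with large individual errors. Since the validation sample contains demands ranging from 11 to 217582, RMSE will likely be dominated by a few high-volume products, so it is worth reporting both metrics.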