In [1]:
# Importing required libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.functions import col, dayofmonth, month, year,  to_date, to_timestamp, weekofyear, dayofweek
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Initializing Spark session
my_spark = SparkSession.builder.appName("SalesForecast").getOrCreate()

# Importing sales data
sales_data = my_spark.read.csv(
    "Online Retail.csv", header=True, inferSchema=True, sep=",")

# Converting InvoiceDate to datetime 
sales_data = sales_data.withColumn("InvoiceDate", to_date(
    to_timestamp(col("InvoiceDate"), "d/M/yyyy H:mm")))

# Aggregating data into daily intervals
daily_sales_data = sales_data.groupBy("Country", "StockCode", "InvoiceDate", "Year", "Month", "Day", "Week", "DayOfWeek").agg({"Quantity": "sum",                                                                                                           "UnitPrice": "avg"})
# Renaming the target column
daily_sales_data = daily_sales_data.withColumnRenamed(
    "sum(Quantity)", "Quantity")

# Data Split based on the spliting date, "2011-09-25". 

split_date_train_test = "2011-09-25"

# Creating the train and test datasets
train_data = daily_sales_data.filter(
    col("InvoiceDate") <= split_date_train_test)
test_data = daily_sales_data.filter(col("InvoiceDate") > split_date_train_test)

pd_daily_train_data = train_data.toPandas()

# Creating indexer for categorical columns
country_indexer = StringIndexer(
    inputCol="Country", outputCol="CountryIndex").setHandleInvalid("keep")
stock_code_indexer = StringIndexer(
    inputCol="StockCode", outputCol="StockCodeIndex").setHandleInvalid("keep")

# Selecting features columns
feature_cols = ["CountryIndex", "StockCodeIndex", "Month", "Year",
                "DayOfWeek", "Day", "Week"]

# Using vector assembler to combine features
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Initializing a Random Forest model
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="Quantity",
    maxBins=4000
)

# Creating a pipeline for staging the processes
pipeline = Pipeline(stages=[country_indexer, stock_code_indexer, assembler, rf])

# Training the model
model = pipeline.fit(train_data)

# Getting test predictions
test_predictions = model.transform(test_data)
test_predictions = test_predictions.withColumn(
    "prediction", col("prediction").cast("double"))

# Providing Mean Absolute Error (MAE) for forecast

# Initializing the evaluator
mae_evaluator = RegressionEvaluator(
    labelCol="Quantity", predictionCol="prediction", metricName="mae")

# Obtaining MAE
mae = mae_evaluator.evaluate(test_predictions)

# Predicted units to be sold during the week 39 of 2011, per Country and product.

# Getting the weekly sales
weekly_test_predictions = test_predictions.groupBy("Year", "Week", "Country", "StockCode").agg({"prediction": "sum"})
weekly_test_predictions = weekly_test_predictions.withColumn("sum(prediction)", col("sum(prediction)").cast("integer"))

# Filtering for 39 week. 
promotion_week = weekly_test_predictions.filter(col('Week')==39)
weekly_test_predictions.show()

# Stop the Spark session
my_spark.stop()

+----+----+--------------+---------+---------------+
|Year|Week|       Country|StockCode|sum(prediction)|
+----+----+--------------+---------+---------------+
|2011|  39|United Kingdom|    22838|             13|
|2011|  39|United Kingdom|    22674|             19|
|2011|  44|United Kingdom|    22436|             52|
|2011|  48|United Kingdom|    22227|             55|
|2011|  44|United Kingdom|    20972|            105|
|2011|  39|         Spain|    21879|             14|
|2011|  39|United Kingdom|    22695|             42|
|2011|  44|United Kingdom|    21070|              7|
|2011|  40|United Kingdom|   85194L|             17|
|2011|  40|United Kingdom|    22384|            139|
|2011|  44|United Kingdom|    84949|             30|
|2011|  44|United Kingdom|    22712|             46|
|2011|  40|United Kingdom|    22357|              7|
|2011|  44|United Kingdom|    21787|             45|
|2011|  48|United Kingdom|    21928|             67|
|2011|  40|United Kingdom|    22985|          