In [None]:

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.functions import col, dayofmonth, month, year,  to_date, to_timestamp, weekofyear, dayofweek
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator

my_spark = SparkSession.builder.appName("SalesForecast").getOrCreate()

sales_data = my_spark.read.csv(
    "Online Retail.csv", header=True, inferSchema=True, sep=",")

sales_data = sales_data.withColumn("InvoiceDate", to_date(
    to_timestamp(col("InvoiceDate"), "d/M/yyyy H:mm")))

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


26/02/08 16:57:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


                                                                                

In [2]:
print("Raw data count:", sales_data.count())

Raw data count: 384721


In [3]:
daily_sales_data = sales_data.groupBy(
    "Country", "StockCode", "InvoiceDate",
    "Year", "Month", "Day", "Week", "DayOfWeek"
).agg({"Quantity": "sum", "UnitPrice": "avg"})

daily_sales_data = daily_sales_data.withColumnRenamed(
    "sum(Quantity)", "Quantity"
)

print("Daily aggregated rows:", daily_sales_data.count())
daily_sales_data.show(5)


                                                                                

Daily aggregated rows: 239976




+--------------+---------+-----------+----+-----+---+----+---------+--------------+--------+
|       Country|StockCode|InvoiceDate|Year|Month|Day|Week|DayOfWeek|avg(UnitPrice)|Quantity|
+--------------+---------+-----------+----+-----+---+----+---------+--------------+--------+
|United Kingdom|    22912| 2010-01-12|2010|    1| 12|   2|        1|          4.95|       3|
|        France|    22659| 2010-01-12|2010|    1| 12|   2|        1|          1.95|      24|
|United Kingdom|    21544| 2010-01-12|2010|    1| 12|   2|        1|          0.85|      12|
|United Kingdom|    21098| 2010-01-12|2010|    1| 12|   2|        1|          1.25|      16|
|        Norway|    85150| 2010-01-12|2010|    1| 12|   2|        1|          2.55|      12|
+--------------+---------+-----------+----+-----+---+----+---------+--------------+--------+
only showing top 5 rows



                                                                                

In [4]:
split_date_train_test = "2011-09-25"

train_data = daily_sales_data.filter(col("InvoiceDate") <= split_date_train_test)
test_data  = daily_sales_data.filter(col("InvoiceDate") > split_date_train_test)

print("Train rows:", train_data.count())
print("Test rows:", test_data.count())


                                                                                

Train rows: 175452


[Stage 20:>                                                         (0 + 2) / 2]

Test rows: 64524


                                                                                

In [5]:
pd_daily_train_data = train_data.select(
    "Country", "StockCode", "InvoiceDate", "Quantity"
).toPandas()

print(pd_daily_train_data.head())


                                                                                

          Country StockCode InvoiceDate  Quantity
0  United Kingdom     22912  2010-01-12         3
1          France     22659  2010-01-12        24
2  United Kingdom     21544  2010-01-12        12
3  United Kingdom     21098  2010-01-12        16
4          Norway     85150  2010-01-12        12


In [6]:
country_indexer = StringIndexer(
    inputCol="Country", outputCol="CountryIndex", handleInvalid="keep"
)

stock_code_indexer = StringIndexer(
    inputCol="StockCode", outputCol="StockCodeIndex", handleInvalid="keep"
)

feature_cols = [
    "CountryIndex", "StockCodeIndex",
    "Month", "Year", "DayOfWeek", "Day", "Week"
]

assembler = VectorAssembler(
    inputCols=feature_cols, outputCol="features"
)

rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="Quantity",
    maxBins=4000,
    numTrees=50
)

pipeline = Pipeline(stages=[
    country_indexer,
    stock_code_indexer,
    assembler,
    rf
])

model = pipeline.fit(train_data)
print("✅ Model trained successfully")


                                                                                

26/02/08 17:04:53 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB


                                                                                

26/02/08 17:04:56 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB


                                                                                

26/02/08 17:05:00 WARN DAGScheduler: Broadcasting large task binary with size 6.4 MiB


[Stage 63:>                                                         (0 + 2) / 2]

✅ Model trained successfully


                                                                                

In [7]:
test_predictions = model.transform(test_data)

test_predictions.select(
    "Country", "StockCode", "InvoiceDate",
    "Quantity", "prediction"
).show(10)


[Stage 64:>                                                         (0 + 2) / 2]

+--------------+---------+-----------+--------+------------------+
|       Country|StockCode|InvoiceDate|Quantity|        prediction|
+--------------+---------+-----------+--------+------------------+
|United Kingdom|    22414| 2011-10-01|       1| 7.237056461117459|
|United Kingdom|    22773| 2011-10-01|      12|13.569954605124957|
|United Kingdom|    22180| 2011-10-01|       1| 6.674920355648336|
|United Kingdom|    20686| 2011-10-01|       2| 7.538452828298906|
|United Kingdom|    82580| 2011-11-01|      12|13.749064932842801|
|United Kingdom|    22113| 2011-11-01|      40|12.980375511265102|
|United Kingdom|    22057| 2011-11-01|       4|13.772250967516023|
|United Kingdom|    22723| 2011-12-01|       9| 7.523623279635007|
|United Kingdom|    20654| 2011-12-01|       1| 7.075018150798626|
|        Cyprus|    22721| 2011-12-01|       3| 7.978390599309173|
+--------------+---------+-----------+--------+------------------+
only showing top 10 rows



                                                                                

In [9]:
mae_evaluator = RegressionEvaluator(
    labelCol="Quantity",
    predictionCol="prediction",
    metricName="mae"
)

mae = mae_evaluator.evaluate(test_predictions)

print(" Mean Absolute Error (MAE):", mae)


[Stage 72:>                                                         (0 + 1) / 1]

 Mean Absolute Error (MAE): 9.519850003848056


                                                                                

In [11]:
weekly_predictions = test_predictions.groupBy(
    "Year", "Week"
).sum("prediction")

weekly_predictions.show()

promotion_week = weekly_predictions.filter(
    (col("Year") == 2011) & (col("Week") == 39)
)

promotion_week.show()

quantity_sold_w39 = int(
    promotion_week.select("sum(prediction)").collect()[0][0]
)

print(" Expected units sold in Week 39 of 2011:", quantity_sold_w39)


                                                                                

+----+----+------------------+
|Year|Week|   sum(prediction)|
+----+----+------------------+
|2011|  42| 98574.74693642993|
|2011|  44|58196.024269496884|
|2011|  46|115832.41571273093|
|2011|  48| 71169.00494165577|
|2011|  45| 96252.57625426835|
|2011|  43| 96760.14299597735|
|2011|  39| 88721.13048441493|
|2011|  49| 65843.16377099641|
|2011|  47|106347.92921233115|
|2011|  41|  85017.1858023543|
|2011|  40|49799.563277199246|
+----+----+------------------+



                                                                                

+----+----+-----------------+
|Year|Week|  sum(prediction)|
+----+----+-----------------+
|2011|  39|88721.13048441493|
+----+----+-----------------+

 Expected units sold in Week 39 of 2011: 88721


In [12]:
my_spark.stop()
