In [0]:

Online_Retail_df_cleaned_path = "/mnt/online-retail/silver-layer/Online_Retail_df_cleaned"

Online_Retail_df = spark.read.parquet(Online_Retail_df_cleaned_path)

Online_Retail_df.display()

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536367,22310,IVORY KNITTED MUG COSY,6,2010-12-01,1.65,13047,United Kingdom
536388,21523,DOORMAT FANCY FONT HOME SWEET HOME,2,2010-12-01,7.95,16250,United Kingdom
536401,20749,ASSORTED COLOUR MINI CASES,1,2010-12-01,7.95,15862,United Kingdom
536464,22297,HEART IVORY TRELLIS SMALL,1,2010-12-01,1.25,17968,United Kingdom
536500,22712,CARD DOLLY GIRL,12,2010-12-01,0.42,17377,United Kingdom
536576,21354,TOAST ITS - BEST MUM,12,2010-12-01,1.05,13777,United Kingdom
536578,22729,ALARM CLOCK BAKELIKE ORANGE,4,2010-12-01,3.75,17690,United Kingdom
536609,82482,WOODEN PICTURE FRAME WHITE FINISH,6,2010-12-02,2.1,17850,United Kingdom
536615,22467,GUMBALL COAT RACK,12,2010-12-02,2.55,14047,United Kingdom
536617,21356,TOAST ITS - FAIRY FLOWER,12,2010-12-02,1.25,13941,United Kingdom


In [0]:
from pyspark.sql.functions import year, month, col
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
data = Online_Retail_df.withColumn("Year", year(col("InvoiceDate"))) \
                       .withColumn("Month", month(col("InvoiceDate")))

In [0]:
country_indexer = StringIndexer(inputCol="Country", outputCol="CountryIndex")
feature_columns = ["Quantity", "UnitPrice", "Year", "Month", "CountryIndex"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

In [0]:
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

In [0]:
lr = LinearRegression(featuresCol="features", labelCol="Quantity", predictionCol="prediction")

In [0]:
pipeline = Pipeline(stages=[country_indexer, assembler, lr])

In [0]:
train_data = train_data.withColumn("Quantity", col("Quantity").cast("double"))
train_data = train_data.withColumn("UnitPrice", col("UnitPrice").cast("double"))

In [0]:
model = pipeline.fit(train_data)

Downloading artifacts:   0%|          | 0/30 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

In [0]:
test_data = test_data.withColumn("Quantity", col("Quantity").cast("double"))
test_data = test_data.withColumn("UnitPrice", col("UnitPrice").cast("double"))

predictions = model.transform(test_data)

In [0]:
evaluator = RegressionEvaluator(labelCol="Quantity", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

print(f"Root Mean Squared Error (RMSE): {rmse}")
print(f"R2 Score: {r2}")


Root Mean Squared Error (RMSE): 2.3515407852317815e-11
R2 Score: 1.0


In [0]:
future_data = spark.createDataFrame([
    (5.0, 20.0, 2024, 1, "United Kingdom"),  
    (10.0, 15.0, 2024, 2, "France"),
    (8.0, 12.0, 2024, 3, "Germany")
], ["UnitPrice", "Quantity", "Year", "Month", "Country"])


In [0]:
future_predictions = model.transform(future_data)
future_predictions.select("Year", "Month", "Country", "Quantity", "prediction").show()

+----+-----+--------------+--------+------------------+
|Year|Month|       Country|Quantity|        prediction|
+----+-----+--------------+--------+------------------+
|2024|    1|United Kingdom|    20.0| 19.99999999868203|
|2024|    2|        France|    15.0|14.999999998679586|
|2024|    3|       Germany|    12.0|11.999999998676993|
+----+-----+--------------+--------+------------------+

