In [None]:
!pip install pyspark
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from matplotlib import pyplot as plt



## **1) Create the DataFrame**

In [None]:
# Start (or reuse) a Spark session, then load every CSV under data/ using the
# header row for column names and letting Spark infer the column types.
spark = (
    SparkSession.builder
    .appName("TP4_MLlib")
    .getOrCreate()
)
df = spark.read.csv("data/*.csv", header=True, inferSchema=True)

## **2) Show schema**

In [None]:
# Inspect the inferred column types, then preview the first rows.
df.printSchema()
df.show(n=5)

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   537226|    22811|SET OF 6 T-LIGHTS...|       6|2010-12-06 08:34:00|     2.95|   15987.0|United Kingdom|
|   537226|    21713|CITRONELLA CANDLE...|       8|2010-12-06 08:34:00|      2.1|   15987.0|United Kingdom|
|   537226|    22927|GREEN GIANT GARDE...|       2|2010-12-06 08:34:00|     5.95|   15987.0|United Kingdom|
|   537226| 

## **3) Replace null/NaN values in numeric columns with 0**

In [None]:
# Fill missing values with 0; a numeric fill value only applies to numeric columns.
df = df.na.fill(0)
df.show(n=5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   537226|    22811|SET OF 6 T-LIGHTS...|       6|2010-12-06 08:34:00|     2.95|   15987.0|United Kingdom|
|   537226|    21713|CITRONELLA CANDLE...|       8|2010-12-06 08:34:00|      2.1|   15987.0|United Kingdom|
|   537226|    22927|GREEN GIANT GARDE...|       2|2010-12-06 08:34:00|     5.95|   15987.0|United Kingdom|
|   537226|    20802|SMALL GLASS SUNDA...|       6|2010-12-06 08:34:00|     1.65|   15987.0|United Kingdom|
|   537226|    22052|VINTAGE CARAVAN G...|      25|2010-12-06 08:34:00|     0.42|   15987.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



## **4) Add a day_of_week column from InvoiceDate**

In [None]:
# Pattern "E" renders the timestamp as an abbreviated weekday name (e.g. "Mon").
weekday_name = date_format("InvoiceDate", "E")
df = df.withColumn("day_of_week", weekday_name)
df.show(n=5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|day_of_week|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|   537226|    22811|SET OF 6 T-LIGHTS...|       6|2010-12-06 08:34:00|     2.95|   15987.0|United Kingdom|        Mon|
|   537226|    21713|CITRONELLA CANDLE...|       8|2010-12-06 08:34:00|      2.1|   15987.0|United Kingdom|        Mon|
|   537226|    22927|GREEN GIANT GARDE...|       2|2010-12-06 08:34:00|     5.95|   15987.0|United Kingdom|        Mon|
|   537226|    20802|SMALL GLASS SUNDA...|       6|2010-12-06 08:34:00|     1.65|   15987.0|United Kingdom|        Mon|
|   537226|    22052|VINTAGE CARAVAN G...|      25|2010-12-06 08:34:00|     0.42|   15987.0|United Kingdom|        Mon|
+---------+---------+-------------------

## **6) Split data**

In [None]:
# Chronological split: rows dated before 2010-12-13 are used for training,
# everything from that date onward is held out for testing.
invoice_date = col("InvoiceDate")
train_df = df.where(invoice_date < "2010-12-13")
test_df = df.where(invoice_date >= "2010-12-13")

for split in (train_df, test_df):
    split.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|day_of_week|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|   537226|    22811|SET OF 6 T-LIGHTS...|       6|2010-12-06 08:34:00|     2.95|   15987.0|United Kingdom|        Mon|
|   537226|    21713|CITRONELLA CANDLE...|       8|2010-12-06 08:34:00|      2.1|   15987.0|United Kingdom|        Mon|
|   537226|    22927|GREEN GIANT GARDE...|       2|2010-12-06 08:34:00|     5.95|   15987.0|United Kingdom|        Mon|
|   537226|    20802|SMALL GLASS SUNDA...|       6|2010-12-06 08:34:00|     1.65|   15987.0|United Kingdom|        Mon|
|   537226|    22052|VINTAGE CARAVAN G...|      25|2010-12-06 08:34:00|     0.42|   15987.0|United Kingdom|        Mon|
+---------+---------+-------------------

## **7) Encode with StringIndexer**

In [None]:
# Map each weekday label to a numeric index. With handleInvalid="keep",
# labels not seen during fit go to an extra bucket (index == len(labels))
# instead of raising an error.
indexer = StringIndexer(
    inputCol="day_of_week",
    outputCol="day_of_week_index",
    handleInvalid="keep",
)

# Learn the label -> index mapping from the TRAINING split only; fitting on
# test_df would leak held-out data into the encoding.
indexer_model = indexer.fit(train_df)

# Show the learned mapping.
print("Indexer labels (index → day):")
for i, label in enumerate(indexer_model.labels):
    print(f"{i} → {label}")

# Apply the train-fitted mapping to the test split (adds 'day_of_week_index'),
# then show only the relevant columns.
indexed_df = indexer_model.transform(test_df)
indexed_df.select("day_of_week", "day_of_week_index").show(10)


Indexer labels (index → day):
0 → Tue
1 → Mon
2 → Wed
3 → Fri
4 → Thu
5 → Sun
+-----------+-----------------+
|day_of_week|day_of_week_index|
+-----------+-----------------+
|        Fri|              3.0|
|        Fri|              3.0|
|        Fri|              3.0|
|        Fri|              3.0|
|        Fri|              3.0|
|        Fri|              3.0|
|        Fri|              3.0|
|        Fri|              3.0|
|        Fri|              3.0|
|        Fri|              3.0|
+-----------+-----------------+
only showing top 10 rows



## **7b) One-hot encode the day index with OneHotEncoder (an Estimator: `fit` before `transform`)**

In [None]:
# OneHotEncoder is an Estimator: fit() sizes the output vector from the index
# column, and transform() emits sparse vectors. dropLast defaults to True,
# so the last category gets no slot of its own.
encoder = OneHotEncoder(inputCol="day_of_week_index", outputCol="day_of_week_encoded")
encoder_model = encoder.fit(indexed_df)
encoded_df = encoder_model.transform(indexed_df)

# Show the label, its index and the resulting one-hot vector side by side.
encoded_df.select(
    ["day_of_week", "day_of_week_index", "day_of_week_encoded"]
).show(n=10, truncate=False)

+-----------+-----------------+-------------------+
|day_of_week|day_of_week_index|day_of_week_encoded|
+-----------+-----------------+-------------------+
|Fri        |3.0              |(6,[3],[1.0])      |
|Fri        |3.0              |(6,[3],[1.0])      |
|Fri        |3.0              |(6,[3],[1.0])      |
|Fri        |3.0              |(6,[3],[1.0])      |
|Fri        |3.0              |(6,[3],[1.0])      |
|Fri        |3.0              |(6,[3],[1.0])      |
|Fri        |3.0              |(6,[3],[1.0])      |
|Fri        |3.0              |(6,[3],[1.0])      |
|Fri        |3.0              |(6,[3],[1.0])      |
|Fri        |3.0              |(6,[3],[1.0])      |
+-----------+-----------------+-------------------+
only showing top 10 rows



## **8) Assemble Features**

In [None]:
# Concatenate the two numeric columns and the one-hot weekday vector, in this
# order, into a single "features" vector column.
feature_columns = ["UnitPrice", "Quantity", "day_of_week_encoded"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

## **9) Create pipeline**

In [None]:
# Chain indexing -> one-hot encoding -> assembly so all stages are fit together.
pipeline = Pipeline().setStages([indexer, encoder, assembler])

## **10) Fit the pipeline on training data (StringIndexer learns its categories here; unseen test values fall into the `handleInvalid="keep"` bucket)**

In [None]:
# Fit every pipeline stage on the training split only.
pipeline_model = pipeline.fit(dataset=train_df)

## **11) Transform training data using pipeline**

In [None]:
train_transformed = pipeline_model.transform(train_df)
# Preview the assembled feature vectors without truncating them.
train_transformed.select("features").show(n=5, truncate=False)

+---------------------------+
|features                   |
+---------------------------+
|(8,[0,1,6],[2.95,6.0,1.0]) |
|(8,[0,1,6],[2.1,8.0,1.0])  |
|(8,[0,1,6],[5.95,2.0,1.0]) |
|(8,[0,1,6],[1.65,6.0,1.0]) |
|(8,[0,1,6],[0.42,25.0,1.0])|
+---------------------------+
only showing top 5 rows



## **12) Create KMeans with k = 20**

In [None]:
# k-means with 20 clusters on the assembled feature vectors.
# A fixed seed makes the random initialisation, and therefore the clusters and
# the silhouette score, reproducible across runs.
# NOTE(review): UnitPrice and Quantity are unscaled, so they likely dominate the
# distances over the 0/1 weekday columns; consider adding a StandardScaler stage.
kmeans = KMeans(
    k=20,
    seed=42,
    featuresCol="features",
    predictionCol="prediction",
)

## **13) Train KMeans on transformed training data**

In [None]:
# Learn the cluster centres from the training feature vectors.
kmeans_model = kmeans.fit(dataset=train_transformed)

## **14) Predict on test dataset**

In [None]:
# Reuse the train-fitted pipeline so test features share the training encoding,
# then assign each test row to its nearest cluster centre.
test_transformed = pipeline_model.transform(test_df)
predictions = kmeans_model.transform(test_transformed)
predictions.select(["features", "prediction"]).show(n=50)

+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|(8,[0,1,4],[4.95,...|         0|
|(8,[0,1,4],[3.95,...|         0|
|(8,[0,1,4],[0.42,...|         0|
|(8,[0,1,4],[0.85,...|         0|
|(8,[0,1,4],[0.85,...|         0|
|(8,[0,1,4],[0.85,...|         9|
|(8,[0,1,4],[7.95,...|         0|
|(8,[0,1,4],[0.21,...|        12|
|(8,[0,1,4],[1.25,...|         0|
|(8,[0,1,4],[1.25,...|         0|
|(8,[0,1,4],[1.25,...|         0|
|(8,[0,1,4],[1.25,...|         0|
|(8,[0,1,4],[1.45,...|         0|
|(8,[0,1,4],[1.95,...|         0|
|(8,[0,1,4],[9.95,...|         0|
|(8,[0,1,4],[9.95,...|         0|
|(8,[0,1,4],[1.45,...|         0|
|(8,[0,1,4],[5.95,...|         0|
|(8,[0,1,4],[7.95,...|         0|
|(8,[0,1,4],[2.95,...|         0|
|(8,[0,1,4],[1.25,...|         0|
|(8,[0,1,4],[2.95,...|         0|
|(8,[0,1,4],[3.75,...|         0|
|(8,[0,1,4],[9.95,...|         0|
|(8,[0,1,4],[1.25,...|         0|
|(8,[0,1,4],[12.75...|         0|
|(8,[0,1,4],[7

## **15) Compute silhouette score**

In [None]:
# Silhouette score (squared-Euclidean distance; both are the evaluator's
# defaults, spelled out here). Values lie in [-1, 1]; higher means tighter,
# better-separated clusters.
evaluator = ClusteringEvaluator(
    featuresCol="features",
    predictionCol="prediction",
    metricName="silhouette",
    distanceMeasure="squaredEuclidean",
)
silhouette = evaluator.evaluate(predictions)
print("Silhouette score =", silhouette)

Silhouette score = 0.703982474613244
