In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, date_format
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator


In [None]:
# Colab-only step: `files.upload()` is used to bring the lab's CSV files into the
# runtime so the read cell below can find them (presumably via the browser upload
# dialog and into the working directory -- confirm when running outside Colab).
# `uploaded` keeps the call's return value; no later visible cell reads it.
from google.colab import files
uploaded = files.upload()

In [None]:
# 1) Store the contents of all files in the tp4_data folder into a DataFrame.
# Reuse the active Spark session if there is one, otherwise start one named "Lab4".
spark = (
    SparkSession.builder
    .appName("Lab4")
    .getOrCreate()
)

# Every file matching the "*.csv" glob is loaded into one DataFrame; the first
# row of each file supplies the column names and column types are inferred.
df = spark.read.csv(
    "*.csv",
    header=True,
    inferSchema=True,
)

#  2) Display the schema of the resulting DataFrame.
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [None]:
# Preview the first five rows to sanity-check the load.
df.show(n=5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   537226|    22811|SET OF 6 T-LIGHTS...|       6|2010-12-06 08:34:00|     2.95|   15987.0|United Kingdom|
|   537226|    21713|CITRONELLA CANDLE...|       8|2010-12-06 08:34:00|      2.1|   15987.0|United Kingdom|
|   537226|    22927|GREEN GIANT GARDE...|       2|2010-12-06 08:34:00|     5.95|   15987.0|United Kingdom|
|   537226|    20802|SMALL GLASS SUNDA...|       6|2010-12-06 08:34:00|     1.65|   15987.0|United Kingdom|
|   537226|    22052|VINTAGE CARAVAN G...|      25|2010-12-06 08:34:00|     0.42|   15987.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



In [None]:
# 3) Fill the missing values (NaN) with the value 0.
# NOTE: with a numeric fill value Spark only replaces nulls/NaNs in numeric
# columns; nulls in string columns (e.g. Description) are left as they are.
df = df.fillna(0)

In [None]:
# 4) Add a new column named "day_of_week"
# The "EEEE" pattern renders the full day name (e.g. "Monday") of each invoice timestamp.
df = df.withColumn(
    "day_of_week",
    date_format(df["InvoiceDate"], "EEEE"),
)
df.show(n=5)


+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|day_of_week|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|   537226|    22811|SET OF 6 T-LIGHTS...|       6|2010-12-06 08:34:00|     2.95|   15987.0|United Kingdom|     Monday|
|   537226|    21713|CITRONELLA CANDLE...|       8|2010-12-06 08:34:00|      2.1|   15987.0|United Kingdom|     Monday|
|   537226|    22927|GREEN GIANT GARDE...|       2|2010-12-06 08:34:00|     5.95|   15987.0|United Kingdom|     Monday|
|   537226|    20802|SMALL GLASS SUNDA...|       6|2010-12-06 08:34:00|     1.65|   15987.0|United Kingdom|     Monday|
|   537226|    22052|VINTAGE CARAVAN G...|      25|2010-12-06 08:34:00|     0.42|   15987.0|United Kingdom|     Monday|
+---------+---------+-------------------

In [None]:
# 5) Split the data into a training set and a test set
# Chronological split: invoices strictly before 2010-12-13 go to the training
# set, invoices from that date onwards are held out as the test set.
train = df.filter(df["InvoiceDate"] < "2010-12-13")
test = df.filter(df["InvoiceDate"] >= "2010-12-13")

In [None]:
# 6) Convert the days of the week in the "day_of_week" column into their corresponding numerical values.
# handleInvalid="keep" lets the fitted indexer accept day names that were not
# present when it was fitted instead of raising an error on them.
indexer = StringIndexer(
    inputCol="day_of_week",
    outputCol="day_of_week_index",
    handleInvalid="keep",
)

In [None]:
# 7) To solve that problem we use one hot encoding
encoder = OneHotEncoder(inputCol="day_of_week_index", outputCol="day_of_week_encoded")

In [None]:
# 8) VectorAssembler to combine features
# The two numeric columns and the one-hot day vector are concatenated into a
# single "features" vector column (comparable to the RFormula seen in the course).
assembler = VectorAssembler(
    inputCols=["UnitPrice", "Quantity", "day_of_week_encoded"],
    outputCol="features",
)

In [None]:
# 9) Create a pipeline configured with the results of steps 6, 7, and 8.
# Stage order matters: StringIndexer feeds OneHotEncoder, whose output feeds
# VectorAssembler. KMeans is NOT a stage of this pipeline; it is created and
# fitted separately on the pipeline's output in the later cells.
pipeline = Pipeline(
    stages=[
        indexer,
        encoder,
        assembler,
    ]
)

In [None]:
# 10) Our StringIndexer needs to know how many unique values it has to index. How can this issue be resolved?
# ==> We set handleInvalid="keep" so that if the model gets a value that it did not see in the training set, it
# handles the new label by putting it in an extra bucket at index numLabels.

In [None]:
# 11) Transform the training set data based on the stages of the pipeline.
# The preprocessing stages are fitted on the training rows only; the fitted
# pipeline is then applied to those same rows to produce the "features" column.
pipeline_model = pipeline.fit(dataset=train)
train_transformed = pipeline_model.transform(dataset=train)

In [None]:
# 12) Create an instance of KMeans, assuming the number of clusters is 20.
# KMeans is already imported in the notebook's import cell, so it is not
# re-imported here.
# A fixed seed pins the random centroid initialisation so that the clustering,
# and the silhouette score computed from it, can be reproduced on a re-run.
kmeans = KMeans(featuresCol="features", k=20, seed=42)

In [None]:
# 13) Train the KMeans model using the transformed data from step 11.
# Only the preprocessed training rows are used to learn the cluster centres.
model = kmeans.fit(dataset=train_transformed)

In [None]:
# 14) Make predictions on the test set.
# The test rows go through the pipeline that was fitted on the training set
# (nothing is re-fitted here), then the trained KMeans model assigns a cluster.
test_transformed = pipeline_model.transform(dataset=test)
predictions = model.transform(dataset=test_transformed)

In [None]:
# 15) Calculate the Silhouette coefficient.
# ClusteringEvaluator is already imported in the notebook's import cell, so it
# is not re-imported here. The columns and the metric are spelled out (they
# match the evaluator's defaults) so the reader can see what is being scored.
evaluator = ClusteringEvaluator(
    featuresCol="features",
    predictionCol="prediction",
    metricName="silhouette",
)
# Score the cluster assignments made on the held-out test set.
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette Coefficient: {silhouette}")

Silhouette Coefficient: 0.7277052837147251
