### Install Necessary Packages

In [1]:
!pip install pyspark



### Necessary Packages

In [36]:
import os
import pyspark.sql.functions as F
from pyspark.sql import SparkSession,DataFrame
from pyspark.ml.feature import StringIndexer,OneHotEncoder,VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from tqdm.notebook import tqdm

### Mount the drive

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### Global

In [4]:
DATA_DIR = "./drive/MyDrive/bdt/tp4_data"

### Question 01 : store the content of all the csv files in a data frame

In [5]:
def read_csv_folder(root : str, spark : SparkSession) -> DataFrame | None:

  files = os.listdir(path=root)
  csv_files = list(filter(lambda filename : filename.endswith(".csv"),files))

  result = None

  for csv_file in tqdm(csv_files):
    csv_file_path = os.path.join(root, csv_file)
    df = spark.read.csv(path=csv_file_path, header=True, inferSchema=True)

    if result is None:
      result = df
    else:
      result = result.union(df)

  return result

In [6]:
spark = SparkSession.builder.getOrCreate()

In [7]:
df = read_csv_folder(root=DATA_DIR, spark=spark)

  0%|          | 0/22 [00:00<?, ?it/s]

In [8]:
df.show(n=5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   538521|    21754|HOME BUILDING BLO...|       3|2010-12-13 09:02:00|     5.95|   14180.0|United Kingdom|
|   538521|    21755|LOVE BUILDING BLO...|       3|2010-12-13 09:02:00|     5.95|   14180.0|United Kingdom|
|   538521|    22072|RED RETROSPOT TEA...|       8|2010-12-13 09:02:00|     3.75|   14180.0|United Kingdom|
|   538521|    22846|BREAD BIN DINER S...|       1|2010-12-13 09:02:00|    16.95|   14180.0|United Kingdom|
|   538521|    22849|BREAD BIN DINER S...|       1|2010-12-13 09:02:00|    16.95|   14180.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



### Question 02 : Display the schema of the resulted data frame

In [9]:
df.schema

StructType([StructField('InvoiceNo', StringType(), True), StructField('StockCode', StringType(), True), StructField('Description', StringType(), True), StructField('Quantity', IntegerType(), True), StructField('InvoiceDate', TimestampType(), True), StructField('UnitPrice', DoubleType(), True), StructField('CustomerID', DoubleType(), True), StructField('Country', StringType(), True)])

### Question 03 : Fill the missing values with the value 0

In [10]:
df = df.fillna(value=0)

### Question 04 : Add a new column `DayOfWeek`

In [11]:
df = df.withColumn("DayOfWeek", F.date_format(df.InvoiceDate, format='EEEE'))

In [12]:
df.show(n=5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|DayOfWeek|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+
|   538521|    21754|HOME BUILDING BLO...|       3|2010-12-13 09:02:00|     5.95|   14180.0|United Kingdom|   Monday|
|   538521|    21755|LOVE BUILDING BLO...|       3|2010-12-13 09:02:00|     5.95|   14180.0|United Kingdom|   Monday|
|   538521|    22072|RED RETROSPOT TEA...|       8|2010-12-13 09:02:00|     3.75|   14180.0|United Kingdom|   Monday|
|   538521|    22846|BREAD BIN DINER S...|       1|2010-12-13 09:02:00|    16.95|   14180.0|United Kingdom|   Monday|
|   538521|    22849|BREAD BIN DINER S...|       1|2010-12-13 09:02:00|    16.95|   14180.0|United Kingdom|   Monday|
+---------+---------+--------------------+--------+-----

### Question 05 : Split the dataframe into a train and test sets based on the `InvoiceDate` column

In [13]:
df.count()

45408

In [14]:
train_df = df.where(df.InvoiceDate < '2010-12-13')
test_df = df.where(df.InvoiceDate > '2010-12-13')

In [15]:
print(f"Number of data points in the training set is : {train_df.count()}")
print(f"Number of data points in the test set is : {test_df.count()}")

Number of data points in the training set is : 26732
Number of data points in the test set is : 18676


### Question 06 : Create a `StringIndexer` to transform the `DayOfWeek` values to numerique values

In [16]:
dow_indexer = StringIndexer() \
  .setInputCol("DayOfWeek") \
  .setOutputCol("NumDayOfWeek")
dow_indexer_model = dow_indexer.fit(train_df)

In [17]:
train_df = dow_indexer_model.transform(dataset=train_df)
train_df.show(n=5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|DayOfWeek|NumDayOfWeek|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+
|   537667|    22158|3 HEARTS HANGING ...|     128|2010-12-08 08:12:00|     2.55|   17870.0|United Kingdom|Wednesday|         0.0|
|   537668|    22867|HAND WARMER BIRD ...|      12|2010-12-08 08:43:00|      2.1|   14821.0|United Kingdom|Wednesday|         0.0|
|   537668|    22158|3 HEARTS HANGING ...|       8|2010-12-08 08:43:00|     2.95|   14821.0|United Kingdom|Wednesday|         0.0|
|   537669|    84978|HANGING HEART JAR...|      12|2010-12-08 08:58:00|     1.25|   16863.0|United Kingdom|Wednesday|         0.0|
|   537669|    21726|MULTI HEARTS  STI...|      12|2010-12-08 08:58:00|     0.85|  

### Quesion 07 :

- The problem can be solved by one hot encoding the values of the `DayOfWeek` column instead of ordinal encoding.

In [18]:
dow_ohe = OneHotEncoder() \
  .setInputCol("NumDayOfWeek") \
  .setOutputCol("IdxDayOfWeek")
dow_ohe_model = dow_ohe.fit(train_df)

In [19]:
train_df = dow_ohe_model.transform(train_df)
train_df.show(n=5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+-------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|DayOfWeek|NumDayOfWeek| IdxDayOfWeek|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+-------------+
|   537667|    22158|3 HEARTS HANGING ...|     128|2010-12-08 08:12:00|     2.55|   17870.0|United Kingdom|Wednesday|         0.0|(5,[0],[1.0])|
|   537668|    22867|HAND WARMER BIRD ...|      12|2010-12-08 08:43:00|      2.1|   14821.0|United Kingdom|Wednesday|         0.0|(5,[0],[1.0])|
|   537668|    22158|3 HEARTS HANGING ...|       8|2010-12-08 08:43:00|     2.95|   14821.0|United Kingdom|Wednesday|         0.0|(5,[0],[1.0])|
|   537669|    84978|HANGING HEART JAR...|      12|2010-12-08 08:58:00|     1.25|   16863.0|United Kingdom|Wednesday|         0.0|

### Question 08 : Create a `VectorAssembler` to combine the columns `UnitPrice`,`Quantity` and `IdxDayOfWeek`

In [20]:
assembler = VectorAssembler() \
  .setInputCols(['UnitPrice','Quantity','IdxDayOfWeek']) \
  .setOutputCol('Vector')

In [21]:
train_df = assembler.transform(train_df)
train_df.show(n=5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+-------------+--------------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|DayOfWeek|NumDayOfWeek| IdxDayOfWeek|              Vector|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+-------------+--------------------+
|   537667|    22158|3 HEARTS HANGING ...|     128|2010-12-08 08:12:00|     2.55|   17870.0|United Kingdom|Wednesday|         0.0|(5,[0],[1.0])|(7,[0,1,2],[2.55,...|
|   537668|    22867|HAND WARMER BIRD ...|      12|2010-12-08 08:43:00|      2.1|   14821.0|United Kingdom|Wednesday|         0.0|(5,[0],[1.0])|(7,[0,1,2],[2.1,1...|
|   537668|    22158|3 HEARTS HANGING ...|       8|2010-12-08 08:43:00|     2.95|   14821.0|United Kingdom|Wednesday|         0.0|(5,[0],[1.0])|(7,[0,1,2],[2.95,...|
|   

### Question 09 : Create a pipeline that combines steps of the questions 6,7 and 8.

In [22]:
train_df = train_df.drop('NumDayOfWeek','IdxDayOfWeek','Vector')

In [23]:
pipeline = Pipeline(stages=[
    StringIndexer().setInputCol("DayOfWeek").setOutputCol("NumDayOfWeek"),
    OneHotEncoder().setInputCol("NumDayOfWeek").setOutputCol("IdxDayOfWeek"),
    VectorAssembler().setInputCols(['UnitPrice','Quantity','IdxDayOfWeek']).setOutputCol('Vector')
])

### Question 10 : Specify the number of values to be indexed by the string indexer

In [24]:
pipeline_model = pipeline.fit(train_df)

### Question 11 : Transfrom the data frame using the pipeline

In [25]:
train_df = pipeline_model.transform(train_df)
test_df = pipeline_model.transform(test_df)

In [27]:
train_df.show(n=5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+-------------+--------------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|DayOfWeek|NumDayOfWeek| IdxDayOfWeek|              Vector|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+-------------+--------------------+
|   537667|    22158|3 HEARTS HANGING ...|     128|2010-12-08 08:12:00|     2.55|   17870.0|United Kingdom|Wednesday|         0.0|(5,[0],[1.0])|(7,[0,1,2],[2.55,...|
|   537668|    22867|HAND WARMER BIRD ...|      12|2010-12-08 08:43:00|      2.1|   14821.0|United Kingdom|Wednesday|         0.0|(5,[0],[1.0])|(7,[0,1,2],[2.1,1...|
|   537668|    22158|3 HEARTS HANGING ...|       8|2010-12-08 08:43:00|     2.95|   14821.0|United Kingdom|Wednesday|         0.0|(5,[0],[1.0])|(7,[0,1,2],[2.95,...|
|   

In [28]:
test_df.show(n=5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+-------------+--------------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|DayOfWeek|NumDayOfWeek| IdxDayOfWeek|              Vector|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+-------------+--------------------+
|   538521|    21754|HOME BUILDING BLO...|       3|2010-12-13 09:02:00|     5.95|   14180.0|United Kingdom|   Monday|         4.0|(5,[4],[1.0])|(7,[0,1,6],[5.95,...|
|   538521|    21755|LOVE BUILDING BLO...|       3|2010-12-13 09:02:00|     5.95|   14180.0|United Kingdom|   Monday|         4.0|(5,[4],[1.0])|(7,[0,1,6],[5.95,...|
|   538521|    22072|RED RETROSPOT TEA...|       8|2010-12-13 09:02:00|     3.75|   14180.0|United Kingdom|   Monday|         4.0|(5,[4],[1.0])|(7,[0,1,6],[3.75,...|
|   

### Question 12 : create a KMeans instance

In [26]:
kmeans = KMeans() \
  .setFeaturesCol('Vector') \
  .setK(20)

### Question 13 : fit the kmeans model on the training data

In [30]:
kmeans_model = kmeans.fit(train_df)

### Question 14 : make predictions on the test set

In [37]:
test_predictions = kmeans_model.transform(test_df)

In [38]:
test_predictions.show(n=5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+-------------+--------------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|DayOfWeek|NumDayOfWeek| IdxDayOfWeek|              Vector|prediction|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+-------------+--------------------+----------+
|   538521|    21754|HOME BUILDING BLO...|       3|2010-12-13 09:02:00|     5.95|   14180.0|United Kingdom|   Monday|         4.0|(5,[4],[1.0])|(7,[0,1,6],[5.95,...|         0|
|   538521|    21755|LOVE BUILDING BLO...|       3|2010-12-13 09:02:00|     5.95|   14180.0|United Kingdom|   Monday|         4.0|(5,[4],[1.0])|(7,[0,1,6],[5.95,...|         0|
|   538521|    22072|RED RETROSPOT TEA...|       8|2010-12-13 09:02:00|     3.75|   14180.0|United Kingdom|   Monda

### Question 15 : calculate the silhouette coefficient

In [39]:
evaluator = ClusteringEvaluator()  \
  .setMetricName('silhouette') \
  .setFeaturesCol('Vector') \
  .setPredictionCol('prediction')

In [41]:
evaluator.evaluate(test_predictions)

0.49050335732496136