In [1]:
!pip install pyspark



In [2]:
import os
import pyspark.sql.functions as F
from pyspark.sql import SparkSession,DataFrame
from pyspark.ml.feature import StringIndexer,OneHotEncoder,VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from tqdm.notebook import tqdm
from google.colab import drive


In [3]:
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
DATA_DIR = "/content/drive/MyDrive/tp4_data"
spark = SparkSession.builder.getOrCreate()

### Question 01 : store the content of all the csv files in a data frame

In [5]:
def read_csv_folder(root : str, spark : SparkSession) -> DataFrame | None:

  files = os.listdir(path=root)
  csv_files = list(filter(lambda filename : filename.endswith(".csv"),files))

  result = None

  for csv_file in tqdm(csv_files):
    csv_file_path = os.path.join(root, csv_file)
    df = spark.read.csv(path=csv_file_path, header=True, inferSchema=True)

    if result is None:
      result = df
    else:
      result = result.union(df)

  return result

In [6]:
df = read_csv_folder(root=DATA_DIR, spark=spark)

  0%|          | 0/22 [00:00<?, ?it/s]

In [7]:
df.show(n=4)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   539993|    22386|JUMBO BAG PINK PO...|      10|2011-01-04 10:00:00|     1.95|   13313.0|United Kingdom|
|   539993|    21499|  BLUE POLKADOT WRAP|      25|2011-01-04 10:00:00|     0.42|   13313.0|United Kingdom|
|   539993|    21498| RED RETROSPOT WRAP |      25|2011-01-04 10:00:00|     0.42|   13313.0|United Kingdom|
|   539993|    22379|RECYCLING BAG RET...|       5|2011-01-04 10:00:00|      2.1|   13313.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 4 rows



### Question 02 : Display the schema of the resulted data frame

In [8]:
df.schema

StructType([StructField('InvoiceNo', StringType(), True), StructField('StockCode', StringType(), True), StructField('Description', StringType(), True), StructField('Quantity', IntegerType(), True), StructField('InvoiceDate', TimestampType(), True), StructField('UnitPrice', DoubleType(), True), StructField('CustomerID', DoubleType(), True), StructField('Country', StringType(), True)])

### Question 03 : Fill the missing values with the value 0

In [9]:
df = df.fillna(value=0)

### Question 04 : Add a new column `DayOfWeek`
La valeur de la colonne est le jour de la semaine correspondant à la date de chaque ligne dans la colonne
 "InvoiceDate".

In [10]:
df = df.withColumn("DayOfWeek", F.date_format(df.InvoiceDate, format='EEEE'))
#Fonction PySpark pour formater les dates
#F fait référence au module pyspark.sql.functions
#Le pattern de formatage qui extrait le jour de la semaine en texte complet

In [11]:
df.show(n=4)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|DayOfWeek|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+
|   539993|    22386|JUMBO BAG PINK PO...|      10|2011-01-04 10:00:00|     1.95|   13313.0|United Kingdom|  Tuesday|
|   539993|    21499|  BLUE POLKADOT WRAP|      25|2011-01-04 10:00:00|     0.42|   13313.0|United Kingdom|  Tuesday|
|   539993|    21498| RED RETROSPOT WRAP |      25|2011-01-04 10:00:00|     0.42|   13313.0|United Kingdom|  Tuesday|
|   539993|    22379|RECYCLING BAG RET...|       5|2011-01-04 10:00:00|      2.1|   13313.0|United Kingdom|  Tuesday|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+
only showing top 4 rows



 ** 5) Diviser les données en un ensemble d'apprentissage et un ensemble de test. Effectuer
 la division en se basant sur l'attribut InvoiceDate : l'ensemble d'apprentissage contient
 les achats effectués avant 2010-12-13 et l'ensemble de test contient les achats effectués
 durant ou après 2010-12-13.**

In [12]:
train_df = df.where(df.InvoiceDate < '2010-12-13')
test_df = df.where(df.InvoiceDate > '2010-12-13')

In [13]:
print(f"Number of data points in the training set is : {train_df.count()}")
print(f"Number of data points in the test set is : {test_df.count()}")

Number of data points in the training set is : 26732
Number of data points in the test set is : 18676


** 6) Créer un StringIndexer qui permet de transformer les jours de semaine présents dans la colonne day_of_week en valeurs numériques correspondantes.**

In [14]:
dow_indexer = StringIndexer() \
  .setInputCol("DayOfWeek") \
  .setOutputCol("NumDayOfWeek")
dow_indexer_model = dow_indexer.fit(train_df)

In [15]:
train_df = dow_indexer_model.transform(dataset=train_df)
train_df.show(n=5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|DayOfWeek|NumDayOfWeek|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+
|   538365|    22469|HEART OF WICKER S...|       8|2010-12-12 10:11:00|     1.65|   17243.0|United Kingdom|   Sunday|         3.0|
|   538365|   84030E|ENGLISH ROSE HOT ...|       1|2010-12-12 10:11:00|     4.25|   17243.0|United Kingdom|   Sunday|         3.0|
|   538365|    22112|CHOCOLATE HOT WAT...|       3|2010-12-12 10:11:00|     4.95|   17243.0|United Kingdom|   Sunday|         3.0|
|   538365|    22835|HOT WATER BOTTLE ...|       5|2010-12-12 10:11:00|     4.65|   17243.0|United Kingdom|   Sunday|         3.0|
|   538365|   84029E|RED WOOLLY HOTTIE...|       4|2010-12-12 10:11:00|     3.75|  

**En utilisant le StringIndexer, Spark peut par exemple représenter samedi par 6 et
 lundi par 1. Cependant, avec ce schéma de numérotation, nous indiquons
 implicitement que samedi est supérieur à lundi (par des valeurs numériques pures).
 Ceci est évidemment incorrect. Comment résoudre ce problème ?**

oui c'est vrai , il va considérer qu'une valeur 6 par exemple est plus grande que la valeur 0 , pour cela on utilise le Onehotencoder qui va convertir les index en vecteur binaire (1 et 0 )
et puis si on apprlique un algorithme de machine learning par exmple kmeans , il ne va pas considerer que la distance entre les jours est un critere pour la categorisation , parceque simplement la distance entre les veteurs est toujours fixe et puis l'interpretaion de la colonne par le model soit juste

In [16]:
dow_ohe = OneHotEncoder() \
  .setInputCol("NumDayOfWeek") \
  .setOutputCol("IdxDayOfWeek")
dow_ohe_model = dow_ohe.fit(train_df)

In [17]:
train_df = dow_ohe_model.transform(train_df)
train_df.show(n=5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+-------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|DayOfWeek|NumDayOfWeek| IdxDayOfWeek|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+-------------+
|   538365|    22469|HEART OF WICKER S...|       8|2010-12-12 10:11:00|     1.65|   17243.0|United Kingdom|   Sunday|         3.0|(5,[3],[1.0])|
|   538365|   84030E|ENGLISH ROSE HOT ...|       1|2010-12-12 10:11:00|     4.25|   17243.0|United Kingdom|   Sunday|         3.0|(5,[3],[1.0])|
|   538365|    22112|CHOCOLATE HOT WAT...|       3|2010-12-12 10:11:00|     4.95|   17243.0|United Kingdom|   Sunday|         3.0|(5,[3],[1.0])|
|   538365|    22835|HOT WATER BOTTLE ...|       5|2010-12-12 10:11:00|     4.65|   17243.0|United Kingdom|   Sunday|         3.0|

RQ:
La représentation (5, [3], [1.0]) signifie :

Taille du vecteur : 5

Indices des éléments non nuls : [3] (donc l'index 3 est non nul)

Valeurs des éléments non nuls : [1.0]

**8) Créer un VectorAssembler contenant trois attributs : UnitPrice, Quantity, et
 day_of_week_encoded.**

In [18]:
assembler = VectorAssembler() \
  .setInputCols(['UnitPrice','Quantity','IdxDayOfWeek']) \
  .setOutputCol('Vector')

In [19]:
train_df = assembler.transform(train_df)
train_df.show(n=5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+-------------+--------------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|DayOfWeek|NumDayOfWeek| IdxDayOfWeek|              Vector|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+-------------+--------------------+
|   538365|    22469|HEART OF WICKER S...|       8|2010-12-12 10:11:00|     1.65|   17243.0|United Kingdom|   Sunday|         3.0|(5,[3],[1.0])|(7,[0,1,5],[1.65,...|
|   538365|   84030E|ENGLISH ROSE HOT ...|       1|2010-12-12 10:11:00|     4.25|   17243.0|United Kingdom|   Sunday|         3.0|(5,[3],[1.0])|(7,[0,1,5],[4.25,...|
|   538365|    22112|CHOCOLATE HOT WAT...|       3|2010-12-12 10:11:00|     4.95|   17243.0|United Kingdom|   Sunday|         3.0|(5,[3],[1.0])|(7,[0,1,5],[4.95,...|
|   

Remarque:
La taille totale du vecteur est 7(3 colonne : 1 col + 1 col + 5 de vecteur crée)

Les indices [0,1,5] signifient que les positions 0, 1 et 5 ont des valeurs non nulles.

Les valeurs sont [1.65, 8.0, 1.0] correspondant respectivement à UnitPrice, Quantity et la position 3 de IdxDayOfWeek (qui est devenue la position 5 dans le vecteur assemblé).

**9) Créer un pipeline configuré avec les résultats des étapes 6, 7 et 8**

In [25]:
train_df = train_df.drop('NumDayOfWeek','IdxDayOfWeek','Vector')

In [26]:
pipeline = Pipeline(stages=[
    StringIndexer().setInputCol("DayOfWeek").setOutputCol("NumDayOfWeek"),
    OneHotEncoder().setInputCol("NumDayOfWeek").setOutputCol("IdxDayOfWeek"),
    VectorAssembler().setInputCols(['UnitPrice','Quantity','IdxDayOfWeek']).setOutputCol('Vector')
])

**10)Notre StringIndexer doit savoir combien de valeurs uniques il y a à indexer, comment
résoudre ce problème ?**

In [27]:
pipeline_model = pipeline.fit(train_df)

**11)Transformer les données de l’ensemble d’apprentissage en se basant sur les étapes
(stages) du pipeline.**

In [28]:
train_df = pipeline_model.transform(train_df)
test_df = pipeline_model.transform(test_df)

In [29]:
train_df.show(n=5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+-------------+--------------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|DayOfWeek|NumDayOfWeek| IdxDayOfWeek|              Vector|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+-------------+--------------------+
|   538365|    22469|HEART OF WICKER S...|       8|2010-12-12 10:11:00|     1.65|   17243.0|United Kingdom|   Sunday|         3.0|(5,[3],[1.0])|(7,[0,1,5],[1.65,...|
|   538365|   84030E|ENGLISH ROSE HOT ...|       1|2010-12-12 10:11:00|     4.25|   17243.0|United Kingdom|   Sunday|         3.0|(5,[3],[1.0])|(7,[0,1,5],[4.25,...|
|   538365|    22112|CHOCOLATE HOT WAT...|       3|2010-12-12 10:11:00|     4.95|   17243.0|United Kingdom|   Sunday|         3.0|(5,[3],[1.0])|(7,[0,1,5],[4.95,...|
|   

In [30]:
test_df.show(n=5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+------------+--------------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|DayOfWeek|NumDayOfWeek|IdxDayOfWeek|              Vector|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+------------+--------------------+
|   539993|    22386|JUMBO BAG PINK PO...|      10|2011-01-04 10:00:00|     1.95|   13313.0|United Kingdom|  Tuesday|         5.0|   (5,[],[])|(7,[0,1],[1.95,10...|
|   539993|    21499|  BLUE POLKADOT WRAP|      25|2011-01-04 10:00:00|     0.42|   13313.0|United Kingdom|  Tuesday|         5.0|   (5,[],[])|(7,[0,1],[0.42,25...|
|   539993|    21498| RED RETROSPOT WRAP |      25|2011-01-04 10:00:00|     0.42|   13313.0|United Kingdom|  Tuesday|         5.0|   (5,[],[])|(7,[0,1],[0.42,25...|
|   539993

**12)Créer une instance de KMeans, onsupposse que le nombre de clusters est 20.**

In [31]:
kmeans = KMeans() \
  .setFeaturesCol('Vector') \
  .setK(20)

**13)Lancer l’apprentissage de KMeans en se basant sur l’ensemble obtenu dans l’étape 11**

In [32]:
kmeans_model = kmeans.fit(train_df)

**14)Effectuer des prédictions sur l’ensemble de test.**

In [33]:
test_predictions = kmeans_model.transform(test_df)

In [34]:
test_predictions.show(n=5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+------------+--------------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|DayOfWeek|NumDayOfWeek|IdxDayOfWeek|              Vector|prediction|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+------------+------------+--------------------+----------+
|   539993|    22386|JUMBO BAG PINK PO...|      10|2011-01-04 10:00:00|     1.95|   13313.0|United Kingdom|  Tuesday|         5.0|   (5,[],[])|(7,[0,1],[1.95,10...|        14|
|   539993|    21499|  BLUE POLKADOT WRAP|      25|2011-01-04 10:00:00|     0.42|   13313.0|United Kingdom|  Tuesday|         5.0|   (5,[],[])|(7,[0,1],[0.42,25...|        15|
|   539993|    21498| RED RETROSPOT WRAP |      25|2011-01-04 10:00:00|     0.42|   13313.0|United Kingdom|  Tuesday|   

**15)Calculer le coefficient de silhouette.**

In [35]:
evaluator = ClusteringEvaluator()  \
  .setMetricName('silhouette') \
  .setFeaturesCol('Vector') \
  .setPredictionCol('prediction')

In [36]:
evaluator.evaluate(test_predictions)

0.4163293280065855