In [44]:
import plotly.express as px
import pandas as pd
from pyspark.sql import SparkSession , functions as F 
from pyspark.ml.feature import VectorAssembler , StandardScaler
from pyspark.ml.clustering import GaussianMixture , BisectingKMeans , KMeans
from pyspark.ml.evaluation import ClusteringEvaluator


**CREATION DE LA SESSION SPARK**

In [None]:
spark = SparkSession.builder.appName("OnlineRetail").getOrCreate()

**CHARGEMENT DU CSV**

In [2]:
data = spark.read.csv("data/Online_Retail_CSV.csv", header=True, inferSchema=True , sep =";")

**EDA**

In [3]:
data.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [4]:
data.show(5, vertical=True)

-RECORD 0---------------------------
 InvoiceNo   | 536365               
 StockCode   | 85123A               
 Description | WHITE HANGING HEA... 
 Quantity    | 6                    
 InvoiceDate | 01/12/2010 08:26     
 UnitPrice   | 2,55                 
 CustomerID  | 17850                
 Country     | United Kingdom       
-RECORD 1---------------------------
 InvoiceNo   | 536365               
 StockCode   | 71053                
 Description | WHITE METAL LANTERN  
 Quantity    | 6                    
 InvoiceDate | 01/12/2010 08:26     
 UnitPrice   | 3,39                 
 CustomerID  | 17850                
 Country     | United Kingdom       
-RECORD 2---------------------------
 InvoiceNo   | 536365               
 StockCode   | 84406B               
 Description | CREAM CUPID HEART... 
 Quantity    | 8                    
 InvoiceDate | 01/12/2010 08:26     
 UnitPrice   | 2,75                 
 CustomerID  | 17850                
 Country     | United Kingdom       
-

In [5]:
print("Nombre de lignes : ", data.count())
print("Nombre de colonnes : ", len(data.columns))

Nombre de lignes :  541909
Nombre de colonnes :  8


In [6]:
# On check que StockCode correspond bien à un produit unique.
data.filter(F.col("StockCode") == "71053").show(5)

+---------+---------+-------------------+--------+----------------+---------+----------+--------------+
|InvoiceNo|StockCode|        Description|Quantity|     InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+-------------------+--------+----------------+---------+----------+--------------+
|   536365|    71053|WHITE METAL LANTERN|       6|01/12/2010 08:26|     3,39|     17850|United Kingdom|
|   536373|    71053|WHITE METAL LANTERN|       6|01/12/2010 09:02|     3,39|     17850|United Kingdom|
|   536375|    71053|WHITE METAL LANTERN|       6|01/12/2010 09:32|     3,39|     17850|United Kingdom|
|   536396|    71053|WHITE METAL LANTERN|       6|01/12/2010 10:51|     3,39|     17850|United Kingdom|
|   536406|    71053|WHITE METAL LANTERN|       8|01/12/2010 11:33|     3,39|     17850|United Kingdom|
+---------+---------+-------------------+--------+----------------+---------+----------+--------------+
only showing top 5 rows


In [7]:
print(f"Nombre de clients uniques : {data.select('CustomerID').distinct().count()}")
print(f"Nombre de factures distinctes : {data.select('InvoiceNo').distinct().count()}")
print(f"Nombre de pays : {data.select('Country').distinct().count()}")
print(f"Nombre de produits : {data.select('StockCode').distinct().count()}")


Nombre de clients uniques : 4373
Nombre de factures distinctes : 25900
Nombre de pays : 38
Nombre de produits : 4070


In [8]:
data.select("Quantity").describe().show()

+-------+------------------+
|summary|          Quantity|
+-------+------------------+
|  count|            541909|
|   mean|  9.55224954743324|
| stddev|218.08115785023406|
|    min|            -80995|
|    max|             80995|
+-------+------------------+



On remarque que Quantity s'étend de -80995 à 80995 avec une moyenne de 9.5 et un écart type de 218. Cela indique une forte présence d'outliers. Ceci est confirmé par le graph ci-dessous:

In [9]:
df = data.select("Quantity").sample(0.1, seed=42).toPandas()
fig = px.violin(
    df,
    y="Quantity",
    box=True,
    points="outliers",
    title="Violin plot de la quantité"
)
fig.show()



Les valeurs extrêmes observées pour Quantity (jusqu’à 80 995 unités) ne correspondent pas à des achats individuels classiques, mais probablement à des commandes en gros ou à des corrections comptables (retours/annulations).
Ces valeurs extrêmes ont un impact important sur les statistiques et les modèles, et doivent être traitées avec précaution. Dans un premier temps , supprimons les retours.

In [10]:
data = data.filter(F.col("Quantity") >= 0)


**CHECK VALEURS MANQUANTES**

In [11]:
null_counts = data.select([F.sum(F.col(c).isNull().cast("int")).alias(c)
             for c in data.columns])
null_counts.show()

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|        0|        0|        592|       0|          0|        0|    133361|      0|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



Ici on décide supprimer les lignes ou l'identifiant client est manquant : on en aura nécessairement besoin plus tard pour la segmentation client. 

In [12]:
count_before = data.count()
data_clean = data.filter(F.col("CustomerID").isNotNull())
count_after = data_clean.count() 

print(f"Nombre de lignes avant nettoyage : {count_before}")
print(f"Nombre de lignes après nettoyage : {count_after}")
print(f"Nombre de lignes supprimées : {count_before - count_after}")

Nombre de lignes avant nettoyage : 531285
Nombre de lignes après nettoyage : 397924
Nombre de lignes supprimées : 133361


On peut garder les lignes avec la Description manquante , elle ne sera pas utilisée pour notre modèle. On remplace simplement ces valeurs NaN par Unknown. 

In [13]:
data_clean = data_clean.fillna({"Description": "Unknown"})

**CHECK ET SUPPRESSION LIGNES DUPLIQUEES**

In [14]:
before = data_clean.count()

data_clean = data_clean.dropDuplicates()

after = data_clean.count()
print("Avant :", before, "| Après :", after, "| Supprimées :", before - after)


Avant : 397924 | Après : 392732 | Supprimées : 5192


**CONVERSION TYPES COLONNES**

On converti les valeurs UnitPrice pour que le séparateur soit un point et non une virgule afin de pouvoir performer le casting plus tard.

In [15]:
data_clean = data_clean.withColumn(
    "UnitPrice_cleaned",
    F.regexp_replace(F.col("UnitPrice"), ",", ".")
)

On convertit la colonne InvoiceDate en time-stamp en précisant le format. 

In [16]:
data_clean = data_clean.withColumn("InvoiceDate", F.to_timestamp(F.col("InvoiceDate"), "dd/MM/yyyy HH:mm"))


On convertit UnitPrice en double afin de pouvoir l'utiliser dans nos modèles plus tard. 

In [17]:
data_clean = data_clean.withColumn("UnitPrice_cleaned", F.col("UnitPrice_cleaned").cast("double"))

On vérifie.

In [18]:
data_clean.select("InvoiceDate").show(5,truncate=False)

+-------------------+
|InvoiceDate        |
+-------------------+
|2010-12-01 08:34:00|
|2010-12-01 11:33:00|
|2010-12-01 11:45:00|
|2010-12-01 11:49:00|
|2010-12-01 12:23:00|
+-------------------+
only showing top 5 rows


In [19]:
data_clean.select("UnitPrice_cleaned").describe().show()

+-------+------------------+
|summary| UnitPrice_cleaned|
+-------+------------------+
|  count|            392732|
|   mean|3.1255955307944383|
| stddev|22.240725281426112|
|    min|               0.0|
|    max|           8142.75|
+-------+------------------+



On supprime le UnitPrice original en string.

In [20]:
data_clean = data_clean.drop("UnitPrice").withColumnRenamed("UnitPrice_cleaned", "UnitPrice")

On vérifie que tout est OK.

In [21]:
data_clean.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = false)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- UnitPrice: double (nullable = true)



In [22]:
null_counts = data_clean.select([F.sum(F.col(c).isNull().cast("int")).alias(c)
             for c in data_clean.columns])
null_counts.show()

+---------+---------+-----------+--------+-----------+----------+-------+---------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|CustomerID|Country|UnitPrice|
+---------+---------+-----------+--------+-----------+----------+-------+---------+
|        0|        0|          0|       0|          0|         0|      0|        0|
+---------+---------+-----------+--------+-----------+----------+-------+---------+



**CREATION DE NOUVELLES COLONNES**

On crée une nouvelle colonne "TotalPrice" pour obtenir le total du prix par commande.

In [23]:
data_clean = data_clean.withColumn("TotalPrice", F.col("Quantity") * F.col("UnitPrice"))
data_clean.select("TotalPrice").show(5)

+----------+
|TotalPrice|
+----------+
|      25.5|
|      71.5|
|      2.52|
|      34.8|
|     13.52|
+----------+
only showing top 5 rows


**CREATION DU DATAFRAME RFM**

On copie le dataset original et on refiltre par mesure de sûreté pour que quantity et UnitPrice soient superieurs à zéro.

In [24]:
data_rfm = (data_clean.filter(F.col("Quantity") > 0)
            .filter(F.col("UnitPrice") > 0))

On trouve notre date de référence pour calculer le temps passé depuis la dernière commande du client par rapport à la date de commande la plus récente de notre dataset. 

In [25]:
ref_date = (data_rfm
            .agg(F.max("InvoiceDate").alias("MaxDate"))
            .collect()[0]["MaxDate"])
print("Date de référence pour le calcul de la récence :", ref_date)

Date de référence pour le calcul de la récence : 2011-12-09 12:50:00


On aggrège RFM par client.

In [26]:
rfm = (
    data_rfm
    .groupBy("CustomerID")
    .agg(

        # Recency : nombre de jours depuis la dernière commande
        F.datediff(F.lit(ref_date),
                   F.max("InvoiceDate")
                   ).alias("Recency"), 

        # Frequency : nombre de commandes passées
        F.countDistinct("InvoiceNo").alias("Frequency"),

        # Monetary : valeur monétaire totale des commandes
        F.sum("TotalPrice").alias("Monetary")
        
)
)

Les variables RFM ont été calculées au niveau client.
La récence correspond au nombre de jours écoulés depuis la dernière commande, la fréquence au nombre de factures distinctes, et la valeur monétaire au montant total dépensé par client.

In [27]:
rfm.show(5)
rfm.printSchema()


+----------+-------+---------+------------------+
|CustomerID|Recency|Frequency|          Monetary|
+----------+-------+---------+------------------+
|     17389|      0|       34|          31833.68|
|     14450|    180|        3|            483.25|
|     15727|     16|        7|5159.0599999999995|
|     15790|     10|        1|            218.75|
|     13285|     23|        4|2709.1200000000003|
+----------+-------+---------+------------------+
only showing top 5 rows
root
 |-- CustomerID: integer (nullable = true)
 |-- Recency: integer (nullable = true)
 |-- Frequency: long (nullable = false)
 |-- Monetary: double (nullable = true)



**ASSEMBLAGE DES FEATURES EN VECTOR**

In [30]:
assembler = VectorAssembler(inputCols=["Recency", "Frequency", "Monetary"], outputCol="rfm_features")
rfm_assembled = assembler.transform(rfm)
rfm_assembled.select("CustomerID", "rfm_features").show(5, truncate=False)

+----------+-----------------------------+
|CustomerID|rfm_features                 |
+----------+-----------------------------+
|17389     |[0.0,34.0,31833.68]          |
|14450     |[180.0,3.0,483.25]           |
|15727     |[16.0,7.0,5159.0599999999995]|
|15790     |[10.0,1.0,218.75]            |
|13285     |[23.0,4.0,2709.1200000000003]|
+----------+-----------------------------+
only showing top 5 rows


**STANDARDSCALER POUR METTRE TOUTES LES FEATURES A LA MEME ECHELLE**

In [32]:
scaler = StandardScaler(inputCol="rfm_features", 
                        outputCol="rfm_scaled",
                        withMean=True,
                        withStd=True)

scaler_model = scaler.fit(rfm_assembled)

rfm_scaled = scaler_model.transform(rfm_assembled)

In [33]:
rfm_scaled.select("CustomerID", "rfm_scaled").show(5, truncate=False)

+----------+---------------------------------------------------------------+
|CustomerID|rfm_scaled                                                     |
+----------+---------------------------------------------------------------+
|17389     |[-0.9204818540021659,3.8617814551576157,3.3148835577960494]    |
|14450     |[0.879297416717677,-0.16523968726395577,-0.17422348035072524]  |
|15727     |[-0.7605014743826243,0.35437594401624695,0.3461649666350759]   |
|15790     |[-0.8204941167399523,-0.42504750290405713,-0.2036606782325349] |
|13285     |[-0.6905100582990747,-0.035335779443905084,0.07350194743446822]|
+----------+---------------------------------------------------------------+
only showing top 5 rows


In [34]:
rfm_final = rfm_scaled.select(
    "CustomerID",
    "Recency",
    "Frequency",
    "Monetary",
    "rfm_scaled"
)
rfm_final.show(5, truncate=False)

+----------+-------+---------+------------------+---------------------------------------------------------------+
|CustomerID|Recency|Frequency|Monetary          |rfm_scaled                                                     |
+----------+-------+---------+------------------+---------------------------------------------------------------+
|17389     |0      |34       |31833.68          |[-0.9204818540021659,3.8617814551576157,3.3148835577960494]    |
|14450     |180    |3        |483.25            |[0.879297416717677,-0.16523968726395577,-0.17422348035072524]  |
|15727     |16     |7        |5159.0599999999995|[-0.7605014743826243,0.35437594401624695,0.3461649666350759]   |
|15790     |10     |1        |218.75            |[-0.8204941167399523,-0.42504750290405713,-0.2036606782325349] |
|13285     |23     |4        |2709.1200000000003|[-0.6905100582990747,-0.035335779443905084,0.07350194743446822]|
+----------+-------+---------+------------------+---------------------------------------

**CLUSTERING**

On prépare l'évaluateur commun aux 3 modèles. 
L’indice de Silhouette a été privilégié car il fournit une mesure quantitative et objective de la qualité du clustering, contrairement à la méthode du coude qui repose sur une interprétation visuelle plus subjective. La distance euclidienne au carré est utilisée car elle est cohérente avec les algorithmes de type KMeans et adaptée à des variables RFM standardisées.

In [47]:
evaluator = ClusteringEvaluator(
    featuresCol="rfm_scaled",
    metricName="silhouette",
    distanceMeasure="squaredEuclidean"
)

BASELINE - K-Means

In [48]:
kmeans_scores = []

for k in range(2,11): 
    km = KMeans(featuresCol="rfm_scaled", k=k, seed=42)
    model = km.fit(rfm_final)
    preds = model.transform(rfm_final)
    score = evaluator.evaluate(preds)
    wssse = model.summary.trainingCost
    kmeans_scores.append(("KMeans" , k, score, wssse))

MODEL 1 - BisectingKMeans

In [49]:
bkm_scores = []

for k in range(2, 11):
    bkm = BisectingKMeans(
        featuresCol="rfm_scaled",
        k=k,
        seed=42
    )
    model = bkm.fit(rfm_final)
    preds = model.transform(rfm_final)
    score = evaluator.evaluate(preds)
    wssse = model.summary.trainingCost
    bkm_scores.append(("BisectingKMeans", k, score, wssse))

MODEL 2 - Gaussian Mixture Model

In [50]:
gmm_scores = []

for k in range(2, 11):
    gmm = GaussianMixture(
        featuresCol="rfm_scaled",
        k=k,
        seed=42
    )
    model = gmm.fit(rfm_final)
    preds = model.transform(rfm_final)
    score = evaluator.evaluate(preds)
    gmm_scores.append(("GMM", k, score))

In [56]:
all_scores = kmeans_scores + bkm_scores + gmm_scores
all_scores_df = pd.DataFrame(all_scores, columns=["Algorithm", "K", "Silhouette Score", "WSSSE"])
all_scores_df.sort_values(by=["Silhouette Score", "K"], inplace=True, ascending=[False,True])
all_scores_df["WSSSE_interpretation"] = all_scores_df["WSSSE"].apply(
    lambda x: "Not applicable (GMM)" if pd.isna(x) else "Applicable"
)
all_scores_df.head(all_scores_df.shape[0])

Unnamed: 0,Algorithm,K,Silhouette Score,WSSSE,WSSSE_interpretation
0,KMeans,2,0.988439,9099.285738,Applicable
2,KMeans,4,0.77983,4124.713167,Applicable
4,KMeans,6,0.769492,2502.359785,Applicable
3,KMeans,5,0.769278,3355.156414,Applicable
10,BisectingKMeans,3,0.745191,5441.888352,Applicable
1,KMeans,3,0.713283,5500.564474,Applicable
5,KMeans,7,0.693311,2023.319729,Applicable
12,BisectingKMeans,5,0.685017,3993.539094,Applicable
8,KMeans,10,0.655419,1300.107134,Applicable
6,KMeans,8,0.652315,1715.554375,Applicable


Bien que KMeans avec K=2 présente le score de Silhouette le plus élevé, cette solution conduit à une segmentation trop grossière.
Le modèle BisectingKMeans avec K=3 offre un bon compromis entre qualité de clustering, interprétabilité métier et actionnabilité marketing.
Il a donc été retenu comme modèle final.

**ENTRAINEMENT MODELE CHOISI**

In [57]:
bkm = BisectingKMeans(
    featuresCol="rfm_scaled",
    k=3,
    seed=42
)

bkm_model = bkm.fit(rfm_final)

In [58]:
rfm_clustered = bkm_model.transform(rfm_final)

On renomme la colonne prediction en "cluster" et affiche a quel cluster chaque data-point appartient. 

In [61]:
rfm_clustered = rfm_clustered.withColumnRenamed("prediction", "cluster")

In [62]:
rfm_clustered.select("CustomerID", "cluster").show(10)

+----------+-------+
|CustomerID|cluster|
+----------+-------+
|     17389|      0|
|     14450|      2|
|     15727|      0|
|     15790|      0|
|     13285|      0|
|     16574|      0|
|     14570|      2|
|     15619|      0|
|     14465|      2|
|     15967|      0|
+----------+-------+
only showing top 10 rows


On récupère les centroides des clusters.

In [63]:
centroids = bkm_model.clusterCenters()
centroids


[array([-0.51812558,  0.05513084, -0.01963845]),
 array([-0.86587317,  8.07366585,  9.32390022]),
 array([ 1.52042023, -0.34924089, -0.16213205])]

**Analyse des segments**

In [64]:
cluster_analysis = (
    rfm_clustered
    .groupBy("cluster")
    .agg(
        F.count("*").alias("nb_clients"),
        F.avg("Recency").alias("avg_recency"),
        F.avg("Frequency").alias("avg_frequency"),
        F.avg("Monetary").alias("avg_monetary")
    ).orderBy("cluster") 
)
cluster_analysis.show()

+-------+----------+------------------+------------------+-----------------+
|cluster|nb_clients|       avg_recency|     avg_frequency|     avg_monetary|
+-------+----------+------------------+------------------+-----------------+
|      0|      3205|40.240561622464895| 4.696411856474259|1872.232088299532|
|      1|        26| 5.461538461538462| 66.42307692307692|85826.07807692309|
|      2|      1107| 244.1201445347787|1.5835591689250226|591.8943279132804|
+-------+----------+------------------+------------------+-----------------+

