## 1) Initialisation & Chargement des données

In [3]:
import pyspark
print(pyspark.__version__)

3.5.3


In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, datediff, max as spark_max, sum as spark_sum, countDistinct, year, month, dayofweek, hour, quarter, count, round as spark_round, avg as spark_avg, when, desc


In [5]:
spark = SparkSession.builder.appName("Spark_Final_Project").master("local[*]").getOrCreate()

df_raw = spark.read.option("Header", True).option("inferSchema", True).option("delimiter", ";").csv("../data/raw/Online_Retail_CSV.csv")

df_raw.printSchema()
df_raw.show(5)

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)

+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|     InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|01/12/2010 08:26|     2,55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|01/12/2010 08:26|     3,39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|01/12/2010 08:26|     2,75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNI

In [6]:
print(f"Il y a {df_raw.count()} lignes.")
print(f"Il y a {len(df_raw.columns)} colonnes.")

Il y a 541909 lignes.
Il y a 8 colonnes.


## 2) Exploration & Prétraitement

## A) Analyse Descriptive

Tout d'abord, regardons combien il y a de clients référencés dans cette base de données :

In [7]:
print(f"Il y a {df_raw.select('CustomerID').distinct().count()} clients.")

Il y a 4373 clients.


Maintenant, regardons le nombre total de transactions :

In [8]:
transaction_nb = df_raw.select('InvoiceNo').dropDuplicates().count()
print(f"Il y a {transaction_nb} transactions uniques.")

Il y a 25900 transactions uniques.


Regardons de plus près la distribution de la variable Quantity :

In [9]:
df_raw.select("Quantity").describe().show()

+-------+------------------+
|summary|          Quantity|
+-------+------------------+
|  count|            541909|
|   mean|  9.55224954743324|
| stddev|218.08115785023406|
|    min|            -80995|
|    max|             80995|
+-------+------------------+



Nous observons qu’en moyenne, une ligne de commande contient environ 9 produits.  
Cependant, l’écart-type est relativement élevé (218), ce qui indique une forte dispersion des valeurs et la présence d’outliers dans la variable Quantity.  
En effet, certaines transactions présentent des quantités extrêmement faibles ou élevées, allant jusqu’à –80 995 et +80 995 unités.  
Ces valeurs extrêmes contribuent fortement à l’augmentation de l’écart-type et sont susceptibles de correspondre à des retours de produits, des annulations de commandes ou des commandes en gros.

Regardons egalement la distribution de la variable UnitPrice :

In [10]:
df_raw.select("UnitPrice").describe().show()

+-------+------------------+
|summary|         UnitPrice|
+-------+------------------+
|  count|            541909|
|   mean|29.921163668665333|
| stddev| 595.7455525989114|
|    min|         -11062,06|
|    max|             99,96|
+-------+------------------+



Nous observons que le prix unitaire moyen d’un produit est d’environ 30 unités monétaires.  
Toutefois, l’écart-type est particulièrement élevé (596), ce qui traduit une forte dispersion des valeurs et la présence d’outliers dans la variable UnitPrice.  
En effet, certaines transactions présentent des valeurs négatives, pouvant atteindre –11 062, qui ne correspondent pas à des prix réels mais sont probablement liées à des remboursements, des annulations de commandes ou des écritures de correction comptable.  
Ces valeurs extrêmes contribuent fortement à l’augmentation de l’écart-type.

Maintenant, intéressons-nous au nombre de pays clients :

In [11]:
client_countries = df_raw.select("Country").dropDuplicates().count()
print(f"Il y a {client_countries} pays qui sont clients.")

Il y a 38 pays qui sont clients.


Regardons le nombre de commandes par pays : 

In [12]:
df_raw.select("InvoiceNo", "Country").dropDuplicates().groupby("Country").count().orderBy("count", ascending=False).show()

+---------------+-----+
|        Country|count|
+---------------+-----+
| United Kingdom|23494|
|        Germany|  603|
|         France|  461|
|           EIRE|  360|
|        Belgium|  119|
|          Spain|  105|
|    Netherlands|  101|
|    Switzerland|   74|
|       Portugal|   71|
|      Australia|   69|
|          Italy|   55|
|        Finland|   48|
|         Sweden|   46|
|         Norway|   40|
|Channel Islands|   33|
|          Japan|   28|
|         Poland|   24|
|        Denmark|   21|
|         Cyprus|   20|
|        Austria|   19|
+---------------+-----+
only showing top 20 rows



Nous observons que le jeu de données comprend des clients provenant de 38 pays différents, ce qui indique une base de clientèle internationale.  
Toutefois, une très grande proportion des transactions est réalisée au Royaume-Uni, avec plus de 22 000 factures uniques, tandis que le deuxième pays, l’Allemagne, ne compte que 603 transactions.  
Cela met en évidence un fort déséquilibre géographique, le Royaume-Uni dominant largement le jeu de données par rapport aux autres pays.

Regardons maintenant le nombre total de commandes par clients, cela nous permettra de distinguer les clients habituels des clients ponctuels : 

In [13]:
df_raw.select("CustomerID", "InvoiceNo").dropDuplicates().groupBy("CustomerID").count().orderBy("count", ascending=False).show()

+----------+-----+
|CustomerID|count|
+----------+-----+
|      NULL| 3710|
|     14911|  248|
|     12748|  224|
|     17841|  169|
|     14606|  128|
|     15311|  118|
|     13089|  118|
|     12971|   89|
|     14527|   86|
|     13408|   81|
|     14646|   77|
|     16029|   76|
|     16422|   75|
|     14156|   66|
|     13798|   63|
|     18102|   62|
|     13694|   60|
|     15061|   55|
|     17450|   55|
|     16013|   54|
+----------+-----+
only showing top 20 rows



On remarque la présence d’un groupe CustomerID = NULL avec un nombre élevé de commandes. Cette observation suggère la présence de transactions sans identification client, ce qui motive l’analyse des valeurs manquantes présentée dans la section suivante.

## B) Vérifications de valeurs manquantes

Avant toute analyse plus poussée, il est important de vérifier les valeurs nulles :  

In [14]:
from pyspark.sql.functions import col, sum, when

df_raw.select([sum(when(col(c).isNull(), 1 ).otherwise(0)).alias(c) for c in df_raw.columns]).show()

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|        0|        0|       1454|       0|          0|        0|    135080|      0|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



Nous observons la présence de valeurs manquantes dans deux colonnes sur huit : Description et CustomerID.  
La colonne Description contient un nombre relativement limité de valeurs manquantes (1 454), ce qui reste marginal au regard de la taille du jeu de données et n’impactera pas significativement l’analyse, d’autant plus que cette variable ne sera pas utilisée directement dans les modèles.  
En revanche, la colonne CustomerID présente un nombre important de valeurs manquantes (135 080), ce qui constitue un enjeu majeur pour l’analyse, notamment pour la segmentation client. Ces lignes devront faire l’objet d’un traitement spécifique lors de la phase de nettoyage des données.

Pour pallier au problème de valeurs manquantes mais egalement de prix et de quantité négatives, nous allons créer un nouveau dataset filtré sur ces trois conditions :  
1 - Les valeurs de CustomerID doivent êtres Non Nulles  
2 - Les valeurs de Quantity doivent êtres > 0  
3 - Les valeurs unitaires des produits doivent êtres > 0  

Nous créons donc ce dataset :

# Nettoyage des données

In [15]:
df_cleanV1 = (
    df_raw
    .filter(col("CustomerID").isNotNull())
    .filter(col("Quantity") > 0)
    .filter(col("UnitPrice") > 0)
)

La colonne InvoiceDate est initialement stockée sous forme de chaîne de caractères.  
Afin de permettre les calculs temporels nécessaires à l’analyse RFM, cette variable est convertie en format timestamp.

In [16]:
from pyspark.sql.functions import to_timestamp, col

df_cleanV1 = df_cleanV1.withColumn("InvoiceDate", to_timestamp(col("InvoiceDate"), "dd/MM/yyyy HH:mm"))
df_cleanV1.printSchema()
df_cleanV1.select("InvoiceDate").show(5)

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)

+-------------------+
|        InvoiceDate|
+-------------------+
|2010-12-01 08:45:00|
|2010-12-01 10:29:00|
|2010-12-01 11:27:00|
|2010-12-01 13:04:00|
|2010-12-01 14:05:00|
+-------------------+
only showing top 5 rows



In [17]:
from pyspark.sql.functions import regexp_replace

df_cleanV1 = df_cleanV1.withColumn("UnitPrice", regexp_replace(col("UnitPrice"), ",", "." ).cast("double"))

df_cleanV1.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



## Ajout de Colonnes

Création Colonne TotalAmount

In [18]:
df_cleanV1 = df_cleanV1.withColumn("TotalAmount", col("Quantity") * col("UnitPrice"))

print("✓ Nouvelle colonne créée : TotalAmount = Quantity × UnitPrice")
print("\n--- Aperçu du dataset enrichi ---")
df_cleanV1.select("InvoiceNo", "CustomerID", "Quantity", "UnitPrice", "TotalAmount", "InvoiceDate").show(10)


✓ Nouvelle colonne créée : TotalAmount = Quantity × UnitPrice

--- Aperçu du dataset enrichi ---
+---------+----------+--------+---------+-----------+-------------------+
|InvoiceNo|CustomerID|Quantity|UnitPrice|TotalAmount|        InvoiceDate|
+---------+----------+--------+---------+-----------+-------------------+
|   536370|     12583|       3|     18.0|       54.0|2010-12-01 08:45:00|
|   536392|     13705|       1|    165.0|      165.0|2010-12-01 10:29:00|
|   536403|     12791|       1|     15.0|       15.0|2010-12-01 11:27:00|
|   536527|     12662|       1|     18.0|       18.0|2010-12-01 13:04:00|
|   536540|     14911|       1|     50.0|       50.0|2010-12-01 14:05:00|
|   536779|     15823|       1|     15.0|       15.0|2010-12-02 15:08:00|
|   536835|     13145|       1|    295.0|      295.0|2010-12-02 18:06:00|
|   536840|     12738|       1|     18.0|       18.0|2010-12-02 18:27:00|
|   536852|     12686|       1|     18.0|       18.0|2010-12-03 09:51:00|
|   536858|    

Statistiques descriptives du dataset nettoyé

In [19]:
print("\n--- Statistiques descriptives après nettoyage ---")
df_cleanV1.select("Quantity", "UnitPrice", "TotalAmount").describe().show()


--- Statistiques descriptives après nettoyage ---
+-------+------------------+-----------------+------------------+
|summary|          Quantity|        UnitPrice|       TotalAmount|
+-------+------------------+-----------------+------------------+
|  count|              1712|             1712|              1712|
|   mean| 5.136682242990654|35.89778037383178| 75.98072429906541|
| stddev|18.870440432713025|76.74851762139058|105.17974474580936|
|    min|                 1|              1.0|               1.0|
|    max|               392|           2500.0|            2500.0|
+-------+------------------+-----------------+------------------+



## Colonnes temporelles


Pour identifier les patterns d'achat (clients qui achètent plutôt le weekend, en soirée, etc.)

In [20]:
# Extraire les composantes de la date
df_cleanV1 = df_cleanV1.withColumn("Year", year("InvoiceDate"))
df_cleanV1 = df_cleanV1.withColumn("Month", month("InvoiceDate"))
df_cleanV1 = df_cleanV1.withColumn("Quarter", quarter("InvoiceDate"))  # Trimestre (1-4)
df_cleanV1 = df_cleanV1.withColumn("DayOfWeek", dayofweek("InvoiceDate"))  # 1=Dimanche, 7=Samedi
df_cleanV1 = df_cleanV1.withColumn("Hour", hour("InvoiceDate"))

# Weekend ou semaine ?
df_cleanV1 = df_cleanV1.withColumn(
    "IsWeekend",
    when((col("DayOfWeek") == 1) | (col("DayOfWeek") == 7), 1).otherwise(0)
)

# Période de la journée
df_cleanV1 = df_cleanV1.withColumn(
    "TimeOfDay",
    when((col("Hour") >= 6) & (col("Hour") < 12), "Morning")
    .when((col("Hour") >= 12) & (col("Hour") < 18), "Afternoon")
    .when((col("Hour") >= 18) & (col("Hour") < 22), "Evening")
    .otherwise("Night")
)

In [25]:
# Analyses colonnes temporelles
print("\n--- Par période de la journée")
df_cleanV1.groupBy("TimeOfDay").agg(
    count("*").alias("Nb"),
    spark_round(spark_sum("TotalAmount"), 0).alias("Revenu"),
    spark_round(spark_avg("TotalAmount"), 2).alias("Moyen")
).show()

print("\n--- Weekend vs Semaine ---")
df_cleanV1.groupBy(when(col("IsWeekend") == 1, "Weekend").otherwise("Semaine").alias("Type")).agg(
    count("*").alias("Nb"),
    spark_round(spark_sum("TotalAmount"), 0).alias("Revenu"),
    spark_round(spark_avg("TotalAmount"), 2).alias("Moyenne")
).show()

print("\n--- Par Mois ---")
df_cleanV1.groupBy("Month").agg(
    spark_round(spark_sum("TotalAmount"), 0).alias("Revenu"),
    spark_round(spark_avg("TotalAmount"), 2).alias("Moyenne")
).orderBy("Month").show()


--- Par période de la journée
+---------+----+-------+------+
|TimeOfDay|  Nb| Revenu| Moyen|
+---------+----+-------+------+
|  Evening|  38| 5508.0|144.95|
|  Morning| 658|51880.0| 78.84|
|Afternoon|1016|72691.0| 71.55|
+---------+----+-------+------+


--- Weekend vs Semaine ---
+-------+----+--------+-------+
|   Type|  Nb|  Revenu|Moyenne|
+-------+----+--------+-------+
|Semaine|1578|122051.0|  77.35|
|Weekend| 134|  8028.0|  59.91|
+-------+----+--------+-------+


--- Par Mois ---
+-----+-------+-------+
|Month| Revenu|Moyenne|
+-----+-------+-------+
|    1| 8943.0|  90.33|
|    2| 5794.0|  70.66|
|    3|11241.0|  92.14|
|    4| 7338.0|  94.08|
|    5| 8336.0|  72.49|
|    6| 8766.0|  75.57|
|    7| 6415.0|  59.95|
|    8|12021.0|  86.48|
|    9|13196.0|  72.11|
|   10|12845.0|  65.87|
|   11|22760.0|  76.38|
|   12|12424.0|   69.8|
+-----+-------+-------+



# 3) SEGMENTATION CLIENT (NON SUPERVISÉ)

 --- 1. CRÉATION DES VARIABLES RFM ---

Calculer la date de référence (date maximale dans le dataset)

In [22]:
reference_date = df_cleanV1.select(spark_max("InvoiceDate")).first()[0]
print(f"Date de référence pour Recency : {reference_date}")

Date de référence pour Recency : 2011-12-09 12:16:00


Calculer les variables RFM par client

In [23]:
rfm_data = df_cleanV1.groupBy("CustomerID").agg(
    datediff(lit(reference_date), spark_max("InvoiceDate")).alias("Recency"),
    countDistinct("InvoiceNo").alias("Frequency"),  # compte les factures uniques
    spark_sum("TotalAmount").alias("Monetary")     # total dépensé
)

# Affichage d'un aperçu
rfm_data.show(10)
rfm_data.describe().show()

+----------+-------+---------+--------+
|CustomerID|Recency|Frequency|Monetary|
+----------+-------+---------+--------+
|     15727|    359|        1|   500.0|
|     12471|      2|       21|  2400.0|
|     16500|    330|        1|   165.0|
|     12626|     23|        9|   666.0|
|     12715|    106|        1|    80.0|
|     12367|      4|        1|    18.0|
|     17223|    368|        1|    35.0|
|     16828|     93|        1|    15.0|
|     13188|     11|        1|    15.0|
|     17190|    207|        1|   195.0|
+----------+-------+---------+--------+
only showing top 10 rows

+-------+------------------+------------------+-----------------+------------------+
|summary|        CustomerID|           Recency|        Frequency|          Monetary|
+-------+------------------+------------------+-----------------+------------------+
|  count|               633|               633|              633|               633|
|   mean|13995.001579778831|109.31911532385466|2.541864139020537|205.49605

## Verification du Dataframe RFM

In [None]:
rfm_data.printSchema()

# Vérification des valeurs nulles
rfm_data.select([
    spark_sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) 
    for c in rfm_data.columns
]).show()

root
 |-- CustomerID: integer (nullable = true)
 |-- Recency: integer (nullable = true)
 |-- Frequency: long (nullable = false)
 |-- Monetary: double (nullable = true)

+----------+-------+---------+--------+
|CustomerID|Recency|Frequency|Monetary|
+----------+-------+---------+--------+
|         0|      0|        0|       0|
+----------+-------+---------+--------+



## ASSEMBLAGE ET STANDARDISATION DES FEATURES

In [27]:
from pyspark.ml.feature import VectorAssembler, StandardScaler


In [28]:
# Assembler les features
assembler = VectorAssembler(
    inputCols=["Recency", "Frequency", "Monetary"],
    outputCol="features_raw"
)

rfm_assembled = assembler.transform(rfm_data)

# Standardiser
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withStd=True,
    withMean=True
)

scaler_model = scaler.fit(rfm_assembled)
rfm_scaled = scaler_model.transform(rfm_assembled)

rfm_scaled.select("CustomerID", "Recency", "Frequency", "Monetary", "features").show(5, truncate=False)



+----------+-------+---------+--------+----------------------------------------------------------------+
|CustomerID|Recency|Frequency|Monetary|features                                                        |
+----------+-------+---------+--------+----------------------------------------------------------------+
|15727     |359    |1        |500.0   |[2.2394914627481404,-0.33906357100941437,0.5589527595717295]    |
|12471     |2      |21       |2400.0  |[-0.9625896786980496,4.0590356185184335,4.165051235263411]      |
|16500     |330    |1        |165.0   |[1.97937842884915,-0.33906357100941437,-0.07685934008969834]    |
|12626     |23     |9        |666.0   |[-0.7742319644953325,1.420176104801725,0.8740118895532133]      |
|12715     |106    |1        |80.0    |[-0.029770522646498418,-0.33906357100941437,-0.2381847982127472]|
+----------+-------+---------+--------+----------------------------------------------------------------+
only showing top 5 rows

