In [1]:
from pyspark.sql import SparkSession

# Création de la session PySpark
spark = SparkSession.builder \
    .appName("Segmentation_Clients_Expresso") \
    .getOrCreate()

In [2]:
# Chargement du fichier
df = spark.read.csv("Expresso_data.csv", header=True, inferSchema=True)

df.show(5)

+--------------------+------+-------------+-------+--------------+-------+------------+---------+-----------+------+------+----+-----+-----+---+----------+--------------------+-------------+-----+
|             user_id|REGION|       TENURE|MONTANT|FREQUENCE_RECH|REVENUE|ARPU_SEGMENT|FREQUENCE|DATA_VOLUME|ON_NET|ORANGE|TIGO|ZONE1|ZONE2|MRG|REGULARITY|            TOP_PACK|FREQ_TOP_PACK|CHURN|
+--------------------+------+-------------+-------+--------------+-------+------------+---------+-----------+------+------+----+-----+-----+---+----------+--------------------+-------------+-----+
|00000bfd7d50f0109...|FATICK| K > 24 month| 4250.0|          15.0| 4251.0|      1417.0|     17.0|        4.0| 388.0|  46.0| 1.0|  1.0|  2.0| NO|        54|On net 200F=Unlim...|          8.0|    0|
|00000cb4a5d760de8...|  NULL|I 18-21 month|   NULL|          NULL|   NULL|        NULL|     NULL|       NULL|  NULL|  NULL|NULL| NULL| NULL| NO|         4|                NULL|         NULL|    1|
|00001654a9d9f9

In [3]:
# Affiche les types de colonnes et s'il y a des nulls
df.printSchema()

print(f"Nombre de lignes : {df.count()}")
print(f"Nombre de colonnes : {len(df.columns)}")

root
 |-- user_id: string (nullable = true)
 |-- REGION: string (nullable = true)
 |-- TENURE: string (nullable = true)
 |-- MONTANT: double (nullable = true)
 |-- FREQUENCE_RECH: double (nullable = true)
 |-- REVENUE: double (nullable = true)
 |-- ARPU_SEGMENT: double (nullable = true)
 |-- FREQUENCE: double (nullable = true)
 |-- DATA_VOLUME: double (nullable = true)
 |-- ON_NET: double (nullable = true)
 |-- ORANGE: double (nullable = true)
 |-- TIGO: double (nullable = true)
 |-- ZONE1: double (nullable = true)
 |-- ZONE2: double (nullable = true)
 |-- MRG: string (nullable = true)
 |-- REGULARITY: integer (nullable = true)
 |-- TOP_PACK: string (nullable = true)
 |-- FREQ_TOP_PACK: double (nullable = true)
 |-- CHURN: integer (nullable = true)

Nombre de lignes : 2154048
Nombre de colonnes : 19


In [4]:
from pyspark.sql.functions import approx_count_distinct

# Récupération du nombre de valeurs uniques par colonne
unique_counts = {
    c: df.select(approx_count_distinct(c)).first()[0]
    for c in df.columns
}

# Affichage trié avec Pandas
import pandas as pd
pd.DataFrame.from_dict(unique_counts, orient="index", columns=["Valeurs Uniques"])\
    .sort_values("Valeurs Uniques", ascending=False)

Unnamed: 0,Valeurs Uniques
user_id,2072614
DATA_VOLUME,40472
REVENUE,35650
ARPU_SEGMENT,14786
ON_NET,8989
MONTANT,5910
ORANGE,2894
TIGO,1288
ZONE1,625
ZONE2,481


In [5]:
from pyspark.sql.functions import count

# Nombre de lignes totales vs distinctes
total_rows = df.count()
distinct_rows = df.distinct().count()

print(f"Doublons détectés : {total_rows - distinct_rows}")

Doublons détectés : 0


In [6]:
from pyspark.sql.functions import col, sum, isnan
from pyspark.sql.types import DoubleType, FloatType

# Nombre total de lignes
total_rows = df.count()

# Initialisation du dictionnaire des pourcentages de valeurs manquantes
percent_missing = {}

# Traitement colonne par colonne
for c in df.columns:
    if isinstance(df.schema[c].dataType, (DoubleType, FloatType)):
        null_count = df.filter(col(c).isNull() | isnan(col(c))).count()
    else:
        null_count = df.filter(col(c).isNull()).count()
    
    pourcentage = (null_count / total_rows) * 100
    percent_missing[c] = round(pourcentage, 2)

# Affichage trié
import pandas as pd
pd.DataFrame.from_dict(percent_missing, orient="index", columns=["% Manquant"]).sort_values("% Manquant", ascending=False)

Unnamed: 0,% Manquant
ZONE2,93.65
ZONE1,92.12
TIGO,59.89
DATA_VOLUME,49.23
FREQ_TOP_PACK,41.9
TOP_PACK,41.9
ORANGE,41.56
REGION,39.43
ON_NET,36.52
MONTANT,35.13


In [7]:
# Suppression des colonnes choisies
df = df.drop("user_id", "TIGO", "ZONE1", "ZONE2")

In [8]:
from pyspark.sql.functions import col, round

# Nombre total de lignes
total_rows = df.count()

# Calcul du pourcentage par valeur de CHURN
df.groupBy("CHURN")\
  .count()\
  .withColumn("Pourcentage", round((col("count") / total_rows) * 100, 2))\
  .show()

+-----+-------+-----------+
|CHURN|  count|Pourcentage|
+-----+-------+-----------+
|    1| 403986|      18.75|
|    0|1750062|      81.25|
+-----+-------+-----------+



In [9]:
from pyspark.sql.types import StringType, NumericType

# Sélection des colonnes de type string (équivalent object/category)
cat_vars = [f.name for f in df.schema.fields if isinstance(f.dataType, StringType)]

# Sélection des colonnes numériques
num_vars = [f.name for f in df.schema.fields if isinstance(f.dataType, NumericType)]

print("Variables catégorielles :", cat_vars)
print("Variables numériques :", num_vars)

Variables catégorielles : ['REGION', 'TENURE', 'MRG', 'TOP_PACK']
Variables numériques : ['MONTANT', 'FREQUENCE_RECH', 'REVENUE', 'ARPU_SEGMENT', 'FREQUENCE', 'DATA_VOLUME', 'ON_NET', 'ORANGE', 'REGULARITY', 'FREQ_TOP_PACK', 'CHURN']


In [10]:
from pyspark.sql.functions import col, mean, count

# 1. Remplacer les valeurs manquantes dans les colonnes numériques par la moyenne
for c in num_vars:
    mean_value = df.select(mean(col(c))).first()[0]
    if mean_value is not None:
        df = df.fillna({c: mean_value})

# 2. Remplacer les valeurs manquantes dans les colonnes catégorielles par la valeur la plus fréquente (mode)
for c in cat_vars:
    try:
        mode_value = df.groupBy(c).count().orderBy(col("count").desc()).first()[0]
        if mode_value is not None:
            df = df.fillna({c: mode_value})
    except:
        print(f"Impossible de calculer le mode pour la colonne {c}")

In [11]:
from pyspark.sql.functions import col, isnan, sum
from pyspark.sql.types import DoubleType, FloatType

# Calcul du nombre de valeurs manquantes par colonne
missing_counts = {}

for c in df.columns:
    if isinstance(df.schema[c].dataType, (DoubleType, FloatType)):
        count_missing = df.filter(col(c).isNull() | isnan(col(c))).count()
    else:
        count_missing = df.filter(col(c).isNull()).count()
    if count_missing > 0:
        missing_counts[c] = count_missing

# Affichage propre
import pandas as pd
pd.DataFrame.from_dict(missing_counts, orient="index", columns=["Valeurs manquantes"]).sort_values("Valeurs manquantes", ascending=False)

Unnamed: 0,Valeurs manquantes
TOP_PACK,902594
REGION,849299


In [12]:
df.select("TOP_PACK").filter(col("TOP_PACK").isNotNull()).show(10, truncate=False)
df.select("REGION").filter(col("REGION").isNotNull()).show(10, truncate=False)

+-----------------------------------------+
|TOP_PACK                                 |
+-----------------------------------------+
|On net 200F=Unlimited _call24H           |
|On-net 1000F=10MilF;10d                  |
|Data:1000F=5GB,7d                        |
|Mixt 250F=Unlimited_call24H              |
|MIXT:500F= 2500F on net _2500F off net;2d|
|All-net 500F=2000F;5d                    |
|On-net 500F_FNF;3d                       |
|On net 200F=Unlimited _call24H           |
|All-net 500F=2000F;5d                    |
|Data: 100 F=40MB,24H                     |
+-----------------------------------------+
only showing top 10 rows

+-----------+
|REGION     |
+-----------+
|FATICK     |
|DAKAR      |
|DAKAR      |
|LOUGA      |
|LOUGA      |
|DAKAR      |
|DAKAR      |
|TAMBACOUNDA|
|DAKAR      |
|KAOLACK    |
+-----------+
only showing top 10 rows



In [13]:
from pyspark.sql.functions import col, trim, lower, upper, when

# Nettoyage de base
df = df.withColumn("TOP_PACK", trim(lower(col("TOP_PACK"))))
df = df.withColumn("REGION", trim(upper(col("REGION"))))  # on standardise en majuscule

# Remplacer les chaînes vides ou "nan"
df = df.withColumn("TOP_PACK", when((col("TOP_PACK") == "") | (col("TOP_PACK") == "nan"), None).otherwise(col("TOP_PACK")))
df = df.withColumn("REGION", when((col("REGION") == "") | (col("REGION") == "nan"), None).otherwise(col("REGION")))

# Tentative de remplissage avec le mode
def fill_with_mode_or_default(df, col_name, default_val):
    mode_row = df.groupBy(col_name).count().orderBy(col("count").desc()).first()
    if mode_row and mode_row[0] is not None:
        df = df.fillna({col_name: mode_row[0]})
        print(f"Rempli {col_name} avec son mode : {mode_row[0]}")
    else:
        df = df.fillna({col_name: default_val})
        print(f"Mode non trouvé pour {col_name} → remplacé par '{default_val}'")
    return df

# Application
df = fill_with_mode_or_default(df, "TOP_PACK", "inconnu")
df = fill_with_mode_or_default(df, "REGION", "NON_PRECISÉ")

Mode non trouvé pour TOP_PACK → remplacé par 'inconnu'
Mode non trouvé pour REGION → remplacé par 'NON_PRECISÉ'


In [14]:
from pyspark.ml.feature import StringIndexer

# Création des indexeurs pour chaque variable catégorielle
indexers = [
    StringIndexer(inputCol=c, outputCol=c + "_idx", handleInvalid="keep")
    for c in cat_vars
]

# Application de chaque indexeur en séquence
for indexer in indexers:
    df = indexer.fit(df).transform(df)

In [15]:
from pyspark.ml.feature import VectorAssembler

# On fusionne toutes les colonnes numériques + encodées
cols_for_features = num_vars + [c + "_idx" for c in cat_vars]

assembler = VectorAssembler(
    inputCols=cols_for_features,
    outputCol="features"
)

df_vector = assembler.transform(df)

In [16]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(
    inputCol="features",
    outputCol="features_scaled",
    withMean=True,
    withStd=True
)

scaler_model = scaler.fit(df_vector)
df_scaled = scaler_model.transform(df_vector)

In [17]:
from pyspark.ml.feature import PCA

# Réduction de la dimension avec PCA
pca = PCA(k=5, inputCol="features_scaled", outputCol="pca_features")
pca_model = pca.fit(df_scaled)
df_pca = pca_model.transform(df_scaled)

# Affichage de la variance expliquée
print("Variance expliquée par chaque composante principale :")
print(pca_model.explainedVariance.toArray())

Variance expliquée par chaque composante principale :
[0.4247323  0.12722984 0.08357809 0.07120151 0.0614992 ]


In [18]:
from pyspark.ml.clustering import KMeans

# Apllication du KMeans pour le clustering
kmeans = KMeans(featuresCol="pca_features", predictionCol="cluster", k=5, seed=42)

kmeans_model = kmeans.fit(df_pca)
df_clustered = kmeans_model.transform(df_pca)

# Affichage d’un aperçu des clusters attribués
df_clustered.select("cluster").groupBy("cluster").count().orderBy("cluster").show()

+-------+-------+
|cluster|  count|
+-------+-------+
|      0|1062120|
|      1|  55181|
|      2| 318295|
|      3|  23618|
|      4| 694834|
+-------+-------+



In [19]:
from pyspark.sql.functions import round, avg, col

# Moyenne des indicateurs par cluster, arrondis à 5 décimales
df_clustered.groupBy("cluster").agg(
    round(avg("REGION_idx"), 5).alias("REGION_idx"),
    round(avg("TENURE_idx"), 5).alias("TENURE_idx"),
    round(avg("MONTANT"), 5).alias("MONTANT"),
    round(avg("FREQUENCE_RECH"), 5).alias("FREQ_RECH"),
    round(avg("REVENUE"), 5).alias("REVENUE"),
    round(avg("FREQUENCE"), 5).alias("FREQUENCE"),
    round(avg("DATA_VOLUME"), 5).alias("DATA_VOLUME"),
    round(avg("ON_NET"), 5).alias("ON_NET"),
    round(avg("ORANGE"), 5).alias("ORANGE"),
    round(avg("REGULARITY"), 5).alias("REGULARITY"),
    round(avg("TOP_PACK_idx"), 5).alias("TOP_PACK_idx"),
    round(avg("FREQ_TOP_PACK"), 5).alias("FREQ_TOPACK"),
    round(avg("CHURN"), 5).alias("CHURN")
).orderBy("cluster").show(truncate=False)

+-------+----------+----------+-----------+---------+-----------+---------+-----------+----------+---------+----------+------------+-----------+-------+
|cluster|REGION_idx|TENURE_idx|MONTANT    |FREQ_RECH|REVENUE    |FREQUENCE|DATA_VOLUME|ON_NET    |ORANGE   |REGULARITY|TOP_PACK_idx|FREQ_TOPACK|CHURN  |
+-------+----------+----------+-----------+---------+-----------+---------+-----------+----------+---------+----------+------------+-----------+-------+
|0      |3.17274   |0.10171   |2816.40804 |6.21965  |2723.33106 |7.99654  |2263.88937 |129.59871 |49.47416 |31.83874  |5.04479     |5.196      |0.00416|
|1      |2.32165   |0.09878   |28512.2377 |49.51551 |29158.44231|53.33987 |13681.71764|736.00987 |607.72539|60.05596  |7.78413     |43.4925    |0.00783|
|2      |2.68634   |0.1018    |10771.66017|22.35367 |10987.15423|27.16735 |6438.51295 |419.19241 |163.38082|55.1273   |7.13304     |16.10517   |0.00662|
|3      |3.38839   |0.10793   |15217.72205|40.53507 |15576.65733|46.14891 |2624.08