In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml import PipelineModel
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.sql.functions import (
    col, sum, avg, countDistinct, count, max, datediff, lit
)
from pyspark.ml import PipelineModel
from pipeline_segm.SegmentationClass import SegmentationClass

In [2]:
# Create (or reuse) the Spark session used throughout this notebook.
spark = (
    SparkSession.builder
    .appName("Modelseg")
    .getOrCreate()
)

# Load the `train_df` dataset from HDFS as CSV, treating the first line as the header.
# No schema is supplied and inference is not enabled, so Spark reads every column as a string.
df = (
    spark.read
    .option("header", "true")
    .csv("hdfs://localhost:9000/projet/gold/train_df")
)


25/06/02 02:17:53 WARN Utils: Your hostname, toma-Nitro-AN517-52 resolves to a loopback address: 127.0.1.1; using 192.168.88.11 instead (on interface enp7s0)
25/06/02 02:17:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/02 02:17:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [3]:
# Restore the previously fitted pipeline from local disk and run it over `df`.
# `df` is rebound to the transformed result, so re-running this cell alone
# feeds already-transformed data back through the pipeline.
loaded_pipeline_model = PipelineModel.load(
    "/home/toma/Documents/BIGDATA/big_data_pipeline_v3"
)
df = loaded_pipeline_model.transform(df)

                                                                                

In [4]:


# Packs the three numeric columns x, y, z into a single vector column named "features".
geo_assembler = VectorAssembler(
    outputCol="features",
    inputCols=["x", "y", "z"],
)



In [21]:
# Three-stage pipeline: project transformer -> x/y/z feature vector -> k-means (k=15).
seg = SegmentationClass()
kmeans = KMeans(
    k=15,
    seed=42,
    featuresCol="features",
    predictionCol="cluster",
)
pipeline = Pipeline(stages=[seg, geo_assembler, kmeans])

# Fit on `df`, then score that same frame to attach the "cluster" column.
model = pipeline.fit(df)
clustered = model.transform(df)

# Persist the fitted pipeline, replacing whatever is already stored at this path.
# NOTE(review): other cells in this notebook save to this same path, so the
# artifact on disk is whichever of those cells ran last.
model.write().overwrite().save(
    "/home/toma/Documents/BIGDATA/segmentation_model"
)

                                                                                

In [22]:
# Narrow `clustered` to the customer/order columns of interest plus the assigned cluster,
# then print the first five rows.
clustered = clustered.select(
    [
        "customer_id",
        "order_id",
        "customer_city",
        "product_name",
        "order_item_quantity",
        "product_price",
        "shipping_mode",
        "cluster",
    ]
)
clustered.show(n=5)



+-----------+--------+-------------+--------------------+-------------------+-------------+--------------+-------+
|customer_id|order_id|customer_city|        product_name|order_item_quantity|product_price| shipping_mode|cluster|
+-----------+--------+-------------+--------------------+-------------------+-------------+--------------+-------+
|      10000|   68882|       Caguas|Nike Men's Dri-FI...|                  1|         50.0|Standard Class|      1|
|      10009|   15502|       Caguas|Diamondback Women...|                  1|   299.980011|Standard Class|      1|
|      10010|    8757|      Phoenix|Perfect Fitness P...|                  5|  59.99000168|   First Class|     12|
|      10012|   36469|     Lynnwood|Perfect Fitness P...|                  4|  59.99000168|Standard Class|     11|
|      10038|   27853|       Caguas|Nike Men's Free 5...|                  3|  99.98999786|Standard Class|      1|
+-----------+--------+-------------+--------------------+-------------------+---

                                                                                

In [4]:
# Pipeline over per-customer numeric columns: project transformer -> feature vector -> k-means (k=2).
# Assumes the `seg` stage outputs the columns listed below - TODO confirm against SegmentationClass.
seg = SegmentationClass()

assembler = VectorAssembler(
    outputCol="features",
    inputCols=[
        "total_sales",
        "total_orders",
        "avg_order_value",
        "total_quantity",
        "avg_discount_rate",
        "num_categories",
        "frequency",
        "avg_shipping_delay",
        "late_delivery_rate",
        "avg_product_price",
        "recency_days",
    ],
)

kmeans = KMeans(
    k=2,
    seed=42,
    featuresCol="features",
    predictionCol="cluster",
)

pipeline = Pipeline(stages=[seg, assembler, kmeans])

# Fit on `df`, then score that same frame to attach the "cluster" column.
model = pipeline.fit(df)
clusters_df = model.transform(df)

# Persist the fitted pipeline, replacing whatever is already stored at this path.
# NOTE(review): other cells in this notebook save to this same path, so the
# artifact on disk is whichever of those cells ran last.
model.write().overwrite().save(
    "/home/toma/Documents/BIGDATA/segmentation_model"
)


                                                                                

In [5]:
# Print the first ten rows of the clustered frame.
clusters_df.show(n=10)



+-----------+------------------+------------+------------------+--------------+-------------------+--------------+-------------------+---------+------------------+------------------+------------------+------------+--------------------+-------+
|customer_id|       total_sales|total_orders|   avg_order_value|total_quantity|  avg_discount_rate|num_categories|    last_order_date|frequency|avg_shipping_delay|late_delivery_rate| avg_product_price|recency_days|            features|cluster|
+-----------+------------------+------------+------------------+--------------+-------------------+--------------+-------------------+---------+------------------+------------------+------------------+------------+--------------------+-------+
|       5645|     2889.22006228|           5|     577.844012456|            24|0.09083333166666667|             7|2017-03-16 00:48:00|       12|3.4166666666666665|0.5833333333333334|203.32000510499998|         322|[2889.22006228,5....|      1|
|       4937|      2449.

                                                                                

In [21]:
from datetime import timedelta

# Name of the order-date column in `df`; used for the global max, the
# per-customer max, and the error message so the three cannot drift apart.
ORDER_DATE_COL = "order_date_(dateorders)"

# Latest order date across the whole dataset (`max` is the Spark aggregate
# imported from pyspark.sql.functions, not the Python builtin).
max_date_row = df.select(max(ORDER_DATE_COL)).collect()[0]
max_date = max_date_row[0]

if max_date is None:
    # The aggregate is NULL when the column holds no non-null value at all.
    raise ValueError(f"Aucune date valide trouvée dans '{ORDER_DATE_COL}'.")

# Reference date for recency: one day after the latest order in the data,
# so the most recent customer gets a recency of at least 1 day.
today = max_date + timedelta(days=1)

# One row per customer with aggregated purchase-behaviour columns.
features_df = df.groupBy("customer_id").agg(
    sum("order_item_total").alias("total_sales"),
    countDistinct("order_id").alias("total_orders"),
    (sum("order_item_total") / countDistinct("order_id")).alias("avg_order_value"),
    sum("order_item_quantity").alias("total_quantity"),
    avg("order_item_discount_rate").alias("avg_discount_rate"),
    countDistinct("category_id").alias("num_categories"),
    max(ORDER_DATE_COL).alias("last_order_date"),
    # count() counts rows with a non-null order_id, not distinct orders
    # (compare with total_orders above, which uses countDistinct).
    count("order_id").alias("frequency"),
    avg("days_for_shipping_(real)").alias("avg_shipping_delay"),
    avg("late_delivery_risk").alias("late_delivery_rate"),
    avg("order_item_product_price").alias("avg_product_price")
)

# Recency in days between the reference date and the customer's last order.
features_df = features_df.withColumn("recency_days", datediff(lit(today), col("last_order_date")))

# Attach the customer segment (optional). dropDuplicates keeps one row per
# customer_id, so a single segment value is joined back for each customer.
segments_df = df.select("customer_id", "customer_segment").dropDuplicates(["customer_id"])
features_df = features_df.join(segments_df, on="customer_id", how="left")

# Preview the result.
features_df.show(10)

                                                                                

+-----------+------------------+------------+------------------+--------------+-------------------+--------------+-------------------+---------+------------------+------------------+------------------+------------+----------------+
|customer_id|       total_sales|total_orders|   avg_order_value|total_quantity|  avg_discount_rate|num_categories|    last_order_date|frequency|avg_shipping_delay|late_delivery_rate| avg_product_price|recency_days|customer_segment|
+-----------+------------------+------------+------------------+--------------+-------------------+--------------+-------------------+---------+------------------+------------------+------------------+------------+----------------+
|        467|     3732.31005469|           7|      533.18715067|            40|0.11900000025000002|             9|2017-03-20 19:21:00|       20|               2.6|               0.6|    147.7365033185|         318|        Consumer|
|       7711|      1223.9600219|           4|     305.990005475|        

In [13]:

from pyspark.sql.functions import radians, cos, sin

# Convert latitude/longitude to radians, project them to Cartesian x/y/z on the
# unit sphere, then keep a single row per customer_id.
dataset = (
    df
    .withColumn("lat_rad", radians("latitude"))
    .withColumn("lon_rad", radians("longitude"))
    .withColumn("x", cos("lat_rad") * cos("lon_rad"))
    .withColumn("y", cos("lat_rad") * sin("lon_rad"))
    .withColumn("z", sin("lat_rad"))
    .dropDuplicates(["customer_id"])
)

# Assemble x, y, z into the "features" vector column.
features_vector_df = geo_assembler.transform(dataset)

# Silhouette evaluator using squared Euclidean distance on the "features" column.
evaluator = ClusteringEvaluator(
    metricName='silhouette',
    distanceMeasure='squaredEuclidean',
    featuresCol='features',
)


In [15]:

# Fit k-means for every k in the range below and print its silhouette score.
# The range currently contains a single value, k=15.
for k in range(15, 16):
    kmeans = KMeans(
        k=k,
        seed=42,
        featuresCol="features",
    )
    model = kmeans.fit(features_vector_df)
    predictions = model.transform(features_vector_df)
    score = evaluator.evaluate(predictions)
    print(f"Silhouette Score pour k={k} : {score:.4f}")

Silhouette Score pour k=15 : 0.8410


In [16]:
# When the notebook runs top to bottom, `model` at this point is the bare
# KMeansModel fitted in the silhouette loop, not a PipelineModel. Saving it to
# ".../segmentation_model" would overwrite the fitted segmentation pipeline
# stored there, and the later PipelineModel.load on that path would then be
# pointed at an artifact of a different type. Store it under its own path.
model.write().overwrite().save("/home/toma/Documents/BIGDATA/segmentation_kmeans_model")

                                                                                

In [23]:
# Load the `test_df` dataset from HDFS as CSV, treating the first line as the header.
dfs = (
    spark.read
    .option("header", "true")
    .csv("hdfs://localhost:9000/projet/gold/test_df")
)

# Restore both saved pipelines from local disk.
pipeline_model = PipelineModel.load(
    "/home/toma/Documents/BIGDATA/big_data_pipeline_v3"
)
pipeline_seg = PipelineModel.load(
    "/home/toma/Documents/BIGDATA/segmentation_model"
)

# Run the first pipeline, then the segmentation pipeline on its output,
# and print the first rows of the result.
dfs = pipeline_seg.transform(pipeline_model.transform(dfs))
dfs.show()





+--------+------------------------+-----------------------------+-----------------+------------------+-----------------+------------------+-----------+--------------------+-----------------+----------------+--------------+--------------+-----------+--------------+-----------------+----------------+--------------+--------------------+----------------+-------------+---------------+-----------+------------+------+-------------------+--------------+-----------------+-----------------------+--------+----------------------+-------------------+------------------------+-------------+------------------------+-----------------------+-------------------+-----------+----------------+----------------------+---------------+--------------------+---------------+-------------+---------------+-------------------+-------------------+--------------------+--------------------+-------------+--------------+--------------------------+--------------+-------------------+-------------------+--------------------+

                                                                                

In [6]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()