## ML Objective

##### Business Problem

Segment users based on behavior to identify high-value, medium-value, and low-value customers.

##### Why ML here?

Manual rules don’t scale for millions of users. Clustering helps discover hidden behavior patterns.

### Read Data from GOLD layer

In [0]:
from pyspark.sql.functions import col

gold_df = spark.read.table(
    "e_commerce_capstone.gold.user_behavior_metrics"
)

gold_df.printSchema()
gold_df.show(5)


root
 |-- user_id: long (nullable = true)
 |-- total_events: long (nullable = true)
 |-- purchase_count: long (nullable = true)
 |-- total_spent: double (nullable = true)

+---------+------------+--------------+-----------+
|  user_id|total_events|purchase_count|total_spent|
+---------+------------+--------------+-----------+
|532749124|         121|             0|        0.0|
|545348348|          69|             0|        0.0|
|514443726|        1942|             4|     249.16|
|542625372|         266|             0|        0.0|
|512518970|         331|            12|    7296.03|
+---------+------------+--------------+-----------+
only showing top 5 rows


### Select ML Features

In [0]:
features_df = gold_df.select(
    "user_id",
    "total_events",
    "purchase_count",
    "total_spent"
)


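##### Feature Scaling
Why scale?

- The features are on very different scales: `total_spent` reaches the thousands, while `purchase_count` is usually a small integer.

- KMeans assigns users to clusters by Euclidean distance, so features with large numeric ranges dominate that distance.

- Without scaling, the clusters would mostly reflect the largest-range features instead of overall behavior.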

In [0]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

assembler = VectorAssembler(
    inputCols=["total_events", "purchase_count", "total_spent"],
    outputCol="features_raw"
)

assembled_df = assembler.transform(features_df)

scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withMean=True,
    withStd=True
)

scaler_model = scaler.fit(assembled_df)
scaled_df = scaler_model.transform(assembled_df)
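The effect described under "Why scale?" can be checked without Spark. The sketch below is a pure-Python illustration, not the Spark implementation. It standardizes the five sample rows printed by `gold_df.show(5)` to zero mean and unit standard deviation per column, using the corrected (n - 1) sample standard deviation that `StandardScaler` uses. It then measures how much of the squared Euclidean distance between the two buying users comes from `total_spent`. The helper names `standardize` and `feature_shares` are hypothetical, and five rows are far too few to estimate real scaling statistics.

```python
import math
import statistics

# Five sample users as (total_events, purchase_count, total_spent),
# copied from the gold_df.show(5) output above. Real scaling statistics
# are computed over all users; this toy set only illustrates the effect.
users = [
    (121, 0, 0.0),
    (69, 0, 0.0),
    (1942, 4, 249.16),
    (266, 0, 0.0),
    (331, 12, 7296.03),
]


def standardize(rows):
    """Shift each column to zero mean and unit standard deviation.

    Uses the corrected (n - 1) sample standard deviation, as
    StandardScaler(withMean=True, withStd=True) does.
    """
    cols = list(zip(*rows))
    means = [statistics.mean(c) for c in cols]
    stds = [statistics.stdev(c) for c in cols]
    return [
        tuple((x - m) / s for x, m, s in zip(row, means, stds))
        for row in rows
    ]


def feature_shares(a, b):
    """Fraction of the squared Euclidean distance contributed by each feature."""
    sq = [(x - y) ** 2 for x, y in zip(a, b)]
    total = sum(sq)
    return [v / total for v in sq]


scaled = standardize(users)

# Compare users 2 and 4 (0-based indexes), the two buyers in the sample.
raw_shares = feature_shares(users[2], users[4])
scaled_shares = feature_shares(scaled[2], scaled[4])

print("raw    total_spent share:", round(raw_shares[2], 2))
print("scaled total_spent share:", round(scaled_shares[2], 2))
```

Before scaling, `total_spent` accounts for most of the distance between the two buyers. After scaling, the three features contribute on a comparable footing.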


### Train ML Model (KMeans)

In [0]:
from pyspark.ml.clustering import KMeans

kmeans = KMeans(
    k=3,              # 3 segments
    seed=42,
    featuresCol="features"
)

model = kmeans.fit(scaled_df)
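For intuition about what `kmeans.fit` does, here is a minimal pure-Python sketch of Lloyd's algorithm, the assign-then-update loop at the core of k-means. It is an illustration under simplifying assumptions, not Spark's implementation. It seeds centers with a deterministic farthest-first rule, whereas Spark's `KMeans` defaults to the randomized k-means|| initialization. It also stops only when the centers stop moving or after `max_iter` rounds (20, matching Spark's default `maxIter`), without Spark's `tol` threshold. The function names `farthest_first_init` and `lloyd_kmeans` are hypothetical.

```python
import math


def farthest_first_init(points, k):
    """Deterministic seeding: start at points[0], then repeatedly add the
    point whose distance to its nearest chosen center is largest."""
    centers = [points[0]]
    while len(centers) < k:
        nxt = max(points, key=lambda p: min(math.dist(p, c) for c in centers))
        centers.append(nxt)
    return [list(c) for c in centers]


def lloyd_kmeans(points, k, max_iter=20):
    """Minimal Lloyd's k-means: alternate assignment and update steps."""
    centers = farthest_first_init(points, k)
    labels = []
    for _ in range(max_iter):
        # Assignment step: each point joins its nearest center (Euclidean).
        labels = [min(range(k), key=lambda j: math.dist(p, centers[j])) for p in points]
        # Update step: each center moves to the mean of its members.
        new_centers = []
        for j in range(k):
            members = [p for p, lab in zip(points, labels) if lab == j]
            if members:
                new_centers.append([sum(col) / len(members) for col in zip(*members)])
            else:
                new_centers.append(centers[j])  # leave an empty cluster's center in place
        if new_centers == centers:
            break  # centers stopped moving, so assignments are stable
        centers = new_centers
    return centers, labels


# Three well-separated groups of 2-D points.
points = [(0, 0), (0, 1), (1, 0), (10, 10), (10, 11), (11, 10), (20, 0), (20, 1), (21, 0)]
centers, labels = lloyd_kmeans(points, k=3)
print(labels)  # → [0, 0, 0, 2, 2, 2, 1, 1, 1]
```

Because each center is the mean of its members, a few users with extreme values (for example, an unusually high event count) can pull a center toward them or end up in a cluster of their own.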


### Generate Predictions (User Segments)

In [0]:
predictions = model.transform(scaled_df)

predictions.select(
    "user_id",
    "prediction"
).show(10)


+---------+----------+
|  user_id|prediction|
+---------+----------+
|572101511|         1|
|517668004|         0|
|513197926|         0|
|512443301|         1|
|518369549|         0|
|542815072|         0|
|543580688|         0|
|515161213|         0|
|513700637|         0|
|519164937|         0|
+---------+----------+
only showing top 10 rows


### Interpret Clusters

In [0]:
from pyspark.sql.functions import avg

cluster_profile = (
    predictions
    .groupBy("prediction")
    .agg(
        avg("total_spent").alias("avg_spent"),
        avg("purchase_count").alias("avg_purchase"),
        avg("total_events").alias("avg_events")
    )
)
display(cluster_profile)


| prediction | avg_spent | avg_purchase | avg_events |
|---|---|---|---|
| 1 | 24132.400735855404 | 58.44802424786709 | 679.0291311180961 |
| 0 | 104.1803760642167 | 0.371752605590608 | 25.56736217789467 |
| 2 | 0.0 | 0.0 | 199179.0 |


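Rank clusters by `avg_spent`. KMeans cluster IDs are arbitrary and can change with the seed or the input data, so labels derived from ranked averages are more robust than labels tied to specific IDs.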

In [0]:
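from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Order clusters from lowest to highest average spend.
# row_number (rather than dense_rank) always yields ranks 1..k, even if two
# clusters tie on avg_spent; "prediction" breaks ties deterministically.
# A global window (no partitionBy) is fine: cluster_profile has only k rows.
window = Window.orderBy("avg_spent", "prediction")

cluster_ranked = (
    cluster_profile
    .withColumn("rank", row_number().over(window))
)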




Convert ranks → business segments

In [0]:
from pyspark.sql.functions import expr

cluster_labeled = (
    cluster_ranked
    .withColumn(
        "user_segment",
        expr("""
            CASE
                WHEN rank = 1 THEN 'Low Value'
                WHEN rank = 2 THEN 'Medium Value'
                WHEN rank = 3 THEN 'High Value'
            END
        """)
    )
    .select("prediction", "user_segment")
)
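The rank-then-label step can be sanity-checked in plain Python. This sketch (hypothetical names `label_by_spend` and `SEGMENTS`) copies the `cluster_profile` averages shown above, orders clusters by `avg_spent`, and maps ranks 1 to 3 onto the same three labels as the CASE expression. It breaks ties on `avg_spent` by cluster ID so that every rank from 1 to k is assigned. It only supports k = 3 because there are three labels.

```python
# Cluster profile copied from the cluster_profile output above:
# cluster ID -> (avg_spent, avg_purchase, avg_events)
profile = {
    1: (24132.400735855404, 58.44802424786709, 679.0291311180961),
    0: (104.1803760642167, 0.371752605590608, 25.56736217789467),
    2: (0.0, 0.0, 199179.0),
}

# Segment names in ascending order of spend (rank 1 = lowest).
SEGMENTS = ["Low Value", "Medium Value", "High Value"]


def label_by_spend(profile):
    """Map each cluster ID to a segment name by ranking clusters on avg_spent.

    Ties on avg_spent are broken by cluster ID, so ranks are always 1..k.
    """
    if len(profile) != len(SEGMENTS):
        raise ValueError(f"expected {len(SEGMENTS)} clusters, got {len(profile)}")
    ordered = sorted(profile, key=lambda cid: (profile[cid][0], cid))
    return {cid: SEGMENTS[i] for i, cid in enumerate(ordered)}


print(label_by_spend(profile))
# → {2: 'Low Value', 0: 'Medium Value', 1: 'High Value'}

# KMeans cluster IDs are arbitrary; relabel them and the segments follow the profiles.
permuted = {0: profile[1], 1: profile[2], 2: profile[0]}
print(label_by_spend(permuted))
# → {1: 'Low Value', 2: 'Medium Value', 0: 'High Value'}
```

In the second call each spending profile keeps its segment even though its cluster ID changed, which is the property that makes rank-based labels safer than hard-coded IDs.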




Join labels back to each user

In [0]:
final_df = (
    predictions
    .join(cluster_labeled, on="prediction", how="left")
)




### Log Model with MLflow (Serverless-Safe)

In [0]:
%sql
CREATE VOLUME IF NOT EXISTS e_commerce_capstone.gold.mlflow_tmp;


In [0]:
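# Serverless / Unity Catalog: point MLflow's temporary DFS directory
# (where mlflow.spark.log_model writes the model before copying it into the
# run's artifacts) at the UC volume created above
import os
os.environ["MLFLOW_DFS_TMP"] = "/Volumes/e_commerce_capstone/gold/mlflow_tmp"

# Silence warnings from the mlflow.models.model logger to keep cell output clean
import logging
logging.getLogger("mlflow.models.model").setLevel(logging.ERROR)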

In [0]:
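# Log the KMeans model, its k, and the number of scored users
import mlflow
import mlflow.spark

mlflow.set_experiment("/ecommerce-user-segmentation")

with mlflow.start_run():
    mlflow.log_param("k", kmeans.getK())
    mlflow.log_metric("num_users", predictions.count())

    # Only the fitted KMeans stage is logged. Scoring new users also needs the
    # same VectorAssembler and fitted StandardScaler (scaler_model); a
    # pyspark.ml Pipeline of all three stages would package them together.
    mlflow.spark.log_model(
        model,
        artifact_path="kmeans_model"
    )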




### Save Predictions back to GOLD

In [0]:
final_df.groupBy("user_segment").agg(
    avg("total_spent").alias("avg_spent"),
    avg("purchase_count").alias("avg_purchase"),
    avg("total_events").alias("avg_events")
).orderBy("avg_spent").show()




+------------+------------------+-----------------+-----------------+
|user_segment|         avg_spent|     avg_purchase|       avg_events|
+------------+------------------+-----------------+-----------------+
|   Low Value|               0.0|              0.0|         199179.0|
|Medium Value| 104.1803760642167|0.371752605590608|25.56736217789467|
|  High Value|24132.400735855404|58.44802424786709|679.0291311180961|
+------------+------------------+-----------------+-----------------+



In [0]:
(
    final_df
    .select(
        "user_id",
        "user_segment",
        "total_events",
        "purchase_count",
        "total_spent"
    )
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("e_commerce_capstone.gold.user_segments")
)




##### Machine Learning (User Segmentation)

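Applied KMeans clustering (k = 3) to Gold-layer user metrics to segment users by engagement (`total_events`), purchases (`purchase_count`), and revenue (`total_spent`).
Features were standardized before clustering so that features with large numeric ranges do not dominate the distance calculation.
MLflow tracked the run's parameters, metrics, and the fitted KMeans model.
The segmented users were written back to the Gold layer (`e_commerce_capstone.gold.user_segments`) for analytics and dashboards.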

## Compute Cluster-Level Mean Statistics

In [0]:
from pyspark.sql import functions as F

cluster_stats_df = (
    predictions
    .groupBy("prediction")
    .agg(
        F.round(F.avg("total_events"), 2).alias("avg_total_events"),
        F.round(F.avg("purchase_count"), 2).alias("avg_purchase_count"),
        F.round(F.avg("total_spent"), 2).alias("avg_total_spent")
    )
    .orderBy("prediction")
)

cluster_stats_df.display()

| prediction | avg_total_events | avg_purchase_count | avg_total_spent |
|---|---|---|---|
| 0 | 25.57 | 0.37 | 104.18 |
| 1 | 679.03 | 58.45 | 24132.4 |
| 2 | 199179.0 | 0.0 | 0.0 |


### Create Business-Friendly Cluster Names

In [0]:
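# Hand-picked names per cluster ID, based on the cluster statistics above.
# KMeans cluster IDs are arbitrary, so re-check this mapping after retraining.
cluster_labels = [
    (0, "Occasional Buyers"),
    (1, "High-Value Loyal Customers"),
    (2, "Browsers / Non-Converters")
]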

cluster_label_df = spark.createDataFrame(
    cluster_labels, ["prediction", "segment_name"]
)


### Join Statistics with Segment Names 

In [0]:
cluster_summary_df = (
    cluster_stats_df
    .join(cluster_label_df, on="prediction", how="left")
)

cluster_summary_df.display()

| prediction | avg_total_events | avg_purchase_count | avg_total_spent | segment_name |
|---|---|---|---|---|
| 1 | 679.03 | 58.45 | 24132.4 | High-Value Loyal Customers |
| 0 | 25.57 | 0.37 | 104.18 | Occasional Buyers |
| 2 | 199179.0 | 0.0 | 0.0 | Browsers / Non-Converters |


In [0]:
cluster_summary_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("e_commerce_capstone.gold.user_segment_summary")


**Cluster-level averages** make the behavioral differences between segments explicit, **turning numeric KMeans cluster IDs into named segments that business users can act on.**