In [1]:
from pyspark.sql import SparkSession
 
# Start Spark session
spark = SparkSession.builder.appName("cust_seg").getOrCreate()
 
# Load dataset
df = spark.read.csv("s3://projectcustomer/RFM.csv", header=True, inferSchema=True)
 
# Show the first few rows of the DataFrame
df.show()

VBox()

Starting Spark application


ID,Kind,State,Spark UI,Driver log,User,Current session?
2,pyspark,idle,Link,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-------+---------+--------+
|CustomerID|Recency|Frequency|Monetary|
+----------+-------+---------+--------+
|     12626|     23|        9| 6620.48|
|     17809|     16|       12| 5162.91|
|     14837|     89|        4|  1649.5|
|     13483|     65|        1|  140.34|
|     17754|      0|        5| 1857.46|
|     13508|    243|        1|  110.97|
|     16706|     37|        3|   549.2|
|     13504|     64|        1|  295.93|
|     14944|     29|       11| 5865.57|
|     15594|     15|        5| 1771.78|
|     17346|      3|       15| 2717.71|
|     14030|     18|        8| 2344.22|
|     15216|     87|        1|    96.6|
|     17396|     39|        8|  7330.8|
|     17774|     96|        4| 1245.11|
|     15491|     95|        6| 3100.09|
|     13994|      3|        6| 2362.26|
|     12566|     86|        1|  351.65|
|     12447|    243|        1|  476.49|
|     14367|      8|       16| 9291.22|
+----------+-------+---------+--------+
only showing top 20 rows

In [2]:
# Compute 25th, 50th, and 75th percentiles for Recency, Frequency, and Monetary
recency_q25, recency_q50, recency_q75 = df.approxQuantile("Recency", [0.25, 0.5, 0.75], 0.01)
frequency_q25, frequency_q50, frequency_q75 = df.approxQuantile("Frequency", [0.25, 0.5, 0.75], 0.01)
monetary_q25, monetary_q50, monetary_q75 = df.approxQuantile("Monetary", [0.25, 0.5, 0.75], 0.01)

# Print 
print("RFM Percentile Thresholds:\n")

print("Recency:")
print(f"   25th percentile: {recency_q25}")
print(f"   50th percentile: {recency_q50}")
print(f"   75th percentile: {recency_q75}\n")

print("Frequency:")
print(f"   25th percentile: {frequency_q25}")
print(f"   50th percentile: {frequency_q50}")
print(f"   75th percentile: {frequency_q75}\n")

print("Monetary:")
print(f"   25th percentile: {monetary_q25}")
print(f"   50th percentile: {monetary_q50}")
print(f"   75th percentile: {monetary_q75}\n")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

RFM Percentile Thresholds:

Recency:
   25th percentile: 17.0
   50th percentile: 49.0
   75th percentile: 137.0

Frequency:
   25th percentile: 1.0
   50th percentile: 2.0
   75th percentile: 5.0

Monetary:
   25th percentile: 306.67
   50th percentile: 662.88
   75th percentile: 1628.74

In [3]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Step 1: Assemble the RFM columns into a feature vector
assembler = VectorAssembler(inputCols=["Recency", "Frequency", "Monetary"], outputCol="rfm_features")
rfm_vector = assembler.transform(df)

# Step 2: Apply StandardScaler to normalize the RFM features
scaler = StandardScaler(inputCol="rfm_features", outputCol="scaled_features", withStd=True, withMean=True)
scaler_model = scaler.fit(rfm_vector)
scaled_data = scaler_model.transform(rfm_vector)

# Show the resulting DataFrame with the scaled features
scaled_data.select("CustomerID", "scaled_features").show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------------------------------------------------------------------+
|CustomerID|scaled_features                                                   |
+----------+------------------------------------------------------------------+
|12626     |[-0.6906914279334252,0.12404818683839786,0.1482353600495279]      |
|17809     |[-0.7607374691812382,0.2121443998542179,0.0965988742467065]       |
|14837     |[-0.030257324739759502,-0.022778834854635572,-0.02786866116119202]|
|13483     |[-0.270415180446547,-0.11087504787045563,-0.08133279592946106]    |
|17754     |[-0.9208427063190966,0.006586569483971114,-0.020501383028714862]  |
|13508     |[1.510755582712127,-0.11087504787045563,-0.08237326986075005]     |
|16706     |[-0.5505993454377991,-0.05214423919324226,-0.0668483501839109]    |
|13504     |[-0.28042175776766315,-0.11087504787045563,-0.07582079936939125]  |
|14944     |[-0.6306519640067283,0.18277899551561122,0.12149160194706428]     |
|15594     |[-0.7707440465023544,0.00658

In [4]:
# 1. K-MEANS 

from pyspark.ml.clustering import KMeans

print("K-Means with K=4:")
kmeans = KMeans(k=4, seed=42, featuresCol="scaled_features", predictionCol="prediction")
model = kmeans.fit(scaled_data)
predictions = model.transform(scaled_data)

# Add cluster labels to RFM data
rfm_kmeans = predictions.select("Recency", "Frequency", "Monetary", "prediction") \
                        .withColumnRenamed("prediction", "K-MeansCluster")

rfm_kmeans.show()


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

K-Means with K=4:
+-------+---------+--------+--------------+
|Recency|Frequency|Monetary|K-MeansCluster|
+-------+---------+--------+--------------+
|     23|        9| 6620.48|             1|
|     16|       12| 5162.91|             1|
|     89|        4|  1649.5|             1|
|     65|        1|  140.34|             1|
|      0|        5| 1857.46|             1|
|    243|        1|  110.97|             0|
|     37|        3|   549.2|             1|
|     64|        1|  295.93|             1|
|     29|       11| 5865.57|             1|
|     15|        5| 1771.78|             1|
|      3|       15| 2717.71|             1|
|     18|        8| 2344.22|             1|
|     87|        1|    96.6|             1|
|     39|        8|  7330.8|             1|
|     96|        4| 1245.11|             3|
|     95|        6| 3100.09|             3|
|      3|        6| 2362.26|             1|
|     86|        1|  351.65|             1|
|    243|        1|  476.49|             0|
|      8|     

In [5]:
from pyspark.sql.functions import col, avg, min, max
from pyspark.ml.evaluation import ClusteringEvaluator

# 1. Count of samples in each cluster
print("Cluster Counts:")
rfm_kmeans.groupBy("K-MeansCluster").count().orderBy("K-MeansCluster").show()

# 2. Statistics (mean, min, max) for each cluster
print("Cluster-wise Recency Summary Stats:")
rfm_kmeans.groupBy("K-MeansCluster") \
    .agg(
        avg("Recency").alias("avg_Recency"),
        min("Recency").alias("min_Recency"),
        max("Recency").alias("max_Recency")
    ) \
    .orderBy("K-MeansCluster") \
    .show(truncate=False)

# Frequency summary
print("Cluster-wise Frequency Summary Stats:")
rfm_kmeans.groupBy("K-MeansCluster") \
    .agg(
        avg("Frequency").alias("avg_Frequency"),
        min("Frequency").alias("min_Frequency"),
        max("Frequency").alias("max_Frequency")
    ) \
    .orderBy("K-MeansCluster") \
    .show(truncate=False)

# Monetary summary
print("Cluster-wise Monetary Summary Stats:")
rfm_kmeans.groupBy("K-MeansCluster") \
    .agg(
        avg("Monetary").alias("avg_Monetary"),
        min("Monetary").alias("min_Monetary"),
        max("Monetary").alias("max_Monetary")
    ) \
    .orderBy("K-MeansCluster") \
    .show(truncate=False)

# 3. Centroids of the clusters
# model.clusterCenters returns an array of Vectors, one per cluster
print("Cluster Centroids (in scaled feature space):")
centroids = model.clusterCenters()
for idx, centroid in enumerate(centroids):
    print(f"Cluster {idx}: {centroid}")

# 4. Compute Silhouette Score using scaled features and cluster predictions
evaluator = ClusteringEvaluator(featuresCol="scaled_features", predictionCol="prediction",
                                metricName="silhouette", distanceMeasure="squaredEuclidean")

silhouette = evaluator.evaluate(predictions)
print(f"Silhouette Score: {silhouette:.4f}")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Cluster Counts:
+--------------+-----+
|K-MeansCluster|count|
+--------------+-----+
|             0|  625|
|             1| 2898|
|             2|    1|
|             3|  814|
+--------------+-----+

Cluster-wise Recency Summary Stats:
+--------------+------------------+-----------+-----------+
|K-MeansCluster|avg_Recency       |min_Recency|max_Recency|
+--------------+------------------+-----------+-----------+
|0             |293.8192          |225        |373        |
|1             |31.559351276742582|0          |92         |
|2             |0.0               |0          |0          |
|3             |152.46068796068795|92         |222        |
+--------------+------------------+-----------+-----------+

Cluster-wise Frequency Summary Stats:
+--------------+------------------+-------------+-------------+
|K-MeansCluster|avg_Frequency     |min_Frequency|max_Frequency|
+--------------+------------------+-------------+-------------+
|0             |1.3552            |1            |34 

In [8]:
from pyspark.sql.functions import avg, round as rnd, col

# Step 1: Compute mean R, F, M for each cluster
cluster_stats = rfm_kmeans.groupBy("K-MeansCluster") \
    .agg(
        rnd(avg("Recency"), 2).alias("Avg_Recency"),
        rnd(avg("Frequency"), 2).alias("Avg_Frequency"),
        rnd(avg("Monetary"), 2).alias("Avg_Monetary")
    ).orderBy("K-MeansCluster")

print("Cluster-wise Average RFM Values:")
cluster_stats.show(truncate=False)

# Step 2: Convert to Pandas for easier logic building (optional)
stats_pd = cluster_stats.toPandas()

# Step 3: Manually define cluster labels based on thresholds
def label_cluster(row):
    
    # Loyal Customers
    if row['Avg_Recency'] <= recency_q25 and row['Avg_Frequency'] >= frequency_q75 and row['Avg_Monetary'] >= monetary_q75:
        return "Loyal High-Value Customers"
    elif row['Avg_Recency'] <= recency_q25 and (row['Avg_Frequency'] >= frequency_q50 or row['Avg_Monetary'] >= monetary_q50):
        return "Loyal Mid-Value Customers"

    # Regular Customers
    elif row['Avg_Recency'] <= recency_q50 and row['Avg_Frequency'] >= frequency_q50 and row['Avg_Monetary'] >= monetary_q50:
        return "Engaged Regular Customers"
    elif row['Avg_Recency'] <= recency_q50 and (row['Avg_Frequency'] >= frequency_q25 or row['Avg_Monetary'] >= monetary_q25):
        return "Passive Regular Customers"

    # At-Risk Customers
    elif row['Avg_Recency'] >= recency_q75 and row['Avg_Frequency'] <= frequency_q25 and row['Avg_Monetary'] <= monetary_q25:
        return "Churned Low-Value Customers"
    elif row['Avg_Recency'] >= recency_q75 and (row['Avg_Frequency'] <= frequency_q50 and row['Avg_Monetary'] <= monetary_q50):
        return "Churned Mid-Value Customers"

    # VIP or New Customers 
    elif row['Avg_Recency'] >= recency_q50 and row['Avg_Frequency'] >= frequency_q50 and row['Avg_Monetary'] >= 164339.33:
            return "VIP Customers"
    else:
        return "New Customers"

# Apply labels
stats_pd['Cluster_Label'] = stats_pd.apply(label_cluster, axis=1)

print("\nCluster Interpretation:")
print(stats_pd[['K-MeansCluster', 'Avg_Recency', 'Avg_Frequency', 'Avg_Monetary', 'Cluster_Label']])


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Cluster-wise Average RFM Values:
+--------------+-----------+-------------+------------+
|K-MeansCluster|Avg_Recency|Avg_Frequency|Avg_Monetary|
+--------------+-----------+-------------+------------+
|0             |293.82     |1.36         |609.12      |
|1             |31.56      |5.48         |2679.6      |
|2             |0.0        |2189.0       |1772220.83  |
|3             |152.46     |2.22         |798.11      |
+--------------+-----------+-------------+------------+


Cluster Interpretation:
   K-MeansCluster  Avg_Recency  ...  Avg_Monetary                Cluster_Label
0               0       293.82  ...        609.12  Churned Mid-Value Customers
1               1        31.56  ...       2679.60    Engaged Regular Customers
2               2         0.00  ...    1772220.83   Loyal High-Value Customers
3               3       152.46  ...        798.11                New Customers

[4 rows x 5 columns]

In [9]:
# 2. GAUSSIAN MIXTURE MODEL 

from pyspark.ml.clustering import GaussianMixture

print("Gaussian Mixture Model with K=4:")
gmm = GaussianMixture(k=4, seed=42, featuresCol="scaled_features", predictionCol="prediction")
model = gmm.fit(scaled_data)
predictions = model.transform(scaled_data)

# Add cluster labels to RFM data
rfm_gmm = predictions.select("Recency", "Frequency", "Monetary", "prediction") \
                     .withColumnRenamed("prediction", "GMMCluster")

rfm_gmm.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Gaussian Mixture Model with K=4:
+-------+---------+--------+----------+
|Recency|Frequency|Monetary|GMMCluster|
+-------+---------+--------+----------+
|     23|        9| 6620.48|         3|
|     16|       12| 5162.91|         3|
|     89|        4|  1649.5|         2|
|     65|        1|  140.34|         2|
|      0|        5| 1857.46|         3|
|    243|        1|  110.97|         2|
|     37|        3|   549.2|         3|
|     64|        1|  295.93|         2|
|     29|       11| 5865.57|         3|
|     15|        5| 1771.78|         3|
|      3|       15| 2717.71|         3|
|     18|        8| 2344.22|         3|
|     87|        1|    96.6|         2|
|     39|        8|  7330.8|         3|
|     96|        4| 1245.11|         2|
|     95|        6| 3100.09|         2|
|      3|        6| 2362.26|         3|
|     86|        1|  351.65|         2|
|    243|        1|  476.49|         2|
|      8|       16| 9291.22|         3|
+-------+---------+--------+----------+
only sh

In [10]:
from pyspark.sql.functions import col
from pyspark.ml.evaluation import ClusteringEvaluator

# 1. Count of samples in each cluster
print("Cluster Counts:")
rfm_gmm.groupBy("GMMCluster").count().orderBy("GMMCluster").show()

# 2. Cluster-wise RFM statistics (mean, min, max)
print("Cluster-wise Recency Summary Stats:")
rfm_gmm.groupBy("GMMCluster") \
    .agg(
        avg("Recency").alias("avg_Recency"),
        min("Recency").alias("min_Recency"),
        max("Recency").alias("max_Recency")
    ) \
    .orderBy("GMMCluster") \
    .show(truncate=False)

# Frequency summary
print("Cluster-wise Frequency Summary Stats:")
rfm_gmm.groupBy("GMMCluster") \
    .agg(
        avg("Frequency").alias("avg_Frequency"),
        min("Frequency").alias("min_Frequency"),
        max("Frequency").alias("max_Frequency")
    ) \
    .orderBy("GMMCluster") \
    .show(truncate=False)

# Monetary summary
print("Cluster-wise Monetary Summary Stats:")
rfm_gmm.groupBy("GMMCluster") \
    .agg(
        avg("Monetary").alias("avg_Monetary"),
        min("Monetary").alias("min_Monetary"),
        max("Monetary").alias("max_Monetary")
    ) \
    .orderBy("GMMCluster") \
    .show(truncate=False)

# 3. Print GMM component means and covariances
print("Cluster Means (in scaled feature space):")
for idx, mean in enumerate(model.gaussiansDF.select("mean").collect()):
    print(f"Cluster {idx} Mean: {mean['mean']}")

print("\nCluster Covariance Matrices:")
for idx, cov in enumerate(model.gaussiansDF.select("cov").collect()):
    print(f"Cluster {idx} Covariance:\n{cov['cov']}")

# 4. Evaluate GMM model
log_likelihood = model.summary.logLikelihood
print(f"\nGMM Log Likelihood: {log_likelihood:.4f}")

# 5. Optional: Compute silhouette score using hard cluster labels
evaluator = ClusteringEvaluator(featuresCol="scaled_features", predictionCol="prediction",
                                metricName="silhouette", distanceMeasure="squaredEuclidean")
silhouette = evaluator.evaluate(predictions)
print(f"GMM Silhouette Score (approx): {silhouette:.4f}")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Cluster Counts:
+----------+-----+
|GMMCluster|count|
+----------+-----+
|         0|  117|
|         1|   20|
|         2| 2210|
|         3| 1991|
+----------+-----+

Cluster-wise Recency Summary Stats:
+----------+------------------+-----------+-----------+
|GMMCluster|avg_Recency       |min_Recency|max_Recency|
+----------+------------------+-----------+-----------+
|0         |52.794871794871796|0          |217        |
|1         |89.7              |0          |372        |
|2         |158.81764705882352|0          |373        |
|3         |20.21145153189352 |0          |78         |
+----------+------------------+-----------+-----------+

Cluster-wise Frequency Summary Stats:
+----------+------------------+-------------+-------------+
|GMMCluster|avg_Frequency     |min_Frequency|max_Frequency|
+----------+------------------+-------------+-------------+
|0         |21.47008547008547 |1            |201          |
|1         |155.05            |1            |2189         |
|2      

In [11]:
from pyspark.sql.functions import avg, round as rnd, col

# Step 1: Compute mean R, F, M for each cluster
cluster_stats = rfm_gmm.groupBy("GMMCluster") \
    .agg(
        rnd(avg("Recency"), 2).alias("Avg_Recency"),
        rnd(avg("Frequency"), 2).alias("Avg_Frequency"),
        rnd(avg("Monetary"), 2).alias("Avg_Monetary")
    ).orderBy("GMMCluster")

print("Cluster-wise Average RFM Values:")
cluster_stats.show(truncate=False)

# Step 2: Convert to Pandas for easier logic building (optional)
stats_pd = cluster_stats.toPandas()

# Step 3: Manually define cluster labels based on thresholds
def label_cluster(row):
    # Loyal Customers
    if row['Avg_Recency'] <= recency_q25 and row['Avg_Frequency'] >= frequency_q75 and row['Avg_Monetary'] >= monetary_q75:
        return "Loyal High-Value Customers"
    elif row['Avg_Recency'] <= recency_q25 and (row['Avg_Frequency'] >= frequency_q50 or row['Avg_Monetary'] >= monetary_q50):
        return "Loyal Mid-Value Customers"

    # Regular Customers
    elif row['Avg_Recency'] <= recency_q50 and row['Avg_Frequency'] >= frequency_q50 and row['Avg_Monetary'] >= monetary_q50:
        return "Engaged Regular Customers"
    elif row['Avg_Recency'] <= recency_q50 and (row['Avg_Frequency'] >= frequency_q25 or row['Avg_Monetary'] >= monetary_q25):
        return "Passive Regular Customers"

    # At-Risk Customers
    elif row['Avg_Recency'] >= recency_q75 and row['Avg_Frequency'] <= frequency_q25 and row['Avg_Monetary'] <= monetary_q25:
        return "Churned Low-Value Customers"
    elif row['Avg_Recency'] >= recency_q75 and (row['Avg_Frequency'] <= frequency_q50 and row['Avg_Monetary'] <= monetary_q50):
        return "Churned Mid-Value Customers"

    # VIP or New Customers 
    elif row['Avg_Recency'] >= recency_q50 and row['Avg_Frequency'] >= frequency_q50 and row['Avg_Monetary'] >= 164339.33:
            return "VIP Customers"
    else:
        return "New Customers"

# Apply labels
stats_pd['Cluster_Label'] = stats_pd.apply(label_cluster, axis=1)

print("\nCluster Interpretation:")
print(stats_pd[['GMMCluster', 'Avg_Recency', 'Avg_Frequency', 'Avg_Monetary', 'Cluster_Label']])


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Cluster-wise Average RFM Values:
+----------+-----------+-------------+------------+
|GMMCluster|Avg_Recency|Avg_Frequency|Avg_Monetary|
+----------+-----------+-------------+------------+
|0         |52.79      |21.47        |13275.45    |
|1         |89.7       |155.05       |164339.33   |
|2         |158.82     |1.87         |652.15      |
|3         |20.21      |5.51         |2153.09     |
+----------+-----------+-------------+------------+


Cluster Interpretation:
   GMMCluster  Avg_Recency  ...  Avg_Monetary                Cluster_Label
0           0        52.79  ...      13275.45                New Customers
1           1        89.70  ...     164339.33                VIP Customers
2           2       158.82  ...        652.15  Churned Mid-Value Customers
3           3        20.21  ...       2153.09    Engaged Regular Customers

[4 rows x 5 columns]

In [12]:
# Join the DataFrames on Recency, Frequency, Monetary
rfm_combined = rfm_kmeans \
    .join(rfm_gmm, on=["Recency", "Frequency", "Monetary"])

# Show the final DataFrame
rfm_combined.show()


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+---------+--------+--------------+----------+
|Recency|Frequency|Monetary|K-MeansCluster|GMMCluster|
+-------+---------+--------+--------------+----------+
|     23|        9| 6620.48|             1|         3|
|     16|       12| 5162.91|             1|         3|
|     89|        4|  1649.5|             1|         2|
|     65|        1|  140.34|             1|         2|
|      0|        5| 1857.46|             1|         3|
|    243|        1|  110.97|             0|         2|
|     37|        3|   549.2|             1|         3|
|     64|        1|  295.93|             1|         2|
|     29|       11| 5865.57|             1|         3|
|     15|        5| 1771.78|             1|         3|
|      3|       15| 2717.71|             1|         3|
|     18|        8| 2344.22|             1|         3|
|     87|        1|    96.6|             1|         2|
|     39|        8|  7330.8|             1|         3|
|     96|        4| 1245.11|             3|         2|
|     95| 

In [13]:
# Save the final DataFrame as a CSV file to the specified S3 path
rfm_combined.write \
    .option("header", "true") \
    .csv("s3://projectcustomer/output/rfm_combined.csv")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…