In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark.ml.clustering import KMeans, BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.sql.functions import col, isnan, when, count, mean, min, max, stddev, percentile_approx
from pyspark.ml.stat import Correlation
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import seaborn as sns
from pyspark.sql.types import DoubleType
import warnings
warnings.filterwarnings("ignore")

# Build (or reuse) the Spark session for this notebook.
# NOTE(review): getOrCreate() hands back an already-running session when one
# exists; the recorded output of this cell warns that in that case only
# runtime SQL configurations take effect.
spark = (
    SparkSession.builder
    .appName("MultiBehaviorCustomerSegmentation")
    # Memory sizing for the driver and the executors.
    .config("spark.driver.memory", "30g")
    .config("spark.executor.memory", "30g")
    .config("spark.executor.memoryOverhead", "18g")
    .config("spark.driver.maxResultSize", "18g")
    # Executor shape and dynamic allocation bounds.
    .config("spark.executor.cores", "4")
    .config("spark.executor.instances", "4")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "10")
    .config("spark.dynamicAllocation.initialExecutors", "4")
    # Partition counts for RDD operations and SQL shuffles.
    .config("spark.default.parallelism", "200")
    .config("spark.sql.shuffle.partitions", "200")
    .enableHiveSupport()
    .getOrCreate()
)


25/03/24 11:02:55 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:

# 2. Load and Clean Data
# ---------------------
# Read the CSV using its first row as the header and letting Spark infer the
# column types, then fill missing values with 0.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("ctzn_2_years_data.csv")
    .fillna(0)
)

# ---------------------
# 3. Define Cluster Types
# ---------------------
# One entry per segmentation: the feature columns to cluster on and the
# number of clusters (k) to fit for that segmentation.
cluster_configs = dict(
    spending=dict(
        features=[
            "purchases",
            "oneoff_purchases",
            "installments_purchases",
            "purchases_frequency",
            "oneoff_purchases_frequency",
            "purchases_installments_frequency",
        ],
        k=5,
    ),
    cash=dict(
        features=["cash_advance", "cash_advance_frequency", "cash_advance_trx"],
        k=4,
    ),
    payment=dict(
        features=["payments", "minimum_payments", "prc_full_payment", "tenure"],
        k=4,
    ),
    credit=dict(
        features=["balance", "credit_limit"],
        k=4,
    ),
    health=dict(
        features=["balance", "purchases", "payments", "cash_advance", "credit_limit"],
        k=4,
    ),
)

                                                                                

In [5]:
# Add derived feature for credit utilization: balance / credit_limit, with 0
# used when credit_limit is 0 so the division is never evaluated on a zero
# denominator.
df = df.withColumn(
    "balance_credit_ratio",
    when(col("credit_limit") != 0, col("balance") / col("credit_limit")).otherwise(0),
)

# Register the feature only once. An unconditional append would add a
# duplicate column name every time this cell is re-run, and the duplicate
# would then be passed on to the clustering feature list.
if "balance_credit_ratio" not in cluster_configs["credit"]["features"]:
    cluster_configs["credit"]["features"].append("balance_credit_ratio")

In [9]:
import builtins


# 4. Function: Run Clustering
# ---------------------
def run_clustering(df, id_col, cluster_name, feature_cols, k, use_pca=False):
    """Standardize the given features and assign each row to a KMeans cluster.

    Args:
        df: Spark DataFrame containing ``id_col`` and every column in
            ``feature_cols``.
        id_col: Name of the identifier column carried through to the result.
        cluster_name: Prefix for the output column, ``"<cluster_name>_cluster"``.
        feature_cols: Names of the columns to cluster on. Repeated names are
            ignored after their first occurrence.
        k: Number of clusters to fit.
        use_pca: When False (default) KMeans runs on the standardized
            features and no PCA is fitted. When True the standardized
            features are first projected onto at most two principal
            components and KMeans runs on that projection.

    Returns:
        Spark DataFrame with two columns: ``id_col`` and
        ``"<cluster_name>_cluster"`` holding the predicted cluster index.

    Raises:
        ValueError: If ``feature_cols`` is empty.
    """
    # Keep the first occurrence of each name, preserving order, so a feature
    # listed twice is not selected and assembled twice.
    feature_cols = list(dict.fromkeys(feature_cols))
    if not feature_cols:
        raise ValueError(f"No feature columns supplied for cluster '{cluster_name}'")

    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    assembled = assembler.transform(df.select(id_col, *feature_cols))

    scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
    scaled = scaler.fit(assembled).transform(assembled)

    if use_pca:
        # builtins.min is required here: this notebook imports `min` from
        # pyspark.sql.functions, which shadows the Python builtin.
        n_components = builtins.min(2, len(feature_cols))
        pca = PCA(k=n_components, inputCol="scaled_features", outputCol="pca_features")
        model_input = pca.fit(scaled).transform(scaled)
        model_features = "pca_features"
    else:
        # PCA output would not be consumed in this mode, so it is not fitted.
        model_input = scaled
        model_features = "scaled_features"

    kmeans = KMeans(k=k, seed=42, featuresCol=model_features)
    model = kmeans.fit(model_input)
    clustered = model.transform(model_input)

    # Extract only required columns
    return clustered.select(id_col, col("prediction").alias(f"{cluster_name}_cluster"))

In [10]:
# ---------------------
# 5. Run All Clusterings
# ---------------------
# Fit one clustering per entry of cluster_configs; results are stored in the
# same order as the configuration entries.
cluster_results = []
for cluster_name in cluster_configs:
    config = cluster_configs[cluster_name]
    print(f"Clustering: {cluster_name}")
    result = run_clustering(
        df=df,
        id_col="cif_id",
        cluster_name=cluster_name,
        feature_cols=config["features"],
        k=config["k"],
    )
    cluster_results.append(result)


Clustering: spending


                                                                                

Clustering: cash


                                                                                

Clustering: payment


                                                                                

Clustering: credit


                                                                                

Clustering: health


                                                                                

In [11]:
# 6. Join All Results on cif_id
# ---------------------
# Start from the first result and inner-join every following one onto it, so
# the final frame has one cluster column per segmentation.
final_clusters = cluster_results[0]
for position in range(1, len(cluster_results)):
    final_clusters = final_clusters.join(
        cluster_results[position],
        on="cif_id",
        how="inner",
    )

# ---------------------
# 7. Convert to Pandas and Save
# ---------------------
# The joined result is collected to the driver as a pandas DataFrame and
# written out as a single CSV file without the index column.
final_clusters_pd = final_clusters.toPandas()
final_clusters_pd.to_csv("multi_behavior_customer_segments.csv", index=False)

print("Saved: multi_behavior_customer_segments.csv")

                                                                                

✅ Saved: multi_behavior_customer_segments.csv


In [14]:
def interpret_clusters(df, id_col, cluster_col, feature_cols, cluster_name):
    """Summarize how each cluster differs from the overall average.

    Args:
        df: Spark DataFrame containing ``cluster_col`` and every column in
            ``feature_cols``.
        id_col: Not used by this function; kept so existing callers that pass
            it keep working.
        cluster_col: Name of the column holding the cluster assignment.
        feature_cols: Names of the feature columns to profile. Repeated names
            are ignored after their first occurrence.
        cluster_name: Label used as the prefix of each description.

    Returns:
        - cluster_profiles: pandas DataFrame of per-cluster mean values of
          the features, indexed by cluster id.
        - relative_importance: pandas DataFrame of the same shape holding
          ``(cluster mean - overall mean) / |overall mean|``. For a feature
          whose overall mean is 0 the plain difference is stored instead.
        - descriptions: dict mapping cluster id to a text label built from
          the three features with the largest absolute relative difference.
    """
    # Keep the first occurrence of each name so no column is selected twice.
    feature_cols = list(dict.fromkeys(feature_cols))

    # Prepare: cast every feature to double in a single projection.
    cluster_df = df.select(
        cluster_col,
        *[col(c).cast("double").alias(c) for c in feature_cols],
    )

    # All selected rows are collected to the driver here.
    pandas_df = cluster_df.toPandas()

    # Mean feature values per cluster
    cluster_profiles = pandas_df.groupby(cluster_col)[feature_cols].mean()
    overall_means = pandas_df[feature_cols].mean()

    # Relative importance
    relative_importance = cluster_profiles.copy()
    for col_name in feature_cols:
        overall = overall_means[col_name]
        difference = cluster_profiles[col_name] - overall
        if overall != 0:
            # Divide by the magnitude of the mean: dividing by a negative
            # mean would flip the sign and report "higher" as "lower".
            relative_importance[col_name] = difference / abs(overall)
        else:
            # No meaningful ratio against a zero mean; keep the raw difference.
            relative_importance[col_name] = difference

    # Build descriptions
    descriptions = {}
    for cluster_id in cluster_profiles.index:
        desc = []
        top_features = relative_importance.loc[cluster_id].abs().sort_values(ascending=False).head(3).index
        for feat in top_features:
            val = relative_importance.loc[cluster_id, feat]
            direction = "higher" if val > 0 else "lower"
            desc.append(f"{feat} is {abs(val)*100:.1f}% {direction} than average")
        descriptions[cluster_id] = f"{cluster_name.capitalize()} Cluster {cluster_id}: " + "; ".join(desc)

    return cluster_profiles, relative_importance, descriptions


In [15]:
# Example for spending
# Attach the spending cluster labels (first entry of cluster_results) to the
# customer features, then profile each spending cluster.
spending_cluster_df = df.join(cluster_results[0], on="cif_id")
spending_profiles, spending_rel, spending_desc = interpret_clusters(
    spending_cluster_df,
    "cif_id",
    "spending_cluster",
    cluster_configs["spending"]["features"],
    "spending",
)

# Print results
for desc in spending_desc.values():
    print(desc)


                                                                                

Spending Cluster 0: oneoff_purchases_frequency is 100.0% lower than average; installments_purchases is 62.9% lower than average; purchases is 59.8% lower than average
Spending Cluster 1: installments_purchases is 760642.6% higher than average; purchases is 694571.1% higher than average; oneoff_purchases is 481756.0% higher than average
Spending Cluster 2: installments_purchases is 180925.0% higher than average; purchases is 176204.6% higher than average; oneoff_purchases is 161000.4% higher than average
Spending Cluster 3: oneoff_purchases_frequency is 4187.5% higher than average; installments_purchases is 72.7% lower than average; oneoff_purchases is 57.6% higher than average
Spending Cluster 4: purchases_frequency is 148516.7% higher than average; oneoff_purchases is 6397.8% higher than average; purchases is 3452.4% higher than average


In [16]:
# cluster_results was filled by iterating cluster_configs, so the two share
# the same order. Pairing them by that order avoids hard-coded indices that
# would silently mislabel results if the configuration changes.
if len(cluster_results) != len(cluster_configs):
    raise ValueError(
        f"Expected {len(cluster_configs)} clustering results, got {len(cluster_results)}"
    )
cluster_results_dict = dict(zip(cluster_configs, cluster_results))

for name, result_df in cluster_results_dict.items():
    joined = df.join(result_df, on="cif_id")
    profiles, rel, desc = interpret_clusters(
        joined,
        id_col="cif_id",
        cluster_col=f"{name}_cluster",
        feature_cols=cluster_configs[name]["features"],
        cluster_name=name
    )
    print(f"\n📊 {name.upper()} CLUSTER INTERPRETATIONS:")
    # Each description already starts with "<Name> Cluster <id>:", so no
    # additional cluster prefix is printed in front of it.
    for d in desc.values():
        print(f"- {d}")


                                                                                


📊 SPENDING CLUSTER INTERPRETATIONS:
- Cluster 0: Spending Cluster 0: oneoff_purchases_frequency is 100.0% lower than average; installments_purchases is 62.9% lower than average; purchases is 59.8% lower than average
- Cluster 1: Spending Cluster 1: installments_purchases is 760642.6% higher than average; purchases is 694571.1% higher than average; oneoff_purchases is 481756.0% higher than average
- Cluster 2: Spending Cluster 2: installments_purchases is 180925.0% higher than average; purchases is 176204.6% higher than average; oneoff_purchases is 161000.4% higher than average
- Cluster 3: Spending Cluster 3: oneoff_purchases_frequency is 4187.5% higher than average; installments_purchases is 72.7% lower than average; oneoff_purchases is 57.6% higher than average
- Cluster 4: Spending Cluster 4: purchases_frequency is 148516.7% higher than average; oneoff_purchases is 6397.8% higher than average; purchases is 3452.4% higher than average


                                                                                


📊 CASH CLUSTER INTERPRETATIONS:
- Cluster 0: Cash Cluster 0: cash_advance_frequency is 100.0% lower than average; cash_advance is 85.5% lower than average; cash_advance_trx is 47.6% lower than average
- Cluster 1: Cash Cluster 1: cash_advance_frequency is 32.6% higher than average; cash_advance is 31.7% lower than average; cash_advance_trx is 31.5% lower than average
- Cluster 2: Cash Cluster 2: cash_advance_trx is 2202.6% higher than average; cash_advance is 2037.4% higher than average; cash_advance_frequency is 8.7% higher than average
- Cluster 3: Cash Cluster 3: cash_advance is 5846995.0% higher than average; cash_advance_trx is 4663.6% higher than average; cash_advance_frequency is 32.6% higher than average


                                                                                


📊 PAYMENT CLUSTER INTERPRETATIONS:
- Cluster 0: Payment Cluster 0: prc_full_payment is 100.0% lower than average; minimum_payments is 80.2% lower than average; payments is 17.4% lower than average
- Cluster 1: Payment Cluster 1: minimum_payments is 380969.4% higher than average; payments is 97.7% lower than average; prc_full_payment is 1.7% lower than average
- Cluster 2: Payment Cluster 2: minimum_payments is 79.9% lower than average; payments is 77.6% lower than average; tenure is 72.9% higher than average
- Cluster 3: Payment Cluster 3: minimum_payments is 79.7% lower than average; prc_full_payment is 51.9% higher than average; payments is 47.0% higher than average


                                                                                


📊 CREDIT CLUSTER INTERPRETATIONS:
- Cluster 0: Credit Cluster 0: balance is 137.8% higher than average; balance_credit_ratio is 83.8% lower than average; credit_limit is 79.9% lower than average
- Cluster 1: Credit Cluster 1: balance_credit_ratio is 40753617.6% higher than average; balance is 7062.8% higher than average; credit_limit is 100.0% lower than average
- Cluster 2: Credit Cluster 2: credit_limit is 380969.4% higher than average; balance is 568.9% lower than average; balance_credit_ratio is 100.0% lower than average
- Cluster 3: Credit Cluster 3: balance is 16731792.4% lower than average; credit_limit is 100.0% lower than average; balance_credit_ratio is 100.0% lower than average


                                                                                


📊 HEALTH CLUSTER INTERPRETATIONS:
- Cluster 0: Health Cluster 0: balance is 137.7% higher than average; credit_limit is 79.9% lower than average; cash_advance is 12.0% lower than average
- Cluster 1: Health Cluster 1: balance is 16731792.4% lower than average; payments is 553822.9% higher than average; purchases is 499045.9% higher than average
- Cluster 2: Health Cluster 2: credit_limit is 380969.4% higher than average; balance is 568.9% lower than average; cash_advance is 100.0% lower than average
- Cluster 3: Health Cluster 3: cash_advance is 5846995.0% higher than average; payments is 124692.0% higher than average; purchases is 124472.8% higher than average
