In [None]:
from pyspark.sql import *
import numpy as np
from tqdm.notebook import tqdm
import time
import matplotlib.pyplot as plt
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

In [2]:
from pyspark import SparkContext, SparkConf
import numpy as np

conf = SparkConf().setAppName("SilhouetteScore").setMaster("spark://localhost:7077")  # Sử dụng 2 workers
sc = SparkContext(conf=conf)

In [3]:
DATA_PATH = "hdfs://localhost:9000/user/nwiieth/customer_segmentation/data/customer_data.csv"
MIN_K = 2
MAX_K = 8

## Generate features values for RDD

In [4]:
def generate_features(values):
    res = []
    for v in values:
        res.append(float(v))
    return np.array(res)

## Read data

In [5]:
def read_data(path):
    rdd = sc.textFile(path)
    header = rdd.first()
    rdd_split_by_comma = rdd.filter(lambda x: x != header).map(lambda x: x.split(","))
    rdd_id_features = rdd_split_by_comma.map(lambda x: generate_features(x[1:]))
    return rdd_id_features

## Compute euclidean distance from a point to all centroids

In [6]:
def euclidean_distance(point1, point2):
    return np.linalg.norm(point1 - point2)

In [7]:
def get_distance_to_centroids(point, centroids_list):
    res = []
    for c in centroids_list.value:
        distance = euclidean_distance(point, c)
        res.append(distance)
    return res

## Check whether kmeans has converged

Converged conditions: The distance between new and current centroids is equal or lower thanh a threshold.

In [8]:
def has_converged(centroids, new_centroids, threshold):
    if len(centroids) != len(new_centroids):
        return False

    for c, nc in zip(centroids, new_centroids):
        distance = euclidean_distance(c, nc)
        if distance > threshold:
            return False
    return True

## Kmeans Clustering function

In [9]:
def kmeans_clustering(rdd, k, max_iterations=100, threshold=0.01):


    # Khởi tạo các điểm trung tâm ngẫu nhiên, các điểm được lấy ngẫu nhiên từ các worker khác nhau, kết quả được tập hợp và trả về driver
    centroids = rdd.takeSample(withReplacement=False, num=k)

    # Khởi tạo RDD lưu trữ việc gán nhãn các điểm dữ liệu vào các cụm
    assignments = None

    # Lặp qua các lần lặp
    for it in tqdm(range(max_iterations), desc=f"Clustering with k={k}..."):
        # Các điểm trung tâm được broadcast đến tất cả worker để có thể đọc được
        broadcast_centroids = sc.broadcast(centroids)

        # Tính khoảng cách từ mỗi điểm dữ liệu đến các điểm trung tâm
        # Mỗi dòng dữ liệu được thực hiện map tại worker chứa nó
        distances = rdd.map(lambda point: (point, get_distance_to_centroids(point, broadcast_centroids)))

        # Gán nhãn các điểm dữ liệu vào cụm dựa trên điểm trung tâm gần nhất, thực hiện tại worker chứa dòng dữ liệu
        assignments = distances.map(lambda x: (x[0], np.argmin(x[1])))

        # Tính toán vị trí mới của các điểm trung tâm
        # remapping trả về từng cặp (cluster_id, (mảng chứa tổng từng phần tử của các điểm dữ liệu thuộc cụm, số lượng phần tử thuộc cụm))
        # remapping thực hiện map trên từng worker và reduceByKey để gom dữ liệu từ tất cả các worker về 1 nơi
        remapping = assignments.map(lambda x: (x[1], (x[0], 1))).reduceByKey(
            lambda x, y: (x[0] + y[0], x[1] + y[1])
        )
        # Chia mỗi phần tử trong mảng cho số lượng phần tử thuộc từng cụm để lấy mảng chứa trung bình từng phần tử của các điểm trong cụm
        # Thực hiện với dữ liệu của tất cả các điểm tại 1 nơi duy nhất
        new_centroids = remapping.map(lambda x: x[1][0] / x[1][1]).collect()

        # Kiểm tra xem thuật toán đã hội tụ hay chưa
        if has_converged(centroids, new_centroids, threshold):
            print(f"Clustering has converged after {it + 1} iterations.")
            break

        # Cập nhật các điểm trung tâm
        centroids = new_centroids

    # Trả về vị trí của các điểm trung tâm và việc gán nhãn các điểm dữ liệu vào các cụm
    return centroids, assignments


## Compute Silhouette Score

In [10]:
def evaluate_squared_euclidean_silhouette(assignments_rdd):
    # Tính toán CSI (Centroid Squared Inertia) cho mỗi điểm
    rdd_with_csi = assignments_rdd.map(lambda assignment: (assignment[0], assignment[1], sum([x**2 for x in assignment[0]])))

    # Tính toán các giá trị tổng hợp cho từng cụm
    clusters_aggregate_values = rdd_with_csi.groupBy(lambda x: x[1]).mapValues(
        lambda values: (
            sum(value[0] for value in values),  # Tổng vector của các điểm trong cụm
            sum(value[2] for value in values),  # Tổng csi của các điểm trong cụm
            len(values)                         # Số lượng điểm trong cụm
        )
    )
    # Chuyển đổi kết quả thành từ điển để truyền tới các worker nodes
    clusters_map = clusters_aggregate_values.collectAsMap()
    broadcasted_clusters_map = sc.broadcast(clusters_map)
    def compute_silhouette(vector, cluster_id, csi):
        # Lấy thông tin của các cụm
        cluster_stats = broadcasted_clusters_map.value

        def compute_csi_diff(csi, point, cluster_stats):
            # Tính toán sự khác biệt CSI giữa điểm và cụm
            y_multiply_point = sum(cluster_stats[0] * point)
            return csi + cluster_stats[1] / cluster_stats[2] - 2 * y_multiply_point / cluster_stats[2]

        # Tính toán b (khoảng cách trung bình đến cụm gần nhất khác)
        min_other = float('inf')
        for c in cluster_stats:
            if c != cluster_id:
                sil = compute_csi_diff(csi, vector, cluster_stats[c])
                if sil < min_other:
                    min_other = sil
        # Tính toán a (khoảng cách trung bình đến các điểm khác trong cùng cụm)
        cluster_current_point = cluster_stats[cluster_id]
        cluster_sil = compute_csi_diff(csi, vector, cluster_current_point) * cluster_current_point[2] / (cluster_current_point[2] - 1)
        # Tính toán hệ số silhouette
        silhouette_coeff = 0.0
        if cluster_sil < min_other:
            silhouette_coeff = 1 - (cluster_sil / min_other)
        elif cluster_sil > min_other:
            silhouette_coeff = (min_other / cluster_sil) - 1
        return silhouette_coeff

    # Tính hệ số silhouette cho mỗi điểm
    rdd_with_silhouette = rdd_with_csi.map(lambda row: (row[0], row[1], row[2], compute_silhouette(row[0], row[1], row[2])))

    # Tính giá trị trung bình của hệ số silhouette
    return rdd_with_silhouette.map(lambda row: row[3]).mean()

In [11]:
rdd_data = read_data(DATA_PATH)

In [12]:
k=2
centroids, assignments = kmeans_clustering(rdd_data, k)

Clustering with k=2...:   0%|          | 0/100 [00:00<?, ?it/s]

Clustering has converged after 3 iterations.


In [13]:
evaluate_squared_euclidean_silhouette(assignments)

0.7066330036335957

## Find best k with all features

In [16]:
prev_score = None
count = 0
scores_on_k = []
for k in range (MIN_K, MAX_K):
    centroids, assignments = kmeans_clustering(rdd_data, k)

    t_start = time.time()
    score = evaluate_squared_euclidean_silhouette(assignments)
    t_end = time.time()

    print(f"silhouette score = {score} - time = {t_end - t_start}")
    print("=================================")
    scores_on_k.append(score)
    if prev_score is None:
        prev_score = score
    elif score <= prev_score and prev_score - score > 0.05:
        count += 1
    if count >= 2:
        break

Clustering with k=2...:   0%|          | 0/100 [00:00<?, ?it/s]

Clustering has converged after 6 iterations.
silhouette score = 0.7066330036335957 - time = 2.1034257411956787


Clustering with k=3...:   0%|          | 0/100 [00:00<?, ?it/s]

silhouette score = 0.5189459492276806 - time = 2.094111204147339


Clustering with k=4...:   0%|          | 0/100 [00:00<?, ?it/s]

silhouette score = 0.4202850440436096 - time = 2.7958850860595703


## Find best features on best k

In [None]:
features_to_drop = [
    [1], # drop work_experience   CASE 0
    [1, 2], # drop work_experience, family_size   CASE 1
    [1, 2, 4], # drop work_experience, family_size, ever_married    CASE 2
    [1, 2, 4, 6] # drop work_experience, family_size, ever_married, profession  CASE 3
]
best_score = max(scores_on_k)

best_case = 0
idx = 0

for case in tqdm(features_to_drop):
    print(f"Computing on case {idx} ...")
    rdd_data_drop = rdd_data.map(lambda x: np.array([x[i] for i in range(len(x)) if i not in case]))
    centroids, assignments = kmeans_clustering(rdd_data_drop, k=2)

    t_start = time.time()
    score = evaluate_squared_euclidean_silhouette(assignments)
    t_end = time.time()

    if score > best_score:
        best_case = idx
        best_score = score

    print(f"Case {idx}: silhouette score = {score} - time = {t_end - t_start}")
    print("****************************************************************")
    print()
    idx += 1

print("======================================================================")
print("======================================================================")
print(f"Best Silhouette Score is {best_score} for case {best_case}")

  0%|          | 0/4 [00:00<?, ?it/s]

Computing on case 0 ...


Clustering with k=2...:   0%|          | 0/100 [00:00<?, ?it/s]

Clustering has converged after 4 iterations.
Case 0: silhouette score = 0.7387134289382318 - time = 1.9734184741973877
****************************************************************

Computing on case 1 ...


Clustering with k=2...:   0%|          | 0/100 [00:00<?, ?it/s]

Clustering has converged after 4 iterations.
Case 1: silhouette score = 0.7766886438392535 - time = 1.9975709915161133
****************************************************************

Computing on case 2 ...


Clustering with k=2...:   0%|          | 0/100 [00:00<?, ?it/s]

Clustering has converged after 3 iterations.
Case 2: silhouette score = 0.7884810063536986 - time = 2.0746729373931885
****************************************************************

Computing on case 3 ...


Clustering with k=2...:   0%|          | 0/100 [00:00<?, ?it/s]

Clustering has converged after 10 iterations.
Case 3: silhouette score = 0.517118688885958 - time = 1.9580156803131104
****************************************************************

Best Silhouette Score is 0.7884810063536986 for case 2


In [None]:
best_score, features_to_drop[best_case]

(0.7884810063536986, [1, 2, 4])

In [None]:
rdd_data_drop = rdd_data.map(lambda x: np.array([x[i] for i in range(len(x)) if i not in features_to_drop[best_case]]))
best_centroids, best_assignments = kmeans_clustering(rdd_data_drop, k=2)

Clustering with k=2...:   0%|          | 0/100 [00:00<?, ?it/s]

Clustering has converged after 3 iterations.


In [None]:
best_assignments.coalesce(1).saveAsTextFile("./segmentation_result_final")

## Best case

K = 2

Features = ["Age", "Gender", "Graduated", "Profession", "Spending_Score"]

Best score = 0.7884810063536986

## Compare to pyspark ml

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType

# Tạo SparkSession
spark = SparkSession.builder.appName("SilhouetteScore").getOrCreate()

# Chuyển đổi assignments_rdd thành DataFrame với kiểu dữ liệu đúng
rdd_with_index = best_assignments.map(lambda x: Row(features=Vectors.dense(x[0]), prediction=int(x[1])))

# Định nghĩa schema rõ ràng cho DataFrame
schema = StructType([
    StructField("features", VectorUDT(), False),
    StructField("prediction", IntegerType(), False)
])

df = spark.createDataFrame(rdd_with_index, schema)

# Tạo ClusteringEvaluator
evaluator = ClusteringEvaluator(featuresCol='features', predictionCol='prediction', metricName='silhouette', distanceMeasure='squaredEuclidean')

# Tính toán chỉ số Silhouette
silhouette_score = evaluator.evaluate(df)

print(f"Silhouette Score: {silhouette_score}")


Silhouette Score: 0.7884810063537133


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.clustering import KMeans
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField

# Tạo SparkSession
spark = SparkSession.builder.appName("SilhouetteScore").getOrCreate()

# Giả sử best_assignments đã được định nghĩa
# best_assignments = ...

# Chuyển đổi assignments_rdd thành DataFrame với kiểu dữ liệu đúng
rdd_with_index = best_assignments.map(lambda x: Row(features=Vectors.dense(x[0])))

# Định nghĩa schema rõ ràng cho DataFrame
schema = StructType([
    StructField("features", VectorUDT(), False)
])

df = spark.createDataFrame(rdd_with_index, schema)

In [None]:
kmeans = KMeans(k=2, maxIter= 100,tol= 0.01, seed=1, featuresCol='features', predictionCol='prediction')
model = kmeans.fit(df)

# Dự đoán kết quả phân cụm
predictions = model.transform(df)

# Tạo ClusteringEvaluator
evaluator = ClusteringEvaluator(featuresCol='features', predictionCol='prediction', metricName='silhouette', distanceMeasure='squaredEuclidean')

# Tính toán chỉ số Silhouette
silhouette_score = evaluator.evaluate(predictions)

print(f"Silhouette Score: {silhouette_score}")

Silhouette Score: 0.7884810063537133


In [None]:
spark.stop()

## end