In [3]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import pandas as pd
import os

SOURCE_CSV_FILE = os.path.abspath("./datasets/rangalamahesh_bank_churn.csv")
# Khởi tạo Spark session
spark = SparkSession.builder.appName("AnomalyDetection").getOrCreate()

# Đọc dữ liệu
df = spark.read.csv(SOURCE_CSV_FILE, header=True, inferSchema=True)

# Hiển thị schema của dữ liệu
print("Schema của dữ liệu:")
df.printSchema()

# Hiển thị một số dòng dữ liệu
print("Một số dòng dữ liệu ban đầu:")
df.show(5)

# Lựa chọn các cột đặc trưng
feature_cols = ["EstimatedSalary", "Balance"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)

# Hiển thị schema sau khi thêm cột đặc trưng (features)
print("Schema sau khi thêm cột đặc trưng (features):")
df.printSchema()

# Hiển thị một số dòng dữ liệu sau khi thêm cột features
print("Dữ liệu sau khi thêm cột đặc trưng (features):")
df.select("features").show(5, truncate=False)

# Huấn luyện mô hình K-means
kmeans = KMeans(featuresCol="features", k=2)
model = kmeans.fit(df)

# Hiển thị tọa độ của các centroid
print("Tọa độ của các centroid:")
for center in model.clusterCenters():
    print(center)

# Dự đoán cụm
predictions = model.transform(df)

# Hiển thị một số dự đoán
print("Một số dự đoán cụm:")
predictions.select("features", "prediction").show(5, truncate=False)

# Đánh giá mô hình
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette with squared euclidean distance: {silhouette}")

# Hiển thị số lượng mẫu trong mỗi cụm
cluster_sizes = predictions.groupBy("prediction").count().toPandas()
print("Số lượng mẫu trong mỗi cụm:")
print(cluster_sizes)

# Xác định cụm bất thường dựa trên số lượng mẫu trong mỗi cụm (cụm có số lượng mẫu nhỏ nhất)
anomalous_cluster = cluster_sizes.loc[cluster_sizes['count'].idxmin()]['prediction']
print(f"Cụm bất thường: {anomalous_cluster}")

# Hiển thị các khách hàng thuộc cụm bất thường
print(f"Khách hàng bất thường trong cụm {anomalous_cluster}:")
anomalies = predictions.filter(predictions.prediction == anomalous_cluster)
anomalies.select("CustomerId", "Surname", "CreditScore", "EstimatedSalary", "Balance").show(truncate=False)

# Dừng Spark session
spark.stop()

Schema của dữ liệu:
root
 |-- id: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- HasCrCard: double (nullable = true)
 |-- IsActiveMember: double (nullable = true)
 |-- EstimatedSalary: double (nullable = true)

Một số dòng dữ liệu ban đầu:
+------+----------+---------+-----------+---------+------+----+------+---------+-------------+---------+--------------+---------------+
|    id|CustomerId|  Surname|CreditScore|Geography|Gender| Age|Tenure|  Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|
+------+----------+---------+-----------+---------+------+----+------+---------+-------------+---------+--------------+---------------+
|165