In [None]:
# prompt: kết nối với drive

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from pyspark.sql import SparkSession, functions as F
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import pandas as pd
from pyspark.sql.functions import max, datediff, countDistinct, sum as Fsum, col
import pyspark.sql.functions as F
import numpy as np

# Tạo Spark session
spark = SparkSession.builder.appName("SklearnIsolationForest").getOrCreate()

# Ví dụ: DataFrame đã có RFM
df = spark.read.csv("/content/df_cleaned.csv", header=True, inferSchema=True)

# Ngày phân tích
snapshot_date = df.agg(max("InvoiceDate")).collect()[0][0]

rfm_df = df.groupBy("Customer ID").agg(
    datediff(
        F.lit(snapshot_date), max("InvoiceDate")
    ).alias("Recency"),
    countDistinct("Invoice").alias("Frequency"),
    Fsum(col("Quantity") * col("Price")).alias("Monetary")
)

rfm_df.head(5)


+-----------+--------------------+
|Customer ID|       anomaly_score|
+-----------+--------------------+
|      17389|-0.11653326934861896|
|      12471|-0.09875141847319668|
|      13468|-0.03153693353814502|
|      17677| -0.0829755567064091|
|      15633|-0.00331344735277...|
|      16839|-0.02669899626819605|
|      14160|-0.03548451206342...|
|      16684|-0.14752622798916393|
|      16553|-0.03874064105462...|
|      13081|-0.06758719533674828|
|      14096|-0.05190747208111435|
|      17448|-0.10995273509742787|
|      17811|-1.10587272937445...|
|      12709|-0.02900216441663428|
|      13694|-0.18475879894870806|
|      15856|-0.07609034705546469|
|      14606|-0.12345894764356402|
|      13098|-0.00209909688834...|
|      18102|-0.19085671853794295|
|      12980|-0.00901289222468...|
+-----------+--------------------+
only showing top 20 rows



In [None]:
# Bước 2: Chuyển Spark DataFrame → Pandas
rfm_pd = rfm_df.dropna().toPandas()

# Bước 3: Scale dữ liệu bằng StandardScaler
scaler = StandardScaler()
scaled_features = scaler.fit_transform(rfm_pd[["Recency", "Frequency", "Monetary"]])

# Bước 4: Chạy Isolation Forest
isoforest = IsolationForest(n_estimators=100, contamination=0.02, random_state=42)
rfm_pd["anomaly"] = isoforest.fit_predict(scaled_features)
rfm_pd["anomaly_score"] = isoforest.decision_function(scaled_features)


# Bước 5: Chuyển lại Spark DataFrame nếu cần
df_anomaly = spark.createDataFrame(rfm_pd)

# Xem top 10 khách hàng bất thường
df_anomaly.filter("anomaly == -1").select("Customer ID", "anomaly_score").show(10)

+-----------+--------------------+
|Customer ID|       anomaly_score|
+-----------+--------------------+
|      17389|-0.11653326934861896|
|      12471|-0.09875141847319668|
|      13468|-0.03153693353814502|
|      17677| -0.0829755567064091|
|      15633|-0.00331344735277...|
|      16839|-0.02669899626819605|
|      14160|-0.03548451206342...|
|      16684|-0.14752622798916393|
|      16553|-0.03874064105462...|
|      13081|-0.06758719533674828|
+-----------+--------------------+
only showing top 10 rows



In [None]:
# prompt: Ngưỡng anomaly_score để bị coi là bất thường

anomaly_threshold = df_anomaly.filter("anomaly == -1").select(F.min("anomaly_score")).collect()[0][0]
print(f"Ngưỡng anomaly_score để bị coi là bất thường: {anomaly_threshold}")

Ngưỡng anomaly_score để bị coi là bất thường: -0.1953201614148805


In [None]:
# prompt:  hiển thị top 10 khách hàng bất thường dựa trên điểm anomaly_score tăng dần

df_anomaly.filter("anomaly == -1").select("Customer ID", "anomaly_score").orderBy(F.asc("anomaly_score")).show(10)

+-----------+--------------------+
|Customer ID|       anomaly_score|
+-----------+--------------------+
|      14911| -0.1953201614148805|
|      14646| -0.1930854200738863|
|      14156|-0.19197031535154985|
|      18102|-0.19085671853794295|
|      13694|-0.18475879894870806|
|      15311|-0.18255260165851595|
|      13089| -0.1798032381461453|
|      15061|-0.17433233534588222|
|      16029|-0.17433233534588222|
|      17850|-0.16662298655765873|
+-----------+--------------------+
only showing top 10 rows



In [None]:
# prompt: hiển thị top 10 khách hàng bất thường dựa trên điểm anomaly_score giảm dần

df_anomaly.filter("anomaly == -1").select("Customer ID", "anomaly_score").orderBy(F.desc("anomaly_score")).show(10)


+-----------+--------------------+
|Customer ID|       anomaly_score|
+-----------+--------------------+
|      17811|-1.10587272937445...|
|      16746|-4.24577771667933...|
|      14258|-0.00109884082548...|
|      13098|-0.00209909688834...|
|      14028|-0.00217298378235...|
|      15633|-0.00331344735277...|
|      18087|-0.00383009960012...|
|      14849|-0.00395827370890...|
|      14590|-0.00421969064868...|
|      14051|-0.00597922920559002|
+-----------+--------------------+
only showing top 10 rows



In [None]:
# prompt: # Đếm số lượng khách hàng bất thường  # Đếm tổng số khách hàng
# # Tỷ lệ khách hàng bất thường # Hiển thị các khách hàng bất thường

# Đếm số lượng khách hàng bất thường
num_anomalies = df_anomaly.filter("anomaly == -1").count()

# Đếm tổng số khách hàng
total_customers = df_anomaly.count()

# Tỷ lệ khách hàng bất thường
anomaly_ratio = (num_anomalies / total_customers)

print(f"Số lượng khách hàng bất thường: {num_anomalies}")
print(f"Tổng số khách hàng: {total_customers}")
print(f"Tỷ lệ khách hàng bất thường: {anomaly_ratio:.2f}%")

# Hiển thị các khách hàng bất thường (đã được thực hiện ở trên, nhưng lặp lại để rõ ràng)
print("\nCác khách hàng bất thường:")
df_anomaly.filter("anomaly == -1").select("Customer ID", "Recency", "Frequency", "Monetary", "anomaly_score").show(truncate=False)


Số lượng khách hàng bất thường: 118
Tổng số khách hàng: 5863
Tỷ lệ khách hàng bất thường: 0.02%

Các khách hàng bất thường:
+-----------+-------+---------+------------------+--------------------+
|Customer ID|Recency|Frequency|          Monetary|       anomaly_score|
+-----------+-------+---------+------------------+--------------------+
|      17389|      0|       61| 57224.67999999999|-0.11653326934861896|
|      12471|      2|       76| 39873.79000000001|-0.09875141847319668|
|      13468|      1|       72|          13390.51|-0.03153693353814502|
|      17677|      1|       54|          36692.91| -0.0829755567064091|
|      15633|    509|       13| 4352.459999999999|-0.00331344735277...|
|      16839|      8|       44|          22310.49|-0.02669899626819605|
|      14160|    610|        7|           8421.47|-0.03548451206342...|
|      16684|      4|       55|147142.77000000002|-0.14752622798916393|
|      16553|    163|       33|16644.010000000002|-0.03874064105462...|
|      13081

In [None]:
# prompt: thống kê mô tả khách hàng bất thường

# Thống kê mô tả cho các khách hàng bất thường
print("\nThống kê mô tả cho các khách hàng bất thường:")
df_anomaly.filter("anomaly == -1").select("Recency", "Frequency", "Monetary").describe().show()

# Thống kê mô tả cho tất cả khách hàng (để so sánh)
print("\nThống kê mô tả cho tất cả khách hàng:")
df_anomaly.select("Recency", "Frequency", "Monetary").describe().show()



Thống kê mô tả cho các khách hàng bất thường:
+-------+-----------------+-----------------+------------------+
|summary|          Recency|        Frequency|          Monetary|
+-------+-----------------+-----------------+------------------+
|  count|              118|              118|               118|
|   mean|            101.0|60.73728813559322| 56263.37934745763|
| stddev|187.6095235677905|58.78011000971225| 86282.31685117523|
|    min|                0|                1|2995.5400000000004|
|    max|              691|              379| 608821.6499999997|
+-------+-----------------+-----------------+------------------+


Thống kê mô tả cho tất cả khách hàng:
+-------+------------------+------------------+-----------------+
|summary|           Recency|         Frequency|         Monetary|
+-------+------------------+------------------+-----------------+
|  count|              5863|              5863|             5863|
|   mean|200.44567627494456| 6.273409517311956|3000.317688555348