In [1]:
# from pyspark.sql import SparkSession
# from pyspark.sql.functions import col
# from pyspark.ml.feature import StandardScaler, VectorAssembler
# from pyspark.ml.classification import RandomForestClassifier
# from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# # Tạo SparkSession
# spark = SparkSession.builder.appName("ChurnPrediction").getOrCreate()

# # Đọc dữ liệu
# df = spark.read.csv("C:/Users/PC/Desktop/Do_an_Big_data/PythonCodes/work/data/Churn_Modelling_FE.csv", header=True, inferSchema=True)

# # Loại bỏ các cột không cần thiết
# df = df.drop("RowNumber", "CustomerId", "Surname")

# # Chuyển đổi cột đầu ra thành kiểu số nguyên
# df = df.withColumn("Exited", col("Exited").cast("integer"))

# # Kết hợp các đặc trưng thành một vector
# feature_cols = [col for col in df.columns if col != "Exited"]
# assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
# df = assembler.transform(df)

# # Chuẩn hóa dữ liệu
# scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
# scaler_model = scaler.fit(df)
# df = scaler_model.transform(df)

# # Chia tập train/test
# train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# # Huấn luyện mô hình Random Forest
# rf = RandomForestClassifier(featuresCol="scaled_features", labelCol="Exited", numTrees=100)
# rf_model = rf.fit(train_data)

# # Lưu mô hình
# rf_model.write().overwrite().save("C:/Users/PC/Desktop/Do_an_Big_data/PythonCodes/work/models/random_forest_on_sparkml")

# # Dự đoán trên tập test
# predictions = rf_model.transform(test_data)

# # Hiển thị kết quả
# display_cols = ["Exited", "rawPrediction", "probability", "prediction"]
# predictions.select(display_cols).show(10, False)

# # Đánh giá mô hình
# accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="Exited", predictionCol="prediction", metricName="accuracy")
# precision_evaluator = MulticlassClassificationEvaluator(labelCol="Exited", predictionCol="prediction", metricName="weightedPrecision")
# recall_evaluator = MulticlassClassificationEvaluator(labelCol="Exited", predictionCol="prediction", metricName="weightedRecall")
# f1_evaluator = MulticlassClassificationEvaluator(labelCol="Exited", predictionCol="prediction", metricName="f1")
# binary_auc_evaluator = BinaryClassificationEvaluator(labelCol="Exited", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

# # Tính toán các chỉ số
# accuracy = accuracy_evaluator.evaluate(predictions)
# precision = precision_evaluator.evaluate(predictions)
# recall = recall_evaluator.evaluate(predictions)
# f1_score = f1_evaluator.evaluate(predictions)
# auc_roc = binary_auc_evaluator.evaluate(predictions)

# # In kết quả
# print(f"Accuracy: {accuracy:.4f}")
# print(f"Precision: {precision:.4f}")
# print(f"Recall: {recall:.4f}")
# print(f"F1 Score: {f1_score:.4f}")
# print(f"AUC-ROC: {auc_roc:.4f}")


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Tạo SparkSession
spark = SparkSession.builder.appName("ChurnPrediction").getOrCreate()

# Đọc dữ liệu
df = spark.read.csv("C:/Users/PC/Desktop/Do_an_Big_data/PythonCodes/work/data/Churn_Modelling_FE.csv", header=True, inferSchema=True)

# Loại bỏ các cột không cần thiết
df = df.drop("RowNumber", "CustomerId", "Surname")

# Chuyển đổi cột đầu ra thành kiểu số nguyên
df = df.withColumn("Exited", col("Exited").cast("integer"))

# Kiểm tra phân phối lớp
print("Original Class distribution:")
df.groupBy("Exited").count().show()

# Tách dữ liệu thành hai lớp
df_majority = df.filter(col("Exited") == 0)  # Lớp đa số
df_minority = df.filter(col("Exited") == 1)  # Lớp thiểu số

# Oversampling lớp thiểu số
fraction_minority = 2.0  # Nhân đôi số lượng mẫu lớp 1
df_minority_oversampled = df_minority.sample(withReplacement=True, fraction=fraction_minority, seed=42)

# Undersampling lớp đa số
fraction_majority = 0.5  # Giảm một nửa số lượng mẫu lớp 0
df_majority_undersampled = df_majority.sample(withReplacement=False, fraction=fraction_majority, seed=42)

# Kết hợp lại dữ liệu
df_balanced = df_majority_undersampled.union(df_minority_oversampled)

# Kiểm tra phân phối lớp sau khi cân bằng
print("Balanced Class distribution:")
df_balanced.groupBy("Exited").count().show()

# Kết hợp các đặc trưng thành một vector
feature_cols = [col for col in df_balanced.columns if col != "Exited"]
print("Feature columns used in training:", feature_cols)
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_balanced = assembler.transform(df_balanced)

# Chuẩn hóa dữ liệu
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
scaler_model = scaler.fit(df_balanced)
df_balanced = scaler_model.transform(df_balanced)

# Chia tập train/test
train_data, test_data = df_balanced.randomSplit([0.8, 0.2], seed=42)

# Lưu tập test vào file CSV (loại bỏ cột features và scaled_features)
test_output_path = "C:/Users/PC/Desktop/Do_an_Big_data/PythonCodes/work/data/test_data.csv"
test_data_for_csv = test_data.drop("features", "scaled_features")
test_data_for_csv.coalesce(1).write.mode("overwrite").csv(test_output_path, header=True)
print(f"Test data saved to {test_output_path}")

# Huấn luyện mô hình Random Forest
rf = RandomForestClassifier(featuresCol="scaled_features", labelCol="Exited", seed=42)

# Tạo lưới tham số để tối ưu hóa
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

# Sử dụng CrossValidator để chọn mô hình tốt nhất
evaluator = BinaryClassificationEvaluator(labelCol="Exited", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3, seed=42)
cv_model = cv.fit(train_data)

# Lấy mô hình tốt nhất
rf_model = cv_model.bestModel

# Lưu mô hình
rf_model.write().overwrite().save("C:/Users/PC/Desktop/Do_an_Big_data/PythonCodes/work/models/random_forest_on_sparkml")

# Dự đoán trên tập test
predictions = rf_model.transform(test_data)

# Hiển thị kết quả
display_cols = ["Exited", "rawPrediction", "probability", "prediction"]
predictions.select(display_cols).show(10, False)

# Đánh giá mô hình
accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="Exited", predictionCol="prediction", metricName="accuracy")
precision_evaluator = MulticlassClassificationEvaluator(labelCol="Exited", predictionCol="prediction", metricName="weightedPrecision")
recall_evaluator = MulticlassClassificationEvaluator(labelCol="Exited", predictionCol="prediction", metricName="weightedRecall")
f1_evaluator = MulticlassClassificationEvaluator(labelCol="Exited", predictionCol="prediction", metricName="f1")
binary_auc_evaluator = BinaryClassificationEvaluator(labelCol="Exited", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

# Tính toán các chỉ số
accuracy = accuracy_evaluator.evaluate(predictions)
precision = precision_evaluator.evaluate(predictions)
recall = recall_evaluator.evaluate(predictions)
f1_score = f1_evaluator.evaluate(predictions)
auc_roc = binary_auc_evaluator.evaluate(predictions)

# In kết quả
print(f"Best Model Parameters: numTrees={rf_model.getNumTrees}, maxDepth={rf_model.getMaxDepth()}")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1_score:.4f}")
print(f"AUC-ROC: {auc_roc:.4f}")

# Dừng SparkSession
spark.stop()

Original Class distribution:
+------+-----+
|Exited|count|
+------+-----+
|     1| 1891|
|     0| 7677|
+------+-----+

Balanced Class distribution:
+------+-----+
|Exited|count|
+------+-----+
|     0| 3912|
|     1| 3757|
+------+-----+

Feature columns used in training: ['CreditScore', 'Age', 'Tenure', 'Balance', 'NumOfProducts', 'HasCrCard', 'IsActiveMember', 'EstimatedSalary', 'BalanceSalary', 'TenureAge', 'ScoreAge', 'tenure_age', 'tenure_salary', 'score_age', 'score_salary', 'newAge', 'newCreditScore', 'AgeScore', 'BalanceScore', 'SalaryScore', 'newEstimatedSalary', 'score_balance', 'age_balance', 'balance_salary', 'age_hascrcard', 'product_utilization_rate_by_year', 'product_utilization_rate_by_salary', 'countries_monthly_average_salaries', 'Germany', 'Spain', 'Female', 'Male']
Test data saved to C:/Users/PC/Desktop/Do_an_Big_data/PythonCodes/work/data/test_data.csv
+------+--------------------------------------+----------------------------------------+----------+
|Exited|rawPr

In [6]:
# from pyspark.sql import SparkSession
# from pyspark.sql.functions import col, count, when
# from pyspark.ml.feature import StandardScaler, VectorAssembler
# from pyspark.ml.classification import RandomForestClassifier
# from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# # Tạo SparkSession
# spark = SparkSession.builder.appName("ChurnPrediction").getOrCreate()

# # Đọc dữ liệu
# df = spark.read.csv("C:/Users/PC/Desktop/Do_an_Big_data/PythonCodes/work/data/Churn_Modelling_FE.csv", header=True, inferSchema=True)

# # Xóa cột không cần thiết
# df = df.drop("RowNumber", "CustomerId", "Surname")

# # Chuyển đổi cột nhãn thành số nguyên
# df = df.withColumn("Exited", col("Exited").cast("integer"))

# # Kiểm tra phân bố dữ liệu trước khi cân bằng
# df.groupBy("Exited").count().show()

# # **CÂN BẰNG DỮ LIỆU** (UnderSampling lớp 1 & OverSampling lớp 0)
# count_class_0 = df.filter(df.Exited == 0).count()
# count_class_1 = df.filter(df.Exited == 1).count()

# ratio = count_class_0 / count_class_1

# if ratio > 1:  # Nếu lớp 0 nhiều hơn, lấy mẫu ngẫu nhiên giảm bớt lớp 0
#     df_majority = df.filter(df.Exited == 0).sample(fraction=1/ratio, seed=42)
#     df_minority = df.filter(df.Exited == 1)
# elif ratio < 1:  # Nếu lớp 1 nhiều hơn, lấy mẫu ngẫu nhiên giảm bớt lớp 1
#     df_majority = df.filter(df.Exited == 1).sample(fraction=ratio, seed=42)
#     df_minority = df.filter(df.Exited == 0)
# else:
#     df_majority = df.filter(df.Exited == 0)
#     df_minority = df.filter(df.Exited == 1)

# df_balanced = df_majority.union(df_minority)

# # Kiểm tra lại phân bố dữ liệu sau khi cân bằng
# df_balanced.groupBy("Exited").count().show()

# # **Chuẩn bị đặc trưng**
# feature_cols = [col for col in df_balanced.columns if col != "Exited"]
# assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
# df_features = assembler.transform(df_balanced)

# # **Chuẩn hóa dữ liệu**
# scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
# scaler_model = scaler.fit(df_features)
# df_scaled = scaler_model.transform(df_features)

# # **Chia tập train/test**
# train_data, test_data = df_scaled.randomSplit([0.8, 0.2], seed=42)

# # **Huấn luyện mô hình Random Forest**
# rf = RandomForestClassifier(featuresCol="scaled_features", labelCol="Exited", numTrees=100, maxDepth=12, minInstancesPerNode=10, featureSubsetStrategy="sqrt")
# rf_model = rf.fit(train_data)

# # **Lưu mô hình**
# rf_model.write().overwrite().save("C:/Users/PC/Desktop/Do_an_Big_data/PythonCodes/work/models/rf_model")

# # **Dự đoán trên tập test**
# predictions = rf_model.transform(test_data)
# predictions.select("Exited", "prediction").show(10)

# # **Đánh giá mô hình**
# accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="Exited", predictionCol="prediction", metricName="accuracy")
# accuracy = accuracy_evaluator.evaluate(predictions)

# precision_evaluator = MulticlassClassificationEvaluator(labelCol="Exited", predictionCol="prediction", metricName="precisionByLabel")
# precision_0 = precision_evaluator.evaluate(predictions, {precision_evaluator.metricLabel: 0})
# precision_1 = precision_evaluator.evaluate(predictions, {precision_evaluator.metricLabel: 1})

# recall_evaluator = MulticlassClassificationEvaluator(labelCol="Exited", predictionCol="prediction", metricName="recallByLabel")
# recall_0 = recall_evaluator.evaluate(predictions, {recall_evaluator.metricLabel: 0})
# recall_1 = recall_evaluator.evaluate(predictions, {recall_evaluator.metricLabel: 1})

# f1_evaluator = MulticlassClassificationEvaluator(labelCol="Exited", predictionCol="prediction", metricName="f1")
# f1_score = f1_evaluator.evaluate(predictions)

# auc_evaluator = BinaryClassificationEvaluator(labelCol="Exited", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
# auc = auc_evaluator.evaluate(predictions)

# # In kết quả
# print(f"Accuracy: {accuracy:.4f}")
# print(f"Precision (0): {precision_0:.4f}, Precision (1): {precision_1:.4f}")
# print(f"Recall (0): {recall_0:.4f}, Recall (1): {recall_1:.4f}")
# print(f"F1 Score: {f1_score:.4f}")
# print(f"AUC Score: {auc:.4f}")


+------+-----+
|Exited|count|
+------+-----+
|     1| 1891|
|     0| 7677|
+------+-----+

+------+-----+
|Exited|count|
+------+-----+
|     0| 1975|
|     1| 1891|
+------+-----+

+------+----------+
|Exited|prediction|
+------+----------+
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       1.0|
|     0|       0.0|
|     0|       1.0|
|     0|       1.0|
|     0|       1.0|
+------+----------+
only showing top 10 rows

Accuracy: 0.7531
Precision (0): 0.7183, Precision (1): 0.7940
Recall (0): 0.8040, Recall (1): 0.7056
F1 Score: 0.7528
AUC Score: 0.8374


In [7]:
# from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# from pyspark.ml.classification import RandomForestClassifier

# # **CÂN BẰNG DỮ LIỆU** (Using SMOTE or other techniques if necessary)

# # **Chuẩn bị đặc trưng**
# feature_cols = [col for col in df_balanced.columns if col != "Exited"]
# assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
# df_features = assembler.transform(df_balanced)

# # **Chuẩn hóa dữ liệu**
# scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
# scaler_model = scaler.fit(df_features)
# df_scaled = scaler_model.transform(df_features)

# # **Chia tập train/test**
# train_data, test_data = df_scaled.randomSplit([0.8, 0.2], seed=42)

# # **Tuning the Random Forest Classifier**
# rf = RandomForestClassifier(featuresCol="scaled_features", labelCol="Exited")

# # Define the parameter grid for hyperparameter tuning
# paramGrid = (ParamGridBuilder()
#              .addGrid(rf.numTrees, [50, 100, 150])
#              .addGrid(rf.maxDepth, [5, 10, 15])
#              .addGrid(rf.minInstancesPerNode, [1, 2, 5])
#              .addGrid(rf.featureSubsetStrategy, ["auto", "sqrt", "log2"])
#              .build())

# # Cross-validation for better model selection
# evaluator = MulticlassClassificationEvaluator(labelCol="Exited", predictionCol="prediction", metricName="accuracy")
# cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# # Fit the model with cross-validation
# cv_model = cv.fit(train_data)

# # Best Model after Cross-validation
# best_rf_model = cv_model.bestModel

# # **Dự đoán trên tập test**
# predictions = best_rf_model.transform(test_data)
# predictions.select("Exited", "prediction").show(10)

# # **Đánh giá mô hình**
# accuracy = evaluator.evaluate(predictions)
# precision_evaluator = MulticlassClassificationEvaluator(labelCol="Exited", predictionCol="prediction", metricName="precisionByLabel")
# precision_0 = precision_evaluator.evaluate(predictions, {precision_evaluator.metricLabel: 0})
# precision_1 = precision_evaluator.evaluate(predictions, {precision_evaluator.metricLabel: 1})

# recall_evaluator = MulticlassClassificationEvaluator(labelCol="Exited", predictionCol="prediction", metricName="recallByLabel")
# recall_0 = recall_evaluator.evaluate(predictions, {recall_evaluator.metricLabel: 0})
# recall_1 = recall_evaluator.evaluate(predictions, {recall_evaluator.metricLabel: 1})

# f1_evaluator = MulticlassClassificationEvaluator(labelCol="Exited", predictionCol="prediction", metricName="f1")
# f1_score = f1_evaluator.evaluate(predictions)

# auc_evaluator = BinaryClassificationEvaluator(labelCol="Exited", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
# auc = auc_evaluator.evaluate(predictions)

# # **In kết quả**
# print(f"Accuracy: {accuracy:.4f}")
# print(f"Precision (0): {precision_0:.4f}, Precision (1): {precision_1:.4f}")
# print(f"Recall (0): {recall_0:.4f}, Recall (1): {recall_1:.4f}")
# print(f"F1 Score: {f1_score:.4f}")
# print(f"AUC Score: {auc:.4f}")


+------+----------+
|Exited|prediction|
+------+----------+
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       1.0|
|     0|       0.0|
|     0|       1.0|
|     0|       1.0|
|     0|       1.0|
+------+----------+
only showing top 10 rows

Accuracy: 0.7517
Precision (0): 0.7198, Precision (1): 0.7882
Recall (0): 0.7955, Recall (1): 0.7109
F1 Score: 0.7515
AUC Score: 0.8364
