# Phân loại dữ liệu dạng bảng nhị phân với PySpark

In [1]:
import findspark
findspark.init('D:\Learning\XuLyDuLieu\spark-3.5.3-bin-hadoop3')

In [2]:
import pandas as pd
import numpy as np
from datetime import date, timedelta, datetime
import time

import pyspark
from pyspark.sql import SparkSession, SQLContext
from pyspark.context import SparkContext
from pyspark.sql.functions import * 
from pyspark.sql.types import * 

### 1. Tải dữ liệu

In [87]:
spark = SparkSession.builder.appName('imbalanced_binary_classification').getOrCreate()

In [88]:
spark

In [89]:
file_location = "./census.csv"
file_type = "csv"

infer_schema = "true"
first_row_is_header = "False"
delimiter = ","


df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location) \
  .toDF("age", "workClass", "fnlwgt", "education", "education-num","marital-status", "occupation", "relationship",
        "race", "sex", "capital-gain", "capital-loss", "hours-per-week", "native-country", "income")

In [90]:
df.show()

+---+----------------+------+------------+-------------+--------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
|age|       workClass|fnlwgt|   education|education-num|      marital-status|       occupation| relationship|              race|   sex|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+----------------+------+------------+-------------+--------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
| 39|       State-gov| 77516|   Bachelors|           13|       Never-married|     Adm-clerical|Not-in-family|             White|  Male|        2174|           0|            40| United-States| <=50K|
| 50|Self-emp-not-inc| 83311|   Bachelors|           13|  Married-civ-spouse|  Exec-managerial|      Husband|             White|  Male|           0|           0|            13| United-States| <=50K|
| 38|

### 2. Tiền xử lý dữ liệu

In [91]:
from pyspark.sql import functions as F

df = df.withColumn('>50K', F.when(df.income == '<=50K', 0).otherwise(1))
df = df.drop('income')
df.columns

['age',
 'workClass',
 'fnlwgt',
 'education',
 'education-num',
 'marital-status',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital-gain',
 'capital-loss',
 'hours-per-week',
 'native-country',
 '>50K']

In [92]:
categorical_columns = [
 'workClass',
 'education',
 'marital-status',
 'occupation',
 'relationship',
 'race',
 'sex',
 'hours-per-week',
 'native-country',
 ]

In [93]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import (DecisionTreeClassifier, GBTClassifier, RandomForestClassifier)
from pyspark.ml.evaluation import BinaryClassificationEvaluator

indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
    for c in categorical_columns]
encoders = [OneHotEncoder(dropLast=False,inputCol=indexer.getOutputCol(),
            outputCol="{0}_encoded".format(indexer.getOutputCol())) 
    for indexer in indexers]

In [94]:
categorical_encoded = [encoder.getOutputCol() for encoder in encoders]
numerical_columns = ['age', 'education-num', 'capital-gain', 'capital-loss']
inputcols = categorical_encoded + numerical_columns
assembler = VectorAssembler(inputCols=inputcols, outputCol="features")

In [95]:
pipeline = Pipeline(stages=indexers + encoders+[assembler])
model = pipeline.fit(df)
transformed = model.transform(df)
display(transformed)

DataFrame[age: int, workClass: string, fnlwgt: int, education: string, education-num: int, marital-status: string, occupation: string, relationship: string, race: string, sex: string, capital-gain: int, capital-loss: int, hours-per-week: int, native-country: string, >50K: int, workClass_indexed: double, education_indexed: double, marital-status_indexed: double, occupation_indexed: double, relationship_indexed: double, race_indexed: double, sex_indexed: double, hours-per-week_indexed: double, native-country_indexed: double, workClass_indexed_encoded: vector, education_indexed_encoded: vector, marital-status_indexed_encoded: vector, occupation_indexed_encoded: vector, relationship_indexed_encoded: vector, race_indexed_encoded: vector, sex_indexed_encoded: vector, hours-per-week_indexed_encoded: vector, native-country_indexed_encoded: vector, features: vector]

In [96]:
final_data = transformed.select('features', '>50K')

### 3. Khai bao model

In [97]:
# Decision Trees
# Random Forests
# Gradient Boosted Trees

dtc = DecisionTreeClassifier(labelCol='>50K', featuresCol='features')

rfc = RandomForestClassifier(numTrees=150, labelCol='>50K', featuresCol='features')

gbt = GBTClassifier(labelCol='>50K', featuresCol='features', maxIter=10)

In [98]:
# Split data
train_data, test_data = final_data.randomSplit([0.8,0.2], seed=623)
print(train_data.count())
print(test_data.count())

39100
9742


### 4. Huấn luyện mô hình

In [99]:
dtc_model = dtc.fit(train_data)
rfc_model = rfc.fit(train_data)
gbt_model = gbt.fit(train_data)

### 5. Đánh giá mô hình

In [100]:
dtc_preds = dtc_model.transform(test_data)
rfc_preds = rfc_model.transform(test_data)
gbt_preds = gbt_model.transform(test_data)

In [101]:
my_eval = BinaryClassificationEvaluator(labelCol='>50K')

In [102]:
# Decision Tree evaluation metric
print('DTC AUC:')
print(my_eval.evaluate(dtc_preds))

DTC AUC:
0.6044418969198606


In [103]:
# Random Forest evaluation metric
print('RFC AUC:')
print(my_eval.evaluate(rfc_preds))

RFC AUC:
0.8914442213223257


In [104]:
# Gradient Boosting Tree evaluation metric
print('GBT AUC:')
print(my_eval.evaluate(gbt_preds))

GBT AUC:
0.9056879327977847


### 6. Cải thiện mô hình

In [107]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [2, 4, 6])
             .addGrid(gbt.maxBins, [20, 40, 60])
             .addGrid(gbt.maxIter, [10, 20, 30, 40])
             .build())

In [108]:
def custom_cross_validation(estimator, paramGrid, evaluator, train_data, numFolds=5):
    metrics = []
    splits = train_data.randomSplit([1.0 / numFolds] * numFolds, seed=623)

    for i in range(numFolds):
        test_fold = splits[i]
        train_fold = train_data.subtract(test_fold)

        for param_map in paramGrid:
            model = estimator.copy(param_map).fit(train_fold)
            predictions = model.transform(test_fold)
            metric = evaluator.evaluate(predictions)
            metrics.append((param_map, metric, i))
    return metrics

metrics = custom_cross_validation(gbt, paramGrid, my_eval, train_data, numFolds=5)

In [109]:
metrics_df = pd.DataFrame(metrics, columns=["param_map", "metric", "fold"])
print(metrics_df.head())


                                           param_map    metric  fold
0  {GBTClassifier_12dc90b440e3__maxDepth: 2, GBTC...  0.890046     0
1  {GBTClassifier_12dc90b440e3__maxDepth: 2, GBTC...  0.895798     0
2  {GBTClassifier_12dc90b440e3__maxDepth: 2, GBTC...  0.899908     0
3  {GBTClassifier_12dc90b440e3__maxDepth: 2, GBTC...  0.902037     0
4  {GBTClassifier_12dc90b440e3__maxDepth: 2, GBTC...  0.890975     0


### 6. So sánh các phiên bản và đánh giá

In [114]:
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator

# Hàm để chuyển đổi param_map từ đối tượng Param thành dictionary
def extract_param_map(param_map):
    simplified = {}
    for param, value in param_map.items():
        param_name = param.name if hasattr(param, 'name') else str(param)
        simplified[param_name] = value
    return simplified

metrics_df["param_map"] = metrics_df["param_map"].apply(extract_param_map)

# Hàm chuyển đổi param_map thành chuỗi dễ đọc cho biểu đồ
def simplify_param_map(param_map):
    simplified = ", ".join(f"{k}: {v}" for k, v in param_map.items())
    return simplified

metrics_df["param_map_simplified"] = metrics_df["param_map"].apply(simplify_param_map)
grouped = metrics_df.groupby(["param_map_simplified", "fold"])["metric"].mean().reset_index()

pivot_table = grouped.pivot(index="param_map_simplified", columns="fold", values="metric")

print(pivot_table.head())


fold                                          0         1         2         3  \
param_map_simplified                                                            
maxDepth: 2, maxBins: 20, maxIter: 10  0.890046  0.881755  0.884905  0.885758   
maxDepth: 2, maxBins: 20, maxIter: 20  0.895798  0.890808  0.891459  0.892752   
maxDepth: 2, maxBins: 20, maxIter: 30  0.899908  0.894674  0.895140  0.895918   
maxDepth: 2, maxBins: 20, maxIter: 40  0.902037  0.895899  0.896351  0.897342   
maxDepth: 2, maxBins: 40, maxIter: 10  0.890975  0.883220  0.887663  0.887862   

fold                                          4  
param_map_simplified                             
maxDepth: 2, maxBins: 20, maxIter: 10  0.893385  
maxDepth: 2, maxBins: 20, maxIter: 20  0.901065  
maxDepth: 2, maxBins: 20, maxIter: 30  0.904179  
maxDepth: 2, maxBins: 20, maxIter: 40  0.905501  
maxDepth: 2, maxBins: 40, maxIter: 10  0.895793  


In [None]:
colors = plt.cm.viridis(np.linspace(0, 1, len(pivot_table)))

# Vẽ biểu đồ
plt.figure(figsize=(12, 8))
for idx, (param_map, values) in enumerate(pivot_table.iterrows()):
    plt.plot(values.index, values.values, marker="o", label=param_map, color=colors[idx])
plt.title("Biểu đồ thể hiện đánh giá giữa các biến thể mô hình tùy chỉnh tham số", fontsize=16)
plt.xlabel("Bộ Train", fontsize=14)
plt.ylabel("Thông số đánh giá AUC", fontsize=14)
plt.gca().xaxis.set_major_locator(MaxNLocator(integer=True))
plt.legend(title="Tham số tùy chỉnh", bbox_to_anchor=(1.05, 1), loc="upper left")
plt.grid(True)
plt.tight_layout()

# Hiển thị biểu đồ
plt.show()

#### Mô hình Gradient Boosted Tree (GBT) cho hiệu quả tốt nhất

Trong bước cross-validate, chúng ta sử dụng **Gradient Boosted Tree (GBT)**, mô hình đã cho kết quả tốt nhất khi thử nghiệm trên 3 mô hình Decision Tree, Random Forest, Gradient Boosted Tree. Qua việc điều chỉnh các tham số của mô hình, chúng ta đã tìm ra bộ tham số tối ưu giúp mô hình đạt được hiệu quả tốt nhất.

##### Bộ tham số tối ưu

Mô hình GBT hoạt động hiệu quả nhất khi sử dụng các giá trị sau:

- **maxDepth** = 6: Độ sâu tối đa của cây. Với giá trị này, mô hình có thể học được các mối quan hệ phức tạp trong dữ liệu mà không bị overfitting.
- **maxBins** = 60: Số lượng bin tối đa để phân chia các đặc trưng liên tục. 
- **maxIter** = 40: Số vòng lặp tối đa để huấn luyện mô hình. Đây là số lần mô hình cập nhật và cải thiện các cây trong quá trình huấn luyện.

##### Kết quả

Với bộ tham số này, mô hình đã cho kết quả tối ưu trong việc dự đoán, đạt được giá trị **AUC (Area Under Curve)** cao nhất, giúp xác định hiệu quả dự đoán của mô hình.



In [76]:
# End Spark Session
spark.stop()