In [1]:
from pyspark.sql import SparkSession
import random
from math import sqrt
from pyspark.sql import functions as F

# Tạo Spark Session
spark = SparkSession.builder \
    .appName("Random Forest MapReduce") \
    .getOrCreate()

# Đọc dữ liệu từ CSV
data = spark.read.csv("heart.csv", header=True, inferSchema=True)

24/12/15 17:18:42 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

In [2]:
data.show(5)
data.printSchema()

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
| 52|  1|  0|     125| 212|  0|      1|    168|    0|    1.0|    2|  2|   3|     0|
| 53|  1|  0|     140| 203|  1|      0|    155|    1|    3.1|    0|  0|   3|     0|
| 70|  1|  0|     145| 174|  0|      1|    125|    1|    2.6|    0|  0|   3|     0|
| 61|  1|  0|     148| 203|  0|      1|    161|    0|    0.0|    2|  1|   3|     0|
| 62|  0|  0|     138| 294|  1|      1|    106|    0|    1.9|    1|  3|   2|     0|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
only showing top 5 rows

root
 |-- age: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- cp: integer (nullable = true)
 |-- trestbps: integer (nullable = true)
 |-- chol: integer (nullable = true)
 |-- fbs: integer (nullable =

# Tiền xử lý  dữ liệu

## Kiểm tra các giá trị NULL

In [3]:
# Đếm giá trị null trong từng cột
null_counts = data.select([F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c) for c in data.columns])
null_counts.show()

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|  0|  0|  0|       0|   0|  0|      0|      0|    0|      0|    0|  0|   0|     0|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+



## Xóa các hàng trùng lặp

In [4]:
data = data.dropDuplicates()

# Tính toán thống kê chỉ số

In [5]:
# Đổi tên các cột trong DataFrame PySpark
data_visual = data.withColumnRenamed('age', 'Age') \
           .withColumnRenamed('sex', 'Sex') \
           .withColumnRenamed('cp', 'Chest Pain') \
           .withColumnRenamed('trestbps', 'Resting_BP') \
           .withColumnRenamed('chol', 'Cholestrol') \
           .withColumnRenamed('fbs', 'Fasting_Blood_Sugar') \
           .withColumnRenamed('restecg', 'Resting_Electrocardiographic') \
           .withColumnRenamed('thalach', 'Max_Heart_Rate') \
           .withColumnRenamed('exang', 'Exercise_Induced_Angina') \
           .withColumnRenamed('oldpeak', 'Old_Peak') \
           .withColumnRenamed('slope', 'Slope') \
           .withColumnRenamed('ca', 'No_Major_Vessels') \
           .withColumnRenamed('thal', 'Thal') \
           .withColumnRenamed('target', 'Target')

# Kiểm tra tên các cột sau khi đổi
data_visual.columns


['Age',
 'Sex',
 'Chest Pain',
 'Resting_BP',
 'Cholestrol',
 'Fasting_Blood_Sugar',
 'Resting_Electrocardiographic',
 'Max_Heart_Rate',
 'Exercise_Induced_Angina',
 'Old_Peak',
 'Slope',
 'No_Major_Vessels',
 'Thal',
 'Target']

In [6]:
# Tính các chỉ số thống kê cơ bản cho cột Resting_BP
Resting_BP_sumary = data_visual.select(
    F.min("Resting_BP").alias("min_bp"),
    F.max("Resting_BP").alias("max_bp"),
    F.mean("Resting_BP").alias("mean_bp"),
    F.variance("Resting_BP").alias("variance_bp"),
    F.stddev("Resting_BP").alias("stddev_bp"),
    F.expr("percentile_approx(Resting_BP, 0.5)").alias("median_bp"),
)
Resting_BP_sumary.show()

+------+------+------------------+-----------------+------------------+---------+
|min_bp|max_bp|           mean_bp|      variance_bp|         stddev_bp|median_bp|
+------+------+------------------+-----------------+------------------+---------+
|    94|   200|131.60264900662253|308.4728168797168|17.563394230037563|      130|
+------+------+------------------+-----------------+------------------+---------+



In [7]:
# Tính các chỉ số thống kê cơ bản cho cột Cholestrol
Cholestrol_sumary = data_visual.select(
    F.min("Cholestrol").alias("min_cholestrol"),
    F.max("Cholestrol").alias("max_cholestrol"),
    F.mean("Cholestrol").alias("mean_cholestrol"),
    F.variance("Cholestrol").alias("variance_cholestrol"),
    F.stddev("Cholestrol").alias("stddev_cholestrol"),
    F.expr("percentile_approx(Cholestrol, 0.5)").alias("median_cholestrol")
)
Cholestrol_sumary.show()

+--------------+--------------+---------------+-------------------+-----------------+-----------------+
|min_cholestrol|max_cholestrol|mean_cholestrol|variance_cholestrol|stddev_cholestrol|median_cholestrol|
+--------------+--------------+---------------+-------------------+-----------------+-----------------+
|           126|           564|          246.5|  2678.423588039868|51.75348865574057|              240|
+--------------+--------------+---------------+-------------------+-----------------+-----------------+



In [8]:
# Tính các chỉ số thống kê cơ bản cho cột Max_Heart_Rate
Max_Heart_Rate_sumary = data_visual.select(
    F.min("Max_Heart_Rate").alias("min_max_heart_rate"),
    F.max("Max_Heart_Rate").alias("max_max_heart_rate"),
    F.mean("Max_Heart_Rate").alias("mean_max_heart_rate"),
    F.variance("Max_Heart_Rate").alias("variance_max_heart_rate"),
    F.stddev("Max_Heart_Rate").alias("stddev_max_heart_rate"),
    F.expr("percentile_approx(Max_Heart_Rate, 0.5)").alias("median_max_heart_rate")
)
Max_Heart_Rate_sumary.show()

+------------------+------------------+-------------------+-----------------------+---------------------+---------------------+
|min_max_heart_rate|max_max_heart_rate|mean_max_heart_rate|variance_max_heart_rate|stddev_max_heart_rate|median_max_heart_rate|
+------------------+------------------+-------------------+-----------------------+---------------------+---------------------+
|                71|               202| 149.56953642384107|      524.5715605817253|   22.903527251969845|                  152|
+------------------+------------------+-------------------+-----------------------+---------------------+---------------------+



In [9]:
# Tính các chỉ số thống kê cơ bản cho cột Old_Peak
Old_Peak_sumary = data_visual.select(
    F.min("Old_Peak").alias("min_old_peak"),
    F.max("Old_Peak").alias("max_old_peak"),
    F.mean("Old_Peak").alias("mean_old_peak"),
    F.variance("Old_Peak").alias("variance_old_peak"),
    F.stddev("Old_Peak").alias("stddev_old_peak"),
    F.expr("percentile_approx(Old_Peak, 0.5)").alias("median_old_peak")
)
Old_Peak_sumary.show()

+------------+------------+------------------+------------------+------------------+---------------+
|min_old_peak|max_old_peak|     mean_old_peak| variance_old_peak|   stddev_old_peak|median_old_peak|
+------------+------------+------------------+------------------+------------------+---------------+
|         0.0|         6.2|1.0430463576158941|1.3489714197707428|1.1614522890634564|            0.8|
+------------+------------+------------------+------------------+------------------+---------------+



In [10]:
# Tính các chỉ số thống kê cơ bản cho cột Age
age_sumary = data_visual.select(
    F.min("Age").alias("min_age"),
    F.max("Age").alias("max_age"),
    F.mean("Age").alias("mean_age"),
    F.variance("Age").alias("variance_age"),
    F.stddev("Age").alias("stddev_age"),
    F.expr("percentile_approx(Age, 0.5)").alias("median_age"),
)
age_sumary.show()

+-------+-------+------------------+-----------------+-----------------+----------+
|min_age|max_age|          mean_age|     variance_age|       stddev_age|median_age|
+-------+-------+------------------+-----------------+-----------------+----------+
|     29|     77|54.420529801324506|81.86575652900926|9.047969746247457|        55|
+-------+-------+------------------+-----------------+-----------------+----------+



# Thống kê các chỉ số ảnh hưởng đến kết quả bệnh tim

In [11]:
# Tính tỷ lệ mắc bệnh tim theo huyết áp theo độ tuổi
data_visual.createOrReplaceTempView("heart_disease")
query = """
    SELECT
        CONCAT(CAST(Age_Group * 10 AS STRING), '-', CAST(Age_Group * 10 + 10 AS STRING)) AS Age_Range,
        Target,
        COUNT(*) AS count,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(PARTITION BY Age_Group), 2) AS percentage
    FROM (
        SELECT
            FLOOR(Age / 10) AS Age_Group,
            Target
        FROM heart_disease
    ) sub
    GROUP BY Age_Group, Target
    ORDER BY Age_Group;
"""

result_age_target = spark.sql(query)
result_age_target.show()


+---------+------+-----+----------+
|Age_Range|Target|count|percentage|
+---------+------+-----+----------+
|    20-30|     1|    1|    100.00|
|    30-40|     1|   10|     71.43|
|    30-40|     0|    4|     28.57|
|    40-50|     1|   50|     69.44|
|    40-50|     0|   22|     30.56|
|    50-60|     0|   60|     48.00|
|    50-60|     1|   65|     52.00|
|    60-70|     0|   48|     60.00|
|    60-70|     1|   32|     40.00|
|    70-80|     0|    4|     40.00|
|    70-80|     1|    6|     60.00|
+---------+------+-----+----------+



In [12]:
# Tính tỷ lệ mắc bệnh tim theo mức đường huyết
query = """
    SELECT
        Fasting_Blood_Sugar,
        Target,
        COUNT(*) AS count,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(PARTITION BY Fasting_Blood_Sugar), 2) AS percentage
    FROM heart_disease
    GROUP BY Fasting_Blood_Sugar, Target
    ORDER BY Fasting_Blood_Sugar, Target
"""

result_fasting_blood_sugar_target = spark.sql(query)
result_fasting_blood_sugar_target.show()


+-------------------+------+-----+----------+
|Fasting_Blood_Sugar|Target|count|percentage|
+-------------------+------+-----+----------+
|                  0|     0|  116|     45.14|
|                  0|     1|  141|     54.86|
|                  1|     0|   22|     48.89|
|                  1|     1|   23|     51.11|
+-------------------+------+-----+----------+



In [13]:
# Tính tỷ lệ mắc bệnh tim theo loại cơn đau ngực
query = """
    SELECT
        `Chest Pain`,
        Target,
        COUNT(*) AS count,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(PARTITION BY `Chest Pain`), 2) AS percentage
    FROM heart_disease
    GROUP BY `Chest Pain`, Target
    ORDER BY `Chest Pain`, Target
"""

result_chest_pain_target = spark.sql(query)
result_chest_pain_target.show()


+----------+------+-----+----------+
|Chest Pain|Target|count|percentage|
+----------+------+-----+----------+
|         0|     0|  104|     72.73|
|         0|     1|   39|     27.27|
|         1|     0|    9|     18.00|
|         1|     1|   41|     82.00|
|         2|     0|   18|     20.93|
|         2|     1|   68|     79.07|
|         3|     0|    7|     30.43|
|         3|     1|   16|     69.57|
+----------+------+-----+----------+



In [14]:
# Tính tỷ lệ mắc bệnh tim theo huyết áp khi nghỉ ngơi
query = """
    SELECT
        CONCAT(CAST(BP_Group * 50 AS STRING), '-', CAST(BP_Group * 50 + 50 AS STRING)) AS Resting_BP_Range,
        Target,
        COUNT(*) AS count,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(PARTITION BY BP_Group), 2) AS percentage
    FROM (
        SELECT
            FLOOR(Resting_BP / 50) AS BP_Group,
            Target
        FROM heart_disease
    ) sub
    GROUP BY BP_Group, Target
    ORDER BY BP_Group;
"""

result_resting_bp_target = spark.sql(query)
result_resting_bp_target.show()


+----------------+------+-----+----------+
|Resting_BP_Range|Target|count|percentage|
+----------------+------+-----+----------+
|          50-100|     1|    2|    100.00|
|         100-150|     0|  109|     43.78|
|         100-150|     1|  140|     56.22|
|         150-200|     1|   22|     44.00|
|         150-200|     0|   28|     56.00|
|         200-250|     0|    1|    100.00|
+----------------+------+-----+----------+



In [15]:
# Tính tỷ lệ mắc bệnh tim khi tập thể dục
query = """
    SELECT
        Exercise_Induced_Angina,
        Target,
        COUNT(*) AS count,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(PARTITION BY Exercise_Induced_Angina), 2) AS percentage
    FROM heart_disease
    GROUP BY Exercise_Induced_Angina, Target
    ORDER BY Exercise_Induced_Angina, Target
"""

result_exercise_angina_target = spark.sql(query)
result_exercise_angina_target.show()


+-----------------------+------+-----+----------+
|Exercise_Induced_Angina|Target|count|percentage|
+-----------------------+------+-----+----------+
|                      0|     0|   62|     30.54|
|                      0|     1|  141|     69.46|
|                      1|     0|   76|     76.77|
|                      1|     1|   23|     23.23|
+-----------------------+------+-----+----------+



In [16]:
#Tính tỷ lệ mắc bệnh tim theo Cholestrol
query = """
    SELECT
        CONCAT(CAST(Chol_Group * 50 AS STRING), '-', CAST(Chol_Group * 50 + 50 AS STRING)) AS Cholestrol_Range,
        Target,
        COUNT(*) AS count,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(PARTITION BY Chol_Group), 2) AS percentage
    FROM (
        SELECT
            FLOOR(Cholestrol / 50) AS Chol_Group,
            Target
        FROM heart_disease
    ) sub
    GROUP BY Chol_Group, Target
    ORDER BY Chol_Group, Target;
"""

result_cholesterol_range = spark.sql(query)
result_cholesterol_range.show()


+----------------+------+-----+----------+
|Cholestrol_Range|Target|count|percentage|
+----------------+------+-----+----------+
|         100-150|     0|    2|     40.00|
|         100-150|     1|    3|     60.00|
|         150-200|     0|   18|     40.91|
|         150-200|     1|   26|     59.09|
|         200-250|     0|   50|     40.32|
|         200-250|     1|   74|     59.68|
|         250-300|     0|   47|     55.29|
|         250-300|     1|   38|     44.71|
|         300-350|     0|   18|     50.00|
|         300-350|     1|   18|     50.00|
|         350-400|     0|    1|     25.00|
|         350-400|     1|    3|     75.00|
|         400-450|     0|    2|     66.67|
|         400-450|     1|    1|     33.33|
|         550-600|     1|    1|    100.00|
+----------------+------+-----+----------+



# Chạy thuật toán

In [17]:
data_rdd = data.rdd.map(lambda row: ",".join(map(str, row)))

In [18]:
data_rdd.take(3)

                                                                                

['49,1,2,120,188,0,1,139,0,2.0,1,3,3,0',
 '57,0,1,130,236,0,0,174,0,0.0,1,1,2,0',
 '59,1,0,170,326,0,0,140,1,3.4,0,0,3,0']

In [19]:
# Xử lý dữ liệu thành dạng RDD [(label, [features])]
header = data_rdd.first()
data_rdd = data_rdd.map(lambda line: line.split(",")) \
    .map(lambda fields: (int(fields[-1]), list(map(float, fields[:-1]))))  # label, features

In [20]:
data_rdd.take(5)

[(0, [49.0, 1.0, 2.0, 120.0, 188.0, 0.0, 1.0, 139.0, 0.0, 2.0, 1.0, 3.0, 3.0]),
 (0, [57.0, 0.0, 1.0, 130.0, 236.0, 0.0, 0.0, 174.0, 0.0, 0.0, 1.0, 1.0, 2.0]),
 (0, [59.0, 1.0, 0.0, 170.0, 326.0, 0.0, 0.0, 140.0, 1.0, 3.4, 0.0, 0.0, 3.0]),
 (1, [40.0, 1.0, 3.0, 140.0, 199.0, 0.0, 1.0, 178.0, 1.0, 1.4, 2.0, 0.0, 3.0]),
 (1, [51.0, 0.0, 2.0, 130.0, 256.0, 0.0, 0.0, 149.0, 0.0, 0.5, 2.0, 0.0, 2.0])]

In [21]:
def bootstrap_sample_partition(partition_data, sample_fraction):
    """
    Lấy mẫu bootstrap từ dữ liệu trong một phân vùng.
    """
    partition_data_list = list(partition_data)
    n = len(partition_data_list)
    sample_size = int(n * sample_fraction)
    if n > 0:
        return [partition_data_list[random.randint(0, n - 1)] for _ in range(sample_size)]
    return []

In [22]:
def gini_impurity(groups, classes):
    """
    Tính chỉ số Gini cho các nhóm sau khi split
    """
    n_instances = sum([len(group) for group in groups])
    gini = 0.0
    for group in groups:
        size = len(group)
        if size == 0:
            continue
        score = 0.0
        for class_val in classes:
            proportion = [row[0] for row in group].count(class_val) / size
            score += proportion ** 2
        gini += (1.0 - score) * (size / n_instances)
    return gini

In [23]:
def split_node(data, feature_index, threshold):
    """
    Chia dữ liệu dựa trên một ngưỡng của đặc trưng
    """
    left = [row for row in data if row[1][feature_index] < threshold]
    right = [row for row in data if row[1][feature_index] >= threshold]
    return left, right

In [24]:
def find_best_split(data, feature_indices):
    """
    Tìm điểm split tốt nhất với chỉ xét các đặc trưng trong feature_indices.
    """
    classes = list(set([row[0] for row in data]))
    b_index, b_value, b_groups = None, None, None
    b_score = float('inf')
    for feature_index in feature_indices:
        for row in data:
            groups = split_node(data, feature_index, row[1][feature_index])
            gini = gini_impurity(groups, classes)
            if gini < b_score:
                b_index, b_value, b_score, b_groups = feature_index, row[1][feature_index], gini, groups
    return {'index': b_index, 'value': b_value, 'groups': b_groups}

In [25]:
def build_tree(data, max_depth, min_size, depth=0, n_features=None):
    """
    Xây dựng cây quyết định đệ quy
    """
    classes = [row[0] for row in data]
    if len(set(classes)) == 1 or depth >= max_depth or len(data) <= min_size:
        return max(set(classes), key=classes.count)

    # Chọn ngẫu nhiên n_features đặc trưng để chia
    feature_indices = random.sample(range(len(data[0][1])), n_features)

    node = find_best_split(data, feature_indices)
    left, right = node['groups']
    node['left'] = build_tree(left, max_depth, min_size, depth + 1, n_features)
    node['right'] = build_tree(right, max_depth, min_size, depth + 1, n_features)
    return node

In [26]:
# Xử lý MapReduce để xây dựng cây quyết định từ mỗi phân vùng dữ liệu
def map_reduce_process(partition_data, max_depth, min_size, n_features, sample_fraction):
    """
    Huấn luyện cây quyết định trên mẫu bootstrap từ mỗi phân vùng dữ liệu.
    """
    bootstrap_data = bootstrap_sample_partition(partition_data, sample_fraction)
    if bootstrap_data:
        tree = build_tree(bootstrap_data, max_depth, min_size, n_features=n_features)
        return [tree]
    return []

In [27]:
# Dự đoán với một cây quyết định
def predict_tree(tree, row):
    """
    Dự đoán một hàng dữ liệu dựa vào cây quyết định
    """
    if row[tree['index']] < tree['value']:
        if isinstance(tree['left'], dict):
            return predict_tree(tree['left'], row)
        else:
            return tree['left']
    else:
        if isinstance(tree['right'], dict):
            return predict_tree(tree['right'], row)
        else:
            return tree['right']


In [28]:
# Dự đoán với Random Forest
def predict_forest(trees, row):
    """
    Dự đoán bằng cách lấy mode từ các cây trong Random Forest
    """
    predictions = [predict_tree(tree, row[1]) for tree in trees]
    return max(set(predictions), key=predictions.count)

In [29]:
# Chia dữ liệu train-test
train_rdd, test_rdd = data_rdd.randomSplit([0.8, 0.2])

In [30]:
# Huấn luyện Random Forest bằng MapReduce
max_depth = 5
min_size = 10
n_features = int(sqrt(len(train_rdd.first()[1])))  # Chọn số đặc trưng ngẫu nhiên tại mỗi nút

In [31]:
# Sử dụng Bootstrap Sampling và huấn luyện cây cho mỗi mẫu bootstrap
trees_rdd = train_rdd.mapPartitions(
    lambda partition_data: map_reduce_process(
        partition_data, max_depth, min_size, n_features, 1
    )
)

In [32]:
# Lấy ra n cây quyết định từ các phân vùng
num_partitions = train_rdd.getNumPartitions()
forest = trees_rdd.take(num_partitions)

In [33]:
# Dự đoán trên test set
predictions = test_rdd.map(lambda row: (row[0], predict_forest(forest, row)))

In [34]:
# Tính accuracy bằng cách sử dụng map và reduce
correct_predictions = predictions.map(lambda x: 1 if x[0] == x[1] else 0).reduce(lambda a, b: a + b)
total_predictions = predictions.count()

accuracy = correct_predictions / total_predictions
print(f"Accuracy: {accuracy:.2f}")

Accuracy: 0.71


In [35]:
# Hàm để tính toán TP, FP, FN, TN
def calculate_metrics(row):
    actual, predicted = row[0], row[1]
    if actual == 1 and predicted == 1:
        return (1, 0, 0, 0)  # (TP, FP, FN, TN)
    elif actual == 0 and predicted == 1:
        return (0, 1, 0, 0)  # (TP, FP, FN, TN)
    elif actual == 1 and predicted == 0:
        return (0, 0, 1, 0)  # (TP, FP, FN, TN)
    elif actual == 0 and predicted == 0:
        return (0, 0, 0, 1)  # (TP, FP, FN, TN)
    else:
        return (0, 0, 0, 0)

# Tính toán phân tán mà không sử dụng collect
metrics_rdd = predictions.map(calculate_metrics)

# Hàm kết hợp để tính toán tổng hợp TP, FP, FN, TN
def seqOp(acc, value):
    return (acc[0] + value[0],  # TP
            acc[1] + value[1],  # FP
            acc[2] + value[2],  # FN
            acc[3] + value[3])  # TN

# Hàm để kết hợp kết quả từ các phân vùng
def combOp(acc1, acc2):
    return (acc1[0] + acc2[0],  # TP
            acc1[1] + acc2[1],  # FP
            acc1[2] + acc2[2],  # FN
            acc1[3] + acc2[3])  # TN

# Áp dụng aggregate để tính toán trên toàn bộ phân vùng
total_metrics = metrics_rdd.aggregate(
    (0, 0, 0, 0),  # Giá trị khởi tạo (TP, FP, FN, TN)
    seqOp,  # Phép toán tính toán trong mỗi phân vùng
    combOp   # Phép toán kết hợp kết quả giữa các phân vùng
)

# Lấy TP, FP, FN, TN
tp, fp, fn, tn = total_metrics

# Tính toán Precision, Recall và F1-Score
precision = tp / (tp + fp) if tp + fp > 0 else 0.0
recall = tp / (tp + fn) if tp + fn > 0 else 0.0
f1_score = 2 * (precision * recall) / (precision + recall) if precision + recall > 0 else 0.0

# In kết quả
print(f"F1-Score: {f1_score:.2f}")


F1-Score: 0.73
