In [None]:
!pip install pyspark




In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, array, row_number, monotonically_increasing_id, when, sum as spark_sum
from pyspark.sql.types import FloatType
from pyspark.sql.window import Window
import numpy as np

# Khởi tạo SparkSession
spark = SparkSession.builder.appName("DiabetesPrediction").getOrCreate()

# Hàm tính khoảng cách Euclidean
def euclidean_distance(point1, point2):
    point1 = np.array(point1)
    point2 = np.array(point2)
    return float(np.sqrt(np.sum((point1 - point2) ** 2)))

# UDF tính khoảng cách giữa các điểm
distance_udf = udf(lambda x, y: euclidean_distance(x, y), FloatType())

# Đọc file CSV vào DataFrame
file_path = "diabetes.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Sample the data
df = df.sample(fraction=0.05, seed=123)

print("dataframe ban đầu:")
df.show()


# Chuyển đổi các cột đặc trưng thành một cột `features`
feature_columns = ["gender", "age", "hypertension", "heart_disease", "smoking_history", "bmi", "HbA1c_level", "blood_glucose_level"]
df = df.withColumn("features", array(*feature_columns))

# Loại bỏ các cột đặc trưng ban đầu
df = df.select("features", "diabetes")

print("dataframe sau khi xữ lý và loại các cột không cần thiết:")
df.show()

# Tỷ lệ chia tập dữ liệu (ví dụ: 80% train, 20% test)
train_ratio = 0.8

# Chia ngẫu nhiên tập dữ liệu
train_df, test_df = df.randomSplit([train_ratio, 1 - train_ratio], seed=42)

print('train_df:',train_df.count())
print('test_df: ',test_df.count())

def knn(train_df, test_df, k):
    # Thêm cột ID vào test_df để theo dõi các hàng sau khi join
    test_df = test_df.withColumn("id", monotonically_increasing_id())

    # Alias dataframes for clarity
    train_df = train_df.alias('train')
    test_df = test_df.alias('test')

    #Cross join tập train_df và test_df
    cross_joined_df = train_df.crossJoin(test_df)
    print("dataframe cross_joined_df:")
    # cross_joined_df.show()

    # Tính khoảng cách giữa các điểm trong train_df và test_df
    cross_joined_df = cross_joined_df.withColumn(
        'distance',
        distance_udf(col('train.features'), col('test.features'))
    )
    # Sắp xếp và lấy k điểm gần nhất cho mỗi điểm trong test_df
    windowSpec = Window.partitionBy('test.id').orderBy('distance')

    k_nearest_neighbors = cross_joined_df.withColumn('row', row_number().over(windowSpec)).filter(col('row') <= k)

    print("k_nearest_neighbors")
    # k_nearest_neighbors.show(10)
    # Tính nhãn phổ biến nhất trong k điểm gần nhất
    label_counts = k_nearest_neighbors.groupBy('test.id', 'train.diabetes').count()
    most_common_label = label_counts.withColumn(
        'rank',
        row_number().over(Window.partitionBy('test.id').orderBy(col('count').desc()))
    ).filter(col('rank') == 1)

    print("dataframe most_common_label:")
    # most_common_label.show()

    # Join lại với test_df để thêm nhãn dự đoán vào test_df
    predicted_test_df = test_df.join(
        most_common_label.select(col('test.id'), col('train.diabetes').alias('predicted_label')),
        on='id',
        how='left'
    )
    return predicted_test_df

k = 3
predicted_test_df = knn(train_df, test_df, k)

print("dataframe dự đoán")
# predicted_test_df.show()

# Tính toán các chỉ số đánh giá phân tán
metrics_df = predicted_test_df.withColumn(
    'true_positive_0', when((col('diabetes') == 0) & (col('predicted_label') == 0), 1).otherwise(0)
).withColumn(
    'false_positive_0', when((col('diabetes') != 0) & (col('predicted_label') == 0), 1).otherwise(0)
).withColumn(
    'false_negative_0', when((col('diabetes') == 0) & (col('predicted_label') != 0), 1).otherwise(0)
).withColumn(
    'true_negative_0', when((col('diabetes') != 0) & (col('predicted_label') != 0), 1).otherwise(0)
).withColumn(
    'true_positive_1', when((col('diabetes') == 1) & (col('predicted_label') == 1), 1).otherwise(0)
).withColumn(
    'false_positive_1', when((col('diabetes') != 1) & (col('predicted_label') == 1), 1).otherwise(0)
).withColumn(
    'false_negative_1', when((col('diabetes') == 1) & (col('predicted_label') != 1), 1).otherwise(0)
).withColumn(
    'true_negative_1', when((col('diabetes') != 1) & (col('predicted_label') != 1), 1).otherwise(0)
)

# metrics_df.show(5)



dataframe ban đầu:
+------+--------------------+------------+-------------+---------------+-------------------+-------------------+--------------------+--------+
|gender|                 age|hypertension|heart_disease|smoking_history|                bmi|        HbA1c_level| blood_glucose_level|diabetes|
+------+--------------------+------------+-------------+---------------+-------------------+-------------------+--------------------+--------+
|   1.0|  0.4119119119119119|         0.0|          0.0|            0.0|0.24159663865546224|0.45454545454545453| 0.36363636363636365|     0.0|
|   0.0|  0.5870870870870871|         0.0|          0.0|            0.0|   0.20203081232493|                0.4|  0.5454545454545454|     0.0|
|   0.0|  0.6371371371371372|         0.0|          0.0|            2.0|   0.20203081232493|0.47272727272727266| 0.35454545454545455|     0.0|
|   0.0|  0.5995995995995996|         0.0|          0.0|            2.0|0.22338935574229696|0.49090909090909096|           

In [None]:
# Tính toán tổng cộng các chỉ số cho từng lớp
sum_metrics = metrics_df.agg(
    spark_sum('true_positive_0').alias('true_positive_0'),
    spark_sum('false_positive_0').alias('false_positive_0'),
    spark_sum('false_negative_0').alias('false_negative_0'),
    spark_sum('true_negative_0').alias('true_negative_0'),
    spark_sum('true_positive_1').alias('true_positive_1'),
    spark_sum('false_positive_1').alias('false_positive_1'),
    spark_sum('false_negative_1').alias('false_negative_1'),
    spark_sum('true_negative_1').alias('true_negative_1')
).first()

# Tính toán Precision, Recall, F1-Score và Accuracy cho từng lớp
precision_0 = sum_metrics['true_positive_0'] / (sum_metrics['true_positive_0'] + sum_metrics['false_positive_0'])
recall_0 = sum_metrics['true_positive_0'] / (sum_metrics['true_positive_0'] + sum_metrics['false_negative_0'])
f1_score_0 = 2 * (precision_0 * recall_0) / (precision_0 + recall_0)

precision_1 = sum_metrics['true_positive_1'] / (sum_metrics['true_positive_1'] + sum_metrics['false_positive_1'])
recall_1 = sum_metrics['true_positive_1'] / (sum_metrics['true_positive_1'] + sum_metrics['false_negative_1'])
f1_score_1 = 2 * (precision_1 * recall_1) / (precision_1 + recall_1)

total_predictions = metrics_df.count()
accuracy = (sum_metrics['true_positive_1'] + sum_metrics['true_negative_1']) / total_predictions

print(f"Class 0 - Precision: {precision_0:.4f}")
print(f"Class 0 - Recall: {recall_0:.4f}")
print(f"Class 0 - F1 Score: {f1_score_0:.4f}")

print(f"Class 1 - Precision: {precision_1:.4f}")
print(f"Class 1 - Recall: {recall_1:.4f}")
print(f"Class 1 - F1 Score: {f1_score_1:.4f}")

print(f"Overall Accuracy: {accuracy:.4f}")

Class 0 - Precision: 0.9344
Class 0 - Recall: 0.8143
Class 0 - F1 Score: 0.8702
Class 1 - Precision: 0.8312
Class 1 - Recall: 0.9412
Class 1 - F1 Score: 0.8828
Overall Accuracy: 0.8768


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, array, row_number, monotonically_increasing_id, when, sum as spark_sum
from pyspark.sql.types import FloatType
from pyspark.sql.window import Window
import numpy as np

# Khởi tạo SparkSession
spark = SparkSession.builder.appName("DiabetesPrediction").getOrCreate()

# Hàm tính khoảng cách Euclidean
def euclidean_distance(point1, point2):
    point1 = np.array(point1)
    point2 = np.array(point2)
    return float(np.sqrt(np.sum((point1 - point2) ** 2)))

# UDF tính khoảng cách giữa các điểm
distance_udf = udf(lambda x, y: euclidean_distance(x, y), FloatType())

# Đọc file CSV vào DataFrame
file_path = "bigdata_data.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# # Sample the data
df = df.sample(fraction=0.1, seed=69)

print("dataframe ban đầu:")
df.show()


label_column = "diabetes"

# Lấy danh sách tất cả các cột trừ cột phân lớp
feature_columns = [col for col in df.columns if col != label_column]

# Sử dụng hàm array để kết hợp các cột thành một cột 'features'
df = df.withColumn("features", array(*[col(c) for c in feature_columns]))

# Loại bỏ các cột đặc trưng ban đầu
df = df.select("features", "diabetes")

print("dataframe sau khi xữ lý và loại các cột không cần thiết:")
# df.show()

# Tỷ lệ chia tập dữ liệu (ví dụ: 80% train, 20% test)
train_ratio = 0.8

# Chia ngẫu nhiên tập dữ liệu
train_df, test_df = df.randomSplit([train_ratio, 1 - train_ratio], seed=42)

print('train_df:',train_df.count())
print('test_df: ',test_df.count())

def knn(train_df, test_df, k):
    # Thêm cột ID vào test_df để theo dõi các hàng sau khi join
    test_df = test_df.withColumn("id", monotonically_increasing_id())

    # Alias dataframes for clarity
    train_df = train_df.alias('train')
    test_df = test_df.alias('test')

    #Cross join tập train_df và test_df
    cross_joined_df = train_df.crossJoin(test_df)
    print("dataframe cross_joined_df:")
    cross_joined_df.show()

    # Tính khoảng cách giữa các điểm trong train_df và test_df
    cross_joined_df = cross_joined_df.withColumn(
        'distance',
        distance_udf(col('train.features'), col('test.features'))
    )
    # Sắp xếp và lấy k điểm gần nhất cho mỗi điểm trong test_df
    windowSpec = Window.partitionBy('test.id').orderBy('distance')

    k_nearest_neighbors = cross_joined_df.withColumn('row', row_number().over(windowSpec)).filter(col('row') <= k)

    print("k_nearest_neighbors")
    k_nearest_neighbors.show(10)
    # Tính nhãn phổ biến nhất trong k điểm gần nhất
    label_counts = k_nearest_neighbors.groupBy('test.id', 'train.diabetes').count()
    most_common_label = label_counts.withColumn(
        'rank',
        row_number().over(Window.partitionBy('test.id').orderBy(col('count').desc()))
    ).filter(col('rank') == 1)

    print("dataframe most_common_label:")
    most_common_label.show()

    # Join lại với test_df để thêm nhãn dự đoán vào test_df
    predicted_test_df = test_df.join(
        most_common_label.select(col('test.id'), col('train.diabetes').alias('predicted_label')),
        on='id',
        how='left'
    )
    return predicted_test_df

k = 3
predicted_test_df = knn(train_df, test_df, k)

print("dataframe dự đoán")
predicted_test_df.show()

# Tính toán các chỉ số đánh giá phân tán
metrics_df = predicted_test_df.withColumn(
    'true_positive_0', when((col('diabetes') == 0) & (col('predicted_label') == 0), 1).otherwise(0)
).withColumn(
    'false_positive_0', when((col('diabetes') != 0) & (col('predicted_label') == 0), 1).otherwise(0)
).withColumn(
    'false_negative_0', when((col('diabetes') == 0) & (col('predicted_label') != 0), 1).otherwise(0)
).withColumn(
    'true_negative_0', when((col('diabetes') != 0) & (col('predicted_label') != 0), 1).otherwise(0)
).withColumn(
    'true_positive_1', when((col('diabetes') == 1) & (col('predicted_label') == 1), 1).otherwise(0)
).withColumn(
    'false_positive_1', when((col('diabetes') != 1) & (col('predicted_label') == 1), 1).otherwise(0)
).withColumn(
    'false_negative_1', when((col('diabetes') == 1) & (col('predicted_label') != 1), 1).otherwise(0)
).withColumn(
    'true_negative_1', when((col('diabetes') != 1) & (col('predicted_label') != 1), 1).otherwise(0)
)

# metrics_df.show(5)



dataframe ban đầu:
+-----+------------+-------------+-----+-----------+-------------------+--------+------+----+-----+-------+-------+----+------+-----+-----------+
|  age|hypertension|heart_disease|  bmi|HbA1c_level|blood_glucose_level|diabetes|Female|Male|Other|No Info|current|ever|former|never|not current|
+-----+------------+-------------+-----+-----------+-------------------+--------+------+----+-----+-------+-------+----+------+-----+-----------+
|0.424|           0|            0|0.264|      0.491|              0.355|       0|     1|   0|    0|      0|      0|   0|     1|    0|          0|
|0.812|           0|            0|0.151|        0.0|              0.091|       0|     1|   0|    0|      0|      0|   0|     0|    1|          0|
| 0.55|           0|            0| 0.15|      0.545|              0.295|       0|     1|   0|    0|      0|      0|   0|     0|    1|          0|
|  1.0|           0|            0|0.202|      0.236|              0.209|       0|     0|   1|    0|      

In [None]:
# Tính toán tổng cộng các chỉ số cho từng lớp
sum_metrics = metrics_df.agg(
    spark_sum('true_positive_0').alias('true_positive_0'),
    spark_sum('false_positive_0').alias('false_positive_0'),
    spark_sum('false_negative_0').alias('false_negative_0'),
    spark_sum('true_negative_0').alias('true_negative_0'),
    spark_sum('true_positive_1').alias('true_positive_1'),
    spark_sum('false_positive_1').alias('false_positive_1'),
    spark_sum('false_negative_1').alias('false_negative_1'),
    spark_sum('true_negative_1').alias('true_negative_1')
).first()

# Tính toán Precision, Recall, F1-Score và Accuracy cho từng lớp
precision_0 = sum_metrics['true_positive_0'] / (sum_metrics['true_positive_0'] + sum_metrics['false_positive_0'])
recall_0 = sum_metrics['true_positive_0'] / (sum_metrics['true_positive_0'] + sum_metrics['false_negative_0'])
f1_score_0 = 2 * (precision_0 * recall_0) / (precision_0 + recall_0)

precision_1 = sum_metrics['true_positive_1'] / (sum_metrics['true_positive_1'] + sum_metrics['false_positive_1'])
recall_1 = sum_metrics['true_positive_1'] / (sum_metrics['true_positive_1'] + sum_metrics['false_negative_1'])
f1_score_1 = 2 * (precision_1 * recall_1) / (precision_1 + recall_1)

total_predictions = metrics_df.count()
accuracy = (sum_metrics['true_positive_1'] + sum_metrics['true_negative_1']) / total_predictions

print(f"Class 0 - Precision: {precision_0:.4f}")
print(f"Class 0 - Recall: {recall_0:.4f}")
print(f"Class 0 - F1 Score: {f1_score_0:.4f}")

print(f"Class 1 - Precision: {precision_1:.4f}")
print(f"Class 1 - Recall: {recall_1:.4f}")
print(f"Class 1 - F1 Score: {f1_score_1:.4f}")

print(f"Overall Accuracy: {accuracy:.4f}")

Class 0 - Precision: 0.9344
Class 0 - Recall: 0.8143
Class 0 - F1 Score: 0.8702
Class 1 - Precision: 0.8312
Class 1 - Recall: 0.9412
Class 1 - F1 Score: 0.8828
Overall Accuracy: 0.8768
