In [1]:
from pyspark.sql import SparkSession
from math import sqrt
from collections import Counter
import heapq
import time
import pandas as pd
import matplotlib.pyplot as plt

# 1. Khởi tạo Spark Session
spark = SparkSession.builder.appName("KNN_Spark_Optimized").getOrCreate()
sc = spark.sparkContext

# 2. Đọc dữ liệu từ file CSV và chuyển thành RDD
df = spark.read.csv('/content/drive/MyDrive/Final Year/BigData/Hotel_KNN.csv', header=True, inferSchema=True)
data = df.rdd.map(lambda row: (row[:-1], row[-1]))  # (features, label)

# 3. Chia dữ liệu thành train và test
train_rdd, test_rdd = data.randomSplit([0.8, 0.2], seed=42)

# 4. Chia nhỏ dữ liệu kiểm tra tương ứng với số partition của train_rdd
num_partitions = 4
train_rdd = train_rdd.repartition(num_partitions)
test_data_partitions = test_rdd.zipWithIndex().map(lambda x: (x[1] % num_partitions, x[0]))
test_data_groups = test_data_partitions.groupByKey().mapValues(list).collectAsMap()

# 5. Broadcast dữ liệu kiểm tra được phân chia
test_data_broadcast = sc.broadcast(test_data_groups)

# 6. Hàm tính khoảng cách Euclidean
def euclidean_distance(point1, point2):
    return sqrt(sum((float(p1) - float(p2)) ** 2
                    for p1, p2 in zip(point1, point2)
                    if p1 is not None and p2 is not None))

# 7. Hàm tính k-NN trên từng partition
def knn_map_partition(partition_id, train_partition, k=5):
    test_data = test_data_broadcast.value.get(partition_id, [])  # Lấy dữ liệu test tương ứng
    results = []
    train_list = list(train_partition)  # Chuyển dữ liệu huấn luyện thành list
    for test_point in test_data:
        test_features, test_label = test_point
        k_neighbors = []
        for train_features, train_label in train_list:
            dist = euclidean_distance(test_features, train_features)
            if len(k_neighbors) < k:
                heapq.heappush(k_neighbors, (-dist, train_label))  # Min-Heap
            else:
                heapq.heappushpop(k_neighbors, (-dist, train_label))
        k_neighbors = sorted([(-d, label) for d, label in k_neighbors])  # Sắp xếp k lân cận
        results.append((test_label, k_neighbors))  # Lưu kết quả k-NN
    return results

# 8. Ánh xạ và gộp kết quả
start_time = time.time()
mapped_results = train_rdd.mapPartitionsWithIndex(
    lambda idx, part: knn_map_partition(idx, part, k=5)
)

# 9. Bỏ phiếu đa số để dự đoán nhãn
def reduce_phase(mapped_data):
    test_label, k_neighbors = mapped_data
    if not k_neighbors:  # Kiểm tra danh sách rỗng
        return (test_label, None)
    labels = [label for _, label in k_neighbors]
    most_common_label = Counter(labels).most_common(1)[0][0]
    return (test_label, most_common_label)

predictions = mapped_results.map(reduce_phase).filter(lambda x: x[1] is not None)

# 10. Tính Accuracy
prediction_and_label = predictions.map(lambda x: (x[1], x[0]))  # (predicted_label, actual_label)
correct_predictions = prediction_and_label.filter(lambda x: x[0] == x[1]).count()
total_predictions = prediction_and_label.count()
accuracy = correct_predictions / total_predictions if total_predictions > 0 else 0

# 11. Tính ma trận nhầm lẫn
confusion_matrix = prediction_and_label.map(lambda x: ((x[1], x[0]), 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .collect()

unique_labels = sorted(set(row[-1] for row in df.collect()))
conf_matrix_df = pd.DataFrame(0, index=unique_labels, columns=unique_labels)

for (actual, predicted), count in confusion_matrix:
    conf_matrix_df.at[actual, predicted] = count

# 12. Tính Precision, Recall và F1-score
def calculate_metrics(conf_matrix):
    metrics = {}
    for label in conf_matrix.index:
        tp = conf_matrix.at[label, label]  # True Positive
        fp = conf_matrix[label].sum() - tp  # False Positive
        fn = conf_matrix.loc[label].sum() - tp  # False Negative
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
        metrics[label] = {"Precision": precision, "Recall": recall, "F1-Score": f1}
    return metrics

metrics = calculate_metrics(conf_matrix_df)
metrics_df = pd.DataFrame(metrics).T

# 13. Hiển thị kết quả
runtime = time.time() - start_time
print(f"Accuracy: {accuracy * 100:.2f}%")
print(f"Time Running: {runtime:.2f} seconds")
print("Confusion Matrix:")
print(conf_matrix_df)
print("\nPrecision, Recall, F1-Score:")
print(metrics_df)

# 14. Dừng Spark
spark.stop()

Accuracy: 81.70%
Time Running: 833.38 seconds
Confusion Matrix:
      0     1
0  1622   764
1   553  4256

Precision, Recall, F1-Score:
   Precision    Recall  F1-Score
0   0.745747  0.679799  0.711248
1   0.847809  0.885007  0.866009
