In [56]:
import pandas as pd
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

import findspark
findspark.init()

In [57]:
spark_session = SparkSession.builder.master("local[*]").config("spark.driver.memory", "15g").appName('NB_MapReduce').getOrCreate()

In [58]:
# Read dataset with spark
train_df_spark = spark_session.read.csv('train.csv', header=True, inferSchema=True)

In [59]:
input_cols = train_df_spark.columns[:-1]
output_col = train_df_spark.columns[-1]

In [60]:
# Encode the features into a vector
featureassemble = VectorAssembler(inputCols=input_cols, outputCol='features')
output = featureassemble.transform(train_df_spark)
output.show(n=5)

+--------+--------+-------------+-----------+-----------+-----------+-----------+--------+--------+---------+------------+--------------------+
|Rainfall|Sunshine|WindGustSpeed|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Cloud9am|Cloud3pm|RainToday|RainTomorrow|            features|
+--------+--------+-------------+-----------+-----------+-----------+-----------+--------+--------+---------+------------+--------------------+
|       0|       4|           15|         48|         69|         45|         30|       6|       7|        0|           1|[0.0,4.0,15.0,48....|
|       0|      24|           25|         33|         19|        140|        137|       7|       7|        0|           0|[0.0,24.0,25.0,33...|
|       0|       8|           37|         55|         82|         62|         65|       7|       7|        0|           0|[0.0,8.0,37.0,55....|
|       1|      14|           13|         55|         48|        182|        191|       4|       0|        0|           0|[1.0,14.0,13.0

In [61]:
# Convert the data to RDD
train_rdd_spark = train_df_spark.rdd

In [62]:
class NB_Classifier:
    def __init__(self, input_cols):
        self.input_cols = input_cols

    def fit(self, train_rdd_spark):
        # Find the probability of each class in the dataset
        target_map = train_rdd_spark.map(lambda x: (x[len(self.input_cols)], 1))
        target_reduce = target_map.reduceByKey(lambda x, y: x + y)
        num_records = train_rdd_spark.count()
        probability_target_reduce = target_reduce.map(lambda x: (x[0], x[1] / num_records))
        self.probability_target_reduce_dict = probability_target_reduce.collectAsMap()
        self.probability_target_reduce_dict = sorted(self.probability_target_reduce_dict.items(), key=lambda x: x[0])

        f_map = []
        for i in range(len(self.input_cols)):
            f_map.append(train_rdd_spark.map(lambda x: ((x[i]), 1)))

        f_reduce = []
        for i in range(len(self.input_cols)):
            f_reduce.append(f_map[i].reduceByKey(lambda x, y: x + y))

        f_target_map = []
        for i in range(len(self.input_cols)):
            f_target_map.append(train_rdd_spark.map(lambda x: ((x[i], x[len(self.input_cols)]), 1)))

        f_target_reduce = []
        for i in range(len(self.input_cols)):
            f_target_reduce.append(f_target_map[i].reduceByKey(lambda x, y: x + y))

        probability_f_target_reduce = []
        for i in range(len(self.input_cols)):
            probability_f_target_reduce.append(f_target_reduce[i].map(lambda x: (x[0][0], (x[0][1], x[1]))))
            probability_f_target_reduce[i] = probability_f_target_reduce[i].join(f_reduce[i])
            probability_f_target_reduce[i] = probability_f_target_reduce[i].map(lambda x: (x[0], (x[1][0][0], x[1][0][1]), x[1][1]))
            probability_f_target_reduce[i] = probability_f_target_reduce[i].map(lambda x: (x[0], (x[1][0], x[1][1] / x[2])))
            probability_f_target_reduce[i] = probability_f_target_reduce[i].groupByKey().mapValues(list)

        self.probability_f_target_reduce_dict = []
        for i in range(len(self.input_cols)):
            self.probability_f_target_reduce_dict.append(probability_f_target_reduce[i].collectAsMap())

        for i in range(len(self.input_cols)):
            for key in self.probability_f_target_reduce_dict[i]:
                self.probability_f_target_reduce_dict[i][key].sort(key=lambda x: x[0])
            
    def predict(self, features):
        f_target = []
        for i in range(len(self.input_cols)):
            if features[i] in self.probability_f_target_reduce_dict[i]:
                f_target.append(self.probability_f_target_reduce_dict[i][features[i]])
                f_target[i] = [x[1] for x in f_target[i]]
                if len(f_target[i]) < len(self.probability_target_reduce_dict):
                    if f_target[i][0] == 0:
                        f_target[i].insert(1, 0)
                    else:
                        f_target[i].insert(0, 0)
            else:
                f_target.append([0] * len(self.probability_target_reduce_dict))
                
        prob = [1] * len(f_target[0])

        for j in range(len(f_target[0])):
            for i in range(len(self.input_cols)):
                prob[j] *= f_target[i][j]
        
        # Multiply by the probability of the class
        for i in range(len(f_target[0])):
            prob[i] *= self.probability_target_reduce_dict[i][1]
        # Argmax
        prediction = prob.index(max(prob))
        
        return prediction


In [63]:
classifier = NB_Classifier(input_cols)
classifier.fit(train_rdd_spark)

test_data = pd.read_csv("test.csv")

test_features = []
for i in range(len(input_cols)):
    test_features.append(test_data[input_cols[i]].tolist())

y_true = test_data[output_col].tolist()
y_pred = []

for i in range(len(test_features[0])):
    features = []
    for j in range(len(input_cols)):
        features.append(test_features[j][i])
    prediction = classifier.predict(features)
    y_pred.append(prediction)

print("Naive Bayes Classifier with MapReduce")
print(f"Accuracy: {accuracy_score(y_true, y_pred) * 100:.4f}%")
print(f"Weighted Precision: {precision_score(y_true, y_pred, average='weighted') * 100:.4f}%")
print(f"Weighted Recall: {recall_score(y_true, y_pred, average='weighted') * 100:.4f}%")
print(f"Weighted F1-score: {f1_score(y_true, y_pred, average='weighted') * 100:.4f}%")

Naive Bayes Classifier with MapReduce
Accuracy: 77.4806%
Weighted Precision: 82.5540%
Weighted Recall: 77.4806%
Weighted F1-score: 67.6928%
