In [13]:
import math
from pyspark.sql import SparkSession

In [14]:
spark = SparkSession.builder.appName('NaiveBayes').getOrCreate()

23/05/16 10:56:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [15]:
training_data = spark.read.csv('dataset/05-naive-bayes/train.csv', header=True, inferSchema=True)
training_data  = training_data.rdd
num_features = len(training_data.first()) - 1
training_data = training_data.map(lambda x: (x[:-1], int(x[-1])))

# # # Calculate the class priors
class_counts = training_data.map(lambda x: (x[1], 1)).reduceByKey(lambda x, y: x + y)
total_count = training_data.count()

class_priors = class_counts.map(lambda x: (x[0], x[1] / float(total_count)))

# calculate the feature counts by class
feature_counts_by_class = training_data.flatMap(lambda x: [(i, x[0][i], x[1]) for i in range(len(x[0]))])

feature_counts_by_class = feature_counts_by_class.map(lambda x: ((x[0], x[1], x[2]), 1)).reduceByKey(lambda x, y: x + y)

# sort by the column index and then the class
sorted_features = feature_counts_by_class.sortBy(lambda x: (x[0][0], x[0][2]))

class_counts_list = class_counts.collect()

class_counts_dict = dict(class_counts_list)

# Calculate the conditional probabilities for each feature and class
conditional_probs = feature_counts_by_class.map(lambda x: ((x[0][0], x[0][1], x[0][2]), (x[1], class_counts_dict[x[0][2]])))
conditional_probs = conditional_probs.sortBy(lambda x: (x[0][0], x[0][2]))

conditional_probs = conditional_probs.mapValues(lambda x: (x[0] + 1) / (x[1] + num_features))

# Sort by the column index and then the class
sorted_probs = conditional_probs.sortBy(lambda x: (x[0][0], x[0][2]))

# Convert the probabilities to a dictionary for faster lookup

prob_dict = sorted_probs.collectAsMap()

# convert the dict to have the class as the key
prob_dict = sorted_probs.map(lambda x: (x[0][2], ((x[0][0], x[0][1]), x[1]))).groupByKey().mapValues(list).collectAsMap()

# convert the dict to have the feature as the key in the inner dict and value and prob as the value
prob_dict = sorted_probs.map(lambda x: (x[0][2], ((x[0][0], x[0][1]), x[1]))).groupByKey().mapValues(list).collectAsMap()

class_priors = class_priors.collectAsMap() 

new_dict = {}

for key, value in prob_dict.items():
    new_dict[key] = {}
    for sub_value in value:
        inner_key = sub_value[0][0]
        inner_value = (sub_value[0][1], sub_value[1])
        if inner_key in new_dict[key]:
            new_dict[key][inner_key].append(inner_value)
        else:
            new_dict[key][inner_key] = [inner_value]

                                                                                

In [16]:
# split the testing data into features and labels
def classify(data):
    features = data[0]
    actual_label = data[1]
    predicted_label = None
    max_posterior = float('-inf')

    for class_label in class_priors.keys():
        posterior = math.log(class_priors[class_label])
        # loop over the features and index
        for feature, feature_value in enumerate(features):
            # get the conditional probability for the feature value and class label
            for value, prob in new_dict[class_label][feature]:
                if value == feature_value:
                    posterior += math.log(prob)
                    break
        if posterior > max_posterior:
            max_posterior = posterior
            predicted_label = class_label

    return (actual_label, predicted_label)

testing_data = spark.read.csv('dataset/05-naive-bayes/test.csv', header=True, inferSchema=True).rdd
testing_data = testing_data.map(lambda x: (x[:-1], int(x[-1])))
predictions = testing_data.map(classify)

# Calculate the accuracy of the model
correct_predictions = predictions.filter(lambda x: x[0] == x[1]).count()
accuracy = correct_predictions / float(testing_data.count())
print("Accuracy:", accuracy)

spark.stop()

Accuracy: 0.9428571428571428
