In [1]:
import findspark
import pandas as pd
from data_preprocessing import get_cleaned_data, get_cleaned_data_final
# Locate the local Spark installation and make `pyspark` importable; this must run
# before the `from pyspark... import` statements in the next cell.
findspark.init()
from sklearn.datasets import load_iris
import numpy as np
import random
import math

In [2]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
# Local Spark session using every available core (reuses an existing session if one is running).
spark = (
	SparkSession.builder
	.master("local[*]")
	.appName("Apriori")
	.getOrCreate()
)

In [3]:
# Handle to the underlying SparkContext of the session created above.
sc = spark.sparkContext

In [4]:
# Load the cleaned dataset as a pandas DataFrame (it is later used with .head/.iloc).
# NOTE(review): convert_categorical=True presumably encodes categorical columns as
# numbers, which the mean/covariance maths below requires — confirm in data_preprocessing.
data = get_cleaned_data_final(convert_categorical=True)

In [5]:
# The first 100,000 cleaned rows form the training set, as an RDD of Spark Rows.
data_rdd = (
	spark
	.createDataFrame(data.head(100_000))
	.rdd
)

In [8]:
# Map step for the per-class means.
# (key : class label , value : {attribute1 : value1, attribute2 : value2, 'count' : 1})
def mapper_1(row):
	"""Turn one Spark Row into ``(row.TARGET, {feature: value, ..., 'count': 1})``.

	Every field except ``TARGET`` is copied into the value dict. The extra
	``'count': 1`` entry lets the reducer count rows per class while it sums
	the feature values.
	"""
	record = row.asDict()
	record['count'] = 1
	payload = {name: value for name, value in record.items() if name != 'TARGET'}
	return (record['TARGET'], payload)

def reducer_1(old, new):
	"""Add ``new``'s values into ``old`` key by key and return ``old``.

	``old`` is updated in place. Every key of ``new`` must already exist in
	``old``; otherwise a KeyError is raised.
	"""
	for key in new:
		old[key] = old[key] + new[key]
	return old

# Per-class feature sums plus a row count: a list of (label, {feature: sum, 'count': n}).
# collect() after reduceByKey does not promise any ordering, but later cells treat
# mean[0] / mean[1] as class 0 / class 1, so sort the pairs by label.
mean = sorted(data_rdd.map(mapper_1).reduceByKey(reducer_1).collect(), key=lambda kv: kv[0])


In [38]:
# Turn the per-class sums in `mean` into per-class means, in place.
# class_counts is keyed by class label (not list position), so it stays correct
# whatever order Spark returned the (label, stats) pairs in.
# NOTE: this mutates `mean`; re-running the cell would divide a second time.
class_counts = {label: stats['count'] for label, stats in mean}
for label, stats in mean:
	# Read the divisor once: 'count' is itself one of the keys being divided, so
	# re-reading it inside the loop would use the already-divided value (1.0).
	n_rows = stats['count']
	for k in stats:
		stats[k] = stats[k] / n_rows

In [39]:
# Map step for the per-class covariances.
# (key : class label , value : {(attribute_i, attribute_j) : (x_i - mean_i) * (x_j - mean_j)})
def mapper_2(row, mean):
	"""Emit one row's outer product of deviations from its class mean.

	Parameters
	----------
	row : pyspark Row
	    A training row; ``row.asDict()`` must contain ``'TARGET'`` and every feature.
	mean : list of (label, {feature: mean_value, ...}) pairs
	    Per-class means. Looked up by label, so the list order does not matter.

	Returns
	-------
	tuple
	    ``(label, {(feature_i, feature_j): dev_i * dev_j, ...})`` covering every
	    ordered pair of non-TARGET fields, in the row's field order.
	"""
	record = row.asDict()
	label = record['TARGET']
	# Look the class mean up by label. Indexing the list by label (mean[label])
	# silently used the wrong class whenever Spark's collect() order differed.
	class_mean = dict(mean)[label]
	# Compute each deviation once (O(d)) instead of re-deriving it for every pair.
	deviations = {k: v - class_mean[k] for k, v in record.items() if k != 'TARGET'}
	result = {}
	for k1, d1 in deviations.items():
		for k2, d2 in deviations.items():
			result[(k1, k2)] = d1 * d2
	return (label, result)

def reducer_2(old, new):
	"""Element-wise sum of two ``{(feature_i, feature_j): value}`` dicts.

	Accumulates into ``old`` in place and returns it. ``new`` must not contain
	keys that are missing from ``old``.
	"""
	for pair, value in list(new.items()):
		accumulated = old[pair]
		old[pair] = accumulated + value
	return old

# Per-class sums of deviation products: a list of (label, {(f_i, f_j): sum}).
# As with `mean`, collect() order is not guaranteed and later cells read cov[0] /
# cov[1] as class 0 / class 1, so sort the pairs by label.
cov = sorted(
	data_rdd.map(lambda x: mapper_2(x, mean)).reduceByKey(reducer_2).collect(),
	key=lambda kv: kv[0],
)

In [41]:
# Divide each class's summed deviation products by that class's row count, giving the
# (maximum-likelihood, divide-by-N) covariance entries. Mutates `cov` in place.
for label, pair_sums in cov:
	n_rows = class_counts[label]
	for pair in pair_sums:
		pair_sums[pair] = pair_sums[pair] / n_rows

In [44]:
# Generate numpy arrays for the per-class mean vectors and covariance matrices.
# `features` fixes the column order of every array. The evaluation cell builds its
# test vectors from this same list, so model inputs and parameters line up.
# Per-class results are looked up by label rather than by list position, because
# the order in which Spark's collect() returns the (label, stats) pairs is not guaranteed.
mean_by_label = dict(mean)
cov_by_label = dict(cov)
features = [k for k in mean_by_label[0] if k != 'count']


def _to_matrix(pair_values):
	"""Arrange a {(feature_i, feature_j): value} dict into a (d, d) float array in `features` order."""
	d = len(features)
	return np.array(
		[[pair_values[(fi, fj)] for fj in features] for fi in features],
		dtype=float,
	).reshape(d, d)


mean_0 = np.array([mean_by_label[0][f] for f in features], dtype=float)
mean_1 = np.array([mean_by_label[1][f] for f in features], dtype=float)
cov_0 = _to_matrix(cov_by_label[0])
cov_1 = _to_matrix(cov_by_label[1])


In [45]:
# Sanity-check the fitted parameter shapes: two (d,) means, then two (d, d) covariances.
for fitted in (mean_0, mean_1, cov_0, cov_1):
	print(fitted.shape)

(34,)
(34,)
(34, 34)
(34, 34)


In [46]:
def predict(x, means=None, covs=None, priors=None):
	"""Classify one sample with the two-class Gaussian (QDA-style) model.

	Each class is scored by its multivariate-normal log-density

	    log N(x | mu, S) = -0.5 * (d * log(2*pi) + log|S| + (x - mu)^T S^-1 (x - mu))

	computed in log space via ``slogdet`` and ``solve``. The previous form,
	``1/(2*pi*sqrt(det S)) * exp(...)``, had two problems. With many features on
	very different scales, ``det`` overflows or underflows, and ``exp`` underflows
	for distant points; either way both class probabilities become 0 and the
	comparison silently falls through to class 1. Its normaliser was also wrong:
	it should be (2*pi)^(d/2).

	Parameters
	----------
	x : array-like, shape (d,)
	    Feature vector, ordered like the model parameters.
	means : pair of arrays, optional
	    ``(mean of class 0, mean of class 1)``. Defaults to the notebook's
	    ``mean_0`` / ``mean_1``.
	covs : pair of arrays, optional
	    ``(covariance of class 0, covariance of class 1)``. Defaults to
	    ``cov_0`` / ``cov_1``.
	priors : pair of floats, optional
	    Class prior probabilities added as log-priors. ``None`` (the default)
	    keeps the original behaviour of ignoring priors, i.e. equal priors.

	Returns
	-------
	int
	    0 if class 0 scores strictly higher, otherwise 1 (ties go to 1, as before).

	Raises
	------
	numpy.linalg.LinAlgError
	    If a covariance matrix is singular or not positive definite.
	"""
	if means is None:
		means = (mean_0, mean_1)
	if covs is None:
		covs = (cov_0, cov_1)
	x = np.asarray(x, dtype=float)
	scores = []
	for mu, sigma in zip(means, covs):
		diff = x - mu
		sign, logdet = np.linalg.slogdet(sigma)
		if sign <= 0:
			# A valid covariance has a positive determinant; anything else would
			# previously have produced NaN/inf and an arbitrary prediction.
			raise np.linalg.LinAlgError("covariance matrix is not positive definite")
		# solve() avoids forming the explicit inverse and is numerically safer.
		mahalanobis_sq = diff @ np.linalg.solve(sigma, diff)
		scores.append(-0.5 * (x.size * np.log(2 * np.pi) + logdet + mahalanobis_sq))
	if priors is not None:
		scores = [score + np.log(p) for score, p in zip(scores, priors)]
	# Return the class with the higher (log) probability.
	return 0 if scores[0] > scores[1] else 1


In [70]:
# Test the model on the rows immediately after the 100,000-row training slice.
# NOTE(review): 100_000:102_001 selects 2001 rows — confirm whether 2000 was intended.
test_data = data.iloc[100_000:102_001]
assert len(test_data) > 0, "no rows left after the training slice to test on"

# Extract the feature matrix once, in the same column order as the model parameters,
# instead of re-indexing a pandas column for every (row, feature) pair.
X_test = test_data[features].to_numpy(dtype=float)
y_test = test_data['TARGET'].to_numpy()

correct = sum(int(predict(x) == y) for x, y in zip(X_test, y_test))

print(correct / len(test_data))


0.665167416291854
