In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import numpy as np
from tqdm import tqdm
from numba import njit, vectorize
from multiprocessing import Pool


In [None]:
# Obtain the SparkSession for this notebook (getOrCreate reuses an active one if present)
ss = (
    SparkSession.builder
    .appName("MapReduceExample")
    .getOrCreate()
)

In [None]:
# Read the CSV (first line is the header, column types inferred) and expose it as an RDD of rows
dataRDD = (
    ss.read
    .csv("card_transdata.csv", header=True, inferSchema=True)
    .rdd
)
# Print the first record as a quick sanity check
print(dataRDD.take(1))

In [None]:
def calculateDistance(x1, x2):
    """Return the Euclidean (L2) distance between ``x1`` and ``x2``.

    The difference ``x1 - x2`` is formed first, so both arguments must
    support element-wise subtraction with each other.
    """
    return np.linalg.norm(x1 - x2)

In [None]:

# Random train/test split with weights 0.8 / 0.2 and a fixed seed
trainRDD, testRDD = dataRDD.randomSplit([0.8, 0.2], seed=42)
# Set the last field of every test row (the fraud column) aside as the labels,
# then strip it from the test rows so only the remaining fields are left.
testlabels = testRDD.map(lambda row: row[-1])
testRDD = testRDD.map(lambda row: row[:-1])



## KNN MapReduce

In [None]:
def KNNMapReduce(testPoints, k=3):
    """Classify each test point by a k-nearest-neighbour vote computed on Spark.

    For every test point, the module-level ``trainRDD`` is mapped to
    ``(label, distance)`` pairs and the ``k`` closest pairs are pulled to
    the driver with ``takeOrdered``. The last element of each training row
    is treated as the label and everything before it as the features.

    Parameters
    ----------
    testPoints : iterable
        Feature vectors to classify; each one is passed to
        ``calculateDistance`` together with the features of a training row.
    k : int, optional
        Number of neighbours that vote (default 3, the value that used to
        be hard-coded).

    Returns
    -------
    list of int
        One prediction per test point: 1 when more than ``k / 2`` of the
        neighbours have label 1, otherwise 0.
    """
    testPredictions = []

    for testPoint in tqdm(testPoints):
        # Only the label and the distance travel back to the driver; the
        # full training row is not needed for the vote.
        neighbours = (
            trainRDD
            .map(lambda row, point=testPoint: (
                row[-1],
                calculateDistance(point, np.array(row[:-1])),
            ))
            .takeOrdered(k, key=lambda pair: pair[1])
        )

        countFraud = sum(1 for label, _ in neighbours if label == 1)
        # Strict majority: a tie counts as "not fraud"
        testPredictions.append(1 if countFraud > k / 2 else 0)

    return testPredictions
        

## KNN Normal (single-machine NumPy/Numba implementation)

In [None]:

@njit
def calculateDistanceVectorized(x1, x2):
    """Numba-compiled Euclidean distance between ``x1`` and ``x2``."""
    return np.linalg.norm(x1 - x2)

@njit
def predictKNN(testPoint : np.ndarray, trainRDD : np.ndarray, k):
    """Predict the label of one test point with a k-nearest-neighbour vote.

    Parameters
    ----------
    testPoint : np.ndarray
        Feature vector of the point to classify; it must have as many
        entries as ``trainRDD`` has feature columns.
    trainRDD : np.ndarray
        Training data as a 2-D array (an array despite the name). The last
        column is the label, all preceding columns are features.
    k : int
        Number of nearest neighbours that vote.

    Returns
    -------
    int
        1 when more than ``k / 2`` of the nearest rows have label 1,
        otherwise 0.
    """
    n_train = trainRDD.shape[0]
    # Derive the label position from the data instead of hard-coding 7
    label_col = trainRDD.shape[1] - 1

    # Only the distances are needed for ranking; no copy of the training
    # rows is made.
    distances = np.empty(n_train)
    for i in range(n_train):
        distances[i] = calculateDistanceVectorized(testPoint, trainRDD[i, :label_col])

    # Indices of the k closest training rows (fewer if n_train < k)
    nearest = np.argsort(distances)[:k]

    countFraud = 0
    for j in range(len(nearest)):
        if trainRDD[nearest[j], label_col] == 1:
            countFraud += 1

    # Strict majority: a tie counts as "not fraud"
    if countFraud > k / 2:
        return 1
    else:
        return 0

@njit
def normalKNN(testPoints, trainRDD, k = 1):
    """Run ``predictKNN`` for every test point and gather the predictions.

    Parameters
    ----------
    testPoints : indexable collection of feature vectors
    trainRDD : training data, forwarded unchanged to ``predictKNN``
    k : number of neighbours, forwarded to ``predictKNN`` (default 1)

    Returns
    -------
    Array created with ``np.zeros`` holding one prediction per test point,
    in the same order as ``testPoints``.
    """
    numPoints = len(testPoints)
    predictions = np.zeros(numPoints)

    for idx in range(numPoints):
        predictions[idx] = predictKNN(testPoints[idx], trainRDD, k)

    return predictions

In [None]:

def testKNN(testPoints, trainRDD, testLabels, k=1):
    """Evaluate ``normalKNN`` on labelled test points and print the accuracy.

    Parameters
    ----------
    testPoints : array of feature vectors, forwarded to ``normalKNN``
    trainRDD : training data, forwarded to ``normalKNN``
    testLabels : sequence of true labels, one per entry of ``testPoints``
    k : int, optional
        Number of neighbours forwarded to ``normalKNN``. Defaults to 1,
        which is the value ``normalKNN`` used when ``k`` was not passed.

    Raises
    ------
    ValueError
        If ``testLabels`` and ``testPoints`` differ in length, since the
        element-wise comparison would then be meaningless.
    """
    testLabelsArr = np.asarray(testLabels)

    if len(testLabelsArr) != len(testPoints):
        raise ValueError(
            f"testLabels has {len(testLabelsArr)} entries but testPoints has {len(testPoints)}"
        )

    if len(testLabelsArr) == 0:
        # Nothing to score: report NaN without dividing by zero or
        # calling the classifier on an empty array.
        print("Accuracy: ", float("nan"))
        return

    testPredictions = normalKNN(testPoints, trainRDD, k)
    accuracy = np.sum(testPredictions == testLabelsArr)/len(testLabelsArr)
    print("Accuracy: ", accuracy)

# Evaluate on one fixed window of the collected test split; the same window
# is applied to the points and to their labels so the two stay aligned.
evalWindow = slice(300, 500)

print("Collecting test points")
testPoints = np.array(testRDD.collect()[evalWindow])
print("Finished collecting test points")

print("Collecting train points")
trainData = np.array(trainRDD.collect())
print("Finished collecting train points")

testKNN(testPoints, trainData, testlabels.collect()[evalWindow])


In [None]:
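# Disabled: save the test predictions and the test labels side by side in one text file.
# NOTE: `testPredictions` and `testLabels` are not defined at notebook scope in the cells
# above (testKNN keeps them local), so they must be made available before re-enabling this.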
# #open file
# f = open("testPredictions2.txt", "w")
# #write testPredictions and test labels beside each other to file
# for i in range(len(testPredictions)):
#     f.write(str(testPredictions[i]) + " " + str(testLabels[i]) + "\n")
# #close file
# f.close()




## Clustering (K-means)

In [None]:
def calCentroid(x):
    """Return the mean of ``x`` taken along axis 0."""
    return np.mean(x, axis=0)

def initializeCentroids(dataRDD, k):
    """Pick ``k`` starting centroids by sampling ``dataRDD``.

    Calls ``dataRDD.takeSample(False, k, seed=42)`` and returns its result
    unchanged; the fixed seed keeps the choice repeatable.
    """
    return dataRDD.takeSample(False, k, seed=42)

def assignCluster(x, centroids) -> int:
    """Return the index of the centroid closest to ``x``.

    Distances are measured with ``calculateDistance``; when several
    centroids are equally close, ``np.argmin`` picks the first of them.
    """
    distances = np.array(
        [calculateDistance(x, centroid) for centroid in centroids],
        dtype=float,
    )
    return np.argmin(distances)

def KMeans(iter: int = 5, k: int = 2):
    """Run K-Means on the module-level ``dataRDD`` and return the centroids.

    Each iteration assigns every full row of ``dataRDD`` (no column is
    dropped) to its nearest centroid, then recomputes each centroid as the
    mean of the rows assigned to it. The per-cluster mean is built from
    partial (sum, count) pairs combined with ``reduceByKey``, so individual
    rows are never gathered per cluster.

    Parameters
    ----------
    iter : int, optional
        Number of assignment/update rounds (default 5). The name shadows
        the built-in ``iter`` but is kept so existing calls still work.
    k : int, optional
        Number of clusters (default 2, the value that used to be
        hard-coded).

    Returns
    -------
    np.ndarray
        Array with one centroid per row; row ``j`` belongs to cluster ``j``.
    """
    centroids = np.array(initializeCentroids(dataRDD, k))

    print(f"Training K-Means... {iter} iterations")
    for _ in tqdm(range(iter)):
        current = centroids

        # cluster id -> (sum of member rows, number of member rows)
        clusterStats = (
            dataRDD
            .map(lambda row: (
                int(assignCluster(np.array(row), current)),
                (np.array(row, dtype=float), 1),
            ))
            .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
            .collectAsMap()
        )

        # Index by cluster id so centroid order is stable across rounds; a
        # cluster that received no rows keeps its previous centroid.
        centroids = np.array([
            clusterStats[j][0] / clusterStats[j][1] if j in clusterStats else current[j]
            for j in range(k)
        ])

    return centroids

def KMeansWSS(centroids, dataRDD):
    """Compute the within-cluster sum of squares (WSS) for ``centroids``.

    All rows of ``dataRDD`` are collected to the driver; for each row the
    squared distance to its nearest centroid (per ``assignCluster``) is
    added to the running total, which is returned.
    """
    rows = dataRDD.collect()
    print("Calculating WSS...")

    total = 0
    for row in tqdm(rows):
        point = np.array(row)
        nearest = centroids[assignCluster(point, centroids)]
        total += calculateDistance(point, nearest) ** 2

    return total


        



In [None]:

# Fit K-Means with two iterations
centroids = KMeans(iter=2)


In [None]:
# Within-cluster sum of squares of the fitted centroids over the full data set
WSS = KMeansWSS(centroids=centroids, dataRDD=dataRDD)
print("WSS: ", WSS)