In [None]:
# Mount Google Drive (Colab only) so the datasets read later from
# /content/drive/MyDrive/datasets/ are reachable from this runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=8d4c8e9e091c4f0c2b45c824046aaf9b040588f47226518f3315105d0992f152
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [55]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import heapq
from heapq import heapify, heappush, heappop
import sys
import math
import numpy as np
import pandas as pd

def calculate_min_max(data):
    """Return the per-column ``(min, max)`` of comma-separated numeric rows.

    Every column is included (the trailing class label as well), so the result has one
    ``(min, max)`` float tuple per column. The column count is taken from the first
    non-blank row.

    Args:
        data: any iterable of CSV strings (a list, or a Spark partition iterator).
            Blank / whitespace-only lines are skipped.

    Returns:
        A list of ``(min, max)`` tuples, or ``[]`` when ``data`` has no non-blank rows.
    """
    feature_min_max = []
    for line in data:
        if not line.strip():
            continue  # tolerate blank lines, e.g. a trailing newline in the file
        features = [float(x) for x in line.split(',')]
        if not feature_min_max:
            # Size the table from the first non-blank row; inf/-inf start values make
            # that row's values the initial min/max.
            feature_min_max = [(float('inf'), float('-inf'))] * len(features)
        for i, value in enumerate(features):
            lo, hi = feature_min_max[i]
            feature_min_max[i] = (min(lo, value), max(hi, value))
    return feature_min_max

def normalize(line, feature_min_max):
    """Min-max scale the feature columns of one CSV row and keep the label as an int.

    Each feature is mapped with (X - Xmin) / (Xmax - Xmin), using the ranges in
    ``feature_min_max`` (one ``(min, max)`` per column, as from calculate_min_max).
    The last column is the class label: it is not scaled and is appended as ``int``.
    Values outside the given range (e.g. test rows scaled with training ranges) fall
    outside [0, 1].

    A column whose range is zero (constant in the data the ranges came from) is
    mapped to 0.0 instead of raising ZeroDivisionError.

    Returns:
        list: the normalised feature floats followed by the integer label.
    """
    *feature_values, label = [float(x) for x in line.split(',')]
    normalized_features = []
    for i, value in enumerate(feature_values):
        col_min, col_max = feature_min_max[i]
        col_range = col_max - col_min
        normalized_features.append((value - col_min) / col_range if col_range != 0 else 0.0)
    normalized_features.append(int(label))
    return normalized_features

def distance(x, y, num_features=10):
    """Euclidean distance over the first ``num_features`` entries of ``x`` and ``y``.

    Rows produced by normalize() carry the class label after the features, so the sum
    stops before it. The default of 10 matches ``numFeatures`` in the main block.
    """
    # Explicit float accumulation (not builtin sum(), whose float algorithm changed in
    # Python 3.12) keeps results, and hence neighbour ordering, bit-for-bit stable.
    total = 0.0
    for i in range(num_features):
        diff = x[i] - y[i]
        total += diff * diff
    return math.sqrt(total)

def kNN(training_set, test_set, k):
    """Find the ``k`` nearest training rows for every test record.

    Args:
        training_set: list of normalised rows (features, then the class label as the
            last element), e.g. one Spark partition of the training RDD.
        test_set: list of ``(index, row)`` pairs, ``row`` shaped like a training row.
        k: number of neighbours to keep per test record.

    Returns:
        A list of ``(index, (distances, labels))``, one entry per test record in
        ``test_set`` order. ``distances``/``labels`` hold at most ``k`` neighbours
        ordered FARTHEST FIRST (nearest last), which is the order popping the heap
        yields.
    """
    result = []
    for index, x in test_set:
        # Bounded max-heap on distance: entries are (-dist, label), so heap[0] is the
        # farthest neighbour kept so far, and heappushpop evicts it once k are held.
        max_heap = []
        for y in training_set:
            entry = (-distance(x, y), y[-1])
            if len(max_heap) < k:
                heappush(max_heap, entry)
            else:
                heapq.heappushpop(max_heap, entry)
        d = []
        c = []
        while max_heap:
            neg_dist, label = heappop(max_heap)
            d.append(-neg_dist)
            c.append(label)
        result.append((index, (d, c)))
    return result

def calIter(tr_weight, ts_weight, mem_allow, n_features=None, n_mappers=None):
    """Return how many chunks the test set must be cut into to fit the task memory.

    Sizes are estimated at 8 bytes per feature value and expressed in MiB. A map task
    holds its 1/n_mappers share of the training set plus one chunk of the test set.

    Args:
        tr_weight: number of training rows.
        ts_weight: number of test rows.
        mem_allow: per-task memory budget. It is multiplied by 1024 and compared with
            MiB figures, so it is treated as GiB.
        n_features: features per row; defaults to the module-level ``numFeatures``.
        n_mappers: number of training partitions; defaults to the module-level
            ``num_mappers``.

    Returns:
        1 if the whole test set fits next to the training share, otherwise
        ``int(1 + test_MiB / (budget_MiB - train_MiB))``.

    Raises:
        ValueError: if the training share alone does not fit in the budget.
    """
    if n_features is None:
        n_features = numFeatures  # global set in the __main__ block
    if n_mappers is None:
        n_mappers = num_mappers  # global set in the __main__ block
    weightTrain = (8 * tr_weight * n_features) / (n_mappers * 1024.0 * 1024.0)
    weightTest = (8 * ts_weight * n_features) / (1024.0 * 1024.0)
    budget = mem_allow * 1024.0
    if weightTrain + weightTest < budget:
        return 1
    if weightTrain >= budget:
        # Raise instead of sys.exit(1): in a notebook sys.exit only surfaces a bare
        # SystemExit, hiding the reason.
        raise ValueError(
            "Train weight bigger than lim-task. Abort "
            "(train share {:.3f} MiB >= budget {:.3f} MiB)".format(weightTrain, budget))
    return int(1 + weightTest / (budget - weightTrain))

def combineResult(result1, result2, k=None):
    """Merge two partial neighbour lists of the same test record (reduceByKey combiner).

    Each argument is a ``(distances, labels)`` pair. Lists straight from kNN() are
    ordered farthest-first, while lists already returned by this function are
    nearest-first. Both orders are accepted, so the merge stays correct however Spark
    nests the reductions, which happens once there are more than two training
    partitions.

    Args:
        result1, result2: ``(distances, labels)`` tuples of equal-length lists.
        k: number of neighbours to keep. Defaults to the longer input's length, i.e. k
            whenever at least one side saw k training rows; an empty side (an empty
            training partition) no longer wipes out the other.

    Returns:
        ``(distances, labels)`` with the nearest neighbours of both inputs, nearest
        first. On equal distances the entry from ``result2`` is taken first.
    """
    def _nearest_first(result):
        # Reversing turns kNN's farthest-first list into nearest-first exactly (keeping
        # its tie order); the stable sort then also handles an already nearest-first list.
        distances, labels = result
        return sorted(zip(reversed(distances), reversed(labels)), key=lambda pair: pair[0])

    near1 = _nearest_first(result1)
    near2 = _nearest_first(result2)
    if k is None:
        k = max(len(near1), len(near2))
    d, c = [], []
    i1 = i2 = 0
    while len(d) < k and (i1 < len(near1) or i2 < len(near2)):
        # Take from result1 only when strictly nearer, so ties favour result2.
        if i2 >= len(near2) or (i1 < len(near1) and near1[i1][0] < near2[i2][0]):
            dist, label = near1[i1]
            i1 += 1
        else:
            dist, label = near2[i2]
            i2 += 1
        d.append(dist)
        c.append(label)
    return (d, c)

def calculateRightPredicted(result, test_set):
    """Pair each test record's true label with the majority vote of its neighbours.

    Args:
        result: list of ``(index, (distances, labels))`` as produced by kNN() /
            combineResult() for records of ``test_set``.
        test_set: list of ``(index, row)`` pairs; the true class is ``row[-1]``.

    Returns:
        A list of ``(actual, predicted)`` tuples in ``result`` order. Every listed
        neighbour votes (kNN keeps at most k). ``predicted`` is the label with the most
        votes; on a tie, the label that reached the winning count first when visiting
        neighbours nearest-first wins. It is -1 for a record with no neighbours.
    """
    # Look rows up by index rather than by offset from test_set[0], which assumed the
    # indices were contiguous and sorted.
    rows_by_index = {index: row for index, row in test_set}
    output = []
    for index, (distances, labels) in result:
        # Stable sort so the tie-break is nearest-first whether the lists arrive
        # farthest-first (straight from kNN) or nearest-first (after combineResult).
        neighbours = sorted(zip(distances, labels), key=lambda pair: pair[0])
        votes = {}  # label -> count; no fixed number of classes assumed
        max_votes = 0
        predicted = -1
        for _, label in neighbours:
            votes[label] = votes.get(label, 0) + 1
            if votes[label] > max_votes:
                max_votes = votes[label]
                predicted = label
        actual = rows_by_index[index][-1]
        output.append((actual, predicted))
    return output


def calculateConfusionMatrix(right_predicted_classes):
    """Build an actual-vs-predicted confusion matrix with row/column totals.

    Args:
        right_predicted_classes: list with one entry per test chunk, each a list of
            ``(actual, predicted)`` pairs (as returned by calculateRightPredicted).

    Returns:
        A ``pd.crosstab`` DataFrame: rows are actual classes ('Actual'), columns are
        predicted classes ('Predicted'), plus an 'All' margin row and column.
    """
    # Flatten the per-chunk lists into a single sequence of (actual, predicted) pairs.
    pairs = [pair for chunk in right_predicted_classes for pair in chunk]
    y_actu = pd.Series([pair[0] for pair in pairs], name='Actual', dtype=int)
    y_pred = pd.Series([pair[1] for pair in pairs], name='Predicted', dtype=int)
    return pd.crosstab(y_actu, y_pred, margins=True)


if __name__ == "__main__":
    # kNN-IS driver:
    # 1. normalise both data sets with the training-set min/max;
    # 2. cut the test set into `iterations` chunks sized to the memory budget;
    # 3. for each chunk, broadcast it, run kNN against every training partition and
    #    merge the per-record neighbour lists with reduceByKey.
    conf = SparkConf().setAppName("kNN-IS")
    # getOrCreate() reuses a context left running by an earlier failed run of this cell
    # instead of failing because a SparkContext is already active.
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    sc = spark.sparkContext

    # These stay module-level globals: calIter() reads numFeatures and num_mappers, and
    # the mapPartitions closure below reads k.
    num_mappers = 2  # number of partitions (map tasks) for the training set
    numFeatures = 10  # feature columns per row; the class label follows them
    k = 1  # number of nearest neighbours

    try:
        TR_RDD_raw = sc.textFile("/content/drive/MyDrive/datasets/poker-hand-training-true.data", num_mappers)

        def _partition_min_max(lines):
            # Per-column (min, max) of one partition; an empty partition contributes nothing.
            lines = list(lines)
            return [calculate_min_max(lines)] if lines else []

        def _merge_min_max(left, right):
            # Combine two per-column (min, max) tables; an empty table acts as identity.
            if not left:
                return right
            if not right:
                return left
            return [(min(lo1, lo2), max(hi1, hi2)) for (lo1, hi1), (lo2, hi2) in zip(left, right)]

        # Compute the ranges on the executors instead of collect()-ing the whole
        # training set to the driver.
        feature_min_max = TR_RDD_raw.mapPartitions(_partition_min_max).reduce(_merge_min_max)
        TR_RDD = TR_RDD_raw.map(lambda line: normalize(line, feature_min_max)).cache()

        # Test records become (line index, normalised row), scaled with the training ranges.
        TS_RDD_raw = sc.textFile("/content/drive/MyDrive/datasets/test1.data")
        TS_RDD = (TS_RDD_raw.zipWithIndex()
                  .map(lambda x: (x[1], normalize(x[0], feature_min_max)))
                  .cache())

        # Number of test chunks that fit in the per-task memory budget.
        tr_weight = TR_RDD.count()
        ts_weight = TS_RDD.count()
        mem_allow = 0.2
        iterations = calIter(tr_weight, ts_weight, mem_allow)

        # Range-partition the test set into `iterations` contiguous index ranges.
        # partitionBy() returns a new RDD (it does not repartition in place), and its
        # partitionFunc must map a key to an int.
        chunk_size = max(1, math.ceil(ts_weight / iterations))
        TS_chunks_RDD = TS_RDD.partitionBy(iterations, partitionFunc=lambda idx: idx // chunk_size)
        # The shuffle does not preserve order within a partition, so sort each chunk by
        # index; chunks that came out empty are skipped.
        all_partitions = [sorted(chunk, key=lambda record: record[0])
                          for chunk in TS_chunks_RDD.glom().collect() if chunk]

        right_predicted_classes = []
        for test_chunk in all_partitions:
            # Ship this chunk to the executors once, then search it in every training partition.
            TS_i = sc.broadcast(test_chunk)
            resultKNN = TR_RDD.mapPartitions(lambda tr_partition: kNN(list(tr_partition), TS_i.value, k))
            # Merge the per-partition neighbour lists of each test record.
            result = resultKNN.reduceByKey(combineResult).collect()
            right_predicted_classes.append(calculateRightPredicted(result, test_chunk))
            TS_i.unpersist()  # free executor copies before broadcasting the next chunk

        confusion_matrix = calculateConfusionMatrix(right_predicted_classes)
        print(confusion_matrix)
    finally:
        # Always stop Spark, also on errors, so re-running the cell starts cleanly.
        spark.stop()


Predicted   0   1  2  3  4  8  9  All
Actual                               
0          30  10  1  0  0  0  0   41
1          13  20  2  0  0  0  0   35
2           2   3  3  0  0  0  0    8
3           1   1  0  2  0  0  0    4
4           0   0  0  0  1  0  0    1
6           1   0  0  0  0  0  0    1
8           0   0  0  0  0  5  0    5
9           0   0  0  0  0  0  5    5
All        47  34  6  2  1  5  5  100


In [42]:
# Stop the SparkContext bound to `sc`. The main cell already calls sc.stop() at the end
# of a successful run; presumably this cell exists to release a context left running
# when that cell failed midway, so a new SparkContext can be created on re-run.
# NOTE(review): its execution count (42) is lower than the main cell's (55), i.e. it was
# run out of order; it is not needed for a clean Restart & Run All.
sc.stop()