In [1]:
!pip install --quiet pyspark

[K     |████████████████████████████████| 281.4 MB 47 kB/s 
[K     |████████████████████████████████| 199 kB 54.8 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
import numpy as np
import heapq
from heapq import heapify, heappush, heappop
import collections
import csv
import sys

In [3]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("word count").setMaster("local")
sc = SparkContext(conf=conf)

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
train_ds = '/content/drive/MyDrive/datasets/Datasets/traindata_medium.csv'
test_ds =  '/content/drive/MyDrive/datasets/Datasets/test_set_med.csv'

#  different size of data that we have used for the experiments :

# train_ds = '/content/drive/MyDrive/datasets/Datasets/train_set.csv'
# test_ds =  '/content/drive/MyDrive/datasets/Datasets/test_set_small.csv'
# test_ds = '/content/drive/MyDrive/datasets/Datasets/test_set_medium.csv'
# test_ds = '/content/drive/MyDrive/datasets/Datasets/test_set_large1.csv'
# test_ds = '/content/drive/MyDrive/datasets/Datasets/test_set_large2.csv'

#  (curr-min(col))/(max(col) - min(col))

In [6]:
#  for the normalization : find maximum and minimum values for each feature
mx_val_tr = [] 
mn_val_tr = []
mx_val_ts = []
mn_val_ts = []
numTrainSamples = 0
numTestSamples = 0
# opening the CSV file
with open(train_ds, mode ='r')as file:
   
  # reading the CSV file
  csvFile = csv.reader(file)
  enter = False

  # displaying the contents of the CSV file
  for row_list in csvFile:
      if(row_list[0][0].isdigit()): 
        numTrainSamples += 1  # calculating no of rows
        for i in range(0,len(row_list)): # iterating over rows
          if(enter==False):
            mx_val_tr.append(float(row_list[i]))
            mn_val_tr.append(float(row_list[i]))
          else:
            mx_val_tr[i] = max(mx_val_tr[i],float(row_list[i])) # updating maximum value for ith column
            mn_val_tr[i] = min(mn_val_tr[i],float(row_list[i])) # updating minimum value for ith column
        enter = True

with open(test_ds, mode ='r')as file:
   
  # reading the CSV file
  csvFile = csv.reader(file)
  enter = False

  # displaying the contents of the CSV file
  for row_list in csvFile:
    if(row_list[0].isdigit()):
      numTestSamples += 1 # calculating no of rows
      for i in range(1,len(row_list)): # iterating over rows
        if(enter==False):
          mx_val_ts.append(float(row_list[i]))
          mn_val_ts.append(float(row_list[i]))
        else:
          mx_val_ts[i-1] = max(mx_val_ts[i-1],float(row_list[i])) # updating maximum value for ith column
          mn_val_ts[i-1] = min(mn_val_ts[i-1],float(row_list[i])) # updating minimum value for ith column
      enter = True

#  get maximum and minimum for each column of the datasets
print(mx_val_tr)
print(mn_val_tr)
print(mx_val_ts)
print(mn_val_ts)

[4.0, 13.0, 4.0, 13.0, 4.0, 13.0, 4.0, 13.0, 4.0, 13.0, 6.0]
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0]
[4.0, 13.0, 4.0, 13.0, 4.0, 13.0, 4.0, 13.0, 4.0, 13.0]
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]


In [7]:
#  normalization function for the training and test dataset
#  using the formula : (curr_val[i] -min(ith column))/(max(ith column) - min(ith column))

def normalize_tr(row,mn_val_tr,mx_val_tr):
    row_list = row.split(",")
    list_ans = []
    if(row_list[0][0].isdigit()):
      list_ans.append(int(row_list[-1])) # put first entry as hand for the simplicity
      for i in range(0,len(row_list)): # iterating over each row and normalizing it
        list_ans.append((float(row_list[i]) - mn_val_tr[i])/(mx_val_tr[i]-mn_val_tr[i]))
    return list_ans

def normalize_ts(row,mn_val_ts,mx_val_ts):
    row_list = row.split(",")
    list_ans = []
    if(row_list[0][0].isdigit()):
      list_ans.append(int(row_list[0])) # put first entry as test id for the simplicity
      for i in range(1,len(row_list)): # iterating over each row and normalizing it
        list_ans.append((float(row_list[i]) - mn_val_ts[i-1])/(mx_val_ts[i-1]-mn_val_ts[i-1]))
    return list_ans

In [8]:
# different number of mappers
# number_map = 10
number_map = 50
# number_map = 100
# number_map = 200

numFeatures = 10 # number of columns used to find euclidean distance

tr_rdd_raw = sc.textFile(train_ds,number_map)
ts_rdd_raw = sc.textFile(test_ds)

#  normalizing the both dataset
tr_rdd = tr_rdd_raw.map(lambda line : normalize_tr(line,mn_val_tr,mx_val_tr)).cache()
ts_rdd = ts_rdd_raw.map(lambda line : normalize_ts(line,mn_val_ts,mx_val_ts)).cache()

# print(ts_rdd.take(100))

In [9]:
#  prefine function taken from github repository
def calIter(numTrainSmps,numTestSmps,memAllow):
  numIterations = 0;
  weightTrain = (8 * numTrainSmps * numFeatures) / (number_map * 1024.0 * 1024.0)
  weightTest = (8 * numTestSmps * numFeatures) / (1024.0 * 1024.0)
  # print(weightTrain," ",weightTest)
  if (weightTrain + weightTest < memAllow * 1024.0):
        numIterations = 1
  else: 
    if (weightTrain >= memAllow * 1024.0):
      print("Train wight bigger than lim-task. Abort")
      sys.exit(1)

    numIterations = int((1 + (weightTest / ((memAllow * 1024.0) - weightTrain))))
  return numIterations

In [12]:
avail_mem = 0.2
num_of_iter = calIter(numTrainSamples,numTestSamples,avail_mem) # get number  of iterations
ts_rdd.partitionBy(num_of_iter,partitionFunc=range) # divide test dataset into num_of_iter parts

MapPartitionsRDD[13] at mapPartitions at PythonRDD.scala:145

In [13]:
def computekNN(row,all_ts,k):
  info = []
  if(len(row) > 0):
    point1 = np.array((row[1],row[2],row[3],row[4],row[5],row[6],row[7],row[8],row[9],row[10])) # row of training dataset 
    for i in range(0,len(all_ts)):
      if(len(all_ts[i])>0):
        point2 = np.array((all_ts[i][1],all_ts[i][2],all_ts[i][3],all_ts[i][4],all_ts[i][5],all_ts[i][6],all_ts[i][7],all_ts[i][8],all_ts[i][9],all_ts[i][10])) #row of test dataset
        dist = np.linalg.norm(point1 - point2) # euclidean distance

        info.append((all_ts[i][0],(dist,row[0]))) # return as list ((test_id,(distance,class)))
  return info

In [14]:
def CombineResults(row): 
  #  input row : (key = test_id,value = [(distance1,class1),(distance2,class2),...,(distancen,classn)])
  info = []
  max_heap = [] # initialize heap to find top k
  heapq._heapify_max(max_heap)
  for i in row[1]:
    heappush(max_heap,(i[0],i[1]))
    if (len(max_heap) > k): # pop if exceed k size
      heapq._heappop_max(max_heap)

  count = collections.defaultdict(lambda: 0) # initialize dictionary to count voting for each class in top k
  ans = -1
  curr_max_count = 0 
  for val in max_heap:
    count[val[1]]+=1 # update count for each class
    if(count[val[1]] > curr_max_count): # update answer if get more mejority voting
      curr_max_count = count[val[1]]
      ans = val[1]
  return (row[0],ans) # (test_id,predicted_class)

In [15]:
broad_cast_data = ts_rdd.glom().collect() # collect list of distributed object
k = 10 # set k values
for ts_data in broad_cast_data: 
  resultkNN = tr_rdd.flatMap(lambda line : computekNN(line,ts_data,k))  # flatmap return -> list of((test_id,(distance,class)))
  res = resultkNN.groupByKey().mapValues(list)  # grouping it by key return -> list of((key = test_id,value = list of((distance,class))))
  get_ans = res.map(lambda line : CombineResults(line)) # return -> list of((test_id,predicted class))
  print(get_ans.take(1000))

[(50, 0), (100, 0), (150, 1), (200, 0), (250, 0), (300, 0), (350, 0), (400, 0), (450, 1), (500, 0), (550, 0), (600, 0), (650, 0), (700, 1), (750, 0), (800, 0), (850, 0), (900, 0), (950, 0), (1000, 0), (1050, 0), (1100, 0), (1150, 0), (1200, 0), (1250, 0), (1300, 0), (1350, 0), (1400, 0), (1450, 0), (1500, 0), (1550, 0), (1600, 0), (1650, 0), (1700, 0), (1750, 0), (1800, 0), (1850, 0), (1900, 0), (1950, 0), (1, 0), (51, 0), (101, 0), (151, 0), (201, 0), (251, 0), (301, 0), (351, 0), (401, 0), (451, 0), (501, 0), (551, 1), (601, 0), (651, 0), (701, 0), (751, 0), (801, 0), (851, 0), (901, 0), (951, 0), (1001, 0), (1051, 0), (1101, 1), (1151, 0), (1201, 0), (1251, 0), (1301, 0), (1351, 0), (1401, 0), (1451, 0), (1501, 0), (1551, 1), (1601, 0), (1651, 0), (1701, 0), (1751, 1), (1801, 0), (1851, 0), (1901, 0), (1951, 0), (2, 0), (52, 0), (102, 0), (152, 0), (202, 0), (252, 0), (302, 0), (352, 0), (402, 0), (452, 0), (502, 0), (552, 1), (602, 0), (652, 0), (702, 0), (752, 0), (802, 0), (852, 

In [None]:
sc.stop()