In [1]:
!pip install --quiet mrjob==0.7.4

In [2]:
import numpy as np
import heapq
from heapq import heappush, heappop
import collections
import csv
import sys
from pyspark import SparkContext, SparkConf
import time

In [3]:
from google.colab import drive
drive.mount("/content/gdrive")

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [4]:
config = SparkConf().setAppName("knn_classification").setMaster("local")
spark_context = SparkContext(conf=config)

In [5]:
train_dataset_path = '/content/gdrive/MyDrive/BigData/Dataset_Project/training_data_1.csv'
test_dataset_path = '/content/gdrive/MyDrive/BigData/Dataset_Project/testing_data_1.csv'

max_train_values = []
min_train_values = []
max_test_values = []
min_test_values = []
train_sample_count = 0
test_sample_count = 0


In [6]:
with open(train_dataset_path, mode='r') as train_file:
    train_reader = csv.reader(train_file)
    first_row = True

    for row in train_reader:
        if row[0][0].isdigit():
            train_sample_count += 1
            for i in range(len(row)):
                if first_row:
                    max_train_values.append(float(row[i]))
                    min_train_values.append(float(row[i]))
                else:
                    max_train_values[i] = max(max_train_values[i], float(row[i]))
                    min_train_values[i] = min(min_train_values[i], float(row[i]))
            first_row = False

# Reading and processing the test dataset
with open(test_dataset_path, mode='r') as test_file:
    test_reader = csv.reader(test_file)
    first_row = True

    for row in test_reader:
        if row[0].isdigit():
            test_sample_count += 1
            for i in range(1, len(row)):
                if first_row:
                    max_test_values.append(float(row[i]))
                    min_test_values.append(float(row[i]))
                else:
                    max_test_values[i-1] = max(max_test_values[i-1], float(row[i]))
                    min_test_values[i-1] = min(min_test_values[i-1], float(row[i]))
            first_row = False

# Displaying min and max values for training and test datasets
print(max_train_values)
print(min_train_values)
print(max_test_values)
print(min_test_values)


[4.0, 13.0, 4.0, 13.0, 4.0, 13.0, 4.0, 13.0, 4.0, 13.0, 6.0]
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0]
[4.0, 13.0, 4.0, 13.0, 4.0, 13.0, 4.0, 13.0, 4.0, 13.0]
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]


In [7]:
#  normalization function for the training and test dataset
#  using the formula : (curr_val[i] -min(ith column))/(max(ith column) - min(ith column))

# Normalization functions for training and test data
def normalize_train(row, min_values, max_values):
    row_elements = row.split(",")
    normalized_row = []
    if row_elements[0][0].isdigit():
        normalized_row.append(int(row_elements[-1]))
        for i in range(len(row_elements)):
            normalized_row.append((float(row_elements[i]) - min_values[i]) / (max_values[i] - min_values[i]))
    return normalized_row

def normalize_test(row, min_values, max_values):
    row_elements = row.split(",")
    normalized_row = []
    if row_elements[0][0].isdigit():
        normalized_row.append(int(row_elements[0]))
        for i in range(1, len(row_elements)):
            normalized_row.append((float(row_elements[i]) - min_values[i-1]) / (max_values[i-1] - min_values[i-1]))
    return normalized_row

In [8]:
# Setting mapper count and features count
mapper_count = 64
feature_count = 10

train_rdd_raw = spark_context.textFile(train_dataset_path, mapper_count)
test_rdd_raw = spark_context.textFile(test_dataset_path)

# Normalizing both datasets
train_rdd = train_rdd_raw.map(lambda line: normalize_train(line, min_train_values, max_train_values)).cache()
test_rdd = test_rdd_raw.map(lambda line: normalize_test(line, min_test_values, max_test_values)).cache()

In [9]:
def calculate_iterations(train_samples, test_samples, memory_limit):
    iterations = 0
    train_weight = (8 * train_samples * feature_count) / (mapper_count * 1024.0 * 1024.0)
    test_weight = (8 * test_samples * feature_count) / (1024.0 * 1024.0)
    if (train_weight + test_weight < memory_limit * 1024.0):
        iterations = 1
    else:
        if train_weight >= memory_limit * 1024.0:
            print("Training weight exceeds memory limit. Exiting.")
            sys.exit(1)
        iterations = int(1 + (test_weight / ((memory_limit * 1024.0) - train_weight)))
    return iterations

In [10]:
available_memory = 0.2
iteration_count = calculate_iterations(train_sample_count, test_sample_count, available_memory)
test_rdd.partitionBy(iteration_count)

MapPartitionsRDD[9] at mapPartitions at PythonRDD.scala:160

In [11]:
def compute_knn(train_chunk, test_data, k_neighbors):
    knn_results = []

    # Calculating distances between train and test points
    for train_row in train_chunk:
        if len(train_row) > 0:
            train_point = np.array(train_row[1:11])
            for test_row in test_data:
                if len(test_row) > 0:
                    test_point = np.array(test_row[1:11])
                    distance = np.linalg.norm(train_point - test_point)
                    knn_results.append((test_row[0], (distance, train_row[0])))

    # Grouping by test_id and collecting the neighbors
    knn_by_test_point = collections.defaultdict(list)
    for test_id, (dist, train_id) in knn_results:
        knn_by_test_point[test_id].append((dist, train_id))

    # Sorting by test_id, then by distance, and retaining the top k_neighbors
    final_knn_results = []
    for test_id in sorted(knn_by_test_point):
        neighbors = knn_by_test_point[test_id]
        neighbors.sort(key=lambda x: x[0])
        final_knn_results.extend([(test_id, neighbor) for neighbor in neighbors[:k_neighbors]])

    return final_knn_results

In [12]:
def select_knn_results(test_row):
    max_heap = []
    heapq._heapify_max(max_heap)
    for dist_class in test_row[1]:
        heappush(max_heap, dist_class)
        if len(max_heap) > k_neighbors:
            heapq._heappop_max(max_heap)

    class_count = collections.defaultdict(int)
    predicted_class = -1
    max_count = 0
    for val in max_heap:
        class_count[val[1]] += 1
        if class_count[val[1]] > max_count:
            max_count = class_count[val[1]]
            predicted_class = val[1]
    return (test_row[0], predicted_class)

In [13]:
knn_start_time = time.time()
broadcast_test_data = test_rdd.glom().collect()
train_rdd_chunks = train_rdd.glom()

k_neighbors = 5

for test_partition in broadcast_test_data:
    knn_results = train_rdd_chunks.flatMap(lambda chunk: compute_knn(chunk, test_partition, k_neighbors))

    #reduce_start_time = time.time()

    grouped_results = knn_results.groupByKey().mapValues(list)
    predictions = grouped_results.map(lambda line: select_knn_results(line))

    #reduce_time = time.time() - reduce_start_time

    print(predictions.take(1000))

[(64, 1), (128, 1), (192, 0), (256, 0), (320, 0), (384, 0), (448, 1), (512, 0), (576, 1), (640, 0), (704, 1), (768, 1), (832, 1), (896, 0), (960, 1), (1024, 0), (1088, 1), (1152, 0), (1216, 1), (1280, 0), (1344, 0), (1408, 0), (1472, 0), (1536, 0), (1600, 0), (1664, 1), (1728, 0), (1792, 1), (1856, 1), (1920, 0), (1984, 0), (1, 1), (65, 0), (129, 0), (193, 0), (257, 0), (321, 0), (385, 1), (449, 1), (513, 0), (577, 0), (641, 1), (705, 1), (769, 1), (833, 0), (897, 1), (961, 1), (1025, 1), (1089, 0), (1153, 1), (1217, 0), (1281, 1), (1345, 0), (1409, 1), (1473, 0), (1537, 1), (1601, 1), (1665, 0), (1729, 1), (1793, 0), (1857, 0), (1921, 0), (1985, 0), (2, 1), (66, 0), (130, 0), (194, 1), (258, 3), (322, 0), (386, 0), (450, 1), (514, 1), (578, 1), (642, 0), (706, 0), (770, 1), (834, 0), (898, 0), (962, 0), (1026, 1), (1090, 0), (1154, 1), (1218, 0), (1282, 0), (1346, 0), (1410, 0), (1474, 1), (1538, 1), (1602, 0), (1666, 0), (1730, 1), (1794, 0), (1858, 1), (1922, 1), (1986, 0), (3, 0), 

In [14]:
print(f"kNN computation completed in {time.time() - knn_start_time:.2f} seconds.")

kNN computation completed in 94.75 seconds.
