In [1]:
!apt-get update # Update apt-get repository.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Sparks.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Unzip the tgz file.
!pip install -q findspark # Install findspark. Adds PySpark to the System path during runtime.

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

!ls

# Initialize findspark
import findspark
findspark.init()

# Create a PySpark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
0% [Waiting for headers] [1 InRelease 14.2 kB/110 kB 13%] [Connected to cloud.r-project.org (52.85.1                                                                                                    Hit:2 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [1 InRelease 110 kB/110 kB 100%] [Connected to cloud.r-project.org (52.85.151.129)] [Connecting t                                                                                                    Get:3 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
                                                                                                    Hit:4 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
                                                                                                    Get:5 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Hit:6 https:

In [4]:
spark.stop()

In [26]:
sc.stop()

In [37]:
from pyspark import SparkContext
import math

# Initialize Spark context
sc = SparkContext("local", "KNN")

# Example data: Replace this with your actual data
data = [(2, [2.6, 8]), (1, [18.655, 11.20]), (3, [14, 5])]
query_point = [10.6, 1.5]
k = 2

# Create an RDD from the data
data_rdd = sc.parallelize(data)

# Calculate Euclidean distance function
def calculate_distance(point1, point2):
    if len(point1) != len(point2):
        raise ValueError("Points must have the same number of dimensions")

    squared_distances = [(a - b) ** 2 for a, b in zip(point1, point2)]
    euclidean_distance = math.sqrt(sum(squared_distances))
    return euclidean_distance

# Map function to calculate distances
def map_function(data_point):
    data_point_id, coordinates = data_point
    distance = calculate_distance(coordinates, query_point)
    print('==========================================================\n')
    print('RESULT OF MAP FUNCTION:')
    print(f'KEY: {data_point_id}\n VALUE: {distance}')
    return (data_point_id, distance)

# Apply map function
distances_rdd = data_rdd.map(map_function)
collected_distances = distances_rdd.collect()
print('==========================================================\n')
print('RESULT OF MAP FUNCTION:\n')
print(collected_distances)
# Reduce function to find top k distances
def reduce_function(x, y):
    # Combine distances and keep the smallest k
    combined_distances = sorted(x + y, key=lambda x: x[1])[:k]
    print('==========================================================\n')
    print('RESULT OF REDUCE FUNCTION:')
    print(f'KEY: {data_point_id}\n VALUE: {combined_distances}')
    return combined_distances

# Aggregate distances using reduce function
top_k_distances = distances_rdd.reduceByKey(reduce_function)

# Collect the results
results = top_k_distances.collect()

# Print the results
for result in results:
    data_point_id, top_k = result
    print(f"Data Point ID: {data_point_id}, Top K Distances: {top_k}")

# Stop Spark context
sc.stop()



RESULT OF MAP FUNCTION:

[(2, 10.307764064044152), (1, 12.608450539221701), (3, 4.879549159502341)]
Data Point ID: 2, Top K Distances: 10.307764064044152
Data Point ID: 1, Top K Distances: 12.608450539221701
Data Point ID: 3, Top K Distances: 4.879549159502341


In [41]:
sc.stop()