# **Mount Drive**

In [40]:
from google.colab import drive
import os
drive.mount('/content/drive')
os.chdir('/content/drive/My Drive/Colab Notebooks/Spark')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### In order for Python to find the Spark, download the findspark library and start it with findspark.init() function.

In [41]:
# !pip install pyspark

### In order to work with RDDs, we need to create a SparkContext.

In [42]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
import time
import numpy as np
import random


## Since we write local [*] in the master, it will use all cores in our machine. If we said local [4] it will work with 4 cores.

## getOrCreate is used to create a SparkSession if not present.

In [43]:
spark=SparkSession.builder\
    .master("local[*]")\
    .appName("KMeans")\
    .getOrCreate()

In [44]:
sc=spark.sparkContext

## Read Data - in.csv

In [45]:
input_file="kmeans.txt"

In [46]:
points_rdd = sc.textFile(input_file).map(lambda line: tuple(map(float, line.strip().split())))

In [47]:
# points_rdd.take(100)
points_rdd.take(5)

[(664159.0, 550946.0),
 (665845.0, 557965.0),
 (597173.0, 575538.0),
 (618600.0, 551446.0),
 (635690.0, 608046.0)]

In [48]:
points_rdd.count()

32768

In [49]:
# Function to find the closest centroid for each point
def closest_centroid(point):
    min_distance = float("inf")
    closest_centroid_index = 0
    for i, centroid in enumerate(centroids):
        distance = sum((a - b) ** 2 for a, b in zip(point, centroid))
        if distance < min_distance:
            min_distance = distance
            closest_centroid_index = i
    return closest_centroid_index, (point, 1)

In [50]:
total_map_time = 0
total_reduce_time = 0
MAX_ITER = 2
# Perform iterations of K-means
for iteration in range(MAX_ITER):  # Specify the number of iterations
    # Assign each point to the closest centroid
    start_map_time = time.time()
    closest_centroids = points_rdd.map(closest_centroid)
    end_map_time = time.time()

    # Update centroids by taking the mean of points assigned to each centroid
    start_reduce_time = time.time()
    centroids = closest_centroids \
        .reduceByKey(lambda a, b: (tuple(sum(x) for x in zip(a[0], b[0])), a[1] + b[1])) \
        .map(lambda x: (x[0], tuple(c / x[1][1] for c in x[1][0]))) \
        .map(lambda x: x[1]) \
        .collect()
    end_reduce_time = time.time()

    # Print total map time and reduce time
    total_map_time += (end_map_time - start_map_time) * 1e6  # Convert to μs
    total_reduce_time += (end_reduce_time - start_reduce_time) * 1e6  # Convert to μs


    # Print centroids for each iteration
    print(f"Iteration {iteration + 1}:")
    for centroid in centroids:
        print(centroid)

Iteration 1:
(720415.5336787564, 469765.76937248127)
(289889.0842317184, 525228.2178204962)
Iteration 2:
(720415.5336787564, 469765.76937248127)
(289889.0842317184, 525228.2178204962)


In [53]:
# Print total map and reduce time with 5 digits and in μs
print(f"*KMeans Summery for {MAX_ITER} iterations: (in microsecond)*")
print("Total Map Time:", "{:.5f}".format(total_map_time), "μs")
print("Total Reduce Time:", "{:.5f}".format(total_reduce_time), "μs")
print("Total Time:", "{:.5f}".format(total_map_time +total_reduce_time), "μs")

*KMeans Summery for 2 iterations: (in microsecond)*
Total Map Time: 93.69850 μs
Total Reduce Time: 1823235.51178 μs
Total Time: 1823329.21028 μs
