<a href="https://colab.research.google.com/github/Aditya0996/Spark-Kmeans-and-Kmedian/blob/main/Spark_Kmeans_and_Kmedian.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.express as px

In [None]:
# Colab environment setup: install PySpark and PyDrive with pip
# (PyDrive is not referenced by the cells shown in this notebook).
!pip install pyspark
!pip install -U -q PyDrive
# Install a headless OpenJDK 8 through apt; -qq keeps the apt output quiet.
!apt install openjdk-8-jdk-headless -qq
import os
# Export JAVA_HOME pointing at the OpenJDK 8 directory
# (presumably how PySpark locates the Java runtime - confirm if the JDK path changes).
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=ce31440468ce5968eba6167db7df09033d99f55bc94444aa14579f789d5c696f
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum fonts-ipafont-gothic
  fonts-ipafont-mincho fonts-wqy-microhei fonts-wqy-zenhei fonts-indic

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark import SparkContext, SparkConf
import pyspark

In [None]:
# Spark configuration: serve the Spark UI on port 4050
conf = SparkConf().set("spark.ui.port", "4050")
# Create the context, or reuse the one already running in this kernel.
# Constructing a second SparkContext directly raises an error, which made
# this cell fail whenever it was executed more than once.
sc = pyspark.SparkContext.getOrCreate(conf=conf)
# Create (or reuse) the session that provides the DataFrame reader
spark = SparkSession.builder.getOrCreate()

In [None]:
#Upload data.txt, c1.txt and c2.txt under /content/data before running this cell
def _read_points(path):
    '''
    Reads a space-separated text file without a header row into a Spark DataFrame.
    '''
    return spark.read.csv(path, sep=' ', header=False)


def _rows_to_floats(frame):
    '''
    Collects a DataFrame to the driver as a list of rows, each row being a list of floats.
    '''
    return frame.rdd.map(lambda row: [float(item) for item in row]).collect()


# Raw DataFrames: the data points and the two sets of initial centroids
data = _read_points('/content/data/data.txt')
c1 = _read_points('/content/data/c1.txt')
c2 = _read_points('/content/data/c2.txt')

# Plain Python lists of float coordinates, used as input to k_means / k_medians
data_l = _rows_to_floats(data)
c1_l = _rows_to_floats(c1)
c2_l = _rows_to_floats(c2)

In [None]:
def compute_centroid(points):
    '''
    Given an RDD of (cluster id, point) pairs, returns a collected list of
    (cluster id, centroid) pairs. Each centroid is the coordinate-wise mean
    of the points that share the cluster id, returned as a list.
    '''
    def mean_of(members):
        # groupByKey yields an iterable of points; materialise it for numpy
        return list(np.mean(list(members), axis=0))

    grouped = points.groupByKey()
    centroids = grouped.mapValues(mean_of)
    return centroids.collect()

In [None]:
def compute_median(points):
    '''
    Given an RDD of (cluster id, point) pairs, returns a collected list of
    (cluster id, median) pairs. Each median is the coordinate-wise median
    of the points that share the cluster id, returned as a list.
    '''
    def median_of(members):
        # groupByKey yields an iterable of points; materialise it for numpy
        return list(np.median(list(members), axis=0))

    grouped = points.groupByKey()
    medians = grouped.mapValues(median_of)
    return medians.collect()

Source for the Euclidean distance implementation below: https://www.shiksha.com/online-courses/articles/how-to-compute-euclidean-distance-in-python/

In [None]:
def euclidean_distance(p, q):
    '''
    Returns the euclidean (l2) distance between points p and q: the square
    root of the sum of the squared coordinate differences.
    '''
    delta = np.asarray(p) - np.asarray(q)
    return np.sqrt(np.square(delta).sum())


Source for the Manhattan distance implementation below: https://www.shiksha.com/online-courses/articles/all-about-manhattan-distance/

In [None]:
def manhattan_distance(p, q):
    '''
    Returns the manhattan (l1) distance between points p and q: the sum of
    the absolute coordinate differences.
    '''
    offsets = np.asarray(p) - np.asarray(q)
    return np.absolute(offsets).sum()

In [None]:
def euclidean_cost(points, centroids):
    '''
    Cost of an iteration under euclidean distance: for every point in the RDD,
    take the squared distance to its nearest centroid, then add these up.
    '''
    def nearest_squared(point):
        # Squared distance from this point to each centroid; keep the smallest
        squared = (euclidean_distance(point, center) ** 2 for center in centroids)
        return min(squared)

    per_point = points.map(nearest_squared)
    return per_point.reduce(lambda total, term: total + term)

In [None]:
def manhattan_cost(points, medians):
    '''
    Cost of an iteration under manhattan distance: for every point in the RDD,
    take the distance to its nearest cluster median, then add these up.
    '''
    def nearest(point):
        # Distance from this point to each median; keep the smallest
        return min(manhattan_distance(point, median) for median in medians)

    per_point = points.map(nearest)
    return per_point.reduce(lambda total, term: total + term)

In [None]:
def plot_cost(cost_data_C1, cost_data_C2):
    '''
    Plots the per-iteration costs of the two runs (C1 and C2) as two lines
    on one Plotly figure and displays it.
    '''
    # Iterations are numbered from 1; the count is taken from the C1 series
    n_iterations = len(cost_data_C1)
    frame = pd.DataFrame({
        'Iterations': list(range(1, n_iterations + 1)),
        'C1': cost_data_C1,
        'C2': cost_data_C2,
    })

    # One line per series, with a marker at every iteration
    figure = px.line(
        frame,
        x='Iterations',
        y=['C1', 'C2'],
        title='Cost vs. Iterations',
        markers=True,
        line_shape='linear',
    )

    # Axis and legend captions
    figure.update_xaxes(title_text='Iterations')
    figure.update_yaxes(title_text='Cost')
    figure.update_layout(legend_title_text='Clusters')

    figure.show()

In [None]:
def find_closest_cluster(p, centroids):
    '''
    Returns the index, within centroids, of the centroid closest to point p
    under euclidean distance. The first index wins when distances tie.
    This can run in memory as the number of centroids is small.
    '''
    distances = [euclidean_distance(p, center) for center in centroids]
    # Position of the smallest distance (earliest position on ties)
    return min(range(len(distances)), key=distances.__getitem__)

In [None]:
def find_closest_cluster_manhattan(p, centroids):
    '''
    Returns the index, within centroids, of the entry closest to point p
    under manhattan distance. The first index wins when distances tie.
    This can run in memory as the number of centroids is small.
    '''
    distances = [manhattan_distance(p, center) for center in centroids]
    # Position of the smallest distance (earliest position on ties)
    return min(range(len(distances)), key=distances.__getitem__)

In [None]:
def k_means(data, centroids, max_iter=20):
    '''
    Runs max_iter iterations of k-means (euclidean distance) using Spark.

    Parameters:
        data: list of points (each a list of coordinates); parallelized with sc.
        centroids: list of initial centroid coordinates; the list itself is not modified.
        max_iter: number of assign/update iterations to run.

    Returns:
        (centroids, costs): the centroids after the last update, and a list with
        one cost per iteration. Each cost is computed with the centroids obtained
        after that iteration's update, so costs[0] is the cost after the first update.
    '''
    current_centroids = centroids[:]
    Meancosts = []
    data_map = sc.parallelize(data)
    for _ in range(max_iter):
        # Assign point p to the cluster with the closest centroid
        cluster_assignments = data_map.map(lambda x: (find_closest_cluster(x, current_centroids), x))
        # Recompute the centroid of c as the mean of all the data points assigned to c
        updated_centroids = dict(compute_centroid(cluster_assignments))
        # A cluster that received no points has no entry in updated_centroids.
        # Keep its previous centroid so the number and order of centroids stay
        # stable; otherwise the list shrinks and later cluster indices shift.
        current_centroids = [updated_centroids.get(index, previous)
                             for index, previous in enumerate(current_centroids)]
        Meancosts.append(euclidean_cost(data_map, current_centroids))

    return current_centroids, Meancosts

In [None]:
def k_medians(data, medians, max_iter=20):
    '''
    Runs max_iter iterations of k-medians (manhattan distance) using Spark.

    Parameters:
        data: list of points (each a list of coordinates); parallelized with sc.
        medians: list of initial cluster medians; the list itself is not modified.
        max_iter: number of assign/update iterations to run.

    Returns:
        (medians, costs): the cluster medians after the last update, and a list
        with one cost per iteration. Each cost is computed with the medians obtained
        after that iteration's update, so costs[0] is the cost after the first update.
    '''
    current_medians = medians[:]
    Medcosts = []
    data_rdd = sc.parallelize(data)
    for _ in range(max_iter):
        # Assign point p to the cluster with the closest median
        cluster_assignments = data_rdd.map(lambda x: (find_closest_cluster_manhattan(x, current_medians), x))
        # Recompute the median of c from all the data points assigned to c
        updated_medians = dict(compute_median(cluster_assignments))
        # A cluster that received no points has no entry in updated_medians.
        # Keep its previous median so the number and order of medians stay
        # stable; otherwise the list shrinks and later cluster indices shift.
        current_medians = [updated_medians.get(index, previous)
                           for index, previous in enumerate(current_medians)]
        Medcosts.append(manhattan_cost(data_rdd, current_medians))
    # Return the updated medians (the initial ones were returned before)
    return current_medians, Medcosts

In [None]:
# k-means from both initialisations (c1 first, then c2)
(kmeans_centroids_c1, kmeans_costs_c1), (kmeans_centroids_c2, kmeans_costs_c2) = [
    k_means(data=data_l, centroids=initial) for initial in (c1_l, c2_l)
]
# k-medians from the same two initialisations (c1 first, then c2)
(kmedians_centroids_c1, kmedians_costs_c1), (kmedians_centroids_c2, kmedians_costs_c2) = [
    k_medians(data=data_l, medians=initial) for initial in (c1_l, c2_l)
]

In [None]:
def _relative_drop(costs, last=10):
    '''
    Fraction by which the cost decreased between costs[0] and costs[last].
    '''
    first = costs[0]
    return (first - costs[last]) / first


# Relative decrease between the first recorded cost and the cost at index 10
p_kmeans_c1 = _relative_drop(kmeans_costs_c1)
p_kmeans_c2 = _relative_drop(kmeans_costs_c2)
p_kmedians_c1 = _relative_drop(kmedians_costs_c1)
p_kmedians_c2 = _relative_drop(kmedians_costs_c2)

In [None]:
#Plot for K Means
plot_cost(cost_data_C1=kmeans_costs_c1, cost_data_C2=kmeans_costs_c2)

In [None]:
# Report the relative k-means cost decrease for both initialisations, one per line
print(
    f"Kmean percentage change in cost after 10 iterations for c1: {p_kmeans_c1:.2%}",
    f"Kmean percentage change in cost after 10 iterations for c2: {p_kmeans_c2:.2%}",
    sep="\n",
)


Kmean percentage change in cost after 10 iterations for c1: 10.18%
Kmean percentage change in cost after 10 iterations for c2: 60.66%


In [None]:
#Plot for K Median: cost per iteration for both initialisations
plot_cost(cost_data_C1=kmedians_costs_c1, cost_data_C2=kmedians_costs_c2)

In [None]:
# These values come from the k-medians runs, so label them as such
# (the messages previously said "Kmean", copied from the k-means cell).
print(f"Kmedian percentage change in cost after 10 iterations for c1: {p_kmedians_c1:.2%}")
print(f"Kmedian percentage change in cost after 10 iterations for c2: {p_kmedians_c2:.2%}")

Kmean percentage change in cost after 10 iterations for c1: 4.33%
Kmean percentage change in cost after 10 iterations for c2: 38.83%
