In [101]:
# !pip install pyspark

In [102]:
# !pip install findspark

In [103]:
# !git clone https://<GITHUB_TOKEN>@github.com/BasmaElhoseny01/Big-Data-Project  # token redacted: never commit credentials; revoke the previously exposed token

In [104]:
import math
import os
import sys

import findspark
from pyspark.sql import SparkSession

In [105]:
# Let findspark locate the Spark installation before the session is built
findspark.init()

# Build (or reuse) a Spark session running locally on all available cores
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("KmeansClustering")
    .getOrCreate()
)

# SparkContext handle used by the RDD-based code in the following cells
sc = spark.sparkContext

In [106]:
# Read Input Data
# Load the text file as an RDD of strings (one element per line of the file).
# [FIX] Split Step
# Alternative absolute path (presumably for a Colab checkout - confirm before re-enabling):
# data_rdd = sc.textFile("/content/Big-Data-Project/3d_10000n_7k.txt")
data_rdd = sc.textFile("3d_10000n_7k.txt")

In [107]:
# Show the first 3 rows (raw comma-separated strings) and the total number of rows
print(data_rdd.take(3))
print(data_rdd.count())

['-1.0472979515228584,5.231936223538009,6.608318888642993', '-10.522536074590798,0.7698579117624849,4.324456527058799', '-8.113011201832247,2.5982909987071263,-9.462184807455044']
10000


In [113]:
# K-means hyper-parameters
K=5  # number of rows sampled from the data to serve as initial centroids
maxIterations = 15  # upper bound on the number of iterations of the training loop
distance_threshold= 1e-5  # a centroid counts as converged when it moves less than this between two iterations

In [114]:
# CENTROIDS INITIALISATION
# Pick K rows (sampled without replacement) as the starting centroids.
# A fixed seed makes the sample - and therefore the whole clustering run -
# reproducible under "Restart Kernel -> Run All".
initial_lines = data_rdd.takeSample(False, K, seed=42)

# Each centroid is stored as [index, [x1, x2, ..., xn]]
centroids = [
    [index, [float(value) for value in line.split(",")]]
    for index, line in enumerate(initial_lines)
]

In [110]:
## POINTS CONVERSION
# Parse every comma-separated line into [[x1, ..., xn], 1]. The trailing 1 is
# a point counter that sum_2_points adds up when points are merged.
# The RDD is cached because the training loop re-reads it on every iteration.
points_rdd = data_rdd.map(
    lambda line: [list(map(float, line.split(','))), 1]
).cache()

print(points_rdd.take(3))

[[[-1.0472979515228584, 5.231936223538009, 6.608318888642993], 1], [[-10.522536074590798, 0.7698579117624849, 4.324456527058799], 1], [[-8.113011201832247, 2.5982909987071263, -9.462184807455044], 1]]


In [111]:
def get_closest_centroid(point, centroids):
    '''
    Find the centroid that lies nearest to a point (Euclidean distance).

    point: list of float [x1, x2, x3, ..., xn]
    centroids: sequence of [index, [x1, x2, x3, ..., xn]] entries

    return: the index (first element) of the nearest centroid entry.
            When several centroids are equally close, the one that appears
            first in `centroids` wins.
    '''
    best_entry = centroids[0]       # fallback if no distance is below infinity
    best_distance = float('inf')    # smallest distance seen so far

    for entry in centroids:
        # Euclidean distance: sqrt(sum((a - b)^2)) over the paired coordinates
        squared_total = sum((a - b) ** 2 for a, b in zip(point, entry[1]))
        candidate_distance = squared_total ** 0.5

        # Strict comparison keeps the earliest centroid on ties
        if candidate_distance < best_distance:
            best_distance, best_entry = candidate_distance, entry

    return best_entry[0]

def sum_2_points(p1, p2):
    '''
    Merge two partial accumulators of the form (coordinate_sum, point_count).

    p1: 2-element sequence (summed coordinates, number of points summed)
    p2: 2-element sequence (summed coordinates, number of points summed)

    return: [element-wise sum of both coordinate lists, p1 count + p2 count]
    '''
    coords_a, count_a = p1[0], p1[1]
    coords_b, count_b = p2[0], p2[1]

    # Add the coordinates pair by pair (zip stops at the shorter list)
    merged_coords = list(map(lambda pair: pair[0] + pair[1], zip(coords_a, coords_b)))

    return [merged_coords, count_a + count_b]

def average_points(p):
  '''
  Turn an accumulated (coordinate_sum, point_count) pair into the mean point.

  p: 2-element sequence (summed coordinates, number of points summed)

  return: list with every summed coordinate divided by the point count
  '''
  return [coordinate / p[1] for coordinate in p[0]]

def ecludien_dist(p1,p2):
  '''
  Euclidean distance between two points.

  p1: sequence of numbers [x1, x2, ..., xn]
  p2: sequence of numbers [y1, y2, ..., yn]

  return: sqrt(sum((xi - yi)^2)) over the paired coordinates
          (zip stops at the shorter sequence)
  '''
  return math.sqrt(sum((x - y) ** 2 for x, y in zip(p1, p2)))



In [115]:
# K-means training loop (MapReduce style).
# Stops after maxIterations, or earlier once strictly more than 80% of the
# centroids moved less than distance_threshold in a single iteration.
iterations = 0
while iterations < maxIterations:
    iterations += 1
    print("Iteration: " + str(iterations))

    # (1) Mapper: tag every point with the index of its closest centroid.
    #     ([coords], 1) -> (i, ([coords], 1))
    closest_centroids_rdd = points_rdd.map(lambda point: (get_closest_centroid(point[0], centroids), point))

    # (2) Combiner/Reducer: add up the coordinates and the point counts per
    #     centroid index. reduceByKey applies the function pairwise, so the
    #     result per key is ([sum of coords], number of points).
    combined_points_rdd = closest_centroids_rdd.reduceByKey(sum_2_points)

    # (3) New position of each centroid = mean of the points assigned to it
    centroids_rdd = combined_points_rdd.mapValues(average_points)
    averaged_by_index = dict(centroids_rdd.collect())

    # A centroid that received no points has no entry in the reduced result.
    # Look centroids up by index (not by list position) and keep the previous
    # position of such an empty cluster, so the number and order of centroids
    # never change between iterations.
    new_centroids = [
        (centroid[0], averaged_by_index.get(centroid[0], centroid[1]))
        for centroid in centroids
    ]

    # Check convergence: count the centroids that (almost) did not move.
    # An empty cluster keeps its position and therefore counts as converged.
    convergedCentroids = sum(
        1
        for old_centroid, new_centroid in zip(centroids, new_centroids)
        if ecludien_dist(old_centroid[1], new_centroid[1]) < distance_threshold
    )

    centroids = new_centroids

    # Done once strictly more than 80% of the centroids have converged
    if convergedCentroids > len(centroids) * 80 / 100:
        print("Centroids converged")
        break


Iteration: 1
Iteration: 2
Iteration: 3
Iteration: 4
Iteration: 5
Iteration: 6
Iteration: 7
Iteration: 8
Iteration: 9
Iteration: 10
Iteration: 11
Iteration: 12
Iteration: 13
Iteration: 14
Iteration: 15


In [116]:
# Show the final centroids (index together with its coordinates)
print(centroids)
# The SparkContext is left running: stopping it is disabled here because
# the evaluation cell further down still uses points_rdd.
# sc.stop()

[(0, [8.540660063738132, 3.797135494664097, -10.016533581717015]), (1, [-9.023399286354179, -0.9239452138252954, 3.6051181686516967]), (2, [-3.890863163070286, -4.5981781188539825, 3.5875857472576986]), (3, [-8.46228786359154, 4.580668105683215, -9.634251258371473]), (4, [0.16452726549643057, 6.102692624340226, 5.320165920664669])]


In [117]:
# Save Centroids
# One centroid per line, coordinates separated by commas (same layout as the
# input data file). The target directory is created first so the cell also
# works on a fresh checkout where 'models/' does not exist yet.
centroids_path = 'models/centroids.txt'
os.makedirs(os.path.dirname(centroids_path), exist_ok=True)
with open(centroids_path, 'w') as f:
    for centroid in centroids:
        f.write(','.join(str(x) for x in centroid[1]) + '\n')

In [121]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.linalg import Vectors as MLVectors

# Assign each data point to its nearest centroid -> (cluster_index, [coords])
nearest_centroids_rdd = points_rdd.map(lambda x: (get_closest_centroid(x[0], centroids), x[0]))
# print(nearest_centroids_rdd.take(3))
print(nearest_centroids_rdd.count())

def silhouette_coefficient(point, nearest_centroid, other_points_in_cluster):
    '''
    Per-point silhouette-style score built from squared Euclidean distances.

    point: coordinates of the point being scored
    nearest_centroid: coordinates of the centroid used as the "other cluster"
        reference (presumably the closest centroid of a different cluster -
        confirm with the caller)
    other_points_in_cluster: the remaining points of the point's own cluster

    a = mean squared distance from `point` to `other_points_in_cluster`
    b = squared distance from `point` to `nearest_centroid`

    return: (b - a) / max(a, b); 0.0 when the cluster has no other points
            or when a and b are both zero (the ratio is undefined there)

    NOTE: this helper is not used for the average score computed below; it
    is kept for inspecting individual points on the driver.
    '''
    if len(other_points_in_cluster) == 0:
        return 0.0
    a = sum(Vectors.squared_distance(point, p) for p in other_points_in_cluster) / len(other_points_in_cluster)
    b = Vectors.squared_distance(point, nearest_centroid)
    denominator = max(a, b)
    if denominator == 0:
        return 0.0
    return (b - a) / denominator

# Compute the average Silhouette Score.
# The earlier attempt called nearest_centroids_rdd.filter(...).collect() from
# inside a map(), but RDD operations cannot be nested inside another RDD
# transformation, and `silhouette_scores_rdd` was therefore never defined.
# Spark's ClusteringEvaluator computes the mean silhouette (squared Euclidean
# distance by default) in a distributed way from a DataFrame that holds a
# cluster id column and a feature vector column.
predictions_df = spark.createDataFrame(
    nearest_centroids_rdd.map(lambda pair: (pair[0], MLVectors.dense(pair[1]))),
    ["prediction", "features"],
)
average_silhouette_score = ClusteringEvaluator(
    predictionCol="prediction",
    featuresCol="features",
).evaluate(predictions_df)
print("Average Silhouette Score:", average_silhouette_score)

In [None]:
# Evaluate the model

# Compute silhouette score
# Compute the average distance between each point and all other points in the same cluster
# Compute the average distance between each point and all other points in the next nearest cluster
# Compute the silhouette score for each point
# Compute the average silhouette score for all points
# The silhouette score ranges from -1 to 1, where a high value indicates that the object is well matched to its own cluster and poorly matched to neighboring clusters.
