In [4]:
import math 
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("K-means")
sc = SparkContext.getOrCreate()

# data = sc.textFile().split()

# example of data
data = ((2,1), (4,5), (1,3), (-2,1), (5,3), (1,1), (2,2.5), (3,5))
# we use parallelize to partition the tuple 
data_rdd = sc.parallelize(data).cache()


def closestPoint(p, centers):
  """
  calculates the distance between data points and the current 
  centroids and returns the index of the closest centroid which 
  represents the the cluster number
  """
  bestIndex = 0
  closest = float(10000) # set it to a very high value 
  for i, center in enumerate(centers):
    # Or we could you use distance(p, centers[i])
    tempDist = math.sqrt(((p[0]-center[0])**2)+((p[1]-center[1])**2)) 
    if tempDist < closest:
      closest = tempDist
      bestIndex = i
  return bestIndex
  

# kPoints are the intial centeroids where k is the number the centroids/clusters
# otherwise we could use (randint(A,B), randint(A,B)) to select each centroid 
# assuming the data comes in two dimensions and the points coordinates are integers

K = 2  # number the centroids/clusters
kPoints = data_rdd.takeSample(False, K, 1)  # intial centroids
tempDist = 50.0  
convergeDist = 1.0

while tempDist > convergeDist:
  # assigning points to clusters (cluster number, ((px,py), 1))
  closest = data_rdd.map(lambda p: (closestPoint(p, kPoints), (p, 1)))
  # combine into (points sum, points count)
  pointStats = closest.reduceByKey(lambda p1_c1, p2_c2: (p1_c1[0] + p2_c2[0], p1_c1[1] + p2_c2[1]))
  # take the new centroids as the average of each cluster and collect the results as 
  # (cluster number, new centroid) at the driver node
  newPoints = pointStats.map(lambda st: (st[0], (st[1][0][0] / st[1][1], st[1][0][1] / st[1][1]))).collect()
  # calculate the distance between the old centroids and the new ones to measure convergance
  tempDist = sum(math.sqrt(((kPoints[iK][0]-newp[0])**2) +
             ((kPoints[iK][1]-newp[1])**2)) for (iK, newp) in newPoints)
  # update to the new centroids
  for (iK, newp) in newPoints:
    kPoints[iK] = newp
      

print("Final centers: " + str(kPoints))

Final centers: [(-2.0, 1.0), (0.2857142857142857, 0.14285714285714285)]
