In [294]:
import numpy as np
import datetime
import random

from pyspark.mllib.random import RandomRDDs
from pyspark import SparkContext
import matplotlib.pyplot as plt

In [295]:
def customSplit(row):
    values = row[0]
    index = row[1]
    dataItems = values.split(',')
    for i in range(len(dataItems) - 1):
        dataItems[i] = float(dataItems[i])
    return (index, dataItems)

In [296]:
def loadData(path):
    dataFromText = sc.textFile(path)
    data = dataFromText.filter(lambda x: x is not None).filter(lambda x: x != "")
    dataZipped = data.zipWithIndex()
    return dataZipped.map(lambda x: customSplit(x))

In [297]:
def initCentroids(data, numClusters):
    sample = sc.parallelize(data.takeSample(False, numClusters))
    centroids = sample.map(lambda point: point[1][:-1])
    return centroids.zipWithIndex().map(lambda point: (point[1], point[0]))

In [298]:
def assginToCluster(data, centroids):
    # Calculate Cartesian product of centroids and data
    cartesianData = centroids.cartesian(data)
    cartesianDataDistances = cartesianData.map(lambda dataPoint: calculateDistance(dataPoint[0], dataPoint[1]))
    dataMinDistance = cartesianDataDistances.groupByKey().map(lambda x: (x[0], list(x[1]))).map(lambda row: minDist(row))
    return dataMinDistance

In [299]:
def calculateDistance(centroid, dataPoint):
    list1 = centroid[1]
    list2 = dataPoint[1][:-1]
    array1 = np.array(list1)
    array2 = np.array(list2)
    dist = np.linalg.norm(array1-array2)
    return (dataPoint[0], (centroid[0], dist))

In [300]:
def minDist(row):
    index = row[0]
    lst = row[1]
    minDist = float('inf')
    minPoint = None
    for elem in lst:
        centroidIndex = elem[0]
        distance = elem[1]
        if (distance < minDist):
            minDist = distance
            minPoint = (centroidIndex, distance)
    return (index, minPoint)

In [301]:
def reCalculating(iCentroid, clusterItems):
    allLists = []
    for element in clusterItems:
        # element = ([5.4, 3.7, 1.5, 0.2, 'Iris-setosa'], 3.7349698793966195)
        allLists.append(element[0][:-1])
    averageArray = list(np.average(allLists, axis=0))
    newCenteroid = (iCentroid, averageArray)
    return newCenteroid

In [326]:
def computeCentroids(dataMinDistance):
    databyCluster = dataMinDistance.join(data).map(lambda x: (x[1][0][0], (x[1][1], x[1][0][1])))
    databyCluster = databyCluster.groupByKey().map(lambda data: (data[0], list(data[1])))
    return databyCluster.map(lambda x: reCalculating(x[0], x[1]))

In [360]:
def hasConverged(centroids, newCentroids):
    centroidsData = centroids.join(newCentroids)
    centroidsDataBool = centroidsData.map(lambda cluster: cluster[1][0] == cluster[1][1])
    return all(item == True for item in centroidsDataBool.collect())

In [303]:
sc = SparkContext("local", "generator")

In [367]:
data = loadData('data/iris_clustering.dat')

In [368]:
data.collect()

[(0, [5.1, 3.5, 1.4, 0.2, 'Iris-setosa']),
 (1, [4.9, 3.0, 1.4, 0.2, 'Iris-setosa']),
 (2, [4.7, 3.2, 1.3, 0.2, 'Iris-setosa']),
 (3, [4.6, 3.1, 1.5, 0.2, 'Iris-setosa']),
 (4, [5.0, 3.6, 1.4, 0.2, 'Iris-setosa']),
 (5, [5.4, 3.9, 1.7, 0.4, 'Iris-setosa']),
 (6, [4.6, 3.4, 1.4, 0.3, 'Iris-setosa']),
 (7, [5.0, 3.4, 1.5, 0.2, 'Iris-setosa']),
 (8, [4.4, 2.9, 1.4, 0.2, 'Iris-setosa']),
 (9, [4.9, 3.1, 1.5, 0.1, 'Iris-setosa']),
 (10, [5.4, 3.7, 1.5, 0.2, 'Iris-setosa']),
 (11, [4.8, 3.4, 1.6, 0.2, 'Iris-setosa']),
 (12, [4.8, 3.0, 1.4, 0.1, 'Iris-setosa']),
 (13, [4.3, 3.0, 1.1, 0.1, 'Iris-setosa']),
 (14, [5.8, 4.0, 1.2, 0.2, 'Iris-setosa']),
 (15, [5.7, 4.4, 1.5, 0.4, 'Iris-setosa']),
 (16, [5.4, 3.9, 1.3, 0.4, 'Iris-setosa']),
 (17, [5.1, 3.5, 1.4, 0.3, 'Iris-setosa']),
 (18, [5.7, 3.8, 1.7, 0.3, 'Iris-setosa']),
 (19, [5.1, 3.8, 1.5, 0.3, 'Iris-setosa']),
 (20, [5.4, 3.4, 1.7, 0.2, 'Iris-setosa']),
 (21, [5.1, 3.7, 1.5, 0.4, 'Iris-setosa']),
 (22, [4.6, 3.6, 1.0, 0.2, 'Iris-setosa'])

In [369]:
centroids = initCentroids(data, 3)

In [370]:
centroids.collect()

[(0, [7.7, 2.6, 6.9, 2.3]),
 (1, [5.0, 2.0, 3.5, 1.0]),
 (2, [6.7, 3.3, 5.7, 2.5])]

In [371]:
iterations = 0
startTime = datetime.datetime.now()

while True:
    iterations = iterations + 1
    dataMinDistance = assginToCluster(data, centroids)
    newCenteroids = computeCentroids(dataMinDistance)
    
    if hasConverged(centroids, newCenteroids):
        break;
    centroids = sc.parallelize(newCenteroids.collect())

endTime = datetime.datetime.now()

print("Elapsed time: " + str(endTime - startTime))
print("Number of iterations: " + str(iterations))

Elapsed time: 0:01:35.040637
Number of iterations: 8


In [372]:
centroids.collect()

[(2,
  [5.901612903225807,
   2.748387096774194,
   4.393548387096774,
   1.4338709677419348]),
 (0,
  [6.849999999999999,
   3.073684210526315,
   5.742105263157894,
   2.071052631578947]),
 (1, [5.005999999999999, 3.418, 1.4640000000000006, 0.24399999999999994])]

In [373]:
sc.stop()