In [1]:
import os
import sys

# Locate the local Spark installation and bootstrap PySpark's interactive shell,
# which (per the banner it prints) makes a SparkSession available as `spark`.
spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    # Fail early with a readable message instead of a TypeError from os.path.join(None, ...).
    raise EnvironmentError("SPARK_HOME is not set; cannot locate the PySpark installation")
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))
sys.path.insert(0, os.path.join(spark_home, 'python'))
shell_path = os.path.join(spark_home, 'python/pyspark/shell.py')
# Use a context manager so the file handle is closed, and compile with the real
# filename so tracebacks raised inside shell.py point at the right file.
with open(shell_path) as shell_file:
    exec(compile(shell_file.read(), shell_path, 'exec'))

import numpy as np
import pyspark as ps
# import math
import glob
import re
import time

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/

Using Python version 3.5.6 (default, Aug 26 2018 16:05:27)
SparkSession available as 'spark'.


In [4]:
def save_results(centroids):
    """
    Write the final centroids to ``final_centroids.txt`` in the current
    working directory, overwriting any existing file.

    Each centroid becomes one line. Every coordinate is converted with
    ``str`` and followed by a single space, so each line ends with a
    trailing space before the newline.

    :param centroids: iterable of centroids, each an iterable of coordinates
    :return: 1 once the file has been written
    """
    # The context manager guarantees the handle is closed even if converting
    # or writing a value raises part-way through.
    with open("final_centroids.txt", "w") as out_file:
        for centroid in centroids:
            out_file.write("".join(str(val) + " " for val in centroid) + "\n")
    return 1


def k_means(data_file, centroids_file, MAX_ITER = 100, tol = 0.001):
    """
    Iteratively refine a set of centroids: assign every point to its nearest
    centroid (Euclidean distance), then replace each centroid by the mean of
    the points assigned to it.

    Relies on a module-level SparkContext named ``sc`` being defined before
    the call. The centroids of the last iteration executed are written to
    disk via ``save_results``; nothing is written if no iteration runs.

    :param data_file: path of raw data, one point per line, whitespace-separated floats
    :param centroids_file: path of initial guess of centroids, same format as data_file
    :param MAX_ITER: maximum iterations
    :param tol: tolerance of convergence; iteration stops once the l_2 norm of
        the change in centroids between consecutive steps is <= tol
    :return: 1
    :raises ValueError: if centroids_file contains no centroids
    """

    def parse_point(line):
        # split() without an argument tolerates repeated and trailing
        # whitespace, e.g. the trailing space written by save_results.
        return np.array([float(x) for x in line.split()])

    def load_points(path):
        # Drop blank lines so they do not turn into empty points.
        return sc.textFile(path).filter(lambda line: line.strip()).map(parse_point)

    # Load the data; cached because it is re-scanned on every iteration.
    data = load_points(data_file).cache()
    # Load the initial centroids and keep them as a list on the driver.
    centroids_0 = load_points(centroids_file).collect()
    if not centroids_0:
        raise ValueError("no initial centroids found in " + str(centroids_file))

    centroids_1 = None
    for i in range(MAX_ITER):
        current = centroids_0

        # Assign each point to the index of its nearest centroid, then sum
        # the points and counts per cluster index.
        cluster_sums = (
            data
            .map(lambda m: (int(np.argmin([np.linalg.norm(m - n) for n in current])), (m, 1)))
            .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
            .collectAsMap()
        )

        # Compute new centroids. A cluster that received no points has no
        # entry in cluster_sums; keep its previous centroid so the list keeps
        # its length and every centroid stays at its own index.
        centroids_1 = [
            cluster_sums[k][0] / cluster_sums[k][1] if k in cluster_sums else current[k]
            for k in range(len(current))
        ]

        # err: difference between consecutive steps (measured by l_2 norm).
        err = np.linalg.norm(np.array(centroids_1) - np.array(current))
        print(str(i)+","+str(err))

        # Converged once err <= tol.
        if err <= tol:
            print(str(i+1)+" iterations.\n"+"Error: "+str(err)+"\nCentroids:\n")
            for centroid in centroids_1:
                print(centroid)
            break
        centroids_0 = centroids_1

    # Persist the result whether the loop converged or ran out of iterations.
    if centroids_1 is not None:
        save_results(centroids_1)
    return 1




In [5]:
if __name__ == '__main__':

    # k_means reads the module-level name `sc`, so it must be bound before the call.
    sc = ps.SparkContext.getOrCreate()
    data_file = './data.txt'
    centroids_file = './c1.txt'
    try:
        # tol=0 stops early only when the centroids stop changing entirely.
        k_means(data_file, centroids_file, MAX_ITER=100, tol=0)
    finally:
        # Always release the SparkContext, even if k_means raises.
        sc.stop()

0,901.8791257276841
1,352.6586322602892
2,454.71095817407456
3,181.83152722638806
4,42.88763183011611
5,19.414023277368035
6,23.59732633918716
7,27.217845544153807
8,33.20675924342171
9,33.828662837388755
10,42.149134100214816
11,34.361986717389634
12,25.04486813148128
13,25.948897333259023
14,21.444573425945638
15,21.041865159592692
16,32.055791045604984
17,26.00658121441194
18,25.947903470628745
19,24.92252105537892
20,36.61756119713721
21,44.87698466220883
22,49.579550282260364
23,45.80372406811303
24,46.63843852305763
25,44.994700009216
26,51.64828761560805
27,157.72361613918832
28,210.14771973526976
29,157.1334896978539
30,212.63399334503703
31,383.8521942960784
32,1422.0502394924085
33,3432.9440898319463
34,1573.1769782957651
35,92.24187969236726
36,69.12261398243048
37,61.43691664930312
38,85.14211787468224
39,139.1202010185663
40,79.8311476084497
41,125.34583551279228
42,125.87944282875638
43,98.23507225220722
44,63.163344171140025
45,28.866089055204565
46,24.7095730653173
47,1