In [None]:
"""
    2018/03/12, a local search algorithm for K-means with no outliers (LS).
    Based on (Kmeans Origin), this is a new version with class KMeans.
    
    Result :
        2018/03/13: SPARK 5063
"""

In [1]:
import random
import time
import math

    
def costCal(A,B):
    return sum((A[i]-B[i])**2 for i in range(len(A)-1))

def updateCluster(A,centerTemp):
    minDistance = 999999
    newCluster  = -1
    for ci in range(len(centerTemp)):
        c = centerTemp[ci]
        newDistance = sum((A[j]-c[j])**2 for j in range(len(A)-1))
        if newDistance < minDistance:
            minDistance = newDistance
            newCluster = ci
    return A + [newCluster+1]

class Kmeans():
    '''The local search algorithm for K-means with no outliers'''
    
    def __init__(self,k,dimension,threshold=1e-4,iterationTime=10):
        '''the number of clusters, dimensions, file path'''
        self.k             = k
        self.dimension     = dimension
        self.threshold     = threshold
        self.iterationTime = iterationTime
    
    
    # a file-> an RDD(with string lists)-> an RDD(with float lists) 
    def createRDD(self,path,splitChar):
        """create an RDD from a file"""
        # a str list => a int list => a float list
        self.data = sc.textFile(path).map(lambda line: line.split(splitChar)).map(lambda x : [float(a) for a in x] )
        self.size = (self.data).count()
        print(self.size)
        
    def originCluster(self):
        """print origin cluters"""
        print("Origin Clusters: ")
        for i in range(self.k):
            print("Cluster " + str(i+1) + " : " + str(self.data.filter(lambda x : x[-1]==i+1 ).count()))
    
    
    # find initial centers
    def randomCenters(self):
        """pick k center points randomly"""
        self.centers = self.data.takeSample(False,self.k,random.randint(1,1000))
        print(self.centers)
        
    def advancedCenters(self):
        """K-means++"""
        self.centers = []
    
    def swapUC(self,u,cent,i):
        return cent[:i] + [u] + cent[i+1:]
    
    #iterations
    def converge(self):
        iterCount = 0
        
        data = self.data.map(lambda x: updateCluster(x,self.centers))
        iterCount += 1
        self.cost = self.costKM(self.centers)
        print(self.cost)
        
        while(iterCount<=self.iterationTime):
            # a temporary (improved) set of centers
            newCenters = [0 for i in range(self.k)]
            for i in range(self.k):
                tempCount = data.filter(lambda x:x[-1]==i+1).count()
                newCen    = data.filter(lambda x:x[-1]==i+1).reduce(lambda x,y:[(x[j]+y[j]) for j in range(self.dimension)]+[0])
                newCen    = [newCenter[j]/tempCount for j in range(self.dimension)] + [i+1]
                newCenters[i] = newCen
            # for each center and non-center, perform a swap    
            uSet = data.collect()
            costTemp = self.cost
            for u in uSet:
                swapPos  = -1
                for x in range(k):
                    cos = costKM(swap(u,newCenters,x))
                    if cos < costTemp:
                        swapPos = x
                        costTemp = cos
                if swapPos != -1 :
                    newCenters[i] = u  #update the temp solution
            del uSet
            
            if newCenters == self.centers:
                break
            self.centers = copy.deepcopy(newCenters)
            del newCenters
            data = data.map(lambda x:updateCluster(x))
            costTemp = self.costKM(self.centers)
            iterCount += 1
            if costTemp >= (1-threshold)*self.cost :
                self.cost = costTemp
                print("Final cost: " + str(self.cost))
                break
    
    # calculation method   
    def costKM(self,centerTemp):
        # centerTemp are supposed to be new centers
        # sum(x[i]-c[i]**2 for i in range(len(x)-1)) is the distance between two points
        # min(distance(x,c) for c in centerTemp) is the distance between a point and center points
        costSum = self.data.map(lambda x:(min(sum((x[i]-c[i])**2 for i in range(len(x)-1)) for c in centerTemp))).reduce(lambda x,y : x+y)
        return costSum
    
    # what about accuracy?
    def compareResult(self):
        for i in range(k):
            cnt = data.filter(lambda x: x[-1]==i+1).count()
            print(str(i+1) + " : " + cnt)
        return 0              
                         
                         

In [2]:
myKmeans = Kmeans(2,3)
myKmeans.createRDD("Skin_NonSkin.txt","\t")
myKmeans.originCluster()
myKmeans.randomCenters()
myKmeans.converge()
myKmeans.compareResult()

245057
Origin Clusters: 
Cluster 1 : 50859
Cluster 2 : 194198
[[201.0, 200.0, 162.0, 2.0], [169.0, 167.0, 119.0, 2.0]]
3593915563.0


Traceback (most recent call last):
  File "/home/kana/spark/python/pyspark/cloudpickle.py", line 235, in dump
    return Pickler.dump(self, obj)
  File "/home/kana/anaconda2/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/home/kana/anaconda2/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/kana/anaconda2/lib/python2.7/pickle.py", line 568, in save_tuple
    save(element)
  File "/home/kana/anaconda2/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/kana/spark/python/pyspark/cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "/home/kana/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "/home/kana/anaconda2/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/kana/anaconda2/lib/python2

PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.