# Skalable K-means++

In [31]:
from __future__ import division
import os
import sys
import glob
import matplotlib.pyplot as plt
import numpy as np
import numpy.random as nrd
from sklearn.cluster import KMeans
import timeit
%matplotlib inline
%precision 4
plt.style.use('ggplot')

## 1. Background

The paper I used is skalable K-means, which introduce a parallelizable initialization algorithm, k-means|| for k-means clustering.

K-means is a popular method to separate data into k groups to achieve the minimum distance between each point and its cluster center. Two major theoretic (and practice) downsides of K-means are that the final result can be very bad compared to the global optimal and in the worst case running time can be exponential. K-means++ is designed to improve the performance of K-means though a better initialization approach.

K-means ++, an algorithm to choose the initial values of k-means clustering, has been proved that the initial values generated by this algorithm is close to global optimum. It is guaranteed to find a solution that is O(log k) competitive to the optimal k-means solution. However, it's not suitable for massive data due to its inherent sequential essence. We have to pass k times over the whole data set, which will dramatically slow down the speed to even when k is large in a data set.

Therefore, the author come up with a updated initialization algorithm, k-means||, to address this problem, which only needs logarithmic number of passes. (In fact, in practice, the number of rounds can be very small if we want to get more points in one iteration.) This really interests me, because k-means clustering is really widely used in the data analysis. It's meaningful to figure out how to apply this to large-scale data sets which are increasingly prevalent. Besides, the algorithm is parallelizable, so that I can explore more and learn more about parallelization in python programming.

### 1.1 Algorithm (Pseudocode)

###k-means

Let X={$x_1,x_2,...x_n$} be the set of points in d-dimensional Euclidean space, and k is the number of clusters we will divide X into. This starts with randomly choosing k points from X as initial values of centers $c_1,...,c_k$. In each iteration, each point $x_i$ is assigned to the closest cluster by calculating $\text{arg min}_j d(x_i,c_j)$. The calculate the new centroids of the observations in the new clusters as the k centers for the next iteration.
$$centroid(X) = \frac{1}{|X|}\sum_{x \in X} x$$
The iteration is repeated until a stable set of centers is obtained.

###k-means++

The main idea of k-means++ is to choose centers one by one in a controlled fashion, where the current set of chosen centers will influence the choice of next center. The algorithm is shown as below:
1. C <- sample a point uniformly at random from X
2. while |C|<k, do:
3. $\qquad$ sample x $\in$ X with probability $\frac{d^2(x,c)}{\phi_x(c)}$, where
$$\phi_Y(C) = \sum_{y\in Y}d^2(y,C)$$
4. $\qquad$ C <- C $\cup$ {x}
5. end while

###k-means ||

1. C <- smaple a point uniformly at random from X
2. $\psi$ <- $\phi(C)$
3. for O($\log \phi$) times do:
4. $\qquad$ C' <- sample each point x $\in$ X independently with probability $p_x =\frac{l \cdot d^2(x,c)}{\phi_x(c)}$ 
5. $\qquad$ C <- C $\cup$ C'
5. end for
7. For x $\in$ C, set $w_x$ to be the number of points in X closer to x than any other point in C
8. Recluster the weighted points in C into K clusters

###Random

It means get k initual points randomly, each points in the data has the same probability to be chosen. We can use random.choice to get that.

##2. Implementation

###2.1 dataset

GAUSSMIXTURE, which is synthetic. I first sample k centers from a 10-dimensional Gaussian distribution with mean $I_{10}$ and variance $RI_{10}$. Then add points from Gaussian distributions of unit variance around centers. The sample size would be 10000. To make the dataset more flexible, I define a function to generate the data, and we can change k and R to see if the result and conclusion is robust.

In [32]:
def GAUSSMIXTURE(k,R):
    """R is the variance to generate centers, K is the number of centers;
    will sample 10000 points in 10-dimensional space"""
    n = 10000
    centers = nrd.multivariate_normal([0]*10,R*np.identity(10),k)
    data = [nrd.multivariate_normal(center, np.identity(10),int(n/k)) for center in centers]
    data = np.vstack(data)
    return data

###2.2 Important functions

In [33]:
def Cost(C, Y):
    """C is a subset of the dataset, Y can be a point or a subset"""
    if  len(Y.shape)==1 or Y.shape[0]==1:
        #Y is a point
        MinIndex = np.argmin(np.sum((Y-C)**2,axis=1))
        return np.sum((Y-C[MinIndex,])**2)
    else:
        return np.sum([Cost(C,Y_i) for Y_i in Y])

def weight(C, data):
    """C is the centroid set and data is the target data set"""
    if len(C.shape)==1 or C.shape[0]==1:
        #C only have one point
        if len(data.shape)==1 or data.shape[0]==1:
            return np.array([1])
        else:
            return np.array([len(data)])
    else:
        #the cloest center for each point in data
        Index_min = [np.argmin(np.sum((x-C)**2,axis=1)) for x in data]
        #frequency for each center
        return np.array([Index_min.count(i) for i in range(len(C))]).astype(float)
    
def kmeanspar(k,l,r,data):
    """k is the number of centers, l is the expected number of intermediate points
    in each iteration, r is the number of iterations, data is the target data set"""
    #l*r should be larger than k in case k-means|| select too few points
    if l*r < k:
        raise ValueError('r or l must be bigger, ')
    #if k is too large
    if k >= len(data):
        raise ValueError('k is too large')
    #Step 1: choose one point randomly
    C = data[nrd.choice(range(len(data)),1),]
    #for loop
    for i in range(r):
        prob = [l*Cost(C,x) for x in data]/Cost(C,data)
        flag = nrd.uniform(size=len(data))
        C = np.concatenate((C,data[prob>=flag,]))
    #step 7
    weights = weight(C,data)
    #step 8: k-means++ to choose weighted points
    c = C[nrd.choice(range(len(C)),1),]
    while len(c) < k:
        p = np.array([Cost(c,x) for x in C])
        Prob = p*weights/np.sum(p*weights)
        x = nrd.choice(range(len(C)),1,p=Prob)
        c = np.concatenate((c,C[x,]))
    ##carry out K-means clustering
    km = KMeans(n_clusters=k,init=c,n_init=1)
    km.fit(data)
    return km

def Random(k,data):
    """k is the number of centers, data is target data"""
    if k >= len(data):
        raise ValueError('k is too large')
    ##carry out K-means clustering
    km = KMeans(n_clusters=k,init='random')
    km.fit(data)
    return km

def kmeansplus(k,data):
    if k >= len(data):
        raise ValueError('k is too large')
    ##carry out K-means clustering
    km = KMeans(n_clusters=k,init='k-means++')
    km.fit(data)
    return km

##3. Testing

I mainly tested three functions: Cost, weight and kmeanspar (initialization part) and use 18 tests to make sure they are robust and get the right result. 

I tested Cost and weight in different situation: inputs as integer arrays, input arrays with different dimensions (in case data set/center set only contain one point), some known cases to compare the results and the check to see if it get non-negative results. 

I also checked if kmeanspar really return k centers, the centers are really in the data set, some known cases and if kmeanspar will recognize when number of iteration (r) times expected number of centers in each iteration (l) is smaller than k (i.e. we can't garantee to get more than k points from the for loop in kmeanspar). For the kmeanspar, I cut off the KMeans clustering part for testing because I only care about how the initialization part works so I can be more focus. KMeans is a existing command which doesn't need to be tested. 

Similarly, I didn't test the Random and Kmeansplus functions, because they two basically are based on KMeans command from sklearn module.

##4. Time Profile

The dataset will have 50 centers which are generated by $MVN(0,R\times I_{10})$. Expected number of generated centers in each iteration is 0.3k, I will run 5 rounds to get enough centers in kmeans||.

In [34]:
k = 50; R = 10;
l = 0.3*k; r = 5;
data = GAUSSMIXTURE(k,R)

###timeit and prun

I use timeit and prun for the profiling. For the first time, the running time of kmeans|| was around 8s while kmeans++ and random was around 1-2s, which was a really bad result. By using prun, I found that weight function  spent a long time and it's due to the two nested for loops. So I changed the algorithm and got weight_v2 (also the existing weight function). The running time of kmeans|| has been improved to around 3s.

###first version of Kmeans||

The first version of weight is defined as below, and to see the improvement by weight_v2 (i.e. weight), I also show the timeit of first version.

In [35]:
def weight_v1(C, data):
    """C is the centroid set and data is the target data set"""
    if len(C.shape)==1 or C.shape[0]==1:
        #C only have one point
        if len(data.shape)==1 or data.shape[0]==1:
            return np.array([1])
        else:
            return np.array([len(data)])
    else:
        #construct a cost matrix c[i,j]=distance(C[i],data[j])
        Cost_matrix = np.array([np.sum((c-x)**2) for c in C
                                             for x in data]).reshape(len(C),len(data))
        Index_min = list(np.argmin(Cost_matrix,axis=0))
        return np.array([Index_min.count(i) for i in range(len(C))])
    
def kmeanspar_v1(k,l,r,data):
    """k is the number of centers, l is the expected number of intermediate points
    in each iteration, r is the number of iterations, data is the target data set"""
    #l*r should be larger than k in case k-means|| select too few points
    if l*r < k:
        raise ValueError('r or l must be bigger, ')
    #if k is too large
    if k >= len(data):
        raise ValueError('k is too large')
    #Step 1: choose one point randomly
    C = data[nrd.choice(range(len(data)),1),]
    #for loop
    for i in range(r):
        prob = [l*Cost(C,x) for x in data]/Cost(C,data)
        flag = nrd.uniform(size=len(data))
        C = np.concatenate((C,data[prob>=flag,]))
    #step 7
    weights = weight_v1(C,data)
    #step 8: k-means++ to choose weighted points
    c = C[nrd.choice(range(len(C)),1),]
    while len(c) < k:
        p = np.array([Cost(c,x) for x in C])
        Prob = p*weights/np.sum(p*weights)
        x = nrd.choice(range(len(C)),1,p=Prob)
        c = np.concatenate((c,C[x,]))
    ##carry out K-means clustering
    km = KMeans(n_clusters=k,init=c,n_init=1)
    km.fit(data)
    return km

###results and comparison

In [36]:
#carry out K-means clustering and time profiling
#try different l and r
%timeit kmeanspar_v1(k,l,r,data)
%timeit kmeanspar(k,l,r,data)
%timeit kmeansplus(k,data)
%timeit Random(k,data)

1 loops, best of 3: 9.06 s per loop
1 loops, best of 3: 3.24 s per loop
1 loops, best of 3: 1.12 s per loop
1 loops, best of 3: 1.19 s per loop


In [37]:
#prun of the first version of Kmeans|| 
stats = %prun -r -q kmeanspar_v1(k,l,r,data)
stats.sort_stats('time').print_stats(10);

          3962347 function calls (3912347 primitive calls) in 10.910 seconds

   Ordered by: internal time
   List reduced from 72 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   886846    3.578    0.000    3.578    0.000 {method 'reduce' of 'numpy.ufunc' objects}
        1    2.824    2.824    7.349    7.349 <ipython-input-35-cdd31743b68d>:1(weight_v1)
103337/53337    1.527    0.000    3.394    0.000 <ipython-input-33-3701a5ceacff>:1(Cost)
   886718    1.320    0.000    5.986    0.000 fromnumeric.py:1623(sum)
   886886    0.680    0.000    0.680    0.000 {isinstance}
   886791    0.412    0.000    3.989    0.000 _methods.py:31(_sum)
   103333    0.196    0.000    0.196    0.000 {method 'argmin' of 'numpy.ndarray' objects}
   103333    0.087    0.000    0.283    0.000 fromnumeric.py:938(argmin)
        1    0.084    0.084   10.910   10.910 <ipython-input-35-cdd31743b68d>:16(kmeanspar_v1)
      228    0.061    0.000    0.061    0

In [38]:
#prun of the Kmeans++ 
stats = %prun -r -q kmeansplus(k,data)
stats.sort_stats('time').print_stats(10);

          77457 function calls in 1.361 seconds

   Ordered by: internal time
   List reduced from 66 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      158    0.377    0.002    0.842    0.005 k_means_.py:400(_labels_inertia_precompute_dense)
      658    0.327    0.000    0.786    0.001 pairwise.py:143(euclidean_distances)
      816    0.276    0.000    0.276    0.000 {numpy.core._dotblas.dot}
     5913    0.133    0.000    0.133    0.000 {method 'reduce' of 'numpy.ufunc' objects}
       10    0.070    0.007    0.457    0.046 k_means_.py:41(_k_init)
      158    0.044    0.000    0.044    0.000 _k_means.pyx:244(_centers_dense)
     3291    0.018    0.000    0.126    0.000 validation.py:37(_assert_all_finite)
      490    0.016    0.000    0.016    0.000 {method 'cumsum' of 'numpy.ndarray' objects}
     8719    0.010    0.000    0.010    0.000 {numpy.core.multiarray.array}
     1975    0.008    0.000    0.016    0.000 shape_base

In [39]:
#prun of the existing version of Kmeans||
stats = %prun -r -q kmeanspar(k,l,r,data)
stats.sort_stats('time').print_stats(10);

          1307542 function calls (1257542 primitive calls) in 3.937 seconds

   Ordered by: internal time
   List reduced from 72 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
103729/53729    1.547    0.000    3.466    0.000 <ipython-input-33-3701a5ceacff>:1(Cost)
   217654    1.086    0.000    1.086    0.000 {method 'reduce' of 'numpy.ufunc' objects}
   217502    0.315    0.000    1.697    0.000 fromnumeric.py:1623(sum)
   113724    0.233    0.000    0.233    0.000 {method 'argmin' of 'numpy.ndarray' objects}
   217718    0.194    0.000    0.194    0.000 {isinstance}
   217599    0.107    0.000    1.191    0.000 _methods.py:31(_sum)
        1    0.102    0.102    0.276    0.276 <ipython-input-33-3701a5ceacff>:10(weight)
   113724    0.095    0.000    0.328    0.000 fromnumeric.py:938(argmin)
        1    0.087    0.087    3.937    3.937 <ipython-input-33-3701a5ceacff>:24(kmeanspar)
       76    0.055    0.001    0.055    0.001 {

### Summary and Optimization Strategies

Even though I made some changes in the weight function and got good results (8s->3s), the running time (including Kmeans cluster) of Kmeans|| was still slower than Kmeans++ using the existing module.

There are two ways I can do to do the optimization: one is using other language (C or Cython) and another one is to parallelize the Kmeans||. 


##5. Optimization - Cython

###5.1 functions for Cython

In [40]:
%load_ext cython

The cython extension is already loaded. To reload it, use:
  %reload_ext cython


In [41]:
%%cython -a 

import numpy as np
from sklearn.cluster import KMeans
cimport numpy as np
cimport cython
from libc.stdlib cimport rand
cdef extern from "stdlib.h":
    int RAND_MAX
    
def randnum():
    return rand()/float(RAND_MAX)

def distance(double[::1] p1,double[::1] p2):
    cdef int i, d
    cdef double w=0
    d = p1.shape[0]
    for i in range(d):
        w += (p1[i]-p2[i])**2
    return w

def Cost_Y1_C(double[:,::1] C,double[::1] Y):
    cdef int i, n_C, d
    cdef double cost = 0
    n_C = C.shape[0]
    d = C.shape[1]
    cdef double[::1] dists=np.zeros(n_C)
    for i in range(n_C):
        dists[i] = distance(Y,C[i,:])
        cost += np.sum(dists)
    return cost

def weight_C(double[:,::1] C,double[:,::1]  data):
    cdef int i, j, n_C, n_Y, d, mins
    n_C = C.shape[0]
    d = C.shape[1]
    n_Y = data.shape[0]
    cdef double[::1] ws = np.zeros(n_C)
    cdef double Min_Index, Min = 0
    for i in range(n_Y):
        Min = distance(C[0,:],data[i,:]);
        for j in range(n_C-1):
            ds = distance(C[j+1,:],data[i,:])
            if ds < Min:
                Min = ds
                Min_Inx = j+1
        ws[Min_Inx] = 1.0/n_Y
    return np.array(ws)
    
    
def kmeanspar_C(int k,int l,int r,data):
    cdef int n_Y, i, j, d
    d = data.shape[1]
    n_Y = data.shape[0]
    cdef double[::1] prob=np.zeros(n_Y)
    cdef double[::1] flag=np.zeros(n_Y)
    cdef double sums=0
    C = data[np.random.choice(range(n_Y),1)[0],]
    sums=0
    for j in range(n_Y):
        flag[j] = randnum()
        prob[j] = distance(C,data[j,])#C Y 1 dim
        sums = sums+prob[j]
    C = np.concatenate((C[None,:],data[np.array(prob)/sums*l>=flag,]))
    for i in range(r):
        sums=0
        for j in range(n_Y):
            flag[j] = randnum()
            prob[j] = Cost_Y1_C(C,data[j,])#Y 1 dim
            sums = sums+prob[j]
        C = np.concatenate((C,data[np.array(prob)/sums*l>=flag,]))
    #step 7
    weights = weight_C(C,data)
    #step 8: k-means++ to choose weighted points
    c = C[np.random.choice(range(len(C)),1)[0],]
    p = np.array([distance(c,x) for x in C])
    Prob = p*weights/np.sum(p*weights)
    x = np.random.choice(range(len(C)),1,p=Prob)
    c = np.concatenate((c[None,:],C[x,]))
    for i in range(k-2):
        p = np.array([Cost_Y1_C(c,x) for x in C])#Y 1 dim
        Prob = p*weights/np.sum(p*weights)
        x = np.random.choice(range(len(C)),1,p=Prob)
        c = np.concatenate((c,C[x,]))
    #carry out K-means clustering
    km = KMeans(n_clusters=k,init=c,n_init=1)
    km.fit(data)
    return km   

###5.2 time profiling

In [42]:
%timeit kmeanspar_C(k,12,r,data)

1 loops, best of 3: 27.9 s per loop


In [43]:
#prun of the existing version of Kmeans||
stats = %prun -r -q kmeanspar_C(k,l,r,data)
stats.sort_stats('time').print_stats(10);

          9223846 function calls in 37.342 seconds

   Ordered by: internal time
   List reduced from 63 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
  2305483   15.434    0.000   15.434    0.000 {method 'reduce' of 'numpy.ufunc' objects}
        1    9.681    9.681   37.342   37.342 {_cython_magic_92bc31960c61feba28b81942836c03f5.kmeanspar_C}
  2305313    9.153    0.000   27.542    0.000 fromnumeric.py:1623(sum)
  2305565    1.591    0.000    1.591    0.000 {isinstance}
  2305428    1.371    0.000   16.803    0.000 _methods.py:31(_sum)
       19    0.048    0.003    0.106    0.006 k_means_.py:400(_labels_inertia_precompute_dense)
       19    0.029    0.002    0.057    0.003 pairwise.py:143(euclidean_distances)
       38    0.023    0.001    0.023    0.001 {numpy.core._dotblas.dot}
       19    0.006    0.000    0.006    0.000 _k_means.pyx:244(_centers_dense)
       96    0.001    0.000    0.004    0.000 validation.py:37(_asser

###5.3 Summary

Unfortunately, the Cython seems worse. The Cython profiling shows that in fact the functions (Cost and weight) really got some improvement, but the total time for {method 'reduce' of 'numpy.ufunc' objects} is too long. It seems like the parallelization should be adopted and get better result. For futher developement, a Parallelized k-means in Cython should be a better choice.

##6. parallelization

###6.1 codes for Multi core

In [44]:
from multiprocessing import Pool, cpu_count
from functools import partial

In [45]:
def min_distance(p1, C, axis = 1):
    return np.min(np.sum((p1-C)**2,axis))

def argmin_distance(p1, C, axis = 1):
    return np.argmin(np.sum((p1-C)**2,axis))

def Cost_par(C, Y):
    """C is a subset of the dataset, Y can be a point or a subset"""
    pool = Pool(processes=cpu_count())
    PartialMinDist = partial(min_distance, C=C, axis=1)
    cost = np.sum(pool.map(PartialMinDist, Y))
    pool.close()
    pool.terminate()
    return cost

def weight_par(C, data):
    start = timeit.default_timer()
    pool = Pool(processes=cpu_count())
    PartialArgminDist = partial(argmin_distance, C=C, axis=1)
    Index_min = list(pool.map(PartialArgminDist,data))
    return np.array([Index_min.count(i) for i in range(len(C))])
    
def kmeanspar_par(k,l,r,data):
    """k is the number of centers, l is the expected number of intermediate points
    in each iteration, r is the number of iterations, data is the target data set"""
    #l*r should be larger than k in case k-means|| select too few points
    if l*r < k:
        raise ValueError('r or l must be bigger, ')
    #if k is too large
    if k >= len(data):
        raise ValueError('k is too large')
        
    #Step 1
    C = data[nrd.choice(range(len(data)),1),]
    #for loop
    for i in range(r):
        pool = Pool(processes=cpu_count())
        PartialMinDist = partial(min_distance, C=C, axis=1)
        prob = np.array(pool.map(PartialMinDist, data))
        prob = prob/np.sum(prob)*float(l)
        pool.close()
        pool.terminate()
        flag = nrd.uniform(size=len(data))
        C = np.concatenate((C,data[prob>=flag,]))
    #step 7
    weights = weight_par(C,data)
    #step 8: k-means++ to choose weighted points
    c = C[nrd.choice(range(len(C)),1),]
    while len(c) < k:
        p = np.array([Cost(c,x) for x in C])
        Prob = p*weights/np.sum(p*weights)
        x = nrd.choice(range(len(C)),1,p=Prob)
        c = np.concatenate((c,C[x,]))
    ##carry out K-means clustering
    km = KMeans(n_clusters=k,init=c,n_init=1)
    km.fit(data)
    return km

###6.2 time profiling

In [46]:
%timeit kmeanspar_par(k,l,r,data)

1 loops, best of 3: 1.85 s per loop


In [47]:
#prun of the existing version of Kmeans||
stats = %prun -r -q kmeanspar_par(k,l,r,data)
stats.sort_stats('time').print_stats(10);

          55993 function calls in 2.157 seconds

   Ordered by: internal time
   List reduced from 183 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      147    1.378    0.009    1.378    0.009 {method 'acquire' of 'thread.lock' objects}
       10    0.345    0.035    0.345    0.035 {method 'acquire' of '_multiprocessing.SemLock' objects}
       86    0.062    0.001    0.062    0.001 {method 'count' of 'list' objects}
     4214    0.060    0.000    0.139    0.000 <ipython-input-33-3701a5ceacff>:1(Cost)
       12    0.048    0.004    0.048    0.004 {posix.fork}
     8604    0.045    0.000    0.045    0.000 {method 'reduce' of 'numpy.ufunc' objects}
       11    0.027    0.002    0.070    0.006 k_means_.py:400(_labels_inertia_precompute_dense)
      343    0.025    0.000    0.025    0.000 {posix.waitpid}
       22    0.022    0.001    0.022    0.001 {numpy.core._dotblas.dot}
       11    0.017    0.002    0.042    0.004 pairwise.p

###6.3 Summary

In [48]:
cpu_count()

2

I tried to use MapReduce and Mr. job for it but it seemed it's hard to use Mr. job in this case, so I used Multi Core. I only used two cores to run the parallized version and it got improved (from 3.7s to 1.7s)! The running time for {method 'reduce' of 'numpy.ufunc' objects} becomes 0.044.

There is a trick that I used in both Cython version and multi core version. I didn't use Cost(data,C) for the center to calculate the probability that each point being chosen as centers in each iteration. Instead, I just normalized the probability array because $Cost(data,C)=\sum_{data} Cost(x,C)$ which could be faster.

##7. Application Analysis

Application to simulated/real problem and comparative anlaysis (up to 40 points)
As an example, a 40-point answer could include an extensive comparative analysis against the majority of classes of existing algorithms used to solve a problem with benchmarking and a thoguhtful consideration of the benefits/drawbacks of each

Kmeans++'s inherently sequential nature makes the biggest bottleneck especially when it's used in massive data. Its total running time is O(nkd), which is the same as that of a single Lloyd’s iteration. Kmeans||, instead, is better to be used for parallelization. 