# <center> STA663 Final Project: Scalable K-means
### <center> Xin Xu, Fu Wen

## <center> Abstract 

Due to its simplicity, the `k-means` algorithm is one of the most famous machine learning algorithms used to cluster data[1]. The main critical problem of this algorithm is that it might be blocked locally based on the initial random chosen centers. The `k-means++` algorithm is developed to solve this problem, spreading out the initial centers with an updating non-uniform distribution. However, `K-means++` has a limited applicability to large data sets due to its inherent sequential nature, which requires k passes through the whole data set to find the optimal initialization of centers. The `K-means||` algorithm in the paper "Scalable K-Means++" is the parallel version of the `k-means++`[2] and an improvement. Instead of sampling a single point, it oversamples a couple of centers in each iteration and guarantees the performance at the same time. 

In this report, we firstly implement the `K-means||` algorithm in Python. Then, we speed up the algorithm using `Cython` and parallelize the algorithm using `multiprocessing`. In the applicaiton part, we apply it to the `GAUSSMIXTURE` dataset simulated as in [1] and the `SPAM` dataset from UC Irvine Machine Learning repository [3]. We compare the misclassification rate (for `GAUSSMIXTURE` dataset), clustering cost and runtime of the `k-means||` algorithm with the result of random initialization and the `k-means++`. In the end, we inplement `k-means||` algorithm in Spark using `pyspark.millib`. 

## 1. Introduction

### 1.1 Background and Related Algorithm

As one of the most popular clustering algorithms, the **k-means** algorithm has been widely used for the last half of the century[4]. The main idea is to randomly choose k centers, repeatedly assign each point to its nearest center and calculate the new centers by minimizing the sum of the squares of the distance in its neighborhood.

The k-means algorithm has a critical problem of unreliable initialization[2]. This algorithm with an incorrect initialization cannot find a globally optimal solution but rests on a locally optimal solution. In addition, the running time before convergence is long. **K-means++** algorithm avoids this problem by finding k better initial centers. It first samples one random point uniformly from the data, then sets the subsequent k-1 centers with probability proportional to its contribution to the overall error given the previous centers. In contrast to the k-means algorithm, k-means++ reduces the probability of picking several initial centers in one cluster. However, the sequence initialization process also limits its applicability to large data sets or data with large k since the whole algorithm is not scalable.

Bahmani et al. constructed the scalable k-means++ algorithm (**k-means||** algorithm) in their paper "Scalable K-Means++"[1]. The main idea is to sample more than one point (O(k)) in each round and repeat the process for fewer iterations ($O(logn)$). Then, the algorithm reclusters the $O(klogn)$ points generated from the above process into $k$ initial centers.



### 1.2 Notation and the Algorithm

Suppose $X = \{x_1, \dots, x_n \}$ are d-dimentional points to be clustered and k is the number of cluster (a positer integer). 

For a subset $Y \in X$, define the distance from a point $x$ to $Y$ as $d(x,Y) = min_{y \in Y} \| x-y\|$, where $\|x -y \|$denote the Euclidean distance between $x$ and $y$, define the centroid of $Y$ as
$$Centroid(Y) = \frac{1}{|Y|} \sum_{y \in Y} y$$

For a set of cluster centers $C = \{ c_1, c_2 ,\dots , c_k\}$, define the _cost of $Y$_ with respect to $C$ as:

$$ \phi_Y(C) = \sum_{y \in Y} d(y,C)^2$$

In `k-means||` algorithm, it set an oversampling factor $l = \Omega (k)$. $l>1$ is an integer.


Steps of `k-means||` algorithm:

- Sample a point uniformaly from X as the first center $C$
- Compute the cost of clustering based on this choice $\phi_{X}(C)=\psi$
- for $O(log\psi)$ times repeat:  
 - Independently sample $l$ points with probability $p_{x}=\frac{l\cdot d^{2}(x,C)}{\phi_{X}(C)}$ as $C'$
 - $C=C\cup C'$
- For each point $x \in C$, compute $w_{x}$ as the number of points in $X$ closer to x than other point in $C$
- Get k clusters from reclustering those weighted points in $C$ 

## 2.  Implementation

### 2.1 K-Means 

In [1]:
%%file distance_func.py
import numpy as np

def distance(data, centroids):
    """ Calculate the distance from each data point to each center
    Parameters:
       data   n*d
       center k*d
    
    Returns:
       distence n*k 
    """
    ## calculate distence between each point to the centroids
    dist = np.sum((data[:, np.newaxis, :] - centroids)**2, axis=2)
    return dist

Overwriting distance_func.py


In [2]:
%%file kmeans_func.py
import numpy as np
from distance_func import distance

def KMeans(data, k, centroids, max_iter = 10000): 
    
    """ Apply the KMeans clustering algorithm
    
    Parameters:
      data                        ndarrays data 
      k                           number of cluster
      centroids                   initial centroids
    
    Returns:
      "Iteration before Coverge"  time used to converge
      "Centroids"                 the final centroids finded by KMeans    
      "Labels"                    the cluster of each data   
    """
    
    n = data.shape[0] 
    iterations = 0
    
    while iterations < max_iter:        
        dist = distance(data,centroids)
        
        ## give cluster label to each point 
        cluster_label = np.argmin(dist, axis=1)
        
        ## calculate new centroids
        newCentroids = np.zeros(centroids.shape)
        for j in range(0, k):
            if sum(cluster_label == j) == 0:
                newCentroids[j] = centroids[j]
            else:
                newCentroids[j] = np.mean(data[cluster_label == j, :], axis=0)
        
        ## Check if it is converged
        if np.array_equal(centroids, newCentroids):
            print("Converge! after:",iterations,"iterations")
            break 
            
        centroids = newCentroids
        iterations += 1
        
    return({"Iteration before Coverge": iterations, 
            "Centroids": centroids, 
            "Labels": cluster_label})

Overwriting kmeans_func.py


### 2.2.2 K-Means++

In [3]:
%%file kmeanspp_func.py
import numpy as np
from distance_func import distance

def cost(dist):
    """ Calculate the cost of data with respect to the current centroids
    Parameters:
       dist     distance matrix between data and current centroids   
    Returns:    the normalized constant in the distribution 
    """
    return np.sum(np.min(dist,axis=1))

def distribution(dist,cost):
    """ Calculate the distribution to sample new centers
    Parameters:
       dist       distance matrix between data and current centroids
       cost       the cost of data with respect to the current centroids
    Returns:      distribution 
    """
    return np.min(dist, axis=1)/cost

def sample_new(data,distribution,l):
    """ Sample new centers
    Parameters:
       data         n*d
       distribution n*1
       l            the number of new centers to sample
    Returns:        new centers                          
    """
    return data[np.random.choice(range(len(distribution)),l,p=distribution),:]

def KMeansPlusPlus(data, k):    
    """ Apply the KMeans++ clustering algorithm to get the initial centroids   
    Parameters: 
      data                        ndarrays data 
      k                           number of cluster  
    Returns:
      "Centroids"                 the complete initial centroids by KMeans++
      
    """
    
    #Initialize the first centroid
    centroids = data[np.random.choice(data.shape[0],1),:]
    
    while centroids.shape[0] < k :
                
        #Get the distance between data and centroids
        dist = distance(data, centroids)
        
        #Calculate the cost of data with respect to the centroids
        norm_const = cost(dist)
        
        #Calculate the distribution for sampling a new center
        p = distribution(dist,norm_const)
        
        #Sample the new center and append it to the original ones
        centroids = np.r_[centroids, sample_new(data,p,1)]
    
    return centroids

Overwriting kmeanspp_func.py


### 2.2.3 Kmeans Parallel

In [4]:
%%file scalablekmeanspp_func.py
import numpy as np
from distance_func import distance
from kmeanspp_func import cost, distribution, sample_new

def get_weight(dist,centroids):
    """ Get the weight of each centroid
    
    Parameters:
    dist      matrix of distance between data and current centroids
    centroids current centroids
    
    Returns weight of each centroid
    """
    min_dist = np.zeros(dist.shape)
    min_dist[range(dist.shape[0]), np.argmin(dist, axis=1)] = 1
    count = np.array([np.count_nonzero(min_dist[:, i]) for i in range(centroids.shape[0])])
    return count/np.sum(count)

def weightedKMeans(data, k, weight, centroids, max_iter = 10000): 
    
    """ Apply the weighted KMeans clustering algorithm
    
    Parameters:
      data                        ndarrays data 
      k                           number of cluster
      weight                      weight matrix of data
      centroids                   initial centroids
    
    Returns:
      "Iteration before Coverge"  time used to converge
      "Centroids"                 the final centroids finded by KMeans    
      "Labels"                    the cluster of each data   
    """
    
    n = data.shape[0] 
    iterations = 0
    
    while iterations < max_iter:        
        dist = distance(data, centroids) * weight[:, np.newaxis]
        
        ## give cluster label to each point 
        cluster_label = np.argmin(dist, axis=1)
        
        ## calculate new centroids
        newCentroids = np.zeros(centroids.shape)
        for j in range(0, k):
            if sum(cluster_label == j) == 0:
                newCentroids[j] = centroids[j]
            else:
                newCentroids[j] = np.mean(data[cluster_label == j, :], axis=0)
        
        ## Check if it is converged
        if np.array_equal(centroids, newCentroids):
            print("Converge")
            break 
        
        centroids = newCentroids
        iterations += 1
        
    return(centroids)

def ScalableKMeansPlusPlus(data, k, l, weighted=False, iter=5):
    
    """ Apply the KMeans|| clustering algorithm   
    Parameters:
      data     ndarrays data 
      k        number of cluster
      l        number of point sampled in each iteration
      weighted if True, using weighted reclustering in the last step 
                  else, using weighted sampling in the last step.
    
    Returns:   the initial centroids finded by KMeans||  
      
    """
    
    centroids = data[np.random.choice(range(data.shape[0]),1), :]
    
    
    for i in range(iter):
        #Get the distance between data and centroids
        dist = distance(data, centroids)
        
        #Calculate the cost of data with respect to the centroids
        norm_const = cost(dist)
        
        
        #Calculate the distribution for sampling l new centers
        p = distribution(dist,norm_const)
        
        #Sample the l new centers and append them to the original ones
        centroids = np.r_[centroids, sample_new(data,p,l)]
    

    ## reduce k*l to k using KMeans++ 
    dist = distance(data, centroids)
    weights = get_weight(dist, centroids)
    if weighted:
        initial = centroids[np.random.choice(range(len(weights)),k,replace=False),:]
        centers=  weightedKMeans(centroids, k, weights, initial)
    else: 
        centers= centroids[np.random.choice(len(weights), k, replace= False, p = weights),:]
    return centers

Overwriting scalablekmeanspp_func.py


## 3. Testing

In this part, we implement some unit tests to ensure all the functions work.
- For **distance** function, we test its non-negativity, symmetry and accuracy of outputs given some spacific inputs.
- For **cost function**, we test its nongegativity.
- For **distribution** fucntion, we test its non-negativity and whether the output sums up to 1 or not.
- For **sample_new** function, we test the length of the sample and whether all the sample points belong to the original data sst.
- For **get_weight** function, we test the weights are non-negative and sum up to 1.
- For **KMeans** fucntion, we test the length of the labels and the number of different labels.
- For **KMeansPlusPlus** and **ScalableKMeansPlusPlus**, we test the lengths equal to the inputs and the initilizations are in the original data set.

In [5]:
%%file test_distance.py

import numpy as np
from numpy.testing import assert_almost_equal
from distance_func import distance

def test_non_negativity():
    u = np.random.normal(size=(3,4))
    v = np.random.normal(size=(5,4))
    assert (distance(u, v)>= 0).all()
    
def test_coincidence_when_zero():
    u = np.zeros((3,4))
    v = np.zeros((5,4))
    assert (distance(u, v)==0).all()

def test_coincidence_when_not_zero():
    u = np.random.normal(size=(3,4))
    v = np.random.normal(size=(5,4))
    assert (distance(u, v)!= 0).any()

def test_symmetry():
    u = np.random.normal(size=(3,4))
    v = np.random.normal(size=(5,4))
    assert (distance(u, v)== distance(v, u).T).all()

def test_known1():
    u = np.array([[0,0],[1,1]])
    v = np.array([[0,0],[1,1]])
    dist = np.array([[0,2],[2,0]])
    assert_almost_equal(distance(u, v), dist)
    
def test_known2():
    u = np.array([[0,0,0],[1,1,1],[2,2,2]])
    v = np.array([[1,1,1],[2,2,2],[3,3,3]])
    dist = np.array([[3,12,27],[0,3,12],[3,0,3]])
    assert_almost_equal(distance(u, v), dist)

Overwriting test_distance.py


In [6]:
%%file test_cost.py

import numpy as np
from numpy.testing import assert_almost_equal
from kmeanspp_func import cost
from distance_func import distance

def test_non_negative():
    for i in range(10):
        data = np.random.normal(size=(5,4))
        c = data[np.random.choice(range(4),2),]
        dist = distance(data,c)
        assert cost(dist) >= 0


Overwriting test_cost.py


In [7]:
%%file test_distribution.py

import numpy as np
from numpy.testing import assert_almost_equal
from distance_func import distance
from kmeanspp_func import distribution,cost

def test_non_negative():
    data = np.random.normal(size=(20,4))
    centroids = data[np.random.choice(range(4),4),]
    dist = distance(data,centroids)
    c = cost(dist)
    p = distribution(dist,c)
    assert (p>=0).all()
    
def test_sum_to_one():
    data = np.random.normal(size=(20,4))
    centroids = data[np.random.choice(range(4),4),]
    dist = distance(data,centroids)
    c = cost(dist)
    p = distribution(dist,c)
    assert_almost_equal(np.sum(p),1)

Overwriting test_distribution.py


In [8]:
%%file test_sample_new.py
import numpy as np
from numpy.testing import assert_almost_equal
from distance_func import distance
from kmeanspp_func import cost, sample_new, distribution

def test_length():
    data = np.random.normal(size=(20,4))
    centroids = data[np.random.choice(range(4),4),]
    dist = distance(data,centroids)
    c = cost(dist)
    p = distribution(dist,c)
    l = 5
    c_new = sample_new(data,p,l)
    assert len(c_new)==5

def test_in_data():
    data = np.random.normal(size=(20,4))
    centroids = data[np.random.choice(range(4),4),]
    dist = distance(data,centroids)
    c = cost(dist)
    p = distribution(dist,c)
    l = 5
    c_new = sample_new(data,p,l)
    check = [i in data for i in c_new]
    assert all(check)
    

Overwriting test_sample_new.py


In [9]:
%%file test_weights.py

import numpy as np
from numpy.testing import assert_almost_equal
from distance_func import distance
from scalablekmeanspp_func import get_weight

def test_non_negative():
    data = np.random.normal(size=(20,4))
    centroids = data[np.random.choice(range(4),4),]
    dist = distance(data,centroids)
    w = get_weight(dist,centroids)
    assert (w>=0).all()
    
def test_sum_to_one():
    data = np.random.normal(size=(20,4))
    centroids = data[np.random.choice(range(4),4),]
    dist = distance(data,centroids)
    w = get_weight(dist,centroids)
    assert_almost_equal(np.sum(w),1)

Overwriting test_weights.py


In [10]:
%%file test_kmeans.py
import numpy as np
from distance_func import distance
from kmeans_func import KMeans

def test_label():
    for i in range(10):
        data = np.random.normal(size=(50,2))
        k = 3
        centroids = data[np.random.choice(range(data.shape[0]), k, replace=False),:]
        label = KMeans(data, k, centroids)["Labels"]
        assert max(label) == k-1 and len(label)==data.shape[0]

Overwriting test_kmeans.py


In [11]:
%%file test_kmeanspp.py
import numpy as np
from kmeanspp_func import KMeansPlusPlus
from scalablekmeanspp_func import ScalableKMeansPlusPlus

def test_length():
    data = np.random.normal(size=(2000,2))
    k = 3
    l = 5
    ini1 = KMeansPlusPlus(data, k)
    ini2 = ScalableKMeansPlusPlus(data, k, l)
    assert len(ini1)==k and len(ini2)==k

def test_length():
    data = np.random.normal(size=(2000,2))
    k = 3
    l = 5
    ini1 = KMeansPlusPlus(data, k)
    ini2 = ScalableKMeansPlusPlus(data, k, l)
    check1 = [i in data for i in ini1]
    check2 = [i in data for i in ini1]
    assert all(check1) and all(check2)

Overwriting test_kmeanspp.py


In [12]:
! py.test 

platform win32 -- Python 2.7.11, pytest-2.8.5, py-1.4.31, pluggy-0.3.1
rootdir: C:\Users\Raghav\Documents\GitHub\Python-for-Data-Science\STA663\Projects\Scalable KMeans PlusPlus 2\Fu Wen,Xin Xu_Bahmani_Scalable K-Means++, inifile: 
collected 15 items

test_cost.py .
test_distance.py ......
test_distribution.py ..
test_kmeans.py .
test_kmeanspp.py F
test_sample_new.py ..
test_weights.py .F

_________________________________ test_length _________________________________

    def test_length():
        data = np.random.normal(size=(2000,2))
        k = 3
        l = 5
        ini1 = KMeansPlusPlus(data, k)
>       ini2 = ScalableKMeansPlusPlus(data, k, l)

test_kmeanspp.py:18: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
scalablekmeanspp_func.py:101: in ScalableKMeansPlusPlus
    centers= centroids[np.random.choice(len(weights), k, replace= False, p = weights),:]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

>   ???
E 

## 4.  Optimization

In this part, we simply use Cython to speed up our multiple fucntions. Since we've already applies broadcasting and vectorize those functions as possible as we can in the previous part, the speeding up performance is not that obvious.


In [13]:
%load_ext Cython

In [14]:
%%cython -a
import numpy as np
cimport numpy as np

def distance_cy(data, centroids):
    """ Calculate the distance from each data point to each center
    Parameters:
       data   n*d
       center k*d
    
    Returns:
       distence n*k 
    """
    ## calculate distence between each point to the centroids
    dist = np.sum((data[:, np.newaxis, :] - centroids)**2, axis=2)
    return dist

def KMeans_cy(data, k, centroids, max_iter = 10000): 
    
    """ Apply the KMeans clustering algorithm
    
    Parameters:
      data                        ndarrays data 
      k                           number of cluster
      centroids                   initial centroids
    
    Returns:
      "Iteration before Coverge"  time used to converge
      "Centroids"                 the final centroids finded by KMeans    
      "Labels"                    the cluster of each data   
    """
    
    n = data.shape[0] 
    iterations = 0
    
    while iterations < max_iter:        
        dist = distance_cy(data,centroids)
        
        ## give cluster label to each point 
        cluster_label = np.argmin(dist, axis=1)
        
        ## calculate new centroids
        newCentroids = np.zeros(centroids.shape)
        for j in range(0, k):
            if sum(cluster_label == j) == 0:
                newCentroids[j] = centroids[j]
            else:
                newCentroids[j] = np.mean(data[cluster_label == j, :], axis=0)
        
        ## Check if it is converged
        if np.array_equal(centroids, newCentroids):
            print("Converge")
            break 
            
        centroids = newCentroids
        iterations += 1
        
    return({"Iteration before Coverge": iterations, 
            "Centroids": centroids, 
            "Labels": cluster_label})

def cost_cy(dist):
    """ Calculate the cost of data with respect to the current centroids
    Parameters:
       dist     distance matrix between data and current centroids
    
    Returns:    the normalized constant in the distribution 
    """
    return np.sum(np.min(dist,axis=1))

def distribution_cy(dist,cost):
    """ Calculate the distribution to sample new centers
    Parameters:
       dist       distance matrix between data and current centroids
       cost       the cost of data with respect to the current centroids
    Returns:      distribution 
    """
    return np.min(dist, axis=1)/cost

def sample_new_cy(data,distribution,l):
    """ Sample new centers
    
    Parameters:
       data         n*d
       distribution n*1
       l            the number of new centers to sample
    Returns:        new centers                          
    """
    return data[np.random.choice(range(len(distribution)),l,p=distribution),:]

def KMeansPlusPlus_cy(data, k):    
    """ Apply the KMeans++ clustering algorithm to get the initial centroids   
    Parameters: 
      data                        ndarrays data 
      k                           number of cluster
    
    Returns:
      "Centroids"                 the complete initial centroids by KMeans++
      
    """
    
    #Initialize the first centroid
    centroids = data[np.random.choice(data.shape[0],1),:]
    
    while centroids.shape[0] < k :
                
        #Get the distance between data and centroids
        dist = distance_cy(data, centroids)
        
        #Calculate the cost of data with respect to the centroids
        norm_const = cost_cy(dist)
        
        #Calculate the distribution for sampling a new center
        p = distribution_cy(dist,norm_const)
        
        #Sample the new center and append it to the original ones
        centroids = np.r_[centroids, sample_new_cy(data,p,1)]
    
    return centroids

def get_weight_cy(dist,centroids):
    """ Get the weight of each centroid
    
    Parameters:
    dist      matrix of distance between data and current centroids
    centroids current centroids
    
    Returns weight of each centroid
    """
    min_dist = np.zeros(dist.shape)
    min_dist[range(dist.shape[0]), np.argmin(dist, axis=1)] = 1
    count = np.array([np.count_nonzero(min_dist[:, i]) for i in range(centroids.shape[0])])
    return count/np.sum(count)

def weightedKMeans(data, k, weight, centroids, max_iter = 10000): 
    
    """ Apply the weighted KMeans clustering algorithm
    
    Parameters:
      data                        ndarrays data 
      k                           number of cluster
      weight                      weight matrix of data
      centroids                   initial centroids
    
    Returns:
      "Iteration before Coverge"  time used to converge
      "Centroids"                 the final centroids finded by KMeans    
      "Labels"                    the cluster of each data   
    """
    
    n = data.shape[0] 
    iterations = 0
    
    while iterations < max_iter:        
        dist = distance_cy(data, centroids) * weight[:, np.newaxis]
        
        ## give cluster label to each point 
        cluster_label = np.argmin(dist, axis=1)
        
        ## calculate new centroids
        newCentroids = np.zeros(centroids.shape)
        for j in range(0, k):
            if sum(cluster_label == j) == 0:
                newCentroids[j] = centroids[j]
            else:
                newCentroids[j] = np.mean(data[cluster_label == j, :], axis=0)
        
        ## Check if it is converged
        if np.array_equal(centroids, newCentroids):
            print("Converge")
            break 
        
        centroids = newCentroids
        iterations += 1
        
    return(centroids)


def ScalableKMeansPlusPlus_cy(data, k, l, weighted=False, iter=5):
    
    """ Apply the KMeans|| clustering algorithm
    
    Parameters:
      data     ndarrays data 
      k        number of cluster
      l        number of point sampled in each iteration
      weighted if True, using weighted reclustering in the last step 
                  else, using weighted sampling in the last step.
    
    Returns:   the final centroids finded by KMeans||  
      
    """
    
    centroids = data[np.random.choice(range(data.shape[0]),1), :]
    
    
    for i in range(iter):
        #Get the distance between data and centroids
        dist = distance_cy(data, centroids)
        
        #Calculate the cost of data with respect to the centroids
        norm_const = cost_cy(dist)
        
        #Calculate the distribution for sampling l new centers
        p = distribution_cy(dist,norm_const)
        
        #Sample the l new centers and append them to the original ones
        centroids = np.r_[centroids, sample_new_cy(data,p,l)]
    

    ## reduce k*l to k using KMeans++ 
    dist = distance_cy(data, centroids)
    weights = get_weight_cy(dist,centroids)
    if weighted:
        initial = centroids[np.random.choice(range(len(weights)),k,replace=False),:]
        centers=  weightedKMeans(centroids, k, weights, initial)
    else: 
        centers= centroids[np.random.choice(len(weights), k, replace= False, p = weights),:]
    return centers

DistutilsPlatformError: Unable to find vcvarsall.bat

## 5. High performance computing

Use `multiprocessing`, we speed up `cost` function and `sample_new` function with all cpu cores. So, we parallelly calculate the distance from each point to its nearest center and its probability distribution for next sample in `cost` function. In `sample_new` function, we sample $l$ new centroids parallelly, which is the most meaningful part of `KMeans ||` algorithm.

In [None]:
%%cython -a

import numpy as np
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Pool, cpu_count
import multiprocessing as mp
from functools import partial
from distance_func import distance

def min_distance(d, centroids):
    
    """ Calculate the minimum distance from point d 
        to its nearest center in centroids."""
    dist = np.min(np.sum((centroids - d)**2, axis=1))
    return dist


# speed up by multiprocessing
def cost_p(data, centroids): 
    
    """ Calculate the cost of data with respect to 
    the current centroids and the new probability 
    distribution of each point for next sample"""    

    with Pool(processes = cpu_count()) as pool:
        partial_dist = partial(min_distance, centroids = centroids)
        min_dist = pool.map(partial_dist, data)
        cost = np.sum(min_dist)
        p = min_dist/cost
    return cost,p


def randomSample(x, a, p):
    np.random.seed()
    return np.random.choice(a = a, size = x , p =p)


## speed up with multiprocessing
def sample_new_p(data, distribution, l):
    
    """ Sample new centers"""  
    
    with Pool(processes = cpu_count()) as pool:
        partial_rc = partial(randomSample, a = len(distribution), p=distribution)
        index = pool.map(partial_rc,[1]*l)
    return np.squeeze(data[index,:],axis=(1,))


def min_distance_index_p(d, centroids):
    
    """ Return the index of the minimum distance from point d 
        to its nearest center in centroids."""
    
    minInd = np.argmin(np.sum((centroids - d)**2, axis=1))
    return minInd 

## speed up with multiprocessing
def get_weight_p(data, centroids):
    
    """ Return weight of all centroids """

    with Pool(processes = cpu_count()) as pool:
        partial_minInd = partial(min_distance_index_p, centroids = centroids )
        min_index = pool.map(partial_minInd, data)
        count = np.array([np.sum(np.array(min_index) == i) for i in range(centroids.shape[0])])
    return count/np.sum(count)

def weightedKMeans(data, k, weight, centroids, max_iter = 10000): 
    
    """ Apply the weighted KMeans clustering algorithm
    
    Parameters:
      data                        ndarrays data 
      k                           number of cluster
      weight                      weight matrix of data
      centroids                   initial centroids
    
    Returns:
      "Iteration before Coverge"  time used to converge
      "Centroids"                 the final centroids finded by KMeans    
      "Labels"                    the cluster of each data   
    """
    
    n = data.shape[0] 
    iterations = 0
    
    while iterations < max_iter:        
        dist = distance(data, centroids) * weight[:, np.newaxis]
        
        ## give cluster label to each point 
        cluster_label = np.argmin(dist, axis=1)
        
        ## calculate new centroids
        newCentroids = np.zeros(centroids.shape)
        for j in range(0, k):
            if sum(cluster_label == j) == 0:
                newCentroids[j] = centroids[j]
            else:
                newCentroids[j] = np.mean(data[cluster_label == j, :], axis=0)
        
        ## Check if it is converged
        if np.array_equal(centroids, newCentroids):
            print("Converge")
            break 
        
        centroids = newCentroids
        iterations += 1
        
    return(centroids)

def ScalableKMeansPlusPlus_p(data, k, l,weighted = False, iter=5):
    
    """ Apply the KMeans|| clustering algorithm"""
    
    centroids = data[np.random.choice(range(data.shape[0]),1), :]    
    
    for i in range(iter):   
        
        # Calculate the cost and new distribution
        norm_const = cost_p(data, centroids)[0]
        p = cost_p(data, centroids)[1] 
        
        # Sample the several(l) new centers and append them to the original ones
        centroids = np.r_[centroids, sample_new_p(data, p, l)]
        
    ## reduce k*l to k using KMeans++ 
    weights = get_weight_p(data,centroids)
    if weighted:
        initial = centroids[np.random.choice(range(len(weights)),k,replace=False),:]
        centers=  weightedKMeans(centroids, k, weights, initial)
    else: 
        centers= centroids[np.random.choice(len(weights), k, replace= False, p = weights),:]
    return centers

## 6.  Application and comparison
In this part, we test the two statements as in Bahmani's Paper[2]:

- `k-means||` obtains as a solution whose clustering cost is on par with `k-means++` and hence is expexcted to be much better than random.

- `k-means||` runs in a fewer number of rounds when compared to `k-means++`, which translates into a faster running time especially in the parallel implementation.

based on a simulated dataset and a real-world dataset. 

### 6.1 Simulate Data 

To generate the dataset GAUSSMIXTURE, we sampled k=5 centers from a 15-dimensional spherical Gaussian distribution with mean at the origin and variance $R\in \{1,10,100 \}$ and then added points from Gaussian distributions with unit variance around each center.

In [None]:
## Simulate data
np.random.seed(1234)
k = 20
n = 10000
d = 15

## simulate k centers from 15-dimensional spherical Gaussian distribution 
mean = np.hstack(np.zeros((d,1)))
cov = np.diag(np.array([1,10,100]*5))
centers = np.random.multivariate_normal(mean, cov, k)

## Simulate n data
for i in range(k):
    mean = centers[i]
    if i == 0:
        data = np.random.multivariate_normal(mean, np.diag(np.ones(d)), int(n/k+n%k))
        trueLabels = np.repeat(i,int(n/k+n%k))
    else:
        data = np.append(data, np.random.multivariate_normal(mean, np.diag(np.ones(d)) , int(n/k)), axis = 0) 
        trueLabels = np.append(trueLabels,np.repeat(i,int(n/k)))

Visualizing the first 4 dimension dimension of data as following:

In [None]:
from pandas import DataFrame
data_v = DataFrame(data[:,0:3]) 
data_v['trueLabels'] = trueLabels
import seaborn as sns
sns.set_context("notebook", font_scale=1.3)
sns.pairplot(data_v, hue="trueLabels")
pass

#### Implement K-means with Ramdom Initialization

In [None]:
from kmeans_func import KMeans
centroids_initial = data[np.random.choice(range(data.shape[0]), k, replace=False),:]
output_k = KMeans(data, k, centroids_initial)

In [None]:
## plot the first two dimensions
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
cmap = plt.get_cmap('gnuplot')
colors = [cmap(i) for i in np.linspace(0, 1, k)]

centroids1 =output_k["Centroids"]
labels1 = output_k["Labels"]

for i,color in enumerate(colors,start =1):
    plt.scatter(data[labels1==i, :][:,0], data[labels1==i, :][:,1], color=color)

for j in range(k):
    plt.scatter(centroids1[j,0],centroids1[j,1],color = 'w',marker='x')  

#### Implement KMeans with KMeans++ Intialization

In [None]:
from kmeans_func import KMeans
from kmeanspp_func import KMeansPlusPlus

centroids_initial = KMeansPlusPlus(data, 20)
output_kpp = KMeans(data, k, centroids_initial)

In [None]:
cmap = plt.get_cmap('gnuplot')
colors = [cmap(i) for i in np.linspace(0, 1, k)]

centroids1 =output_kpp["Centroids"]
labels1 = output_kpp["Labels"]

for i,color in enumerate(colors,start =1):
    plt.scatter(data[labels1==i, :][:,0], data[labels1==i, :][:,1], color=color)

for j in range(k):
    plt.scatter(centroids1[j,0],centroids1[j,1],color = 'w',marker='x') 

#### Implement KMeans with Scalable KMeans++ Intialization

In [None]:
from kmeans_func import KMeans
from scalablekmeanspp_func import ScalableKMeansPlusPlus
l = 10
centroids_initial = ScalableKMeansPlusPlus(data, 20, l)
output_spp = KMeans(data, k, centroids_initial)

In [None]:
cmap = plt.get_cmap('gnuplot')
colors = [cmap(i) for i in np.linspace(0, 1, k)]

centroids1 =output_spp["Centroids"]
labels1 = output_spp["Labels"]

for i,color in enumerate(colors,start =1):
    plt.scatter(data[labels1==i, :][:,0], data[labels1==i, :][:,1], color=color)

for j in range(k):
    plt.scatter(centroids1[j,0],centroids1[j,1],color = 'w',marker='x')  

### Compare Misclassification Rate

In [None]:
def MisClassRate(trueLabels, predict):
    """
    Calculate the misclassification rate of the algorithm
    
    Parameters:
    trueLabels   n*1 true labels of each observation  
    predict      n*1 predict labels of each observation 
    
    Returns:     misclassification rate 
    """
    
    df = DataFrame({'True':trueLabels, 'Predict':predict['Labels'],'V':1})
    table = pd.pivot_table(df, values ='V', index = ['True'], columns=['Predict'], aggfunc=np.sum).fillna(0)
    misRate = 1-sum(table.max(axis=1))/n
    return misRate

In [None]:
print("Misclassification Rate:")
print("Random:", MisClassRate(trueLabels, output_k)) # Random 
print("KMeans++:",MisClassRate(trueLabels, output_kpp)) # KMeans++
print("Scalable KMeans++:", MisClassRate(trueLabels, output_spp)) # Scalable KMeans++

We could see that with the initial centroids using `KMeans++` or `scalable KMeans++` algorithm, the misclassification rate is much lower than using `Random` initial centroids, which consistent with the first statement.

### Compare Clustering Cost

In [None]:
from distance_func import distance
from kmeanspp_func import cost

def clusterCost(data,predict):
    dist = distance(data,predict["Centroids"])
    return cost(dist)/(10**4)

In [None]:
print("Clustering Cost:")
print("Random:", clusterCost(data, output_k)) # Random 
print("KMeans++:",clusterCost(data, output_kpp)) # KMeans++
print("Scalable KMeans++:", clusterCost(data, output_spp)) # Scalable KMeans++

The clustering cost of the final results using `KMeans ++` and `Scalable KMeans++` are similar and much better than the clustering cost of random initial centroids.

### Compare runtime

In [28]:
%%time
a = KMeansPlusPlus(data, k)  # KMeans++ 
b = KMeans(data,k,a)

Converge! after: 21 iterations
CPU times: user 7.26 s, sys: 514 ms, total: 7.78 s
Wall time: 8.04 s


In [29]:
%%time
a = ScalableKMeansPlusPlus(data, k, l) # Scalable KMeans++
b = KMeans(data,k,a)

Converge! after: 35 iterations
CPU times: user 11.4 s, sys: 587 ms, total: 12 s
Wall time: 12.2 s


In [30]:
%%time
a = ScalableKMeansPlusPlus_cy(data, k, l) # Scalable KMeans++ with cython 
b = KMeans_cy(data,k,a)

Converge
CPU times: user 10 s, sys: 455 ms, total: 10.5 s
Wall time: 10.8 s


In [15]:
%%time
a = ScalableKMeansPlusPlus_p(data, k, l) # Scalable KMeans++ with parallel
b = KMeans_cy(data,k,a)

AttributeError: __exit__

The runtime of `KMeans++` is slow, since it sample $k$ initial centroids one by one. The runtime of `Scalable KMeans++` without speeding up is even slower than `KMeans++`, since it sample more than $k$ centroids and then clustering them again. After speeding up with `Cython` and `multiprocessing`, the `Scalable KMeans` works better in most case (not so stable). In addition, we use the `Scalable KMeans++` in Spark as follwing, it works much better.

#### Spark

In [34]:
import warnings
warnings.filterwarnings('ignore')

In [35]:
from pyspark.mllib.clustering import KMeans
from pyspark import SparkContext
sc = SparkContext('local[*]')

In [36]:
%%time
pdata =sc.parallelize(data)
data_model = KMeans.train(pdata, k, maxIterations=1000, runs=10, initializationMode="kmeans||")

CPU times: user 121 ms, sys: 17.9 ms, total: 139 ms
Wall time: 10.9 s


### 6.2 Real-World Data

In [37]:
f = open("spambase.data","r")
df = pd.read_table('spambase.data', sep=',', names=range(58))
df = np.array(df)

In [38]:
from kmeans_func import KMeans
from kmeanspp_func import KMeansPlusPlus
from scalablekmeanspp_func import ScalableKMeansPlusPlus

k = 20

# random
Rcentroids_initial = df[np.random.choice(range(df.shape[0]), k, replace=False),:]
Routput_k = KMeans_cy(df, k, Rcentroids_initial)

# KMeans++
Rcentroids_initial_kpp = KMeansPlusPlus_cy(df, k)
Routput_kpp = KMeans_cy(df, k, Rcentroids_initial_kpp)

Converge
Converge


In [39]:
# Scalable KMeans++ with l=2/k
Rcentroids_initial_spp = ScalableKMeansPlusPlus(df, k, k/2,weighted = True)
Routput_spp = KMeans_cy(df, k, Rcentroids_initial_spp)

# Scalable KMeans++ with l=2*k
Rcentroids_initial_spp2k = ScalableKMeansPlusPlus(df, k, k*2,weighted = True)
Routput_spp2k = KMeans_cy(df, k, Rcentroids_initial_spp2k)

Converge
Converge
Converge
Converge


In [40]:
print("Clustering Cost:")
print("Random:", clusterCost(df, Routput_k)) # Random 
print("KMeans++:",clusterCost(df, Routput_kpp)) # KMeans++
print("Scalable KMeans++(l=2/k):", clusterCost(df, Routput_spp)) # Scalable KMeans++
print("Scalable KMeans++(l=2k):", clusterCost(df, Routput_spp2k)) # Scalable KMeans++

Clustering Cost:
Random: 15293.8932604
KMeans++: 2344.39493059
Scalable KMeans++(l=2/k): 5766.82861831
Scalable KMeans++(l=2k): 8717.02948687


From the clustering cost above, we could see that the `KMeans++` and `Scalable KMeans++` works much better than random initial centroids.

In [41]:
%%time
k = 20
l = 10
a = KMeansPlusPlus(df, k)  # KMeans++ 
b = KMeans(df, k, a)

Converge! after: 23 iterations
CPU times: user 4.38 s, sys: 835 ms, total: 5.22 s
Wall time: 5.38 s


In [42]:
%%time
a = ScalableKMeansPlusPlus(df, k, l) # Scalable KMeans++
b = KMeans(df,k,a)

Converge! after: 55 iterations
CPU times: user 9.38 s, sys: 1.17 s, total: 10.5 s
Wall time: 10.7 s


In [43]:
%%time
a = ScalableKMeansPlusPlus_cy(df, k, l) # Scalable KMeans++ with cython 
b = KMeans_cy(df,k,a)

Converge
CPU times: user 16.2 s, sys: 1.58 s, total: 17.8 s
Wall time: 18 s


In [44]:
%%time
a = ScalableKMeansPlusPlus_p(df, k, l) # Scalable KMeans++ with parallel
b = KMeans_cy(df,k,a)

Converge
CPU times: user 24.1 s, sys: 2.52 s, total: 26.7 s
Wall time: 27.9 s


#### Spark

In [45]:
%%time
from pyspark.mllib.clustering import KMeans
pdf = sc.parallelize(df)
df_model = KMeans.train(pdf, k, maxIterations=1000, runs=10, initializationMode="kmeans||")

CPU times: user 121 ms, sys: 40.3 ms, total: 161 ms
Wall time: 5.87 s


## 7 Coclusions

From the implementation on the simulated data and the real-world dataset, the `k-means||` and `k-means++` find a better initial centroids than random in most cases, which leads to a better final clustering performance. Also, `k-means||` runs faster than `k-means++`, since it runs a fewer number of rounds and been speed up in the parallel implementation.

However, the clustering cost of `k-means||` is not that stable. We think it might be caused by the problem in the last step. We firstly tried reclustering the multiple weighted centroids in $C$ into $k$ clusters. Since the initial center for the reclustering is picked randomly, it will have the same problem as using random initialization. Then, referring to some online resources[4], we tried sampling $k$ final centroids from the weighted centroids in $C$ in the last step. But we think it might also get several centroids in one large cluster and lead to an unstable result. 


## Reference

[1] Wu, Xindong, et al. "Top 10 algorithms in data mining." Knowledge and information systems 14.1 (2008): 1-37.

[2] Bahmani, Bahman, et al. "Scalable k-means++." Proceedings of the VLDB Endowment 5.7 (2012): 622-633

[3] Lichman, M. (2013). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science.

[4] http://stats.stackexchange.com/questions/135656/k-means-a-k-a-scalable-k-means
