# KMeans with Redis

KMeans is machine-learning algorithm (NP-hard), popularly employed for cluster analysis in data mining, and interesting for benchmarking and performance evaluation. 

The objective of the Kmeans algorithm to group a set of multidimensional points into a predefined number of clusters, in which each point belongs to the closest cluster (with the nearest mean distance), in an iterative process.

This implementation uses Redis to store the input points.


# Define data model

The data model is used to define the objects that will be used in the storage system (Redis).

To this end, it is necessary to implement the user defined classes **overriding the StorageObject** class from the storage API.

*--> In Jupyter notebook, the data model has to be defined into a separate file in order to be imported in the workers*

In [None]:
%%writefile data_model.py

from storage.storage_object import StorageObject

class PSCO(StorageObject):
    def __init__(self, matrix="Content"):
        super(PSCO, self).__init__()
        self.matrix = matrix

    def get_matrix(self):
        return self.matrix

And import it to be used in the master

In [None]:
from data_model import PSCO

# Start PyCOMPSs

First, import the PyCOMPSs library

In [None]:
import os
import pycompss.interactive as ipycompss

Now start the COMPSs runtime defining:

* the storage implementation with the **storage_impl** parameter ('redis' specifies the built-in Redis API)
* the storage configuration file with the **storage_conf** parameter (which contains the nodes - one per line - that belong to the infrastructure)

In [None]:
ipycompss.start(storage_impl='redis', storage_conf=os.getcwd() + "/storage_conf.txt", graph=True)

Continue as a normal PyCOMPSs application...

In [None]:
from pycompss.api.task import task

In [None]:
import numpy as np

In [None]:
def init_random(numV, dim, seed):
    np.random.seed(seed)
    c = [np.random.uniform(-3.5, 3.5, dim)]
    while len(c) < numV:
        p = np.random.uniform(-3.5, 3.5, dim)
        distance = [np.linalg.norm(p-i) for i in c]
        if min(distance) > 2:
            c.append(p)
    return c

In some functions or tasks, the user can use the classes defined in the data model, and since they override **StorageObject** class, they contain the storage API defined functions, such as **make_persistent**.

* **make_persistent** forces the object to be stored in the underlying storage (Redis), and from that point, PyCOMPSs will use just the object identifier among nodes. The workers will retrieve automatically to the objects through the storage API

* Also, since it is an object, it is possible to invoke the user defined methods

In [None]:
#@task(returns=PSCO)  # Not a task for plotting
def genFragment(numV, K, c, dim, mode='gauss'):
    if mode == "gauss":
        n = int(float(numV) / K)
        r = numV % K
        data = []
        for k in range(K):
            s = np.random.uniform(0.05, 0.75)
            for i in range(n+r):
                d = np.array([np.random.normal(c[k][j], s) for j in range(dim)])
                data.append(d)
        mat = np.array(data)[:numV]
    else:
        mat = [np.random.random(dim) for _ in range(numV)]
    fragment = PSCO(mat)
    fragment.make_persistent()
    return fragment

Tasks which receive a storage object can use them as normal objects (e.g. XP in **cluster_points_partial** and **partial_sum**).

In [None]:
@task(returns=dict)
def cluster_points_partial(XP, mu, ind):
    dic = {}
    for x in enumerate(XP.get_matrix()):
        bestmukey = min([(i[0], np.linalg.norm(x[1] - mu[i[0]])) for i in enumerate(mu)], key=lambda t: t[1])[0]
        if bestmukey not in dic:
            dic[bestmukey] = [x[0] + ind]
        else:
            dic[bestmukey].append(x[0] + ind)
    return dic

In [None]:
@task(returns=dict)
def partial_sum(XP, clusters, ind):
    p = [(i, [(XP.get_matrix()[j - ind]) for j in clusters[i]]) for i in clusters]
    dic = {}
    for i, l in p:
        dic[i] = (len(l), np.sum(l, axis=0))
    return dic

Centers reduction task and merging function:

In [None]:
@task(returns=dict, priority=True)
def reduceCentersTask(a, b):
    for key in b:
        if key not in a:
            a[key] = b[key]
        else:
            a[key] = (a[key][0] + b[key][0], a[key][1] + b[key][1])
    return a

In [None]:
def mergeReduce(function, data):
    from collections import deque
    q = deque(list(range(len(data))))
    while len(q):
        x = q.popleft()
        if len(q):
            y = q.popleft()
            data[x] = function(data[x], data[y])
            q.append(x)
        else:
            return data[x]

Convergence check function:

* When *maxIterations* is reached
* Distance between the old centers and the new ones is lower than *epsilon*

In [None]:
def has_converged(mu, oldmu, epsilon, iter, maxIterations):
    print("iter: " + str(iter))
    print("maxIterations: " + str(maxIterations))
    if oldmu != []:
        if iter < maxIterations:
            aux = [np.linalg.norm(oldmu[i] - mu[i]) for i in range(len(mu))]
            distance = sum(aux)
            if distance < epsilon * epsilon:
                print("Distance_T: " + str(distance))
                return True
            else:
                print("Distance_F: " + str(distance))
                return False
        else:
            # Reached the max amount of iterations
            return True

Plotting function:

* Represent a 2D or 3D picture of the centers and points (coloured by cluster)

In [None]:
def plotKMEANS(dim, mu, clusters, data):
    import pylab as plt
    colors = ['b','g','r','c','m','y','k']
    if dim == 2:
        from matplotlib.patches import Circle
        from matplotlib.collections import PatchCollection
        fig, ax = plt.subplots(figsize=(10,10))
        patches = []
        pcolors = []
        for i in range(len(clusters)):
            for key in clusters[i].keys():
                d = clusters[i][key]
                for j in d:
                    j = j - i * len(data[0].get_matrix())
                    C = Circle((data[i].get_matrix()[j][0], data[i].get_matrix()[j][1]), .05)
                    pcolors.append(colors[key])
                    patches.append(C)
        collection = PatchCollection(patches)
        collection.set_facecolor(pcolors)
        ax.add_collection(collection)
        x, y = zip(*mu)
        plt.plot(x, y, '*', c='y', markersize=20)
        plt.autoscale(enable=True, axis='both', tight=False)
        plt.show()
    elif dim == 3:
        from mpl_toolkits.mplot3d import Axes3D
        fig = plt.figure()
        ax = fig.add_subplot(111, projection='3d')
        for i in range(len(clusters)):
            for key in clusters[i].keys():
                d = clusters[i][key]
                for j in d:
                    j = j - i * len(data[0].get_matrix())
                    ax.scatter(data[i].get_matrix()[j][0], data[i].get_matrix()[j][1], data[i].get_matrix()[j][2], 'o', c=colors[key])
        x, y, z = zip(*mu)
        for i in range(len(mu)):
            ax.scatter(x[i], y[i], z[i], s=80, c='y', marker='D')
        plt.show()
    else:
        print("No representable dim")

## MAIN

Parameters (that can be configured in the following cell):

* **numV**: number of vectors (default: 10.000)                                                           
* **dim**: dimension of the points (default: 2)
* **k**: number of centers (default: 4)
* **numFrag**: number of fragments (default: 16)
* **epsilon**: convergence condition (default: 1e-10)
* **maxIterations**: Maximum number of iterations (default: 20)


In [None]:
from pycompss.api.api import compss_wait_on
import time
%matplotlib inline

numV = 10000       # Vectors ----- with 1000 it is feasible to see the evolution across iterations
dim = 2            # Dimensions
k = 4              # Centers
numFrag = 16       # Fragments
epsilon = 1e-10    # Convergence condition
maxIterations = 20 # Max number of iterations

size = int(numV / numFrag)
startTime = time.time()
cloudCenters = init_random(k, dim, 8) # centers to create data groups
X = [genFragment(size, k, cloudCenters, dim, mode='gauss') for _ in range(numFrag)]

mu = init_random(k, dim, 7) # First centers
oldmu = []
n = 0
startTime = time.time()
while not has_converged(mu, oldmu, epsilon, n, maxIterations):
    oldmu = mu
    clusters = [cluster_points_partial(X[f], mu, f * size) for f in range(numFrag)]
    partialResult = [partial_sum(X[f], clusters[f], f * size) for f in range(numFrag)]
    mu = mergeReduce(reduceCentersTask, partialResult)
    mu = compss_wait_on(mu)
    mu = [mu[c][1] / mu[c][0] for c in mu]
    while len(mu) < k:
        # Add new random center if one of the centers has no points.
        indP = np.random.randint(0, size)
        indF = np.random.randint(0, numFrag)
        mu.append(X[indF].get_matrix()[indP])
    n += 1

clusters = compss_wait_on(clusters)
    
print("-----------------------------")
print("Kmeans Time {} (s)".format(time.time() - startTime))
print("-----------------------------")
print("Result:")
print("Iterations: ", n)
print("Centers: ", mu)

plotKMEANS(dim, mu, clusters, X)

Stop the COMPSs runtime

In [None]:
ipycompss.stop()