In [None]:
# Import libraries
# Dask
from dask.distributed import Client, SSHCluster, progress
import dask.array as da
import dask.dataframe as dd
# Others
import numpy as np
import pandas as pd

In [None]:
cluster = SSHCluster(
    ["10.67.22.240", "10.67.22.17", "10.67.22.100", "10.67.22.126"],
    connect_options={"known_hosts": None},
    worker_options={"nthreads": 2},
    scheduler_options={"port": 0, "dashboard_address": ":8787"}
)
client = Client(cluster)

In [None]:
client

## Developing the K-means|| algorithm

### Aux functions

One important function here is the cost:

$$\phi_X(C) = \sum_{x\in X} d^2(x, C) = \sum_{x\in X} \min_{i=1,..., k}||x-c_i||^2$$

Notice that:

$$||x-y||^2 = \sum_j (x_j - y_j)^2 = \sum_j x_j^2 - 2x_j y_j + y_j^2$$

Then, for $x_n\in\{x_1, x_2, ..., x_{|X|}\}$ and $c_m\in{c_1, c_2, ..., c_{|C|}}$, we can define the squared distance matrix $D^2\in\mathbb{R}^{|X|\times |C|}$ as:

$$D^2_{nm} = ||x_n-c_m||^2 = \sum_j x_{nj}^2 - 2x_{nj} c_{mj} + c_{mj}^2$$

$$D^2_{nm} = \sum_j x_{nj}x^T_{jn} - 2\sum_j x_{nj} c^T_{jm} + \sum_j c_{mj}c^T_{jm} = (XX^T)_{nn} - 2 (XC^T)_{nm} + (CC^T)_{mm}$$

Furthermore, notice that if the data $X$ is fixed, then the matrix $(XX^T)$ is constant and does not depend on the choice of centroids $C$. Therefore, we may calculate the vector $(XX^T)_{nn}$ at the beggining of the process and store it so that we don't need to recalculate it every time we want to estimate the cost function.

In [None]:
def get_first_sample(data_path):
    '''
    Params:
        data_path : str
            Path to a given data file.
    Output:
        Returns the first row of the given data.
    '''
    # Read only first row using pandas
    return pd.read_csv(data_path, nrows=1)


def get_XXT_term(X):
    '''
    Params:
        X : Dask array or dataframe
            Array containing data points.
    Output:
        Returns the self multiplication term from the
        squared distance matrix formula.
    '''
    # Turn into array
    X_da = da.array(X)
    # Get diagonal of X*X_T
    XXT = da.einsum('ij,ij -> i', X_da, X_da)
    return XXT


def partial_squared_dist_matrix(C, X):
    '''
    Params:
        C : Dask array or dataframe
            Array containing centroid locations
        X : Dask array or dataframe
            Array containing data points.
    Output:
        Returns the partial squared distance matrix, 
        evaluated over the set of points X with respect 
        to the centroids C. The partial matrix is defined
        as:
         D'^2 = 2*XC^T + CC^T
    '''
    # Turn into arrays
    X_da = da.array(X)
    
    # Calculate XC term
    XC_term = da.einsum('nj, mj -> nm', X_da, C)

    # Calculate CC term
    CC_term = da.einsum('ij,ij -> i', C, C)
    
    return -2*XC_term + CC_term
    

def cost_function(C, X, XXT):
    '''
    Params:
        C : Dask array or dataframe
            Array containing centroid locations
        X : Dask array or dataframe
            Array containing data points.
        XXT : Dask array or dataframe
            Array with same number of rows as X, 
            containing the X*X^T term of the
            squared distance matrix.
    Output:
        Returns the K-means cost function, evaluated
        over the set of points X with respect to the
        centroids C.
    '''
    # First, get partial squared distances
    D2 = partial_squared_dist_matrix(C, X)

    # Minimize over C axis and sum
    D2_min_sum = da.sum(da.min(D2, axis=1))

    # Add to XXT sum and return
    return da.sum(XXT) + D2_min_sum


def sample_new_centroids(C, X, XXT, phi, l):
    '''
    Params:
        C : Dask array or dataframe
            Array containing centroid locations
        X : Dask array or dataframe
            Array containing data points.
        XXT : Dask array or dataframe
            Array with same number of rows as X, 
            containing the X*X^T term of the
            squared distance matrix.
        phi : float
            Current cost function for the data X
            and the centroids C.
        l : float or int
            Oversampling factor, must be greater 
            than zero.
    Output:
        Returns new centroids from X, sampled with
        probability:
             p_x = l * D^2 / phi
        where D^2 is the squared distance from x to
        C and phi is the current cost function.
    '''
    # Get D^2
    partial_D_sq = partial_squared_dist_matrix(C, X)
    D_sq = da.add(XXT, da.min(partial_D_sq, axis=1))

    # Get sampling probabilities
    p_X = (l * D_sq / phi).compute_chunk_sizes()
    
    # Draw random numbers between 0 and 1
    rand_nums = da.random.uniform(size=p_X.shape)

    # Get new centroid indexes
    C_prime_idx = rand_nums < p_X

    # Gather new centroids from the data
    C_prime = X.loc[C_prime_idx]
    return C_prime

def get_cluster_classification(C, X, XXT):
    '''
    Params:
        C : Dask array or dataframe
            Array containing centroid locations
        X : Dask array or dataframe
            Array containing data points.
    Output:
        Returns the corresponding centroid in C
        for each sample in X.
    '''
    # Get partial squared distances
    partial_D_sq = partial_squared_dist_matrix(C, X)

    # Select the correct centroid from arg min
    C_ids = da.argmin(partial_D_sq, axis=1)
    return C_ids

def get_centroid_weights(labels):
    '''
    Params:
        labels : array
            Set of labels indicating the centroid of
            each data point.
    Output:
        Returns the weight of each centroid, defined 
        as the number of samples in the data closer
        to that centroid than to any other centroid.
    '''
    unique_labels, counts = da.unique(da.array(labels), return_counts=True)
    return unique_labels, counts


def compute_centroids(X, weights, labels, dims):
    '''
    Params:
        X : array
            Array containing data points.
        weights : array
            Weight for each data point.
        labels : array
            Set of labels indicating the cluster of
            each data point.
        dims : int
            Number of dimensions of the given data.
    Output:
        Returns the centroids for the given data and
        the given partition.
    '''    
    # Turn data into Dask array
    X = da.array(X)
    weights = da.array(weights)
    
    # Get unique labels
    unique_l = da.unique(da.array(labels)).compute_chunk_sizes()

    # Init new centroids
    C = da.array(np.zeros(shape=(len(unique_l), dims)))

    # Operate for each unique cluster
    for i, idx in enumerate(unique_l):
        # Get data
        C_data = X[labels == idx, :].compute_chunk_sizes()
        data_weights = weights[labels == idx].compute_chunk_sizes()

        # Calculate weighted mean as new centroid
        new_C = da.average(C_data, axis=0, weights=data_weights)

        # Store new centroid
        C[i,:] += new_C
        
    return C

### Main pipeline

In [None]:
# Script parameters
RANDOM_SEED = 42
INPUT_DATA = 'testing_data.csv'
LABEL_COLUMN = 'label'
NPARTITIONS = 1
K = 3 # Number of clusters
L = 3 # Oversampling factor
VERBOSE = 2 # Amount of information to print out (0: Nothing, 1: Only timings, 2: All information)

# Initialize random number generator from Dask and numpy seed
rng = da.random.default_rng(RANDOM_SEED)
np.random.seed(RANDOM_SEED)

# Read input data
if VERBOSE == 2:
    print('Reading input data\n')
data = pd.read_csv(INPUT_DATA)
data_shape = data.shape

# Separate labels from input
if LABEL_COLUMN != None:
    # Labels
    future = client.scatter(data[LABEL_COLUMN])  # send labels to one worker
    y_true = dd.from_delayed([future], meta=data[LABEL_COLUMN])  # build dask.dataframe on remote data
    y_true = y_true.repartition(npartitions=NPARTITIONS).persist()  # split
    client.rebalance(y_true)  # spread around all of your workers

    # Input
    X_width = data_shape[1]-1
    X = data.drop(columns=[LABEL_COLUMN])
    future = client.scatter(X) # send data to one worker
    X = dd.from_delayed([future], meta=X)  # build dask.dataframe on remote data
    X = X.repartition(npartitions=NPARTITIONS).persist()  # split
    client.rebalance(X)  # spread around all of your workers
    
else:
    # Only input
    X_width = data_shape[1]
    X = data
    future = client.scatter(X) # send data to one worker
    X = dd.from_delayed([future], meta=X, shape=data_shape)  # build dask.dataframe on remote data
    X = X.repartition(npartitions=NPARTITIONS).persist()  # split
    client.rebalance(X)  # spread around all of your workers

# Run the K-means algorithm:
# Get first sample as initial centroid
first_sample = get_first_sample(INPUT_DATA)
if LABEL_COLUMN != None:
    first_sample = first_sample.drop(columns=[LABEL_COLUMN])
C = da.array([np.array(first_sample).flatten()])

# Calculate constant XXT term, 
# also persist since we are going to reuse it 
XXT = get_XXT_term(X).persist()

# Get initial cost function
phi_init = cost_function(C, X, XXT).compute()

# Get number of iterations of the || algorithm
O_log_phi = round(np.log(phi_init))

# Init current cost
phi = phi_init.copy()

# Proceed with main || loop
if VERBOSE == 2:
    print('Running K-means|| initialization:')
for i in range(O_log_phi):
    if VERBOSE == 2:
        print(f'Iteration {i+1} of {O_log_phi}')
        
    # Sample new centroids
    C_prime = sample_new_centroids(C, X, XXT, phi, L)

    # Add to the current centroids
    C = da.vstack([C, C_prime]).compute()

    # Calculate new cost and update current
    phi = cost_function(C, X, XXT).compute()

if VERBOSE == 2:
    # Print number of final centroids from ||
    print('\nNumber of initialized centroids:', C.shape[0])
    
    # Print initial vs. final cost
    print('Cost before initialization:', phi_init)
    print('Cost after initialization:', phi)

# Get the weight of each centroid
if VERBOSE == 2:
    print('\nCalculating centroid weights')
    
X_labels = get_cluster_classification(C, X, XXT).compute_chunk_sizes()
used_C, w_C = get_centroid_weights(X_labels)
used_C = used_C.compute()
w_C = w_C.compute()

# Proceed with Lloyd's algorithm on the centroids
if VERBOSE == 2:
    print('\nClustering centroids')

# Initialize k final centroids, as the k-th heaviest
# centroids from the previous step
C_f = C[np.isin(w_C, np.sort(w_C, )[len(w_C)-K:])]

# Calculate XXT for centroids
CCT =  get_XXT_term(C).persist()

# Perform iterative adjustments
lloyd_done = False
N_lloyd_steps = 0
while not lloyd_done:
    # Save old labels (after first iteration)
    if N_lloyd_steps > 0:
        old_labels = C_labels.copy()
    
    # Calculate current clustering
    C_labels = get_cluster_classification(C_f, C, CCT).persist()

    # Compute new centroids from mean within clusters
    C_f = compute_centroids(C, w_C, C_labels, X_width).compute()
    
    # Check for termination condition (after first iteration)
    if N_lloyd_steps > 0:
        different_labels = da.sum(old_labels != C_labels).compute()
        if different_labels == 0:
            lloyd_done = True

    # Increase step counter
    N_lloyd_steps += 1

if VERBOSE == 2:
    print(f'Centroid clustering finished after {N_lloyd_steps} iterations.')

# Compute final labels
final_labels = get_cluster_classification(C_f, X, XXT).compute()

## Closing the client

In [None]:
client.close()