In [None]:
import dask.array as da
from dask.distributed import Client
from dask_ml.preprocessing import StandardScaler
from dask_ml.cluster import KMeans
import numpy as np
from dask_ml.metrics import pairwise_distances, pairwise_distances_argmin_min
from time import time
from timeit import default_timer as timer
from dask_ml.datasets import make_blobs
import pandas as pd
from dask.distributed import SSHCluster
import matplotlib.pyplot as plt

In [None]:
# IPs / hostnames of your three VMs
hosts = [
    "10.67.22.199",
    "10.67.22.199",   # VM 1  – will run the scheduler *and* a worker
    "10.67.22.138",   # VM 2  – workers only
    "10.67.22.85"    # VM 3  – workers only
]
cluster = SSHCluster(
        hosts,
        connect_options={
            "username": "ungureanu",       # SSH user on all three VMs
            "password": getpass.getpass("SSH password:"),
            "known_hosts": None,
        },
        remote_python="/opt/miniconda3/envs/dask-env/bin/python", 
        scheduler_options={
            "port": 8786,
            "dashboard_address":":8787",
        },
        worker_options={
            "n_workers": 1,        
            "nthreads": 4,      
            "memory_limit": "7.8GB"
        },
        
    )
    
client = Client(cluster)
client

In [None]:
#cluster.close()
#client.close()

# original version

In [1]:
def phi(X,C):
    return pairwise_distances(X, C, metric='sqeuclidean').min(1).sum() 
    # Compute the sum of distances to the nearest centroid at axis=1
    
def kmeans_parallel_init_dask(X, k, l):
    # Step 1: Randomly select a point from X
    n, d = X.shape
    idx = np.random.choice(n, size=1)
    C = X[idx].compute()  # Collect to memory for use

    # Step 2: Compute φX(C)
    phi_X_C = phi(X, C).compute() # Compute the sum of distances to the nearest centroid

    # Steps 3-6: Repeat O(log φX(C)) times
    rounds = int(np.log(phi_X_C))
    #print(f"Begin centroid sampling with number of rounds: {rounds}")
    for _ in range(rounds):
        dist_sq = pairwise_distances(X, C, metric='sqeuclidean').min(1)
        dist_sum = dist_sq.sum()
        p_x = l * dist_sq / dist_sum
        samples = da.random.uniform(size=len(p_x), chunks=p_x.chunks) < p_x
        sampled_idx = da.where(samples)[0].compute()
        
        new_C = X[sorted(sampled_idx)].compute() #https://github.com/dask/dask-ml/issues/39 
        C = np.vstack((C, new_C))

    # Step 7: Compute weights
    dist_to_C = pairwise_distances(X, C, metric='euclidean')
    closest_C = da.argmin(dist_to_C, axis=1)

    weights = np.empty(len(C))
    counts = da.bincount(closest_C, minlength=len(C)).compute()
    weights[:len(counts)] = counts
    
    # Normalize weights so that they sum up to the number of centroids
    weight_sum = np.sum(weights)
    if weight_sum == 0:
        raise ValueError("Sum of weights is zero, cannot normalize.")
    
    weights_normalized = weights / weight_sum
    
    dask_C = da.from_array(C, chunks=(C.shape[0], C.shape[1])) # here we ensure that the re-clustering occurs on a single-thread

    # Step 8: Recluster the weighted points in C into k clusters
    #print("Begin centroid re-clustering")
    labels, centroids = lloyd_kmeans_plusplus(X=dask_C, weights=weights_normalized,k=k, max_iters=10, tol=1e-8)

    return centroids

In [3]:
def update_centroids_weighted(X, labels, weights, k):
    """
    Update the centroids by computing the weighted mean of the points assigned to each cluster.
    """
    centroids = []
    
    for i in range(k):
        # Select the points that are assigned to cluster i
        cluster_points = X[labels == i]
        
        # Select the corresponding weights for these points
        cluster_weights = weights[labels == i]
        
        if len(cluster_points) == 0:
            # If no points are assigned to this cluster, avoid division by zero
            # Continue without updating that centroid
            continue

        # Compute the weighted mean using dask.array.average
        weighted_mean = da.average(cluster_points, axis=0, weights=cluster_weights)
        
        # Append the computed weighted mean to the centroids list
        centroids.append(weighted_mean)
    
    # Convert centroids list to Dask array
    return da.stack(centroids)

def kmeans_plusplus_init(X, weights, k):
    '''
    K-means++ initialization to select k initial centroids from X as a numpy array, keeping C as a NumPy array and weighting by provided weights.
    '''
    n, d = X.shape
    # Step 1: Randomly select the first centroid
    idx = np.random.choice(n, size=1)
    C = X[idx].compute()

    for _ in range(1, k):
        # Step 2: Compute distances from each point to the nearest centroid
        # C is a NumPy array, X is a Dask array
        # Compute the distances from each point to the nearest centroid normalizing by weights
        distances = pairwise_distances(X, C, metric='sqeuclidean').min(1) * (weights)
        
        # Compute the probabilities for choosing each point
        probabilities = distances / distances.sum()
        
        # Sample a new point based on these probabilities
        new_idx = np.random.choice(n, size=1, p=probabilities)
        new_centroid = X[sorted(new_idx)].compute()
        
        # Add the new centroid to the list
        C = np.vstack((C, new_centroid))
    return C

def lloyd_kmeans_plusplus(X, weights, k, max_iters=100, tol=1e-8):
    """
    Lloyd's algorithm for k-means clustering using Dask weighting the mean to update centroids.
    """
    centroids = kmeans_plusplus_init(X, weights, k)

    for i in range(max_iters):
        labels = assign_clusters(X, centroids).compute()
        new_centroids = update_centroids_weighted(X, labels, weights=weights, k=k).compute()
        
        if da.allclose(centroids, new_centroids, atol=tol).compute():
            #print(f"Centroid Lloyd Converged after {i+1} iterations.")
            break
        
        centroids = new_centroids

    return labels, centroids

In [5]:
def assign_clusters(X, centroids):
    """
    Assign each point to the nearest centroid using Dask to parallelize the computation.
    """
    return pairwise_distances_argmin_min(X, centroids, metric='sqeuclidean')[0]


def update_centroids(X, labels, k):
    """
    Update the centroids by computing the mean of the points assigned to each cluster with Dask.
    """
    centroids = da.stack([X[labels == i].mean(axis=0) for i in range(k)])
    return centroids
    
def kmeans_parallel(X, k, max_iters=100, tol=1e-8, l=2):
    centroids = kmeans_parallel_init_dask(X, k, l)
    for i in range(max_iters):
        labels = assign_clusters(X, centroids)
        new_centroids = update_centroids(X, labels, k).compute()

        if da.allclose(centroids, new_centroids, atol=tol):
            #print(f"Main KMeans Converged after {i+1} iterations.")
            break

        centroids = new_centroids

    return labels, centroids

# modified version

In [None]:
import dask.array as da
import numpy as np
from dask_ml.metrics import pairwise_distances, pairwise_distances_argmin_min

def total_min_distance(data, centroids):
    distances = pairwise_distances(data, centroids, metric='sqeuclidean')
    return distances.min(axis=1).sum().compute()

def initial_candidate_selection(data, num_clusters, oversampling_factor):
    num_points, _ = data.shape
    seed_idx = np.random.choice(num_points, size=1)
    centroid_pool = data[seed_idx].compute()

    init_cost = total_min_distance(data, centroid_pool)
    num_rounds = int(np.log(init_cost + 1e-6))  # Avoid log(0)

    for _ in range(num_rounds):
        dist_sq = pairwise_distances(data, centroid_pool, metric='sqeuclidean').min(axis=1)
        prob_dist = (oversampling_factor * dist_sq / dist_sq.sum()).compute()
        random_values = da.random.random(size=len(prob_dist), chunks=prob_dist.chunks)
        selected_mask = random_values < prob_dist
        selected_indices = da.where(selected_mask)[0].compute()
        new_centroids = data[sorted(selected_indices)].compute()
        centroid_pool = np.vstack([centroid_pool, new_centroids])

    return centroid_pool

def compute_assignment_weights(data, centroids):
    distances = pairwise_distances(data, centroids, metric='euclidean')
    closest_indices = da.argmin(distances, axis=1)
    counts = da.bincount(closest_indices, minlength=len(centroids)).compute()
    weight_sum = counts.sum()
    if weight_sum == 0:
        return np.ones(len(centroids)) / len(centroids)
    cluster_weights = counts / weight_sum
    return cluster_weights

def kmeans_plus_plus_weighted_init(data, cluster_weights, num_clusters):
    num_points, _ = data.shape
    seed_idx = np.random.choice(num_points, size=1)
    centers = data[seed_idx].compute()

    for _ in range(1, num_clusters):
        dist_sq = pairwise_distances(data, centers, metric='sqeuclidean').min(axis=1) * cluster_weights
        prob = dist_sq / dist_sq.sum()
        new_idx = np.random.choice(num_points, size=1, p=prob.compute())
        new_center = data[sorted(new_idx)].compute()
        centers = np.vstack([centers, new_center])

    return centers

def assign_to_centroids(data, centers):
    cluster_labels, _ = pairwise_distances_argmin_min(data, centers, metric='sqeuclidean')
    return cluster_labels

def weighted_centroid_update(data, cluster_labels, cluster_weights, num_clusters):
    updated = []
    for cluster_id in range(num_clusters):
        cluster_mask = cluster_labels == cluster_id
        cluster_data = data[cluster_mask]
        weight_subset = cluster_weights[cluster_mask]
        if cluster_data.shape[0] == 0:
            continue
        cluster_mean = da.average(cluster_data, axis=0, weights=weight_subset)
        updated.append(cluster_mean)
    return da.stack(updated)

def run_lloyds(data, cluster_weights, num_clusters, max_iter=100, tolerance=1e-8):
    centroids = kmeans_plus_plus_weighted_init(data, cluster_weights, num_clusters)

    for _ in range(max_iter):
        cluster_labels = assign_to_centroids(data, centroids).compute()
        new_centroids = weighted_centroid_update(data, cluster_labels, cluster_weights, num_clusters).compute()
        if da.allclose(new_centroids, centroids, atol=tolerance).compute():
            break
        centroids = new_centroids
    return cluster_labels, centroids

def run_distributed_kmeans(data, num_clusters, max_iter=100, tolerance=1e-8, oversample_factor=2):
    candidate_centroids = initial_candidate_selection(data, num_clusters, oversample_factor)
    cluster_weights = compute_assignment_weights(data, candidate_centroids)
    dask_centroids = da.from_array(candidate_centroids, chunks=(candidate_centroids.shape[0], candidate_centroids.shape[1]))
    cluster_labels, centroids = run_lloyds(dask_centroids, cluster_weights, num_clusters, max_iter, tolerance)
    
    for _ in range(max_iter):
        cluster_labels = assign_to_centroids(data, centroids)
        new_centroids = da.stack([data[cluster_labels == i].mean(axis=0) for i in range(num_clusters)]).compute()
        if da.allclose(centroids, new_centroids, atol=tolerance).compute():
            break
        centroids = new_centroids

    return cluster_labels, centroids


# artificial data

In [17]:
# blob test for the algorithm performance
n_samples = 5000000*2.5
n_features = 50
centers = 5
random_state = 42
chunks = (n_samples//23,n_features) 
synt_data, true_labels = make_blobs(n_samples=n_samples, n_features=n_features, 
                               centers=centers, chunks=chunks)

In [19]:
synt_data

Unnamed: 0,Array,Chunk
Bytes,4.66 GiB,207.32 MiB
Shape,"(12500000, 50)","(543478, 50)"
Dask graph,24 chunks in 73 graph layers,24 chunks in 73 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 4.66 GiB 207.32 MiB Shape (12500000, 50) (543478, 50) Dask graph 24 chunks in 73 graph layers Data type float64 numpy.ndarray",50  12500000,

Unnamed: 0,Array,Chunk
Bytes,4.66 GiB,207.32 MiB
Shape,"(12500000, 50)","(543478, 50)"
Dask graph,24 chunks in 73 graph layers,24 chunks in 73 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [21]:
# Normalize the data before clustering
scaler = StandardScaler(with_mean=True) # for scaling with a sparse matrix
%time synt_normalized = scaler.fit_transform(synt_data)
del synt_data

CPU times: total: 2min 42s
Wall time: 24.1 s


In [None]:
from dask.distributed import performance_report
k = centers  # Number of clusters

with performance_report(filename="kmean_our.html"):  
    #%time synt_labels, synt_centroids = kmeans_parallel(X=synt_normalized, k=k, l=2)  # original
    %time synt_labels, synt_centroids = run_distributed_kmeans(X=synt_normalized, k=k, l=2)  # modified

In [None]:
from IPython.display import IFrame
# Display the HTML report in Jupyter
IFrame(src="kmean_our.html", width=1280, height=720)

In [None]:
# Plotting a subset of the first 2 dim of the data 
fig, ax = plt.subplots(1, 2, figsize=(12, 6))

subset = int(synt_normalized.shape[0]/10000)

# Kmeans labels plot
ax[0].scatter(synt_normalized[:subset, 0], synt_normalized[:subset, 1], c=synt_labels[:subset], cmap='viridis')
ax[0].scatter(synt_centroids[:, 0], synt_centroids[:, 1], c='red', marker='x', s=100)
ax[0].set_title('K-means clustering with Dask')

# True labels plot
ax[1].scatter(synt_normalized[:subset, 0], synt_normalized[:subset, 1], c=true_labels[:subset], cmap='viridis')
ax[1].scatter(synt_centroids[:, 0], synt_centroids[:, 1], c='red', marker='x', s=100)
ax[1].set_title('True labels')

plt.show()