In [1]:
import dask.array as da
from dask.distributed import Client
from dask_ml.preprocessing import StandardScaler
from dask_ml.cluster import KMeans
import numpy as np
from dask_ml.metrics import pairwise_distances
from dask_ml.metrics import pairwise_distances_argmin_min
from time import time
from timeit import default_timer as timer
from dask_ml.datasets import make_blobs
import pandas as pd
from dask.distributed import SSHCluster
import matplotlib.pyplot as plt
import pickle

In [2]:
# List of IP addresses for the virtual machines (VMs) where Dask workers will run
VM_IPS = ['10.67.22.89','10.67.22.89', '10.67.22.241', '10.67.22.171']

# Initialize an SSHCluster with the specified VMs
cluster = SSHCluster(
    hosts=VM_IPS,  # IP addresses of the VMs
    connect_options={
        "username": "tramarin"  # Username used to SSH into the VMs
    },
    worker_options={
        "n_workers": 1,  # Number of worker processes to launch per VM
        "nthreads": 4,   # Number of threads per worker process
        "memory_limit": '7GB'
    },
    scheduler_options={
        "port": 8786,  # Port on the scheduler VM for communication with workers
        "dashboard_address": 8787,  # Port on the scheduler VM for the Dask dashboard
    },
)


# The SSHCluster object 'cluster' now manages the cluster of VMs.
# You can connect a Dask client to this cluster and start submitting tasks.

# Connect the Dask client to the existing cluster using the scheduler's address
client = Client("tcp://localhost:8786")

# Print the cluster configuration
scheduler_info = client.scheduler_info()
workers_info = scheduler_info['workers'] 
worker = list(workers_info.values())[0]
n_workers = len(workers_info)
n_threads = worker['nthreads']

print(f'client dashboard link: {client.dashboard_link}')
#print(f"Cluster setup with {n_workers} workers, each with {n_threads} threads and {memory_limit} memory limit per worker")

2024-09-01 16:51:31,107 - distributed.deploy.ssh - INFO - 2024-09-01 16:51:31,107 - distributed.scheduler - INFO - State start
2024-09-01 16:51:31,109 - distributed.deploy.ssh - INFO - 2024-09-01 16:51:31,108 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-scratch-space/worker-f9p4k1zn', purging
2024-09-01 16:51:31,111 - distributed.deploy.ssh - INFO - 2024-09-01 16:51:31,110 - distributed.scheduler - INFO -   Scheduler at:    tcp://10.67.22.89:8786
2024-09-01 16:51:32,229 - distributed.deploy.ssh - INFO - 2024-09-01 16:51:32,229 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.67.22.89:42343'
2024-09-01 16:51:32,507 - distributed.deploy.ssh - INFO - 2024-09-01 16:51:32,509 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.67.22.171:39503'
2024-09-01 16:51:32,544 - distributed.deploy.ssh - INFO - 2024-09-01 16:51:32,543 - distributed.worker - INFO -       Start worker at:    tcp://10.67.22.89:41219
2024-09-01 16:51:32,604 - dis

client dashboard link: http://localhost:8787/status


In [3]:
client

0,1
Connection method: Direct,
Dashboard: http://localhost:8787/status,

0,1
Comm: tcp://10.67.22.89:8786,Workers: 1
Dashboard: http://10.67.22.89:8787/status,Total threads: 4
Started: Just now,Total memory: 6.52 GiB

0,1
Comm: tcp://10.67.22.89:41219,Total threads: 4
Dashboard: http://10.67.22.89:43641/status,Memory: 6.52 GiB
Nanny: tcp://10.67.22.89:42343,
Local directory: /tmp/dask-scratch-space/worker-rxj6u8v5,Local directory: /tmp/dask-scratch-space/worker-rxj6u8v5
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 60.25 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B


In [4]:
def phi(X,C):
    return pairwise_distances(X, C, metric='sqeuclidean').min(1).sum() 
    # Compute the sum of distances to the nearest centroid at axis=1
    
def kmeans_parallel_init_dask(X, k, l):
    # Step 1: Randomly select a point from X
    n, d = X.shape
    idx = np.random.choice(n, size=1)
    C = X[sorted(idx)].compute()  # Collect to memory for use

    # Step 2: Compute φX(C)
    phi_X_C = phi(X, C).compute() # Compute the sum of distances to the nearest centroid

    # Steps 3-6: Repeat O(log φX(C)) times
    rounds = int(np.log(phi_X_C))
    #print(f"Begin centroid sampling with number of rounds: {rounds}")
    for _ in range(rounds):
        dist_sq = pairwise_distances(X, C, metric='sqeuclidean').min(1)
        dist_sum = dist_sq.sum()
        p_x = l * dist_sq / dist_sum
        samples = da.random.uniform(size=len(p_x), chunks=p_x.chunks) < p_x
        sampled_idx = da.where(samples)[0].compute()
        
        new_C = X[sorted(sampled_idx)].compute() #https://github.com/dask/dask-ml/issues/39 
        C = np.vstack((C, new_C))

    # Step 7: Compute weights
    dist_to_C = pairwise_distances(X, C, metric='euclidean')
    closest_C = da.argmin(dist_to_C, axis=1)

    weights = np.empty(len(C))
    counts = da.bincount(closest_C, minlength=len(C)).compute()
    weights[:len(counts)] = counts
    
    # Normalize weights so that they sum up to the number of centroids
    weight_sum = np.sum(weights)
    if weight_sum == 0:
        raise ValueError("Sum of weights is zero, cannot normalize.")
    
    weights_normalized = weights / weight_sum
    
    dask_C = da.from_array(C, chunks=(C.shape[0], C.shape[1])) # here we ensure that the re-clustering occurs on a single-thread

    # Step 8: Recluster the weighted points in C into k clusters
    #print("Begin centroid re-clustering")
    labels, centroids = lloyd_kmeans_plusplus(X=dask_C, weights=weights_normalized,k=k, max_iters=10, tol=1e-8)

    return centroids

In [5]:
def update_centroids_weighted(X, labels, weights, k):
    """
    Update the centroids by computing the weighted mean of the points assigned to each cluster.
    """
    centroids = []
    
    for i in range(k):
        # Select the points that are assigned to cluster i
        cluster_points = X[labels == i]
        
        # Select the corresponding weights for these points
        cluster_weights = weights[labels == i]
        
        if len(cluster_points) == 0:
            # If no points are assigned to this cluster, avoid division by zero
            # Continue without updating that centroid
            continue

        # Compute the weighted mean using dask.array.average
        weighted_mean = da.average(cluster_points, axis=0, weights=cluster_weights)
        
        # Append the computed weighted mean to the centroids list
        centroids.append(weighted_mean)
    
    # Convert centroids list to Dask array
    return da.stack(centroids)

def kmeans_plusplus_init(X, weights, k):
    '''
    K-means++ initialization to select k initial centroids from X as a numpy array, keeping C as a NumPy array and weighting by provided weights.
    '''
    n, d = X.shape
    # Step 1: Randomly select the first centroid
    idx = np.random.choice(n, size=1)
    C = X[idx].compute()

    for _ in range(1, k):
        # Step 2: Compute distances from each point to the nearest centroid
        # C is a NumPy array, X is a Dask array
        # Compute the distances from each point to the nearest centroid normalizing by weights
        distances = pairwise_distances(X, C, metric='sqeuclidean').min(1) * (weights)
        
        # Compute the probabilities for choosing each point
        probabilities = distances / distances.sum()
        
        # Sample a new point based on these probabilities
        new_idx = np.random.choice(n, size=1, p=probabilities)
        new_centroid = X[sorted(new_idx)].compute()
        
        # Add the new centroid to the list
        C = np.vstack((C, new_centroid))
    return C

def lloyd_kmeans_plusplus(X, weights, k, max_iters=100, tol=1e-8):
    """
    Lloyd's algorithm for k-means clustering using Dask weighting the mean to update centroids.
    """
    centroids = kmeans_plusplus_init(X, weights, k)

    for i in range(max_iters):
        labels = assign_clusters(X, centroids).compute()
        new_centroids = update_centroids_weighted(X, labels, weights=weights, k=k).compute()
        
        if da.allclose(centroids, new_centroids, atol=tol).compute():
            #print(f"Centroid Lloyd Converged after {i+1} iterations.")
            break
        
        centroids = new_centroids

    return labels, centroids

In [6]:
def assign_clusters(X, centroids):
    """
    Assign each point to the nearest centroid using Dask to parallelize the computation.
    """
    return pairwise_distances_argmin_min(X, centroids, metric='sqeuclidean')[0]


def update_centroids(X, labels, k):
    """
    Update the centroids by computing the mean of the points assigned to each cluster with Dask.
    """
    centroids = da.stack([X[labels == i].mean(axis=0) for i in range(k)])
    return centroids
    
def kmeans_parallel(X, k, max_iters=100, tol=1e-8, l=2):
    centroids = kmeans_parallel_init_dask(X, k, l)
    for i in range(max_iters):
        labels = assign_clusters(X, centroids)
        new_centroids = update_centroids(X, labels, k).compute()

        if da.allclose(centroids, new_centroids, atol=tol):
            #print(f"Main KMeans Converged after {i+1} iterations.")
            break

        centroids = new_centroids

    return labels, centroids

In [8]:
# Load the KDDCUP99 dataset in a pd dataframe
file_path = '/opt/kddcup99/kddcup.data.gz'
df = pd.read_csv(file_path, compression='gzip', header=None)

# We identified those as the non-numerical columns
columns_to_drop = [1, 2, 3, 41]

# We remove those columns to accomodate all or part of the dataset into an array
df = df.drop(df.columns[columns_to_drop], axis=1)

num_rows = len(df)

num_sampled = int(num_rows * 1) #we can change the percentage of the dataset to sample

# And take only up to the desired part
df= df.iloc[:num_sampled]

# We initialize a numpy array ready to convert to a dask array during benchmark
data = df.values

del df

In [9]:
n_chunks = 191 
dask_array = da.from_array(data, chunks=(data.shape[0] // n_chunks, data.shape[1]))

from dask_ml.preprocessing import StandardScaler

# Normalize the data before clustering
del data 
scaler = StandardScaler(with_mean=True) # for scaling with a sparse matrix
%time data_normalized = scaler.fit_transform(dask_array)
del dask_array
data_normalized
data_normalized=data_normalized.persist()

This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


CPU times: user 3.62 s, sys: 2.63 s, total: 6.25 s
Wall time: 9.77 s


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


In [10]:
data_normalized

Unnamed: 0,Array,Chunk
Bytes,1.39 GiB,7.44 MiB
Shape,"(4898431, 38)","(25646, 38)"
Dask graph,192 chunks in 1 graph layer,192 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 1.39 GiB 7.44 MiB Shape (4898431, 38) (25646, 38) Dask graph 192 chunks in 1 graph layer Data type float64 numpy.ndarray",38  4898431,

Unnamed: 0,Array,Chunk
Bytes,1.39 GiB,7.44 MiB
Shape,"(4898431, 38)","(25646, 38)"
Dask graph,192 chunks in 1 graph layer,192 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [11]:
# Number of clusters for KMeans
k = 4
from dask.distributed import performance_report

report_filename = "dask_performance_report_good_192chunks_7gb.html" #cambia nome file
with performance_report(filename=report_filename):
    synt_labels, synt_centroids = kmeans_parallel(X=data_normalized, k=k, l=2)

from IPython.display import IFrame
# Display the HTML report in Jupyter
IFrame(src=report_filename, width=1280, height=720)

In [13]:
# Clean up
client.close()
cluster.close()

In [None]:
#to visualize the created file
from IPython.display import IFrame
# Display the HTML report in Jupyter
report_filename = "dask_performance_report_good_192chunks_7gb.html" #cambia nome file
IFrame(src=report_filename, width=1280, height=720)
