In [25]:
import numpy as np
from threading import Thread, Lock

# Thread-safe centroid update
class ThreadSafeCentroids:
    def __init__(self, centroids):
        self.centroids = centroids
        self.lock = Lock()

    def update(self, updates):
        with self.lock:
            for i, (new_sum, count) in updates.items():
                if count > 0:
                    self.centroids[i] = new_sum / count

# K-means clustering with threading
def kmeans_threaded(data, k, init_centroids = None, 
                    max_iterations=100, num_threads=1):
    n_samples, n_features = data.shape
    if init_centroids is None:
        centroids = data[np.random.choice(n_samples, k, replace=False)]
    else:
        centroids = init_centroids
    thread_safe_centroids = ThreadSafeCentroids(centroids)

    def assign_and_update(chunk, thread_id, chunk_updates):
        chunk_centroids = np.zeros((k, n_features))
        counts = np.zeros(k)

        distances = np.linalg.norm(chunk[:, None] - centroids[None, :], axis=2)
        clusters = np.argmin(distances, axis=1)

        for i, cluster in enumerate(clusters):
            chunk_centroids[cluster] += chunk[i]
            counts[cluster] += 1

        chunk_updates[thread_id] = {i: (chunk_centroids[i], counts[i]) for i in range(k)}

    for _ in range(max_iterations):
        # Step 1: Divide data into chunks
        chunk_size = n_samples // num_threads
        threads = []
        chunk_updates = [None] * num_threads

        # Step 2: Create and start threads
        for i in range(num_threads):
            start = i * chunk_size
            end = (i + 1) * chunk_size if i != num_threads - 1 else n_samples
            chunk = data[start:end]
            thread = Thread(target=assign_and_update, args=(chunk, i, chunk_updates))
            threads.append(thread)
            thread.start()

        # Step 3: Wait for all threads to finish
        for thread in threads:
            thread.join()

        # Step 4: Update centroids
        global_updates = {}
        for updates in chunk_updates:
            for cluster_id, (new_sum, count) in updates.items():
                if cluster_id not in global_updates:
                    global_updates[cluster_id] = (new_sum, count)
                else:
                    current_sum, current_count = global_updates[cluster_id]
                    global_updates[cluster_id] = (current_sum + new_sum, current_count + count)

        thread_safe_centroids.update(global_updates)

        # Check for convergence
        if np.allclose(centroids, thread_safe_centroids.centroids):
            break
        centroids = np.copy(thread_safe_centroids.centroids)

    return centroids

In [119]:
# import numpy as np
# from multiprocessing import Process, Manager, Array
# import ctypes
# from scipy.spatial.distance import cdist

# # K-means clustering with multiprocessing
# def kmeans_multiprocess(data, k, init_centroids=None,
#                         max_iterations=100, num_processes=4):
#     n_samples, n_features = data.shape
#     data = np.array(data, dtype=np.float64)

#     # Initialize centroids
#     if init_centroids is None:
#         centroids = data[np.random.choice(n_samples, k, replace=False)]
#     else:
#         centroids = init_centroids    
#     shared_centroids = Array(ctypes.c_double, centroids.flatten(), lock=False)

#     def assign_and_update(chunk, return_dict, process_id):
#         local_centroids = np.frombuffer(shared_centroids).reshape((k, n_features))
#         chunk_centroids = np.zeros((k, n_features))
#         counts = np.zeros(k)

#         # Compute distances and assign clusters
#         distances = cdist(chunk, local_centroids)
#         clusters = np.argmin(distances, axis=1)

#         # Compute chunk centroids
#         for i, cluster in enumerate(clusters):
#             chunk_centroids[cluster] += chunk[i]
#             counts[cluster] += 1

#         # Store updates in a shared dictionary
#         return_dict[process_id] = (chunk_centroids, counts)

#     for _ in range(max_iterations):
#         # Divide data into chunks
#         chunk_size = n_samples // num_processes
#         processes = []
#         manager = Manager()
#         return_dict = manager.dict()

#         # Start processes
#         for i in range(num_processes):
#             start = i * chunk_size
#             end = (i + 1) * chunk_size if i != num_processes - 1 else n_samples
#             chunk = data[start:end]
#             process = Process(target=assign_and_update, args=(chunk, return_dict, i))
#             processes.append(process)
#             process.start()

#         # Wait for all processes to complete
#         for process in processes:
#             process.join()

#         # Aggregate updates
#         global_centroids = np.zeros((k, n_features))
#         global_counts = np.zeros(k)
#         for chunk_centroids, counts in return_dict.values():
#             global_centroids += chunk_centroids
#             global_counts += counts

#         # Update centroids
#         for i in range(k):
#             if global_counts[i] > 0:
#                 centroids[i] = global_centroids[i] / global_counts[i]

#         # Update shared centroids
#         np.copyto(np.frombuffer(shared_centroids).reshape((k, n_features)), centroids)

#         # Check for convergence
#         if np.allclose(centroids, np.frombuffer(shared_centroids).reshape((k, n_features))):
#             break
#     return np.array(centroids)

In [134]:
"""
    Optimal version
"""
import numpy as np
from multiprocessing import Process, Array, cpu_count
import ctypes
from scipy.spatial.distance import cdist

# K-means clustering with optimized multiprocessing
def kmeans_multiprocess(data, k, init_centroids=None, max_iterations=100, num_processes=None):
    n_samples, n_features = data.shape
    data = np.array(data, dtype=np.float64)

    # Determine the number of processes to use
    if num_processes is None:
        num_processes = cpu_count()  # Use all available CPU cores by default

    # Initialize centroids
    if init_centroids is None:
        centroids = np.array(data[np.random.choice(n_samples, k, replace=False)], dtype=np.float64)
    else:
        centroids = init_centroids    
    shared_centroids = Array(ctypes.c_double, centroids.flatten(), lock=False)
    
    def assign_and_update(start_idx, end_idx, return_dict, process_id):
        local_centroids = np.frombuffer(shared_centroids).reshape((k, n_features))
        chunk_centroids = np.zeros((k, n_features), dtype=np.float64)
        counts = np.zeros(k, dtype=np.int64)

        # Slice data for this process
        chunk = data[start_idx:end_idx]

        # Compute distances and assign clusters
        # distances = np.zeros((end_idx-start_idx, k))
        # for i in range(end_idx-start_idx):
        #     for j in range(k):
        #         distances[i][j] = norm_square[j]
        # distances += -2 * chunk@centroids.T
        distances = cdist(chunk, local_centroids)
        clusters = np.argmin(distances, axis=1)

        # Compute chunk centroids
        for i, cluster in enumerate(clusters):
            chunk_centroids[cluster] += chunk[i]
            counts[cluster] += 1

        # Store updates in a shared dictionary
        return_dict[process_id] = (chunk_centroids, counts)

    for _ in range(max_iterations):
        # Divide data into chunks
        indices = np.array_split(np.arange(n_samples), num_processes)
        processes = []
        manager = Manager()
        return_dict = manager.dict()
        centroids = np.asarray(centroids)
        #norms_square = np.einsum("ij,ij->i", centroids, centroids)
        
        # Start processes
        for process_id, chunk_indices in enumerate(indices):
            start_idx = chunk_indices[0]
            end_idx = chunk_indices[-1] + 1 
            process = Process(target=assign_and_update, args=(start_idx, end_idx, 
                                                              return_dict, process_id))
            processes.append(process)
            process.start()

        # Wait for all processes to complete
        for process in processes:
            process.join()

        # Aggregate updates
        global_centroids = np.zeros((k, n_features), dtype=np.float64)
        global_counts = np.zeros(k, dtype=np.int64)
        for chunk_centroids, counts in return_dict.values():
            global_centroids += chunk_centroids
            global_counts += counts
            
        # Update centroids
        for i in range(k):
            if global_counts[i] > 0:
                centroids[i] = global_centroids[i] / global_counts[i]
                
        # Check for convergence
        if np.allclose(centroids, np.frombuffer(shared_centroids).reshape((k, n_features))):
            break
            
        # Update shared centroids
        np.copyto(np.frombuffer(shared_centroids).reshape((k, n_features)), centroids)
    return np.array(centroids)

In [135]:
data = np.random.rand(20_000_000, 10)
k = 3
init_centroids = data[np.random.choice(data.shape[0], k, replace=False)]

In [136]:
import os
import time

In [137]:
st = time.perf_counter()
centroids = kmeans_multiprocess(data, k, 
                                init_centroids.copy(), 
                                max_iterations=50, 
                                num_processes=os.cpu_count())
et = time.perf_counter()
print(f"Execute in {et-st} seconds")

Execute in 78.68113189283758 seconds


In [138]:
centroids

array([[0.49924796, 0.49979747, 0.50028756, 0.4999987 , 0.50003132,
        0.76837712, 0.50008134, 0.50006836, 0.50030594, 0.31718857],
       [0.49991338, 0.50008928, 0.50011637, 0.49985921, 0.49991163,
        0.23156486, 0.49974233, 0.49953336, 0.50074087, 0.31700593],
       [0.50065563, 0.50007008, 0.49970823, 0.50009188, 0.50008672,
        0.49993602, 0.50049502, 0.50018627, 0.49939285, 0.80364321]])

In [139]:
from sklearn.cluster import KMeans

In [140]:
st = time.perf_counter()
kmeans = KMeans(n_clusters=k, 
                init=init_centroids.copy(), 
                max_iter=50, tol=1e-8).fit(data)
et = time.perf_counter()
print(f"Execute in {et-st} seconds")

Execute in 15.301100242882967 seconds


In [141]:
kmeans.cluster_centers_

array([[0.49924796, 0.49979747, 0.50028756, 0.4999987 , 0.50003132,
        0.76837712, 0.50008134, 0.50006836, 0.50030594, 0.31718857],
       [0.49991338, 0.50008928, 0.50011637, 0.49985921, 0.49991163,
        0.23156486, 0.49974233, 0.49953336, 0.50074087, 0.31700593],
       [0.50065563, 0.50007008, 0.49970823, 0.50009188, 0.50008672,
        0.49993602, 0.50049502, 0.50018627, 0.49939285, 0.80364321]])