In [1]:
import numpy as np
import time
from multiprocess import Pool, cpu_count # Using multiprocess instead of multiprocessing
from sklearn.datasets import make_blobs

In [2]:
# 1. Worker function for [P2] Implementation
def calc_distance(X_chunk, centroids):
    """
    Worker Function: [Map Step]
    Calculates distances for a chunk of data and assigns clusters.
    """
    # Vectorized Euclidean distance calculation
    dists = np.linalg.norm(X_chunk[:, np.newaxis] - centroids, axis=2)
    labels = np.argmin(dists, axis=1)
    
    partial_sum = []
    partial_count = []
    n_clusters = len(centroids)
    
    for i in range(n_clusters):
        points_in_cluster = X_chunk[labels == i]
        if len(points_in_cluster) > 0:
            partial_sum.append(np.sum(points_in_cluster, axis=0))
            partial_count.append(len(points_in_cluster))
        else:
            partial_sum.append(np.zeros(X_chunk.shape[1]))
            partial_count.append(0)
            
    return partial_sum, partial_count

In [3]:
# 2. Master function for [P1] Design
def parallel_kmeans(X, n_clusters, max_iter=100, n_jobs=2):
    # Initialize centroids [P1 Revised]
    indices = np.random.choice(X.shape[0], n_clusters, replace=False)
    centroids = X[indices]
    data_chunks = np.array_split(X, n_jobs)
    
    for iteration in range(max_iter):
        # [MAP STEP] Using Pool from multiprocess to handle Jupyter's environment
        with Pool(processes=n_jobs) as pool:
            results = pool.starmap(calc_distance,[(chunk, centroids) for chunk in data_chunks])
        
        # [REDUCE STEP] [P0] Communication occurs here
        new_centroids = np.zeros_like(centroids)
        total_counts = np.zeros(n_clusters)
        
        for p_sum, p_count in results:
            for i in range(n_clusters):
                new_centroids[i] += p_sum[i]
                total_counts[i] += p_count[i]
        
        # Update Centroids
        for i in range(n_clusters):
            if total_counts[i] > 0:
                new_centroids[i] /= total_counts[i]
        
        # Convergence check
        if np.allclose(centroids, new_centroids, atol=1e-4):
            print(f"Converged at iteration {iteration}")
            break
        centroids = new_centroids
        
    return centroids

In [4]:
# 3. Execution and Testing [P3]
if __name__ == "__main__":
    # Input Generation
    print("Generating 500,000 data points...")
    X_input, _ = make_blobs(n_samples=500000, centers=5, n_features=10, random_state=42)
    
    # Serial Test
    print("Running Serial (1 Core)...")
    start_serial = time.time()
    parallel_kmeans(X_input, n_clusters=5, n_jobs=1)
    end_serial = time.time() - start_serial
    print(f"Serial Time: {end_serial:.4f}s")
    
    # Parallel Test
    num_cores = min(4, cpu_count())
    print(f"Running Parallel ({num_cores} Cores)...")
    start_parallel = time.time()
    parallel_kmeans(X_input, n_clusters=5, n_jobs=num_cores)
    end_parallel = time.time() - start_parallel
    print(f"Parallel Time: {end_parallel:.4f}s")
    
    # Speedup Calculation [P3]
    print("-" * 30)
    print(f"Speedup: {end_serial / end_parallel:.2f}x")

Generating 500,000 data points...
Running Serial (1 Core)...
Serial Time: 18.6237s
Running Parallel (4 Cores)...
Converged at iteration 2
Parallel Time: 0.2615s
------------------------------
Speedup: 71.21x
