In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=c0aef140c64b2f6bc566d9b4bd53eb3a4392a4b3ce4ad68c8fa3396d845ff6cc
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
pip install mpi4py

Collecting mpi4py
  Downloading mpi4py-3.1.5.tar.gz (2.5 MB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/2.5 MB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.2/2.5 MB[0m [31m5.6 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━[0m [32m1.9/2.5 MB[0m [31m27.7 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m27.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Building wheels for collected packages: mpi4py
  Building wheel for mpi4py (pyproject.toml) ... [?25l[?25hdone
  Created wheel for mpi4py: filename=mpi4py-3.1.5-cp310-cp310-linux_x86_64.whl size=2746522 sha256=61d6b79b3e588e4a1daf965f3421403b666385f3f12a2

In [None]:
pip install ray

Collecting ray
  Downloading ray-2.8.1-cp310-cp310-manylinux2014_x86_64.whl (62.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.6/62.6 MB[0m [31m11.5 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: ray
Successfully installed ray-2.8.1


In [None]:
pip install ray

Collecting ray
  Downloading ray-2.8.0-cp310-cp310-manylinux2014_x86_64.whl (62.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.5/62.5 MB[0m [31m14.6 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: ray
Successfully installed ray-2.8.0


In [None]:
import numpy as np
from sklearn.cluster import AgglomerativeClustering
import ray
import time

try:
    ray.init(ignore_reinit_error=True)
except ray.exceptions.RayError:
    pass  # Ignore RayError, which includes RayConnectionError

@ray.remote
def hierarchical_clustering(data_subset, num_clusters):
    clustering = AgglomerativeClustering(n_clusters=num_clusters)
    labels = clustering.fit_predict(data_subset)
    return labels

def parallel_hierarchical_clustering(data, num_clusters, num_processes):
    data_split = np.array_split(data, num_processes)

    # Parallelize hierarchical clustering on each subset using Ray
    futures = [hierarchical_clustering.remote(subset, num_clusters) for subset in data_split]

    # Use ray.wait to handle a subset of tasks at a time
    while futures:
        done, futures = ray.wait(futures, num_returns=min(num_processes, len(futures)))

    # Ensure all remaining tasks are completed
    labels_list = ray.get(done)
    labels_list.extend(ray.get(futures))

    # Concatenate the results from all subsets
    labels = np.concatenate(labels_list)
    return labels

def serial_hierarchical_clustering(data, num_clusters):
    # Perform serial hierarchical clustering
    clustering = AgglomerativeClustering(n_clusters=num_clusters)
    cluster_labels_serial = clustering.fit_predict(data)
    return cluster_labels_serial

if __name__ == "__main__":
    try:
        # User input for data parameters
        num_data_points = int(input("Enter the number of data points: "))
        num_features = int(input("Enter the number of features: "))
        num_clusters = int(input("Enter the number of clusters: "))
        num_processes = int(input("Enter the number of processes: "))

        # Generate random data based on user input
        data = np.random.rand(num_data_points, num_features)

        # Measure serial execution time
        start_time_serial = time.time()

        # Perform serial hierarchical clustering
        cluster_labels_serial = serial_hierarchical_clustering(data, num_clusters)

        # Calculate and print the serial execution time
        execution_time_serial = time.time() - start_time_serial
        print(f"Serial Execution Time: {execution_time_serial:.4f} seconds")

        # Measure parallel execution time
        start_time_parallel = time.time()

        # Perform parallel hierarchical clustering using Ray
        cluster_labels_parallel = parallel_hierarchical_clustering(data, num_clusters, num_processes)

        # Calculate and print the parallel execution time
        execution_time_parallel = time.time() - start_time_parallel
        print(f"Parallel Execution Time: {execution_time_parallel:.4f} seconds")

        # Verify that the outputs are the same
        assert np.array_equal(cluster_labels_serial, cluster_labels_parallel), "Outputs are not the same."

    finally:
        try:
            # Shut down Ray
            ray.shutdown()
        except ray.exceptions.RayError:
            pass  # Ignore RayError, which includes RayConnectionError


2023-12-01 05:20:25,677	INFO worker.py:1673 -- Started a local Ray instance.


Enter the number of data points: 10000
Enter the number of features: 8
Enter the number of clusters: 7
Enter the number of processes: 8
Serial Execution Time: 5.3068 seconds
Parallel Execution Time: 1.2088 seconds


AssertionError: ignored

In [None]:
import numpy as np
from sklearn.cluster import AgglomerativeClustering
import ray
import time

try:
    ray.init(ignore_reinit_error=True)
except ray.exceptions.RayError:
    pass  # Ignore RayError, which includes RayConnectionError

@ray.remote
def hierarchical_clustering(data, num_clusters):
    clustering = AgglomerativeClustering(n_clusters=num_clusters)
    labels = clustering.fit_predict(data)
    return labels

def parallel_hierarchical_clustering(data, num_clusters, num_processes):
    # Parallelize hierarchical clustering on the entire dataset using Ray
    futures = [hierarchical_clustering.remote(data, num_clusters) for _ in range(num_processes)]

    # Wait for all tasks to complete
    results = ray.get(futures)

    # Concatenate the results from all subsets (in this case, the entire dataset)
    labels = np.concatenate(results)
    return labels

def serial_hierarchical_clustering(data, num_clusters):
    # Perform serial hierarchical clustering on the entire dataset
    clustering = AgglomerativeClustering(n_clusters=num_clusters)
    cluster_labels_serial = clustering.fit_predict(data)
    return cluster_labels_serial

if __name__ == "__main__":
    try:
        # Static input for data parameters
        num_data_points = 1000  # Choose a suitable number
        num_features = 8  # Choose a suitable number
        num_clusters = 7  # Choose a suitable number
        num_processes = 8  # Choose a suitable number

        # Generate static random data based on the provided input
        data = np.random.rand(num_data_points, num_features)

        # Measure serial execution time
        start_time_serial = time.time()

        # Perform serial hierarchical clustering on the entire dataset
        cluster_labels_serial = serial_hierarchical_clustering(data, num_clusters)

        # Calculate and print the serial execution time
        execution_time_serial = time.time() - start_time_serial
        print(f"Serial Execution Time: {execution_time_serial:.4f} seconds")

        # Measure parallel execution time
        start_time_parallel = time.time()

        # Perform parallel hierarchical clustering using Ray on the entire dataset
        cluster_labels_parallel = parallel_hierarchical_clustering(data, num_clusters, num_processes)

        # Calculate and print the parallel execution time
        execution_time_parallel = time.time() - start_time_parallel
        print(f"Parallel Execution Time: {execution_time_parallel:.4f} seconds")

        # Print the cluster labels
        print("Cluster Labels (Serial):", cluster_labels_serial)
        print("Cluster Labels (Parallel):", cluster_labels_parallel)

        # Verify that the outputs are the same
        assert np.array_equal(cluster_labels_serial, cluster_labels_parallel), "Outputs are not the same."

    finally:
        try:
            # Shut down Ray
            ray.shutdown()
        except ray.exceptions.RayError:
            pass  # Ignore RayError, which includes RayConnectionError


2023-12-01 14:41:55,096	INFO worker.py:1673 -- Started a local Ray instance.


Serial Execution Time: 0.0854 seconds
Parallel Execution Time: 1.7603 seconds
Cluster Labels (Serial): [4 0 2 1 0 0 0 3 2 4 1 0 1 1 3 4 4 0 0 2 3 0 0 1 4 0 0 1 2 0 3 1 3 1 1 2 0
 4 0 0 4 3 4 3 4 0 0 3 0 0 2 0 2 0 2 0 0 2 2 0 1 0 0 1 1 4 3 0 0 1 0 1 3 1
 2 4 2 2 3 2 0 1 0 4 0 1 2 1 2 0 2 2 4 0 4 0 1 1 4 4 2 4 4 4 0 1 1 3 1 1 2
 3 4 0 3 0 0 0 0 0 2 2 2 2 2 0 0 0 2 0 4 1 2 2 0 1 0 2 3 4 1 4 1 4 2 0 0 1
 3 4 1 3 0 1 2 0 0 3 3 2 4 1 2 2 2 2 2 4 1 1 2 0 1 3 3 1 0 2 4 2 4 4 1 2 2
 1 1 2 1 2 2 0 2 2 1 2 1 1 0 0 2 2 2 3 1 2 3 1 1 4 3 2 3 1 2 0 1 0 2 4 1 0
 0 3 2 4 1 1 4 4 2 2 2 1 2 2 4 3 1 1 3 0 2 2 1 2 0 0 1 3 0 3 4 3 1 4 4 2 3
 3 0 4 1 2 1 2 2 1 1 1 3 0 2 1 2 0 3 4 3 0 3 3 4 2 1 4 2 0 1 0 4 1 4 1 0 2
 2 2 0 2 2 0 1 3 0 3 4 4 3 2 4 2 4 0 2 2 4 1 1 0 0 0 2 1 0 2 3 1 0 1 1 3 4
 4 4 0 3 1 2 0 1 1 4 4 1 1 2 4 0 0 3 2 0 4 2 1 2 2 4 1 2 1 2 1 2 0 4 1 1 0
 3 2 2 1 2 1 1 2 4 1 1 3 0 0 0 1 1 0 0 3 0 3 4 0 2 1 3 3 2 2 3 0 3 1 2 4 2
 2 1 2 1 1 1 0 4 3 0 4 2 4 3 1 4 0 4 2 4 2 4 4 4 4 0 1 0 1 3 4 0 0 3 4 2

AssertionError: ignored