# Dask client setup

In [1]:
from dask.distributed import LocalCluster
# Local Dask cluster: four worker processes, one thread each.
cluster = LocalCluster(
    processes=True,
    n_workers=4,
    threads_per_worker=1,
)
client = cluster.get_client()
client  # bare last expression, rendered as the cell output

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 4,Total memory: 14.86 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:37643,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 4
Started: Just now,Total memory: 14.86 GiB

0,1
Comm: tcp://127.0.0.1:37183,Total threads: 1
Dashboard: http://127.0.0.1:43227/status,Memory: 3.71 GiB
Nanny: tcp://127.0.0.1:42637,
Local directory: /tmp/dask-scratch-space/worker-u5mfo8bv,Local directory: /tmp/dask-scratch-space/worker-u5mfo8bv

0,1
Comm: tcp://127.0.0.1:38035,Total threads: 1
Dashboard: http://127.0.0.1:44979/status,Memory: 3.71 GiB
Nanny: tcp://127.0.0.1:44441,
Local directory: /tmp/dask-scratch-space/worker-nfnpef1_,Local directory: /tmp/dask-scratch-space/worker-nfnpef1_

0,1
Comm: tcp://127.0.0.1:36167,Total threads: 1
Dashboard: http://127.0.0.1:34357/status,Memory: 3.71 GiB
Nanny: tcp://127.0.0.1:41915,
Local directory: /tmp/dask-scratch-space/worker-mhyfbiz4,Local directory: /tmp/dask-scratch-space/worker-mhyfbiz4

0,1
Comm: tcp://127.0.0.1:37041,Total threads: 1
Dashboard: http://127.0.0.1:36487/status,Memory: 3.71 GiB
Nanny: tcp://127.0.0.1:38849,
Local directory: /tmp/dask-scratch-space/worker-4zro2wh8,Local directory: /tmp/dask-scratch-space/worker-4zro2wh8


In [2]:
import sys
sys.path.append('../')

import algorithms.lloyd_clustering as lloyd

# Generate data

In [18]:
from sklearn.datasets import make_blobs
from sklearn.preprocessing import StandardScaler
import numpy as np

# One entry per sweep: (file-name tag, values to sweep over,
# function turning a swept value into the make_blobs shape arguments).
sweep_specs = [
    # Generate data for different center amounts
    ('center', range(2, 11),
     lambda value: dict(n_samples=1000, centers=value, n_features=2)),
    # Generate data for different dimension amounts
    ('dimension', range(2, 11),
     lambda value: dict(n_samples=1000, centers=5, n_features=value)),
    # Generate data for different sample amounts
    ('sample', range(1000, 10001, 1000),
     lambda value: dict(n_samples=value, centers=5, n_features=2)),
]

for tag, sweep_values, blob_shape in sweep_specs:
    for value in sweep_values:
        # Fixed seed so every re-run regenerates identical files.
        features, _labels = make_blobs(random_state=42, **blob_shape(value))
        standardized = StandardScaler().fit_transform(features)
        np.savetxt(f'../data/data_{tag}_{value}.txt', standardized, fmt='%.8f')


## Test the dask version with the dashboard to see if the code is running in parallel

In [3]:
# Smoke test on the sample data file; watch the Dask dashboard while this cell runs.
# NOTE(review): the meaning of the positional arguments (5, True, 3) is defined in
# algorithms.lloyd_clustering - presumably cluster count, a Dask switch and a
# repetition count; confirm against that module.
lloyd.lloyd_algorithm('../data/data_05.txt',5,True,3)

{(np.float64(24.253505300698617),
  np.float64(77.24744735942157),
  np.float64(57.60686892373834),
  np.float64(41.833895158532414),
  np.float64(50.9036591919488)),
 (np.float64(32.794065774457934),
  np.float64(27.666926029671128),
  np.float64(23.206297333748314),
  np.float64(51.28078638862953),
  np.float64(49.50612096690528)),
 (np.float64(46.36948501541416),
  np.float64(25.585434881487846),
  np.float64(78.56031738009804),
  np.float64(60.84580785364128),
  np.float64(51.28473240006065)),
 (np.float64(71.65840429845905),
  np.float64(67.70377128953771),
  np.float64(42.65845498783455),
  np.float64(77.68881792376318),
  np.float64(49.4102798053528)),
 (np.float64(77.4585302343524),
  np.float64(52.477625614813384),
  np.float64(48.97892757257209),
  np.float64(22.298968077924584),
  np.float64(51.6286044941653))}

It's visibly parallel, but the data set is quite small.

# Run all versions of the algorithm.
Use memory_profiler and timeit to measure performance.

## Import all required extra packages for benchmarking

In [4]:
import logging
import timeit
from memory_profiler import  memory_usage
import ast
import dask.array as da
import  csv

In [2]:
def write_benchmark_to_csv(file:str, data: list):
    """Append benchmark results to a CSV file.

    Parameters
    ----------
    file : str
        Path of the CSV file. It is opened in append mode, so rows that are
        already in the file are kept.
    data : list
        Either one flat row of cells (e.g. ``[amount, mean, min, max, std]``)
        or a list of such rows (each row being a list or tuple).
    """
    # csv.writer.writerows() needs an iterable of rows. A flat row of numbers
    # would make it fail with "iterable expected", so wrap it into a table.
    # An empty list counts as an empty table and writes nothing.
    if all(isinstance(row, (list, tuple)) for row in data):
        rows = data
    else:
        rows = [data]

    # newline='' leaves line-ending handling to the csv module.
    with open(file, 'a', newline='') as outputfile:
        csv.writer(outputfile).writerows(rows)

In [3]:
def process_data(data:list):
    """Summarise a series of measurements.

    Returns a four-element list: ``[mean, min, max, standard deviation]``
    of ``data``, computed with NumPy (population standard deviation).
    """
    samples = np.asarray(data)
    return [samples.mean(), samples.min(), samples.max(), samples.std()]

In [1]:
def prepare_files(files:list[str]):
    """Create (or truncate) each CSV file and write the benchmark header row.

    Parameters
    ----------
    files : list[str]
        Paths of the result files. Existing content is overwritten, because
        every file is opened in ``'w'`` mode.
    """
    header = ['amount','mean','min','max','std_deviation']
    for file in files:
        # newline='' is what the csv module expects; without it the writer's
        # '\r\n' terminator is translated again on Windows ('\r\r\n').
        with open(file, 'w', newline='') as outputfile:
            csv.writer(outputfile).writerow(header)

## Time benchmarks using timeit.

In [5]:
log = logging.getLogger(__name__)

def time_benchmark():
    """Time the base, NumPy and Dask k-means implementations with ``timeit``.

    Three sweeps are run: number of centers (2-10), number of dimensions
    (2-10) and number of samples (1000-10000). For every value in a sweep the
    matching ``../data/data_<sweep>_<value>.txt`` file is loaded and each
    implementation is timed 100 times, each time starting from freshly drawn
    k-means++ centers. One summary row ``[amount, mean, min, max,
    std_deviation]`` per value is appended to
    ``../output/output_time_<sweep>_<implementation>.csv``.

    NOTE: the header-only ``output_time_dimension.csv`` file, which no sweep
    ever wrote to, is not created.
    """
    implementations = ('base', 'numpy', 'dask')
    sweeps = ('center', 'dimension', 'sample')
    repetitions = 100

    def output_path(sweep, implementation):
        # Single place that builds result paths, so the header files and the
        # appended rows can never drift apart.
        return f'../output/output_time_{sweep}_{implementation}.csv'

    def load_points(path):
        # The data files are produced with np.savetxt (whitespace-separated
        # numbers), so they are parsed with np.loadtxt rather than as a
        # Python literal. ndmin=2 keeps a single-row file two-dimensional.
        rows = np.loadtxt(path, ndmin=2).tolist()
        return [tuple(row) for row in rows]

    def run_sweep(sweep, amounts, cluster_count):
        for amount in amounts:
            points: list[tuple[float, ...]] = load_points(f'../data/data_{sweep}_{amount}.txt')
            times = {name: [] for name in implementations}

            # Repeat to get a mean and an error margin.
            for i in range(repetitions):
                initial_centers = lloyd.k_means_plus_plus(points, cluster_count(amount))

                times['base'].append(timeit.timeit(
                    lambda: lloyd.k_means_base(points, initial_centers), number=1))
                times['numpy'].append(timeit.timeit(
                    lambda: lloyd.k_means_numpy(np.array(points), initial_centers), number=1))
                times['dask'].append(timeit.timeit(
                    lambda: lloyd.k_means_dask(da.array(points), initial_centers), number=1))

                log.info(f'saving results for {sweep} amount: {amount}, iteration: {i}')

            for name in implementations:
                summary_row = [amount, *process_data(times[name])]
                # Passed as a one-row table: the writer emits one CSV line per row.
                write_benchmark_to_csv(output_path(sweep, name), [summary_row])

        log.info(f'Finished {sweep} amount benchmark')

    log.info('Starting time benchmark')
    prepare_files([output_path(sweep, name) for sweep in sweeps for name in implementations])

    # Only the center sweep varies the cluster count; the others use 5 clusters.
    run_sweep('center', range(2, 11), lambda amount: amount)
    run_sweep('dimension', range(2, 11), lambda amount: 5)
    run_sweep('sample', range(1000, 10001, 1000), lambda amount: 5)

    log.info('Finished time benchmark')

# Memory benchmarks using memory_profiler

In [None]:
def mem_benchmark():
    """Measure memory use of the base, NumPy and Dask k-means implementations.

    Runs the same three sweeps as the timing benchmark: number of centers
    (2-10), number of dimensions (2-10) and number of samples (1000-10000).
    For every value the matching ``../data/data_<sweep>_<value>.txt`` file is
    loaded and each implementation is profiled 100 times with
    ``memory_profiler.memory_usage``, each time from freshly drawn k-means++
    centers. Every run is reduced to its peak sample, and one summary row
    ``[amount, mean, min, max, std_deviation]`` of those peaks is appended to
    ``../output/output_mem_<sweep>_<implementation>.csv``.

    NOTE: the header-only ``output_mem_dimension.csv`` file, which no sweep
    ever wrote to, is not created.
    """
    implementations = ('base', 'numpy', 'dask')
    sweeps = ('center', 'dimension', 'sample')
    repetitions = 100

    def output_path(sweep, implementation):
        # Single place that builds result paths, so the header files and the
        # appended rows can never drift apart.
        return f'../output/output_mem_{sweep}_{implementation}.csv'

    def load_points(path):
        # The data files are produced with np.savetxt (whitespace-separated
        # numbers), so they are parsed with np.loadtxt rather than as a
        # Python literal. ndmin=2 keeps a single-row file two-dimensional.
        rows = np.loadtxt(path, ndmin=2).tolist()
        return [tuple(row) for row in rows]

    def peak(samples):
        # memory_usage returns a list of samples whose length depends on how
        # long the call ran. Keeping only the peak gives one scalar per run,
        # which is what process_data needs.
        return max(samples)

    def run_sweep(sweep, amounts, cluster_count):
        for amount in amounts:
            points: list[tuple[float, ...]] = load_points(f'../data/data_{sweep}_{amount}.txt')
            usage = {name: [] for name in implementations}

            # Repeat to get a mean and an error margin.
            for i in range(repetitions):
                initial_centers = lloyd.k_means_plus_plus(points, cluster_count(amount))

                usage['base'].append(peak(memory_usage(
                    (lloyd.k_means_base, (points, initial_centers)))))
                usage['numpy'].append(peak(memory_usage(
                    (lloyd.k_means_numpy, (np.array(points), initial_centers)))))
                usage['dask'].append(peak(memory_usage(
                    (lloyd.k_means_dask, (da.array(points), initial_centers)), multiprocess=True)))

                log.info(f'saving results for {sweep} amount: {amount}, iteration: {i}')

            for name in implementations:
                summary_row = [amount, *process_data(usage[name])]
                # Passed as a one-row table: the writer emits one CSV line per row.
                write_benchmark_to_csv(output_path(sweep, name), [summary_row])

        log.info(f'Finished {sweep} amount benchmark')

    log.info('Starting memory benchmark')
    prepare_files([output_path(sweep, name) for sweep in sweeps for name in implementations])

    # Only the center sweep varies the cluster count; the others use 5 clusters.
    run_sweep('center', range(2, 11), lambda amount: amount)
    run_sweep('dimension', range(2, 11), lambda amount: 5)
    run_sweep('sample', range(1000, 10001, 1000), lambda amount: 5)

    log.info('Finished memory benchmark')

# Run the benchmarks

In [None]:
# Run the timing benchmark first, then the memory benchmark.
# Both write their CSV summaries below ../output.
time_benchmark()
mem_benchmark()

# Clean up environment

In [None]:
# Close the client connection first, then the local cluster from the setup cell.
client.close()
cluster.close()

# The results are in the output folder