# Dask

## Configuration

In [None]:
# parallelization
import joblib
from dask.distributed import Client, LocalCluster
# from dask_cloudprovider import FargateCluster

# system libraries
import timeit
import multiprocessing

# dataframe like pandas
import numpy as np
import dask.dataframe as dd

# machine learning & plot
from sklearn.datasets import load_digits
from sklearn.model_selection import RandomizedSearchCV
from sklearn.svm import SVC
import matplotlib.pyplot as plt

In [None]:
# Benchmark settings read by the cells below.
# NOTE(review): an unused local/remote switch (`in_local = True`) used to live here.
memory_limit: str = '300MB'  # memory_limit passed to LocalCluster in get_client (per worker)
cv: int = 3                  # cross-validation folds for RandomizedSearchCV
n_iter: int = 50             # number of sampled candidates; author's note hinted at 500 — confirm
verbose: int = 1             # RandomizedSearchCV verbosity; author's note suggested 10 for more detail

## Explore the data and available resources

In [None]:
# Inspect the digits dataset: feature-matrix shape, first label, and first image.
digits = load_digits()
print('data shape: ', digits.data.shape)
print('target of id=0: ', digits.target[0])
# Switch matplotlib's default colormap to grayscale before drawing the image.
plt.gray()
plt.matshow(digits.images[0])
plt.show()
# The same first sample, taken from the flat feature matrix and reshaped to 8x8.
print(digits.data[0].reshape(8,8))

In [None]:
# Number of CPU cores on this machine; get_client starts one worker per core.
multiprocessing.cpu_count()

## Methods

In [None]:
def get_sample():
    """Load scikit-learn's handwritten-digits dataset.

    Returns:
        A ``(data, target)`` tuple: the flattened 8x8 images (one row per
        sample) and their class labels.
    """
    features, labels = load_digits(return_X_y=True)
    return features, labels

def get_client(scheduler_port=8786):
    """Return a Dask client, reusing a running local scheduler when possible.

    First tries to connect to a scheduler listening on
    ``127.0.0.1:<scheduler_port>`` (for example one left over from an earlier
    call). If that connection fails within 2 s, a new ``LocalCluster`` is
    started with one worker process per CPU core, each given the module-level
    ``memory_limit``.

    Args:
        scheduler_port: TCP port of the scheduler, used both to look for an
            existing scheduler and to start a new one. Defaults to 8786,
            Dask's conventional scheduler port. This keeps port 8787 free for
            the Dask dashboard, which binds it by default.

    Returns:
        A connected ``dask.distributed.Client``.
    """
    # Single source of truth for the port, so the probe address and the
    # port of a newly created cluster can never disagree.
    address = f'tcp://127.0.0.1:{scheduler_port}'
    try:
        client = Client(address, timeout='2s')
    except OSError:
        # Nothing reachable at `address`: start a fresh local cluster.
        cluster = LocalCluster(
            n_workers=multiprocessing.cpu_count(),
            memory_limit=memory_limit,
            processes=True,
            scheduler_port=scheduler_port,
        )
        print('Cluster link: ', cluster.dashboard_link)
        client = Client(cluster)
    else:
        # A reused scheduler may still hold data and state from a previous
        # run. Restart its workers so memory and timings start clean. A
        # freshly created cluster is already clean and skips this cost.
        client.restart()
    return client

## Main

In [None]:
# Start (or reconnect to) the Dask cluster, timing how long setup takes.
t_start = timeit.default_timer()
client = get_client()
t_client = timeit.default_timer() - t_start
print('dask', t_client)

# Last expression of the cell: the notebook renders the client's summary.
client

In [None]:
# Load the features and labels via get_sample, timing the load.
t_start = timeit.default_timer()
data, target = get_sample()
t_load = timeit.default_timer() - t_start
print('load', t_load)

In [None]:
# Build the RBF-kernel SVC and its randomized hyperparameter search (timed).
t_start = timeit.default_timer()
# Candidate values: log-spaced grids for C, gamma and tol, plus both
# class-weight modes (13 * 17 * 4 * 2 = 1768 combinations in total).
param_space = {
    'C': np.logspace(-6, 6, 13),      # 1e-6 ... 1e6
    'gamma': np.logspace(-8, 8, 17),  # 1e-8 ... 1e8
    'tol': np.logspace(-4, -1, 4),    # 1e-4 ... 1e-1
    'class_weight': [None, 'balanced'],
}
model = SVC(kernel='rbf')
# RandomizedSearchCV samples only n_iter of the combinations. A fixed
# random_state makes every run evaluate the same candidates, so the timings
# measured by this notebook are comparable across runs and cluster setups.
search = RandomizedSearchCV(
    model,
    param_space,
    cv=cv,
    n_iter=n_iter,
    verbose=verbose,
    random_state=0,
)
t_conf = timeit.default_timer() - t_start
print('conf', t_conf)

In [None]:
# Run the hyperparameter search on the Dask cluster through joblib, timing the fit.
t_start = timeit.default_timer()
# scatter= pre-sends data and target to the workers once rather than with each
# task; n_jobs is set to this machine's core count. (A simpler variant omits
# n_jobs: joblib.parallel_backend('dask', scatter=[data, target]).)
with joblib.parallel_backend('dask', n_jobs=multiprocessing.cpu_count(), scatter=[data, target]):
    search.fit(data, target)
t_fit = timeit.default_timer() - t_start
print('fit', t_fit)

In [None]:
# Shut down the scheduler and workers this client is connected to.
client.shutdown()