In [1]:
import cuml
from cuml.metrics import accuracy_score
from cuml.datasets.classification import make_classification

from joblib import parallel_backend

from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster

# This will use all GPUs on the local host by default (one Dask worker per GPU).
cluster = LocalCUDACluster(threads_per_worker=1, ip="", dashboard_address="8002")
c = Client(cluster)

# Size the joblib pool from the workers that actually registered with the
# scheduler instead of hard-coding it, so n_jobs matches the number of GPU
# workers on whatever host this notebook runs on.
n_workers = len(c.scheduler_info()["workers"])



In [2]:
from contextlib import contextmanager
import time

@contextmanager
def timed(name):
    """Print the elapsed time spent inside a ``with timed(name):`` body.

    The report line has the form ``..<name left-justified to 24>:  <seconds>``.

    ``time.perf_counter`` is used because it is monotonic and high-resolution,
    which suits interval measurement. The report sits in a ``finally`` so the
    time is still printed, and the exception re-raised, when the body fails.

    Note: this helper does no device synchronization. If the body launches
    asynchronous (e.g. GPU) work, the body itself must wait for it for the
    reported time to include that work.

    Args:
        name: Label printed in front of the elapsed time.
    """
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        print("..%-24s:  %8.4f" % (name, elapsed))

In [3]:
def inner_func():
    """Return the sum of the notebook-global ``X``.

    ``X`` is deliberately read as a global rather than passed in, so the
    benchmark can compare implicit capture against ``inner_func_explicit``.
    NOTE(review): presumably each backend has to serialize ``X`` along with
    the function; confirm against the joblib/dask pickling behaviour.
    """
    total = X.sum()
    return total

def inner_func_explicit(x_in):
    """Return ``x_in.sum()``, with the data passed as an explicit argument.

    Args:
        x_in: Any array-like object exposing a ``.sum()`` method.
    """
    reduced = x_in.sum()
    return reduced

In [4]:
# Fixed seed so the generated data, and therefore every sum computed below,
# is reproducible across kernel restarts. `y` is not used by the cells shown.
X, y = make_classification(
    n_samples=10_000_000, n_features=10, n_classes=2, random_state=42
)

In [5]:
import joblib
# inner_func reads X as a global, so X must travel with the function to the
# Dask workers (NOTE(review): presumably re-serialized for every task).
with timed("dask-backend"), parallel_backend("dask", n_jobs=n_workers, client=c):
    res_dask = joblib.Parallel(verbose=100)(
        joblib.delayed(inner_func)() for _ in range(10)
    )

[Parallel(n_jobs=2)]: Using backend DaskDistributedBackend with 2 concurrent workers.
[Parallel(n_jobs=2)]: Done   1 tasks      | elapsed:   10.3s
[Parallel(n_jobs=2)]: Done   2 tasks      | elapsed:   11.0s
[Parallel(n_jobs=2)]: Done   3 tasks      | elapsed:   11.8s
[Parallel(n_jobs=2)]: Done   4 tasks      | elapsed:   12.5s
[Parallel(n_jobs=2)]: Done   5 tasks      | elapsed:   17.2s
[Parallel(n_jobs=2)]: Done   6 tasks      | elapsed:   19.4s
[Parallel(n_jobs=2)]: Done   7 tasks      | elapsed:   21.3s
[Parallel(n_jobs=2)]: Done   8 out of  10 | elapsed:   21.3s remaining:    5.3s
[Parallel(n_jobs=2)]: Done  10 out of  10 | elapsed:   22.6s remaining:    0.0s
[Parallel(n_jobs=2)]: Done  10 out of  10 | elapsed:   22.6s finished
..dask-backend            :   22.6091


In [6]:
import joblib
# X is passed explicitly as a task argument, without pre-scattering it.
with timed("dask-backend-ex-noscatter"), \
        parallel_backend("dask", n_jobs=n_workers, client=c):
    res_dask = joblib.Parallel(verbose=100)(
        joblib.delayed(inner_func_explicit)(X) for _ in range(10)
    )

[Parallel(n_jobs=2)]: Using backend DaskDistributedBackend with 2 concurrent workers.
[Parallel(n_jobs=2)]: Done   1 tasks      | elapsed:    4.1s
[Parallel(n_jobs=2)]: Done   2 tasks      | elapsed:    4.3s
[Parallel(n_jobs=2)]: Done   3 tasks      | elapsed:    4.4s
[Parallel(n_jobs=2)]: Done   4 tasks      | elapsed:    4.4s
[Parallel(n_jobs=2)]: Done   5 tasks      | elapsed:    4.5s
[Parallel(n_jobs=2)]: Done   6 tasks      | elapsed:    4.5s
[Parallel(n_jobs=2)]: Done   7 tasks      | elapsed:    4.7s
[Parallel(n_jobs=2)]: Done   8 out of  10 | elapsed:    4.7s remaining:    1.2s
[Parallel(n_jobs=2)]: Done  10 out of  10 | elapsed:    4.8s remaining:    0.0s
[Parallel(n_jobs=2)]: Done  10 out of  10 | elapsed:    4.8s finished
..dask-backend-ex-noscatter:    4.8408


In [7]:
import joblib
# X is pre-scattered to the workers via `scatter=[X]`. The outer timer also
# covers entering parallel_backend. NOTE(review): the gap between joblib's own
# elapsed time and the timer total suggests the scatter happens there.
with timed("dask-backend-ex-scatter"), \
        parallel_backend("dask", n_jobs=n_workers, client=c, scatter=[X]):
    res_dask = joblib.Parallel(verbose=100)(
        joblib.delayed(inner_func_explicit)(X) for _ in range(10)
    )

[Parallel(n_jobs=2)]: Using backend DaskDistributedBackend with 2 concurrent workers.
[Parallel(n_jobs=2)]: Done   1 tasks      | elapsed:    0.1s
[Parallel(n_jobs=2)]: Done   2 tasks      | elapsed:    0.1s
[Parallel(n_jobs=2)]: Done   3 tasks      | elapsed:    0.2s
[Parallel(n_jobs=2)]: Done   4 tasks      | elapsed:    0.3s
[Parallel(n_jobs=2)]: Done   5 tasks      | elapsed:    0.3s
[Parallel(n_jobs=2)]: Done   6 tasks      | elapsed:    0.3s
[Parallel(n_jobs=2)]: Done   7 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done   8 out of  10 | elapsed:    0.4s remaining:    0.1s
[Parallel(n_jobs=2)]: Done  10 out of  10 | elapsed:    0.4s remaining:    0.0s
[Parallel(n_jobs=2)]: Done  10 out of  10 | elapsed:    0.4s finished
..dask-backend-ex-scatter :    3.1994


In [8]:
# Loky process pool. inner_func reaches X through notebook globals.
with timed("loky-backend"), parallel_backend("loky", n_jobs=n_workers):
    res_lok = joblib.Parallel(verbose=100)(
        joblib.delayed(inner_func)() for _ in range(10)
    )

[Parallel(n_jobs=2)]: Using backend LokyBackend with 2 concurrent workers.
[Parallel(n_jobs=2)]: Done   1 tasks      | elapsed:    1.3s
[Parallel(n_jobs=2)]: Done   2 tasks      | elapsed:    1.4s
[Parallel(n_jobs=2)]: Done   3 tasks      | elapsed:    1.6s
[Parallel(n_jobs=2)]: Done   4 tasks      | elapsed:    1.7s
[Parallel(n_jobs=2)]: Done   5 tasks      | elapsed:    2.0s
[Parallel(n_jobs=2)]: Done   6 tasks      | elapsed:    2.4s
[Parallel(n_jobs=2)]: Done   7 tasks      | elapsed:    2.8s
[Parallel(n_jobs=2)]: Done   8 out of  10 | elapsed:    3.2s remaining:    0.8s
[Parallel(n_jobs=2)]: Done  10 out of  10 | elapsed:    3.8s remaining:    0.0s
[Parallel(n_jobs=2)]: Done  10 out of  10 | elapsed:    3.8s finished
..loky-backend            :    4.1000


In [9]:
# Loky process pool, with X handed to every task as an explicit argument.
with timed("loky-backend-ex"), parallel_backend("loky", n_jobs=n_workers):
    res_loky = joblib.Parallel(verbose=100)(
        joblib.delayed(inner_func_explicit)(X) for _ in range(10)
    )

[Parallel(n_jobs=2)]: Using backend LokyBackend with 2 concurrent workers.
[Parallel(n_jobs=2)]: Done   1 tasks      | elapsed:    0.6s
[Parallel(n_jobs=2)]: Done   2 tasks      | elapsed:    0.9s
[Parallel(n_jobs=2)]: Done   3 tasks      | elapsed:    1.2s
[Parallel(n_jobs=2)]: Done   4 tasks      | elapsed:    1.6s
[Parallel(n_jobs=2)]: Done   5 tasks      | elapsed:    1.9s
[Parallel(n_jobs=2)]: Done   6 tasks      | elapsed:    2.2s
[Parallel(n_jobs=2)]: Done   7 tasks      | elapsed:    2.6s
[Parallel(n_jobs=2)]: Done   8 out of  10 | elapsed:    2.9s remaining:    0.7s
[Parallel(n_jobs=2)]: Done  10 out of  10 | elapsed:    3.7s remaining:    0.0s
[Parallel(n_jobs=2)]: Done  10 out of  10 | elapsed:    3.7s finished
..loky-backend-ex         :    3.8619


In [10]:
# Sequential in-process baseline.
# NOTE(review): if X is a device array (e.g. CuPy), `.sum()` may only enqueue a
# kernel and return immediately, so the timer would measure launch overhead
# rather than the reductions themselves. Converting the last result to a
# Python float copies it to the host; presumably, with all work on the same
# (default) stream, that copy waits for every queued reduction. For host
# arrays the conversion is a harmless no-op.
with timed("seq"):
    res = [inner_func_explicit(X) for _ in range(10)]
    float(res[-1])

..seq                     :    0.0050
