# From the Dask Summit 2021: Scale Machine Learning Code with Dask

# Init a local cluster

In [1]:
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)

In [2]:
client

0,1
Connection method: Cluster object,Cluster type: LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 8,Total memory: 7.77 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:46447,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: Just now,Total memory: 7.77 GiB

0,1
Comm: tcp://127.0.0.1:39527,Total threads: 2
Dashboard: http://127.0.0.1:43345/status,Memory: 1.94 GiB
Nanny: tcp://127.0.0.1:41143,
Local directory: /home/jovyan/notebooks/dask-worker-space/worker-ldjn2fxf,Local directory: /home/jovyan/notebooks/dask-worker-space/worker-ldjn2fxf

0,1
Comm: tcp://127.0.0.1:32779,Total threads: 2
Dashboard: http://127.0.0.1:40501/status,Memory: 1.94 GiB
Nanny: tcp://127.0.0.1:42117,
Local directory: /home/jovyan/notebooks/dask-worker-space/worker-3bznsw77,Local directory: /home/jovyan/notebooks/dask-worker-space/worker-3bznsw77

0,1
Comm: tcp://127.0.0.1:42745,Total threads: 2
Dashboard: http://127.0.0.1:45149/status,Memory: 1.94 GiB
Nanny: tcp://127.0.0.1:35415,
Local directory: /home/jovyan/notebooks/dask-worker-space/worker-xx9uvsiv,Local directory: /home/jovyan/notebooks/dask-worker-space/worker-xx9uvsiv

0,1
Comm: tcp://127.0.0.1:35939,Total threads: 2
Dashboard: http://127.0.0.1:43347/status,Memory: 1.94 GiB
Nanny: tcp://127.0.0.1:32947,
Local directory: /home/jovyan/notebooks/dask-worker-space/worker-sf5abopu,Local directory: /home/jovyan/notebooks/dask-worker-space/worker-sf5abopu


# train and optimize model with just scikit-learn

In [3]:
from sklearn.datasets import make_classification

X, y = make_classification(n_samples=10000, n_features=4, random_state=0)

In [4]:
from sklearn.svm import SVC

estimator = SVC(random_state=0)
estimator.fit(X, y)

estimator.support_vectors_[:4]

array([[-0.77244139,  0.3607576 , -2.38110133,  0.08757   ],
       [ 1.14946035,  0.62254594,  0.37302939,  0.45965795],
       [-0.77694695,  0.31434299, -2.26231851,  0.06339125],
       [ 0.79010037,  0.68530624, -0.44740487,  0.44692959]])

In [5]:
estimator.score(X, y)

0.905

In [6]:
%%time
from sklearn.model_selection import GridSearchCV

estimator = SVC(gamma='auto', random_state=0, probability=True)
param_grid = {
    'C': [0.001, 10.0],
    'kernel': ['rbf', 'poly'],
}

grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2)
grid_search.fit(X, y)

Fitting 2 folds for each of 4 candidates, totalling 8 fits
[CV] END ................................C=0.001, kernel=rbf; total time=   4.3s
[CV] END ................................C=0.001, kernel=rbf; total time=   4.3s
[CV] END ...............................C=0.001, kernel=poly; total time=   2.4s
[CV] END ...............................C=0.001, kernel=poly; total time=   2.4s
[CV] END .................................C=10.0, kernel=rbf; total time=   1.1s
[CV] END .................................C=10.0, kernel=rbf; total time=   1.0s
[CV] END ................................C=10.0, kernel=poly; total time=   2.2s
[CV] END ................................C=10.0, kernel=poly; total time=   2.0s
CPU times: user 24.1 s, sys: 532 ms, total: 24.7 s
Wall time: 23.6 s


GridSearchCV(cv=2,
             estimator=SVC(gamma='auto', probability=True, random_state=0),
             param_grid={'C': [0.001, 10.0], 'kernel': ['rbf', 'poly']},
             verbose=2)

In [7]:
grid_search.best_params_, grid_search.best_score_

({'C': 10.0, 'kernel': 'rbf'}, 0.9086000000000001)

Scikit-Learn has nice single-machine parallelism, via `Joblib`. Any scikit-learn estimator that can operate in parallel exposes an `n_jobs` keyword. This controls the number of CPU cores that will be used. **Observe the difference**

In [8]:
%%time
grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2, n_jobs=-1)
grid_search.fit(X, y)

Fitting 2 folds for each of 4 candidates, totalling 8 fits
CPU times: user 4.45 s, sys: 197 ms, total: 4.64 s
Wall time: 9.43 s


GridSearchCV(cv=2,
             estimator=SVC(gamma='auto', probability=True, random_state=0),
             n_jobs=-1,
             param_grid={'C': [0.001, 10.0], 'kernel': ['rbf', 'poly']},
             verbose=2)

# Train on Dask using scikit and joblib with dask as backend

In [9]:
param_grid = {
    'C': [0.001, 0.1, 1.0, 2.5, 5, 10.0],
    # Uncomment this for larger Grid searches on a cluster
    'kernel': ['rbf', 'poly', 'linear'],
    'shrinking': [True, False],
}

grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=5, n_jobs=-1)

In [10]:
import joblib
import dask.distributed

In [11]:
%%time
# scatter X and y across the Dask cluster workers, in this case is just a numpy array
with joblib.parallel_backend(f"dask", scatter=[X, y]): 
    grid_search.fit(X, y)

Fitting 5 folds for each of 36 candidates, totalling 180 fits
CPU times: user 20.6 s, sys: 2.56 s, total: 23.2 s
Wall time: 2min 54s


It took 2min 37s on my 3.6 GHz 8-Core Intel Core i9, and 2min 54s on the same computer but running on Docker with 8GB of RAM, 8 CPUs and 1GB of swap

In [15]:
grid_search.best_params_, grid_search.best_score_

({'C': 10.0, 'kernel': 'rbf', 'shrinking': True}, 0.9119000000000002)

# Predict/Estimate on parallel

When used inside a GridSearch, you'll need to update the keys of the parameters, just like with any meta-estimator. The only complication comes when using ParallelPostFit with another meta-estimator like GridSearchCV. In this case, you'll need to prefix your parameter names with estimator__.

In [18]:
from dask_ml.wrappers import ParallelPostFit

In [19]:

svc = ParallelPostFit(SVC(random_state=0, gamma='scale'))

param_grid = {
    # use estimator__param instead of param
    'estimator__C': [0.01, 1.0, 10],
}

grid_search = GridSearchCV(svc, param_grid, cv=3)
grid_search.fit(X, y)

GridSearchCV(cv=3, estimator=ParallelPostFit(estimator=SVC(random_state=0)),
             param_grid={'estimator__C': [0.01, 1.0, 10]})

In [21]:
# simulate large data, 10 times the original
import dask.array as da

big_X = da.concatenate([
    da.from_array(X, chunks=X.shape)
    for _ in range(10)
])
big_X

Unnamed: 0,Array,Chunk
Bytes,3.05 MiB,312.50 kiB
Shape,"(100000, 4)","(10000, 4)"
Count,11 Tasks,10 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 3.05 MiB 312.50 kiB Shape (100000, 4) (10000, 4) Count 11 Tasks 10 Chunks Type float64 numpy.ndarray",4  100000,

Unnamed: 0,Array,Chunk
Bytes,3.05 MiB,312.50 kiB
Shape,"(100000, 4)","(10000, 4)"
Count,11 Tasks,10 Chunks
Type,float64,numpy.ndarray


In [22]:
predicted = grid_search.predict(big_X)
predicted

Unnamed: 0,Array,Chunk
Bytes,781.25 kiB,78.12 kiB
Shape,"(100000,)","(10000,)"
Count,21 Tasks,10 Chunks
Type,int64,numpy.ndarray
"Array Chunk Bytes 781.25 kiB 78.12 kiB Shape (100000,) (10000,) Count 21 Tasks 10 Chunks Type int64 numpy.ndarray",100000  1,

Unnamed: 0,Array,Chunk
Bytes,781.25 kiB,78.12 kiB
Shape,"(100000,)","(10000,)"
Count,21 Tasks,10 Chunks
Type,int64,numpy.ndarray


In [24]:
# This turns a lazy Dask collection into its in-memory equivalent.
# For example a Dask array turns into a NumPy array and a Dask dataframe
# turns into a Pandas dataframe.  The entire dataset must fit into memory
# before calling this operation.
predicted.compute()

array([0, 1, 1, ..., 0, 1, 0])

# Train on large datasets that go beyond the computer memory

`dask-ml` has implemented estimators that work well on dask arrays and dataframes that may be larger than your machine's RAM.

In [26]:
import dask.array as da
import dask.delayed

from sklearn.datasets import make_blobs
import numpy as np

In [28]:
n_centers = 12
n_features = 20

X_small, y_small = make_blobs(n_samples=1000, centers=n_centers, n_features=n_features, random_state=0)

centers = np.zeros((n_centers, n_features))

for i in range(n_centers):
    centers[i] = X_small[y_small == i].mean(0)

In [29]:
n_samples_per_block = 200000
n_blocks = 600

delayeds = [dask.delayed(make_blobs)(n_samples=n_samples_per_block,
                                     centers=centers,
                                     n_features=n_features,
                                     random_state=i)[0]
            for i in range(n_blocks)]
arrays = [da.from_delayed(obj, shape=(n_samples_per_block, n_features), dtype=X.dtype)
          for obj in delayeds]
X = da.concatenate(arrays)
X

Unnamed: 0,Array,Chunk
Bytes,17.88 GiB,30.52 MiB
Shape,"(120000000, 20)","(200000, 20)"
Count,2400 Tasks,600 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 17.88 GiB 30.52 MiB Shape (120000000, 20) (200000, 20) Count 2400 Tasks 600 Chunks Type float64 numpy.ndarray",20  120000000,

Unnamed: 0,Array,Chunk
Bytes,17.88 GiB,30.52 MiB
Shape,"(120000000, 20)","(200000, 20)"
Count,2400 Tasks,600 Chunks
Type,float64,numpy.ndarray


An array of 18 GB with 120_000_000 samples

In [30]:
from dask_ml.cluster import KMeans
clf = KMeans(init_max_iter=3, oversampling_factor=10)

In [31]:
%time clf.fit(X)

CPU times: user 1min 25s, sys: 4.85 s, total: 1min 30s
Wall time: 3min 28s


KMeans(init_max_iter=3, oversampling_factor=10)

In [32]:
clf.labels_

Unnamed: 0,Array,Chunk
Bytes,457.76 MiB,781.25 kiB
Shape,"(120000000,)","(200000,)"
Count,4200 Tasks,600 Chunks
Type,int32,numpy.ndarray
"Array Chunk Bytes 457.76 MiB 781.25 kiB Shape (120000000,) (200000,) Count 4200 Tasks 600 Chunks Type int32 numpy.ndarray",120000000  1,

Unnamed: 0,Array,Chunk
Bytes,457.76 MiB,781.25 kiB
Shape,"(120000000,)","(200000,)"
Count,4200 Tasks,600 Chunks
Type,int32,numpy.ndarray


In [39]:
clf.labels_[:100].compute()

array([2, 6, 2, 7, 7, 7, 5, 6, 6, 0, 7, 5, 1, 4, 3, 6, 3, 2, 4, 6, 4, 2,
       7, 0, 3, 0, 4, 4, 7, 1, 2, 7, 5, 5, 3, 4, 0, 5, 6, 6, 5, 2, 2, 5,
       6, 3, 3, 2, 5, 5, 2, 2, 7, 7, 0, 3, 3, 6, 4, 2, 5, 7, 6, 6, 7, 2,
       0, 5, 5, 7, 1, 2, 0, 0, 7, 0, 3, 3, 5, 6, 2, 5, 2, 3, 5, 6, 4, 2,
       2, 3, 1, 2, 0, 6, 3, 6, 4, 7, 5, 0], dtype=int32)

# XGBoost

In [8]:
import xgboost as xgb
import dask.array as da
import dask.distributed

In [5]:
import xgboost as xgb

  from pandas import MultiIndex, Int64Index


`xgboost.dask` is a small wrapper around xgboost. Dask sets XGBoost up, gives XGBoost data and lets XGBoost do it's training in the background using all the workers Dask has available.

In [10]:
# X and y must be Dask dataframes or arrays
num_obs = 1e5
num_features = 20
X = da.random.random(size=(num_obs, num_features), chunks=(1000, num_features))
y = da.random.random(size=(num_obs, 1), chunks=(1000, 1))

dtrain = xgb.dask.DaskDMatrix(client, X, y)

output = xgb.dask.train(
    client,
    {"verbosity": 2, "tree_method": "hist", "objective": "reg:squarederror"},
    dtrain,
    num_boost_round=4,
    evals=[(dtrain, "train")],
)

Function: _start_tracker
args:     (4)
kwargs:   {}
Traceback (most recent call last):
  File "/Users/eduardogutierrez/opt/anaconda3/envs/dask-recipes/lib/python3.8/site-packages/xgboost/tracker.py", line 366, in get_host_ip
    hostIP = socket.gethostbyname(socket.getfqdn())
socket.gaierror: [Errno 8] nodename nor servname provided, or not known

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/eduardogutierrez/opt/anaconda3/envs/dask-recipes/lib/python3.8/site-packages/distributed/worker.py", line 4081, in run
    result = function(*args, **kwargs)
  File "/Users/eduardogutierrez/opt/anaconda3/envs/dask-recipes/lib/python3.8/site-packages/xgboost/dask.py", line 142, in _start_tracker
    host = get_host_ip('auto')
  File "/Users/eduardogutierrez/opt/anaconda3/envs/dask-recipes/lib/python3.8/site-packages/xgboost/tracker.py", line 371, in get_host_ip
    hostIP = socket.gethostbyname(socket.gethostname())
socket.gai

gaierror: [Errno 8] nodename nor servname provided, or not known