# What is Dask and why we need tools like it

Datasets of up to several GB rarely cause trouble, whether the job is a computational task or the training of a machine learning model: a single, reasonably powerful machine handles that volume easily.

Things get harder when you need to process tens of GB or more, or to **speed up the training** of complex models. Vertical scaling is limited by how large a single machine can be, so at some point the only option is horizontal scaling and some form of parallelism.

**Dask** offers tools for this exact case. For example,

- you may want to **leverage all the cores** of your current machine to speed up computations, but do not want to go for `multiprocessing`,
- alternatively, you may need to **process data too large** for the machine's memory, which is called **out-of-core processing**,
- or you may need a **unified setup** for both local parallelism (for prototyping) and distributed cloud-based computation.
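As a rough illustration of the out-of-core idea from the list above, here is a minimal sketch using `dask.array` (which is not used elsewhere in this tutorial). The array shape and chunk size are arbitrary assumptions, chosen only to keep the example small.

```python
import dask.array as da

# A 4000 x 4000 array of random floats, split into 1000 x 1000 chunks.
# Nothing is computed yet: Dask only records a graph of tasks.
big = da.random.random((4000, 4000), chunks=(1000, 1000))

# The mean is computed chunk by chunk and the partial results are combined,
# so the whole array does not need to be held in memory at once.
mean_value = big.mean().compute()
print(mean_value)
```

The same pattern applies to arrays that genuinely do not fit in memory, as long as each individual chunk does.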

Many interesting problems in machine learning are impractical to solve on a single machine, and Dask offers a simple way to introduce parallelism into your problem.

Another benefit is that Dask is written in Python, so there's no need to set up Spark, which is Scala-based and trickier to configure.

In this tutorial we will use a **local** setup, i.e. the Dask cluster will run on a single machine. The main benefit is full utilization of all the machine's cores.

# Dask cluster

First, we need to create a Dask cluster and a Dask client:

In [1]:
%pylab inline
plt.style.use('bmh')

import pathlib
import numpy as np
import pandas as pd

from distributed import Client, LocalCluster

Populating the interactive namespace from numpy and matplotlib


In [2]:
# you may want to change `n_workers` according to your hardware setup
cluster = LocalCluster(n_workers=12)
client = Client(cluster)

In [3]:
cluster


We created a **cluster** of 12 workers and connected to it as a client. Note that you can control the cluster size directly from the notebook.
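Besides the notebook widget, the cluster size can also be changed from code. The sketch below is only an illustration: it uses separate, hypothetical names (`demo_cluster`, `demo_client`) and a small threads-based cluster without a dashboard, so that it does not interfere with the 12-worker cluster created above.

```python
from distributed import Client, LocalCluster

# Assumption: a tiny in-process cluster, just to demonstrate scaling.
with LocalCluster(n_workers=2, threads_per_worker=1, processes=False,
                  dashboard_address=None) as demo_cluster:
    with Client(demo_cluster) as demo_client:
        demo_cluster.scale(3)             # ask for three workers in total
        demo_client.wait_for_workers(3)   # block until the scheduler sees them
        n_workers = len(demo_client.scheduler_info()["workers"])

print(n_workers)
```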

Under the hood, a Dask cluster contains a **scheduler**, which is responsible for handling computations and distributing them among the workers. The scheduler can also be launched from the command line (see the [Command Line](https://docs.dask.org/en/latest/setup/cli.html) section of the documentation).

Dask also provides a nice real-time **dashboard** for monitoring tasks and workers (see the link in the output of the cell above).

We can now submit tasks to the Dask cluster:

In [4]:
result_future = client.submit(np.sin, np.random.randn(100))

Note that `client.submit` creates what is called a **future**, i.e. a handle to the task result, which becomes available as soon as the computation completes.

You can retrieve the task status

In [5]:
result_future.status

'finished'

In [6]:
result_future.done()

True

or result:

In [7]:
result_future.result()

array([-0.38566922,  0.99510528, -0.65454988,  0.34663499, -0.4439334 ,
       -0.04220119, -0.06956301,  0.00443238,  0.66714345, -0.65239693,
        0.0582789 , -0.96636263,  0.55184192,  0.74382971, -0.06051806,
        0.91579664, -0.0572082 ,  0.99856171,  0.03045415, -0.57625148,
        0.67631342,  0.5595241 , -0.5897384 ,  0.61344204,  0.99679698,
       -0.80810771, -0.84246366,  0.89153179, -0.66727006,  0.3686316 ,
        0.93135312, -0.09279354, -0.30981672,  0.18055714, -0.91940546,
       -0.84028269, -0.85568167, -0.24937423, -0.06769172,  0.91431261,
       -0.95605296,  0.33512032, -0.97361261, -0.1729804 ,  0.99859642,
       -0.52965974, -0.36631202,  0.04066818, -0.182421  ,  0.37265006,
        0.75720296,  0.69464707, -0.70070384,  0.65185387, -0.92658722,
       -0.99572993,  0.62271414,  0.01166503,  0.17914814, -0.97794748,
        0.02352513, -0.06039364,  0.99419334,  0.56095852, -0.96081577,
        0.81174451, -0.78322288,  0.48709807, -0.1035351 ,  0.37

You can also submit **multiple tasks** at once (we recommend opening the Dask dashboard alongside to observe how tasks start and proceed):

In [8]:
futures = [client.submit(np.sin, x) for x in np.random.randn(100)]

To get the results, we need to **gather** them:

In [9]:
results = client.gather(futures)
results

[-0.8273538830490595,
 0.873022516274151,
 -0.8861486559438967,
 0.675935836098815,
 0.9991393832119514,
 0.1696600508636658,
 0.7003141469465344,
 -0.8633329987405506,
 0.22136867746820688,
 0.5089359481444492,
 -0.5188206326995011,
 -0.3353401522905225,
 -0.0330622504074351,
 0.24871760442215773,
 0.9542510167476598,
 -0.9401557214029244,
 -0.5295257407841779,
 0.044187215712663326,
 0.34734678165519284,
 0.9960680537581893,
 -0.9788079712704486,
 -0.5229308037953386,
 0.9817890136902524,
 -0.994852732514727,
 -0.6374301180327867,
 0.2509803642770965,
 -0.9358539062459782,
 -0.18231491069040157,
 0.007604044400980221,
 0.12416349105023594,
 -0.5271477910257577,
 -0.580232908038599,
 0.5083249213558899,
 0.20183940171880577,
 0.48621894190693815,
 -0.9349453935133415,
 0.05832975506851019,
 -0.9314275047247508,
 0.734893659108188,
 -0.17826584892600075,
 -0.9912707147603849,
 -0.7000887153382841,
 -0.6925670585980578,
 -0.29476686555519643,
 -0.029796807757556665,
 -0.3767554455734592
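As an alternative to the list comprehension over `client.submit`, `Client.map` submits one task per input element in a single call. The sketch below is a minimal illustration; it assumes a separate threads-based client (`demo_client`, a hypothetical name) so that it can run on its own.

```python
import numpy as np
from distributed import Client

inputs = np.linspace(0.0, 1.0, 5).tolist()

# Assumption: a throwaway in-process client without a dashboard.
with Client(processes=False, dashboard_address=None) as demo_client:
    mapped_futures = demo_client.map(np.sin, inputs)      # one future per element
    mapped_results = demo_client.gather(mapped_futures)   # results in input order

print(mapped_results)
```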

Dask also allows for straightforward **chaining** of tasks: a future can be passed as an argument to another task, which then runs once its inputs are ready (note that `s`, `s_sq` and `s_full` are all futures, not NumPy arrays):

In [10]:
x = np.random.randn(10000)
s = client.submit(np.sin, x)
s_sq = client.submit(np.square, x)
s_full = client.submit(np.add, s, s_sq)

In [11]:
s_full.result()

array([ 1.21614459, -0.11206439, -0.15441179, ...,  0.58596929,
        0.55683868,  2.10434109])
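A quick way to convince yourself that chaining produces the expected values is to compare it with the same computation done locally in NumPy. This is a sketch under the assumption of a separate, threads-based client (`demo_client`); the variable names are hypothetical and chosen not to clash with the cells above.

```python
import numpy as np
from distributed import Client

x_demo = np.random.randn(1000)

with Client(processes=False, dashboard_address=None) as demo_client:
    sin_future = demo_client.submit(np.sin, x_demo)
    square_future = demo_client.submit(np.square, x_demo)
    # Futures are passed as arguments; Dask resolves them on the workers.
    sum_future = demo_client.submit(np.add, sin_future, square_future)
    chained = sum_future.result()

local = np.sin(x_demo) + np.square(x_demo)
print(np.allclose(chained, local))
```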

# Dask and ML

As a specific and relevant example of parallelization for machine learning, we will consider parallel grid search. Imagine that you need to fit a parametrized machine learning model (almost all ML models have some hyperparameters).

To find a good set of hyperparameters, you need to fit a model for each candidate set of parameters and compare the results. The main Python package for classical machine learning, `scikit-learn` (or `sklearn` for short), makes that easy. We will use the Titanic dataset and create a simple classification model for it.

In [12]:
# you may need to change the location according to your local setup
DATA_DIR = pathlib.Path("data/")

In [13]:
train = pd.read_csv(DATA_DIR.joinpath("train.csv"), index_col="PassengerId")
test = pd.read_csv(DATA_DIR.joinpath("test.csv"), index_col="PassengerId")

We will preprocess the dataset first:

In [14]:
train.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 891 entries, 1 to 891
Data columns (total 11 columns):
 #   Column    Non-Null Count  Dtype  
---  ------    --------------  -----  
 0   Survived  891 non-null    int64  
 1   Pclass    891 non-null    int64  
 2   Name      891 non-null    object 
 3   Sex       891 non-null    object 
 4   Age       714 non-null    float64
 5   SibSp     891 non-null    int64  
 6   Parch     891 non-null    int64  
 7   Ticket    891 non-null    object 
 8   Fare      891 non-null    float64
 9   Cabin     204 non-null    object 
 10  Embarked  889 non-null    object 
dtypes: float64(2), int64(4), object(5)
memory usage: 83.5+ KB


In [15]:
# mean age per (class, sex) group, computed on the training set only
age_imputation = train.groupby(["Pclass", "Sex"])["Age"].mean()

# attach the group mean as a temporary `Age_imp` column
train = train.join(age_imputation,
                   on=["Pclass", "Sex"],
                   rsuffix="_imp")

# fill missing ages with the group mean, then drop the helper column
train.loc[train.Age.isnull(), "Age"] = train.loc[train.Age.isnull(), "Age_imp"]
train.drop("Age_imp", axis=1, inplace=True)

# the test set reuses the training-set group means
test = test.join(age_imputation,
                 on=["Pclass", "Sex"],
                 rsuffix="_imp")

test.loc[test.Age.isnull(), "Age"] = test.loc[test.Age.isnull(), "Age_imp"]
test.drop("Age_imp", axis=1, inplace=True)

# remaining gaps: most frequent port and average fare from the training set
most_frequent_port = train.Embarked.value_counts().idxmax()
average_fare = train.Fare.mean()

train.fillna({"Embarked": most_frequent_port}, inplace=True)
test.fillna({"Embarked": most_frequent_port, "Fare": average_fare}, inplace=True)

# drop free-text columns that we will not use as features
train.drop(["Name", "Ticket", "Cabin"], axis=1, inplace=True)
test.drop(["Name", "Ticket", "Cabin"], axis=1, inplace=True)
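For a single data frame, the group-mean imputation can be written more compactly with `groupby(...).transform`. The toy frame below is made up for illustration. Note that this shortcut computes the means from the frame itself, so for the test set you would still join the training-set means as in the cell above.

```python
import numpy as np
import pandas as pd

# Hypothetical toy data: two groups, each with one missing age.
toy = pd.DataFrame({
    "Pclass": [1, 1, 3, 3],
    "Sex": ["female", "female", "male", "male"],
    "Age": [30.0, np.nan, 20.0, np.nan],
})

# transform("mean") returns a Series aligned with `toy`,
# holding the mean age of each row's (Pclass, Sex) group.
group_mean_age = toy.groupby(["Pclass", "Sex"])["Age"].transform("mean")
toy["Age"] = toy["Age"].fillna(group_mean_age)

print(toy["Age"].tolist())  # [30.0, 30.0, 20.0, 20.0]
```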

In [16]:
train = pd.get_dummies(train, columns=["Pclass", "Sex", "Embarked"])
test = pd.get_dummies(test, columns=["Pclass", "Sex", "Embarked"])

FEATURES_COLS = train.columns[1:]
TARGET = "Survived"
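One caveat when calling `pd.get_dummies` on the training and test sets separately: if a category is missing from one of them, the resulting columns will differ. That does not happen with this dataset, but a defensive sketch (on made-up toy frames) could align the test columns to the training ones like this:

```python
import pandas as pd

# Hypothetical toy frames: the test set never contains ports 'C' and 'Q'.
toy_train = pd.DataFrame({"Embarked": ["C", "Q", "S"]})
toy_test = pd.DataFrame({"Embarked": ["S", "S"]})

train_dummies = pd.get_dummies(toy_train, columns=["Embarked"])
test_dummies = pd.get_dummies(toy_test, columns=["Embarked"])

# Reorder/add columns to match the training set; missing dummies become 0.
test_aligned = test_dummies.reindex(columns=train_dummies.columns, fill_value=0)

print(list(test_aligned.columns))
```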

In [17]:
train.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 891 entries, 1 to 891
Data columns (total 13 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   Survived    891 non-null    int64  
 1   Age         891 non-null    float64
 2   SibSp       891 non-null    int64  
 3   Parch       891 non-null    int64  
 4   Fare        891 non-null    float64
 5   Pclass_1    891 non-null    uint8  
 6   Pclass_2    891 non-null    uint8  
 7   Pclass_3    891 non-null    uint8  
 8   Sex_female  891 non-null    uint8  
 9   Sex_male    891 non-null    uint8  
 10  Embarked_C  891 non-null    uint8  
 11  Embarked_Q  891 non-null    uint8  
 12  Embarked_S  891 non-null    uint8  
dtypes: float64(2), int64(3), uint8(8)
memory usage: 48.7 KB


In [18]:
test.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 418 entries, 892 to 1309
Data columns (total 12 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   Age         418 non-null    float64
 1   SibSp       418 non-null    int64  
 2   Parch       418 non-null    int64  
 3   Fare        418 non-null    float64
 4   Pclass_1    418 non-null    uint8  
 5   Pclass_2    418 non-null    uint8  
 6   Pclass_3    418 non-null    uint8  
 7   Sex_female  418 non-null    uint8  
 8   Sex_male    418 non-null    uint8  
 9   Embarked_C  418 non-null    uint8  
 10  Embarked_Q  418 non-null    uint8  
 11  Embarked_S  418 non-null    uint8  
dtypes: float64(2), int64(2), uint8(8)
memory usage: 19.6 KB


OK, our dataset now contains no missing values and is fully numeric, so we can start modeling. We will use a random forest model. You do not need to understand it in full right now; the main idea is to combine many weak estimators (decision trees) to get a better result overall.

We will also use cross-validation, since it's a crucial part of hyperparameter search. The main idea is, again, simple: you train your models on one part of the dataset, choose the model parameters based on performance on a different, previously unseen part (this is the cross-validation step, and it reduces the risk of overfitting), and then assess the final model with the best parameters on a test set.

`sklearn` provides convenient classes for the entire grid search process. We will use 4-fold cross-validation: for each set of parameters, the training dataset is split into 4 roughly equal parts and four models are fitted with that set of parameters, so that each model is validated on one of the four folds while the remaining three are used for training.
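To make the fold mechanics concrete, here is a small sketch on made-up data. It uses `StratifiedKFold`, which is what `sklearn` picks for classifiers when `cv` is given as an integer; the toy arrays are assumptions for illustration only.

```python
import numpy as np
from sklearn.model_selection import StratifiedKFold

# Hypothetical toy data: 8 samples, two balanced classes.
X_toy = np.arange(16).reshape(8, 2)
y_toy = np.array([0, 0, 0, 0, 1, 1, 1, 1])

splitter = StratifiedKFold(n_splits=4)
fold_sizes = []
validated = []
for train_idx, valid_idx in splitter.split(X_toy, y_toy):
    fold_sizes.append((len(train_idx), len(valid_idx)))
    validated.extend(valid_idx.tolist())

print(fold_sizes)  # [(6, 2), (6, 2), (6, 2), (6, 2)]
```

Every sample ends up in a validation fold exactly once, and in the training part of the other three splits.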

`joblib` is the job manager that dispatches these computations under the hood; it can use different backends to run them in parallel.
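The backend mechanism can be seen in isolation with a tiny sketch. It uses `joblib`'s built-in `"threading"` backend rather than Dask, purely to show how a backend is selected; the inputs are arbitrary.

```python
import math
import joblib

# Everything inside the `with` block is dispatched through the chosen backend.
with joblib.parallel_backend("threading", n_jobs=2):
    roots = joblib.Parallel()(
        joblib.delayed(math.sqrt)(value) for value in [1, 4, 9, 16]
    )

print(roots)  # [1.0, 2.0, 3.0, 4.0]
```

Swapping `"threading"` for `"dask"` (with a Dask client already created) is what routes the same kind of work to the Dask cluster.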

In [19]:
import joblib
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import f1_score, classification_report

First, we need to specify the parameter grid. During grid search, every combination of the listed values is fitted.

In [20]:
params = {
    "max_depth": [2,4,6],
    "n_estimators": [100, 200, 500],
    "class_weight": [None, "balanced"]
}
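To see how many combinations such a grid produces, you can expand it with `ParameterGrid`. This is only a counting sketch; `param_grid` repeats the values from the cell above, and the factor of 4 assumes the 4-fold cross-validation used in this tutorial.

```python
from sklearn.model_selection import ParameterGrid

param_grid = {
    "max_depth": [2, 4, 6],
    "n_estimators": [100, 200, 500],
    "class_weight": [None, "balanced"],
}

candidates = list(ParameterGrid(param_grid))
n_candidates = len(candidates)   # 3 * 3 * 2 combinations
n_fits = n_candidates * 4        # one fit per candidate per fold

print(n_candidates, n_fits)  # 18 72
```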

Now we create a model instance, so that `sklearn` knows which model we want. In the `sklearn` API, model parameters can be set after the model is created, so we create an "empty" model that serves as a blueprint.

Also note that we do not provide a scoring criterion. By default, `GridSearchCV` uses the model's own `score` method. For `RandomForestClassifier` that is accuracy, which is exactly the metric Kaggle uses for this dataset.

We can now launch the grid search itself:
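The claim about the default score can be checked directly. The sketch below uses synthetic data from `make_classification` (an assumption for illustration, not the Titanic dataset) and compares the classifier's `score` with an explicit accuracy computation.

```python
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# Synthetic binary classification problem, fixed seed for reproducibility.
X_syn, y_syn = make_classification(n_samples=200, n_features=8, random_state=0)

forest = RandomForestClassifier(n_estimators=20, random_state=0).fit(X_syn, y_syn)

default_score = forest.score(X_syn, y_syn)                       # what GridSearchCV uses by default
explicit_accuracy = accuracy_score(y_syn, forest.predict(X_syn))

print(default_score == explicit_accuracy)
```

If you prefer to be explicit, passing `scoring="accuracy"` to `GridSearchCV` selects the same metric.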

In [22]:
model = RandomForestClassifier()
grid_cv = GridSearchCV(model, params, cv=4, verbose=1)

with joblib.parallel_backend('dask'):
    grid_cv.fit(train[FEATURES_COLS], train[TARGET])

Fitting 4 folds for each of 18 candidates, totalling 72 fits


[Parallel(n_jobs=-1)]: Using backend DaskDistributedBackend with 12 concurrent workers.
[Parallel(n_jobs=-1)]: Done  26 tasks      | elapsed:   11.6s
[Parallel(n_jobs=-1)]: Done  72 out of  72 | elapsed:   24.2s finished


`joblib` will use the local cluster we created to distribute the training jobs and run them in parallel. The best parameters for the features we have are:

In [23]:
grid_cv.best_params_

{'class_weight': None, 'max_depth': 6, 'n_estimators': 100}

The corresponding best score (mean cross-validated accuracy) is:

In [24]:
grid_cv.best_score_

0.8237991354583283

When grid search finds the best parameters, by default it refits the model on the entire training set, so we do not need to do that manually. Effectively, we now have a random forest model trained on the entire training set with the best model parameters. Let's use it for inference:

In [25]:
submission = pd.DataFrame(grid_cv.best_estimator_.predict(test[FEATURES_COLS]),
                          index=test.index, columns=["Survived"])

In [26]:
submission.head()

             Survived
PassengerId          
892                 0
893                 0
894                 0
895                 0
896                 1


In [27]:
submission.to_csv(DATA_DIR.joinpath("dask_submission.csv"))

These predictions score about `0.775` when submitted to Kaggle.
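The refit behaviour mentioned earlier can be verified on a small example. The sketch below runs a plain (non-Dask) grid search on synthetic data, which is an assumption made to keep it self-contained, and checks that predicting through the search object is the same as predicting with `best_estimator_`.

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

X_syn, y_syn = make_classification(n_samples=200, n_features=8, random_state=0)

# refit=True is the default: the best candidate is refitted on all of X_syn.
search = GridSearchCV(
    RandomForestClassifier(n_estimators=20, random_state=0),
    {"max_depth": [2, 4]},
    cv=4,
)
search.fit(X_syn, y_syn)

same_predictions = np.array_equal(
    search.predict(X_syn), search.best_estimator_.predict(X_syn)
)
print(search.best_params_, same_predictions)
```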


# Final remarks

You may consider this overkill for this specific model. That is true: `joblib` alone handles local parallelism well enough. However, imagine that you're searching over a **huge grid** and have a **standby Dask cluster in the cloud**: in that case this setup will serve its purpose really well.

We haven't covered many technical details about Dask (resource quotas, deployment and others) or out-of-core processing, but hopefully you got a feel for it and will dig further as soon as you encounter a long-running grid or random search, or something similar.