Use Voting Classifiers
======================

A [Voting classifier](http://scikit-learn.org/stable/modules/ensemble.html#voting-classifier) model combines multiple different models (i.e., sub-estimators) into a single model, which is (ideally) stronger than any of the individual models alone. 
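As a rough illustration of how the combination works, the sketch below implements majority ("hard") voting by hand with NumPy. The prediction arrays are made up for illustration, and `majority_vote` is a hypothetical helper, not part of scikit-learn.

```python
import numpy as np

# Made-up class predictions from three sub-estimators for five samples
# (rows = sub-estimators, columns = samples).
predictions = np.array([
    [0, 1, 1, 0, 1],  # sub-estimator A
    [0, 1, 0, 0, 1],  # sub-estimator B
    [1, 1, 1, 0, 0],  # sub-estimator C
])


def majority_vote(preds):
    """Return, for each sample, the label predicted by most sub-estimators.

    Hypothetical helper for illustration; ties go to the smallest label.
    """
    return np.array([np.bincount(column).argmax() for column in preds.T])


ensemble_prediction = majority_vote(predictions)
print(ensemble_prediction)
```

Each sample receives the label that most sub-estimators agree on. `VotingClassifier` with its default `voting='hard'` setting combines predictions in the same spirit.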

[Dask](http://ml.dask.org/joblib.html) provides the software to train individual sub-estimators on different machines in a cluster. This enables users to train more models in parallel than would have been possible on a single machine. Note that users will only observe this benefit if they have a distributed cluster with more resources than their single machine (because sklearn already enables users to parallelize training across cores on a single machine).

What follows is an example of how one would train a voting classifier model with Dask (using a local cluster).

<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg" width="30%" alt="Dask logo">

In [1]:
from sklearn.ensemble import VotingClassifier
from sklearn.linear_model import SGDClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC

import sklearn.datasets

We create a synthetic dataset (with 1000 rows and 20 columns) that we can give to the voting classifier model.

In [2]:
X, y = sklearn.datasets.make_classification(n_samples=1_000, n_features=20)

In [5]:
X[1:3], y[1:6] 

(array([[-1.45599042, -0.89833428,  0.82832075, -0.13685283,  0.43154217,
          1.37479417,  1.04756559,  0.27898677, -1.02998656, -0.25376654,
         -1.0163173 , -0.82696912,  0.11185965,  0.99510313, -0.84523275,
          0.674029  , -1.05413864,  1.58181778,  0.0519534 ,  0.34222092],
        [-0.63243196,  0.17229496,  0.44844456, -2.53030678,  0.55624536,
         -0.31274824, -0.39310599, -0.52598743,  0.6286556 , -0.79559269,
         -0.34013307,  0.17774073,  0.72093957,  0.19787114,  1.83916978,
          0.3137113 , -0.39790351, -1.21070307, -2.378677  , -0.56823504]]),
 array([0, 1, 1, 0, 0]))

We specify the VotingClassifier as a list of (name, sub-estimator) tuples. Fitting the VotingClassifier on the data fits each of the sub-estimators. We set the ```n_jobs``` argument to -1, which instructs sklearn to use all available cores, so the sub-estimators can be fitted in parallel (notice that we haven't used Dask yet).

In [7]:
classifiers = [
    ('sgd', SGDClassifier(max_iter=1000)),
    ('logisticregression', LogisticRegression()),
    ('svc', SVC(gamma='auto')),
]
clf = VotingClassifier(classifiers, n_jobs=-1)
clf

We call the classifier's fit method in order to train the classifier.

In [8]:
%time clf.fit(X, y)

CPU times: user 20.4 ms, sys: 59.8 ms, total: 80.2 ms
Wall time: 1.6 s
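
Once fitted, the ensemble behaves like any other scikit-learn classifier. The following is a minimal, self-contained sketch that rebuilds the example above and inspects the result; the `random_state` values and the omission of `n_jobs` are assumptions made to keep the run reproducible and lightweight, and are not part of the original cells.

```python
import sklearn.datasets
from sklearn.ensemble import VotingClassifier
from sklearn.linear_model import LogisticRegression, SGDClassifier
from sklearn.svm import SVC

# Same shape as the dataset above; random_state is an assumption for reproducibility.
X, y = sklearn.datasets.make_classification(
    n_samples=1_000, n_features=20, random_state=0
)

clf = VotingClassifier([
    ('sgd', SGDClassifier(max_iter=1000, random_state=0)),
    ('logisticregression', LogisticRegression()),
    ('svc', SVC(gamma='auto')),
])
clf.fit(X, y)

# Names of the fitted sub-estimators, in the order they were given.
fitted_names = list(clf.named_estimators_.keys())

# Majority-vote predictions for the first five rows.
first_predictions = clf.predict(X[:5])

# Accuracy on the training data (optimistic; a held-out set is more informative).
train_accuracy = clf.score(X, y)
print(fitted_names, first_predictions, train_accuracy)
```

The fitted sub-estimators remain accessible individually through `clf.named_estimators_`, which can help when checking whether the ensemble actually improves on its parts.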


Creating a Dask [client](https://distributed.readthedocs.io/en/latest/client.html) provides performance and progress metrics via the dashboard. Because ```Client``` is given no arguments, it starts and connects to a [local cluster](http://distributed.readthedocs.io/en/latest/local-cluster.html) (not a distributed cluster), as its output shows.

We can view the dashboard by clicking the link after running the cell.

In [9]:
import joblib
from distributed import Client

client = Client()
client

2022-10-25 21:24:56,768 - distributed.diskutils - INFO - Found stale lock file and directory '/Users/parsanemati/Yandex.Disk.localized/github/data_science_eda/parallel computing /Dask website/dask-worker-space/worker-87ymwm_q', purging
2022-10-25 21:24:56,768 - distributed.diskutils - INFO - Found stale lock file and directory '/Users/parsanemati/Yandex.Disk.localized/github/data_science_eda/parallel computing /Dask website/dask-worker-space/worker-_aw7xnq3', purging
2022-10-25 21:24:56,769 - distributed.diskutils - INFO - Found stale lock file and directory '/Users/parsanemati/Yandex.Disk.localized/github/data_science_eda/parallel computing /Dask website/dask-worker-space/worker-ljht9blg', purging
2022-10-25 21:24:56,769 - distributed.diskutils - INFO - Found stale lock file and directory '/Users/parsanemati/Yandex.Disk.localized/github/data_science_eda/parallel computing /Dask website/dask-worker-space/worker-t6knnpqo', purging


Client

| Connection method | Cluster type | Dashboard |
|---|---|---|
| Cluster object | distributed.LocalCluster | http://127.0.0.1:8787/status |

Cluster

| Dashboard | Workers | Total threads | Total memory | Status | Using processes |
|---|---|---|---|---|---|
| http://127.0.0.1:8787/status | 5 | 10 | 16.00 GiB | running | True |

Scheduler

| Comm | Dashboard | Workers | Total threads | Total memory | Started |
|---|---|---|---|---|---|
| tcp://127.0.0.1:51930 | http://127.0.0.1:8787/status | 5 | 10 | 16.00 GiB | Just now |

Workers (local directories are under `/Users/parsanemati/Yandex.Disk.localized/github/data_science_eda/parallel computing /Dask website/dask-worker-space/`)

| Comm | Dashboard | Nanny | Total threads | Memory | Local directory |
|---|---|---|---|---|---|
| tcp://127.0.0.1:51951 | http://127.0.0.1:51953/status | tcp://127.0.0.1:51934 | 2 | 3.20 GiB | worker-1khxp49e |
| tcp://127.0.0.1:51959 | http://127.0.0.1:51962/status | tcp://127.0.0.1:51937 | 2 | 3.20 GiB | worker-e9bytyoo |
| tcp://127.0.0.1:51950 | http://127.0.0.1:51954/status | tcp://127.0.0.1:51935 | 2 | 3.20 GiB | worker-onn64hrs |
| tcp://127.0.0.1:51958 | http://127.0.0.1:51961/status | tcp://127.0.0.1:51936 | 2 | 3.20 GiB | worker-zjl1rnj9 |
| tcp://127.0.0.1:51952 | http://127.0.0.1:51957/status | tcp://127.0.0.1:51933 | 2 | 3.20 GiB | worker-ysjmudh1 |


To train the voting classifier, we call the classifier's fit method as before, but inside joblib's ```parallel_backend``` context manager. This distributes training of the sub-estimators across the cluster.

In [10]:
%%time 
with joblib.parallel_backend("dask"):
    clf.fit(X, y)

print(clf)

VotingClassifier(estimators=[('sgd', SGDClassifier()),
                             ('logisticregression', LogisticRegression()),
                             ('svc', SVC(gamma='auto'))],
                 n_jobs=-1)
CPU times: user 135 ms, sys: 123 ms, total: 259 ms
Wall time: 1.06 s


Note that we see no real advantage from using Dask here, because we are using a local cluster rather than a distributed cluster and sklearn is already using all of the machine's cores. If we were using a distributed cluster, Dask would enable us to take advantage of the multiple machines and train sub-estimators across them.
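
The same pattern can be written with an explicitly constructed cluster, which makes it clearer where a distributed cluster would plug in. This is a minimal sketch, not a tuned setup: the worker counts, `processes=False`, and the disabled dashboard are assumptions chosen so the example runs quickly on one machine.

```python
import joblib
import sklearn.datasets
from distributed import Client, LocalCluster
from sklearn.ensemble import VotingClassifier
from sklearn.linear_model import LogisticRegression, SGDClassifier
from sklearn.svm import SVC

X, y = sklearn.datasets.make_classification(n_samples=1_000, n_features=20)

clf = VotingClassifier(
    [
        ('sgd', SGDClassifier(max_iter=1000)),
        ('logisticregression', LogisticRegression()),
        ('svc', SVC(gamma='auto')),
    ],
    n_jobs=-1,
)

# Assumed cluster sizing: two in-process workers with one thread each.
# For a real distributed cluster, one would instead pass the scheduler's
# address to Client (for example "tcp://<scheduler-host>:8786", a placeholder).
with LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,
    dashboard_address=None,  # assumption: skip the dashboard for this sketch
) as cluster, Client(cluster) as client:
    # joblib sends the sub-estimator fits to the active Dask client.
    with joblib.parallel_backend("dask"):
        clf.fit(X, y)

n_fitted = len(clf.estimators_)
print(n_fitted)
```

Using the cluster and client as context managers shuts the workers down when the block exits, which also avoids leaving stale worker directories behind.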