# TPOT on Dask on CDSW Workers

## Setup

First we install dependencies.

In [1]:
!pip3 install --upgrade "dask[complete]==2021.2.0" dask-glm==0.2.0 dask-ml==1.8.0 numpy==1.19.5 TPOT==0.11.7 scikit-learn==0.24.1


In [None]:
# Optional: list installed package versions to confirm the pins above took effect.
!pip3 freeze

Then we import dependencies.

In [1]:
import os
import time

import cdsw
from dask.distributed import Client
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from tpot import TPOTClassifier



Finally, we make two directories that Dask needs. The scheduler records its address in a scheduler file under `_scheduler_`, and the workers use `_worker_` as their local working directory. From the user's perspective: create them and forget them.

In [2]:
os.makedirs("_scheduler_", exist_ok=True)
os.makedirs("_worker_", exist_ok=True)

## Start Dask scheduler

We start a Dask scheduler as a CDSW worker process. The scheduler is responsible for coordinating work between the workers. Later we'll start a client in this notebook. The client talks to the scheduler, and the scheduler talks to the workers.

In [3]:
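dask_scheduler = cdsw.launch_workers(
  n=1,
  cpu=1,
  memory=2,  # in GB
  kernel="python3",
  # The "!" prefix runs the line as a shell command, as in a notebook cell.
  # dask-scheduler listens for workers and clients on port 8786 by default.
  code="!dask-scheduler --host 0.0.0.0 --dashboard-address 127.0.0.1:8090 --scheduler-file /home/cdsw/_scheduler_/dask.log"
)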

# Wait for the scheduler to start.
time.sleep(10)

We need the IP address of the CDSW worker running the scheduler so that the Dask workers can connect to it. The `dask_scheduler` object does not include the IP (it is unknown when the scheduler is launched), so we scan the worker list for the worker whose `id` matches the scheduler's and take its IP. The list comprehension returns a list, but it should contain exactly one entry.

In [4]:
scheduler_workers = cdsw.list_workers()
scheduler_id = dask_scheduler[0]['id']
scheduler_ip = [worker['ip_address'] for worker in scheduler_workers
                if worker['id'] == scheduler_id][0]

scheduler_url = f"tcp://{scheduler_ip}:8786"

scheduler_url

'tcp://100.100.29.218:8786'
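If the scheduler's worker is missing from the list, the `[0]` above fails with a bare `IndexError`. A minimal sketch of the same lookup with clearer errors, assuming only that each entry of `cdsw.list_workers()` is a dict with the `id` and `ip_address` keys used above; `find_worker_ip` is a hypothetical helper and the IDs are made up:

```python
from typing import Any, Dict, List


def find_worker_ip(workers: List[Dict[str, Any]], worker_id: str) -> str:
    """Return the 'ip_address' of the single worker whose 'id' is worker_id."""
    matches = [w["ip_address"] for w in workers if w["id"] == worker_id]
    if not matches:
        raise LookupError(f"no worker with id {worker_id!r} in the worker list")
    if len(matches) > 1:
        raise LookupError(f"expected one worker with id {worker_id!r}, found {len(matches)}")
    return matches[0]


# Hand-written entries shaped like the dicts used above (hypothetical IDs).
workers = [
    {"id": "abc123", "ip_address": "100.100.29.218"},
    {"id": "def456", "ip_address": "100.100.30.7"},
]
print(f"tcp://{find_worker_ip(workers, 'abc123')}:8786")  # tcp://100.100.29.218:8786
```

In the notebook this would be called as `find_worker_ip(cdsw.list_workers(), dask_scheduler[0]['id'])`.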

## Start Dask workers

Start some CDSW workers, each running one Dask worker process. We pass each one the scheduler URL we just found so that it can register with the scheduler, which then distributes work to it.

In [5]:
dask_workers = cdsw.launch_workers(
  n=10,
  cpu=1,
  memory=2,
  kernel="python3",
  code=f"!dask-worker {scheduler_url} --local-directory /home/cdsw/_worker_"
)

# Wait for the workers to start.
time.sleep(10)
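Both `time.sleep(10)` calls assume that startup takes under ten seconds. A sketch of a more robust check polls a `tcp://host:port` URL, such as `scheduler_url`, until it accepts TCP connections, assuming the process is ready once its port is open. `wait_for_port` is a hypothetical helper built only on the standard library:

```python
import socket
import time
from urllib.parse import urlsplit


def wait_for_port(url: str, timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Poll until a TCP connection to the host:port in `url` succeeds.

    Returns True as soon as a connection is accepted, or False once
    `timeout` seconds have passed without a successful connection.
    """
    parts = urlsplit(url)  # e.g. "tcp://100.100.29.218:8786"
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((parts.hostname, parts.port), timeout=interval):
                return True
        except OSError:
            # Refused or timed out: nothing is listening on that port yet.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, `if not wait_for_port(scheduler_url): raise RuntimeError("scheduler did not start")` before launching the workers. An open scheduler port says nothing about the workers; once the client is connected, dask.distributed's `Client.wait_for_workers` is intended for that.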

## Connect Dask client

Start a local client and connect it to our scheduler. This is how we'll talk to the Dask cluster.

In [6]:
client = Client(scheduler_url)

We can view some stats about the Dask cluster.

In [10]:
client

| Client | Cluster |
|---|---|
| Scheduler: tcp://100.100.29.218:8786 | Workers: 10 |
| Dashboard: http://100.100.29.218:8090/status | Cores: 160 |
| | Memory: 20.00 GB |


Construct the URL of the Dask dashboard, which is served from the scheduler's CDSW worker.

In [8]:
print(dask_scheduler[0]['app_url'] + 'status')  # app_url already ends with '/'

https://5yq79laboko7iqyv.ml-18a296af-d86.demo-aws.ylcu-atmi.cloudera.site/status
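The dashboard URL is the scheduler worker's `app_url` with `status` appended. A slightly more defensive sketch that gives the same result whether or not `app_url` ends with a slash; `dashboard_url` is a hypothetical helper and the host below is made up:

```python
def dashboard_url(app_url: str, page: str = "status") -> str:
    """Append a dashboard page to an app URL, tolerating a trailing slash."""
    return app_url.rstrip("/") + "/" + page


# Hypothetical host; in the notebook: dashboard_url(dask_scheduler[0]['app_url'])
print(dashboard_url("https://example.cloudera.site/"))  # https://example.cloudera.site/status
```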


## Load data

We load a small example dataset, scikit-learn's handwritten digits. The focus here is on setting up the pipeline search, so the choice of data isn't important.

In [11]:
digits = load_digits()
X_train, X_test, y_train, y_test = train_test_split(digits.data, digits.target, train_size=0.75, test_size=0.25)
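For a sense of scale: `load_digits()` returns 1797 images of 8x8 pixels (64 features) in 10 classes. With float sizes, scikit-learn's `train_test_split` rounds the test set up and the training set down. A small sketch of that arithmetic (`split_sizes` is a hypothetical helper, not a scikit-learn function):

```python
import math
from typing import Tuple


def split_sizes(n_samples: int, train_size: float, test_size: float) -> Tuple[int, int]:
    """Train/test sizes for float fractions: test rounded up, train rounded down."""
    n_test = math.ceil(test_size * n_samples)
    n_train = math.floor(train_size * n_samples)
    return n_train, n_test


print(split_sizes(1797, 0.75, 0.25))  # (1347, 450)
```

So TPOT searches using 1347 training images, and the 450 held-out images are used only for the final score.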

## Define estimator (using Dask!)

We define a TPOT classifier. TPOT searches over many candidate pipelines built from scikit-learn preprocessors and estimators. To run that search on the Dask cluster, all we have to do is pass `use_dask=True`: TPOT then uses the client we created above, which dask.distributed registers as the default client. There is no parameter for passing the client explicitly, and none is needed.

In [12]:
estimator = TPOTClassifier(generations=5, population_size=20, use_dask=True, verbosity=2, n_jobs=-1)

## Fit estimator (using Dask workers!)

Fit the `TPOTClassifier`. TPOT evaluates `population_size` pipelines, collects the results, and uses them to choose the next set of candidates (it's an evolutionary algorithm). It repeats this for `generations` generations. Each pipeline is scored with 5-fold cross-validation, TPOT's default since we did not set `cv`. This is a lot of compute (a thorough search can take hours or days), so we restrict it to a mere 5 generations with a population of 20. We can stop the process at any point, and TPOT will keep the best pipeline found up to that point.

In [None]:
estimator.fit(X_train, y_train)

Optimization Progress:   0%|          | 0/120 [00:00<?, ?pipeline/s]


Generation 1 - Current best internal CV score: 0.9732810133553628
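The `0/120` in the progress bar follows from the settings above: TPOT evaluates the initial population of `population_size` pipelines, then `offspring_size` new pipelines in each of the `generations` generations, and `offspring_size` defaults to `population_size`. A back-of-the-envelope sketch (`tpot_evaluation_budget` is hypothetical, not part of TPOT) that reproduces that count and multiplies it by the number of cross-validation folds:

```python
from typing import Optional, Tuple


def tpot_evaluation_budget(generations: int, population_size: int,
                           offspring_size: Optional[int] = None,
                           cv: int = 5) -> Tuple[int, int]:
    """Return (pipelines evaluated, cross-validation fits) for a TPOT run."""
    if offspring_size is None:
        # TPOT's default: as many offspring per generation as the population size.
        offspring_size = population_size
    n_pipelines = population_size + generations * offspring_size
    return n_pipelines, n_pipelines * cv


print(tpot_evaluation_budget(generations=5, population_size=20))  # (120, 600)
```

Treat the number of fits as a rough upper bound: TPOT does not re-score pipelines it has already evaluated, and the final refit of the best pipeline is not included.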


In [None]:
estimator.predict(X_test)

In [None]:
estimator.export("testimator.py")

In [None]:
estimator.score(X_test, y_test)

## Close workers

Stop the Dask workers. We pass the IDs of only the workers we started; calling `cdsw.stop_workers()` with no arguments would stop every worker, including any others that may still be in use.

In [None]:
cdsw.stop_workers(*[worker['id'] for worker in dask_workers])

Stop the scheduler.

In [None]:
cdsw.stop_workers(*[worker['id'] for worker in dask_scheduler])