# AutoML with TPOT

Automated machine learning holds much promise as an assistive tool for practitioners working with data. In this notebook, we demonstrate the [TPOT](http://epistasislab.github.io/tpot/) library for AutoML running across a [Dask](https://dask.org/) cluster, which we distribute using the CML [Workers API](https://docs.cloudera.com/machine-learning/cloud/distributed-computing/topics/ml-parallel-computing.html).

We'll work with a small sample of the [credit card fraud dataset](https://www.kaggle.com/mlg-ulb/creditcardfraud) curated by the machine learning group at Université Libre de Bruxelles. The same group have a [handbook](https://fraud-detection-handbook.github.io/fraud-detection-handbook/Foreword.html) detailing the application of machine learning for fraud detection, which provides considerably more depth than we do here. Creating a realistic fraud detection system is far too mighty a task for a single humble notebook, and so the dataset here serves only as an example for the purpose of demonstrating the technologies. Namely: TPOT running in parallel on a Dask cluster on CML.

We'll first load the data and perform some automated data profiling. Then, we'll spin up a Dask cluster on top of our CML cluster, on the fly. We'll define a TPOT classifier, fit it to our data, and then shut down our cluster to free up compute resources for other tasks.

## Setup

We import all dependencies up front.

In [None]:
import os
import time

import cdsw
import pandas as pd

from dask.distributed import Client
from pandas_profiling import ProfileReport
from sklearn.model_selection import train_test_split
from tpot import TPOTClassifier

### Load and profile the data

In [None]:
transactions = pd.read_csv("data/creditcardsample.csv")

Let's profile the data. Exploratory data analysis is not a mechanical process, and thus impossible to automate completely. However, the open-source ecosystem provides several options for automatically generating common charts. Here, we're using [pandas-profiling](https://github.com/pandas-profiling/pandas-profiling) to view histograms of each variable and detect duplicate entries.

In [None]:
ProfileReport(transactions, interactions=None)

There are no missing values, but there are some duplicate rows. We don't want to accidentally pollute our test set with rows that are duplicated in the training set (which would inflate our hold-out scores), so we'll drop them.

In [None]:
unique_transactions = transactions.drop_duplicates()
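To see why duplicates matter, here is a small standard-library sketch on hypothetical toy rows (not the credit card data). `split` and `leaked` are illustrative helpers, not part of pandas or scikit-learn. It counts test rows that also appear verbatim in the training rows, before and after keeping only the first occurrence of each row (the same rule as pandas' default `drop_duplicates(keep="first")`).

```python
import random


def split(rows, test_fraction=0.3, seed=0):
    """Shuffle a copy of rows with a fixed seed and cut off the first test_fraction as the test set."""
    rows = list(rows)
    random.Random(seed).shuffle(rows)
    n_test = round(len(rows) * test_fraction)
    return rows[n_test:], rows[:n_test]


def leaked(train, test):
    """Number of test rows that also appear, identically, among the training rows."""
    seen = set(train)
    return sum(row in seen for row in test)


# Hypothetical toy data: 20 distinct (feature, label) rows, each one duplicated once.
rows = [(i * 1.5, i % 2) for i in range(20)] * 2

train, test = split(rows)
print("leaked before dedup:", leaked(train, test))

# Keep the first occurrence of each row, preserving order.
unique_rows = list(dict.fromkeys(rows))
train, test = split(unique_rows)
print("leaked after dedup:", leaked(train, test))  # 0: no row value occurs twice any more
```

Before deduplication the count depends on how the shuffle falls; after it, a partition of unique rows cannot share a row between the two sides.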

In [None]:
X = unique_transactions.drop(["Class", "Time"], axis="columns")
y = unique_transactions.Class

Split our data into train and test sets, so we can evaluate performance on a held-out set.

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, shuffle=True, stratify=y)
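Passing `stratify=y` keeps the share of fraudulent rows roughly the same in both splits, which matters when positives are rare: an unstratified split could leave very few of them in the test set. The sketch below is a simplified, standard-library illustration of the idea (shuffle each class separately, then take `test_size` of each); it is not scikit-learn's exact algorithm, and `stratified_split` is a hypothetical helper.

```python
import random
from collections import defaultdict


def stratified_split(labels, test_size=0.3, seed=0):
    """Return (train_idx, test_idx) with each class split in the same proportion."""
    rng = random.Random(seed)
    by_class = defaultdict(list)
    for i, label in enumerate(labels):
        by_class[label].append(i)
    train_idx, test_idx = [], []
    for idx in by_class.values():
        rng.shuffle(idx)  # shuffle within a class only
        n_test = round(len(idx) * test_size)
        test_idx.extend(idx[:n_test])
        train_idx.extend(idx[n_test:])
    return sorted(train_idx), sorted(test_idx)


# Hypothetical imbalanced labels: 10 positives, 90 negatives.
labels = [1] * 10 + [0] * 90
train_idx, test_idx = stratified_split(labels, test_size=0.3, seed=42)
print(sum(labels[i] for i in test_idx), len(test_idx))  # 3 30
```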

## Dask cluster

Training a TPOT classifier involves training many possible pipelines to learn which preprocessing steps, algorithms, and hyperparameter combinations perform best on a particular data set. The algorithm is partly parallelizable, and so we benefit from being able to train multiple pipelines at once. TPOT comes with Dask integration, making distributing the work easy, so long as there is a Dask cluster. Here, we spin up an ad hoc cluster by launching multiple CML sessions using the CML workers API.
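The benefit of evaluating several candidates at once can be illustrated with the standard library's `concurrent.futures`. This is only an analogy: TPOT and Dask do not work this way internally, although Dask's distributed `Client` exposes a similar futures interface (`submit`, `map`, `gather`). The `evaluate` function and the integer candidates are hypothetical stand-ins for fitting and scoring real pipelines.

```python
from concurrent.futures import ThreadPoolExecutor


def evaluate(candidate):
    """Stand-in for fitting and cross-validating one pipeline; higher scores are better."""
    return -(candidate - 3) ** 2


candidates = list(range(6))  # hypothetical candidates, e.g. values of one hyperparameter

# Three workers evaluate candidates concurrently; map returns results in input order.
with ThreadPoolExecutor(max_workers=3) as pool:
    scores = list(pool.map(evaluate, candidates))

best = candidates[scores.index(max(scores))]
print(best)  # 3
```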

### Start Dask scheduler

We need to create two directories for Dask. The scheduler writes its connection details to a file in `_scheduler_`, and each worker uses `_worker_` as its local working directory. From our perspective as users, we can create them and forget them.

In [None]:
os.makedirs("_scheduler_", exist_ok=True)
os.makedirs("_worker_", exist_ok=True)

We start a Dask scheduler as a CDSW worker process. We do this with `cdsw.launch_workers`, which spins up another session on our cluster and runs the command we provide: in this case, the Dask scheduler. The scheduler coordinates work between the Dask workers we will attach. Later, we'll start a Dask client in this notebook. The client talks to the scheduler, and the scheduler talks to the workers.

In [None]:
dask_scheduler = cdsw.launch_workers(
  n=1,
  cpu=1,
  memory=2,
  code="!dask-scheduler --host 0.0.0.0 --dashboard-address 127.0.0.1:8090 --scheduler-file /home/cdsw/_scheduler_/dask.log"
)

# Wait for the scheduler to start.
time.sleep(10)

We need the IP address of the CML worker with the scheduler on it, so we can connect the Dask workers to it. The IP is not returned in the `dask_scheduler` object (it's unknown at the launch of the scheduler), so we scan through the worker list and find the IP of the worker with the scheduler id. This returns a list, but there should be only one entry.

In [None]:
scheduler_workers = cdsw.list_workers()
scheduler_id = dask_scheduler[0]['id']
scheduler_ip = [worker['ip_address'] for worker in scheduler_workers
                if worker['id'] == scheduler_id][0]

scheduler_url = f"tcp://{scheduler_ip}:8786"

scheduler_url
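If the scheduler session has not appeared in the worker list yet (for example, because the ten-second wait was too short), the indexing above fails with a bare `IndexError`. A small helper can make that failure explicit. The sketch below uses hypothetical records shaped like the `id` and `ip_address` fields the notebook reads from `cdsw.list_workers()`; `find_worker_ip` is illustrative, not part of the `cdsw` library. Port 8786 is the Dask scheduler's default.

```python
def find_worker_ip(workers, worker_id):
    """Return the ip_address of the single worker whose id matches, or raise LookupError."""
    matches = [w["ip_address"] for w in workers if w["id"] == worker_id]
    if len(matches) != 1:
        raise LookupError(f"expected one worker with id {worker_id!r}, found {len(matches)}")
    return matches[0]


# Hypothetical records; real ones come from cdsw.list_workers() and carry more fields.
workers = [
    {"id": "abc123", "ip_address": "10.0.0.5"},
    {"id": "def456", "ip_address": "10.0.0.6"},
]
scheduler_url = f"tcp://{find_worker_ip(workers, 'def456')}:8786"
print(scheduler_url)  # tcp://10.0.0.6:8786
```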

### Start Dask workers

We're ready to grow our cluster. We start some more CML workers, each running one Dask worker process. We pass each Dask worker the scheduler URL we just found, so that it can register with the scheduler, which then distributes work to it.

`N_WORKERS` determines the number of CML workers started (and thus the number of Dask workers running in those sessions). Increasing it will start more workers. This reduces the wall-clock time of the TPOT training process by training more pipelines in parallel, but it uses more cluster resources. Exercise good judgement.

In [None]:
N_WORKERS = 3
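To get a feel for the trade-off, here is an idealized back-of-the-envelope model. It assumes that each Dask worker runs one task at a time, that all tasks take the same (hypothetical) time, and that scheduling overhead is negligible; real TPOT runs will deviate from all three. Under those assumptions, tasks run in "waves" of `n_workers`.

```python
import math


def estimated_wall_clock(n_tasks, n_workers, seconds_per_task):
    """Idealized time to run n_tasks equal-length tasks on workers that each run one task at a time."""
    waves = math.ceil(n_tasks / n_workers)
    return waves * seconds_per_task


# Hypothetical numbers: 20 tasks of 30 seconds each.
for workers in (1, 3, 6):
    print(workers, estimated_wall_clock(n_tasks=20, n_workers=workers, seconds_per_task=30))
```

In this model, going from 1 to 3 workers cuts 600 seconds to 210, but doubling again to 6 only reaches 120, because the last wave is partly idle. Returns diminish while resource use keeps growing.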

In [None]:
dask_workers = cdsw.launch_workers(
  n=N_WORKERS,
  cpu=1,
  memory=2,
  code=f"!dask-worker {scheduler_url} --local-directory /home/cdsw/_worker_"
)

# Wait for the workers to start.
time.sleep(10)
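A fixed `time.sleep(10)` may be too short on a busy cluster, or longer than needed on an idle one. A common alternative is to poll until a condition holds, with a timeout. Once the client below exists, `distributed`'s `Client.wait_for_workers(n_workers=N_WORKERS)` does this for Dask workers directly. The generic sketch below uses only the standard library; `wait_until` and the `ready` check are hypothetical names.

```python
import time


def wait_until(predicate, timeout=60.0, interval=1.0):
    """Poll predicate() until it returns True; raise TimeoutError after timeout seconds."""
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met within {timeout} s")
        time.sleep(interval)


# Hypothetical readiness check that becomes true on the third poll, standing in for
# "all N_WORKERS workers have registered with the scheduler".
calls = {"n": 0}


def ready():
    calls["n"] += 1
    return calls["n"] >= 3


wait_until(ready, timeout=5, interval=0.01)
print(calls["n"])  # 3
```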

### Connect Dask client

We have a Dask cluster running and distributed over CML sessions. Now we can start a local Dask client and connect it to our scheduler. This is the connection that lets us issue instructions to the Dask cluster. Below, we'll let TPOT handle sending the instructions to Dask.

In [None]:
client = Client(scheduler_url)

We can view some stats about the Dask cluster.

In [None]:
client

The Dask scheduler hosts a dashboard so we can monitor the work it's doing. Here we construct the URL of the dashboard, which is hosted on the scheduler worker. Clicking it should open the dashboard in a new browser window.

In [None]:
print(dask_scheduler[0]["app_url"] + "status")
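The cell above appends `status` (the path of the dashboard's main page) to the scheduler worker's `app_url`, which works when that URL ends in a slash. A slightly more defensive sketch uses `urllib.parse.urljoin`, adding the slash if it is missing. The URL below is a hypothetical example, and `dashboard_status_url` is an illustrative helper.

```python
from urllib.parse import urljoin


def dashboard_status_url(app_url):
    """Return the URL of the Dask dashboard's status page under a worker's app URL."""
    # urljoin replaces the last path segment unless the base ends in '/', so add one if missing.
    if not app_url.endswith("/"):
        app_url += "/"
    return urljoin(app_url, "status")


print(dashboard_status_url("https://abc123.ml.example.com"))  # https://abc123.ml.example.com/status
```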

That's our Dask cluster set up and ready to go!

## Fit a model with TPOT

### Define an estimator

We define a TPOT classifier. TPOT is rather sophisticated, and will search over many possible pipelines of scikit-learn preprocessors and estimators. All we have to do to use the Dask cluster is pass the `use_dask=True` flag, and TPOT will connect via the client we defined above. We do not need to pass the client explicitly, and in fact cannot.

Our dataset is highly class-imbalanced (there are few fraudulent transactions), so we choose [average precision](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.average_precision_score.html) as an appropriate metric, since it summarizes the whole precision-recall curve without committing to a single classification threshold. In reality, there may be a more appropriate metric dictated by the eventual use of this algorithm in the context of a broader fraud detection system (for instance, perhaps this is a first-pass filter).
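scikit-learn defines average precision as AP = Σₙ (Rₙ − Rₙ₋₁) Pₙ, where Pₙ and Rₙ are the precision and recall at the n-th score threshold. The from-scratch sketch below implements that sum for illustration only (in practice, use `sklearn.metrics.average_precision_score`). Equal scores are treated as a single threshold. The second example shows why the metric suits imbalanced data: an uninformative constant score earns an AP equal to the positive rate, whereas accuracy would reward always predicting "not fraud".

```python
from itertools import groupby


def average_precision(y_true, y_score):
    """Sum of (R_n - R_{n-1}) * P_n over score thresholds, highest score first."""
    total_pos = sum(y_true)
    if total_pos == 0:
        raise ValueError("average precision is undefined without positive labels")
    pairs = sorted(zip(y_score, y_true), key=lambda p: p[0], reverse=True)
    tp = fp = 0
    prev_recall = ap = 0.0
    # Each distinct score is one threshold; tied rows cross it together.
    for _, group in groupby(pairs, key=lambda p: p[0]):
        labels = [label for _, label in group]
        tp += sum(labels)
        fp += len(labels) - sum(labels)
        precision = tp / (tp + fp)
        recall = tp / total_pos
        ap += (recall - prev_recall) * precision
        prev_recall = recall
    return ap


print(round(average_precision([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]), 4))  # 0.8333
# A constant score on data with a 2% positive rate scores the prevalence.
print(round(average_precision([0] * 98 + [1] * 2, [0.5] * 100), 4))  # 0.02
```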

TPOT operates on sequential generations of candidate pipelines. It tries `population_size` pipeline combinations, collects the results, and then chooses new combinations in a smart way (it's an evolutionary algorithm). It repeats this `generations` number of times. Each pipeline is scored with 5-fold cross-validation, TPOT's default (`cv=5`). This is a lot of compute (to do it properly, expect hours or days), so for the sake of this demo notebook we restrict the search to a mere 5 generations, each with a population of 20. We can stop the process at any point, and TPOT will output the best-performing pipeline found up to that point.

In [None]:
estimator = TPOTClassifier(scoring="average_precision", generations=5, population_size=20, use_dask=True, verbosity=2, n_jobs=-1)
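To see where the compute goes, TPOT's documentation describes a run as evaluating roughly `population_size + generations × offspring_size` pipelines, with `offspring_size` defaulting to `population_size`. Each evaluation is then repeated once per cross-validation fold. The sketch below does that arithmetic; treat the totals as approximate upper bounds (TPOT may skip pipelines it has already evaluated). `tpot_evaluation_budget` is a hypothetical helper, not part of TPOT.

```python
def tpot_evaluation_budget(population_size, generations, cv=5, offspring_size=None):
    """Approximate (pipelines evaluated, model fits) for a TPOT run."""
    if offspring_size is None:
        offspring_size = population_size  # TPOT's default when offspring_size is not set
    pipelines = population_size + generations * offspring_size
    return pipelines, pipelines * cv


print(tpot_evaluation_budget(20, 5))     # (120, 600): the settings used in this notebook
print(tpot_evaluation_budget(100, 100))  # (10100, 50500): TPOT's default generations and population
```

The roughly 80-fold gap between the demo settings and TPOT's defaults is why a thorough search takes hours or days.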

### Fit the estimator

We're ready to fit the `TPOTClassifier`. With the configuration above in place, this one short line of code kicks off a whole lot of compute!

In [None]:
estimator.fit(X_train, y_train)

We can now use this object exactly like a scikit-learn estimator, for example to predict labels for the held-out test set.

In [None]:
estimator.predict(X_test)

In [None]:
estimator.score(X_test, y_test)

We can use the estimator directly. Alternatively, we can export it to generate a short template Python script that builds the selected pipeline from its raw scikit-learn components.

In [None]:
estimator.export("tpot_estimator.py")

## Clean up

We should be good tenants of the cluster and stop hogging workers we aren't using. We should stop only the workers that we started, not all the workers on the cluster, since others may be using them. It's also possible to do this through the CML UI in the Sessions view.

In [None]:
cdsw.stop_workers(*[worker["id"] for worker in dask_workers])

We can also stop the worker hosting the Dask scheduler.

In [None]:
cdsw.stop_workers(*[worker['id'] for worker in dask_scheduler])
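If the fit fails partway through, it is easy to forget these cleanup cells. One way to guarantee cleanup is a context manager that stops exactly the workers it launched, even when the body raises. In the sketch below, `launch` and `stop` are parameters so it can be exercised with fakes; in a CML session they could be `cdsw.launch_workers` and `cdsw.stop_workers`, called with the same shapes this notebook uses (keyword arguments in, a list of dicts with an `"id"` key out; IDs passed positionally to stop). `temporary_workers` is a hypothetical helper.

```python
from contextlib import contextmanager


@contextmanager
def temporary_workers(launch, stop, **launch_kwargs):
    """Launch workers, yield them, and always stop exactly those workers afterwards."""
    workers = launch(**launch_kwargs)
    try:
        yield workers
    finally:
        # Runs on normal exit and on exceptions; only our own worker IDs are stopped.
        stop(*[w["id"] for w in workers])


# Hypothetical fakes standing in for cdsw.launch_workers and cdsw.stop_workers.
stopped = []


def fake_launch(n, **kwargs):
    return [{"id": f"w{i}"} for i in range(n)]


def fake_stop(*ids):
    stopped.extend(ids)


try:
    with temporary_workers(fake_launch, fake_stop, n=2) as workers:
        raise RuntimeError("fit failed")
except RuntimeError:
    pass
print(stopped)  # ['w0', 'w1']
```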

With that, we're done cleaning up. We hope this helps seed your own experiments in automated machine learning!

***If this documentation includes code, including but not limited to, code examples, Cloudera makes this available to you under the terms of the Apache License, Version 2.0, including any required notices. A copy of the Apache License Version 2.0 can be found [here](https://opensource.org/licenses/Apache-2.0).***