# Parallelization

TPOT2 uses the Dask package for parallelization, either locally (`dask.distributed.LocalCluster`) or across multiple nodes via a job scheduler (`dask-jobqueue`).

## Local Machine Parallelization

TPOT2 can be parallelized on a local computer by setting the `n_jobs` and `memory_limit` parameters.

`n_jobs` dictates how many dask workers to launch. In TPOT2 this corresponds to the number of pipelines to evaluate in parallel.

`memory_limit` is the amount of RAM to use per worker. 
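
Because `memory_limit` applies per worker, the total RAM a local run may claim is roughly `n_jobs` times `memory_limit`. The sketch below works through that arithmetic for the settings used in this notebook. The helpers `to_bytes` and `total_memory_bytes` are hypothetical names introduced here for illustration and are not part of TPOT2 or Dask. The sketch also assumes that "GB" means 10^9 bytes and "GiB" means 2^30 bytes.

```python
import re

# Assumed unit table: decimal units (GB = 10**9) and binary units (GiB = 2**30).
_UNITS = {
    "B": 1, "KB": 10**3, "MB": 10**6, "GB": 10**9, "TB": 10**12,
    "KIB": 2**10, "MIB": 2**20, "GIB": 2**30, "TIB": 2**40,
}


def to_bytes(size: str) -> int:
    """Convert a string such as "4GB" or "1 GiB" into a number of bytes."""
    match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*([A-Za-z]+)\s*", size)
    if match is None:
        raise ValueError(f"cannot parse memory size: {size!r}")
    number, unit = match.groups()
    if unit.upper() not in _UNITS:
        raise ValueError(f"unknown unit in memory size: {size!r}")
    return int(float(number) * _UNITS[unit.upper()])


def total_memory_bytes(n_jobs: int, memory_limit: str) -> int:
    """Upper bound on RAM used by n_jobs workers with a per-worker limit."""
    return n_jobs * to_bytes(memory_limit)


total = total_memory_bytes(4, "4GB")
print(f"{total / 10**9:.0f} GB")  # prints "16 GB"
```

With `n_jobs=4` and `memory_limit="4GB"`, the machine should have about 16 GB available for the workers, plus headroom for the main process.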

In [None]:
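import tpot2
import sklearn
import sklearn.datasets
import sklearn.metrics          # needed for get_scorer
import sklearn.model_selection  # needed for train_test_split
import numpy as np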
scorer = sklearn.metrics.get_scorer('roc_auc_ovr')
X, y = sklearn.datasets.load_digits(return_X_y=True)
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)


est = tpot2.TPOTClassifier(population_size= 8, generations=5, n_jobs=4, memory_limit="4GB", verbose=1)
est.fit(X_train, y_train)
print(scorer(est, X_test, y_test))

## Manual Dask Clients and Dashboard

You can also initialize a Dask client manually. This gives you more control over the parallelization, helps with debugging, and lets you view a live dashboard of TPOT2's performance.

You can find more details in the official [Dask documentation](https://docs.dask.org/en/stable/).


[Dask Python Tutorial](https://docs.dask.org/en/stable/deploying-python.html)
[Dask Dashboard](https://docs.dask.org/en/stable/dashboard.html)

Option 1: initialize a basic Dask local cluster and close it manually when you are done.

In [None]:
from dask.distributed import Client, LocalCluster

n_jobs = 4
memory_limit = "4GB"

# Start a local cluster with one single-threaded worker per job.
cluster = LocalCluster(n_workers=n_jobs,
                       threads_per_worker=1,
                       memory_limit=memory_limit)
client = Client(cluster)

Get the link to view the dask Dashboard. 

In [None]:
client.dashboard_link

Pass the client into TPOT2 to train.
Note that if a client is passed in manually, TPOT2 will ignore `n_jobs` and `memory_limit`.
If no client is passed in, TPOT2 will ignore any global/existing client and create its own.

In [None]:
est = tpot2.TPOTClassifier(population_size= 8, generations=5, client=client, verbose=1)
# this is equivalent to: 
# est = tpot2.TPOTClassifier(population_size= 8, generations=5, n_jobs=4, memory_limit="4GB", verbose=1)
est.fit(X_train, y_train)
print(scorer(est, X_test, y_test))

#It is good to close the client and cluster when you are done with them
client.close()
cluster.close()
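
The precedence rule stated above (a manually supplied client overrides `n_jobs` and `memory_limit`) can be pictured with a small sketch. This is only an illustration of the documented behavior, not TPOT2's actual implementation. The function `resolve_parallel_settings` and the placeholder `fake_client` are hypothetical names invented for this example.

```python
def resolve_parallel_settings(client, n_jobs, memory_limit):
    """Report which settings take effect under the documented rule.

    Hypothetical helper: if a client is given, n_jobs and memory_limit are
    ignored; otherwise a new local cluster is built from them.
    """
    if client is not None:
        return {"uses": "user-supplied client", "n_jobs": None, "memory_limit": None}
    return {"uses": "new local cluster", "n_jobs": n_jobs, "memory_limit": memory_limit}


fake_client = object()  # stands in for a dask.distributed.Client in this sketch

with_client = resolve_parallel_settings(fake_client, n_jobs=4, memory_limit="4GB")
without_client = resolve_parallel_settings(None, n_jobs=4, memory_limit="4GB")

print(with_client["uses"])     # prints "user-supplied client"
print(without_client["uses"])  # prints "new local cluster"
```

In practice this means that worker count and per-worker memory must be configured on the cluster itself whenever you pass `client=...`.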

Option 2

You can initialize the cluster and client with a context manager that will automatically close them. 

In [None]:
from dask.distributed import Client, LocalCluster
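import tpot2
import sklearn
import sklearn.datasets
import sklearn.metrics          # needed for get_scorer
import sklearn.model_selection  # needed for train_test_split
import numpy as np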

scorer = sklearn.metrics.get_scorer('roc_auc_ovr')
X, y = sklearn.datasets.load_digits(return_X_y=True)
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)


n_jobs = 4
memory_limit = "4GB"

with LocalCluster(
    n_workers=n_jobs,
    threads_per_worker=1,
    memory_limit=memory_limit,  # reuse the variable defined above
) as cluster, Client(cluster) as client:
    est = tpot2.TPOTClassifier(population_size= 8, generations=5, client=client, verbose=1)
    est.fit(X_train, y_train)
    print(scorer(est, X_test, y_test))

## Dask Multi-Node Parallelization

Dask can parallelize across multiple nodes via job queueing systems, using the dask-jobqueue package. More information can be found in the official [dask-jobqueue documentation](https://jobqueue.dask.org/en/latest/).

To parallelize TPOT2 with dask-jobqueue, pass a client backed by a jobqueue cluster with the desired settings into the `client` parameter. Each job will evaluate a single pipeline.

Note that TPOT2 will ignore `n_jobs` and `memory_limit`, as these should be set inside the Dask cluster.

In [None]:
from dask.distributed import Client
import sklearn
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import tpot2

from dask_jobqueue import SGECluster  # or SLURMCluster, PBSCluster, etc. Replace SGE with your scheduler.

# Each job submitted to the queue requests these resources.
cluster = SGECluster(
    queue='all.q',
    cores=2,
    memory="50 GB",
)

cluster.adapt(minimum_jobs=10, maximum_jobs=100)  # auto-scale between 10 and 100 jobs

client = Client(cluster)

scorer = sklearn.metrics.get_scorer('roc_auc_ovr')
X, y = sklearn.datasets.load_digits(return_X_y=True)
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, train_size=0.75, test_size=0.25)

est = tpot2.TPOTClassifier(population_size= 100, generations=5, client=client, verbose=1)
# n_jobs and memory_limit are not set here; worker resources are defined by the jobqueue cluster above.
est.fit(X_train, y_train)
print(scorer(est, X_test, y_test))

#It is good to close the client and cluster when you are done with them
client.close()
cluster.close()
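
To get a feel for what the adaptive scaling above can request from the scheduler, the following sketch multiplies the per-job resources (`cores=2`, `memory="50 GB"`) by the `minimum_jobs` and `maximum_jobs` bounds. It assumes that every job requests exactly the cores and memory given to the cluster constructor. `JobSpec` and `resource_range` are hypothetical names used only for this illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class JobSpec:
    """Per-job resources, mirroring the cores/memory passed to the cluster."""
    cores: int
    memory_gb: int


def resource_range(job: JobSpec, minimum_jobs: int, maximum_jobs: int):
    """Return ((min_cores, min_memory_gb), (max_cores, max_memory_gb))."""
    if not 0 <= minimum_jobs <= maximum_jobs:
        raise ValueError("expected 0 <= minimum_jobs <= maximum_jobs")
    low = (job.cores * minimum_jobs, job.memory_gb * minimum_jobs)
    high = (job.cores * maximum_jobs, job.memory_gb * maximum_jobs)
    return low, high


low, high = resource_range(JobSpec(cores=2, memory_gb=50), minimum_jobs=10, maximum_jobs=100)
print(low)   # prints (20, 500)
print(high)  # prints (200, 5000)
```

Under these assumptions the run holds between 20 and 200 cores and between 500 GB and 5000 GB of memory, which is worth checking against your queue's limits before submitting.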