# Using Dask

[Dask](https://dask.org/) is an ecosystem of tools to help you parallelize computational work. [Dask-Gateway](https://gateway.dask.org/) is a part of the Dask ecosystem designed to help you create and assign computational work to a temporary _dask-cluster_. In the https://hub.jupytearth.org Jupyter environment, everything is already set up for you, and this notebook demonstrates how to use it.

## Reference example

For comparison, let's consider the following task, which can be run both with and without Dask and remote workers.

In [1]:
# This defines a dummy task that we can execute:
# - locally without Dask
# - locally using Dask and local workers (dedicated linux processes)
# - remotely using Dask and remote workers (dedicated Kubernetes Pods)
#
import numpy as np
import joblib
from sklearn.datasets import load_digits
from sklearn.model_selection import RandomizedSearchCV
from sklearn.svm import SVC

digits = load_digits()

param_space = {
    'C': np.logspace(-24, 24, 13),
    'gamma': np.logspace(-32, 32, 17),
    'tol': np.logspace(-4, -1, 4),
    'class_weight': [None, 'balanced'],
}

model = SVC(kernel='rbf')
search = RandomizedSearchCV(model, param_space, cv=3, n_iter=50, verbose=10)
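To see why this task parallelizes well, it helps to count the work. The quick calculation below (plain Python, no Dask) takes the grid sizes from `param_space` above: `RandomizedSearchCV` draws `n_iter=50` candidates from the possible combinations and fits each candidate on each of the `cv=3` folds, and those fits are independent of each other.

```python
import math

# Number of values listed for each hyperparameter in param_space above
grid_sizes = {"C": 13, "gamma": 17, "tol": 4, "class_weight": 2}

# Total number of distinct combinations the random search can draw from
n_combinations = math.prod(grid_sizes.values())

# RandomizedSearchCV(n_iter=50, cv=3): 50 candidates, each fit on 3 folds
n_iter, cv = 50, 3
n_fits = n_iter * cv

print(n_combinations, n_fits)  # 1768 150
```

Those 150 cross-validation fits (plus one final refit of the best candidate, since `refit=True` is the default) are the units of work that joblib can hand out to separate workers.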

## Reference example - Running it without Dask

We can run the reference task locally without Dask involved.

In [None]:
%%time

# Let's execute the reference example's dummy task without
# Dask involved.
#
# A recent execution took 42 seconds.
#
search.fit(digits.data, digits.target)

## Startup of a Dask cluster

Let's use _Dask Gateway_ to start a personal and temporary _Dask cluster_.

A Dask cluster consists of:
- one _Dask scheduler_: coordinates pieces of work to be executed by workers.
- any number of _Dask workers_: perform work and report back to the scheduler.

With a Dask cluster available, we can create a _Dask client_ to communicate with the Dask scheduler and request work to be done.
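The same three roles (scheduler, workers, client) also exist in a cluster started on a single machine. The sketch below uses `dask.distributed.LocalCluster` instead of Dask-Gateway, with workers running as threads in the current process (`processes=False`), only so it can be tried outside the hub; the `add` function is a hypothetical example task.

```python
from dask.distributed import Client, LocalCluster

def add(x, y):
    # Hypothetical example task
    return x + y

# LocalCluster starts one scheduler plus the requested workers;
# processes=False runs them as threads inside this Python process.
with LocalCluster(n_workers=2, threads_per_worker=1, processes=False) as cluster:
    # The client connects to the scheduler and submits work through it.
    with Client(cluster) as client:
        client.wait_for_workers(2)
        n_workers = len(client.nthreads())  # {worker address: thread count}
        result = client.submit(add, 1, 2).result()

print(n_workers, result)  # 2 3
```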

In [2]:
# Create a gateway object to speak with dask-gateway,
# which in turn can create the dask cluster for you.
#
from dask_gateway import Gateway
gateway = Gateway()

In [3]:
# Request information about the options you can configure
# on a to-be-created dask cluster.
#
# All options are optional.
#
options = gateway.cluster_options()
options


In [4]:
# Now let's create a cluster. After running this cell, you get
# a control panel view to add/remove workers. Manually add at
# least one.
#
# If a new server needs to be started, it can take ~5 minutes
# before the new workers register and the worker count updates.
#
cluster = gateway.new_cluster(options)
cluster

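Instead of using the control panel, you can also request workers from code. A `GatewayCluster` provides `scale(n)` for a fixed number of workers and `adapt(minimum=..., maximum=...)` for adaptive scaling. The sketch below exercises the same calls on a local in-process `LocalCluster` so it runs anywhere; with the Gateway cluster created above you would call `cluster.scale(...)` on that object instead. The worker counts are arbitrary example values.

```python
from dask.distributed import Client, LocalCluster

# Start with no workers, then request some from code.
with LocalCluster(n_workers=0, threads_per_worker=1, processes=False) as cluster:
    with Client(cluster) as client:
        cluster.scale(3)            # request exactly 3 workers
        client.wait_for_workers(3)  # block until all 3 have registered
        n_workers = len(client.nthreads())

        # Alternatively, let the cluster add and remove workers with the
        # load (left commented out so the count above stays fixed):
        # cluster.adapt(minimum=1, maximum=8)

print(n_workers)  # 3
```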

In [5]:
# Acquire a "dask client" to use in order to interact
# with your remote and personal dask cluster of workers.
#
client = cluster.get_client(set_as_default=True)
client

| Client | Cluster |
|---|---|
| Scheduler: gateway://traefik-prod-dask-gateway.prod:80/prod.ca819a8161594534b7c306ec8bd0d4a0 | Workers: 0 |
| Dashboard: /services/dask-gateway/clusters/prod.ca819a8161594534b7c306ec8bd0d4a0/status | Cores: 0 |
| | Memory: 0 B |


## Reference example - Running it _with_ Dask and remote workers

With a Dask cluster available, we can now compare how fast the reference example runs using the Dask workers we have made available versus without Dask and remote workers.

In [None]:
%%time

# Let's use our Dask client, which speaks with our Dask cluster's
# scheduler, which in turn controls our remote Dask workers, to
# execute the reference example's dummy task.
#
# A recent execution took 12 seconds with eight 1-CPU workers.
#
with joblib.parallel_backend('dask'):
    search.fit(digits.data, digits.target)

Fitting 3 folds for each of 50 candidates, totalling 150 fits
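The `joblib.parallel_backend('dask')` context used above is not specific to scikit-learn: any work run through `joblib.Parallel` inside it is sent to the current default Dask client. A minimal sketch, again using a local in-process cluster as a stand-in for the Gateway cluster and a hypothetical `square` task:

```python
import joblib
from dask.distributed import Client, LocalCluster

def square(x):
    # Hypothetical stand-in for an expensive task
    return x * x

with LocalCluster(n_workers=2, threads_per_worker=1, processes=False) as cluster:
    with Client(cluster):  # becomes the default client that joblib uses
        with joblib.parallel_backend("dask"):
            squares = joblib.Parallel()(joblib.delayed(square)(i) for i in range(8))

print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49]
```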

## Example - Install a pip package on remote Dask workers

Our Dask cluster's workers are running on separate servers using the base Python environment, so they won't have any packages that you have manually installed with `pip` or `conda`.

This example demonstrates how you can install `pip` packages on the remote workers.

For more information, see:
- https://docs.dask.org/en/latest/setup/environment.html#temporary-installations
- https://distributed.dask.org/en/latest/_modules/distributed/diagnostics/plugin.html#PipInstall

In [None]:
# We define a function that reports the version of sklearn, the module
# provided by the scikit-learn Python package (installable with pip).
#
def report_sklearn_version():
    import sklearn
    return sklearn.__version__

print(f"The version of sklearn is locally {report_sklearn_version()}")

In [None]:
# We try to run this function on a remote Dask worker as well
#
future = client.submit(report_sklearn_version, pure=False)

print(f"The version of sklearn on some remote dask worker is {future.result()}")

In [None]:
# We install a specific version of scikit-learn on our Dask workers.
#
# NOTE: If you try to run this multiple times, you must change
#       the name to avoid getting a response saying the plugin
#       is already installed, which means the pip package won't
#       be installed again.
#
# restart=True is an option of PipInstall (not of register_worker_plugin)
# that restarts each worker after installing, so the new version is used.
#
from distributed.diagnostics.plugin import PipInstall
client.register_worker_plugin(
    PipInstall(
        packages=["scikit-learn==0.23.0"],
        restart=True,
    ),
    name="pip-install-1",
)

In [None]:
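# Restart all workers, clearing their state, so that the newly
# installed version is the one that gets imported.
client.restart()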

In [None]:
# We try to run the function again on our remote dask worker
future = client.submit(report_sklearn_version, pure=False)

print(f"The version of sklearn on a remote dask worker is {future.result()}")
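`client.submit` runs the function on a single worker chosen by the scheduler, so it only reports on that one worker. To check every worker at once, `client.run` calls a function on all workers and returns a dict keyed by worker address. The sketch below uses a local in-process cluster and a hypothetical helper `report_version` that takes a module name; with this notebook's client, `client.run(report_version, "sklearn")` would report the scikit-learn version on each remote worker.

```python
import importlib
from dask.distributed import Client, LocalCluster

def report_version(module_name):
    # Hypothetical helper: import a module by name and return its version
    return importlib.import_module(module_name).__version__

with LocalCluster(n_workers=2, threads_per_worker=1, processes=False) as cluster:
    with Client(cluster) as client:
        client.wait_for_workers(2)
        # client.run calls report_version("dask") once on every worker and
        # returns {worker address: return value}.
        versions = client.run(report_version, "dask")

print(versions)
```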

## Example - Send local source code to a remote Dask worker

Our Dask cluster's workers are running on separate servers and won't have access to local Python scripts or packages (.py, .egg, .zip).

This example demonstrates how to send local source code (.py, .egg, .zip) to your remote workers so they can import it.

Note that although this plugin is named `UploadFile`, it can only be used to send and load source code, not to transfer data files.

For more information, see:
- https://docs.dask.org/en/latest/setup/environment.html#send-source
- https://distributed.dask.org/en/latest/_modules/distributed/diagnostics/plugin.html#UploadFile

In [None]:
# Let's consider a python script we create locally that won't be
# available to our remote Dask workers.
#
file_path = "/tmp/my-python-script.py"

# Create and write a primitive script to a file to upload.
#
# NOTE: We use a Jupyter feature to execute a bash command (!), and
#       reference a Python variable as part of the command ({file_path}).
#
!echo "def get_magic_number():" > {file_path}
!echo "    return(86)" >> {file_path}

# Print content of the file
#
!cat {file_path}

In [None]:
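# We still need to define a function that uses the script in the
# separate file. The import runs inside the function, so it happens on
# the worker; importlib is needed because the file name has hyphens.
#
def call_function_defined_in_script_file():
    import importlib
    return importlib.import_module("my-python-script").get_magic_number()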

In [None]:
# We try to run this function on our remote Dask worker as well.
#
# This is expected to fail at this point, because the script file has
# not been uploaded to the workers yet.
#
future = client.submit(call_function_defined_in_script_file, pure=False)

print(f"call_function_defined_in_script_file run remotely returned: {future.result()}")

In [None]:
# We upload and load the Python script file in our Dask workers,
# and by doing so, the code cell above will not fail.
#
# NOTE: If you try to run this multiple times, you must change
#       the name to avoid getting a response saying the plugin
#       is already installed which means the script file won't
#       be uploaded and loaded again.
#
# Reference: https://distributed.dask.org/en/latest/_modules/distributed/diagnostics/plugin.html#UploadFile
#
from distributed.diagnostics.plugin import UploadFile
client.register_worker_plugin(
    UploadFile(
        filepath=file_path,
    ),
    name="upload-file-1",
)

In [None]:
# We try to run this function on our remote Dask worker again
#
# This is now expected to succeed.
#
future = client.submit(call_function_defined_in_script_file, pure=False)

print(f"call_function_defined_in_script_file run remotely returned: {future.result()}")
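`Client.upload_file` is a shortcut for sending a `.py`, `.egg` or `.zip` file to the workers without registering the plugin by hand. In either case, importing the module inside the submitted function is what makes the lookup happen on the worker. A sketch with a local in-process cluster; the module name `magic_number_module` is hypothetical, and an underscore-only name lets a plain `import` statement be used:

```python
import os
import tempfile
from dask.distributed import Client, LocalCluster

# Write a tiny module to a temporary directory (hypothetical name; an
# underscore-only name keeps it importable with a plain import statement).
module_dir = tempfile.mkdtemp()
module_path = os.path.join(module_dir, "magic_number_module.py")
with open(module_path, "w") as f:
    f.write("def get_magic_number():\n    return 86\n")

def call_uploaded_function():
    # The import runs on the worker, after the file has been uploaded there.
    from magic_number_module import get_magic_number
    return get_magic_number()

with LocalCluster(n_workers=1, threads_per_worker=1, processes=False) as cluster:
    with Client(cluster) as client:
        client.upload_file(module_path)
        result = client.submit(call_uploaded_function, pure=False).result()

print(result)  # 86
```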

## Example - Pass data to remote workers

We don't have a concrete example yet of how to pass data to the remote workers. Common workflows involve letting each worker download the data it should process directly from object storage of some kind (S3, GCS).
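For small in-memory data, another option is to send it from the notebook with `client.scatter`, which returns futures that can be passed to `client.submit`; Dask replaces each future with the actual data on the worker. This is a sketch for small objects, not a substitute for reading large datasets from object storage. It uses a local in-process cluster and a hypothetical `column_sums` task:

```python
from dask.distributed import Client, LocalCluster

def column_sums(rows):
    # Hypothetical task: sum each column of a small table
    return [sum(column) for column in zip(*rows)]

data = [[1, 2, 3], [4, 5, 6]]

with LocalCluster(n_workers=2, threads_per_worker=1, processes=False) as cluster:
    with Client(cluster) as client:
        # Wrap data in a list so it is scattered as ONE object;
        # scattering `data` directly would create one future per row.
        [data_future] = client.scatter([data], broadcast=True)
        result = client.submit(column_sums, data_future).result()

print(result)  # [5, 7, 9]
```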

## FAQ

### What is a dask worker?
In our case, a Dask worker is represented by a Kubernetes _Pod_. A Kubernetes Pod is mostly an encapsulation of a Docker container. The _Kubernetes cluster_ is what ensures pods get running on actual servers, and those servers are what drive costs. If more servers are needed to get all pods running, the Kubernetes cluster will add them. Adding new servers can sometimes make a request for more workers take ~3-4 minutes.

### Shutdown of the dask cluster
If you shut down your user environment, or it gets shut down by being inactive, your associated dask clusters will also shut down. If you leave the dask cluster idle for an hour, it will shut down.
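You don't have to wait for the idle timeout: closing the client and the cluster from code releases the workers right away. For the Gateway cluster in this notebook that would be `client.close()` followed by `cluster.shutdown()`. The sketch below shows the same pattern with a local in-process cluster, using `try`/`finally` so that cleanup also happens if the work raises an error.

```python
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=1, threads_per_worker=1, processes=False)
client = Client(cluster)
try:
    total = client.submit(sum, [1, 2, 3]).result()
finally:
    # Close the client first, then the cluster (a GatewayCluster would
    # use cluster.shutdown() here to stop its remote workers).
    client.close()
    cluster.close()

print(total, client.status)
```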

In [None]:
gateway.list_clusters()

[ClusterReport<name=prod.724736c71fdf420693b2b5a385cae45f, status=RUNNING>,
 ClusterReport<name=prod.6fad0ae09f544ee9b4613ff646b22687, status=RUNNING>,
 ClusterReport<name=prod.13df4373a41e48ec847785c417706ee7, status=RUNNING>]

In [None]:
gateway.stop_cluster("prod.13df4373a41e48ec847785c417706ee7")

### Cost of dask worker resources
If you are using Dask workers, you are starting up new cloud servers on demand, so they drive costs. The cost is relatively low, however, because we use temporary machines and get a discount of more than 70%.