# Using Dask Gateway

This cluster has been configured to use [Dask-Gateway](https://gateway.dask.org/). This notebook demonstrates how to use it.

## Creating a Cluster

We've done most of the configuration for you. All that remains for you is to either create a new cluster or connect to an existing one. You'll do this with a `dask_gateway.Gateway` object.

If the default settings are appropriate, you can create a new cluster without specifying any additional information.

In [1]:
from dask_gateway import Gateway
gateway = Gateway()

In [2]:
gateway.list_clusters()

[]
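
The empty list above means no clusters are running yet. If one were already running, you could reconnect to it instead of starting another. Here is a minimal sketch of that choice, assuming each entry returned by `gateway.list_clusters()` has a `name` attribute that `gateway.connect` accepts. The helper name `connect_or_create` is our own and is not part of Dask-Gateway.

```python
def connect_or_create(gateway):
    """Reuse the first running cluster if there is one, else start a new one.

    `gateway` is expected to be a `dask_gateway.Gateway` (or anything with the
    same `list_clusters`, `connect`, and `new_cluster` methods).
    """
    running = gateway.list_clusters()
    if running:
        # Reconnect by name rather than paying the start-up cost again.
        return gateway.connect(running[0].name)
    return gateway.new_cluster()


# Usage in this notebook, with the `gateway` object from the first cell:
# cluster = connect_or_create(gateway)
```

Picking the first running cluster is only a convenience. If you keep several clusters around, select the one you want by name.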

In [4]:
cluster = gateway.new_cluster()
cluster

VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n    …

`gateway.new_cluster()` created a new Dask cluster.
You can pass this cluster to a `dask.distributed.Client` to create a client
so that any computations using Dask will be executed on the cluster.

In [5]:
from dask.distributed import Client

client = Client(cluster)
client

Client
  Scheduler: gateway://scheduler-public-dev-prod-dask-gateway:8786/b6e27a4eeb74470bb740f707f629f15b
  Dashboard: https://hub.pangeo.io/services/dask-gateway/gateway/clusters/b6e27a4eeb74470bb740f707f629f15b/status
Cluster
  Workers: 0
  Cores: 0
  Memory: 0 B


In [9]:
import dask.array as da

arr = da.random.random((1000, 1000), chunks=100).persist()

By default, the cluster doesn't have any workers. You'll need to either set your cluster to adaptive mode or scale manually.

In adaptive mode, the cluster will automatically resize itself based on the workload.

In [10]:
cluster.adapt()
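
Manual scaling is the other route mentioned above. The sketch below wraps both choices in one helper: with a fixed `n` it calls `cluster.scale(n)`, otherwise it calls `cluster.adapt` with bounds on the number of workers. The helper name `set_workers` and the default bounds are illustrative assumptions, not recommended values for this deployment.

```python
def set_workers(cluster, n=None, minimum=0, maximum=10):
    """Give `cluster` a fixed size, or make it adaptive when `n` is None.

    `cluster` is expected to be a `dask_gateway.GatewayCluster`.
    """
    if n is None:
        # Adaptive mode: the cluster resizes itself between the two bounds.
        cluster.adapt(minimum=minimum, maximum=maximum)
    else:
        # Manual mode: request exactly `n` workers.
        cluster.scale(n)


# Usage in this notebook:
# set_workers(cluster, n=2)        # fixed size of two workers
# set_workers(cluster, maximum=5)  # adaptive, at most five workers
```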

In [11]:
arr.compute()

array([[0.32648987, 0.94304574, 0.60109591, ..., 0.4754234 , 0.51921505,
        0.50322152],
       [0.41447147, 0.53073857, 0.07778848, ..., 0.79051681, 0.25704898,
        0.5573166 ],
       [0.12524794, 0.48594165, 0.2305868 , ..., 0.96619814, 0.17872878,
        0.13046179],
       ...,
       [0.97258084, 0.13736629, 0.20938757, ..., 0.2601917 , 0.1276606 ,
        0.91900657],
       [0.92047548, 0.19178722, 0.86989839, ..., 0.3586717 , 0.39191175,
        0.07321737],
       [0.16046721, 0.3757023 , 0.169577  , ..., 0.1446228 , 0.46621836,
        0.8630376 ]])

The `arr.compute()` call may take a while, depending on the state of the Kubernetes cluster. If the cluster has been idle recently, additional machines may need to start up in the background before your work can run.

Once those machines are running, subsequent computations will be much faster.

In [12]:
client.close()
cluster.close()
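
If a computation raises an error before you reach the cell above, the cluster may be left running. One way to guard against that is a small context manager that always closes the client and the cluster. This is only a sketch: `temporary_cluster` is a hypothetical helper of our own, and it assumes the cluster object provides `scale`, `get_client`, and `close`, as `GatewayCluster` does.

```python
from contextlib import contextmanager


@contextmanager
def temporary_cluster(gateway, n_workers=1):
    """Yield a client on a fresh cluster, then close both, even on error."""
    cluster = gateway.new_cluster()
    try:
        cluster.scale(n_workers)
        client = cluster.get_client()
        try:
            yield client
        finally:
            client.close()
    finally:
        # Runs whether or not the body of the `with` block raised.
        cluster.close()


# Usage in this notebook:
# with temporary_cluster(gateway, n_workers=2) as client:
#     result = arr.sum().compute()
```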

## Specifying options

We've configured settings such as the number of cores per worker to suit the most common use cases. For additional control over the cluster, use `gateway.cluster_options()`.

In [13]:
options = gateway.cluster_options()
options

Box(children=(Box(children=(HTML(value='<h2>Cluster Options</h2>'), Box(children=(HTML(value="<p style='font-w…

You can either adjust these options manually in the widget or set them programmatically.

In [14]:
options.worker_cores = 4
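
When you set several options at once, it can be convenient to collect them in one call. The sketch below does the same attribute assignment as the cell above for every keyword you pass. `build_options` is a hypothetical helper, and which option names exist depends on how this deployment is configured; `worker_cores` is the only one shown in this notebook.

```python
def build_options(gateway, **overrides):
    """Fetch the default cluster options and apply keyword overrides.

    Each override is set as an attribute, exactly like
    `options.worker_cores = 4`.
    """
    options = gateway.cluster_options()
    for name, value in overrides.items():
        setattr(options, name, value)
    return options


# Usage in this notebook:
# options = build_options(gateway, worker_cores=4)
```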

The "Image" option is the URL of a Docker image containing the environment that will be loaded on the Dask scheduler and workers. This determines which library versions are available. By default, it matches the image you're currently working in.

In [None]:
cluster = gateway.new_cluster(cluster_options=options)
cluster.scale(1)
# The earlier client was closed, so connect a new one to this cluster.
client = Client(cluster)
client.wait_for_workers(1)
cluster

Notice that this cluster has 4 cores per worker, as we requested in the `options`.
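
You can also check this without the widget. The sketch below reads the per-worker thread count from the dictionary returned by `client.scheduler_info()`. It assumes each worker entry carries an `nthreads` key, and that this deployment starts one thread per requested core, which depends on how the gateway is configured. `threads_per_worker` is our own helper name.

```python
def threads_per_worker(scheduler_info):
    """Map each worker address to its thread count.

    `scheduler_info` is the dictionary returned by `Client.scheduler_info()`.
    """
    return {
        address: worker["nthreads"]
        for address, worker in scheduler_info["workers"].items()
    }


# Usage, with a client connected to the new cluster:
# threads_per_worker(client.scheduler_info())
```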