<img src="images/dask_horizontal.svg" width=250 />

# Flexible Cloud Computing with `client.run()`

Dask gives you a lot of flexibility.

Once you point Dask at your remote cluster, any Dask code you run will automatically execute on that cluster.

But you're not restricted to running *only* Dask code on your cluster. You can also run custom Python code and system-level commands on your cluster's workers.

Let's take a look at the flexibility you can achieve with `client.run()`, which runs a function on every worker (or on a subset you choose) and returns the results in a dictionary keyed by worker address.

## Launch Cloud Computing Resources

In [None]:
import coiled

In [None]:
cluster = coiled.Cluster(
    name="client-run",
    n_workers=5,
    package_sync=True,
)

## Connect Dask to Cluster

In [6]:
from distributed import Client
client = Client(cluster)

## 1. Do some Dask things

In [19]:
import dask.dataframe as dd

In [20]:
ddf = dd.read_parquet("s3://coiled-datasets/github-archive/github-archive-2015.parq/")
ddf.head()

In [None]:
ddf.groupby('user').count().head()

## 2. Do some generic Python things

In [9]:
def create_txt_file(content, filename="myfile.txt"):
    # write `content` to a file in the worker's current working directory;
    # the `with` block guarantees the file is flushed and closed
    with open(filename, "w") as file:
        file.write(content)
    # return the filename (open file objects can't be sent back to the client)
    return filename

In [10]:
client.run(create_txt_file, "Add some content to our file.")

{'tls://10.0.15.133:39945': 'myfile.txt',
 'tls://10.0.6.121:44257': 'myfile.txt',
 'tls://10.0.7.27:41709': 'myfile.txt',
 'tls://10.0.7.47:38287': 'myfile.txt',
 'tls://10.0.8.170:39291': 'myfile.txt'}

In [11]:
def read_file(filename):
    # read the file back from each worker's working directory
    with open(filename, "r") as file:
        return file.read()

In [12]:
client.run(read_file, "myfile.txt")

{'tls://10.0.15.133:39945': 'Add some content to our file.',
 'tls://10.0.6.121:44257': 'Add some content to our file.',
 'tls://10.0.7.27:41709': 'Add some content to our file.',
 'tls://10.0.7.47:38287': 'Add some content to our file.',
 'tls://10.0.8.170:39291': 'Add some content to our file.'}
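`client.run()` forwards any extra positional and keyword arguments to your function, which makes it easy to clean up after experiments like the one above. Below is a minimal sketch of a cleanup helper; `remove_file` and its `missing_ok` parameter are hypothetical names chosen for illustration, not part of Dask.

```python
import os


def remove_file(filename, missing_ok=True):
    """Delete `filename` from the current working directory.

    Hypothetical helper meant to be passed to client.run so that each
    worker removes the file it created. Returns True if a file was removed.
    """
    try:
        os.remove(filename)
        return True
    except FileNotFoundError:
        # Nothing to clean up on this worker; tolerate it unless told not to
        if missing_ok:
            return False
        raise
```

You could then call `client.run(remove_file, "myfile.txt")` to delete the file on every worker, or pass `missing_ok=False` as a keyword argument so that any worker where the file is absent raises an error.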

## 3. Do some system-level things

In [13]:
import os

In [14]:
client.run(os.getpid)

{'tls://10.0.15.133:39945': 35,
 'tls://10.0.6.121:44257': 35,
 'tls://10.0.7.27:41709': 35,
 'tls://10.0.7.47:38287': 35,
 'tls://10.0.8.170:39291': 35}

In [None]:
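# limit client.run to a subset of workers by passing their addresses
first_worker = list(client.scheduler_info()["workers"])[0]
client.run(os.getpid, workers=[first_worker])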
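For system-level commands beyond what the `os` module exposes, one option is to wrap Python's standard `subprocess` module in a small function and send it to the workers with `client.run()`. This is a sketch: `run_command` is a hypothetical helper, and which commands actually work depends on what is installed in the workers' environment.

```python
import subprocess
import sys


def run_command(args):
    """Run a system command and return its standard output as text.

    `args` is a list such as ["df", "-h"]; check=True raises
    subprocess.CalledProcessError if the command exits with a non-zero status.
    """
    result = subprocess.run(args, capture_output=True, text=True, check=True)
    return result.stdout


if __name__ == "__main__":
    # Local smoke test: call the same function directly, without Dask
    print(run_command([sys.executable, "--version"]))
```

For example, `client.run(run_command, ["df", "-h"])` would return each worker's disk usage report, keyed by worker address. Passing the command as a list rather than a single shell string avoids shell quoting issues; note that the list is one positional argument, so don't unpack it.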

## 4. Do some Dask debugging

In [16]:
# client.run passes the Worker object to any function with a
# `dask_worker` argument; use it to get the status of each worker
def get_status(dask_worker):
    return dask_worker.status

In [17]:
client.run(get_status)

{'tls://10.0.15.133:39945': <Status.running: 'running'>,
 'tls://10.0.6.121:44257': <Status.running: 'running'>,
 'tls://10.0.7.27:41709': <Status.running: 'running'>,
 'tls://10.0.7.47:38287': <Status.running: 'running'>,
 'tls://10.0.8.170:39291': <Status.running: 'running'>}

In [18]:
# find where each worker is spilling data to disk
client.run(lambda dask_worker: dask_worker.local_directory)

{'tls://10.0.15.133:39945': '/scratch/dask-worker-space/worker-4i8a76uf',
 'tls://10.0.6.121:44257': '/scratch/dask-worker-space/worker-7b_e3ej2',
 'tls://10.0.7.27:41709': '/scratch/dask-worker-space/worker-u9vt3kxf',
 'tls://10.0.7.47:38287': '/scratch/dask-worker-space/worker-kp8vlhue',
 'tls://10.0.8.170:39291': '/scratch/dask-worker-space/worker-pka7wzsu'}
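Because `client.run()` injects the `Worker` object into any function with a `dask_worker` argument, you can bundle several of these checks into a single call. A minimal sketch, assuming the `address`, `status`, `nthreads`, and `local_directory` attributes of `distributed.Worker`; `worker_summary` is a hypothetical name, and the stand-in object below only exists so the function can be tried without a cluster.

```python
from types import SimpleNamespace


def worker_summary(dask_worker):
    """Collect a few attributes from the Worker object that client.run injects."""
    return {
        "address": dask_worker.address,
        "status": dask_worker.status,
        "nthreads": dask_worker.nthreads,
        "local_directory": dask_worker.local_directory,
    }


if __name__ == "__main__":
    # Local check with a stand-in object; on a real cluster, client.run
    # supplies the actual distributed.Worker instead
    fake = SimpleNamespace(address="tcp://127.0.0.1:1234", status="running",
                           nthreads=2, local_directory="/tmp/worker")
    print(worker_summary(fake))
```

Calling `client.run(worker_summary)` returns one summary per worker. A similar convention applies on the scheduler: `client.run_on_scheduler()` fills an argument named `dask_scheduler` with the scheduler object, so `client.run_on_scheduler(lambda dask_scheduler: list(dask_scheduler.workers))` lists the connected worker addresses.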

## Other `client` methods you might find useful
The flexibility doesn't end with `client.run()`.

Consider taking a look at:

- `client.submit()`: submit a single function call to the Dask scheduler to run asynchronously; it returns a `Future`
- `client.map()`: apply a function to each element of a sequence of inputs, returning one `Future` per element
- `client.scatter()`: move data from your local client into the cluster's distributed memory
- `client.upload_file()`: upload a single Python file (`.py`) or a package (`.egg`, `.zip`) to all workers

For details on these methods, see the Dask `distributed.Client` API documentation.