## Scheduling

Operations like `dask.delayed` generate task graphs. Each node in the graph is a normal Python function, and each edge is a normal Python object that one task creates as output and another task uses as input.
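
To make the idea of a task graph concrete, here is a toy sketch in plain Python. The dictionary layout and the `run` helper are illustrative assumptions for this notebook only; they are not Dask's implementation, and a real scheduler does far more (parallelism, memory management, error handling).

```python
# A toy task graph: each key maps either to plain data or to a tuple
# (function, *keys_of_inputs). The layout is illustrative only.
def inc(x):
    return x + 1

def add(x, y):
    return x + y

graph = {
    "a": 1,
    "b": (inc, "a"),       # b = inc(a)
    "c": (inc, "b"),       # c = inc(b)
    "d": (add, "b", "c"),  # d = add(b, c)
}

def run(graph, key, cache=None):
    """Evaluate `key` by first evaluating the keys it depends on."""
    cache = {} if cache is None else cache
    if key not in cache:
        node = graph[key]
        if isinstance(node, tuple):
            func, *deps = node
            cache[key] = func(*(run(graph, dep, cache) for dep in deps))
        else:
            cache[key] = node
    return cache[key]

result = run(graph, "d")
print(result)  # 5
```

The functions are the nodes, and the values passed from one function to the next (`b` and `c` here) are the edges.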

After Dask generates these task graphs, it needs to execute them on parallel hardware. This is the job of a **task scheduler**.

Different task schedulers exist, and each will consume a task graph and compute the same result, but with different performance characteristics.
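
As a quick check of that claim, the sketch below runs one small graph on two of the single-machine schedulers and compares the answers. The `double` function and the variable names are made up for this example.

```python
from dask import delayed

def double(x):
    return 2 * x

# Build one small graph: double each input, then sum the results.
parts = [delayed(double)(i) for i in range(5)]
total_small = delayed(sum)(parts)

# Run the same graph on two different single-machine schedulers.
answers = {
    name: total_small.compute(scheduler=name)
    for name in ("synchronous", "threads")
}
print(answers)
```

Both entries should hold the same value; only the way the work was executed differs.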

Dask has two families of task schedulers:

1.  **Single machine scheduler family**: This provides basic features on a local process or thread pool.  This is the default. It is simple and cheap to use, but it only runs on a single machine, so scalability is limited.
2.  **Distributed scheduler**: This scheduler is more sophisticated, offers more features, but also requires a bit more effort to set up.  It can run locally or distributed across a cluster.

<img src="https://github.com/ualberta-rcg/python-dask/blob/master/notebooks/assets/collections-schedulers.png?raw=1">

For different computations you may find better performance with particular scheduler settings. We'll explore different schedulers and their impact.

Consider the following, which is similar to what we explored in the last section:

In [1]:
from dask import delayed
from time import sleep
import random
import dask

def inc(x):
    sleep(1)
    # Or we can make it sleep a random number less than 20
    # sleep(random.randrange(20))
    return x + 1

data = [1, 2, 3, 4, 5, 6, 7, 8]
results = []

for x in data:
    y = delayed(inc)(x)
    results.append(y)

total = delayed(sum)(results)

# Computing ...
%time result = total.compute()
print("Printing result from computing total:", result)

CPU times: user 794 ms, sys: 101 ms, total: 895 ms
Wall time: 5.17 s
Printing result from computing total: 44


## Single thread

If we wanted to, we could compute the result using only a single thread, so that each task in the graph is executed one at a time. We do this by telling Dask to use the 'synchronous' scheduler, passing a keyword option to the `compute` method:

In [2]:
%time result = total.compute(scheduler='synchronous')

CPU times: user 34 ms, sys: 6.32 ms, total: 40.3 ms
Wall time: 8 s


Why would you want to do this? Most of the time you wouldn't, but it is helpful when debugging code with tools that don't cope well with parallel execution.
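
One way to see why this helps with debugging: under the synchronous scheduler the tasks run in the thread that called `compute`. The sketch below records which thread ran each task; `thread_name` is a hypothetical helper written for this example.

```python
import threading
from dask import delayed

def thread_name(x):
    # Report which thread actually executed this task.
    return threading.current_thread().name

caller = threading.current_thread().name
tasks = [delayed(thread_name)(i) for i in range(4)]

# Collect the distinct thread names seen by the four tasks.
names = delayed(set)(tasks).compute(scheduler="synchronous")
print(names == {caller})
```

Because everything happens in the calling thread, breakpoints and tracebacks behave much as they would in ordinary sequential Python.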

## Local Threads

The threaded scheduler executes computations with a local `multiprocessing.pool.ThreadPool` (from the Python `multiprocessing` library). It is lightweight and requires no setup. It introduces very little task overhead and, because everything occurs in the same process, it incurs no costs to transfer data between tasks. However, due to Python’s Global Interpreter Lock (GIL), this scheduler only provides parallelism when your computation is dominated by non-Python code, such as is the case when operating on numeric data in NumPy arrays, Pandas DataFrames, or using any of the other C/C++/Cython based projects in the ecosystem.

The threaded scheduler is the **default** choice for Dask Delayed (and Dask Array and Dask DataFrame). However, if your computation is dominated by processing pure Python objects like strings, dicts, or lists, then you may want to try one of the process-based schedulers below (we currently recommend the distributed scheduler on a local machine).
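
The following sketch illustrates the point about the GIL with a task that spends its time in `time.sleep`, which releases the GIL while waiting. The task length (0.2 s), the task count, and `num_workers=4` are arbitrary choices for the example, and the exact timings will vary from machine to machine.

```python
import time
from dask import delayed

def wait(x):
    time.sleep(0.2)  # sleeping releases the GIL, so threads can overlap
    return x

total_wait = delayed(sum)([delayed(wait)(i) for i in range(4)])

def timed(scheduler, **kwargs):
    """Return the computed value and the wall time in seconds."""
    start = time.perf_counter()
    value = total_wait.compute(scheduler=scheduler, **kwargs)
    return value, time.perf_counter() - start

value_sync, t_sync = timed("synchronous")
value_threads, t_threads = timed("threads", num_workers=4)
print(f"synchronous: {t_sync:.2f} s, threads: {t_threads:.2f} s")
```

A task that instead spent its time in a pure Python loop would hold the GIL, and the threaded run would be expected to show little or no speed-up.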

Because this is the default (unless Dask is configured otherwise), naming it is optional, but we can select it explicitly by passing `scheduler='threads'` as a keyword option to the `compute` method:

In [3]:
%time result = total.compute(scheduler='threads')

CPU times: user 18.8 ms, sys: 4.7 ms, total: 23.5 ms
Wall time: 4 s
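
The 8 s and 4 s wall times line up with a back-of-envelope estimate. The sketch below assumes two worker threads (the thread count that the Colab output later in this notebook reports) and ignores scheduling overhead and the final `sum`.

```python
import math

n_tasks = 8        # eight calls to inc
task_seconds = 1   # each call sleeps for one second

def ideal_wall_time(n_threads):
    # Tasks run in "waves" of at most n_threads at a time.
    return math.ceil(n_tasks / n_threads) * task_seconds

print(ideal_wall_time(1))  # 8
print(ideal_wall_time(2))  # 4
```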


## Local Processes

The multiprocessing scheduler executes computations with a local `multiprocessing.Pool`.

It is lightweight to use and requires no setup.
Every task and all of its dependencies are shipped to a local process,
executed, and then their result is shipped back to the main process.
This means that it is able to bypass issues with the GIL and provide parallelism even on computations that are dominated by pure Python code,
such as those that process strings, dicts, and lists.

However, moving data to remote processes and back can introduce performance penalties, particularly when the data being transferred between processes is large. The multiprocessing scheduler is an excellent choice when workflows are relatively linear, and so do not involve significant inter-task data transfer, and when inputs and outputs are both small, like filenames and counts.

This is common in basic data ingestion workloads, such as those typical of `Dask Bag`, where the multiprocessing scheduler is the default:

```python
# Read in a bunch of json files...
# parse them...
# grab all the name attributes...
# and compute frequencies

import json

import dask.bag as db

# frequencies() yields (name, count) pairs; dict() collects them into a mapping
dict(db.read_text('*.json').map(json.loads).pluck('name').frequencies().compute())

# Illustrative output:
# {'alice': 100, 'bob': 200, 'charlie': 300}
```
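
To get a rough feel for the transfer cost described above, the sketch below compares how many bytes a small input and a large input take once serialized. It uses the standard `pickle` module as a stand-in for whatever serializer the scheduler uses, and the filename is hypothetical.

```python
import pickle

small_input = "data/part-0001.json"    # hypothetical filename
large_input = list(range(1_000_000))   # a million integers

# Everything sent to another process has to be serialized first.
small_bytes = len(pickle.dumps(small_input))
large_bytes = len(pickle.dumps(large_input))
print(small_bytes, large_bytes)
```

Shipping a filename costs a few dozen bytes, while shipping the list costs megabytes, which is why workloads that pass filenames and counts between tasks suit process-based schedulers well.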

We tell Dask to use the `processes` scheduler by passing a keyword option to the `compute` method:

In [4]:
%time result = total.compute(scheduler='processes')

CPU times: user 35.5 ms, sys: 11.4 ms, total: 46.9 ms
Wall time: 6.48 s


## Distributed (local)

### Note: some things won't work on Colab

The Dask distributed scheduler can either be set up on a cluster or run locally on a personal machine. Despite the name “distributed”, it is often a pragmatic choice on local machines for a few reasons:

* It provides access to an asynchronous API, notably Futures
* It provides a diagnostic dashboard that gives valuable insight into performance and progress
* It handles data locality with more sophistication, and so can be more efficient than the multiprocessing scheduler on workloads that require multiple processes

Here's how we start a distributed local **client**:

In [None]:
# NOTE!!! Won't work directly on Colab, see below

from dask.distributed import Client
client = Client()
client

For Colab, we need to do this:

In [5]:
# NOTE!!! Only for Colab, not for local Jupyter

from dask.distributed import Client
client = Client(processes=False, diagnostics_port=None)
client

INFO:distributed.http.proxy:To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at: inproc://172.28.0.12/1427/1
INFO:distributed.scheduler:  dashboard at:  http://172.28.0.12:8787/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.worker:      Start worker at: inproc://172.28.0.12/1427/4
INFO:distributed.worker:         Listening to:          inproc172.28.0.12
INFO:distributed.worker:          Worker name:                          0
INFO:distributed.worker:         dashboard at:          172.28.0.12:41265
INFO:distributed.worker:Waiting to connect to: inproc://172.28.0.12/1427/1
INFO:distributed.worker:-------------------------------------------------
INFO:distributed.worker:              Threads:                          2
INFO:distributed.worker:               Memory:                  12.67 GiB
INFO:dist

Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://172.28.0.12:8787/status

Dashboard: http://172.28.0.12:8787/status
Workers: 1
Total threads: 2
Total memory: 12.67 GiB
Status: running
Using processes: False

Comm: inproc://172.28.0.12/1427/1
Workers: 1
Dashboard: http://172.28.0.12:8787/status
Total threads: 2
Started: Just now
Total memory: 12.67 GiB

Comm: inproc://172.28.0.12/1427/4
Total threads: 2
Dashboard: http://172.28.0.12:41265/status
Memory: 12.67 GiB
Nanny: None
Local directory: /tmp/dask-scratch-space/worker-d1dqe86r


**Note: When we create a `Client` object it registers itself as the default Dask scheduler**. All `.compute()` methods will automatically start using the distributed system.

The `Client` connects to a `Cluster`, which is a pool of workers (in software) that will execute any tasks we send to it. On my laptop, the notebook tells me that I can use `4` workers, `8` cores, and `~17 GB` of memory. Your numbers will differ: the Colab output above reports 1 worker, 2 threads, and 12.67 GiB.

The client also gives a network address for the scheduler (`tcp://127.0.0.1:64821` on my laptop), and a link to a dashboard to view the execution of task graphs (`http://127.0.0.1:8787/status`).

Check out the workers tab of the dashboard: it will show you the configuration and the load on the available workers.

We can now send our work to the distributed local cluster:

In [6]:
%time result = total.compute()

CPU times: user 256 ms, sys: 22.4 ms, total: 278 ms
Wall time: 4.02 s
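
The Futures interface listed among the reasons for using the distributed scheduler locally can be sketched as follows. This is a minimal example, not a full tour: it starts its own throwaway in-process client (`processes=False`) with the dashboard switched off (`dashboard_address=None`), and the names `futures_client`, `single`, and `many` are made up for the example.

```python
from dask.distributed import Client

def inc(x):
    return x + 1

# processes=False keeps the worker inside this process; dashboard_address=None
# turns the dashboard off. Both are chosen only to keep the sketch lightweight.
with Client(processes=False, dashboard_address=None) as futures_client:
    future = futures_client.submit(inc, 10)       # starts running right away
    single = future.result()                      # block until it has finished
    futures = futures_client.map(inc, range(4))   # one future per input
    many = futures_client.gather(futures)         # collect all of the results

print(single, many)  # 11 [1, 2, 3, 4]
```

Unlike `delayed`, which builds a graph and waits for `compute`, `submit` and `map` hand work to the cluster immediately and return placeholders for the results.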


We can shut down the cluster when we are done with it:

In [7]:
client.close()

INFO:distributed.scheduler:Remove client Client-a9776908-0d84-11f0-8593-0242ac1c000c
INFO:distributed.core:Received 'close-stream' from inproc://172.28.0.12/1427/6; closing.
INFO:distributed.scheduler:Remove client Client-a9776908-0d84-11f0-8593-0242ac1c000c
INFO:distributed.scheduler:Close client connection: Client-a9776908-0d84-11f0-8593-0242ac1c000c
INFO:distributed.scheduler:Retire worker addresses (stimulus_id='retire-workers-1743352525.7192552') [0]
INFO:distributed.worker:Stopping worker at inproc://172.28.0.12/1427/4. Reason: worker-close
INFO:distributed.worker:Removing Worker plugin shuffle
INFO:distributed.core:Received 'close-stream' from inproc://172.28.0.12/1427/5; closing.
INFO:distributed.scheduler:Remove worker addr: inproc://172.28.0.12/1427/4 name: 0 (stimulus_id='handle-worker-cleanup-1743352525.7253897')
INFO:distributed.scheduler:Lost all workers
INFO:distributed.core:Connection to inproc://172.28.0.12/1427/1 has been closed.
INFO:distributed.scheduler:Closing sch

If we want, we can create a `Client` with a single in-process worker (`processes=False`) whose threads use all of the available CPUs.

In [8]:
from dask.distributed import Client

# COLAB! Do this line instead...(comment out below)
# client = Client(processes=False, diagnostics_port=None)

client = Client(processes=False)
client

INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at: inproc://172.28.0.12/1427/10
INFO:distributed.scheduler:  dashboard at:  http://172.28.0.12:8787/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.worker:      Start worker at: inproc://172.28.0.12/1427/13
INFO:distributed.worker:         Listening to:          inproc172.28.0.12
INFO:distributed.worker:          Worker name:                          0
INFO:distributed.worker:         dashboard at:          172.28.0.12:39937
INFO:distributed.worker:Waiting to connect to: inproc://172.28.0.12/1427/10
INFO:distributed.worker:-------------------------------------------------
INFO:distributed.worker:              Threads:                          2
INFO:distributed.worker:               Memory:                  12.67 GiB
INFO:distributed.worker:      Local Directory: /tmp/dask-scratch-space/worker-d5qau3d8
INFO:distributed.worker:----------------------------------------------

Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://172.28.0.12:8787/status

Dashboard: http://172.28.0.12:8787/status
Workers: 1
Total threads: 2
Total memory: 12.67 GiB
Status: running
Using processes: False

Comm: inproc://172.28.0.12/1427/10
Workers: 1
Dashboard: http://172.28.0.12:8787/status
Total threads: 2
Started: Just now
Total memory: 12.67 GiB

Comm: inproc://172.28.0.12/1427/13
Total threads: 2
Dashboard: http://172.28.0.12:39937/status
Memory: 12.67 GiB
Nanny: None
Local directory: /tmp/dask-scratch-space/worker-d5qau3d8


In [9]:
%time result = total.compute()

CPU times: user 234 ms, sys: 26.5 ms, total: 260 ms
Wall time: 4.03 s


In [10]:
client.close()

INFO:distributed.scheduler:Remove client Client-0fa11374-0d85-11f0-8593-0242ac1c000c
INFO:distributed.core:Received 'close-stream' from inproc://172.28.0.12/1427/15; closing.
INFO:distributed.scheduler:Remove client Client-0fa11374-0d85-11f0-8593-0242ac1c000c
INFO:distributed.scheduler:Close client connection: Client-0fa11374-0d85-11f0-8593-0242ac1c000c
INFO:distributed.scheduler:Retire worker addresses (stimulus_id='retire-workers-1743352575.347534') [0]
INFO:distributed.worker:Stopping worker at inproc://172.28.0.12/1427/13. Reason: worker-close
INFO:distributed.worker:Removing Worker plugin shuffle
INFO:distributed.core:Received 'close-stream' from inproc://172.28.0.12/1427/14; closing.
INFO:distributed.scheduler:Remove worker addr: inproc://172.28.0.12/1427/13 name: 0 (stimulus_id='handle-worker-cleanup-1743352575.3558192')
INFO:distributed.scheduler:Lost all workers
INFO:distributed.core:Connection to inproc://172.28.0.12/1427/10 has been closed.
INFO:distributed.scheduler:Closing

So far, `Client` has been creating a 'cluster' of workers for us (an instance of the `LocalCluster` class).

We can create the cluster ourselves, explicitly specify the resources we need, and hook up a client to it:

In [None]:
# NOTE!!! Won't work on Colab

from distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2, threads_per_worker=1)
client = Client(cluster)
client

In [None]:
%time result = total.compute()

We can then scale up the cluster as needed:

In [None]:
cluster.scale(4)

# The client doesn't update right away, wait a couple of seconds
sleep(2)

client

In [None]:
%time result = total.compute()

In [None]:
client.close()
cluster.close()

## Dask Distributed (Cluster)

You can also run Dask on a distributed cluster. There are a variety of ways to set this up depending on your cluster.

We won't get too deep into this. If you are interested, I did a webinar for WestGrid that included more information on how to use Dask with an HPC cluster: https://www.youtube.com/watch?v=uGy5gT2vLdI

Colab users need something like this:

In [11]:
# May need to restart your runtime after
!pip install dask_jobqueue

Collecting dask_jobqueue
  Downloading dask_jobqueue-0.9.0-py2.py3-none-any.whl.metadata (1.3 kB)
Downloading dask_jobqueue-0.9.0-py2.py3-none-any.whl (52 kB)
Installing collected packages: dask_jobqueue
Successfully installed dask_jobqueue-0.9.0


Here is an example that sets up workers on an HPC cluster that uses the SLURM scheduler:

In [12]:
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
from dask.distributed import progress
import time

cluster = SLURMCluster(cores=2,
                       memory="8000MB",
                       walltime='00:30:00',
                       project='def-blahblahblah')
cluster

INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:   tcp://172.28.0.12:46713
INFO:distributed.scheduler:  dashboard at:  http://172.28.0.12:8787/status
INFO:distributed.scheduler:Registering Worker plugin shuffle



At this point no workers have been allocated to do work. We can check out the job submission script that Dask uses to create a single worker (with one or more threads):

In [13]:
print(cluster.job_script())

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -A def-blahblahblah
#SBATCH -n 1
#SBATCH --cpus-per-task=2
#SBATCH --mem=8G
#SBATCH -t 00:30:00

/usr/bin/python3 -m distributed.cli.dask_worker tcp://172.28.0.12:46713 --name dummy-name --nthreads 1 --memory-limit 3.73GiB --nworkers 2 --nanny --death-timeout 60
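
The `--memory-limit 3.73GiB` in the script looks consistent with splitting the requested `8000MB` evenly across the two worker processes (`--nworkers 2`). The arithmetic below is a reading of the output above, assuming decimal megabytes and binary gibibytes; it is not a statement of how `dask_jobqueue` computes the value internally.

```python
requested_bytes = 8000 * 10**6   # memory="8000MB", taken as decimal megabytes
n_workers = 2                    # --nworkers 2 in the job script

# Split evenly, then convert bytes to GiB (2**30 bytes).
per_worker_gib = requested_bytes / n_workers / 2**30
print(f"{per_worker_gib:.2f}GiB")  # 3.73GiB
```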





We can hook up a client to the cluster to monitor and inspect it:

In [14]:
# Colab: this won't work

client = Client(cluster)
client

ERROR:asyncio:Task exception was never retrieved
future: <Task finished name='Task-40338' coro=<_wrap_awaitable() done, defined at /usr/local/lib/python3.11/dist-packages/distributed/deploy/spec.py:124> exception=FileNotFoundError(2, 'No such file or directory')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/distributed/deploy/spec.py", line 125, in _wrap_awaitable
    return await aw
           ^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/distributed/deploy/spec.py", line 74, in _
    await self.start()
  File "/usr/local/lib/python3.11/dist-packages/dask_jobqueue/core.py", line 426, in start
    out = await self._submit_job(fn)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/dask_jobqueue/core.py", line 409, in _submit_job
    return await self._call(shlex.split(self.submit_command) + [script_filename])
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lo

Connection method: Cluster object
Cluster type: dask_jobqueue.SLURMCluster
Dashboard: http://172.28.0.12:8787/status

Dashboard: http://172.28.0.12:8787/status
Workers: 0
Total threads: 0
Total memory: 0 B

Comm: tcp://172.28.0.12:46713
Workers: 0
Dashboard: http://172.28.0.12:8787/status
Total threads: 0
Started: Just now
Total memory: 0 B


Finally, when we want to allocate workers for our computations, we instruct Dask to submit jobs to SLURM (one job is submitted per worker):

In [15]:
# This submits eight jobs to create workers ... this will fail on your laptop
cluster.scale(8)



This function doesn't wait for the workers to be scheduled -- it returns right away.
We can wait for all of the workers to be ready with:

In [16]:
# Example with 8 workers ...
while client.status == "running" and len(client.scheduler_info()["workers"]) < 8:
    time.sleep(1)

KeyboardInterrupt: 
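
As an alternative to a hand-written polling loop, `Client.wait_for_workers` blocks until the requested number of workers has joined. The sketch below uses a small in-process `LocalCluster` as a stand-in for the SLURM cluster so that it can run without a job queue; the worker counts, the 30-second timeout, and the `demo_*` names are arbitrary choices for the example.

```python
from dask.distributed import Client, LocalCluster

# An in-process LocalCluster stands in for the SLURM cluster here; the
# scale-then-wait pattern is the part that carries over.
with LocalCluster(n_workers=1, threads_per_worker=1, processes=False,
                  dashboard_address=None) as demo_cluster:
    with Client(demo_cluster) as demo_client:
        demo_cluster.scale(2)                        # ask for two workers in total
        demo_client.wait_for_workers(2, timeout=30)  # block until both have joined
        n_ready = len(demo_client.scheduler_info()["workers"])

print(n_ready)
```

The `with` blocks also close the client and the cluster on exit, which saves the explicit `close()` calls.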

For a demo of this sort of workflow, check out the linked webinar at the top of this section.

## Configuration

Back to your local laptop ...

You can configure the default scheduler with the `dask.config.set(scheduler=...)` command. This can be done globally:

In [17]:
import dask
dask.config.set(scheduler='threads')

%time result = total.compute()



CPU times: user 212 ms, sys: 16.3 ms, total: 228 ms
Wall time: 4.01 s


or within the context of a block:

In [18]:
# Up until here we are using the default task scheduler

with dask.config.set(scheduler='processes'):
    # Anything in this block uses the 'processes' scheduler
    # Do stuff
    %time result = total.compute()
    # Do more stuff

# Outside of the block we return to the previously configured scheduler

CPU times: user 388 ms, sys: 23 ms, total: 411 ms
Wall time: 6.46 s


or as we have already seen, within a single compute call:

In [19]:
%time result = total.compute(scheduler='synchronous')

CPU times: user 468 ms, sys: 20.9 ms, total: 489 ms
Wall time: 8 s
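
The three approaches can be combined. The sketch below checks two things: a setting made with `dask.config.set` in a `with` block is undone on exit, and a `scheduler=` keyword on `compute` takes priority over the configured value. The `thread_name` helper is hypothetical and exists only to reveal where the task ran.

```python
import threading

import dask
from dask import delayed

def thread_name(_):
    # Report which thread executed the task.
    return threading.current_thread().name

task = delayed(thread_name)(0)
caller = threading.current_thread().name

before = dask.config.get("scheduler", default=None)
with dask.config.set(scheduler="threads"):
    inside = dask.config.get("scheduler")
    # The keyword argument wins over the configured scheduler, so the
    # task runs synchronously in the calling thread.
    ran_in = task.compute(scheduler="synchronous")
after = dask.config.get("scheduler", default=None)

print(inside, ran_in == caller, after == before)
```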


[On to the next notebook (`dask.array`)](04-array.ipynb) ...