# 2.2. Dask Schedulers

The Dask *Schedulers* orchestrate the tasks in the Task Graphs so that they can be run in parallel.  *How* they run in parallel, though, is determined by which *Scheduler* you choose.

There are 3 *local* schedulers:

- **Single-Thread Local:** For debugging, profiling, and diagnosing issues
- **Multi-threaded:** Using the Python standard-library `threading` module (the default for all Dask collections except `Bags`)
- **Multi-process:** Using the Python standard-library `multiprocessing` module (the default for Dask `Bags`)

and 1 *distributed* scheduler, which we will talk about later:

- **Distributed:** Using the Dask package `distributed` (which uses `tornado` for TCP communication)

You can use *any* of these schedulers for *any* computation, but you might only get good performance with one of them!

In [None]:
from sleeplock import sleep
import dask
import dask.multiprocessing

## How to Specify Local Schedulers?

There are many ways to specify which scheduler you want to use and when you want to use it.  In all cases, you select the scheduler by passing the appropriate value to the `scheduler` parameter:

#### Single-Threaded:

> ```python
> scheduler='single-threaded'
> ```

or 


> ```python
> scheduler='sync'
> ```

or 


> ```python
> scheduler='synchronous'
> ```

#### Multi-Threaded (*Default*):

> ```python
> scheduler='threads'
> ```

or 

> ```python
> scheduler='threading'
> ```

#### Multi-Processing:

> ```python
> scheduler='processes'
> ```

or

> ```python
> scheduler='multiprocessing'
> ```

And then you can specify your chosen scheduler in multiple ways within your code:

#### Globally:

> ```python
> import dask
> dask.config.set(scheduler=...)
> ```

#### Locally with a Context Manager:

> ```python
> with dask.config.set(scheduler=...):
>     ...
> ```

#### Locally with the Dask Compute Function:

> ```python
> dask.compute(obj1, obj2, ..., scheduler=...)
> ```

#### Locally with the Delayed Object Compute Methods:

> ```python
> delayed_obj.compute(scheduler=...)
> ```
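As a minimal sketch of how these options fit together, the snippet below runs one small delayed computation under each approach. It assumes a Dask release that provides `dask.config.set`; the `inc` function and the variable names are made up for illustration.

```python
import dask

@dask.delayed
def inc(x):
    # Hypothetical toy task, used only for illustration
    return x + 1

a = inc(1)
b = inc(2)

# Globally: applies to every later compute call in this process
dask.config.set(scheduler='threads')
global_result = a.compute()

# Locally with a context manager: applies only inside the block
with dask.config.set(scheduler='synchronous'):
    context_result = b.compute()

# Locally with the Dask compute function: returns a tuple of results
function_results = dask.compute(a, b, scheduler='synchronous')

# Locally with the delayed object's compute method
method_result = a.compute(scheduler='threads')
```

A `scheduler` argument passed directly to a compute call applies to that call only, which makes it convenient for quick comparisons between schedulers.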

## Example: *Slow Arithmetic*

Let's now consider a more complicated example than the previous ones, and see how the different schedulers handle it.

Imagine the following arithmetic operation:

    (1 + 3) * (2 - 4) / 5.0
    
First, we define some slow, delayed functions to define this operation.

In [None]:
@dask.delayed
def add(x, y):
    sleep(1)
    return x + y

In [None]:
@dask.delayed
def sub(x, y):
    sleep(1)
    return x - y

In [None]:
@dask.delayed
def mul(x, y):
    sleep(1)
    return x * y

In [None]:
@dask.delayed
def div(x, y):
    sleep(1)
    return x / y

Now, we can construct the operation using the delayed functions, such that:

    x = 1 + 3
    y = 2 - 4
    z = x * y = -8

And the final result can be written:

    RESULT = z / 5.0 = -1.6

In [None]:
%time x = add(1,3)
x

In [None]:
%time y = sub(2,4)
y

In [None]:
%time z = mul(x,y)
z

In [None]:
%time RESULT = div(z, 5.0)
RESULT

## Visualize the RESULT

Let's take a look at the resulting Task Graph.

In [None]:
RESULT.visualize()

## How long should this take to compute?

Keep in mind that each function takes 1 second to run.

#### Serial:

In [None]:
%time RESULT.compute(scheduler='single-threaded')

#### Multi-Processing:

In [None]:
%time RESULT.compute(scheduler='multiprocessing')

#### Multi-Threading:

In [None]:
%time RESULT.compute(scheduler='threads')

## Do the numbers make sense?  Did you predict the outcomes correctly?

You should dive a little deeper into the details here and notice a few key points.

1. The serial scheduler took 4 seconds to compute because it could not parallelize any of the operations.
2. The multi-threaded scheduler took as long to compute as the serial scheduler!  Why?
3. The multi-processing scheduler was able to parallelize the `add` and `sub` calls because they are obviously independent.
4. The multi-processing scheduler should have an observably larger latency when compared to the multi-threaded scheduler.
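
The expected timings can be worked out from the shape of the task graph alone. The sketch below is plain Python, not Dask: it hard-codes the four tasks and their dependencies (the `deps` dictionary and the `finish_time` helper are illustrative names), assumes every task costs 1 second, and ignores scheduler overhead. The serial estimate is the sum of all task costs, while the parallel estimate is the length of the longest dependency chain.

```python
from functools import lru_cache

# Dependencies of each task in the RESULT graph (illustrative encoding)
deps = {
    'add': (),
    'sub': (),
    'mul': ('add', 'sub'),
    'div': ('mul',),
}
COST = 1  # seconds per task, matching sleep(1)

@lru_cache(maxsize=None)
def finish_time(task):
    # Earliest finish time with unlimited workers:
    # a task starts once all of its dependencies are done
    start = max((finish_time(d) for d in deps[task]), default=0)
    return start + COST

serial_estimate = COST * len(deps)       # one task at a time
parallel_estimate = finish_time('div')   # longest chain: add/sub -> mul -> div

print(serial_estimate, parallel_estimate)  # prints "4 3"
```

Under these assumptions, a scheduler that really runs `add` and `sub` at the same time should need about 3 seconds plus overhead, and one that cannot should need about 4.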

## So, why didn't the multi-threading example parallelize the code?

Take a look at the `sleep` function in the `sleeplock` module.  What is it doing?

In [None]:
# %load sleeplock.py
import time
from threading import Lock
lock = Lock()

def sleep(n):
    # Hold the module-level lock for the whole sleep,
    # so only one thread can sleep at a time
    with lock:
        time.sleep(n)


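This is an example of what happens when multiple threads compete for the same resource (e.g., memory).  Every call to `sleep` must acquire the same `Lock`, so only one thread can sleep at a time and the threads end up running one after another.  The multi-processing scheduler avoids this because each worker process has its own copy of the lock.  Here, the lock stands in for the Python **Global Interpreter Lock (GIL)**.

The GIL is a lock inside the CPython interpreter that allows only one thread to execute Python bytecode at a time.  This keeps concurrent threads from corrupting the interpreter's internal state, but it also means that pure-Python code gains little from multi-threading.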
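
To see the effect of a shared lock in isolation, the following sketch times two threads calling a lock-guarded sleep and then two threads calling plain `time.sleep`. It uses only the standard library; `locked_sleep`, `timed_run`, and the 0.2-second duration are choices made for this illustration, not part of `sleeplock`.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Lock

lock = Lock()

def locked_sleep(n):
    # Same idea as sleeplock.sleep: only one thread may sleep at a time
    with lock:
        time.sleep(n)

def timed_run(func, n, workers=2):
    # Run func(n) once on each of `workers` threads; return elapsed wall time
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        list(pool.map(func, [n] * workers))
    return time.perf_counter() - start

locked_elapsed = timed_run(locked_sleep, 0.2)  # sleeps happen one after another
plain_elapsed = timed_run(time.sleep, 0.2)     # sleeps overlap

print(f"locked: {locked_elapsed:.2f}s, plain: {plain_elapsed:.2f}s")
```

The locked version should take roughly twice as long, because the second thread cannot start sleeping until the first one releases the lock. Plain `time.sleep` releases the GIL while it waits, so those threads overlap.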

#### NOTE: Be careful with multi-threading and pay close attention to the Python GIL!