<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo">


## Threads, Processes, and the Global Interpreter Lock

There are two main ways to achieve single-machine parallelism in Python: multiple threads or multiple processes.
The best choice depends on the type of work you're doing and on Python's GIL (global interpreter lock).

The GIL is an implementation detail of CPython. It's a lock inside your Python process that ensures *only one thread in your process can be running Python code at once*. Let's see an example, using `concurrent.futures` to compute Fibonacci numbers in parallel.

In [None]:
import concurrent.futures

def fib(n):
    """Compute the `n`th Fibonacci number.

    This is a deliberately slow, CPU-intensive implementation.
    """
    if n < 2:
        return n
    return fib(n - 2) + fib(n - 1)

In [None]:
%time fib(34)

One `fib(34)` takes my machine about 2 seconds. Computing it 4 times should then take about 8 seconds.

In [None]:
results = map(fib, [34, 34, 34, 34])
%time _ = list(results)

Let's compute it in parallel. This is embarrassingly parallel, so with 4 threads we *should* be back down to 2 seconds again.

In [None]:
thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)

results = thread_pool.map(fib, [34, 34, 34, 34])
%time _ = list(results)

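It's actually *slower*! Because of the GIL, only one thread can run `fib` at a time, and the threads pay extra overhead contending for the lock. Fortunately, `concurrent.futures` (and Dask) make it easy to swap the compute backend between threads and processes.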

## Exercise: Parallelize `fib` with Processes

Use a `concurrent.futures.ProcessPoolExecutor` to achieve the same task. Time how long it takes.
See https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor, which has the same API as our `ThreadPoolExecutor`.

In [None]:
# Your solution


In [None]:
%load solutions/04-schedulers-fib-process.py

So, why use threads at all, if they can't actually run Python code in parallel? Because much of the code you're running isn't Python code. The performance-sensitive parts of libraries like NumPy and pandas are written in C or Cython. Wherever possible, these libraries release the GIL. The standard library does this in places too, like when you make an HTTP request.
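As a minimal sketch of this effect, the snippet below uses `time.sleep` as a stand-in for a blocking call such as an HTTP request (the `wait` helper is hypothetical, introduced only for illustration). Because `time.sleep` releases the GIL while waiting, four waits in four threads should finish in roughly the time of one.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def wait(seconds):
    """Hypothetical stand-in for a blocking call such as an HTTP request.

    `time.sleep` releases the GIL while it waits, so other threads can run.
    """
    time.sleep(seconds)
    return seconds


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(wait, [0.2, 0.2, 0.2, 0.2]))
elapsed = time.perf_counter() - start

# Run one after another, the four waits would take about 0.8 seconds.
print(f"4 waits of 0.2s took {elapsed:.2f}s with 4 threads")
```

Contrast this with the `fib` example above, where the threads spend all their time running Python code and so cannot overlap.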

When the GIL isn't a concern (as with NumPy and pandas), we tend to prefer threads to avoid data serialization.
To do an operation on a pandas DataFrame in multiple processes, the data has to be serialized (using e.g. pickle) in the first process and deserialized in the second process. This takes time and memory. Threads don't have serialization overhead.
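To get a rough feel for that cost, here is a small sketch that pickles and unpickles a plain Python list within a single process. The list is a hypothetical stand-in for a DataFrame; real timings depend on the data and the machine.

```python
import pickle
import time

# Hypothetical stand-in for a large table: one million integers.
data = list(range(1_000_000))

start = time.perf_counter()
payload = pickle.dumps(data)      # what the sending process would do
restored = pickle.loads(payload)  # what the receiving process would do
elapsed = time.perf_counter() - start

# `restored` is a full second copy of the data, so memory use grows too.
print(f"{len(payload) / 1e6:.1f} MB pickled, round trip took {elapsed:.3f}s")
```

Threads skip this step entirely because they share the same memory.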

David Beazley has some nice materials on the GIL: http://www.dabeaz.com/GIL/

# Schedulers

In the previous notebooks, we used `dask.delayed` and `dask.dataframe` to parallelize computations.
These work by building a *task graph* instead of executing immediately.
Each *task* represents some function to call on some data, and the full *graph* is the relationship between all the tasks.

When we wanted the actual result, we called `compute`, which handed the task graph off to a *scheduler*.

**Schedulers are responsible for running a task graph and producing a result**.
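To make the idea concrete, here is a toy sketch of a task graph written as a dictionary, loosely modeled on the dict-of-tuples form Dask uses internally. The `toy_get` function is a hypothetical single-threaded "scheduler" written for illustration; it is not Dask's implementation.

```python
from operator import add, mul

# Each key names a result. A tuple means "call the first item on the rest".
graph = {
    "x": 1,
    "y": 2,
    "sum": (add, "x", "y"),
    "product": (mul, "sum", 10),
}


def toy_get(graph, key, cache=None):
    """Hypothetical scheduler: compute `key`, running its dependencies first."""
    cache = {} if cache is None else cache
    if key not in cache:
        task = graph[key]
        if isinstance(task, tuple) and callable(task[0]):
            func, *args = task
            # Arguments that name other keys are computed first;
            # anything else is passed through as a literal value.
            values = [
                toy_get(graph, arg, cache)
                if isinstance(arg, str) and arg in graph
                else arg
                for arg in args
            ]
            cache[key] = func(*values)
        else:
            cache[key] = task
    return cache[key]


result = toy_get(graph, "product")
print(result)  # (1 + 2) * 10 = 30
```

A real scheduler does the same job, but it also decides which independent tasks can run at the same time and where to run them.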

![](https://raw.githubusercontent.com/dask/dask-org/master/images/grid_search_schedule.gif)

Dask includes several schedulers:

Scheduler | Parallelism | Use Case
--------- | ----------- | ---------
threaded  | Local thread pool | Numeric, GIL-releasing code
multiprocessing | Local process pool | GIL holding code
local           | Single main thread | Debugging
distributed     | Multiple machines  | Large problems

In this section we first talk about changing schedulers.  Then we use the `dask.distributed` scheduler in more depth.

### Local Schedulers

Dask separates computation description (task graphs) from execution (schedulers). This allows you to write code once, and run it locally or scale it out across a cluster.

We can change the scheduler used in a few different ways:

- By providing a `get=` keyword argument to `compute`:

```python
total.compute(get=dask.multiprocessing.get)
# or 
dask.compute(a, b, get=dask.multiprocessing.get)
```

- Using `dask.set_options`:

```python
# Use multiprocessing in this block
with dask.set_options(get=dask.multiprocessing.get):
    total.compute()
# Use multiprocessing globally
dask.set_options(get=dask.multiprocessing.get)
```

Here we repeat a simple dataframe computation from the previous section using the different schedulers:

In [None]:
import dask 
import dask.multiprocessing
import dask.dataframe as dd
import pandas as pd
from glob import glob
import os

In [None]:
df = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'),
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': object,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})

# Maximum non-cancelled delay
largest_delay = df[~df.Cancelled].DepDelay.max()

In [None]:
%time _ = largest_delay.compute()  # this uses threads by default

In [None]:
%time _ = largest_delay.compute(get=dask.multiprocessing.get)  # this uses processes

In [None]:
%time _ = largest_delay.compute(get=dask.get)  # This uses a single thread

By default the threaded and multiprocessing schedulers use the same number of workers as cores. You can change this using the `num_workers` keyword in the same way that you specified `get` above:

```python
largest_delay.compute(get=dask.multiprocessing.get, num_workers=2)
```

To see how many cores you have on your computer, you can use `multiprocessing.cpu_count`:

In [None]:
from multiprocessing import cpu_count
cpu_count()

### Some Questions to Consider:

- How much speedup is possible for this task? (Hint: look at the graph.)
- Given how many cores are on this machine, how much faster could the parallel schedulers be than the single-threaded scheduler?
- How much faster was using threads over a single thread? Why does this differ from the optimal speedup?
- Why is the multiprocessing scheduler so much slower here?

---

## In what cases would you want to use one scheduler over another?

http://dask.pydata.org/en/latest/scheduler-choice.html

---

## Profiling

*You should skip this section if you are running low on time*.

The synchronous (single-threaded) scheduler is particularly valuable for debugging and profiling.

For example, the IPython `%prun` magic gives us profiling information about which functions take up the most time in our computation. Try this magic on the computation above with different schedulers. How informative is this magic when running parallel code?

In [None]:
%prun -l 30 _ = largest_delay.compute(get=dask.threaded.get)

In [None]:
%prun -l 30 _ = largest_delay.compute(get=dask.get)
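Outside of IPython, a similar per-function report can be gathered with the standard library's `cProfile` and `pstats` modules. The sketch below profiles the `fib` function from earlier rather than the dataframe computation, so that it runs without any data files.

```python
import cProfile
import io
import pstats


def fib(n):
    """Deliberately slow Fibonacci, repeated here so the block is self-contained."""
    if n < 2:
        return n
    return fib(n - 2) + fib(n - 1)


profiler = cProfile.Profile()
profiler.enable()
fib(20)
profiler.disable()

# Collect the report into a string, sorted by cumulative time,
# and limit it to the top 5 entries.
buffer = io.StringIO()
stats = pstats.Stats(profiler, stream=buffer)
stats.sort_stats("cumulative").print_stats(5)
report = buffer.getvalue()
print(report)
```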

To aid in profiling parallel execution, dask provides several [`diagnostics`](http://dask.pydata.org/en/latest/diagnostics.html) for measuring and visualizing performance. These are useful for seeing bottlenecks in the *parallel* computation, whereas the above `prun` is useful for seeing bottlenecks in individual *tasks*.

In [None]:
from dask.diagnostics import Profiler, ResourceProfiler, visualize

with Profiler() as p, ResourceProfiler(0.25) as r:
    largest_delay.compute()
    
visualize([r, p]);

From the plot above, we can see that while tasks are running concurrently, due to GIL effects we're only achieving parallelism during early parts of `pd.read_csv` (mostly the byte operations).


*Note that the `dask.diagnostics` module is only useful when profiling on a single machine. The `dask.distributed` scheduler has its own set of diagnostics.*

See Jim's talk at SciPy 2017 for more: https://www.youtube.com/watch?v=JoK8V2eWFPE

---

## Distributed Scheduler

The `dask.distributed` system is composed of a single centralized scheduler and several worker processes.  We can either set these up manually as command line processes or have Dask set them up for us from the notebook.  


#### Automatically set up a local cluster

Starting a single scheduler and worker on the local machine is a common case. Dask will set up a local cluster for you if you provide no scheduler address to `Client`:

```python
from dask.distributed import Client
client = Client()
```

If you choose this approach then there is no need to set up a `dask-scheduler` or `dask-worker` process as described below.


#### Other ways to set up a cluster

You can find more information at the following documentation pages:

- [Quickstart](http://distributed.readthedocs.io/en/latest/quickstart.html)
- [Many Ways to Setup](http://dask.pydata.org/en/latest/setup.html)

## Using a local cluster

As mentioned above, the multiprocessing scheduler can be inefficient for complicated workflows. The distributed scheduler doesn't have this downside, and works fine locally. This makes it often a good replacement for the multiprocessing scheduler, even when working on a single machine.

Here we start up a local cluster and use it to repeat the same dataframe computation as above:

In [None]:
from dask.distributed import Client

# Setup a local cluster.
# By default this sets up 1 worker per core
client = Client()
client

Be sure to click the `Dashboard` link to open up the diagnostics dashboard.

In [None]:
%time _ = largest_delay.compute(get=client.get)

#### Some Questions to Consider

- How does this compare to the optimal parallel speedup?
- Why is this faster than the threaded scheduler?

### Client takes over by default

Actually, we didn't need to add `get=client.get`.  The distributed scheduler takes over as the default scheduler for all collections when the Client is created:

In [None]:
%time _ = largest_delay.compute()  # This used to use threads by default, now it uses dask.distributed

---

### Exercise

Run the following computations while looking at the diagnostics page. In each case what is taking the most time?

In [None]:
# Number of flights
_ = len(df)

In [None]:
# Number of non-cancelled flights
_ = len(df[~df.Cancelled])

In [None]:
# Number of non-cancelled flights per-airport
_ = df[~df.Cancelled].groupby('Origin').Origin.count().compute()

In [None]:
# Average departure delay from each airport?
_ = df[~df.Cancelled].groupby('Origin').DepDelay.mean().compute()

In [None]:
# Average departure delay per day-of-week
_ = df.groupby(df.Date.dt.dayofweek).DepDelay.mean().compute()

---

### New API

The distributed scheduler is more sophisticated than the single-machine schedulers. It comes with more functions for managing data, computing in the background, and more. The distributed scheduler also has entirely separate documentation:

-  http://distributed.readthedocs.io/en/latest/
-  http://distributed.readthedocs.io/en/latest/api.html