!["Anaconda"](img/anaconda-logo.png)
<br>
*Copyright Continuum 2012-2016 All Rights Reserved.*

# Dask Custom Workflows

<img src="img/dask_horizontal.svg" 
     width="30%" 
     align=right
     alt="Dask logo">

Tasks can be submitted directly to the Dask task scheduler.  This notebook demonstrates the flexibility that the `submit` function and ordinary Python for loops provide.

Later on we map functions across Python queues to construct data processing pipelines.

## Table of Contents
* [Dask Custom Workflows](#Dask-Custom-Workflows)
* [Using Dask as a Task Scheduler](#Using-Dask-as-a-Task-Scheduler)
	* [Write some Code to Distribute](#Write-some-Code-to-Distribute)
	* [Launch the Scheduler](#Lauch-the-Scheduler)
	* [Launch the Workers](#Launch-the-Workers)
	* [Submit a task with the Executor](#Submit-a-task-with-the-Executor)
	* [Submit many tasks](#Submit-many-tasks)
	* [Custom computation: Tree summation](#Custom-computation:-Tree-summation)
* [Streams and Task Scheduling](#Streams-and-Task-Scheduling)
* [Exercise: Optimization](#Exercise:-Optimization)


# Using Dask as a Task Scheduler

## Write some Code to Distribute

In [None]:
from time import sleep

def inc(x):
    from random import random
    sleep(random())
    return x + 1

def double(x):
    from random import random
    sleep(random())
    return 2 * x
    
def add(x, y):
    from random import random
    sleep(random())
    return x + y 

<a id="Lauch-the-Scheduler"></a>
## Launch the Scheduler

The first step is to launch the Dask scheduler on a head node.  For this local demo, the head and worker nodes all run on the local machine, but in principle they could run on as many machines as you have available.  Once launched, the scheduler prints terminal output similar to the following.

```bash

$ dask-scheduler


distributed.scheduler - INFO - Scheduler at:         192.168.1.73:8786
distributed.scheduler - INFO -      http at:         192.168.1.73:9786
distributed.scheduler - INFO -  Bokeh UI at:  http://192.168.1.73:8787/status/
bokeh.command.subcommands.serve - INFO - Check for unused sessions every 50 milliseconds
bokeh.command.subcommands.serve - INFO - Unused sessions last for 1 milliseconds
bokeh.command.subcommands.serve - INFO - Starting Bokeh server on port 8787 with applications at paths ['/status', '/tasks']
distributed.core - INFO - Connection from 127.0.0.1:65440 to Scheduler
distributed.core - INFO - Connection from 127.0.0.1:65441 to Scheduler
distributed.core - INFO - Connection from 127.0.0.1:50703 to Scheduler
distributed.scheduler - INFO - Register 127.0.0.1:50701
distributed.scheduler - INFO - Starting worker compute stream, 127.0.0.1:50701
```

## Launch the Workers

Once the scheduler is running, start one or more workers and connect them to it.  In this case we connect to `localhost` on the default port, 8786.  On a cluster, copy the address echoed when you launched `dask-scheduler`; in this case, that would be `192.168.1.73:8786`.  See `dask-worker --help` for more options.  In particular, you may want to tune the number of processes and threads devoted to the Dask distributed work.  The aim is not to limit resource allocation but to make the best use of the cores on your nodes; the defaults are sensible most of the time.

```bash

$ dask-worker localhost:8786


distributed.worker - INFO -       Start worker at:            127.0.0.1:50701
distributed.worker - INFO -              nanny at:            127.0.0.1:50698
distributed.worker - INFO -               http at:            127.0.0.1:50700
distributed.worker - INFO - Waiting to connect to:            127.0.0.1:8786
distributed.worker - INFO -         Registered to:            127.0.0.1:8786
distributed.core - INFO - Connection from 127.0.0.1:50704 to Worker
distributed.core - INFO - Connection from 127.0.0.1:50705 to Worker
distributed.nanny - INFO - Nanny 127.0.0.1:50698 starts worker process 127.0.0.1:50701
```

## Submit a task with the Executor

In [None]:
from dask.distributed import Executor, progress

In [None]:
# Depending on platform, might need to launch using `127.0.0.1` rather than `localhost`

#e = Executor('52.91.11.18:8786')  # We might connect to a remote cluster
e = Executor('127.0.0.1:8786')
e

We can execute the `inc()` function locally to test.

In [None]:
inc(1)

Now let's submit the task to the executor instead:

> *Note: Before issuing the `.submit()`, if you open a browser window/tab pointed to http://localhost:8787/status/, you get a very nice status monitor during the task execution.* 

In [None]:
future = e.submit(inc, 1)  # returns immediately with pending future
future

In [None]:
future  # scheduler and client talk constantly

In [None]:
future.result()
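
The `submit`/`result` pattern resembles the futures interface in Python's standard library. As a rough local analogy only (assumptions for illustration: a `ThreadPoolExecutor` stands in for the Dask scheduler and workers, and `slow_inc` is a hypothetical copy of `inc` with a short fixed sleep), the same lifecycle can be sketched as:

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

def slow_inc(x):
    # Hypothetical stand-in for inc(): a short fixed delay instead of a random one
    sleep(0.1)
    return x + 1

# Assumption: a local thread pool plays the role of the Dask scheduler and workers
with ThreadPoolExecutor(max_workers=2) as pool:
    local_future = pool.submit(slow_inc, 1)  # returns immediately with a future
    value = local_future.result()            # blocks until the task has finished

print(value)
```

Unlike this local stand-in, the Dask executor used in this notebook also accepts futures as arguments to `submit`, which the next section relies on.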

## Submit many tasks

We submit many tasks that depend on each other in a normal Python for loop.

In [None]:
%%time
zs = []
for i in range(16):
    x = e.submit(inc, i)     # x = inc(i)
    y = e.submit(double, x)  # y = double(x)
    z = e.submit(add, x, y)  # z = add(x, y)
    zs.append(z)

In [None]:
e.gather(zs)
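
Because each `z` is `add(x, y)` with `x = inc(i)` and `y = double(x)`, every gathered result should equal `3 * (i + 1)`. A minimal local check, assuming sleep-free copies of the three functions (the names `inc_fast`, `double_fast`, and `add_fast` are hypothetical), computes the values the gather is expected to return:

```python
# Hypothetical sleep-free copies of inc, double, and add for a quick local check
def inc_fast(x):
    return x + 1

def double_fast(x):
    return 2 * x

def add_fast(x, y):
    return x + y

expected = []
for i in range(16):
    x = inc_fast(i)      # same dependency chain as the submitted tasks
    y = double_fast(x)
    z = add_fast(x, y)
    expected.append(z)

print(expected)
```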

## Custom computation: Tree summation

As an example of a non-trivial algorithm, consider the classic tree reduction.  We accomplish this with a for loop nested inside a while loop and a bit of normal Python logic.

```
finish           total             single output
    ^          /        \
    |        c1          c2        neighbors merge
    |       /  \        /  \
    |     b1    b2    b3    b4     neighbors merge
    ^    / \   / \   / \   / \
start   a1 a2 a3 a4 a5 a6 a7 a8    many inputs
```

In [None]:
L = zs
while len(L) > 1:
    new_L = []
    for i in range(0, len(L), 2):
        future = e.submit(add, L[i], L[i + 1])  # add neighbors
        new_L.append(future)
    L = new_L                                   # swap old list for new
    
progress(L)

In [None]:
L

In [None]:
e.gather(L)
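
The loop above relies on `len(L)` being even at every level, which holds for the 16 inputs used here. A sketch of a more general reduction, written against plain values rather than futures and using a hypothetical helper name `tree_reduce`, carries an unpaired last element up to the next level:

```python
from operator import add

def tree_reduce(combine, items):
    """Pairwise-reduce a non-empty list, level by level (hypothetical helper)."""
    level = list(items)
    if not level:
        raise ValueError("tree_reduce needs at least one item")
    while len(level) > 1:
        next_level = []
        # Combine neighbors: (0, 1), (2, 3), ...
        for i in range(0, len(level) - 1, 2):
            next_level.append(combine(level[i], level[i + 1]))
        # An odd element out is passed up unchanged
        if len(level) % 2 == 1:
            next_level.append(level[-1])
        level = next_level
    return level[0]

# Same values the 16 tasks are expected to produce: 3 * (i + 1)
total = tree_reduce(add, [3 * (i + 1) for i in range(16)])
print(total)
```

With futures, `combine` would presumably be a small function that wraps `e.submit(add, a, b)`, so that each level is scheduled rather than computed locally.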

# Streams and Task Scheduling

The executor can map functions over lists or queues.  
* This is nothing more than calling `submit` many times.  
* We can chain maps on queues together to construct simple data processing pipelines.

All of this logic happens on the client-side.  
* None of this logic was hard-coded into the scheduler.  
* This simple streaming system is a good example of the kind of system that becomes easy for users to build when given access to custom task scheduling.
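
To illustrate the first point, mapping over a queue can be sketched with nothing more than a background thread that calls `submit` once per item. This is not Dask's implementation; it is a local analogy that assumes a standard-library `ThreadPoolExecutor` in place of the executor, a hypothetical helper `map_queue`, and a `None` sentinel to end the stream so that the example terminates:

```python
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from threading import Thread

def map_queue(pool, func, in_queue):
    """Return a queue of futures, one per item read from in_queue."""
    out_queue = Queue()

    def pump():
        while True:
            item = in_queue.get()
            if item is None:              # hypothetical end-of-stream sentinel
                out_queue.put(None)
                return
            out_queue.put(pool.submit(func, item))  # one submit per item

    Thread(target=pump, daemon=True).start()
    return out_queue

pool = ThreadPoolExecutor(max_workers=4)  # assumption: stands in for the executor
numbers = Queue()
futures_q = map_queue(pool, lambda x: x + 1, numbers)

for n in [1, 2, 3]:
    numbers.put(n)
numbers.put(None)

# Drain the output queue; futures arrive in the order the items were submitted
results = []
while True:
    fut = futures_q.get()
    if fut is None:
        break
    results.append(fut.result())
pool.shutdown()
print(results)
```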

In [None]:
from queue import Queue
from threading import Thread

def multiplex(n, q, **kwargs):
    """ Convert one queue into several equivalent Queues
    
    >>> q1, q2, q3 = multiplex(3, in_q)
    """
    out_queues = [Queue(**kwargs) for i in range(n)]
    def f():
        while True:
            x = q.get()
            for out_q in out_queues:
                out_q.put(x)
    t = Thread(target=f)
    t.daemon = True
    t.start()
    return out_queues        

```
           ----inc---->
          /            \ 
in_q --> q              \_add__ results
          \             / 
           ---double-->/
```

In [None]:
in_q = Queue()
q = e.scatter(in_q)

In [None]:
in_q.put(1)
q.get()

In [None]:
q_1, q_2 = multiplex(2, q)

inc_q = e.map(inc, q_1)
double_q = e.map(double, q_2)

add_q = e.map(add, inc_q, double_q)

out_q = e.gather(add_q)

In [None]:
in_q.put(10)
out_q.get()
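
For a single input `x`, the pipeline computes `add(inc(x), double(x))`, which is `3 * x + 1`. A sleep-free local sketch (the name `pipeline_value` is hypothetical) gives the value to expect from the pipeline when `10` is put on `in_q`:

```python
def pipeline_value(x):
    # Mirrors the two branches of the diagram, then merges them
    incremented = x + 1   # inc branch
    doubled = 2 * x       # double branch
    return incremented + doubled  # add step

print(pipeline_value(10))
```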

In [None]:
from random import random

def feed(q):
    for i in range(10000):
        sleep(random())
        q.put(i)
        
t = Thread(target=feed, args=(q,))
t.daemon = True
t.start()

In [None]:
out_q.qsize()

# Exercise: Optimization

1. Identify a more realistic, computationally demanding problem that can be parallelized.  Experiment with parallelizing it over the cores of your machine (or over a cluster if one is available in class).

2. Try using Numba JIT compilation to further speed up this distributed computation.
