<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg" 
     width="30%" 
     align=right
     alt="Dask logo">

Custom Workloads
-------------------------

*Because not all problems are dataframes*

In [1]:
from dask.distributed import Client, progress
c = Client('127.0.0.1:8786')
c

<Client: scheduler="127.0.0.1:8786" processes=8 cores=8>

In [2]:
from time import sleep

def inc(x):
    from random import random
    sleep(random() / 5)
    return x + 1

def double(x):
    from random import random
    sleep(random() / 5)
    return 2 * x
    
def add(x, y):
    from random import random
    sleep(random() / 5)
    return x + y 

In [4]:
inc(1)

2

In [5]:
future = c.submit(inc, 1)  # returns immediately with pending future
future

<Future: status: pending, key: inc-911d66b289853e68c11b7d5aae9b49af>

In [7]:
future.result()

2
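
The `submit` / `result` pattern above follows the same shape as the standard library's `concurrent.futures` interface. The block below is a minimal local sketch of that pattern, assuming `ThreadPoolExecutor` as a stand-in for the cluster client; it is not Dask and runs on threads in a single process, and the `pool` name is illustrative only.

```python
from concurrent.futures import ThreadPoolExecutor

def inc(x):
    return x + 1

# Assumption: ThreadPoolExecutor stands in for the distributed client here.
with ThreadPoolExecutor(max_workers=4) as pool:
    future = pool.submit(inc, 1)   # returns immediately with a Future
    result = future.result()       # blocks until the task has finished
    done = future.done()           # True once a result is available

print(result, done)
```

One difference to keep in mind: the client in this notebook accepts futures as arguments to later `submit` calls, which the stand-in does not.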

### Submit many tasks

We submit many tasks that depend on each other in an ordinary Python for loop. Passing a future as an argument to `submit` makes the new task depend on it.

In [14]:
%%time
zs = []
for i in range(256):
    x = c.submit(inc, i)     # x = inc(i)
    y = c.submit(double, x)  # y = double(x)
    z = c.submit(add, x, y)  # z = add(x, y)
    zs.append(z)

CPU times: user 292 ms, sys: 12 ms, total: 304 ms
Wall time: 302 ms


In [10]:
c.gather(zs)

[3,
 6,
 9,
 12,
 15,
 18,
 21,
 24,
 27,
 30,
 33,
 36,
 39,
 42,
 45,
 48,
 51,
 54,
 57,
 60,
 63,
 66,
 69,
 72,
 75,
 78,
 81,
 84,
 87,
 90,
 93,
 96,
 99,
 102,
 105,
 108,
 111,
 114,
 117,
 120,
 123,
 126,
 129,
 132,
 135,
 138,
 141,
 144,
 147,
 150,
 153,
 156,
 159,
 162,
 165,
 168,
 171,
 174,
 177,
 180,
 183,
 186,
 189,
 192,
 195,
 198,
 201,
 204,
 207,
 210,
 213,
 216,
 219,
 222,
 225,
 228,
 231,
 234,
 237,
 240,
 243,
 246,
 249,
 252,
 255,
 258,
 261,
 264,
 267,
 270,
 273,
 276,
 279,
 282,
 285,
 288,
 291,
 294,
 297,
 300,
 303,
 306,
 309,
 312,
 315,
 318,
 321,
 324,
 327,
 330,
 333,
 336,
 339,
 342,
 345,
 348,
 351,
 354,
 357,
 360,
 363,
 366,
 369,
 372,
 375,
 378,
 381,
 384,
 387,
 390,
 393,
 396,
 399,
 402,
 405,
 408,
 411,
 414,
 417,
 420,
 423,
 426,
 429,
 432,
 435,
 438,
 441,
 444,
 447,
 450,
 453,
 456,
 459,
 462,
 465,
 468,
 471,
 474,
 477,
 480,
 483,
 486,
 489,
 492,
 495,
 498,
 501,
 504,
 507,
 510,
 513,
 516,
 519

### Custom computation: Tree summation

As an example of a non-trivial algorithm, consider the classic tree reduction.  We accomplish this with a for loop nested inside a while loop and a bit of ordinary Python logic.

```
finish           total             single output
    ^          /        \
    |        c1          c2        neighbors merge
    |       /  \        /  \
    |     b1    b2    b3    b4     neighbors merge
    ^    / \   / \   / \   / \
start   a1 a2 a3 a4 a5 a6 a7 a8    many inputs
```

In [16]:
L = zs
while len(L) > 1:
    new_L = []
    for i in range(0, len(L), 2):
        future = c.submit(add, L[i], L[i + 1])  # add neighbors; assumes len(L) is even
        new_L.append(future)
    L = new_L                                   # swap old list for new
    
progress(L)

In [17]:
L

[<Future: status: finished, type: int, key: add-b9552e39d09483e4ddab75d79af4cb69>]

In [18]:
c.gather(L)

[98688]
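
The total can be checked without a cluster. The sketch below runs the same pairwise reduction on plain integers, using the fact that each `z` equals `add(inc(i), double(inc(i))) = 3 * (i + 1)`. The `tree_reduce` helper is hypothetical and not part of any library; unlike the loop above, it also carries an unpaired last element up a level, so it works for lengths that are not a power of two.

```python
from operator import add

def tree_reduce(values, combine):
    """Pairwise-combine neighbors until one value remains (hypothetical helper)."""
    level = list(values)
    if not level:
        raise ValueError("tree_reduce needs at least one value")
    while len(level) > 1:
        next_level = []
        for i in range(0, len(level) - 1, 2):
            next_level.append(combine(level[i], level[i + 1]))
        if len(level) % 2:               # odd length: carry the last item up
            next_level.append(level[-1])
        level = next_level
    return level[0]

# Plain-integer equivalents of the 256 futures in zs.
expected_zs = [3 * (i + 1) for i in range(256)]
total = tree_reduce(expected_zs, add)
print(total)  # 98688
```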

Example with data streams
----------------------------

The client can map functions over lists or queues.  This is nothing more than calling `submit` many times.  We can chain maps on queues together to construct simple data processing pipelines.

All of this logic happens on the client side.  None of it is hard-coded into the scheduler.  This simple streaming system is a good example of the kind of system that becomes easy for users to build when given access to custom task scheduling.

In [19]:
from queue import Queue
from threading import Thread

def multiplex(n, q, **kwargs):
    """ Convert one queue into several equivalent Queues
    
    >>> q1, q2, q3 = multiplex(3, in_q)
    """
    out_queues = [Queue(**kwargs) for i in range(n)]
    def f():
        while True:
            x = q.get()
            for out_q in out_queues:
                out_q.put(x)
    t = Thread(target=f)
    t.daemon = True
    t.start()
    return out_queues        

```
           ----inc---->
          /            \ 
in_q --> q              \_add__ results
          \             / 
           ---double-->/
```

In [20]:
in_q = Queue()
q = c.scatter(in_q)

In [21]:
in_q.put(1)
q.get()

<Future: status: finished, type: int, key: 5452d459ce51afca0b9ddc59c084a769>

In [22]:
q_1, q_2 = multiplex(2, q)

inc_q = c.map(inc, q_1)
double_q = c.map(double, q_2)

add_q = c.map(add, inc_q, double_q)

out_q = c.gather(add_q)

In [23]:
in_q.put(10)
out_q.get()

31
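
The shape of the pipeline above can be reproduced with the standard library alone, which may help when reasoning about ordering. The sketch below is a single-process analogue using threads and queues, not Dask; `fan_out` plays the role of `multiplex`, and `map_queue` is a hypothetical helper that applies a function to items taken in lockstep from its input queues.

```python
from queue import Queue
from threading import Thread

def fan_out(n, q):
    """Copy every item from q into n new queues (same idea as multiplex)."""
    outs = [Queue() for _ in range(n)]
    def pump():
        while True:
            x = q.get()
            for out in outs:
                out.put(x)
    Thread(target=pump, daemon=True).start()
    return outs

def map_queue(fn, *in_queues):
    """Hypothetical helper: apply fn to items taken in lockstep from in_queues."""
    out = Queue()
    def pump():
        while True:
            args = [q.get() for q in in_queues]  # one item from each input
            out.put(fn(*args))
    Thread(target=pump, daemon=True).start()
    return out

local_in = Queue()
q_a, q_b = fan_out(2, local_in)
inc_local = map_queue(lambda x: x + 1, q_a)
double_local = map_queue(lambda x: 2 * x, q_b)
add_local = map_queue(lambda x, y: x + y, inc_local, double_local)

local_in.put(10)
result = add_local.get(timeout=5)
print(result)  # 31
```

Each stage here is a single thread reading a FIFO queue, so results come out in input order. That ordering guarantee is a property of this sketch and should not be assumed of the distributed version.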

In [24]:
from random import random

def feed(q):
    for i in range(10000):
        sleep(random())
        q.put(i)
        
t = Thread(target=feed, args=(in_q,))  # feed the pipeline's input queue
t.daemon = True
t.start()

In [31]:
out_q.qsize()

41