In [1]:
"""
This task implements the more advanced features of dask.dsitributed
"""

# be sure to shut down other kernels running distributed clients
from dask.distributed import Client
c = Client()

In [2]:
# Some trivial work that takes time
# repeated from the Distributed chapter.

from dask import delayed
import time
import distributed

def inc(x):
    time.sleep(5)
    return x + 1

def dec(x):
    time.sleep(3)
    return x - 1

def add(x, y):
    time.sleep(7)
    return x + y

x = delayed(inc)(1)
y = delayed(dec)(2)

In [10]:
fut = c.submit(inc, 1)
distributed.diagnostics.progress(fut)

In [11]:
# functions runs on the cluster for a while, we have a local handle to
# that work
fut

# try the preceding cell again, but with
# distributed.diagnostics.progress(fut)
# or
# distributed.client.wait(fut)

In [12]:
# grab the information back - this blocks if fut is not ready
c.gather(fut)

2

In [13]:
total = delayed(add)(x, y)

In [16]:

# notice the difference from total.compute()
# notice that this cell completes immediately
fut = c.compute(total)

In [15]:
c.gather(fut)

3

In [3]:
# filters data to get only records where name is Alice and grabs her transaction amounts
import dask.bag as db
import os
import json
filename = os.path.join('../data', 'accounts.*.json.gz')
lines = db.read_text(filename)
js = lines.map(json.loads)

f = c.compute(js.filter(lambda record: record['name'] == 'Alice')
       .pluck('transactions')
       .flatten()
       .pluck('amount')
       .mean())

In [33]:

# note that progress must be the last line of a cell
# in order to show up
distributed.diagnostics.progress(f)

In [34]:
c.gather(f)

1054.7568280885337

In [36]:
# computes random data using arrays and chunks 
import h5py
import os
f = h5py.File(os.path.join('../data', 'random.hdf5'), mode='r')
dset = f['/x']
import dask.array as da
x = da.from_array(dset, chunks=(1000000,))

%time x.sum().compute()
%time x.sum().compute()

CPU times: user 1.86 s, sys: 0 ns, total: 1.86 s
Wall time: 6.36 s
CPU times: user 1.94 s, sys: 0 ns, total: 1.94 s
Wall time: 6.54 s


81001808.0

In [37]:
# changes x from a set of delayed prescritions
# to a set of futures pointing to data in RAM
# See this on the UI dashboard.
x = c.persist(x)

In [38]:
%time x.sum().compute()
%time x.sum().compute()

CPU times: user 13.6 s, sys: 3.85 s, total: 17.5 s
Wall time: 2min 58s
CPU times: user 12.6 s, sys: 1.2 s, total: 13.8 s
Wall time: 1min 22s


81001808.0

In [4]:
# example causing intential errors to show debugging feature
# first cell computes arbitray data in arrays
# second cell causes intential divide by zero error

@delayed
def ratio(a, b):
    return a // b

@delayed
def summation(*a):
    return sum(*a)

ina = [5, 25, 30]
inb = [5, 5, 6]
out = summation([ratio(a, b) for (a, b) in zip(ina, inb)])
f = c.compute(out)
f

In [7]:
c.gather(f)

11

In [5]:
ina = [5, 25, 30]
inb = [5, 0, 6]
out = summation([ratio(a, b) for (a, b) in zip(ina, inb)])
f = c.compute(out)
c.gather(f)

ZeroDivisionError: integer division or modulo by zero

In [6]:
import dask

with dask.set_options(get=dask.local.get_sync):
    # do NOT use c.compute(out) here - we specifically do not
    # want the distributed scheduler
    out.compute()

ZeroDivisionError: integer division or modulo by zero

In [7]:
debug

> [0;32m<ipython-input-4-85efec34ee4d>[0m(3)[0;36mratio[0;34m()[0m
[0;32m      1 [0;31m[0;34m@[0m[0mdelayed[0m[0;34m[0m[0m
[0m[0;32m      2 [0;31m[0;32mdef[0m [0mratio[0m[0;34m([0m[0ma[0m[0;34m,[0m [0mb[0m[0;34m)[0m[0;34m:[0m[0;34m[0m[0m
[0m[0;32m----> 3 [0;31m    [0;32mreturn[0m [0ma[0m [0;34m//[0m [0mb[0m[0;34m[0m[0m
[0m[0;32m      4 [0;31m[0;34m[0m[0m
[0m[0;32m      5 [0;31m[0;34m@[0m[0mdelayed[0m[0;34m[0m[0m
[0m
ipdb> 
ipdb> 
ipdb> exit


In [9]:
c.recreate_error_locally(f)

ZeroDivisionError: integer division or modulo by zero

In [10]:
debug

> [0;32m<ipython-input-4-85efec34ee4d>[0m(3)[0;36mratio[0;34m()[0m
[0;32m      1 [0;31m[0;34m@[0m[0mdelayed[0m[0;34m[0m[0m
[0m[0;32m      2 [0;31m[0;32mdef[0m [0mratio[0m[0;34m([0m[0ma[0m[0;34m,[0m [0mb[0m[0;34m)[0m[0;34m:[0m[0;34m[0m[0m
[0m[0;32m----> 3 [0;31m    [0;32mreturn[0m [0ma[0m [0;34m//[0m [0mb[0m[0;34m[0m[0m
[0m[0;32m      4 [0;31m[0;34m[0m[0m
[0m[0;32m      5 [0;31m[0;34m@[0m[0mdelayed[0m[0;34m[0m[0m
[0m
ipdb> exit


In [15]:

c.cluster.scheduler.nbytes

{'finalize-a6d814cd9faf8850e2cf5492884083a8': 28,
 'ratio-09261831-dc72-402c-ac31-e13f40f02bd7': 28,
 'ratio-41d8d12a-79c7-4a53-b4d5-416274cd1835': 28,
 'ratio-551bb6ab-7205-47c6-9b0b-d20d6529574f': 28,
 'ratio-d8eab5d6-9b2b-4567-97b9-e043db9fba9b': 28,
 'summation-9519c632-dc11-4e20-b292-a022a7bfffbb': 28}

In [12]:

workdict = c.start_ipython_workers()

In [13]:
w0 = list(workdict.values())[0]

In [14]:
%remote w0 worker.keys()

['finalize-a6d814cd9faf8850e2cf5492884083a8']