<img src="images/dask_horizontal.svg" align="right" width="30%">

# 分布式,高级话题

## 分布式执行的未来

In [None]:
from dask.distributed import Client
c = Client(n_workers=4)
c.cluster

在上一章中，我们展示了用分布式执行器执行一个计算（使用延迟创建）与任何其他执行器是相同的。然而，我们现在可以访问额外的功能，并控制哪些数据被保存在内存中。

首先，"futures"接口（从内置的 "concurrent.futures "派生）允许类似map-reduce的功能。我们可以用一组输入提交单个函数进行评估，或者用`submit()`和`map()`对一序列输入进行评估。请注意，调用会立即返回，给出一个或多个*futures*，其状态开始为 "待定"，随后变为 "完成"。本地Python会话没有被阻塞。

最简单的 "submit"操作的例子：

In [None]:
def inc(x):
    return x + 1

fut = c.submit(inc, 1)
fut

我们可以随意重新执行下面的单元格，以此来调查未来的状态。当然，这可以以循环的方式进行，在每次迭代时暂停一小段时间。我们可以继续我们的工作，或者查看仍在进行的工作进度条，或者强制等待直到未来准备好。

同时，`status`仪表盘（上面Cluster小组件旁边的链接）的任务流中获得了一个新的元素，表示`inc()`已经完成，问题处的进度部分显示一个任务完成并保存在内存中。

In [None]:
fut

能想到的可能的替代方案：
```python
from dask.distributed import wait, progress
progress(fut)
```
会在*这个*笔记本中显示一个进度条，而不是必须去仪表板。这个进度条也是异步的，不会阻挡其他代码在这期间的执行。

```python
wait(fut)
```
会阻塞并迫使笔记本等待，直到`fut`指向的计算完成。然而，请注意，`inc()`的结果就在集群中，现在执行计算不需要**时间，因为Dask注意到我们在请求一个它已经知道的计算结果。稍后再谈。

In [None]:
# 抢回信息--如果期货还没有准备好，这就会被阻断。
c.gather(fut)
# 等效:
# fut.result()

在这里，我们看到了在集群上执行工作的另一种方式：当你将输入作为期货提交或映射时，*计算会移动到数据*，而不是相反，客户端在本地Python会话中，永远不需要看到中间值。这与使用delayed构建图类似，事实上，delayed可以与期货结合使用。这里我们使用之前的延迟对象`total`。

In [None]:
# 一些琐碎的工作，需要时间从分布式章节重复。

from dask import delayed
import time

def inc(x):
    time.sleep(5)
    return x + 1

def dec(x):
    time.sleep(3)
    return x - 1

def add(x, y):
    time.sleep(7)
    return x + y

x = delayed(inc)(1)
y = delayed(dec)(2)
total = delayed(add)(x, y)

In [None]:
# notice the difference from total.compute()
# notice that this cell completes immediately
fut = c.compute(total)
fut

In [None]:
c.gather(fut) # waits until result is ready

### `Client.submit`

`submit`接受一个函数和参数，将其推送给集群，返回一个*Future*，代表要计算的结果。该函数被传递给一个工人进程进行评估。请注意，这个单元格会立即返回，而计算可能仍在集群上进行。

In [None]:
fut = c.submit(inc, 1)
fut

这看起来很像上面的 "compute()"，除了现在我们直接将函数和参数传递给集群。对于习惯于使用`concurrent.futures`的人来说，这看起来很熟悉。这个新的 "fut "的行为方式和上面的一样。请注意，我们现在已经重写了之前的`fut`定义，它将被垃圾回收，因此，之前的结果被集群释放了。

### 练习：用 "Client.submit "代替 "Client.submit "重建上面的延迟计算


传递给`submit`的参数可以是其他提交操作的期货或延迟对象。特别是前者，展示了*将计算移动到数据*的概念，这是Dask编程中最强大的元素之一。


In [None]:
# Your code here

In [None]:
x = c.submit(inc, 1)
y = c.submit(dec, 2)
total = c.submit(add, x, y)

print(total)     # This is still a future
c.gather(total)   # This blocks until the computation has finished


每一个Future代表一个结果，或被集群所持有或正在评估的结果。因此我们可以控制中间值的缓存--当一个期货不再被引用时，它的值就会被遗忘。在上面的解决方案中，每个函数调用都持有期货。如果我们选择提交更多需要它们的工作，这些结果将不需要重新评估。

我们可以使用`scatter()`明确地将数据从我们的本地会话传递到集群中，但通常更好的是构建函数，在worker本身内部进行数据加载，这样就不需要对数据进行序列化和通信。Dask内部的大部分加载函数，sudh为`dd.read_csv`，都是这样工作的。同样，我们通常也不希望`gather()`的结果在内存中太大。

分布式调度器的[完整API](http://distributed.readthedocs.io/en/latest/api.html)给出了与集群交互的细节，记住，可以在你的本地机器上，也可能在一个庞大的计算资源上。

Future API提供了一种工作提交方式，可以很容易地模仿很多人可能熟悉的map/reduce范式（见`c.map()`）。由futures表示的中间结果可以传递给新的任务，而不必从集群中带来本地的拉动，新的工作可以被分配到还没有开始的之前工作的输出上工作。

一般来说，任何使用`.compute()`执行的Dask操作都可以使用`c.compute()`来代替提交异步执行，这适用于所有集合。下面是一个例子，前面在Bag章节中看到的计算。我们在那里用分布式客户端版本替换了`.compute()`方法，所以，我们又可以继续提交更多的工作（也许是基于计算结果），或者，在下一个单元格中，跟踪计算的进度。类似的进度条出现在监控UI页面。

In [None]:
%run prep.py -d accounts

In [None]:
import dask.bag as db
import os
import json
filename = os.path.join('data', 'accounts.*.json.gz')
lines = db.read_text(filename)
js = lines.map(json.loads)

f = c.compute(js.filter(lambda record: record['name'] == 'Alice')
       .pluck('transactions')
       .flatten()
       .pluck('amount')
       .mean())

In [None]:
from dask.distributed import progress
# 注意，进度必须是单元格的最后一行才能显示出来
progress(f)

In [None]:
# get result.
c.gather(f)

In [None]:
# release values by deleting the futures
del f, fut, x, y, total

### Persist

考虑哪些数据应该由worker加载，而不是传递，以及哪些中间值应该在worker内存中持久化，在很多情况下将决定一个进程的计算效率。

在这里的例子中，我们重复了Array一章的计算--注意到每次调用`compute()`的速度大致相同，因为每次都包含了数据的加载。

In [None]:
%run prep.py -d random

In [None]:
import h5py
import os
f = h5py.File(os.path.join('data', 'random.hdf5'), mode='r')
dset = f['/x']
import dask.array as da
x = da.from_array(dset, chunks=(1000000,))

%time x.sum().compute()
%time x.sum().compute()

如果，我们在前面将数据持久化到RAM中（这需要几秒钟的时间来完成--我们可以在这个过程中`wait()`），那么进一步的计算就会快很多。

In [None]:
# changes x from a set of delayed prescriptions
# to a set of futures pointing to data in RAM
# See this on the UI dashboard.
x = c.persist(x)

In [None]:
%time x.sum().compute()
%time x.sum().compute()

当然，持久化沿途的每一个中间点是个坏主意，因为这往往会填满所有可用的RAM，使整个系统变慢（或崩溃！）。理想的持久化点往往是在一组数据清理步骤的最后，当数据处于一个会经常被查询的形式时。

**练习**：一旦我们知道我们已经完成了与`x`相关的内存，如何释放它？

## 异步计算
<img style="float: right;" src="https://upload.wikimedia.org/wikipedia/commons/thumb/3/32/Rosenbrock_function.svg/450px-Rosenbrock_function.svg.png" height=200 width=200>

使用期货API的一个好处是，你可以有动态计算，随着事情的进展而调整。在这里，我们通过在结果进来时循环执行一个简单的天真搜索，并在其他结果还在运行时提交新的点来计算。

在这个运行过程中，观察[诊断仪表板](.../.../9002/status)，你可以看到计算正在并发运行，而更多的计算正在提交。这种灵活性对于需要一定程度同步的并行算法来说是很有用的。

让我们使用动态编程来执行一个非常简单的最小化。感兴趣的函数被称为Rosenbrock。

In [None]:
# a simple function with interesting minima
import time

def rosenbrock(point):
    """Compute the rosenbrock function and return the point and result"""
    time.sleep(0.1)
    score = (1 - point[0])**2 + 2 * (point[1] - point[0]**2)**2
    return point, score

初始设置，包括创建一个图形。我们使用了Bokeh，它可以随着结果的到来动态更新图形。

In [None]:
from bokeh.io import output_notebook, push_notebook
from bokeh.models.sources import ColumnDataSource
from bokeh.plotting import figure, show
import numpy as np
output_notebook()

# set up plot background
N = 500
x = np.linspace(-5, 5, N)
y = np.linspace(-5, 5, N)
xx, yy = np.meshgrid(x, y)
d = (1 - xx)**2 + 2 * (yy - xx**2)**2
d = np.log(d)

p = figure(x_range=(-5, 5), y_range=(-5, 5))
p.image(image=[d], x=-5, y=-5, dw=10, dh=10, palette="Spectral11");

我们从(0, 0)处的一个点开始，在它周围随机分散测试点。每次评估需要100ms，当结果出来时，我们测试是否有新的最佳点，并在新的最佳点周围选择随机点，因为搜索框缩小了。

每次有新的最佳值时，我们都会打印出函数值和当前的最佳位置。

In [None]:
from dask.distributed import as_completed
from random import uniform

scale = 5                  # Intial random perturbation scale
best_point = (0, 0)        # Initial guess
best_score = float('inf')  # Best score so far
startx = [uniform(-scale, scale) for _ in range(10)]
starty = [uniform(-scale, scale) for _ in range(10)]

# set up plot
source = ColumnDataSource({'x': startx, 'y': starty, 'c': ['grey'] * 10})
p.circle(source=source, x='x', y='y', color='c')
t = show(p, notebook_handle=True)

# initial 10 random points
futures = [c.submit(rosenbrock, (x, y)) for x, y in zip(startx, starty)]
iterator = as_completed(futures)

for res in iterator:
    # take a completed point, is it an improvement?
    point, score = res.result()
    if score < best_score:
        best_score, best_point = score, point
        print(score, point)

    x, y = best_point
    newx, newy = (x + uniform(-scale, scale), y + uniform(-scale, scale))
    
    # update plot
    source.stream({'x': [newx], 'y': [newy], 'c': ['grey']}, rollover=20)
    push_notebook(document=t)
    
    # add new point, dynamically, to work on the cluster
    new_point = c.submit(rosenbrock, (newx, newy))
    iterator.add(new_point)  # Start tracking new task as well

    # Narrow search and consider stopping
    scale *= 0.99
    if scale < 0.001:
        break
point

## Debugging

当分布式作业中出现问题时，很难弄清楚问题出在哪里，该如何处理。当一个任务引发异常时，当收集到该结果或其他依赖于该结果的结果时，异常就会显示出来。

考虑以下要由集群计算的延迟计算。像往常一样，我们会得到一个未来，集群正在努力计算（对于琐碎的程序来说，这种情况发生得非常缓慢）。

In [None]:
@delayed
def ratio(a, b):
    return a // b

ina = [5, 25, 30]
inb = [5, 5, 6]
out = delayed(sum)([ratio(a, b) for (a, b) in zip(ina, inb)])
f = c.compute(out)
f

我们只有在收集结果的时候才会知道发生了什么（对于`out.compute()`来说也是如此，只是在此期间我们不可能做其他事情）。对于第一组输入，它工作得很好。

In [None]:
c.gather(f)

但如果我们引入了不好的输入，就会出现异常。异常发生在 "比率 "中，但只有在计算总和时才会引起我们的注意。

In [None]:
ina = [5, 25, 30]
inb = [5, 0, 6]
out = delayed(sum)([ratio(a, b) for (a, b) in zip(ina, inb)])
f = c.compute(out)
c.gather(f)

这种情况下的显示可以很明显的看出异常的来源，但事实并非总是如此。这应该如何调试，我们会如何去找出导致异常的具体条件呢？

第一步，当然是写出经过良好测试的代码，对其输入进行适当的断言，并在出错时发出明确的警告和错误信息。这适用于所有的代码。

最典型的做法是在本地线程中执行一部分计算，这样我们就可以运行Python调试器，查询异常发生时的状态。显然，在集群上处理大数据时，不能对整个数据集执行，但即使如此，一个合适的样本可能也能做到。

In [None]:
import dask
with dask.config.set(scheduler="sync"):
    # do NOT use c.compute(out) here - we specifically do not
    # want the distributed scheduler
    out.compute()

In [None]:
# uncomment to enter post-mortem debugger
# %debug

这种方法的问题是，Dask是为了执行大型数据集/计算--你可能无法简单地运行整个事情。在一个本地线程中，否则当初就不会使用Dask了。所以，上面的代码只应该用在一小部分数据上，也会抑制错误的发生。
此外，当你处理的是期货（如上面的`f`，或持久化后）而不是基于延迟的计算时，该方法将不起作用。

作为一种替代方法，你可以要求调度器分析你的计算，并找到导致错误的具体子任务，然后只将它和它的依赖关系拉到本地执行。

In [None]:
c.recreate_error_locally(f)

In [None]:
# 取消注释进入事后调试器。
# %debug

最后，除了异常之外，还有一些错误，这时我们需要查看调度器/工作者的状态。在我们开始的标准 "LocalCluster "中，我们可以直接访问这些。

In [None]:
[(k, v.state) for k, v in c.cluster.scheduler.tasks.items() if v.exception is not None]