<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo\">

# 使用 `dask.delayed` 并行化代码

在本节中，我们使用Dask和`Dask.delayed`,来并行化简单的for循环样式代码。通常，这是您将函数转换为与Dask一起使用的唯一函数。

这是一种使用 `dask` 并行化现有代码库或构建 [复杂系统](https://blog.dask.org/2018/02/09/credit-models-with-dask) 的简单方法。 这也将有助于我们对后面的部分进行理解。 

**相关文档**

* [Delayed 文档](https://docs.dask.org/en/latest/delayed.html)
* [Delayed 屏幕录像](https://www.youtube.com/watch?v=SHqFmynRxVU)
* [Delayed API](https://docs.dask.org/en/latest/delayed-api.html)
* [Delayed 样例](https://examples.dask.org/delayed.html)
* [Delayed 最佳实践](https://docs.dask.org/en/latest/delayed-best-practices.html)

正如在[分布式调度器notebook](05_distributed.ipynb) 中看到的那样，Dask有几种并行执行代码的方法。我们将通过创建一个 `dask.distributed.Client`来使用分布式调度器。现在，这将为我们提供一些不错的诊断。稍后我们将深入讨论调度器。 

In [None]:
from dask.distributed import Client

client = Client(n_workers=4)

## 基础

首先让我们制作一些玩具功能，例如`inc`和`add`，它们会休眠一段时间以模拟工作。 然后，我们将正常运行这些功能。

在下一节中，我们将并行化此代码。

In [None]:
from time import sleep

def inc(x):
    sleep(1)
    return x + 1

def add(x, y):
    sleep(1)
    return x + y

我们使用 `%%time` 魔术命令来计时这段普通代码的执行时间，这是Jupyter Notebook的一个特殊功能。 

In [None]:
%%time
# 这需要三秒钟才能运行，
# 因为我们一个接一个地按顺序调用每个函数

x = inc(1)
y = inc(2)
z = add(x, y)

### 使用`dask.delayed`装饰器并行

这两个增量调用*可以*并行调用，因为它们完全相互独立。

我们将使用 `dask.delayed` 函数转换 `inc` 和 `add` 函数。 当我们通过传递参数调用延迟版本时，与以前完全一样，原始函数实际上还没有被调用 —— 这就是单元执行很快完成的原因。
取而代之的是，生成了一个*延迟对象*，该对象跟踪要调用的函数以及要传递给该函数的参数。

In [None]:
from dask import delayed

In [None]:
%%time
# 这会立即运行，它所做的只是构建一个图

x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(add)(x, y)

这立即运行，因为还没有真正发生任何事情。

要获得结果，请调用 `compute`。 请注意，这比原始代码运行得更快。

In [None]:
%%time
# 这里才事实上地使用本地线程池运行我们的计算 

z.compute()

## 刚才发生了什么？

`z`对象是一个惰性(`lazy`)的`Delayed`对象。 该对象包含我们计算最终结果所需的一切，包括对所有必需功能的引用以及它们的输入以及与另一个函数的关系。 我们可以如上所述使用`.compute()`评估结果，也可以使用`.visualize()`可视化该值的任务图。

In [None]:
z

In [None]:
# 查看`z`的任务图
# 译者注:除了适用conda或pip安装python-graphviz，还需要在系统上安装graphviz。以ubuntu为例:sudo apt install graphviz
z.visualize()

注意，这包括之前的函数名称，以及`inc`函数的输出到`add`的输入的逻辑流程。

### 需要考虑的一些问题：

-为什么从3秒变成2秒？ 为什么我们不能并行化到1s？  
-如果inc和add函数不包含`sleep(1)`会发生什么？ Dask仍然可以加快此代码的速度吗？  
-如果我们有多个输出或者还想访问`x`或`y`，该怎么办？  

## 练习：并行化for循环

`for`循环是我们要并行化的最常见的事物之一。 在`inc`和`sum`上使用`dask.delayed`来并行化以下计算：

In [None]:
data = [1, 2, 3, 4, 5, 6, 7, 8]

In [None]:
%%time
# 串行代码

results = []
for x in data:
    y = inc(x)
    results.append(y)
    
total = sum(results)

In [None]:
total

In [None]:
%%time
# 将你的并行代码写在这儿

In [None]:
results = []

for x in data:
    y = delayed(inc)(x)
    results.append(y)
    
total = delayed(sum)(results)
print("Before computing:", total)  # 让我们看看total是什么变量类型的
result = total.compute()
print("After computing :", result)  # 计算完成后

与直接使用 `sum` 函数而不是用 `delayed` 包裹的版本相比，图形可视化与给定的解决方案相比如何？ 你能解释一下后面的版本吗？ 您可能会发现以下表达式的结果很有启发性：
```python
delayed(inc)(1) + delayed(inc)(2)
```

## 练习：使用控制流并行化 for 循环代码

通常，我们只希望延迟*某些*功能，立即运行其中一些功能。当这些函数快速运行并帮助我们确定应该调用哪些其他较慢的函数时，这特别有用。这个决定，延迟或不延迟，通常是我们在使用`dask.delayed`时需要考虑的地方。

在下面的示例中，我们遍历输入列表。 如果输入是偶数，那么我们要调用`inc`。 如果输入为奇数，则我们要调用`double`。 必须立即（而不是懒惰地）做出决定调用`inc`或`double`的`is_even`，以便我们的图形构建Python代码继续进行。

In [None]:
def double(x):
    sleep(1)
    return 2 * x

def is_even(x):
    return not x % 2

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [None]:
%%time
# 串行代码

results = []
for x in data:
    if is_even(x):
        y = double(x)
    else:
        y = inc(x)
    results.append(y)
    
total = sum(results)
print(total)

In [None]:
%%time
# 将你的并行代码写在这儿。。。
# 目标: 使用 dask.delayed 并行化上面的串行代码
# 你需要延迟某些功能，但不是全部

In [None]:
results = []
for x in data:
    if is_even(x):  # even
        y = delayed(double)(x)
    else:          # odd
        y = delayed(inc)(x)
    results.append(y)
    
total = delayed(sum)(results)

In [None]:
%time total.compute()

In [None]:
total.visualize()

### 需要考虑的一些问题：

- 还有哪些我们不能使用延迟执行的控制流示例？
- 如果在上面的示例中延迟执行了对`is_even（x）`的求值，将会发生什么？
- 您对延迟执行`sum`有何想法？ 这个函数既是计算，又运行速度也很快。

## 练习：并行化Pandas Groupby归约

在本练习中，我们读取了多个CSV文件，并且并行执行groupby操作。 我们获得了串行代码来执行此操作，并将其与`dask.delayed`并行化。

我们将并行进行的计算是根据一些历史飞行数据来计算每个机场的平均离港延误。 我们将通过使用`dask.delayed`和`pandas`来做到这一点。 在以后的部分中，我们将使用`dask.dataframe`进行相同的练习。

## 创建数据

运行此代码以准备一些数据。

此下载并提取了1990年到2000年之间纽约市以外航班的一些历史航班数据。原始数据来自[这里](http://stat-computing.org/dataexpo/2009/the-data.html)。

In [None]:
%run prep.py -d flights

### 检查数据

In [None]:
import os
sorted(os.listdir(os.path.join('data', 'nycflights')))

### 用`pandas.read_csv`读取一个文件并计算平均出发延迟

In [None]:
import pandas as pd
df = pd.read_csv(os.path.join('data', 'nycflights', '1990.csv'))
df.head()

In [None]:
# 是什么样的数据类型？
df.dtypes

In [None]:
# 数据中包含哪些始发机场？
df.Origin.unique()

In [None]:
# 每个机场的年平均离港延误
df.groupby('Origin').DepDelay.mean()

### 顺序代码：每个机场的平均离港延误

上面的单元格计算了每个机场一年的平均起飞延迟。 在这里，我们使用顺序for循环将其扩展到所有年份。

In [None]:
from glob import glob
filenames = sorted(glob(os.path.join('data', 'nycflights', '*.csv')))

In [None]:
%%time

sums = []
counts = []
for fn in filenames:
    # 读取文件
    df = pd.read_csv(fn)
    
    # Groupby出发地机场
    by_origin = df.groupby('Origin')
    
    # 所有始发航班延误的总和
    total = by_origin.DepDelay.sum()
    
    # 始发航班数
    count = by_origin.DepDelay.count()
    
    # 保存中间变量
    sums.append(total)
    counts.append(count)

# 合并中间量以获得总的平均始发延迟
total_delays = sum(sums)
n_flights = sum(counts)
mean = total_delays / n_flights

In [None]:
mean

### 并行化上面的代码

使用`dask.delayed`并行化上面的代码。 您需要了解一些额外的信息。

1.  对delayed对象的方法和属性访问会自动工作，因此，如果您有delayed对象，则可以对其执行常规的算术，切片和方法调用，它将产生正确的延迟调用。

    ```python
    x = delayed(np.arange)(10)
    y = (x + 1)[::2].sum()  # 这里的一切都延迟执行了
    ```
2.  当您只有一个输出时，调用`.compute()`方法效果很好。 当您有多个输出时，您可能想使用`dask.compute`函数：

    ```python
    >>> from dask import compute
    >>> x = delayed(np.arange)(10)
    >>> y = x ** 2
    >>> min_, max_ = compute(y.min(), y.max())
    >>> min_, max_
    (0, 81)
    ```
    
    这样Dask可以共享中间值（例如`y = x ** 2`）
    
因此，您的目标是使用`dask.delayed`并行化上面的代码（已在下面复制）。 您可能还需要可视化一些计算，以查看是否正确执行了计算。

In [None]:
from dask import compute

In [None]:
%%time

# 复制的串行代码

sums = []
counts = []
for fn in filenames:
    # 读取文件
    df = pd.read_csv(fn)
    
    # Groupby出发地机场
    by_origin = df.groupby('Origin')
    
    # 所有始发航班延误的总和
    total = by_origin.DepDelay.sum()
    
    # 始发航班数
    count = by_origin.DepDelay.count()
    
    # 保存中间变量
    sums.append(total)
    counts.append(count)

# 合并中间量以获得总的平均始发延迟
total_delays = sum(sums)
n_flights = sum(counts)
mean = total_delays / n_flights

In [None]:
mean

In [None]:
%%time
# 将你的代码写在这儿。。。

如果您加载解决方案，请在单元格顶部添加`%%time`以测量运行时间。 

In [None]:
%%time

# 这只是一种可能的解决方案，有好几种方法可以使用`delayed`来做到这一点 

sums = []
counts = []
for fn in filenames:
    # 读取文件
    df = delayed(pd.read_csv)(fn)

    # Groupby 始发机场
    by_origin = df.groupby('Origin')

    # 所有始发地延误的总和
    total = by_origin["DepDelay"].sum()

    # 始发地的延误的航班数量
    count = by_origin["DepDelay"].count()

    # 保存中间变量
    sums.append(total)
    counts.append(count)

# 计算中间变量
sums, counts = compute(sums, counts)

# 组合中间体以获得每个始发地的总平均延迟
total_delays = sum(sums)
n_flights = sum(counts)
mean = total_delays / n_flights

In [None]:
# 确保结果仍然匹配
mean

## 要考虑的一些问题：

- 您获得了多少提速？ 这是您期望的加速速度吗？
- 尝试在何处调用`compute`。 当您用`sums`和`counts`调用它时会发生什么？ 如果您等待并按`mean`调用该怎么办？
- 尝试delaying call `sum`。 如果`sum`被延迟，图形将是什么样？ 如果不是，则图形看起来像什么？
- 您能想出为什么要以一种方式比另一种方式进行归约的任何原因吗？

### 了解更多 

访问 [Delayed 文档](https://docs.dask.org/en/latest/delayed.html). 特别是，这个[delayed 屏幕录像](https://www.youtube.com/watch?v=SHqFmynRxVU) 将强化您在这里学到的概念和[delayed 最佳实践](https://docs.dask.org/en/latest/delayed-best-practices.html) 文档收集了关于如何使用 `dask.delayed` 的建议。 

## 关闭客户端 

在继续下一个练习之前，请确保关闭您的客户端或停止此内核。 

In [None]:
client.close()