# 使用 Dask 进行多 GPU 计算

利用 GPU 加速你的工作负载可以带来数量级的性能提升，但一旦你的工作负载完全利用了设备，你就会开始达到新的性能上限。

这就是多 GPU 和多节点工作负载发挥作用的地方。可以将多个 GPU 一起使用，从而实现性能的又一次飞跃。

在我们深入研究多 GPU 工作负载之前，我想提醒一下，分布式计算可能会增加代码的复杂性。本章讨论的工具会尽一切努力减轻分布式计算的负担，但我们应该确保在开始横向扩展之前，已经榨干了单个 GPU 的每一滴性能。

In [None]:
!git clone https://github.com/rapidsai/rapidsai-csp-utils.git
!python rapidsai-csp-utils/colab/pip-install.py

## Dask

[Dask](https://dask.org) 是一个用于扩展 Python 代码的 Python 库。在其核心，Dask 将你的 Python 代码转换为由函数调用、输入和输出组成的计算图。然后它有一系列调度器可以用来并行执行这个图。这里我们将重点关注 Dask 的分布式调度器。

In [1]:
from dask.distributed import Client

client = Client()
client

  return importlib.import_module(spec.name)
  return importlib.import_module(spec.name)
  return importlib.import_module(spec.name)
  return importlib.import_module(spec.name)
  return importlib.import_module(spec.name)
  return importlib.import_module(spec.name)


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 5
Total threads: 20,Total memory: 78.05 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:34689,Workers: 0
Dashboard: http://127.0.0.1:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B

0,1
Comm: tcp://127.0.0.1:38757,Total threads: 4
Dashboard: http://127.0.0.1:36207/status,Memory: 15.61 GiB
Nanny: tcp://127.0.0.1:34093,
Local directory: /tmp/dask-scratch-space/worker-q98sjod2,Local directory: /tmp/dask-scratch-space/worker-q98sjod2

0,1
Comm: tcp://127.0.0.1:33969,Total threads: 4
Dashboard: http://127.0.0.1:40363/status,Memory: 15.61 GiB
Nanny: tcp://127.0.0.1:45281,
Local directory: /tmp/dask-scratch-space/worker-o2c12blj,Local directory: /tmp/dask-scratch-space/worker-o2c12blj

0,1
Comm: tcp://127.0.0.1:37209,Total threads: 4
Dashboard: http://127.0.0.1:42729/status,Memory: 15.61 GiB
Nanny: tcp://127.0.0.1:32987,
Local directory: /tmp/dask-scratch-space/worker-757_nh24,Local directory: /tmp/dask-scratch-space/worker-757_nh24

0,1
Comm: tcp://127.0.0.1:38461,Total threads: 4
Dashboard: http://127.0.0.1:40841/status,Memory: 15.61 GiB
Nanny: tcp://127.0.0.1:37713,
Local directory: /tmp/dask-scratch-space/worker-ld2gd1s5,Local directory: /tmp/dask-scratch-space/worker-ld2gd1s5

0,1
Comm: tcp://127.0.0.1:36445,Total threads: 4
Dashboard: http://127.0.0.1:42269/status,Memory: 15.61 GiB
Nanny: tcp://127.0.0.1:39447,
Local directory: /tmp/dask-scratch-space/worker-8p3l4aaf,Local directory: /tmp/dask-scratch-space/worker-8p3l4aaf


In [2]:
# Submit a function to be executed on the Dask cluster
f = client.submit(lambda: 10 + 1)
f.result()

11

In [3]:
# Use a high level collection API to distribute familiar work on the cluster
import dask.array as da
arr = da.random.random((1000, 1000), chunks=(100, 100))
arr.mean().compute()

np.float64(0.5002270886844968)

Dask 并不太关心你的代码在做什么，它只是尝试用其工作进程池尽可能快地运行整个图。因为我们所有的 GPU 计算都是在 Python 中完成的，所以 Dask 也可以分发我们的 GPU 代码。

In [4]:
client.close()

### 分布式集群

为了让 Dask 将图分发到多台机器上，它需要一个调度器进程和多个工作进程。我们可以手动启动这些进程，可以通过 CLI 命令 `dask-scheduler` 和 `dask-worker`，也可以使用 Dask 的任意数量的集群管理器。

#### 集群管理器

只要你有 Python 环境、网络连接并且可以启动调度器和工作进程，Dask 就可以在任意数量的计算环境中运行。

为了使创建 Dask 集群成为一致的体验，有许多集群管理器类可以导入和实例化，它们会为你构建集群。

大多数人首先接触的是 `LocalCluster`。当你创建这个类的实例时，它会检查本地计算机上可用的 CPU 和内存资源，并自动为调度器和适当数量的工作进程创建子进程。

In [5]:
from dask.distributed import LocalCluster

cluster = LocalCluster()
cluster

  return importlib.import_module(spec.name)
  return importlib.import_module(spec.name)
  return importlib.import_module(spec.name)
  return importlib.import_module(spec.name)
  return importlib.import_module(spec.name)


0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 5
Total threads: 20,Total memory: 78.05 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:42781,Workers: 0
Dashboard: http://127.0.0.1:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B

0,1
Comm: tcp://127.0.0.1:46009,Total threads: 4
Dashboard: http://127.0.0.1:37239/status,Memory: 15.61 GiB
Nanny: tcp://127.0.0.1:36311,
Local directory: /tmp/dask-scratch-space/worker-iphlcyet,Local directory: /tmp/dask-scratch-space/worker-iphlcyet

0,1
Comm: tcp://127.0.0.1:37653,Total threads: 4
Dashboard: http://127.0.0.1:39813/status,Memory: 15.61 GiB
Nanny: tcp://127.0.0.1:35217,
Local directory: /tmp/dask-scratch-space/worker-wjvbhhnu,Local directory: /tmp/dask-scratch-space/worker-wjvbhhnu

0,1
Comm: tcp://127.0.0.1:43417,Total threads: 4
Dashboard: http://127.0.0.1:36985/status,Memory: 15.61 GiB
Nanny: tcp://127.0.0.1:35679,
Local directory: /tmp/dask-scratch-space/worker-p19egkf8,Local directory: /tmp/dask-scratch-space/worker-p19egkf8

0,1
Comm: tcp://127.0.0.1:44437,Total threads: 4
Dashboard: http://127.0.0.1:43163/status,Memory: 15.61 GiB
Nanny: tcp://127.0.0.1:35351,
Local directory: /tmp/dask-scratch-space/worker-ya4cvwsv,Local directory: /tmp/dask-scratch-space/worker-ya4cvwsv

0,1
Comm: tcp://127.0.0.1:37045,Total threads: 4
Dashboard: http://127.0.0.1:45153/status,Memory: 15.61 GiB
Nanny: tcp://127.0.0.1:36761,
Local directory: /tmp/dask-scratch-space/worker-if053pp6,Local directory: /tmp/dask-scratch-space/worker-if053pp6


这对于试用 Dask 并使用它来利用本地机器上所有可用的 CPU 核心非常有用。

一旦你准备好超越计算机的限制，就有适用于 HPC 平台（如 SLURM、PBS 和 SGE）的集群管理器。还有适用于 Kubernetes、Hadoop 和公共云提供商（包括 Amazon Web Services、Microsoft Azure 和 Google Cloud Platform）的集群管理器。有关更多详细信息，请参阅 [Dask 部署文档](https://docs.dask.org/en/stable/deploying.html)。

```python
# 你可以将 LocalCluster 替换为其他集群类型

# from dask.distributed import LocalCluster
from dask_kubernetes import KubeCluster

# cluster = LocalCluster()
cluster = KubeCluster()  # 例如，替换为 Kubernetes

client = cluster.get_client()
```

In [6]:
cluster.close()

#### Dask CUDA

在使用 GPU 与 Dask 时，我们需要记住几件事。每个 Dask 工作进程需要恰好有一个 GPU，所以如果你的机器有多个 GPU，你需要每个设备一个工作进程。还有一些其他事情需要做，以便 Dask 工作进程能够成功利用 GPU。为了简化用户的操作，你可以使用 Python 包 `dask-cuda` 中的工具。

Dask CUDA 包有一个名为 `LocalCUDACluster` 的集群管理器和一个名为 `dask-cuda-worker` 的替代工作进程 CLI 命令。这两者都会检查你的硬件，并为每个 GPU 启动一个工作进程，并正确配置每个工作进程仅使用其分配的设备。

In [7]:
from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster()
cluster

  return importlib.import_module(spec.name)


0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 1
Total threads: 1,Total memory: 78.05 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:45987,Workers: 0
Dashboard: http://127.0.0.1:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B

0,1
Comm: tcp://127.0.0.1:37161,Total threads: 1
Dashboard: http://127.0.0.1:43165/status,Memory: 78.05 GiB
Nanny: tcp://127.0.0.1:41793,
Local directory: /tmp/dask-scratch-space/worker-so2htgjs,Local directory: /tmp/dask-scratch-space/worker-so2htgjs


In [8]:
client = Client(cluster)

也可以配置利用 HPC 和云功能的其他集群管理器，使用 Dask CUDA 工作进程而不是常规工作进程。

一旦我们有了带有 GPU 工作进程的 Dask 集群，我们就可以手动提交一些用 Numba 编写的 CUDA 内核在这些 GPU 上执行。

In [9]:
from numba import cuda
from numba import config as numba_config
numba_config.CUDA_ENABLE_PYNVJITLINK = True

@cuda.jit
def some_kernel():
    i = 0
    while i < 1_000_000:
        i += 1

f = client.submit(some_kernel[1024*1024, 1024])
f

### 高级集合

幸运的是，我们不必在 Dask 中手动完成所有操作。Dask 有一个高级集合的概念，它实现了流行 Python 包的 API，但将数据结构和任务分块/分区，以便它们可以在 Dask 集群上运行。通常人们使用遵循 NumPy API 的 `dask.array`、遵循 Pandas API 的 `dask.dataframe` 和遵循 Scikit-Learn API 的 `dask.ml`。

这种方法可能听起来很熟悉，我们已经看到 RAPIDS 库模仿这些库的 API 来提供加速计算。Dask 做同样的事情，但用于分布式计算。这种方法的好处之一是我们可以将它们结合起来，对我们已经熟悉和喜爱的工具进行分布式和加速计算。

当 `dask.dataframe` 创建一个 DataFrame 时，它会构建一个由许多较小的 Pandas DataFrame 组成的任务图。然后，像计算序列平均值这样的操作将首先在每个 Pandas DataFrame 上执行，然后聚合结果以获得总体平均值。但 Dask 不仅限于在其 DataFrame 集合中使用 Pandas，它还可以利用遵循 Pandas API 的其他库，如 cuDF。

cuDF 附带了一个有用的辅助库，用于构建由 cuDF DataFrame 组成的 Dask DataFrame，我们可以像之前看到的那样加载数据并执行操作。

In [10]:
import dask
import cudf
import dask_cudf

In [None]:
def gen_partition():
    return cudf.datasets.timeseries()

gddf = dask_cudf.from_map(gen_partition, list(range(30)))
gddf

In [None]:
gddf.head()

In [None]:
len(gddf)

In [None]:
gddf.groupby("name").x.mean().compute()

但现在我们的 DataFrame 分布在所有 GPU 上，计算可以利用所有硬件的性能。

### 通信

在第 1 章探索 Numba CUDA 时，我们看到将数据从 CPU 内存移动到 GPU 内存时会有性能损失。在 GPU 内存之间以及不同机器上的 GPU 之间移动数据时也是如此。

默认情况下，Dask 使用自定义 TCP 协议在工作进程之间进行通信。这意味着从一个 GPU 到另一个 GPU 的任何内存传输都必须通过 PCI-e 通道返回到 CPU，进入操作系统的网络堆栈以路由到其目的地。如果目标 GPU 在同一台机器上，它将沿着 PCI-e 通道返回并进入 GPU。如果它位于另一台机器上，它将通过 IP 网络传输，很可能通过以太网连接。

在我们的两个 GPU 在主板上彼此相邻的情况下，这是非常浪费的。它们甚至可以通过 NVLINK 直接相互连接，或者至少连接到主板上的同一个 PCI-e 交换机。通过 CPU 路由每次传输都是浪费的，这就是 UCX 发挥作用的地方。

#### UCX

[UCX](https://openucx.org/) 是一种网络协议，可以检查系统的拓扑结构并通过加速硬件找到最佳路由。如果两个 GPU 通过 NVLINK 连接，那么 UCX 将使用它来传输数据，如果它们连接到同一个 PCI-e 交换机，那是次优选择。如果 GPU 在两台不同的机器上，但这些机器有 Infiniband 网卡，那么 UCX 可以利用 RDMA over Infiniband 在 GPU 之间直接传输数据。

UCX 将尽其所能在两个位置之间尽可能直接和高效地传输数据，最终才会回退到 TCP。

#### Dask 通信协议

Dask 支持用户可以配置的替代通信协议。这包括我们可以利用以获得更高性能的 UCX，以及其他协议，如 websockets，由于更容易代理，在现代系统架构中可能更灵活。

如果我们在 GPU 工作进程中使用 UCX，并且有像 NVLINK 或 Infiniband 这样的加速网络硬件，那么我们可以看到 GPU 工作进程之间的内存传输时间大大减少。

### 资源注解

我想在 Dask 和 GPU 方面讨论的最后一个主题是注解。Dask 有一个功能，任务图中的每个任务都可以用工作进程需要具备的要求进行注解，以便能够运行它。

当我们启动工作进程时，我们还可以添加资源标签，以便调度器可以将适当的任务放在适当的工作进程上。当我们的工作进程是混合配置时，此功能最强大。


```console
$ dask-cuda-worker scheduler:8786 --resources "GPU=2"
```

在你的任务图中可能有一些步骤，在中间计算期间内存使用量会大幅增加。将这些任务引导到比其他工作进程拥有更多内存的工作进程上可能会有所帮助。

如果不是所有工作进程都有 GPU，我们也可以将其用于 GPU 工作。拥有一些常规 Dask 工作进程来承担大部分任务，同时也有几个 GPU 工作进程来运行已优化为在 GPU 上运行的步骤，这是合理的。

如果你有一个利用 Dask 的现有工作负载并且想要尝试 GPU，这可能最有用。你可以添加另一个具有 GPU 的工作进程，选择工作流中的一些任务用 Numba 进行优化，并注解这些任务仅在你的 GPU 工作进程上运行。

```python
foo = client.submit(some_non_gpu_function)

with dask.annotate(resources={'GPU': 1}):
    bar = client.submit(a_gpu_function, foo)
    
baz = client.submit(another_non_gpu_function, bar)
```