Skip to content

Latest commit

 

History

History
669 lines (458 loc) · 37.3 KB

File metadata and controls

669 lines (458 loc) · 37.3 KB

五、流、事件、上下文和并发

在前面的章节中,我们看到在与 GPU 交互时,我们从主机执行两个主要操作:

  • 将内存数据复制到 GPU 和从 GPU 复制内存数据
  • 启动内核函数

我们知道单个内核中,其多个线程之间存在一个并发级别;然而,我们也可以使用多个内核GPU 内存操作,这意味着我们可以同时启动多个内存和内核操作,而无需等待每个操作完成。然而,另一方面,我们必须有一定的组织,以确保所有相互依赖的操作都是同步的;这意味着,在输入数据完全复制到设备内存之前,我们不应该启动特定内核,或者在内核完成执行之前,我们不应该将已启动内核的输出数据复制到主机

为此,我们有所谓的CUDA****流-一个是在 GPU 上按顺序运行的一系列操作。单流本身没有任何用处,关键是通过使用多个流来获得主机发出的 GPU 操作的并发性。这意味着我们应该将对应于不同流的 GPU 操作的启动交错,以便利用这一概念。

在本章中,我们将广泛讨论流的概念。此外,我们将研究事件,这是流的一个特性,用于精确计时内核,并向主机指示在给定流中完成了哪些操作。

最后,我们将简要介绍 CUDA上下文上下文可以被认为类似于操作系统中的进程,因为 GPU 将每个上下文的数据和内核代码隔离,并封装在 GPU 上当前存在的其他上下文之外。我们将在本章末尾看到这方面的基本知识。

以下是本章的学习成果:

  • 理解设备和流同步的概念
  • 学习如何有效地使用流来组织并行 GPU 操作
  • 学习如何有效使用 CUDA 活动
  • 理解 CUDA 上下文
  • 学习如何在给定上下文中显式同步
  • 学习如何显式创建和销毁 CUDA 上下文
  • 学习如何使用上下文,以便在主机上的多个进程和线程之间使用 GPU

技术要求

本章要求 Linux 或 Windows 10 PC 配备现代 NVIDIA GPU(2016 年及以后),并安装所有必要的 GPU 驱动程序和 CUDA 工具包(9.0 及以后)。还需要使用 PyCUDA 模块安装合适的 Python 2.7(如 Anaconda Python 2.7)。

本章的代码也可在 GitHub 上获得:

https://github.com/PacktPublishing/Hands-On-GPU-Programming-with-Python-and-CUDA

For more information about the prerequisites, check the Preface of this book, and for the software and hardware requirements, check the README in https://github.com/PacktPublishing/Hands-On-GPU-Programming-with-Python-and-CUDA.

CUDA 设备同步

在使用 CUDA 流之前,我们需要了解设备同步的概念。这是一种主机阻止任何进一步执行的操作,直到所有发给 GPU 的操作(内存传输和内核执行)完成。这是确保依赖于先前操作的操作不会无序执行所必需的,例如,确保 CUDA 内核启动在主机尝试读取其输出之前完成。

在 CUDA C 中,通过cudaDeviceSynchronize功能执行设备同步。此功能有效地阻止了主机上的进一步执行,直到所有 GPU 操作完成。cudaDeviceSynchronize非常重要,通常是大多数关于 CUDA C 的书籍中涉及的第一个主题之一,我们还没有看到这一点,因为 PyCUDA 一直在无形中根据需要自动为我们调用它。让我们看一看 CUDA C 代码的例子,看看这是如何手工完成的:

// Copy an array of floats from the host to the device.
cudaMemcpy(device_array, host_array, size_of_array*sizeof(float), cudaMemcpyHostToDevice);
// Block execution until memory transfer to device is complete.
cudaDeviceSynchronize();
// Launch CUDA kernel.
Some_CUDA_Kernel <<< block_size, grid_size >>> (device_array, size_of_array);
// Block execution until GPU kernel function returns.
cudaDeviceSynchronize();
// Copy output of kernel to host.
cudaMemcpy(host_array,  device_array, size_of_array*sizeof(float), cudaMemcpyDeviceToHost);
// Block execution until memory transfer to host is complete.
cudaDeviceSynchronize();

在这段代码中,我们看到我们必须在每次 GPU 操作后直接与设备同步。如果我们一次只需要调用一个 CUDA 内核,如图所示,这很好。但是,如果我们想同时启动多个独立的内核和在不同数据阵列上运行的内存操作,那么在整个设备上进行同步将是低效的。在这种情况下,我们应该跨多个流进行同步。我们现在就来看看怎么做。

使用 PyCUDA 流类

我们将从一个简单的 PyCUDA 程序开始;所有这些都将生成一系列随机 GPU 阵列,用一个简单的内核处理每个阵列,然后将阵列复制回主机。然后我们将修改它以使用流。请记住,除了说明如何使用流和您可以获得的一些基本性能增益之外,这个程序将毫无意义。(这个程序可以在 GitHub 存储库的5目录下的multi-kernel.py文件中看到。)

当然,我们将首先导入适当的 Python 模块以及time函数:

import pycuda.autoinit
import pycuda.driver as drv
from pycuda import gpuarray
from pycuda.compiler import SourceModule
import numpy as np
from time import time

现在我们将指定我们希望在这里处理多少个数组,每个数组将由不同的内核启动处理。我们还指定将生成的随机数组的长度,如下所示:

num_arrays = 200
array_len = 1024**2

我们现在有一个在每个数组上运行的内核;所有这些都将在数组中的每个点上进行迭代,并将其乘以 2 除以 50 次,最终使数组保持完整。我们希望限制每个内核启动将使用的线程数量,这将帮助我们在 GPU 上的多个内核启动之间获得并发性,以便让每个线程通过for循环迭代阵列的不同部分。(同样,请记住,除了学习流和同步之外,此内核函数对于任何其他方面都是完全无用的!)如果每次内核启动都使用太多线程,那么以后获得并发性将更加困难:

ker = SourceModule(""" 
__global__ void mult_ker(float * array, int array_len)
{
     int thd = blockIdx.x*blockDim.x + threadIdx.x;
     int num_iters = array_len / blockDim.x;

     for(int j=0; j < num_iters; j++)
     {
         int i = j * blockDim.x + thd;

         for(int k = 0; k < 50; k++)
         {
              array[i] *= 2.0;
              array[i] /= 2.0;
         }
     }
}
""")

mult_ker = ker.get_function('mult_ker')

现在,我们将生成一些随机数据数组,将这些数组复制到 GPU,通过 64 个线程在每个数组上迭代地启动内核,然后将输出数据复制回主机,并使用 NumPy 的allclose函数断言这一点。我们将使用 Python 的time函数对所有操作从开始到结束的持续时间进行计时,如下所示:

data = []
data_gpu = []
gpu_out = []

# generate random arrays.
for _ in range(num_arrays):
    data.append(np.random.randn(array_len).astype('float32'))

t_start = time()

# copy arrays to GPU.
for k in range(num_arrays):
    data_gpu.append(gpuarray.to_gpu(data[k]))

# process arrays.
for k in range(num_arrays):
    mult_ker(data_gpu[k], np.int32(array_len), block=(64,1,1), grid=(1,1,1))

# copy arrays from GPU.
for k in range(num_arrays):
    gpu_out.append(data_gpu[k].get())

t_end = time()

for k in range(num_arrays):
    assert (np.allclose(gpu_out[k], data[k]))

print 'Total time: %f' % (t_end - t_start)

我们现在准备运行这个程序。我将立即运行它:

所以,这个程序花了将近三秒钟的时间才完成。我们将进行一些简单的修改,以便我们的程序可以使用流,然后看看我们是否可以获得任何性能提升(这可以在存储库中的multi-kernel_streams.py文件中看到)。

首先,我们注意到,对于每一次内核启动,我们都有一个单独的数据数组进行处理,这些数据存储在 Python 列表中。我们必须为每个单独的阵列/内核启动对创建一个单独的流对象,因此我们首先添加一个名为streams的空列表,它将保存我们的流对象:

data = []
data_gpu = []
gpu_out = []
streams = []

我们现在可以生成一系列流,用于组织内核启动。我们可以通过Stream类从pycuda.driver子模块中获取流对象。由于我们已导入此子模块并将其别名为drv,因此我们可以用新的流对象填充列表,如下所示:

for _ in range(num_arrays):
    streams.append(drv.Stream())

现在,我们必须首先修改将数据传输到 GPU 的内存操作。考虑以下步骤:

  1. 使用gpuarray.to_gpu函数查找将阵列复制到 GPU 的第一个循环。我们希望切换到此函数的异步和流友好版本,gpu_array.to_gpu_async。(我们现在还必须指定每个内存操作应使用的流与stream参数):
for k in range(num_arrays):
    data_gpu.append(gpuarray.to_gpu_async(data[k], stream=streams[k]))
  1. 我们现在可以发布内核了。这与前面一样,只是我们必须使用stream参数指定要使用的流:
for k in range(num_arrays):
    mult_ker(data_gpu[k], np.int32(array_len), block=(64,1,1), grid=(1,1,1), stream=streams[k])
  1. 最后,我们需要从 GPU 中提取数据。我们可以通过将gpuarray get功能切换到get_async,然后再次使用stream参数来实现,如下所示:
for k in range(num_arrays):
    gpu_out.append(data_gpu[k].get_async(stream=streams[k]))

现在,我们已经准备好运行流友好型修改程序:

在这种情况下,我们的性能提高了三倍,考虑到我们必须进行的修改数量很少,这还不算太坏。但在我们继续之前,让我们试着更深入地理解为什么会这样。

让我们考虑两个 CUDA 内核启动的情况。我们还将在启动内核之前和之后执行与每个内核对应的 GPU 内存操作,总共执行六个操作。我们可以通过在x轴上向右移动的图形来可视化 GPU 上发生的关于时间的操作,而y轴对应于在特定时间在 GPU 上执行的操作。下图对此进行了描述:

不难想象为什么流在性能提升方面工作得这么好,因为单个流中的操作被阻塞,直到所有必要的之前的操作都被竞争,我们将在不同的 GPU 操作之间获得并发性,并充分利用我们的设备。这可以从并发操作的大量重叠中看出。随着时间的推移,我们可以可视化基于流的并发,如下所示:

使用 CUDA streams 并发康威生命游戏

现在我们将看到一个更有趣的应用程序,我们将修改上一章中的生活(康威的生活游戏模拟),这样我们将同时显示四个独立的动画窗口。(如果还没有,建议您查看上一章中的示例。)

让我们从存储库的最后一章中获取一份旧生命模拟的副本,它应该位于4目录的conway_gpu.py下。现在,我们将把它修改为新的基于 CUDA 流的并发生命模拟。(我们稍后将看到的这种新的基于流的模拟也可以在本章目录5conway_gpu_streams.py文件中找到。)

转到文件末尾的 main 函数。我们将设置一个新变量,该变量指示我们将使用num_concurrent一次显示多少并发动画(其中N与前面一样指示模拟晶格的高度/宽度)。我们将在此处将其设置为4,但您可以随意尝试其他值:

if __name__ == '__main__':

    N = 128
    num_concurrent = 4

我们现在需要一组num_concurrent流对象,还需要在 GPU 上分配一组输入和输出晶格。当然,我们只需将它们存储在列表中,并像以前一样初始化晶格。我们将设置一些空列表,并在循环中使用适当的对象填充每个空列表(注意我们如何在每次迭代中设置新的初始状态晶格,将其发送到 GPU,并将其连接到lattices_gpu

streams = []
lattices_gpu = []
newLattices_gpu = []

for k in range(num_concurrent):
    streams.append(drv.Stream())
    lattice = np.int32( np.random.choice([1,0], N*N, p=[0.25, 0.75]).reshape(N, N) )
    lattices_gpu.append(gpuarray.to_gpu(lattice)) 
    newLattices_gpu.append(gpuarray.empty_like(lattices_gpu[k])) 

Since we're only doing this loop once during the startup of our program and the virtually all of the computational work will be in the animation loop, we really don't have to worry about actually using the streams we just immediately generated.

现在,我们将使用 SubPlot 函数使用 Matplotlib 设置环境;请注意,我们如何通过设置ncols参数来设置多个动画绘图。我们将有另一个列表结构,对应于imgs中动画更新所需的图像。请注意,我们现在如何使用get_async和相应的流进行设置:

fig, ax = plt.subplots(nrows=1, ncols=num_concurrent)
imgs = []

for k in range(num_concurrent):
    imgs.append( ax[k].imshow(lattices_gpu[k].get_async(stream=streams[k]), interpolation='nearest') )

main 函数中最后要更改的是倒数第二行,以ani = animation.FuncAnimation开头。让我们修改update_gpu函数的参数,以反映我们正在使用的新列表,并再添加两个参数,一个用于传递streams列表,另一个参数用于指示应该有多少并发动画:

ani = animation.FuncAnimation(fig, update_gpu, fargs=(imgs, newLattices_gpu, lattices_gpu, N, streams, num_concurrent) , interval=0, frames=1000, save_count=1000)    

我们现在对update_gpu函数进行必要的修改,以获取这些额外参数。在文件中向上滚动一点,并按如下方式修改参数:

def update_gpu(frameNum, imgs, newLattices_gpu, lattices_gpu, N, streams, num_concurrent)

我们现在需要修改这个函数来迭代num_concurrent次,并将imgs的每个元素都像以前一样进行设置,最后返回整个imgs列表:

for k in range(num_concurrent):
    conway_ker( newLattices_gpu[k], lattices_gpu[k], grid=(N/32,N/32,1), block=(32,32,1), stream=streams[k] )
     imgs[k].set_data(newLattices_gpu[k].get_async(stream=streams[k]) )
     lattices_gpu[k].set_async(newLattices_gpu[k], stream=streams[k])

 return imgs

请注意,我们对每个内核所做的更改都是在适当的流中启动的,而get已切换到与同一流同步的get_async

最后,循环中的最后一行将 GPU 数据从一个设备阵列复制到另一个设备阵列,而无需任何重新分配。以前,我们可以使用速记切片操作符[:]在阵列之间直接复制元素,而无需在 GPU 上重新分配任何内存;在这种情况下,切片运算符表示法充当 GPU 阵列的 PyCUDAset函数的别名。(set当然是将一个 GPU 阵列复制到另一个相同大小的 GPU 阵列的函数,无需任何重新分配。)幸运的是,确实有一个流同步版本的此函数set_async,但我们需要专门使用它来调用此函数,显式指定要复制的阵列和要使用的流

我们现在已经完成并准备好运行这个。进入终端,在命令行输入python conway_gpu_streams.py即可欣赏节目:

事件

事件是 GPU 上存在的对象,其目的是作为一系列操作的里程碑或进度标记。事件通常用于在设备端提供测量持续时间以精确计时操作;到目前为止,我们一直在使用基于主机的 Python 分析器和标准 Python 库函数(如time)进行测量。此外,它们还可用于为主机提供流状态及其已完成的操作的状态更新,以及基于流的显式同步。

让我们从一个示例开始,该示例不使用显式流,只使用事件来度量一次内核启动。(如果我们在代码中没有显式地使用流,CUDA 实际上无形地定义了一个默认流,所有操作都将放入其中)。

在这里,我们将使用与本章开头相同的无用乘法/除法循环内核和头,并修改以下大部分内容。在本例中,我们希望单个内核实例能够长时间运行,因此我们将生成一个巨大的随机数数组,供内核处理,如下所示:

array_len = 100*1024**2
data = np.random.randn(array_len).astype('float32')
data_gpu = gpuarray.to_gpu(data)

我们现在使用pycuda.driver.Event构造函数构造事件(当然,在前面的 import 语句中将pycuda.driver化名为drv

我们将在这里创建两个事件对象,一个用于内核启动的开始,另一个用于内核启动的结束(我们将始终需要两个事件对象来测量任何单个 GPU 操作,我们将很快看到):

start_event = drv.Event()
end_event = drv.Event()

现在,我们已经准备好启动内核了,但是首先,我们必须用事件记录函数标记start_event实例在执行流中的位置。我们启动内核,然后在执行流中标记end_event的位置,并用record标记:

start_event.record()
mult_ker(data_gpu, np.int32(array_len), block=(64,1,1), grid=(1,1,1))
end_event.record()

事件有一个二进制值,指示它们是否已到达,该值由函数查询给出。让我们在内核启动后立即打印这两个事件的状态更新:

print 'Has the kernel started yet? {}'.format(start_event.query())
 print 'Has the kernel ended yet? {}'.format(end_event.query())

现在让我们运行这个程序,看看会发生什么:

我们的目标是最终测量内核执行的持续时间,但是内核还没有启动。PyCUDA 中的内核是异步启动的(无论它们是否存在于特定流中),因此我们必须确保主机代码与 GPU 正确同步。

由于end_event排在最后,我们可以通过此事件对象的同步功能阻止进一步的主机代码执行,直到内核完成;这将确保在执行任何其他主机代码行之前,内核已经完成。让我们在适当的位置添加一行代码:

end_event.synchronize()

print 'Has the kernel started yet?  {}'.format(start_event.query())

print 'Has the kernel ended yet? {}'.format(end_event.query())

最后,我们准备度量内核的执行时间;我们通过事件对象的time_tilltime_since 操作与另一个事件对象进行比较,以毫秒为单位获得这两个事件之间的时间。让我们在end_event上使用start_eventtime_till 操作:

print 'Kernel execution time in milliseconds: %f ' % start_event.time_till(end_event)

通过time_tilltime_since功能,可以测量 GPU 上已经发生的两个事件之间的持续时间。请注意,这些函数总是以毫秒为单位返回值!

现在让我们再次尝试运行我们的程序:

(此示例也可在存储库中的simple_event_example.py文件中找到。)

事件和流

现在我们将了解如何使用事件对象来处理流;这将为我们提供对各种 GPU 操作流程的高度复杂的控制,使我们能够通过query功能准确了解每个流的进展情况,甚至允许我们在忽略其他流的情况下与主机同步特定流

但是,首先,我们必须认识到,每个流都必须有自己的事件对象的专用集合;多个流不能共享一个事件对象。让我们通过修改前面的示例multi_kernel_streams.py来了解这到底意味着什么。在内核定义之后,我们再添加两个空列表-start_eventsend_events。我们将用事件对象填充这些列表,这些对象将对应于我们拥有的每个流。这将允许我们在每个流中计时一个 GPU 操作,因为每个 GPU 操作需要两个事件:

data = []
data_gpu = []
gpu_out = []
streams = []
start_events = []
end_events = []

for _ in range(num_arrays):
    streams.append(drv.Stream())
    start_events.append(drv.Event())
    end_events.append(drv.Event())

现在,我们可以通过修改第二个循环,在启动开始和结束时使用事件记录,来分别为每个内核启动计时。注意,这里,由于有多个流,我们必须输入适当的流作为每个事件对象的record函数的参数。另外,请注意,我们可以在第二个循环中捕获结束事件;这仍然允许我们完美地捕获内核执行持续时间,而不会延迟后续内核的启动。现在考虑下面的代码:

for k in range(num_arrays):
    start_events[k].record(streams[k])
    mult_ker(data_gpu[k], np.int32(array_len), block=(64,1,1), grid=(1,1,1), stream=streams[k])

for k in range(num_arrays):
    end_events[k].record(streams[k])

现在我们将提取每个单独内核启动的持续时间。让我们在迭代断言检查之后添加一个新的空列表,并通过time_till函数将其填入持续时间:

kernel_times = []
for k in range(num_arrays):
   kernel_times.append(start_events[k].time_till(end_events[k]))

现在让我们在最后添加两条print语句,告诉我们内核执行时间的平均值和标准偏差:

print 'Mean kernel duration (milliseconds): %f' % np.mean(kernel_times)
print 'Mean kernel standard deviation (milliseconds): %f' % np.std(kernel_times)

我们现在可以运行以下程序:

(此示例也可在存储库中作为multi-kernel_events.py提供。)

我们看到,内核持续时间的标准偏差程度相对较低,这很好,考虑到每个内核在相同的块和网格大小上处理相同数量的数据,如果偏差程度较高,则意味着我们在内核执行中对 GPU 的使用极不均衡,我们必须重新调整参数以获得更高级别的并发性。

上下文

CUDA上下文通常被描述为类似于操作系统中的进程。让我们回顾一下这意味着什么——进程是在计算机上运行的单个程序的实例;操作系统内核之外的所有程序都在一个进程中运行。每个进程都有自己的一组指令、变量和分配的内存,一般来说,对其他进程的操作和内存是盲目的。当进程结束时,操作系统内核将执行清理,确保进程分配的所有内存都已取消分配,并关闭进程使用的所有文件、网络连接或其他资源。(好奇的 Linux 用户可以使用命令行top命令查看其计算机上运行的进程,而 Windows 用户可以使用 Windows 任务管理器查看这些进程)。

与进程类似,上下文与使用 GPU 的单个主机程序相关联。上下文在内存中保存所有 CUDA 内核和分配的内存,这些内存正在使用,并且对当前存在的其他上下文的内核和内存是盲的。当上下文被破坏时(例如,在基于 GPU 的程序结束时),GPU 将清除上下文中的所有代码和分配的内存,从而释放资源用于其他当前和将来的上下文。到目前为止,我们编写的程序都存在于一个单一的上下文中,因此这些操作和概念对我们来说是不可见的

我们还要记住,一个程序作为一个进程启动,但它可以自行分叉以跨多个进程或线程运行。类似地,单个 CUDA 主机程序可以在 GPU 上生成和使用多个 CUDA 上下文。通常,当我们分叉一个主机进程的新进程或线程时,当我们想要获得主机端并发性时,我们将创建一个新的上下文。(但是,应该强调的是,主机进程和 CUDA 上下文之间没有精确的一对一关系)。

与生活中的许多其他领域一样,我们将从一个简单的例子开始。我们将首先了解如何访问程序的默认上下文并在其中进行同步。

同步当前上下文

我们将看到如何在 Python 中的上下文中显式地同步我们的设备,就像在 CUDAC 中一样;这实际上是 CUDA C 中最基本的技能之一,并且在大多数其他相关书籍的第一章或第二章中都有介绍。到目前为止,我们已经能够避免这个话题,因为 PyCUDA 已经为我们自动执行了大多数同步,并使用了to_gpugetpycuda.gpuarray功能;否则,在to_gpu_asyncget_async函数的情况下,同步是由流处理的,正如我们在本章开头所看到的。

我们将谦虚地开始修改我们在第 3 章PyCUDA 入门中编写的程序,该程序使用显式上下文同步生成 Mandelbrot 集的图像。(这在这里作为存储库中3目录下的文件gpu_mandelbrot0.py提供。)

We won't get any performance gains over our original Mandelbrot program here; the only point of this exercise is just to help us understand CUDA contexts and GPU synchronization.

看看标题,我们当然看到了import pycuda.autoinit线。我们可以通过pycuda.autoinit.context访问当前上下文对象,也可以通过调用pycuda.autoinit.context.synchronize()函数在当前上下文中进行同步。

现在,让我们修改gpu_mandelbrot函数来处理显式同步。我们看到的第一条 GPU 相关线路是:

mandelbrot_lattice_gpu = gpuarray.to_gpu(mandelbrot_lattice)

我们现在可以将其更改为显式同步。我们可以通过to_gpu_async异步复制到 GPU,然后同步如下:

mandelbrot_lattice_gpu = gpuarray.to_gpu_async(mandelbrot_lattice)
pycuda.autoinit.context.synchronize()

然后我们看到下一行使用gpuarray.empty函数在 GPU 上分配内存。根据 GPU 体系结构的性质,CUDA 中的内存分配是自动同步的;这里没有等效的异步内存分配。因此,我们保持这条线不变。

Memory allocation in CUDA is always synchronized!

我们现在看到接下来的两行,我们的 Mandelbrot 内核是通过对mandel_ker的调用启动的,我们通过对get的调用复制 Mandelbrotgpuarray对象的内容。我们在内核启动后进行同步,将get切换到get_async,最后同步最后一行:

mandel_ker( mandelbrot_lattice_gpu, mandelbrot_graph_gpu, np.int32(max_iters), np.float32(upper_bound))
pycuda.autoinit.context.synchronize()
mandelbrot_graph = mandelbrot_graph_gpu.get_async()
pycuda.autoinit.context.synchronize()

我们现在可以运行它,它将生成一个 Mandelbrot 映像到磁盘,就像第 3 章开始使用 PyCUDA 一样。

(此示例也可在存储库中作为gpu_mandelbrot_context_sync.py提供。)

手动上下文创建

到目前为止,我们在所有 PyCUDA 项目开始时一直在导入pycuda.autoinit;这在程序开始时有效地创建了一个上下文,并在程序结束时将其销毁

让我们尝试手动执行此操作。我们将制作一个小程序,只需将一个小阵列复制到 GPU,然后将其复制回主机,打印阵列,然后退出。

我们从进口开始:

import numpy as np
from pycuda import gpuarray
import pycuda.driver as drv

首先,我们使用pycuda.driver.init函数初始化 CUDA,在这里别名为drv

drv.init()

现在我们选择要使用的 GPU;这对于具有多个 GPU 的情况是必要的。我们可以使用pycuda.driver.Device选择特定的 GPU;如果您只有一个 GPU,就像我一样,您可以通过pycuda.driver.Device(0)访问它,如下所示:

dev = drv.Device(0)

我们现在可以使用make_context在此设备上创建一个新的上下文,如下所示:

ctx = dev.make_context()

现在我们有了一个新的上下文,它将自动成为默认上下文。让我们将阵列复制到 GPU 中,将其复制回主机,然后打印:

x = gpuarray.to_gpu(np.float32([1,2,3]))
print x.get()

现在我们完成了。我们可以通过调用pop函数来破坏上下文:

ctx.pop()

就这样!我们应该始终记住在程序存在之前销毁我们显式使用pop创建的上下文。

(这个例子可以在存储库中本章目录下的simple_context_create.py文件中看到。)

主机端多处理和多线程

当然,我们可以通过在主机的 CPU 上使用多个进程或线程来获得主机端的并发性。现在让我们通过快速概述来区分主机端操作系统进程和线程。

存在于操作系统内核之外的每个主机端程序都作为一个进程执行,也可以存在于多个进程中。一个进程有自己的地址空间,因为它与所有其他进程同时运行,并且独立于所有其他进程。一般来说,一个进程对其他进程的操作是盲目的,尽管多个进程可以通过套接字或管道进行通信。在 Linux 和 Unix 中,fork 系统调用产生了新的进程。

相反,主机端线程存在于单个进程中,多个线程也可以存在于单个进程中。单个进程中的多个线程同时运行。同一进程中的所有线程在进程内共享相同的地址空间,并且可以访问相同的共享变量和数据。通常,资源锁用于在多个线程之间访问数据,以避免争用情况。在编译语言如 C、C++或 FORTRAN 中,多个进程线程通常用 P 螺纹或 OpenMP API 来管理。

线程比进程轻得多,操作系统内核在单个进程中的多个线程之间切换任务要比在多个进程之间切换任务快得多。通常,操作系统内核会在不同的 CPU 内核上自动执行不同的线程和进程,以建立真正的并发性。

Python 的一个特点是,尽管它支持通过threading模块执行多线程,但所有线程都将在同一个 CPU 核心上执行。这是由于 Python 作为解释脚本语言的技术性,并且与 Python 的全局标识符锁(GIL)有关。不幸的是,为了通过 Python 在主机上实现真正的多核并发,我们必须使用multiprocessing模块生成多个进程。(不幸的是,由于 Windows 处理进程的方式,多处理模块目前在 Windows 下无法完全发挥作用。令人遗憾的是,Windows 用户如果想实现任何形式的主机端并发,就必须坚持单核多线程。)

我们现在将看到如何在 Python 中使用这两个线程来使用基于 GPU 的操作;Linux 用户应该注意,这可以通过切换对threadingmultiprocessing的引用,以及对ThreadProcess的引用来轻松地扩展到进程,因为这两个模块的外观和行为都类似。然而,根据 PyCUDA 的性质,我们必须为我们将使用的每个线程或进程创建一个新的 CUDA 上下文,以利用 GPU。现在让我们看看如何做到这一点。

主机端并发的多上下文

让我们首先通过一个简单的示例简要回顾一下如何在 Python 中创建一个可以向主机返回值的主机线程。(这个例子也可以在存储库中5下的single_thread_example.py文件中看到。)我们将使用threading模块中的Thread类来创建Thread的子类,如下所示:

import threading
class PointlessExampleThread(threading.Thread):

我们现在设置了构造函数。我们调用父类的构造函数,并在对象中设置一个空变量,该变量将作为线程的返回值:

def __init__(self):
    threading.Thread.__init__(self)
    self.return_value = None

现在,我们在 thread 类中设置了 run 函数,这是线程启动时将执行的函数。我们将让它打印一行并设置返回值:

def run(self):
    print 'Hello from the thread you just spawned!'
    self.return_value = 123

我们最后必须设置连接函数。这将允许我们从线程接收返回值:

def join(self):
    threading.Thread.join(self)
    return self.return_value

现在我们已经完成了线程类的设置。让我们启动这个类的一个实例作为NewThread对象,通过调用start方法生成新线程,然后通过调用join阻止执行并从主机线程获取输出:

NewThread = PointlessExampleThread()
NewThread.start()
thread_output = NewThread.join()
print 'The thread completed and returned this value: %s' % thread_output

现在让我们运行这个:

现在,我们可以在主机上的多个并发线程之间扩展这个想法,通过多个上下文和线程来启动并发 CUDA 操作。我们现在来看最后一个例子。让我们从本章开始重新使用无意义的乘法/除法内核,并在生成的每个线程中启动它。

首先,让我们看看进口。由于我们正在创建显式上下文,请记住删除pycuda.autoinit并在末尾添加导入threading

import pycuda
import pycuda.driver as drv
from pycuda import gpuarray
from pycuda.compiler import SourceModule
import numpy as np
from time import time
import threading 

我们将使用与前面相同的数组大小,但这次我们将在线程数和数组数之间建立直接对应关系。通常,我们不希望在主机上产生超过 20 个线程,因此我们只会选择10数组。现在,考虑下面的代码:

num_arrays = 10
array_len = 1024**2

现在,我们将旧内核存储为字符串对象;由于这只能在上下文中编译,因此我们必须在每个线程中分别编译:

kernel_code = """ 
__global__ void mult_ker(float * array, int array_len)
{
     int thd = blockIdx.x*blockDim.x + threadIdx.x;
     int num_iters = array_len / blockDim.x;
    for(int j=0; j < num_iters; j++)
     {
     int i = j * blockDim.x + thd;
     for(int k = 0; k < 50; k++)
     {
         array[i] *= 2.0;
         array[i] /= 2.0;
     }
 }
}
"""

现在我们可以开始上课了。我们将像前面一样创建threading.Thread的另一个子类,并设置构造函数以一个参数作为输入数组。我们将使用None初始化一个输出变量,就像我们之前所做的那样:

class KernelLauncherThread(threading.Thread):
    def __init__(self, input_array):
        threading.Thread.__init__(self)
        self.input_array = input_array
        self.output_array = None

现在我们可以编写run函数了。我们选择设备,在该设备上创建上下文,编译内核,并提取内核函数引用。注意self对象的用法:

def run(self):
    self.dev = drv.Device(0)
    self.context = self.dev.make_context()
    self.ker = SourceModule(kernel_code)
    self.mult_ker = self.ker.get_function('mult_ker')

现在,我们将阵列复制到 GPU,启动内核,并将输出复制回主机。然后我们破坏上下文:

self.array_gpu = gpuarray.to_gpu(self.input_array)
self.mult_ker(self.array_gpu, np.int32(array_len), block=(64,1,1), grid=(1,1,1))
self.output_array = self.array_gpu.get()
self.context.pop()

最后,我们建立了连接函数。这将返回output_array到主机:

 def join(self):
     threading.Thread.join(self)
     return self.output_array

我们现在完成了我们的子类。我们将设置一些空列表来保存我们的随机测试数据、线程对象和线程输出值,与之前类似。然后,我们将生成一些随机数组进行处理,并设置一个内核启动器线程列表,这些线程将在每个对应的数组上运行:

data = []
gpu_out = []
threads = []
for _ in range(num_arrays):
    data.append(np.random.randn(array_len).astype('float32'))
for k in range(num_arrays):
 threads.append(KernelLauncherThread(data[k]))

现在我们将启动每个线程对象,并使用join将其输出提取到gpu_out列表中:

for k in range(num_arrays):
    threads[k].start()

for k in range(num_arrays):
    gpu_out.append(threads[k].join())

最后,我们只需对输出数组执行一个简单的断言,以确保它们与输入相同:

for k in range(num_arrays):
    assert (np.allclose(gpu_out[k], data[k]))

这个例子可以在存储库中的multi-kernel_multi-thread.py文件中看到。

总结

本章开始时,我们学习了设备同步以及主机上 GPU 操作同步的重要性;这允许依赖操作允许先行操作在继续之前完成。这个概念对我们来说是隐藏的,因为 PyCUDA 一直在为我们自动处理同步,直到现在。然后我们了解了 CUDA 流,它允许独立的操作序列在 GPU 上同时执行,而无需在整个 GPU 上同步,这可以大大提高性能;然后我们了解了 CUDA 事件,它允许我们对给定流中的单个 CUDA 内核计时,并确定流中是否发生了特定操作。接下来,我们学习了上下文,它类似于主机操作系统中的进程。我们学习了如何显式地跨整个 CUDA 上下文进行同步,然后了解了如何创建和销毁上下文。最后,我们了解了如何在 GPU 上生成多个上下文,以允许主机上多个线程或进程之间使用 GPU。

问题

  1. 在第一个示例中的内核启动参数中,我们的内核都是通过 64 个线程启动的。如果我们将 GPU 中的线程数增加到或超过内核数,这将如何影响原始版本和流版本的性能?
  2. 考虑 CUDA C 示例,在本章的开头给出,说明了使用 Type T0.您认为不使用流而只使用cudaDeviceSynchronize就可以在多个内核之间获得某种程度的并发性吗?
  3. 如果您是 Linux 用户,请修改上一个在进程而不是线程上操作的示例。
  4. 考虑 ALE T0 程序;我们说过,内核执行持续时间的标准偏差很低是好的。如果标准差很高,为什么会不好?
  5. 在上一个示例中,我们只使用了 10 个主机端线程。列举两个为什么我们必须使用相对较少的线程或进程在主机上启动并发 GPU 操作的原因。