# 演示CUDA流的用法及其所带来的性能提升

In [1]:
import pycuda.autoinit
import pycuda.driver as drv
from pycuda import gpuarray
from pycuda.compiler import SourceModule
import numpy as np
from time import time

# Number of arrays pushed through the GPU.
num_arrays = 200
# Elements per array (1024 * 1024 float32 values).
array_len = 1024**2

# Kernel: for every element, multiply by 2 then divide by 2, 50 times over, so
# the output should equal the input. The launches below use a single block, so
# thread `thd` visits indices thd, thd + blockDim.x, thd + 2*blockDim.x, ...
ker = SourceModule("""       
__global__ void mult_ker(float * array, int array_len)
{
     int thd = blockIdx.x*blockDim.x + threadIdx.x;
     int num_iters = array_len / blockDim.x;
     for(int j=0; j < num_iters; j++)
     {
         int i = j * blockDim.x + thd;
         for(int k = 0; k < 50; k++)
         {
              array[i] *= 2.0;
              array[i] /= 2.0;
         }
     }
}
""")
mult_ker = ker.get_function('mult_ker')

# Random inputs are produced before the clock starts, so their generation is
# not part of the timed region.
data = [np.random.randn(array_len).astype('float32') for _ in range(num_arrays)]

t_start = time()

# Upload every array to the device.
data_gpu = [gpuarray.to_gpu(host_arr) for host_arr in data]

# Run the kernel once per uploaded array (no explicit stream is given).
for dev_arr in data_gpu:
    mult_ker(dev_arr, np.int32(array_len), block=(64,1,1), grid=(1,1,1))

# Download the processed arrays back to the host.
gpu_out = [dev_arr.get() for dev_arr in data_gpu]

t_end = time()

# x * 2 / 2 should give back x for every element.
for result, expected in zip(gpu_out, data):
    assert np.allclose(result, expected)

print('Total time: %f' % (t_end - t_start))

Total time: 2.445618


我们对上面的程序稍加修改，使其使用CUDA流。

In [2]:
import pycuda.autoinit
import pycuda.driver as drv
from pycuda import gpuarray
from pycuda.compiler import SourceModule
import numpy as np
from time import time

# Number of arrays to process and elements per array.
num_arrays = 200
array_len = 1024**2

# Kernel: for every element, multiply by 2 then divide by 2, 50 times over, so
# the output should equal the input.
ker = SourceModule("""       
__global__ void mult_ker(float * array, int array_len)
{
     int thd = blockIdx.x*blockDim.x + threadIdx.x;
     int num_iters = array_len / blockDim.x;
     for(int j=0; j < num_iters; j++)
     {
         int i = j * blockDim.x + thd;
         for(int k = 0; k < 50; k++)
         {
              array[i] *= 2.0;
              array[i] /= 2.0;
         }
     }
}
""")

mult_ker = ker.get_function('mult_ker')

# One dedicated stream per array, so the copy/kernel/copy sequences for
# different arrays can be scheduled independently of each other.
streams = [drv.Stream() for _ in range(num_arrays)]

# Random inputs are generated outside the timed region.
data = [np.random.randn(array_len).astype('float32') for _ in range(num_arrays)]

t_start = time()

# Enqueue each host-to-device copy on that array's own stream.
data_gpu = [gpuarray.to_gpu_async(host_arr, stream=s)
            for host_arr, s in zip(data, streams)]

# Launch each kernel on the same stream as its upload; work within one stream
# executes in issue order, so the kernel sees the copied data.
for dev_arr, s in zip(data_gpu, streams):
    mult_ker(dev_arr, np.int32(array_len), block=(64,1,1), grid=(1,1,1), stream=s)

# Enqueue the device-to-host copies.
gpu_out = [dev_arr.get_async(stream=s) for dev_arr, s in zip(data_gpu, streams)]

# Wait for every stream before stopping the clock or reading gpu_out.
# get_async is asynchronous by contract. With pageable host buffers like these,
# the CUDA driver happens to finish the copy before returning, but that is a
# driver detail which does not hold for page-locked memory, so synchronize
# explicitly instead of relying on it.
for s in streams:
    s.synchronize()

t_end = time()

# x * 2 / 2 should give back x for every element.
for result, expected in zip(gpu_out, data):
    assert np.allclose(result, expected)

print('Total time: %f' % (t_end - t_start))

Total time: 0.788665


为了分别测量各个CUDA流中内核的执行时间，每个流都需要一组专用的事件对象：同一个事件对象如果被多个流先后记录，后一次记录会覆盖前一次的状态，就无法得到各个流各自的时间。下面我们修改前面的程序，使用事件对象来测量每个内核的执行时长。

In [1]:
import pycuda.autoinit
import pycuda.driver as drv
from pycuda import gpuarray
from pycuda.compiler import SourceModule
import numpy as np
from time import time

# Number of arrays to process and elements per array.
num_arrays = 200
array_len = 1024**2

# Kernel: for every element, multiply by 2 then divide by 2, 50 times over, so
# the output should equal the input.
ker = SourceModule("""       
__global__ void mult_ker(float * array, int array_len)
{
     int thd = blockIdx.x*blockDim.x + threadIdx.x;
     int num_iters = array_len / blockDim.x;
     for(int j=0; j < num_iters; j++)
     {
         int i = j * blockDim.x + thd;
         for(int k = 0; k < 50; k++)
         {
              array[i] *= 2.0;
              array[i] /= 2.0;
         }
     }
}
""")

mult_ker = ker.get_function('mult_ker')

# One stream per array, each with its own start/end event pair. Sharing an
# event between streams would not work for timing, because re-recording an
# event overwrites its previous state.
streams = [drv.Stream() for _ in range(num_arrays)]
start_events = [drv.Event() for _ in range(num_arrays)]
end_events = [drv.Event() for _ in range(num_arrays)]

# Random inputs are generated outside the timed region.
data = [np.random.randn(array_len).astype('float32') for _ in range(num_arrays)]

t_start = time()

# Enqueue each host-to-device copy on that array's own stream.
data_gpu = [gpuarray.to_gpu_async(host_arr, stream=s)
            for host_arr, s in zip(data, streams)]

# Bracket each kernel with its own start/end events in the same stream.
# The end event is recorded right after the launch rather than in a separate
# loop: if the host enqueued it late, after the kernel had already finished,
# its timestamp (and thus the measured duration) could be inflated.
for dev_arr, s, ev_start, ev_end in zip(data_gpu, streams, start_events, end_events):
    ev_start.record(s)
    mult_ker(dev_arr, np.int32(array_len), block=(64,1,1), grid=(1,1,1), stream=s)
    ev_end.record(s)

# Enqueue the device-to-host copies.
gpu_out = [dev_arr.get_async(stream=s) for dev_arr, s in zip(data_gpu, streams)]

# Wait for all streams. This makes gpu_out valid and guarantees every event
# has completed, which Event.time_till requires. Do not rely on get_async into
# pageable memory happening to block.
for s in streams:
    s.synchronize()

t_end = time()

# x * 2 / 2 should give back x for every element.
for result, expected in zip(gpu_out, data):
    assert np.allclose(result, expected)

# Elapsed time between each start/end pair, in milliseconds.
kernel_times = [ev_start.time_till(ev_end)
                for ev_start, ev_end in zip(start_events, end_events)]

print('Total time: %f' % (t_end - t_start))
print('Mean kernel duration (milliseconds): %f' % np.mean(kernel_times))
# NOTE: despite the label, this is the standard deviation of the kernel durations.
print('Mean kernel standard deviation (milliseconds): %f' % np.std(kernel_times))

Total time: 0.798797
Mean kernel duration (milliseconds): 13.582968
Mean kernel standard deviation (milliseconds): 3.784286


继续修改前面的代码：创建多个主机线程，每个线程使用自己的CUDA上下文，在其中编译并运行完成乘、除运算的内核函数。

In [1]:
import pycuda
import pycuda.driver as drv
from pycuda import gpuarray
from pycuda.compiler import SourceModule
import numpy as np
from time import time
import threading 

# One host thread per array; the original note advises keeping the number of
# host threads at about 20 or fewer. Each worker thread in this cell creates
# its own CUDA context.
num_arrays = 10
array_len = 2**20
# The kernel is kept as plain source text rather than as a compiled module.
# A compiled module belongs to the context it was built in, so each thread must
# compile this source again inside its own context.
kernel_code = """       
__global__ void mult_ker(float * array, int array_len)
{
     int thd = blockIdx.x*blockDim.x + threadIdx.x;
     int num_iters = array_len / blockDim.x;
     for(int j=0; j < num_iters; j++)
     {
         int i = j * blockDim.x + thd;
         for(int k = 0; k < 50; k++)
         {
              array[i] *= 2.0;
              array[i] /= 2.0;
         }
     }
 
}
"""

class KernelLauncherThread(threading.Thread):
    """Host thread that runs ``mult_ker`` on one array inside its own CUDA context.

    ``run`` does the following, in order:

    1. Creates a context on device 0.
    2. Compiles ``kernel_code`` in that context (a compiled module is tied to
       the context it was built in).
    3. Uploads ``input_array``, runs the kernel, and stores the downloaded
       result in ``output_array``.

    All context-bound objects (module, kernel handle, device array) are
    released and the context is popped and detached before ``run`` returns.
    Nothing context-bound is therefore left to be freed later from a different
    thread.

    ``input_array`` is assumed to be a contiguous float32 NumPy array, since the
    kernel takes ``float *``; TODO confirm at call sites. Its length should be a
    multiple of 64, the block size used below, because the kernel skips any
    remainder elements.
    """

    def __init__(self, input_array):
        super().__init__()
        self.input_array = input_array
        # Set by run(); stays None if run() fails before the download.
        self.output_array = None

    def run(self):
        # Select the device and create a context; make_context also makes it
        # current in this thread.
        self.dev = drv.Device(0)
        self.context = self.dev.make_context()
        module = mult_ker = array_gpu = None
        try:
            # Compile the kernel inside this thread's context.
            module = SourceModule(kernel_code)
            mult_ker = module.get_function('mult_ker')
            # Copy the input array to the GPU.
            array_gpu = gpuarray.to_gpu(self.input_array)
            # Pass the real element count, not the global array_len, so inputs
            # of a different size are still processed correctly.
            mult_ker(array_gpu, np.int32(array_gpu.size), block=(64,1,1), grid=(1,1,1))
            # Copy the result back to the host.
            self.output_array = array_gpu.get()
        finally:
            # Drop every context-bound object while the context is still
            # current. If they were garbage-collected later on another thread,
            # PyCUDA could not reactivate this context to free them.
            module = mult_ker = array_gpu = None
            # pop() only deactivates the context; detach() releases it, so no
            # context is leaked per thread.
            self.context.pop()
            self.context.detach()

    def join(self, timeout=None):
        """Wait for the thread to finish and return its ``output_array``.

        ``timeout`` is forwarded to ``threading.Thread.join``. If it expires
        before ``run`` has stored a result, or if ``run`` failed, the return
        value is ``None``.
        """
        super().join(timeout)
        return self.output_array

# pycuda.autoinit is not imported in this cell, so initialize the driver API
# explicitly before any thread creates a context.
drv.init()

# One random input array per worker thread.
data = [np.random.randn(array_len).astype('float32') for _ in range(num_arrays)]

# Each worker gets the array generated for it.
threads = [KernelLauncherThread(host_arr) for host_arr in data]

# Start all workers first so they can run concurrently.
for worker in threads:
    worker.start()

# join() on this thread class hands back the worker's output array.
gpu_out = [worker.join() for worker in threads]

# x * 2 / 2 should give back x for every element.
for result, expected in zip(gpu_out, data):
    assert np.allclose(result, expected)