# Accelerating Spark with GPUs

This notebook is a quick tutorial on using Spark with GPUs (through Numba) to accelerate computation.

In [4]:
import numpy as np
from numba import cuda

In [2]:
sc

Let's check that your Spark cluster has the correct setup. This means using a GPU-enabled VM together with one of the Nvidia Docker images. More info here: https://github.com/Azure/aztk/blob/master/docs/60-gpu.md

In [5]:
if cuda.is_available():
    print("Hurray! Your cluster is GPU enabled!")
else:
    print("Check that you are running on GPU-enabled VMs. You can visit this link to learn more: https://github.com/Azure/aztk/blob/master/docs/60-gpu.md")

Hurray! Your cluster is GPU enabled!
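
The check above assumes Numba can be imported on the node. A slightly more defensive variant, sketched below, also handles a missing Numba install or a broken driver. The helper name `gpu_available` is hypothetical and not part of the original notebook.

```python
def gpu_available():
    """Return True only if Numba is installed and reports a usable CUDA GPU."""
    try:
        from numba import cuda  # may be absent on CPU-only images
    except ImportError:
        return False
    try:
        return bool(cuda.is_available())
    except Exception:
        # Treat a broken driver or runtime as "no GPU" instead of crashing.
        return False

print("GPU available:", gpu_available())
```

A function like this can be called at the start of a job to fall back to a CPU code path when no GPU is present.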


Define the CUDA kernel

In [6]:
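# Compile a CUDA kernel that squares each element of a float32 array.
@cuda.jit('(float32[:], float32[:])')
def func(inp, out):
    i = cuda.grid(1)  # absolute index of this thread in the 1D grid
    if i < out.size:  # threads past the end of the array do nothing
        out[i] = inp[i] ** 2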
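
To make the indexing concrete, here is a CPU-only emulation of what each thread computes. It assumes the usual 1D relationship in which `cuda.grid(1)` equals `blockIdx.x * blockDim.x + threadIdx.x`. The function `emulate_square_kernel` and its parameters are illustrative names, and the nested loops stand in for threads that a GPU would run in parallel.

```python
def emulate_square_kernel(inp, grid_size, block_size):
    """Run the squaring kernel's logic sequentially, one loop step per GPU thread."""
    out = [0.0] * len(inp)
    idle_threads = 0
    for block_idx in range(grid_size):
        for thread_idx in range(block_size):
            # Same value cuda.grid(1) would give this thread.
            i = block_idx * block_size + thread_idx
            if i < len(out):
                out[i] = inp[i] ** 2
            else:
                # Guarded out: this index is past the end of the array.
                idle_threads += 1
    return out, idle_threads

squares, idle = emulate_square_kernel([1.0, 2.0, 3.0], grid_size=1, block_size=4)
print(squares, idle)
```

With three elements and a block of four threads, one thread has no element to process, which is why the kernel needs its bounds check.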

Wrap the CUDA kernel launch logic

In [7]:
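def gpu_work(xs):
    # Materialize the partition iterator as a float32 array.
    inp = np.asarray(list(xs), dtype=np.float32)
    out = np.zeros_like(inp)
    if inp.size == 0:
        return out  # empty partition: nothing to launch
    block_size = 32 * 4  # threads per block (a multiple of the warp size)
    # Ceiling division: just enough blocks to cover every element.
    grid_size = (inp.size + block_size - 1) // block_size
    func[grid_size, block_size](inp, out)
    return out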
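
The number of blocks for a 1D launch is commonly computed with ceiling division, so that `grid_size * block_size` is the smallest multiple of the block size that covers the input. A small sketch of that arithmetic follows; the helper `blocks_needed` is a hypothetical name, not a Numba API.

```python
def blocks_needed(n_items, block_size):
    """Smallest number of blocks whose threads cover n_items elements."""
    return (n_items + block_size - 1) // block_size

for n in (1, 128, 129, 1000):
    print(n, "items ->", blocks_needed(n, 128), "block(s) of 128 threads")
```

Any surplus threads in the last block are handled by the bounds check inside the kernel.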

Create Spark RDD partitions

In [8]:
rdd = sc.parallelize(list(range(100)))
print("Partitions", rdd.getNumPartitions())

Partitions 12


Apply `gpu_work` on each partition

In [9]:
rdd = rdd.mapPartitions(gpu_work)
print(rdd.collect())

[0.0, 1.0, 4.0, 9.0, 16.0, 25.0, 36.0, 49.0, 64.0, 81.0, 100.0, 121.0, 144.0, 169.0, 196.0, 225.0, 256.0, 289.0, 324.0, 361.0, 400.0, 441.0, 484.0, 529.0, 576.0, 625.0, 676.0, 729.0, 784.0, 841.0, 900.0, 961.0, 1024.0, 1089.0, 1156.0, 1225.0, 1296.0, 1369.0, 1444.0, 1521.0, 1600.0, 1681.0, 1764.0, 1849.0, 1936.0, 2025.0, 2116.0, 2209.0, 2304.0, 2401.0, 2500.0, 2601.0, 2704.0, 2809.0, 2916.0, 3025.0, 3136.0, 3249.0, 3364.0, 3481.0, 3600.0, 3721.0, 3844.0, 3969.0, 4096.0, 4225.0, 4356.0, 4489.0, 4624.0, 4761.0, 4900.0, 5041.0, 5184.0, 5329.0, 5476.0, 5625.0, 5776.0, 5929.0, 6084.0, 6241.0, 6400.0, 6561.0, 6724.0, 6889.0, 7056.0, 7225.0, 7396.0, 7569.0, 7744.0, 7921.0, 8100.0, 8281.0, 8464.0, 8649.0, 8836.0, 9025.0, 9216.0, 9409.0, 9604.0, 9801.0]
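
`mapPartitions` calls the function once per partition, passing an iterator over that partition's elements, and concatenates the iterables that come back. The pure-Python sketch below imitates that behaviour without Spark or a GPU. The names `split_into_partitions`, `map_partitions`, and `square_partition` are hypothetical stand-ins, and the chunking scheme is an assumption for illustration rather than Spark's actual partitioner.

```python
from itertools import chain

def split_into_partitions(data, num_partitions):
    """Split data into num_partitions contiguous chunks (illustrative scheme only)."""
    size, extra = divmod(len(data), num_partitions)
    chunks, start = [], 0
    for p in range(num_partitions):
        end = start + size + (1 if p < extra else 0)
        chunks.append(data[start:end])
        start = end
    return chunks

def square_partition(xs):
    """CPU stand-in for gpu_work: consume one partition's iterator, return its squares."""
    return [float(x) ** 2 for x in xs]

def map_partitions(func, partitions):
    """Apply func once per partition and flatten the results, preserving order."""
    return list(chain.from_iterable(func(iter(part)) for part in partitions))

parts = split_into_partitions(list(range(10)), 3)
print(parts)
print(map_partitions(square_partition, parts))
```

Because the function runs once per partition, each call can amortize a single kernel launch over the whole partition instead of launching once per element.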


In [10]:
from numba import vectorize
import math

@vectorize(["float32(float32, float32)", "float64(float64, float64)"], target='cpu')
def cpu_some_trig(x, y):
    return math.cos(x) + math.sin(y)

@vectorize(["float32(float32, float32)", "float64(float64, float64)"], target='cuda')
def cuda_some_trig(x, y):
    return math.cos(x) + math.sin(y)

In [11]:
nelem = 10 ** 6
xs = np.random.random(nelem).astype(np.float32)
ys = np.random.random(nelem).astype(np.float32)

In [12]:
%%timeit
res = cpu_some_trig(xs, ys)

15.4 ms ± 217 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [13]:
%%timeit
res = cuda_some_trig(xs, ys)

4.54 ms ± 451 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


We can observe roughly a 3.4x speedup with the GPU (15.4 ms on the CPU vs. 4.54 ms on the GPU)!
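
The speedup follows directly from the two `%%timeit` means reported above. A quick check of the arithmetic:

```python
cpu_ms = 15.4   # mean from the cpu_some_trig timing above
gpu_ms = 4.54   # mean from the cuda_some_trig timing above

speedup = cpu_ms / gpu_ms
print(f"GPU speedup: {speedup:.2f}x")
```

The GPU figure likely includes host-to-device and device-to-host copies, because `xs` and `ys` are NumPy arrays in host memory, so the kernel alone may be faster than this ratio suggests.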