##### Copyright 2021 Google Inc.

Licensed under the Apache License, Version 2.0 (the "License").
<!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
-->


# Use GPUs with Apache Beam

This notebook uses the [Monte Carlo method](https://en.wikipedia.org/wiki/Monte_Carlo_method) to calculate pi (3.14159...) and demonstrate the performance difference at the same scale of sample size between different runtimes:

- Native Python code
- Jitted machine code
- On GPU (CUDA)
- Beam pipeline locally on GPU

**Note**: this notebook does not work if your notebook instance does not have a GPU or the drivers are not installed. If you haven't done so, create a [new Dataflow Notebooks](https://pantheon.corp.google.com/dataflow/notebooks/list/instances) instance `With 1 NVIDIA Tesla T4` and check the option to `Install NVIDIA GPU driver automatically for me`.

More details can be found on [Dataflow support for GPUs](https://cloud.google.com/dataflow/docs/concepts/gpu-support) if you want to productionize Beam pipelines on Dataflow with GPUs.

## Dependencies

We will use [numba](https://numba.pydata.org/) to compile code in this notebook for different runtimes.

>Numba is an open source JIT compiler that translates a subset of Python and NumPy code into fast machine code.

It also supports GPU acceleration.

**Note**: you might need to restart the kernel if this is the first time you've installed the package.

In [None]:
%pip install numba

Let's check if the CUDA libraries are available (*if not, your notebook instance probably wasn't created with a GPU*):

In [None]:
!find /usr/local/cuda-* -iname 'libdevice'
!find /usr/local/cuda-* -iname 'libnvvm.so'

Let's disable non-error logs for clarity.

In [None]:
import logging
logging.getLogger().setLevel(logging.ERROR)

## Native Python & jitted machine code

Below we have defined two functions with exactly the same code:

- `python_cpu_monte_carlo_pi` is a plain native Python function that runs on the CPU.
- `machine_code_cpu_monte_carlo_pi` has an annotation `@jit(nopython=True)` that compiles the Python code into machine code that runs on the CPU too.

In [None]:
import random
from timeit import default_timer as timer

from numba import jit


def python_cpu_monte_carlo_pi(sample_size):
    acc = 0
    for i in range(sample_size):
        x = random.random()
        y = random.random()
        if (x * x + y * y) < 1.0:
            acc += 1
    return 4.0 * acc / sample_size


@jit(nopython=True)
def machine_code_cpu_monte_carlo_pi(sample_size):
    acc = 0
    for i in range(sample_size):
        x = random.random()
        y = random.random()
        if (x * x + y * y) < 1.0:
            acc += 1
    return 4.0 * acc / sample_size

Let's choose a sample size (100,000,000) as a constant computation complexity between both runs.

The most expensive yet parallelizable part of the computation is generating the random numbers. Neither function optimizes that part though.

In [None]:
sample_size = 100_000_000

### Performance of native Python code

It should take ~40 seconds for native Python code to calculate pi with a sample size of 100,000,000.

**Note**: The performance might vary from run to run. It might also vary between different notebook instances if they are configured differently. The same applies to other runtimes.

In [None]:
start = timer()
pi = python_cpu_monte_carlo_pi(sample_size)
dt = timer() - start
print(f'Monte Carlo pi calculated as {pi} in {dt} s.')

### Performance of jitted machine code

It should only take a little over 1 second for jitted machine code to calculate pi with a sample size of 100,000,000.

In [None]:
start = timer()
pi = machine_code_cpu_monte_carlo_pi(sample_size)
dt = timer() - start
print(f'Monte Carlo pi calculated as {pi} in {dt} s.')

## On GPU (CUDA)

In the below example, we have rearranged the native Python function into two parts:

- The first part, `gpu_monte_carlo_pi_sampler`, which generates random points and aggregate counts for a subset of the target sample size, is executed on the GPU.
- The second part, `calculate_pi`, which aggregates value from all sub sample sets and calculates pi, is compiled into machine code and executed on the GPU.

**Note**:`njit` is similar to `jit` but for `numpy`.

The example code uses only 1 grid, 24 blocks and 64 threads per block.

In [None]:
from numba import cuda, njit
from numba.cuda.random import create_xoroshiro128p_states, xoroshiro128p_uniform_float32
import numpy as np


@cuda.jit
def gpu_monte_carlo_pi_sampler(rng_states, sub_sample_size, acc):
    """Uses GPU to sample random values and accumulates the sub count
    of values within the circle.
    """
    pos = cuda.grid(1)
    if pos < acc.size:
        sub_acc = 0
        for i in range(sub_sample_size):
            x = xoroshiro128p_uniform_float32(rng_states, pos)
            y = xoroshiro128p_uniform_float32(rng_states, pos)
            if (x * x + y * y) < 1.0:
                sub_acc += 1
        acc[pos] = sub_acc


@njit(fastmath=True)
def calculate_pi(acc, sample_size):
    """Uses machine code on CPU to aggregate and calculate pi since there
    is less parallelism here.
    """
    return 4 * np.sum(acc) / sample_size

threadsperblock = 64
blocks = 24
# An 1-d array to hold sub count of acc.
acc = np.zeros(threadsperblock * blocks, dtype=np.float32)
rng_states = create_xoroshiro128p_states(threadsperblock * blocks, seed=1)
# Each thread should only generate sub_sample_size of random points.
sub_sample_size = sample_size // acc.shape[0]

### Performance of on GPU (CUDA)

It should take ~0.3 second to calculate pi on a GPU using above configuration with a sample size of 100,000,000.

It's **130 times faster** than native Python on CPU and **3 times** faster than machine code on CPU. And we haven't even pushed the single GPU to its limit.

In [None]:
start=timer()
gpu_monte_carlo_pi_sampler[blocks, threadsperblock](rng_states, sub_sample_size, acc)
pi = calculate_pi(acc, sample_size)
dt = timer() - start
print(f'Monte Carlo pi calculated as {pi} in {dt} s.')

## Running a Beam pipeline locally on GPU

It might not be obvious why you need to wrap your code that can already run on a GPU
into an Apache Beam pipeline.

The advantage of Beam with GPU is that you can later run the pipeline on Dataflow or any other runner/clusters
so that you can distribute/scale the work to as many GPUs as you want.

Note: you might need to configure worker machines so that they have GPU devices and drivers available.
For example, [using GPUs with Dataflow](https://cloud.google.com/dataflow/docs/guides/using-gpus).
You also need to constrain your GPU usages to work within the hardware limit. See
[GPUs and worker parallelism](https://cloud.google.com/dataflow/docs/concepts/gpu-support#gpus_and_worker_parallelism).

In the below example, we build a Beam pipeline with code similar to the plain [on-GPU example](#On-GPU-(CUDA)) and run the pipeline locally on this notebook instance.

First, we create a pipeline instance with options that utilize all CPU cores to schedule work when running the pipeline.

In [None]:
import typing as t

import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners.interactive import interactive_beam as ib


options = pipeline_options.PipelineOptions(
    direct_num_workers=0,  # default threads/subprocess to the number of cores on this machine
    direct_running_mode='multi_threading')
p = beam.Pipeline(InteractiveRunner(), options=options)

Then, we define a `DoFn` as a `Sampler` that uses the GPU to process elements.

Each element is a tuple of 2 `int` values:

- first value: the seed of a random number generator
- second value: the size of sample values to be generated

The `DoFn` itself runs as native Python code on the CPU while the inner logic of `gpu_monte_carlo_pi_sampler` runs on GPU.

In [None]:
class Sampler(beam.DoFn):
    def process(self, element: t.Tuple[int, int]):
        rng_seed = element[0]
        sample_size_per_sampler = element[1]

        threadsperblock_per_sampler = 64
        blocks_per_sampler = 24
        acc_per_sampler = np.zeros(
            threadsperblock_per_sampler * blocks_per_sampler, dtype=np.float32)
        rng_states_per_sampler = create_xoroshiro128p_states(
            threadsperblock_per_sampler * blocks_per_sampler, seed=rng_seed)
        sub_sample_size_per_thread = sample_size_per_sampler // acc_per_sampler.shape[0]
        gpu_monte_carlo_pi_sampler[blocks_per_sampler, threadsperblock_per_sampler](
            rng_states_per_sampler, sub_sample_size_per_thread, acc_per_sampler)
        yield acc_per_sampler

We can divide the work to 100 samplers. In a distributed environment, each sampler can run on a different machine with its own GPU(s).

Here for simplicity, we run the pipeline locally and all samplers would share the same GPU on this notebook instance.

We start the pipeline by creating a PCollection of 100 tuples of random number seeds (from 0 to 99) and sample size per sampler (1,000,000).

Then we let the `Sampler DoFn` take these tuples as inputs to generate sample values.

Each sampler has threadsperblock_per_sampler * blocks_per_sampler = **1536 threads** running in parallel
**on the GPU**. 

And we have **100 samplers** running concurrently scheduled by Beam **on all CPU cores** on the machine running this notebook.

In the data visualization, you can see the aggregated counts of "random points within the circle" for each GPU thread by each sampler.

In [None]:
sampler_count = 100
sample_size_per_sampler = sample_size // sampler_count


samplers_per_gpu_thread = (p 
                           | beam.Create([(i, sample_size_per_sampler) for i in range(sampler_count)])
                           | beam.ParDo(Sampler()))

# The output might be too big to be saved in an ipynb file, you can clear it before saving your work.
ib.show(samplers_per_gpu_thread)

Everything below runs as native Python code on the CPU.

To calculate pi, we need to aggregate all values from all GPU threads for each sampler.

**Note**: we use numpy to sum the values (`np.float32`) and then convert them back to the native Python type (`int`).

In [None]:
acc_per_sampler = samplers_per_gpu_thread | beam.Map(lambda x: np.sum(x).astype(int).item()).with_output_types(int)

# Sum up per-gpu-thread count of values falling into the circle of Monte Carlo pi calculation for each sampler.
ib.show(acc_per_sampler)

Then we combine all values from all samplers.

In [None]:
sum_acc = acc_per_sampler | beam.CombineGlobally(sum)

# Sum up the count of values falling into the circle of Monte Carlo pi calculation from all samplers.
ib.show(sum_acc)

Once we have all values aggregated, we can use the formula to calculate and print pi.

The visualization shows the pipeline graph. Let's make sure it's correctly constructed and doesn't have corrupted states from notebook executions.

**Note**: if the graph is mixed with transforms that are applied by out-of-order execution and re-execution of notebooks, please re-execute the code
from where the pipeline is created or restart the kernel and re-execute the whole notebook.

In [None]:
print_pi = sum_acc | beam.Map(lambda x: print(4 * x / sample_size))
            
ib.show_graph(p)

### Performance of Beam pipeline locally on GPU

It should only take 2.6 seconds to calculate pi using Beam on GPU with a sample size of 100,000,000.

Though it's not as fast as pure GPU + machine code executed on a single machine, it provides the scalability to run the code
on distributed systems, and the performance is almost on par with pure machine code on a single machine. You can also improve the
performance further by compiling those transforms written in native Python code into machine code with jit/njit.

The example has demonstrated how to write a Beam pipeline using a GPU, the performance increment when using a GPU compared to native Python code on CPU, and the small overhead of a Beam pipeline on a local machine.

In [None]:
start=timer()
print('Monte Carlo pi calculated as: ')
p.run()
dt = timer() - start
print(f'in {dt} s.')