# Ch. 7 Global Thread Interactions and Reductions

In Chapters 2 and 4 we looked at the simplest case of parallelization,
in which each thread performs an independent computation. In Chapters
5 and 6 we considered thread interactions, where multiple
threads share data, but limited the focus to __local__ interactions, in which a thread shares data only with threads that lie close to it in the grid. The canonical local interaction problem is the stencil computation that arises from finite difference methods. Now we move on to the more general
case of global interaction, where all the threads in the grid can
contribute to a single result.

## 7.1 A Classic Reduction: The Dot Product

The classic problem of this variety is computing the ___dot product___, also known as the ___inner product___ or ___scalar product___ because it produces a scalar value from two input vectors according to the formula:

$$ u \cdot v = \sum_{i=0}^{n-1} u_i * v_i$$
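For a small worked example, with vectors chosen purely for illustration, take $u = (1, 2, 3)$ and $v = (4, 5, 6)$:

$$ u \cdot v = 1 \cdot 4 + 2 \cdot 5 + 3 \cdot 6 = 4 + 10 + 18 = 32 $$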

The dot product computation can be broken into two steps:

1. Compute a new vector `w` whose entries are the product of the corresponding
entries in `u` and `v`.

2. Sum the entries in `w` to get a single numerical result.

The first step involves no thread interaction, so it fits the map pattern discussed earlier, and we have already seen how to effectively implement and parallelize such a computation using CUDA kernels. The summation in the second step involves global thread interaction. The array of elementwise product values is "reduced" to a smaller entity (here a single number), so all threads interact to produce a single result. This process is referred to as a __reduction__.
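To make the two steps concrete, here is a plain-Python sketch (CPU only, no CUDA) that keeps the map and the reduction separate. The function name `dot_map_reduce` is hypothetical and chosen only for illustration.

```python
from functools import reduce
import operator

def dot_map_reduce(u, v):
    # Step 1 (map): each entry of w depends only on the matching entries of u and v,
    # so on a GPU every product could be computed by an independent thread.
    w = [ui * vi for ui, vi in zip(u, v)]
    # Step 2 (reduce): combine all entries of w into one value with a binary operator.
    return reduce(operator.add, w, 0)

print(dot_map_reduce([1, 2, 3], [4, 5, 6]))  # 32
```

Only the second step requires any cooperation between threads, which is why the rest of the chapter focuses on it.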

## 7.2 Implementation with CUDA: Technical Considerations

We explore various approaches that span the gamut of effectiveness including:
- infeasible
- feasible, but unreliable
- reliable, but inefficient
- reliable and efficient

Obviously, our main interest lies in the "reliable and efficient" methods, but it is enlightening to see how other approaches fall short of that standard.

Let's start by looking at a simple serial dot product implementation:

```
def dot(u,v):
    n = u.shape[0]
    accum = 0
    for i in range(n):
        accum += u[i] * v[i]
    return accum
```

Looking at the serial code, the key idea is an accumulator variable `accum` that is initialized to zero and successively incremented by the product of the elements stored at each index `i`. It is tempting to try a similar approach for a parallel version, but one very quickly runs into technical restrictions. When trying to create a parallel dot product that follows the serial model, a question immediately arises: "Where would the accumulator variable be initialized and stored?" In the serial version, the variable `accum` is created in host memory and is accessible throughout the computation. This is not a feasible approach for a parallel version that runs on the device, since code running on the device generally does not have access to host memory. How could we create an accumulation variable that resides on the device? Passing a scalar `accum` as a kernel argument does put a value on the device, but kernel arguments are passed by value: each thread effectively works with its own private copy (held in registers), and no thread can see the values stored in the copies belonging to other threads.

How do we create a location to accumulate values that lives on the device and is accessible to all threads? Access by all threads points to global memory, and we can create storage in global memory by creating a device array. In the case of this simple reduction, we need to accumulate a single result, so the logical next step is to create `accum` as an array of length 1, copy it to global memory on the device, and allow each thread to add its contribution to the single element of `accum`. Sample code implementing this approach is shown in Listings 7.1 and 7.2.

```
File: main.py
01: import numpy as np
02: from numba import cuda
03: 
04: N = 2**16
05: 
06: def main():
07: 	u = np.ones(N, dtype = np.float32) 
08: 	v = np.ones(N, dtype = np.float32) 
09: 
10: 	accum = 0 
11: 	for i in range(N):
12: 		accum += u[i]*v[i] 
13: 	print("Serial result: ", accum)
14: 
15: 	from naive import dot #from shared import dot
16: 	for j in range(8):
17: 		accum = dot(u, v) 
18: 		print("Naive parallel result: ", accum)
19: 
20: if __name__ == '__main__':
21: 	main()
```
$$\text{ Listing 7.1 - } apps/dot/main.py$$

Listing 7.1 shows the code for _apps/dot/main.py_. On lines 7-8, two vectors `u` and `v` (each filled with 1s) are created. Lines 10-13 contain a serial implementation of the dot product that provides a "known" result for comparison and verification. (Note that the dot product of two vectors of size `N` filled with ones should be `N`.) Lines 15-18 import the naive parallel version of `dot()` (the wrapper function that computes the dot product on the device) and execute it repeatedly.

Listing 7.2 shows the code for _apps/dot/naive.py_, which includes the kernel function `dot_kernel()` on lines 6-15 and the wrapper function `dot()` on lines 17-30. On line 19, `accum = np.zeros(1, dtype = np.float32)` creates an array of length 1 initialized to 0. On line 23, `accum` is copied to the device array `d_accum`, which is passed to the kernel on line 28 along with `d_u` and `d_v`, the device copies of the input data. The kernel code instructs the thread with index `i` to compute the product of the corresponding entries in the input arrays and add the result to the value stored in `d_accum[0]`.


```
# Listing 7.2: apps/dot/naive.py
1 import numpy as np
2 from numba import cuda
3
4 TPB = 32
5
6 @cuda.jit('void(f4[:], f4[:], f4[:])') #f4 is shorthand for float32
7 def dot_kernel(d_accum, d_u, d_v):
8     i = cuda.grid(1)
9     n = d_u.shape[0]
10
11     if i >= n:
12         return
13
14     w = d_u[i]*d_v[i]
15     d_accum[0] += w  # unsynchronized read-increment-write: a race condition
16
17 def dot(u, v):
18     n = u.shape[0]
19     accum = np.zeros(1, dtype = np.float32)
20
21     d_u = cuda.to_device(u)
22     d_v = cuda.to_device(v)
23     d_accum = cuda.to_device(accum)
24
25     blocks = (n + TPB - 1)//TPB
26     threads = TPB
27
28     dot_kernel[blocks, threads](d_accum, d_u, d_v)  # [grid size, block size]
29
30     return d_accum.copy_to_host()[0]
```
$$\text{ Listing 7.2 - } apps/dot/naive.py$$

Nothing about the naïve approach seems overtly incorrect but, when executed, the parallel dot product returns a value that is not only different from the serial result, but not even reproducible. The situation we have run into, where repeated execution of the same code produces different results, is referred to as a ___race condition___, a name whose origin should be clear shortly. On line 15 of the kernel, `d_accum[0] += w` instructs each thread to read the value stored in `d_accum[0]` in global memory, add to that value `w = d_u[i]*d_v[i]` (computed on line 14), and write the result back into the memory location for `d_accum[0]`. In the serial version this same set of operations occurs sequentially for increasing values of `i` (producing the desired result); here a large number of threads perform these operations concurrently. The thread with index `i` reads `d_accum[0]` intending to increment the value and store the result back in `d_accum[0]`, so that another thread with index `j` can then read the updated value and add its own contribution. However, that is not how things work in the asynchronous parallel world. When thread `i` reads the value of `d_accum[0]`, nothing stops other threads from reading it too. Whether the value thread `j` reads includes the contribution from thread `i` is not determined; it depends on whether thread `i` happened to write its result before or after thread `j` read `d_accum[0]`. When several threads read the same value from `d_accum[0]`, each adds its own contribution to that same stale value, and whichever write happens to land last overwrites the others. The final result therefore cannot be predicted, and in practice it is almost always wrong, because contributions are lost from the accumulated value.
This undefined behavior associated with the uncertain order of execution (where the observed results depend on which thread gets to the memory location first) is the origin of the term "race condition".
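The lost updates can be reproduced deterministically on the CPU. The sketch below is a plain-Python model, not CUDA code: it replays a chosen interleaving of the "read" and "write" halves of `d_accum[0] += w` for a few hypothetical threads, each of which keeps a private copy of what it read (standing in for a register). The function name `simulate_unsynchronized_add` and the two schedules are illustrative assumptions.

```python
def simulate_unsynchronized_add(values, schedule):
    """Replay a read-modify-write race on a single shared accumulator.

    schedule is a list of (thread_id, step) pairs, where step is "read" or "write".
    """
    accum = 0.0
    private = {}
    for tid, step in schedule:
        if step == "read":
            private[tid] = accum                 # load d_accum[0] into a "register"
        elif step == "write":
            accum = private[tid] + values[tid]   # store the incremented private copy
    return accum

values = [1.0] * 4

# Serial-like schedule: each thread reads and writes before the next thread reads.
serial = [(t, s) for t in range(4) for s in ("read", "write")]
# Lockstep schedule: every thread reads before any thread writes.
lockstep = [(t, "read") for t in range(4)] + [(t, "write") for t in range(4)]

print(simulate_unsynchronized_add(values, serial))    # 4.0
print(simulate_unsynchronized_add(values, lockstep))  # 1.0
```

In the lockstep schedule all four threads read 0.0, so three of the four contributions are overwritten. On a real GPU the interleaving varies from run to run, which is why the naive result is not reproducible.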

This is another situation where it is necessary to re-assert some control over the order of execution to ensure that correct results are obtained. 

> Recall that the efficiency of parallel computation comes from giving up control over the order of execution so that the scheduler can assign tasks to keep the SMs busy and hide latency. Taking back control of execution order comes with a performance cost. We should pay that cost only where synchronization is necessary to obtain correct results, and this is one of those situations.

Several mechanisms for controlling order of execution are supported in `numba.cuda`. In Ch. 5, we encountered a situation where we needed to ensure that shared arrays were fully loaded before threads proceeded with stencil computations, and `cuda.syncthreads()` was introduced as the appropriate method for that situation.

Here, we introduce the second mechanism: ___atomic operations___. To avoid the race conditions described above, we need to make sure that once a thread reads the value of the accumulator, that thread gets to complete the read-increment-write sequence before any other thread can read the accumulator. This can be thought of in terms of a "lending library" model: a "borrower" thread "checks out" the accumulator to read its value, and no other "reader" has access until the borrower completes the read-increment-write sequence and "returns" the variable so that it is available to the next borrower. CUDA provides this behavior through atomic operations, which are "atomic" in the sense that the read-increment-write "molecule" of operations becomes a single "atomic" operation that cannot be subdivided into smaller components.
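The "lending library" picture can be modeled on the CPU with a lock from Python's `threading` module. This is only an analogy: GPU atomic operations are implemented in hardware rather than with a software lock, but both guarantee that each read-increment-write completes as an indivisible unit. The function `locked_accumulate` and its `num_threads` parameter are hypothetical names used for illustration.

```python
import threading

def locked_accumulate(values, num_threads=4):
    accum = [0.0]                 # shared accumulator, playing the role of d_accum
    lock = threading.Lock()       # the "library checkout" for the accumulator

    def worker(chunk):
        for w in chunk:
            with lock:            # read, increment, and write happen as one unit
                accum[0] += w

    # Give each worker an interleaved share of the values.
    chunks = [values[k::num_threads] for k in range(num_threads)]
    workers = [threading.Thread(target=worker, args=(c,)) for c in chunks]
    for t in workers:
        t.start()
    for t in workers:
        t.join()
    return accum[0]

print(locked_accumulate([1.0] * 1000))  # 1000.0
```

Every update is preserved, but only one worker can hold the lock at a time, which foreshadows the serialization cost discussed below.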

Listing 7.3 shows the code from _apps/dot/atomic.py_, which includes an updated version of `dot_kernel`. The key change occurs on line 15, where `d_accum[0] += w` is replaced by `cuda.atomic.add(d_accum, 0, w)`. You should run the revised code and convince yourself that the revision employing atomic addition produces correct results that agree with the serial result.


```
# Listing 7.3: apps/dot/atomic.py
1 import numpy as np
2 from numba import cuda, float32
3
4 TPB = 32
5
6 @cuda.jit('void(f4[:], f4[:], f4[:])')
7 def dot_kernel(d_accum, d_u, d_v):
8     i = cuda.grid(1)
9     n = d_u.shape[0]
10
11     if i >= n:
12         return
13
14     w = d_u[i]*d_v[i]
15     cuda.atomic.add(d_accum, 0, w)  # indivisible read-increment-write
16
17 def dot(u, v):
18     n = u.shape[0]
19     accum = np.zeros(1, dtype = np.float32)
20
21     d_u = cuda.to_device(u)
22     d_v = cuda.to_device(v)
23     d_accum = cuda.to_device(accum)
24
25     gridDim = (n + TPB - 1)//TPB
26     blockDim = TPB
27
28     dot_kernel[gridDim, blockDim](d_accum, d_u, d_v)
29
30     return d_accum.copy_to_host()[0]
```
$$\text{ Listing 7.3 - } apps/dot/atomic.py$$

> Other atomic operations provided by numba include `cuda.atomic.max()`
and `cuda.atomic.min()`.

Let's pause (synchronize!?) to consider the performance cost of using atomic addition. In the implementation shown in Listing 7.3, every thread performs an atomic operation on the same memory location, and while one thread's update is in progress no other thread can update the accumulator. Inserting the single line that calls the atomic operation therefore causes all of the accumulation to be __completely serialized__, which is a very unsatisfactory outcome when aiming for parallelization. Is there some other tool available so that we can produce reliable results without giving up parallelization? The key again lies in thinking about the type and scope of available memory. We want to use atomic operations to produce reliable summation into global memory, but without having an atomic operation occur in every thread. The next section presents such an approach.


## 7.3 Optimizing a Reduction

Atomic operations are a useful tool for achieving reliable reductions but, when used as illustrated in Listing 7.3, they largely
defeat the purpose of parallelization. In this section we explore an approach that achieves reliability without unnecessary loss of efficiency by using atomic operations together with shared memory, as shown in Listing 7.4. The strategy behind the code is to take advantage of the ___key properties of shared memory arrays___:

- Rapid access
- Accessible to all threads in a block.

The fundamental approach to improving efficiency is a tiled (or blocked) approach, in which the necessary data is read from global memory and stored in a shared array so that the block can compute a "block sum" (the net contribution to the reduction result from data associated with all of the threads in the block). The block sum can then be reliably added to the accumulator in global memory with a single atomic operation for the entire block, rather than an atomic operation for each thread. The number of atomic operations (each of which inhibits parallelism) is thus reduced by a factor equal to the number of threads in a block.
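The savings are easy to quantify. The plain-Python sketch below (a CPU model, not CUDA code) sums an array block by block and counts one "atomic" update per block; the name `blocked_sum` is hypothetical. With the `N = 2**16` elements of Listing 7.1 and the `TPB = 512` used in Listing 7.4, the per-thread approach needs 65536 atomic additions while the blocked approach needs only 128.

```python
def blocked_sum(w, tpb):
    """Sum w block by block, issuing one 'atomic' update per block."""
    accum = 0.0
    atomic_updates = 0
    for start in range(0, len(w), tpb):
        block_sum = sum(w[start:start + tpb])  # computed in shared memory on the GPU
        accum += block_sum                     # stands in for one cuda.atomic.add per block
        atomic_updates += 1
    return accum, atomic_updates

print(blocked_sum([1.0] * 2**16, 512))  # (65536.0, 128)
```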

The remaining question is how to sum the contributions from the threads in the block. The simplest approach is to assign a particular thread in the block to compute the block sum. A simple shared memory implementation is shown in Listing 7.4 below. The shared array `sh_w` is established on line 11, and line 12 initializes the entry corresponding to the thread index to zero. Line 14 performs an array bounds check, and the elementwise product is computed and stored in the shared array on line 15. Again, we use the appropriate index for each array: `i` for the global memory arrays `d_u` and `d_v`, and `tIdx` for the shared memory array `sh_w`. On line 16, the call to `cuda.syncthreads()` ensures that every thread has computed its element product and stored the result in the shared array before the summation starts. Lines 17-21 instruct the thread with index 0 to loop over the thread indices, sum the entries in the shared array, and then use `cuda.atomic.add()` to add the block sum into the global memory accumulator `d_accum[0]`.

```
File: simple_shared.py
01: import numpy as np
02: from numba import cuda, float32
03: 
04: TPB = 512
05: 
06: @cuda.jit('void(f4[:],f4[:],f4[:])')
07: def dot_kernel(d_accum, d_u, d_v):
08: 	i = cuda.grid(1)
09: 	tIdx = cuda.threadIdx.x
10: 	n = d_u.shape[0]
11: 	sh_w = cuda.shared.array(shape = TPB, dtype = float32) #establish shared array
12: 	sh_w[tIdx] = 0 #initialize shared array to zero
13: 
14: 	if i < n: #bounds check
15: 		sh_w[tIdx] = d_u[i]*d_v[i] #store element product in shared array
16: 	cuda.syncthreads() #make sure all element products are stored before summing
17: 	if tIdx == 0: #assign thread 0 to compute the block sum
18: 		block_sum = float32(0) #float32 accumulator matching the dtype of d_accum
19: 		for j in range(cuda.blockDim.x):
20: 			block_sum += sh_w[j] #add entry j of the shared array
21: 		cuda.atomic.add(d_accum , 0, block_sum)
22: 
23: def dot(u, v):
24: 	d_u = cuda.to_device(u)
25: 	d_v = cuda.to_device(v)
26: 	gridDim = int(np.ceil(u.size/TPB))
27: 	blockDim = TPB
28: 	accum = np.zeros(1, dtype = np.float32)
29: 	d_accum = cuda.to_device(accum)
30: 
31: 	dot_kernel[gridDim, blockDim](d_accum, d_u, d_v)
32: 
33: 	return d_accum.copy_to_host()[0]
34: 
```
$$\text{Listing 7.4 - } \text{apps/dot/simple\_shared.py}$$

Since reduction is one of the basic computation patterns, a lot of effort has gone into making reductions more efficient and numerous performance enhancements are available.

> Check out the "Parallel for All ($\parallel \forall$)" blog to find more details on optimizing reductions.

A more efficient implementation of the dot product is presented in Listing 7.5, and a detailed description of the code follows.

```
#Listing 7.5: apps/dot/shared.py
1 import numpy as np
2 from numba import cuda, float32
3
4 TPB = 512
5
6 @cuda.jit('void(f4[:], f4[:])')
7 def reduction(d_w, d_accum):
8     i = cuda.grid(1)
9     tdx = cuda.threadIdx.x
10     bdx = cuda.blockIdx.x
11     sh_acccum = cuda.shared.array(shape = 0, dtype = float32) # dynamic shared memory
12     n = d_w.shape[0]
13
14     if i < n: # no early return: every thread must reach cuda.syncthreads()
15         sh_acccum[tdx] = d_w[i]
16     else:
17         sh_acccum[tdx] = 0 # pad the last block with the additive identity
18     cuda.syncthreads()
19
20     stride = 1
21     while stride < cuda.blockDim.x:
22         if tdx % (2*stride) == 0 and tdx + stride < cuda.blockDim.x:
23             sh_acccum[tdx] += sh_acccum[tdx + stride]
24         stride *= 2
25         cuda.syncthreads()
26
27     if tdx == 0:
28         d_accum[bdx] = sh_acccum[tdx]
29
30 @cuda.jit('void(f4[:], f4[:], f4[:])')
31 def dot_kernel(d_w, d_u, d_v):
32     i = cuda.grid(1)
33     n = d_u.shape[0]
34
35     if i >= n:
36         return
37
38     d_w[i] = d_u[i]*d_v[i]
39
40 def dot(u, v):
41     n = u.shape[0]
42
43     d_u = cuda.to_device(u)
44     d_v = cuda.to_device(v)
45     d_w = cuda.device_array(n, dtype = np.float32)
46
47     gridDim = int(np.ceil(n/TPB))
48     blockDim = TPB
49
50     dot_kernel[gridDim, blockDim](d_w, d_u, d_v)
51
52     reduce_ = True
53     while reduce_:
54         if gridDim == 1:
55             reduce_ = False
56
57         d_accum = cuda.device_array(shape = gridDim, dtype = np.float32)
58         reduction[gridDim, blockDim, 0, TPB*4](d_w, d_accum)
59         d_w = d_accum
60         gridDim = int(np.ceil(d_w.shape[0]/TPB))
61
62     return d_w.copy_to_host()[0]
```
$$\text{Listing 7.5 - } \text{apps/dot/shared.py}$$

The dot product is now broken into two parts, as described before. We compute the elementwise product of `u` and `v` using the map pattern in `dot_kernel()`, and then apply the reduction in a separate kernel, `reduction()`. Lines 52-60 contain new code to control execution
of the `reduction()` kernel. When the array being reduced is larger than `TPB`, a single launch only reduces it to one partial sum per block, so the kernel must be
called more than once to fully reduce the array. Because of this, we place the kernel call inside a while loop and call it repeatedly until the exit condition is met. On each iteration, the exit condition
on Lines 54-55 checks the size of the grid. Once `gridDim` reaches a value of $1$, the array fits inside a single block
(allowing the kernel to reduce the array to a single value), and the kernel is called one last time before the loop exits.
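The control flow of this host loop can be traced without a GPU. The plain-Python sketch below mirrors Lines 47-60: each "launch" replaces the array by its per-block sums, just as `d_accum` becomes the new `d_w`. The function name `multipass_sum` and the launch counter are illustrative additions, not part of Listing 7.5.

```python
import math

def multipass_sum(w, tpb):
    """Mimic the host loop in dot(): 'launch' the reduction until one value remains."""
    grid_dim = math.ceil(len(w) / tpb)
    launches = 0
    reduce_ = True
    while reduce_:
        if grid_dim == 1:
            reduce_ = False  # this is the last launch
        # One partial sum per block, like d_accum after a reduction() launch.
        d_accum = [sum(w[b * tpb:(b + 1) * tpb]) for b in range(grid_dim)]
        launches += 1
        w = d_accum
        grid_dim = math.ceil(len(w) / tpb)
    return w[0], launches

print(multipass_sum([1.0] * 2**16, 512))  # (65536.0, 2)
```

For $2^{16}$ elements and 512 threads per block, the first launch leaves 128 partial sums and the second launch combines them into one.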

After the exit condition is checked, we create a device array, `d_accum`,
whose size equals the number of blocks in the grid. (Each block computes a
single value, so each entry in `d_accum`
stores the sum from one block.) The input array, `d_w`, is passed into `reduction()`, where each block takes a `TPB`-sized
chunk of the total array. (If the array cannot be divided evenly,
the block with the highest index contains
the remaining array elements that
are left over.) Each block takes its assigned chunk, computes its local sum, and stores that sum in
`d_accum`. After the kernel is finished, the output `d_accum` is assigned
as the new input for the next iteration, `gridDim` is recalculated
to fit the updated `d_w`, and the kernel is called again on the new `d_w`.
This process repeats until `d_w` is reduced to a single value.

There is one final change made to `dot()` that must be mentioned
before discussing how the `reduction()` kernel works. The kernel
call on Line 58 includes two additional arguments in its launch configuration. The third argument
indicates which stream to assign to the kernel, and the fourth
indicates the amount of dynamic shared memory to allot to the kernel.
Streams are a topic we will cover later, but because these launch
arguments are positional, we cannot specify an amount of dynamic shared memory without first indicating
the stream. For the time being, the stream can be set to 0 (the default stream)
until streams are explained. Dynamic shared memory is simply shared
memory whose size is assigned to a kernel at run time rather than at compile
time (it does not need to be known before the code is run).

> It is not necessary in this particular
example to use dynamic shared memory,
since every kernel launch uses the
same amount of shared memory, but
dynamic shared memory is worth
mentioning, so it was integrated into this
example.

The argument in the kernel call for creating dynamic shared memory is given
in bytes, not in array elements. In this particular example our shared
memory array, `sh_acccum`, is an array of `float32` values (4 bytes each), so we multiply
the desired number of elements by four to get the appropriate size.
Within the kernel itself, the shared array must be declared with a shape of zero,
which tells Numba that its size comes from the launch configuration.
This can be seen on Line 11 with the creation of `sh_acccum`. (If the size of the shared array is
needed at some point in the kernel,
it is safest to pass it in as an
argument; depending on the Numba version,
`sh_acccum.shape[0]` may report zero
rather than the allocated size.)
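Rather than hard-coding the factor of 4, the byte count can be derived from the element type. A small sketch, assuming the shared array holds `float32` values as in Listing 7.5; the name `shmem_bytes` is hypothetical and would simply take the place of `TPB*4` in the launch configuration.

```python
import numpy as np

TPB = 512  # threads per block, as in Listing 7.5

# Dynamic shared memory is requested in bytes, so scale the element count
# by the size of one element instead of hard-coding 4.
shmem_bytes = TPB * np.dtype(np.float32).itemsize

print(shmem_bytes)  # 2048
```

Deriving the size this way keeps the launch configuration correct if the shared array's dtype is later changed, for example to `float64`.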

The contents of the `reduction()` kernel are on Lines 6-28. Lines
8-18 contain the usual kernel setup along with the additional shared memory setup. We start by defining a few variables to help index
into the shared array. Next we create the shared array. Lastly we fill
the shared array with the corresponding values from `d_w`, the array
containing the product of u and v calculated with `dot_kernel`.

> This is our first example with multiple
kernel calls, both of which share a
parameter, `d_w`. Because `d_w` is used
in both kernels and is not needed by
the CPU at any point between the two
kernel calls we can keep the array on
the device until the second kernel is
finished.


The actual reduction is done in Lines 20-25, after the
`cuda.syncthreads()` call on Line 18 has ensured that the shared memory array has been
completely filled. A stride value is set to 1 and doubled after each iteration of the loop.
In the first iteration, every thread with an even index adds the value to its right
(one stride length away) to its own entry. In the next iteration, half as many
threads each add the value two stride lengths
away to their own entries. This halving process continues until the first thread
holds the sum of its `TPB`-sized block. The `cuda.syncthreads()` call at the end of
each iteration ensures that all additions for one stride are complete before the next
stride begins. The accumulated value
is then written back to the global memory array, `d_accum`.
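The stride-doubling loop can be traced on the CPU. The following plain-Python sketch models a single block, not CUDA code: the list `sh` plays the role of the shared array, the inner `for` loop stands in for the threads that are active at a given stride, and the end of each `while` iteration corresponds to `cuda.syncthreads()`. The function name `tree_reduce_block` is hypothetical.

```python
def tree_reduce_block(block):
    """Interleaved-addressing tree reduction of one block, modeled sequentially."""
    sh = list(block)
    block_dim = len(sh)
    stride = 1
    while stride < block_dim:
        # Threads whose index is a multiple of 2*stride are active in this step.
        # Each reads sh[tdx + stride], which no thread writes during this step,
        # so running them one after another gives the same result as in parallel.
        for tdx in range(0, block_dim, 2 * stride):
            if tdx + stride < block_dim:
                sh[tdx] += sh[tdx + stride]
        stride *= 2  # all active threads finish before the next stride begins
    return sh[0]

print(tree_reduce_block([1, 2, 3, 4, 5, 6, 7, 8]))  # 36
```

A block of `TPB` values is reduced in about $\log_2(\text{TPB})$ steps (9 steps for 512 threads) instead of the `TPB - 1` sequential additions performed by thread 0 in Listing 7.4.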

As mentioned earlier, if `reduction()` is called with an array
that is larger than `TPB`, `reduction()` must be called more than
once. This is a consequence of using shared memory. Shared memory
allows memory accesses that are much faster than
global memory accesses, but a shared memory array cannot be referenced outside
of the block that owns it. Once each block
has computed its local sum, the blocks have no way to read one
another's local sums. To fix this problem, each local sum is written
to global memory and the kernel is launched again. The
kernel is repeatedly called until the array fits within a single block,
at which point it fits in a single shared memory array where the
final accumulation can take place.

It is worth mentioning that the number of reduction passes is a design choice.
In this example `reduction()` is called repeatedly until a single block
returns a single value. Each pass shrinks the array by a factor of `TPB`, so the
number of launches grows only logarithmically with the array size (about
$\lceil \log_{\text{TPB}} n \rceil$; for $n = 2^{16}$ and a block size of 512, just two launches). It is entirely
possible to call `reduction()` a single time and then use `cuda.atomic.add()`
to combine the resulting smaller array. This would eliminate the repeated
`reduction()` calls; however, if the resulting array is still large after
the reduction, the many atomic additions will still dramatically increase the
time it takes to compute the result. For this example we generalize
the reduction to arrays of arbitrary size and eliminate the use of
`cuda.atomic.add()`.
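To put numbers on this trade-off, the plain-Python sketch below counts kernel launches for the multi-pass strategy of Listing 7.5 and the number of atomic additions that a single reduction pass followed by `cuda.atomic.add()` would require. The function name `reduction_costs` is hypothetical, and it counts operations only; it does not measure run time.

```python
import math

def reduction_costs(n, tpb):
    """Return (launches for the multi-pass approach, atomics after one pass)."""
    launches = 0
    size = n
    while True:
        size = math.ceil(size / tpb)  # each pass leaves one value per block
        launches += 1
        if size == 1:
            break
    atomics_after_one_pass = math.ceil(n / tpb)
    return launches, atomics_after_one_pass

for n in (2**16, 2**20, 2**24):
    print(n, reduction_costs(n, 512))
# 65536 (2, 128)
# 1048576 (3, 2048)
# 16777216 (3, 32768)
```

The launch count stays small as `n` grows, while the atomic count of the single-pass alternative grows linearly with `n`.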

## 7.4 Numba’s Built-in Reduction

The sections above should provide an awareness of how to compute
reductions and how one might optimize a reduction if performance
is the foremost concern. However, implementing your own reduction
is not always necessary. Reductions are used very frequently in
parallel computing, and Numba provides a built-in reduction decorator, `@cuda.reduce`.

Listing 7.6 illustrates a dot product using Numba’s `@cuda.reduce` decorator to implement the reduction. All that is needed is to apply the `@cuda.reduce` decorator to a function of two arguments that describes how two values are combined. Here we are performing a sum reduction, so the function returns the sum of
its arguments. If we wanted to perform a maximum
reduction, we would replace `u + v` with `max(u, v)`. Because the values are combined in parallel rather than strictly left to right, the combining function should be associative (as addition and `max` are, up to floating-point rounding).

The only oddity that needs to be addressed is the argument mismatch
between the `reduction()` call and the `reduction()` definition.
In the function call we pass in the array we want to reduce. In the
function definition, `u` and `v` are two values (array entries or
partial results) to be combined into one. The `@cuda.reduce`
decorator replaces the function with a reduction object whose call accepts an array;
the original two-argument function is compiled as a device function and used to
combine pairs of values during the reduction. This wrapping by the decorator is where
the argument mismatch comes from.
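The mismatch is easier to see with a CPU-only analog. The decorator below, `make_reducer`, is hypothetical and is not how Numba implements `@cuda.reduce`; it only illustrates the idea that a decorator can take a two-argument combining function and return a new callable that accepts a whole sequence, combining neighbouring pairs round by round as a tree reduction would.

```python
def make_reducer(binop):
    """Hypothetical CPU analog of @cuda.reduce (not Numba's implementation)."""
    def reduce_array(values):
        vals = list(values)
        if not vals:
            raise ValueError("cannot reduce an empty sequence")
        while len(vals) > 1:
            # Combine neighbouring pairs; an odd element is carried to the next round.
            paired = [binop(vals[k], vals[k + 1]) for k in range(0, len(vals) - 1, 2)]
            if len(vals) % 2 == 1:
                paired.append(vals[-1])
            vals = paired
        return vals[0]
    return reduce_array

@make_reducer
def sum_reduce(u, v):
    return u + v

@make_reducer
def max_reduce(u, v):
    return max(u, v)

print(sum_reduce([1, 2, 3, 4, 5]))  # 15
print(max_reduce([3, 9, 2, 7]))     # 9
```

As with `@cuda.reduce`, the decorated name is called with one array even though the function was defined with two arguments.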

```
# Listing 7.6: apps/dot/built_in.py
import numpy as np
from numba import cuda

TPB = 1024

@cuda.reduce
def reduction(u, v):
    return u + v

@cuda.jit('void(f4[:], f4[:], f4[:])')
def dot_kernel(d_w, d_u, d_v):
    i = cuda.grid(1)
    n = d_u.shape[0]

    if i >= n:
        return

    d_w[i] = d_u[i]*d_v[i]

def dot(u, v):
    n = u.shape[0]

    d_u = cuda.to_device(u)
    d_v = cuda.to_device(v)
    d_w = cuda.device_array(n, dtype = np.float32)

    gridDim = int(np.ceil(n/TPB))
    blockDim = TPB

    dot_kernel[gridDim, blockDim](d_w, d_u, d_v)

    w = reduction(d_w)  # the reduction object accepts a device array

    return w
```
$$\text{Listing 7.6 - } \text{apps/dot/built\_in.py}$$
 
 
