---
# **LAB 9 - CUDA in Python**
---

# ▶️ CUDA setup

In [None]:
!nvcc --version

In [None]:
!nvidia-smi

In [None]:
!pip install numba-cuda==0.4.0

In [None]:
from numba import config
config.CUDA_ENABLE_PYNVJITLINK = 1

# 🐍 Numba for CPU

Monte Carlo Method to determine Pi.

- Confirm the compiled version is behaving the same as the uncompiled version.
- Benchmark the uncompiled version.
- Benchmark the compiled version.

Note: Numba saves the original Python implementation of the function in the **.py_func** attribute, so we can call the original Python code to make sure we get the same answer

In [None]:
from numba import jit
from numpy import testing
import random

# Use the Numba compiler to compile this function
@jit(nopython=True)
def monte_carlo_pi(nsamples):
    acc = 0
    for i in range(nsamples):
        x = random.random()
        y = random.random()
        if (x**2 + y**2) < 1.0:
            acc += 1
    return 4.0 * acc / nsamples

# Run the function
nsamples = 1000000

# We will use numpy's `testing` library to confirm compiled and uncompiled versions run the same
testing.assert_almost_equal(monte_carlo_pi(nsamples), monte_carlo_pi.py_func(nsamples), decimal=2)

In [None]:
%timeit monte_carlo_pi(nsamples)

In [None]:
%timeit monte_carlo_pi.py_func(nsamples)

In [None]:
import numpy as np

def my_sum(x, y):
    return x + y

np_my_sum = np.frompyfunc(my_sum, 2, 1) # create ufunc
print(type(my_sum), type(np_my_sum))    # check types
print(my_sum(1, 1), np_my_sum(1, 1))    # check if the same

### Define a ufunc using numba's vectorize...

In [None]:
from numba import vectorize
import numpy as np

# Define a ufunc using numba's vectorize
@vectorize(['float64(float64, float64)'], target='cpu')
def multiply(x, y):
	return x * y

# Test the ufunc
a = np.array([1.0, 2.0, 3.0, 4.0])
b = np.array([10.0, 20.0, 30.0, 40.0])

result = multiply(a, b)
print(result)

In [None]:
import numpy as np
from numba import vectorize, int64, float32, float64

# create default ufunc with datatypes conversion
@vectorize([int64(int64,int64), float32(float32,float32), float64(float64,float64)])
def numba_dtype_sum(x, y):
    return x + y

print(type(numba_dtype_sum))  # check type
print(numba_dtype_sum(1, 1))  # check on scalars
print(numba_dtype_sum(np.ones(4), np.ones(4))) # check int arrays
print(numba_dtype_sum(np.random.rand(4), np.random.rand(4))) # check float arrays

### Numba can parallelize loops using **parallel=True**...

In [None]:
from numba import njit, prange
import numpy as np

@njit(parallel=True)
def parallel_sum(arr):
    total = 0
    for i in prange(len(arr)):  # Parallel execution
        total += arr[i]
    return total

arr = np.random.rand(1000000)
print(parallel_sum(arr))

In [None]:
%timeit parallel_sum(arr)

In [None]:
%timeit arr.sum()

### Type Specialization: Numba automatically specializes functions based on input types...

In [None]:
from numba import jit

@jit(nopython=True)
def multiply(x, y):
   return x * y

print(multiply(3, 4))      # Optimized for integers
print(multiply(3.5, 4.2))  # Optimized for floats


### Execution time: compilation + execution...

In [None]:
from numba import jit
import numpy as np
import time


@jit(nopython=True)
def go_fast(a): # Function is compiled and runs in machine code
	trace = 0.0
	for i in range(a.shape[0]):
		trace += np.tanh(a[i])
	return trace

x = np.arange(100000)

# DO NOT REPORT THIS... COMPILATION TIME IS INCLUDED IN THE EXECUTION TIME!
start = time.time()
go_fast(x)
end = time.time()
print("Elapsed (with compilation) = %s" % (end - start))

# NOW THE FUNCTION IS COMPILED, RE-TIME IT EXECUTING FROM CACHE
start = time.time()
go_fast(x)
end = time.time()
print("Elapsed (after compilation) = %s" % (end - start))


# 🐍 Numba for GPU

### Numba check and device detect...

In [None]:
import numba
from numba import cuda
import numpy as np

print(f'NumPy version: {np.__version__}')
print(f'Numba version: {numba.__version__}')
print(f'CUDA driver version: {cuda.driver.get_version()}')
print(f'CUDA runtime version: {cuda.runtime.get_version()}')

# device detect
cuda.detect()

### Kernel configuration and definition...

In [None]:
import numpy as np

# Define the kernel function
@cuda.jit
def increment_by_one(an_array):
	# Thread id in a 1D block
	tx = cuda.threadIdx.x
	# Block id in a 1D grid
	bx = cuda.blockIdx.x
	# Block width, i.e. number of threads per block
	bw = cuda.blockDim.x
	# Compute flattened index inside the array
	pos = tx + bx * bw
	if pos < an_array.size:  # Check array boundaries
		an_array[pos] += 1

In [None]:
# Create a random array
an_array = np.random.rand(1000000).astype(np.float32)
# Allocate device memory
d_an_array = cuda.to_device(an_array)
# Define the number of threads per block
threads_per_block = 256
# Define the number of blocks in the grid
blocks_per_grid = (an_array.size + (threads_per_block - 1)) // threads_per_block
# Launch the kernel
increment_by_one[blocks_per_grid, threads_per_block](d_an_array)
# Copy the result back to host
an_array = d_an_array.copy_to_host()
# Check the result
print(an_array[:10])  # Print first 10 elements to verify



# 🐍 Mat multiplication

Multiplication...

In [None]:
from numba.types import float32

BLOCK_SIZE = 32

# kernel matrix multiplication
@cuda.jit
def matmul_gpu(A, B, C):
	"""Perform square matrix multiplication of C = A * B."""
	i, j = cuda.grid(2)
	if i < C.shape[0] and j < C.shape[1]:
		tmp = 0.
		for k in range(A.shape[1]):
			tmp += A[i, k] * B[k, j]
		C[i, j] = tmp

# kernel matrix multiplication with shared memory
@cuda.jit
def fast_matmul(A, B, C):
	"""Perform square matrix multiplication of C = A * B using shared memory."""
	# Define an array in the shared memory
	sA = cuda.shared.array(shape=(BLOCK_SIZE, BLOCK_SIZE), dtype=float32)
	sB = cuda.shared.array(shape=(BLOCK_SIZE, BLOCK_SIZE), dtype=float32)

	# Calculate thread indices
	x, y = cuda.grid(2)
	tx = cuda.threadIdx.x
	ty = cuda.threadIdx.y
	bpg = cuda.gridDim.x
	tmp = float32(0.)

	# Each thread computes one element in the result matrix.
	for i in range(bpg):
		sA[ty, tx] = 0
		sB[ty, tx] = 0
		if y < A.shape[0] and (tx + i * BLOCK_SIZE) < A.shape[1]:
			sA[ty, tx] = A[y, tx + i * BLOCK_SIZE]
		if x < B.shape[1] and (ty + i * BLOCK_SIZE) < B.shape[0]:
			sB[ty, tx] = B[ty + i * BLOCK_SIZE, x]

		# Synchronize threads to ensure all data is loaded into shared memory
		cuda.syncthreads()

		# Each thread computes one element in the result matrix.
		for j in range(BLOCK_SIZE):
			tmp += sA[ty, j] * sB[j, tx]

		# Wait until all threads finish computing
		cuda.syncthreads()

	# Write the result to global memory
	if y < C.shape[0] and x < C.shape[1]:
		C[y, x] = tmp

In [None]:
import time
import numpy as np

# generate random vals
np.random.seed(42)
SIZE = 1024*8
A = np.ones((SIZE,SIZE)).astype('float32')  # mat 1
B = np.ones((SIZE,SIZE)).astype('float32')  # mat 2
#C = np.zeros((SIZE,SIZE)).astype('float32')                       # mat where we store answer

# transfer data to device
d_A = cuda.to_device(A) # Copy of A on the device
d_B = cuda.to_device(B) # Copy of B on the device
d_C = cuda.device_array_like(A) # malloc on the device

# Define the number of threads in each block
block = (32, 32)  # each block will contain 32x32 threads, typically 128 - 512 threads/block
grid_x = int(np.ceil(A.shape[0] / block[0]))
grid_y = int(np.ceil(A.shape[1] / block[1]))
grid = (grid_x, grid_y)  # we calculate the gridsize (number of blocks) from array
print(f"Matrix size: {SIZE}x{SIZE}")
print(f"Grid size: {grid_x}x{grid_y}")
print(f"Block size: {block[0]}x{block[1]}")

In [None]:
# execution of the kernel matmul_gpu
start = time.time()
matmul_gpu[grid, block](d_A, d_B, d_C)
# host and device sync
cuda.synchronize()
end = time.time()
print("Elapsed (sec) = %s" % (end - start))

In [None]:
# execution of the kernel fast_matmul
start = time.time()
fast_matmul[grid, block](d_A, d_B, d_C)
# host and device sync
cuda.synchronize()
end = time.time()
print("Elapsed (sec) = %s" % (end - start))

In [None]:
# using numpy function
start = time.time()
C = np.matmul(A, B)
end = time.time()
print("Elapsed (sec) = %s" % (end - start))

In [None]:
import torch

# the first multiplication here is significantly slower
m1 = torch.from_numpy(A).cuda()
m2 = torch.from_numpy(B).cuda()
c = torch.zeros((m1.shape[0], m2.shape[1]), dtype=torch.float32).cuda()

start = time.time()
torch.matmul(m1, m2, out=c)
torch.cuda.synchronize()
end = time.time()
print("Elapsed (sec) = %s" % (end - start))

# 🐍 Convolution 2D

Parallel reduce...

In [None]:
import numpy as np
from numba import cuda
from numba.types import int32

SHARED_DIM = 1024

@cuda.jit
def reduce(data):
	tid = cuda.threadIdx.x
	size = len(data)
	if tid < size:
		i = cuda.grid(1)

		# Declare an array in shared memory
		shr = cuda.shared.array(SHARED_DIM, int32)
		shr[tid] = data[i]

		# Ensure writes to shared memory are visible
		# to all threads before reducing
		cuda.syncthreads()

		s = 1
		while s < cuda.blockDim.x:
			if tid % (2 * s) == 0:
					# Stride by `s` and add
					shr[tid] += shr[tid + s]
			s *= 2
			cuda.syncthreads()

		# After the loop, the zeroth  element contains the sum
		if tid == 0:
			data[tid] = shr[tid]

In [None]:
# generate data
nelem = SHARED_DIM
a = cuda.to_device(np.arange(nelem, dtype=np.int32))
print("Number of elements: ", len(a))

# kernel
reduce[1, nelem](a)

# copy to host
b = a.copy_to_host()

print(b[0])  # 523776
print(sum(np.arange(nelem)))  # 523776

↩ TODO: Convolution 2D...

In [None]:
import numpy as np
from numba import cuda

# Define kernel size
KERNEL_SIZE = 3
BLOCK_SIZE = 16
TILE = (BLOCK_SIZE + KERNEL_SIZE - 1, BLOCK_SIZE + KERNEL_SIZE - 1)

@cuda.jit
def conv2d_kernel(image, kernel, output):
	# TODO: Implement the kernel function


In [None]:
# Test the kernel
image = np.random.rand(8*1024, 8*1024).astype(np.float32)  # Example image
kernel = np.random.rand(KERNEL_SIZE, KERNEL_SIZE).astype(np.float32)  # Example Gaussian Blur kernel
print('image.shape: ',image.shape)
print('kernel.shape:', kernel.shape)

# copy data H2D
d_image = cuda.to_device(image)
d_kernel = cuda.to_device(kernel)
d_output = cuda.device_array(image.shape, dtype=np.float32)

# grdi and block dim
block_dim = (BLOCK_SIZE, BLOCK_SIZE)
grid_dim = (image.shape[0] // BLOCK_SIZE, image.shape[1] // BLOCK_SIZE)

# kernel
start = time.time()
conv2d_kernel[grid_dim, block_dim](d_image, d_kernel, d_output)
cuda.synchronize()
end = time.time()
print("Elapsed (sec) = %s" % (end - start))
output = d_output.copy_to_host()
print('output.shape:', output.shape)

In [None]:
from scipy import signal

start = time.time()
grad = signal.convolve2d(image, kernel)
end = time.time()
print("Elapsed (sec) = %s" % (end - start))

# 🐍 Atomic functions

In [None]:
import numpy as np
from numba import cuda

@cuda.jit
def find_max(result, values):
	"""Find the maximum value in values and store in result[0]"""
	tid = cuda.threadIdx.x
	bid = cuda.blockIdx.x
	bdim = cuda.blockDim.x
	i = (bid * bdim) + tid
	cuda.atomic.max(result, 0, values[i])

#### RUNNING THE KERNEL ####
arr = np.random.rand(1024*1024).astype(np.float32)  # Example array
d_arr = cuda.to_device(arr)  # Copy to device
result = np.zeros(1, dtype=np.float64)
d_result = cuda.to_device(result)  # Copy to device
find_max[1024,1024](d_result, d_arr)
print(d_result[0]) # Found using cuda.atomic.max
print(max(arr))  # Print max(arr) for comparison (should be equal!)

Write an Accelerated Histogramming Kernel:

For this assessment, you will create an accelerated histogramming kernel. This will take an array of input data, a range, and a number of bins, and count how many of the input data elements land in each bin

In [None]:
def cpu_histogram(x, xmin, xmax, histogram_out):
	'''Increment bin counts in histogram_out, given histogram range [xmin, xmax).'''
	# Note that we don't have to pass in nbins explicitly, because the size of histogram_out determines it
	nbins = histogram_out.shape[0]
	bin_width = (xmax - xmin) / nbins

	# This is a very slow way to do this with NumPy, but looks similar to what you will do on the GPU
	for element in x:
		bin_number = np.int32((element - xmin)/bin_width)
		if bin_number >= 0 and bin_number < histogram_out.shape[0]:
			# only increment if in range
			histogram_out[bin_number] += 1

In [None]:
x = np.random.normal(size=10000, loc=0, scale=1).astype(np.float32)
xmin = np.float32(-4.0)
xmax = np.float32(4.0)
histogram_out = np.zeros(shape=10, dtype=np.int32)

cpu_histogram(x, xmin, xmax, histogram_out)

histogram_out

↩ TODO: Convolution 2D...

In [None]:
@cuda.jit
def cuda_histogram(x, xmin, xmax, histogram_out):

	# TODO

In [None]:
d_x = cuda.to_device(x)
d_histogram_out = cuda.to_device(np.zeros(shape=10, dtype=np.int32))

blocks = 128
threads_per_block = 64
start = time.time()
cuda_histogram[blocks, threads_per_block](d_x, xmin, xmax, d_histogram_out)

histogram_out = d_histogram_out.copy_to_host()
print(histogram_out)  # Print histogram
print(histogram_out.sum())  # Print sum of histogram
print(histogram_out.sum() == x.shape[0])  # Check if sum of histogram equals number of elements in x

In [None]:
# Define host array
threads_per_block = 256
blocks_per_grid = 32 * 40
a = np.ones(10_000_000, dtype=np.float32)
print(f"Old sum: {a.sum():.2f}")

# 🐍 Streams

In [None]:
# Numba CUDA Stream Semantics
@cuda.jit
def partial_reduce(array, partial_reduction):
    i_start = cuda.grid(1)
    threads_per_grid = cuda.blockDim.x * cuda.gridDim.x
    s_thread = 0.0
    for i_arr in range(i_start, array.size, threads_per_grid):
        s_thread += array[i_arr]

    s_block = cuda.shared.array((threads_per_block,), numba.float32)
    tid = cuda.threadIdx.x
    s_block[tid] = s_thread
    cuda.syncthreads()

    i = cuda.blockDim.x // 2
    while (i > 0):
        if (tid < i):
            s_block[tid] += s_block[tid + i]
        cuda.syncthreads()
        i //= 2

    if tid == 0:
        partial_reduction[cuda.blockIdx.x] = s_block[0]

@cuda.jit
def single_thread_sum(partial_reduction, sum):
	sum[0] = 0.0
	for element in partial_reduction:
		sum[0] += element

@cuda.jit
def divide_by(array, val_array):
	i_start = cuda.grid(1)
	threads_per_grid = cuda.gridsize(1)
	for i in range(i_start, array.size, threads_per_grid):
		array[i] /= val_array[0]

# Pin memory
with cuda.pinned(a):
	# Create a CUDA stream
	stream = cuda.stream()

	# Array copy to device and creation in the device
	dev_a = cuda.to_device(a, stream=stream)
	dev_a_reduce = cuda.device_array((blocks_per_grid,), dtype=dev_a.dtype, stream=stream)
	dev_a_sum = cuda.device_array((1,), dtype=dev_a.dtype, stream=stream)

	# configuration, and it comes after the block dimension (`threads_per_block`)
	partial_reduce[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_reduce)
	single_thread_sum[1, 1, stream](dev_a_reduce, dev_a_sum)
	divide_by[blocks_per_grid, threads_per_block, stream](dev_a, dev_a_sum)

	# Array copy to host: like the copy to device, when a stream is passed, the copy
	# is asynchronous. Note: the printed output will probably be nonsensical since
	# the write has not been synchronized yet.
	dev_a.copy_to_host(a, stream=stream)

# Whenever we want to ensure that all operations in a stream are finished from
# the point of view of the host, we call:
stream.synchronize()

# After that call, we can be sure that `a` has been overwritten with its normalized version
print(f"New sum: {a.sum():.2f}")

↩ TODO: Convolution 2D...

In [None]:
# Multiple streams
from time import perf_counter

N_streams = 10

# Create base arrays
arrays = [i * np.ones(10_000_000, dtype=np.float32) for i in range(1, N_streams + 1)]

for i, arr in enumerate(arrays):
	print(f"sum array {i}: {arr.sum():12.2f}")

 # TODO

# 🐍 Pseudo-random generator

In [None]:
from numba.cuda.random import create_xoroshiro128p_states, xoroshiro128p_uniform_float32

@cuda.jit
def compute_pi(rng_states, iterations, out):
	"""Find the maximum value in values and store in result[0]"""
	tid = cuda.grid(1)

	# Compute pi by drawing random (x, y) points and finding what
	# fraction lie inside a unit circle
	inside = 0
	for i in range(iterations):
		x = xoroshiro128p_uniform_float32(rng_states, tid)
		y = xoroshiro128p_uniform_float32(rng_states, tid)
		if x**2 + y**2 <= 1.0:
			inside += 1

	out[tid] = 4.0 * inside / iterations

In [None]:
# params
n_iter = 100000     # Number of iterations per thread
threads_per_block = 1024
blocks = 1024

# Create random number generator states
rng_states = create_xoroshiro128p_states(threads_per_block * blocks, seed=1)
out = np.zeros(threads_per_block * blocks, dtype=np.float32)
d_output = cuda.to_device(out)
d_rng_states = cuda.to_device(rng_states)

# Launch the kernel
compute_pi[blocks, threads_per_block](d_rng_states, n_iter, d_output)

# Copy the result back to host
out = d_output.copy_to_host()
print('pi:', out.mean())
print('error:', np.abs(np.pi-out.mean()))

↩ TODO: Convolution 2D...

In [None]:
import math
from numba import cuda

@cuda.jit
def Gauss_GPU(rng_states, iterations, out, a, b):

	# TODO


In [None]:

n_iter = 100000     # Number of iterations per thread
threads_per_block = 1024
blocks = 1024
a = -1
b = 2

# Create random number generator states
rng_states = create_xoroshiro128p_states(threads_per_block * blocks, seed=1)
out = np.zeros(threads_per_block * blocks, dtype=np.float32)
d_output = cuda.to_device(out)
d_rng_states = cuda.to_device(rng_states)

# Launch the kernel
Gauss_GPU[blocks, threads_per_block](d_rng_states, n_iter, d_output, a, b)

# Copy the result back to host
out = d_output.copy_to_host()
print('prob:', out.mean())


# 🐍 Mandelbrot

In [None]:
import math

@cuda.jit
def mandelbrot_gpu(mat, maxiter=100, xmin=-2.6, xmax=1.85, ymin=-1.25, ymax=1.25):
	x = cuda.blockIdx.x
	y = cuda.threadIdx.x

	# Mapping pixel to C
	creal = xmin + x / mat.shape[0] * (xmax - xmin)
	cim = ymin + y / mat.shape[1] * (ymax - ymin)

	# Initialisation of C and Z
	c = complex(creal, cim)
	z = complex(0, 0)

	# Mandelbrot iteration
	for n in range(maxiter):
		z = z*z+c
		# If unbounded: save iteration count and break
		if z.real*z.real + z.imag*z.imag > 4.0:
			# Smooth iteration count
			mat[x,y] = n + 1 - math.log(math.log(abs(z*z+c)))/math.log(2)
			break
		# Otherwise: leave it to 0

In [None]:
# Parameters
xmin, xmax = -2.6, 1.85
ymin, ymax = -1.25, 1.25
xpixels = 512
ypixels = round(xpixels / (xmax-xmin) * (ymax-ymin))

maxiter = 100
mat = np.zeros((xpixels, ypixels))
# Allocate device memory
d_mat = cuda.to_device(mat)

# Running and plotting result
mandelbrot_gpu[xpixels, ypixels](d_mat, maxiter, xmin, xmax, ymin, ymax)
# Copy the result back to host
mat = d_mat.copy_to_host()
print(mat.shape)


In [None]:
import matplotlib.pyplot as plt
import matplotlib as cm

def draw_image(mat, cmap='inferno', powern=0.5, dpi=72):
  ## Value normalization
  # Apply power normalization, because number of iteration is
  # distributed according to a power law (fewer pixels have
  # higher iteration number)
  mat = np.power(mat, powern)

  # Colormap: set the color the black for values under vmin (inner points of
  # the set), vmin will be set in the imshow function
  new_cmap = cm.colormaps[cmap]
  new_cmap.set_under('black')

  ## Plotting image

  # Figure size
  plt.figure(figsize=(mat.shape[0]/dpi, mat.shape[1]/dpi))

  # Plotting mat with cmap
  # vmin=1 because smooth iteration count is always > 1
  # We need to transpose mat because images use row-major
  # ordering (C convention)
  # origin='lower' because mat[0,0] is the lower left pixel
  plt.imshow(mat.T, cmap=new_cmap, vmin=1, origin = 'lower')

  # Remove axis and margins
  plt.subplots_adjust(left=0, right=1, bottom=0, top=1)
  plt.axis('off')


draw_image(mat)
