---
# **LAB 5 - CUDA Streams**
---

# ‚ñ∂Ô∏è CUDA tools...

In [None]:
!nvidia-smi

In [None]:
import numpy as np
import numba
from numba import cuda
import warnings
warnings.filterwarnings("ignore")

print(np.__version__)
print(numba.__version__)

cuda.detect()



In [None]:
# Suppress Numba deprecation and performance warnings
from numba.core.errors import NumbaDeprecationWarning, NumbaPerformanceWarning
import warnings

warnings.simplefilter('ignore', category=NumbaDeprecationWarning)
warnings.simplefilter('ignore', category=NumbaPerformanceWarning)

Utils for compiling and running Numba CUDA code.

In [None]:
from numba import cuda

def mem_snapshot(print=True):
    free, total = cuda.current_context().get_memory_info()
    used = total - free
    if print:
        # print GPU memory info
        print("\nMemory occupancy:")
        print(f"    GPU total: {total/1024**3:.3f} GB")
        print(f"    GPU free : {free/1024**3:.3f} GB")
        print(f"    GPU used : {used/1024**3:.3f} GB")
    else:
        return used, free, total

# Quick device spec report (Numba)
def device_info(show=True):
    dev = cuda.get_current_device()   # raises if no CUDA device
    _, total = cuda.current_context().get_memory_info() 
    
    if show:
        print("Device object repr:", dev)
        print("Device name:          ", getattr(dev, "name", "<unknown>"))
        print("Compute capability:   ", getattr(dev, "compute_capability", "<unknown>"))

    # Common numeric properties (use getattr to avoid attribute errors)
    props = {
        "  multi_processor_(SM)_count": ["MULTIPROCESSOR_COUNT"],
        "  max_threads_per_block": ["MAX_THREADS_PER_BLOCK"],
        "  max_block_dim_x":       ["MAX_BLOCK_DIM_X"],
        "  max_block_dim_y":       ["MAX_BLOCK_DIM_Y"],
        "  max_block_dim_z":       ["MAX_BLOCK_DIM_Z"],
        "  max_grid_dim_x":        ["MAX_GRID_DIM_X"],
        "  max_grid_dim_y":        ["MAX_GRID_DIM_Y"],
        "  max_grid_dim_z":        ["MAX_GRID_DIM_Z"],
        "  max_shared_memory_per_block (bytes)": ["MAX_SHARED_MEMORY_PER_BLOCK"],
        "  max_shared_memory_per_SM (bytes)": ["MAX_SHARED_MEMORY_PER_MULTIPROCESSOR"],
        "  warp_size":             ["WARP_SIZE"],
        "  compute_capability":    ["COMPUTE_CAPABILITY", "cc"],
    }
    
    feats = {}
    label = "  total_memory (bytes)"
    feats[label] = total
    if show:
        print(f"{label:40}: {total}")
    for label, keys in props.items():
        val = None
        for k in keys:
            val = getattr(dev, k, None)
            feats[k] = val
            if val is not None:
                break
        if show:
            print(f"{label:40}: {val}")
    return feats

_ = device_info()

# ‚úÖ Matrix multiplication pinned memory only

In [None]:
import numpy as np
from numba import cuda,  float32
import time

# Controls threads per block and shared memory usmemAge.
# The computation will be done on blocks of TILExTILE elements
# TILE should not be larger than 32 in this example

@cuda.jit
def matMul_SMEM(A, B, C):
    """
    Perform matrix multiplication of C = A * B using CUDA shared memory for improved performance. 
    Params:
        A : 2D array
            Input matrix A
        B : 2D array
            Input matrix B
        C : 2D array
            Output matrix C
    """
    # Shared memory tiles
    smemA = cuda.shared.array((TILE, TILE), dtype=float32)
    smemB = cuda.shared.array((TILE, TILE), dtype=float32)

    row, col = cuda.grid(2)   # (y, x) global indices
    ty = cuda.threadIdx.y
    tx = cuda.threadIdx.x

    # Dimensions
    M = A.shape[0]
    P = A.shape[1]
    N = B.shape[1]

    if row >= M or col >= N:
        return

    acc = float32(0.0)

    n_tiles = (P + TILE - 1) // TILE
    for t in range(n_tiles):
        kA = t * TILE + tx      # column index in A
        kB = t * TILE + ty      # row index in B

        # Load A tile
        if kA < P:
            smemA[ty, tx] = A[row, kA]
        else:
            smemA[ty, tx] = 0.0

        # Load B tile
        if kB < P:
            smemB[ty, tx] = B[kB, col]
        else:
            smemB[ty, tx] = 0.0

        # Synchronize to make sure the tiles are loaded
        cuda.syncthreads()

        # Multiply the two tiles
        for j in range(TILE):
            acc += smemA[ty, j] * smemB[j, tx]

        # Synchronize before loading the next tile
        cuda.syncthreads()

    C[row, col] = acc

        
#  Matrix sizes:
#     A: (N x P) float32
#     B: (P x M) float32
#     C: (N x M) float32

TILE = 16  # Threads per block
N = TILE *1000  # Number of rows
M = TILE *1000  # Number of columns
P = TILE *2000  # Inner dimension
print(f"Allocating host arrays for A({N}x{P}), B({P}x{M}), C({N}x{M})...")
A = np.ones((N,P), dtype=np.float32)        # A matrix 
B = 2* np.ones((P,M), dtype=np.float32)     # B matrix 
C = np.zeros((N, M), dtype=np.float32)      # Output matrix

# GPU setup
t0 = time.perf_counter()
threads = (TILE, TILE)
#                  cols                                     rows    
blocks = ((M + (threads[0] - 1)) // threads[0], (N + (threads[1] - 1)) // threads[1])
d_A = cuda.to_device(A)
d_B = cuda.to_device(B)
d_C = cuda.device_array((N, M), dtype=A.dtype)

# launch kernels and time
matMul_SMEM[blocks, threads](d_A, d_B, d_C)
cuda.synchronize()

# Final reduction on CPU
C = d_C.copy_to_host()  # Final reduction in CPU

t1 = time.perf_counter()
print(f"Kernel blockParReduceSMEM execution time: {t1 - t0:.4f} seconds")

## ‚ÜòÔ∏è TODO...

**Tiled Matrix Multiplication on GPU**

Given three matrices:
-	$A \in \mathbb{R}^{N \times P}$
-	$B \in \mathbb{R}^{P \times M}$
-	$C \in \mathbb{R}^{N \times M}$

stored in float32, compute:
$$
C = A \cdot B
$$
using CUDA with shared memory tiling


Tasks
1.	Kernel implementation: Implement a CUDA kernel using shared memory tiles of size TILE √ó TILE that computes matrix multiplication.
2.	Host setup
    -	Allocate pinned host memory for A, B, and C.
    -	Copy A and B to the device.
    -	Allocate C on the device.
3.	Kernel launch
    -   Use a 2D grid and 2D blocks of size (TILE, TILE).
    -	Ensure correct bounds checking for non-multiple dimensions.
4.	Timing
    - Measure kernel execution time.
-   Optionally include host‚Äìdevice transfer time.
5.	Validation
    - For $A = 1$ and $B = 2$, verify that $C_{ij} \approx 2P$
for all valid indices



- Experiments
	-	Vary TILE = 8, 16, 32.
	-	Measure execution time and discuss performance differences.
	-	Identify the tile size that gives the best performance on your GPU.


# ‚úÖ Tabular a function with streams

## ‚ÜòÔ∏è TODO...


Given a 1D array $x ‚àà R‚Åø$ (float32), uniformly sampled in $[0, 2\pi]$

Compute
$$
y_i = \sqrt{|\sin^2(x_i) - \cos^2(x_i)|}, \quad i = 1,\dots,n
$$


1Ô∏è‚É£ Create the domain (host)

```
import numpy as np, math
n = 64 * 1024 * 1024
x_host = np.linspace(0, 2*math.pi, n, dtype=np.float32)
```



2Ô∏è‚É£ Allocate pinned host memory
```
from numba import cuda
x_pinned = cuda.pinned_array(n, np.float32)
y_pinned = cuda.pinned_array(n, np.float32)
x_pinned[:] = x_host
```

3Ô∏è‚É£ CUDA kernel
```
@cuda.jit
def tabular_xy(d_x, d_y, n):
    i = cuda.grid(1)
    if i < n:
        s = math.sin(d_x[i])
        c = math.cos(d_x[i])
        d_y[i] = math.sqrt(abs(s*s - c*c))
```

4Ô∏è‚É£ Baseline (single stream)
```
d_x = cuda.to_device(x_pinned)
d_y = cuda.device_array(n, np.float32)

threads = 256
blocks = (n + threads - 1) // threads

tabular_xy[blocks, threads](d_x, d_y, n)
cuda.synchronize()
y_seq = d_y.copy_to_host()
```

**Measure total time and validate results.**


5Ô∏è‚É£ Multi-stream execution


**V1 ‚Äì per-stream pipeline**
1.	Create streams
2.	For each chunk:
    -	Copy chunk to device (H2D)
    -	Launch kernel
    -	Copy chunk back to host (D2H)

**V2 ‚Äì staged pipeline**
1.	Copy all chunks (H2D)
2.	Launch all kernels
3.	Copy all chunks back (D2H)



6Ô∏è‚É£ Experiments
-	Vary n_streams = 1,2,4,8
-	Vary threads = 128,256,512

# ‚úÖ 1D convolution using pinned memory + streams

## ‚ÜòÔ∏è TODO...

<br> üîπ What to use

- A shared-memory kernel: conv1d_shared(out, x, mask, n)

- A mask constructor: moving_avg_mask(radius)

- A multi-stream host pipeline: conv1d_streams(h_x, radius, chunk, nstreams)

<br> üîπ Setup Constants

TPB = 256
MAX_RADIUS = 512
MAX_MASK = 2 * MAX_RADIUS + 1
TILE = TPB + MAX_MASK - 1

Constraints:

radius <= MAX_RADIUS

radius < TPB

<br> üîπ Step 1 ‚Äî Create the Mask (Moving Average)

def moving_avg_mask(radius: int):
    # size = 2*radius + 1
    # return np.full(size, 1.0/size, np.float32)

<br> üîπ Step 5 ‚Äî Chunking Strategy

Split the output into center chunks of size chunk, each extended by radius on both sides.

<br> üîπ Step 6 ‚Äî Extended Chunk Bounds

Compute ext_start, ext_end, ext_len, and halo_left correctly.

<br> üîπ Step 7 ‚Äî Create Streams

streams = [cuda.stream() for _ in range(nstreams)]

<br> üîπ Step 8 ‚Äî Allocate Pinned Host Buffers

pin_in  = [cuda.pinned_array(ext_max, np.float32) for _ in range(nstreams)]
pin_out = [cuda.pinned_array(chunk,   np.float32) for _ in range(nstreams)]

<br> üîπ Step 9 ‚Äî Allocate Device Buffers

d_in  = [cuda.device_array(ext_max, np.float32) for _ in range(nstreams)]
d_out = [cuda.device_array(ext_max, np.float32) for _ in range(nstreams)]

<br> üîπ Step 10 ‚Äî Stream Pipeline Loop

Schedule async H2D ‚Üí kernel ‚Üí async D2H on each stream.

<br> üîπ Step 11 ‚Äî Synchronize-on-Reuse

Before reusing a stream, synchronize and commit its pinned output to the final array.

<br> üîπ Step 12 ‚Äî Drain Remaining Streams

Synchronize remaining streams and copy results back.

<br> üîπ Step 13 ‚Äî Correctness Check

Compare GPU output with a CPU reference on a small test.

# ‚úÖ Transfer test

In [None]:
# transfer_benchmark.py
import numpy as np
from numba import cuda
import time

# --------------------
# Parameters (adjustable)
# --------------------
TILE = 16
N = TILE * 1000   # rows of A / C
P = TILE * 2000   # inner dim
M = TILE * 1000   # cols of B / C

dtype = np.float32

# size in bytes for reporting
def bytes_of(arr):
    return arr.nbytes

# --------------------
# Helper measurement functions
# --------------------
def time_h2d(host_arr, use_stream=False):
    """
    Measure host -> device transfer time.
    If use_stream True, perform an async to_device on a stream and synchronize the stream.
    Returns seconds and device array (returned device array to be used later for D2H test).
    """
    if use_stream:
        s = cuda.stream()
        t0 = time.perf_counter()
        d = cuda.to_device(host_arr, stream=s)    # async when host_arr is pinned
        s.synchronize()                            # ensure transfer complete for measurement
        t1 = time.perf_counter()
    else:
        t0 = time.perf_counter()
        d = cuda.to_device(host_arr)               # synchronous copy
        t1 = time.perf_counter()
    return (t1 - t0), d

def time_d2h(device_arr, host_out, use_stream=False):
    """
    Measure device -> host transfer time.
    host_out must be a host array with correct shape/dtype (pinned if desired).
    If use_stream True, perform async copy on a stream and synchronize the stream.
    Returns seconds.
    """
    if use_stream:
        s = cuda.stream()
        t0 = time.perf_counter()
        device_arr.copy_to_host(host_out, stream=s)   # async if host_out pinned
        s.synchronize()
        t1 = time.perf_counter()
    else:
        t0 = time.perf_counter()
        host_tmp = device_arr.copy_to_host()           # synchronous copy (returns new array)
        # copy into host_out to keep consistent memory object (timing mainly device->host)
        host_out[:] = host_tmp
        t1 = time.perf_counter()
    return (t1 - t0)

def print_bytes(b):
    kb = b / 1024.0
    mb = kb / 1024.0
    return f"{b} B ({kb:.1f} KB / {mb:.3f} MB)"

# --------------------
# Test scenario runner
# --------------------
def run_transfer_tests(shape, dtype=np.float32):
    n_elems = shape[0]*shape[1]
    print(f"\nArray shape: {shape}, elements: {n_elems:,}, dtype: {dtype}")

    # 1) Unpinned host array
    host_unpinned = np.ones(shape, dtype=dtype)

    # 2) Pinned host array (allocate pinned and write into it in-place)
    host_pinned = cuda.pinned_array(shape, dtype=dtype)
    host_pinned[:] = 2.0  # fill pinned array

    print("Memory sizes (host):")
    print("  unpinned:", print_bytes(bytes_of(host_unpinned)))
    print("  pinned  :", print_bytes(bytes_of(host_pinned)))

    results = {}

    # Warm-up JIT + tiny transfers to avoid measuring first-call overheads
    _ = cuda.to_device(np.ones((4,4), dtype=dtype))
    cuda.synchronize()

    # ---------- H2D (synchronous) ----------
    t_up_unpinned, d_unpinned = time_h2d(host_unpinned, use_stream=False)
    t_up_pinned, d_pinned = time_h2d(host_pinned, use_stream=False)

    results['H2D_sync_unpinned'] = t_up_unpinned
    results['H2D_sync_pinned'] = t_up_pinned
    print(f"\nH2D synchronous: unpinned = {t_up_unpinned:.6f} s, pinned = {t_up_pinned:.6f} s, speedup = {t_up_unpinned / t_up_pinned if t_up_pinned>0 else float('inf'):.2f}x")

    # ---------- D2H (synchronous) ----------
    # Prepare host output arrays of same dtype/shape (unpinned and pinned)
    host_out_unpinned = np.empty(shape, dtype=dtype)
    host_out_pinned = cuda.pinned_array(shape, dtype=dtype)

    # Copy device arrays we created earlier back
    t_down_unpinned = time_d2h(d_unpinned, host_out_unpinned, use_stream=False)
    t_down_pinned = time_d2h(d_pinned, host_out_pinned, use_stream=False)

    results['D2H_sync_unpinned'] = t_down_unpinned
    results['D2H_sync_pinned'] = t_down_pinned
    print(f"\nD2H synchronous: unpinned = {t_down_unpinned:.6f} s, pinned = {t_down_pinned:.6f} s, speedup = {t_down_unpinned / t_down_pinned if t_down_pinned>0 else float('inf'):.2f}x")

    # ---------- H2D (async on stream) ----------
    t_up_unpinned_s, d_unpinned_s = time_h2d(host_unpinned, use_stream=True)
    t_up_pinned_s, d_pinned_s = time_h2d(host_pinned, use_stream=True)

    results['H2D_async_unpinned'] = t_up_unpinned_s
    results['H2D_async_pinned'] = t_up_pinned_s
    print(f"\nH2D async (stream): unpinned = {t_up_unpinned_s:.6f} s, pinned = {t_up_pinned_s:.6f} s")

    # ---------- D2H (async on stream) ----------
    # allocate host_out buffers for async D2H
    host_out_unpinned2 = np.empty(shape, dtype=dtype)
    host_out_pinned2 = cuda.pinned_array(shape, dtype=dtype)

    t_down_unpinned_s = time_d2h(d_unpinned_s, host_out_unpinned2, use_stream=True)
    t_down_pinned_s = time_d2h(d_pinned_s, host_out_pinned2, use_stream=True)

    results['D2H_async_unpinned'] = t_down_unpinned_s
    results['D2H_async_pinned'] = t_down_pinned_s
    print(f"\nD2H async (stream): unpinned = {t_down_unpinned_s:.6f} s, pinned = {t_down_pinned_s:.6f} s")

    # ---------- Overlap test (H2D + kernel + D2H) ----------
    # A small kernel that just touches memory (to simulate compute)
    @cuda.jit
    def touch_kernel(x, y):
        i, j = cuda.grid(2)
        if i < x.shape[0] and j < x.shape[1]:
            y[i, j] = x[i, j] * 2.0

    # Use pinned buffers + streams to try to overlap copy<->compute
    s = cuda.stream()
    host_src = cuda.pinned_array(shape, dtype=dtype)
    host_src[:] = 1.0
    host_dst = cuda.pinned_array(shape, dtype=dtype)

    # allocate device arrays reusing device array shape
    d_tmp = cuda.device_array(shape, dtype=dtype)
    d_out = cuda.device_array(shape, dtype=dtype)

    # launch: async H2D, then kernel on stream, then async D2H; measure total wall time
    t0 = time.perf_counter()
    cuda.to_device(host_src, stream=s)           # async H2D into new device array
    # kernel (on same stream) - determine blocks/threads
    threads = (16, 16)
    blocks = ((shape[0] + threads[0] - 1) // threads[0],
              (shape[1] + threads[1] - 1) // threads[1])
    touch_kernel[blocks, threads, s](d_tmp, d_out)
    d_out.copy_to_host(host_dst, stream=s)      # async D2H
    s.synchronize()
    t1 = time.perf_counter()
    overlap_time = t1 - t0
    print(f"\nPinned overlap scenario (H2D async + kernel + D2H async) wall time: {overlap_time:.6f} s")

    # same sequence but **without** pinned host arrays -> likely no overlap
    s2 = cuda.stream()
    host_src2 = np.ones(shape, dtype=dtype)     # unpinned
    host_dst2 = np.empty(shape, dtype=dtype)
    t0 = time.perf_counter()
    cuda.to_device(host_src2, stream=s2)        # generally synchronous or staged -> blocks
    touch_kernel[blocks, threads, s2](d_tmp, d_out)
    d_out.copy_to_host(host_dst2, stream=s2)
    s2.synchronize()
    t1 = time.perf_counter()
    no_overlap_time = t1 - t0
    print(f"Unpinned overlap scenario wall time: {no_overlap_time:.6f} s")

    results['overlap_pinned'] = overlap_time
    results['overlap_unpinned'] = no_overlap_time

    return results

# --------------------
# Run tests for the matrices described
# --------------------
if __name__ == "__main__":
    print("TRANSFER BENCHMARK (H2D / D2H) ‚Äî pinned vs unpinned\n")
    shape_A = (N, P)
    shape_B = (P, M)
    # We test with one large array shape; you can test both A and B separately
    print("Testing transfer behavior for array A (N x P):")
    resA = run_transfer_tests(shape_A, dtype)
    print("\nTesting transfer behavior for array B (P x M):")
    resB = run_transfer_tests(shape_B, dtype)

    # Nice summary
    print("\nSUMMARY (A):")
    for k, v in resA.items():
        print(f"  {k:25s}: {v:.6f} s")
    print("\nSUMMARY (B):")
    for k, v in resB.items():
        print(f"  {k:25s}: {v:.6f} s")