---
# **LAB 7 - CUDA Streams**
---

# ▶️ CUDA setup

In [None]:
# Show the version of the CUDA compiler (nvcc) installed on the runtime
!nvcc --version

In [None]:
# Show the GPU(s) visible to the runtime and the driver version
!nvidia-smi

## [GPU Compute Capability](https://developer.nvidia.com/cuda-gpus)

## NVCC Plugin for Jupyter notebook

*Usage*:


*   Load Extension `%load_ext nvcc_plugin`
*   Mark a cell to be treated as cuda cell
`%%cuda --name example.cu --compile false`

**NOTE**: The cell must contain either code or comments to run successfully. It accepts 2 arguments. The first, `-n | --name`, is the name of either a CUDA source or a header file; it must have the extension `.cu` or `.h`. The second, `-c | --compile` (default `false`), is a flag that specifies whether the cell is compiled and run right away. This is useful when you are working on the `main` function.

*  We are ready to run CUDA C/C++ code right in the notebook. To do so, we must explicitly tell the interpreter that we want to use the extension by adding `%%cu` at the beginning of each cell containing CUDA code. 




In [None]:
# Install the nvcc4jupyter plugin straight from its GitHub repository.
# NOTE(review): this install is unpinned (default branch). The cells below use the
# `nvcc_plugin` extension name and the `%%cu` / `%%cuda --name` magics; confirm that
# the installed version still provides them, or pin a compatible commit.
!pip install git+https://github.com/andreinechaev/nvcc4jupyter.git

In [None]:
# Load the plugin that provides the %%cu / %%cuda cell magics used below
%load_ext nvcc_plugin

In [None]:
# plugin for C++ syntax highlighting

!wget -O cpp_plugin.py https://gist.github.com/akshaykhadse/7acc91dd41f52944c6150754e5530c4b/raw/cpp_plugin.py
%load_ext cpp_plugin

Clone the GPUcomputing repository from GitHub...

In [None]:
!git clone https://github.com/giulianogrossi/GPUcomputing.git

# ▶️ VS Code on Colab

In [None]:
#@title Colab-ssh tunnel
#@markdown Execute this cell to open the ssh tunnel. Check [colab-ssh documentation](https://github.com/WassimBenzarti/colab-ssh) for more details.

# Install colab_ssh on google colab
!pip install colab_ssh --upgrade --quiet

from colab_ssh import launch_ssh_cloudflared, init_git_cloudflared
ssh_tunnel_password = "gpu" #@param {type: "string"}
launch_ssh_cloudflared(password=ssh_tunnel_password)

# Optional: if you want to clone a Github or Gitlab repository
repository_url="https://github.com/giulianogrossi/GPUcomputing" #@param {type: "string"}
init_git_cloudflared(repository_url)

Define some paths...

In [None]:
# path setup
!mkdir -p /content/GPUcomputing/lab2
%cd /content/GPUcomputing/lab2
!mkdir -p src


# ▶️ DeviceQuery

In [None]:
# DeviceQuery of the current device (on Colab!): build the utility from the
# cloned repository, then run it
!nvcc /content/GPUcomputing/utils/deviceQuery.cu -o deviceQuery
!./deviceQuery

Check whether the device can transfer in both directions simultaneously

In [None]:
%%cu
#include <stdio.h>

/*
 * Report the copy/compute overlap capabilities of device 0.
 * cudaDeviceProp::asyncEngineCount takes one of three values:
 *   0 = no overlap;
 *   1 = host<->device copies can overlap kernel execution;
 *   2 = copies in both directions can also run at the same time.
 * The old deviceOverlap flag only reports copy/kernel overlap and is
 * deprecated, so it cannot answer the "both directions" question.
 */
int main(void) {

  cudaDeviceProp dProp;
  cudaError_t err = cudaGetDeviceProperties(&dProp, 0);
  if (err != cudaSuccess) {
    printf("cudaGetDeviceProperties failed: %s\n", cudaGetErrorString(err));
    return 1;
  }

  printf("Device %s capable of overlapping data transfers with kernel execution\n",
         dProp.asyncEngineCount > 0 ? "IS" : "NOT");
  // Shows whether the device can transfer in both directions simultaneously
  printf("Device %s capable of simultaneous CPU-to-GPU and GPU-to-CPU datatransfers\n",
         dProp.asyncEngineCount > 1 ? "IS" : "NOT");
  return 0;
}

# ✅ Somma array con stream

This example demonstrates overlapping computation and communication by
partitioning a data set and asynchronously launching the memory copies and kernels for each subset. Launching all transfers and kernels for a given subset in the same CUDA stream ensures that computation on the device is not started until the necessary data has been transferred. However, because the work of each subset is independent of all other subsets, the communication and computation of different subsets will overlap.

This example launches copies and kernels in breadth-first order.

In [None]:
%%cuda --name sumArrayStream.cu

#include "../GPUcomputing/utils/common.h"

#define NSTREAM 4
#define BDIM 128

/*
 * Fill the first `size` entries of ip with pseudo-random values:
 * the low byte of rand() divided by 10, i.e. values in [0.0, 25.5].
 */
void initialData(float *ip, int size) {
  float *cursor = ip;
  for (int left = size; left > 0; --left)
    *cursor++ = (float)(rand() & 0xFF) / 10.0f;
}

/*
 * Element-wise C = A + B on the CPU over the first N entries
 * (the reference used to check the GPU result).
 */
void sumArraysOnHost(float *A, float *B, float *C, const int N) {
  int pos = 0;
  while (pos < N) {
    C[pos] = A[pos] + B[pos];
    ++pos;
  }
}

/*
 * Kernel: element-wise C = A + B with one thread per element.
 * Threads whose global index is >= N (the tail of the last block) do nothing.
 */
__global__ void sumArrays(float *A, float *B, float *C, const int N) {
  const int gid = threadIdx.x + blockDim.x * blockIdx.x;
  if (gid >= N)
    return;
  C[gid] = A[gid] + B[gid];
}

/*
 * Compare the host reference with the GPU result over the first N entries.
 * Prints the first mismatching entry, or "Arrays match." when every entry
 * agrees within epsilon.
 * Returns true on match. Existing callers that ignore the result are unaffected.
 */
bool checkResult(float *hostRef, float *gpuRef, const int N) {
  double epsilon = 1.0E-8;

  for (int i = 0; i < N; i++) {
    // fabs keeps the comparison in floating point; an integer abs overload
    // would truncate sub-unit differences to 0 and hide mismatches
    if (fabs(hostRef[i] - gpuRef[i]) > epsilon) {
      printf("Arrays do not match!\n");
      printf("host %5.2f gpu %5.2f at %d\n", hostRef[i], gpuRef[i], i);
      return false;
    }
  }
  printf("Arrays match.\n\n");
  return true;
}

//# MAIN
/*
 * Compare a sequential copy/kernel/copy run of vector addition with an
 * overlapped run that splits the vectors over NSTREAM streams
 * (breadth-first: all H2D copies, then all kernels, then all D2H copies).
 */
int main(int argc, char **argv) {
  printf("> %s Starting...\n", argv[0]);

  int dev = 0;
  cudaDeviceProp deviceProp;
  CHECK(cudaGetDeviceProperties(&deviceProp, dev));
  printf("> Using Device %d: %s\n", dev, deviceProp.name);
  CHECK(cudaSetDevice(dev));

  //# check if device support hyper-q
  if (deviceProp.major < 3 || (deviceProp.major == 3 && deviceProp.minor < 5)) {
    if (deviceProp.concurrentKernels == 0) {
      printf("> GPU does not support concurrent kernel execution (SM 3.5 or higher required)\n");
      printf("> CUDA kernel runs will be serialized\n");
    }
    else {
      printf("> GPU does not support HyperQ\n");
      printf("> CUDA kernel runs will have limited concurrency\n");
    }
  }

  // asyncEngineCount replaces the deprecated deviceOverlap flag:
  // 1 = copies can overlap kernel execution,
  // 2 = H2D and D2H copies can also run at the same time as each other
  printf("> Device %s capable of overlapping data transfers with kernel execution\n",
         deviceProp.asyncEngineCount > 0 ? "IS" : "NOT");
  printf("> Device %s capable of simultaneous CPU-to-GPU and GPU-to-CPU data transfers\n",
         deviceProp.asyncEngineCount > 1 ? "IS" : "NOT");

  printf("> Compute Capability %d.%d hardware with %d multi-processors\n",
          deviceProp.major, deviceProp.minor, deviceProp.multiProcessorCount);

  printf ("> with streams = %d\n", NSTREAM);

  // set up data size of vectors
  int nElem = 1 << 18;
  printf("> vector size = %d\n", nElem);
  size_t nBytes = nElem * sizeof(float);

  // malloc pinned host memory for async memcpy
  float *h_A, *h_B, *hostRef, *gpuRef;
  CHECK(cudaHostAlloc((void**)&h_A, nBytes, cudaHostAllocDefault));
  CHECK(cudaHostAlloc((void**)&h_B, nBytes, cudaHostAllocDefault));
  CHECK(cudaHostAlloc((void**)&gpuRef, nBytes, cudaHostAllocDefault));
  CHECK(cudaHostAlloc((void**)&hostRef, nBytes, cudaHostAllocDefault));

  // initialize data at host side
  initialData(h_A, nElem);
  initialData(h_B, nElem);
  memset(hostRef, 0, nBytes);
  memset(gpuRef,  0, nBytes);

  // add vector at host side for result checks
  sumArraysOnHost(h_A, h_B, hostRef, nElem);

  // malloc device global memory
  float *d_A, *d_B, *d_C;
  CHECK(cudaMalloc((float**)&d_A, nBytes));
  CHECK(cudaMalloc((float**)&d_B, nBytes));
  CHECK(cudaMalloc((float**)&d_C, nBytes));

  cudaEvent_t start, stop;
  CHECK(cudaEventCreate(&start));
  CHECK(cudaEventCreate(&stop));

  // invoke kernel at host side
  dim3 block (BDIM);
  dim3 grid  ((nElem + block.x - 1) / block.x);
  printf("> grid (%d, %d) block (%d, %d)\n", grid.x, grid.y, block.x, block.y);

  // sequential operation
  CHECK(cudaEventRecord(start, 0));
  CHECK(cudaMemcpy(d_A, h_A, nBytes, cudaMemcpyHostToDevice));
  CHECK(cudaMemcpy(d_B, h_B, nBytes, cudaMemcpyHostToDevice));
  CHECK(cudaEventRecord(stop, 0));
  CHECK(cudaEventSynchronize(stop));
  float memcpy_h2d_time;
  CHECK(cudaEventElapsedTime(&memcpy_h2d_time, start, stop));

  CHECK(cudaEventRecord(start, 0));
  sumArrays<<<grid, block>>>(d_A, d_B, d_C, nElem);
  CHECK(cudaGetLastError());
  CHECK(cudaEventRecord(stop, 0));
  CHECK(cudaEventSynchronize(stop));
  float kernel_time;
  CHECK(cudaEventElapsedTime(&kernel_time, start, stop));

  CHECK(cudaEventRecord(start, 0));
  CHECK(cudaMemcpy(gpuRef, d_C, nBytes, cudaMemcpyDeviceToHost));
  CHECK(cudaEventRecord(stop, 0));
  CHECK(cudaEventSynchronize(stop));
  float memcpy_d2h_time;
  CHECK(cudaEventElapsedTime(&memcpy_d2h_time, start, stop));
  float itotal = kernel_time + memcpy_h2d_time + memcpy_d2h_time;

  printf("\n");
  printf("Measured timings (throughput):\n");
  printf(" Memcpy host to device\t: %f ms (%f GB/s)\n", memcpy_h2d_time, (nBytes * 1e-6) / memcpy_h2d_time);
  printf(" Memcpy device to host\t: %f ms (%f GB/s)\n", memcpy_d2h_time, (nBytes * 1e-6) / memcpy_d2h_time);
  printf(" Kernel\t\t\t: %f ms (%f GB/s)\n", kernel_time, (nBytes * 2e-6) / kernel_time);
  printf(" Total\t\t\t: %f ms (%f GB/s)\n", itotal, (nBytes * 2e-6) / itotal);

  // Verify the sequential result, then clear the device output and the host
  // result buffer. Otherwise the check after the overlapped run would pass on
  // values left over from the sequential run, even if the streams did nothing.
  checkResult(hostRef, gpuRef, nElem);
  CHECK(cudaMemset(d_C, 0, nBytes));
  memset(gpuRef, 0, nBytes);

  // grid parallel operation: NSTREAM chunks; the last chunk also takes the
  // remainder when nElem is not a multiple of NSTREAM
  int iElem = nElem / NSTREAM;
  int chunkOffset[NSTREAM], chunkElems[NSTREAM];
  for (int i = 0; i < NSTREAM; ++i) {
    chunkOffset[i] = i * iElem;
    chunkElems[i] = (i == NSTREAM - 1) ? nElem - chunkOffset[i] : iElem;
  }

  cudaStream_t stream[NSTREAM];

  for (int i = 0; i < NSTREAM; ++i)
    CHECK(cudaStreamCreate(&stream[i]));

  CHECK(cudaEventRecord(start, 0));

  // initiate all asynchronous transfers to the device
  for (int i = 0; i < NSTREAM; ++i) {
    int ioffset = chunkOffset[i];
    size_t iBytes = chunkElems[i] * sizeof(float);
    CHECK(cudaMemcpyAsync(&d_A[ioffset], &h_A[ioffset], iBytes, cudaMemcpyHostToDevice, stream[i]));
    CHECK(cudaMemcpyAsync(&d_B[ioffset], &h_B[ioffset], iBytes, cudaMemcpyHostToDevice, stream[i]));
  }

  // launch a kernel in each stream
  for (int i = 0; i < NSTREAM; ++i) {
    int ioffset = chunkOffset[i];
    dim3 igrid((chunkElems[i] + block.x - 1) / block.x);
    sumArrays<<<igrid, block, 0, stream[i]>>>(&d_A[ioffset], &d_B[ioffset], &d_C[ioffset], chunkElems[i]);
  }

  // enqueue asynchronous transfers from the device
  for (int i = 0; i < NSTREAM; ++i) {
    int ioffset = chunkOffset[i];
    size_t iBytes = chunkElems[i] * sizeof(float);
    CHECK(cudaMemcpyAsync(&gpuRef[ioffset], &d_C[ioffset], iBytes, cudaMemcpyDeviceToHost, stream[i]));
  }

  // recorded on the legacy default stream, so it completes only after the
  // work queued in the (blocking) streams above
  CHECK(cudaEventRecord(stop, 0));
  CHECK(cudaEventSynchronize(stop));
  float execution_time;
  CHECK(cudaEventElapsedTime(&execution_time, start, stop));

  printf("\n");
  printf("Actual results from overlapped data transfers:\n");
  printf(" overlap with %d streams : %f ms (%f GB/s)\n", NSTREAM, execution_time, (nBytes * 2e-6) / execution_time );
  // speedup = sequential time / overlapped time; the reduction is the
  // percentage of the sequential time saved by overlapping
  printf(" speedup                : %f \n", itotal / execution_time);
  printf(" time reduction         : %f %%\n", ((itotal - execution_time) * 100.0f) / itotal);

  // check kernel error
  CHECK(cudaGetLastError());

  // check device results
  checkResult(hostRef, gpuRef, nElem);

  // free device global memory
  CHECK(cudaFree(d_A));
  CHECK(cudaFree(d_B));
  CHECK(cudaFree(d_C));

  // free host memory
  CHECK(cudaFreeHost(h_A));
  CHECK(cudaFreeHost(h_B));
  CHECK(cudaFreeHost(hostRef));
  CHECK(cudaFreeHost(gpuRef));

  // destroy events
  CHECK(cudaEventDestroy(start));
  CHECK(cudaEventDestroy(stop));

  // destroy streams
  for (int i = 0; i < NSTREAM; ++i)
    CHECK(cudaStreamDestroy(stream[i]));

  CHECK(cudaDeviceReset());
  return(0);
}


In [None]:
# Compile and run
# -arch=sm_75 targets compute capability 7.5; adjust it to the GPU reported by nvidia-smi

!nvcc -arch=sm_75  src/sumArrayStream.cu  -o sumArray
!./sumArray

# ✅ Tabular

# 🔴 TODO


1. Come modificare il kernel per usare gli stream
2. Gestione della memoria pinned e device

Applicare
3. Schema: loop over {copy, kernel, copy}
4. Schema: loop over {copy H2D}, loop over {kernel}, loop over {copy D2H}


In [None]:
%%cuda --name tabular.cu

#include <stdio.h>
#include "../GPUcomputing/utils/common.h"

#define PI 3.141592f

/*
 * Kernel: tabular function
 * Thread i (for i < n) writes a[i] = sqrt(|sin^2(x) - cos^2(x)|), with x = PI * i / n.
 */
__global__ void tabular(float *a, int n) {
	int i = threadIdx.x + blockIdx.x * blockDim.x;
	if (i < n) {
		float x = PI * (float)i / (float)n;
		float s = sinf(x);
		float c = cosf(x);
		// fabsf is the explicit single-precision absolute value; abs() on a float
		// depends on which overloads happen to be visible
		a[i] = sqrtf(fabsf(s * s - c * c));
	}
}

/*
 * Kernel: tabular function using streams
 * Same function as `tabular`, for the chunk that starts at element `offset`:
 * thread t of the launch writes a[offset + t], with x = PI * (offset + t) / n.
 * NOTE: the bound check is against n (the whole array), not the chunk end.
 * A launch with more threads than the chunk size therefore also writes into
 * the following chunk.
 */
__global__ void tabular_streams(float *a, int n, int offset) {
	int i = offset + threadIdx.x + blockIdx.x * blockDim.x;
	if (i < n) {
		float x = PI * (float)i / (float)n;
		float s = sinf(x);
		float c = cosf(x);
		// explicit single-precision absolute value (see `tabular`)
		a[i] = sqrtf(fabsf(s * s - c * c));
	}
}

/*
 * Error measure: the largest |a[i] - 1| over the first n entries (0 when n <= 0).
 * NOTE(review): 1.0 is the expected value only for a kernel that produces 1
 * everywhere; the `tabular` kernels in this cell produce sqrt(|sin^2 - cos^2|),
 * which is not constant.
 */
float maxError(float *a, int n) {
	float worst = 0;
	for (int idx = 0; idx < n; ++idx) {
		float deviation = fabs(a[idx] - 1.0f);
		worst = (deviation > worst) ? deviation : worst;
	}
	return worst;
}

/*
 * Max absolute element-wise difference between host arrays x and y of length n.
 */
static float maxDiff(const float *x, const float *y, uint n) {
	float worst = 0.0f;
	for (uint i = 0; i < n; i++) {
		float d = fabsf(x[i] - y[i]);
		if (d > worst)
			worst = d;
	}
	return worst;
}

/*
 * Zero a host array, so that a version which fails to write its results
 * cannot pass the check on values left over from a previous run.
 */
static void clearHost(float *x, uint n) {
	for (uint i = 0; i < n; i++)
		x[i] = 0.0f;
}

/*
 * Main: tabular function
 * Times three ways of transferring and computing the table:
 *   - baseline: one H2D copy, one kernel, one D2H copy on the default stream;
 *   - async V1: for each stream, {copy H2D, kernel, copy D2H} on its chunk;
 *   - async V2: a loop of H2D copies, a loop of kernels, a loop of D2H copies.
 * Both async results are compared with the baseline result.
 */
int main(void) {

	// main params
	uint MB = 1024*1024;
	uint n = 256*MB;            // number of table entries
	int blockSize = 256;

	// streams: the array is split into nStreams equal chunks. n is a multiple of
	// nStreams and streamSize a multiple of blockSize, so no thread of
	// tabular_streams writes outside its own chunk (the kernel only checks i < n).
	const int nStreams = 4;
	const uint streamSize = n / nStreams;
	const size_t streamBytes = (size_t)streamSize * sizeof(float);
	const size_t bytes = (size_t)n * sizeof(float);
	const uint gridFull = (n + blockSize - 1) / blockSize;
	const uint gridStream = (streamSize + blockSize - 1) / blockSize;
	printf("> n = %u (%.1f MB), %d streams, block size %d\n",
	       n, (double)bytes / MB, nStreams, blockSize);

	// allocate pinned host memory and device memory
	float *a, *d_a;
	CHECK(cudaMallocHost((void **)&a, bytes));
	CHECK(cudaMalloc((void **)&d_a, bytes));
	float *ref = new float[n];   // baseline result, used to validate the async versions

	// create events and streams
	float ms;
	cudaEvent_t startEvent, stopEvent;
	CHECK(cudaEventCreate(&startEvent));
	CHECK(cudaEventCreate(&stopEvent));
	cudaStream_t stream[nStreams];
	for (int i = 0; i < nStreams; i++)
		CHECK(cudaStreamCreate(&stream[i]));

	// baseline case - sequential transfer and execute
	clearHost(a, n);
	CHECK(cudaEventRecord(startEvent, 0));
	CHECK(cudaMemcpy(d_a, a, bytes, cudaMemcpyHostToDevice));
	tabular<<<gridFull, blockSize>>>(d_a, n);
	CHECK(cudaGetLastError());
	CHECK(cudaMemcpy(a, d_a, bytes, cudaMemcpyDeviceToHost));
	CHECK(cudaEventRecord(stopEvent, 0));
	CHECK(cudaEventSynchronize(stopEvent));
	CHECK(cudaEventElapsedTime(&ms, startEvent, stopEvent));
	printf("Time for sequential transfer and execute (ms): %f\n", ms);
	CHECK(cudaMemcpy(ref, a, bytes, cudaMemcpyHostToHost));

	// asynchronous version 1: loop over {copy, kernel, copy}
	clearHost(a, n);
	CHECK(cudaEventRecord(startEvent, 0));
	for (int i = 0; i < nStreams; i++) {
		uint offset = i * streamSize;
		CHECK(cudaMemcpyAsync(&d_a[offset], &a[offset], streamBytes, cudaMemcpyHostToDevice, stream[i]));
		tabular_streams<<<gridStream, blockSize, 0, stream[i]>>>(d_a, n, offset);
		CHECK(cudaGetLastError());
		CHECK(cudaMemcpyAsync(&a[offset], &d_a[offset], streamBytes, cudaMemcpyDeviceToHost, stream[i]));
	}
	// recorded on the legacy default stream: it completes only after all the
	// work queued in the (blocking) streams above
	CHECK(cudaEventRecord(stopEvent, 0));
	CHECK(cudaEventSynchronize(stopEvent));
	CHECK(cudaEventElapsedTime(&ms, startEvent, stopEvent));
	printf("Time for asynchronous V1 transfer and execute (ms): %f\n", ms);
	printf("  max diff vs sequential: %e\n", maxDiff(a, ref, n));

	// asynchronous version 2: loop over copy, loop over kernel, loop over copy
	clearHost(a, n);
	CHECK(cudaEventRecord(startEvent, 0));
	for (int i = 0; i < nStreams; i++) {
		uint offset = i * streamSize;
		CHECK(cudaMemcpyAsync(&d_a[offset], &a[offset], streamBytes, cudaMemcpyHostToDevice, stream[i]));
	}
	for (int i = 0; i < nStreams; i++) {
		uint offset = i * streamSize;
		tabular_streams<<<gridStream, blockSize, 0, stream[i]>>>(d_a, n, offset);
		CHECK(cudaGetLastError());
	}
	for (int i = 0; i < nStreams; i++) {
		uint offset = i * streamSize;
		CHECK(cudaMemcpyAsync(&a[offset], &d_a[offset], streamBytes, cudaMemcpyDeviceToHost, stream[i]));
	}
	CHECK(cudaEventRecord(stopEvent, 0));
	CHECK(cudaEventSynchronize(stopEvent));
	CHECK(cudaEventElapsedTime(&ms, startEvent, stopEvent));
	printf("Time for asynchronous V2 transfer and execute (ms): %f\n", ms);
	printf("  max diff vs sequential: %e\n", maxDiff(a, ref, n));

	// cleanup
	CHECK(cudaEventDestroy(startEvent));
	CHECK(cudaEventDestroy(stopEvent));
	for (int i = 0; i < nStreams; i++)
		CHECK(cudaStreamDestroy(stream[i]));
	delete[] ref;
	CHECK(cudaFreeHost(a));
	CHECK(cudaFree(d_a));

	return 0;
}


In [None]:
# Compile and run

!nvcc -arch=sm_75 src/tabular.cu  -o tabular
!./tabular

In [None]:
# profiling (without unified memory - gives an error)

!nvprof ./tabular

# ✅ MQDB con stream

# 🔴 TODO

- Disegnare un kernel per il prodotto tra matrici MQDB con le seguenti specifiche:
- Allocare spazio per matrici MQDB su CPU e GPU 
- Confrontare uso di memoria unificata vs memoria asincrona
- Introdurre gli stream su cui distribuire il carico (grid parall.)
- Analisi di prestazioni usando i tempi ricavati con CUDA event

In [None]:
%%cuda --name  MQDB_stream_Unified.cu


#include "../GPUcomputing/utils/MQDB/mqdb.h"
#include "../GPUcomputing/utils/common.h"

#define BLOCK_SIZE 16     // block size
#define TEST_CPU 0

/*
 * Kernel for standard (naive) matrix product: C = A * B for n x n matrices
 * stored row-major in ->elem. Thread (row, col) computes one entry of C;
 * threads outside the matrix exit immediately.
 */
__global__ void matProdKernel(mqdb *A, mqdb *B, mqdb *C, int n) {
	const int col = threadIdx.x + blockDim.x * blockIdx.x;
	const int row = threadIdx.y + blockDim.y * blockIdx.y;

	if (row >= n || col >= n)
		return;

	// dot product of row `row` of A with column `col` of B
	float acc = 0;
	for (int j = 0; j < n; j++)
		acc += A->elem[row * n + j] * B->elem[j * n + col];
	C->elem[row * n + col] = acc;
}

/*
 * Kernel for block sub-matrix product of mqdb.
 * Multiplies the d x d diagonal block whose top-left corner is (sdim, sdim)
 * in the n x n row-major matrices. Thread (row, col) writes entry
 * (sdim + row, sdim + col) of C; threads outside the block exit immediately.
 */
__global__ void mqdbBlockProd(mqdb *A, mqdb *B, mqdb *C, uint sdim, uint d, uint n) {
	const int col = threadIdx.x + blockDim.x * blockIdx.x;
	const int row = threadIdx.y + blockDim.y * blockIdx.y;

	if (row >= d || col >= d)
		return;

	// linear index of the block's top-left corner
	const uint base = (n + 1) * sdim;

	float acc = 0;
	for (int j = 0; j < d; j++)
		acc += A->elem[base + row * n + j] * B->elem[j * n + col + base];
	C->elem[base + row * n + col] = acc;
}


/*
 * Test on MQDB kernels using Unified Memory.
 * Allocates A, B, C (structs and buffers) with cudaMallocManaged and fills them
 * with genRandDims/fillBlocks, as the manual-memory version does. It then times:
 *   1. the naive full matrix product (matProdKernel);
 *   2. the block-diagonal product, one mqdbBlockProd launch per block on the default stream;
 *   3. the same block launches distributed over one CUDA stream per block.
 * start/stop are owned by the caller and only recorded here.
 */
void testKernelsMQDB_unified(uint n, uint k, cudaEvent_t start, cudaEvent_t stop) {

	// matrix instance generation - Unified Memory
	// (n*n is computed in 64 bit: a 32-bit uint product overflows for n >= 65536)
	ulong nBytes = (ulong)n * n * sizeof(float);
	int kBytes = k * sizeof(int);
	printf("Memory size required = %3.4f (MB)\n", (float)nBytes / (1024.0 * 1024.0));

	mqdb *A, *B, *C;
	CHECK(cudaMallocManaged(&A, sizeof(mqdb)));
	CHECK(cudaMallocManaged(&A->blkSize, kBytes));
	CHECK(cudaMallocManaged(&A->elem, nBytes));
	CHECK(cudaMallocManaged(&B, sizeof(mqdb)));
	CHECK(cudaMallocManaged(&B->blkSize, kBytes));
	CHECK(cudaMallocManaged(&B->elem, nBytes));
	CHECK(cudaMallocManaged(&C, sizeof(mqdb)));
	CHECK(cudaMallocManaged(&C->blkSize, kBytes));
	CHECK(cudaMallocManaged(&C->elem, nBytes));

	// random fill mat entries
	int seed = 1;
	genRandDims(A, n, k, seed);
	genRandDims(B, n, k, seed);
	genRandDims(C, n, k, seed);
	fillBlocks(A, n, k, 'C', 1);
	fillBlocks(B, n, k, 'C', 2);
	fillBlocks(C, n, k, 'C', 0);

	// Read the block layout on the host before any kernel runs, so the host
	// never touches managed memory while the GPU may be using it.
	int nBlocks = A->nBlocks;
	uint *dims = new uint[nBlocks];
	for (int i = 0; i < nBlocks; i++)
		dims[i] = A->blkSize[i];

	dim3 block(BLOCK_SIZE, BLOCK_SIZE);
	float milliseconds;

	/***********************************************************/
	/*                     GPU mat product                     */
	/***********************************************************/

	printf("Kernel (naive) mat product...\n");
	dim3 grid((n + block.x - 1) / block.x, (n + block.y - 1) / block.y);
	CHECK(cudaEventRecord(start));
	matProdKernel<<<grid, block>>>(A, B, C, n);
	CHECK(cudaGetLastError());
	CHECK(cudaEventRecord(stop));
	CHECK(cudaEventSynchronize(stop));
	CHECK(cudaEventElapsedTime(&milliseconds, start, stop));
	printf("   elapsed time                  : %.5f (sec)\n", milliseconds / 1000.0);

	/***********************************************************/
	/*                     GPU MQDB product                    */
	/***********************************************************/

	printf("Kernel MQDB product...\n");
	uint dsum = 0;  // first row/column of the current diagonal block
	CHECK(cudaEventRecord(start));
	for (int i = 0; i < nBlocks; i++) {
		uint d = dims[i];
		dim3 gridBlk((d + block.x - 1) / block.x, (d + block.y - 1) / block.y);
		mqdbBlockProd<<<gridBlk, block>>>(A, B, C, dsum, d, n);
		CHECK(cudaGetLastError());
		dsum += d;
	}
	CHECK(cudaEventRecord(stop));
	CHECK(cudaEventSynchronize(stop));
	CHECK(cudaEventElapsedTime(&milliseconds, start, stop));
	printf("   elapsed time                  : %.5f (sec)\n", milliseconds / 1000.0);

	/***********************************************************/
	/*             GPU MQDB product using streams              */
	/***********************************************************/

	printf("Kernel MQDB product using streams...\n");
	cudaStream_t *streams = new cudaStream_t[nBlocks];   // one stream per diagonal block
	for (int i = 0; i < nBlocks; i++)
		CHECK(cudaStreamCreate(&streams[i]));

	dsum = 0;
	CHECK(cudaEventRecord(start));
	for (int i = 0; i < nBlocks; i++) {
		uint d = dims[i];
		dim3 gridBlk((d + block.x - 1) / block.x, (d + block.y - 1) / block.y);
		mqdbBlockProd<<<gridBlk, block, 0, streams[i]>>>(A, B, C, dsum, d, n);
		CHECK(cudaGetLastError());
		dsum += d;
	}
	// recorded on the legacy default stream: it waits for all the blocking streams
	CHECK(cudaEventRecord(stop));
	CHECK(cudaEventSynchronize(stop));
	CHECK(cudaEventElapsedTime(&milliseconds, start, stop));
	printf("   elapsed time                  : %.5f (sec)\n", milliseconds / 1000.0);

	// clean up streams and memory (the events belong to the caller)
	for (int i = 0; i < nBlocks; i++)
		CHECK(cudaStreamDestroy(streams[i]));
	delete[] streams;
	delete[] dims;

	// make sure the GPU is idle before the host reads A->elem etc. to free them
	CHECK(cudaDeviceSynchronize());
	CHECK(cudaFree(A->elem));
	CHECK(cudaFree(A->blkSize));
	CHECK(cudaFree(A));
	CHECK(cudaFree(B->elem));
	CHECK(cudaFree(B->blkSize));
	CHECK(cudaFree(B));
	CHECK(cudaFree(C->elem));
	CHECK(cudaFree(C->blkSize));
	CHECK(cudaFree(C));
}

/*
 * main function: selects device 0, creates the timing events and runs the
 * unified-memory MQDB tests for k = 20, 25, 30 diagonal blocks.
 */
int main(int argc, char *argv[]) {

	// set up device
	const int dev = 0;
	cudaDeviceProp deviceProp;
	CHECK(cudaGetDeviceProperties(&deviceProp, dev));
	printf("%s starting mqdb product at ", argv[0]);
	printf("device %d: %s\n", dev, deviceProp.name);
	CHECK(cudaSetDevice(dev));

	// events shared by every timed section of the tests
	cudaEvent_t start, stop;
	cudaEventCreate(&start);
	cudaEventCreate(&stop);

	const uint n = 8*1024;     // matrix size
	const uint min_k = 20;     // min num of blocks
	const uint max_k = 30;     // max num of blocks
	const uint k_step = 5;

	// one test per number k of diagonal blocks
	uint k = min_k;
	while (k <= max_k) {
		printf("\n*****   k = %d --- (avg block size = %f)\n", k, (float)n / k);
		testKernelsMQDB_unified(n, k, start, stop);
		k += k_step;
	}

	cudaEventDestroy(start);
	cudaEventDestroy(stop);
	return 0;
}




In [None]:
# Compile and run (linking the MQDB helpers in GPUcomputing/utils/MQDB/mqdb.cpp)
!nvcc -arch=sm_75  src/MQDB_stream_Unified.cu GPUcomputing/utils/MQDB/mqdb.cpp -o MQDBS
!./MQDBS

In [None]:
%%cuda --name MQDB_stream_manual.cu


#include "../GPUcomputing/utils/MQDB/mqdb.h"
#include "../GPUcomputing/utils/common.h"

#define BLOCK_SIZE 16     // block size

/*
 * Kernel for block sub-matrix product of mqdb (structs passed by value, so
 * their elem pointers must be device pointers).
 * Multiplies the d x d diagonal block whose top-left corner is (sdim, sdim)
 * in the n x n row-major matrices. Thread (row, col) writes entry
 * (sdim + row, sdim + col) of C; threads outside the block exit immediately.
 */
__global__ void mqdbBlockProd(mqdb A, mqdb B, mqdb C, uint sdim, uint d, uint n) {
	const int col = threadIdx.x + blockDim.x * blockIdx.x;
	const int row = threadIdx.y + blockDim.y * blockIdx.y;

	if (row >= d || col >= d)
		return;

	// linear index of the block's top-left corner
	const uint base = (n + 1) * sdim;

	float acc = 0;
	for (int j = 0; j < d; j++)
		acc += A.elem[base + row * n + j] * B.elem[j * n + col + base];
	C.elem[base + row * n + col] = acc;
}

/*
 * Test on MQDB kernels using manual async memory.
 * Host matrices are pinned (cudaMallocHost); device matrices are plain
 * cudaMalloc buffers. One CUDA stream per diagonal block performs three steps:
 * copy the row band of A and B that holds its block to the device, run
 * mqdbBlockProd on that block, and copy the matching row band of C back.
 * start/stop are owned by the caller and only recorded here.
 */
void testKernelsMQDB_manual_mem(uint n, uint k, cudaEvent_t start, cudaEvent_t stop) {

	// matrices
	mqdb *A, *B, *C;         // host (pinned)
	mqdb d_A, d_B, d_C;      // device: the structs live on the host, their buffers on the device

	// n*n is computed in 64 bit: a 32-bit uint product overflows for n >= 65536
	ulong nBytes = (ulong)n * n * sizeof(float);
	int kBytes = k * sizeof(int);
	printf("Memory size required = %3.4f (MB)\n",(float)nBytes/(1024.0*1024.0));

	// host and device Memory
	CHECK(cudaMallocHost(&A, sizeof(mqdb)));
	CHECK(cudaMallocHost(&A->blkSize, kBytes));
	CHECK(cudaMallocHost(&A->elem, nBytes));
	CHECK(cudaMalloc(&d_A.blkSize, kBytes));
	CHECK(cudaMalloc(&d_A.elem, nBytes));

	CHECK(cudaMallocHost(&B, sizeof(mqdb)));
	CHECK(cudaMallocHost(&B->blkSize, kBytes));
	CHECK(cudaMallocHost(&B->elem, nBytes));
	CHECK(cudaMalloc(&d_B.blkSize, kBytes));
	CHECK(cudaMalloc(&d_B.elem, nBytes));

	CHECK(cudaMallocHost(&C, sizeof(mqdb)));
	CHECK(cudaMallocHost(&C->blkSize, kBytes));
	CHECK(cudaMallocHost(&C->elem, nBytes));
	CHECK(cudaMalloc(&d_C.blkSize, kBytes));
	CHECK(cudaMalloc(&d_C.elem, nBytes));

	// random fill mat entries
	int seed = 1;
	genRandDims(A, n, k, seed);
	genRandDims(B, n, k, seed);
	genRandDims(C, n, k, seed);
	fillBlocks(A, n, k, 'C', 1);
	fillBlocks(B, n, k, 'C', 2);
	fillBlocks(C, n, k, 'C', 0);

	// copy blk sizes on device memory
	CHECK(cudaMemcpy(d_A.blkSize, A->blkSize, kBytes, cudaMemcpyHostToDevice));
	CHECK(cudaMemcpy(d_B.blkSize, B->blkSize, kBytes, cudaMemcpyHostToDevice));
	CHECK(cudaMemcpy(d_C.blkSize, C->blkSize, kBytes, cudaMemcpyHostToDevice));

	// The kernel writes only the diagonal blocks, but whole row bands of d_C are
	// copied back into C. Zero d_C so the off-diagonal entries of C stay 0
	// instead of receiving uninitialised device memory.
	CHECK(cudaMemset(d_C.elem, 0, nBytes));

	/***********************************************************/
	/*       GPU MQDB product using streams & async copy       */
	/***********************************************************/
	printf("GPU MQDB product using streams...\n");

	dim3 block(BLOCK_SIZE, BLOCK_SIZE);
	int nstreams = A->nBlocks;
	cudaStream_t streams[nstreams];   // one stream per diagonal block
	// create the streams before starting the timer, so their creation is not measured
	for (int i = 0; i < nstreams; i++)
		CHECK(cudaStreamCreate(&streams[i]));

	uint dsum = 0;  // first row/column of the current diagonal block
	CHECK(cudaEventRecord(start));
	for (int i = 0; i < nstreams; i++) {
		uint d = A->blkSize[i];
		// block i lies in rows [dsum, dsum + d): copy that whole row band
		size_t offset = (size_t)dsum * n;                    // first element of the band
		size_t streamBytes = (size_t)d * n * sizeof(float);  // band size in BYTES
		CHECK(cudaMemcpyAsync(&d_A.elem[offset], &A->elem[offset], streamBytes, cudaMemcpyHostToDevice, streams[i]));
		CHECK(cudaMemcpyAsync(&d_B.elem[offset], &B->elem[offset], streamBytes, cudaMemcpyHostToDevice, streams[i]));
		dim3 grid((d + block.x - 1) / block.x, (d + block.y - 1) / block.y);
		mqdbBlockProd<<<grid, block, 0, streams[i]>>>(d_A, d_B, d_C, dsum, d, n);
		CHECK(cudaGetLastError());
		CHECK(cudaMemcpyAsync(&C->elem[offset], &d_C.elem[offset], streamBytes, cudaMemcpyDeviceToHost, streams[i]));
		dsum += d;
	}
	// recorded on the legacy default stream: it waits for all the blocking streams
	CHECK(cudaEventRecord(stop));
	CHECK(cudaEventSynchronize(stop));
	float milliseconds;
	CHECK(cudaEventElapsedTime(&milliseconds, start, stop));
	float GPUtime3 = milliseconds / 1000.0;
	printf("   elapsed time                  : %.5f (sec)\n", GPUtime3);

	// clean up streams (the events belong to the caller)
	for (int i = 0; i < nstreams; i++)
		CHECK(cudaStreamDestroy(streams[i]));

	// Free device and pinned host memory. This function runs once per k;
	// without freeing, every call leaked three n x n matrices on both sides.
	CHECK(cudaFree(d_A.elem));
	CHECK(cudaFree(d_A.blkSize));
	CHECK(cudaFree(d_B.elem));
	CHECK(cudaFree(d_B.blkSize));
	CHECK(cudaFree(d_C.elem));
	CHECK(cudaFree(d_C.blkSize));
	CHECK(cudaFreeHost(A->elem));
	CHECK(cudaFreeHost(A->blkSize));
	CHECK(cudaFreeHost(A));
	CHECK(cudaFreeHost(B->elem));
	CHECK(cudaFreeHost(B->blkSize));
	CHECK(cudaFreeHost(B));
	CHECK(cudaFreeHost(C->elem));
	CHECK(cudaFreeHost(C->blkSize));
	CHECK(cudaFreeHost(C));
}

/*
 * main function: selects device 0, creates the timing events and runs the
 * manual-memory MQDB stream tests for k = 20, 25, 30 diagonal blocks.
 */
int main(int argc, char *argv[]) {

	// set up device
	const int dev = 0;
	cudaDeviceProp deviceProp;
	CHECK(cudaGetDeviceProperties(&deviceProp, dev));
	printf("%s starting mqdb product at ", argv[0]);
	printf("device %d: %s\n", dev, deviceProp.name);
	CHECK(cudaSetDevice(dev));

	// events shared by every timed section of the tests
	cudaEvent_t start, stop;
	cudaEventCreate(&start);
	cudaEventCreate(&stop);

	const uint n = 16*1024;    // matrix size
	const uint min_k = 20;     // min num of blocks
	const uint max_k = 30;     // max num of blocks
	const uint k_step = 5;

	// one test per number k of diagonal blocks
	uint k = min_k;
	while (k <= max_k) {
		printf("\n*****   k = %d --- (avg block size = %f)\n", k, (float)n / k);
		testKernelsMQDB_manual_mem(n, k, start, stop);
		k += k_step;
	}

	cudaEventDestroy(start);
	cudaEventDestroy(stop);
	return 0;
}





In [None]:
# Compilazione ed esecuzione
!nvcc -arch=sm_75  src/MQDB_stream_manual.cu GPUcomputing/utils/MQDB/mqdb.cpp -o MQDBS
!./MQDBS

In [None]:
!pwd
