![Dask Slide](img/HPCP_Dask/Folie5.PNG)

In [29]:
import os
import socket

import cupy
import dask
import dask.array as da
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from dask_jobqueue import SLURMCluster

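# Derive a per-user port offset from the last two characters of the username,
# so that several users on the same host get different ports.
try:
    user_port = int(os.environ["USER"][-2:])
except ValueError:
    user_port = 55  # fallback when the username does not end in two digits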
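
The port scheme used throughout this notebook can be sketched as a small helper. This is only an illustration: `port_for_user`, `base` and `fallback` are hypothetical names, and adding the suffix to a base port (instead of concatenating strings) is an assumption made here so that a suffix such as `07` still yields a five-digit port.

```python
def port_for_user(username: str, base: int, fallback: int = 55) -> int:
    """Return `base` plus the numeric two-character suffix of `username`."""
    try:
        offset = int(username[-2:])
    except ValueError:
        # Username does not end in two digits: use the fallback offset
        offset = fallback
    return base + offset


print(port_for_user("ext-user07", 10000))  # numeric suffix "07"
print(port_for_user("alice", 10000))       # non-numeric suffix, fallback is used
```

Called with base ports `10000` and `10100`, this gives each user a distinct dashboard and scheduler port.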

Check whether a GPU is available.

**Note**: This notebook must be started either on the FHNW Jupyter service, which is GPU-enabled by default, or on the gmerlin cluster at PSI.

In [3]:
!nvidia-smi

Fri Aug 29 10:17:57 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.127.08             Driver Version: 550.127.08     CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  NVIDIA GeForce RTX 2080 Ti     On  |   00000000:B1:00.0 Off |                  N/A |
| 27%   28C    P8             17W /  250W |       1MiB /  11264MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
|   1  NVIDIA GeForce RTX 2080 Ti     On  |   00

If you are running on a GPU-enabled host, you can use Dask without initializing a cluster, as we did first in the CPU example.
With `array.backend` set to `"cupy"`, Dask uses `cupy.random` to create the random array, and many operations run on the GPU. The computation is still triggered with `.compute()`. If you have one GPU, that GPU is used. If you are on a node with multiple GPUs, or on multiple nodes with GPUs, you can run multiple tasks on different GPUs.

In [11]:
# Create a Dask array backed by CuPy arrays instead of NumPy
with dask.config.set({"array.backend": "cupy"}):
    x_gpu = da.random.random((100000, 100000), chunks=(1000, 1000))
    y_gpu = (x_gpu + x_gpu.T).sum()
    %time result_gpu = y_gpu.compute()
    print(result_gpu)

CPU times: user 7.4 s, sys: 338 ms, total: 7.74 s
Wall time: 7.29 s
9999940795.323856
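
As a quick plausibility check on the printed value: `(x + x.T).sum()` equals `2 * x.sum()`, and uniform random numbers on [0, 1) average 0.5, so the result should be close to n² (here 10¹⁰). The following pure-Python sketch repeats the computation on a small matrix; the size `n` and the seed are arbitrary choices for illustration.

```python
import random

random.seed(0)
n = 200  # small stand-in for the 100000 x 100000 array

x = [[random.random() for _ in range(n)] for _ in range(n)]

# Sum of x + x.T, written out element by element
total = sum(x[i][j] + x[j][i] for i in range(n) for j in range(n))

# The ratio to n**2 should be close to 1.0
print(total / n**2)
```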


#### Task 1

You can also convert an existing CPU-backed Dask array to the GPU with `.map_blocks(cupy.asarray)`, which applies `cupy.asarray` to each block and thereby transfers it to GPU memory. Alternatively, you can load data directly into CuPy arrays. Try out both approaches.

In [None]:
#ToDo

You may have noticed that the example above uses only one of the available GPUs. To use more than one GPU, we have to start a `LocalCUDACluster`.

In [21]:
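# One worker per visible GPU, each pinned automatically
# Adding the offset (instead of concatenating strings) keeps the port
# five digits long even for single-digit offsets.
dashboard_port = 10000 + user_port
cluster_port = 10100 + user_port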

cluster = LocalCUDACluster(
    n_workers=2,           # or omit: defaults to # GPUs
    threads_per_worker=1,  # avoid CPU thread contention on GPU
    scheduler_port=cluster_port,
    dashboard_address=f":{dashboard_port}"
)
client = Client(cluster)
client

Connection method: Cluster object
Cluster type: dask_cuda.LocalCUDACluster
Dashboard: http://127.0.0.1:10055/status

Dashboard: http://127.0.0.1:10055/status
Workers: 2
Total threads: 2
Total memory: 16.00 GiB
Status: running
Using processes: True

Comm: tcp://127.0.0.1:10155
Workers: 0
Dashboard: http://127.0.0.1:10055/status
Total threads: 0
Started: Just now
Total memory: 0 B

Comm: tcp://127.0.0.1:33289
Total threads: 1
Dashboard: http://127.0.0.1:35103/status
Memory: 8.00 GiB
Nanny: tcp://127.0.0.1:36375
Local directory: /tmp/dask-scratch-space/worker-7e1_swvi

Comm: tcp://127.0.0.1:34065
Total threads: 1
Dashboard: http://127.0.0.1:42493/status
Memory: 8.00 GiB
Nanny: tcp://127.0.0.1:33159
Local directory: /tmp/dask-scratch-space/worker-uoql2lb6
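
Each worker is pinned to its GPU through the `CUDA_VISIBLE_DEVICES` environment variable. The sketch below imitates the rotation scheme that dask-cuda applies, to the best of our knowledge: every worker sees all GPUs, but with a different device listed first. The function name `rotated_visible_devices` is hypothetical and only illustrates the idea; it is not the library's API.

```python
def rotated_visible_devices(worker_index: int, n_gpus: int) -> str:
    """Return a device list that starts with the worker's own GPU."""
    devices = list(range(n_gpus))
    rotated = devices[worker_index:] + devices[:worker_index]
    return ",".join(str(d) for d in rotated)


# Two workers on a node with two GPUs, as in the cluster above
for i in range(2):
    print(f"worker {i}: CUDA_VISIBLE_DEVICES={rotated_visible_devices(i, 2)}")
```

Because CUDA uses the first visible device by default, worker 0 computes on GPU 0 and worker 1 on GPU 1.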


In [24]:
print("Use the following command to access your Dask dashboard. The cmd will create a port forwarding over SSH.")
print("You can then go to http://localhost:8787/status in your browser and see the dashboard.")
print()
print(f"ssh -L 8787:{socket.gethostname()}:{dashboard_port} {os.environ['USER']}@merlin-l-001")

Use the following command to access your Dask dashboard. The cmd will create a port forwarding over SSH.
You can then go to http://localhost:8787/status in your browser and see the dashboard.

ssh -L 8787:merlin-g-014.psi.ch:10055 ext-marcin_s@merlin-l-001


![SSH](img/ssh.png)

Let us rerun the example from above and run `nvidia-smi` on the console to check whether both GPUs are really in use.

```bash
nvidia-smi
Fri Aug 29 10:31:13 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.127.08             Driver Version: 550.127.08     CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|=========================================+========================+======================|
|   0  NVIDIA GeForce RTX 2080 Ti     On  |   00000000:B1:00.0 Off |                  N/A |
| 26%   43C    P2             81W /  250W |    1944MiB /  11264MiB |     55%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
|   1  NVIDIA GeForce RTX 2080 Ti     On  |   00000000:DB:00.0 Off |                  N/A |
| 30%   46C    P2             76W /  250W |    3982MiB /  11264MiB |     97%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                                                         
+-----------------------------------------------------------------------------------------+
| Processes:                                                                              |
|  GPU   GI   CI        PID   Type   Process name                              GPU Memory |
|        ID   ID                                                               Usage      |
|=========================================================================================|
|    0   N/A  N/A   1442066      C   ...s/summer-school-hpc-2025/bin/python        254MiB |
|    0   N/A  N/A   1445319      C   ...s/summer-school-hpc-2025/bin/python       1686MiB |
|    1   N/A  N/A   1445322      C   ...s/summer-school-hpc-2025/bin/python       3978MiB |
+-----------------------------------------------------------------------------------------+
```
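
Instead of reading the table by eye, the same check can be scripted. The sketch below relies on the `--query-gpu` and `--format=csv,noheader,nounits` options of `nvidia-smi`; the helper names `parse_gpu_stats` and `query_gpu_stats` are hypothetical. The parser is demonstrated on the values from the table above, so the example also runs on a machine without a GPU.

```python
import subprocess

QUERY = [
    "nvidia-smi",
    "--query-gpu=index,utilization.gpu,memory.used",
    "--format=csv,noheader,nounits",
]


def parse_gpu_stats(csv_text: str) -> list[dict]:
    """Parse lines of the form 'index, utilization, memory' into dicts."""
    stats = []
    for line in csv_text.strip().splitlines():
        index, util, mem = (field.strip() for field in line.split(","))
        stats.append(
            {"gpu": int(index), "util_percent": int(util), "memory_mib": int(mem)}
        )
    return stats


def query_gpu_stats() -> list[dict]:
    """Run nvidia-smi (requires an NVIDIA driver) and parse its output."""
    out = subprocess.run(QUERY, capture_output=True, text=True, check=True).stdout
    return parse_gpu_stats(out)


# Values copied from the nvidia-smi table above
sample = "0, 55, 1944\n1, 97, 3982\n"
busy = [s["gpu"] for s in parse_gpu_stats(sample) if s["util_percent"] > 0]
print(busy)
```

On a GPU node, calling `query_gpu_stats()` while the computation is running returns the live values.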

In [26]:
# Create a Dask array backed by CuPy arrays instead of NumPy
with dask.config.set({"array.backend": "cupy"}):
    x_gpu = da.random.random((100000, 100000), chunks=(10000, 10000))
    y_gpu = (x_gpu + x_gpu.T).sum()
    %time result_gpu = y_gpu.compute()
    print(result_gpu)

CPU times: user 279 ms, sys: 41.7 ms, total: 321 ms
Wall time: 6.01 s
10000060411.745394
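
Note that this run uses larger chunks than the single-GPU example. A small calculation shows what the two chunk sizes mean in terms of task count and memory per chunk. The helper `chunk_stats` is a hypothetical name, and 8 bytes per element assumes the default `float64` dtype.

```python
import math


def chunk_stats(shape, chunks, itemsize=8):
    """Return (number of chunks, bytes per chunk, bytes in total)."""
    n_chunks = math.prod(math.ceil(s / c) for s, c in zip(shape, chunks))
    chunk_bytes = math.prod(chunks) * itemsize
    total_bytes = math.prod(shape) * itemsize
    return n_chunks, chunk_bytes, total_bytes


for chunks in [(1000, 1000), (10000, 10000)]:
    n, cb, tb = chunk_stats((100000, 100000), chunks)
    print(f"chunks={chunks}: {n} chunks of {cb / 1e6:.0f} MB, {tb / 1e9:.0f} GB in total")
```

The full array (80 GB) is far larger than the 11264 MiB of a single RTX 2080 Ti, so it can only be processed chunk by chunk. Larger chunks mean fewer tasks and less scheduling overhead, but each chunk must still fit comfortably into GPU memory.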


Close the cluster and client.

In [27]:
client.close()
cluster.close()

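We can also launch a GPU-enabled `SLURMCluster` by requesting GPU resources from Slurm, here through the GPU partition `gpu-short`.  
In the example below, we use the standard `dask-worker` (as shown earlier in this notebook), assigning **one GPU per worker**. Depending on the Slurm configuration, an explicit GPU request (for example `--gpus=1` in `job_extra_directives`) may be needed in addition.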

In [36]:
# Configure cluster parameters:
cluster = SLURMCluster(
    queue="gpu-short",     # your Slurm partition/queue name
    cores=4,               # total cores per job (across all processes in a job)
    processes=1,           # processes per job (with 1, a single process runs 4 threads)
    memory="10GB",         # memory per job
    walltime="00:30:00",   # time per job
    scheduler_options={"dashboard_address": f":{10000 + user_port}"},
    job_extra_directives=["--reservation=psicourse01", "--cluster=gmerlin6"],
    # interface="ib0",
)
print(cluster.job_script())

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p gpu-short
#SBATCH -n 1
#SBATCH --cpus-per-task=4
#SBATCH --mem=10G
#SBATCH -t 00:30:00
#SBATCH --reservation=psicourse01
#SBATCH --cluster=gmerlin6

/opt/psi/Programming/anaconda/2024.08/conda/envs/summer-school-hpc-2025/bin/python -m distributed.cli.dask_worker tcp://129.129.185.194:38563 --name dummy-name --nthreads 4 --memory-limit 9.31GiB --nanny --death-timeout 60
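
The `--nthreads` and `--memory-limit` values in the generated job script follow from the cluster parameters: cores and memory are divided among the processes of a job, and the decimal `10GB` is reported in binary GiB. A minimal sketch of that arithmetic, with `worker_settings` as a hypothetical helper name:

```python
def worker_settings(cores: int, processes: int, memory_bytes: float):
    """Return (threads per worker, memory limit per worker in GiB)."""
    nthreads = cores // processes
    memory_limit_gib = memory_bytes / processes / 2**30
    return nthreads, memory_limit_gib


nthreads, limit = worker_settings(cores=4, processes=1, memory_bytes=10e9)
print(f"--nthreads {nthreads} --memory-limit {limit:.2f}GiB")
```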



In [40]:
cluster.scale(2)
client = Client(cluster)  # connect to the new cluster; the earlier client was closed

Let's try the GPU-enabled `SLURMCluster`:

In [41]:
with dask.config.set({"array.backend": "cupy"}):
    x_gpu = da.random.random((100000, 100000), chunks=(10000, 10000))
    y_gpu = (x_gpu + x_gpu.T).sum()
    %time result_gpu = y_gpu.compute()
    print(result_gpu)

CPU times: user 1.45 s, sys: 14.2 ms, total: 1.47 s
Wall time: 1.46 s
10000046768.03562


In [42]:
client.close()
cluster.close()

**Note:** Interactive sessions can reserve a large amount of GPU resources for long periods of time.  
For production workloads, it is recommended to package your code into an **SBATCH script** and submit it as a self-contained Dask application.  
This way, the job only consumes resources while it is actively running, making cluster usage more efficient.
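
A batch submission could look roughly like the script generated below. This is a sketch under several assumptions: `render_sbatch` and the application file `my_dask_app.py` are hypothetical names, the job name is arbitrary, and `--gpus` may need to be replaced by a site-specific option such as `--gres`. The Python application itself would start its own `LocalCUDACluster` inside the allocation.

```python
def render_sbatch(script: str, partition: str, walltime: str, gpus: int) -> str:
    """Build the text of a minimal SBATCH script that runs a Python application."""
    lines = [
        "#!/usr/bin/env bash",
        "#SBATCH -J dask-gpu-app",
        f"#SBATCH -p {partition}",
        f"#SBATCH -t {walltime}",
        f"#SBATCH --gpus={gpus}",  # assumption: the site accepts --gpus
        "",
        f"python {script}",
    ]
    return "\n".join(lines) + "\n"


script_text = render_sbatch("my_dask_app.py", "gpu-short", "00:30:00", gpus=2)
print(script_text)
```

The resulting text can be saved to a file and submitted with `sbatch`; the GPUs are released as soon as the application exits.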


### HPC special configs

Dask-CUDA workers can use high-speed interconnects such as InfiniBand and NVLink through the UCX communication framework.
You can enable this in Python via `LocalCUDACluster`, for example:

```python
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from dask_cuda.initialize import initialize

# Configure GPU networking
initialize(
    create_cuda_context=True,
    enable_tcp_over_ucx=True,
    enable_infiniband=False,
    enable_nvlink=True,
)

cluster = LocalCUDACluster(
    protocol="ucx",
    interface="ib0",          # e.g. an InfiniBand interface
    rmm_pool_size="25GB",     # GPU memory pool per worker; must fit into the GPU's memory
    enable_tcp_over_ucx=True,
    enable_infiniband=False,
    enable_nvlink=True,
)

client = Client(cluster)
```

Dask‑CUDA workers support preallocating GPU memory pools using the RAPIDS Memory Manager (RMM), which helps reduce memory allocation overhead and fragmentation. This can be configured with parameters like `rmm_pool_size`, as shown above.
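
The pool has to fit into the memory of a single GPU: the `25GB` above would exceed the 11264 MiB of the RTX 2080 Ti cards used earlier in this notebook. One way to pick a value is to reserve a fixed fraction of the GPU memory. The helper name `rmm_pool_size_mib` is hypothetical, and the fraction of 0.8 is an arbitrary assumption, not a recommendation from the library.

```python
def rmm_pool_size_mib(gpu_memory_mib: int, fraction: float = 0.8) -> str:
    """Return a pool size string covering `fraction` of the GPU memory."""
    return f"{int(gpu_memory_mib * fraction)}MiB"


# Total memory of one RTX 2080 Ti as reported by nvidia-smi
print(rmm_pool_size_mib(11264))
```

The resulting string could then be passed as `rmm_pool_size`, leaving the remaining memory for allocations outside the pool.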

**To achieve maximum performance, it is important to understand the underlying hardware architecture and configure it appropriately.  
Only with the right configuration can Dask’s optimization techniques fully leverage the hardware and deliver peak efficiency.**