# Advanced Topics

## Working with data that doesn't fit into GPU memory

You've probably worked with data that is too large to fit into GPU or even system memory in the past.
In this section, we're going to explore methods to work with data that is larger than GPU memory.

### Quickly generating data

Since we're going to be generating larger datasets, we're also going to use a `mp_generate_data` from [extras.py](extras.py).
If you look at the code, you'll notice that it not only is multi-core, but also generates the data in FP32.

Run the code below to test out the function:

In [None]:
from extras import generate_data, mp_generate_data
import numpy as np
from time import time

k, d = 4, 4

for N in [10**power for power in range(6,9)]:
    # Testing the origninal generate_data
    print(f"Generating {N} points")
    start_time = time()
    data, real_centroids = generate_data(k=k, n=N, d=d, cmin=0, cmax=100)
    elapsed_orig = time()-start_time
    data_mb = np.round(data.size * data.itemsize / 2**20, 3)
    print(f" - original  (FP64) - {elapsed_orig:5.2f} sec - {data_mb:4.2f} MB")

    # Testing mp_generate_data
    start_time = time()
    data, real_centroids = mp_generate_data(k=k, n=N, d=d, cmin=0, cmax=100)
    elapsed_orig = time()-start_time
    data_mb = np.round(data.size * data.itemsize / 2**20, 3)
    print(f" - multiproc (FP32) - {elapsed_orig:5.2f} sec - {data_mb:4.2f} MB")

Looking at the results, `mp_generate_data` is worth using when generating 10,000,000 or more data points.
The FP32 values it generates are also not only more suited to running on the GPU, but they take up less memory as well (at the expense of precision).

### Testing the limits of GPU memory

Now that we can generate data quickly, we can start exploring the limits of GPU-accelerated algorithms.

> This was developed on a T4 with 16GB of VRAM. Increase the data size if 400M points does not fail on your GPU.

In [2]:
from sklearn.cluster import KMeans as skl_KMeans
from cuml.cluster import KMeans as cuml_KMeans
from extras import generate_data, mp_generate_data
from time import time
import numpy as np

k, d = 4, 4

for N in [10**8, 8*10**8]:# for power in range(7,10)]:
    data, real_centroids = mp_generate_data(k=k, n=N, d=d, cmin=0, cmax=100)
    data_mb = np.round(data.size * data.itemsize / 2**20, 3)
    print(f"Generated {N} points ({data_mb} MB)")
    
    #start_time = time()
    #assignments = skl_KMeans(n_clusters=k, random_state=0, max_iter=1).fit_predict(data)
    #elapsed_time = time()-start_time
    #print(f"  - sklearn {np.round(elapsed_time,4)} seconds")
    
    start_time = time()
    assignments = cuml_KMeans(n_clusters=k, random_state=0, n_init='auto', max_iter=1).fit_predict(data)
    elapsed_time = time()-start_time
    print(f"  - cuml {np.round(elapsed_time,4)} seconds")
    del data, real_centroids, assignments
print("Done") # Tell me when to stop waiting

Generated 100000000 points (1525.879 MB)
  - cuml 5.7894 seconds
Generated 800000000 points (12207.031 MB)


MemoryError: std::bad_alloc: out_of_memory: CUDA error at: /usr/include/rmm/mr/device/cuda_memory_resource.hpp:62: cudaErrorMemoryAllocation out of memory

You can see that GPU-based KMeans failed on the second data set because it ran out of memory.
While slower, the CPU-based scikit-learn KMeans version completed because it ran from (the usually larger) system memory (RAM).

### RAPIDS Memory Manager

While it may see like the end of the road, the [RAPIDS Memory Manager](https://docs.rapids.ai/api/rmm/stable/) (RMM) was created help allocate GPU memory in highly configurable ways.
One of those ways is to use Unified Virtual Memory (UVM) provides a single, unified memory address space accessible from both the CPU and GPU, simplifying memory management and enabling efficient data sharing between processors.
UVM also allows data in system (CPU) RAM to be larger than GPU memory, and handles all the transactions necessary when computing on it.

UVM can be enabled by setting `managed_memory=True` when initializing RMM.
Do this and try re-running cuML KMeans that previously failed due to OOM.

In [3]:
import rmm
# Enabled managed memory (allows spilling)
rmm.reinitialize(managed_memory=True)
from cuml.cluster import KMeans as cuml_KMeans
from extras import mp_generate_data
from time import time
import numpy as np

k, d = 4, 4

for N in [10**8, 8*10**8]:# for power in range(7,10)]:
    data, real_centroids = mp_generate_data(k=k, n=N, d=d, cmin=0, cmax=100)
    data_mb = np.round(data.size * data.itemsize / 2**20, 3)
    print(f"Generated {N} points ({data_mb} MB)")

    # Skip CPU since it works
    
    start_time = time()
    assignments = cuml_KMeans(n_clusters=k, random_state=0, n_init='auto', max_iter=1).fit_predict(data)
    elapsed_time = time()-start_time
    print(f"  - cuml {np.round(elapsed_time,4)} seconds")
    del data, real_centroids, assignments
print("Done") # I sometimes wait for another step

Generated 100000000 points (1525.879 MB)
  - cuml 6.2042 seconds
Generated 800000000 points (12207.031 MB)
  - cuml 67.8275 seconds
Done


You can see that there is a performance hit, but it does complete without errors and no longer limits your computation to data that fits in GPU memory.

### Exercises:

- Try larger datasets to see if there's a failure point
  - Make sure to look at system memory (RAM)

## Using Multiple GPUs

For multiple GPUs, we recommend [Dask](https://docs.dask.org/en/stable/) and [dask-cuda](https://docs.rapids.ai/api/dask-cuda/nightly/quickstart/). Dask is an open-source Python library that enables parallel and distributed computing for large-scale data processing, providing advanced data structures and algorithms that scale from single machines to large clusters.
dask-cuda takes this a step further by doing this with GPU data structures like CuPy arrays and cuDF dataframes for scaling computations to multiple GPUs and GPU-nodes.

In [4]:
import cupy as cp
import numpy as np
from time import time

import dask
# Specify that the dask cluster will use cupy (gpu-numpy) for arrays
dask.config.set({"array.backend": "cupy", "logging.distributed": "error"})

from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import dask.array as da
# Import dask version of KMeans
from cuml.dask.cluster import KMeans as cuml_KMeans

# Create a local (single-node) cluster and used RMM managed memory (OOM otherwise)
cluster = LocalCUDACluster(rmm_managed_memory=True, rmm_pool_size='15GB')
# Connect to the cluster
client = Client(cluster)

# Generate data as dask arrays
def dask_generate_data(k=3, n=10, d=2, cmin=0, cmax=10):
    # Make sure to specify float32!
    C = da.random.random((k,d), dtype=cp.float32)*(cmax-cmin)-cmin
    R = da.random.normal(loc=0, scale=1, size=(n, d), dtype=cp.float32)+C[da.random.choice(k, n, replace=True)]
    return R, C

k, d = 4, 4

for N in [10**8, 8*10**8]:# for power in range(7,10)]:
    dask_data, real_centroids = dask_generate_data(k=k, n=N, d=d, cmin=0, cmax=100)
    data_mb = np.round(dask_data.size * dask_data.itemsize / 2**20, 3)
    print(f"Generated {N} points ({data_mb} MB)")
    
    start_time = time()
    assignments = cuml_KMeans(n_clusters=k, random_state=0, n_init='auto', max_iter=1).fit_predict(dask_data)
    assignments.compute()
    #print(assignments.map_blocks(cp.asnumpy).compute()[:10])
    elapsed_time = time()-start_time
    print(f"  - dask+cuml {np.round(elapsed_time,4)} seconds")
    del dask_data, real_centroids, assignments
print("Done") # I sometimes wait for another step

client.close()
cluster.close()

Generated 100000000 points (1525.879 MB)


double free or corruption (!prev)
double free or corruption (!prev)
2025-06-17 12:19:06,665 - distributed.scheduler - ERROR - broadcast to tcp://127.0.0.1:41839 failed: CommClosedError: in <TCP (closed) Scheduler Broadcast local=tcp://127.0.0.1:35086 remote=tcp://127.0.0.1:41839>: Stream is closed
2025-06-17 12:19:06,719 - distributed.scheduler - ERROR - broadcast to tcp://127.0.0.1:39269 failed: CommClosedError: in <TCP (closed) Scheduler Broadcast local=tcp://127.0.0.1:49242 remote=tcp://127.0.0.1:39269>: Stream is closed


CommClosedError: in <TCP (closed) Scheduler Broadcast local=tcp://127.0.0.1:35086 remote=tcp://127.0.0.1:41839>: Stream is closed

You'll notice that using Dask on two T4s (no NVLink) was actually slower for a 1.5GB dataset, but faster for the 6GB scale.
Whenever an algorithm is scaled to multiple workers, there's overhead from communication that limits the speed.
You saw this with `mp_generate_data` and now with Dask.

### Exercises:

- Try running on more than two GPUs
    - What does the scaling efficiency look like?
- What happens if you exclude RMM?
- What happens if you exclude the pool size?
- Try [enabling UCX communication](https://docs.rapids.ai/api/dask-cuda/stable/examples/ucx/) on a system with NVLink
- What scale of data does using Dask for multiple-GPUs make sense on your system?

> Make sure to restart the kernel between runs

### Additional Dask features

Dask is also capable of scaling to GPUs on multiple nodes.
We're not going to cover this topic today, but take a look at their [cluster deployment documentation](https://docs.dask.org/en/stable/deploying.html#) to figure out which method is suitable for your compute infrastructure.

In addition to scaling algorithms, Dask commonly known for splitting large data objects (chunking) and persisting chunks in different tiers of memory (spilling).
There's support for this at the GPU level (without RMM), but has best support with [cuDF data frames](https://docs.rapids.ai/api/dask-cudf/stable/). Some RAPIDS alorithms (like Kmeans) also may not work well with this feature.

## Topics not covered, but worth exploring

- [Numba](https://numba.readthedocs.io/en/stable/cuda/index.html) - Numba is a compiler for Python array and numerical functions that gives you the power to speed up your applications with high performance functions written directly in Python.
- [NVIDIA cuPyNumeric](https://docs.nvidia.com/cupynumeric/latest/index.html) - NVIDIA-developed distributed and accelerated drop-in replacement for NumPy built on top of the Legate framework