# **Out-of-Core Single-Cell Analysis with RAPIDS-SingleCell & Dask**  
**Author:** [Severin Dicks](https://github.com/Intron7)
**Copyright** [scverse](https://scverse.org)

In this notebook, we demonstrate the **out-of-core computation** capabilities of **rapids-singlecell** using **Dask**.  
This approach allows us to analyze **1.3 million cells** efficiently, even on relatively small hardware.  

By leveraging **Dask**, we can:  
- **Process large-scale single-cell datasets** without exceeding memory limits.  
- **Enable chunk-based execution**, loading only the necessary data into memory at any time.  

This method makes **large-scale single-cell analysis feasible** on standard hardware setups,  
removing barriers to working with massive datasets.


In [None]:
import dask
import time
import gc
import cuvs
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

In [None]:
import rmm
import cupy as cp

from rmm.allocators.cupy import rmm_cupy_allocator

## **Initializing a Dask Cluster for Out-of-Core Computation**  

To enable **out-of-core computation** and parallel processing,  
we initialize a **Dask CUDA cluster**, which distributes computations across available GPU resource.  

### Additional parameters for `LocalCUDACluster`

- **CUDA_VISIBLE_DEVICES=preprocessing_gpus**: selects GPUs to use (e.g., `"0,1"`).
- **threads_per_worker=10**: CPU threads per GPU worker; tune for your workload and I/O.
- **protocol="ucx"**: enables UCX for high-throughput GPU-aware communication (NVLink/InfiniBand/RDMA).
- **rmm_pool_size="10GB"**: initial per-worker RAPIDS Memory Manager (RMM) pool; reduces allocation overhead.
- **rmm_maximum_pool_size="110GB"**: maximum pool growth per worker; allows RMM to expand up to this cap.
- **rmm_allocator_external_lib_list="cupy"**: integrates CuPy with RMM so CuPy allocations come from the pool.
- **Client(cluster)**: attaches the Dask client to the cluster (dashboard link available when running).

In [None]:
%%time
preprocessing_gpus="0"
cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES=preprocessing_gpus,
                           threads_per_worker=10,
                           protocol="ucx",
                           rmm_pool_size= "10GB",
                           rmm_maximum_pool_size = "20GB",
                           rmm_allocator_external_lib_list= "cupy",
                          )

client = Client(cluster)

client

In [None]:
import rapids_singlecell as rsc
import scanpy as sc
import anndata as ad

## **Loading Large Datasets into AnnData with Dask**  

To efficiently handle large-scale single-cell datasets, we load data directly from an **HDF5 (`h5`) or Zarr file**  
into an **AnnData object** using **Dask arrays**. This enables **lazy loading**, allowing data to be processed in chunks  
without exceeding memory limits.  

We achieve this using **`read_elem_as_dask`**, which loads the expression matrix (`X`) as a **Dask array**

In [None]:
from packaging.version import parse as parse_version
import zarr
if parse_version(ad.__version__) < parse_version("0.12.0rc1"):
    from anndata.experimental import read_elem_as_dask as read_dask
else:
    from anndata.experimental import read_elem_lazy as read_dask


SPARSE_CHUNK_SIZE = 20_000
#data_pth = "zarr/cell_atlas.zarr" #11Million Cells
data_pth = "zarr/nvidia_1.3M.zarr" #1.3Million Cells

f = zarr.open(data_pth)
X = f["X"]
shape = X.attrs["shape"]
adata = ad.AnnData(
    X = read_dask(X, (SPARSE_CHUNK_SIZE, shape[1])),
    obs = ad.io.read_elem(f["obs"]),
    var = ad.io.read_elem(f["var"])
)

In [None]:
t1 = time.time()

In [None]:
rsc.get.anndata_to_GPU(adata)

## **Transferring AnnData to GPU for Accelerated Computation**  

Once the dataset is loaded as a **Dask-backed AnnData object**,  
we transfer it to the **GPU** to leverage **RAPIDS-SingleCell’s** accelerated processing.  

We use **`rsc.get.anndata_to_GPU()`**, which efficiently moves the AnnData object to GPU memory:  


## **Quality Control (QC) Metrics Calculation (Requires Synchronization)**  

Before proceeding with further analysis, we compute **quality control (QC) metrics**  
to assess dataset quality and filter out low-quality cells or genes.  

We use **`rsc.pp.calculate_qc_metrics()`** to calculate key QC metrics

Although we are working with Dask-backed AnnData, this operation requires a synchronization step.
This means that Dask computations must be evaluated immediately,
so the process is not completely lazy like other out-of-core operations.

In [None]:
%%time
rsc.pp.calculate_qc_metrics(adata)

## **Filtering Cells and Genes Without Additional Computation**  

Instead of using **`rsc.pp.filter_cells`** and **`rsc.pp.filter_genes`**,  
we apply filtering directly using boolean indexing to **avoid extra computation**.

**Why Use Direct Indexing Instead of Built-in Functions?**
* More Efficient with Dask → Avoids triggering additional computations.
* Preserves Lazy Execution → Filtering is applied without forcing full dataset evaluation.
* Copy is preferred → Using `.copy()` prevents views. Views slow down computations with Dask-backed AnnData.

In [None]:
adata = adata[(adata.obs["n_genes_by_counts"]<=10000) 
            & (adata.obs["n_genes_by_counts"]>=200)].copy()

## **Log Normalization (Fully Lazy Execution)**  

Next, we apply **log normalization** to scale gene expression values.  
This step ensures that differences in sequencing depth across cells do not dominate downstream analysis.  

In [None]:
%%time
rsc.pp.normalize_total(adata,target_sum = 10000)
rsc.pp.log1p(adata)

## **Selecting Highly Variable Genes (Requires Synchronization)**  

To focus on the most informative features, we identify **highly variable genes (HVGs)**  
using the **Cell Ranger** method and subset the dataset accordingly.  

**Important Considerations:**
* Requires Synchronization → Computing highly variable genes triggers evaluation,
meaning this step is not fully lazy when using Dask.
* Copy is preferred → Using `.copy()` prevents views.


In [None]:
%%time
rsc.pp.highly_variable_genes(adata,n_top_genes=5000, flavor="cell_ranger")
adata = adata[:,adata.var.highly_variable].copy()

In [None]:
# adata.X = adata.X.persist()

## **Scaling Gene Expression (Requires Synchronization)**  

To standardize gene expression values, we apply **feature scaling**,  
ensuring that all genes contribute equally to downstream analysis.

**Important Considerations:**
* Requires Synchronization → Since the input matrix is in CSR format (Compressed Sparse Row),
this step forces an immediate computation, meaning it is not fully lazy like earlier transformations.
* Scaling → Divides each gene’s expression values by its standard deviation.
* zero_center=False → Keeps the scaled values non-centered,
which is beneficial for sparse matrices and GPU acceleration.


In [None]:
%%time
rsc.pp.scale(adata, zero_center= False)

## **Principal Component Analysis (PCA) on GPU (Two-Step Synchronization Process)**  

To reduce dimensionality while preserving meaningful variation,  
we perform **Principal Component Analysis (PCA)** using **GPU acceleration**.

Understanding the Two-Step Synchronization in PCA:
1. Mandatory Synchronization for Covariance & Mean Calculation
    * PCA requires computing the covariance matrix and mean vector,
    which must be explicitly synchronized before proceeding.
    * This step is handled automatically within `rsc.pp.pca()`.

2. Finalizing the Transformation with `.compute()`
    * After computing the principal components, the data remains lazy (Dask CuPy array).
    * Calling `.compute()` on `adata.obsm["X_pca"]` performs the final transformation,
      projecting the data onto the computed PCs and materializing the result as a fully computed CuPy array.

**Why This Matters?**
* The first synchronization (**covariance & mean**) ensures the PCA model is ready.
* The second synchronization (`compute()`) ensures that the transformed data is fully realized
for downstream analyses like clustering and UMAP.

In [None]:
%%time
rsc.pp.pca(adata, n_comps = 100,mask_var=None)
adata.obsm["X_pca"]=adata.obsm["X_pca"].compute()

In [None]:
print(f"Time for the notebook: {time.time()-t1}s")

In [None]:
adata.obsm["X_pca"] = adata.obsm["X_pca"].get()

In [None]:
%%time
rsc.pp.neighbors(adata, n_neighbors=15, n_pcs=50, algorithm="ivfflat")

In [None]:
%%time
rsc.tl.umap(adata)

In [None]:
%%time
rsc.tl.leiden(adata)

In [None]:
sc.pl.umap(adata, color="leiden")