# RAPIDS & Scanpy Single-Cell RNA-seq Multi-GPU Workflow on 1 Million Cells

Copyright (c) 2020, NVIDIA CORPORATION.

Licensed under the Apache License, Version 2.0 (the "License") you may not use this file except in compliance with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0 

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

This notebook demonstrates a single-cell RNA analysis workflow that begins with preprocessing a count matrix of size `(n_gene, n_cell)` and results in a visualization of the clustered cells for further analysis.

For demonstration purposes, we use a dataset of 1M brain cells with Unified Virtual Memory to oversubscribe GPU memory. We then use dask to scale PCA, K-means clustering, and UMAP across multiple GPUs.

See the README for instructions to download this dataset.

## Import requirements

In [1]:
import time
import pandas
import numpy as np
import scanpy as sc
import anndata

import cupy
import cudf
import math

import h5py
import scipy

import dask
import dask_cudf
import rmm

from numba import cuda

from dask_cuda import initialize
from dask_cuda import LocalCUDACluster
from dask import delayed, dataframe
from dask.dataframe.utils import make_meta
from dask.distributed import Client


from cuml.manifold import TSNE
from cuml.manifold import UMAP

from cuml.dask.decomposition import PCA
from cuml.dask.cluster import KMeans
from cuml.dask.manifold import UMAP as distUMAP

import rapids_scanpy_funcs
import utils

import warnings
warnings.filterwarnings('ignore', 'Expected ')
warnings.simplefilter('ignore')

We use the RAPIDS memory manager to enable Unified Virtual Memory management, which allows us to oversubscribe the GPU memory

In [2]:
# pca = PCA(n_components=50)

# dask_array = pca.fit_transform(dask_array)
# result = dask_array.compute().get()
# print(result)

## Input data

In the cell below, we provide the path to the sparse `.h5ad` file containing the count matrix to analyze. Please see the README for instructions on how to download the dataset we use here.

To run this notebook using your own dataset, please see the README for instructions to convert your own count matrix into this format. Then, replace the path in the cell below with the path to your generated `.h5ad` file.

In [3]:
import os, wget
# input_file = "../1M_neurons_filtered_gene_bc_matrices_h5.h5"
# input_file = "../data/1M_neurons_filtered_gene_bc_matrices_h5.h5"
input_file = "../data/1M_brain_cells_10X.sparse.h5ad"

if not os.path.exists(input_file):
    print('Downloading import file...')
    os.makedirs('../data', exist_ok=True)
    wget.download('https://rapids-single-cell-examples.s3.us-east-2.amazonaws.com/1M_brain_cells_10X.sparse.h5ad',
              input_file)

In [4]:
USE_FIRST_N_CELLS = 1000000

In [5]:
cluster_start = time.time()

enable_tcp_over_ucx = True
enable_nvlink = False
enable_infiniband = False

rmm.reinitialize(managed_memory=True)
cupy.cuda.set_allocator(rmm.rmm_cupy_allocator)

initialize.initialize(create_cuda_context=True,
                      enable_tcp_over_ucx=enable_tcp_over_ucx,
                      enable_nvlink=enable_nvlink,
                      enable_infiniband=enable_infiniband)
cluster = LocalCUDACluster(protocol="ucx",
                           CUDA_VISIBLE_DEVICES=[0, 1],
                           enable_tcp_over_ucx=enable_tcp_over_ucx,
                           enable_nvlink=enable_nvlink,
                           enable_infiniband=enable_infiniband)
client = Client(cluster)

In [8]:
%%time
# TODO: Compute batches using total Rows and columns
BATCHES = 100
input_file = "../data/1M_brain_cells_10X.sparse.h5ad"

with h5py.File(input_file, 'r') as h5f:
    genes = h5f['/var/gene_ids']
    indices = h5f['/X/indptr']
    data = h5f['/X/data']
    
    total_cols = genes.shape[0] 
    total_rows = indices.shape[0] - 1
    batch_rows = math.ceil(total_rows / BATCHES)

print(batch_rows, total_rows, total_cols)

@delayed
def partial_data(sample_file, ds_data, ds_indices, ds_indptr, batch, total_rows, batch_rows):
    batch_start = batch * batch_rows
    batch_end = min(total_rows, batch * batch_rows + batch_rows)
    
    with h5py.File(input_file, 'r') as h5f:
        # Read all things row pointers for one worker
        indptrs = h5f[ds_indptr]
        start_ptr = indptrs[batch_start]
        end_ptr = indptrs[batch_end]
        sub_indptrs = cupy.array(indptrs[batch_start:batch_end])

        # Read all things data for one worker
        data = h5f[ds_data]
        sub_data = cupy.array(data[start_ptr:end_ptr])

        # Read all things column pointers for one worker
        indices = h5f[ds_indices]
        sub_indices = cupy.array(data[start_ptr:end_ptr])

        # recompute the row pointer for the partial dataset
        sub_indptrs = sub_indptrs - start_ptr
        
    print(batch_start, batch_end, len(sub_indptrs), start_ptr, end_ptr, type(sub_data))
    print(sub_indptrs)
        
    return cupy.sparse.csr_matrix((sub_data, sub_indices, sub_indptrs)).toarray()

print('Creating delayed sub-arrays')
dls = []
for batch in range(BATCHES):
    rows = min(batch_rows, total_rows - (batch_rows * batch))
    dls.append(
        dask.array.from_delayed(
            partial_data(input_file, '/X/data', '/X/indices', '/X/indptr', batch, total_rows, batch_rows),
            dtype=cupy.float32,
            shape=(rows, total_cols)))


print('Concate sub-arrays...')
darray = dask.array.concatenate(dls)

print('Communute and persist arrays...')
darray = darray.persist()
# darray = darray.compute()
print(darray.shape)

print('--------------Done--------------')

13062 1306127 27998
Creating delayed sub-arrays
13062 0
13062 1
13062 2
13062 3
13062 4
13062 5
13062 6
13062 7
13062 8
13062 9
13062 10
13062 11
13062 12
13062 13
13062 14
13062 15
13062 16
13062 17
13062 18
13062 19
13062 20
13062 21
13062 22
13062 23
13062 24
13062 25
13062 26
13062 27
13062 28
13062 29
13062 30
13062 31
13062 32
13062 33
13062 34
13062 35
13062 36
13062 37
13062 38
13062 39
13062 40
13062 41
13062 42
13062 43
13062 44
13062 45
13062 46
13062 47
13062 48
13062 49
13062 50
13062 51
13062 52
13062 53
13062 54
13062 55
13062 56
13062 57
13062 58
13062 59
13062 60
13062 61
13062 62
13062 63
13062 64
13062 65
13062 66
13062 67
13062 68
13062 69
13062 70
13062 71
13062 72
13062 73
13062 74
13062 75
13062 76
13062 77
13062 78
13062 79
13062 80
13062 81
13062 82
13062 83
13062 84
13062 85
13062 86
13062 87
13062 88
13062 89
13062 90
13062 91
13062 92
13062 93
13062 94
13062 95
13062 96
13062 97
13062 98
12989 99
Concate sub-arrays...
Communute and persist arrays...
(1306127

## Set parameters

In [9]:
# marker genes
MITO_GENE_PREFIX = "mt-" # Prefix for mitochondrial genes to regress out
markers = ["Stmn2", "Hes1", "Olig1"] # Marker genes for visualization
# markers = ["Stmn2", "Hes1"] # Marker genes for visualization

# filtering cells
min_genes_per_cell = 200 # Filter out cells with fewer genes than this expressed 
max_genes_per_cell = 6000 # Filter out cells with more genes than this expressed 

# filtering genesinitialize
n_top_genes = 4000 # Number of highly variable genes to retain

# PCA
n_components = 50 # Number of principal components to compute
pca_train_ratio = 0.35 # percentage of cells to use for PCA training
n_pca_batches = 10

# t-SNE
tsne_n_pcs = 20 # Number of principal components to use for t-SNE

# k-means
k = 35 # Number of clusters for k-means

# KNN
n_neighbors = 15 # Number of nearest neighbors for KNN graph
knn_n_pcs = 50 # Number of principal components to use for finding nearest neighbors

# UMAP
umap_train_ratio = 0.35
umap_min_dist = 0.3 
umap_spread = 1.0

In [None]:
start = time.time()

## Load and Prepare Data

We load the sparse count matrix from an `h5ad` file using Scanpy. The sparse count matrix will then be placed on the GPU. 

In [None]:
data_load_start = time.time()

In [None]:
%%time
adata = sc.read(input_file)
adata.var_names_make_unique()
adata.shape

For this example, we select the first 1 million cells in the dataset. We maintain the index of unique genes in our dataset:

In [None]:
%%time
genes = cudf.Series(adata.var_names)
sparse_gpu_array = cp.sparse.csr_matrix(adata.X[:USE_FIRST_N_CELLS], dtype=cp.float32)

Verify the shape of the resulting sparse matrix:

In [None]:
sparse_gpu_array.shape

And the number of non-zero values in the matrix:

In [None]:
sparse_gpu_array.nnz

In [None]:
data_load_time = time.time()
print("Total data load and format time: %s" % (data_load_time-data_load_start))

## Preprocessing

In [None]:
preprocess_start = time.time()

### Filter

We filter the count matrix to remove cells with an extreme number of genes expressed.

In [None]:
%%time
sparse_gpu_array = rapids_scanpy_funcs.filter_cells(sparse_gpu_array, 
                                                    min_genes=min_genes_per_cell, 
                                                    max_genes=max_genes_per_cell)

Some genes will now have zero expression in all cells. We filter out such genes.

In [None]:
%%time
sparse_gpu_array, genes = rapids_scanpy_funcs.filter_genes(sparse_gpu_array, genes, min_cells=1)

The size of our count matrix is now reduced.

In [None]:
sparse_gpu_array.shape

### Normalize

We normalize the count matrix so that the total counts in each cell sum to 1e4.

In [None]:
%%time
sparse_gpu_array = rapids_scanpy_funcs.normalize_total(sparse_gpu_array, target_sum=1e4)

Next, we log transform the count matrix.

In [None]:
%%time
sparse_gpu_array = sparse_gpu_array.log1p()

### Select Most Variable Genes

We convert the count matrix to an annData object.

In [None]:
%%time
adata = anndata.AnnData(sparse_gpu_array.get())
adata.var_names = genes.to_pandas()

Using scanpy, we filter the count matrix to retain only the most variable genes.

In [None]:
%%time
sc.pp.highly_variable_genes(adata, n_top_genes=n_top_genes, flavor="cell_ranger")
adata = adata[:, adata.var.highly_variable]

### Regress out confounding factors (number of counts, mitochondrial gene expression)

We can now perform regression on the count matrix to correct for confounding factors -  for example purposes, we use the number of counts and the expression of mitochondrial genes (named starting with `mt-`).

Before regression, we save the 'raw' expression values of the marker genes to use for labeling cells afterward.

In [None]:
genes = adata.var_names
mito_genes = genes.str.startswith(MITO_GENE_PREFIX)

We now calculate the total counts and the percentage of mitochondrial counts for each cell.

In [None]:
%%time
filtered = adata.X
n_counts = filtered.sum(axis=1)
percent_mito = (filtered[:,mito_genes].sum(axis=1) / n_counts).ravel()

n_counts = cp.array(n_counts).ravel()
percent_mito = cp.array(percent_mito).ravel()

And perform regression:

In [None]:
%%time
sparse_gpu_array = cp.sparse.csc_matrix(filtered)

In [None]:
%%time
sparse_gpu_array = rapids_scanpy_funcs.regress_out(sparse_gpu_array, n_counts, percent_mito)

### Scale

Finally, we scale the count matrix to obtain a z-score and apply a cutoff value of 10 standard deviations, obtaining the preprocessed count matrix.

In [None]:
%%time
sparse_gpu_array = rapids_scanpy_funcs.scale(sparse_gpu_array, max_value=10)

In [None]:
preprocess_time = time.time()
print("Total Preprocessing time: %s" % (preprocess_time-preprocess_start))

## Cluster & Visualize

We store the preprocessed count matrix as an AnnData object, which is currently in host memory. We also add the expression levels of the marker genes as observations to the annData object.

In [None]:
%%time
adata = anndata.AnnData(sparse_gpu_array.get())
adata.var_names = genes

In [None]:
%%time
genes = cudf.Series(adata.var_names)

In [None]:
%%time

marker_genes_raw = {
    ("%s_raw" % marker): sparse_gpu_array[:, genes[genes == marker].index[0]].ravel()
    for marker in markers
}
for name, data in marker_genes_raw.items():
    adata.obs[name] = data.get()

In [None]:
adata.shape

### Reduce

We use PCA to reduce the dimensionality of the matrix to its top 50 principal components. Here, we use Dask to parallelize across multiple GPUs.

In [None]:
%%time
n_workers = len(client.scheduler_info()['workers'].keys())
print(n_workers)

chunk_size = int(sparse_gpu_array.shape[0] / (n_workers*10))

dask_array = dask.array.from_array(
    sparse_gpu_array, 
    chunks=(chunk_size, -1),
    asarray=False)

The PCA multi-GPU time includes initial data transfer, which is about 20gb. 

In [None]:
pca = PCA(n_components=50)
dask_array = pca.fit_transform(dask_array)
adata.obsm["X_pca"] = dask_array.compute().get()

### t-SNE + k-Means

In [None]:
%%time
adata.obsm['X_tsne'] = TSNE().fit_transform(adata.obsm["X_pca"][:,:tsne_n_pcs])

We cluster the cells using k-means on the principal components. For example purposes, we set k=35.

In [None]:
%%time
# K-means
dask_kmeans_output = KMeans(n_clusters=k).fit_predict(dask_array)
adata.obs['kmeans'] = dask_kmeans_output.compute().get().astype(str)

We visualize the cells using t-SNE and label cells by color according to the k-means clustering.

In [None]:
sc.pl.tsne(adata, color=["kmeans"])

We label the cells using the `Stmn2` and `Hes1` marker genes, for neuronal and glial cells respectively. These visualizations show us the separation of neuronal and glial cells on the t-SNE plot.

In [None]:
%%time
sc.pl.tsne(adata, color=["Stmn2_raw"], color_map="Blues", vmax=1, vmin=-0.05)
sc.pl.tsne(adata, color=["Hes1_raw"], color_map="Blues", vmax=1, vmin=-0.05)

### UMAP + Louvain

We can also visualize the cells using the UMAP algorithm in Rapids. Before UMAP, we need to construct a k-nearest neighbors graph in which each cell is connected to its nearest neighbors. This can be done conveniently using rapids functionality already integrated into Scanpy.

Note that Scanpy uses an approximation to the nearest neighbors on the CPU while the GPU version performs an exact search. While both methods are known to yield useful results, some differences in the resulting visualization and clusters can be observed.

In [None]:
%%time
sc.pp.neighbors(adata, n_neighbors=n_neighbors, n_pcs=knn_n_pcs, method='rapids')

The UMAP function from Rapids is also integrated into Scanpy.

In [None]:
%%time
local_model = UMAP(n_epochs=1000, min_dist=umap_min_dist, spread=umap_spread)
local_model.fit(adata.obsm["X_pca"][:350000,:])

In [None]:
%%time
dist_embeddings = distUMAP(local_model).transform(dask_array)
adata.obsm["X_umap"] = dist_embeddings.compute().get()

Finally, we use the Louvain algorithm for graph-based clustering, once again using the `rapids` option in Scanpy.

In [None]:
%%time
sc.tl.louvain(adata, flavor='rapids')

We plot the cells using the UMAP visualization, and using the Louvain clusters as labels.

In [None]:
%%time
sc.pl.umap(adata, color=["louvain"])

And also visualize the cells labeled by expression of the `Stmn2` and `Hes1` marker genes, for neuronal and glial cells respectively.

In [None]:
%%time
sc.pl.umap(adata, color=["Stmn2_raw"], color_map="Blues", vmax=1, vmin=-0.05)
sc.pl.umap(adata, color=["Hes1_raw"], color_map="Blues", vmax=1, vmin=-0.05)

In [None]:
cluster_time = time.time()
print("Total cluster time : %s" % (cluster_time-cluster_start))

## Create Zoomed View

The speedup offered by Rapids makes it easy to interactively re-analyze subsets of cells. To illustrate this, we select glial cells (Hes1+) from the dataset.

In [None]:
reanalysis_start = time.time()

In [None]:
%%time
hes1_cells = marker_genes_raw["Hes1_raw"] > 0.0
adata = adata[hes1_cells.get(),:]
adata.shape

We can repeat the dimension reduction, clustering and visualization using this subset of cells in seconds. 

Finally, we visualize the selected neuronal cells labeled by their new clusters, and by the expression of `Olig1`, a marker gene for oligodendrocytes.

In [None]:
del dask_array

In [None]:
%%time

chunk_size = int(sparse_gpu_array.shape[0] / (n_workers*10))
dask_array = dask.array.from_array(cp.asarray(adata.X), 
                                   chunks=(chunk_size, -1))
dask_array = dask_array.persist()

client.rebalance()

In [None]:
%%time
dask_array = pca.fit_transform(dask_array)
adata.obsm['X_pca'] = dask_array.compute().get()

In [None]:
%%time
sc.pp.neighbors(adata, n_neighbors=n_neighbors, n_pcs=knn_n_pcs, method='rapids')
sc.tl.umap(adata, min_dist=umap_min_dist, spread=umap_spread, method='rapids')
sc.tl.louvain(adata, flavor='rapids')

sc.pl.umap(adata, color=["louvain"])
sc.pl.umap(adata, color=["Olig1_raw"], color_map="Blues", vmax=1, vmin=-0.05)

In [None]:
reanalysis_time = time.time()
print("Total reanalysis time : %s" % (reanalysis_time-reanalysis_start))

In [None]:
client.close()

In [None]:
print("Full time: %s" % (time.time() - start))