# RAPIDS & Scanpy Single-Cell RNA-seq Multi-GPU Workflow on 1 Million Cells

Copyright (c) 2020, NVIDIA CORPORATION.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0 

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

This notebook demonstrates a single-cell RNA analysis workflow that begins with preprocessing a count matrix of size `(n_gene, n_cell)` and results in a visualization of the clustered cells for further analysis.

For demonstration purposes, we use a dataset of 1M brain cells with Unified Virtual Memory to oversubscribe GPU memory. We then use dask to scale PCA, K-means clustering, and UMAP across multiple GPUs.

See the README for instructions to download this dataset.

## Import requirements

In [1]:
import time
import pandas as pd
import numpy as np
import scanpy as sc
import anndata

import matplotlib.pyplot as plt
import matplotlib.animation as animation

import cupy as cp
import cupyx as cpx
import cudf
import math


import h5py
import scipy

import dask
import dask_cudf
import rmm

from numba import cuda

from dask_cuda import initialize, LocalCUDACluster
from dask import delayed, dataframe
from dask.dataframe.utils import make_meta
from dask.distributed import Client
from dask_cuda.local_cuda_cluster import cuda_visible_devices

from cuml.manifold import TSNE, UMAP

from cuml.dask.decomposition import PCA as cu_dask_PCA
from cuml.dask.cluster import KMeans as cu_dask_KMeans
from cuml.dask.manifold import UMAP as cu_dask_UMAP
from cuml.dask.linear_model import LinearRegression as cu_dask_LinearRegression
from cuml.dask.neighbors import NearestNeighbors as cu_dask_NearestNeighbors

from bokeh.io.export import export_png
from bokeh.plotting import figure
from bokeh.models.tickers import FixedTicker
from bokeh.io import output_notebook, push_notebook, show

import rapids_scanpy_funcs
import utils

import warnings
warnings.filterwarnings('ignore', 'Expected ')
warnings.simplefilter('ignore')

output_notebook()

COLORS = ["#000000", "#FFFF00", "#1CE6FF", "#FF34FF", "#FF4A46", "#008941", "#006FA6", "#A30059",
          "#FFDBE5", "#7A4900", "#0000A6", "#63FFAC", "#B79762", "#004D43", "#8FB0FF", "#997D87",
          "#5A0007", "#809693", "#FEFFE6", "#1B4400", "#4FC601", "#3B5DFF", "#4A3B53", "#FF2F80",
          "#61615A", "#BA0900", "#6B7900", "#00C2A0", "#FFAA92", "#FF90C9", "#B903AA", "#D16100",
          "#DDEFFF", "#000035", "#7B4F4B", "#A1C299", "#300018", "#0AA6D8", "#013349", "#00846F",
          "#372101", "#FFB500", "#C2FFED", "#A079BF", "#CC0744", "#C0B9B2", "#C2FF99", "#001E09",
          "#00489C", "#6F0062", "#0CBD66", "#EEC3FF", "#456D75", "#B77B68", "#7A87A1", "#788D66",
          "#885578", "#FAD09F", "#FF8A9A", "#D157A0", "#BEC459", "#456648", "#0086ED", "#886F4C",
          "#34362D", "#B4A8BD", "#00A6AA", "#452C2C", "#636375", "#A3C8C9", "#FF913F", "#938A81",
          "#575329", "#00FECF", "#B05B6F", "#8CD0FF", "#3B9700", "#04F757", "#C8A1A1", "#1E6E00",
          "#7900D7", "#A77500", "#6367A9", "#A05837", "#6B002C", "#772600", "#D790FF", "#9B9700",
          "#549E79", "#FFF69F", "#201625", "#72418F", "#BC23FF", "#99ADC0", "#3A2465", "#922329",
          "#5B4534", "#FDE8DC", "#404E55", "#0089A3", "#CB7E98", "#A4E804", "#324E72", "#6A3A4C",]

We use the RAPIDS memory manager to enable Unified Virtual Memory management, which allows us to oversubscribe the GPU memory.

In [2]:
# Wall-clock reference taken before any cluster setup work starts.
cluster_start = time.time()

# Transport flags; the same three values are passed to both
# initialize.initialize() and LocalCUDACluster below.
enable_tcp_over_ucx = True
enable_nvlink = True
enable_infiniband = False
# cuda_visible_devices(0) yields a comma-separated string; split it into the
# list handed to LocalCUDACluster.
CUDA_VISIBLE_DEVICES = cuda_visible_devices(0).split(',')
# CUDA_VISIBLE_DEVICES = [1, 2, 3, 4, 5, 6, 7]

# Reinitialize RMM with managed memory and route CuPy allocations through it.
# This runs before the cluster is created.
rmm.reinitialize(managed_memory=True)
cp.cuda.set_allocator(rmm.rmm_cupy_allocator)

# Initialize UCX in this (client) process with the transport flags above.
initialize.initialize(create_cuda_context=True,
                      enable_tcp_over_ucx=enable_tcp_over_ucx,
                      enable_nvlink=enable_nvlink,
                      enable_infiniband=enable_infiniband)

# Local cluster speaking the "ucx" protocol over the selected devices.
cluster = LocalCUDACluster(protocol="ucx",
                           CUDA_VISIBLE_DEVICES=CUDA_VISIBLE_DEVICES,
                           enable_tcp_over_ucx=enable_tcp_over_ucx,
                           enable_nvlink=enable_nvlink,
                           enable_infiniband=enable_infiniband)

client = Client(cluster)
# Number of workers the scheduler currently reports; later cells use it to
# size batches.
n_workers = len(client.scheduler_info()['workers'].keys())
print(n_workers)
# Bare expression: displays the client summary as the cell output.
client

8


0,1
Client  Scheduler: ucx://127.0.0.1:34414  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 8  Cores: 8  Memory: 540.95 GB


## Input data

In the cell below, we provide the path to the sparse `.h5ad` file containing the count matrix to analyze. Please see the README for instructions on how to download the dataset we use here.

To run this notebook using your own dataset, please see the README for instructions to convert your own count matrix into this format. Then, replace the path in the cell below with the path to your generated `.h5ad` file.

In [3]:
import os
import wget

# Path of the sparse .h5ad count matrix analysed by this notebook.
# input_file = "../data/krasnow_hlca_10x.sparse.h5ad"
input_file = "../data/1M_brain_cells_10X.sparse.h5ad"
if not os.path.exists(input_file):
    print('Downloading import file...')
    # Create the directory that input_file actually lives in, so that pointing
    # input_file somewhere else does not leave the download without a target
    # directory. An empty dirname means the current directory.
    os.makedirs(os.path.dirname(input_file) or '.', exist_ok=True)
    wget.download(
        'https://rapids-single-cell-examples.s3.us-east-2.amazonaws.com/1M_brain_cells_10X.sparse.h5ad',
        input_file)

## Set parameters

In [4]:
# marker genes
MITO_GENE_PREFIX = "mt-"              # Prefix for mitochondrial genes to regress out
markers = ["Stmn2", "Hes1", "Olig1"]  # Marker genes for visualization

# filtering cells
min_genes_per_cell = 200      # Filter out cells with fewer genes than this expressed 
max_genes_per_cell = 6000     # Filter out cells with more genes than this expressed 
min_cells=1                   # Genes containing a number of cells below this value will be filtered
# NOTE(review): the gene-filter cell compares min_cells against per-gene summed
# counts, not against a count of expressing cells -- confirm the intent.

# filtering genes
n_top_genes = 4000            # Number of highly variable genes to retain

# PCA
n_components = 50             # Number of principal components to compute
pca_train_ratio = 0.35        # percentage of cells to use for PCA training
n_pca_batches = 10            # NOTE(review): not referenced in the visible cells; presumably a PCA batch count

# t-SNE
tsne_n_pcs = 20               # Number of principal components to use for t-SNE

# k-means
k = 35                        # Number of clusters for k-means

# KNN
n_neighbors = 15              # Number of nearest neighbors for KNN graph
knn_n_pcs = 50                # Number of principal components to use for finding nearest neighbors

# UMAP
umap_train_ratio = 0.35       # NOTE(review): not referenced in the visible cells; presumably the fraction of cells used to fit UMAP
umap_min_dist = 0.3           # Passed to UMAP as min_dist
umap_spread = 1.0             # Passed to UMAP as spread

# Wall-clock reference for the workflow as a whole.
start = time.time()

# Data loading: partitions are read for batch starts in range(0, MAX_RECS, BATCHSIZE).
MAX_RECS = 100000             # Upper bound of the batch-start range
BATCHSIZE = 20000             # Number of rows read per partition

## Load and Prepare Data

We load the sparse count matrix from an `h5ad` file in row batches using h5py and dask. Each batch is placed on the GPU as a sparse matrix.

In [5]:
data_load_start = time.time()

In [6]:
%%time

# TODO: Compute batches using total Rows and columns
# NOTE(review): BATCHES is not used by the loading cells that follow (they step
# by BATCHSIZE); it is reassigned before its later use.
BATCHES = n_workers * 3

# Read only the metadata needed to size the problem: the gene names and the
# length of the row-pointer array. The matrix values are read later, per batch.
with h5py.File(input_file, 'r') as h5f:
    indptr = h5f['/X/indptr']
    data = h5f['/X/data']
    orginal_genes = h5f['/var/_index']
    
    # Materialise the gene names as a cuDF string series while the file is open.
    orginal_genes = cudf.Series(orginal_genes, dtype=cp.dtype('object'))

    # One column per gene; the row-pointer array holds one more entry than
    # there are rows.
    total_cols = orginal_genes.shape[0]
    total_rows = indptr.shape[0] - 1
    # batch_rows = math.ceil(total_rows / BATCHES)
print(total_cols, total_rows)

@delayed
def read_partition(sample_file, ds_data, ds_indices, ds_indptr, 
                   batch_start, rows, total_cols, gene_filter,
                   min_genes_per_cell=200, max_genes_per_cell=6000, target_sum=1e4):
    """
    Load one row batch of a CSR matrix from an HDF5 file and reduce it.

    Parameters
    ----------
    sample_file : path of the HDF5 file to read.
    ds_data, ds_indices, ds_indptr : dataset names of the CSR ``data``,
        ``indices`` and ``indptr`` arrays inside the file.
    batch_start : index of the first row of the batch.
    rows : number of rows in the batch.
    total_cols : number of columns of the full matrix.
    gene_filter : column selector applied to the normalized batch. When it is
        ``None`` (or otherwise falsy) no matrix is returned; per-column sums
        are returned instead.
    min_genes_per_cell, max_genes_per_cell : inclusive bounds on the number of
        stored entries a row must have to be kept.
    target_sum : forwarded to ``rapids_scanpy_funcs.normalize_total``.

    Returns
    -------
    Without ``gene_filter``: the column sums of the kept rows and the column
    sums of their squared values, stacked horizontally (sums first).
    With ``gene_filter``: the kept rows after ``normalize_total``, restricted
    to the selected columns.
    """
    batch_end = batch_start + rows

    # Open the file named by the argument. The original body opened the
    # notebook-level ``input_file`` and silently ignored ``sample_file``.
    with h5py.File(sample_file, 'r') as h5f:
        # Row pointers delimiting this batch inside data/indices
        indptrs = h5f[ds_indptr]
        start_ptr = indptrs[batch_start]
        end_ptr = indptrs[batch_end]

        # Values and column indices of the batch, copied to the GPU
        sub_data = cp.array(h5f[ds_data][start_ptr:end_ptr])
        sub_indices = cp.array(h5f[ds_indices][start_ptr:end_ptr])

        # Rebase the row pointers so that the batch starts at offset 0
        sub_indptrs = cp.array(indptrs[batch_start:(batch_end + 1)])
        sub_indptrs = sub_indptrs - sub_indptrs[0]

    batch_shape = (batch_end - batch_start, total_cols)
    partial_sparse_array = cp.sparse.csr_matrix((sub_data, sub_indices, sub_indptrs),
                                                shape=batch_shape)

    # TODO: Add barcode filtering here.
    # Stored entries per row, used as the number of genes expressed per cell.
    degrees = cp.diff(partial_sparse_array.indptr)
    query = ((min_genes_per_cell <= degrees) & (degrees <= max_genes_per_cell))
    partial_sparse_array = partial_sparse_array[query]

    if not gene_filter:
        # First pass: only per-gene statistics are needed.
        partial_sparse_sq_array = cp.sparse.csr_matrix(
            (cp.square(sub_data), sub_indices, sub_indptrs), shape=batch_shape)
        partial_sparse_sq_array = partial_sparse_sq_array[query]
        col1 = partial_sparse_array.sum(axis=0)
        col2 = partial_sparse_sq_array.sum(axis=0)
        ret_value = cp.hstack([col1, col2])
    else:
        # Second pass: normalize the kept cells and keep the selected genes.
        partial_sparse_array = rapids_scanpy_funcs.normalize_total(
            partial_sparse_array, target_sum=target_sum)
        ret_value = partial_sparse_array[:, gene_filter]

    return ret_value

27998 1306127
CPU times: user 5.38 s, sys: 992 ms, total: 6.37 s
Wall time: 6.11 s


In [7]:
%%time
print('Computing sum and creating gene count filter...')
dls_sum = []

# One delayed task per row batch; with gene_filter=None each task returns the
# per-gene sums followed by the per-gene sums of squares.
# NOTE(review): each task returns column sums, so the declared chunk shape
# (BATCHSIZE rows) presumably overstates the real row count -- confirm.
for batch_start in range(0, MAX_RECS, BATCHSIZE):
    dls_sum.append(
        dask.array.from_delayed(
            read_partition(input_file, 
                           '/X/data', '/X/indices', '/X/indptr', 
                           batch_start, BATCHSIZE, total_cols, None,
                           min_genes_per_cell=min_genes_per_cell,
                           max_genes_per_cell=max_genes_per_cell),            
            dtype=cp.float32,
            shape=(BATCHSIZE, total_cols * 2)))

sum_gpu_arrays = dask.array.concatenate(dls_sum)
# First half of the columns holds the sums, the second half the sums of squares
sum_gpu_arrays = sum_gpu_arrays.compute()

# Split the sums from the sums of squares. The split point must be an integer:
# true division yields a float, which is not a valid slice bound.
half_cols = sum_gpu_arrays.shape[1] // 2
sum_gpu_array = sum_gpu_arrays[:, :half_cols]
sum_sq_gpu_array = sum_gpu_arrays[:, half_cols:]

# Keep genes whose summed value over all batches reaches min_cells
sum_gpu_array = sum_gpu_array.sum(axis=0)
min_cell_filter = (sum_gpu_array >= min_cells)

sum_gpu_array = sum_gpu_array[min_cell_filter]
sum_sq_gpu_array = sum_sq_gpu_array[:, min_cell_filter]
sum_sq_gpu_array = sum_sq_gpu_array.sum(axis=0)

sum_sq_gpu_array.shape

Computing sum and creating gene count filter...
CPU times: user 684 ms, sys: 36 ms, total: 720 ms
Wall time: 8.46 s


(21673,)

In [8]:
%%time

print('Loading data into dataframes...')
# Second pass over the same row batches: every task now returns the normalized
# batch restricted to the genes that passed the count filter.
dls_data = []
for batch_start in range(0, MAX_RECS, BATCHSIZE):
    partition = read_partition(
        input_file,
        '/X/data', '/X/indices', '/X/indptr',
        batch_start, BATCHSIZE, total_cols, min_cell_filter.tolist(),
        min_genes_per_cell=min_genes_per_cell,
        max_genes_per_cell=max_genes_per_cell)
    dls_data.append(
        dask.array.from_delayed(
            partition,
            dtype=cp.float32,
            shape=(BATCHSIZE, sum_sq_gpu_array.shape[0])))

print('Concate sub-arrays...')
sparse_gpu_array = dask.array.concatenate(dls_data)

print('Compute and persist arrays...')
# Start the computation on the cluster and keep the chunks there.
sparse_gpu_array = sparse_gpu_array.persist()
sparse_gpu_array

Loading data into dataframes...
Concate sub-arrays...
Compute and persist arrays...
CPU times: user 692 ms, sys: 28 ms, total: 720 ms
Wall time: 711 ms


Unnamed: 0,Array,Chunk
Bytes,8.67 GB,1.73 GB
Shape,"(100000, 21673)","(20000, 21673)"
Count,5 Tasks,5 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 8.67 GB 1.73 GB Shape (100000, 21673) (20000, 21673) Count 5 Tasks 5 Chunks Type float32 numpy.ndarray",21673  100000,

Unnamed: 0,Array,Chunk
Bytes,8.67 GB,1.73 GB
Shape,"(100000, 21673)","(20000, 21673)"
Count,5 Tasks,5 Chunks
Type,float32,numpy.ndarray


For this example, we select the first 1 million cells in the dataset. We maintain the index of unique genes in our dataset:

Verify the shape of the resulting sparse matrix:

In [9]:
%%time
sparse_gpu_array = dask.array.log1p(sparse_gpu_array)

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 535 µs


And the number of non-zero values in the matrix:

In [10]:
print("Total data load and format time: %s" % (time.time() - data_load_start))

Total data load and format time: 15.319685697555542


## Preprocessing

In [11]:
preprocess_start = time.time()

### Filter

In [12]:
# Filter out the genes that got removed from the original data
genes = orginal_genes[min_cell_filter]
genes.shape

(21673,)

We filter the count matrix to remove cells with an extreme number of genes expressed.

In [13]:
%%time
# FILTERING IS NOW DONE WHILE READING DATA
# sparse_gpu_array = rapids_scanpy_funcs.filter_cells(sparse_gpu_array, 
#                                                     min_genes=min_genes_per_cell, 
#                                                     max_genes=max_genes_per_cell)

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 20.3 µs


Some genes will now have zero expression in all cells. We filter out such genes.

In [14]:
%%time
# FILTERING IS NOW DONE WHILE READING DATA
# sparse_gpu_array, genes = rapids_scanpy_funcs.filter_genes(sparse_gpu_array, genes.get(), min_cells=1)

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 5.48 µs


The size of our count matrix is now reduced.

In [15]:
%%time
# sparse_gpu_array.shape

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 5.01 µs


### Normalize

We normalize the count matrix so that the total counts in each cell sum to 1e4.

In [16]:
%%time
# This is now done while loading data
# sparse_gpu_array = rapids_scanpy_funcs.normalize_total(sparse_gpu_array, target_sum=1e4)
# sparse_gpu_array = sparse_gpu_array.log1p()

# sparse_gpu_array = sparse_gpu_array.astype(cp.float32)
# sparse_gpu_array.shape

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 5.96 µs


Next, we log transform the count matrix.

In [17]:
%%time

# Expression column of each marker gene, keyed by the gene's own name.
marker_genes_raw = {}
genes1 = genes.reset_index(drop=True)
# Rows of the gene index that are marker genes. They come back in dataset
# order, which need not match the order of ``markers``, and a marker may be
# absent altogether. So each column is labelled with the name found at its
# position, not with a running counter into ``markers``.
marker_rows = genes1[genes1.isin(markers)]
marker_positions = marker_rows.index.to_arrow().to_pylist()
marker_names = marker_rows.to_arrow().to_pylist()
for index, marker_name in zip(marker_positions, marker_names):
    marker_genes_raw[marker_name] = sparse_gpu_array[:, index].compute().toarray().ravel()

CPU times: user 676 ms, sys: 164 ms, total: 840 ms
Wall time: 2.42 s


### Select Most Variable Genes

We convert the count matrix to an annData object.

In [18]:
%%time
# gpu_array = sparse_gpu_array.compute()
# adata = anndata.AnnData(gpu_array.get())
# adata.var_names = genes.to_pandas()

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 4.77 µs


Before filtering the count matrix, we save the 'raw' expression values of the marker genes to use for labeling cells afterward.

Using scanpy, we filter the count matrix to retain only the most variable genes.

In [19]:
%%time
# sc.pp.highly_variable_genes(adata, n_top_genes=n_top_genes, flavor="cell_ranger")
# print("Full time: %s" % (time.time() - start))
# sc_hightly_variable = genes[adata.var.highly_variable.values]
# sc_hightly_variable.head(10)

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 5.25 µs


In [20]:
%%time
from statsmodels import robust

# Per-gene mean and variance from the accumulated sums and sums of squares.
# NOTE(review): total_rows is the row count of the whole file, whereas the sums
# were accumulated only over the batches that were read and over the cells that
# passed the filter -- confirm which denominator is intended.
mean = sum_gpu_array / total_rows
mean[mean == 0] = 1e-12  # keeps the dispersion below free of division by zero

mean_sq = sum_sq_gpu_array / total_rows
variance = mean_sq - mean ** 2
# Bessel's correction n / (n - 1): n is the number of observations (rows),
# not the number of genes.
variance *= total_rows / (total_rows - 1)
dispersion = variance / mean

# Bin the genes by mean expression (percentile edges from 10 to 100 in steps
# of 5) and normalize each dispersion against the median and MAD of its bin.
df = pd.DataFrame()
df['means'] = mean.tolist()
df['dispersions'] = dispersion.tolist()
df['mean_bin'] = pd.cut(
    df['means'],
    np.r_[-np.inf, np.percentile(df['means'], np.arange(10, 105, 5)), np.inf],
)

disp_grouped = df.groupby('mean_bin')['dispersions']
disp_median_bin = disp_grouped.median()

with warnings.catch_warnings():
    warnings.simplefilter('ignore')
    disp_mad_bin = disp_grouped.apply(robust.mad)
    df['dispersions_norm'] = (
        df['dispersions'].values - disp_median_bin[df['mean_bin'].values].values
    ) / disp_mad_bin[df['mean_bin'].values].values

# Rank the finite normalized dispersions in descending order (in-place sort
# through a reversed view).
dispersion_norm = df['dispersions_norm'].values
dispersion_norm = dispersion_norm[~np.isnan(dispersion_norm)]
dispersion_norm[::-1].sort()

# Never ask for more genes than there are.
n_top_genes = min(n_top_genes, df.shape[0])

# Genes at or above the n_top_genes-th largest normalized dispersion are kept;
# NaN dispersions count as 0 in the comparison.
disp_cut_off = dispersion_norm[n_top_genes - 1]
vaiable_genes = np.nan_to_num(df['dispersions_norm'].values) >= disp_cut_off
genes = genes[vaiable_genes]
genes.shape

CPU times: user 144 ms, sys: 16 ms, total: 160 ms
Wall time: 150 ms


(4000,)

### Regress out confounding factors (number of counts, mitochondrial gene expression)

We can now perform regression on the count matrix to correct for confounding factors -  for example purposes, we use the number of counts and the expression of mitochondrial genes (named starting with `mt-`).

We now calculate the total counts and the percentage of mitochondrial counts for each cell.

In [21]:
# Rows per chunk if the matrix were split ten ways per worker.
chunk_size = sparse_gpu_array.shape[0] // (n_workers * 10)

# Keep only the columns flagged as highly variable.
dask_array = sparse_gpu_array[:, vaiable_genes]

dask_array.shape

(100000, 4000)

In [33]:
from cuml.common.memory_utils import with_cupy_rmm
from cuml.common import rmm_cupy_ary
from cuml.dask.common.part_utils import _extract_partitions
from cuml.dask.common.dask_df_utils import to_dask_cudf
from cuml.dask.common.input_utils import to_dask_cupy

import cupyx

@with_cupy_rmm
def csr_to_csc(csr_array, client):
    """
    Convert every partition of ``csr_array`` with ``tocsc()`` on the worker
    that holds it, and return the converted partitions concatenated into a new
    dask array (dtype declared as float32).

    NOTE(review): the shape of each converted partition is read through
    ``future.result()``, which waits for the task and brings its result back
    to the caller.
    """
    def _conv_csr_to_csc(x):
        return  x.tocsc()

    worker_parts = client.sync(_extract_partitions, csr_array)
    # One conversion task per partition, pinned to the partition's worker.
    futures = [
        client.submit(_conv_csr_to_csc, part, workers=[worker], pure=False)
        for worker, part in worker_parts
    ]
    converted = [
        dask.array.from_delayed(future,
                                shape=future.result().shape,
                                dtype=cp.float32)
        for future in futures
    ]
    return dask.array.concatenate(converted)

sparse_gpu_array = csr_to_csc(dask_array, client=client)
sparse_gpu_array.shape

(99109, 4000)

In [34]:
%%time

@with_cupy_rmm
def sum_csc(csc_array, client):
    """
    Reduce every partition of ``csc_array`` with ``sum(axis=1)`` on the worker
    that holds it, and return the per-partition results concatenated into a
    dask array (dtype declared as float32).

    NOTE(review): the shape of each partial result is read through
    ``future.result()``, which waits for the task and brings its result back
    to the caller.
    """
    def __sum(x):
        return x.sum(axis=1)

    worker_parts = client.sync(_extract_partitions, csc_array)
    # One reduction task per partition, pinned to the partition's worker.
    futures = [
        client.submit(__sum, part, workers=[worker], pure=False)
        for worker, part in worker_parts
    ]
    partial_sums = [
        dask.array.from_delayed(future,
                                shape=future.result().shape,
                                dtype=cp.float32)
        for future in futures
    ]
    return dask.array.concatenate(partial_sums)

# Boolean mask over the retained genes: True where the name starts with the
# mitochondrial prefix.
mito_genes = genes.str.startswith(MITO_GENE_PREFIX)
# Move the mask to CuPy through DLPack and collect the positions that are set.
mito_genes = cp.fromDlpack(mito_genes.to_dlpack())
mito_genes_indices = cp.where(mito_genes == 1)[0]

# # n_counts = dask.array.sum(sparse_gpu_array, axis=1, keepdims=False)
# Per-row totals: sum_csc reduces each partition along axis 1, then the
# remaining axis-1 sum is computed eagerly.
# NOTE(review): at this point sparse_gpu_array holds normalized, log1p'd values
# restricted to the highly variable genes, so these are not raw count totals --
# confirm this is intended.
n_counts = sum_csc(sparse_gpu_array, client=client)
n_counts = dask.array.sum(n_counts, axis=1).compute()

# Same reduction restricted to the mitochondrial columns (kept lazy).
result = dask.array.take(sparse_gpu_array,
                         mito_genes_indices.tolist(),
                         axis=1)
result = sum_csc(result, client=client)
result = dask.array.sum(result, axis=1)

result.shape, n_counts.shape
# Mitochondrial total divided by overall total, per row.
result = dask.array.true_divide(result, n_counts)
# result.compute()
percent_mito = dask.array.ravel(result)
percent_mito = percent_mito.compute()

CPU times: user 224 ms, sys: 40 ms, total: 264 ms
Wall time: 687 ms


In [31]:
# n_counts.shape, sparse_gpu_array.shape

And perform regression:

In [35]:
%%time
# dk_sparse_gpu_array = dask.array.from_array(sparse_gpu_array, chunks=chunk_size, asarray=False)
# Regressor matrix with one row per entry of n_counts and three columns:
# a column of ones, n_counts, and percent_mito.
regressors = cp.ones((n_counts.shape[0]*3), dtype=cp.float32).reshape((n_counts.shape[0], 3), order="F")
regressors[:, 1] = n_counts
regressors[:, 2] = percent_mito

# Wrap the matrix as a dask array without converting it to another array type
# (asarray=False).
X = dask.array.from_array(regressors, asarray=False)

# Drop the local reference; X is used from here on.
del regressors
X.shape

CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 4.83 ms


(99109, 3)

In [24]:
%%time
# sparse_gpu_array = rapids_scanpy_funcs.regress_out(sparse_gpu_array, n_counts, percent_mito)
# sparse_gpu_array.shape

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 5.48 µs


In [37]:
%%time
from cuml.linear_model import LinearRegression

@delayed
def regress(X, y):
    """
    Fit a linear model without intercept of ``y`` on ``X`` and return the
    residual ``y - prediction``, with both terms reshaped to ``(y.shape[0],)``.
    """
    n_rows = y.shape[0]
    model = LinearRegression(fit_intercept=False, output_type="cupy")
    model.fit(X, y, convert_dtype=True)
    predicted = model.predict(X).reshape(n_rows)
    return y.reshape(n_rows, ) - predicted

# NOTE(review): chunk_size and row_cnt are assigned here but not used in this cell.
chunk_size = sparse_gpu_array.shape[1] // (n_workers*2)

print('Creating dask arrays for LinearRegression...')
# One delayed regression per column; progress is printed every 500 columns.
ld_delay = []
row_cnt = sparse_gpu_array.shape[0]
for i in range(sparse_gpu_array.shape[1]):
    if i % 500 == 0:
        print(i)
    y = sparse_gpu_array[:, i]
#     y = sparse_gpu_array[:, i].todense()
    # NOTE(review): regress() returns a value reshaped to (rows,), while the
    # chunk is declared here as (rows, 1) -- confirm the shapes agree.
    ld_delay.append(
        dask.array.from_delayed(
            regress(X, y),
            dtype=cp.float32, 
            shape=(sparse_gpu_array.shape[0], 1)))

print('Computing...')
# Lazily stitch the per-column results together; nothing is computed here and
# `result` is not assigned back to sparse_gpu_array in this cell.
result = dask.array.concatenate(ld_delay, axis=1)
# TODO: The above line causes the distributed modules to get into what seems to be an endless loop.

# result = dask.compute(ld_delay)
# result = cp.vstack(result[0])
# sparse_gpu_array = result.T
# sparse_gpu_array.shape

Creating dask arrays for LinearRegression...
0
500
1000
1500
2000
2500
3000
3500
Computing...
CPU times: user 2.53 s, sys: 300 ms, total: 2.83 s
Wall time: 2.71 s


In [38]:
type(sparse_gpu_array)

dask.array.core.Array

### Scale

Finally, we scale the count matrix to obtain a z-score and apply a cutoff value of 10 standard deviations, obtaining the preprocessed count matrix.

In [39]:
%%time
# max_value=10
# mean = result.mean()
# # mean
# sparse_gpu_array = dask.array.subtract(result, mean)

# stddev = dask.array.sqrt(mean.var())
# sparse_gpu_array = dask.array.true_divide(sparse_gpu_array, stddev)
# # sparse_gpu_array = dask.array.clip(sparse_gpu_array)
# sparse_gpu_array.shape()

sparse_gpu_array = rapids_scanpy_funcs.scale(sparse_gpu_array, max_value=10)

ValueError: Unsupported dtype object

## Cluster & Visualize

We store the preprocessed count matrix as an AnnData object, which is currently in host memory. We also add the expression levels of the marker genes as observations to the annData object.

### Reduce

We use PCA to reduce the dimensionality of the matrix to its top 50 principal components. Here, we use Dask to parallelize across multiple GPUs.

In [None]:
import dask.dataframe as dd
from cuml.dask.common.dask_arr_utils import to_dask_cudf, to_sparse_dask_array
from cuml.dask.common.dask_df_utils import to_dask_cudf
from cuml.dask.common.input_utils import to_dask_cupy

# dask_df = dask.dataframe.from_array(sparse_gpu_array, columns=gene.to_arrow().to_pylist())
# dask_cu_df = dask_cudf.from_dask_dataframe(dask_df)


The PCA multi-GPU time includes the initial data transfer, which is about 20 GB.

In [29]:
%%time

import dask.dataframe as dd
from cuml.dask.common.dask_arr_utils import to_dask_cudf, to_sparse_dask_array
from cuml.dask.common.dask_df_utils import to_dask_cudf
from cuml.dask.common.input_utils import to_dask_cupy

# dask_df = dask.dataframe.from_array(sparse_gpu_array, columns=gene.to_arrow().to_pylist())
# dask_cu_df = dask_cudf.from_dask_dataframe(dask_df)
#
# The above line does not work due to worker failure while converting sparse
# array to dataframes. The following code is a workaround for loading data using
# delayed functions.

@delayed
def read_sparse_gpu_array(sparse_array, start, batch_size):
    """Return rows ``start`` .. ``start + batch_size`` of ``sparse_array`` as a cuDF DataFrame."""
    stop = start + batch_size
    row_block = sparse_array[start:stop]
    return cudf.DataFrame(row_block)

# Split the rows into roughly ten batches per worker.
BATCHES = n_workers * 10
num_recs = sparse_gpu_array.shape[0]
batch_size = math.ceil(num_recs / BATCHES)
# Names of the retained genes. The series is called `genes`; the original
# referenced an undefined `gene`, which raises NameError.
columns = genes.to_arrow().to_pylist()

print("Appending delayed function returning cudf dataframe...")
dls = []
for start in range(0, num_recs, batch_size):
    # The last batch may be shorter than batch_size.
    bsize = min(num_recs - start, batch_size)
    dls.append(read_sparse_gpu_array(sparse_gpu_array, start, bsize))

import pandas

print("Creating dask df from delays...")
# Empty frame describing the result: one float32 column per matrix column,
# labelled by position.
prop_meta = {i: pandas.Series([], dtype='float32') for i in range(sparse_gpu_array.shape[1])}
meta_df = cudf.DataFrame(prop_meta)

print("Computing delayed functions...")
# Submit the delayed batches to the cluster; `dls` now holds futures.
dls = client.compute(dls)

print("Creating Dataframe from futures...")
dask_cu_df = dask.dataframe.from_delayed(dls, meta=meta_df)
dask_cu_df

Appending delayed function returning cudf dataframe...
Creating dask df from delays...
CPU times: user 5min 38s, sys: 8min 50s, total: 14min 29s
Wall time: 14min 9s


In [None]:
%%time
# Multi-GPU PCA keeping n_components components, run through the dask client.
pca = cu_dask_PCA(n_components=n_components, client=client)
# dask_reduced_df = pca.fit_transform(dask_cu_df)

# Fit and transform are issued as two separate calls on the same input.
pca.fit(dask_cu_df)
dask_reduced_df = pca.transform(dask_cu_df)

# TODO: Following line is a bottleneck
# compute() gathers the distributed result into a single local object.
reduced_df = dask_reduced_df.compute()

tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x7fd022353f10>>, <Task finished name='Task-52512' coro=<UCX.close() done, defined at /opt/conda/envs/rapids/lib/python3.8/site-packages/distributed/comm/ucx.py:297> exception=UCXError("<[Send #15399] ep: 0x7fcf60fad1f8, tag: 0xa40f5589adf1d8fe, nbytes: 16, type: <class 'bytes'>>: Input/output error")>)
Traceback (most recent call last):
  File "/opt/conda/envs/rapids/lib/python3.8/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/opt/conda/envs/rapids/lib/python3.8/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/opt/conda/envs/rapids/lib/python3.8/site-packages/distributed/comm/ucx.py", line 299, in close
    await self.ep.send(struct.pack("?Q", True, 0))
  File "/opt/conda/envs/rapids/lib/python3.8/site-packages/ucp/endpoint_reuse.py", l

In [None]:
# dask_cu_df.compute().shape

### t-SNE + k-Means

In [None]:
%%time
# TODO: Bottleneck
tsne = TSNE().fit_transform(reduced_df.iloc[:, :tsne_n_pcs])

We cluster the cells using k-means on the principal components. For example purposes, we set k=35.

In [None]:
%%time
# K-means
dask_array = dask_cudf.from_cudf(reduced_df, npartitions=BATCHES)
dask_kmeans_output = cu_dask_KMeans(n_clusters=k).fit_predict(dask_array)
kmeans = dask_kmeans_output.compute()

In [38]:
tsne.index = kmeans.index
tsne['kmeans'] = kmeans
for marker_gene in marker_genes_raw:
    print(marker_genes_raw[marker_gene].shape)
    tsne[marker_gene + '_raw'] = marker_genes_raw[marker_gene]
tsne.shape

(297504,)
(297504,)
(297504,)


(3719, 3)

We visualize the cells using t-SNE and label cells by color according to the k-means clustering.

In [44]:
%%time

def show_tsne(df, x, y, cluster_col, title):
    """
    Scatter-plot an embedding with one colour per cluster.

    Parameters
    ----------
    df : dataframe holding the embedding coordinates and the cluster labels.
    x, y : labels of the columns holding the horizontal and vertical coordinates.
    cluster_col : label of the column holding the cluster id of each row.
    title : figure title.

    The figure is shown in the notebook; nothing is returned.
    """
    tsne_fig = figure(title=title, width=800, output_backend="webgl")
    clusters = df[cluster_col].unique().values_host

    for cluster in clusters:
        cdf = df.query(cluster_col + ' == ' + str(cluster))
        if cdf.shape[0] == 0:
            continue

        # Use the coordinate columns named by the caller. The original
        # hard-coded columns 0 and 1 and ignored x and y.
        x_array = cp.fromDlpack(cdf[x].to_dlpack())
        y_array = cp.fromDlpack(cdf[y].to_dlpack())

        tsne_fig.circle(x_array.get(),
                        y_array.get(),
                        size=2,
                        # Wrap around the palette so a cluster id beyond its
                        # length cannot raise IndexError.
                        color=COLORS[int(cluster) % len(COLORS)],
                        legend = 'Cluster ' + str(cluster))

    tsne_fig.legend.location = 'top_right'
    tsne_fig.legend.title = 'Clusters'

    tsne_fig_handle = show(tsne_fig, notebook_handle=True)
    push_notebook(handle=tsne_fig_handle)

show_tsne(tsne, 0, 1, 'kmeans', 'kmeans')

# sc.pl.tsne(adata, color=["kmeans"])
# x_embedded = TSNE().fit_transform(sparse_gpu_array)

CPU times: user 5.6 s, sys: 96 ms, total: 5.7 s
Wall time: 5.49 s


We label the cells using the `Stmn2` and `Hes1` marker genes, for neuronal and glial cells respectively. These visualizations show us the separation of neuronal and glial cells on the t-SNE plot.

In [36]:
%%time

def show_tsne_grad(df, x, y, cluster_col, color_col, title):
    """
    Scatter-plot an embedding, colouring every point by the value of a column.

    Parameters
    ----------
    df : dataframe holding the embedding coordinates, cluster labels and values.
    x, y : labels of the columns holding the horizontal and vertical coordinates.
    cluster_col : label of the column holding the cluster id of each row
        (one glyph and one legend entry are drawn per cluster).
    color_col : label of the column whose values are passed as point colours.
    title : figure title.

    The figure is shown in the notebook; nothing is returned.
    """
    tsne_fig = figure(title=title, width=800, output_backend="webgl")
    clusters = df[cluster_col].unique().values_host
    for cluster in clusters:
        cdf = df.query(cluster_col + ' == ' + str(cluster))
        if cdf.shape[0] == 0:
            continue

        # Use the coordinate columns named by the caller. The original
        # hard-coded columns 0 and 1 and ignored x and y.
        x_array = cp.fromDlpack(cdf[x].to_dlpack())
        y_array = cp.fromDlpack(cdf[y].to_dlpack())
        # NOTE(review): the raw column values are handed to bokeh as colours
        # without an explicit colour mapper -- confirm this renders a gradient.
        color_array = cp.fromDlpack(cdf[color_col].to_dlpack())
        tsne_fig.circle(x_array.get(),
                        y_array.get(),
                        size=2,
                        color=color_array.get(),
                        legend = 'Cluster ' + str(cluster))

    tsne_fig.legend.location = 'top_right'
    tsne_fig.legend.title = 'Clusters'

    tsne_fig_handle = show(tsne_fig, notebook_handle=True)
    push_notebook(handle=tsne_fig_handle)

# One plot per marker gene, each titled with the gene it shows (the Hes1 plot
# was previously titled 'Stmn2').
show_tsne_grad(tsne, 0, 1, 'kmeans', 'Stmn2_raw', 'Stmn2')
show_tsne_grad(tsne, 0, 1, 'kmeans', 'Hes1_raw', 'Hes1')

# sc.pl.tsne(adata, color=["Stmn2_raw"], color_map="Blues", vmax=1, vmin=-0.05)
# sc.pl.tsne(adata, color=["Hes1_raw"], color_map="Blues", vmax=1, vmin=-0.05)

CPU times: user 2.2 s, sys: 56 ms, total: 2.25 s
Wall time: 2.18 s


### UMAP + Graph clustering

In [69]:
reduced_df

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,40,41,42,43,44,45,46,47,48,49
0,-1.360810,-0.113604,0.982468,9.109477,2.118804,-1.784601,-3.274710,-3.752516,-2.529793,-3.072853,...,0.947091,0.036218,0.382349,-1.940166,0.318814,-0.279056,-0.181169,1.628333,0.106105,0.271454
1,-3.484391,-1.575349,0.411552,3.729936,1.599177,-1.786675,1.863293,2.444724,-8.186501,-0.612851,...,-2.021606,-0.186247,-1.837676,1.098224,-0.209091,0.481969,-1.398694,0.003970,-2.382414,-2.445073
2,10.893725,5.832059,-6.564771,9.793081,1.745506,-1.227044,2.264982,-3.338192,-3.143614,1.915057,...,-0.205891,0.161677,-2.019209,3.483032,0.195425,-0.485079,-2.737942,1.328566,-2.418057,-3.095875
3,-5.181816,-2.531816,1.367650,-2.421814,-3.944922,-0.425659,6.222536,-6.065768,2.015984,-5.050279,...,-0.448897,-0.691009,-0.101914,-1.811269,0.080937,-1.678282,-4.920358,0.986628,1.364390,-0.439942
4,-6.433423,-0.617907,1.732349,1.555974,1.908829,0.434938,-1.609256,3.352144,-0.784177,0.899810,...,0.344058,0.193001,0.969225,2.696614,0.819697,-0.104311,-0.854026,-0.396278,-0.139689,0.350312
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1400,-11.027006,3.345924,1.419838,0.329949,-3.328462,-0.040827,4.411818,-8.023420,2.579068,-2.765866,...,-0.621595,0.089814,-1.170707,-0.790083,-0.316261,1.057088,1.351864,0.169567,2.748370,0.184327
1401,-0.110869,10.697875,-9.532808,-4.424552,-5.409941,4.611927,2.687689,-9.822119,9.148540,4.421216,...,1.460683,-2.042260,-3.446643,0.221506,-0.685131,0.642369,-0.894774,-2.047567,2.010583,-1.377794
1402,-6.230055,-2.733839,0.659228,-4.072932,-1.509475,-0.816852,3.634532,3.022795,-2.488100,2.701477,...,0.583452,2.632347,0.243993,-0.451509,-1.421912,-0.015063,0.483453,0.358436,-0.025175,-0.596023
1403,39.715939,-32.277817,12.117223,3.198272,14.399944,18.060724,6.116024,-8.492730,1.964065,4.731811,...,7.188445,1.112357,-1.780134,-1.161728,4.248825,1.334377,0.615755,-2.732564,5.176321,2.845205


We can also visualize the cells using the UMAP algorithm in Rapids. Before UMAP, we need to construct a k-nearest neighbors graph in which each cell is connected to its nearest neighbors. This can be done conveniently using rapids functionality already integrated into Scanpy.

Note that Scanpy uses an approximation to the nearest neighbors on the CPU while the GPU version performs an exact search. While both methods are known to yield useful results, some differences in the resulting visualization and clusters can be observed.

In [71]:
%%time
# sc.pp.neighbors(adata, n_neighbors=n_neighbors, n_pcs=knn_n_pcs, method='rapids')
import cudf
from cuml.neighbors import NearestNeighbors

# Single-GPU nearest-neighbour model; output_type="input" requests results in
# the same container type as the input.
# NOTE(review): the model is fitted on sparse_gpu_array, while the commented
# alternatives below and knn_n_pcs point at the PCA-reduced data -- confirm
# which input is intended.
model = NearestNeighbors(n_neighbors=n_neighbors, output_type="input")
model.fit(sparse_gpu_array)

# Connectivity graph linking every row to its n_neighbors nearest neighbours;
# the Louvain cell consumes it.
graph = model.kneighbors_graph(X=sparse_gpu_array, n_neighbors=n_neighbors, mode='connectivity') 

# distances, indices = model.kneighbors(reduced_df)
# nn = cu_dask_NearestNeighbors(client=client, n_neighbors=n_neighbors)
# model = nn.fit(dask_reduced_df)
# neighbors = nn.kneighbors(dask_reduced_df)
# neighbors

CPU times: user 4.65 s, sys: 4.04 s, total: 8.69 s
Wall time: 8.59 s


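The UMAP function from RAPIDS is also integrated into Scanpy. Here we instead fit a cuML UMAP model locally on a subset of the cells, and then use that fitted model with dask to transform the distributed array.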

In [55]:
%%time
# Fit a local (non-dask) cuML UMAP model on the first 350,000 rows of
# `reduced_df` only. The fitted model is reused by the dask UMAP cell to
# transform `dask_array`.
# NOTE(review): 350000 is hard-coded — presumably sized to fit on a single
# GPU; confirm and consider deriving it from the data.
local_model = UMAP(n_epochs=1000, min_dist=umap_min_dist, spread=umap_spread)
local_model.fit(reduced_df[:350000])

CPU times: user 160 ms, sys: 316 ms, total: 476 ms
Wall time: 500 ms


UMAP(n_neighbors=15, n_components=2, n_epochs=1000, learning_rate=1.0, min_dist=0.3, spread=1.0, set_op_mix_ratio=1.0, local_connectivity=1.0, repulsion_strength=1.0, negative_sample_rate=5, transform_queue_size=4.0, init='spectral', verbose=4, a=0.9921756197688717, b=1.1122533842193434, target_n_neighbors=-1, target_weights=0.5, target_metric='categorical', handle=<cuml.raft.common.handle.Handle object at 0x7f7af0d85a70>, hash_input=False, random_state=1159439447837401987, optim_batch_size=0, callback=None, output_type='input')

In [58]:
%%time
# Wrap the locally fitted model in the dask UMAP estimator and transform
# `dask_array` with it; compute() then materializes the embedding as `X_umap`.
dist_embeddings = cu_dask_UMAP(local_model).transform(dask_array)
X_umap = dist_embeddings.compute()

CPU times: user 2.23 s, sys: 556 ms, total: 2.79 s
Wall time: 7.66 s


Finally, we use the Louvain algorithm for graph-based clustering, this time calling cuGraph directly on the k-nearest neighbors graph in place of the `rapids` option in Scanpy.

In [85]:
%%time
import cugraph as cg

# Convert the kNN connectivity matrix to COO so its row/col index arrays can
# be used directly as the two columns of an edge list.
graph = graph.tocoo()
edge_list = cudf.DataFrame({'g_row':graph.row, 'g_col':graph.col})
G = cg.Graph()
G.from_cudf_edgelist(edge_list, 'g_row', 'g_col')
# NOTE(review): nothing in this cell stores `louvain_parts` on `adata`, while
# the plotting cells color by "louvain" — confirm where the labels are attached.
louvain_parts, score = cg.louvain(G)

# Scanpy call that the cuGraph code above stands in for:
# sc.tl.louvain(adata, flavor='rapids')

CPU times: user 276 ms, sys: 184 ms, total: 460 ms
Wall time: 443 ms


We plot the cells using the UMAP visualization, and using the Louvain clusters as labels.

In [None]:
%%time
# sc.pl.umap reads the embedding from adata.obsm["X_umap"] and the labels
# from adata.obs["louvain"]. The cuML/cuGraph cells above keep their results
# only in `X_umap` and `louvain_parts`, so copy them into `adata` first.
# NOTE(review): assumes the rows of `adata` are in the same order as the rows
# the embedding and the kNN graph were computed from — confirm.
adata.obsm["X_umap"] = cp.asnumpy(X_umap)

# cugraph.louvain returns one row per vertex with 'vertex' and 'partition'
# columns; order by vertex id so that labels line up with cell rows.
louvain_labels = louvain_parts.sort_values("vertex")["partition"].to_pandas().values
# Store as a categorical of strings (numerically ordered categories) so the
# clusters are drawn as discrete groups rather than on a continuous scale.
adata.obs["louvain"] = pd.Categorical(
    louvain_labels.astype(str),
    categories=[str(c) for c in np.unique(louvain_labels)],
)

sc.pl.umap(adata, color=["louvain"])

And also visualize the cells labeled by expression of the `Stmn2` and `Hes1` marker genes, for neuronal and glial cells respectively.

In [None]:
%%time
# One UMAP plot per marker column, drawn with the "Blues" color map and the
# color scale limited to the range [-0.05, 1].
sc.pl.umap(adata, color=["Stmn2_raw"], color_map="Blues", vmax=1, vmin=-0.05)
sc.pl.umap(adata, color=["Hes1_raw"], color_map="Blues", vmax=1, vmin=-0.05)

In [None]:
# Report wall-clock seconds elapsed since `cluster_start`
# (`cluster_start` is defined outside this section of the notebook).
cluster_time = time.time()
print("Total cluster time : %s" % (cluster_time-cluster_start))

## Create Zoomed View

The speedup offered by RAPIDS makes it easy to interactively re-analyze subsets of cells. To illustrate this, we select glial cells (Hes1+) from the dataset.

In [None]:
# Start the wall-clock timer for the subset re-analysis; it is read back in
# the "Total reanalysis time" cell.
reanalysis_start = time.time()

In [None]:
%%time
# Boolean mask selecting entries of marker_genes_raw["Hes1_raw"] that are
# greater than zero.
hes1_cells = marker_genes_raw["Hes1_raw"] > 0.0
# .get() is called on the mask before indexing — presumably `hes1_cells` is a
# CuPy array that must be copied to host memory first; confirm.
# `adata` is rebound to the subset, so later cells operate on these cells only.
adata = adata[hes1_cells.get(),:]
adata.shape

We can repeat the dimension reduction, clustering and visualization using this subset of cells in seconds. 

Finally, we visualize the selected glial cells labeled by their new clusters, and by the expression of `Olig1`, a marker gene for oligodendrocytes.

In [None]:
# Drop the notebook's reference to the previous dask array before a new one
# is built for the subset in the next cell.
del dask_array

In [None]:
%%time

# Re-chunk the subset held in `adata` across the workers.
# Size the chunks from the array actually being chunked (the subset), not
# from the full `sparse_gpu_array`, so the subset is still split into roughly
# n_workers*10 pieces. Clamp to 1 so a very small subset never produces a
# zero chunk size.
chunk_size = max(1, adata.shape[0] // (n_workers * 10))
dask_array = dask.array.from_array(cp.asarray(adata.X),
                                   chunks=(chunk_size, -1))
# Keep the chunks resident on the cluster so later steps reuse them.
dask_array = dask_array.persist()

# Spread the persisted chunks across the workers.
client.rebalance()

In [None]:
%%time
# Re-fit the existing `pca` estimator on the subset; `dask_array` is rebound
# to the transformed output.
dask_array = pca.fit_transform(dask_array)
# Materialize the result, copy it off the device with .get(), and store it
# under adata.obsm['X_pca'].
adata.obsm['X_pca'] = dask_array.compute().get()

In [None]:
%%time
# For the subset, run neighbors, UMAP and Louvain through Scanpy's RAPIDS
# options, so the results are stored on `adata` by Scanpy itself.
sc.pp.neighbors(adata, n_neighbors=n_neighbors, n_pcs=knn_n_pcs, method='rapids')
sc.tl.umap(adata, min_dist=umap_min_dist, spread=umap_spread, method='rapids')
sc.tl.louvain(adata, flavor='rapids')

# Plot the subset colored by its new clusters, then by the "Olig1_raw" column.
sc.pl.umap(adata, color=["louvain"])
sc.pl.umap(adata, color=["Olig1_raw"], color_map="Blues", vmax=1, vmin=-0.05)

In [None]:
# Report wall-clock seconds elapsed since `reanalysis_start` was recorded.
reanalysis_time = time.time()
print("Total reanalysis time : %s" % (reanalysis_time-reanalysis_start))

In [None]:
# Shut down the dask client now that the workflow is finished.
# NOTE(review): only the client is closed here; if a cluster object was
# created separately, confirm whether it also needs to be closed.
client.close()

In [None]:
# Report total wall-clock seconds since `start`
# (`start` is defined outside this section of the notebook).
print("Full time: %s" % (time.time() - start))