In [1]:
! uv pip install --pre anndata scanpy "dask>=2022.09.2,<2024.8.0" "dask-ml>=2022.09.2,<2024.8.0" "dask[distributed]>=2022.09.2,<2024.8.0" "bokeh!=3.0.*,>=2.4.0"

[2mAudited [1m6 packages[0m in 2.32s[0m


In [2]:
from dask.distributed import Client, LocalCluster
import dask.array as da
import dask
from scipy import sparse
import numpy as np
import anndata as ad
import scanpy as sc
from pathlib import Path
import h5py

# Setup

To begin, let's set up some compute resources locally.  We'll use `LocalCluster` because this tutorial will run on a laptop, but `dask` and `dask-cuda` offer deep customization + heterogeneous compute support:

- https://docs.dask.org/en/stable/deploying.html
- https://docs.rapids.ai/api/dask-cuda/nightly/

We'll then navigate to the URL listed to be able to monitor our resources - this handy functionality comes for free with `dask`: http://127.0.0.1:8787/status

*Note* We will only be working with numeric array data.  Both [dask.DataFrame](https://docs.dask.org/en/stable/dataframe.html) and [xarray](https://docs.xarray.dev/en/stable/) offer stable dataframe functionality.  `AnnData` objects will hopefully soon be integrated with `xarray`.

In [3]:
cluster = LocalCluster(n_workers=25)
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 25
Total threads: 225,Total memory: 1.95 TiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:8209,Workers: 25
Dashboard: http://127.0.0.1:8787/status,Total threads: 225
Started: Just now,Total memory: 1.95 TiB

0,1
Comm: tcp://127.0.0.1:8143,Total threads: 9
Dashboard: http://127.0.0.1:8223/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8373,
Local directory: /tmp/dask-scratch-space-1000113/worker-po3d5kxv,Local directory: /tmp/dask-scratch-space-1000113/worker-po3d5kxv

0,1
Comm: tcp://127.0.0.1:8235,Total threads: 9
Dashboard: http://127.0.0.1:8481/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8319,
Local directory: /tmp/dask-scratch-space-1000113/worker-jfb0n5zf,Local directory: /tmp/dask-scratch-space-1000113/worker-jfb0n5zf

0,1
Comm: tcp://127.0.0.1:8479,Total threads: 9
Dashboard: http://127.0.0.1:8273/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8429,
Local directory: /tmp/dask-scratch-space-1000113/worker-1nnv60s0,Local directory: /tmp/dask-scratch-space-1000113/worker-1nnv60s0

0,1
Comm: tcp://127.0.0.1:8313,Total threads: 9
Dashboard: http://127.0.0.1:8069/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8247,
Local directory: /tmp/dask-scratch-space-1000113/worker-dsk8pud9,Local directory: /tmp/dask-scratch-space-1000113/worker-dsk8pud9

0,1
Comm: tcp://127.0.0.1:8457,Total threads: 9
Dashboard: http://127.0.0.1:8389/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8463,
Local directory: /tmp/dask-scratch-space-1000113/worker-e_hz582j,Local directory: /tmp/dask-scratch-space-1000113/worker-e_hz582j

0,1
Comm: tcp://127.0.0.1:8071,Total threads: 9
Dashboard: http://127.0.0.1:8135/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8103,
Local directory: /tmp/dask-scratch-space-1000113/worker-c39elu0q,Local directory: /tmp/dask-scratch-space-1000113/worker-c39elu0q

0,1
Comm: tcp://127.0.0.1:8165,Total threads: 9
Dashboard: http://127.0.0.1:8061/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8067,
Local directory: /tmp/dask-scratch-space-1000113/worker-k1u5yg2b,Local directory: /tmp/dask-scratch-space-1000113/worker-k1u5yg2b

0,1
Comm: tcp://127.0.0.1:8385,Total threads: 9
Dashboard: http://127.0.0.1:8169/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8301,
Local directory: /tmp/dask-scratch-space-1000113/worker-depmwc_w,Local directory: /tmp/dask-scratch-space-1000113/worker-depmwc_w

0,1
Comm: tcp://127.0.0.1:8075,Total threads: 9
Dashboard: http://127.0.0.1:8309/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8257,
Local directory: /tmp/dask-scratch-space-1000113/worker-w52n_p5r,Local directory: /tmp/dask-scratch-space-1000113/worker-w52n_p5r

0,1
Comm: tcp://127.0.0.1:8041,Total threads: 9
Dashboard: http://127.0.0.1:8483/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8325,
Local directory: /tmp/dask-scratch-space-1000113/worker-5h5rxr7b,Local directory: /tmp/dask-scratch-space-1000113/worker-5h5rxr7b

0,1
Comm: tcp://127.0.0.1:8249,Total threads: 9
Dashboard: http://127.0.0.1:8477/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8009,
Local directory: /tmp/dask-scratch-space-1000113/worker-qcpmp49s,Local directory: /tmp/dask-scratch-space-1000113/worker-qcpmp49s

0,1
Comm: tcp://127.0.0.1:8451,Total threads: 9
Dashboard: http://127.0.0.1:8445/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8059,
Local directory: /tmp/dask-scratch-space-1000113/worker-furxx4mx,Local directory: /tmp/dask-scratch-space-1000113/worker-furxx4mx

0,1
Comm: tcp://127.0.0.1:8333,Total threads: 9
Dashboard: http://127.0.0.1:8439/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8077,
Local directory: /tmp/dask-scratch-space-1000113/worker-bcinuic8,Local directory: /tmp/dask-scratch-space-1000113/worker-bcinuic8

0,1
Comm: tcp://127.0.0.1:8499,Total threads: 9
Dashboard: http://127.0.0.1:8387/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8489,
Local directory: /tmp/dask-scratch-space-1000113/worker-bfsk2pkm,Local directory: /tmp/dask-scratch-space-1000113/worker-bfsk2pkm

0,1
Comm: tcp://127.0.0.1:8381,Total threads: 9
Dashboard: http://127.0.0.1:8413/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8133,
Local directory: /tmp/dask-scratch-space-1000113/worker-nhbngn5_,Local directory: /tmp/dask-scratch-space-1000113/worker-nhbngn5_

0,1
Comm: tcp://127.0.0.1:8233,Total threads: 9
Dashboard: http://127.0.0.1:8437/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8251,
Local directory: /tmp/dask-scratch-space-1000113/worker-mv1v7ow3,Local directory: /tmp/dask-scratch-space-1000113/worker-mv1v7ow3

0,1
Comm: tcp://127.0.0.1:8227,Total threads: 9
Dashboard: http://127.0.0.1:8425/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8471,
Local directory: /tmp/dask-scratch-space-1000113/worker-fg2jepxt,Local directory: /tmp/dask-scratch-space-1000113/worker-fg2jepxt

0,1
Comm: tcp://127.0.0.1:8475,Total threads: 9
Dashboard: http://127.0.0.1:8243/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8047,
Local directory: /tmp/dask-scratch-space-1000113/worker-d06u0k8j,Local directory: /tmp/dask-scratch-space-1000113/worker-d06u0k8j

0,1
Comm: tcp://127.0.0.1:8053,Total threads: 9
Dashboard: http://127.0.0.1:8161/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8171,
Local directory: /tmp/dask-scratch-space-1000113/worker-ke1zme3t,Local directory: /tmp/dask-scratch-space-1000113/worker-ke1zme3t

0,1
Comm: tcp://127.0.0.1:8341,Total threads: 9
Dashboard: http://127.0.0.1:8465/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8219,
Local directory: /tmp/dask-scratch-space-1000113/worker-ue_yz1eg,Local directory: /tmp/dask-scratch-space-1000113/worker-ue_yz1eg

0,1
Comm: tcp://127.0.0.1:8189,Total threads: 9
Dashboard: http://127.0.0.1:8035/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8173,
Local directory: /tmp/dask-scratch-space-1000113/worker-x_fd2ud9,Local directory: /tmp/dask-scratch-space-1000113/worker-x_fd2ud9

0,1
Comm: tcp://127.0.0.1:8377,Total threads: 9
Dashboard: http://127.0.0.1:8405/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8187,
Local directory: /tmp/dask-scratch-space-1000113/worker-8myodtvq,Local directory: /tmp/dask-scratch-space-1000113/worker-8myodtvq

0,1
Comm: tcp://127.0.0.1:8411,Total threads: 9
Dashboard: http://127.0.0.1:8063/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8123,
Local directory: /tmp/dask-scratch-space-1000113/worker-9_yv0lia,Local directory: /tmp/dask-scratch-space-1000113/worker-9_yv0lia

0,1
Comm: tcp://127.0.0.1:8361,Total threads: 9
Dashboard: http://127.0.0.1:8321/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8473,
Local directory: /tmp/dask-scratch-space-1000113/worker-ld4c83mq,Local directory: /tmp/dask-scratch-space-1000113/worker-ld4c83mq

0,1
Comm: tcp://127.0.0.1:8065,Total threads: 9
Dashboard: http://127.0.0.1:8485/status,Memory: 80.00 GiB
Nanny: tcp://127.0.0.1:8167,
Local directory: /tmp/dask-scratch-space-1000113/worker-1pkbg0z4,Local directory: /tmp/dask-scratch-space-1000113/worker-1pkbg0z4


# First steps

Let's get a feel for `dask.array.Array` by using `map_blocks` to generate some data.  Some notes:

- [`scipy.sparse`](https://docs.scipy.org/doc/scipy/reference/sparse.html) support with `dask` is crucial for our purposes, but somewhat lacking.  We are working on this!  But expect some sharp edges here!
- Note `chunks`; we have to be very specific about this information because `dask` uses it to compute `.shape` - there is no `shape` here!

# A real computation

Let's try to do something, calculate a gram/covaraince matrix using normal dense data. 

$$\Large X^T X$$

**Note** Take a look at the chunking. Why does this make sense for a lot of single-cell analysis?  Especially that in `scanpy`?

In [4]:
M, N = 2_000_000, 10_000
arr_dask = da.random.random((M, N), chunks=(1_000, N))
arr_dask

Unnamed: 0,Array,Chunk
Bytes,149.01 GiB,76.29 MiB
Shape,"(2000000, 10000)","(1000, 10000)"
Dask graph,2000 chunks in 1 graph layer,2000 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 149.01 GiB 76.29 MiB Shape (2000000, 10000) (1000, 10000) Dask graph 2000 chunks in 1 graph layer Data type float64 numpy.ndarray",10000  2000000,

Unnamed: 0,Array,Chunk
Bytes,149.01 GiB,76.29 MiB
Shape,"(2000000, 10000)","(1000, 10000)"
Dask graph,2000 chunks in 1 graph layer,2000 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [5]:
gram_matrix = arr_dask.T @ arr_dask
gram_matrix

Unnamed: 0,Array,Chunk
Bytes,762.94 MiB,762.94 MiB
Shape,"(10000, 10000)","(10000, 10000)"
Dask graph,1 chunks in 10 graph layers,1 chunks in 10 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 762.94 MiB 762.94 MiB Shape (10000, 10000) (10000, 10000) Dask graph 1 chunks in 10 graph layers Data type float64 numpy.ndarray",10000  10000,

Unnamed: 0,Array,Chunk
Bytes,762.94 MiB,762.94 MiB
Shape,"(10000, 10000)","(10000, 10000)"
Dask graph,1 chunks in 10 graph layers,1 chunks in 10 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [6]:
%time gram_matrix.compute()

CPU times: user 36.6 s, sys: 15.6 s, total: 52.2 s
Wall time: 1min 50s


array([[666866.84569336, 500061.38819278, 499950.09634675, ...,
        499761.94383004, 499948.85923108, 499995.48534689],
       [500061.38819278, 666693.97790411, 499926.66760091, ...,
        499943.28579016, 500034.52630128, 499858.73719638],
       [499950.09634675, 499926.66760091, 666458.5009242 , ...,
        499669.74021847, 499633.75845863, 499689.66072292],
       ...,
       [499761.94383004, 499943.28579016, 499669.74021847, ...,
        666337.31245501, 499940.22253482, 499739.58033563],
       [499948.85923108, 500034.52630128, 499633.75845863, ...,
        499940.22253482, 666508.82048036, 499574.28689934],
       [499995.48534689, 499858.73719638, 499689.66072292, ...,
        499739.58033563, 499574.28689934, 666392.69851526]])

We won't do the same thing in `numpy` because it takes much longer i.e., generating the matrix + doing the computation.  In fact, the generation in `numpy` takes about as long as the `dask` generation + computation combined!

In [7]:
%%time
# arr_np = np.random.random((M, N))

CPU times: user 1min 43s, sys: 22.3 s, total: 2min 6s
Wall time: 1min 50s


In [8]:
%%time 
# arr_np.T @ arr_np

CPU times: user 1h 4min 10s, sys: 13min 45s, total: 1h 17min 56s
Wall time: 1min 21s


array([[666242.22047548, 499712.24062519, 499956.75107923, ...,
        499483.12112549, 499794.57351165, 499806.09208033],
       [499712.24062519, 666543.26948798, 499951.19647375, ...,
        499430.20490301, 500113.11793398, 499929.0571049 ],
       [499956.75107923, 499951.19647375, 667156.77081061, ...,
        499829.13176151, 500309.92173935, 500270.21216223],
       ...,
       [499483.12112549, 499430.20490301, 499829.13176151, ...,
        665944.57011269, 499450.67561122, 499633.12987764],
       [499794.57351165, 500113.11793398, 500309.92173935, ...,
        499450.67561122, 666766.70365393, 499906.67535439],
       [499806.09208033, 499929.0571049 , 500270.21216223, ...,
        499633.12987764, 499906.67535439, 666621.6326087 ]])

## Sparse

Now let's try to do the same thing in sparse.  We'll create our matrix chunk-by-chunk.

In [7]:
n_blocks = 1000
M_block = M / n_blocks
def make_chunk():
    return sparse.random(M_block, N, format="csr")

arr = da.map_blocks(
    make_chunk,
    meta=sparse.random(2, 2, format="csr"),
    dtype=np.float64,
    chunks=((M_block,) * n_blocks, (N,) * 1)
)
arr

Unnamed: 0,Array,Chunk
Shape,"(2000000, 10000)","(2000, 10000)"
Dask graph,1000 chunks in 1 graph layer,1000 chunks in 1 graph layer
Data type,float64 scipy.sparse._csr.csr_matrix,float64 scipy.sparse._csr.csr_matrix
"Array Chunk Shape (2000000, 10000) (2000, 10000) Dask graph 1000 chunks in 1 graph layer Data type float64 scipy.sparse._csr.csr_matrix",10000  2000000,

Unnamed: 0,Array,Chunk
Shape,"(2000000, 10000)","(2000, 10000)"
Dask graph,1000 chunks in 1 graph layer,1000 chunks in 1 graph layer
Data type,float64 scipy.sparse._csr.csr_matrix,float64 scipy.sparse._csr.csr_matrix


In [10]:
# (arr.T @ arr).compute()

Key:       ('chunk_sum-f0d8e4e2bf1f322cf55f1e6915e55c5a', 0, 697, 0)
State:     executing
Function:  subgraph_callable-9519ac31b06da7b8690e005fa3efc687
args:      (<2000x10000 sparse matrix of type '<class 'numpy.float64'>'
	with 200000 stored elements in Compressed Sparse Row format>, <2000x10000 sparse matrix of type '<class 'numpy.float64'>'
	with 200000 stored elements in Compressed Sparse Row format>)
kwargs:    {}
Exception: "ValueError('matmul: Input operand 0 does not have enough dimensions (has 0, gufunc core with signature (n?,k),(k,m?)->(n?,m?) requires 1)')"
Traceback: '  File "/ictstr01/home/icb/ilan.gold/venv/lib/python3.10/site-packages/dask/optimization.py", line 1001, in __call__\n    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))\n  File "/ictstr01/home/icb/ilan.gold/venv/lib/python3.10/site-packages/dask/core.py", line 157, in get\n    result = _execute_task(task, cache)\n  File "/ictstr01/home/icb/ilan.gold/venv/lib/python3.10/site-packages/das

ValueError: matmul: Input operand 0 does not have enough dimensions (has 0, gufunc core with signature (n?,k),(k,m?)->(n?,m?) requires 1)

But that doesn't quite work....support for `scipy.sparse` in `dask` is not great (but we are working on it! upstream!)

As I said, sharp edges.  So let's think about how to do this with a bit of custom code.  This is not a bad exercise if you ever want to do anything custom with your `AnnData.X` anyway!  We can think of our computation as follows.

First define the block size

$$ b = floor(\frac{M}{n_{blocks}}) $$

The our sub-matrix structure, defined by "chunks" of our matrix (as we declared in `dask`)

$$ \mathbf{\hat{X}}_j = [\mathbf{X}_{j \cdot  b}, \ldots, \mathbf{X}_{(j + 1) \cdot b)}] \in \mathbb{R}^{b \times N} $$

And finally the result we want, the covariance matrix:

$$\Large \mathbf{X}^T \mathbf{X} = \sum_{j = 0}^{n_{blocks}} \mathbf{\hat{X}}_j^T \mathbf{\hat{X}}_j \in \mathbb{R}^{M \times N}$$

We'll also compute `log1p` before hand.  This will demonstrate the **lazy** aspect of `dask` even more.  We now have three steps - an elementwise operation, the mapping to covariance blocks, and finally the reduction.

In [8]:
def __gram_block(block):
    return (block.T @ block).toarray()[None, ...]

lop1p_matrix = arr.map_blocks(
    lambda x: x.log1p(),
    meta=sparse.random(2, 2, format="csr")
)

gram_matrix = lop1p_matrix.map_blocks(
    __gram_block,
    new_axis=(1,),
    chunks=((1,) * n_blocks, (arr.shape[1],), (arr.shape[1],)),
    meta=np.array([]),
    dtype=arr.dtype,
).sum(axis=0)

Note some things - we convert to dense to save on memory since the block covariance matrix is fairly dense anyway.  We also have to be explicit with `dask` about our `chunks` structure - we add a different axis so that we can sum sum over it, and we must be explicit about this.  Now let's compute our result!  Note that we have done **no** computation at this point - everything is lazy!

In [9]:
%%time
computed_gram_matrix = gram_matrix.compute()
computed_gram_matrix.shape

CPU times: user 12.6 s, sys: 6.58 s, total: 19.2 s
Wall time: 1min 27s


(10000, 10000)

# Working with `AnnData`

Let's now do something with the `scverse` ecosystem.  We'll begin by reading in the data.  Note the use of our new APIs, [`anndata.read_elem`](https://anndata.readthedocs.io/en/latest/generated/anndata.read_elem.html) and [`anndata.experimental.read_elem_as_dask`](https://anndata.readthedocs.io/en/latest/generated/anndata.experimental.read_elem_as_dask.html).

In [10]:
file_name = "cell_atlas.h5ad"
if not Path(file_name).exists():
    !wget https://datasets.cellxgene.cziscience.com/82eac9c1-485f-4e21-ab21-8510823d4f6e.h5ad -O file_name

In [11]:
with h5py.File(file_name, "r") as f:
    adata = ad.AnnData(
        X = ad.experimental.read_elem_as_dask(f["X"]),
        obs = ad.read_elem(f["obs"]),
        var = ad.read_elem(f["var"])
    )
adata

AnnData object with n_obs × n_vars = 1462702 × 27714
    obs: 'celltype', 'majorType', 'City', 'sampleID', 'donor_id', 'Sample type', 'CoVID-19 severity', 'Sample time', 'Sampling day (Days after symptom onset)', 'BCR single cell sequencing', 'TCR single cell sequencing', 'Outcome', 'Comorbidities', 'COVID-19-related medication and anti-microbials', 'Leukocytes [G over L]', 'Neutrophils [G over L]', 'Lymphocytes [G over L]', 'Unpublished', 'disease_ontology_term_id', 'cell_type_ontology_term_id', 'tissue_ontology_term_id', 'development_stage_ontology_term_id', 'self_reported_ethnicity_ontology_term_id', 'assay_ontology_term_id', 'sex_ontology_term_id', 'is_primary_data', 'organism_ontology_term_id', 'suspension_type', 'tissue_type', 'cell_type', 'assay', 'disease', 'organism', 'sex', 'tissue', 'self_reported_ethnicity', 'development_stage', 'observation_joinid'
    var: 'feature_is_filtered', 'feature_name', 'feature_reference', 'feature_biotype', 'feature_length'

In [12]:
adata.X

Unnamed: 0,Array,Chunk
Shape,"(1462702, 27714)","(1000, 27714)"
Dask graph,1463 chunks in 2 graph layers,1463 chunks in 2 graph layers
Data type,float32 scipy.sparse._csr.csr_matrix,float32 scipy.sparse._csr.csr_matrix
"Array Chunk Shape (1462702, 27714) (1000, 27714) Dask graph 1463 chunks in 2 graph layers Data type float32 scipy.sparse._csr.csr_matrix",27714  1462702,

Unnamed: 0,Array,Chunk
Shape,"(1462702, 27714)","(1000, 27714)"
Dask graph,1463 chunks in 2 graph layers,1463 chunks in 2 graph layers
Data type,float32 scipy.sparse._csr.csr_matrix,float32 scipy.sparse._csr.csr_matrix


## What can we do?

We'll step throught the major steps we have `dask`-ified in `scanpy`.  This is a somehwat normal workflow - filtering, variable genes, and PCA.  More to come soon!

In [13]:
%%time
adata.layers["counts"] = adata.X.copy()  # Making sure we keep access to the raw counts
sc.pp.normalize_total(adata, target_sum=1e4)

CPU times: user 18.7 ms, sys: 6.34 ms, total: 25 ms
Wall time: 24.7 ms


In [14]:
%%time
sc.pp.log1p(adata)

CPU times: user 3.29 ms, sys: 0 ns, total: 3.29 ms
Wall time: 3.3 ms


In [15]:
%%time
sc.pp.highly_variable_genes(adata)

CPU times: user 39.9 s, sys: 1.79 s, total: 41.7 s
Wall time: 48 s


In [16]:
adata.var['highly_variable'].sum()

2105

In [17]:
adata.layers["dense"] = adata.X.rechunk((10_000, -1)).map_blocks(
    lambda x: x.toarray(), dtype=adata.X.dtype, meta=np.array([])
)

In [18]:
%%time
sc.pp.pca(adata, layer="dense")

CPU times: user 16.6 s, sys: 2.32 s, total: 18.9 s
Wall time: 39.6 s


In [19]:
%%time
adata.obsm["X_pca"] = adata.obsm["X_pca"].compute()



CPU times: user 13.6 s, sys: 1.61 s, total: 15.2 s
Wall time: 16.9 s


## GPU anyone?

What if we want to have some real fun?  What about GPU?

In [20]:
! uv pip install dask-cuda cupy-cuda12x rmm-cu12

[2mAudited [1m3 packages[0m in 828ms[0m


In [21]:
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import cupy as cp
import rmm
import numpy as np
from rmm.allocators.cupy import rmm_cupy_allocator
from cupyx.scipy import sparse as cp_sparse
import anndata as ad
from scipy import sparse
import h5py

# Set up rapids memory management, which allows spilling to disk among other nice features
def set_mem():
    rmm.reinitialize(managed_memory=True)
    cp.cuda.set_allocator(rmm_cupy_allocator)
cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="0,1,2,3,4,5,6,7,8")
client = Client(cluster)
client.run(set_mem)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 8105 instead
2024-09-12 12:16:34,518 - distributed.nanny - ERROR - Failed to initialize worker
Traceback (most recent call last):
  File "/ictstr01/home/icb/ilan.gold/scverse_tutorial/venv/lib/python3.10/site-packages/distributed/nanny.py", line 952, in run
    worker = worker_factory()
  File "/ictstr01/home/icb/ilan.gold/scverse_tutorial/venv/lib/python3.10/site-packages/distributed/worker.py", line 715, in __init__
    ServerNode.__init__(
  File "/ictstr01/home/icb/ilan.gold/scverse_tutorial/venv/lib/python3.10/site-packages/distributed/core.py", line 307, in __init__
    self.monitor = SystemMonitor()
  File "/ictstr01/home/icb/ilan.gold/scverse_tutorial/venv/lib/python3.10/site-packages/distributed/system_monitor.py", line 129, in __init__
    gpu_extra = nvml.one_time()
  File "/ictstr01/home/icb/ilan.gold/scverse_tutorial/venv/lib/python3.10/site-packages/distributed/diagnostics/nvml.py", line 374, in o

0,1
Connection method: Cluster object,Cluster type: dask_cuda.LocalCUDACluster
Dashboard: http://127.0.0.1:8105/status,

0,1
Dashboard: http://127.0.0.1:8105/status,Workers: 8
Total threads: 8,Total memory: 1.74 TiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:8409,Workers: 8
Dashboard: http://127.0.0.1:8105/status,Total threads: 8
Started: 1 minute ago,Total memory: 1.74 TiB

0,1
Comm: tcp://127.0.0.1:8229,Total threads: 1
Dashboard: http://127.0.0.1:8197/status,Memory: 222.22 GiB
Nanny: tcp://127.0.0.1:8089,
Local directory: /tmp/dask-scratch-space-1000113/worker-8xmw5gff,Local directory: /tmp/dask-scratch-space-1000113/worker-8xmw5gff

0,1
Comm: tcp://127.0.0.1:8491,Total threads: 1
Dashboard: http://127.0.0.1:8183/status,Memory: 222.22 GiB
Nanny: tcp://127.0.0.1:8415,
Local directory: /tmp/dask-scratch-space-1000113/worker-bapd6u2o,Local directory: /tmp/dask-scratch-space-1000113/worker-bapd6u2o

0,1
Comm: tcp://127.0.0.1:8351,Total threads: 1
Dashboard: http://127.0.0.1:8213/status,Memory: 222.22 GiB
Nanny: tcp://127.0.0.1:8375,
Local directory: /tmp/dask-scratch-space-1000113/worker-1id6u8lv,Local directory: /tmp/dask-scratch-space-1000113/worker-1id6u8lv

0,1
Comm: tcp://127.0.0.1:8015,Total threads: 1
Dashboard: http://127.0.0.1:8109/status,Memory: 222.22 GiB
Nanny: tcp://127.0.0.1:8221,
Local directory: /tmp/dask-scratch-space-1000113/worker-lsj1yqpn,Local directory: /tmp/dask-scratch-space-1000113/worker-lsj1yqpn

0,1
Comm: tcp://127.0.0.1:8433,Total threads: 1
Dashboard: http://127.0.0.1:8379/status,Memory: 222.22 GiB
Nanny: tcp://127.0.0.1:8137,
Local directory: /tmp/dask-scratch-space-1000113/worker-5pgf9lb4,Local directory: /tmp/dask-scratch-space-1000113/worker-5pgf9lb4

0,1
Comm: tcp://127.0.0.1:8253,Total threads: 1
Dashboard: http://127.0.0.1:8369/status,Memory: 222.22 GiB
Nanny: tcp://127.0.0.1:8155,
Local directory: /tmp/dask-scratch-space-1000113/worker-1kx50vln,Local directory: /tmp/dask-scratch-space-1000113/worker-1kx50vln

0,1
Comm: tcp://127.0.0.1:8391,Total threads: 1
Dashboard: http://127.0.0.1:8001/status,Memory: 222.22 GiB
Nanny: tcp://127.0.0.1:8101,
Local directory: /tmp/dask-scratch-space-1000113/worker-i6e1p33g,Local directory: /tmp/dask-scratch-space-1000113/worker-i6e1p33g

0,1
Comm: tcp://127.0.0.1:8045,Total threads: 1
Dashboard: http://127.0.0.1:8131/status,Memory: 222.22 GiB
Nanny: tcp://127.0.0.1:8181,
Local directory: /tmp/dask-scratch-space-1000113/worker-30qdnbo3,Local directory: /tmp/dask-scratch-space-1000113/worker-30qdnbo3


In [22]:
with h5py.File("cell_atlas.h5ad", "r") as f:
    adata = ad.AnnData(
        X = ad.experimental.read_elem_as_dask(f["X"]),
        obs = ad.read_elem(f["obs"]),
        var = ad.read_elem(f["var"])
    )
adata.X = adata.X.map_blocks(
    cp_sparse.csr_matrix,
    meta=cp_sparse.csr_matrix(sparse.random(1, 1, format="csr"), dtype=adata.X.dtype),
    dtype=adata.X.dtype
)
adata.X

Unnamed: 0,Array,Chunk
Shape,"(1462702, 27714)","(1000, 27714)"
Dask graph,1463 chunks in 3 graph layers,1463 chunks in 3 graph layers
Data type,float32 cupyx.scipy.sparse._csr.csr_matrix,float32 cupyx.scipy.sparse._csr.csr_matrix
"Array Chunk Shape (1462702, 27714) (1000, 27714) Dask graph 1463 chunks in 3 graph layers Data type float32 cupyx.scipy.sparse._csr.csr_matrix",27714  1462702,

Unnamed: 0,Array,Chunk
Shape,"(1462702, 27714)","(1000, 27714)"
Dask graph,1463 chunks in 3 graph layers,1463 chunks in 3 graph layers
Data type,float32 cupyx.scipy.sparse._csr.csr_matrix,float32 cupyx.scipy.sparse._csr.csr_matrix


Let's do the same thing, but now with GPU functionality.  Note the different `meta` than before, but otherwise, remarkably similar!

In [23]:
def __gram_block_densify(block):
    densified = block.toarray()
    return (densified.T @ densified)[None, ...]

In [24]:
def __gram_block(block):
    return (block.T @ block).toarray()[None, ...]

In [25]:
n_blocks = len(adata.X.to_delayed().ravel())
gram_matrix_gpu_lazy = adata.X.map_blocks(
    __gram_block,
    new_axis=(1,),
    chunks=((1,) * n_blocks, (adata.shape[1],), (adata.shape[1],)),
    meta=cp.array([], dtype=adata.X.dtype),
    dtype=adata.X.dtype,
).sum(axis=0)
gram_matrix_gpu_lazy

Unnamed: 0,Array,Chunk
Bytes,2.86 GiB,2.86 GiB
Shape,"(27714, 27714)","(27714, 27714)"
Dask graph,1 chunks in 11 graph layers,1 chunks in 11 graph layers
Data type,float32 cupy.ndarray,float32 cupy.ndarray
"Array Chunk Bytes 2.86 GiB 2.86 GiB Shape (27714, 27714) (27714, 27714) Dask graph 1 chunks in 11 graph layers Data type float32 cupy.ndarray",27714  27714,

Unnamed: 0,Array,Chunk
Bytes,2.86 GiB,2.86 GiB
Shape,"(27714, 27714)","(27714, 27714)"
Dask graph,1 chunks in 11 graph layers,1 chunks in 11 graph layers
Data type,float32 cupy.ndarray,float32 cupy.ndarray


In [20]:
# gram_matrix_gpu_lazy.compute()

Key:       ('sum-263b98f08180ed267b5ccab308690283', 1022, 0, 0)
State:     executing
Function:  execute_task
args:      ((subgraph_callable-41356ea3a7459b12cbc3205022c1322d, functools.partial(<function make_dask_chunk at 0x7fdb09149f30>, PosixPath('cell_atlas.h5ad'), '/X', wrap=<function sparse_dataset at 0x7fdb090ce7a0>), (<class 'tuple'>, ['block_info']), {None: {'shape': (1462702, 27714), 'num-chunks': (1463, 1), 'array-location': [(1022000, 1023000), (0, 27714)], 'chunk-location': (1022, 0), 'chunk-shape': (1000, 27714), 'dtype': dtype('float32')}}))
kwargs:    {}
Exception: "CuSparseError('CUSPARSE_STATUS_INSUFFICIENT_RESOURCES: insufficient resources')"
Traceback: '  File "/ictstr01/home/icb/ilan.gold/venv/lib/python3.10/site-packages/dask/optimization.py", line 1001, in __call__\n    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))\n  File "/ictstr01/home/icb/ilan.gold/venv/lib/python3.10/site-packages/dask/core.py", line 157, in get\n    result = _execute_ta

CuSparseError: CUSPARSE_STATUS_INSUFFICIENT_RESOURCES: insufficient resources

In [21]:
gram_matrix_gpu_lazy = adata.X.map_blocks(
    __gram_block_densify,
    new_axis=(1,),
    chunks=((1,) * n_blocks, (adata.shape[1],), (adata.shape[1],)),
    meta=cp.array([], dtype=adata.X.dtype),
    dtype=adata.X.dtype,
).sum(axis=0)

In [22]:
%%time
# gram_matrix_gpu_in_memory = gram_matrix_gpu_lazy.compute()
# gram_matrix_gpu_in_memory

CPU times: user 1min 17s, sys: 25.6 s, total: 1min 42s
Wall time: 8min 53s


array([[6.6108523e+04, 2.6539505e-01, 2.6594014e+03, ..., 1.1166708e+01,
        1.7306891e-01, 7.7735577e+02],
       [2.6539505e-01, 3.1194468e+00, 0.0000000e+00, ..., 0.0000000e+00,
        0.0000000e+00, 0.0000000e+00],
       [2.6594014e+03, 0.0000000e+00, 3.4531633e+04, ..., 1.2455404e+01,
        0.0000000e+00, 3.9746011e+02],
       ...,
       [1.1166708e+01, 0.0000000e+00, 1.2455404e+01, ..., 2.3650439e+02,
        0.0000000e+00, 1.5137998e+00],
       [1.7306891e-01, 0.0000000e+00, 0.0000000e+00, ..., 0.0000000e+00,
        3.4908867e+00, 0.0000000e+00],
       [7.7735577e+02, 0.0000000e+00, 3.9746011e+02, ..., 1.5137998e+00,
        0.0000000e+00, 7.6181641e+03]], dtype=float32)

Hmmmm, we had to densify everything, which really slows things down....maybe that's one reason why [`rapids_singlecell`](https://rapids-singlecell.readthedocs.io/) exists? And maybe why we're working on this problem specifically?

In [26]:
! uv pip install git+https://github.com/scverse/rapids_singlecell@dask_mg_support[rapids12] --extra-index-url https://pypi.nvidia.com

[2K[2mResolved [1m100 packages[0m in 8.65s[0m                                               [0m
[2mAudited [1m100 packages[0m in 0.44ms[0m


In [27]:
import rapids_singlecell as rsc

  from .autonotebook import tqdm as notebook_tqdm


In [28]:
adata.X = adata.X.persist()

In [29]:
%%time
rsc.pp.highly_variable_genes(adata)
rsc.pp.pca(adata)

CPU times: user 22.1 s, sys: 2.77 s, total: 24.8 s
Wall time: 43.1 s


In [30]:
! uv pip freeze

[1maiohappyeyeballs[0m==2.4.0
[1maiohttp[0m==3.10.5
[1maiosignal[0m==1.3.1
[1manndata[0m==0.11.0rc1
[1manyio[0m==4.4.0
[1margon2-cffi[0m==23.1.0
[1margon2-cffi-bindings[0m==21.2.0
[1marray-api-compat[0m==1.8
[1marrow[0m==1.3.0
[1masttokens[0m==2.4.1
[1masync-lru[0m==2.0.4
[1masync-timeout[0m==4.0.3
[1mattrs[0m==24.2.0
[1mbabel[0m==2.16.0
[1mbeautifulsoup4[0m==4.12.3
[1mbleach[0m==6.1.0
[1mbokeh[0m==3.6.0.dev1
[1mcachetools[0m==5.5.0
[1mcertifi[0m==2024.8.30
[1mcffi[0m==1.17.1
[1mcharset-normalizer[0m==3.3.2
[1mclick[0m==8.1.7
[1mcloudpickle[0m==3.0.0
[1mcomm[0m==0.2.2
[1mcontourpy[0m==1.3.0
[1mcuda-python[0m==12.6.0
[1mcudf-cu12[0m==24.8.2
[1mcugraph-cu12[0m==24.8.0
[1mcuml-cu12[0m==24.8.0
[1mcupy-cuda12x[0m==13.3.0
[1mcycler[0m==0.12.1
[1mdask[0m==2024.7.1
[1mdask-cuda[0m==24.8.2
[1mdask-cudf-cu12[0m==24.8.2
[1mdask-expr[0m==1.1.9
[1mdask-glm[0m==0.3.2
[1mdask-ml[0m==2024.4.4
[1mdebugpy[0m==1.8.5
[1mdecorator