In [1]:
import numpy as np
import scipy.spatial.distance as spd
import dask.array as da

In [3]:
def pairwise_cityblock_cpu(x):
    assert x.ndim == 2
    out = spd.pdist(x.T, metric='cityblock')
    out = out.reshape((1, out.shape[0]))
    return out


TODO explain what the data mean.

In [4]:
# simulate some genetic data
x = np.random.choice(np.array([0, 1, 2], dtype='i1'), 
                     p=[.7, .2, .1,], 
                     size=(20_000, 1_000))

In [5]:
%%time
pairwise_cityblock_cpu(x)

CPU times: user 8.39 s, sys: 88.4 ms, total: 8.47 s
Wall time: 8.47 s


array([[12015., 12127., 12098., ..., 12033., 12028., 11877.]])

In [6]:
x_dask = da.from_array(x, chunks=(2_000, None))
x_dask

Unnamed: 0,Array,Chunk
Bytes,20.00 MB,2.00 MB
Shape,"(20000, 1000)","(2000, 1000)"
Count,11 Tasks,10 Chunks
Type,int8,numpy.ndarray
"Array Chunk Bytes 20.00 MB 2.00 MB Shape (20000, 1000) (2000, 1000) Count 11 Tasks 10 Chunks Type int8 numpy.ndarray",1000  20000,

Unnamed: 0,Array,Chunk
Bytes,20.00 MB,2.00 MB
Shape,"(20000, 1000)","(2000, 1000)"
Count,11 Tasks,10 Chunks
Type,int8,numpy.ndarray


In [7]:
def pairwise_cityblock_dask(x, f):
    
    # Compute number of blocks.
    n_blocks = len(x.chunks[0])

    # Compute number of pairs.
    n = x.shape[1]
    n_pairs = n * (n - 1) // 2
    
    # Compute distance in blocks.
    chunks = ((1,) * n_blocks, (n_pairs,))
    d = da.map_blocks(
        f, x, chunks=chunks, dtype=np.float64
    )

    # Sum blocks.
    out = da.sum(d, axis=0, dtype=np.float64)

    return out


In [8]:
%%time
pairwise_cityblock_dask(x_dask, f=pairwise_cityblock_cpu).compute()

CPU times: user 20.6 s, sys: 121 ms, total: 20.7 s
Wall time: 3.78 s


array([12015., 12127., 12098., ..., 12033., 12028., 11877.])

In [17]:
!nvidia-smi | head

NVIDIA-SMI has failed because it couldn't communicate with the NVIDIA driver. Make sure that the latest NVIDIA driver is installed and running.



In [18]:
from numba import cuda

How long to move the data?

In [19]:
%%time
x_cuda = cuda.to_device(x)
x_cuda

CudaSupportError: Error at driver init: 
[100] Call to cuInit results in CUDA_ERROR_NO_DEVICE:

In [9]:
import math


@cuda.jit(device=True)
def square_coords_cuda(pair_index, n):
    pair_index = np.float32(pair_index)
    n = np.float32(n)
    j = (((2 * n) - 1) - math.sqrt((1 - (2 * n)) ** 2 - (8 * pair_index))) // 2
    k = pair_index - (j * ((2 * n) - j - 1) / 2) + j + 1
    j = np.int64(j)
    k = np.int64(k)
    return j, k


@cuda.jit
def kernel_cityblock_cuda(x, out):
    m = x.shape[0]
    n = x.shape[1]
    n_pairs = (n * (n - 1)) // 2
    pair_index = cuda.grid(1)
    if pair_index < n_pairs:
        # Unpack the pair index to column indices.
        j, k = square_coords_cuda(pair_index, n)
        # Iterate over rows, accumulating distance.
        d = np.float32(0)
        for i in range(m):
            u = np.float32(x[i, j])
            v = np.float32(x[i, k])
            d += math.fabs(u - v)
        # Store distance result.
        out[pair_index] = d

        
def pairwise_cityblock_cuda(x):

    # Set up output array.
    n = x.shape[1]
    n_pairs = (n * (n - 1)) // 2
    out = cuda.device_array(n_pairs, dtype=np.float32)

    # Let numba decide number of threads and blocks.
    kernel_spec = kernel_cityblock_cuda.forall(n_pairs)
    kernel_spec(x, out)

    # Reshape to allow for map blocks.
    out = out.reshape((1, out.shape[0]))
    
    return out



NameError: name 'cuda' is not defined

In [22]:
# warm-up jit
pairwise_cityblock_cuda(x_cuda)
cuda.synchronize()

NameError: name 'x_cuda' is not defined

In [23]:
%%time
dist_cuda = pairwise_cityblock_cuda(x_cuda)
cuda.synchronize()

NameError: name 'x_cuda' is not defined

In [24]:
# how long to copy data back to CPU
%time dist_cuda.copy_to_host()

NameError: name 'dist_cuda' is not defined

## Larger dataset

In [9]:
# x_big = da.random.choice(
#     np.array([0, 1, 2], dtype='i1'), 
#     p=[.7, .2, .1], 
#     size=(1_000_000, 1_000),
#     chunks=(50_000, None))
# x_big

In [10]:
# x_big.to_zarr('example.zarr', component='x_big', overwrite=True)

In [11]:
import zarr
x_big_zarr = zarr.open('example.zarr')['x_big']
x_big_zarr.info

0,1
Name,/x_big
Type,zarr.core.Array
Data type,int8
Shape,"(1000000, 1000)"
Chunk shape,"(50000, 1000)"
Order,C
Read-only,False
Compressor,"Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0)"
Store type,zarr.storage.DirectoryStore
No. bytes,1000000000 (953.7M)


In [12]:
x_big_dask = da.from_array(x_big_zarr)
x_big_dask

Unnamed: 0,Array,Chunk
Bytes,1000.00 MB,100.00 MB
Shape,"(1000000, 1000)","(100000, 1000)"
Count,11 Tasks,10 Chunks
Type,int8,numpy.ndarray
"Array Chunk Bytes 1000.00 MB 100.00 MB Shape (1000000, 1000) (100000, 1000) Count 11 Tasks 10 Chunks Type int8 numpy.ndarray",1000  1000000,

Unnamed: 0,Array,Chunk
Bytes,1000.00 MB,100.00 MB
Shape,"(1000000, 1000)","(100000, 1000)"
Count,11 Tasks,10 Chunks
Type,int8,numpy.ndarray


In [29]:
%%time
dist_big = pairwise_cityblock_dask(x_big_dask, f=pairwise_cityblock_cpu).compute()
dist_big

KeyboardInterrupt: 

In [30]:
x_big_dask_cuda = x_big_dask.map_blocks(cuda.to_device)

ValueError: `dtype` inference failed in `map_blocks`.

Please specify the dtype explicitly using the `dtype` kwarg.

Original error is below:
------------------------
CudaSupportError('Error at driver init: \n[100] Call to cuInit results in CUDA_ERROR_NO_DEVICE:',)

Traceback:
---------
  File "/home/aliman/malariagen/binder/conda/envs/vector-ops-c668a73/lib/python3.6/site-packages/dask/array/core.py", line 343, in apply_infer_dtype
    o = func(*args, **kwargs)
  File "/home/aliman/malariagen/binder/conda/envs/vector-ops-c668a73/lib/python3.6/site-packages/numba/cuda/cudadrv/devices.py", line 224, in _require_cuda_context
    with _runtime.ensure_context():
  File "/home/aliman/malariagen/binder/conda/envs/vector-ops-c668a73/lib/python3.6/contextlib.py", line 81, in __enter__
    return next(self.gen)
  File "/home/aliman/malariagen/binder/conda/envs/vector-ops-c668a73/lib/python3.6/site-packages/numba/cuda/cudadrv/devices.py", line 122, in ensure_context
    with driver.get_active_context():
  File "/home/aliman/malariagen/binder/conda/envs/vector-ops-c668a73/lib/python3.6/site-packages/numba/cuda/cudadrv/driver.py", line 387, in __enter__
    driver.cuCtxGetCurrent(byref(hctx))
  File "/home/aliman/malariagen/binder/conda/envs/vector-ops-c668a73/lib/python3.6/site-packages/numba/cuda/cudadrv/driver.py", line 278, in __getattr__
    self.initialization_error)


In [None]:
# launch a local cuda cluster?

In [None]:
%%time
dist_big_cuda = pairwise_distance_dask(x_big_dask_cuda, f=pairwise_cityblock_cuda).compute(num_workers=1)
dist_big_cuda

In [None]:
%%time
dist_big_cuda = pairwise_distance_dask(x_big_dask_cuda, f=pairwise_cityblock_cuda).compute(num_workers=2)
dist_big_cuda