In [1]:
import numpy as np
import scipy.spatial.distance as spd
import dask.array as da

In [2]:
def pairwise_cityblock_cpu(x):
    assert x.ndim == 2
    out = spd.pdist(x.T, metric='cityblock')
    out = out.reshape((1, out.shape[0]))
    return out


TODO explain what the data mean.

In [3]:
# simulate some genetic data
x = np.random.choice(np.array([0, 1, 2], dtype='i1'), 
                     p=[.7, .2, .1,], 
                     size=(20_000, 1_000))

In [4]:
%%time
pairwise_cityblock_cpu(x)

CPU times: user 8.3 s, sys: 64 ms, total: 8.37 s
Wall time: 8.36 s


array([[12067., 11977., 11998., ..., 12055., 11977., 12132.]])

In [5]:
x_dask = da.from_array(x, chunks=(2_000, None))
x_dask

Unnamed: 0,Array,Chunk
Bytes,20.00 MB,2.00 MB
Shape,"(20000, 1000)","(2000, 1000)"
Count,11 Tasks,10 Chunks
Type,int8,numpy.ndarray
"Array Chunk Bytes 20.00 MB 2.00 MB Shape (20000, 1000) (2000, 1000) Count 11 Tasks 10 Chunks Type int8 numpy.ndarray",1000  20000,

Unnamed: 0,Array,Chunk
Bytes,20.00 MB,2.00 MB
Shape,"(20000, 1000)","(2000, 1000)"
Count,11 Tasks,10 Chunks
Type,int8,numpy.ndarray


In [6]:
def pairwise_cityblock_dask(x, f):
    
    # Compute number of blocks.
    n_blocks = len(x.chunks[0])

    # Compute number of pairs.
    n = x.shape[1]
    n_pairs = n * (n - 1) // 2
    
    # Compute distance in blocks.
    chunks = ((1,) * n_blocks, (n_pairs,))
    d = da.map_blocks(
        f, x, chunks=chunks, dtype=np.float64
    )

    # Sum blocks.
    out = da.sum(d, axis=0, dtype=np.float64)

    return out


In [7]:
%%time
pairwise_cityblock_dask(x_dask, f=pairwise_cityblock_cpu).compute()

CPU times: user 9.06 s, sys: 168 ms, total: 9.22 s
Wall time: 1.12 s


array([12067., 11977., 11998., ..., 12055., 11977., 12132.])

In [8]:
!nvidia-smi | head

Fri Aug  9 07:49:55 2019       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 396.44                 Driver Version: 396.44                    |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|   0  Tesla V100-SXM2...  On   | 00000000:06:00.0 Off |                    0 |
| N/A   36C    P0    57W / 300W |   1365MiB / 32510MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+


In [9]:
from numba import cuda

How long to move the data?

In [19]:
%%time
x_cuda = cuda.to_device(x)
x_cuda

CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 56.3 ms


<numba.cuda.cudadrv.devicearray.DeviceNDArray at 0x7ff01ec0f940>

In [11]:
import math


@cuda.jit(device=True)
def square_coords_cuda(pair_index, n):
    pair_index = np.float32(pair_index)
    n = np.float32(n)
    j = (((2 * n) - 1) - math.sqrt((1 - (2 * n)) ** 2 - (8 * pair_index))) // 2
    k = pair_index - (j * ((2 * n) - j - 1) / 2) + j + 1
    j = np.int64(j)
    k = np.int64(k)
    return j, k


@cuda.jit
def kernel_cityblock_cuda(x, out):
    m = x.shape[0]
    n = x.shape[1]
    n_pairs = (n * (n - 1)) // 2
    pair_index = cuda.grid(1)
    if pair_index < n_pairs:
        # Unpack the pair index to column indices.
        j, k = square_coords_cuda(pair_index, n)
        # Iterate over rows, accumulating distance.
        d = np.float32(0)
        for i in range(m):
            u = np.float32(x[i, j])
            v = np.float32(x[i, k])
            d += math.fabs(u - v)
        # Store distance result.
        out[pair_index] = d

        
def pairwise_cityblock_cuda(x):

    # Set up output array.
    n = x.shape[1]
    n_pairs = (n * (n - 1)) // 2
    out = cuda.device_array(n_pairs, dtype=np.float32)

    # Let numba decide number of threads and blocks.
    kernel_spec = kernel_cityblock_cuda.forall(n_pairs)
    kernel_spec(x, out)

    # Reshape to allow for map blocks.
    out = out.reshape((1, out.shape[0]))
    
    return out



In [12]:
# warm-up jit
pairwise_cityblock_cuda(x_cuda)
cuda.synchronize()

In [13]:
%%time
dist_cuda = pairwise_cityblock_cuda(x_cuda)
cuda.synchronize()

CPU times: user 12 ms, sys: 8 ms, total: 20 ms
Wall time: 24.1 ms


In [14]:
# how long to copy data back to CPU
%time dist_cuda.copy_to_host()

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 768 µs


array([[12067., 11977., 11998., ..., 12055., 11977., 12132.]],
      dtype=float32)

## Larger dataset

In [20]:
x_big = da.random.choice(
    np.array([0, 1, 2], dtype='i1'), 
    p=[.7, .2, .1], 
    size=(1_000_000, 1_000),
    chunks=(50_000, None))
x_big

Unnamed: 0,Array,Chunk
Bytes,1000.00 MB,50.00 MB
Shape,"(1000000, 1000)","(50000, 1000)"
Count,22 Tasks,20 Chunks
Type,int8,numpy.ndarray
"Array Chunk Bytes 1000.00 MB 50.00 MB Shape (1000000, 1000) (50000, 1000) Count 22 Tasks 20 Chunks Type int8 numpy.ndarray",1000  1000000,

Unnamed: 0,Array,Chunk
Bytes,1000.00 MB,50.00 MB
Shape,"(1000000, 1000)","(50000, 1000)"
Count,22 Tasks,20 Chunks
Type,int8,numpy.ndarray


In [21]:
x_big.to_zarr('example.zarr', component='x_big', overwrite=True)

In [22]:
import zarr
x_big_zarr = zarr.open('example.zarr')['x_big']
x_big_zarr.info

0,1
Name,/x_big
Type,zarr.core.Array
Data type,int8
Shape,"(1000000, 1000)"
Chunk shape,"(50000, 1000)"
Order,C
Read-only,False
Compressor,"Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0)"
Store type,zarr.storage.DirectoryStore
No. bytes,1000000000 (953.7M)


In [23]:
x_big_dask = da.from_array(x_big_zarr)
x_big_dask

Unnamed: 0,Array,Chunk
Bytes,1000.00 MB,100.00 MB
Shape,"(1000000, 1000)","(100000, 1000)"
Count,11 Tasks,10 Chunks
Type,int8,numpy.ndarray
"Array Chunk Bytes 1000.00 MB 100.00 MB Shape (1000000, 1000) (100000, 1000) Count 11 Tasks 10 Chunks Type int8 numpy.ndarray",1000  1000000,

Unnamed: 0,Array,Chunk
Bytes,1000.00 MB,100.00 MB
Shape,"(1000000, 1000)","(100000, 1000)"
Count,11 Tasks,10 Chunks
Type,int8,numpy.ndarray


In [24]:
%%time
dist_big = pairwise_cityblock_dask(x_big_dask, f=pairwise_cityblock_cpu).compute()
dist_big

CPU times: user 9min 17s, sys: 19.3 s, total: 9min 36s
Wall time: 1min


array([598676., 598649., 599990., ..., 598433., 598728., 600953.])

In [25]:
x_big_dask_cuda = x_big_dask.map_blocks(cuda.to_device)

In [26]:
# launch a local cuda cluster?

In [29]:
%%time
dist_big_cuda = pairwise_cityblock_dask(x_big_dask_cuda, f=pairwise_cityblock_cuda).compute(num_workers=1)
dist_big_cuda

CPU times: user 1.54 s, sys: 856 ms, total: 2.4 s
Wall time: 3.26 s


array([598676., 598649., 599990., ..., 598433., 598728., 600953.])

In [30]:
%%time
dist_big_cuda = pairwise_cityblock_dask(x_big_dask_cuda, f=pairwise_cityblock_cuda).compute(num_workers=2)
dist_big_cuda

CPU times: user 1.62 s, sys: 860 ms, total: 2.48 s
Wall time: 1.61 s


array([598676., 598649., 599990., ..., 598433., 598728., 600953.])