# Multithreaded Cityblock distance matrix function with SciPy and Dask's delayed execution

In this notebook we implement a function to compute the cityblock distance matrix using `scipy.spatial.distance.cdist`. Although this function is quite fast, it uses a single thread. In cases like this, it may be convenient to implement a multithreaded version of the function by parallelizing the execution over chunks of data. We are going to use `dask.delayed` to do that.
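
As a quick reminder of what is being computed: the cityblock (Manhattan) distance between two vectors is the sum of the absolute differences of their components. The following is a minimal sketch on two tiny hand-made arrays (`a` and `b` are illustrative names, not part of the notebook) that compares a plain NumPy broadcast against `cdist`:

```python
import numpy as np
from scipy.spatial.distance import cdist

# two small sets of 2-feature vectors, made up for illustration
a = np.array([[0.0, 0.0], [1.0, 2.0]])
b = np.array([[3.0, 4.0], [1.0, 0.0], [0.0, 5.0]])

# sum over features of |a_i - b_j|, computed by broadcasting
# a[:, None, :] has shape (2, 1, 2), b[None, :, :] has shape (1, 3, 2)
manual = np.abs(a[:, None, :] - b[None, :, :]).sum(axis=-1)

# same matrix from SciPy; entry [i, j] is the distance from a[i] to b[j]
reference = cdist(a, b, 'cityblock')

print(manual)
print(np.allclose(manual, reference))
```

The broadcast version builds an intermediate array of shape `(len(a), len(b), nfeat)`, so it is only meant to illustrate the definition, not to replace `cdist` on large inputs.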

In [1]:
import numpy as np
from scipy.spatial.distance import cdist
from dask import compute, delayed

In [3]:
nsamples = 12000
nfeat = 50

x = 10. * np.random.random([nsamples, nfeat])

Let's time the `cdist` function and watch the output of the `top` command while it runs.

In [4]:
# observe here that the function `cdist` used to get the cityblock distance
# is not multithreaded

%timeit cdist(x, x, 'cityblock')

3.58 s ± 28.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


With the `top` command we see that `cdist` runs in a single thread. In such cases it can be quite simple to write a multithreaded version of the function. We can do this very easily with `dask.delayed`!

## Dask's async delayed execution
A simple multithreaded version of `cdist` can be built as follows:
  * Split the array of vectors into chunks. We can use `np.split(x, num_chunks)`
  * Compute partial cityblock distance matrices of the complete array with respect to each of the chunks
  * Concatenate the resulting list into a single cityblock distance matrix.
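
One detail of the first step is worth keeping in mind: `np.split` only works when the number of rows is an exact multiple of the number of chunks. The sketch below uses a small made-up array (`x_small` is an illustrative name) to show the failure and a possible alternative, `np.array_split`, which allows chunks of unequal size:

```python
import numpy as np

# 10 samples with 2 features each, made up for illustration
x_small = np.arange(20.0).reshape(10, 2)

# np.split requires an exact division: 10 rows cannot be cut into 3 equal parts
try:
    np.split(x_small, 3)
except ValueError as err:
    print("np.split failed:", err)

# np.array_split takes the same arguments but tolerates a remainder
parts = np.array_split(x_small, 3)
chunk_sizes = [p.shape[0] for p in parts]
print(chunk_sizes)

# putting the chunks back together recovers the original array
print(np.array_equal(np.concatenate(parts), x_small))
```

With 12000 samples and 12 chunks, as used in this notebook, the division is exact, so `np.split` is sufficient here.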

Note that concatenation is not a fast operation, so we will probably have to keep improving the function afterwards.

In [5]:
# define the list of operations to be performed asynchronously
chunks = 12
values = [delayed(cdist)(x, xi, 'cityblock') for xi in np.split(x, chunks)]

# at this point nothing is executed

In [6]:
# execute the operations on the list `values`
_cbdm_dask = compute(*values, scheduler='threads')

In [7]:
# concatenate the list `_cbdm_dask` into a single array
cbdm_dask = np.concatenate(_cbdm_dask, axis=1)

Let's time the compute step and watch `top` again. Now you can see that the computation is executed in parallel, resulting in a faster execution time.

In [8]:
%timeit compute(*values, scheduler='threads')

407 ms ± 10.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [9]:
# check that the resulting matrices are the same
cbdm_scipy = cdist(x, x, 'cityblock')
np.abs(cbdm_dask - cbdm_scipy).max()

0.0

A problem with this solution, as mentioned above, is that `np.concatenate` is not a fast operation.

In [10]:
%timeit np.concatenate(_cbdm_dask, axis=1)

735 ms ± 2.89 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Let's wrap the whole thing into functions: one that concatenates the partial results and one that fills in a preallocated array.

In [11]:
def cityblock_dask_concat(x, y, chunks):
    """Implementation using array concatenation"""
    # split `y` (not `x`) so that the function also works when x is not y
    values = [delayed(cdist)(x, yi, 'cityblock')
              for yi in np.split(y, chunks)]
    return np.concatenate(compute(*values, scheduler='threads'),
                          axis=1)


def cityblock_dask_empty(x, y, chunks):
    """Implementation filling in an empty array"""
    # each partial matrix covers `stride` columns, one per row of a `y` chunk
    stride = y.shape[0] // chunks
    values = [delayed(cdist)(x, yi, 'cityblock')
              for yi in np.split(y, chunks)]
    cbdm_list = compute(*values, scheduler='threads')
    cbdm = np.empty((x.shape[0], y.shape[0]))
    for i, ci in enumerate(cbdm_list):
        cbdm[:, i * stride:(i + 1) * stride] = ci

    return cbdm

In [12]:
print(np.abs(cityblock_dask_concat(x, x, chunks) - cdist(x, x, 'cityblock')).max())
print(np.abs(cityblock_dask_empty(x, x, chunks) - cdist(x, x, 'cityblock')).max())

0.0
0.0
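
A possible further variation, which is not part of the original notebook and has not been timed here, is to let every task write its block straight into the preallocated result, so that the copy into the output also happens inside the worker threads. The sketch below assumes it is acceptable to split along the rows of `x` instead of `y`; the names `cityblock_dask_rows` and `fill` are hypothetical.

```python
import numpy as np
from scipy.spatial.distance import cdist
from dask import compute, delayed


def cityblock_dask_rows(x, y, chunks):
    """Hypothetical variant: each task fills its own rows of the result."""
    out = np.empty((x.shape[0], y.shape[0]))

    def fill(start, stop):
        # row blocks do not overlap, so the threads never write to the same memory
        out[start:stop, :] = cdist(x[start:stop], y, 'cityblock')

    # integer row boundaries; blocks may differ in size by one row
    bounds = np.linspace(0, x.shape[0], chunks + 1).astype(int)
    tasks = [delayed(fill)(int(a), int(b))
             for a, b in zip(bounds[:-1], bounds[1:])]
    compute(*tasks, scheduler='threads')
    return out


# small check on random data with differently sized inputs
rng = np.random.default_rng(0)
xa = rng.random((30, 4))
xb = rng.random((17, 4))
print(np.abs(cityblock_dask_rows(xa, xb, 4) - cdist(xa, xb, 'cityblock')).max())
```

Whether this is actually faster than `cityblock_dask_empty` depends on the array sizes and the machine, so it should be timed with `%timeit` before drawing conclusions.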


## Conclusions
The main points to take home from this notebook are:
  * Dask's delayed execution can be used to build multithreaded versions of functions that run on a single thread.
  * Multithreaded versions of functions that already use OpenMP threads might be slower than the original, since the CPU threads need to be shared between the concurrent executions of the function.
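
Regarding the sharing of CPU threads: one knob that may help is the size of Dask's thread pool, which can be limited with the `num_workers` argument of `compute`. The following is a minimal sketch on a small random array (`x_demo` and the value `2` are arbitrary choices for illustration, not a recommendation):

```python
import numpy as np
from scipy.spatial.distance import cdist
from dask import compute, delayed

# small random input so that the example runs quickly
x_demo = np.random.random((600, 8))

# same pattern as above: one delayed cdist call per chunk
tasks = [delayed(cdist)(x_demo, xi, 'cityblock')
         for xi in np.split(x_demo, 6)]

# run the six tasks on at most two worker threads
blocks = compute(*tasks, scheduler='threads', num_workers=2)
result = np.concatenate(blocks, axis=1)

print(result.shape)
```

The result is the same regardless of the number of workers; only the degree of concurrency changes, so the best value has to be found by timing on the target machine.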