Skip to content

Commit

Permalink
Distributed labeling (#94)
Browse files Browse the repository at this point in the history
* Initial work on proper distributed labeling

* Use vindex (blech)

* Initial untested implementation

* _label_adj_graph working

* Almost working except relabeling

* Add missing imports

* Fix indexing

* Fix imports and total count

* Specify the chunking in `_relabel_components`

By specifying the chunking of the result from `map_blocks`, we are able
to work with an older version of Dask.

* Fix some flake8 errors

* Drop transpose of `all_mappings`

As we can concatenate along a different axis, which serves our purpose
just as well, go ahead and change the code accordingly to avoid a
transpose.

* Use NumPy's `in1d` instead of `isin`

As older versions of NumPy that we support and test against don't
include `isin`, switching to using `in1d`, which has been around longer.
Since the array in question is already 1-D, there is no need for us to
reshape the result after calling `in1d`. So this is sufficient for our
use case.

* Handle empty adjacency graph for singleton chunk

When there is a singleton chunk, there are no shared faces between
chunks to construct an adjacency graph from. In this case ensure there
is at least an empty array to start with. This doesn't alter the cases
where there are multiple chunks that do share faces. Though it does
avoid branching when there is a single chunk with no shared faces.

* Drop unused `i` from `numblocks` `for`-loop

* Fix connected components dtype

Previously we were incorrectly determining the connected components
dtype. This fixes it by inspecting the result on a trivial case and
seeing what the dtype is. Then using that to set the delayed type when
converting to a Dask Array.

* Fix non-zero increment's type to be `LABEL_DTYPE`

* Make sure relabeling array matches label type

* Get right index for connected component array

* Fix incorrect labeling between multiply-matched labels

* Fix test to test equivalent labeling, not identical labeling

* Major cleanup of the new label code

* Drop `partial` from `block_ndi_label_delayed`

As we are already calling a delayed wrapped copy of `label`, there is no
need to use `partial` to bind arguments first. So go ahead and drop
`partial` and pass the arguments directly.

* Drop `partial` from `connected_components_delayed`

As we are already calling a delayed wrapped copy of
`connected_components`, there is no need to use `partial` to bind
arguments first. So go ahead and drop `partial` and pass the arguments
directly.

* Bump Dask requirement to 0.18.2

As we are now making use of the `blocks` accessor of Dask Arrays and
this requires a newer version of Dask, bump the minimum version of Dask
to 0.18.2.

* Force `total` to a `LABEL_DTYPE` scalar

Ensure that `total` is a `LABEL_DTYPE` scalar. This is needed by
`where`, which checks the `dtype` of the arguments it is provided.

* Force `0` in `da.where` to `LABEL_DTYPE`

Make sure that `0` in `da.where` is also of `LABEL_DTYPE`. This way we
can ensure that the array generated by `where` has the expected type and
thus avoid using `astype` to copy and cast the array.

* Make `n` an array in `block_ndi_label_delayed`

Go ahead and make `n` an array in `block_ndi_label_delayed`. This
ensures it matches what we expect later. Plus it avoids some boilerplate
in `label` that makes things less clear.

* Ensure `n` is a `LABEL_DTYPE` scalar

To make sure that `n` matches the type we want it to be, add a delayed
cast to the desired type. Normally `n` would be of type `int`, but we
need it to `LABEL_DTYPE` so we can add it to the label images without
causing a type conversion. So this fixes that issue as well.

* Update tests of `label` for multiple chunks

As `label` now supports multiple chunks, drop all but one of the single
chunk test cases. Also add a few multiple chunk cases with 2-D and 3-D
data. Try a few different structured elements for these different cases.
Also make sure there is still one test for the singleton chunk case.
Freeze the NumPy random seed and threshold used for constructing the
masks based on what seems to work well for different label cases (e.g.
labels within a single chunk and labels crossing one or more chunk
boundaries).

* Test `label` with the "U" case

Make sure to exercise the test case where a labeled region crosses the
chunk boundary in two locations instead of just one. This is done to
ensure that the multiple chunk implementation is able to resolve this
down to a single labeled region.

* Convert `_utils` module to `_utils` package

Changes the `_utils` module into the `_utils` package. This should make
it easier to add more specific collections of utility functions and
group them accordingly.

* Create module stub for label utility functions

* Refactor label utility functions

Moves some utility functions from `dask_image.ndmeasure`'s `__init__`
used to perform `label` over multiple chunks to `_utils._label`.

* Drop underscores from externally used functions

* Mark some internal utility functions as private

* Drop import alias

* Use `slices_from_chunks` in `label`

* Revert "Bump Dask requirement to 0.18.2"

This reverts commit 2b2a84e.

* Tweak docstring initial line

* Drop unused import

* Implement `_unique_axis_0` by with structured view

By viewing the array as a structured array that merely groups together
the other dimensions within the structure, `_unique_axis_0` is able to
call NumPy's `unique` function on the array keeping the type information
unchanged. Thus if `unique` is able to handle the specific type more
efficiently, we benefit from that speed up still.

Note: More work would be needed if we wanted to handle F-contiguous
arrays, but that is not needed for our use case.

* Use our `_unique_axis_0` implementation

* Adjust whitespace in docstrings

* Generalize `_unique_axis`

Allow an arbitrary `axis` to specified in `_unique_axis`, but have it
default to `0` if not specified. This keeps the previous behavior while
making the function more generally useful.

* Minor indentation fix in docstring
  • Loading branch information
jni authored and jakirkham committed Feb 10, 2019
1 parent e3b746d commit 638cf24
Show file tree
Hide file tree
Showing 4 changed files with 309 additions and 37 deletions.
59 changes: 32 additions & 27 deletions dask_image/ndmeasure/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@

import collections
import functools
from warnings import warn

import numpy
import scipy.ndimage

import dask.array

from .. import _pycompat
from . import _utils
from ._utils import _label


def center_of_mass(input, labels=None, index=None):
Expand Down Expand Up @@ -210,31 +209,37 @@ def label(input, structure=None):

input = dask.array.asarray(input)

if not all([len(c) == 1 for c in input.chunks]):
warn(
"``input`` does not have 1 chunk in all dimensions; "
"it will be consolidated first",
RuntimeWarning
)

label_func = dask.delayed(scipy.ndimage.label, nout=2)
label, num_features = label_func(input, structure)

label = dask.array.from_delayed(
label,
input.shape,
numpy.int32
)

num_features = dask.array.from_delayed(
num_features,
tuple(),
int
)

result = (label, num_features)

return result
labeled_blocks = numpy.empty(input.numblocks, dtype=object)
total = _label.LABEL_DTYPE.type(0)

# First, label each block independently, incrementing the labels in that
# block by the total number of labels from previous blocks. This way, each
# block's labels are globally unique.
for index, cslice in zip(numpy.ndindex(*input.numblocks),
dask.array.core.slices_from_chunks(input.chunks)):
input_block = input[cslice]
labeled_block, n = _label.block_ndi_label_delayed(input_block,
structure)
block_label_offset = dask.array.where(labeled_block > 0,
total,
_label.LABEL_DTYPE.type(0))
labeled_block += block_label_offset
labeled_blocks[index] = labeled_block
total += n

# Put all the blocks together
block_labeled = dask.array.block(labeled_blocks.tolist())

# Now, build a label connectivity graph that groups labels across blocks.
# We use this graph to find connected components and then relabel each
# block according to those.
label_groups = _label.label_adjacency_graph(block_labeled, structure,
total)
new_labeling = _label.connected_components_delayed(label_groups)
relabeled = _label.relabel_blocks(block_labeled, new_labeling)
n = dask.array.max(relabeled)

return (relabeled, n)


def labeled_comprehension(input,
Expand Down
File renamed without changes.
246 changes: 246 additions & 0 deletions dask_image/ndmeasure/_utils/_label.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
# -*- coding: utf-8 -*-

import operator

import numpy
import scipy.ndimage
import scipy.sparse
import scipy.sparse.csgraph

import dask
import dask.array


def _get_ndimage_label_dtype():
return scipy.ndimage.label([1, 0, 1])[0].dtype


LABEL_DTYPE = _get_ndimage_label_dtype()


def _get_connected_components_dtype():
a = numpy.empty((0, 0), dtype=int)
return scipy.sparse.csgraph.connected_components(a)[1].dtype


CONN_COMP_DTYPE = _get_connected_components_dtype()


def relabel_blocks(block_labeled, new_labeling):
"""
Relabel a block-labeled array based on ``new_labeling``.
Parameters
----------
block_labeled : array of int
The input label array.
new_labeling : 1D array of int
A new labeling, such that ``labeling[i] = j`` implies that
any element in ``array`` valued ``i`` should be relabeled to ``j``.
Returns
-------
relabeled : array of int, same shape as ``array``
The relabeled input array.
"""
new_labeling = new_labeling.astype(LABEL_DTYPE)
relabeled = dask.array.map_blocks(operator.getitem,
new_labeling,
block_labeled,
dtype=LABEL_DTYPE,
chunks=block_labeled.chunks)
return relabeled


def _unique_axis(a, axis=0):
"""Find unique subarrays in axis in N-D array."""
at = numpy.ascontiguousarray(a.swapaxes(0, axis))
dt = numpy.dtype([("values", at.dtype, at.shape[1:])])
atv = at.view(dt)
r = numpy.unique(atv)["values"].swapaxes(0, axis)
return r


def _across_block_label_grouping(face, structure):
"""
Find a grouping of labels across block faces.
We assume that the labels on either side of the block face are unique to
that block. This is enforced elsewhere.
Parameters
----------
face : array-like
This is the boundary, of thickness (2,), between two blocks.
structure : array-like
Structuring element for the labeling of the face. This should have
length 3 along each axis and have the same number of dimensions as
``face``.
Returns
-------
grouped : array of int, shape (2, M)
If a column of ``grouped`` contains the values ``i`` and ``j``, it
implies that labels ``i`` and ``j`` belong in the same group. These
are edges in a global label connectivity graph.
Examples
--------
>>> face = numpy.array([[1, 1, 0, 2, 2, 0, 8],
... [0, 7, 7, 7, 7, 0, 9]])
>>> structure = numpy.ones((3, 3), dtype=bool)
>>> _across_block_label_grouping(face, structure)
array([[1, 2, 8],
[2, 7, 9]], dtype=numpy.int32)
This shows that 1-2 are connected, 2-7 are connected, and 8-9 are
connected. The resulting graph is (1-2-7), (8-9).
"""
common_labels = scipy.ndimage.label(face, structure)[0]
matching = numpy.stack((common_labels.ravel(), face.ravel()), axis=1)
unique_matching = _unique_axis(matching)
valid = numpy.all(unique_matching, axis=1)
unique_valid_matching = unique_matching[valid]
common_labels, labels = unique_valid_matching.T
in_group = numpy.flatnonzero(numpy.diff(common_labels) == 0)
i = numpy.take(labels, in_group)
j = numpy.take(labels, in_group + 1)
grouped = numpy.stack((i, j), axis=0)
return grouped


def _across_block_label_grouping_delayed(face, structure):
"""Delayed version of :func:`_across_block_label_grouping`."""
_across_block_label_grouping_ = dask.delayed(_across_block_label_grouping)
grouped = _across_block_label_grouping_(face, structure)
return dask.array.from_delayed(grouped,
shape=(2, numpy.nan),
dtype=LABEL_DTYPE)


@dask.delayed
def _to_csr_matrix(i, j, n):
"""Using i and j as coo-format coordinates, return csr matrix."""
v = numpy.ones_like(i)
mat = scipy.sparse.coo_matrix((v, (i, j)), shape=(n, n))
return mat.tocsr()


def label_adjacency_graph(labels, structure, nlabels):
"""
Adjacency graph of labels between chunks of ``labels``.
Each chunk in ``labels`` has been labeled independently, and the labels
in different chunks are guaranteed to be unique.
Here we construct a graph connecting labels in different chunks that
correspond to the same logical label in the global volume. This is true
if the two labels "touch" across the block face as defined by the input
``structure``.
Parameters
----------
labels : dask array of int
The input labeled array, where each chunk is independently labeled.
structure : array of bool
Structuring element, shape (3,) * labels.ndim.
nlabels : delayed int
The total number of labels in ``labels`` *before* correcting for
global consistency.
Returns
-------
mat : delayed scipy.sparse.csr_matrix
This matrix has value 1 at (i, j) if label i is connected to
label j in the global volume, 0 everywhere else.
"""
faces = _chunk_faces(labels.chunks, labels.shape)
all_mappings = [dask.array.empty((2, 0), dtype=LABEL_DTYPE, chunks=1)]
for face_slice in faces:
face = labels[face_slice]
mapped = _across_block_label_grouping_delayed(face, structure)
all_mappings.append(mapped)
all_mappings = dask.array.concatenate(all_mappings, axis=1)
i, j = all_mappings
mat = _to_csr_matrix(i, j, nlabels + 1)
return mat


def _chunk_faces(chunks, shape):
"""
Return slices for two-pixel-wide boundaries between chunks.
Parameters
----------
chunks : tuple of tuple of int
The chunk specification of the array.
shape : tuple of int
The shape of the array.
Returns
-------
faces : list of tuple of slices
Each element in this list indexes a face between two chunks.
Examples
--------
>>> a = dask.array.arange(110, chunks=110).reshape((10, 11)).rechunk(5)
>>> chunk_faces(a.chunks, a.shape)
[(slice(4, 6, None), slice(0, 5, None)),
(slice(4, 6, None), slice(5, 10, None)),
(slice(4, 6, None), slice(10, 11, None)),
(slice(0, 5, None), slice(4, 6, None)),
(slice(0, 5, None), slice(9, 11, None)),
(slice(5, 10, None), slice(4, 6, None)),
(slice(5, 10, None), slice(9, 11, None))]
"""
slices = dask.array.core.slices_from_chunks(chunks)
ndim = len(shape)
faces = []
for ax in range(ndim):
for sl in slices:
if sl[ax].stop == shape[ax]:
continue
slice_to_append = list(sl)
slice_to_append[ax] = slice(sl[ax].stop - 1, sl[ax].stop + 1)
faces.append(tuple(slice_to_append))
return faces


def block_ndi_label_delayed(block, structure):
"""
Delayed version of ``scipy.ndimage.label``.
Parameters
----------
block : dask array (single chunk)
The input array to be labeled.
structure : array of bool
Structure defining the connectivity of the labeling.
Returns
-------
labeled : dask array, same shape as ``block``.
The labeled array.
n : delayed int
The number of labels in ``labeled``.
"""
label = dask.delayed(scipy.ndimage.label, nout=2)
labeled_block, n = label(block, structure=structure)
n = dask.delayed(LABEL_DTYPE.type)(n)
labeled = dask.array.from_delayed(labeled_block, shape=block.shape,
dtype=LABEL_DTYPE)
n = dask.array.from_delayed(n, shape=(), dtype=LABEL_DTYPE)
return labeled, n


def connected_components_delayed(csr_matrix):
"""
Delayed version of scipy.sparse.csgraph.connected_components.
This version only returns the (delayed) connected component labelling, not
the number of components.
"""
conn_comp = dask.delayed(scipy.sparse.csgraph.connected_components, nout=2)
return dask.array.from_delayed(conn_comp(csr_matrix, directed=False)[1],
shape=(numpy.nan,), dtype=CONN_COMP_DTYPE)
41 changes: 31 additions & 10 deletions tests/test_dask_image/test_ndmeasure/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,18 +240,39 @@ def test_histogram(shape, chunks, has_lbls, ind, min, max, bins):
assert np.allclose(a_r[i], d_r[i].compute(), equal_nan=True)


def _assert_equivalent_labeling(labels0, labels1):
"""Make sure the two label arrays are equivalent.
In the sense that if two pixels have the same label in labels0, they will
also have the same label in labels1, and vice-versa.
We check this by verifying that there is exactly a one-to-one mapping
between the two label volumes.
"""
matching = np.stack((labels0.ravel(), labels1.ravel()), axis=1)
unique_matching = dask_image.ndmeasure._label._unique_axis(matching)
bincount0 = np.bincount(unique_matching[:, 0])
bincount1 = np.bincount(unique_matching[:, 1])
assert np.all(bincount0 == 1)
assert np.all(bincount1 == 1)


@pytest.mark.parametrize(
"shape, chunks, connectivity", [
((15, 16), (4, 5), 1),
((15, 16), (15, 16), 1),
((15, 16), (15, 16), 2),
((5, 6, 4), (5, 6, 4), 1),
((5, 6, 4), (5, 6, 4), 2),
((5, 6, 4), (5, 6, 4), 3),
"seed, prob, shape, chunks, connectivity", [
(42, 0.4, (15, 16), (15, 16), 1),
(42, 0.4, (15, 16), (4, 5), 1),
(42, 0.4, (15, 16), (4, 5), 2),
(42, 0.4, (15, 16), (8, 5), 1),
(42, 0.4, (15, 16), (8, 5), 2),
(42, 0.3, (10, 8, 6), (5, 4, 3), 1),
(42, 0.3, (10, 8, 6), (5, 4, 3), 2),
(42, 0.3, (10, 8, 6), (5, 4, 3), 3),
]
)
def test_label(shape, chunks, connectivity):
a = np.random.random(shape) < 0.5
def test_label(seed, prob, shape, chunks, connectivity):
np.random.seed(seed)

a = np.random.random(shape) < prob
d = da.from_array(a, chunks=chunks)

s = spnd.generate_binary_structure(a.ndim, connectivity)
Expand All @@ -263,7 +284,7 @@ def test_label(shape, chunks, connectivity):

assert a_l.dtype == d_l.dtype
assert a_l.shape == d_l.shape
assert np.allclose(np.array(a_l), np.array(d_l), equal_nan=True)
_assert_equivalent_labeling(a_l, d_l.compute())


@pytest.mark.parametrize(
Expand Down

0 comments on commit 638cf24

Please sign in to comment.