In [3]:
# Install the development version of dask (0.5.0-dev at the time of writing) from GitHub, with the `bag` extras.
!pip install git+https://github.com/ContinuumIO/dask.git#egg=dask[bag]

Downloading/unpacking dask[bag] from git+https://github.com/ContinuumIO/dask.git
  Cloning https://github.com/ContinuumIO/dask.git to /Users/arve/.virtualenvs/3.4/build/dask
  Running setup.py (path:/Users/arve/.virtualenvs/3.4/build/dask/setup.py) egg_info for package dask
    
  Installing extra requirements: 'bag'
Downloading/unpacking dill (from dask[bag])
  Downloading dill-0.2.2.tgz (54kB): 54kB downloaded
  Running setup.py (path:/Users/arve/.virtualenvs/3.4/build/dill/setup.py) egg_info for package dill
    
Downloading/unpacking psutil (from dask[bag])
  Downloading psutil-2.2.1.tar.gz (223kB): 223kB downloaded
  Running setup.py (path:/Users/arve/.virtualenvs/3.4/build/psutil/setup.py) egg_info for package psutil
    
Downloading/unpacking toolz (from dask[bag])
  Downloading toolz-0.7.2.tar.gz
  Running setup.py (path:/Users/arve/.virtualenvs/3.4/build/toolz/setup.py) egg_info for package toolz
    
Installing collected packages: dask, dill, psutil, toolz
  Running setup.py 

In [4]:
from math import ceil
from multiprocessing import cpu_count
import dask.array as da

def _get_chunks(shape, ncpu):
    """
    Split the array into equal sized chunks based on the number of
    available processors. The last chunk in each dimension absorbs the
    remainder array elements if the number of cpus does not divide evenly into
    the number of array elements.
    >>> _get_chunks((4, 4), 4)
    ((2, 2), (2, 2))
    >>> _get_chunks((4, 4), 2)
    ((2, 2), (4,))
    >>> _get_chunks((5, 5), 2)
    ((2, 3), (5,))
    >>> _get_chunks((2, 4), 2)
    ((1, 1), (4,))
    """
    chunks = []
    nchunks_per_dim = int(ceil(ncpu ** (1./len(shape))))

    used_chunks = 1
    for i in shape:
        if used_chunks < ncpu:
            regular_chunk = i // nchunks_per_dim
            remainder_chunk = regular_chunk + (i % nchunks_per_dim)

            if regular_chunk == 0:
                chunk_lens = (remainder_chunk,)
            else:
                chunk_lens = ((regular_chunk,) * (nchunks_per_dim - 1) +
                              (remainder_chunk,))
        else:
            chunk_lens = (i,)

        chunks.append(chunk_lens)
        used_chunks *= nchunks_per_dim
    return tuple(chunks)


def apply_chunks(function, array, chunks=None, depth=0, mode=None,
                 extra_arguments=(), extra_keywords=None):
    """Map a function in parallel across an array.

    Split an array into possibly overlapping chunks of a given depth and
    boundary type, call the given function in parallel on the chunks, combine
    the chunks and return the resulting array.

    Parameters
    ----------
    function : function
        Function to be mapped which takes an array as its first argument.
    array : numpy array
        Array which the function will be applied to.
    chunks : int, tuple, or tuple of tuples, optional
        A single integer is interpreted as the length of one side of a square
        chunk that should be tiled across the array.  One tuple of length
        ``array.ndim`` represents the shape of a chunk, and it is tiled across
        the array.  A tuple of tuples of length ``ndim``, where each sub-tuple
        is a sequence of chunk sizes along the corresponding dimension. If
        None, the array is broken up into chunks based on the number of
        available cpus. More information about chunks is in the documentation
        `here <https://dask.pydata.org/en/latest/array-design.html>`_.
    depth : int, optional
        Integer equal to the depth of the added boundary cells. Defaults to
        zero.
    mode : 'reflect', 'periodic', 'wrap', 'nearest', optional
        Type of external boundary padding. ``'wrap'`` is accepted as an alias
        for ``'periodic'``; the value is otherwise passed unchanged as the
        ``boundary`` argument of ``map_overlap``.
    extra_arguments : tuple, optional
        Tuple of arguments to be passed to the function after the array.
    extra_keywords : dictionary, optional
        Dictionary of keyword arguments to be passed to the function. None
        (the default) means no keyword arguments.

    Returns
    -------
    array
        The computed result of mapping ``function`` over the chunks of
        ``array``.
    """
    # Use a None sentinel rather than a shared mutable ``{}`` default.
    if extra_keywords is None:
        extra_keywords = {}

    if chunks is None:
        shape = array.shape
        ncpu = cpu_count()
        chunks = _get_chunks(shape, ncpu)

    # Translate the 'wrap' spelling into the name passed on to map_overlap.
    if mode == 'wrap':
        mode = 'periodic'

    def wrapped_func(arr):
        # Bind the extra arguments so map_overlap only has to supply the chunk.
        return function(arr, *extra_arguments, **extra_keywords)

    darr = da.from_array(array, chunks=chunks)
    return darr.map_overlap(wrapped_func, depth, boundary=mode).compute()


In [6]:
# Download a large test image (about 22 MB) to run the filters on.
!wget http://folk.ntnu.no/seljebu/stitched.png -O stitched.png

--2015-05-15 00:04:06--  http://folk.ntnu.no/seljebu/stitched.png
Resolving folk.ntnu.no... 129.241.56.95
Connecting to folk.ntnu.no|129.241.56.95|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 22994155 (22M) [image/png]
Saving to: 'stitched.png'


2015-05-15 00:04:14 (2.61 MB/s) - 'stitched.png' saved [22994155/22994155]



In [7]:
from skimage.io import imread
# Read the downloaded PNG from disk; `img` is the input for the timing cells below.
img = imread('stitched.png')

In [9]:
from skimage.morphology import square
from skimage.filters.rank import mean

# Structuring element (square of side 9) passed to the rank mean filter.
selem = square(9)
# Time the chunked filter. The overlap depth is half the element's side
# (integer division) — presumably so each chunk sees the neighbouring pixels
# the filter window needs at its borders.
%time res = apply_chunks(mean, img, depth=selem.shape[0]//2, extra_arguments=(selem,))

CPU times: user 10.3 s, sys: 224 ms, total: 10.6 s
Wall time: 10.5 s


In [12]:
# Baseline for comparison: the same mean filter applied directly to the whole image, without chunking.
%time res = mean(img, selem)

CPU times: user 11 s, sys: 140 ms, total: 11.1 s
Wall time: 11.1 s


In [10]:
import dask
from dask.multiprocessing import get as mpget
# Register dask.multiprocessing's `get` as the global `get` option —
# presumably so later compute() calls run on the multiprocessing scheduler.
dask.set_options(get=mpget)

<dask.context.set_options at 0x110050ef0>

In [11]:
# Repeat the chunked filter after switching the `get` option to dask.multiprocessing.
# NOTE: in the recorded run this call failed with the PicklingError shown in the output.
%time res = apply_chunks(mean, img, depth=selem.shape[0]//2, extra_arguments=(selem,))

PicklingError: Can't pickle <class 'numpy.int64'>: it's not the same object as numpy.int64