Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ env:
- TEST_TARGET=default
- TEST_TARGET=default TEST_MINIMAL=true
- TEST_TARGET=coding
- TEST_TARGET=example
- TEST_TARGET=doctest

git:
depth: 10000
Expand Down Expand Up @@ -107,7 +105,7 @@ install:

script:
- if [[ $TEST_TARGET == 'default' ]]; then
python -m iris.tests.runner --default-tests --system-tests --print-failed-images;
python -m unittest discover -v lib/iris/tests/integration/temp_dask;
fi
- if [[ $TEST_TARGET == 'example' ]]; then
python -m iris.tests.runner --example-tests --print-failed-images;
Expand Down
1 change: 1 addition & 0 deletions conda-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ numpy
pyke
udunits2
cf_units
dask

# Iris build dependencies
setuptools
Expand Down
79 changes: 79 additions & 0 deletions lib/iris/_lazy_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# (C) British Crown Copyright 2017, Met Office
#
# This file is part of Iris.
#
# Iris is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Iris is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Iris. If not, see <http://www.gnu.org/licenses/>.
"""
Routines for lazy data handling.

To avoid replicating implementation-dependent test and conversion code.

"""
from __future__ import (absolute_import, division, print_function)
from six.moves import (filter, input, map, range, zip) # noqa

import dask.array as da


def is_lazy_data(data):
    """
    Return True if the argument is an Iris 'lazy' data array.

    At present a 'lazy' array simply means a Dask array, which we
    recognise by the presence of a "compute" attribute.

    """
    # Duck-typed check: anything exposing "compute" is treated as lazy.
    result = hasattr(data, 'compute')
    return result


def as_concrete_data(data):
    """
    Return the actual content of the argument, as a numpy array.

    A lazy input is realised (computed); any other input is returned
    unchanged.

    """
    # A 'lazy' array is recognised by its "compute" method (see
    # is_lazy_data); realise it, otherwise pass the argument through.
    if hasattr(data, 'compute'):
        return data.compute()
    return data


# A magic value, borrowed from biggus
_MAX_CHUNK_SIZE = 8 * 1024 * 1024 * 2


def as_lazy_data(data):
    """
    Return a lazy equivalent of the argument, as a lazy array.

    An existing dask array is returned unchanged.  Anything else is
    wrapped with dask.array.from_array, which assumes the underlying
    object has numpy-array-like properties.

    """
    # NOTE: there are still some doubts here about which forms of
    # indexing are valid.
    # Call an integer, slice, ellipsis or new-axis object a "simple"
    # index, and other cases "compound": a list, tuple, or array of
    # integers.  (Except, a length-1 tuple, list or array might count
    # as "simple"?)
    # With at most one compound index I think we are ok -- i.e. all
    # interpretations should deliver the same result.  With *more than
    # one* "compound" index there is potential for trouble.
    # NOTE#2: cube indexing processes the indices, which may also be
    # relevant.
    if hasattr(data, 'compute'):
        # Already lazy (see is_lazy_data): hand it back untouched.
        return data
    return da.from_array(data, chunks=_MAX_CHUNK_SIZE)
46 changes: 35 additions & 11 deletions lib/iris/_merge.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# (C) British Crown Copyright 2010 - 2016, Met Office
# (C) British Crown Copyright 2010 - 2017, Met Office
#
# This file is part of Iris.
#
Expand Down Expand Up @@ -29,13 +29,14 @@
from collections import namedtuple, OrderedDict
from copy import deepcopy

import biggus
import dask.array as da
import numpy as np
import numpy.ma as ma

import iris.cube
import iris.coords
import iris.exceptions
from iris._lazy_data import is_lazy_data, as_concrete_data, as_lazy_data
import iris.util


Expand Down Expand Up @@ -1068,6 +1069,27 @@ def derive_space(groups, relation_matrix, positions, function_matrix=None):
return space


def _multidim_daskstack(stack):
    """
    Recursively build a multidimensional stacked dask array.

    The argument is a numpy object-ndarray of dask arrays.
    This is needed because dask.array.stack only accepts a
    one-dimensional list.

    """
    ndim = stack.ndim
    if ndim == 0:
        # Array-scalar input: unwrap the single element, as biggus does.
        parts = stack[()]
    elif ndim == 1:
        # 'Another' base case: a simple 1-d goes direct into dask.
        parts = list(stack)
    else:
        # Recurse, because dask.stack does not do multi-dimensional.
        parts = [_multidim_daskstack(subarray) for subarray in stack]
    return da.stack(parts)


class ProtoCube(object):
"""
Framework for merging source-cubes into one or more higher
Expand Down Expand Up @@ -1192,10 +1214,10 @@ def merge(self, unique=True):
# Generate group-depth merged cubes from the source-cubes.
for level in range(group_depth):
# Stack up all the data from all of the relevant source
# cubes in a single biggus ArrayStack.
# cubes in a single dask "stacked" array.
# If it turns out that all the source cubes already had
# their data loaded then at the end we can convert the
# ArrayStack back to a numpy array.
# their data loaded then at the end we convert the stack back
# into a plain numpy array.
stack = np.empty(self._stack_shape, 'object')
all_have_data = True
for nd_index in nd_indexes:
Expand All @@ -1204,17 +1226,19 @@ def merge(self, unique=True):
group = group_by_nd_index[nd_index]
offset = min(level, len(group) - 1)
data = self._skeletons[group[offset]].data
# Ensure the data is represented as a biggus.Array and
# slot that Array into the stack.
if isinstance(data, biggus.Array):
# Ensure the data is represented as a dask array and
# slot that array into the stack.
if is_lazy_data(data):
all_have_data = False
else:
data = biggus.NumpyArrayAdapter(data)
data = as_lazy_data(data)
stack[nd_index] = data

merged_data = biggus.ArrayStack(stack)
merged_data = _multidim_daskstack(stack)
if all_have_data:
merged_data = merged_data.masked_array()
# All inputs were concrete, so turn the result back into a
# normal array.
merged_data = as_concrete_data(merged_data)
# Unmask the array only if it is filled.
if (ma.isMaskedArray(merged_data) and
ma.count_masked(merged_data) == 0):
Expand Down
22 changes: 12 additions & 10 deletions lib/iris/cube.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# (C) British Crown Copyright 2010 - 2016, Met Office
# (C) British Crown Copyright 2010 - 2017, Met Office
#
# This file is part of Iris.
#
Expand Down Expand Up @@ -46,6 +46,7 @@
import iris.coords
import iris._concatenate
import iris._constraints
from iris._lazy_data import is_lazy_data, as_lazy_data, as_concrete_data
import iris._merge
import iris.exceptions
import iris.util
Expand Down Expand Up @@ -713,7 +714,7 @@ def __init__(self, data, standard_name=None, long_name=None,
if isinstance(data, six.string_types):
raise TypeError('Invalid data type: {!r}.'.format(data))

if not isinstance(data, (biggus.Array, ma.MaskedArray)):
if not is_lazy_data(data):
data = np.asarray(data)
self._my_data = data

Expand Down Expand Up @@ -1630,8 +1631,8 @@ def lazy_data(self, array=None):

"""
if array is not None:
if not isinstance(array, biggus.Array):
raise TypeError('new values must be a biggus.Array')
if not is_lazy_data(array):
raise TypeError('new values must be a lazy array')
if self.shape != array.shape:
# The _ONLY_ data reshape permitted is converting a
# 0-dimensional array into a 1-dimensional array of
Expand All @@ -1643,8 +1644,8 @@ def lazy_data(self, array=None):
self._my_data = array
else:
array = self._my_data
if not isinstance(array, biggus.Array):
array = biggus.NumpyArrayAdapter(array)
if not is_lazy_data(array):
array = as_lazy_data(array)
return array

@property
Expand Down Expand Up @@ -1681,9 +1682,9 @@ def data(self):

"""
data = self._my_data
if not isinstance(data, np.ndarray):
if is_lazy_data(data):
try:
data = data.masked_array()
data = as_concrete_data(data)
except MemoryError:
msg = "Failed to create the cube's data as there was not" \
" enough memory available.\n" \
Expand All @@ -1694,7 +1695,8 @@ def data(self):
msg = msg.format(self.shape, data.dtype)
raise MemoryError(msg)
# Unmask the array only if it is filled.
if isinstance(data, np.ndarray) and ma.count_masked(data) == 0:
if (isinstance(data, np.ma.masked_array) and
ma.count_masked(data) == 0):
data = data.data
# data may be a numeric type, so ensure an np.ndarray is returned
self._my_data = np.asanyarray(data)
Expand All @@ -1715,7 +1717,7 @@ def data(self, value):
self._my_data = data

def has_lazy_data(self):
return isinstance(self._my_data, biggus.Array)
return is_lazy_data(self._my_data)

@property
def dim_coords(self):
Expand Down
26 changes: 26 additions & 0 deletions lib/iris/tests/integration/temp_dask/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# (C) British Crown Copyright 2017, Met Office
#
# This file is part of Iris.
#
# Iris is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Iris is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Iris. If not, see <http://www.gnu.org/licenses/>.
"""
Temporary integration tests, specific to replacement of biggus with dask.

Note: some content here may eventually move into main tests.
Keep it here for now, so we can easily test all dask code with :
python -m unittest discover -v lib/iris/tests/integration/temp_dask

"""
from __future__ import (absolute_import, division, print_function)
from six.moves import (filter, input, map, range, zip) # noqa
79 changes: 79 additions & 0 deletions lib/iris/tests/integration/temp_dask/test_lazy_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# (C) British Crown Copyright 2017, Met Office
#
# This file is part of Iris.
#
# Iris is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Iris is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Iris. If not, see <http://www.gnu.org/licenses/>.
"""
Test lazy data utility functions.

Note: really belongs in "tests/unit/lazy_data".

"""
from __future__ import (absolute_import, division, print_function)
from six.moves import (filter, input, map, range, zip) # noqa

# Import iris.tests first so that some things can be initialised before
# importing anything else.
import iris.tests as tests


import numpy as np
import dask.array as da


from iris._lazy_data import is_lazy_data, as_lazy_data, as_concrete_data


class MixinLazyTests(object):
    # Shared fixture for the lazy-data utility tests: one real (numpy)
    # array, plus a dask array and the values it wraps.
    def setUp(self):
        self.real_array = np.arange(24).reshape((2, 3, 4))
        values = np.arange(30).reshape((2, 5, 3))
        self.lazy_values = values
        self.lazy_array = da.from_array(values, 1e6)


class Test_is_lazy_data(MixinLazyTests, tests.IrisTest):
    def test_lazy(self):
        # A dask array is recognised as lazy.
        result = is_lazy_data(self.lazy_array)
        self.assertTrue(result)

    def test_real(self):
        # A plain numpy array is not lazy.
        result = is_lazy_data(self.real_array)
        self.assertFalse(result)


class Test_as_lazy_data(MixinLazyTests, tests.IrisTest):
    def test_lazy(self):
        # A lazy input passes through as the very same object.
        wrapped = as_lazy_data(self.lazy_array)
        self.assertTrue(is_lazy_data(wrapped))
        self.assertIs(wrapped, self.lazy_array)

    def test_real(self):
        # A real array is wrapped lazily, without changing its values.
        wrapped = as_lazy_data(self.real_array)
        self.assertTrue(is_lazy_data(wrapped))
        self.assertArrayAllClose(as_concrete_data(wrapped), self.real_array)


class Test_as_concrete_data(MixinLazyTests, tests.IrisTest):
    def test_lazy(self):
        # A lazy input is realised to its wrapped values.
        realised = as_concrete_data(self.lazy_array)
        self.assertFalse(is_lazy_data(realised))
        self.assertArrayAllClose(realised, self.lazy_values)

    def test_real(self):
        # A real array passes through as the very same object.
        realised = as_concrete_data(self.real_array)
        self.assertFalse(is_lazy_data(realised))
        self.assertIs(realised, self.real_array)


# Allow this test module to be run directly as a script.
if __name__ == '__main__':
    tests.main()
Loading