diff --git a/.travis.yml b/.travis.yml
index d6c31622f4..9a0d295743 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -15,8 +15,6 @@ env:
     - TEST_TARGET=default
     - TEST_TARGET=default TEST_MINIMAL=true
     - TEST_TARGET=coding
-    - TEST_TARGET=example
-    - TEST_TARGET=doctest
 
 git:
   depth: 10000
@@ -107,7 +105,7 @@ install:
 
 script:
   - if [[ $TEST_TARGET == 'default' ]]; then
-      python -m iris.tests.runner --default-tests --system-tests --print-failed-images;
+      python -m unittest discover -v lib/iris/tests/integration/temp_dask;
     fi
   - if [[ $TEST_TARGET == 'example' ]]; then
      python -m iris.tests.runner --example-tests --print-failed-images;
diff --git a/conda-requirements.txt b/conda-requirements.txt
index 3324c5fbc4..cd89693e57 100644
--- a/conda-requirements.txt
+++ b/conda-requirements.txt
@@ -10,6 +10,7 @@ numpy
 pyke
 udunits2
 cf_units
+dask
 
 # Iris build dependencies
 setuptools
diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py
new file mode 100644
index 0000000000..87dcff2ba0
--- /dev/null
+++ b/lib/iris/_lazy_data.py
@@ -0,0 +1,79 @@
+# (C) British Crown Copyright 2017, Met Office
+#
+# This file is part of Iris.
+#
+# Iris is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the
+# Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Iris is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Iris. If not, see <http://www.gnu.org/licenses/>.
+"""
+Routines for lazy data handling.
+
+To avoid replicating implementation-dependent test and conversion code.
+
+"""
+from __future__ import (absolute_import, division, print_function)
+from six.moves import (filter, input, map, range, zip)  # noqa
+
+import dask.array as da
+
+
+def is_lazy_data(data):
+    """
+    Return whether the argument is an Iris 'lazy' data array.
+
+    At present, this means simply a Dask array.
+    We determine this by checking for a "compute" property.
+
+    """
+    return hasattr(data, 'compute')
+
+
+def as_concrete_data(data):
+    """
+    Return the actual content of the argument, as a numpy array.
+
+    If lazy, return the realised data, otherwise return the argument
+    unchanged.
+
+    """
+    if is_lazy_data(data):
+        data = data.compute()
+    return data
+
+
+# A magic value, borrowed from biggus.
+_MAX_CHUNK_SIZE = 8 * 1024 * 1024 * 2
+
+
+def as_lazy_data(data):
+    """
+    Return a lazy equivalent of the argument, as a lazy array.
+
+    For an existing dask array, return it unchanged.
+    Otherwise, return the argument wrapped with dask.array.from_array.
+    This assumes the underlying object has numpy-array-like properties.
+
+    """
+    # NOTE: there are still some doubts here about which forms of indexing
+    # are valid.
+    # Call an integer, slice, ellipsis or new-axis object a "simple" index,
+    # and the other cases "compound": a list, tuple, or array of integers.
+    # (Except, a length-1 tuple, list or array might count as "simple"?)
+    # If there is at most one compound index, I think we are OK -- i.e. all
+    # interpretations should deliver the same result.
+    # If there is *more than one* "compound" index, there is potential for
+    # trouble.
+    # NOTE#2: cube indexing processes the indices, which may also be
+    # relevant.
+    if not is_lazy_data(data):
+        data = da.from_array(data, chunks=_MAX_CHUNK_SIZE)
+    return data
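
For orientation, the three helpers above are designed to round-trip data between numpy and dask. A minimal usage sketch (assuming a build with this branch applied; the array values are illustrative only):

```python
import numpy as np

from iris._lazy_data import as_concrete_data, as_lazy_data, is_lazy_data

real = np.arange(12).reshape(3, 4)

# Wrapping a real array produces a dask array; a dask array is unchanged.
lazy = as_lazy_data(real)
assert is_lazy_data(lazy)
assert as_lazy_data(lazy) is lazy

# Realising a lazy array calls compute(); a real array passes through.
assert np.array_equal(as_concrete_data(lazy), real)
assert as_concrete_data(real) is real
```

Note that `is_lazy_data` duck-types on a `compute` attribute, so any object exposing one counts as lazy.
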
diff --git a/lib/iris/_merge.py b/lib/iris/_merge.py
index b4b549609b..978efbb6c8 100644
--- a/lib/iris/_merge.py
+++ b/lib/iris/_merge.py
@@ -1,4 +1,4 @@
-# (C) British Crown Copyright 2010 - 2016, Met Office
+# (C) British Crown Copyright 2010 - 2017, Met Office
 #
 # This file is part of Iris.
 #
@@ -29,13 +29,14 @@
 from collections import namedtuple, OrderedDict
 from copy import deepcopy
 
-import biggus
+import dask.array as da
 import numpy as np
 import numpy.ma as ma
 
 import iris.cube
 import iris.coords
 import iris.exceptions
+from iris._lazy_data import is_lazy_data, as_concrete_data, as_lazy_data
 import iris.util
 
@@ -1068,6 +1069,27 @@ def derive_space(groups, relation_matrix, positions, function_matrix=None):
     return space
 
 
+def _multidim_daskstack(stack):
+    """
+    Recursively build a multidimensional stacked dask array.
+
+    The argument is an ndarray of dask arrays.
+    This is needed because dask.array.stack only accepts a 1-dimensional
+    list.
+
+    """
+    if stack.ndim == 0:
+        # Handle array scalar inputs, as biggus does.
+        dask_components = stack[()]
+    elif stack.ndim == 1:
+        # Another base case: a 1-d array can be passed directly to dask.
+        dask_components = list(stack)
+    else:
+        # Recurse, because dask.array.stack does not do multi-dimensional.
+        dask_components = [_multidim_daskstack(subarray)
+                           for subarray in stack]
+    return da.stack(dask_components)
+
+
 class ProtoCube(object):
     """
     Framework for merging source-cubes into one or more higher
@@ -1192,10 +1214,10 @@ def merge(self, unique=True):
         # Generate group-depth merged cubes from the source-cubes.
         for level in range(group_depth):
             # Stack up all the data from all of the relevant source
-            # cubes in a single biggus ArrayStack.
+            # cubes in a single dask "stacked" array.
             # If it turns out that all the source cubes already had
-            # their data loaded then at the end we can convert the
-            # ArrayStack back to a numpy array.
+            # their data loaded then at the end we convert the stack back
+            # into a plain numpy array.
             stack = np.empty(self._stack_shape, 'object')
             all_have_data = True
             for nd_index in nd_indexes:
@@ -1204,17 +1226,19 @@
                 group = group_by_nd_index[nd_index]
                 offset = min(level, len(group) - 1)
                 data = self._skeletons[group[offset]].data
-                # Ensure the data is represented as a biggus.Array and
-                # slot that Array into the stack.
-                if isinstance(data, biggus.Array):
+                # Ensure the data is represented as a dask array and
+                # slot that array into the stack.
+                if is_lazy_data(data):
                     all_have_data = False
                 else:
-                    data = biggus.NumpyArrayAdapter(data)
+                    data = as_lazy_data(data)
                 stack[nd_index] = data
 
-            merged_data = biggus.ArrayStack(stack)
+            merged_data = _multidim_daskstack(stack)
             if all_have_data:
-                merged_data = merged_data.masked_array()
+                # All inputs were concrete, so turn the result back into a
+                # normal array.
+                merged_data = as_concrete_data(merged_data)
                 # Unmask the array only if it is filled.
                 if (ma.isMaskedArray(merged_data) and
                         ma.count_masked(merged_data) == 0):
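
`_multidim_daskstack` recurses because `dask.array.stack` only joins a flat sequence of arrays along a new leading axis. A standalone sketch of the same idea (the names `multidim_stack` and `parts` are illustrative, not part of the change):

```python
import dask.array as da
import numpy as np


def multidim_stack(stack):
    # An ndarray of dask arrays: recurse until 1-d, then stack.
    if stack.ndim == 0:
        return stack[()]
    if stack.ndim == 1:
        return da.stack(list(stack))
    return da.stack([multidim_stack(sub) for sub in stack])


# A (2, 3) object array whose elements are (4, 5) dask arrays ...
parts = np.empty((2, 3), dtype=object)
for index in np.ndindex(parts.shape):
    parts[index] = da.from_array(np.zeros((4, 5)), chunks=(4, 5))

# ... stacks into a single (2, 3, 4, 5) dask array.
assert multidim_stack(parts).shape == (2, 3, 4, 5)
```
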
diff --git a/lib/iris/cube.py b/lib/iris/cube.py
index 79fb074ea0..18aa4d350d 100644
--- a/lib/iris/cube.py
+++ b/lib/iris/cube.py
@@ -1,4 +1,4 @@
-# (C) British Crown Copyright 2010 - 2016, Met Office
+# (C) British Crown Copyright 2010 - 2017, Met Office
 #
 # This file is part of Iris.
 #
@@ -46,6 +46,7 @@
 import iris.coords
 import iris._concatenate
 import iris._constraints
+from iris._lazy_data import is_lazy_data, as_lazy_data, as_concrete_data
 import iris._merge
 import iris.exceptions
 import iris.util
@@ -713,7 +714,7 @@ def __init__(self, data, standard_name=None, long_name=None,
         if isinstance(data, six.string_types):
             raise TypeError('Invalid data type: {!r}.'.format(data))
 
-        if not isinstance(data, (biggus.Array, ma.MaskedArray)):
+        if not (is_lazy_data(data) or isinstance(data, ma.MaskedArray)):
             data = np.asarray(data)
         self._my_data = data
@@ -1630,8 +1631,8 @@ def lazy_data(self, array=None):
 
         """
         if array is not None:
-            if not isinstance(array, biggus.Array):
-                raise TypeError('new values must be a biggus.Array')
+            if not is_lazy_data(array):
+                raise TypeError('new values must be a lazy array')
             if self.shape != array.shape:
                 # The _ONLY_ data reshape permitted is converting a
                 # 0-dimensional array into a 1-dimensional array of
@@ -1643,8 +1644,8 @@
             self._my_data = array
         else:
             array = self._my_data
-            if not isinstance(array, biggus.Array):
-                array = biggus.NumpyArrayAdapter(array)
+            if not is_lazy_data(array):
+                array = as_lazy_data(array)
         return array
 
     @property
@@ -1681,9 +1682,9 @@ def data(self):
 
         """
         data = self._my_data
-        if not isinstance(data, np.ndarray):
+        if is_lazy_data(data):
             try:
-                data = data.masked_array()
+                data = as_concrete_data(data)
             except MemoryError:
                 msg = "Failed to create the cube's data as there was not" \
                       " enough memory available.\n" \
                       "The array shape would have been {0!r} and the data" \
                       " type {1}."
                 msg = msg.format(self.shape, data.dtype)
                 raise MemoryError(msg)
             # Unmask the array only if it is filled.
-            if isinstance(data, np.ndarray) and ma.count_masked(data) == 0:
+            if (isinstance(data, np.ma.masked_array) and
+                    ma.count_masked(data) == 0):
                 data = data.data
             # data may be a numeric type, so ensure an np.ndarray is returned
             self._my_data = np.asanyarray(data)
@@ -1715,7 +1717,7 @@ def data(self, value):
         self._my_data = data
 
     def has_lazy_data(self):
-        return is_lazy_data(self._my_data)
+        return is_lazy_data(self._my_data)
 
     @property
     def dim_coords(self):
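
With these `Cube` changes, data stays lazy until it is explicitly touched. A hedged sketch of the intended behaviour (again assuming a build with this branch; the shape and values are arbitrary):

```python
import dask.array as da
import numpy as np

from iris.cube import Cube

cube = Cube(da.from_array(np.arange(6.0).reshape(2, 3), chunks=(2, 3)))
assert cube.has_lazy_data()

# lazy_data() returns a dask view without realising anything.
lazy = cube.lazy_data()
assert hasattr(lazy, 'compute')
assert cube.has_lazy_data()

# Touching .data realises the array, and the cube stays concrete afterwards.
values = cube.data
assert isinstance(values, np.ndarray)
assert not cube.has_lazy_data()
```
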
diff --git a/lib/iris/tests/integration/temp_dask/__init__.py b/lib/iris/tests/integration/temp_dask/__init__.py
new file mode 100644
index 0000000000..50f059effc
--- /dev/null
+++ b/lib/iris/tests/integration/temp_dask/__init__.py
@@ -0,0 +1,26 @@
+# (C) British Crown Copyright 2017, Met Office
+#
+# This file is part of Iris.
+#
+# Iris is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the
+# Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Iris is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Iris. If not, see <http://www.gnu.org/licenses/>.
+"""
+Temporary integration tests, specific to the replacement of biggus with dask.
+
+Note: some content here may eventually move into the main tests.
+Keep it here for now, so we can easily test all the dask code with:
+
+    python -m unittest discover -v lib/iris/tests/integration/temp_dask
+
+"""
+from __future__ import (absolute_import, division, print_function)
+from six.moves import (filter, input, map, range, zip)  # noqa
diff --git a/lib/iris/tests/integration/temp_dask/test_lazy_utils.py b/lib/iris/tests/integration/temp_dask/test_lazy_utils.py
new file mode 100644
index 0000000000..eee210f22b
--- /dev/null
+++ b/lib/iris/tests/integration/temp_dask/test_lazy_utils.py
@@ -0,0 +1,79 @@
+# (C) British Crown Copyright 2017, Met Office
+#
+# This file is part of Iris.
+#
+# Iris is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the
+# Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Iris is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Iris. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test lazy data utility functions.
+
+Note: these really belong in "tests/unit/lazy_data".
+
+"""
+from __future__ import (absolute_import, division, print_function)
+from six.moves import (filter, input, map, range, zip)  # noqa
+
+# Import iris.tests first so that some things can be initialised before
+# importing anything else.
+import iris.tests as tests
+
+import numpy as np
+import dask.array as da
+
+from iris._lazy_data import is_lazy_data, as_lazy_data, as_concrete_data
+
+
+class MixinLazyTests(object):
+    def setUp(self):
+        # Create test real and dask arrays.
+        self.real_array = np.arange(24).reshape((2, 3, 4))
+        self.lazy_values = np.arange(30).reshape((2, 5, 3))
+        self.lazy_array = da.from_array(self.lazy_values, 1e6)
+
+
+class Test_is_lazy_data(MixinLazyTests, tests.IrisTest):
+    def test_lazy(self):
+        self.assertTrue(is_lazy_data(self.lazy_array))
+
+    def test_real(self):
+        self.assertFalse(is_lazy_data(self.real_array))
+
+
+class Test_as_lazy_data(MixinLazyTests, tests.IrisTest):
+    def test_lazy(self):
+        result = as_lazy_data(self.lazy_array)
+        self.assertTrue(is_lazy_data(result))
+        self.assertIs(result, self.lazy_array)
+
+    def test_real(self):
+        result = as_lazy_data(self.real_array)
+        self.assertTrue(is_lazy_data(result))
+        self.assertArrayAllClose(as_concrete_data(result), self.real_array)
+
+
+class Test_as_concrete_data(MixinLazyTests, tests.IrisTest):
+    def test_lazy(self):
+        result = as_concrete_data(self.lazy_array)
+        self.assertFalse(is_lazy_data(result))
+        self.assertArrayAllClose(result, self.lazy_values)
+
+    def test_real(self):
+        result = as_concrete_data(self.real_array)
+        self.assertFalse(is_lazy_data(result))
+        self.assertIs(result, self.real_array)
+
+
+if __name__ == '__main__':
+    tests.main()
diff --git a/lib/iris/tests/integration/temp_dask/test_merge.py b/lib/iris/tests/integration/temp_dask/test_merge.py
new file mode 100644
index 0000000000..a58cdbddcf
--- /dev/null
+++ b/lib/iris/tests/integration/temp_dask/test_merge.py
@@ -0,0 +1,196 @@
+# (C) British Crown Copyright 2017, Met Office
+#
+# This file is part of Iris.
+#
+# Iris is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the
+# Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Iris is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Iris. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test data merging with dask.
+
+Note: this really belongs in the main unit tests, somewhere?
+
+"""
+from __future__ import (absolute_import, division, print_function)
+from six.moves import (filter, input, map, range, zip)  # noqa
+
+# Import iris.tests first so that some things can be initialised before
+# importing anything else.
+import iris.tests as tests
+
+import numpy as np
+import dask.array as da
+
+import iris
+from iris.coords import DimCoord
+from iris.cube import Cube, CubeList
+from iris._lazy_data import is_lazy_data
+import iris._merge
+
+
+def sample_lazy_data(shape):
+    array = np.arange(1.0 * np.prod(shape)).reshape(shape)
+    lazy_array = da.from_array(array, 1e6)
+    return lazy_array
+
+
+def sample_cube(shape=(2, 3), name='cube', units='1',
+                x=None, y=None, z=None, **attrs):
+    cube = Cube(sample_lazy_data(shape), units=units)
+    cube.rename(name)
+    cube.attributes.update(attrs)
+    if x is None and len(shape) > 0:
+        x = np.arange(shape[-1])
+    if y is None and len(shape) > 1:
+        y = np.arange(shape[-2])
+    if x is not None:
+        co = DimCoord(np.array(x, dtype=float), long_name='x', units='1')
+        cube.add_dim_coord(co, len(shape) - 1)
+    if y is not None:
+        co = DimCoord(np.array(y, dtype=float), long_name='y')
+        if len(shape) > 1:
+            cube.add_dim_coord(co, len(shape) - 2)
+        else:
+            cube.add_aux_coord(co)
+    if z is not None:
+        co = DimCoord(np.array(z, dtype=float), long_name='z')
+        if len(shape) > 2:
+            cube.add_dim_coord(co, len(shape) - 3)
+        else:
+            cube.add_aux_coord(co)
+    return cube
+
+
+def sample_multidim_merge_cubelist():
+    cubes = []
+    for i_y in range(6):
+        for i_x in range(4):
+            cube = sample_cube(z=i_y)
+            cube.add_aux_coord(DimCoord(i_x, long_name='aux'))
+            cubes.append(cube)
+    return CubeList(cubes)
+
+
+class TestMergeDataFunctional(tests.IrisTest):
+    def test_single_lazy(self):
+        cube = sample_cube()
+        self.assertTrue(cube.has_lazy_data())
+        cubelist = CubeList([cube])
+        merged = cubelist.merge()
+        self.assertEqual(len(merged), 1)
+        self.assertTrue(merged[0].has_lazy_data())
+        self.assertTrue(cube.has_lazy_data())
+        self.assertEqual(merged[0], cube)
+
+    def test_single_concrete(self):
+        cube = sample_cube()
+        # Touch the data to realise it.
+        cube.data
+        cubelist = CubeList([cube])
+        merged = cubelist.merge()
+        self.assertEqual(len(merged), 1)
+        self.assertFalse(merged[0].has_lazy_data())
+        self.assertEqual(merged[0], cube)
+
+    def test_multiple_distinct(self):
+        cubelist = CubeList([sample_cube(name='a1'),
+                             sample_cube(name='a2')])
+        merged = cubelist.merge()
+        self.assertEqual(len(merged), 2)
+        self.assertTrue(merged[0].has_lazy_data())
+        self.assertTrue(merged[1].has_lazy_data())
+        self.assertEqual(merged, cubelist)
+
+    def _sample_merge_cubelist(self):
+        cube1 = sample_cube(z=5)
+        cube2 = sample_cube(z=7)
+        cube2._my_data = cube2._my_data + 100.0  # NB different, but still lazy.
+        cubelist = CubeList([cube1, cube2])
+        return cubelist
+
+    def _check_sample_merged_result(self, merged):
+        cube, = merged
+        self.assertArrayAlmostEqual(cube.coord('z').points, [5.0, 7.0])
+        self.assertArrayAlmostEqual(cube.data,
+                                    [[[0, 1, 2],
+                                      [3, 4, 5]],
+                                     [[100, 101, 102],
+                                      [103, 104, 105]]])
+        self.assertFalse(cube.has_lazy_data())
+
+    def test_multiple_joined_all_lazy(self):
+        cubelist = self._sample_merge_cubelist()
+        merged = cubelist.merge()
+        self.assertEqual(len(merged), 1)
+        self.assertTrue(merged[0].has_lazy_data())
+        self._check_sample_merged_result(merged)
+
+    def test_multiple_joined_all_concrete(self):
+        cubelist = self._sample_merge_cubelist()
+        # Touch all the data to realise it.
+        [cube.data for cube in cubelist]
+        merged = cubelist.merge()
+        self.assertEqual(len(merged), 1)
+        self.assertFalse(merged[0].has_lazy_data())
+        self._check_sample_merged_result(merged)
+
+    def test_multiple_joined_mixed(self):
+        cubelist = self._sample_merge_cubelist()
+        # Realise just one of the inputs.
+        cubelist[0].data
+        merged = cubelist.merge()
+        self.assertEqual(len(merged), 1)
+        self.assertTrue(merged[0].has_lazy_data())
+        self._check_sample_merged_result(merged)
+
+    def test_multidim_merge(self):
+        # Check an example that requires a multi-dimensional stack operation.
+        cubelist = sample_multidim_merge_cubelist()
+        merged = cubelist.merge()
+        self.assertEqual(len(merged), 1)
+        cube, = merged
+        self.assertTrue(cube.has_lazy_data())
+        self.assertEqual(cube.shape, (4, 6, 2, 3))
+
+
+class TestMergeDataImplementation(tests.IrisTest):
+    def test_multidim_merge__inner_call(self):
+        # Check that merging the multidimensional testcase really does make
+        # merge use a multidimensional array of dask-arrays, as expected.
+
+        # First patch the inner call in _merge, to record the call arguments.
+        original_daskstack_call = iris._merge._multidim_daskstack
+        global _global_call_args
+        _global_call_args = []
+
+        def passthrough_daskstack_call(stack):
+            global _global_call_args
+            _global_call_args.append(stack)
+            return original_daskstack_call(stack)
+
+        stack_patch = self.patch('iris._merge._multidim_daskstack',
+                                 passthrough_daskstack_call)
+
+        # Perform merge on the standard 'multidimensional merge' testcase.
+        cubelist = sample_multidim_merge_cubelist()
+        cubelist.merge()
+
+        # Check the call sequence, and what was passed.
+        self.assertEqual([arg.shape for arg in _global_call_args],
+                         [(4, 6), (6,), (6,), (6,), (6,)])
+        last_arg = _global_call_args[-1]
+        self.assertIsInstance(last_arg, np.ndarray)
+        object_dtype = np.zeros((), dtype=object).dtype
+        self.assertEqual(last_arg.dtype, object_dtype)
+        self.assertTrue(is_lazy_data(last_arg[0]))
+
+
+if __name__ == '__main__':
+    tests.main()
diff --git a/minimal-conda-requirements.txt b/minimal-conda-requirements.txt
index 5299e438e9..a87c787ec9 100644
--- a/minimal-conda-requirements.txt
+++ b/minimal-conda-requirements.txt
@@ -10,6 +10,7 @@ numpy
 pyke
 udunits2
 cf_units
+dask
 
 # Iris build dependencies
 setuptools
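
Taken together, the merge behaviour these tests pin down amounts to: all-lazy inputs give a lazy merged cube, which is only realised on demand. A final sketch (again assuming a build with this branch; `lazy_cube` is a hypothetical helper, not part of the change):

```python
import dask.array as da
import numpy as np

from iris.coords import DimCoord
from iris.cube import Cube, CubeList


def lazy_cube(z):
    # A small cube with lazy data and a scalar 'z' coordinate to merge on.
    cube = Cube(da.from_array(np.zeros((2, 3)), chunks=(2, 3)))
    cube.rename('air_temperature')
    cube.add_aux_coord(DimCoord(float(z), long_name='z'))
    return cube


# Merging all-lazy inputs keeps the merged result lazy ...
merged, = CubeList([lazy_cube(5), lazy_cube(7)]).merge()
assert merged.has_lazy_data()
assert merged.shape == (2, 2, 3)

# ... and the data is only realised on demand.
assert merged.data.shape == (2, 2, 3)
assert not merged.has_lazy_data()
```
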