diff --git a/CHANGELOG.md b/CHANGELOG.md index b029cd8241..b6a071c5e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ - [#680](https://github.com/helmholtz-analytics/heat/pull/680) New property: larray - [#683](https://github.com/helmholtz-analytics/heat/pull/683) New properties: nbytes, gnbytes, lnbytes - [#687](https://github.com/helmholtz-analytics/heat/pull/687) New DNDarray property: balanced + +### I/O +- [#559](https://github.com/helmholtz-analytics/heat/pull/559) Enhancement: `save_netcdf` allows naming dimensions, creating unlimited dimensions, using existing dimensions and variables, slicing ### Manipulations - [#677](https://github.com/helmholtz-analytics/heat/pull/677) split, vsplit, dsplit, hsplit ### Statistical Functions diff --git a/heat/core/io.py b/heat/core/io.py index cc93a616e1..b0c62d7eaa 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -1,6 +1,7 @@ import os.path import torch +import numpy as np import warnings from heat.core import factories @@ -308,7 +309,16 @@ def load_netcdf(path, variable, dtype=types.float32, split=None, device=None, co return dndarray.DNDarray(data, gshape, dtype, split, device, comm, balanced) - def save_netcdf(data, path, variable, mode="w", **kwargs): + def save_netcdf( + data, + path, + variable, + mode="w", + dimension_names=None, + is_unlimited=False, + file_slices=slice(None), + **kwargs + ): """ Saves data to a netCDF4 file. Attempts to utilize parallel I/O if possible. @@ -322,6 +332,17 @@ def save_netcdf(data, path, variable, mode="w", **kwargs): Name of the variable the data is saved to. mode : str, one of 'w', 'a', 'r+' File access mode + dimension_names : list or tuple or string + Specifies the netCDF Dimensions used by the variable. Ignored if + Variable already exists. + is_unlimited : bool + If True, every dimension created for this variable (i.e. doesn't + already exist) is unlimited. Already existing limited dimensions + cannot be changed to unlimited and vice versa. + file_slices : tuple of integer, slice, ellipsis or 1-d bool or + integer sequences + Keys used to slice the netCDF Variable, as given in + the nc.utils._StartCountStride method. kwargs : dict additional arguments passed to the created dataset. @@ -331,7 +352,7 @@ def save_netcdf(data, path, variable, mode="w", **kwargs): If any of the input parameters are not of correct type. ValueError If the access mode is not understood. - + If the number of dimension names does not match the number of dimensions. 
Examples -------- >>> a_range = ht.arange(100, split=0) @@ -343,6 +364,24 @@ def save_netcdf(data, path, variable, mode="w", **kwargs): raise TypeError("path must be str, not {}".format(type(path))) if not isinstance(variable, str): raise TypeError("variable must be str, not {}".format(type(path))) + if dimension_names is None: + dimension_names = [ + __NETCDF_DIM_TEMPLATE.format(variable, dim) for dim, _ in enumerate(data.shape) + ] + elif isinstance(dimension_names, str): + dimension_names = [dimension_names] + elif isinstance(dimension_names, tuple): + dimension_names = list(dimension_names) + elif not isinstance(dimension_names, list): + raise TypeError( + "dimension_names must be list or tuple or string, not{}".format( + type(dimension_names) + ) + ) + elif not len(dimension_names) == len(data.shape): + raise ValueError( + "{0} names given for {1} dimensions".format(len(dimension_names), len(data.shape)) + ) # we only support a subset of possible modes if mode not in __VALID_WRITE_MODES: @@ -350,54 +389,234 @@ def save_netcdf(data, path, variable, mode="w", **kwargs): "mode was {}, not in possible modes {}".format(mode, __VALID_WRITE_MODES) ) + failed = 0 + excep = None # chunk the data, if no split is set maximize parallel I/O and chunk first axis is_split = data.split is not None _, _, slices = data.comm.chunk(data.gshape, data.split if is_split else 0) - # attempt to perform parallel I/O if possible - if __nc_has_par: - with nc.Dataset(path, mode, parallel=True, comm=data.comm.handle) as handle: - dimension_names = [] - for dimension, elements in enumerate(data.shape): - name = __NETCDF_DIM_TEMPLATE.format(variable, dimension) - handle.createDimension(name, elements) - dimension_names.append(name) + def __get_expanded_split(shape, expandedShape, split): + """ + Returns the hypothetical split-axis of a dndarray of shape=shape and + split=split if it was expanded to expandedShape by adding empty dimensions. + + Parameters + ---------- + shape : tuple(int) + Shape of a dndarray. + expandedShape : tuple(int) + Shape of hypothetical expanded dndarray. + split : int or None + split-axis of dndarray. + + Returns + ------- + int + split-Axis of expanded dndarray. 
+ + Raises + ------- + ValueError + """ + if np.prod(shape) != np.prod(expandedShape): + raise ValueError( + "Shapes %s and %s do not have the same size" % (shape, expandedShape) + ) + if np.prod(shape) == 1: # size 1 array + return split + if len(shape) == len(expandedShape): # actually not expanded at all + return split + if split is None: # not split at all + return None + # Get indices of non-empty dimensions and squeezed shapes + enumerated = [[i, v] for i, v in enumerate(shape) if v != 1] + ind_nonempty, sq_shape = list(zip(*enumerated)) # transpose + enumerated = [[i, v] for i, v in enumerate(expandedShape) if v != 1] + ex_ind_nonempty, sq_ex = list(zip(*enumerated)) # transpose + if not sq_shape == sq_ex: + raise ValueError( + "Shapes %s and %s differ in non-empty dimensions" % (shape, expandedShape) + ) + if split in ind_nonempty: # split along non-empty dimension + split_sq = ind_nonempty.index(split) # split-axis in squeezed shape + return ex_ind_nonempty[split_sq] + # split along empty dimension: split doesnt matter, only one process contains data + # return the last empty dimension (in expanded shape) before (the first nonempty dimension after split) + # number of nonempty elems before split + ne_before_split = split - shape[:split].count(1) + ind_ne_after_split = ind_nonempty[ + ne_before_split + ] # index of (first nonempty element after split) in squeezed shape + return max( + i + for i, v in enumerate(expandedShape[: max(ex_ind_nonempty[:ind_ne_after_split])]) + if v == 1 + ) - var = handle.createVariable(variable, data.dtype.char(), dimension_names, **kwargs) - var[slices] = data.larray.cpu() if is_split else data.larray[slices].cpu() + def __merge_slices(var, var_slices, data, data_slices=None): + """ + This method allows replacing: + ``var[var_slices][data_slices] = data`` + (a `netcdf4.Variable.__getitem__` and a `numpy.ndarray.__setitem__` call) + with: + ``var[ __merge_slices(var, var_slices, data, data_slices) ] = data`` + (a single `netcdf4.Variable.__setitem__` call) + + This is necessary because performing the former would, in the `__getitem__`, load the + global dataset onto every process in local `numpy-ndarrays`. Then, the `__setitem__` + would write the local `chunk` into the `numpy-ndarray`. + + The latter allows the netcdf4 library to parallelize the write-operation by directly + using the `netcdf4.Variable.__setitem__` method. + + Parameters + ---------- + var : netcdf4.Variable + Variable to which data is to be saved. + var_slices : + Keys to pass to the set-operator. + data : dndarray + Data to be saved. + data_slices: tuple of slices + As returned by the data.comm.chunk method. + + Returns + ------- + tuple of (slice or integer sequence) + Keys for the set-operation. 
+ """ + slices = data_slices + if slices is None: + _, _, slices = data.comm.chunk(data.gshape, data.split if is_split else 0) + start, count, stride, _ = nc.utils._StartCountStride( + elem=var_slices, + shape=var.shape, + dimensions=var.dimensions, + grp=var.group(), + datashape=data.shape, + put=True, + ) + out_shape = nc._netCDF4._out_array_shape(count) + out_split = __get_expanded_split(data.shape, out_shape, data.split) + + start, count, stride = start.T, count.T, stride.T # transpose for iteration + stop = start + stride * count + new_slices = [] + for begin, end, step in zip(start, stop, stride): + if begin.size == 1: + begin, end, step = begin.item(), end.item(), step.item() + new_slices.append(slice(begin, end, step)) + else: + begin, end, step = begin.flatten(), end.flatten(), step.flatten() + new_slices.append( + np.r_[ + tuple( + slice(b.item(), e.item(), s.item()) + for b, e, s in zip(begin, end, step) + ) + ] + ) + if out_split is not None: # add split-slice + if isinstance(new_slices[out_split], slice): + start, stop, step = ( + new_slices[out_split].start, + new_slices[out_split].stop, + new_slices[out_split].step, + ) + sliced = range(start, stop, step)[slices[data.split]] + a, b, c = sliced.start, sliced.stop, sliced.step + a = None if a < 0 else a + b = None if b < 0 else b + new_slices[out_split] = slice(a, b, c) + # new_slices[out_split] = sliced + elif isinstance(new_slices[out_split], np.ndarray): + new_slices[out_split] = new_slices[out_split][slices[data.split]] + else: + new_slices[out_split] = np.r_[new_slices[out_split]][slices[data.split]] + return tuple(new_slices) + # attempt to perform parallel I/O if possible + if __nc_has_par: + try: + with nc.Dataset(path, mode, parallel=True, comm=data.comm.handle) as handle: + if variable in handle.variables: + var = handle.variables[variable] + else: + for name, elements in zip(dimension_names, data.shape): + if name not in handle.dimensions: + handle.createDimension(name, elements if not is_unlimited else None) + var = handle.createVariable( + variable, data.dtype.char(), dimension_names, **kwargs + ) + merged_slices = __merge_slices(var, file_slices, data) + try: + var[merged_slices] = ( + data.larray.cpu() if is_split else data.larray[slices].cpu() + ) + except RuntimeError: + var.set_collective(True) + var[merged_slices] = ( + data.larray.cpu() if is_split else data.larray[slices].cpu() + ) + except Exception as e: + failed = data.comm.rank + 1 + excep = e # otherwise a single rank only write is performed in case of local data (i.e. 
no split) elif data.comm.rank == 0: - with nc.Dataset(path, mode) as handle: - dimension_names = [] - for dimension, elements in enumerate(data.shape): - name = __NETCDF_DIM_TEMPLATE.format(variable, dimension) - handle.createDimension(name, elements) - dimension_names.append(name) - - var = handle.createVariable( - variable, data.dtype.char(), tuple(dimension_names), **kwargs - ) - if is_split: - var[slices] = data.larray.cpu() - else: - var[:] = data.larray.cpu() - - # ping next rank if it exists - if is_split and data.comm.size > 1: - data.comm.Isend([None, 0, MPI.INT], dest=1) - data.comm.Recv([None, 0, MPI.INT], source=data.comm.size - 1) - - # no MPI, but data is split, we have to serialize the writes - elif is_split: + try: + with nc.Dataset(path, mode) as handle: + if variable in handle.variables: + var = handle.variables[variable] + else: + for name, elements in zip(dimension_names, data.shape): + if name not in handle.dimensions: + handle.createDimension(name, elements if not is_unlimited else None) + var = handle.createVariable( + variable, data.dtype.char(), dimension_names, **kwargs + ) + var.set_collective(False) # not possible with non-parallel netcdf + if is_split: + merged_slices = __merge_slices(var, file_slices, data) + var[merged_slices] = data.larray.cpu() + else: + var[file_slices] = data.larray.cpu() + except Exception as e: + failed = 1 + excep = e + finally: + if data.comm.size > 1: + data.comm.isend(failed, dest=1) + data.comm.recv() + + # non-root + else: # wait for the previous rank to finish writing its chunk, then write own part - data.comm.Recv([None, 0, MPI.INT], source=data.comm.rank - 1) - with nc.Dataset(path, "r+") as handle: - handle[variable][slices] = data.larray.cpu() - - # ping the next node in the communicator, wrap around to 0 to complete barrier behavior - next_rank = (data.comm.rank + 1) % data.comm.size - data.comm.Isend([None, 0, MPI.INT], dest=next_rank) + failed = data.comm.recv() + try: + # no MPI, but data is split, we have to serialize the writes + if not failed and is_split: + with nc.Dataset(path, "r+") as handle: + var = handle.variables[variable] + var.set_collective(False) # not possible with non-parallel netcdf + merged_slices = __merge_slices(var, file_slices, data) + var[merged_slices] = data.larray.cpu() + except Exception as e: + failed = data.comm.rank + 1 + excep = e + finally: + # ping the next node in the communicator, wrap around to 0 to complete barrier behavior + next_rank = (data.comm.rank + 1) % data.comm.size + data.comm.isend(failed, dest=next_rank) + + failed = data.comm.allreduce(failed, op=MPI.MAX) + if failed - 1 == data.comm.rank: + data.comm.bcast(excep, root=failed - 1) + raise excep + elif failed: + excep = data.comm.bcast(excep, root=failed - 1) + excep.args = "raised by process rank {}".format(failed - 1), *excep.args + raise excep from None # raise the same error but without traceback + # because that is on a different process def load(path, *args, **kwargs): diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index ee7a231729..f4fb0ba3e6 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -18,6 +18,7 @@ def setUpClass(cls): cls.NETCDF_PATH = os.path.join(os.getcwd(), "heat/datasets/data/iris.nc") cls.NETCDF_OUT_PATH = pwd + "/test.nc" cls.NETCDF_VARIABLE = "data" + cls.NETCDF_DIMENSION = "data" # load comparison data from csv cls.CSV_PATH = os.path.join(os.getcwd(), "heat/datasets/data/iris.csv") @@ -214,6 +215,158 @@ def test_save(self): ) 
self.assertTrue((local_range.larray == comparison).all()) + # naming dimensions: string + local_range = ht.arange(100, device=self.device) + local_range.save( + self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, dimension_names=self.NETCDF_DIMENSION + ) + if local_range.comm.rank == 0: + with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: + comparison = handle[self.NETCDF_VARIABLE].dimensions + self.assertTrue(self.NETCDF_DIMENSION in comparison) + + # naming dimensions: tuple + local_range = ht.arange(100, device=self.device) + local_range.save( + self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, dimension_names=(self.NETCDF_DIMENSION,) + ) + if local_range.comm.rank == 0: + with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: + comparison = handle[self.NETCDF_VARIABLE].dimensions + self.assertTrue(self.NETCDF_DIMENSION in comparison) + + # appending unlimited variable + split_range.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, is_unlimited=True) + ht.MPI_WORLD.Barrier() + split_range.save( + self.NETCDF_OUT_PATH, + self.NETCDF_VARIABLE, + mode="r+", + file_slices=slice(split_range.size, None, None), + # debug=True, + ) + if split_range.comm.rank == 0: + with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: + comparison = torch.tensor( + handle[self.NETCDF_VARIABLE][:], + dtype=torch.int32, + device=self.device.torch_device, + ) + self.assertTrue( + (ht.concatenate((local_range, local_range)).larray == comparison).all() + ) + + # indexing netcdf file: single index + ht.MPI_WORLD.Barrier() + zeros = ht.zeros((20, 1, 20, 2), device=self.device) + zeros.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="w") + ones = ht.ones(20, device=self.device) + indices = (-1, 0, slice(None), 1) + ones.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", file_slices=indices) + if split_range.comm.rank == 0: + with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: + comparison = torch.tensor( + handle[self.NETCDF_VARIABLE][indices], + dtype=torch.int32, + device=self.device.torch_device, + ) + self.assertTrue((ones.larray == comparison).all()) + + # indexing netcdf file: multiple indices + ht.MPI_WORLD.Barrier() + small_range_split = ht.arange(10, split=0, device=self.device) + small_range = ht.arange(10, device=self.device) + indices = [[0, 9, 5, 2, 1, 3, 7, 4, 8, 6]] + small_range_split.save( + self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="w", file_slices=indices + ) + if split_range.comm.rank == 0: + with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: + comparison = torch.tensor( + handle[self.NETCDF_VARIABLE][indices], + dtype=torch.int32, + device=self.device.torch_device, + ) + self.assertTrue((small_range.larray == comparison).all()) + + # slicing netcdf file + sslice = slice(7, 2, -1) + range_five_split = ht.arange(5, split=0, device=self.device) + range_five = ht.arange(5, device=self.device) + range_five_split.save( + self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", file_slices=sslice + ) + if split_range.comm.rank == 0: + with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: + comparison = torch.tensor( + handle[self.NETCDF_VARIABLE][sslice], + dtype=torch.int32, + device=self.device.torch_device, + ) + self.assertTrue((range_five.larray == comparison).all()) + + # indexing netcdf file: broadcasting array + zeros = ht.zeros((2, 1, 1, 4), device=self.device) + zeros.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="w") + ones = ht.ones((4), split=0, device=self.device) + ones_nosplit = ht.ones((4), split=None, device=self.device) + indices 
= (0, slice(None), slice(None)) + ones.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", file_slices=indices) + if split_range.comm.rank == 0: + with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: + comparison = torch.tensor( + handle[self.NETCDF_VARIABLE][indices], + dtype=torch.int32, + device=self.device.torch_device, + ) + self.assertTrue((ones_nosplit.larray == comparison).all()) + + # indexing netcdf file: broadcasting var + ht.MPI_WORLD.Barrier() + zeros = ht.zeros((2, 2), device=self.device) + zeros.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="w") + ones = ht.ones((1, 2, 1), split=0, device=self.device) + ones_nosplit = ht.ones((1, 2, 1), device=self.device) + indices = (0,) + ones.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", file_slices=indices) + if split_range.comm.rank == 0: + with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: + comparison = torch.tensor( + handle[self.NETCDF_VARIABLE][indices], + dtype=torch.int32, + device=self.device.torch_device, + ) + self.assertTrue((ones_nosplit.larray == comparison).all()) + + # indexing netcdf file: broadcasting ones + ht.MPI_WORLD.Barrier() + zeros = ht.zeros((1, 1, 1, 1), device=self.device) + zeros.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="w") + ones = ht.ones((1, 1), device=self.device) + ones.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+") + if split_range.comm.rank == 0: + with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: + comparison = torch.tensor( + handle[self.NETCDF_VARIABLE][indices], + dtype=torch.int32, + device=self.device.torch_device, + ) + self.assertTrue((ones.larray == comparison).all()) + + # different split and dtype + ht.MPI_WORLD.Barrier() + zeros = ht.zeros((2, 2), split=1, dtype=ht.int32, device=self.device) + zeros_nosplit = ht.zeros((2, 2), dtype=ht.int32, device=self.device) + zeros.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="w") + if split_range.comm.rank == 0: + with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: + comparison = torch.tensor( + handle[self.NETCDF_VARIABLE][:], + dtype=torch.int32, + device=self.device.torch_device, + ) + self.assertTrue((zeros_nosplit.larray == comparison).all()) + def test_save_exception(self): data = ht.arange(1) @@ -235,6 +388,26 @@ def test_save_exception(self): ht.save(data, 1, self.NETCDF_VARIABLE) with self.assertRaises(TypeError): ht.save(data, self.NETCDF_OUT_PATH, 1) + with self.assertRaises(ValueError): + ht.save(data, self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r") + with self.assertRaises((ValueError, IndexError)): + ht.save(data, self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE) + ht.save( + ht.arange(2, split=0), + self.NETCDF_OUT_PATH, + self.NETCDF_VARIABLE, + file_slices=slice(None), + mode="a", + ) + with self.assertRaises((ValueError, IndexError)): + ht.save(data, self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE) + ht.save( + ht.arange(2, split=None), + self.NETCDF_OUT_PATH, + self.NETCDF_VARIABLE, + file_slices=slice(None), + mode="a", + ) else: with self.assertRaises(ValueError): ht.save(data, self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE) @@ -436,6 +609,10 @@ def test_save_netcdf_exception(self): ht.save_netcdf(data, 1, self.NETCDF_VARIABLE) with self.assertRaises(TypeError): ht.save_netcdf(data, self.NETCDF_PATH, 1) + with self.assertRaises(TypeError): + ht.save_netcdf(data, self.NETCDF_PATH, self.NETCDF_VARIABLE, dimension_names=1) + with self.assertRaises(ValueError): + ht.save_netcdf(data, self.NETCDF_PATH, self.NETCDF_VARIABLE, dimension_names=["a", 
"b"]) # def test_remove_folder(self): # ht.MPI_WORLD.Barrier()