From 51bfd8b5db2644f26a6c06c4ff9bd2a63793395d Mon Sep 17 00:00:00 2001 From: Benjamin Bourgart Date: Thu, 28 Nov 2019 16:34:06 +0100 Subject: [PATCH 01/25] removed copy constructor and use collective netcdf-output --- heat/core/io.py | 3 ++- setup.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/heat/core/io.py b/heat/core/io.py index 9f3a07bc74..ed41b2c0aa 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -298,7 +298,7 @@ def load_netcdf(path, variable, dtype=types.float32, split=None, device=None, co # chunk up the data portion _, local_shape, indices = comm.chunk(gshape, split) if split is None or local_shape[split] > 0: - data = torch.tensor( + data = torch.as_tensor( data[indices], dtype=dtype.torch_type(), device=device.torch_device ) else: @@ -364,6 +364,7 @@ def save_netcdf(data, path, variable, mode="w", **kwargs): dimension_names.append(name) var = handle.createVariable(variable, data.dtype.char(), dimension_names, **kwargs) + var.set_collective(True) var[slices] = ( data._DNDarray__array.cpu() if is_split else data._DNDarray__array[slices].cpu() ) diff --git a/setup.py b/setup.py index 41601d6fd7..a79e3193e4 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,7 @@ setup( name="heat", - packages=["heat", "heat.core", "heat.ml", "heat.ml.cluster"], + packages=["heat", "heat.core", "heat.ml", "heat.ml.cluster", 'heat.ml.regression', 'heat.ml.regression.lasso'], data_files=["README.md", "LICENSE"], version=version.__version__, description="A framework for high performance data analytics and machine learning.", From 4f87064cb21e5da1880df1510f52874a9d8f5974 Mon Sep 17 00:00:00 2001 From: Ben Bourgart Date: Fri, 24 Apr 2020 13:03:53 +0200 Subject: [PATCH 02/25] Adding IO features and tests --- heat/core/io.py | 128 ++++++++++++++++++++++++++++++------- heat/core/tests/test_io.py | 59 +++++++++++++++++ 2 files changed, 165 insertions(+), 22 deletions(-) diff --git a/heat/core/io.py b/heat/core/io.py index ed41b2c0aa..841d97076a 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -298,7 +298,7 @@ def load_netcdf(path, variable, dtype=types.float32, split=None, device=None, co # chunk up the data portion _, local_shape, indices = comm.chunk(gshape, split) if split is None or local_shape[split] > 0: - data = torch.as_tensor( + data = torch.tensor( data[indices], dtype=dtype.torch_type(), device=device.torch_device ) else: @@ -308,7 +308,7 @@ def load_netcdf(path, variable, dtype=types.float32, split=None, device=None, co return dndarray.DNDarray(data, gshape, dtype, split, device, comm) - def save_netcdf(data, path, variable, mode="w", **kwargs): + def save_netcdf(data, path, variable, mode="w", dimension_names=None, is_unlimited=False, file_slices=slice(None), **kwargs): """ Saves data to a netCDF4 file. Attempts to utilize parallel I/O if possible. @@ -322,6 +322,15 @@ def save_netcdf(data, path, variable, mode="w", **kwargs): Name of the variable the data is saved to. mode : str, one of 'w', 'a', 'r+' File access mode + dimension_names : list or tuple or string + Specifies the netCDF Dimensions used by the variable. + is_unlimited : bool + If True, every dimension created for this variable (i.e. doesn't + already exist) is unlimited. Already existing limited dimensions + cannot be changed to unlimited and vice versa. + file_slices : tuple of integer, slice, ellipsis or 1-d bool or + integer sequences used to slice the netCDF Variable, as given in + the nc.utils._StartCountStride method. kwargs : dict additional arguments passed to the created dataset. 
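A minimal usage sketch of the three keywords documented above, mirroring the tests added later in this patch; the output path and the variable/dimension names are illustrative only:

    import heat as ht

    x = ht.arange(100, split=0)
    # create the variable with a single unlimited dimension named "time"
    x.save("out.nc", "data", dimension_names="time", is_unlimited=True)
    # append another 100 entries behind the existing ones via file_slices
    x.save("out.nc", "data", mode="r+", file_slices=slice(x.size, None, None))
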
@@ -331,7 +340,7 @@ def save_netcdf(data, path, variable, mode="w", **kwargs): If any of the input parameters are not of correct type. ValueError If the access mode is not understood. - + If the number of dimension names does not match the number of dimensions. Examples -------- >>> a_range = ht.arange(100, split=0) @@ -343,6 +352,17 @@ def save_netcdf(data, path, variable, mode="w", **kwargs): raise TypeError("path must be str, not {}".format(type(path))) if not isinstance(variable, str): raise TypeError("variable must be str, not {}".format(type(path))) + if dimension_names is not None: + if isinstance(dimension_names, str): + dimension_names = [dimension_names] + if isinstance(dimension_names, tuple): + dimension_names = list(dimension_names) + if not isinstance(dimension_names, list): + raise TypeError("dimension_names must be list or tuple or string, not{}".format(type(dimension_names))) + if not len(dimension_names) == len(data.shape): + raise ValueError("{0} names given for {1} dimensions".format(len(dimension_names), len(data.shape))) + else: + dimension_names = [__NETCDF_DIM_TEMPLATE.format(variable, dim) for dim, _ in enumerate(data.shape)] # we only support a subset of possible modes if mode not in __VALID_WRITE_MODES: @@ -357,34 +377,98 @@ def save_netcdf(data, path, variable, mode="w", **kwargs): # attempt to perform parallel I/O if possible if __nc_has_par: with nc.Dataset(path, mode, parallel=True, comm=data.comm.handle) as handle: - dimension_names = [] - for dimension, elements in enumerate(data.shape): - name = __NETCDF_DIM_TEMPLATE.format(variable, dimension) - handle.createDimension(name, elements) - dimension_names.append(name) + for name, elements in zip(dimension_names, data.shape): + if name not in handle.dimensions: + handle.createDimension(name, elements if not is_unlimited else None) - var = handle.createVariable(variable, data.dtype.char(), dimension_names, **kwargs) + if variable in handle.variables: + var = handle.variables[variable] + else: + var = handle.createVariable(variable, data.dtype.char(), dimension_names, **kwargs) var.set_collective(True) - var[slices] = ( + + start, count, stride, _ = nc.utils._StartCountStride( + elem=file_slices, + shape=var.shape, + dimensions=dimension_names, + grp=var.group(), + datashape=data.shape, + put=True, + ) + start = start.reshape(-1) + count = count.reshape(-1) + stride = stride.reshape(-1) + stop = start + stride * count + new_slices = [] + for begin, end, step, htSlice in zip(start, stop, stride, slices): + """ + We need var[file_slices][htSlices] = data, but netcdf can + only parallelize the first call. Therefore, we need to + merge the slices: + var[new_slices] = data + Because slices cannot be sliced but are similar to ranges + (i.e. consist of start, stop, step) which can be sliced: + 1) Build a range + 2) slice the range + 3) Build slice of the resulting range + """ + range_from_slice = range(begin, end, step) + sliced = range_from_slice[htSlice] + a, b, c = sliced.start, sliced.stop, sliced.step + """ + Negative values in ranges exist to include zero (in case of + negative stride) and actual negative numbers. This is + incompatible with negative slicing. Because + nc.utils._StartCountStride already transforms negative + slice-indices to their corresponding positive value, + negative values at this point only include zero. In slices, + this is done by using None. 
+ """ + a = None if a < 0 else a + b = None if b < 0 else b + new_slices.append(slice(a, b, c)) + + var[tuple(new_slices)] = ( data._DNDarray__array.cpu() if is_split else data._DNDarray__array[slices].cpu() ) # otherwise a single rank only write is performed in case of local data (i.e. no split) elif data.comm.rank == 0: with nc.Dataset(path, mode) as handle: - dimension_names = [] - for dimension, elements in enumerate(data.shape): - name = __NETCDF_DIM_TEMPLATE.format(variable, dimension) - handle.createDimension(name, elements) - dimension_names.append(name) - - var = handle.createVariable( - variable, data.dtype.char(), tuple(dimension_names), **kwargs - ) + for name, elements in zip(dimension_names, data.shape): + if name not in handle.dimensions: + handle.createDimension(name, elements if not is_unlimited else None) + + if variable in handle.variables: + var = handle.variables[variable] + else: + var = handle.createVariable( + variable, data.dtype.char(), dimension_names, **kwargs + ) if is_split: - var[slices] = data._DNDarray__array.cpu() + start, count, stride, _ = nc.utils._StartCountStride( + elem=file_slices, + shape=var.shape, + dimensions=dimension_names, + grp=var.group(), + datashape=data.shape, + put=True, + ) + start = start.reshape(-1) + count = count.reshape(-1) + stride = stride.reshape(-1) + stop = start + stride * count + new_slices = [] + for begin, end, step, htSlice in zip(start, stop, stride, slices): + range_from_slice = range(begin, end, step) + sliced = range_from_slice[htSlice] + a, b, c = sliced.start, sliced.stop, sliced.step + a = None if a < 0 else a + b = None if b < 0 else b + new_slices.append(slice(a, b, c)) + var[tuple(new_slices)] = data._DNDarray__array.cpu() else: - var[:] = data._DNDarray__array.cpu() + var[file_slices] = data._DNDarray__array.cpu() # ping next rank if it exists if is_split and data.comm.size > 1: @@ -396,7 +480,7 @@ def save_netcdf(data, path, variable, mode="w", **kwargs): # wait for the previous rank to finish writing its chunk, then write own part data.comm.Recv([None, 0, MPI.INT], source=data.comm.rank - 1) with nc.Dataset(path, "r+") as handle: - handle[variable][slices] = data._DNDarray__array.cpu() + handle[variable][file_slices][slices] = data._DNDarray__array.cpu() # ping the next node in the communicator, wrap around to 0 to complete barrier behavior next_rank = (data.comm.rank + 1) % data.comm.size diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index f44896aa28..8ee17645a5 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -29,6 +29,7 @@ def setUpClass(cls): cls.NETCDF_PATH = os.path.join(os.getcwd(), "heat/datasets/data/iris.nc") cls.NETCDF_OUT_PATH = os.path.join(tempfile.gettempdir(), "test.nc") cls.NETCDF_VARIABLE = "data" + cls.NETCDF_DIMENSION = "data" # load comparison data from csv cls.CSV_PATH = os.path.join(os.getcwd(), "heat/datasets/data/iris.csv") @@ -212,6 +213,64 @@ def test_save(self): ) self.assertTrue((local_range._DNDarray__array == comparison).all()) + # naming dimensions + local_range = ht.arange(100, device=ht_device) + local_range.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, + dimension_names=self.NETCDF_DIMENSION) + if local_range.comm.rank == 0: + with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: + comparison = handle[self.NETCDF_VARIABLE].dimensions + self.assertTrue(self.NETCDF_DIMENSION in comparison) + + # appending unlimited variable + split_range.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, + is_unlimited=True) + 
split_range.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", + file_slices=slice(split_range.size, None, None)) + if split_range.comm.rank == 0: + with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: + comparison = torch.tensor( + handle[self.NETCDF_VARIABLE][:], dtype=torch.int32, device=device + ) + self.assertTrue( + (ht.concatenate((local_range, local_range))._DNDarray__array == comparison).all() + ) + + # indexing netcdf file: single index + one = ht.ones(1, device=ht_device) + indices = [-1] + one.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", file_slices=indices) + if split_range.comm.rank == 0: + with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: + comparison = torch.tensor( + handle[self.NETCDF_VARIABLE][indices], dtype=torch.int32, device=device + ) + self.assertTrue((one._DNDarray__array == comparison).all()) + + # indexing netcdf file: multiple indices + small_range = ht.arange(5, split=0, device=ht_device) + indices = [[0, 1, 2, 3, 4]] + small_range.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", + file_slices=indices) + if split_range.comm.rank == 0: + with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: + comparison = torch.tensor( + handle[self.NETCDF_VARIABLE][indices], dtype=torch.int32, device=device + ) + self.assertTrue((small_range._DNDarray__array == comparison).all()) + + # slicing netcdf file + small_range = ht.arange(5, split=0, device=ht_device) + sslice = slice(7, 2, -1) + small_range.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", + file_slices=sslice) + if split_range.comm.rank == 0: + with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: + comparison = torch.tensor( + handle[self.NETCDF_VARIABLE][sslice], dtype=torch.int32, device=device + ) + self.assertTrue((small_range._DNDarray__array == comparison).all()) + def test_save_exception(self): data = ht.arange(1, device=ht_device) From edff24ff7300647a6483d3f1fb4c0775dae4fab1 Mon Sep 17 00:00:00 2001 From: Ben Bourgart Date: Wed, 29 Apr 2020 16:54:05 +0200 Subject: [PATCH 03/25] run pre-commit --- heat/core/io.py | 35 ++++++++++++++++++++++++++++------- heat/core/tests/test_io.py | 30 +++++++++++++++++++----------- 2 files changed, 47 insertions(+), 18 deletions(-) diff --git a/heat/core/io.py b/heat/core/io.py index 841d97076a..e0bdfd1de9 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -308,7 +308,16 @@ def load_netcdf(path, variable, dtype=types.float32, split=None, device=None, co return dndarray.DNDarray(data, gshape, dtype, split, device, comm) - def save_netcdf(data, path, variable, mode="w", dimension_names=None, is_unlimited=False, file_slices=slice(None), **kwargs): + def save_netcdf( + data, + path, + variable, + mode="w", + dimension_names=None, + is_unlimited=False, + file_slices=slice(None), + **kwargs + ): """ Saves data to a netCDF4 file. Attempts to utilize parallel I/O if possible. 
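The slice-merging trick described in the comments added earlier ("build a range, slice the range, build a slice of the resulting range") can be reproduced in plain Python; the concrete numbers below are made up for illustration and are not taken from the patch:

    # what the caller asked to write into (file_slices along one dimension)
    file_slice = slice(0, 20, 2)
    # this rank's share of the data along the split axis (as returned by comm.chunk)
    chunk_slice = slice(3, 7)

    merged = range(file_slice.start, file_slice.stop, file_slice.step)[chunk_slice]
    merged_slice = slice(merged.start, merged.stop, merged.step)  # slice(6, 14, 2)
    # writing var[merged_slice] = chunk is equivalent to var[file_slice][chunk_slice] = chunk,
    # but needs only a single, parallelizable set-operation on the netCDF variable
    assert list(range(20))[file_slice][chunk_slice] == list(range(20))[merged_slice]
    # caveat noted in the patch: with a negative step the merged range can end at a
    # negative stop that only stands for "down to and including 0"; it must be
    # replaced by None before building the slice
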
@@ -358,11 +367,21 @@ def save_netcdf(data, path, variable, mode="w", dimension_names=None, is_unlimit if isinstance(dimension_names, tuple): dimension_names = list(dimension_names) if not isinstance(dimension_names, list): - raise TypeError("dimension_names must be list or tuple or string, not{}".format(type(dimension_names))) + raise TypeError( + "dimension_names must be list or tuple or string, not{}".format( + type(dimension_names) + ) + ) if not len(dimension_names) == len(data.shape): - raise ValueError("{0} names given for {1} dimensions".format(len(dimension_names), len(data.shape))) + raise ValueError( + "{0} names given for {1} dimensions".format( + len(dimension_names), len(data.shape) + ) + ) else: - dimension_names = [__NETCDF_DIM_TEMPLATE.format(variable, dim) for dim, _ in enumerate(data.shape)] + dimension_names = [ + __NETCDF_DIM_TEMPLATE.format(variable, dim) for dim, _ in enumerate(data.shape) + ] # we only support a subset of possible modes if mode not in __VALID_WRITE_MODES: @@ -384,7 +403,9 @@ def save_netcdf(data, path, variable, mode="w", dimension_names=None, is_unlimit if variable in handle.variables: var = handle.variables[variable] else: - var = handle.createVariable(variable, data.dtype.char(), dimension_names, **kwargs) + var = handle.createVariable( + variable, data.dtype.char(), dimension_names, **kwargs + ) var.set_collective(True) start, count, stride, _ = nc.utils._StartCountStride( @@ -394,7 +415,7 @@ def save_netcdf(data, path, variable, mode="w", dimension_names=None, is_unlimit grp=var.group(), datashape=data.shape, put=True, - ) + ) start = start.reshape(-1) count = count.reshape(-1) stride = stride.reshape(-1) @@ -453,7 +474,7 @@ def save_netcdf(data, path, variable, mode="w", dimension_names=None, is_unlimit grp=var.group(), datashape=data.shape, put=True, - ) + ) start = start.reshape(-1) count = count.reshape(-1) stride = stride.reshape(-1) diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index 8ee17645a5..21693ca8a7 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -215,25 +215,31 @@ def test_save(self): # naming dimensions local_range = ht.arange(100, device=ht_device) - local_range.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, - dimension_names=self.NETCDF_DIMENSION) + local_range.save( + self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, dimension_names=self.NETCDF_DIMENSION + ) if local_range.comm.rank == 0: with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: comparison = handle[self.NETCDF_VARIABLE].dimensions self.assertTrue(self.NETCDF_DIMENSION in comparison) # appending unlimited variable - split_range.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, - is_unlimited=True) - split_range.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", - file_slices=slice(split_range.size, None, None)) + split_range.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, is_unlimited=True) + split_range.save( + self.NETCDF_OUT_PATH, + self.NETCDF_VARIABLE, + mode="r+", + file_slices=slice(split_range.size, None, None), + ) if split_range.comm.rank == 0: with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: comparison = torch.tensor( handle[self.NETCDF_VARIABLE][:], dtype=torch.int32, device=device ) self.assertTrue( - (ht.concatenate((local_range, local_range))._DNDarray__array == comparison).all() + ( + ht.concatenate((local_range, local_range))._DNDarray__array == comparison + ).all() ) # indexing netcdf file: single index @@ -250,8 +256,9 @@ def test_save(self): # indexing netcdf file: multiple 
indices small_range = ht.arange(5, split=0, device=ht_device) indices = [[0, 1, 2, 3, 4]] - small_range.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", - file_slices=indices) + small_range.save( + self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", file_slices=indices + ) if split_range.comm.rank == 0: with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: comparison = torch.tensor( @@ -262,8 +269,9 @@ def test_save(self): # slicing netcdf file small_range = ht.arange(5, split=0, device=ht_device) sslice = slice(7, 2, -1) - small_range.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", - file_slices=sslice) + small_range.save( + self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", file_slices=sslice + ) if split_range.comm.rank == 0: with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: comparison = torch.tensor( From 2d994719398ac9bf63b655ed9e0d09ae6e736ce3 Mon Sep 17 00:00:00 2001 From: Benjamin Bourgart Date: Thu, 7 May 2020 15:56:06 +0200 Subject: [PATCH 04/25] Fixed: collective IO only when necessary, also tests --- heat/core/io.py | 14 ++++++++++---- heat/core/tests/test_io.py | 8 ++++---- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/heat/core/io.py b/heat/core/io.py index e0bdfd1de9..3be34c194c 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -406,7 +406,6 @@ def save_netcdf( var = handle.createVariable( variable, data.dtype.char(), dimension_names, **kwargs ) - var.set_collective(True) start, count, stride, _ = nc.utils._StartCountStride( elem=file_slices, @@ -449,9 +448,16 @@ def save_netcdf( b = None if b < 0 else b new_slices.append(slice(a, b, c)) - var[tuple(new_slices)] = ( - data._DNDarray__array.cpu() if is_split else data._DNDarray__array[slices].cpu() - ) + try: + var[tuple(new_slices)] = ( + data._DNDarray__array.cpu() if is_split else data._DNDarray__array[slices].cpu() + ) + except RuntimeError: + var.set_collective(True) + var[tuple(new_slices)] = ( + data._DNDarray__array.cpu() if is_split else data._DNDarray__array[slices].cpu() + ) + # otherwise a single rank only write is performed in case of local data (i.e. 
no split) elif data.comm.rank == 0: diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index 21693ca8a7..11b98ba72f 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -254,9 +254,10 @@ def test_save(self): self.assertTrue((one._DNDarray__array == comparison).all()) # indexing netcdf file: multiple indices - small_range = ht.arange(5, split=0, device=ht_device) + small_range_split = ht.arange(5, split=0, device=ht_device) + small_range = ht.arange(5, device=ht_device) indices = [[0, 1, 2, 3, 4]] - small_range.save( + small_range_split.save( self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", file_slices=indices ) if split_range.comm.rank == 0: @@ -267,9 +268,8 @@ def test_save(self): self.assertTrue((small_range._DNDarray__array == comparison).all()) # slicing netcdf file - small_range = ht.arange(5, split=0, device=ht_device) sslice = slice(7, 2, -1) - small_range.save( + small_range_split.save( self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", file_slices=sslice ) if split_range.comm.rank == 0: From 124dd96cff1f212cc261442310320ab181313389 Mon Sep 17 00:00:00 2001 From: Ben Bourgart Date: Thu, 7 May 2020 16:02:52 +0200 Subject: [PATCH 05/25] pre-commit ran --- heat/core/io.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/heat/core/io.py b/heat/core/io.py index 3be34c194c..f02b39ceea 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -450,15 +450,18 @@ def save_netcdf( try: var[tuple(new_slices)] = ( - data._DNDarray__array.cpu() if is_split else data._DNDarray__array[slices].cpu() + data._DNDarray__array.cpu() + if is_split + else data._DNDarray__array[slices].cpu() ) except RuntimeError: var.set_collective(True) var[tuple(new_slices)] = ( - data._DNDarray__array.cpu() if is_split else data._DNDarray__array[slices].cpu() + data._DNDarray__array.cpu() + if is_split + else data._DNDarray__array[slices].cpu() ) - # otherwise a single rank only write is performed in case of local data (i.e. no split) elif data.comm.rank == 0: with nc.Dataset(path, mode) as handle: From a531173e10e3b4eedeb7508788fca27298456efd Mon Sep 17 00:00:00 2001 From: Ben Bourgart Date: Sat, 9 May 2020 13:58:07 +0200 Subject: [PATCH 06/25] added support for index-lists and additional fixed dimensions --- heat/core/io.py | 123 ++++++++++++++++++++++++------------- heat/core/tests/test_io.py | 24 +++++--- 2 files changed, 94 insertions(+), 53 deletions(-) diff --git a/heat/core/io.py b/heat/core/io.py index f02b39ceea..3fcc379d2e 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -1,6 +1,7 @@ import os.path import torch +import numpy as np import warnings from heat.core import factories @@ -406,48 +407,67 @@ def save_netcdf( var = handle.createVariable( variable, data.dtype.char(), dimension_names, **kwargs ) - - start, count, stride, _ = nc.utils._StartCountStride( + start, count, stride, inds = nc.utils._StartCountStride( elem=file_slices, shape=var.shape, - dimensions=dimension_names, + dimensions=var.dimensions, grp=var.group(), datashape=data.shape, put=True, ) - start = start.reshape(-1) - count = count.reshape(-1) - stride = stride.reshape(-1) + start, count, stride = start.T, count.T, stride.T # transpose for iteration stop = start + stride * count new_slices = [] - for begin, end, step, htSlice in zip(start, stop, stride, slices): - """ - We need var[file_slices][htSlices] = data, but netcdf can - only parallelize the first call. 
Therefore, we need to - merge the slices: - var[new_slices] = data - Because slices cannot be sliced but are similar to ranges - (i.e. consist of start, stop, step) which can be sliced: - 1) Build a range - 2) slice the range - 3) Build slice of the resulting range - """ - range_from_slice = range(begin, end, step) - sliced = range_from_slice[htSlice] - a, b, c = sliced.start, sliced.stop, sliced.step - """ - Negative values in ranges exist to include zero (in case of - negative stride) and actual negative numbers. This is - incompatible with negative slicing. Because - nc.utils._StartCountStride already transforms negative - slice-indices to their corresponding positive value, - negative values at this point only include zero. In slices, - this is done by using None. - """ - a = None if a < 0 else a - b = None if b < 0 else b - new_slices.append(slice(a, b, c)) + htSlices = list(slices) + shape_index = 0 + for i in range(count.shape[0]): # append fixed dimensions to heat slices + if shape_index >= len(data.shape): + htSlices.append(slice(None)) + elif count[i].sum() != data.shape[shape_index]: + htSlices.insert(i, slice(None)) + else: + shape_index += 1 + htSlices = tuple(htSlices) + + for begin, end, step, htSlice in zip(start, stop, stride, htSlices): + if begin.size == 1: + begin, end, step = begin.item(), end.item(), step.item() + """ + We need var[file_slices][htSlices] = data, but netcdf can + only parallelize the first call. Therefore, we need to + merge the slices: + var[new_slices] = data + Because slices cannot be sliced but are similar to ranges + (i.e. consist of start, stop, step) which can be sliced: + 1) Build a range + 2) slice the range + 3) Build slice of the resulting range + """ + range_from_slice = range(begin, end, step) + sliced = range_from_slice[htSlice] + a, b, c = sliced.start, sliced.stop, sliced.step + """ + Negative values in ranges exist to include zero (in case of + negative stride) and actual negative numbers. This is + incompatible with negative slicing. Because + nc.utils._StartCountStride already transforms negative + slice-indices to their corresponding positive value, + negative values at this point only include zero. In slices, + this is done by using None. + """ + a = None if a < 0 else a + b = None if b < 0 else b + new_slices.append(slice(a, b, c)) + else: + """ + If there is more than one slice along one dimension, slice notation cannot be used. + Fall back to using a list of integers as indices. 
+ """ + ranges = tuple( + slice(b.item(), e.item(), s.item()) for b, e, s in zip(begin, end, step) + ) + new_slices.append(np.r_[ranges][htSlice]) try: var[tuple(new_slices)] = ( data._DNDarray__array.cpu() @@ -484,18 +504,35 @@ def save_netcdf( datashape=data.shape, put=True, ) - start = start.reshape(-1) - count = count.reshape(-1) - stride = stride.reshape(-1) + start, count, stride = start.T, count.T, stride.T # transpose for iteration stop = start + stride * count new_slices = [] - for begin, end, step, htSlice in zip(start, stop, stride, slices): - range_from_slice = range(begin, end, step) - sliced = range_from_slice[htSlice] - a, b, c = sliced.start, sliced.stop, sliced.step - a = None if a < 0 else a - b = None if b < 0 else b - new_slices.append(slice(a, b, c)) + + htSlices = list(slices) + shape_index = 0 + for i in range(count.shape[0]): # append fixed dimensions to heat slices + if shape_index >= len(data.shape): + htSlices.append(slice(None)) + elif count[i].sum() != data.shape[shape_index]: + htSlices.insert(i, slice(None)) + else: + shape_index += 1 + htSlices = tuple(htSlices) + + for begin, end, step, htSlice in zip(start, stop, stride, htSlices): + if begin.size == 1: + range_from_slice = range(begin, end, step) + sliced = range_from_slice[htSlice] + a, b, c = sliced.start, sliced.stop, sliced.step + a = None if a < 0 else a + b = None if b < 0 else b + new_slices.append(slice(a, b, c)) + else: + ranges = tuple( + slice(b.item(), e.item(), s.item()) + for b, e, s in zip(begin, end, step) + ) + new_slices.append(np.r_[ranges][htSlice]) var[tuple(new_slices)] = data._DNDarray__array.cpu() else: var[file_slices] = data._DNDarray__array.cpu() diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index 11b98ba72f..66a2480f43 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -243,22 +243,24 @@ def test_save(self): ) # indexing netcdf file: single index - one = ht.ones(1, device=ht_device) - indices = [-1] - one.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", file_slices=indices) + zeros = ht.zeros((20,1,20,2), device=ht_device) + zeros.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="w") + ones = ht.ones(20, device=ht_device) + indices = (-1,0,slice(None),1) + ones.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", file_slices=indices) if split_range.comm.rank == 0: with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: comparison = torch.tensor( handle[self.NETCDF_VARIABLE][indices], dtype=torch.int32, device=device ) - self.assertTrue((one._DNDarray__array == comparison).all()) + self.assertTrue((ones._DNDarray__array == comparison).all()) # indexing netcdf file: multiple indices - small_range_split = ht.arange(5, split=0, device=ht_device) - small_range = ht.arange(5, device=ht_device) - indices = [[0, 1, 2, 3, 4]] + small_range_split = ht.arange(10, split=0, device=ht_device) + small_range = ht.arange(10, device=ht_device) + indices = [[0, 9, 5, 2, 1, 3, 7, 4, 8, 6]] small_range_split.save( - self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", file_slices=indices + self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="w", file_slices=indices ) if split_range.comm.rank == 0: with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: @@ -269,7 +271,9 @@ def test_save(self): # slicing netcdf file sslice = slice(7, 2, -1) - small_range_split.save( + range_five_split = ht.arange(5, split=0, device=ht_device) + range_five = ht.arange(5, device=ht_device) + range_five_split.save( self.NETCDF_OUT_PATH, 
self.NETCDF_VARIABLE, mode="r+", file_slices=sslice ) if split_range.comm.rank == 0: @@ -277,7 +281,7 @@ def test_save(self): comparison = torch.tensor( handle[self.NETCDF_VARIABLE][sslice], dtype=torch.int32, device=device ) - self.assertTrue((small_range._DNDarray__array == comparison).all()) + self.assertTrue((range_five._DNDarray__array == comparison).all()) def test_save_exception(self): data = ht.arange(1, device=ht_device) From 48cc6a0a3f74caf981924d60eba502fac8964c11 Mon Sep 17 00:00:00 2001 From: Ben Bourgart Date: Sat, 9 May 2020 14:04:19 +0200 Subject: [PATCH 07/25] run python black formatting --- heat/core/tests/test_io.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index 66a2480f43..656b419d92 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -243,10 +243,10 @@ def test_save(self): ) # indexing netcdf file: single index - zeros = ht.zeros((20,1,20,2), device=ht_device) + zeros = ht.zeros((20, 1, 20, 2), device=ht_device) zeros.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="w") ones = ht.ones(20, device=ht_device) - indices = (-1,0,slice(None),1) + indices = (-1, 0, slice(None), 1) ones.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", file_slices=indices) if split_range.comm.rank == 0: with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: From b3e0e303c5c7656b56164313ef7b4836cafc8a7e Mon Sep 17 00:00:00 2001 From: Ben Bourgart Date: Sat, 9 May 2020 14:11:54 +0200 Subject: [PATCH 08/25] bugfix --- heat/core/io.py | 1 + 1 file changed, 1 insertion(+) diff --git a/heat/core/io.py b/heat/core/io.py index 3fcc379d2e..74b434f8e6 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -521,6 +521,7 @@ def save_netcdf( for begin, end, step, htSlice in zip(start, stop, stride, htSlices): if begin.size == 1: + begin, end, step = begin.item(), end.item(), step.item() range_from_slice = range(begin, end, step) sliced = range_from_slice[htSlice] a, b, c = sliced.start, sliced.stop, sliced.step From f3625c5204a5814d9f4c530299beb63de07931ff Mon Sep 17 00:00:00 2001 From: Ben Bourgart Date: Wed, 13 May 2020 19:37:25 +0200 Subject: [PATCH 09/25] Use nc4-methods to get (start, count, stride) as well as out_shape (left-hand shape of set-operation). Then only merge slices for split-axis. 
This is a testing/debugging version, cleanup needed --- heat/core/io.py | 286 ++++++++++++++++++++++++++++--------- heat/core/tests/test_io.py | 18 ++- 2 files changed, 238 insertions(+), 66 deletions(-) diff --git a/heat/core/io.py b/heat/core/io.py index 74b434f8e6..e439ccf196 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -317,6 +317,7 @@ def save_netcdf( dimension_names=None, is_unlimited=False, file_slices=slice(None), + debug=False, **kwargs ): """ @@ -394,6 +395,49 @@ def save_netcdf( is_split = data.split is not None _, _, slices = data.comm.chunk(data.gshape, data.split if is_split else 0) + def mergeSlices(file_slices, var, data): + start, count, stride, _ = nc.utils._StartCountStride( + elem=file_slices, + shape=var.shape, + dimensions=dimension_names, + grp=var.group(), + datashape=data.shape, + put=True, + ) + out_shape = nc._netCDF4._out_array_shape(count) + out_split = ( + data.split + out_shape[: data.split].count(1) - data.shape[: data.split].count(1) + ) + start, count, stride = start.T, count.T, stride.T # transpose for iteration + stop = start + stride * count + new_slices = [] + + for begin, end, step in zip(start, stop, stride): + if begin.size == 1: + begin, end, step = begin.item(), end.item(), step.item() + new_slices.append(slice(begin, end, step)) + else: + ranges = tuple( + slice(b.item(), e.item(), s.item()) for b, e, s in zip(begin, end, step) + ) + new_slices.append(np.r_[ranges]) + if isinstance(new_slices[out_split], slice): + start, stop, step = ( + new_slices[out_split].start, + new_slices[out_split].stop, + new_slices[out_split].step, + ) + sliced = range(start, stop, step)[slices[data.split]] + a, b, c = sliced.start, sliced.stop, sliced.step + a = None if a < 0 else a + b = None if b < 0 else b + new_slices[out_split] = slice(a, b, c) + elif isinstance(new_slices[out_split], np.ndarray): + new_slices[out_split] = new_slices[out_split][slices[data.split]] + else: + new_slices[out_split] = np.r_[new_slices[out_split]][slices[data.split]] + return new_slices + # attempt to perform parallel I/O if possible if __nc_has_par: with nc.Dataset(path, mode, parallel=True, comm=data.comm.handle) as handle: @@ -407,6 +451,9 @@ def save_netcdf( var = handle.createVariable( variable, data.dtype.char(), dimension_names, **kwargs ) + # if file_slices is None: + # new_slices = slices + # else: start, count, stride, inds = nc.utils._StartCountStride( elem=file_slices, shape=var.shape, @@ -415,59 +462,74 @@ def save_netcdf( datashape=data.shape, put=True, ) + out_shape = nc._netCDF4._out_array_shape(count) # TODO: use this shape + out_split = ( + data.split + + out_shape[: data.split].count(1) + - data.shape[: data.split].count(1) + ) start, count, stride = start.T, count.T, stride.T # transpose for iteration stop = start + stride * count new_slices = [] - - htSlices = list(slices) - shape_index = 0 - for i in range(count.shape[0]): # append fixed dimensions to heat slices - if shape_index >= len(data.shape): - htSlices.append(slice(None)) - elif count[i].sum() != data.shape[shape_index]: - htSlices.insert(i, slice(None)) - else: - shape_index += 1 - htSlices = tuple(htSlices) - - for begin, end, step, htSlice in zip(start, stop, stride, htSlices): + # print(inds, count, end="\n\n", flush=True) + # htSlices = list(slices) + # shape_index = 0 + # for i in range(count.shape[0]): # append fixed dimensions to heat slices + # if shape_index >= len(data.shape): + # htSlices.append(slice(None)) + # elif count[i].sum() != data.shape[shape_index]: + # htSlices.insert(i, 
slice(None)) + # else: + # shape_index += 1 + # htSlices = tuple(htSlices) + + for begin, end, step in zip(start, stop, stride): # htSlice, htSlices): if begin.size == 1: begin, end, step = begin.item(), end.item(), step.item() - """ - We need var[file_slices][htSlices] = data, but netcdf can - only parallelize the first call. Therefore, we need to - merge the slices: - var[new_slices] = data - Because slices cannot be sliced but are similar to ranges - (i.e. consist of start, stop, step) which can be sliced: - 1) Build a range - 2) slice the range - 3) Build slice of the resulting range - """ - range_from_slice = range(begin, end, step) - sliced = range_from_slice[htSlice] - a, b, c = sliced.start, sliced.stop, sliced.step - """ - Negative values in ranges exist to include zero (in case of - negative stride) and actual negative numbers. This is - incompatible with negative slicing. Because - nc.utils._StartCountStride already transforms negative - slice-indices to their corresponding positive value, - negative values at this point only include zero. In slices, - this is done by using None. - """ - a = None if a < 0 else a - b = None if b < 0 else b - new_slices.append(slice(a, b, c)) + # # We need var[file_slices][htSlices] = data, but netcdf can + # # only parallelize the first call. Therefore, we need to + # # merge the slices: + # # var[new_slices] = data + # # Because slices cannot be sliced but are similar to ranges + # # (i.e. consist of start, stop, step) which can be sliced: + # # 1) Build a range + # # 2) slice the range + # # 3) Build slice of the resulting range + # range_from_slice = range(begin, end, step) + # sliced = range_from_slice[htSlice] + # a, b, c = sliced.start, sliced.stop, sliced.step + # # Negative values in ranges exist to include zero (in case of + # # negative stride) and actual negative numbers. This is + # # incompatible with negative slicing. Because + # # nc.utils._StartCountStride already transforms negative + # # slice-indices to their corresponding positive value, + # # negative values at this point only include zero. In slices, + # # this is done by using None. + # a = None if a < 0 else a + # b = None if b < 0 else b + # new_slices.append(slice(a, b, c)) + new_slices.append(slice(begin, end, step)) else: - """ - If there is more than one slice along one dimension, slice notation cannot be used. - Fall back to using a list of integers as indices. - """ + # If there is more than one slice along one dimension, slice notation cannot be used. + # Fall back to using a list of integers as indices. 
ranges = tuple( slice(b.item(), e.item(), s.item()) for b, e, s in zip(begin, end, step) ) - new_slices.append(np.r_[ranges][htSlice]) + new_slices.append(np.r_[ranges]) # [htSlice]) + # Add chunk-slice to new_slices + if isinstance(new_slices[out_split], slice): + start, stop, step = ( + new_slices[out_split].start, + new_slices[out_split].stop, + new_slices[out_split].step, + ) + sliced = range(start, stop, step)[slices[data.split]] + a, b, c = sliced.start, sliced.stop, sliced.step + a = None if a < 0 else a + b = None if b < 0 else b + new_slices[out_split] = slice(a, b, c) + else: + new_slices[out_split] = np.r_[new_slices[out_split]][slices[data.split]] try: var[tuple(new_slices)] = ( data._DNDarray__array.cpu() @@ -495,6 +557,7 @@ def save_netcdf( var = handle.createVariable( variable, data.dtype.char(), dimension_names, **kwargs ) + var.set_collective(False) if is_split: start, count, stride, _ = nc.utils._StartCountStride( elem=file_slices, @@ -504,55 +567,148 @@ def save_netcdf( datashape=data.shape, put=True, ) + out_shape = nc._netCDF4._out_array_shape(count) + out_split = ( + data.split + + out_shape[: data.split].count(1) + - data.shape[: data.split].count(1) + ) start, count, stride = start.T, count.T, stride.T # transpose for iteration stop = start + stride * count new_slices = [] - htSlices = list(slices) - shape_index = 0 - for i in range(count.shape[0]): # append fixed dimensions to heat slices - if shape_index >= len(data.shape): - htSlices.append(slice(None)) - elif count[i].sum() != data.shape[shape_index]: - htSlices.insert(i, slice(None)) - else: - shape_index += 1 - htSlices = tuple(htSlices) - - for begin, end, step, htSlice in zip(start, stop, stride, htSlices): + # htSlices = list(slices) + # shape_index = 0 + # for i in range(count.shape[0]): # append fixed dimensions to heat slices + # if shape_index >= len(data.shape): + # htSlices.append(slice(None)) + # elif count[i].sum() != data.shape[shape_index]: + # htSlices.insert(i, slice(None)) + # else: + # shape_index += 1 + # htSlices = tuple(htSlices) + + for begin, end, step in zip(start, stop, stride): # htSlice, htSlices): if begin.size == 1: begin, end, step = begin.item(), end.item(), step.item() - range_from_slice = range(begin, end, step) - sliced = range_from_slice[htSlice] - a, b, c = sliced.start, sliced.stop, sliced.step - a = None if a < 0 else a - b = None if b < 0 else b - new_slices.append(slice(a, b, c)) + # range_from_slice = range(begin, end, step) + # sliced = range_from_slice[htSlice] + # a, b, c = sliced.start, sliced.stop, sliced.step + # a = None if a < 0 else a + # b = None if b < 0 else b + # new_slices.append(slice(a, b, c)) + new_slices.append(slice(begin, end, step)) else: ranges = tuple( slice(b.item(), e.item(), s.item()) for b, e, s in zip(begin, end, step) ) - new_slices.append(np.r_[ranges][htSlice]) + new_slices.append(np.r_[ranges]) # [htSlice]) + if isinstance(new_slices[out_split], slice): + start, stop, step = ( + new_slices[out_split].start, + new_slices[out_split].stop, + new_slices[out_split].step, + ) + sliced = range(start, stop, step)[slices[data.split]] + a, b, c = sliced.start, sliced.stop, sliced.step + a = None if a < 0 else a + b = None if b < 0 else b + new_slices[out_split] = slice(a, b, c) + elif isinstance(new_slices[out_split], np.ndarray): + new_slices[out_split] = new_slices[out_split][slices[data.split]] + else: + new_slices[out_split] = np.r_[new_slices[out_split]][slices[data.split]] + if debug: + print("root proc new", new_slices, flush=True) 
var[tuple(new_slices)] = data._DNDarray__array.cpu() else: var[file_slices] = data._DNDarray__array.cpu() - + if debug: + print("root proc finished writing", flush=True) # ping next rank if it exists if is_split and data.comm.size > 1: data.comm.Isend([None, 0, MPI.INT], dest=1) data.comm.Recv([None, 0, MPI.INT], source=data.comm.size - 1) + if debug: + print("root proc received signal", flush=True) # no MPI, but data is split, we have to serialize the writes elif is_split: # wait for the previous rank to finish writing its chunk, then write own part data.comm.Recv([None, 0, MPI.INT], source=data.comm.rank - 1) + if debug: + print("non-root beginning", flush=True) with nc.Dataset(path, "r+") as handle: - handle[variable][file_slices][slices] = data._DNDarray__array.cpu() + var = handle.variables[variable] + start, count, stride, _ = nc.utils._StartCountStride( + elem=file_slices, + shape=var.shape, + dimensions=dimension_names, + grp=var.group(), + datashape=data.shape, + put=True, + ) + out_shape = nc._netCDF4._out_array_shape(count) + out_split = ( + data.split + + out_shape[: data.split].count(1) + - data.shape[: data.split].count(1) + ) + start, count, stride = start.T, count.T, stride.T # transpose for iteration + stop = start + stride * count + new_slices = [] + + # htSlices = list(slices) + # shape_index = 0 + # for i in range(count.shape[0]): # append fixed dimensions to heat slices + # if shape_index >= len(data.shape): + # htSlices.append(slice(None)) + # elif count[i].sum() != data.shape[shape_index]: + # htSlices.insert(i, slice(None)) + # else: + # shape_index += 1 + # htSlices = tuple(htSlices) + + for begin, end, step in zip(start, stop, stride): # htSlice, htSlices): + if begin.size == 1: + begin, end, step = begin.item(), end.item(), step.item() + # range_from_slice = range(begin, end, step) + # sliced = range_from_slice[htSlice] + # a, b, c = sliced.start, sliced.stop, sliced.step + # a = None if a < 0 else a + # b = None if b < 0 else b + # new_slices.append(slice(a, b, c)) + new_slices.append(slice(begin, end, step)) + else: + ranges = tuple( + slice(b.item(), e.item(), s.item()) for b, e, s in zip(begin, end, step) + ) + new_slices.append(np.r_[ranges]) # [htSlice]) + if isinstance(new_slices[out_split], slice): + start, stop, step = ( + new_slices[out_split].start, + new_slices[out_split].stop, + new_slices[out_split].step, + ) + sliced = range(start, stop, step)[slices[data.split]] + a, b, c = sliced.start, sliced.stop, sliced.step + a = None if a < 0 else a + b = None if b < 0 else b + new_slices[out_split] = slice(a, b, c) + elif isinstance(new_slices[out_split], np.ndarray): + new_slices[out_split] = new_slices[out_split][slices[data.split]] + else: + new_slices[out_split] = np.r_[new_slices[out_split]][slices[data.split]] + var[tuple(new_slices)] = data._DNDarray__array.cpu() + if debug: + print("non-root finished", flush=True) # ping the next node in the communicator, wrap around to 0 to complete barrier behavior next_rank = (data.comm.rank + 1) % data.comm.size data.comm.Isend([None, 0, MPI.INT], dest=next_rank) + if debug: + print("non-root sent signal", flush=True) def load(path, *args, **kwargs): diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index 656b419d92..1df917cb74 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -204,16 +204,23 @@ def test_save(self): self.assertTrue((local_range._DNDarray__array == comparison).all()) # split range + ht.MPI_WORLD.Barrier() + print(ht.MPI_WORLD.rank, "split range", 
flush=True) split_range = ht.arange(100, split=0, device=ht_device) - split_range.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE) + split_range.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, debug=True) if split_range.comm.rank == 0: + print("root comparison beginning", flush=True) with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: comparison = torch.tensor( handle[self.NETCDF_VARIABLE][:], dtype=torch.int32, device=device ) + print(comparison, flush=True) self.assertTrue((local_range._DNDarray__array == comparison).all()) + print("root comparison finished", flush=True) # naming dimensions + # ht.MPI_WORLD.Barrier() + print(ht.MPI_WORLD.rank, "naming dims", flush=True) local_range = ht.arange(100, device=ht_device) local_range.save( self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, dimension_names=self.NETCDF_DIMENSION @@ -225,11 +232,14 @@ def test_save(self): # appending unlimited variable split_range.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, is_unlimited=True) + ht.MPI_WORLD.Barrier() + print(ht.MPI_WORLD.rank, "setting sliced var", flush=True) split_range.save( self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", file_slices=slice(split_range.size, None, None), + # debug=True, ) if split_range.comm.rank == 0: with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: @@ -243,6 +253,8 @@ def test_save(self): ) # indexing netcdf file: single index + ht.MPI_WORLD.Barrier() + print(ht.MPI_WORLD.rank, "beginning single index", flush=True) zeros = ht.zeros((20, 1, 20, 2), device=ht_device) zeros.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="w") ones = ht.ones(20, device=ht_device) @@ -256,6 +268,8 @@ def test_save(self): self.assertTrue((ones._DNDarray__array == comparison).all()) # indexing netcdf file: multiple indices + ht.MPI_WORLD.Barrier() + print(ht.MPI_WORLD.rank, "beginning multi index", flush=True) small_range_split = ht.arange(10, split=0, device=ht_device) small_range = ht.arange(10, device=ht_device) indices = [[0, 9, 5, 2, 1, 3, 7, 4, 8, 6]] @@ -270,6 +284,8 @@ def test_save(self): self.assertTrue((small_range._DNDarray__array == comparison).all()) # slicing netcdf file + ht.MPI_WORLD.Barrier() + print(ht.MPI_WORLD.rank, "beginning slicing index", flush=True) sslice = slice(7, 2, -1) range_five_split = ht.arange(5, split=0, device=ht_device) range_five = ht.arange(5, device=ht_device) From 3bd006f8623de50110ce200e9e433ead9e67fbe7 Mon Sep 17 00:00:00 2001 From: Ben Bourgart Date: Wed, 13 May 2020 20:13:21 +0200 Subject: [PATCH 10/25] pre-commit --- heat/core/io.py | 1 - 1 file changed, 1 deletion(-) diff --git a/heat/core/io.py b/heat/core/io.py index e439ccf196..121ce5f650 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -411,7 +411,6 @@ def mergeSlices(file_slices, var, data): start, count, stride = start.T, count.T, stride.T # transpose for iteration stop = start + stride * count new_slices = [] - for begin, end, step in zip(start, stop, stride): if begin.size == 1: begin, end, step = begin.item(), end.item(), step.item() From 1a0aaeacd097e9bca51f34c98dc9f87517782b9e Mon Sep 17 00:00:00 2001 From: Ben Bourgart Date: Fri, 15 May 2020 14:43:57 +0200 Subject: [PATCH 11/25] cleanup --- heat/core/io.py | 363 ++++++++++++++++-------------------------------- 1 file changed, 123 insertions(+), 240 deletions(-) diff --git a/heat/core/io.py b/heat/core/io.py index 170bcab548..b28d875b1c 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -317,7 +317,6 @@ def save_netcdf( dimension_names=None, is_unlimited=False, file_slices=slice(None), - debug=False, 
**kwargs ): """ @@ -395,9 +394,93 @@ def save_netcdf( is_split = data.split is not None _, _, slices = data.comm.chunk(data.gshape, data.split if is_split else 0) - def mergeSlices(file_slices, var, data): + def getExpandedSplit(shape, expandedShape, split): + """Returns the hypothetical split-axis of a dndarray of shape=shape and + split=split if it was expanded to expandedShape by adding empty dimensions. + + Parameters + ---------- + shape : tuple(int) + Shape of a dndarray. + expandedShape : tuple(int) + Shape of hypothetical expanded dndarray. + split : int or None + split-axis of dndarray. + + Returns + ------- + int + split-Axis of expanded dndarray. + + Raises + ------- + ValueError + If the shapes differ in non-empty dimensions. + + """ + # Get indices of non-empty dimensions and squeezed shapes + ind_nonempty, sq_shape = np.array([[i, v] for i, v in enumerate(shape) if v != 1]).T + ex_ind_nonempty, sq_ex = np.array( + [[i, v] for i, v in enumerate(expandedShape) if v != 1] + ).T + if not all(sq_shape == sq_ex): + raise ValueError( + "Shapes %s and %s differ in non-empty dimensions" % (shape, expandedShape) + ) + if len(shape) == len(expandedShape): # actually not expanded at all + return split + if split is None: # not split at all + return None + if split in ind_nonempty: # split along non-empty dimension + split_sq = ind_nonempty.index(split) # split-axis in squeezed shape + return ex_ind_nonempty[split_sq] + # split along empty dimension: split doesnt matter, only one process contains data + # return the last empty dimension (in expanded shape) before (the first nonempty dimension after split) + ne_before_split = split - shape[:split].count( + 1 + ) # number of nonempty elems before split + ind_ne_after_split = ind_nonempty[ + ne_before_split + ] # index of (first nonempty element after split) in squeezed shape + return max( + i + for i, v in enumerate(expandedShape[: ex_ind_nonempty[ind_ne_after_split]]) + if v == 1 + ) + + def mergeSlices(var, var_slices, data, data_slices=None): + """ Using var[var_slices][data_slices] = data + combines a __getitem__ with a __setitem__ call, therefore it does not allow + parallelization of the write-operation and does not work is var_slices = slice(None) + (in that casem __getitem__ returns a copy and not a view). + This method merges both keys: + var[mergeSlices(var, var_slices, data)] = data + + Parameters + ---------- + var : netcdf4.Variable + Variable to which data is to be saved. + var_slices : + Keys to pass to the set-operator. + data : dndarray + Data to be saved. + data_slices: tuple of slices + As returned by the data.comm.chunk method. + + Returns + ------- + + Key for the set-operation. 
+ + Raises + ------- + + """ + slices = data_slices + if slices is None: + _, _, slices = data.comm.chunk(data.gshape, data.split if is_split else 0) start, count, stride, _ = nc.utils._StartCountStride( - elem=file_slices, + elem=var_slices, shape=var.shape, dimensions=dimension_names, grp=var.group(), @@ -405,9 +488,8 @@ def mergeSlices(file_slices, var, data): put=True, ) out_shape = nc._netCDF4._out_array_shape(count) - out_split = ( - data.split + out_shape[: data.split].count(1) - data.shape[: data.split].count(1) - ) + out_split = getExpandedSplit(data.shape, out_shape, data.split) + start, count, stride = start.T, count.T, stride.T # transpose for iteration stop = start + stride * count new_slices = [] @@ -416,26 +498,32 @@ def mergeSlices(file_slices, var, data): begin, end, step = begin.item(), end.item(), step.item() new_slices.append(slice(begin, end, step)) else: - ranges = tuple( - slice(b.item(), e.item(), s.item()) for b, e, s in zip(begin, end, step) + begin, end, step = begin.flatten(), end.flatten(), step.flatten() + new_slices.append( + np.r_[ + tuple( + slice(b.item(), e.item(), s.item()) + for b, e, s in zip(begin, end, step) + ) + ] ) - new_slices.append(np.r_[ranges]) - if isinstance(new_slices[out_split], slice): - start, stop, step = ( - new_slices[out_split].start, - new_slices[out_split].stop, - new_slices[out_split].step, - ) - sliced = range(start, stop, step)[slices[data.split]] - a, b, c = sliced.start, sliced.stop, sliced.step - a = None if a < 0 else a - b = None if b < 0 else b - new_slices[out_split] = slice(a, b, c) - elif isinstance(new_slices[out_split], np.ndarray): - new_slices[out_split] = new_slices[out_split][slices[data.split]] - else: - new_slices[out_split] = np.r_[new_slices[out_split]][slices[data.split]] - return new_slices + if out_split is not None: # add split-slice + if isinstance(new_slices[out_split], slice): + start, stop, step = ( + new_slices[out_split].start, + new_slices[out_split].stop, + new_slices[out_split].step, + ) + sliced = range(start, stop, step)[slices[data.split]] + a, b, c = sliced.start, sliced.stop, sliced.step + a = None if a < 0 else a + b = None if b < 0 else b + new_slices[out_split] = slice(a, b, c) + elif isinstance(new_slices[out_split], np.ndarray): + new_slices[out_split] = new_slices[out_split][slices[data.split]] + else: + new_slices[out_split] = np.r_[new_slices[out_split]][slices[data.split]] + return tuple(new_slices) # attempt to perform parallel I/O if possible if __nc_has_par: @@ -450,94 +538,16 @@ def mergeSlices(file_slices, var, data): var = handle.createVariable( variable, data.dtype.char(), dimension_names, **kwargs ) - # if file_slices is None: - # new_slices = slices - # else: - start, count, stride, inds = nc.utils._StartCountStride( - elem=file_slices, - shape=var.shape, - dimensions=var.dimensions, - grp=var.group(), - datashape=data.shape, - put=True, - ) - out_shape = nc._netCDF4._out_array_shape(count) # TODO: use this shape - out_split = ( - data.split - + out_shape[: data.split].count(1) - - data.shape[: data.split].count(1) - ) - start, count, stride = start.T, count.T, stride.T # transpose for iteration - stop = start + stride * count - new_slices = [] - # print(inds, count, end="\n\n", flush=True) - # htSlices = list(slices) - # shape_index = 0 - # for i in range(count.shape[0]): # append fixed dimensions to heat slices - # if shape_index >= len(data.shape): - # htSlices.append(slice(None)) - # elif count[i].sum() != data.shape[shape_index]: - # htSlices.insert(i, slice(None)) - # 
else: - # shape_index += 1 - # htSlices = tuple(htSlices) - - for begin, end, step in zip(start, stop, stride): # htSlice, htSlices): - if begin.size == 1: - begin, end, step = begin.item(), end.item(), step.item() - # # We need var[file_slices][htSlices] = data, but netcdf can - # # only parallelize the first call. Therefore, we need to - # # merge the slices: - # # var[new_slices] = data - # # Because slices cannot be sliced but are similar to ranges - # # (i.e. consist of start, stop, step) which can be sliced: - # # 1) Build a range - # # 2) slice the range - # # 3) Build slice of the resulting range - # range_from_slice = range(begin, end, step) - # sliced = range_from_slice[htSlice] - # a, b, c = sliced.start, sliced.stop, sliced.step - # # Negative values in ranges exist to include zero (in case of - # # negative stride) and actual negative numbers. This is - # # incompatible with negative slicing. Because - # # nc.utils._StartCountStride already transforms negative - # # slice-indices to their corresponding positive value, - # # negative values at this point only include zero. In slices, - # # this is done by using None. - # a = None if a < 0 else a - # b = None if b < 0 else b - # new_slices.append(slice(a, b, c)) - new_slices.append(slice(begin, end, step)) - else: - # If there is more than one slice along one dimension, slice notation cannot be used. - # Fall back to using a list of integers as indices. - ranges = tuple( - slice(b.item(), e.item(), s.item()) for b, e, s in zip(begin, end, step) - ) - new_slices.append(np.r_[ranges]) # [htSlice]) - # Add chunk-slice to new_slices - if isinstance(new_slices[out_split], slice): - start, stop, step = ( - new_slices[out_split].start, - new_slices[out_split].stop, - new_slices[out_split].step, - ) - sliced = range(start, stop, step)[slices[data.split]] - a, b, c = sliced.start, sliced.stop, sliced.step - a = None if a < 0 else a - b = None if b < 0 else b - new_slices[out_split] = slice(a, b, c) - else: - new_slices[out_split] = np.r_[new_slices[out_split]][slices[data.split]] + mergedSlices = mergeSlices(var, file_slices, data) try: - var[tuple(new_slices)] = ( + var[mergedSlices] = ( data._DNDarray__array.cpu() if is_split else data._DNDarray__array[slices].cpu() ) except RuntimeError: var.set_collective(True) - var[tuple(new_slices)] = ( + var[mergedSlices] = ( data._DNDarray__array.cpu() if is_split else data._DNDarray__array[slices].cpu() @@ -556,158 +566,31 @@ def mergeSlices(file_slices, var, data): var = handle.createVariable( variable, data.dtype.char(), dimension_names, **kwargs ) - var.set_collective(False) + var.set_collective(False) # not possible with non-parallel netcdf if is_split: - start, count, stride, _ = nc.utils._StartCountStride( - elem=file_slices, - shape=var.shape, - dimensions=dimension_names, - grp=var.group(), - datashape=data.shape, - put=True, - ) - out_shape = nc._netCDF4._out_array_shape(count) - out_split = ( - data.split - + out_shape[: data.split].count(1) - - data.shape[: data.split].count(1) - ) - start, count, stride = start.T, count.T, stride.T # transpose for iteration - stop = start + stride * count - new_slices = [] - - # htSlices = list(slices) - # shape_index = 0 - # for i in range(count.shape[0]): # append fixed dimensions to heat slices - # if shape_index >= len(data.shape): - # htSlices.append(slice(None)) - # elif count[i].sum() != data.shape[shape_index]: - # htSlices.insert(i, slice(None)) - # else: - # shape_index += 1 - # htSlices = tuple(htSlices) - - for begin, end, step in 
zip(start, stop, stride): # htSlice, htSlices): - if begin.size == 1: - begin, end, step = begin.item(), end.item(), step.item() - # range_from_slice = range(begin, end, step) - # sliced = range_from_slice[htSlice] - # a, b, c = sliced.start, sliced.stop, sliced.step - # a = None if a < 0 else a - # b = None if b < 0 else b - # new_slices.append(slice(a, b, c)) - new_slices.append(slice(begin, end, step)) - else: - ranges = tuple( - slice(b.item(), e.item(), s.item()) - for b, e, s in zip(begin, end, step) - ) - new_slices.append(np.r_[ranges]) # [htSlice]) - if isinstance(new_slices[out_split], slice): - start, stop, step = ( - new_slices[out_split].start, - new_slices[out_split].stop, - new_slices[out_split].step, - ) - sliced = range(start, stop, step)[slices[data.split]] - a, b, c = sliced.start, sliced.stop, sliced.step - a = None if a < 0 else a - b = None if b < 0 else b - new_slices[out_split] = slice(a, b, c) - elif isinstance(new_slices[out_split], np.ndarray): - new_slices[out_split] = new_slices[out_split][slices[data.split]] - else: - new_slices[out_split] = np.r_[new_slices[out_split]][slices[data.split]] - if debug: - print("root proc new", new_slices, flush=True) - var[tuple(new_slices)] = data._DNDarray__array.cpu() + mergedSlices = mergeSlices(var, file_slices, data) + var[mergedSlices] = data._DNDarray__array.cpu() else: var[file_slices] = data._DNDarray__array.cpu() - if debug: - print("root proc finished writing", flush=True) + # ping next rank if it exists if is_split and data.comm.size > 1: data.comm.Isend([None, 0, MPI.INT], dest=1) data.comm.Recv([None, 0, MPI.INT], source=data.comm.size - 1) - if debug: - print("root proc received signal", flush=True) # no MPI, but data is split, we have to serialize the writes elif is_split: # wait for the previous rank to finish writing its chunk, then write own part data.comm.Recv([None, 0, MPI.INT], source=data.comm.rank - 1) - if debug: - print("non-root beginning", flush=True) with nc.Dataset(path, "r+") as handle: var = handle.variables[variable] - start, count, stride, _ = nc.utils._StartCountStride( - elem=file_slices, - shape=var.shape, - dimensions=dimension_names, - grp=var.group(), - datashape=data.shape, - put=True, - ) - out_shape = nc._netCDF4._out_array_shape(count) - out_split = ( - data.split - + out_shape[: data.split].count(1) - - data.shape[: data.split].count(1) - ) - start, count, stride = start.T, count.T, stride.T # transpose for iteration - stop = start + stride * count - new_slices = [] - - # htSlices = list(slices) - # shape_index = 0 - # for i in range(count.shape[0]): # append fixed dimensions to heat slices - # if shape_index >= len(data.shape): - # htSlices.append(slice(None)) - # elif count[i].sum() != data.shape[shape_index]: - # htSlices.insert(i, slice(None)) - # else: - # shape_index += 1 - # htSlices = tuple(htSlices) - - for begin, end, step in zip(start, stop, stride): # htSlice, htSlices): - if begin.size == 1: - begin, end, step = begin.item(), end.item(), step.item() - # range_from_slice = range(begin, end, step) - # sliced = range_from_slice[htSlice] - # a, b, c = sliced.start, sliced.stop, sliced.step - # a = None if a < 0 else a - # b = None if b < 0 else b - # new_slices.append(slice(a, b, c)) - new_slices.append(slice(begin, end, step)) - else: - ranges = tuple( - slice(b.item(), e.item(), s.item()) for b, e, s in zip(begin, end, step) - ) - new_slices.append(np.r_[ranges]) # [htSlice]) - if isinstance(new_slices[out_split], slice): - start, stop, step = ( - 
new_slices[out_split].start, - new_slices[out_split].stop, - new_slices[out_split].step, - ) - sliced = range(start, stop, step)[slices[data.split]] - a, b, c = sliced.start, sliced.stop, sliced.step - a = None if a < 0 else a - b = None if b < 0 else b - new_slices[out_split] = slice(a, b, c) - elif isinstance(new_slices[out_split], np.ndarray): - new_slices[out_split] = new_slices[out_split][slices[data.split]] - else: - new_slices[out_split] = np.r_[new_slices[out_split]][slices[data.split]] - var[tuple(new_slices)] = data._DNDarray__array.cpu() - if debug: - print("non-root finished", flush=True) + var.set_collective(False) # not possible with non-parallel netcdf + mergedSlices = mergeSlices(var, file_slices, data) + var[mergedSlices] = data._DNDarray__array.cpu() # ping the next node in the communicator, wrap around to 0 to complete barrier behavior next_rank = (data.comm.rank + 1) % data.comm.size data.comm.Isend([None, 0, MPI.INT], dest=next_rank) - if debug: - print("non-root sent signal", flush=True) def load(path, *args, **kwargs): From 7c5ca61af54bd01259ca7f7723668db9687f077b Mon Sep 17 00:00:00 2001 From: Ben Bourgart Date: Fri, 15 May 2020 14:50:42 +0200 Subject: [PATCH 12/25] cleanup test --- heat/core/tests/test_io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index 1df917cb74..5e3ad8f9b9 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -207,7 +207,7 @@ def test_save(self): ht.MPI_WORLD.Barrier() print(ht.MPI_WORLD.rank, "split range", flush=True) split_range = ht.arange(100, split=0, device=ht_device) - split_range.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, debug=True) + split_range.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE) if split_range.comm.rank == 0: print("root comparison beginning", flush=True) with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: From 3bd6e612d89f2b2ef98287b82d9d230c03dba6a8 Mon Sep 17 00:00:00 2001 From: Ben Bourgart Date: Mon, 18 May 2020 18:40:12 +0200 Subject: [PATCH 13/25] minor bugs and coverage --- heat/core/io.py | 28 +++++++++---------- heat/core/tests/test_io.py | 56 +++++++++++++++++++++++++++++--------- 2 files changed, 56 insertions(+), 28 deletions(-) diff --git a/heat/core/io.py b/heat/core/io.py index b28d875b1c..fa4741262c 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -333,7 +333,8 @@ def save_netcdf( mode : str, one of 'w', 'a', 'r+' File access mode dimension_names : list or tuple or string - Specifies the netCDF Dimensions used by the variable. + Specifies the netCDF Dimensions used by the variable. Ignored if + Variable already exists. is_unlimited : bool If True, every dimension created for this variable (i.e. doesn't already exist) is unlimited. 
Already existing limited dimensions @@ -432,19 +433,18 @@ def getExpandedSplit(shape, expandedShape, split): if split is None: # not split at all return None if split in ind_nonempty: # split along non-empty dimension - split_sq = ind_nonempty.index(split) # split-axis in squeezed shape + split_sq = ind_nonempty.tolist().index(split) # split-axis in squeezed shape return ex_ind_nonempty[split_sq] # split along empty dimension: split doesnt matter, only one process contains data # return the last empty dimension (in expanded shape) before (the first nonempty dimension after split) - ne_before_split = split - shape[:split].count( - 1 - ) # number of nonempty elems before split + # number of nonempty elems before split + ne_before_split = split - shape[:split].count(1) ind_ne_after_split = ind_nonempty[ ne_before_split ] # index of (first nonempty element after split) in squeezed shape return max( i - for i, v in enumerate(expandedShape[: ex_ind_nonempty[ind_ne_after_split]]) + for i, v in enumerate(expandedShape[: max(ex_ind_nonempty[:ind_ne_after_split])]) if v == 1 ) @@ -482,7 +482,7 @@ def mergeSlices(var, var_slices, data, data_slices=None): start, count, stride, _ = nc.utils._StartCountStride( elem=var_slices, shape=var.shape, - dimensions=dimension_names, + dimensions=var.dimensions, grp=var.group(), datashape=data.shape, put=True, @@ -528,13 +528,12 @@ def mergeSlices(var, var_slices, data, data_slices=None): # attempt to perform parallel I/O if possible if __nc_has_par: with nc.Dataset(path, mode, parallel=True, comm=data.comm.handle) as handle: - for name, elements in zip(dimension_names, data.shape): - if name not in handle.dimensions: - handle.createDimension(name, elements if not is_unlimited else None) - if variable in handle.variables: var = handle.variables[variable] else: + for name, elements in zip(dimension_names, data.shape): + if name not in handle.dimensions: + handle.createDimension(name, elements if not is_unlimited else None) var = handle.createVariable( variable, data.dtype.char(), dimension_names, **kwargs ) @@ -556,13 +555,12 @@ def mergeSlices(var, var_slices, data, data_slices=None): # otherwise a single rank only write is performed in case of local data (i.e. 
no split) elif data.comm.rank == 0: with nc.Dataset(path, mode) as handle: - for name, elements in zip(dimension_names, data.shape): - if name not in handle.dimensions: - handle.createDimension(name, elements if not is_unlimited else None) - if variable in handle.variables: var = handle.variables[variable] else: + for name, elements in zip(dimension_names, data.shape): + if name not in handle.dimensions: + handle.createDimension(name, elements if not is_unlimited else None) var = handle.createVariable( variable, data.dtype.char(), dimension_names, **kwargs ) diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index 5e3ad8f9b9..e52ffd220d 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -204,23 +204,16 @@ def test_save(self): self.assertTrue((local_range._DNDarray__array == comparison).all()) # split range - ht.MPI_WORLD.Barrier() - print(ht.MPI_WORLD.rank, "split range", flush=True) split_range = ht.arange(100, split=0, device=ht_device) split_range.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE) if split_range.comm.rank == 0: - print("root comparison beginning", flush=True) with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: comparison = torch.tensor( handle[self.NETCDF_VARIABLE][:], dtype=torch.int32, device=device ) - print(comparison, flush=True) self.assertTrue((local_range._DNDarray__array == comparison).all()) - print("root comparison finished", flush=True) - # naming dimensions - # ht.MPI_WORLD.Barrier() - print(ht.MPI_WORLD.rank, "naming dims", flush=True) + # naming dimensions: string local_range = ht.arange(100, device=ht_device) local_range.save( self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, dimension_names=self.NETCDF_DIMENSION @@ -230,10 +223,19 @@ def test_save(self): comparison = handle[self.NETCDF_VARIABLE].dimensions self.assertTrue(self.NETCDF_DIMENSION in comparison) + # naming dimensions: tuple + local_range = ht.arange(100, device=ht_device) + local_range.save( + self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, dimension_names=(self.NETCDF_DIMENSION,) + ) + if local_range.comm.rank == 0: + with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: + comparison = handle[self.NETCDF_VARIABLE].dimensions + self.assertTrue(self.NETCDF_DIMENSION in comparison) + # appending unlimited variable split_range.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, is_unlimited=True) ht.MPI_WORLD.Barrier() - print(ht.MPI_WORLD.rank, "setting sliced var", flush=True) split_range.save( self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, @@ -254,7 +256,6 @@ def test_save(self): # indexing netcdf file: single index ht.MPI_WORLD.Barrier() - print(ht.MPI_WORLD.rank, "beginning single index", flush=True) zeros = ht.zeros((20, 1, 20, 2), device=ht_device) zeros.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="w") ones = ht.ones(20, device=ht_device) @@ -269,7 +270,6 @@ def test_save(self): # indexing netcdf file: multiple indices ht.MPI_WORLD.Barrier() - print(ht.MPI_WORLD.rank, "beginning multi index", flush=True) small_range_split = ht.arange(10, split=0, device=ht_device) small_range = ht.arange(10, device=ht_device) indices = [[0, 9, 5, 2, 1, 3, 7, 4, 8, 6]] @@ -284,8 +284,6 @@ def test_save(self): self.assertTrue((small_range._DNDarray__array == comparison).all()) # slicing netcdf file - ht.MPI_WORLD.Barrier() - print(ht.MPI_WORLD.rank, "beginning slicing index", flush=True) sslice = slice(7, 2, -1) range_five_split = ht.arange(5, split=0, device=ht_device) range_five = ht.arange(5, device=ht_device) @@ -299,6 +297,34 @@ def test_save(self): ) 
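                # Illustration only, not part of the diff: the negative-step write above
                # works because the slice-merging helper in heat/core/io.py composes the
                # file slice with each rank's chunk by slicing a range, e.g. for a rank
                # owning the first three elements:
                #     range(7, 2, -1)[0:3]  ->  range(7, 4, -1)
                # which is then turned back into the merged key slice(7, 4, -1).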
self.assertTrue((range_five._DNDarray__array == comparison).all()) + # indexing netcdf file: broadcasting array + zeros = ht.zeros((2, 1, 1, 4), device=ht_device) + zeros.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="w") + ones = ht.ones((4), split=0, device=ht_device) + ones_nosplit = ht.ones((4), split=None, device=ht_device) + indices = (0, slice(None), slice(None)) + ones.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", file_slices=indices) + if split_range.comm.rank == 0: + with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: + comparison = torch.tensor( + handle[self.NETCDF_VARIABLE][indices], dtype=torch.int32, device=device + ) + self.assertTrue((ones_nosplit._DNDarray__array == comparison).all()) + + # indexing netcdf file: broadcasting var + ht.MPI_WORLD.Barrier() + zeros = ht.zeros((2, 2), device=ht_device) + zeros.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="w") + ones = ht.ones((1, 2, 1), split=0, device=ht_device) + indices = (0,) + ones.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", file_slices=indices) + if split_range.comm.rank == 0: + with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: + comparison = torch.tensor( + handle[self.NETCDF_VARIABLE][indices], dtype=torch.int32, device=device + ) + self.assertTrue((ones._DNDarray__array == comparison).all()) + def test_save_exception(self): data = ht.arange(1, device=ht_device) @@ -519,3 +545,7 @@ def test_save_netcdf_exception(self): ht.save_netcdf(data, 1, self.NETCDF_VARIABLE) with self.assertRaises(TypeError): ht.save_netcdf(data, self.NETCDF_PATH, 1) + with self.assertRaises(TypeError): + ht.save_netcdf(data, self.NETCDF_PATH, self.NETCDF_VARIABLE, dimension_names=1) + with self.assertRaises(ValueError): + ht.save_netcdf(data, self.NETCDF_PATH, self.NETCDF_VARIABLE, dimension_names=["a", "b"]) From ee0c40ed1212a92c40db7974cb93c0d524da50a7 Mon Sep 17 00:00:00 2001 From: Ben Bourgart Date: Fri, 22 May 2020 16:43:25 +0200 Subject: [PATCH 14/25] corrections as requested --- heat/core/io.py | 24 +++++++++++------------- heat/core/tests/test_io.py | 15 ++++++++++++++- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/heat/core/io.py b/heat/core/io.py index fa4741262c..05feb1ea52 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -340,7 +340,8 @@ def save_netcdf( already exist) is unlimited. Already existing limited dimensions cannot be changed to unlimited and vice versa. file_slices : tuple of integer, slice, ellipsis or 1-d bool or - integer sequences used to slice the netCDF Variable, as given in + integer sequences + Keys used to slice the netCDF Variable, as given in the nc.utils._StartCountStride method. kwargs : dict additional arguments passed to the created dataset. @@ -395,7 +396,7 @@ def save_netcdf( is_split = data.split is not None _, _, slices = data.comm.chunk(data.gshape, data.split if is_split else 0) - def getExpandedSplit(shape, expandedShape, split): + def __get_expanded_split(shape, expandedShape, split): """Returns the hypothetical split-axis of a dndarray of shape=shape and split=split if it was expanded to expandedShape by adding empty dimensions. 
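For illustration, a minimal pure-Python sketch of the mapping this helper performs; the function name below is invented for the example and only the case of a split along a non-empty axis is shown:

    def _expanded_split_sketch(shape, expanded_shape, split):
        # positions of the axes with extent != 1 in both shapes
        nonempty = [i for i, v in enumerate(shape) if v != 1]
        ex_nonempty = [i for i, v in enumerate(expanded_shape) if v != 1]
        # the split axis keeps its position among the non-empty axes
        return ex_nonempty[nonempty.index(split)]

    # a (5, 3) array split along axis 1, written into a (5, 1, 3, 1) selection,
    # is effectively split along axis 2 of the expanded shape
    assert _expanded_split_sketch((5, 3), (5, 1, 3, 1), 1) == 2
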
@@ -448,7 +449,7 @@ def getExpandedSplit(shape, expandedShape, split): if v == 1 ) - def mergeSlices(var, var_slices, data, data_slices=None): + def __merge_slices(var, var_slices, data, data_slices=None): """ Using var[var_slices][data_slices] = data combines a __getitem__ with a __setitem__ call, therefore it does not allow parallelization of the write-operation and does not work is var_slices = slice(None) @@ -469,12 +470,8 @@ def mergeSlices(var, var_slices, data, data_slices=None): Returns ------- - - Key for the set-operation. - - Raises - ------- - + tuple of (slice or integer sequence) + Keys for the set-operation. """ slices = data_slices if slices is None: @@ -488,7 +485,7 @@ def mergeSlices(var, var_slices, data, data_slices=None): put=True, ) out_shape = nc._netCDF4._out_array_shape(count) - out_split = getExpandedSplit(data.shape, out_shape, data.split) + out_split = __get_expanded_split(data.shape, out_shape, data.split) start, count, stride = start.T, count.T, stride.T # transpose for iteration stop = start + stride * count @@ -519,6 +516,7 @@ def mergeSlices(var, var_slices, data, data_slices=None): a = None if a < 0 else a b = None if b < 0 else b new_slices[out_split] = slice(a, b, c) + # new_slices[out_split] = sliced elif isinstance(new_slices[out_split], np.ndarray): new_slices[out_split] = new_slices[out_split][slices[data.split]] else: @@ -537,7 +535,7 @@ def mergeSlices(var, var_slices, data, data_slices=None): var = handle.createVariable( variable, data.dtype.char(), dimension_names, **kwargs ) - mergedSlices = mergeSlices(var, file_slices, data) + mergedSlices = __merge_slices(var, file_slices, data) try: var[mergedSlices] = ( data._DNDarray__array.cpu() @@ -566,7 +564,7 @@ def mergeSlices(var, var_slices, data, data_slices=None): ) var.set_collective(False) # not possible with non-parallel netcdf if is_split: - mergedSlices = mergeSlices(var, file_slices, data) + mergedSlices = __merge_slices(var, file_slices, data) var[mergedSlices] = data._DNDarray__array.cpu() else: var[file_slices] = data._DNDarray__array.cpu() @@ -583,7 +581,7 @@ def mergeSlices(var, var_slices, data, data_slices=None): with nc.Dataset(path, "r+") as handle: var = handle.variables[variable] var.set_collective(False) # not possible with non-parallel netcdf - mergedSlices = mergeSlices(var, file_slices, data) + mergedSlices = __merge_slices(var, file_slices, data) var[mergedSlices] = data._DNDarray__array.cpu() # ping the next node in the communicator, wrap around to 0 to complete barrier behavior diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index e52ffd220d..cedfb20db5 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -316,6 +316,7 @@ def test_save(self): zeros = ht.zeros((2, 2), device=ht_device) zeros.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="w") ones = ht.ones((1, 2, 1), split=0, device=ht_device) + ones_nosplit = ht.ones((1, 2, 1), device=ht_device) indices = (0,) ones.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", file_slices=indices) if split_range.comm.rank == 0: @@ -323,7 +324,19 @@ def test_save(self): comparison = torch.tensor( handle[self.NETCDF_VARIABLE][indices], dtype=torch.int32, device=device ) - self.assertTrue((ones._DNDarray__array == comparison).all()) + self.assertTrue((ones_nosplit._DNDarray__array == comparison).all()) + + # different split and dtype + ht.MPI_WORLD.Barrier() + zeros = ht.zeros((2, 2), split=1, dtype=ht.int32, device=ht_device) + zeros_nosplit = ht.zeros((2, 2), dtype=ht.int32, 
device=ht_device) + zeros.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="w") + if split_range.comm.rank == 0: + with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: + comparison = torch.tensor( + handle[self.NETCDF_VARIABLE][:], dtype=torch.int32, device=device + ) + self.assertTrue((zeros_nosplit._DNDarray__array == comparison).all()) def test_save_exception(self): data = ht.arange(1, device=ht_device) From dbf788a23ff3f50e16358e209bd9c936a7f6cb00 Mon Sep 17 00:00:00 2001 From: Ben Bourgart Date: Mon, 8 Jun 2020 10:58:41 +0200 Subject: [PATCH 15/25] as requested --- heat/core/io.py | 67 +++++++++++++++++++++++-------------------------- 1 file changed, 32 insertions(+), 35 deletions(-) diff --git a/heat/core/io.py b/heat/core/io.py index 05feb1ea52..19a6d56ffa 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -364,27 +364,24 @@ def save_netcdf( raise TypeError("path must be str, not {}".format(type(path))) if not isinstance(variable, str): raise TypeError("variable must be str, not {}".format(type(path))) - if dimension_names is not None: - if isinstance(dimension_names, str): - dimension_names = [dimension_names] - if isinstance(dimension_names, tuple): - dimension_names = list(dimension_names) - if not isinstance(dimension_names, list): - raise TypeError( - "dimension_names must be list or tuple or string, not{}".format( - type(dimension_names) - ) - ) - if not len(dimension_names) == len(data.shape): - raise ValueError( - "{0} names given for {1} dimensions".format( - len(dimension_names), len(data.shape) - ) - ) - else: + if dimension_names is None: dimension_names = [ __NETCDF_DIM_TEMPLATE.format(variable, dim) for dim, _ in enumerate(data.shape) ] + elif isinstance(dimension_names, str): + dimension_names = [dimension_names] + elif isinstance(dimension_names, tuple): + dimension_names = list(dimension_names) + elif not isinstance(dimension_names, list): + raise TypeError( + "dimension_names must be list or tuple or string, not{}".format( + type(dimension_names) + ) + ) + elif not len(dimension_names) == len(data.shape): + raise ValueError( + "{0} names given for {1} dimensions".format(len(dimension_names), len(data.shape)) + ) # we only support a subset of possible modes if mode not in __VALID_WRITE_MODES: @@ -397,7 +394,8 @@ def save_netcdf( _, _, slices = data.comm.chunk(data.gshape, data.split if is_split else 0) def __get_expanded_split(shape, expandedShape, split): - """Returns the hypothetical split-axis of a dndarray of shape=shape and + """ + Returns the hypothetical split-axis of a dndarray of shape=shape and split=split if it was expanded to expandedShape by adding empty dimensions. Parameters @@ -417,15 +415,13 @@ def __get_expanded_split(shape, expandedShape, split): Raises ------- ValueError - If the shapes differ in non-empty dimensions. 
- """ # Get indices of non-empty dimensions and squeezed shapes - ind_nonempty, sq_shape = np.array([[i, v] for i, v in enumerate(shape) if v != 1]).T - ex_ind_nonempty, sq_ex = np.array( - [[i, v] for i, v in enumerate(expandedShape) if v != 1] - ).T - if not all(sq_shape == sq_ex): + enumerated = [[i, v] for i, v in enumerate(shape) if v != 1] + ind_nonempty, sq_shape = list(zip(*enumerated)) # transpose + enumerated = [[i, v] for i, v in enumerate(expandedShape) if v != 1] + ex_ind_nonempty, sq_ex = list(zip(*enumerated)) # transpose + if not sq_shape == sq_ex: raise ValueError( "Shapes %s and %s differ in non-empty dimensions" % (shape, expandedShape) ) @@ -434,7 +430,7 @@ def __get_expanded_split(shape, expandedShape, split): if split is None: # not split at all return None if split in ind_nonempty: # split along non-empty dimension - split_sq = ind_nonempty.tolist().index(split) # split-axis in squeezed shape + split_sq = ind_nonempty.index(split) # split-axis in squeezed shape return ex_ind_nonempty[split_sq] # split along empty dimension: split doesnt matter, only one process contains data # return the last empty dimension (in expanded shape) before (the first nonempty dimension after split) @@ -450,7 +446,8 @@ def __get_expanded_split(shape, expandedShape, split): ) def __merge_slices(var, var_slices, data, data_slices=None): - """ Using var[var_slices][data_slices] = data + """ + Using var[var_slices][data_slices] = data combines a __getitem__ with a __setitem__ call, therefore it does not allow parallelization of the write-operation and does not work is var_slices = slice(None) (in that casem __getitem__ returns a copy and not a view). @@ -535,16 +532,16 @@ def __merge_slices(var, var_slices, data, data_slices=None): var = handle.createVariable( variable, data.dtype.char(), dimension_names, **kwargs ) - mergedSlices = __merge_slices(var, file_slices, data) + merged_slices = __merge_slices(var, file_slices, data) try: - var[mergedSlices] = ( + var[merged_slices] = ( data._DNDarray__array.cpu() if is_split else data._DNDarray__array[slices].cpu() ) except RuntimeError: var.set_collective(True) - var[mergedSlices] = ( + var[merged_slices] = ( data._DNDarray__array.cpu() if is_split else data._DNDarray__array[slices].cpu() @@ -564,8 +561,8 @@ def __merge_slices(var, var_slices, data, data_slices=None): ) var.set_collective(False) # not possible with non-parallel netcdf if is_split: - mergedSlices = __merge_slices(var, file_slices, data) - var[mergedSlices] = data._DNDarray__array.cpu() + merged_slices = __merge_slices(var, file_slices, data) + var[merged_slices] = data._DNDarray__array.cpu() else: var[file_slices] = data._DNDarray__array.cpu() @@ -581,8 +578,8 @@ def __merge_slices(var, var_slices, data, data_slices=None): with nc.Dataset(path, "r+") as handle: var = handle.variables[variable] var.set_collective(False) # not possible with non-parallel netcdf - mergedSlices = __merge_slices(var, file_slices, data) - var[mergedSlices] = data._DNDarray__array.cpu() + merged_slices = __merge_slices(var, file_slices, data) + var[merged_slices] = data._DNDarray__array.cpu() # ping the next node in the communicator, wrap around to 0 to complete barrier behavior next_rank = (data.comm.rank + 1) % data.comm.size From c7f15ea04d59d430871e00b761c47b86e21a8d6e Mon Sep 17 00:00:00 2001 From: Ben Bourgart Date: Tue, 20 Oct 2020 11:49:19 +0200 Subject: [PATCH 16/25] Update tests and formatting --- heat/core/tests/test_io.py | 77 ++++++++++++++++++++++---------------- 1 file changed, 44 
insertions(+), 33 deletions(-) diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index b05705ab09..1d3e6e877f 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -216,7 +216,7 @@ def test_save(self): self.assertTrue((local_range.larray == comparison).all()) # naming dimensions: string - local_range = ht.arange(100, device=ht_device) + local_range = ht.arange(100, device=self.device) local_range.save( self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, dimension_names=self.NETCDF_DIMENSION ) @@ -226,7 +226,7 @@ def test_save(self): self.assertTrue(self.NETCDF_DIMENSION in comparison) # naming dimensions: tuple - local_range = ht.arange(100, device=ht_device) + local_range = ht.arange(100, device=self.device) local_range.save( self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, dimension_names=(self.NETCDF_DIMENSION,) ) @@ -248,32 +248,34 @@ def test_save(self): if split_range.comm.rank == 0: with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: comparison = torch.tensor( - handle[self.NETCDF_VARIABLE][:], dtype=torch.int32, device=device + handle[self.NETCDF_VARIABLE][:], + dtype=torch.int32, + device=self.device.torch_device, ) self.assertTrue( - ( - ht.concatenate((local_range, local_range))._DNDarray__array == comparison - ).all() + (ht.concatenate((local_range, local_range)).larray == comparison).all() ) # indexing netcdf file: single index ht.MPI_WORLD.Barrier() - zeros = ht.zeros((20, 1, 20, 2), device=ht_device) + zeros = ht.zeros((20, 1, 20, 2), device=self.device) zeros.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="w") - ones = ht.ones(20, device=ht_device) + ones = ht.ones(20, device=self.device) indices = (-1, 0, slice(None), 1) ones.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", file_slices=indices) if split_range.comm.rank == 0: with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: comparison = torch.tensor( - handle[self.NETCDF_VARIABLE][indices], dtype=torch.int32, device=device + handle[self.NETCDF_VARIABLE][indices], + dtype=torch.int32, + device=self.device.torch_device, ) - self.assertTrue((ones._DNDarray__array == comparison).all()) + self.assertTrue((ones.larray == comparison).all()) # indexing netcdf file: multiple indices ht.MPI_WORLD.Barrier() - small_range_split = ht.arange(10, split=0, device=ht_device) - small_range = ht.arange(10, device=ht_device) + small_range_split = ht.arange(10, split=0, device=self.device) + small_range = ht.arange(10, device=self.device) indices = [[0, 9, 5, 2, 1, 3, 7, 4, 8, 6]] small_range_split.save( self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="w", file_slices=indices @@ -281,64 +283,74 @@ def test_save(self): if split_range.comm.rank == 0: with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: comparison = torch.tensor( - handle[self.NETCDF_VARIABLE][indices], dtype=torch.int32, device=device + handle[self.NETCDF_VARIABLE][indices], + dtype=torch.int32, + device=self.device.torch_device, ) - self.assertTrue((small_range._DNDarray__array == comparison).all()) + self.assertTrue((small_range.larray == comparison).all()) # slicing netcdf file sslice = slice(7, 2, -1) - range_five_split = ht.arange(5, split=0, device=ht_device) - range_five = ht.arange(5, device=ht_device) + range_five_split = ht.arange(5, split=0, device=self.device) + range_five = ht.arange(5, device=self.device) range_five_split.save( self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", file_slices=sslice ) if split_range.comm.rank == 0: with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: 
comparison = torch.tensor( - handle[self.NETCDF_VARIABLE][sslice], dtype=torch.int32, device=device + handle[self.NETCDF_VARIABLE][sslice], + dtype=torch.int32, + device=self.device.torch_device, ) - self.assertTrue((range_five._DNDarray__array == comparison).all()) + self.assertTrue((range_five.larray == comparison).all()) # indexing netcdf file: broadcasting array - zeros = ht.zeros((2, 1, 1, 4), device=ht_device) + zeros = ht.zeros((2, 1, 1, 4), device=self.device) zeros.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="w") - ones = ht.ones((4), split=0, device=ht_device) - ones_nosplit = ht.ones((4), split=None, device=ht_device) + ones = ht.ones((4), split=0, device=self.device) + ones_nosplit = ht.ones((4), split=None, device=self.device) indices = (0, slice(None), slice(None)) ones.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", file_slices=indices) if split_range.comm.rank == 0: with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: comparison = torch.tensor( - handle[self.NETCDF_VARIABLE][indices], dtype=torch.int32, device=device + handle[self.NETCDF_VARIABLE][indices], + dtype=torch.int32, + device=self.device.torch_device, ) - self.assertTrue((ones_nosplit._DNDarray__array == comparison).all()) + self.assertTrue((ones_nosplit.larray == comparison).all()) # indexing netcdf file: broadcasting var ht.MPI_WORLD.Barrier() - zeros = ht.zeros((2, 2), device=ht_device) + zeros = ht.zeros((2, 2), device=self.device) zeros.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="w") - ones = ht.ones((1, 2, 1), split=0, device=ht_device) - ones_nosplit = ht.ones((1, 2, 1), device=ht_device) + ones = ht.ones((1, 2, 1), split=0, device=self.device) + ones_nosplit = ht.ones((1, 2, 1), device=self.device) indices = (0,) ones.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+", file_slices=indices) if split_range.comm.rank == 0: with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: comparison = torch.tensor( - handle[self.NETCDF_VARIABLE][indices], dtype=torch.int32, device=device + handle[self.NETCDF_VARIABLE][indices], + dtype=torch.int32, + device=self.device.torch_device, ) - self.assertTrue((ones_nosplit._DNDarray__array == comparison).all()) + self.assertTrue((ones_nosplit.larray == comparison).all()) # different split and dtype ht.MPI_WORLD.Barrier() - zeros = ht.zeros((2, 2), split=1, dtype=ht.int32, device=ht_device) - zeros_nosplit = ht.zeros((2, 2), dtype=ht.int32, device=ht_device) + zeros = ht.zeros((2, 2), split=1, dtype=ht.int32, device=self.device) + zeros_nosplit = ht.zeros((2, 2), dtype=ht.int32, device=self.device) zeros.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="w") if split_range.comm.rank == 0: with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: comparison = torch.tensor( - handle[self.NETCDF_VARIABLE][:], dtype=torch.int32, device=device + handle[self.NETCDF_VARIABLE][:], + dtype=torch.int32, + device=self.device.torch_device, ) - self.assertTrue((zeros_nosplit._DNDarray__array == comparison).all()) + self.assertTrue((zeros_nosplit.larray == comparison).all()) def test_save_exception(self): data = ht.arange(1) @@ -573,4 +585,3 @@ def test_save_netcdf_exception(self): # os.rmdir(os.getcwd() + '/tmp/') # except OSError: # pass - \ No newline at end of file From 768013703242519431e017ead46748b2beb416bd Mon Sep 17 00:00:00 2001 From: Ben Bourgart Date: Mon, 26 Oct 2020 14:51:09 +0100 Subject: [PATCH 17/25] Update documentation and replace _dndarray__array with larray --- heat/core/io.py | 33 
++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/heat/core/io.py b/heat/core/io.py index 353fd7d5d7..2503a27f22 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -445,12 +445,19 @@ def __get_expanded_split(shape, expandedShape, split): def __merge_slices(var, var_slices, data, data_slices=None): """ - Using var[var_slices][data_slices] = data - combines a __getitem__ with a __setitem__ call, therefore it does not allow - parallelization of the write-operation and does not work is var_slices = slice(None) - (in that casem __getitem__ returns a copy and not a view). - This method merges both keys: - var[mergeSlices(var, var_slices, data)] = data + This method allows replacing: + ``var[var_slices][data_slices] = data`` + (a `netcdf4.Variable.__getitem__` and a `numpy.ndarray.__setitem__` call) + with: + ``var[ __merge_slices(var, var_slices, data, data_slices) ] = data`` + (a single `netcdf4.Variable.__setitem__` call) + + This is necessary because performing the former would, in the `__getitem__`, load the + global dataset onto every process in local `numpy-ndarrays`. Then, the `__setitem__` + would write the local `chunk` into the `numpy-ndarray`. + + The latter allows the netcdf4 library to parallelize the write-operation by directly + using the `netcdf4.Variable.__setitem__` method. Parameters ---------- @@ -533,16 +540,12 @@ def __merge_slices(var, var_slices, data, data_slices=None): merged_slices = __merge_slices(var, file_slices, data) try: var[merged_slices] = ( - data._DNDarray__array.cpu() - if is_split - else data._DNDarray__array[slices].cpu() + data.larray.cpu() if is_split else data.larray[slices].cpu() ) except RuntimeError: var.set_collective(True) var[merged_slices] = ( - data._DNDarray__array.cpu() - if is_split - else data._DNDarray__array[slices].cpu() + data.larray.cpu() if is_split else data.larray[slices].cpu() ) # otherwise a single rank only write is performed in case of local data (i.e. 
no split) @@ -560,9 +563,9 @@ def __merge_slices(var, var_slices, data, data_slices=None): var.set_collective(False) # not possible with non-parallel netcdf if is_split: merged_slices = __merge_slices(var, file_slices, data) - var[merged_slices] = data._DNDarray__array.cpu() + var[merged_slices] = data.larray.cpu() else: - var[file_slices] = data._DNDarray__array.cpu() + var[file_slices] = data.larray.cpu() # ping next rank if it exists if is_split and data.comm.size > 1: @@ -577,7 +580,7 @@ def __merge_slices(var, var_slices, data, data_slices=None): var = handle.variables[variable] var.set_collective(False) # not possible with non-parallel netcdf merged_slices = __merge_slices(var, file_slices, data) - var[merged_slices] = data._DNDarray__array.cpu() + var[merged_slices] = data.larray.cpu() # ping the next node in the communicator, wrap around to 0 to complete barrier behavior next_rank = (data.comm.rank + 1) % data.comm.size From 70507d27b6b2a9e4e477dbf0db335b78517a5bc9 Mon Sep 17 00:00:00 2001 From: Ben Bourgart Date: Tue, 27 Oct 2020 13:46:39 +0100 Subject: [PATCH 18/25] update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ddf3d264ad..7e9ac37238 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,8 @@ ## New features - [#680](https://github.com/helmholtz-analytics/heat/pull/680) New property: larray - [#683](https://github.com/helmholtz-analytics/heat/pull/683) New properties: nbytes, gnbytes, lnbytes +- [#559](https://github.com/helmholtz-analytics/heat/pull/559) Enhancement: `save_netcdf` allows naming dimensions, creating unlimited dimensions, using existing dimensions and variables, slicing + ### Manipulations ### Statistical Functions - [#679](https://github.com/helmholtz-analytics/heat/pull/679) New feature: ``histc()`` and ``histogram()`` From bcb8096cde69ca9972dd5128e214a8aa5a541fdd Mon Sep 17 00:00:00 2001 From: Ben Bourgart Date: Tue, 27 Oct 2020 14:16:49 +0100 Subject: [PATCH 19/25] append tests --- heat/core/tests/test_io.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index 1d3e6e877f..2ea504e054 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -373,6 +373,16 @@ def test_save_exception(self): ht.save(data, 1, self.NETCDF_VARIABLE) with self.assertRaises(TypeError): ht.save(data, self.NETCDF_OUT_PATH, 1) + with self.assertRaises(ValueError): + ht.save(data, self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r") + with self.assertRaises(ValueError): + ht.save(data, self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE) + ht.save( + ht.arange(2), + self.NETCDF_OUT_PATH, + self.NETCDF_VARIABLE, + file_slices=slice(None), + ) else: with self.assertRaises(ValueError): ht.save(data, self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE) From b84a7bf47f4aa089876e9aced663f7d668a38454 Mon Sep 17 00:00:00 2001 From: Ben Bourgart Date: Tue, 27 Oct 2020 14:29:43 +0100 Subject: [PATCH 20/25] bugfix test --- heat/core/tests/test_io.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index 2ea504e054..8515d13dfc 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -375,13 +375,14 @@ def test_save_exception(self): ht.save(data, self.NETCDF_OUT_PATH, 1) with self.assertRaises(ValueError): ht.save(data, self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r") - with self.assertRaises(ValueError): + with self.assertRaises((ValueError, IndexError)): 
ht.save(data, self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE) ht.save( ht.arange(2), self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, file_slices=slice(None), + mode="a", ) else: with self.assertRaises(ValueError): From 7b470cd1f76f26f2a35cba2161f99d1d1279c433 Mon Sep 17 00:00:00 2001 From: Ben Bourgart Date: Tue, 27 Oct 2020 14:33:24 +0100 Subject: [PATCH 21/25] bugix test --- heat/core/tests/test_io.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index 8515d13dfc..b9d9e43fa5 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -378,7 +378,16 @@ def test_save_exception(self): with self.assertRaises((ValueError, IndexError)): ht.save(data, self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE) ht.save( - ht.arange(2), + ht.arange(2, split=0), + self.NETCDF_OUT_PATH, + self.NETCDF_VARIABLE, + file_slices=slice(None), + mode="a", + ) + with self.assertRaises((ValueError, IndexError)): + ht.save(data, self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE) + ht.save( + ht.arange(2, split=None), self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, file_slices=slice(None), From 021288c71f7566b8e050c51be6227fa296c1ca1a Mon Sep 17 00:00:00 2001 From: Ben Bourgart Date: Tue, 27 Oct 2020 15:40:00 +0100 Subject: [PATCH 22/25] bugfix expanded_split --- heat/core/io.py | 23 +++++++++++++++-------- heat/core/tests/test_io.py | 15 +++++++++++++++ 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/heat/core/io.py b/heat/core/io.py index 4a01d4f19e..209aedb0f7 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -414,19 +414,26 @@ def __get_expanded_split(shape, expandedShape, split): ------- ValueError """ + if len(shape) == len(expandedShape): # actually not expanded at all + return split + if split is None: # not split at all + return None + if all(shape) == 1 and all(expandedShape) == 1: # size 1 array + return split + elif all(shape) == 1 or all(expandedShape) == 1: # one shape is size 1, the other isn't + raise ValueError( + "Shapes %s and %s differ in non-empty dimensions" % (shape, expandedShape) + ) + # Get indices of non-empty dimensions and squeezed shapes - enumerated = [[i, v] for i, v in enumerate(shape) if v != 1] - ind_nonempty, sq_shape = list(zip(*enumerated)) # transpose - enumerated = [[i, v] for i, v in enumerate(expandedShape) if v != 1] - ex_ind_nonempty, sq_ex = list(zip(*enumerated)) # transpose + enumerated = np.array([[i, v] for i, v in enumerate(shape) if v != 1]) + ind_nonempty, sq_shape = (enumerated.T).tolist() + enumerated = np.array([[i, v] for i, v in enumerate(expandedShape) if v != 1]) + ex_ind_nonempty, sq_ex = (enumerated.T).tolist() if not sq_shape == sq_ex: raise ValueError( "Shapes %s and %s differ in non-empty dimensions" % (shape, expandedShape) ) - if len(shape) == len(expandedShape): # actually not expanded at all - return split - if split is None: # not split at all - return None if split in ind_nonempty: # split along non-empty dimension split_sq = ind_nonempty.index(split) # split-axis in squeezed shape return ex_ind_nonempty[split_sq] diff --git a/heat/core/tests/test_io.py b/heat/core/tests/test_io.py index b9d9e43fa5..f4fb0ba3e6 100644 --- a/heat/core/tests/test_io.py +++ b/heat/core/tests/test_io.py @@ -338,6 +338,21 @@ def test_save(self): ) self.assertTrue((ones_nosplit.larray == comparison).all()) + # indexing netcdf file: broadcasting ones + ht.MPI_WORLD.Barrier() + zeros = ht.zeros((1, 1, 1, 1), device=self.device) + zeros.save(self.NETCDF_OUT_PATH, 
self.NETCDF_VARIABLE, mode="w") + ones = ht.ones((1, 1), device=self.device) + ones.save(self.NETCDF_OUT_PATH, self.NETCDF_VARIABLE, mode="r+") + if split_range.comm.rank == 0: + with ht.io.nc.Dataset(self.NETCDF_OUT_PATH, "r") as handle: + comparison = torch.tensor( + handle[self.NETCDF_VARIABLE][indices], + dtype=torch.int32, + device=self.device.torch_device, + ) + self.assertTrue((ones.larray == comparison).all()) + # different split and dtype ht.MPI_WORLD.Barrier() zeros = ht.zeros((2, 2), split=1, dtype=ht.int32, device=self.device) From befe6f232e805b306e77e6b620082887007335c0 Mon Sep 17 00:00:00 2001 From: Ben Bourgart Date: Tue, 27 Oct 2020 16:10:18 +0100 Subject: [PATCH 23/25] bugfix expanded_split --- heat/core/io.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/heat/core/io.py b/heat/core/io.py index 209aedb0f7..cf6d4f5bc0 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -414,16 +414,20 @@ def __get_expanded_split(shape, expandedShape, split): ------- ValueError """ - if len(shape) == len(expandedShape): # actually not expanded at all - return split - if split is None: # not split at all - return None + if np.prod(shape) != np.prod(expandedShape): + raise ValueError( + "Shapes %s and %s differ in non-empty dimensions" % (shape, expandedShape) + ) if all(shape) == 1 and all(expandedShape) == 1: # size 1 array return split elif all(shape) == 1 or all(expandedShape) == 1: # one shape is size 1, the other isn't raise ValueError( "Shapes %s and %s differ in non-empty dimensions" % (shape, expandedShape) ) + if len(shape) == len(expandedShape): # actually not expanded at all + return split + if split is None: # not split at all + return None # Get indices of non-empty dimensions and squeezed shapes enumerated = np.array([[i, v] for i, v in enumerate(shape) if v != 1]) From eb38941eea8b451c333521671cbe02448e0db892 Mon Sep 17 00:00:00 2001 From: Ben Bourgart Date: Tue, 27 Oct 2020 16:15:32 +0100 Subject: [PATCH 24/25] remove superflous check --- heat/core/io.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/heat/core/io.py b/heat/core/io.py index cf6d4f5bc0..9c66afd8bc 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -420,10 +420,6 @@ def __get_expanded_split(shape, expandedShape, split): ) if all(shape) == 1 and all(expandedShape) == 1: # size 1 array return split - elif all(shape) == 1 or all(expandedShape) == 1: # one shape is size 1, the other isn't - raise ValueError( - "Shapes %s and %s differ in non-empty dimensions" % (shape, expandedShape) - ) if len(shape) == len(expandedShape): # actually not expanded at all return split if split is None: # not split at all From dd64d62d1b6b26fba183a66768af082ce178b480 Mon Sep 17 00:00:00 2001 From: Ben Bourgart Date: Thu, 29 Oct 2020 18:37:36 +0100 Subject: [PATCH 25/25] broadcast exceptions to all processes; prevents stalling --- heat/core/io.py | 147 ++++++++++++++++++++++++++++-------------------- 1 file changed, 85 insertions(+), 62 deletions(-) diff --git a/heat/core/io.py b/heat/core/io.py index 9c66afd8bc..406201df60 100644 --- a/heat/core/io.py +++ b/heat/core/io.py @@ -387,6 +387,8 @@ def save_netcdf( "mode was {}, not in possible modes {}".format(mode, __VALID_WRITE_MODES) ) + failed = 0 + excep = None # chunk the data, if no split is set maximize parallel I/O and chunk first axis is_split = data.split is not None _, _, slices = data.comm.chunk(data.gshape, data.split if is_split else 0) @@ -416,20 +418,19 @@ def __get_expanded_split(shape, expandedShape, split): """ if 
np.prod(shape) != np.prod(expandedShape): raise ValueError( - "Shapes %s and %s differ in non-empty dimensions" % (shape, expandedShape) + "Shapes %s and %s do not have the same size" % (shape, expandedShape) ) - if all(shape) == 1 and all(expandedShape) == 1: # size 1 array + if np.prod(shape) == 1: # size 1 array return split if len(shape) == len(expandedShape): # actually not expanded at all return split if split is None: # not split at all return None - # Get indices of non-empty dimensions and squeezed shapes - enumerated = np.array([[i, v] for i, v in enumerate(shape) if v != 1]) - ind_nonempty, sq_shape = (enumerated.T).tolist() - enumerated = np.array([[i, v] for i, v in enumerate(expandedShape) if v != 1]) - ex_ind_nonempty, sq_ex = (enumerated.T).tolist() + enumerated = [[i, v] for i, v in enumerate(shape) if v != 1] + ind_nonempty, sq_shape = list(zip(*enumerated)) # transpose + enumerated = [[i, v] for i, v in enumerate(expandedShape) if v != 1] + ex_ind_nonempty, sq_ex = list(zip(*enumerated)) # transpose if not sq_shape == sq_ex: raise ValueError( "Shapes %s and %s differ in non-empty dimensions" % (shape, expandedShape) @@ -534,64 +535,86 @@ def __merge_slices(var, var_slices, data, data_slices=None): # attempt to perform parallel I/O if possible if __nc_has_par: - with nc.Dataset(path, mode, parallel=True, comm=data.comm.handle) as handle: - if variable in handle.variables: - var = handle.variables[variable] - else: - for name, elements in zip(dimension_names, data.shape): - if name not in handle.dimensions: - handle.createDimension(name, elements if not is_unlimited else None) - var = handle.createVariable( - variable, data.dtype.char(), dimension_names, **kwargs - ) - merged_slices = __merge_slices(var, file_slices, data) - try: - var[merged_slices] = ( - data.larray.cpu() if is_split else data.larray[slices].cpu() - ) - except RuntimeError: - var.set_collective(True) - var[merged_slices] = ( - data.larray.cpu() if is_split else data.larray[slices].cpu() - ) - + try: + with nc.Dataset(path, mode, parallel=True, comm=data.comm.handle) as handle: + if variable in handle.variables: + var = handle.variables[variable] + else: + for name, elements in zip(dimension_names, data.shape): + if name not in handle.dimensions: + handle.createDimension(name, elements if not is_unlimited else None) + var = handle.createVariable( + variable, data.dtype.char(), dimension_names, **kwargs + ) + merged_slices = __merge_slices(var, file_slices, data) + try: + var[merged_slices] = ( + data.larray.cpu() if is_split else data.larray[slices].cpu() + ) + except RuntimeError: + var.set_collective(True) + var[merged_slices] = ( + data.larray.cpu() if is_split else data.larray[slices].cpu() + ) + except Exception as e: + failed = data.comm.rank + 1 + excep = e # otherwise a single rank only write is performed in case of local data (i.e. 
no split) elif data.comm.rank == 0: - with nc.Dataset(path, mode) as handle: - if variable in handle.variables: - var = handle.variables[variable] - else: - for name, elements in zip(dimension_names, data.shape): - if name not in handle.dimensions: - handle.createDimension(name, elements if not is_unlimited else None) - var = handle.createVariable( - variable, data.dtype.char(), dimension_names, **kwargs - ) - var.set_collective(False) # not possible with non-parallel netcdf - if is_split: - merged_slices = __merge_slices(var, file_slices, data) - var[merged_slices] = data.larray.cpu() - else: - var[file_slices] = data.larray.cpu() - - # ping next rank if it exists - if is_split and data.comm.size > 1: - data.comm.Isend([None, 0, MPI.INT], dest=1) - data.comm.Recv([None, 0, MPI.INT], source=data.comm.size - 1) - - # no MPI, but data is split, we have to serialize the writes - elif is_split: + try: + with nc.Dataset(path, mode) as handle: + if variable in handle.variables: + var = handle.variables[variable] + else: + for name, elements in zip(dimension_names, data.shape): + if name not in handle.dimensions: + handle.createDimension(name, elements if not is_unlimited else None) + var = handle.createVariable( + variable, data.dtype.char(), dimension_names, **kwargs + ) + var.set_collective(False) # not possible with non-parallel netcdf + if is_split: + merged_slices = __merge_slices(var, file_slices, data) + var[merged_slices] = data.larray.cpu() + else: + var[file_slices] = data.larray.cpu() + except Exception as e: + failed = 1 + excep = e + finally: + if data.comm.size > 1: + data.comm.isend(failed, dest=1) + data.comm.recv() + + # non-root + else: # wait for the previous rank to finish writing its chunk, then write own part - data.comm.Recv([None, 0, MPI.INT], source=data.comm.rank - 1) - with nc.Dataset(path, "r+") as handle: - var = handle.variables[variable] - var.set_collective(False) # not possible with non-parallel netcdf - merged_slices = __merge_slices(var, file_slices, data) - var[merged_slices] = data.larray.cpu() - - # ping the next node in the communicator, wrap around to 0 to complete barrier behavior - next_rank = (data.comm.rank + 1) % data.comm.size - data.comm.Isend([None, 0, MPI.INT], dest=next_rank) + failed = data.comm.recv() + try: + # no MPI, but data is split, we have to serialize the writes + if not failed and is_split: + with nc.Dataset(path, "r+") as handle: + var = handle.variables[variable] + var.set_collective(False) # not possible with non-parallel netcdf + merged_slices = __merge_slices(var, file_slices, data) + var[merged_slices] = data.larray.cpu() + except Exception as e: + failed = data.comm.rank + 1 + excep = e + finally: + # ping the next node in the communicator, wrap around to 0 to complete barrier behavior + next_rank = (data.comm.rank + 1) % data.comm.size + data.comm.isend(failed, dest=next_rank) + + failed = data.comm.allreduce(failed, op=MPI.MAX) + if failed - 1 == data.comm.rank: + data.comm.bcast(excep, root=failed - 1) + raise excep + elif failed: + excep = data.comm.bcast(excep, root=failed - 1) + excep.args = "raised by process rank {}".format(failed - 1), *excep.args + raise excep from None # raise the same error but without traceback + # because that is on a different process def load(path, *args, **kwargs):
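
Taken together, the series leaves save_netcdf accepting named dimensions, unlimited dimensions and a file_slices key. A usage sketch against that final state, mirroring the append pattern exercised in test_io.py; the file, variable and dimension names are placeholders:

>>> x = ht.arange(100, split=0)
>>> # create the variable with a named, unlimited first dimension
>>> ht.save_netcdf(x, "data.nc", "DATA", mode="w", dimension_names="time", is_unlimited=True)
>>> # append another 100 values along the unlimited dimension
>>> ht.save_netcdf(x, "data.nc", "DATA", mode="r+", file_slices=slice(100, None))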