
Features/517-netcdf4-features #559

Merged

Changes from all commits
44 commits
51bfd8b
removed copy constructor and use collective netcdf-output
Nov 28, 2019
95943f2
Merge remote-tracking branch 'upstream/master'
Apr 24, 2020
4f87064
Adding IO features and tests
Apr 24, 2020
139170a
Merge branch 'master' into features/517-netCDF4-features
ben-bou Apr 26, 2020
edff24f
run pre-commit
Apr 29, 2020
0e3281b
Merge branch 'master' into features/517-netCDF4-features
ben-bou May 4, 2020
2d99471
Fixed: collective IO only when necessary, also tests
May 7, 2020
124dd96
pre-commit ran
May 7, 2020
a531173
added support for index-lists and additional fixed dimensions
May 9, 2020
48cc6a0
run python black formatting
May 9, 2020
b3e0e30
bugfix
May 9, 2020
f3625c5
Use nc4-methods to get (start, count, stride) as well as out_shape (l…
May 13, 2020
3bd006f
pre-commit
May 13, 2020
e2f837a
Merge branch 'master' into features/517-netCDF4-features
ben-bou May 14, 2020
eb4a56e
Merge branch 'master' into features/517-netCDF4-features
coquelin77 May 15, 2020
1a0aaea
cleanup
May 15, 2020
7c5ca61
cleanup test
May 15, 2020
3bd6e61
minor bugs and coverage
May 18, 2020
ee0c40e
corrections as requested
May 22, 2020
5dd86a3
Merge branch 'master' into features/517-netCDF4-features
Markus-Goetz May 26, 2020
dbf788a
as requested
Jun 8, 2020
b468ab5
Merge branch 'master' into features/517-netCDF4-features
ben-bou Jun 8, 2020
89986a3
Merge branch 'master' into features/517-netCDF4-features
ben-bou Jun 9, 2020
3d525af
Merge branch 'master' into features/517-netCDF4-features
coquelin77 Jun 15, 2020
ee31e14
Merge branch 'master' into features/517-netCDF4-features
coquelin77 Jun 23, 2020
4e9cc8c
Merge branch 'master' into features/517-netCDF4-features
ben-bou Jun 24, 2020
97f12c9
Merge branch 'master' into features/517-netCDF4-features
coquelin77 Jul 1, 2020
463888f
Merge branch 'master' into features/517-netCDF4-features
Markus-Goetz Jul 14, 2020
26ffbb0
Merge branch 'master' into features/517-netCDF4-features
ClaudiaComito Jul 15, 2020
0e4e885
Merge branch 'master' into features/517-netCDF4-features
ben-bou Oct 20, 2020
c7f15ea
Update tests and formatting
Oct 20, 2020
7680137
Update documentation and replace _dndarray__array with larray
Oct 26, 2020
f09080a
Merge branch 'master' into features/517-netCDF4-features
ben-bou Oct 27, 2020
70507d2
update changelog
Oct 27, 2020
bcb8096
append tests
Oct 27, 2020
b84a7bf
bugfix test
Oct 27, 2020
7b470cd
bugix test
Oct 27, 2020
021288c
bugfix expanded_split
Oct 27, 2020
befe6f2
bugfix expanded_split
Oct 27, 2020
eb38941
remove superflous check
Oct 27, 2020
dd64d62
broadcast exceptions to all processes; prevents stalling
Oct 29, 2020
1d11ad5
Merge branch 'master' into features/517-netCDF4-features
ClaudiaComito Nov 13, 2020
f488b63
Merge branch 'master' into features/517-netCDF4-features
ClaudiaComito Nov 17, 2020
9eedf74
Merge branch 'master' into features/517-netCDF4-features
ClaudiaComito Nov 18, 2020
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,9 @@
- [#680](https://github.com/helmholtz-analytics/heat/pull/680) New property: larray
- [#683](https://github.com/helmholtz-analytics/heat/pull/683) New properties: nbytes, gnbytes, lnbytes
- [#687](https://github.com/helmholtz-analytics/heat/pull/687) New DNDarray property: balanced

### I/O
- [#559](https://github.com/helmholtz-analytics/heat/pull/559) Enhancement: `save_netcdf` allows naming dimensions, creating unlimited dimensions, using existing dimensions and variables, slicing
### Manipulations
- [#677](https://github.com/helmholtz-analytics/heat/pull/677) split, vsplit, dsplit, hsplit
### Statistical Functions
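
A rough usage sketch of the save_netcdf enhancements listed under I/O above (#559). File, variable and dimension names are made up for illustration, and it assumes heat was built with netCDF4 support:

import heat as ht

x = ht.arange(100, split=0)
# name the dimension and create it as an unlimited dimension
ht.save_netcdf(x, "data.nc", "counter", mode="w",
               dimension_names="time", is_unlimited=True)
# reuse the existing dimension and variable, writing only into a slice of it
ht.save_netcdf(x * 2, "data.nc", "counter", mode="r+",
               file_slices=slice(0, 100, 1))
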
301 changes: 260 additions & 41 deletions heat/core/io.py
@@ -1,6 +1,7 @@
import os.path

import torch
import numpy as np
import warnings

from heat.core import factories
@@ -308,7 +309,16 @@ def load_netcdf(path, variable, dtype=types.float32, split=None, device=None, co

return dndarray.DNDarray(data, gshape, dtype, split, device, comm, balanced)

def save_netcdf(data, path, variable, mode="w", **kwargs):
def save_netcdf(
data,
path,
variable,
mode="w",
dimension_names=None,
is_unlimited=False,
file_slices=slice(None),
**kwargs
):
"""
Saves data to a netCDF4 file. Attempts to utilize parallel I/O if possible.

@@ -322,6 +332,17 @@ def save_netcdf(data, path, variable, mode="w", **kwargs):
Name of the variable the data is saved to.
mode : str, one of 'w', 'a', 'r+'
File access mode
dimension_names : list or tuple or string
Specifies the netCDF Dimensions used by the variable. Ignored if
Variable already exists.
is_unlimited : bool
If True, every dimension created for this variable (i.e. doesn't
already exist) is unlimited. Already existing limited dimensions
cannot be changed to unlimited and vice versa.
file_slices : tuple of integer, slice, ellipsis or 1-d bool or
integer sequences
Keys used to slice the netCDF Variable, as given in
the nc.utils._StartCountStride method.

Member: This fits the docs better when it is just the types; the explanation then goes below it.
kwargs : dict
additional arguments passed to the created dataset.

@@ -331,7 +352,7 @@ def save_netcdf(data, path, variable, mode="w", **kwargs):
If any of the input parameters are not of correct type.
ValueError
If the access mode is not understood.

If the number of dimension names does not match the number of dimensions.
Examples
--------
>>> a_range = ht.arange(100, split=0)
@@ -343,61 +364,259 @@ def save_netcdf(data, path, variable, mode="w", **kwargs):
raise TypeError("path must be str, not {}".format(type(path)))
if not isinstance(variable, str):
raise TypeError("variable must be str, not {}".format(type(path)))
if dimension_names is None:
dimension_names = [
__NETCDF_DIM_TEMPLATE.format(variable, dim) for dim, _ in enumerate(data.shape)
]
elif isinstance(dimension_names, str):
dimension_names = [dimension_names]
elif isinstance(dimension_names, tuple):
dimension_names = list(dimension_names)
elif not isinstance(dimension_names, list):
raise TypeError(
"dimension_names must be list or tuple or string, not{}".format(
type(dimension_names)
)
)
elif not len(dimension_names) == len(data.shape):
raise ValueError(
"{0} names given for {1} dimensions".format(len(dimension_names), len(data.shape))
)

# we only support a subset of possible modes
if mode not in __VALID_WRITE_MODES:
raise ValueError(
"mode was {}, not in possible modes {}".format(mode, __VALID_WRITE_MODES)
)

failed = 0
excep = None
# chunk the data, if no split is set maximize parallel I/O and chunk first axis
is_split = data.split is not None
_, _, slices = data.comm.chunk(data.gshape, data.split if is_split else 0)

# attempt to perform parallel I/O if possible
if __nc_has_par:
with nc.Dataset(path, mode, parallel=True, comm=data.comm.handle) as handle:
dimension_names = []
for dimension, elements in enumerate(data.shape):
name = __NETCDF_DIM_TEMPLATE.format(variable, dimension)
handle.createDimension(name, elements)
dimension_names.append(name)
def __get_expanded_split(shape, expandedShape, split):
"""
Returns the hypothetical split-axis of a dndarray of shape=shape and
split=split if it was expanded to expandedShape by adding empty dimensions.

Parameters
----------
shape : tuple(int)
Shape of a dndarray.
expandedShape : tuple(int)
Shape of hypothetical expanded dndarray.
split : int or None
split-axis of dndarray.

Returns
-------
int
split-Axis of expanded dndarray.

Raises
-------
ValueError
"""
if np.prod(shape) != np.prod(expandedShape):
raise ValueError(
"Shapes %s and %s do not have the same size" % (shape, expandedShape)
)
if np.prod(shape) == 1: # size 1 array
return split
if len(shape) == len(expandedShape): # actually not expanded at all
return split
if split is None: # not split at all
return None
# Get indices of non-empty dimensions and squeezed shapes
enumerated = [[i, v] for i, v in enumerate(shape) if v != 1]
ind_nonempty, sq_shape = list(zip(*enumerated)) # transpose
enumerated = [[i, v] for i, v in enumerate(expandedShape) if v != 1]
ex_ind_nonempty, sq_ex = list(zip(*enumerated)) # transpose
if not sq_shape == sq_ex:
raise ValueError(
"Shapes %s and %s differ in non-empty dimensions" % (shape, expandedShape)
)
if split in ind_nonempty: # split along non-empty dimension
split_sq = ind_nonempty.index(split) # split-axis in squeezed shape
return ex_ind_nonempty[split_sq]
# split along empty dimension: split doesnt matter, only one process contains data
# return the last empty dimension (in expanded shape) before (the first nonempty dimension after split)
# number of nonempty elems before split
ne_before_split = split - shape[:split].count(1)
ind_ne_after_split = ind_nonempty[
ne_before_split
] # index of (first nonempty element after split) in squeezed shape
return max(
i
for i, v in enumerate(expandedShape[: max(ex_ind_nonempty[:ind_ne_after_split])])
if v == 1
)
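# Illustration (not part of the patch) of the mapping __get_expanded_split computes:
# a (4, 5) chunk split along axis 1, written through a key whose out_shape is
# (1, 4, 1, 5), keeps the same elements, but its split axis moves from 1 to 3 once
# the empty dimensions are inserted.
import numpy as np
chunk = np.arange(20).reshape(4, 5)
expanded = chunk.reshape(1, 4, 1, 5)
assert chunk.shape[1] == expanded.shape[3]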

var = handle.createVariable(variable, data.dtype.char(), dimension_names, **kwargs)
var[slices] = data.larray.cpu() if is_split else data.larray[slices].cpu()
def __merge_slices(var, var_slices, data, data_slices=None):
"""
This method allows replacing:
``var[var_slices][data_slices] = data``
(a `netcdf4.Variable.__getitem__` and a `numpy.ndarray.__setitem__` call)
with:
``var[ __merge_slices(var, var_slices, data, data_slices) ] = data``
(a single `netcdf4.Variable.__setitem__` call)

This is necessary because performing the former would, in the `__getitem__`, load the
global dataset onto every process in local `numpy-ndarrays`. Then, the `__setitem__`
would write the local `chunk` into the `numpy-ndarray`.

The latter allows the netcdf4 library to parallelize the write-operation by directly
using the `netcdf4.Variable.__setitem__` method.

Parameters
----------
var : netcdf4.Variable
Variable to which data is to be saved.
var_slices :
Keys to pass to the set-operator.
data : dndarray
Data to be saved.
data_slices: tuple of slices
As returned by the data.comm.chunk method.

Returns
-------
tuple of (slice or integer sequence)
Keys for the set-operation.
"""
slices = data_slices
if slices is None:
_, _, slices = data.comm.chunk(data.gshape, data.split if is_split else 0)
start, count, stride, _ = nc.utils._StartCountStride(
elem=var_slices,
shape=var.shape,
dimensions=var.dimensions,
grp=var.group(),
datashape=data.shape,
put=True,
)
out_shape = nc._netCDF4._out_array_shape(count)
out_split = __get_expanded_split(data.shape, out_shape, data.split)

start, count, stride = start.T, count.T, stride.T # transpose for iteration
stop = start + stride * count
new_slices = []
for begin, end, step in zip(start, stop, stride):
Member: Can we do this vectorized here? It seems possible with a little division.

Collaborator (author): I'm not sure how to vectorize this.

if begin.size == 1:
begin, end, step = begin.item(), end.item(), step.item()
new_slices.append(slice(begin, end, step))
else:
begin, end, step = begin.flatten(), end.flatten(), step.flatten()
new_slices.append(
np.r_[
Member: NumPy is okay to use here, as data which is saved must already be copied to the CPU... but in general this will cause errors. Before the issues on HDFML, were you able to test this with GPUs?

Collaborator (author): I did not test any of this on GPUs. I'm using NumPy here because I'm working on start, count, stride, which are generated by nc.utils and are ndarrays. That's okay because var_slices is usually (a tuple of) slices/ints/lists and the same for every process. You're right though, this will fail when using a DNDarray for indexing, because a netcdf.Variable does not support being indexed by a DNDarray (it results in IndexError: only integers, slices (:), ellipsis (...), and 1-d integer or boolean arrays are valid indices). However, it does support being indexed by a torch.Tensor, so it might be interesting to see what happens when using a GPU tensor for indexing.

Collaborator (author): Not that it's relevant here, but why does indexing with a DNDarray behave differently than with a torch.Tensor?

In [1]: import heat as ht
In [2]: import numpy as np
In [3]: import torch
In [4]: arr = ht.arange(3).expand_dims(1) + ht.arange(3)
In [5]: arr
Out[5]:
tensor([[0, 1, 2],
        [1, 2, 3],
        [2, 3, 4]], dtype=torch.int32)

In [6]: ind = [0,1]
In [7]: arr[ind]
Out[7]:
tensor([[0, 1, 2],
        [1, 2, 3]], dtype=torch.int32)

In [8]: arr[np.array(ind)]
Out[8]:
tensor([[0, 1, 2],
        [1, 2, 3]], dtype=torch.int32)

In [9]: arr[torch.tensor(ind)]
Out[9]:
tensor([[0, 1, 2],
        [1, 2, 3]], dtype=torch.int32)

In [10]: arr[tuple(ind)]
Out[10]: tensor(1, dtype=torch.int32)

In [11]: arr[ht.array(ind)]
Out[11]: tensor(1, dtype=torch.int32)

Member: The second is because advanced indexing using heat DNDarrays is not yet implemented, but that is on the list of things to do.

@Markus-Goetz, what is the verdict on NumPy here? I'm not sure; because it is a saving routine it is bound to the CPU anyway, but we would need to specify that higher up in the function. If that is done, then I have no problem with it.

Member: Have an issue about the advanced indexing.

NumPy seems fine here; ensure that the data is actually on the CPU if necessary.

tuple(
slice(b.item(), e.item(), s.item())
for b, e, s in zip(begin, end, step)
)
]
)
if out_split is not None: # add split-slice
if isinstance(new_slices[out_split], slice):
start, stop, step = (
new_slices[out_split].start,
new_slices[out_split].stop,
new_slices[out_split].step,
)
sliced = range(start, stop, step)[slices[data.split]]
a, b, c = sliced.start, sliced.stop, sliced.step
a = None if a < 0 else a
b = None if b < 0 else b
Member: Is it possible that step also needs to be corrected here?

Collaborator (author): Actually, it might be the most elegant solution to just use sliced (which is of type range) as the index instead of transforming it into a slice.

Collaborator (author), replying to the above: No, even though slicing with ranges is generally possible, this fails the "indexing netcdf file: broadcasting var" test.

I don't think step needs to be corrected, because it is generated by nc.utils (so possible 'weird' values are already dealt with) and is used to calculate stop. Also, the slicing doesn't change the step, because ht.comm.chunk() always uses None as the step.
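# Quick check (not part of the patch) of the claim above: slicing a range with a
# step-less slice, as produced by comm.chunk, leaves the step computed by nc.utils
# untouched, so no correction is needed.
sliced = range(0, 100, 3)[5:10]
assert (sliced.start, sliced.stop, sliced.step) == (15, 30, 3)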

new_slices[out_split] = slice(a, b, c)
# new_slices[out_split] = sliced
elif isinstance(new_slices[out_split], np.ndarray):
new_slices[out_split] = new_slices[out_split][slices[data.split]]
else:
new_slices[out_split] = np.r_[new_slices[out_split]][slices[data.split]]
return tuple(new_slices)
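# Illustration (not part of the patch) of the slice composition __merge_slices
# performs along the split axis: the user-given file slice and this rank's chunk
# collapse into a single key, so one netCDF4 __setitem__ call per process suffices.
file_slice = slice(10, 50, 2)  # hypothetical key passed via file_slices
chunk = slice(5, 10)           # hypothetical chunk of this rank along the split axis
merged = range(file_slice.start, file_slice.stop, file_slice.step)[chunk]
assert merged == range(20, 30, 2)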

# attempt to perform parallel I/O if possible
if __nc_has_par:
try:
with nc.Dataset(path, mode, parallel=True, comm=data.comm.handle) as handle:
if variable in handle.variables:
var = handle.variables[variable]
else:
for name, elements in zip(dimension_names, data.shape):
if name not in handle.dimensions:
handle.createDimension(name, elements if not is_unlimited else None)
var = handle.createVariable(
variable, data.dtype.char(), dimension_names, **kwargs
)
merged_slices = __merge_slices(var, file_slices, data)
try:
var[merged_slices] = (
data.larray.cpu() if is_split else data.larray[slices].cpu()
)
except RuntimeError:
var.set_collective(True)
var[merged_slices] = (
data.larray.cpu() if is_split else data.larray[slices].cpu()
)
except Exception as e:
failed = data.comm.rank + 1
excep = e
# otherwise a single rank only write is performed in case of local data (i.e. no split)
elif data.comm.rank == 0:
with nc.Dataset(path, mode) as handle:
dimension_names = []
for dimension, elements in enumerate(data.shape):
name = __NETCDF_DIM_TEMPLATE.format(variable, dimension)
handle.createDimension(name, elements)
dimension_names.append(name)

var = handle.createVariable(
variable, data.dtype.char(), tuple(dimension_names), **kwargs
)
if is_split:
var[slices] = data.larray.cpu()
else:
var[:] = data.larray.cpu()

# ping next rank if it exists
if is_split and data.comm.size > 1:
data.comm.Isend([None, 0, MPI.INT], dest=1)
data.comm.Recv([None, 0, MPI.INT], source=data.comm.size - 1)

# no MPI, but data is split, we have to serialize the writes
elif is_split:
try:
with nc.Dataset(path, mode) as handle:
if variable in handle.variables:
var = handle.variables[variable]
else:
for name, elements in zip(dimension_names, data.shape):
if name not in handle.dimensions:
handle.createDimension(name, elements if not is_unlimited else None)
var = handle.createVariable(
variable, data.dtype.char(), dimension_names, **kwargs
)
var.set_collective(False) # not possible with non-parallel netcdf
if is_split:
merged_slices = __merge_slices(var, file_slices, data)
var[merged_slices] = data.larray.cpu()
else:
var[file_slices] = data.larray.cpu()
except Exception as e:
failed = 1
excep = e
finally:
if data.comm.size > 1:
data.comm.isend(failed, dest=1)
data.comm.recv()

# non-root
else:
# wait for the previous rank to finish writing its chunk, then write own part
data.comm.Recv([None, 0, MPI.INT], source=data.comm.rank - 1)
with nc.Dataset(path, "r+") as handle:
handle[variable][slices] = data.larray.cpu()

# ping the next node in the communicator, wrap around to 0 to complete barrier behavior
next_rank = (data.comm.rank + 1) % data.comm.size
data.comm.Isend([None, 0, MPI.INT], dest=next_rank)
failed = data.comm.recv()
try:
# no MPI, but data is split, we have to serialize the writes
if not failed and is_split:
with nc.Dataset(path, "r+") as handle:
var = handle.variables[variable]
var.set_collective(False) # not possible with non-parallel netcdf
merged_slices = __merge_slices(var, file_slices, data)
var[merged_slices] = data.larray.cpu()
except Exception as e:
failed = data.comm.rank + 1
excep = e
finally:
# ping the next node in the communicator, wrap around to 0 to complete barrier behavior
next_rank = (data.comm.rank + 1) % data.comm.size
data.comm.isend(failed, dest=next_rank)

failed = data.comm.allreduce(failed, op=MPI.MAX)
if failed - 1 == data.comm.rank:
data.comm.bcast(excep, root=failed - 1)
raise excep
elif failed:
excep = data.comm.bcast(excep, root=failed - 1)
excep.args = "raised by process rank {}".format(failed - 1), *excep.args
raise excep from None # raise the same error but without traceback
# because that is on a different process
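# Sketch (not part of the patch) of the error-propagation pattern used above, in
# plain mpi4py: each rank reports 0 (success) or rank + 1 (failure); allreduce with
# MAX lets every rank learn about a failure so no rank stalls in a later collective
# call, and the failing rank broadcasts its exception so all ranks raise together.
from mpi4py import MPI

comm = MPI.COMM_WORLD
failed, excep = 0, None
try:
    pass  # the local part of the write would go here
except Exception as e:
    failed, excep = comm.rank + 1, e
failed = comm.allreduce(failed, op=MPI.MAX)
if failed:
    excep = comm.bcast(excep, root=failed - 1)
    raise excep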


def load(path, *args, **kwargs):