Features/517-netcdf4-features #559

Merged

Changes from 31 commits

Commits (44)
51bfd8b
removed copy constructor and use collective netcdf-output
Nov 28, 2019
95943f2
Merge remote-tracking branch 'upstream/master'
Apr 24, 2020
4f87064
Adding IO features and tests
Apr 24, 2020
139170a
Merge branch 'master' into features/517-netCDF4-features
ben-bou Apr 26, 2020
edff24f
run pre-commit
Apr 29, 2020
0e3281b
Merge branch 'master' into features/517-netCDF4-features
ben-bou May 4, 2020
2d99471
Fixed: collective IO only when necessary, also tests
May 7, 2020
124dd96
pre-commit ran
May 7, 2020
a531173
added support for index-lists and additional fixed dimensions
May 9, 2020
48cc6a0
run python black formatting
May 9, 2020
b3e0e30
bugfix
May 9, 2020
f3625c5
Use nc4-methods to get (start, count, stride) as well as out_shape (l…
May 13, 2020
3bd006f
pre-commit
May 13, 2020
e2f837a
Merge branch 'master' into features/517-netCDF4-features
ben-bou May 14, 2020
eb4a56e
Merge branch 'master' into features/517-netCDF4-features
coquelin77 May 15, 2020
1a0aaea
cleanup
May 15, 2020
7c5ca61
cleanup test
May 15, 2020
3bd6e61
minor bugs and coverage
May 18, 2020
ee0c40e
corrections as requested
May 22, 2020
5dd86a3
Merge branch 'master' into features/517-netCDF4-features
Markus-Goetz May 26, 2020
dbf788a
as requested
Jun 8, 2020
b468ab5
Merge branch 'master' into features/517-netCDF4-features
ben-bou Jun 8, 2020
89986a3
Merge branch 'master' into features/517-netCDF4-features
ben-bou Jun 9, 2020
3d525af
Merge branch 'master' into features/517-netCDF4-features
coquelin77 Jun 15, 2020
ee31e14
Merge branch 'master' into features/517-netCDF4-features
coquelin77 Jun 23, 2020
4e9cc8c
Merge branch 'master' into features/517-netCDF4-features
ben-bou Jun 24, 2020
97f12c9
Merge branch 'master' into features/517-netCDF4-features
coquelin77 Jul 1, 2020
463888f
Merge branch 'master' into features/517-netCDF4-features
Markus-Goetz Jul 14, 2020
26ffbb0
Merge branch 'master' into features/517-netCDF4-features
ClaudiaComito Jul 15, 2020
0e4e885
Merge branch 'master' into features/517-netCDF4-features
ben-bou Oct 20, 2020
c7f15ea
Update tests and formatting
Oct 20, 2020
7680137
Update documentation and replace _dndarray__array with larray
Oct 26, 2020
f09080a
Merge branch 'master' into features/517-netCDF4-features
ben-bou Oct 27, 2020
70507d2
update changelog
Oct 27, 2020
bcb8096
append tests
Oct 27, 2020
b84a7bf
bugfix test
Oct 27, 2020
7b470cd
bugix test
Oct 27, 2020
021288c
bugfix expanded_split
Oct 27, 2020
befe6f2
bugfix expanded_split
Oct 27, 2020
eb38941
remove superflous check
Oct 27, 2020
dd64d62
broadcast exceptions to all processes; prevents stalling
Oct 29, 2020
1d11ad5
Merge branch 'master' into features/517-netCDF4-features
ClaudiaComito Nov 13, 2020
f488b63
Merge branch 'master' into features/517-netCDF4-features
ClaudiaComito Nov 17, 2020
9eedf74
Merge branch 'master' into features/517-netCDF4-features
ClaudiaComito Nov 18, 2020
230 changes: 208 additions & 22 deletions heat/core/io.py
@@ -1,6 +1,7 @@
import os.path

import torch
import numpy as np
import warnings

from heat.core import factories
@@ -306,7 +307,16 @@ def load_netcdf(path, variable, dtype=types.float32, split=None, device=None, co

return dndarray.DNDarray(data, gshape, dtype, split, device, comm)

def save_netcdf(data, path, variable, mode="w", **kwargs):
def save_netcdf(
data,
path,
variable,
mode="w",
dimension_names=None,
is_unlimited=False,
file_slices=slice(None),
**kwargs
):
"""
Saves data to a netCDF4 file. Attempts to utilize parallel I/O if possible.

@@ -320,6 +330,17 @@ def save_netcdf(data, path, variable, mode="w", **kwargs):
Name of the variable the data is saved to.
mode : str, one of 'w', 'a', 'r+'
File access mode
dimension_names : list or tuple or string
Specifies the netCDF Dimensions used by the variable. Ignored if
Variable already exists.
is_unlimited : bool
If True, every dimension created for this variable (i.e. every dimension
that does not already exist) is unlimited. Already existing limited
dimensions cannot be changed to unlimited and vice versa.
file_slices : tuple of integer, slice, ellipsis or 1-d bool or integer sequences
Keys used to slice the netCDF Variable, as given in
the nc.utils._StartCountStride method.

[Member] This fits the docs better when it is just the types. The explanation then goes below it.
kwargs : dict
additional arguments passed to the created dataset.

@@ -329,7 +350,7 @@ def save_netcdf(data, path, variable, mode="w", **kwargs):
If any of the input parameters are not of correct type.
ValueError
If the access mode is not understood.

If the number of dimension names does not match the number of dimensions.
Examples
--------
>>> a_range = ht.arange(100, split=0)
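A minimal usage sketch of the new keyword arguments (hypothetical file and
variable names; assumes the semantics described in the docstring above):

    import heat as ht

    # create the variable with a named, unlimited dimension
    x = ht.arange(100, split=0)
    ht.save_netcdf(x, "data.nc", "x", mode="w", dimension_names=("time",), is_unlimited=True)

    # write a second batch behind the first by addressing the target
    # region of the file explicitly via file_slices
    y = ht.arange(100, 200, split=0)
    ht.save_netcdf(y, "data.nc", "x", mode="r+", file_slices=slice(100, 200))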
@@ -341,6 +362,24 @@
raise TypeError("path must be str, not {}".format(type(path)))
if not isinstance(variable, str):
raise TypeError("variable must be str, not {}".format(type(path)))
if dimension_names is None:
dimension_names = [
__NETCDF_DIM_TEMPLATE.format(variable, dim) for dim, _ in enumerate(data.shape)
]
elif isinstance(dimension_names, str):
dimension_names = [dimension_names]
elif isinstance(dimension_names, tuple):
dimension_names = list(dimension_names)
elif not isinstance(dimension_names, list):
raise TypeError(
"dimension_names must be list or tuple or string, not{}".format(
type(dimension_names)
)
)
elif not len(dimension_names) == len(data.shape):
raise ValueError(
"{0} names given for {1} dimensions".format(len(dimension_names), len(data.shape))
)

# we only support a subset of possible modes
if mode not in __VALID_WRITE_MODES:
@@ -352,34 +391,178 @@ def save_netcdf(data, path, variable, mode="w", **kwargs):
is_split = data.split is not None
_, _, slices = data.comm.chunk(data.gshape, data.split if is_split else 0)

def __get_expanded_split(shape, expandedShape, split):
"""
Returns the hypothetical split-axis of a dndarray of shape=shape and
split=split if it was expanded to expandedShape by adding empty dimensions.

Parameters
----------
shape : tuple(int)
Shape of a dndarray.
expandedShape : tuple(int)
Shape of hypothetical expanded dndarray.
split : int or None
split-axis of dndarray.

Returns
-------
int
split-axis of the expanded dndarray.

Raises
-------
ValueError
If the shapes differ in non-empty dimensions.
"""
# Get indices of non-empty dimensions and squeezed shapes
enumerated = [[i, v] for i, v in enumerate(shape) if v != 1]
ind_nonempty, sq_shape = list(zip(*enumerated)) # transpose
enumerated = [[i, v] for i, v in enumerate(expandedShape) if v != 1]
ex_ind_nonempty, sq_ex = list(zip(*enumerated)) # transpose
if not sq_shape == sq_ex:
raise ValueError(
"Shapes %s and %s differ in non-empty dimensions" % (shape, expandedShape)
)
if len(shape) == len(expandedShape): # actually not expanded at all
return split
if split is None: # not split at all
return None
if split in ind_nonempty: # split along non-empty dimension
split_sq = ind_nonempty.index(split) # split-axis in squeezed shape
return ex_ind_nonempty[split_sq]
# split along empty dimension: split doesn't matter, only one process contains data
# return the last empty dimension (in expanded shape) before (the first nonempty dimension after split)
# number of nonempty elems before split
ne_before_split = split - shape[:split].count(1)
ind_ne_after_split = ind_nonempty[
ne_before_split
] # index of (first nonempty element after split) in squeezed shape
return max(
i
for i, v in enumerate(expandedShape[: max(ex_ind_nonempty[:ind_ne_after_split])])
if v == 1
)

def __merge_slices(var, var_slices, data, data_slices=None):
"""
Using var[var_slices][data_slices] = data
combines a __getitem__ with a __setitem__ call, therefore it does not allow
parallelization of the write-operation and does not work if var_slices = slice(None)
[Member] Not clear why parallelization would not work here. I am hesitant to merge this.

[Author] My reasoning is that because var_slices specifies the location of the (global) dndarray (and therefore is the same for every process), doing var[var_slices] loads the global data onto every single process as a ndarray, only to overwrite the local chunk using [data_slices] = data. I believe this does not allow parallelization because in the only method we call on the netcdf.Variable, the __getitem__, every process requests access to the same location (var_slices).

Also, when I tried to use var[var_slices][data_slices] = data, the tests failed (in some cases, it did not write at all), so I came up with the idea to merge the var_slices and data_slices into a single set (tuple) of slices (or other indices).

[Member] There was an update to the slicing recently. Can you see if the new slicing solved your issue?

[Author] Thanks, but I don't believe this is relevant here.

Maybe I didn't make it clear what this method actually does:
Each process knows its location inside the global dndarray via comm.chunk(). This method calculates (for each process) its location inside the (global) netCDF file, such that every process can write to its corresponding location. Therefore this method does not apply any slicing in HeAT.

Anyway, where can I find the changes made to slicing? I couldn't find them in the changelog.

[Contributor] Hi Ben,

comm.chunk also returns the offset of the local 0-index with respect to the global one; would using var.larray[var_slices-offset] allow you to parallelize the write operation? Or maybe I misunderstand.

[Author] Hi Claudia,
I don't understand why you would index the local array with the global offset? Also, please keep in mind that var is the netcdf4.Variable object. I am now reworking the documentation for this method to clarify what it does.

[Author] Hopefully, this is more clear:

        This method allows replacing:
        ``var[var_slices][data_slices] = data``
        (a `netcdf4.Variable.__getitem__` and a `numpy.ndarray.__setitem__` call)
        with:
        ``var[ __merge_slices(var, var_slices, data, data_slices) ] = data``
        (a single `netcdf4.Variable.__setitem__` call)

        This is necessary because performing the former would, in the `__getitem__`, load the
        global dataset onto every process in local `numpy-ndarrays`. Then, the `__setitem__`
        would write the local `chunk` into the `numpy-ndarray`.

        The latter allows the netcdf4 library to parallelize the write-operation by directly
        using the `netcdf4.Variable.__setitem__` method.

(in that case, __getitem__ returns a copy and not a view).
This method merges both keys:
var[__merge_slices(var, var_slices, data)] = data

Parameters
----------
var : netcdf4.Variable
Variable to which data is to be saved.
var_slices : tuple of integer, slice, ellipsis or 1-d bool or integer sequences
Keys to pass to the set-operator.
data : dndarray
Data to be saved.
data_slices: tuple of slices
As returned by the data.comm.chunk method.

Returns
-------
tuple of (slice or integer sequence)
Keys for the set-operation.
"""
slices = data_slices
if slices is None:
_, _, slices = data.comm.chunk(data.gshape, data.split if is_split else 0)
start, count, stride, _ = nc.utils._StartCountStride(
elem=var_slices,
shape=var.shape,
dimensions=var.dimensions,
grp=var.group(),
datashape=data.shape,
put=True,
)
out_shape = nc._netCDF4._out_array_shape(count)
out_split = __get_expanded_split(data.shape, out_shape, data.split)

start, count, stride = start.T, count.T, stride.T # transpose for iteration
stop = start + stride * count
new_slices = []
for begin, end, step in zip(start, stop, stride):
[Member] Can we do this here vectorized? Seems possible with a little division.

[Author] I'm not sure how to vectorize this.

if begin.size == 1:
begin, end, step = begin.item(), end.item(), step.item()
new_slices.append(slice(begin, end, step))
else:
begin, end, step = begin.flatten(), end.flatten(), step.flatten()
new_slices.append(
np.r_[
[Member] numpy is okay to use here as data which is saved already must be copied to the CPU... but in general this will cause errors. Before the issues on HDFML, were you able to test this with GPUs?

[Author] I did not test any of this on GPUs. I'm using numpy here because I'm working on start, count, stride, which are generated by nc.utils and are ndarrays. That's ok because var_slices is usually (a tuple of) slices/ints/lists and the same for every process. You're right though, this will fail when using a dndarray for indexing, because a netcdf.Variable does not support being indexed by a dndarray (results in IndexError: only integers, slices (:), ellipsis (...), and 1-d integer or boolean arrays are valid indices). However, it does support being indexed by a torch.tensor, so it might be interesting to see what happens when using a gpu-tensor for indexing.

[Author] Not that it's relevant here, but why does indexing with a dndarray behave differently than with a torch.tensor?

In [1]: import heat as ht
In [2]: import numpy as np
In [3]: import torch
In [4]: arr = ht.arange(3).expand_dims(1) + ht.arange(3)
In [5]: arr
Out[5]:
tensor([[0, 1, 2],
        [1, 2, 3],
        [2, 3, 4]], dtype=torch.int32)

In [6]: ind = [0,1]
In [7]: arr[ind]
Out[7]:
tensor([[0, 1, 2],
        [1, 2, 3]], dtype=torch.int32)

In [8]: arr[np.array(ind)]
Out[8]:
tensor([[0, 1, 2],
        [1, 2, 3]], dtype=torch.int32)

In [9]: arr[torch.tensor(ind)]
Out[9]:
tensor([[0, 1, 2],
        [1, 2, 3]], dtype=torch.int32)

In [10]: arr[tuple(ind)]
Out[10]: tensor(1, dtype=torch.int32)

In [11]: arr[ht.array(ind)]
Out[11]: tensor(1, dtype=torch.int32)

[Member] The second is because the advanced indexing using heat DNDarrays is not yet implemented, but that is on the list of things to do.

@Markus-Goetz what is the verdict on NumPy here? I'm not sure. Because it's a saving routine it is bound to the CPU anyway, but we would need to specify that higher up in the function. If that is done then I have no problem with it.

[Member] Have an issue about the advanced indexing.

Numpy seems fine here, ensure that the data is actually on cpu if necessary.

tuple(
slice(b.item(), e.item(), s.item())
for b, e, s in zip(begin, end, step)
)
]
)
if out_split is not None: # add split-slice
if isinstance(new_slices[out_split], slice):
start, stop, step = (
new_slices[out_split].start,
new_slices[out_split].stop,
new_slices[out_split].step,
)
sliced = range(start, stop, step)[slices[data.split]]
a, b, c = sliced.start, sliced.stop, sliced.step
a = None if a < 0 else a
b = None if b < 0 else b
[Member] Is it possible that step also needs to be corrected here?

[Author] Actually, it might be the most elegant solution to just use sliced (which is of type range) as index instead of transforming it into a slice.

[Author] > Actually, it might be the most elegant solution to just use sliced (which is of type range) as index instead of transforming it into a slice.

No, even though slicing with ranges is generally possible, this fails the "indexing netcdf file: broadcasting var" test.

I don't think step needs to be corrected because it is generated by nc.utils (so possible 'weird' values are already dealt with) and is used to calculate stop. Also the slicing doesn't change the step because ht.comm.chunk() always uses None as step.

new_slices[out_split] = slice(a, b, c)
# new_slices[out_split] = sliced
elif isinstance(new_slices[out_split], np.ndarray):
new_slices[out_split] = new_slices[out_split][slices[data.split]]
else:
new_slices[out_split] = np.r_[new_slices[out_split]][slices[data.split]]
return tuple(new_slices)
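The index arithmetic at the heart of this helper can be shown in isolation:
a global file region is composed with the process-local chunk slice by
slicing a range object (illustrative values, standalone sketch):

    file_region = slice(10, 50, 2)   # where the global array goes in the file
    local_chunk = slice(5, 10)       # this process owns global elements 5..9

    # range composition performs the start/stop/step arithmetic
    composed = range(file_region.start, file_region.stop, file_region.step)[local_chunk]
    print(composed)                  # range(20, 30, 2)

    merged = slice(composed.start, composed.stop, composed.step)
    # var[merged] = local_data  ->  one netCDF4 __setitem__ call per process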

# attempt to perform parallel I/O if possible
if __nc_has_par:
with nc.Dataset(path, mode, parallel=True, comm=data.comm.handle) as handle:
dimension_names = []
for dimension, elements in enumerate(data.shape):
name = __NETCDF_DIM_TEMPLATE.format(variable, dimension)
handle.createDimension(name, elements)
dimension_names.append(name)

var = handle.createVariable(variable, data.dtype.char(), dimension_names, **kwargs)
var[slices] = data.larray.cpu() if is_split else data.larray[slices].cpu()
if variable in handle.variables:
var = handle.variables[variable]
else:
for name, elements in zip(dimension_names, data.shape):
if name not in handle.dimensions:
handle.createDimension(name, elements if not is_unlimited else None)
var = handle.createVariable(
variable, data.dtype.char(), dimension_names, **kwargs
)
merged_slices = __merge_slices(var, file_slices, data)
try:
var[merged_slices] = (
data._DNDarray__array.cpu()
[Contributor] Hi Ben, please replace _DNDarray__array with the new larray for consistency with the rest - also in the follow-up code.

if is_split
else data._DNDarray__array[slices].cpu()
)
except RuntimeError:
[Member] (I'm unfamiliar with netCDF) Why is this raised?

[Author] Sometimes, collective IO is needed (e.g. for writing into an unlimited dimension). A RuntimeError is thrown when attempting this without collective IO.
Someone with more experience in netcdf might know a better convention for when to use collective IO.

[Member] All metadata operations, e.g. creation, deletion, ..., are collective. This means all processors need to call this operation, otherwise the application deadlocks. Should only be necessary in parallel mode to begin with.

var.set_collective(True)
var[merged_slices] = (
data._DNDarray__array.cpu()
if is_split
else data._DNDarray__array[slices].cpu()
)

# otherwise a single rank only write is performed in case of local data (i.e. no split)
elif data.comm.rank == 0:
with nc.Dataset(path, mode) as handle:
dimension_names = []
for dimension, elements in enumerate(data.shape):
name = __NETCDF_DIM_TEMPLATE.format(variable, dimension)
handle.createDimension(name, elements)
dimension_names.append(name)

var = handle.createVariable(
variable, data.dtype.char(), tuple(dimension_names), **kwargs
)
if variable in handle.variables:
var = handle.variables[variable]
else:
for name, elements in zip(dimension_names, data.shape):
if name not in handle.dimensions:
handle.createDimension(name, elements if not is_unlimited else None)
var = handle.createVariable(
variable, data.dtype.char(), dimension_names, **kwargs
)
var.set_collective(False) # not possible with non-parallel netcdf
if is_split:
var[slices] = data.larray.cpu()
merged_slices = __merge_slices(var, file_slices, data)
var[merged_slices] = data._DNDarray__array.cpu()
[Contributor] _DNDarray__array --> larray

else:
var[:] = data.larray.cpu()
var[file_slices] = data._DNDarray__array.cpu()

# ping next rank if it exists
if is_split and data.comm.size > 1:
@@ -391,7 +574,10 @@ def save_netcdf(data, path, variable, mode="w", **kwargs):
# wait for the previous rank to finish writing its chunk, then write own part
data.comm.Recv([None, 0, MPI.INT], source=data.comm.rank - 1)
with nc.Dataset(path, "r+") as handle:
handle[variable][slices] = data.larray.cpu()
var = handle.variables[variable]
var.set_collective(False) # not possible with non-parallel netcdf
merged_slices = __merge_slices(var, file_slices, data)
var[merged_slices] = data._DNDarray__array.cpu()

# ping the next node in the communicator, wrap around to 0 to complete barrier behavior
next_rank = (data.comm.rank + 1) % data.comm.size
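For reference, the serialized fallback used above amounts to the following
token-passing ring (a standalone mpi4py sketch under assumed names, not
heat's exact wrapper calls):

    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    rank, size = comm.rank, comm.size

    if rank > 0:
        comm.recv(source=rank - 1)  # wait until the previous rank has written

    # ... open the file in "r+" mode and write this rank's chunk here ...

    if size > 1:
        comm.send(None, dest=(rank + 1) % size)  # ping the next rank, wrap to 0
        if rank == 0:
            comm.recv(source=size - 1)  # absorb the wrap-around ping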