Support HDF5 compression of AMUSE files (#840)
* Support HDF5 compression of AMUSE files

use with 'compression=True' or 'compression="gzip", compression_opts=compression_level'
(compression_opts is the gzip compression level, an integer from 0 to 9; see the usage sketch after these notes)

* Add a test for compressed AMUSE files (version 2.0 format only)

* Don't create a new_dtype that is then left unused
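
A minimal usage sketch (not part of the commit), assuming write_set_to_file forwards these keyword arguments as format options to the HDF5 processor; file names and the particle set are illustrative. The 2.0 store (store_v2) is the default and is the one that supports compression.

# Hedged sketch: writing a compressed AMUSE file with the new format options.
from amuse.datamodel import Particles
from amuse.io import write_set_to_file
from amuse.units import units

particles = Particles(1000)
particles.mass = 1.0 | units.MSun  # broadcast one solar mass to all particles

# Default compression; per the new docstring, True is meant to select gzip.
write_set_to_file(
    particles, "stars.amuse", "amuse",
    overwrite_file=True,
    compression=True,
)

# Explicit gzip mode and level; compression_opts is the gzip level (0-9).
write_set_to_file(
    particles, "stars_gzip9.amuse", "amuse",
    overwrite_file=True,
    compression="gzip", compression_opts=9,
)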
rieder committed Mar 28, 2022
1 parent 8da2908 commit e8a2046
Showing 3 changed files with 220 additions and 40 deletions.
38 changes: 27 additions & 11 deletions src/amuse/io/store.py
@@ -37,7 +37,7 @@ class HDF5FileFormatProcessor(base.FileFormatProcessor):

provided_formats = ['hdf5', 'amuse']

def __init__(self, filename = None, set = None, format = None):
def __init__(self, filename=None, set=None, format=None):
base.FileFormatProcessor.__init__(self, filename, set, format)

def load(self):
@@ -49,12 +49,15 @@ def load(self):

def load_base(self):
processor = store_v2.StoreHDF(
self.filename,
open_for_writing = False,
append_to_file = self.append_to_file or self.allow_writing,
copy_history = self.copy_history,
overwrite_file = self.overwrite_file )
if not processor.is_correct_version():
self.filename,
open_for_writing=False,
append_to_file=self.append_to_file or self.allow_writing,
copy_history=self.copy_history,
overwrite_file=self.overwrite_file,
compression=False,
compression_opts=None,
)
if not processor.is_correct_version():
processor.close()
processor = store_v1.StoreHDF(
self.filename,
@@ -94,10 +97,12 @@ def store(self):
raise Exception("You are trying to append to a file that was not written in version 1.0 format")
else:
processor = store_v2.StoreHDF(
self.filename,
append_to_file = self.append_to_file,
open_for_writing = True,
overwrite_file = self.overwrite_file
self.filename,
append_to_file=self.append_to_file,
open_for_writing=True,
overwrite_file=self.overwrite_file,
compression=self.compression,
compression_opts=self.compression_opts
)

if not processor.is_correct_version():
@@ -170,3 +175,14 @@ def overwrite_file(self):
"""If set to True, overwrite file if it exists, otherwise writing an existing
file generates an exception"""
return False

@base.format_option
def compression(self):
"""If set to False, don't use compression, otherwise use specified compression
mode (defaults to False, True means 'gzip')"""
return False

@base.format_option
def compression_opts(self):
"Compression level to use if using compression (0-9, defaults to None)"
return None
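
Reading a compressed file back needs no new options, since h5py decompresses transparently and load_base() opens the store with compression disabled; a minimal sketch with an illustrative file name:

# Hedged sketch: reading a compressed AMUSE file back requires no extra options.
from amuse.io import read_set_from_file

stars = read_set_from_file("stars.amuse", "amuse")
print(len(stars))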
196 changes: 167 additions & 29 deletions src/amuse/io/store_v2.py
@@ -38,6 +38,8 @@ def unpickle_from_string(value):
return pickle.loads(value, encoding='bytes')

class HDF5Attribute(object):
compression = False
compression_opts = None

def __init__(self, name):
self.name = name
@@ -66,26 +68,56 @@ def new_attribute(cls, name, shape, input, group):
if not hasattr(shape, '__iter__'):
shape = shape,
dtype = numpy.asanyarray(input.number).dtype
dataset = group.create_dataset(name, shape=shape, dtype=dtype)
dataset = group.create_dataset(
name,
shape=shape,
dtype=dtype,
compression=cls.compression,
compression_opts=cls.compression_opts,
)
dataset.attrs["units"] = input.unit.to_simple_form().reference_string().encode('ascii')
return HDF5VectorQuantityAttribute(name, dataset, input.unit)
elif hasattr(input, 'as_set'):
raise Exception("adding a linked attribute to a set stored in a HDF file is not supported, alternative is to copy the set and save it")
subgroup = group.create_group(name)
group.create_dataset('keys', shape=shape, dtype=input.key.dtype)
group.create_dataset('masked', shape=shape, dtype=numpy.bool)
group.create_dataset(
'keys',
shape=shape,
dtype=input.key.dtype,
compression=cls.compression,
compression_opts=cls.compression_opts,
)
group.create_dataset(
'masked',
shape=shape,
dtype=numpy.bool,
compression=cls.compression,
compression_opts=cls.compression_opts,
)
return HDF5LinkedAttribute(name, subgroup)
else:
dtype = numpy.asanyarray(input).dtype
if dtype.kind == 'U':
new_dtype = numpy.dtype('S' + dtype.itemsize * 4)
dataset = group.create_dataset(name, shape=shape, dtype=dtype)
# new_dtype = numpy.dtype('S' + dtype.itemsize * 4)
dataset = group.create_dataset(
name,
shape=shape,
dtype=dtype,
compression=cls.compression,
compression_opts=cls.compression_opts,
)
dataset.attrs["units"] = "UNICODE".encode('ascii')
return HDF5UnicodeAttribute(name, dataset)
else:
if not hasattr(shape, '__iter__'):
if not hasattr(shape, '__iter__'):
shape = shape,
dataset = group.create_dataset(name, shape=shape, dtype=dtype)
dataset = group.create_dataset(
name,
shape=shape,
dtype=dtype,
compression=cls.compression,
compression_opts=cls.compression_opts,
)
dataset.attrs["units"] = "none".encode('ascii')
return HDF5UnitlessAttribute(name, dataset)

@@ -131,6 +163,8 @@ def remove_indices(self, indices):
pass

class HDF5VectorQuantityAttribute(HDF5Attribute):
compression = False
compression_opts = None

def __init__(self, name, dataset, unit):
HDF5Attribute.__init__(self, name)
@@ -173,7 +207,13 @@ def increase_to_length(self, newlength):

parent = self.dataset.parent
del parent[self.name]
self.dataset = parent.create_dataset(self.name, shape=newshape, dtype=values.dtype)
self.dataset = parent.create_dataset(
self.name,
shape=newshape,
dtype=values.dtype,
compression=self.compression,
compression_opts=self.compression_opts,
)
self.dataset[:oldlength] = values[:]

def get_length(self):
@@ -197,7 +237,13 @@ def remove_indices(self, indices):

parent = self.dataset.parent
del parent[self.name]
self.dataset = parent.create_dataset(self.name, shape=values.shape, dtype=values.dtype)
self.dataset = parent.create_dataset(
self.name,
shape=values.shape,
dtype=values.dtype,
compression=self.compression,
compression_opts=self.compression_opts,
)
self.dataset[:] = values[:]


@@ -206,6 +252,8 @@ def has_units(self):
return True

class HDF5UnitlessAttribute(HDF5Attribute):
compression = False
compression_opts = None

def __init__(self, name, dataset):
HDF5Attribute.__init__(self, name)
@@ -240,7 +288,13 @@ def increase_to_length(self, newlength):

parent = self.dataset.parent
del parent[self.name]
self.dataset = parent.create_dataset(self.name, shape=newshape, dtype=values.dtype)
self.dataset = parent.create_dataset(
self.name,
shape=newshape,
dtype=values.dtype,
compression=self.compression,
compression_opts=self.compression_opts,
)
self.dataset[:oldlength] = values[:]

def get_length(self):
@@ -264,7 +318,13 @@ def remove_indices(self, indices):

parent = self.dataset.parent
del parent[self.name]
self.dataset = parent.create_dataset(self.name, shape=values.shape, dtype=values.dtype)
self.dataset = parent.create_dataset(
self.name,
shape=values.shape,
dtype=values.dtype,
compression=self.compression,
compression_opts=self.compression_opts,
)
self.dataset[:] = values[:]

def has_units(self):
@@ -384,14 +444,26 @@ def increase_to_length(self, newlength):

parent = self.group
del parent['keys']
self.keys = parent.create_dataset('keys', shape=newshape, dtype=values.dtype)
self.keys = parent.create_dataset(
'keys',
shape=newshape,
dtype=values.dtype,
compression=self.compression,
compression_opts=self.compression_opts,
)
self.keys[:oldlength] = values[:]

values = numpy.empty(shape=self.masked.shape, dtype=self.masked.dtype)
values[:] = self.masked[:]
parent = self.group
del parent['masked']
self.masked = parent.create_dataset('masked', shape=newshape, dtype=values.dtype)
self.masked = parent.create_dataset(
'masked',
shape=newshape,
dtype=values.dtype,
compression=self.compression,
compression_opts=self.compression_opts,
)
self.masked[:oldlength] = values[:]


@@ -424,7 +496,13 @@ def remove_indices(self, indices):

parent = self.dataset.parent
del parent[self.name]
self.dataset = parent.create_dataset(self.name, shape=values.shape, dtype=values.dtype)
self.dataset = parent.create_dataset(
self.name,
shape=values.shape,
dtype=values.dtype,
compression=self.compression,
compression_opts=self.compression_opts,
)
self.dataset[:] = values[:]

def has_units(self):
@@ -628,13 +706,28 @@ def is_resolved(self):
class StoreHDF(object):
INFO_GROUP_NAME = 'AMUSE_INF'
DATA_GROUP_NAME = 'data'

def __init__(self, filename, append_to_file=True, open_for_writing = True, copy_history = False, overwrite_file=False):

def __init__(
self,
filename,
append_to_file=True,
open_for_writing=True,
copy_history=False,
overwrite_file=False,
compression=False,
compression_opts=None,
):
self.compression = compression
self.compression_opts = compression_opts
if h5py is None:
raise exceptions.AmuseException("h5py module not available, cannot use hdf5 files")

logger.info("opening {0} with options {1} {2} {3} {4}".format(filename, append_to_file, open_for_writing, copy_history,overwrite_file))

raise exceptions.AmuseException(
"h5py module not available, cannot use hdf5 files")

logger.info(
f"opening {filename} with options {append_to_file} "
f"{open_for_writing} {copy_history} {overwrite_file}"
)

if not append_to_file and open_for_writing:
if os.path.exists(filename):
if overwrite_file:
@@ -745,7 +838,12 @@ def store_particles(self, particles, extra_attributes = {}, parent=None, mapping
group.attrs["class_of_the_particles"] = pickle_to_string(particles._factory_for_new_collection())

keys = particles.get_all_keys_in_store()
dataset = group.create_dataset("keys", data=keys)
dataset = group.create_dataset(
"keys",
data=keys,
compression=self.compression,
compression_opts=self.compression_opts,
)
self.hdf5file.flush()
self.store_collection_attributes(particles, group, extra_attributes, links)
self.store_values(particles, group, links)
@@ -763,7 +861,12 @@ def store_grid(self, grid, extra_attributes = {}, parent=None, mapping_from_seti

group.attrs["type"] = 'grid'.encode("ascii")
group.attrs["class_of_the_container"] = pickle_to_string(grid._factory_for_new_collection())
group.create_dataset("shape", data=numpy.asarray(grid.shape))
group.create_dataset(
"shape",
data=numpy.asarray(grid.shape),
compression=self.compression,
compression_opts=self.compression_opts,
)

self.store_collection_attributes(grid, group, extra_attributes, links)
self.store_values(grid, group, links)
@@ -793,17 +896,32 @@ def store_values(self, container, group, links = []):
for attribute, quantity in zip(container.get_attribute_names_defined_in_store(), all_values):
if is_quantity(quantity):
value = quantity.value_in(quantity.unit)
dataset = attributes_group.create_dataset(attribute, data=value)
dataset = attributes_group.create_dataset(
attribute,
data=value,
compression=self.compression,
compression_opts=self.compression_opts,
)
dataset.attrs["units"] = quantity.unit.to_simple_form().reference_string().encode("ascii")
elif isinstance(quantity, LinkedArray):
self.store_linked_array(attribute, attributes_group, quantity, group, links)
else:
dtype = numpy.asanyarray(quantity).dtype
if dtype.kind == 'U':
dataset = attributes_group.create_dataset(attribute, data=numpy.char.encode(quantity, 'UTF-32BE'))
dataset = attributes_group.create_dataset(
attribute,
data=numpy.char.encode(quantity, 'UTF-32BE'),
compression=self.compression,
compression_opts=self.compression_opts,
)
dataset.attrs["units"] = "UNICODE".encode('ascii')
else:
dataset = attributes_group.create_dataset(attribute, data=quantity)
dataset = attributes_group.create_dataset(
attribute,
data=quantity,
compression=self.compression,
compression_opts=self.compression_opts,
)
dataset.attrs["units"] = "none".encode('ascii')


@@ -814,7 +932,12 @@ def store_linked_array(self, attribute, attributes_group, quantity, group, links
kind_array = numpy.zeros(shape, dtype = numpy.int16)
ref_dtype = h5py.special_dtype(ref=h5py.Reference)
ref_array = numpy.empty(shape, dtype = ref_dtype)
ref_dataset = subgroup.create_dataset('ref', data=ref_array)
ref_dataset = subgroup.create_dataset(
'ref',
data=ref_array,
compression=self.compression,
compression_opts=self.compression_opts,
)
key_array = numpy.zeros(shape, dtype = numpy.uint64)

max_len_grid_indices = 0
@@ -870,10 +993,25 @@ def store_linked_array(self, attribute, attributes_group, quantity, group, links
else:
raise Exception("unsupported type {0}".format(type(object)))
if max_len_grid_indices > 0:
subgroup.create_dataset('indices', data=indices_array)
subgroup.create_dataset(
'indices',
data=indices_array,
compression=self.compression,
compression_opts=self.compression_opts,
)

subgroup.create_dataset('keys', data=key_array)
subgroup.create_dataset('kind', data=kind_array)
subgroup.create_dataset(
'keys',
data=key_array,
compression=self.compression,
compression_opts=self.compression_opts,
)
subgroup.create_dataset(
'kind',
data=kind_array,
compression=self.compression,
compression_opts=self.compression_opts,
)
subgroup.attrs["units"] = "link".encode("ascii")
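
For reference, a stand-alone h5py sketch (not AMUSE code) of what forwarding these two keywords to every create_dataset call amounts to; file and dataset names are illustrative:

# Hedged illustration in plain h5py: the diff forwards compression and
# compression_opts to each create_dataset call, which is equivalent to this.
import numpy
import h5py

with h5py.File("example.h5", "w") as f:
    data = numpy.arange(10000, dtype="float64")
    # gzip filter at level 9; compression_opts is the gzip level (0-9)
    f.create_dataset("keys", data=data, compression="gzip", compression_opts=9)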


