Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support netcdf variable emulation #5212

Merged
Merged 4 commits into the base branch on
May 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
62 changes: 35 additions & 27 deletions lib/iris/fileformats/netcdf/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,34 +190,42 @@ def _get_cf_var_data(cf_var, filename):
unnecessarily slow + wasteful of memory.

"""
total_bytes = cf_var.size * cf_var.dtype.itemsize
if total_bytes < _LAZYVAR_MIN_BYTES:
# Don't make a lazy array, as it will cost more memory AND more time to access.
# Instead fetch the data immediately, as a real array, and return that.
result = cf_var[:]

if hasattr(cf_var, "_data_array"):
# The variable is not an actual netCDF4 file variable, but an emulating
# object with an attached data array (either numpy or dask), which can be
# returned immediately as-is. This is used as a hook to translate data to/from
# netcdf data container objects in other packages, such as xarray.
# See https://github.com/SciTools/iris/issues/4994 "Xarray bridge".
result = cf_var._data_array
else:
# Get lazy chunked data out of a cf variable.
dtype = _get_actual_dtype(cf_var)

# Make a data-proxy that mimics array access and can fetch from the file.
fill_value = getattr(
cf_var.cf_data,
"_FillValue",
_thread_safe_nc.default_fillvals[cf_var.dtype.str[1:]],
)
proxy = NetCDFDataProxy(
cf_var.shape, dtype, filename, cf_var.cf_name, fill_value
)
# Get the chunking specified for the variable : this is either a shape, or
# maybe the string "contiguous".
chunks = cf_var.cf_data.chunking()
# In the "contiguous" case, pass chunks=None to 'as_lazy_data'.
if chunks == "contiguous":
chunks = None

# Return a dask array providing deferred access.
result = as_lazy_data(proxy, chunks=chunks)
total_bytes = cf_var.size * cf_var.dtype.itemsize
if total_bytes < _LAZYVAR_MIN_BYTES:
# Don't make a lazy array, as it will cost more memory AND more time to access.
# Instead fetch the data immediately, as a real array, and return that.
result = cf_var[:]

else:
# Get lazy chunked data out of a cf variable.
dtype = _get_actual_dtype(cf_var)

# Make a data-proxy that mimics array access and can fetch from the file.
fill_value = getattr(
cf_var.cf_data,
"_FillValue",
_thread_safe_nc.default_fillvals[cf_var.dtype.str[1:]],
)
proxy = NetCDFDataProxy(
cf_var.shape, dtype, filename, cf_var.cf_name, fill_value
)
# Get the chunking specified for the variable : this is either a shape, or
# maybe the string "contiguous".
chunks = cf_var.cf_data.chunking()
# In the "contiguous" case, pass chunks=None to 'as_lazy_data'.
if chunks == "contiguous":
chunks = None

# Return a dask array providing deferred access.
result = as_lazy_data(proxy, chunks=chunks)

return result

Expand Down
139 changes: 78 additions & 61 deletions lib/iris/fileformats/netcdf/saver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2345,71 +2345,88 @@ def _lazy_stream_data(self, data, fill_value, fill_warn, cf_var):
# contains just 1 row, so the cf_var is 1D.
data = data.squeeze(axis=0)

# Decide whether we are checking for fill-value collisions.
dtype = cf_var.dtype
# fill_warn allows us to skip warning if packing attributes have been
# specified. It would require much more complex operations to work out
# what the values and fill_value _would_ be in such a case.
if fill_warn:
if fill_value is not None:
fill_value_to_check = fill_value
else:
# Retain 'fill_value == None', to show that no specific value was given.
# But set 'fill_value_to_check' to a calculated value
fill_value_to_check = _thread_safe_nc.default_fillvals[
dtype.str[1:]
]
# Cast the check-value to the correct dtype.
# NOTE: In the case of 'S1' dtype (at least), the default (Python) value
# does not have a compatible type. This causes a deprecation warning at
# numpy 1.24, *and* was preventing correct fill-value checking of character
# data, since they are actually bytes (dtype 'S1').
fill_value_to_check = np.array(fill_value_to_check, dtype=dtype)
if hasattr(cf_var, "_data_array"):
# The variable is not an actual netCDF4 file variable, but an emulating
# object with an attached data array (either numpy or dask), which should be
# copied immediately to the target. This is used as a hook to translate
# data to/from netcdf data container objects in other packages, such as
# xarray.
# See https://github.com/SciTools/iris/issues/4994 "Xarray bridge".
# N.B. also, in this case there is no need for fill-value checking as the
# data is not being translated to an in-file representation.
cf_var._data_array = data
else:
# A None means we will NOT check for collisions.
fill_value_to_check = None

fill_info = _FillvalueCheckInfo(
user_value=fill_value,
check_value=fill_value_to_check,
dtype=dtype,
varname=cf_var.name,
)

doing_delayed_save = is_lazy_data(data)
if doing_delayed_save:
# save lazy data with a delayed operation. For now, we just record the
# necessary information -- a single, complete delayed action is constructed
# later by a call to delayed_completion().
def store(data, cf_var, fill_info):
# Create a data-writeable object that we can stream into, which
# encapsulates the file to be opened + variable to be written.
write_wrapper = _thread_safe_nc.NetCDFWriteProxy(
self.filepath, cf_var, self.file_write_lock
# Decide whether we are checking for fill-value collisions.
dtype = cf_var.dtype
# fill_warn allows us to skip warning if packing attributes have been
# specified. It would require much more complex operations to work out
# what the values and fill_value _would_ be in such a case.
if fill_warn:
if fill_value is not None:
fill_value_to_check = fill_value
else:
# Retain 'fill_value == None', to show that no specific value was given.
# But set 'fill_value_to_check' to a calculated value
fill_value_to_check = _thread_safe_nc.default_fillvals[
dtype.str[1:]
]
# Cast the check-value to the correct dtype.
# NOTE: In the case of 'S1' dtype (at least), the default (Python) value
# does not have a compatible type. This causes a deprecation warning at
# numpy 1.24, *and* was preventing correct fill-value checking of character
# data, since they are actually bytes (dtype 'S1').
fill_value_to_check = np.array(
fill_value_to_check, dtype=dtype
)
# Add to the list of delayed writes, used in delayed_completion().
self._delayed_writes.append((data, write_wrapper, fill_info))
# In this case, fill-value checking is done later. But return 2 dummy
# values, to be consistent with the non-streamed "store" signature.
is_masked, contains_value = False, False
return is_masked, contains_value

else:
# Real data is always written directly, i.e. not via lazy save.
# We also check it immediately for any fill-value problems.
def store(data, cf_var, fill_info):
cf_var[:] = data
return _data_fillvalue_check(np, data, fill_info.check_value)

# Store the data and check if it is masked and contains the fill value.
is_masked, contains_fill_value = store(data, cf_var, fill_info)

if not doing_delayed_save:
# Issue a fill-value warning immediately, if appropriate.
_fillvalue_report(
fill_info, is_masked, contains_fill_value, warn=True
else:
# A None means we will NOT check for collisions.
fill_value_to_check = None

fill_info = _FillvalueCheckInfo(
user_value=fill_value,
check_value=fill_value_to_check,
dtype=dtype,
varname=cf_var.name,
)

doing_delayed_save = is_lazy_data(data)
if doing_delayed_save:
# save lazy data with a delayed operation. For now, we just record the
# necessary information -- a single, complete delayed action is constructed
# later by a call to delayed_completion().
def store(data, cf_var, fill_info):
# Create a data-writeable object that we can stream into, which
# encapsulates the file to be opened + variable to be written.
write_wrapper = _thread_safe_nc.NetCDFWriteProxy(
self.filepath, cf_var, self.file_write_lock
)
# Add to the list of delayed writes, used in delayed_completion().
self._delayed_writes.append(
(data, write_wrapper, fill_info)
)
# In this case, fill-value checking is done later. But return 2 dummy
# values, to be consistent with the non-streamed "store" signature.
is_masked, contains_value = False, False
return is_masked, contains_value

else:
# Real data is always written directly, i.e. not via lazy save.
# We also check it immediately for any fill-value problems.
def store(data, cf_var, fill_info):
cf_var[:] = data
return _data_fillvalue_check(
np, data, fill_info.check_value
)

# Store the data and check if it is masked and contains the fill value.
is_masked, contains_fill_value = store(data, cf_var, fill_info)

if not doing_delayed_save:
# Issue a fill-value warning immediately, if appropriate.
_fillvalue_report(
fill_info, is_masked, contains_fill_value, warn=True
)

def delayed_completion(self) -> Delayed:
"""
Create and return a :class:`dask.delayed.Delayed` to perform file completion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from iris.exceptions import CannotAddError
from iris.fileformats._nc_load_rules.helpers import build_ancil_var
from iris.fileformats.netcdf import _thread_safe_nc as threadsafe_nc


@pytest.fixture
Expand All @@ -31,6 +32,7 @@ def mock_engine():
def mock_cf_av_var(monkeypatch):
data = np.arange(6)
output = mock.Mock(
spec=threadsafe_nc.VariableWrapper,
dimensions=("foo",),
scale_factor=1,
add_offset=0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from iris.exceptions import CannotAddError
from iris.fileformats._nc_load_rules.helpers import build_auxiliary_coordinate
from iris.fileformats.cf import CFVariable
from iris.fileformats.netcdf import _thread_safe_nc as threadsafe_nc


class TestBoundsVertexDim(tests.IrisTest):
Expand Down Expand Up @@ -238,6 +239,7 @@ def setUp(self):

points = np.arange(6)
self.cf_coord_var = mock.Mock(
spec=threadsafe_nc.VariableWrapper,
dimensions=("foo",),
scale_factor=1,
add_offset=0,
Expand All @@ -257,6 +259,7 @@ def setUp(self):

bounds = np.arange(12).reshape(6, 2)
self.cf_bounds_var = mock.Mock(
spec=threadsafe_nc.VariableWrapper,
dimensions=("x", "nv"),
scale_factor=1,
add_offset=0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from iris.exceptions import CannotAddError
from iris.fileformats._nc_load_rules.helpers import build_cell_measures
from iris.fileformats.netcdf import _thread_safe_nc as threadsafe_nc


@pytest.fixture
Expand All @@ -31,6 +32,7 @@ def mock_engine():
def mock_cf_cm_var(monkeypatch):
data = np.arange(6)
output = mock.Mock(
spec=threadsafe_nc.VariableWrapper,
dimensions=("foo",),
scale_factor=1,
add_offset=0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ def setUp(self):
self.shape = (300000, 240, 200)
self.expected_chunks = _optimum_chunksize(self.shape, self.shape)

def _make(self, chunksizes=None, shape=None, dtype="i4"):
def _make(
self, chunksizes=None, shape=None, dtype="i4", **extra_properties
):
cf_data = mock.MagicMock(
_FillValue=None,
__getitem__="<real-data>",
Expand All @@ -40,6 +42,7 @@ def _make(self, chunksizes=None, shape=None, dtype="i4"):
cf_name="DUMMY_VAR",
shape=shape,
size=np.prod(shape),
**extra_properties,
)
cf_var.__getitem__.return_value = mock.sentinel.real_data_accessed
return cf_var
Expand Down Expand Up @@ -90,6 +93,15 @@ def test_arraytype__100f8_is_real(self):
var_data = _get_cf_var_data(cf_var, self.filename)
self.assertIs(var_data, mock.sentinel.real_data_accessed)

def test_cf_data_emulation(self):
    # A cf_var carrying a '_data_array' attribute is an "emulation" object
    # (e.g. the xarray bridge) rather than a real netCDF4 file variable.
    # Loading must hand its attached array straight back, bypassing both the
    # real-fetch and lazy-wrapping paths.
    attached_array = mock.Mock()
    emulating_var = self._make(chunksizes=None, _data_array=attached_array)
    # The very same object must be returned -- not a copy or a wrapper.
    self.assertIs(attached_array, _get_cf_var_data(emulating_var, self.filename))


if __name__ == "__main__":
tests.main()