Skip to content
208 changes: 142 additions & 66 deletions cf/data/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ def __init__(
except (AttributeError, TypeError):
pass
else:
self._set_dask(array, copy=copy, delete_source=False)
self._set_dask(array, copy=copy, conform=False)
else:
self._del_dask(None)

Expand Down Expand Up @@ -478,7 +478,7 @@ def __init__(
self._Units = units

# Store the dask array
self._set_dask(array, delete_source=False)
self._set_dask(array, conform=False)

# Override the data type
if dtype is not None:
Expand Down Expand Up @@ -1107,9 +1107,8 @@ def __setitem__(self, indices, value):
shifts = [-shift for shift in shifts]
self.roll(shift=shifts, axis=roll_axes, inplace=True)

# Remove a source array, on the grounds that we can't
# guarantee its consistency with the updated dask array.
self._del_Array(None)
# Remove elements made invalid by updating the `dask` array
self._conform_after_dask_update()

return

Expand Down Expand Up @@ -1227,12 +1226,32 @@ def __keepdims_indexing__(self):
def __keepdims_indexing__(self, value):
self._custom["__keepdims_indexing__"] = bool(value)

def _set_dask(self, array, copy=False, delete_source=True):
def _conform_after_dask_update(self):
    """Remove elements made invalid by updating the `dask` array.

    Removes or modifies components that can't be guaranteed to be
    consistent with an updated `dask` array:

    * Deletes a source array.
    * Deletes cached element values.

    .. versionadded:: TODODASKVER

    .. seealso:: `_del_cached_elements`, `_del_dask`, `_set_dask`

    :Returns:

        `None`

    """
    # A source array can no longer be assumed consistent with the
    # new dask array, so remove it (None => no exception if absent).
    self._del_Array(None)
    # Cached first/second/last element values may now be stale.
    self._del_cached_elements()

def _set_dask(self, array, copy=False, conform=True):
"""Set the dask array.

.. versionadded:: TODODASKVER

.. seealso:: `to_dask_array`, `_del_dask`
.. seealso:: `to_dask_array`, `_conform_after_dask_update`,
`_del_dask`

:Parameters:

Expand All @@ -1243,10 +1262,10 @@ def _set_dask(self, array, copy=False, delete_source=True):
If True then copy *array* before setting it. By
default it is not copied.

delete_source: `bool`, optional
If False then do not delete a source array, if one
exists, after setting the new dask array. By default a
source array is deleted.
conform: `bool`, optional
If True, the default, then remove elements made
invalid by updating the `dask` array. See
`_conform_after_dask_update` for details.

:Returns:

Expand All @@ -1255,35 +1274,40 @@ def _set_dask(self, array, copy=False, delete_source=True):
"""
if array is NotImplemented:
logger.warning(
"NotImplemented has been set in the place of a dask array"
"NotImplemented has been set in the place of a dask array."
"\n\n"
"This could occur if any sort of exception is raised "
"by a function that is run on chunks (via, for "
"instance, da.map_blocks or "
"dask.array.core.elemwise). Such a function could get "
"run at definition time in order to ascertain "
"suitability (such as data type casting, "
"broadcasting, etc.). Note that the exception may be "
"difficult to diagnose, as dask will have silently "
"trapped it and returned NotImplemented (for "
"instance, see dask.array.core.elemwise). Print "
"statements in a local copy of dask are possibly the "
"way to go if the cause of the error is not obvious."
)
# This could occur if any sort of exception is raised by
# function that is run on chunks (such as
# `cf_where`). Such a function could get run at definition
# time in order to ascertain suitability (such as data
# type casting, broadcasting, etc.). Note that the
# exception may be difficult to diagnose, as dask will
# have silently trapped it and returned NotImplemented
# (for instance, see `dask.array.core.elemwise`). Print
# statements in a local copy of dask are possibly the way
# to go if the cause of the error is not obvious.

if copy:
array = array.copy()

self._custom["dask"] = array

if delete_source:
# Remove a source array, on the grounds that we can't
# guarantee its consistency with the new dask array.
self._del_Array(None)
if conform:
# Remove elements made invalid by updating the `dask`
# array
self._conform_after_dask_update()

def _del_dask(self, default=ValueError(), delete_source=True):
def _del_dask(self, default=ValueError(), conform=True):
"""Remove the dask array.

.. versionadded:: TODODASKVER

.. seealso:: `_set_dask`, `to_dask_array`
.. seealso:: `to_dask_array`, `_conform_after_dask_update`,
`_set_dask`


:Parameters:

Expand All @@ -1293,9 +1317,10 @@ def _del_dask(self, default=ValueError(), delete_source=True):

{{default Exception}}

delete_source: `bool`, optional
If False then do not delete a compressed source array,
if one exists.
conform: `bool`, optional
If True, the default, then remove elements made
invalid by updating the `dask` array. See
`_conform_after_dask_update` for details.

:Returns:

Expand Down Expand Up @@ -1325,14 +1350,71 @@ def _del_dask(self, default=ValueError(), delete_source=True):
default, f"{self.__class__.__name__!r} has no dask array"
)

if delete_source:
# Remove a source array, on the grounds that we can't
# guarantee its consistency with any future new dask
# array.
self._del_Array(None)
if conform:
# Remove elements made invalid by deleting the `dask`
# array
self._conform_after_dask_update()

return out

def _del_cached_elements(self):
"""Delete any cached element values.

Updates *data* in-place to remove the cached element values
``'first_element'``, ``'second_element'`` and
``'last_element'``.

.. note:: By default, `_del_cached_elements` is run whenever
the `_set_dask` and `del_dask` methods are used. If
the `dask` array is updated or changed without using
the default behaviour of either of these two
methods, and there is any chance that the cached
values might be inconsistent with the new data, then
`_del_cached_elements` must be called explicitly to
ensure consistency.

.. versionadded:: TODODASKVER

.. seealso:: `_del_dask`, `_set_cached_elements`, `_set_dask`

:Returns:

`None`

"""
custom = self._custom
for element in ("first_element", "second_element", "last_element"):
custom.pop(element, None)

def _set_cached_elements(self, elements):
"""Cache selected element values.

Updates *data* in-place to store the given element values
within its ``custom`` dictionary.

.. versionadded:: TODODASKVER

.. seealso:: `_del_cached_elements`

:Parameters:

elements: `dict`
Zero or more element values to be cached, each keyed by
a unique identifier to allow unambiguous retrieval.
Existing cached elements not specified by *elements*
will not be removed.

:Returns:

`None`

**Examples**

>>> d._set_cached_elements({'first_element': 273.15})

"""
self._custom.update(elements)

@_inplace_enabled(default=False)
def diff(self, axis=-1, n=1, inplace=False):
"""Calculate the n-th discrete difference along the given axis.
Expand Down Expand Up @@ -2181,7 +2263,7 @@ def persist(self, inplace=False):

dx = self.to_dask_array()
dx = dx.persist()
d._set_dask(dx, delete_source=False)
d._set_dask(dx, conform=False)

return d

Expand Down Expand Up @@ -2751,7 +2833,7 @@ def rechunk(

dx = d.to_dask_array()
dx = dx.rechunk(chunks, threshold, block_size_limit, balance)
d._set_dask(dx, delete_source=False)
d._set_dask(dx, conform=False)

return d

Expand Down Expand Up @@ -7518,7 +7600,7 @@ def harden_mask(self):
"""
dx = self.to_dask_array()
dx = dx.map_blocks(cf_harden_mask, dtype=self.dtype)
self._set_dask(dx, delete_source=False)
self._set_dask(dx, conform=False)
self.hardmask = True

def has_calendar(self):
Expand Down Expand Up @@ -7615,7 +7697,7 @@ def soften_mask(self):
"""
dx = self.to_dask_array()
dx = dx.map_blocks(cf_soften_mask, dtype=self.dtype)
self._set_dask(dx, delete_source=False)
self._set_dask(dx, conform=False)
self.hardmask = False

@_inplace_enabled(default=False)
Expand Down Expand Up @@ -7673,7 +7755,7 @@ def filled(self, fill_value=None, inplace=False):

return d

def first_element(self, verbose=None):
def first_element(self):
"""Return the first element of the data as a scalar.

If the value is deemed too expensive to compute then a
Expand Down Expand Up @@ -7705,16 +7787,14 @@ def first_element(self, verbose=None):
masked

"""
if self.can_compute():
return super().first_element()

raise ValueError(
"First element of the data is considered too expensive "
"to compute. Consider setting the 'force_compute' attribute, or "
"setting the log level to 'DEBUG'."
)
try:
return self._custom["first_element"]
except KeyError:
item = super().first_element()
self._set_cached_elements({"first_element": item})
return item

def second_element(self, verbose=None):
def second_element(self):
"""Return the second element of the data as a scalar.

If the value is deemed too expensive to compute then a
Expand Down Expand Up @@ -7746,14 +7826,12 @@ def second_element(self, verbose=None):
masked

"""
if self.can_compute():
return super().second_element()

raise ValueError(
"Second element of the data is considered too expensive "
"to compute. Consider setting the 'force_compute' atribute, or "
"setting the log level to 'DEBUG'."
)
try:
return self._custom["second_element"]
except KeyError:
item = super().second_element()
self._set_cached_elements({"second_element": item})
return item

def last_element(self):
"""Return the last element of the data as a scalar.
Expand Down Expand Up @@ -7787,14 +7865,12 @@ def last_element(self):
masked

"""
if self.can_compute():
return super().last_element()

raise ValueError(
"First element of the data is considered too expensive "
"to compute. Consider setting the 'force_compute' attribute, or "
"setting the log level to 'DEBUG'."
)
try:
return self._custom["last_element"]
except KeyError:
item = super().last_element()
self._set_cached_elements({"last_element": item})
return item

def flat(self, ignore_masked=True):
"""Return a flat iterator over elements of the data array.
Expand Down
Loading