From 0be6643adf734ac48fbe781d89bcf37317007fdc Mon Sep 17 00:00:00 2001 From: lshaw8317 Date: Mon, 24 Nov 2025 12:26:51 +0100 Subject: [PATCH 1/3] Add saving lazyudf --- src/blosc2/lazyexpr.py | 130 ++++++++++++++++++---------------- tests/ndarray/test_lazyudf.py | 22 ++++++ 2 files changed, 92 insertions(+), 60 deletions(-) diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py index 5184f411..644503f9 100644 --- a/src/blosc2/lazyexpr.py +++ b/src/blosc2/lazyexpr.py @@ -3029,50 +3029,6 @@ def info_items(self): items += [("dtype", self.dtype)] return items - def _save(self, urlpath=None, **kwargs): - if urlpath is None: - raise ValueError("To save a LazyArray you must provide an urlpath") - - # Validate expression - validate_expr(self.expression) - - meta = kwargs.get("meta", {}) - meta["LazyArray"] = LazyArrayEnum.Expr.value - kwargs["urlpath"] = urlpath - kwargs["meta"] = meta - kwargs["mode"] = "w" # always overwrite the file in urlpath - - # Create an empty array; useful for providing the shape and dtype of the outcome - array = blosc2.empty(shape=self.shape, dtype=self.dtype, **kwargs) - - # Save the expression and operands in the metadata - operands = {} - for key, value in self.operands.items(): - if isinstance(value, blosc2.C2Array): - operands[key] = { - "path": str(value.path), - "urlbase": value.urlbase, - } - continue - if key in {"numpy", "np"}: - # Provide access to cast funcs like int8 et al. - continue - if isinstance(value, blosc2.Proxy): - # Take the required info from the Proxy._cache container - value = value._cache - if not hasattr(value, "schunk"): - raise ValueError( - "To save a LazyArray, all operands must be blosc2.NDArray or blosc2.C2Array objects" - ) - if value.schunk.urlpath is None: - raise ValueError("To save a LazyArray, all operands must be stored on disk/network") - operands[key] = value.schunk.urlpath - array.schunk.vlmeta["_LazyArray"] = { - "expression": self.expression, - "UDF": None, - "operands": operands, - } - def save(self, urlpath=None, **kwargs): if urlpath is None: raise ValueError("To save a LazyArray you must provide an urlpath") @@ -3357,8 +3313,44 @@ def __getitem__(self, item): return output[item] return self.res_getitem[item] - def save(self, **kwargs): - raise NotImplementedError("For safety reasons, this is not implemented for UDFs") + def save(self, urlpath=None, **kwargs): + if urlpath is None: + raise ValueError("To save a LazyArray you must provide an urlpath") + + meta = kwargs.get("meta", {}) + meta["LazyArray"] = LazyArrayEnum.UDF.value + kwargs["urlpath"] = urlpath + kwargs["meta"] = meta + kwargs["mode"] = "w" # always overwrite the file in urlpath + + # Create an empty array; useful for providing the shape and dtype of the outcome + array = blosc2.empty(shape=self.shape, dtype=self.dtype, **kwargs) + + # Save the expression and operands in the metadata + operands = {} + operands_ = self.inputs_dict + for key, value in operands_.items(): + if isinstance(value, blosc2.C2Array): + operands[key] = { + "path": str(value.path), + "urlbase": value.urlbase, + } + continue + if isinstance(value, blosc2.Proxy): + # Take the required info from the Proxy._cache container + value = value._cache + if not hasattr(value, "schunk"): + raise ValueError( + "To save a LazyArray, all operands must be blosc2.NDArray or blosc2.C2Array objects" + ) + if value.schunk.urlpath is None: + raise ValueError("To save a LazyArray, all operands must be stored on disk/network") + operands[key] = value.schunk.urlpath + array.schunk.vlmeta["_LazyArray"] = { + "UDF": inspect.getsource(self.func), + "operands": operands, + "name": self.func.__name__, + } def _numpy_eval_expr(expression, operands, prefer_blosc=False): @@ -3615,41 +3607,59 @@ def lazyexpr( def _open_lazyarray(array): value = array.schunk.meta["LazyArray"] - if value == LazyArrayEnum.UDF.value: - raise NotImplementedError("For safety reasons, persistent UDFs are not supported") - - # LazyExpr lazyarray = array.schunk.vlmeta["_LazyArray"] + if value == LazyArrayEnum.Expr.value: + expr = lazyarray["expression"] + elif value == LazyArrayEnum.UDF.value: + expr = lazyarray["UDF"] + else: + raise ValueError("Argument `array` is not LazyExpr or LazyUDF instance.") + operands = lazyarray["operands"] parent_path = Path(array.schunk.urlpath).parent operands_dict = {} missing_ops = {} - for key, value in operands.items(): - if isinstance(value, str): - value = parent_path / value + for key, v in operands.items(): + if isinstance(v, str): + v = parent_path / v try: - op = blosc2.open(value) + op = blosc2.open(v) except FileNotFoundError: - missing_ops[key] = value + missing_ops[key] = v else: operands_dict[key] = op - elif isinstance(value, dict): + elif isinstance(v, dict): # C2Array operands_dict[key] = blosc2.C2Array( - pathlib.Path(value["path"]).as_posix(), - urlbase=value["urlbase"], + pathlib.Path(v["path"]).as_posix(), + urlbase=v["urlbase"], ) else: raise TypeError("Error when retrieving the operands") - expr = lazyarray["expression"] if missing_ops: exc = exceptions.MissingOperands(expr, missing_ops) exc.expr = expr exc.missing_ops = missing_ops raise exc - new_expr = LazyExpr._new_expr(expr, operands_dict, guess=True, out=None, where=None) + # LazyExpr + if value == LazyArrayEnum.Expr.value: + new_expr = LazyExpr._new_expr(expr, operands_dict, guess=True, out=None, where=None) + elif value == LazyArrayEnum.UDF.value: + local_ns = {} + exec(expr, {"np": np, "blosc2": blosc2}, local_ns) + name = lazyarray["name"] + func = local_ns[name] + # TODO: make more robust for general kwargs (not just cparams) + new_expr = blosc2.lazyudf( + func, + tuple(operands_dict[f"o{n}"] for n in range(len(operands_dict))), + shape=array.shape, + dtype=array.dtype, + cparams=array.cparams, + ) + # Make the array info available for the user (only available when opened from disk) new_expr.array = array # We want to expose schunk too, so that .info() can be used on the LazyArray diff --git a/tests/ndarray/test_lazyudf.py b/tests/ndarray/test_lazyudf.py index 55f2180a..d1e0cdc3 100644 --- a/tests/ndarray/test_lazyudf.py +++ b/tests/ndarray/test_lazyudf.py @@ -455,3 +455,25 @@ def test_clip_logaddexp(shape, chunks, blocks, slices): np.testing.assert_allclose(expr, np.sin(np.logaddexp(npb, npa))) expr = blosc2.evaluate("clip(logaddexp(b, a), 6, 12)") np.testing.assert_allclose(expr, np.clip(np.logaddexp(npb, npa), 6, 12)) + + +def test_save_ludf(): + shape = (23,) + npa = np.arange(start=0, stop=np.prod(shape)).reshape(shape) + blosc2.remove_urlpath("a.b2nd") + array = blosc2.asarray(npa, urlpath="a.b2nd") + + # Assert that shape is computed correctly + npc = npa + 1 + cparams = {"nthreads": 4} + urlpath = "lazyarray.b2nd" + blosc2.remove_urlpath(urlpath) + + expr = blosc2.lazyudf(udf1p, (array,), np.float64, cparams=cparams) + + expr.save(urlpath=urlpath) + del expr + expr = blosc2.open(urlpath) + assert isinstance(expr, blosc2.LazyUDF) + res_lazyexpr = expr.compute() + np.testing.assert_array_equal(res_lazyexpr[:], npc) From 726ca2cbfa3536258d9188413fef10e6c8fb4554 Mon Sep 17 00:00:00 2001 From: lshaw8317 Date: Tue, 25 Nov 2025 13:27:07 +0100 Subject: [PATCH 2/3] Save LazyUDF such that source remains available --- src/blosc2/lazyexpr.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py index 644503f9..4118dc98 100644 --- a/src/blosc2/lazyexpr.py +++ b/src/blosc2/lazyexpr.py @@ -14,6 +14,7 @@ import concurrent.futures import copy import inspect +import linecache import math import os import pathlib @@ -3648,8 +3649,13 @@ def _open_lazyarray(array): new_expr = LazyExpr._new_expr(expr, operands_dict, guess=True, out=None, where=None) elif value == LazyArrayEnum.UDF.value: local_ns = {} - exec(expr, {"np": np, "blosc2": blosc2}, local_ns) name = lazyarray["name"] + filename = f"<{name}>" # any unique name + + # Register the source so inspect can find it + linecache.cache[filename] = (len(expr), None, expr.splitlines(True), filename) + + exec(compile(expr, filename, "exec"), {"np": np, "blosc2": blosc2}, local_ns) func = local_ns[name] # TODO: make more robust for general kwargs (not just cparams) new_expr = blosc2.lazyudf( From 48cbef6ed2c1d0df1836a46ebbc61c7f867e4cfa Mon Sep 17 00:00:00 2001 From: lshaw8317 Date: Wed, 26 Nov 2025 18:08:44 +0100 Subject: [PATCH 3/3] Streamline lazyudf compute --- src/blosc2/lazyexpr.py | 254 +++++++++++++----------- tests/ndarray/test_elementwise_funcs.py | 18 +- tests/ndarray/test_lazyudf.py | 48 ++--- 3 files changed, 172 insertions(+), 148 deletions(-) diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py index 4118dc98..b14970d9 100644 --- a/src/blosc2/lazyexpr.py +++ b/src/blosc2/lazyexpr.py @@ -1072,7 +1072,7 @@ def read_nchunk(arrs, info): iter_chunks = None -def fill_chunk_operands( # noqa: C901 +def fill_chunk_operands( operands, slice_, chunks_, full_chunk, aligned, nchunk, iter_disk, chunk_operands, reduc=False ): """Retrieve the chunk operands for evaluating an expression. @@ -1102,11 +1102,12 @@ def fill_chunk_operands( # noqa: C901 chunk_operands[key] = chunks[i] continue # Otherwise, we need to decompress them - special = blosc2.SpecialValue((chunks[i][31] & 0x70) >> 4) - if special == blosc2.SpecialValue.ZERO: - # The chunk is a special zero chunk, so we can treat it as a scalar - chunk_operands[key] = np.zeros((), dtype=value.dtype) - continue + # The following can cause bugs if e.g. the shape of the operand is used in a lazyUDF + # special = blosc2.SpecialValue((chunks[i][31] & 0x70) >> 4) + # if special == blosc2.SpecialValue.ZERO: + # # The chunk is a special zero chunk, so we can treat it as a scalar + # chunk_operands[key] = np.zeros((), dtype=value.dtype) + # continue if aligned[key]: buff = blosc2.decompress2(chunks[i]) bsize = value.dtype.itemsize * math.prod(chunks_) @@ -1135,12 +1136,13 @@ def fill_chunk_operands( # noqa: C901 # First check if the chunk is a special zero chunk. # Using lazychunks is very effective here because we only need to read the header. - chunk = value.schunk.get_lazychunk(nchunk) - special = blosc2.SpecialValue((chunk[31] & 0x70) >> 4) - if special == blosc2.SpecialValue.ZERO: - # The chunk is a special zero chunk, so we can treat it as a scalar - chunk_operands[key] = np.zeros((), dtype=value.dtype) - continue + # chunk = value.schunk.get_lazychunk(nchunk) + # The following can cause bugs if e.g. the shape of the operand is used in a lazyUDF + # special = blosc2.SpecialValue((chunk[31] & 0x70) >> 4) + # if special == blosc2.SpecialValue.ZERO: + # # The chunk is a special zero chunk, so we can treat it as a scalar + # chunk_operands[key] = np.zeros((), dtype=value.dtype) + # continue # If key is in operands, we can reuse the buffer if ( @@ -1252,6 +1254,13 @@ def fast_eval( # noqa: C901 # else: # ne_evaluate(expression, chunk_operands, out=out[slice_]) # continue + if out is None: + # We can enter here when using any of the compute() or __getitem__() methods + if getitem: + out = np.empty(shape, dtype=dtype) + else: + out = blosc2.empty(shape, chunks=chunks, blocks=blocks, dtype=dtype, **kwargs) + if callable(expression): result = np.empty(chunks_, dtype=out.dtype) expression(tuple(chunk_operands.values()), result, offset=offset) @@ -1267,13 +1276,6 @@ def fast_eval( # noqa: C901 # We do not support one or zero operands in the fast path yet raise ValueError("Fast path: the where condition must be a tuple with two elements") - if out is None: - # We can enter here when using any of the compute() or __getitem__() methods - if getitem: - out = np.empty(shape, dtype=dtype) - else: - out = blosc2.empty(shape, chunks=chunks, blocks=blocks, dtype=dtype, **kwargs) - # Store the result in the output array if getitem: try: @@ -1334,6 +1336,7 @@ def slices_eval( # noqa: C901 operands: dict, getitem: bool, _slice=NDINDEX_EMPTY_TUPLE, + shape=None, **kwargs, ) -> blosc2.NDArray | np.ndarray: """Evaluate the expression in chunks of operands. @@ -1355,6 +1358,9 @@ def slices_eval( # noqa: C901 _slice: ndindex.Tuple sequence of slices and ints. Default = ndindex.Tuple(), optional If provided, only the chunks that intersect with this slice will be evaluated. + shape: tuple | None + The shape of the full (unsliced result). Typically passed on from parent LazyArray. + If None, a guess is made from broadcasting the operands. kwargs: Any, optional Additional keyword arguments that are supported by the :func:`empty` constructor. @@ -1386,22 +1392,22 @@ def slices_eval( # noqa: C901 orig_slice = _slice # Compute the shape and chunks of the output array, including broadcasting - shape = compute_broadcast_shape(operands.values()) - if out is None: - if _slice != (): - # Check whether _slice contains an integer, or any step that are not None or 1 - if any((isinstance(s, int)) for s in _slice): - need_final_slice = True - _slice = tuple(slice(i, i + 1, 1) if isinstance(i, int) else i for i in _slice) - # shape_slice in general not equal to final shape: - # dummy dims (due to ints) will be dealt with by taking final_slice - shape_slice = ndindex.ndindex(_slice).newshape(shape) - mask_slice = np.array([isinstance(i, int) for i in orig_slice], dtype=np.bool_) - else: - # # out should always have shape of full array - # if shape is not None and shape != out.shape: - # raise ValueError("Provided output shape does not match the operands' shape.") - shape = out.shape + if shape is None: # lazyudf provides shape kwarg + shape = compute_broadcast_shape(operands.values()) + + if _slice != (): + # Check whether _slice contains an integer, or any step that are not None or 1 + if any((isinstance(s, int)) for s in _slice): + need_final_slice = True + _slice = tuple(slice(i, i + 1, 1) if isinstance(i, int) else i for i in _slice) + # shape_slice in general not equal to final shape: + # dummy dims (due to ints) will be dealt with by taking final_slice + shape_slice = ndindex.ndindex(_slice).newshape(shape) + mask_slice = np.array([isinstance(i, int) for i in orig_slice], dtype=np.bool_) + if out is not None: + shape_ = shape_slice if shape_slice is not None else shape + if shape_ != out.shape: + raise ValueError("Provided output shape does not match the slice shape.") if chunks is None: # Guess chunk shape # Either out, or operand with `chunks`, can be used to get the chunks @@ -1483,6 +1489,26 @@ def slices_eval( # noqa: C901 chunk_operands[key] = value[cslice] + if out is None: + shape_ = shape_slice if shape_slice is not None else shape + if where is not None and len(where) < 2: + # The result is a linear array + shape_ = math.prod(shape_) + if getitem or _order: + out = np.empty(shape_, dtype=dtype_) + if _order: + indices_ = np.empty(shape_, dtype=np.int64) + else: + # if "chunks" not in kwargs and (where is None or len(where) == 2): + # Let's use the same chunks as the first operand (it could have been automatic too) + # out = blosc2.empty(shape_, chunks=chunks, dtype=dtype_, **kwargs) + # out = blosc2.empty(shape_, dtype=dtype_, **kwargs) + if "chunks" in kwargs and (where is not None and len(where) < 2 and len(shape_) > 1): + # Remove the chunks argument if the where condition is not a tuple with two elements + kwargs.pop("chunks") + out = blosc2.empty(shape_, dtype=dtype_, **kwargs) + # Check if the in out partitions are well-behaved (i.e. no padding) + behaved = blosc2.are_partitions_behaved(out.shape, out.chunks, out.blocks) # Evaluate the expression using chunks of operands if callable(expression): @@ -1491,7 +1517,7 @@ def slices_eval( # noqa: C901 # Call the udf directly and use result as the output array offset = tuple(s.start for s in cslice) expression(tuple(chunk_operands.values()), result, offset=offset) - out[cslice] = result + out[cslice_subidx] = result continue if where is None: @@ -1534,27 +1560,6 @@ def slices_eval( # noqa: C901 # but avoid copy if already contiguous result = np.require(result, requirements="C") - if out is None: - shape_ = shape_slice if shape_slice is not None else shape - if where is not None and len(where) < 2: - # The result is a linear array - shape_ = math.prod(shape_) - if getitem or _order: - out = np.empty(shape_, dtype=dtype_) - if _order: - indices_ = np.empty(shape_, dtype=np.int64) - else: - # if "chunks" not in kwargs and (where is None or len(where) == 2): - # Let's use the same chunks as the first operand (it could have been automatic too) - # out = blosc2.empty(shape_, chunks=chunks, dtype=dtype_, **kwargs) - # out = blosc2.empty(shape_, dtype=dtype_, **kwargs) - if "chunks" in kwargs and (where is not None and len(where) < 2 and len(shape_) > 1): - # Remove the chunks argument if the where condition is not a tuple with two elements - kwargs.pop("chunks") - out = blosc2.empty(shape_, dtype=dtype_, **kwargs) - # Check if the in out partitions are well-behaved (i.e. no padding) - behaved = blosc2.are_partitions_behaved(out.shape, out.chunks, out.blocks) - if where is None or len(where) == 2: if behaved and result.shape == out.chunks and result.dtype == out.dtype: # Fast path @@ -1592,7 +1597,9 @@ def slices_eval( # noqa: C901 else: # Need to take final_slice since filled up array according to slice_ for each chunk if need_final_slice: # only called if out was None if isinstance(out, np.ndarray): - out = np.squeeze(out, np.where(mask_slice)[0]) + squeeze_axis = np.where(mask_slice)[0] + squeeze_axis = np.squeeze(squeeze_axis) # handle 1d mask_slice + out = np.squeeze(out, squeeze_axis) elif isinstance(out, blosc2.NDArray): # It *seems* better to choose an automatic chunks and blocks for the output array # out = out.slice(_slice, chunks=out.chunks, blocks=out.blocks) @@ -1646,11 +1653,13 @@ def slices_eval_getitem( where: dict | None = kwargs.pop("_where_args", None) dtype = kwargs.pop("dtype", None) - if out is None: - # Compute the shape and chunks of the output array, including broadcasting - shape = compute_broadcast_shape(operands.values()) - else: - shape = out.shape + shape = kwargs.pop("shape", None) + if shape is None: + if out is None: + # Compute the shape and chunks of the output array, including broadcasting + shape = compute_broadcast_shape(operands.values()) + else: + shape = out.shape # compute the shape of the output array _slice_bcast = tuple(slice(i, i + 1) if isinstance(i, int) else i for i in _slice.raw) @@ -1675,12 +1684,17 @@ def slices_eval_getitem( slice_operands[key] = value[_slice] # Evaluate the expression using slices of operands - if where is None: - result = ne_evaluate(expression, slice_operands, **ne_args) + if callable(expression): + result = np.empty(slice_shape, dtype=dtype) + offset = tuple(s.start for s in _slice) # offset for the udf + expression(tuple(slice_operands.values()), result, offset=offset) else: - # Apply the where condition (in result) - new_expr = f"where({expression}, _where_x, _where_y)" - result = ne_evaluate(new_expr, slice_operands, **ne_args) + if where is None: + result = ne_evaluate(expression, slice_operands, **ne_args) + else: + # Apply the where condition (in result) + new_expr = f"where({expression}, _where_x, _where_y)" + result = ne_evaluate(new_expr, slice_operands, **ne_args) if out is None: # avoid copying unnecessarily try: @@ -2119,9 +2133,14 @@ def chunked_eval( # noqa: C901 try: # standardise slice to be ndindex.Tuple item = () if item in (None, slice(None, None, None)) else item + item = item if isinstance(item, tuple) else (item,) + item = tuple( + slice(s.start, s.stop, 1 if s.step is None else s.step) if isinstance(s, slice) else s + for s in item + ) item = ndindex.ndindex(item) - shape = compute_broadcast_shape(operands.values()) - if (shape is not None) and (item.raw != ()): + shape = kwargs.pop("shape", None) + if item.raw != () and shape is not None: item = item.expand(shape) # converts to standard tuple form getitem = kwargs.pop("_getitem", False) @@ -2146,7 +2165,7 @@ def chunked_eval( # noqa: C901 if not is_full_slice(item.raw) or (where is not None and len(where) < 2): # The fast path is possible under a few conditions - if getitem and (where is None or len(where) == 2) and not callable(expression): + if getitem and (where is None or len(where) == 2): # Compute the size of operands for the fast path unit_steps = np.all([s.step == 1 for s in item.raw if isinstance(s, slice)]) # shape of slice, if non-unit steps have to decompress full array into memory @@ -2155,8 +2174,8 @@ def chunked_eval( # noqa: C901 size_operands = math.prod(shape_operands) * len(operands) * _dtype.itemsize # Only take the fast path if the size of operands is relatively small if size_operands < blosc2.MAX_FAST_PATH_SIZE: - return slices_eval_getitem(expression, operands, _slice=item, **kwargs) - return slices_eval(expression, operands, getitem=getitem, _slice=item, **kwargs) + return slices_eval_getitem(expression, operands, _slice=item, shape=shape, **kwargs) + return slices_eval(expression, operands, getitem=getitem, _slice=item, shape=shape, **kwargs) if fast_path: if getitem: @@ -2171,7 +2190,7 @@ def chunked_eval( # noqa: C901 return fast_eval(expression, operands, getitem=False, **kwargs) # End up here by default - return slices_eval(expression, operands, getitem=getitem, _slice=item, **kwargs) + return slices_eval(expression, operands, getitem=getitem, _slice=item, shape=shape, **kwargs) finally: # Deactivate cache for NDField instances @@ -2976,6 +2995,7 @@ def compute(self, item=(), **kwargs) -> blosc2.NDArray: if hasattr(self, "_where_args"): kwargs["_where_args"] = self._where_args kwargs["dtype"] = self.dtype + kwargs["shape"] = self.shape if hasattr(self, "_indices"): kwargs["_indices"] = self._indices if hasattr(self, "_order"): @@ -2992,7 +3012,7 @@ def compute(self, item=(), **kwargs) -> blosc2.NDArray: and not isinstance(result, blosc2.NDArray) ): # Get rid of all the extra kwargs that are not accepted by blosc2.asarray - kwargs_not_accepted = {"_where_args", "_indices", "_order", "_ne_args", "dtype"} + kwargs_not_accepted = {"_where_args", "_indices", "_order", "_ne_args", "dtype", "shape"} kwargs = {key: value for key, value in kwargs.items() if key not in kwargs_not_accepted} result = blosc2.asarray(result, **kwargs) return result @@ -3141,7 +3161,7 @@ class LazyUDF(LazyArray): def __init__(self, func, inputs, dtype, shape=None, chunked_eval=True, **kwargs): # After this, all the inputs should be np.ndarray or NDArray objects self.inputs = convert_inputs(inputs) - self.chunked_eval = chunked_eval + self.chunked_eval = True # chunked_eval # Get res shape if shape is None: self._shape = compute_broadcast_shape(self.inputs) @@ -3153,6 +3173,8 @@ def __init__(self, func, inputs, dtype, shape=None, chunked_eval=True, **kwargs) self._shape = shape self.kwargs = kwargs + self.kwargs["dtype"] = dtype + self.kwargs["shape"] = self._shape self._dtype = dtype self.func = func @@ -3167,10 +3189,11 @@ def __init__(self, func, inputs, dtype, shape=None, chunked_eval=True, **kwargs) raise TypeError("dparams should be a dictionary") kwargs_getitem["dparams"] = dparams - self.res_getitem = blosc2.empty(self._shape, self._dtype, **kwargs_getitem) - # Register a postfilter for getitem - if 0 not in self._shape: - self.res_getitem._set_postf_udf(self.func, id(self.inputs)) + # TODO: enable parallelism using python 3.14t + # self.res_getitem = blosc2.empty(self._shape, self._dtype, **kwargs_getitem) + # # Register a postfilter for getitem + # if 0 not in self._shape: + # self.res_getitem._set_postf_udf(self.func, id(self.inputs)) self.inputs_dict = {f"o{i}": obj for i, obj in enumerate(self.inputs)} @@ -3271,48 +3294,47 @@ def compute(self, item=(), **kwargs): ): raise ValueError("Cannot use the same urlpath for LazyArray and eval NDArray") _ = aux_kwargs.pop("urlpath", None) - aux_kwargs.update(kwargs) - - if item == (): - if self.chunked_eval: - res_eval = blosc2.empty(self.shape, self.dtype, **aux_kwargs) - chunked_eval(self.func, self.inputs_dict, None, _getitem=False, _output=res_eval) - return res_eval - - # Cannot use multithreading when applying a prefilter, save nthreads to set them - # after the evaluation - cparams = aux_kwargs.get("cparams", {}) - if isinstance(cparams, dict): - self._cnthreads = cparams.get("nthreads", blosc2.cparams_dflts["nthreads"]) - cparams["nthreads"] = 1 - else: - raise ValueError("cparams should be a dictionary") - aux_kwargs["cparams"] = cparams - - res_eval = blosc2.empty(self.shape, self.dtype, **aux_kwargs) - # Register a prefilter for eval - res_eval._set_pref_udf(self.func, id(self.inputs)) - aux = np.empty(res_eval.shape, res_eval.dtype) - res_eval[...] = aux - res_eval.schunk.remove_prefilter(self.func.__name__) - res_eval.schunk.cparams.nthreads = self._cnthreads + if "out" in kwargs: # use provided out preferentially + aux_kwargs["_output"] = kwargs.pop("out") + elif hasattr(self, "_output"): + aux_kwargs["_output"] = self._output + aux_kwargs.update(kwargs) - return res_eval - else: - # Get only a slice - np_array = self.__getitem__(item) - return blosc2.asarray(np_array, **aux_kwargs) + if self.chunked_eval: + # aux_kwargs includes self.shape and self.dtype + return chunked_eval(self.func, self.inputs_dict, item, _getitem=False, **aux_kwargs) + + # TODO: Implement multithreading + # # Cannot use multithreading when applying a prefilter, save nthreads to set them + # # after the evaluation + # cparams = aux_kwargs.get("cparams", {}) + # if isinstance(cparams, dict): + # self._cnthreads = cparams.get("nthreads", blosc2.cparams_dflts["nthreads"]) + # cparams["nthreads"] = 1 + # else: + # raise ValueError("cparams should be a dictionary") + # aux_kwargs["cparams"] = cparams + + # res_eval = blosc2.empty(self.shape, self.dtype, **aux_kwargs) + # # Register a prefilter for eval + # res_eval._set_pref_udf(self.func, id(self.inputs)) + + # aux = np.empty(res_eval.shape, res_eval.dtype) + # res_eval[...] = aux + # res_eval.schunk.remove_prefilter(self.func.__name__) + # res_eval.schunk.cparams.nthreads = self._cnthreads + + # return res_eval + return None def __getitem__(self, item): if self.chunked_eval: - # TODO: as this creates a big array, this can potentially consume a lot of memory - output = np.empty(self.shape, self.dtype) # It is important to pass kwargs here, because chunks can be used internally - # fills numpy array with desired slice - chunked_eval(self.func, self.inputs_dict, item, _getitem=True, _output=output, **self.kwargs) - return output[item] - return self.res_getitem[item] + # self.kwargs includes self.shape and self.dtype + return chunked_eval(self.func, self.inputs_dict, item, _getitem=True, **self.kwargs) + # return self.res_getitem[item] # TODO: implement multithreading + return None def save(self, urlpath=None, **kwargs): if urlpath is None: diff --git a/tests/ndarray/test_elementwise_funcs.py b/tests/ndarray/test_elementwise_funcs.py index c6f95733..0a03a4a2 100644 --- a/tests/ndarray/test_elementwise_funcs.py +++ b/tests/ndarray/test_elementwise_funcs.py @@ -38,7 +38,7 @@ SHAPES_CHUNKS_HEAVY = [((10, 13, 13), (3, 5, 2))] -def _test_unary_func_impl(np_func, blosc_func, dtype, shape, chunkshape): +def _test_unary_func_impl(np_func, blosc_func, dtype, shape, chunkshape): # noqa : C901 """Helper function containing the actual test logic for unary functions.""" if np_func.__name__ in ("arccos", "arcsin", "arctanh"): a_blosc = blosc2.linspace( @@ -82,8 +82,12 @@ def _test_unary_func_impl(np_func, blosc_func, dtype, shape, chunkshape): assert True if success: try: - result = blosc_func(a_blosc)[...] - np.testing.assert_allclose(result, expected, rtol=1e-6, atol=1e-6) + result = blosc_func(a_blosc) + np.testing.assert_allclose(result[()], expected, rtol=1e-6, atol=1e-6) + # test compute too + if hasattr(result, "compute"): + result = result.compute() + np.testing.assert_allclose(result, expected, rtol=1e-6, atol=1e-6) except TypeError as e: # some functions don't support certain dtypes and that's fine assert True @@ -140,8 +144,12 @@ def _test_binary_func_proxy(np_func, blosc_func, dtype, shape, chunkshape, xp): assert True if success: try: - result = blosc_func(not_blosc1, a_blosc2)[()] - np.testing.assert_allclose(result, expected, rtol=1e-6, atol=1e-6) + result = blosc_func(not_blosc1, a_blosc2) + np.testing.assert_allclose(result[()], expected, rtol=1e-6, atol=1e-6) + # test compute too + if hasattr(result, "compute"): + result = result.compute() + np.testing.assert_allclose(result, expected, rtol=1e-6, atol=1e-6) except TypeError as e: # some functions don't support certain dtypes and that's fine assert True diff --git a/tests/ndarray/test_lazyudf.py b/tests/ndarray/test_lazyudf.py index d1e0cdc3..ada04676 100644 --- a/tests/ndarray/test_lazyudf.py +++ b/tests/ndarray/test_lazyudf.py @@ -5,7 +5,6 @@ # This source code is licensed under a BSD-style license (found in the # LICENSE file in the root directory of this source tree) ####################################################################### -import math import numpy as np import pytest @@ -152,7 +151,8 @@ def test_0p(shape, chunks, blocks, chunked_eval): expr = blosc2.lazyudf( udf0p, (), npa.dtype, shape=shape, chunked_eval=chunked_eval, chunks=chunks, blocks=blocks ) - res = expr.compute() + out = blosc2.empty(dtype=expr.dtype, shape=expr.shape) + res = expr.compute(out=out) np.testing.assert_allclose(res[...], npa) @@ -339,8 +339,13 @@ def test_eval_slice(shape, chunks, blocks, slices, urlpath, contiguous, chunked_ def udf_offset(inputs_tuple, output, offset): - _ = inputs_tuple[0] - output[:] = sum(offset) + x = inputs_tuple[0] + coords = np.zeros_like(x) + for n in range(x.ndim): + for i in range(x.shape[n]): + _slice = tuple(slice(None, None) if n != n_ else i for n_ in range(x.ndim)) + coords[_slice] += offset[n] + i + output[:] = np.sin(coords) @pytest.mark.parametrize("eval_mode", ["eval", "getitem"]) @@ -355,15 +360,11 @@ def udf_offset(inputs_tuple, output, offset): ((8, 8), (4, 4), (2, 2), (slice(None), slice(None))), ((9, 8), (4, 4), (2, 3), (slice(None), slice(None))), ((13, 13), (10, 10), (4, 3), (slice(None), slice(None))), - # TODO: make this work - # I think the issue is in how the offsets are calculated here, in the test. - # Other tests involving offset with slices are working fine - # (e.g. test_ndarray::test_arange) - # ((8, 8), (4, 4), (2, 2), (slice(0, 5), slice(5, 8))), - # ((9, 8), (4, 4), (2, 3), (slice(0, 5), slice(5, 8))), - # ((40, 20), (30, 10), (5, 5), (slice(0, 5), slice(5, 20))), - # ((13, 13), (10, 10), (4, 3), (slice(3, 8), slice(9, 12))), - # ((13, 13, 10), (10, 10, 5), (5, 5, 3), (slice(0, 12), slice(3, 13), ...)), + ((8, 8), (4, 4), (2, 2), (slice(0, 5), slice(5, 8))), + ((9, 8), (4, 4), (2, 3), (slice(0, 5), slice(5, 8))), + ((40, 20), (30, 10), (5, 5), (slice(0, 5), slice(5, 20))), + ((13, 13), (10, 10), (4, 3), (slice(3, 8), slice(9, 12))), + ((13, 13, 10), (10, 10, 5), (5, 5, 3), (slice(0, 12), slice(3, 13), ...)), ], ) def test_offset(shape, chunks, blocks, slices, chunked_eval, eval_mode): @@ -372,19 +373,12 @@ def test_offset(shape, chunks, blocks, slices, chunked_eval, eval_mode): # Compute the desired output out = np.zeros_like(x) - # Calculate the number of chunks in each dimension - if not chunked_eval: - # When using prefilters/postfilters, the computation is split in blocks, not chunks - chunks = blocks - nchunks = tuple(math.ceil(x.shape[i] / blocks[i]) for i in range(len(x.shape))) - - # Iterate over the chunks for computing the output - for index in np.ndindex(nchunks): - # Calculate the offset for the current chunk - offset = [index[i] * chunks[i] for i in range(len(index))] - # Apply the offset to the chunk and store the result in the output array - out_slice = tuple(slice(index[i] * chunks[i], (index[i] + 1) * chunks[i]) for i in range(len(index))) - out[out_slice] = sum(offset) + coords = np.zeros_like(x) + for n in range(x.ndim): + for i in range(x.shape[n]): + _slice = tuple(slice(None, None) if n != n_ else i for n_ in range(x.ndim)) + coords[_slice] += i + out = np.sin(coords) expr = blosc2.lazyudf( udf_offset, @@ -395,7 +389,7 @@ def test_offset(shape, chunks, blocks, slices, chunked_eval, eval_mode): blocks=blocks, ) if eval_mode == "eval": - res = expr.compute(slices) + res = expr.compute(slices) # tests slices_eval res = res[:] else: res = expr[slices]