diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py
index 5184f411..c8269439 100644
--- a/src/blosc2/lazyexpr.py
+++ b/src/blosc2/lazyexpr.py
@@ -14,6 +14,7 @@
 import concurrent.futures
 import copy
 import inspect
+import linecache
 import math
 import os
 import pathlib
@@ -914,7 +915,7 @@ def validate_inputs(inputs: dict, out=None, reduce=False) -> tuple:  # noqa: C90
     # Check the out NDArray (if present) first
     if isinstance(out, blosc2.NDArray) and not reduce:
         if first_input.shape != out.shape:
-            raise ValueError("Output shape does not match the first input shape")
+            return None, None, None, False
         if first_input.chunks != out.chunks:
             fast_path = False
         if first_input.blocks != out.blocks:
@@ -1071,7 +1072,7 @@ def read_nchunk(arrs, info):
 iter_chunks = None


-def fill_chunk_operands(  # noqa: C901
+def fill_chunk_operands(
     operands, slice_, chunks_, full_chunk, aligned, nchunk, iter_disk, chunk_operands, reduc=False
 ):
     """Retrieve the chunk operands for evaluating an expression.
@@ -1101,11 +1102,6 @@
                 chunk_operands[key] = chunks[i]
                 continue
             # Otherwise, we need to decompress them
-            special = blosc2.SpecialValue((chunks[i][31] & 0x70) >> 4)
-            if special == blosc2.SpecialValue.ZERO:
-                # The chunk is a special zero chunk, so we can treat it as a scalar
-                chunk_operands[key] = np.zeros((), dtype=value.dtype)
-                continue
             if aligned[key]:
                 buff = blosc2.decompress2(chunks[i])
                 bsize = value.dtype.itemsize * math.prod(chunks_)
@@ -1132,15 +1128,6 @@
             chunk_operands[key] = value[slice_]
             continue

-        # First check if the chunk is a special zero chunk.
-        # Using lazychunks is very effective here because we only need to read the header.
-        chunk = value.schunk.get_lazychunk(nchunk)
-        special = blosc2.SpecialValue((chunk[31] & 0x70) >> 4)
-        if special == blosc2.SpecialValue.ZERO:
-            # The chunk is a special zero chunk, so we can treat it as a scalar
-            chunk_operands[key] = np.zeros((), dtype=value.dtype)
-            continue
-
         # If key is in operands, we can reuse the buffer
         if (
             key in chunk_operands
@@ -1251,6 +1238,13 @@ def fast_eval(  # noqa: C901
         #     else:
         #         ne_evaluate(expression, chunk_operands, out=out[slice_])
         #     continue
+        if out is None:
+            # We can enter here when using any of the compute() or __getitem__() methods
+            if getitem:
+                out = np.empty(shape, dtype=dtype)
+            else:
+                out = blosc2.empty(shape, chunks=chunks, blocks=blocks, dtype=dtype, **kwargs)
+
         if callable(expression):
             result = np.empty(chunks_, dtype=out.dtype)
             expression(tuple(chunk_operands.values()), result, offset=offset)
@@ -1266,13 +1260,6 @@
                 # We do not support one or zero operands in the fast path yet
                 raise ValueError("Fast path: the where condition must be a tuple with two elements")

-        if out is None:
-            # We can enter here when using any of the compute() or __getitem__() methods
-            if getitem:
-                out = np.empty(shape, dtype=dtype)
-            else:
-                out = blosc2.empty(shape, chunks=chunks, blocks=blocks, dtype=dtype, **kwargs)
-
         # Store the result in the output array
         if getitem:
             try:
@@ -1333,6 +1320,7 @@ def slices_eval(  # noqa: C901
     operands: dict,
     getitem: bool,
     _slice=NDINDEX_EMPTY_TUPLE,
+    shape=None,
     **kwargs,
 ) -> blosc2.NDArray | np.ndarray:
     """Evaluate the expression in chunks of operands.
@@ -1354,6 +1342,9 @@
     _slice: ndindex.Tuple sequence of slices and ints. Default = ndindex.Tuple(), optional
         If provided, only the chunks that intersect with this slice will be evaluated.
+    shape: tuple | None
+        The shape of the full (unsliced) result. Typically passed on from the parent LazyArray.
+        If None, a guess is made by broadcasting the operands.
     kwargs: Any, optional
         Additional keyword arguments that are supported by the :func:`empty` constructor.
@@ -1385,22 +1376,22 @@
     orig_slice = _slice
     # Compute the shape and chunks of the output array, including broadcasting
-    shape = compute_broadcast_shape(operands.values())
-    if out is None:
-        if _slice != ():
-            # Check whether _slice contains an integer, or any step that are not None or 1
-            if any((isinstance(s, int)) for s in _slice):
-                need_final_slice = True
-            _slice = tuple(slice(i, i + 1, 1) if isinstance(i, int) else i for i in _slice)
-            # shape_slice in general not equal to final shape:
-            # dummy dims (due to ints) will be dealt with by taking final_slice
-            shape_slice = ndindex.ndindex(_slice).newshape(shape)
-            mask_slice = np.array([isinstance(i, int) for i in orig_slice], dtype=np.bool_)
-    else:
-        # # out should always have shape of full array
-        # if shape is not None and shape != out.shape:
-        #     raise ValueError("Provided output shape does not match the operands' shape.")
-        shape = out.shape
+    if shape is None:  # lazyudf provides shape kwarg
+        shape = compute_broadcast_shape(operands.values())
+
+    if _slice != ():
+        # Check whether _slice contains an integer, or any step that is not None or 1
+        if any((isinstance(s, int)) for s in _slice):
+            need_final_slice = True
+        _slice = tuple(slice(i, i + 1, 1) if isinstance(i, int) else i for i in _slice)
+        # shape_slice in general not equal to final shape:
+        # dummy dims (due to ints) will be dealt with by taking final_slice
+        shape_slice = ndindex.ndindex(_slice).newshape(shape)
+        mask_slice = np.array([isinstance(i, int) for i in orig_slice], dtype=np.bool_)
+    if out is not None:
+        shape_ = shape_slice if shape_slice is not None else shape
+        if shape_ != out.shape:
+            raise ValueError("Provided output shape does not match the slice shape.")

     if chunks is None:  # Guess chunk shape
         # Either out, or operand with `chunks`, can be used to get the chunks
@@ -1482,6 +1473,26 @@

             chunk_operands[key] = value[cslice]

+        if out is None:
+            shape_ = shape_slice if shape_slice is not None else shape
+            if where is not None and len(where) < 2:
+                # The result is a linear array
+                shape_ = math.prod(shape_)
+            if getitem or _order:
+                out = np.empty(shape_, dtype=dtype_)
+                if _order:
+                    indices_ = np.empty(shape_, dtype=np.int64)
+            else:
+                # if "chunks" not in kwargs and (where is None or len(where) == 2):
+                # Let's use the same chunks as the first operand (it could have been automatic too)
+                # out = blosc2.empty(shape_, chunks=chunks, dtype=dtype_, **kwargs)
+                # out = blosc2.empty(shape_, dtype=dtype_, **kwargs)
+                if "chunks" in kwargs and (where is not None and len(where) < 2 and len(shape_) > 1):
+                    # Remove the chunks argument if the where condition is not a tuple with two elements
+                    kwargs.pop("chunks")
+                out = blosc2.empty(shape_, dtype=dtype_, **kwargs)
+            # Check if the `out` partitions are well-behaved (i.e. no padding)
+            behaved = blosc2.are_partitions_behaved(out.shape, out.chunks, out.blocks)

         # Evaluate the expression using chunks of operands
         if callable(expression):
@@ -1490,7 +1501,7 @@
             # Call the udf directly and use result as the output array
             offset = tuple(s.start for s in cslice)
             expression(tuple(chunk_operands.values()), result, offset=offset)
-            out[cslice] = result
+            out[cslice_subidx] = result
             continue

         if where is None:
@@ -1533,27 +1544,6 @@
             # but avoid copy if already contiguous
             result = np.require(result, requirements="C")

-        if out is None:
-            shape_ = shape_slice if shape_slice is not None else shape
-            if where is not None and len(where) < 2:
-                # The result is a linear array
-                shape_ = math.prod(shape_)
-            if getitem or _order:
-                out = np.empty(shape_, dtype=dtype_)
-                if _order:
-                    indices_ = np.empty(shape_, dtype=np.int64)
-            else:
-                # if "chunks" not in kwargs and (where is None or len(where) == 2):
-                # Let's use the same chunks as the first operand (it could have been automatic too)
-                # out = blosc2.empty(shape_, chunks=chunks, dtype=dtype_, **kwargs)
-                # out = blosc2.empty(shape_, dtype=dtype_, **kwargs)
-                if "chunks" in kwargs and (where is not None and len(where) < 2 and len(shape_) > 1):
-                    # Remove the chunks argument if the where condition is not a tuple with two elements
-                    kwargs.pop("chunks")
-                out = blosc2.empty(shape_, dtype=dtype_, **kwargs)
-            # Check if the in out partitions are well-behaved (i.e. no padding)
-            behaved = blosc2.are_partitions_behaved(out.shape, out.chunks, out.blocks)
-
         if where is None or len(where) == 2:
             if behaved and result.shape == out.chunks and result.dtype == out.dtype:
                 # Fast path
@@ -1591,7 +1581,9 @@
     else:
         # Need to take final_slice since filled up array according to slice_ for each chunk
         if need_final_slice:  # only called if out was None
             if isinstance(out, np.ndarray):
-                out = np.squeeze(out, np.where(mask_slice)[0])
+                squeeze_axis = np.where(mask_slice)[0]
+                squeeze_axis = np.squeeze(squeeze_axis)  # handle 1d mask_slice
+                out = np.squeeze(out, squeeze_axis)
             elif isinstance(out, blosc2.NDArray):
                 # It *seems* better to choose an automatic chunks and blocks for the output array
                 # out = out.slice(_slice, chunks=out.chunks, blocks=out.blocks)
@@ -1645,16 +1637,19 @@
     where: dict | None = kwargs.pop("_where_args", None)
     dtype = kwargs.pop("dtype", None)

-    if out is None:
-        # Compute the shape and chunks of the output array, including broadcasting
-        shape = compute_broadcast_shape(operands.values())
-    else:
-        shape = out.shape
+    shape = kwargs.pop("shape", None)
+    if shape is None:
+        if out is None:
+            # Compute the shape and chunks of the output array, including broadcasting
+            shape = compute_broadcast_shape(operands.values())
+        else:
+            shape = out.shape

     # compute the shape of the output array
     _slice_bcast = tuple(slice(i, i + 1) if isinstance(i, int) else i for i in _slice.raw)
     slice_shape = ndindex.ndindex(_slice_bcast).newshape(shape)  # includes dummy dimensions
     _slice = _slice.raw
+    offset = tuple(s.start for s in _slice_bcast)  # offset for the udf

     # Get the slice of each operand
     slice_operands = {}
@@ -1674,12 +1669,16 @@
         slice_operands[key] = value[_slice]

     # Evaluate the expression using slices of operands
-    if where is None:
-        result = ne_evaluate(expression, slice_operands, **ne_args)
+    if callable(expression):
+        result = np.empty(slice_shape, dtype=dtype)
+        expression(tuple(slice_operands.values()), result, offset=offset)
     else:
-        # Apply the where condition (in result)
-        new_expr = f"where({expression}, _where_x, _where_y)"
-        result = ne_evaluate(new_expr, slice_operands, **ne_args)
+        if where is None:
+            result = ne_evaluate(expression, slice_operands, **ne_args)
+        else:
+            # Apply the where condition (in result)
+            new_expr = f"where({expression}, _where_x, _where_y)"
+            result = ne_evaluate(new_expr, slice_operands, **ne_args)

     if out is None:
         # avoid copying unnecessarily
         try:
@@ -2118,9 +2117,14 @@ def chunked_eval(  # noqa: C901
     try:
         # standardise slice to be ndindex.Tuple
        item = () if item in (None, slice(None, None, None)) else item
+        item = item if isinstance(item, tuple) else (item,)
+        item = tuple(
+            slice(s.start, s.stop, 1 if s.step is None else s.step) if isinstance(s, slice) else s
+            for s in item
+        )
         item = ndindex.ndindex(item)
-        shape = compute_broadcast_shape(operands.values())
-        if (shape is not None) and (item.raw != ()):
+        shape = kwargs.pop("shape", None)
+        if item.raw != () and shape is not None:
             item = item.expand(shape)  # converts to standard tuple form

         getitem = kwargs.pop("_getitem", False)
@@ -2145,7 +2149,7 @@
         if not is_full_slice(item.raw) or (where is not None and len(where) < 2):
             # The fast path is possible under a few conditions
-            if getitem and (where is None or len(where) == 2) and not callable(expression):
+            if getitem and (where is None or len(where) == 2):
                 # Compute the size of operands for the fast path
                 unit_steps = np.all([s.step == 1 for s in item.raw if isinstance(s, slice)])
                 # shape of slice, if non-unit steps have to decompress full array into memory
@@ -2154,10 +2158,11 @@
                 size_operands = math.prod(shape_operands) * len(operands) * _dtype.itemsize
                 # Only take the fast path if the size of operands is relatively small
                 if size_operands < blosc2.MAX_FAST_PATH_SIZE:
-                    return slices_eval_getitem(expression, operands, _slice=item, **kwargs)
-            return slices_eval(expression, operands, getitem=getitem, _slice=item, **kwargs)
+                    return slices_eval_getitem(expression, operands, _slice=item, shape=shape, **kwargs)
+            return slices_eval(expression, operands, getitem=getitem, _slice=item, shape=shape, **kwargs)

-        if fast_path:
+        fast_path = is_full_slice(item.raw) and fast_path
+        if fast_path:  # item is necessarily () here
             if getitem:
                 # When using getitem, taking the fast path is always possible
                 return fast_eval(expression, operands, getitem=True, **kwargs)
@@ -2170,7 +2175,7 @@
             return fast_eval(expression, operands, getitem=False, **kwargs)

         # End up here by default
-        return slices_eval(expression, operands, getitem=getitem, _slice=item, **kwargs)
+        return slices_eval(expression, operands, getitem=getitem, _slice=item, shape=shape, **kwargs)

     finally:
         # Deactivate cache for NDField instances
@@ -2412,25 +2417,24 @@ def __init__(self, new_op):  # noqa: C901

     def get_chunk(self, nchunk):
         """Get the `nchunk` of the expression, evaluating only that one."""
-        # Create an empty array with the same shape and dtype; this is fast
-        out = blosc2.empty(shape=self.shape, dtype=self.dtype, chunks=self.chunks, blocks=self.blocks)
-        shape = out.shape
-        chunks = out.chunks
-        # Calculate the shape of the (chunk) slice_ (specially at the end of the array)
+        # Creating the chunk-sized empty array (below) and computing into it is fast
+        shape = self.shape
+        chunks = self.chunks
+        # Calculate the shape of the (chunk) slice_ (especially at the end of the array)
         chunks_idx, _ = get_chunks_idx(shape, chunks)
         coords = tuple(np.unravel_index(nchunk, chunks_idx))
         slice_ = tuple(
             slice(c * s, min((c + 1) * s, shape[i]))
             for i, (c, s) in enumerate(zip(coords, chunks, strict=True))
         )
-        # TODO: we need more metadata for treating reductions
-        # We want to fill a single chunk, so we need to evaluate the expression on out
-        expr = lazyexpr(self, out=out)
-        # The evals below produce arrays with different chunks and blocks;
-        # we choose the ones for LazyExpr main class
-        expr.compute(item=slice_)
-        # out = expr.compute(item=slice_)
-        return out.schunk.get_chunk(nchunk)
+        loc_chunks = tuple(s.stop - s.start for s in slice_)
+        out = blosc2.empty(shape=self.chunks, dtype=self.dtype, chunks=self.chunks, blocks=self.blocks)
+        if loc_chunks == self.chunks:
+            self.compute(item=slice_, out=out)
+        else:
+            _slice_ = tuple(slice(0, s) for s in loc_chunks)
+            out[_slice_] = self.compute(item=slice_)
+        return out.schunk.get_chunk(0)

     def update_expr(self, new_op):  # noqa: C901
         prev_flag = blosc2._disable_overloaded_equal
@@ -2975,6 +2979,7 @@ def compute(self, item=(), **kwargs) -> blosc2.NDArray:
         if hasattr(self, "_where_args"):
             kwargs["_where_args"] = self._where_args
         kwargs["dtype"] = self.dtype
+        kwargs["shape"] = self.shape
         if hasattr(self, "_indices"):
             kwargs["_indices"] = self._indices
         if hasattr(self, "_order"):
@@ -2991,7 +2996,7 @@
             and not isinstance(result, blosc2.NDArray)
         ):
             # Get rid of all the extra kwargs that are not accepted by blosc2.asarray
-            kwargs_not_accepted = {"_where_args", "_indices", "_order", "_ne_args", "dtype"}
+            kwargs_not_accepted = {"_where_args", "_indices", "_order", "_ne_args", "dtype", "shape"}
             kwargs = {key: value for key, value in kwargs.items() if key not in kwargs_not_accepted}
             result = blosc2.asarray(result, **kwargs)
         return result
@@ -3029,50 +3034,6 @@ def info_items(self):
         items += [("dtype", self.dtype)]
         return items

-    def _save(self, urlpath=None, **kwargs):
-        if urlpath is None:
-            raise ValueError("To save a LazyArray you must provide an urlpath")
-
-        # Validate expression
-        validate_expr(self.expression)
-
-        meta = kwargs.get("meta", {})
-        meta["LazyArray"] = LazyArrayEnum.Expr.value
-        kwargs["urlpath"] = urlpath
-        kwargs["meta"] = meta
-        kwargs["mode"] = "w"  # always overwrite the file in urlpath
-
-        # Create an empty array; useful for providing the shape and dtype of the outcome
-        array = blosc2.empty(shape=self.shape, dtype=self.dtype, **kwargs)
-
-        # Save the expression and operands in the metadata
-        operands = {}
-        for key, value in self.operands.items():
-            if isinstance(value, blosc2.C2Array):
-                operands[key] = {
-                    "path": str(value.path),
-                    "urlbase": value.urlbase,
-                }
-                continue
-            if key in {"numpy", "np"}:
-                # Provide access to cast funcs like int8 et al.
-                continue
-            if isinstance(value, blosc2.Proxy):
-                # Take the required info from the Proxy._cache container
-                value = value._cache
-            if not hasattr(value, "schunk"):
-                raise ValueError(
-                    "To save a LazyArray, all operands must be blosc2.NDArray or blosc2.C2Array objects"
-                )
-            if value.schunk.urlpath is None:
-                raise ValueError("To save a LazyArray, all operands must be stored on disk/network")
-            operands[key] = value.schunk.urlpath
-        array.schunk.vlmeta["_LazyArray"] = {
-            "expression": self.expression,
-            "UDF": None,
-            "operands": operands,
-        }
-
     def save(self, urlpath=None, **kwargs):
         if urlpath is None:
             raise ValueError("To save a LazyArray you must provide an urlpath")
@@ -3184,7 +3145,7 @@ class LazyUDF(LazyArray):
     def __init__(self, func, inputs, dtype, shape=None, chunked_eval=True, **kwargs):
         # After this, all the inputs should be np.ndarray or NDArray objects
         self.inputs = convert_inputs(inputs)
-        self.chunked_eval = chunked_eval
+        self.chunked_eval = True  # the chunked_eval param is ignored for now
         # Get res shape
         if shape is None:
             self._shape = compute_broadcast_shape(self.inputs)
@@ -3196,6 +3157,8 @@ def __init__(self, func, inputs, dtype, shape=None, chunked_eval=True, **kwargs)
             self._shape = shape

         self.kwargs = kwargs
+        self.kwargs["dtype"] = dtype
+        self.kwargs["shape"] = self._shape
         self._dtype = dtype
         self.func = func
@@ -3210,10 +3173,11 @@
                 raise TypeError("dparams should be a dictionary")
             kwargs_getitem["dparams"] = dparams

-        self.res_getitem = blosc2.empty(self._shape, self._dtype, **kwargs_getitem)
-        # Register a postfilter for getitem
-        if 0 not in self._shape:
-            self.res_getitem._set_postf_udf(self.func, id(self.inputs))
+        # TODO: enable parallelism using python 3.14t
+        # self.res_getitem = blosc2.empty(self._shape, self._dtype, **kwargs_getitem)
+        # # Register a postfilter for getitem
+        # if 0 not in self._shape:
+        #     self.res_getitem._set_postf_udf(self.func, id(self.inputs))

         self.inputs_dict = {f"o{i}": obj for i, obj in enumerate(self.inputs)}
@@ -3314,51 +3278,86 @@ def compute(self, item=(), **kwargs):
         ):
             raise ValueError("Cannot use the same urlpath for LazyArray and eval NDArray")
         _ = aux_kwargs.pop("urlpath", None)
-        aux_kwargs.update(kwargs)
-
-        if item == ():
-            if self.chunked_eval:
-                res_eval = blosc2.empty(self.shape, self.dtype, **aux_kwargs)
-                chunked_eval(self.func, self.inputs_dict, None, _getitem=False, _output=res_eval)
-                return res_eval
-
-            # Cannot use multithreading when applying a prefilter, save nthreads to set them
-            # after the evaluation
-            cparams = aux_kwargs.get("cparams", {})
-            if isinstance(cparams, dict):
-                self._cnthreads = cparams.get("nthreads", blosc2.cparams_dflts["nthreads"])
-                cparams["nthreads"] = 1
-            else:
-                raise ValueError("cparams should be a dictionary")
-            aux_kwargs["cparams"] = cparams
-            res_eval = blosc2.empty(self.shape, self.dtype, **aux_kwargs)
-            # Register a prefilter for eval
-            res_eval._set_pref_udf(self.func, id(self.inputs))
-
-            aux = np.empty(res_eval.shape, res_eval.dtype)
-            res_eval[...] = aux
-            res_eval.schunk.remove_prefilter(self.func.__name__)
-            res_eval.schunk.cparams.nthreads = self._cnthreads
+        if "out" in kwargs:  # use provided out preferentially
+            aux_kwargs["_output"] = kwargs.pop("out")
+        elif hasattr(self, "_output"):
+            aux_kwargs["_output"] = self._output
+        aux_kwargs.update(kwargs)

-            return res_eval
-        else:
-            # Get only a slice
-            np_array = self.__getitem__(item)
-            return blosc2.asarray(np_array, **aux_kwargs)
+        if self.chunked_eval:
+            # aux_kwargs includes self.shape and self.dtype
+            return chunked_eval(self.func, self.inputs_dict, item, _getitem=False, **aux_kwargs)
+
+        # TODO: Implement multithreading
+        # # Cannot use multithreading when applying a prefilter, save nthreads to set them
+        # # after the evaluation
+        # cparams = aux_kwargs.get("cparams", {})
+        # if isinstance(cparams, dict):
+        #     self._cnthreads = cparams.get("nthreads", blosc2.cparams_dflts["nthreads"])
+        #     cparams["nthreads"] = 1
+        # else:
+        #     raise ValueError("cparams should be a dictionary")
+        # aux_kwargs["cparams"] = cparams
+
+        # res_eval = blosc2.empty(self.shape, self.dtype, **aux_kwargs)
+        # # Register a prefilter for eval
+        # res_eval._set_pref_udf(self.func, id(self.inputs))
+
+        # aux = np.empty(res_eval.shape, res_eval.dtype)
+        # res_eval[...] = aux
+        # res_eval.schunk.remove_prefilter(self.func.__name__)
+        # res_eval.schunk.cparams.nthreads = self._cnthreads
+
+        # return res_eval
+        return None

     def __getitem__(self, item):
         if self.chunked_eval:
-            # TODO: as this creates a big array, this can potentially consume a lot of memory
-            output = np.empty(self.shape, self.dtype)
             # It is important to pass kwargs here, because chunks can be used internally
-            # fills numpy array with desired slice
-            chunked_eval(self.func, self.inputs_dict, item, _getitem=True, _output=output, **self.kwargs)
-            return output[item]
-        return self.res_getitem[item]
+            # self.kwargs includes self.shape and self.dtype
+            return chunked_eval(self.func, self.inputs_dict, item, _getitem=True, **self.kwargs)
+        # return self.res_getitem[item]  # TODO: implement multithreading
+        return None
+
+    def save(self, urlpath=None, **kwargs):
+        if urlpath is None:
+            raise ValueError("To save a LazyArray you must provide an urlpath")

-    def save(self, **kwargs):
-        raise NotImplementedError("For safety reasons, this is not implemented for UDFs")
+        meta = kwargs.get("meta", {})
+        meta["LazyArray"] = LazyArrayEnum.UDF.value
+        kwargs["urlpath"] = urlpath
+        kwargs["meta"] = meta
+        kwargs["mode"] = "w"  # always overwrite the file in urlpath
+
+        # Create an empty array; useful for providing the shape and dtype of the outcome
+        array = blosc2.empty(shape=self.shape, dtype=self.dtype, **kwargs)
+
+        # Save the expression and operands in the metadata
+        operands = {}
+        operands_ = self.inputs_dict
+        for key, value in operands_.items():
+            if isinstance(value, blosc2.C2Array):
+                operands[key] = {
+                    "path": str(value.path),
+                    "urlbase": value.urlbase,
+                }
+                continue
+            if isinstance(value, blosc2.Proxy):
+                # Take the required info from the Proxy._cache container
+                value = value._cache
+            if not hasattr(value, "schunk"):
+                raise ValueError(
+                    "To save a LazyArray, all operands must be blosc2.NDArray or blosc2.C2Array objects"
+                )
+            if value.schunk.urlpath is None:
+                raise ValueError("To save a LazyArray, all operands must be stored on disk/network")
+            operands[key] = value.schunk.urlpath
+        array.schunk.vlmeta["_LazyArray"] = {
+            "UDF": inspect.getsource(self.func),
+            "operands": operands,
+            "name": self.func.__name__,
+        }

 def _numpy_eval_expr(expression, operands, prefer_blosc=False):
@@ -3615,41 +3614,64 @@ def lazyexpr(

 def _open_lazyarray(array):
     value = array.schunk.meta["LazyArray"]
-    if value == LazyArrayEnum.UDF.value:
-        raise NotImplementedError("For safety reasons, persistent UDFs are not supported")
-
-    # LazyExpr
     lazyarray = array.schunk.vlmeta["_LazyArray"]
+    if value == LazyArrayEnum.Expr.value:
+        expr = lazyarray["expression"]
+    elif value == LazyArrayEnum.UDF.value:
+        expr = lazyarray["UDF"]
+    else:
+        raise ValueError("Argument `array` is not a LazyExpr or LazyUDF instance.")
+
     operands = lazyarray["operands"]
     parent_path = Path(array.schunk.urlpath).parent
     operands_dict = {}
     missing_ops = {}
-    for key, value in operands.items():
-        if isinstance(value, str):
-            value = parent_path / value
+    for key, v in operands.items():
+        if isinstance(v, str):
+            v = parent_path / v
             try:
-                op = blosc2.open(value)
+                op = blosc2.open(v)
             except FileNotFoundError:
-                missing_ops[key] = value
+                missing_ops[key] = v
             else:
                 operands_dict[key] = op
-        elif isinstance(value, dict):
+        elif isinstance(v, dict):
             # C2Array
             operands_dict[key] = blosc2.C2Array(
-                pathlib.Path(value["path"]).as_posix(),
-                urlbase=value["urlbase"],
+                pathlib.Path(v["path"]).as_posix(),
+                urlbase=v["urlbase"],
             )
         else:
             raise TypeError("Error when retrieving the operands")

-    expr = lazyarray["expression"]
     if missing_ops:
         exc = exceptions.MissingOperands(expr, missing_ops)
         exc.expr = expr
         exc.missing_ops = missing_ops
         raise exc

-    new_expr = LazyExpr._new_expr(expr, operands_dict, guess=True, out=None, where=None)
+    # LazyExpr
+    if value == LazyArrayEnum.Expr.value:
+        new_expr = LazyExpr._new_expr(expr, operands_dict, guess=True, out=None, where=None)
+    elif value == LazyArrayEnum.UDF.value:
+        local_ns = {}
+        name = lazyarray["name"]
+        filename = f"<{name}>"  # any unique name
+
+        # Register the source so inspect can find it
+        linecache.cache[filename] = (len(expr), None, expr.splitlines(True), filename)
+
+        exec(compile(expr, filename, "exec"), {"np": np, "blosc2": blosc2}, local_ns)
+        func = local_ns[name]
+        # TODO: make more robust for general kwargs (not just cparams)
+        new_expr = blosc2.lazyudf(
+            func,
+            tuple(operands_dict[f"o{n}"] for n in range(len(operands_dict))),
+            shape=array.shape,
+            dtype=array.dtype,
+            cparams=array.cparams,
+        )
+
     # Make the array info available for the user (only available when opened from disk)
     new_expr.array = array
     # We want to expose schunk too, so that .info() can be used on the LazyArray
diff --git a/tests/ndarray/test_elementwise_funcs.py b/tests/ndarray/test_elementwise_funcs.py
index c6f95733..0a03a4a2 100644
--- a/tests/ndarray/test_elementwise_funcs.py
+++ b/tests/ndarray/test_elementwise_funcs.py
@@ -38,7 +38,7 @@
 SHAPES_CHUNKS_HEAVY = [((10, 13, 13), (3, 5, 2))]


-def _test_unary_func_impl(np_func, blosc_func, dtype, shape, chunkshape):
+def _test_unary_func_impl(np_func, blosc_func, dtype, shape, chunkshape):  # noqa: C901
     """Helper function containing the actual test logic for unary functions."""
     if np_func.__name__ in ("arccos", "arcsin", "arctanh"):
         a_blosc = blosc2.linspace(
@@ -82,8 +82,12 @@ def _test_unary_func_impl(np_func, blosc_func, dtype, shape, chunkshape):
         assert True
     if success:
         try:
-            result = blosc_func(a_blosc)[...]
-            np.testing.assert_allclose(result, expected, rtol=1e-6, atol=1e-6)
+            result = blosc_func(a_blosc)
+            np.testing.assert_allclose(result[()], expected, rtol=1e-6, atol=1e-6)
+            # test compute too
+            if hasattr(result, "compute"):
+                result = result.compute()
+                np.testing.assert_allclose(result, expected, rtol=1e-6, atol=1e-6)
         except TypeError as e:
             # some functions don't support certain dtypes and that's fine
             assert True
@@ -140,8 +144,12 @@ def _test_binary_func_proxy(np_func, blosc_func, dtype, shape, chunkshape, xp):
         assert True
     if success:
         try:
-            result = blosc_func(not_blosc1, a_blosc2)[()]
-            np.testing.assert_allclose(result, expected, rtol=1e-6, atol=1e-6)
+            result = blosc_func(not_blosc1, a_blosc2)
+            np.testing.assert_allclose(result[()], expected, rtol=1e-6, atol=1e-6)
+            # test compute too
+            if hasattr(result, "compute"):
+                result = result.compute()
+                np.testing.assert_allclose(result, expected, rtol=1e-6, atol=1e-6)
         except TypeError as e:
             # some functions don't support certain dtypes and that's fine
             assert True
diff --git a/tests/ndarray/test_lazyudf.py b/tests/ndarray/test_lazyudf.py
index 55f2180a..ada04676 100644
--- a/tests/ndarray/test_lazyudf.py
+++ b/tests/ndarray/test_lazyudf.py
@@ -5,7 +5,6 @@
 # This source code is licensed under a BSD-style license (found in the
 # LICENSE file in the root directory of this source tree)
 #######################################################################
-import math

 import numpy as np
 import pytest
@@ -152,7 +151,8 @@ def test_0p(shape, chunks, blocks, chunked_eval):
     expr = blosc2.lazyudf(
         udf0p, (), npa.dtype, shape=shape, chunked_eval=chunked_eval, chunks=chunks, blocks=blocks
     )
-    res = expr.compute()
+    out = blosc2.empty(dtype=expr.dtype, shape=expr.shape)
+    res = expr.compute(out=out)
     np.testing.assert_allclose(res[...], npa)
@@ -339,8 +339,13 @@ def test_eval_slice(shape, chunks, blocks, slices, urlpath, contiguous, chunked_


 def udf_offset(inputs_tuple, output, offset):
-    _ = inputs_tuple[0]
-    output[:] = sum(offset)
+    x = inputs_tuple[0]
+    coords = np.zeros_like(x)
+    for n in range(x.ndim):
+        for i in range(x.shape[n]):
+            _slice = tuple(slice(None, None) if n != n_ else i for n_ in range(x.ndim))
+            coords[_slice] += offset[n] + i
+    output[:] = np.sin(coords)


 @pytest.mark.parametrize("eval_mode", ["eval", "getitem"])
@@ -355,15 +360,11 @@ def udf_offset(inputs_tuple, output, offset):
         ((8, 8), (4, 4), (2, 2), (slice(None), slice(None))),
         ((9, 8), (4, 4), (2, 3), (slice(None), slice(None))),
         ((13, 13), (10, 10), (4, 3), (slice(None), slice(None))),
-        # TODO: make this work
-        # I think the issue is in how the offsets are calculated here, in the test.
-        # Other tests involving offset with slices are working fine
-        # (e.g. test_ndarray::test_arange)
-        # ((8, 8), (4, 4), (2, 2), (slice(0, 5), slice(5, 8))),
-        # ((9, 8), (4, 4), (2, 3), (slice(0, 5), slice(5, 8))),
-        # ((40, 20), (30, 10), (5, 5), (slice(0, 5), slice(5, 20))),
-        # ((13, 13), (10, 10), (4, 3), (slice(3, 8), slice(9, 12))),
-        # ((13, 13, 10), (10, 10, 5), (5, 5, 3), (slice(0, 12), slice(3, 13), ...)),
+        ((8, 8), (4, 4), (2, 2), (slice(0, 5), slice(5, 8))),
+        ((9, 8), (4, 4), (2, 3), (slice(0, 5), slice(5, 8))),
+        ((40, 20), (30, 10), (5, 5), (slice(0, 5), slice(5, 20))),
+        ((13, 13), (10, 10), (4, 3), (slice(3, 8), slice(9, 12))),
+        ((13, 13, 10), (10, 10, 5), (5, 5, 3), (slice(0, 12), slice(3, 13), ...)),
     ],
 )
@@ -372,19 +373,12 @@ def test_offset(shape, chunks, blocks, slices, chunked_eval, eval_mode):
     # Compute the desired output
     out = np.zeros_like(x)
-    # Calculate the number of chunks in each dimension
-    if not chunked_eval:
-        # When using prefilters/postfilters, the computation is split in blocks, not chunks
-        chunks = blocks
-    nchunks = tuple(math.ceil(x.shape[i] / blocks[i]) for i in range(len(x.shape)))
-
-    # Iterate over the chunks for computing the output
-    for index in np.ndindex(nchunks):
-        # Calculate the offset for the current chunk
-        offset = [index[i] * chunks[i] for i in range(len(index))]
-        # Apply the offset to the chunk and store the result in the output array
-        out_slice = tuple(slice(index[i] * chunks[i], (index[i] + 1) * chunks[i]) for i in range(len(index)))
-        out[out_slice] = sum(offset)
+    coords = np.zeros_like(x)
+    for n in range(x.ndim):
+        for i in range(x.shape[n]):
+            _slice = tuple(slice(None, None) if n != n_ else i for n_ in range(x.ndim))
+            coords[_slice] += i
+    out = np.sin(coords)

     expr = blosc2.lazyudf(
         udf_offset,
@@ -395,7 +389,7 @@ def test_offset(shape, chunks, blocks, slices, chunked_eval, eval_mode):
         blocks=blocks,
     )
     if eval_mode == "eval":
-        res = expr.compute(slices)
+        res = expr.compute(slices)  # tests slices_eval
         res = res[:]
     else:
         res = expr[slices]
@@ -455,3 +449,25 @@ def test_clip_logaddexp(shape, chunks, blocks, slices):
     np.testing.assert_allclose(expr, np.sin(np.logaddexp(npb, npa)))
     expr = blosc2.evaluate("clip(logaddexp(b, a), 6, 12)")
     np.testing.assert_allclose(expr, np.clip(np.logaddexp(npb, npa), 6, 12))
+
+
+def test_save_ludf():
+    shape = (23,)
+    npa = np.arange(start=0, stop=np.prod(shape)).reshape(shape)
+    blosc2.remove_urlpath("a.b2nd")
+    array = blosc2.asarray(npa, urlpath="a.b2nd")
+
+    # shape is not passed to lazyudf below, so it must be inferred from the operands
+    npc = npa + 1  # udf1p computes x + 1
+    cparams = {"nthreads": 4}
+    urlpath = "lazyarray.b2nd"
+    blosc2.remove_urlpath(urlpath)
+
+    expr = blosc2.lazyudf(udf1p, (array,), np.float64, cparams=cparams)
+
+    expr.save(urlpath=urlpath)
+    del expr
+    expr = blosc2.open(urlpath)
+    assert isinstance(expr, blosc2.LazyUDF)
+    res_lazyexpr = expr.compute()
+    np.testing.assert_array_equal(res_lazyexpr[:], npc)
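
Note on the `linecache` registration used in `_open_lazyarray`: `inspect.getsource` cannot normally recover the source of a function created with `exec`, because the code object's `co_filename` does not point at a real file. Seeding `linecache.cache` with the UDF source under the same pseudo-filename passed to `compile` makes `inspect` treat that source as readable, which is what allows a reopened LazyUDF to be saved again later. A minimal, self-contained sketch of the trick (the `my_udf` source below is illustrative, not taken from the library):

    import inspect
    import linecache

    src = "def my_udf(inputs, output, offset):\n    output[:] = inputs[0] + 1\n"
    filename = "<my_udf>"  # any unique pseudo-filename
    # Cache entry format: (size, mtime, lines, fullname); mtime=None keeps the entry alive
    linecache.cache[filename] = (len(src), None, src.splitlines(True), filename)

    ns = {}
    exec(compile(src, filename, "exec"), ns)
    # Without the cache entry, this would raise OSError("could not get source code")
    print(inspect.getsource(ns["my_udf"]))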
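
Note on the reworked `udf_offset`/`test_offset` pair: the nested loops make every element hold the sum of its global coordinates, i.e. `coords[idx] == sum(offset) + sum(idx)`, so the expected output no longer depends on how the array is partitioned into chunks, and the previously skipped slice cases can be re-enabled. A vectorized restatement of that identity, as a sketch (the `coords_sum` helper is illustrative, not part of the test suite):

    import numpy as np

    def coords_sum(shape, offset):
        # grid[i, j, ...] == (offset[0] + i) + (offset[1] + j) + ...
        return np.indices(shape).sum(axis=0) + sum(offset)

    # The loop-based construction from the test computes the same values:
    shape = (4, 5)
    coords = np.zeros(shape)
    for n in range(len(shape)):
        for i in range(shape[n]):
            sl = tuple(slice(None) if m != n else i for m in range(len(shape)))
            coords[sl] += i  # offset is (0, 0) when evaluating the full array
    np.testing.assert_allclose(coords, coords_sum(shape, (0, 0)))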