diff --git a/arraypartition/__init__.py b/arraypartition/__init__.py index 07b4fee..136028e 100644 --- a/arraypartition/__init__.py +++ b/arraypartition/__init__.py @@ -2,10 +2,11 @@ ArrayLike, SuperLazyArrayLike, ArrayPartition, + get_chunk_extent, + get_chunk_positions, get_chunk_shape, get_chunk_space, - get_chunk_positions, - get_chunk_extent, get_dask_chunks, - combine_slices + combine_slices, + normalize_partition_chunks ) \ No newline at end of file diff --git a/arraypartition/__pycache__/__init__.cpython-311.pyc b/arraypartition/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000..ee9def1 Binary files /dev/null and b/arraypartition/__pycache__/__init__.cpython-311.pyc differ diff --git a/arraypartition/__pycache__/__init__.cpython-312.pyc b/arraypartition/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..65e978b Binary files /dev/null and b/arraypartition/__pycache__/__init__.cpython-312.pyc differ diff --git a/arraypartition/__pycache__/partition.cpython-311.pyc b/arraypartition/__pycache__/partition.cpython-311.pyc new file mode 100644 index 0000000..8bab38a Binary files /dev/null and b/arraypartition/__pycache__/partition.cpython-311.pyc differ diff --git a/arraypartition/__pycache__/partition.cpython-312.pyc b/arraypartition/__pycache__/partition.cpython-312.pyc new file mode 100644 index 0000000..3c48e52 Binary files /dev/null and b/arraypartition/__pycache__/partition.cpython-312.pyc differ diff --git a/arraypartition/partition.py b/arraypartition/partition.py index d5ce2e1..81163c2 100644 --- a/arraypartition/partition.py +++ b/arraypartition/partition.py @@ -1,16 +1,14 @@ __author__ = "Daniel Westwood" __contact__ = "daniel.westwood@stfc.ac.uk" -__copyright__ = "Copyright 2023 United Kingdom Research and Innovation" - -# Chunk wrapper is common to both CFAPyX and XarrayActive -VERSION = '1.2.1' +__copyright__ = "Copyright 2024 United Kingdom Research and Innovation" import numpy as np import netCDF4 from itertools import product -from copy import deepcopy +import math from dask.utils import SerializableLock +from dask.array.core import normalize_chunks try: from XarrayActive import ActiveChunk @@ -57,7 +55,8 @@ def ndim(self): def copy(self, **kwargs): """ Return a new basic ArrayLike instance. Ignores provided kwargs - this class does not require, but other inheritors may.""" + this class does not require, but other inheritors may. + """ return ArrayLike( self.shape, **self.get_kwargs() @@ -65,7 +64,8 @@ def copy(self, **kwargs): def get_kwargs(self): """ - Get the kwargs provided to this class initially - for creating a copy.""" + Get the kwargs provided to this class initially - for creating a copy. + """ return { 'units':self.units, 'dtype':self.dtype, @@ -76,7 +76,8 @@ class SuperLazyArrayLike(ArrayLike): """ Container class for SuperLazy Array-Like behaviour. ``SuperLazy`` behaviour is defined as Lazy-Slicing behaviour for objects that are below the 'Dask Surface', - i.e for object that serve as Dask Chunks.""" + i.e for object that serve as Dask Chunks. + """ description = "Container class for SuperLazy Array-Like behaviour" @@ -127,7 +128,35 @@ def shape(self, value): self._shape = value def get_extent(self): - return tuple(self._extent) + """ + Method for getting private variable `_extent` outside of this class. + """ + return tuple(self._smooth_extent(self._extent)) + + def set_extent(self, extent): + """ + Method for directly setting the `_extent` for this class, once it has been + 'smoothed'.""" + self._extent = self._smooth_extent(extent) + + def _smooth_extent(self, extent): + """ + Replace values of None within each provided slice of the extent with integer + values derived from the current shape. + """ + if len(extent) != self.ndim: + raise ValueError( + 'Direct assignment of truncated extent is not supported.' + ) + + smoothextent = [] + for x, ext in enumerate(extent): + start = ext.start or 0 + stop = ext.stop or self.shape[x] + step = ext.step or 1 + smoothextent.append(slice(start,stop,step)) + + return smoothextent def copy(self, extent=None): """ @@ -215,7 +244,7 @@ def __init__(self, if extent: # Apply a specific extent if given by the initiator - self._extent = extent + self.set_extent(extent) def __array__(self, *args, **kwargs): """ @@ -307,9 +336,9 @@ def _correct_slice(self, array_dims): def _post_process_data(self, data): """ - Perform any post-processing steps on the data here. - - unit correction - - calendar correction + Perform any post-processing steps on the data here. Method to be + overriden by inherrited classes (CFAPyX.CFAPartition and + XarrayActive.ActivePartition) """ return data @@ -429,11 +458,10 @@ def _identical_extents(old, new, dshape): def get_chunk_space(chunk_shape, shape): """ - Derive the chunk space and shape given the user-provided ``chunks`` option. - Chunk space is the number of chunks in each dimension which presents like an array - shape, but is referred to as a ``space`` because it has a novel coordinate system. - Chunk shape is the shape of each chunk in ``array space``, which must be regular - even if lower-level objects used to define the chunk are not. + Derive the chunk space from the ratio between the chunk shape and array shape in + each dimension. Chunk space is the number of chunks in each dimension which is + referred to as a ``space`` because it effectively represents the lengths of the each + dimension in 'chunk space' rather than any particular chunk coordinate. Example: 50 chunks across the time dimension of 1000 values which is represented by 8 @@ -452,9 +480,29 @@ def get_chunk_space(chunk_shape, shape): """ - return tuple([int(i/j) for i, j in zip(shape, chunk_shape)]) + space = tuple([math.ceil(i/j) for i, j in zip(shape, chunk_shape)]) + return space def get_chunk_shape(chunks, shape, dims, chunk_limits=True): + """ + Calculate the chunk shape from the user-provided ``chunks`` parameter, + the array shape and named dimensions, and apply chunk limits if enabled. + + :param chunks: (dict) The user specified chunks, which match the usual dask + chunks from xr.open_dataset, except these come from the ``cfa_options``. + + :param shape: (tuple) The array shape of the data array to be chunked. + + :param dims: (tuple) The names of each dimension to match to the ``chunks`` + provided. + + :param chunk_limits (bool) Option to disable, chunk limits will prevent chunking + to beyond a useful chunk size which is likely to be much less than the memory + chunk size of the source files, in which case there would be a lot of wasted + data retrieval. + + :returns: A tuple of the shape of each chunk in ``array space`` for each dimension. + """ chunk_shape = [i for i in shape] for dim in chunks.keys(): @@ -464,15 +512,17 @@ def get_chunk_shape(chunks, shape, dims, chunk_limits=True): if d == dim: idim = x - if not idim: + if idim == None: raise ValueError( f"Requested chunking across dimension '{dim}'" f"but only '{dims}' present in the dataset" ) + # Apply chunk limits unless disabled. min_size = int(shape[idim]/np.prod(shape)) if chunk_limits: - min_size = int(min_size * 2e6) + min_size = int(min_size * 2e6) + # 2M data points is the smallest total size allowed. chunk_size = chunks[dim] chunk_shape[idim] = max(chunk_size, min_size) @@ -480,6 +530,10 @@ def get_chunk_shape(chunks, shape, dims, chunk_limits=True): return tuple(chunk_shape) def get_chunk_positions(chunk_space): + """ + Get the list of chunk positions in ``chunk space`` given the size + of the space. + """ origin = [0 for i in chunk_space] positions = [ @@ -491,9 +545,13 @@ def get_chunk_positions(chunk_space): return positions def get_chunk_extent(position, shape, chunk_space): + """ + Get the extent of a particular chunk within the space given its position, + the array shape and the extent of the chunk space. + """ extent = [] for dim in range(len(position)): - pos_index = position[dim] + pos_index = position[dim] shape_size = shape[dim] space_size = chunk_space[dim] @@ -503,6 +561,7 @@ def get_chunk_extent(position, shape, chunk_space): int(pos_index*conversion), int((pos_index+1)*conversion) ) extent.append(ext) + return extent def get_dask_chunks( @@ -519,11 +578,13 @@ def get_dask_chunks( :param fragment_space: (tuple) The shape of the array in ``fragment space``. - :param extent: (dict) The global extent of each fragment - where it fits into the total array for this variable (in array space). + :param extent: (dict) The global extent of each fragment - where it fits into + the total array for this variable (in array space). - :param dtype: (obj) The datatype for this variable. + :param dtype: (obj) The datatype for this variable. - :param explicit_shapes: (tuple) Set of shapes to apply to the fragments - currently not implemented outside this function. + :param explicit_shapes: (tuple) Set of shapes to apply to the fragments - currently + not implemented outside this function. :returns: A tuple of the chunk sizes along each dimension. """ @@ -556,7 +617,8 @@ def get_dask_chunks( ## Handle explicit shapes for the fragments. if isinstance(explicit_shapes, (str, Number)) or explicit_shapes is None: - fsizes_per_dim = [ # For each dimension, use fs or explicit_shapes if the dimension is fragmented or not respectively. + # For each dimension, use fs or explicit_shapes if the dimension is fragmented or not respectively. + fsizes_per_dim = [ fs if i in fragmented_dim_indices else explicit_shapes for i, fs in enumerate(fsizes_per_dim) ] elif isinstance(explicit_shapes, dict): @@ -621,11 +683,29 @@ def combine_sliced_dim(old, new, dim): return slice(start, stop, step) - if not extent: return newslice else: for dim in range(len(newslice)): if not _identical_extents(extent[dim], newslice[dim], shape[dim]): extent[dim] = combine_sliced_dim(extent[dim], newslice[dim], dim) - return extent \ No newline at end of file + return extent + +def normalize_partition_chunks(chunks, shape, dtype, named_dims): + + chunk_values = [] + + for nd in named_dims: + if nd not in chunks.keys(): + chunk_values.append('auto') + continue + try: + chunk_values.append(int(chunks[nd])) + except ValueError: + chunk_values.append(chunks[nd]) + + return normalize_chunks( + chunk_values, + shape, + dtype=dtype + ) \ No newline at end of file