7 changes: 4 additions & 3 deletions arraypartition/__init__.py
@@ -2,10 +2,11 @@
ArrayLike,
SuperLazyArrayLike,
ArrayPartition,
get_chunk_extent,
get_chunk_positions,
get_chunk_shape,
get_chunk_space,
get_chunk_positions,
get_chunk_extent,
get_dask_chunks,
combine_slices
combine_slices,
normalize_partition_chunks
)
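With this change the package re-exports normalize_partition_chunks alongside the existing helpers. A minimal import sketch, using only the names listed above:

# Sketch: the helpers re-exported by arraypartition/__init__.py.
from arraypartition import (
    get_chunk_shape,
    get_chunk_space,
    normalize_partition_chunks,
)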
Binary file added arraypartition/__pycache__/__init__.cpython-311.pyc
Binary file not shown.
Binary file added arraypartition/__pycache__/__init__.cpython-312.pyc
Binary file not shown.
136 changes: 108 additions & 28 deletions arraypartition/partition.py
@@ -1,16 +1,14 @@
__author__ = "Daniel Westwood"
__contact__ = "daniel.westwood@stfc.ac.uk"
__copyright__ = "Copyright 2023 United Kingdom Research and Innovation"

# Chunk wrapper is common to both CFAPyX and XarrayActive
VERSION = '1.2.1'
__copyright__ = "Copyright 2024 United Kingdom Research and Innovation"

import numpy as np
import netCDF4

from itertools import product
from copy import deepcopy
import math
from dask.utils import SerializableLock
from dask.array.core import normalize_chunks

try:
from XarrayActive import ActiveChunk
@@ -57,15 +55,17 @@ def ndim(self):
def copy(self, **kwargs):
"""
Return a new basic ArrayLike instance. Ignores provided kwargs
this class does not require, but other inheritors may."""
this class does not require, but other inheritors may.
"""
return ArrayLike(
self.shape,
**self.get_kwargs()
)

def get_kwargs(self):
"""
Get the kwargs provided to this class initially - for creating a copy."""
Get the kwargs provided to this class initially - for creating a copy.
"""
return {
'units':self.units,
'dtype':self.dtype,
@@ -76,7 +76,8 @@ class SuperLazyArrayLike(ArrayLike):
"""
Container class for SuperLazy Array-Like behaviour. ``SuperLazy`` behaviour is
defined as Lazy-Slicing behaviour for objects that are below the 'Dask Surface',
i.e for object that serve as Dask Chunks."""
i.e. for objects that serve as Dask Chunks.
"""

description = "Container class for SuperLazy Array-Like behaviour"

@@ -127,7 +128,35 @@ def shape(self, value):
self._shape = value

def get_extent(self):
return tuple(self._extent)
"""
Method for accessing the private variable `_extent` from outside this class.
"""
return tuple(self._smooth_extent(self._extent))

def set_extent(self, extent):
"""
Method for directly setting the `_extent` of this class; the provided
extent is 'smoothed' before assignment."""
self._extent = self._smooth_extent(extent)

def _smooth_extent(self, extent):
"""
Replace values of None within each provided slice of the extent with integer
values derived from the current shape.
"""
if len(extent) != self.ndim:
raise ValueError(
'Direct assignment of truncated extent is not supported.'
)

smoothextent = []
for x, ext in enumerate(extent):
start = ext.start or 0
stop = ext.stop or self.shape[x]
step = ext.step or 1
smoothextent.append(slice(start,stop,step))

return smoothextent
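A quick sketch of the smoothing behaviour, assuming the constructor takes the shape positionally as ArrayLike.copy above suggests; the shape values are invented:

# Missing slice components are filled from the shape, so a bare
# slice(None) becomes an explicit slice(0, dim_size, 1).
arr = SuperLazyArrayLike((10, 20))
arr.set_extent([slice(2, None), slice(None)])
print(arr.get_extent())
# -> (slice(2, 10, 1), slice(0, 20, 1))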

def copy(self, extent=None):
"""
@@ -215,7 +244,7 @@ def __init__(self,

if extent:
# Apply a specific extent if given by the initiator
self._extent = extent
self.set_extent(extent)

def __array__(self, *args, **kwargs):
"""
@@ -307,9 +336,9 @@ def _correct_slice(self, array_dims):

def _post_process_data(self, data):
"""
Perform any post-processing steps on the data here.
- unit correction
- calendar correction
Perform any post-processing steps on the data here. Method to be
overridden by inheriting classes (CFAPyX.CFAPartition and
XarrayActive.ActivePartition).
"""
return data
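For instance, an inheriting class might override this hook roughly as follows; an illustrative sketch only, not the actual CFAPyX or XarrayActive code:

from arraypartition import ArrayPartition

class ScaledPartition(ArrayPartition):
    """Hypothetical inheritor applying a unit correction in post-processing."""

    def _post_process_data(self, data):
        # e.g. rescale units before the chunk data is handed back to dask
        return data * 0.001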

@@ -429,11 +458,10 @@ def _identical_extents(old, new, dshape):

def get_chunk_space(chunk_shape, shape):
"""
Derive the chunk space and shape given the user-provided ``chunks`` option.
Chunk space is the number of chunks in each dimension which presents like an array
shape, but is referred to as a ``space`` because it has a novel coordinate system.
Chunk shape is the shape of each chunk in ``array space``, which must be regular
even if lower-level objects used to define the chunk are not.
Derive the chunk space from the ratio between the chunk shape and array shape in
each dimension. Chunk space is the number of chunks in each dimension; it is
referred to as a ``space`` because it effectively represents the length of each
dimension in 'chunk space' rather than any particular chunk coordinate.

Example:
50 chunks across the time dimension of 1000 values which is represented by 8
@@ -452,9 +480,29 @@

"""

return tuple([int(i/j) for i, j in zip(shape, chunk_shape)])
space = tuple([math.ceil(i/j) for i, j in zip(shape, chunk_shape)])
return space
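A short check of the new ceil behaviour, with illustrative values: a 1000-value dimension in chunks of 125 gives exactly 8 chunks, while chunks of 300 need a fourth, partial chunk:

print(get_chunk_space((125,), (1000,)))   # -> (8,)
print(get_chunk_space((300,), (1000,)))   # -> (4,), not int(1000/300) == 3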

def get_chunk_shape(chunks, shape, dims, chunk_limits=True):
"""
Calculate the chunk shape from the user-provided ``chunks`` parameter,
the array shape and named dimensions, and apply chunk limits if enabled.

:param chunks: (dict) The user specified chunks, which match the usual dask
chunks from xr.open_dataset, except these come from the ``cfa_options``.

:param shape: (tuple) The array shape of the data array to be chunked.

:param dims: (tuple) The names of each dimension to match to the ``chunks``
provided.

:param chunk_limits: (bool) Option to enable or disable chunk limits. Chunk limits
prevent chunking beyond a useful chunk size, which is likely to be much smaller
than the memory chunk size of the source files; chunking smaller than that would
mean a lot of wasted data retrieval.

:returns: A tuple of the shape of each chunk in ``array space`` for each dimension.
"""
chunk_shape = [i for i in shape]

for dim in chunks.keys():
@@ -464,22 +512,28 @@ def get_chunk_shape(chunks, shape, dims, chunk_limits=True):
if d == dim:
idim = x

if not idim:
if idim is None:
raise ValueError(
f"Requested chunking across dimension '{dim}'"
f"but only '{dims}' present in the dataset"
)

# Apply chunk limits unless disabled.
min_size = shape[idim]/np.prod(shape)
if chunk_limits:
min_size = int(min_size * 2e6)
min_size = int(min_size * 2e6)
# 2M data points is the smallest total size allowed.

chunk_size = chunks[dim]
chunk_shape[idim] = max(chunk_size, min_size)

return tuple(chunk_shape)
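A rough usage sketch with invented values; the 2-million-point floor translates to a minimum chunk length of int(shape[idim]/np.prod(shape) * 2e6) along the chunked dimension:

shape = (7300, 720, 1440)   # hypothetical daily, half-degree dataset
dims = ('time', 'lat', 'lon')

# 100 time steps per chunk is ~104M points, well above the 2M floor,
# so the requested chunk size is kept.
print(get_chunk_shape({'time': 100}, shape, dims))
# -> (100, 720, 1440)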

def get_chunk_positions(chunk_space):
"""
Get the list of chunk positions in ``chunk space`` given the size
of the space.
"""
origin = [0 for i in chunk_space]

positions = [
@@ -491,9 +545,13 @@ def get_chunk_positions(chunk_space):
return positions
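The positions enumerate the chunk grid in row-major order, assuming the collapsed body builds them with itertools.product as the origin setup suggests; an illustrative 2x3 space:

print(get_chunk_positions((2, 3)))
# Expected: [(0, 0), (0, 1), (0, 2), (1, 0), (1, 1), (1, 2)]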

def get_chunk_extent(position, shape, chunk_space):
"""
Get the extent in ``array space`` of a particular chunk, given its position,
the array shape and the size of the chunk space.
"""
extent = []
for dim in range(len(position)):
pos_index = position[dim]
pos_index = position[dim]
shape_size = shape[dim]
space_size = chunk_space[dim]

@@ -503,6 +561,7 @@
int(pos_index*conversion), int((pos_index+1)*conversion)
)
extent.append(ext)

return extent
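Worked through with illustrative numbers, assuming the line hidden by the collapsed hunk computes conversion = shape_size/space_size as the slice arithmetic implies: chunk 3 of 8 over a 1000-value dimension spans indices 375 to 500:

print(get_chunk_extent((3,), (1000,), (8,)))
# Expected: [slice(375, 500, None)]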

def get_dask_chunks(
@@ -519,11 +578,13 @@

:param fragment_space: (tuple) The shape of the array in ``fragment space``.

:param extent: (dict) The global extent of each fragment - where it fits into the total array for this variable (in array space).
:param extent: (dict) The global extent of each fragment - where it fits into
the total array for this variable (in array space).

:param dtype: (obj) The datatype for this variable.
:param dtype: (obj) The datatype for this variable.

:param explicit_shapes: (tuple) Set of shapes to apply to the fragments - currently not implemented outside this function.
:param explicit_shapes: (tuple) Set of shapes to apply to the fragments - currently
not implemented outside this function.

:returns: A tuple of the chunk sizes along each dimension.
"""
@@ -556,7 +617,8 @@ def get_dask_chunks(
## Handle explicit shapes for the fragments.

if isinstance(explicit_shapes, (str, Number)) or explicit_shapes is None:
fsizes_per_dim = [ # For each dimension, use fs or explicit_shapes if the dimension is fragmented or not respectively.
# For each dimension, use fs or explicit_shapes if the dimension is fragmented or not respectively.
fsizes_per_dim = [
fs if i in fragmented_dim_indices else explicit_shapes for i, fs in enumerate(fsizes_per_dim)
]
elif isinstance(explicit_shapes, dict):
@@ -621,11 +683,29 @@ def combine_sliced_dim(old, new, dim):

return slice(start, stop, step)


if not extent:
return newslice
else:
for dim in range(len(newslice)):
if not _identical_extents(extent[dim], newslice[dim], shape[dim]):
extent[dim] = combine_sliced_dim(extent[dim], newslice[dim], dim)
return extent
return extent
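The composition applied per dimension here is ordinary slice-of-slice arithmetic; a standalone sketch of the idea under the assumption of positive steps, not the function's exact code:

def compose(old, new):
    # Slicing by 'old' and then by 'new' is one combined slice:
    # offsets shift by old.start and steps multiply.
    start = old.start + new.start * old.step
    stop = old.start + new.stop * old.step
    step = old.step * new.step
    return slice(start, stop, step)

# Taking [10:50] of an axis and then [5:15] of the result
# is the same as taking [15:25] directly.
print(compose(slice(10, 50, 1), slice(5, 15, 1)))   # -> slice(15, 25, 1)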

def normalize_partition_chunks(chunks, shape, dtype, named_dims):
"""
Map the user-provided named chunks onto the array's dimension order,
falling back to 'auto' for dimensions not named in ``chunks``, then
delegate to dask's ``normalize_chunks``.
"""

chunk_values = []

for nd in named_dims:
if nd not in chunks.keys():
chunk_values.append('auto')
continue
try:
chunk_values.append(int(chunks[nd]))
except ValueError:
chunk_values.append(chunks[nd])

return normalize_chunks(
chunk_values,
shape,
dtype=dtype
)
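A usage sketch with invented values: dimensions absent from ``chunks`` fall back to 'auto', and dask expands everything to explicit per-chunk sizes:

import numpy as np

chunks = normalize_partition_chunks(
    {'time': 100},
    (365, 180, 360),
    np.dtype('float64'),
    ('time', 'lat', 'lon'),
)
print(chunks)
# e.g. ((100, 100, 100, 65), (180,), (360,)) depending on dask's
# configured 'auto' chunk-size limit.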