In [1]:
from typing import Any, Dict, List, Tuple, Union

import affine
import dask.array as da
import numpy as np
import pandas as pd
import xarray as xr

from xcube.core.gridmapping import GridMapping

To check whether the block mapper function we define below works in a distributed environment,
start a local distributed cluster that uses worker processes:

In [2]:
test_distributed = False

In [3]:
if test_distributed:
    import dask.distributed
    dask.distributed.Client(processes=True)

Note, `tile_size` determines the spatial data chunking, see https://docs.dask.org/en/latest/array-chunks.html

In [3]:
tile_size = 4096
width = 24 * tile_size  # pixels
height = 12 * tile_size  # pixels
xy_min = -10.0, 24.0
xy_res = 50 / width

super_cube_gm = GridMapping.regular(size=(width, height), 
                                    xy_min=xy_min, 
                                    xy_res=xy_res, 
                                    tile_size=tile_size,
                                    crs="CRS84",
                                    is_j_axis_up=True)

In [4]:
super_cube_gm

class: **RegularGridMapping**
* is_regular: True
* is_j_axis_up: True
* is_lon_360: False
* crs: CRS84
* xy_res: (0.0005086263020833334, 0.0005086263020833334)
* xy_bbox: (-10, 24, 40, 49)
* ij_bbox: (0, 0, 98304, 49152)
* xy_dim_names: ('lon', 'lat')
* xy_var_names: ('lon', 'lat')
* size: (98304, 49152)
* tile_size: (4096, 4096)

---

TODO: scan all available sub-cubes to provide following information

- `regions`: list of (path, bbox) tuples
- `time` and or `time_bounds`: numpy arrays with time coordinate labels
- `data_vars`: list of tuples with data variable information (name, dtype, fill_value, attrs)


In [8]:
regions = [
    ("path/to/region1", [-10., 24., 0., 30.]),
    ("path/to/region2", [10., 25., 30., 50.]),
]

In [9]:
time_bounds = np.array([
    ['2020-05-01 10:00:00', '2020-05-01 11:00:00'],
    ['2020-05-02 14:00:00', '2020-05-02 15:00:00'],
    ['2020-05-03 13:00:00', '2020-05-03 14:00:00'],
], dtype=np.dtype('datetime64[ns]'))

time = time_bounds[:,0] + (time_bounds[:,1] - time_bounds[:,0]) / 2

In [10]:
data_vars = [
    ("chl", np.dtype('float32'), 0.0, dict(units="mg/m^3")),
    ("tsm", np.dtype('float32'), 0.0, dict(units="mg/m^3")),
    ("qflags", np.dtype('uint16'), 0, {}),
]

---

In [11]:
# Utility functions to transform bounding boxes to pixel coordinates

BBox = Tuple[float, float, float, float]

def ij_bbox_from_xy_bbox(matrix: affine.Affine, xy_bbox: BBox) -> BBox:
    x1, y1, x2, y2 = xy_bbox
    i1, j1 = matrix * (x1, y1)
    i2, j2 = matrix * (x2, y2)
    return i1, j1, i2, j2

def ij_bboxes_from_xy_bboxes(gm: GridMapping, xy_bboxes: List[BBox]) -> List[BBox]:
    # TODO (forman): use this code into xcube's 
    # GridMapping.ij_bboxes_from_xy_bboxes() if self.is_regular
    matrix = gm.xy_to_ij_transform
    matrix = affine.Affine(*matrix[0], *matrix[1])
    return [ij_bbox_from_xy_bbox(matrix, xy_bbox) for xy_bbox in xy_bboxes]

For the following cube template function we use `dask.array.full()` to specify the 
empty arrays for the data variables.
See https://docs.dask.org/en/stable/generated/dask.array.full.html

In [12]:
def new_super_cube_template(
    gm: GridMapping,
    data_vars: Tuple[str, np.dtype, Union[int, float], Dict[str, Any]],  
    time: np.ndarray,
    time_bounds: np.ndarray,
    time_chunk_size: int = 1,
):
    time_name = "time"
    x_name, y_name = gm.xy_dim_names
    width, height = gm.size
    tile_width, tile_height = gm.tile_size
    spatial_coords = gm.to_coords()

    data_var_dims = time_name, y_name, x_name
    data_var_shape = len(time), height, width
    data_var_chunks = time_chunk_size, tile_height, tile_width

    return xr.Dataset(
        data_vars=dict(
            **{name: xr.DataArray(da.full(data_var_shape, 
                                          fill_value, 
                                          chunks=data_var_chunks,                                      
                                          dtype=dtype,
                                          name=f'{name}_base'),
                                  dims=data_var_dims,
                                  attrs=attrs) 
               for name, dtype, fill_value, attrs in data_vars},
        ),
        coords=dict(
            **spatial_coords,
            time=xr.DataArray(time, 
                              dims="time", 
                              attrs={"bounds": "time_bnds"}),
            time_bnds=xr.DataArray(time_bounds, 
                                   dims=(time_name, "bnds")),
        ),
        attrs={
            # TODO: add global attributes, see cube spec
        }
    )

In [13]:
super_cube_template = new_super_cube_template(
    super_cube_gm,
    data_vars,
    time,
    time_bounds,
    time_chunk_size=1
)

In [14]:
super_cube_template

Unnamed: 0,Array,Chunk
Bytes,54.00 GiB,64.00 MiB
Shape,"(3, 49152, 98304)","(1, 4096, 4096)"
Dask graph,864 chunks in 1 graph layer,864 chunks in 1 graph layer
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 54.00 GiB 64.00 MiB Shape (3, 49152, 98304) (1, 4096, 4096) Dask graph 864 chunks in 1 graph layer Data type float32 numpy.ndarray",98304  49152  3,

Unnamed: 0,Array,Chunk
Bytes,54.00 GiB,64.00 MiB
Shape,"(3, 49152, 98304)","(1, 4096, 4096)"
Dask graph,864 chunks in 1 graph layer,864 chunks in 1 graph layer
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,54.00 GiB,64.00 MiB
Shape,"(3, 49152, 98304)","(1, 4096, 4096)"
Dask graph,864 chunks in 1 graph layer,864 chunks in 1 graph layer
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 54.00 GiB 64.00 MiB Shape (3, 49152, 98304) (1, 4096, 4096) Dask graph 864 chunks in 1 graph layer Data type float32 numpy.ndarray",98304  49152  3,

Unnamed: 0,Array,Chunk
Bytes,54.00 GiB,64.00 MiB
Shape,"(3, 49152, 98304)","(1, 4096, 4096)"
Dask graph,864 chunks in 1 graph layer,864 chunks in 1 graph layer
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,27.00 GiB,32.00 MiB
Shape,"(3, 49152, 98304)","(1, 4096, 4096)"
Dask graph,864 chunks in 1 graph layer,864 chunks in 1 graph layer
Data type,uint16 numpy.ndarray,uint16 numpy.ndarray
"Array Chunk Bytes 27.00 GiB 32.00 MiB Shape (3, 49152, 98304) (1, 4096, 4096) Dask graph 864 chunks in 1 graph layer Data type uint16 numpy.ndarray",98304  49152  3,

Unnamed: 0,Array,Chunk
Bytes,27.00 GiB,32.00 MiB
Shape,"(3, 49152, 98304)","(1, 4096, 4096)"
Dask graph,864 chunks in 1 graph layer,864 chunks in 1 graph layer
Data type,uint16 numpy.ndarray,uint16 numpy.ndarray


Unpack the paths and turn the geographical bounding boxes into pixel coordinates of the super cube:

In [16]:
region_paths = [path for path, _ in regions]
region_xy_bboxes = [xy_bbox for _, xy_bbox in regions]
region_ij_bboxes = ij_bboxes_from_xy_bboxes(super_cube_gm, region_xy_bboxes)
region_ij_bboxes

[(0.0, 0.0, 19660.8, 11796.479999999996),
 (39321.6, 1966.0800000000017, 78643.2, 51118.08)]

The following function will be called for each chunk of our super cube.
There are some restrictions concerning the function's implemention so
it can run concurrently on distinct dask workers:

- Do not access any global data in the function, e.g. caches.
- This function shall be "pure" - no side effects.
- Everything the functions needs to know must be passed as args or kwargs.
- For optimal concurrency using the Dask default multi-threading scheduler, 
  try to surpass the Python global interpreter lock (GIL). E.g., use `numba`.
  
See also https://docs.xarray.dev/en/stable/user-guide/dask.html#dask-automatic-parallelization.

Note that if `test_distributed == True`, we wont see any output of the print() function 
because they are emitted by different processes.

In [17]:
def block_mapper(block_ds: xr.Dataset, 
                 ds_time_index: pd.DatetimeIndex,
                 region_paths: List[str], 
                 region_xy_bboxes: np.ndarray,
                 region_ij_bboxes: np.ndarray):
    # Assuming time chunking = 1
    time = block_ds.time.values[0]
    time_idx = ds_time_index.get_loc(time)
    
    x1, x2 = block_ds.lon_bnds[0, 0], block_ds.lon_bnds[-1, 1]
    y1, y2 = block_ds.lat_bnds[0, 0], block_ds.lat_bnds[-1, 1]
    xy_bbox = np.array((x1, y1, x2, y2))
    
    print(f"mapping time={time},"
          f" time_idx={time_idx},"
          f" xy_bbox={xy_bbox}")
    
    ds = block_ds.copy()
    
    # TODO: detect intersecting sub-regions
    #   Open file, read variables for given time step 
    #   Copy array regions of variables into ds
    
    return ds

See https://docs.xarray.dev/en/stable/generated/xarray.map_blocks.html.
Note that `args` are copied to dask workers; so they should be serializable and slim.

In [18]:
super_cube = super_cube_template.map_blocks(
    block_mapper,
    args=[super_cube_template.get_index("time"),
          region_paths, 
          np.array(region_xy_bboxes),
          np.array(region_ij_bboxes)],  
    template=super_cube_template
)

In [19]:
super_cube

Unnamed: 0,Array,Chunk
Bytes,1.50 MiB,64.00 kiB
Shape,"(98304, 2)","(4096, 2)"
Dask graph,24 chunks in 10 graph layers,24 chunks in 10 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 1.50 MiB 64.00 kiB Shape (98304, 2) (4096, 2) Dask graph 24 chunks in 10 graph layers Data type float64 numpy.ndarray",2  98304,

Unnamed: 0,Array,Chunk
Bytes,1.50 MiB,64.00 kiB
Shape,"(98304, 2)","(4096, 2)"
Dask graph,24 chunks in 10 graph layers,24 chunks in 10 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,768.00 kiB,64.00 kiB
Shape,"(49152, 2)","(4096, 2)"
Dask graph,12 chunks in 10 graph layers,12 chunks in 10 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 768.00 kiB 64.00 kiB Shape (49152, 2) (4096, 2) Dask graph 12 chunks in 10 graph layers Data type float64 numpy.ndarray",2  49152,

Unnamed: 0,Array,Chunk
Bytes,768.00 kiB,64.00 kiB
Shape,"(49152, 2)","(4096, 2)"
Dask graph,12 chunks in 10 graph layers,12 chunks in 10 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,48 B,16 B
Shape,"(3, 2)","(1, 2)"
Dask graph,3 chunks in 10 graph layers,3 chunks in 10 graph layers
Data type,datetime64[ns] numpy.ndarray,datetime64[ns] numpy.ndarray
"Array Chunk Bytes 48 B 16 B Shape (3, 2) (1, 2) Dask graph 3 chunks in 10 graph layers Data type datetime64[ns] numpy.ndarray",2  3,

Unnamed: 0,Array,Chunk
Bytes,48 B,16 B
Shape,"(3, 2)","(1, 2)"
Dask graph,3 chunks in 10 graph layers,3 chunks in 10 graph layers
Data type,datetime64[ns] numpy.ndarray,datetime64[ns] numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,54.00 GiB,64.00 MiB
Shape,"(3, 49152, 98304)","(1, 4096, 4096)"
Dask graph,864 chunks in 10 graph layers,864 chunks in 10 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 54.00 GiB 64.00 MiB Shape (3, 49152, 98304) (1, 4096, 4096) Dask graph 864 chunks in 10 graph layers Data type float32 numpy.ndarray",98304  49152  3,

Unnamed: 0,Array,Chunk
Bytes,54.00 GiB,64.00 MiB
Shape,"(3, 49152, 98304)","(1, 4096, 4096)"
Dask graph,864 chunks in 10 graph layers,864 chunks in 10 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,54.00 GiB,64.00 MiB
Shape,"(3, 49152, 98304)","(1, 4096, 4096)"
Dask graph,864 chunks in 10 graph layers,864 chunks in 10 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 54.00 GiB 64.00 MiB Shape (3, 49152, 98304) (1, 4096, 4096) Dask graph 864 chunks in 10 graph layers Data type float32 numpy.ndarray",98304  49152  3,

Unnamed: 0,Array,Chunk
Bytes,54.00 GiB,64.00 MiB
Shape,"(3, 49152, 98304)","(1, 4096, 4096)"
Dask graph,864 chunks in 10 graph layers,864 chunks in 10 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,27.00 GiB,32.00 MiB
Shape,"(3, 49152, 98304)","(1, 4096, 4096)"
Dask graph,864 chunks in 10 graph layers,864 chunks in 10 graph layers
Data type,uint16 numpy.ndarray,uint16 numpy.ndarray
"Array Chunk Bytes 27.00 GiB 32.00 MiB Shape (3, 49152, 98304) (1, 4096, 4096) Dask graph 864 chunks in 10 graph layers Data type uint16 numpy.ndarray",98304  49152  3,

Unnamed: 0,Array,Chunk
Bytes,27.00 GiB,32.00 MiB
Shape,"(3, 49152, 98304)","(1, 4096, 4096)"
Dask graph,864 chunks in 10 graph layers,864 chunks in 10 graph layers
Data type,uint16 numpy.ndarray,uint16 numpy.ndarray


Test with a subset:

In [26]:
subset = super_cube.sel(lon=slice(10., 11.), lat=slice(26., 27.))
subset

Unnamed: 0,Array,Chunk
Bytes,30.72 kiB,25.59 kiB
Shape,"(1966, 2)","(1638, 2)"
Dask graph,2 chunks in 11 graph layers,2 chunks in 11 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 30.72 kiB 25.59 kiB Shape (1966, 2) (1638, 2) Dask graph 2 chunks in 11 graph layers Data type float64 numpy.ndarray",2  1966,

Unnamed: 0,Array,Chunk
Bytes,30.72 kiB,25.59 kiB
Shape,"(1966, 2)","(1638, 2)"
Dask graph,2 chunks in 11 graph layers,2 chunks in 11 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,30.72 kiB,28.16 kiB
Shape,"(1966, 2)","(1802, 2)"
Dask graph,2 chunks in 11 graph layers,2 chunks in 11 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 30.72 kiB 28.16 kiB Shape (1966, 2) (1802, 2) Dask graph 2 chunks in 11 graph layers Data type float64 numpy.ndarray",2  1966,

Unnamed: 0,Array,Chunk
Bytes,30.72 kiB,28.16 kiB
Shape,"(1966, 2)","(1802, 2)"
Dask graph,2 chunks in 11 graph layers,2 chunks in 11 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,44.23 MiB,11.26 MiB
Shape,"(3, 1966, 1966)","(1, 1802, 1638)"
Dask graph,12 chunks in 11 graph layers,12 chunks in 11 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 44.23 MiB 11.26 MiB Shape (3, 1966, 1966) (1, 1802, 1638) Dask graph 12 chunks in 11 graph layers Data type float32 numpy.ndarray",1966  1966  3,

Unnamed: 0,Array,Chunk
Bytes,44.23 MiB,11.26 MiB
Shape,"(3, 1966, 1966)","(1, 1802, 1638)"
Dask graph,12 chunks in 11 graph layers,12 chunks in 11 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,44.23 MiB,11.26 MiB
Shape,"(3, 1966, 1966)","(1, 1802, 1638)"
Dask graph,12 chunks in 11 graph layers,12 chunks in 11 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 44.23 MiB 11.26 MiB Shape (3, 1966, 1966) (1, 1802, 1638) Dask graph 12 chunks in 11 graph layers Data type float32 numpy.ndarray",1966  1966  3,

Unnamed: 0,Array,Chunk
Bytes,44.23 MiB,11.26 MiB
Shape,"(3, 1966, 1966)","(1, 1802, 1638)"
Dask graph,12 chunks in 11 graph layers,12 chunks in 11 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,22.12 MiB,5.63 MiB
Shape,"(3, 1966, 1966)","(1, 1802, 1638)"
Dask graph,12 chunks in 11 graph layers,12 chunks in 11 graph layers
Data type,uint16 numpy.ndarray,uint16 numpy.ndarray
"Array Chunk Bytes 22.12 MiB 5.63 MiB Shape (3, 1966, 1966) (1, 1802, 1638) Dask graph 12 chunks in 11 graph layers Data type uint16 numpy.ndarray",1966  1966  3,

Unnamed: 0,Array,Chunk
Bytes,22.12 MiB,5.63 MiB
Shape,"(3, 1966, 1966)","(1, 1802, 1638)"
Dask graph,12 chunks in 11 graph layers,12 chunks in 11 graph layers
Data type,uint16 numpy.ndarray,uint16 numpy.ndarray


In [27]:
subset.chl

Unnamed: 0,Array,Chunk
Bytes,44.23 MiB,11.26 MiB
Shape,"(3, 1966, 1966)","(1, 1802, 1638)"
Dask graph,12 chunks in 11 graph layers,12 chunks in 11 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 44.23 MiB 11.26 MiB Shape (3, 1966, 1966) (1, 1802, 1638) Dask graph 12 chunks in 11 graph layers Data type float32 numpy.ndarray",1966  1966  3,

Unnamed: 0,Array,Chunk
Bytes,44.23 MiB,11.26 MiB
Shape,"(3, 1966, 1966)","(1, 1802, 1638)"
Dask graph,12 chunks in 11 graph layers,12 chunks in 11 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [28]:
subset.tsm.values

mapping time=2020-05-01T10:30:00.000000000, time_idx=0, xy_bbox=[ 8.75       24.         10.83333333 26.08333333]
mapping time=2020-05-01T10:30:00.000000000, time_idx=0, xy_bbox=[10.83333333 24.         12.91666667 26.08333333]
mapping time=2020-05-01T10:30:00.000000000, time_idx=0, xy_bbox=[ 8.75       26.08333333 10.83333333 28.16666667]
mapping time=2020-05-01T10:30:00.000000000, time_idx=0, xy_bbox=[10.83333333 26.08333333 12.91666667 28.16666667]
mapping time=2020-05-02T14:30:00.000000000, time_idx=1, xy_bbox=[ 8.75       24.         10.83333333 26.08333333]
mapping time=2020-05-02T14:30:00.000000000, time_idx=1, xy_bbox=[10.83333333 24.         12.91666667 26.08333333]
mapping time=2020-05-02T14:30:00.000000000, time_idx=1, xy_bbox=[ 8.75       26.08333333 10.83333333 28.16666667]
mapping time=2020-05-02T14:30:00.000000000, time_idx=1, xy_bbox=[10.83333333 26.08333333 12.91666667 28.16666667]
mapping time=2020-05-03T13:30:00.000000000, time_idx=2, xy_bbox=[ 8.75       24.        

array([[[0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        ...,
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.]],

       [[0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        ...,
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.]],

       [[0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        ...,
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.],
        [0., 0., 0., ..., 0., 0., 0.]]], dtype=float32)