
Commit

Merge branch 'add_load_stac'
soxofaan committed Jul 18, 2023
2 parents 4971005 + 081520f commit 8d1d947
Showing 5 changed files with 247 additions and 20 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -10,9 +10,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Add support in `VectorCube.download()` to guess output format from extension of a given filename
([#401](https://github.com/Open-EO/openeo-python-client/issues/401), [#449](https://github.com/Open-EO/openeo-python-client/issues/449))
- Added `load_stac` for Client Side Processing, based on the [openeo-processes-dask implementation](https://github.com/Open-EO/openeo-processes-dask/pull/127)

### Changed

- Updated docs for Client Side Processing with `load_stac` examples, available at https://open-eo.github.io/openeo-python-client/cookbook/localprocessing.html

### Removed

### Fixed
102 changes: 89 additions & 13 deletions docs/cookbook/localprocessing.rst
@@ -9,15 +9,15 @@ Background
----------

The client-side processing functionality allows you to test and use openEO and its processes locally, i.e. without any connection to an openEO back-end.
It relies on the projects `openeo-pg-parser-networkx <https://github.com/Open-EO/openeo-pg-parser-networkx>`_, which provides an openEO process graph parsing tool, and `openeo-processes-dask <https://github.com/Open-EO/openeo-processes-dask>`_, which provides an Xarray and Dask implementation of most openEO processes.

Installation
------------

.. note::
This feature requires ``Python>=3.9``.
Tested with ``openeo-pg-parser-networkx==2023.5.1`` and
``openeo-processes-dask==2023.7.1``.

.. code:: bash
@@ -26,18 +26,69 @@ Installation
Usage
-----

Every openEO process graph relies on data which is typically provided by a cloud infrastructure (the openEO back-end).
The client-side processing adds the possibility to read and use local netCDFs, geoTIFFs, ZARR files, and remote STAC Collections or Items for your experiments.

STAC Collections and Items
~~~~~~~~~~~~~~~~~~~~~~~~~~

.. warning::
The provided examples using STAC rely on third-party STAC Catalogs; we can't guarantee that the URLs will remain valid.

With the ``load_stac`` process it's possible to load and use data provided by remote or local STAC Collections or Items.
The following code snippet loads Sentinel-2 L2A data from a public STAC Catalog, using a specific spatial and temporal extent, a band name, and a property filter on cloud cover.

.. code-block:: pycon

>>> from openeo.local import LocalConnection
>>> local_conn = LocalConnection("./")
>>> url = "https://earth-search.aws.element84.com/v1/collections/sentinel-2-l2a"
>>> spatial_extent = {"west": 11, "east": 12, "south": 46, "north": 47}
>>> temporal_extent = ["2019-01-01", "2019-06-15"]
>>> bands = ["red"]
>>> properties = {"eo:cloud_cover": dict(lt=50)}
>>> s2_cube = local_conn.load_stac(url=url,
... spatial_extent=spatial_extent,
... temporal_extent=temporal_extent,
... bands=bands,
... properties=properties,
... )
>>> s2_cube.execute()
<xarray.DataArray 'stackstac-08730b1b5458a4ed34edeee60ac79254' (time: 177,
band: 1,
y: 11354,
x: 8025)>
dask.array<getitem, shape=(177, 1, 11354, 8025), dtype=float64, chunksize=(1, 1, 1024, 1024), chunktype=numpy.ndarray>
Coordinates: (12/53)
* time (time) datetime64[ns] 2019-01-02...
id (time) <U24 'S2B_32TPR_20190102_...
* band (band) <U3 'red'
* x (x) float64 6.52e+05 ... 7.323e+05
* y (y) float64 5.21e+06 ... 5.096e+06
s2:product_uri (time) <U65 'S2B_MSIL2A_20190102...
... ...
raster:bands object {'nodata': 0, 'data_type'...
gsd int32 10
common_name <U3 'red'
center_wavelength float64 0.665
full_width_half_max float64 0.038
epsg int32 32632
Attributes:
spec: RasterSpec(epsg=32632, bounds=(600000.0, 4990200.0, 809760.0...
crs: epsg:32632
transform: | 10.00, 0.00, 600000.00|\n| 0.00,-10.00, 5300040.00|\n| 0.0...
resolution: 10.0
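
The returned object is a lazy, Dask-backed ``xarray.DataArray``: no pixel data is loaded until values are actually needed. A minimal sketch (reusing ``s2_cube`` from the snippet above) of pulling a small subset into memory:

.. code:: python

    # Select the first timestep and a small spatial window, then compute
    subset = s2_cube.execute().isel(time=0, x=slice(0, 256), y=slice(0, 256))
    data = subset.compute()  # triggers the actual loading via Dask
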
Local Collections
~~~~~~~~~~~~~~~~~


If you want to use our sample data, please clone this repository:

.. code:: bash

git clone https://github.com/Open-EO/openeo-localprocessing-data.git
With some sample data available, we can now check the STAC metadata of the local files:

.. code:: python
@@ -80,9 +131,8 @@ Let's start with the provided sample netCDF of Sentinel-2 data:
Attributes:
Conventions: CF-1.9
institution: openEO platform - Geotrellis backend: 0.9.5a1
description:
title:
As you can see in the previous example, a call to execute() runs the generated openEO process graph locally.
In this case, the process graph consists of a single load_collection, which performs lazy loading of the data. With this first step you can check whether the data is read correctly by openEO.
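
To double-check what will be run, a small sketch (assuming ``s2_datacube`` is the cube built in the example above) that prints the underlying process graph via the client's ``to_json()`` serialization:

.. code:: python

    # Inspect the openEO process graph that execute() will run locally
    print(s2_datacube.to_json())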
@@ -96,9 +146,35 @@ We can now do a simple processing for demo purposes, let's compute the median NDVI:
b04 = s2_datacube.band("B04")
b08 = s2_datacube.band("B08")
ndvi = (b08 - b04) / (b08 + b04)
ndvi_median = ndvi.reduce_dimension(dimension="t", reducer="median")
result_ndvi = ndvi_median.execute()
result_ndvi.plot.imshow(cmap="Greens")
.. image:: ../_static/images/local/local_ndvi.jpg

We can run the same example using data provided by a STAC Collection:

.. code:: python

from openeo.local import LocalConnection
local_conn = LocalConnection("./")
url = "https://earth-search.aws.element84.com/v1/collections/sentinel-2-l2a"
spatial_extent = {"east": 11.40, "north": 46.52, "south": 46.46, "west": 11.25}
temporal_extent = ["2022-06-01", "2022-06-30"]
bands = ["red", "nir"]
properties = {"eo:cloud_cover": dict(lt=80)}
s2_datacube = local_conn.load_stac(
url=url,
spatial_extent=spatial_extent,
temporal_extent=temporal_extent,
bands=bands,
properties=properties,
)
b04 = s2_datacube.band("red")
b08 = s2_datacube.band("nir")
ndvi = (b08 - b04) / (b08 + b04)
ndvi_median = ndvi.reduce_dimension(dimension="time", reducer="median")
result_ndvi = ndvi_median.execute()
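
The computed ``result_ndvi`` is again a plain ``xarray.DataArray``, so the usual xarray I/O applies; a minimal sketch of persisting it (the filename is illustrative):

.. code:: python

    result_ndvi.plot.imshow(cmap="Greens")
    result_ndvi.to_netcdf("ndvi_median.nc")  # save the median NDVI to disk
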
155 changes: 154 additions & 1 deletion openeo/local/connection.py
@@ -3,14 +3,17 @@
from pathlib import Path
from typing import Callable, Dict, List, Optional, Union

import numpy as np
import xarray as xr
from openeo_pg_parser_networkx.graph import OpenEOProcessGraph
from openeo_pg_parser_networkx.pg_schema import BoundingBox, TemporalInterval
from openeo_processes_dask.process_implementations.cubes import load_stac

from openeo.internal.graph_building import PGNode, as_flat_graph
from openeo.internal.jupyter import VisualDict, VisualList
from openeo.local.collections import _get_geotiff_metadata, _get_local_collections, _get_netcdf_zarr_metadata
from openeo.local.processing import PROCESS_REGISTRY
from openeo.metadata import Band, BandDimension, CollectionMetadata, SpatialDimension, TemporalDimension
from openeo.rest.datacube import DataCube

_log = logging.getLogger(__name__)
@@ -88,6 +91,156 @@ def load_collection(
fetch_metadata=fetch_metadata,
)

def datacube_from_process(self, process_id: str, namespace: Optional[str] = None, **kwargs) -> DataCube:
"""
Load a data cube from a (custom) process.
:param process_id: The process id.
:param namespace: optional: process namespace
:param kwargs: The arguments of the custom process
:return: A :py:class:`DataCube`, without valid metadata, as the client is not aware of this custom process.
"""
graph = PGNode(process_id, namespace=namespace, arguments=kwargs)
return DataCube(graph=graph, connection=self)
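# Illustrative sketch (not committed code): load_stac below builds on this
# generic helper, e.g.
#   cube = local_connection.datacube_from_process(process_id="load_stac", url="https://...")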

def load_stac(
self,
url: str,
spatial_extent: Optional[Dict[str, float]] = None,
temporal_extent: Optional[List[Union[str, datetime.datetime, datetime.date]]] = None,
bands: Optional[List[str]] = None,
properties: Optional[dict] = None,
) -> DataCube:
"""
Loads data from a static STAC catalog or a STAC API Collection and returns the data as a processable :py:class:`DataCube`.
A batch job result can be loaded by providing a reference to it.
If supported by the underlying metadata and file format, the data that is added to the data cube can be
restricted with the parameters ``spatial_extent``, ``temporal_extent`` and ``bands``.
If no data is available for the given extents, a ``NoDataAvailable`` error is thrown.
Remarks:
* The bands (and all dimensions that specify nominal dimension labels) are expected to be ordered as
specified in the metadata if the ``bands`` parameter is set to ``null``.
* If no additional parameter is specified this would imply that the whole data set is expected to be loaded.
Due to the large size of many data sets, this is not recommended and may be optimized by back-ends to only
load the data that is actually required after evaluating subsequent processes such as filters.
This means that the values should be processed only after the data has been limited to the required extent
and as a consequence also to a manageable size.
:param url: The URL to a static STAC catalog (STAC Item, STAC Collection, or STAC Catalog)
or a specific STAC API Collection that allows to filter items and to download assets.
This includes batch job results, which itself are compliant to STAC.
For external URLs, authentication details such as API keys or tokens may need to be included in the URL.
Batch job results can be specified in two ways:
- For Batch job results at the same back-end, a URL pointing to the corresponding batch job results
endpoint should be provided. The URL usually ends with ``/jobs/{id}/results`` and ``{id}``
is the corresponding batch job ID.
- For external results, a signed URL must be provided. Not all back-ends support signed URLs,
which are provided as a link with the link relation `canonical` in the batch job result metadata.
:param spatial_extent:
Limits the data to load to the specified bounding box or polygons.
For raster data, the process loads the pixel into the data cube if the point at the pixel center intersects
with the bounding box or any of the polygons (as defined in the Simple Features standard by the OGC).
For vector data, the process loads the geometry into the data cube if the geometry is fully within the
bounding box or any of the polygons (as defined in the Simple Features standard by the OGC).
Empty geometries may only be in the data cube if no spatial extent has been provided.
The GeoJSON can be one of the following feature types:
* A ``Polygon`` or ``MultiPolygon`` geometry,
* a ``Feature`` with a ``Polygon`` or ``MultiPolygon`` geometry, or
* a ``FeatureCollection`` containing at least one ``Feature`` with ``Polygon`` or ``MultiPolygon`` geometries.
Set this parameter to ``None`` to set no limit for the spatial extent.
Be careful with this when loading large datasets. It is recommended to use this parameter instead of
using ``filter_bbox()`` or ``filter_spatial()`` directly after loading unbounded data.
:param temporal_extent:
Limits the data to load to the specified left-closed temporal interval.
Applies to all temporal dimensions.
The interval has to be specified as an array with exactly two elements:
1. The first element is the start of the temporal interval.
The specified instance in time is **included** in the interval.
2. The second element is the end of the temporal interval.
The specified instance in time is **excluded** from the interval.
The second element must always be greater/later than the first element.
Otherwise, a `TemporalExtentEmpty` exception is thrown.
Also supports open intervals by setting one of the boundaries to ``None``, but never both.
Set this parameter to ``None`` to set no limit for the temporal extent.
Be careful with this when loading large datasets. It is recommended to use this parameter instead of
using ``filter_temporal()`` directly after loading unbounded data.
:param bands:
Only adds the specified bands into the data cube so that bands that don't match the list
of band names are not available. Applies to all dimensions of type `bands`.
Either the unique band name (metadata field ``name`` in bands) or one of the common band names
(metadata field ``common_name`` in bands) can be specified.
If the unique band name and the common name conflict, the unique band name has a higher priority.
The order of the specified array defines the order of the bands in the data cube.
If multiple bands match a common name, all matched bands are included in the original order.
It is recommended to use this parameter instead of using ``filter_bands()`` directly after loading unbounded data.
:param properties:
Limits the data by metadata properties to include only data in the data cube which
all given conditions return ``True`` for (AND operation).
Specify key-value-pairs with the key being the name of the metadata property,
which can be retrieved with the openEO Data Discovery for Collections.
The value must be a condition (user-defined process) to be evaluated against a STAC API.
This parameter is not supported for static STAC.
.. versionadded:: 0.21.0
"""
arguments = {"url": url}
# TODO: more normalization/validation of extent/band parameters and `properties`
if spatial_extent:
arguments["spatial_extent"] = spatial_extent
if temporal_extent:
arguments["temporal_extent"] = DataCube._get_temporal_extent(temporal_extent)
if bands:
arguments["bands"] = bands
if properties:
arguments["properties"] = properties
cube = self.datacube_from_process(process_id="load_stac", **arguments)
# detect actual metadata from URL
# run load_stac to get the datacube metadata
arguments["spatial_extent"] = BoundingBox.parse_obj(spatial_extent)
arguments["temporal_extent"] = TemporalInterval.parse_obj(temporal_extent)
xarray_cube = load_stac(**arguments)
attrs = xarray_cube.attrs
for at in attrs:
# allowed types: str, Number, ndarray, number, list, tuple
if not isinstance(attrs[at], (int, float, str, np.ndarray, list, tuple)):
attrs[at] = str(attrs[at])
metadata = CollectionMetadata(
attrs,
dimensions=[
SpatialDimension(name=xarray_cube.openeo.x_dim, extent=[]),
SpatialDimension(name=xarray_cube.openeo.y_dim, extent=[]),
TemporalDimension(name=xarray_cube.openeo.temporal_dims[0], extent=[]),
BandDimension(
name=xarray_cube.openeo.band_dims[0],
bands=[Band(x) for x in xarray_cube[xarray_cube.openeo.band_dims[0]].values],
),
],
)
cube.metadata = metadata
return cube
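# Illustrative usage sketch (not committed code), with example values taken
# from the docs above:
#
#   from openeo.local import LocalConnection
#   con = LocalConnection("./")
#   cube = con.load_stac(
#       url="https://earth-search.aws.element84.com/v1/collections/sentinel-2-l2a",
#       spatial_extent={"west": 11, "east": 12, "south": 46, "north": 47},
#       temporal_extent=["2019-01-01", "2019-06-15"],
#       bands=["red"],
#   )
#   result = cube.execute()  # lazy, Dask-backed xarray.DataArray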

def execute(self, process_graph: Union[dict, str, Path]) -> xr.DataArray:
"""
Execute the process graph locally and return the result as an xarray.DataArray.
5 changes: 0 additions & 5 deletions requirements-localprocessing.txt

This file was deleted.

2 changes: 1 addition & 1 deletion setup.py
@@ -40,7 +40,7 @@
"rioxarray>=0.13.0",
"pyproj",
"openeo_pg_parser_networkx>=2023.5.1",
"openeo_processes_dask[implementations]>=2023.5.1",
"openeo_processes_dask[implementations]>=2023.7.1",
]

jupyter_require = [
