diff --git a/src/openeo_aggregator/metadata/merging.py b/src/openeo_aggregator/metadata/merging.py
index ed66707c..5b4df96e 100644
--- a/src/openeo_aggregator/metadata/merging.py
+++ b/src/openeo_aggregator/metadata/merging.py
@@ -21,7 +21,7 @@ from openeo_aggregator.metadata.models.extent import Extent
 from openeo_aggregator.metadata.models.stac_summaries import StacSummaries
 from openeo_aggregator.metadata.reporter import LoggerReporter
-from openeo_aggregator.utils import MultiDictGetter
+from openeo_aggregator.utils import MultiDictGetter, common_prefix
 from openeo_driver.errors import OpenEOApiException
 
 _log = logging.getLogger(__name__)
@@ -101,15 +101,17 @@ def merge_collection_metadata(
             result[field] = getter.first(field)
 
     # Summary merging
-    summaries_list = []
-    for i, cube_dim_dict in enumerate(getter.select("summaries").dictionaries):
-        backend_id = list(by_backend.keys())[i]
+    summaries_by_backend = {}
+    for backend_id, collection in by_backend.items():
         try:
-            summary = StacSummaries.from_dict(cube_dim_dict)
-            summaries_list.append((f"{backend_id}:{cid}", summary))
+            summaries_by_backend[backend_id] = StacSummaries.from_dict(
+                collection.get("summaries")
+            )
         except Exception as e:
-            report(repr(e), collection_id=cid, backend_id=backend_id)
-    result["summaries"] = StacSummaries.merge_all(summaries_list, report).to_dict()
+            report("Failed to parse summaries", collection_id=cid, backend_id=backend_id, exception=e)
+    result["summaries"] = StacSummaries.merge_all(
+        summaries_by_backend=summaries_by_backend, report=report, collection_id=cid
+    ).to_dict()
 
     # Assets
     if getter.has_key("assets"):
@@ -182,15 +184,7 @@ def merge_collection_metadata(
         try:
             # Find common prefix of bands
             # TODO: better approach? e.g. keep everything and rewrite process graphs on the fly?
-            bands_iterator = cube_dim_getter.select(dim).get("values")
-            prefix = next(bands_iterator)
-            for bands in bands_iterator:
-                prefix = [t[0] for t in itertools.takewhile(lambda t: t[0] == t[1], zip(prefix, bands))]
-                if bands != prefix:
-                    report(
-                        f"Trimming bands {bands} to common prefix {prefix}",
-                        collection_id=cid,
-                    )
+            prefix = common_prefix(cube_dim_getter.select(dim).get("values"))
             if len(prefix) > 0:
                 result["cube:dimensions"][dim]["values"] = prefix
             else:
diff --git a/src/openeo_aggregator/metadata/models/README.md b/src/openeo_aggregator/metadata/models/README.md
index 1dd5e966..dfc1d464 100644
--- a/src/openeo_aggregator/metadata/models/README.md
+++ b/src/openeo_aggregator/metadata/models/README.md
@@ -6,7 +6,8 @@ After that, additional validation rules and merging logic was added manually.
 You can follow these steps to generate the models yourself:
 
 1. Install the openapi-generator: `pip install openapi-generator-cli`
-2. Convert the [openapi.yaml](https://github.com/Open-EO/openeo-api/blob/master/openapi.yaml) file to json.
+2. Convert the [openEO API openapi.yaml file](https://github.com/Open-EO/openeo-api/blob/master/openapi.yaml)
+   to json.
 
        sudo openapi-generator-cli generate -g openapi -i openapi.yaml -o .
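Note (illustration, not part of this change): step 2 of the README above converts the openapi.yaml spec to JSON before running the generator. A minimal sketch of that conversion step, assuming PyYAML is installed (`pip install pyyaml`) and the spec has been downloaded locally; the file names are placeholders:

    # Illustrative sketch only: convert a downloaded openapi.yaml to openapi.json
    # (assumes PyYAML; "openapi.yaml" / "openapi.json" are placeholder file names).
    import json
    import yaml

    with open("openapi.yaml") as f:
        spec = yaml.safe_load(f)
    with open("openapi.json", "w") as f:
        json.dump(spec, f, indent=2)
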
diff --git a/src/openeo_aggregator/metadata/models/stac_eo.py b/src/openeo_aggregator/metadata/models/stac_eo.py
new file mode 100644
index 00000000..9ec2e678
--- /dev/null
+++ b/src/openeo_aggregator/metadata/models/stac_eo.py
@@ -0,0 +1,134 @@
+# This implementation is based on
+# code generated with https://app.quicktype.io/
+# using the spec of STAC extension "eo"
+# https://github.com/stac-extensions/eo/blob/main/json-schema/schema.json
+
+from enum import Enum
+from dataclasses import dataclass
+from typing import Optional, Any, List, Dict, TypeVar, Type, Callable, cast
+
+from openeo.util import dict_no_none
+
+T = TypeVar("T")
+EnumT = TypeVar("EnumT", bound=Enum)
+
+
+def from_float(x: Any) -> float:
+    assert isinstance(x, (float, int)) and not isinstance(x, bool)
+    return float(x)
+
+
+def from_none(x: Any) -> Any:
+    assert x is None
+    return x
+
+
+def from_union(fs, x):
+    for f in fs:
+        try:
+            return f(x)
+        except Exception:
+            pass
+    assert False
+
+
+def from_str(x: Any) -> str:
+    assert isinstance(x, str)
+    return x
+
+
+def to_float(x: Any) -> float:
+    assert isinstance(x, float)
+    return x
+
+
+def to_enum(c: Type[EnumT], x: Any) -> EnumT:
+    assert isinstance(x, c)
+    return x.value
+
+
+def from_list(f: Callable[[Any], T], x: Any) -> List[T]:
+    assert isinstance(x, list)
+    return [f(y) for y in x]
+
+
+def to_class(c: Type[T], x: Any) -> dict:
+    assert isinstance(x, c)
+    return cast(Any, x).to_dict()
+
+
+def from_dict(f: Callable[[Any], T], x: Any) -> Dict[str, T]:
+    assert isinstance(x, dict)
+    return {k: f(v) for (k, v) in x.items()}
+
+
+class CommonNameOfTheBand(Enum):
+    BLUE = "blue"
+    CIRRUS = "cirrus"
+    COASTAL = "coastal"
+    GREEN = "green"
+    LWIR = "lwir"
+    LWIR11 = "lwir11"
+    LWIR12 = "lwir12"
+    NIR = "nir"
+    NIR08 = "nir08"
+    NIR09 = "nir09"
+    PAN = "pan"
+    RED = "red"
+    REDEDGE = "rededge"
+    SWIR16 = "swir16"
+    SWIR22 = "swir22"
+    YELLOW = "yellow"
+
+
+@dataclass
+class EoBand:
+    center_wavelength: Optional[float] = None
+    common_name: Optional[CommonNameOfTheBand] = None
+    full_width_half_max: Optional[float] = None
+    name: Optional[str] = None
+    solar_illumination: Optional[float] = None
+
+    @staticmethod
+    def from_dict(obj: Any) -> "EoBand":
+        assert isinstance(obj, dict)
+        center_wavelength = from_union(
+            [from_float, from_none], obj.get("center_wavelength")
+        )
+        common_name = from_union(
+            [CommonNameOfTheBand, from_none], obj.get("common_name")
+        )
+        full_width_half_max = from_union(
+            [from_float, from_none], obj.get("full_width_half_max")
+        )
+        name = from_union([from_str, from_none], obj.get("name"))
+        solar_illumination = from_union(
+            [from_float, from_none], obj.get("solar_illumination")
+        )
+        return EoBand(
+            center_wavelength,
+            common_name,
+            full_width_half_max,
+            name,
+            solar_illumination,
+        )
+
+    def to_dict(self) -> dict:
+        return dict_no_none(
+            {
+                "center_wavelength": from_union(
+                    [to_float, from_none], self.center_wavelength
+                ),
+                "common_name": from_union(
+                    [lambda x: to_enum(CommonNameOfTheBand, x), from_none],
+                    self.common_name,
+                ),
+                "full_width_half_max": from_union(
+                    [to_float, from_none], self.full_width_half_max
+                ),
+                "name": from_union([from_str, from_none], self.name),
+                "solar_illumination": from_union(
+                    [to_float, from_none], self.solar_illumination
+                ),
+            }
+        )
diff --git a/src/openeo_aggregator/metadata/models/stac_summaries.py b/src/openeo_aggregator/metadata/models/stac_summaries.py
index f4d40c17..ab321dc6 100644
--- a/src/openeo_aggregator/metadata/models/stac_summaries.py
+++ b/src/openeo_aggregator/metadata/models/stac_summaries.py
@@ -4,8 +4,10 @@
 import attr
 
+from openeo_aggregator.metadata.models.stac_eo import EoBand
 from openeo_aggregator.metadata.models.statistics import Statistics
 from openeo_aggregator.metadata.utils import merge_lists_skip_duplicates
+from openeo_aggregator.utils import common_prefix
 
 T = TypeVar("T", bound="StacSummaries")
 
@@ -42,6 +44,8 @@ class StacSummaries:
 
     It is generally allowed to add custom fields.
     """
+
+    # TODO: this is a confusing name, why not just "summaries"?
     additional_properties: Dict[str, Union[List[Any], Statistics, Dict]] = attr.ib(init=True, factory=dict)
 
     def to_dict(self) -> Dict[str, Any]:
@@ -111,36 +115,71 @@ def __contains__(self, key: str) -> bool:
 
     @staticmethod
     def merge_all(
-        summaries_list: List[Tuple[str, "StacSummaries"]],
-        report
+        summaries_by_backend: Dict[str, "StacSummaries"],
+        report,
+        collection_id: str,
     ) -> "StacSummaries":
         """
         Merge all summaries into one.
 
-        :param summaries_list: List of summaries to merge each as a tuple of (collection_identifier, summary).
+        :param summaries_by_backend: mapping of backend id to that backend's summaries.
         :param report: logging function to report errors
         :return: Merged summaries
         """
-        additional_properties = [(cid, x.additional_properties) for cid, x in summaries_list]
+        by_backend = {
+            k: v.additional_properties for k, v in summaries_by_backend.items()
+        }
         # Calculate the unique summary names.
-        unique_summary_names: Set[str] = functools.reduce(lambda a, b: a.union(b), (d.keys() for _, d in additional_properties), set())
+        unique_summary_names: Set[str] = functools.reduce(
+            lambda a, b: a.union(b), (d.keys() for d in by_backend.values()), set()
+        )
 
         merged_addition_properties = {}
         for summary_name in unique_summary_names:
             if (
-                summary_name in ["constellation", "platform", "instruments"]
+                summary_name
+                in [
+                    "constellation",
+                    "platform",
+                    "instruments",
+                    "gsd",
+                ]
                 or summary_name.startswith("sar:")
                 or summary_name.startswith("sat:")
             ):
-                summary_lists = [d.get(summary_name, []) for _, d in additional_properties]
+                summary_lists = [d.get(summary_name, []) for d in by_backend.values()]
                 merged_addition_properties[summary_name] = merge_lists_skip_duplicates(summary_lists)
+            elif summary_name == "eo:bands":
+                eo_bands_lists = []
+                for collection_summaries in by_backend.values():
+                    try:
+                        if summary_name in collection_summaries:
+                            eo_bands_lists.append(
+                                [
+                                    EoBand.from_dict(b)
+                                    for b in collection_summaries.get(summary_name)
+                                ]
+                            )
+                    except Exception as e:
+                        report(
+                            f"Failed to parse summary {summary_name!r}: {e!r}",
+                            collection_id=collection_id,
+                        )
+                prefix: List[EoBand] = common_prefix(eo_bands_lists)
+                if len(prefix) > 0:
+                    eo_bands = [b.to_dict() for b in prefix]
+                else:
+                    report(
+                        f"Empty prefix for {summary_name!r}, falling back to first back-end's {summary_name!r}",
+                        collection_id=collection_id,
+                    )
+                    eo_bands = next(
+                        d.get(summary_name)
+                        for d in by_backend.values()
+                        if summary_name in d
+                    )
+                merged_addition_properties[summary_name] = eo_bands
             else:
-                backends = ",".join(
-                    [cid for cid, d in additional_properties if summary_name in d]
-                )
-                report(
-                    f"Unhandled merging of StacSummaries for summary_name: {summary_name!r}",
-                    backend_id=backends,
-                )
+                report(f"Unhandled merging of summary {summary_name!r}")
         return StacSummaries(additional_properties=merged_addition_properties)
diff --git a/src/openeo_aggregator/utils.py b/src/openeo_aggregator/utils.py
index 4645b054..6a4a9def 100644
--- a/src/openeo_aggregator/utils.py
+++ b/src/openeo_aggregator/utils.py
@@ -1,3 +1,5 @@
+import itertools
+
 import datetime
 import functools
 import logging
@@ -188,3 +190,18 @@ def timestamp_to_rfc3339(timestamp: float) -> str:
 
 def normalize_issuer_url(url: str) -> str:
     return url.rstrip("/").lower()
+
+
+def common_prefix(lists: Iterable[Iterable[Any]]) -> List[Any]:
+    """Find common prefix of a set of sequences."""
+    list_iterator = iter(lists)
+    try:
+        prefix = list(next(list_iterator))
+    except StopIteration:
+        prefix = []
+    for other in list_iterator:
+        prefix = [
+            t[0]
+            for t in itertools.takewhile(lambda t: t[0] == t[1], zip(prefix, other))
+        ]
+    return prefix
diff --git a/tests/metadata/conftest.py b/tests/metadata/conftest.py
new file mode 100644
index 00000000..0f62e5f8
--- /dev/null
+++ b/tests/metadata/conftest.py
@@ -0,0 +1,16 @@
+import pytest
+
+
+class ListReporter:
+    """Reporter for testing"""
+
+    def __init__(self):
+        self.logs = []
+
+    def __call__(self, msg, **kwargs):
+        self.logs.append({"msg": msg, **kwargs})
+
+
+@pytest.fixture
+def reporter() -> ListReporter:
+    return ListReporter()
diff --git a/tests/metadata/test_merging.py b/tests/metadata/test_merging.py
index f2fcc43e..d553f27b 100644
--- a/tests/metadata/test_merging.py
+++ b/tests/metadata/test_merging.py
@@ -3,22 +3,6 @@
 from openeo_aggregator.metadata.merging import ProcessMetadataMerger, json_diff
 from openeo_aggregator.testing import same_repr
 
-
-class ListReporter:
-    """Reporter for testing"""
-
-    def __init__(self):
-        self.logs = []
-
-    def __call__(self, msg, **kwargs):
-        self.logs.append({"msg": msg, **kwargs})
-
-
-@pytest.fixture
-def reporter() -> ListReporter:
-    return ListReporter()
-
-
 class TestMergeProcessMetadata:
     @pytest.fixture
     def merger(self, reporter) -> ProcessMetadataMerger:
diff --git a/tests/metadata/test_stac_summaries.py b/tests/metadata/test_stac_summaries.py
new file mode 100644
index 00000000..475d297f
--- /dev/null
+++ b/tests/metadata/test_stac_summaries.py
@@ -0,0 +1,62 @@
+from openeo_aggregator.metadata.models.stac_summaries import StacSummaries
+from openeo_aggregator.testing import approx_str_prefix
+
+
+def test_merge_eo_bands_empty(reporter):
+    result = StacSummaries.merge_all(
+        summaries_by_backend={},
+        report=reporter,
+        collection_id="S2",
+    ).to_dict()
+    assert result == {}
+
+
+def test_merge_eo_bands_simple(reporter):
+    b02 = {"common_name": "blue", "name": "B02"}
+    b03 = {"common_name": "green", "name": "B03"}
+    summaries_by_backend = {
+        "x": StacSummaries.from_dict({"eo:bands": [b02, b03]}),
+        "y": StacSummaries.from_dict({"eo:bands": [b02, b03]}),
+    }
+    result = StacSummaries.merge_all(
+        summaries_by_backend=summaries_by_backend, report=reporter, collection_id="S2"
+    ).to_dict()
+    assert result == {"eo:bands": [b02, b03]}
+
+
+def test_merge_eo_bands_empty_prefix(reporter):
+    b02 = {"common_name": "blue", "name": "B02"}
+    b03 = {"common_name": "green", "name": "B03"}
+    summaries_by_backend = {
+        "x": StacSummaries.from_dict({"eo:bands": [b02, b03]}),
+        "y": StacSummaries.from_dict({"eo:bands": [b03, b02]}),
+    }
+    result = StacSummaries.merge_all(
+        summaries_by_backend=summaries_by_backend, report=reporter, collection_id="S2"
+    ).to_dict()
+    assert result == {"eo:bands": [b02, b03]}
+    assert reporter.logs == [
+        {
+            "msg": "Empty prefix for 'eo:bands', falling back to first back-end's 'eo:bands'",
+            "collection_id": "S2",
+        }
+    ]
+
+
+def test_merge_eo_bands_invalid(reporter):
+    b02 = {"common_name": "blue", "name": "B02"}
{"common_name": "green", "name": "B03"} + summaries_by_backend = { + "x": StacSummaries.from_dict({"eo:bands": [{"name": 123455}]}), + "y": StacSummaries.from_dict({"eo:bands": [b03, b02]}), + } + result = StacSummaries.merge_all( + summaries_by_backend=summaries_by_backend, report=reporter, collection_id="S2" + ).to_dict() + assert result == {"eo:bands": [b03, b02]} + assert reporter.logs == [ + { + "msg": approx_str_prefix("Failed to parse summary 'eo:bands'"), + "collection_id": "S2", + } + ] diff --git a/tests/test_backend.py b/tests/test_backend.py index aa95b930..8c10829f 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -796,12 +796,13 @@ def test_get_collection_metadata_merging_cubedimensions( requests_mock.get( backend1 + "/collections", json={"collections": [{"id": "S2"}]} ) + b1_bands = ["VV", "VH", "HV", "HH"] requests_mock.get( backend1 + "/collections/S2", json={ "id": "S2", "cube:dimensions": { - "bands": {"type": "bands", "values": ["VV", "VH", "HV", "HH"]}, + "bands": {"type": "bands", "values": b1_bands}, "t": { "extent": ["2013-10-03T04:14:15Z", "2020-04-03T00:00:00Z"], "step": 1, @@ -822,11 +823,13 @@ def test_get_collection_metadata_merging_cubedimensions( "type": "spatial", }, }, + "summaries": {"eo:bands": [{"name": b} for b in b1_bands]}, }, ) requests_mock.get( backend2 + "/collections", json={"collections": [{"id": "S2"}]} ) + b2_bands = ["VV", "VH", "HH", "HH+HV", "VV+VH", "HV"] requests_mock.get( backend2 + "/collections/S2", json={ @@ -834,7 +837,7 @@ def test_get_collection_metadata_merging_cubedimensions( "cube:dimensions": { "bands": { "type": "bands", - "values": ["VV", "VH", "HH", "HH+HV", "VV+VH", "HV"], + "values": b2_bands, }, "t": { "extent": ["2013-04-03T00:00:00Z", "2019-04-03T00:00:00Z"], @@ -854,10 +857,12 @@ def test_get_collection_metadata_merging_cubedimensions( "reference_system": {"name": "PROJJSON object."}, }, }, + "summaries": {"eo:bands": [{"name": b} for b in b2_bands]}, }, ) metadata = catalog.get_collection_metadata("S2") + expected_bands = ["VV", "VH"] assert metadata == { "id": "S2", "stac_version": "0.9.0", @@ -868,7 +873,7 @@ def test_get_collection_metadata_merging_cubedimensions( "cube:dimensions": { "bands": { "type": "bands", - "values": ["VV", "VH"], + "values": expected_bands, }, "t": { "extent": ["2013-04-03T00:00:00Z", "2020-04-03T00:00:00Z"], @@ -893,6 +898,7 @@ def test_get_collection_metadata_merging_cubedimensions( "summaries": { "federation:backends": ["b1", "b2"], "provider:backend": ["b1", "b2"], + "eo:bands": [{"name": b} for b in expected_bands], }, "extent": { "spatial": {"bbox": [[-180, -90, 180, 90]]}, @@ -938,6 +944,7 @@ def test_get_collection_metadata_merging_bands_prefix( "cube:dimensions": { "bands": {"type": "bands", "values": b1_bands}, }, + "summaries": {"eo:bands": [{"name": b} for b in b1_bands]}, }, ) requests_mock.get( @@ -950,6 +957,7 @@ def test_get_collection_metadata_merging_bands_prefix( "cube:dimensions": { "bands": {"type": "bands", "values": b2_bands}, }, + "summaries": {"eo:bands": [{"name": b} for b in b2_bands]}, }, ) @@ -970,6 +978,7 @@ def test_get_collection_metadata_merging_bands_prefix( "summaries": { "federation:backends": ["b1", "b2"], "provider:backend": ["b1", "b2"], + "eo:bands": [{"name": b} for b in expected_bands], }, "extent": { "spatial": {"bbox": [[-180, -90, 180, 90]]}, diff --git a/tests/test_utils.py b/tests/test_utils.py index 75a6a95c..e7e6bdb1 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,8 +1,17 @@ import pytest import 
 import shapely.geometry
 
-from openeo_aggregator.utils import normalize_issuer_url, MultiDictGetter, subdict, dict_merge, EventHandler, \
-    BoundingBox, strip_join, timestamp_to_rfc3339
+from openeo_aggregator.utils import (
+    normalize_issuer_url,
+    MultiDictGetter,
+    subdict,
+    dict_merge,
+    EventHandler,
+    BoundingBox,
+    strip_join,
+    timestamp_to_rfc3339,
+    common_prefix,
+)
 
 
 class TestMultiDictGetter:
@@ -218,3 +227,35 @@ def test_timestamp_to_rfc3339():
 
 def test_normalize_issuer_url():
     assert normalize_issuer_url("https://example.com/oidc/") == "https://example.com/oidc"
     assert normalize_issuer_url("https://example.com/OidC/") == "https://example.com/oidc"
+
+
+def test_common_prefix_empty():
+    assert common_prefix([]) == []
+
+
+def test_common_prefix_single():
+    assert common_prefix([[1, 2, 3]]) == [1, 2, 3]
+
+
+@pytest.mark.parametrize(
+    ["first", "second", "expected"],
+    [
+        ([1, 2, 3, 4], [1, 2, 3, 5], [1, 2, 3]),
+        ([1, 2, 3, 4], [2, 3, 4, 1], []),
+        ([1, 2, 3], [1, 2, "3"], [1, 2]),
+        (range(5), range(4), [0, 1, 2, 3]),
+    ],
+)
+def test_common_prefix_basic(first, second, expected):
+    assert common_prefix([first, second]) == expected
+
+
+def test_common_prefix_multiple():
+    assert common_prefix(
+        (
+            [1, 2, 3, 4],
+            (1, 2, 3, 5),
+            (x for x in [1, 2, 3, 6]),
+            range(1, 9),
+        )
+    ) == [1, 2, 3]
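Note (illustration, not part of this change): the new `common_prefix` helper is also what the `eo:bands` branch of `StacSummaries.merge_all` relies on. Since `EoBand` is a dataclass, its instances compare field-wise, so the common prefix of parsed band lists behaves as sketched below; the band values are made-up examples.

    # Illustrative sketch only: common_prefix on EoBand dataclasses compares
    # elements field-wise, which is what the "eo:bands" merge depends on.
    from openeo_aggregator.metadata.models.stac_eo import EoBand
    from openeo_aggregator.utils import common_prefix

    backend_x = [
        EoBand.from_dict({"name": "B02", "common_name": "blue"}),
        EoBand.from_dict({"name": "B03", "common_name": "green"}),
    ]
    backend_y = [EoBand.from_dict({"name": "B02", "common_name": "blue"})]

    prefix = common_prefix([backend_x, backend_y])
    assert [b.to_dict() for b in prefix] == [{"common_name": "blue", "name": "B02"}]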