Skip to content

Commit

Permalink
Issue #81 metadata merging: add support for "eo:bands" summary
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Nov 16, 2022
1 parent 804ca22 commit bb2d213
Show file tree
Hide file tree
Showing 10 changed files with 351 additions and 53 deletions.
28 changes: 11 additions & 17 deletions src/openeo_aggregator/metadata/merging.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from openeo_aggregator.metadata.models.extent import Extent
from openeo_aggregator.metadata.models.stac_summaries import StacSummaries
from openeo_aggregator.metadata.reporter import LoggerReporter
from openeo_aggregator.utils import MultiDictGetter
from openeo_aggregator.utils import MultiDictGetter, common_prefix
from openeo_driver.errors import OpenEOApiException

_log = logging.getLogger(__name__)
Expand Down Expand Up @@ -101,15 +101,17 @@ def merge_collection_metadata(
result[field] = getter.first(field)

# Summary merging
summaries_list = []
for i, cube_dim_dict in enumerate(getter.select("summaries").dictionaries):
backend_id = list(by_backend.keys())[i]
summaries_by_backend = {}
for backend_id, collection in by_backend.items():
try:
summary = StacSummaries.from_dict(cube_dim_dict)
summaries_list.append((f"{backend_id}:{cid}", summary))
summaries_by_backend[backend_id] = StacSummaries.from_dict(
collection.get("summaries")
)
except Exception as e:
report(repr(e), collection_id=cid, backend_id=backend_id)
result["summaries"] = StacSummaries.merge_all(summaries_list, report).to_dict()
report("Failed to parse summaries", collection_id=cid, backend_id=backend_id, exception=e)
result["summaries"] = StacSummaries.merge_all(
summaries_by_backend=summaries_by_backend, report=report, collection_id=cid
).to_dict()

# Assets
if getter.has_key("assets"):
Expand Down Expand Up @@ -182,15 +184,7 @@ def merge_collection_metadata(
try:
# Find common prefix of bands
# TODO: better approach? e.g. keep everything and rewrite process graphs on the fly?
bands_iterator = cube_dim_getter.select(dim).get("values")
prefix = next(bands_iterator)
for bands in bands_iterator:
prefix = [t[0] for t in itertools.takewhile(lambda t: t[0] == t[1], zip(prefix, bands))]
if bands != prefix:
report(
f"Trimming bands {bands} to common prefix {prefix}",
collection_id=cid,
)
prefix = common_prefix(cube_dim_getter.select(dim).get("values"))
if len(prefix) > 0:
result["cube:dimensions"][dim]["values"] = prefix
else:
Expand Down
3 changes: 2 additions & 1 deletion src/openeo_aggregator/metadata/models/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ After that, additional validation rules and merging logic was added manually.

You can follow these steps to generate the models yourself:
1. Install the openapi-generator: `pip install openapi-generator-cli`
2. Convert the [openapi.yaml](https://github.com/Open-EO/openeo-api/blob/master/openapi.yaml) file to json.
2. Convert the [openEO API openapi.yaml file](https://github.com/Open-EO/openeo-api/blob/master/openapi.yaml)
to json.

sudo openapi-generator-cli generate -g openapi -i openapi.yaml -o .

Expand Down
134 changes: 134 additions & 0 deletions src/openeo_aggregator/metadata/models/stac_eo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# This implementation is based on
# code generated with https://app.quicktype.io/
# using the spec of STAC extension "eo"
# https://github.com/stac-extensions/eo/blob/main/json-schema/schema.json

from enum import Enum
from dataclasses import dataclass
from typing import Optional, Any, List, Dict, TypeVar, Type, Callable, cast

from openeo.util import dict_no_none

T = TypeVar("T")
EnumT = TypeVar("EnumT", bound=Enum)


def from_float(x: Any) -> float:
assert isinstance(x, (float, int)) and not isinstance(x, bool)
return float(x)


def from_none(x: Any) -> Any:
assert x is None
return x


def from_union(fs, x):
for f in fs:
try:
return f(x)
except:
pass
assert False


def from_str(x: Any) -> str:
assert isinstance(x, str)
return x


def to_float(x: Any) -> float:
assert isinstance(x, float)
return x


def to_enum(c: Type[EnumT], x: Any) -> EnumT:
assert isinstance(x, c)
return x.value


def from_list(f: Callable[[Any], T], x: Any) -> List[T]:
assert isinstance(x, list)
return [f(y) for y in x]


def to_class(c: Type[T], x: Any) -> dict:
assert isinstance(x, c)
return cast(Any, x).to_dict()


def from_dict(f: Callable[[Any], T], x: Any) -> Dict[str, T]:
assert isinstance(x, dict)
return {k: f(v) for (k, v) in x.items()}


class CommonNameOfTheBand(Enum):
BLUE = "blue"
CIRRUS = "cirrus"
COASTAL = "coastal"
GREEN = "green"
LWIR = "lwir"
LWIR11 = "lwir11"
LWIR12 = "lwir12"
NIR = "nir"
NIR08 = "nir08"
NIR09 = "nir09"
PAN = "pan"
RED = "red"
REDEDGE = "rededge"
SWIR16 = "swir16"
SWIR22 = "swir22"
YELLOW = "yellow"


@dataclass
class EoBand:
center_wavelength: Optional[float] = None
common_name: Optional[CommonNameOfTheBand] = None
full_width_half_max: Optional[float] = None
name: Optional[str] = None
solar_illumination: Optional[float] = None

@staticmethod
def from_dict(obj: Any) -> "EoBand":
assert isinstance(obj, dict)
center_wavelength = from_union(
[from_float, from_none], obj.get("center_wavelength")
)
common_name = from_union(
[CommonNameOfTheBand, from_none], obj.get("common_name")
)
full_width_half_max = from_union(
[from_float, from_none], obj.get("full_width_half_max")
)
name = from_union([from_str, from_none], obj.get("name"))
solar_illumination = from_union(
[from_float, from_none], obj.get("solar_illumination")
)
return EoBand(
center_wavelength,
common_name,
full_width_half_max,
name,
solar_illumination,
)

def to_dict(self) -> dict:
return dict_no_none(
{
"center_wavelength": from_union(
[to_float, from_none], self.center_wavelength
),
"common_name": from_union(
[lambda x: to_enum(CommonNameOfTheBand, x), from_none],
self.common_name,
),
"full_width_half_max": from_union(
[to_float, from_none], self.full_width_half_max
),
"name": from_union([from_str, from_none], self.name),
"solar_illumination": from_union(
[to_float, from_none], self.solar_illumination
),
}
)
67 changes: 53 additions & 14 deletions src/openeo_aggregator/metadata/models/stac_summaries.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

import attr

from openeo_aggregator.metadata.models.stac_eo import EoBand
from openeo_aggregator.metadata.models.statistics import Statistics
from openeo_aggregator.metadata.utils import merge_lists_skip_duplicates
from openeo_aggregator.utils import common_prefix

T = TypeVar("T", bound="StacSummaries")

Expand Down Expand Up @@ -42,6 +44,8 @@ class StacSummaries:
It is generally allowed to add custom fields.
"""

# TODO: this is a confusing name, why not just "summaries"?
additional_properties: Dict[str, Union[List[Any], Statistics, Dict]] = attr.ib(init=True, factory=dict)

def to_dict(self) -> Dict[str, Any]:
Expand Down Expand Up @@ -111,36 +115,71 @@ def __contains__(self, key: str) -> bool:

@staticmethod
def merge_all(
summaries_list: List[Tuple[str, "StacSummaries"]],
report
summaries_by_backend: Dict[str, "StacSummaries"],
report,
collection_id: str,
) -> "StacSummaries":
"""
Merge all summaries into one.
:param summaries_list: List of summaries to merge each as a tuple of (collection_identifier, summary).
:param summaries_list: List of summaries to merge each as a tuple of (backend_id, summary).
:param report: logging function to report errors
:return: Merged summaries
"""
additional_properties = [(cid, x.additional_properties) for cid, x in summaries_list]
by_backend = {
k: v.additional_properties for k, v in summaries_by_backend.items()
}
# Calculate the unique summary names.
unique_summary_names: Set[str] = functools.reduce(lambda a, b: a.union(b), (d.keys() for _, d in additional_properties), set())
unique_summary_names: Set[str] = functools.reduce(
lambda a, b: a.union(b), (d.keys() for d in by_backend.values()), set()
)

merged_addition_properties = {}
for summary_name in unique_summary_names:
if (
summary_name in ["constellation", "platform", "instruments"]
summary_name
in [
"constellation",
"platform",
"instruments",
"gsd",
]
or summary_name.startswith("sar:")
or summary_name.startswith("sat:")
):
summary_lists = [d.get(summary_name, []) for _, d in additional_properties]
summary_lists = [d.get(summary_name, []) for d in by_backend.values()]
merged_addition_properties[summary_name] = merge_lists_skip_duplicates(summary_lists)
elif summary_name == "eo:bands":
eo_bands_lists = []
for collection_summaries in by_backend.values():
try:
if summary_name in collection_summaries:
eo_bands_lists.append(
[
EoBand.from_dict(b)
for b in collection_summaries.get(summary_name)
]
)
except Exception as e:
report(
f"Failed to parse summary {summary_name!r}: {e!r}",
collection_id=collection_id,
)
prefix: List[EoBand] = common_prefix(eo_bands_lists)
if len(prefix) > 0:
eo_bands = [b.to_dict() for b in prefix]
else:
report(
f"Empty prefix for {summary_name!r}, falling back to first back-end's {summary_name!r}",
collection_id=collection_id,
)
eo_bands = next(
d.get(summary_name)
for d in by_backend.values()
if summary_name in d
)
merged_addition_properties[summary_name] = eo_bands
else:
backends = ",".join(
[cid for cid, d in additional_properties if summary_name in d]
)
report(
f"Unhandled merging of StacSummaries for summary_name: {summary_name!r}",
backend_id=backends,
)
report(f"Unhandled merging of summary {summary_name!r}")
return StacSummaries(additional_properties=merged_addition_properties)
17 changes: 17 additions & 0 deletions src/openeo_aggregator/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import itertools

import datetime
import functools
import logging
Expand Down Expand Up @@ -188,3 +190,18 @@ def timestamp_to_rfc3339(timestamp: float) -> str:

def normalize_issuer_url(url: str) -> str:
return url.rstrip("/").lower()


def common_prefix(lists: Iterable[Iterable[Any]]) -> List[Any]:
"""Find common prefix of a set of sequences."""
list_iterator = iter(lists)
try:
prefix = list(next(list_iterator))
except StopIteration:
prefix = []
for other in list_iterator:
prefix = [
t[0]
for t in itertools.takewhile(lambda t: t[0] == t[1], zip(prefix, other))
]
return prefix
16 changes: 16 additions & 0 deletions tests/metadata/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import pytest


class ListReporter:
"""Reporter for testing"""

def __init__(self):
self.logs = []

def __call__(self, msg, **kwargs):
self.logs.append({"msg": msg, **kwargs})


@pytest.fixture
def reporter() -> ListReporter:
return ListReporter()
16 changes: 0 additions & 16 deletions tests/metadata/test_merging.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,6 @@
from openeo_aggregator.metadata.merging import ProcessMetadataMerger, json_diff
from openeo_aggregator.testing import same_repr


class ListReporter:
"""Reporter for testing"""

def __init__(self):
self.logs = []

def __call__(self, msg, **kwargs):
self.logs.append({"msg": msg, **kwargs})


@pytest.fixture
def reporter() -> ListReporter:
return ListReporter()


class TestMergeProcessMetadata:
@pytest.fixture
def merger(self, reporter) -> ProcessMetadataMerger:
Expand Down

0 comments on commit bb2d213

Please sign in to comment.