diff --git a/CHANGELOG.md b/CHANGELOG.md index d3d26f233..de7159df5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Start depending on `pystac`, initially for better `load_stac` support ([#133](https://github.com/Open-EO/openeo-python-client/issues/133), [#527](https://github.com/Open-EO/openeo-python-client/issues/527)) +- Added `connection.load_stac_from_job()` to load the results from a job by using the STAC metadata. ([#566](https://github.com/Open-EO/openeo-python-client/issues/566)) ### Changed diff --git a/openeo/rest/connection.py b/openeo/rest/connection.py index 0b164e094..805e0eae2 100644 --- a/openeo/rest/connection.py +++ b/openeo/rest/connection.py @@ -1261,7 +1261,7 @@ def load_result( def load_stac( self, url: str, - spatial_extent: Optional[Dict[str, float]] = None, + spatial_extent: Union[Dict[str, float], Parameter, None] = None, temporal_extent: Union[Sequence[InputDate], Parameter, str, None] = None, bands: Optional[List[str]] = None, properties: Optional[Dict[str, Union[str, PGNode, Callable]]] = None, @@ -1385,6 +1385,60 @@ def load_stac( _log.warning(f"Failed to extract cube metadata from STAC URL {url}", exc_info=True) return cube + def load_stac_from_job( + self, + job: Union[BatchJob, str], + spatial_extent: Union[Dict[str, float], Parameter, None] = None, + temporal_extent: Union[Sequence[InputDate], Parameter, str, None] = None, + bands: Optional[List[str]] = None, + properties: Optional[Dict[str, Union[str, PGNode, Callable]]] = None, + ) -> DataCube: + """ + Wrapper for :py:meth:`load_stac` that loads the result of a previous job using the STAC collection of its results. + + :param job: a :py:class:`~openeo.rest.job.BatchJob` or job id pointing to a finished job. + Note that the :py:class:`~openeo.rest.job.BatchJob` approach allows to point + to a batch job on a different back-end. + :param spatial_extent: limit data to specified bounding box or polygons + :param temporal_extent: limit data to specified temporal interval. + :param bands: limit data to the specified bands + + .. versionadded:: 0.30.0 + """ + if isinstance(job, str): + job = BatchJob(job_id=job, connection=self) + elif not isinstance(job, BatchJob): + raise ValueError("job must be a BatchJob or job id") + + try: + job_results = job.get_results() + + canonical_links = [ + link["href"] + for link in job_results.get_metadata().get("links", []) + if link.get("rel") == "canonical" and "href" in link + ] + if len(canonical_links) == 0: + _log.warning("No canonical link found in job results metadata. Using job results URL instead.") + stac_link = job.get_results_metadata_url(full=True) + else: + if len(canonical_links) > 1: + _log.warning( + f"Multiple canonical links found in job results metadata: {canonical_links}. Picking first one." + ) + stac_link = canonical_links[0] + except OpenEoApiError as e: + _log.warning(f"Failed to get the canonical job results: {e!r}. Using job results URL instead.") + stac_link = job.get_results_metadata_url(full=True) + + return self.load_stac( + url=stac_link, + spatial_extent=spatial_extent, + temporal_extent=temporal_extent, + bands=bands, + properties=properties, + ) + def load_ml_model(self, id: Union[str, BatchJob]) -> MlModel: """ Loads a machine learning model from a STAC Item. diff --git a/tests/rest/test_connection.py b/tests/rest/test_connection.py index 2c4bd25d8..26d1a2364 100644 --- a/tests/rest/test_connection.py +++ b/tests/rest/test_connection.py @@ -2456,6 +2456,105 @@ def test_properties(self, con120): } } + def test_load_stac_from_job_canonical(self, con120, requests_mock): + requests_mock.get( + API_URL + "jobs/j0bi6/results", + json={ + "links": [ + { + "href": "https://wrong.test", + "rel": "self", + }, + { + "href": "https://stac.test", + "rel": "canonical", + }, + ] + }, + ) + job = con120.job("j0bi6") + cube = con120.load_stac_from_job(job) + + fg = cube.flat_graph() + assert fg == { + "loadstac1": { + "process_id": "load_stac", + "arguments": {"url": "https://stac.test"}, + "result": True, + } + } + + def test_load_stac_from_job_unsigned(self, con120, requests_mock): + requests_mock.get( + API_URL + "jobs/j0bi6/results", + json={ + "links": [ + { + "href": "https://wrong.test", + "rel": "self", + }, + ] + }, + ) + job = con120.job("j0bi6") + unsigned_link = job.get_results_metadata_url(full=True) + + cube = con120.load_stac_from_job(job) + fg = cube.flat_graph() + + assert fg == { + "loadstac1": { + "process_id": "load_stac", + "arguments": {"url": API_URL + "jobs/j0bi6/results"}, + "result": True, + } + } + + def test_load_stac_from_job_from_jobid(self, con120, requests_mock): + requests_mock.get( + API_URL + "jobs/j0bi6/results", + json={ + "links": [ + { + "href": "https://wrong.test", + "rel": "self", + }, + { + "href": "https://stac.test", + "rel": "canonical", + }, + ] + }, + ) + jobid = "j0bi6" + cube = con120.load_stac_from_job(jobid) + + fg = cube.flat_graph() + assert fg == { + "loadstac1": { + "process_id": "load_stac", + "arguments": {"url": "https://stac.test"}, + "result": True, + } + } + + def test_load_stac_from_job_empty_result(self, con120, requests_mock): + requests_mock.get( + API_URL + "jobs/j0bi6/results", + json={"links": []}, + ) + jobid = "j0bi6" + cube = con120.load_stac_from_job(jobid) + + fg = cube.flat_graph() + assert fg == { + "loadstac1": { + "process_id": "load_stac", + "arguments": {"url": API_URL + "jobs/j0bi6/results"}, + "result": True, + } + } + @pytest.mark.parametrize( "data",