From 31766cb411ac05d37065f8eba02a179e7a391585 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Tue, 18 Jul 2023 22:00:33 +0200 Subject: [PATCH] Issue #401/#449 support format guessing in `VectorCube.execute_batch` --- CHANGELOG.md | 2 +- openeo/rest/datacube.py | 4 +- openeo/rest/vectorcube.py | 27 ++-- tests/rest/datacube/test_vectorcube.py | 178 +++++++++++++++++++------ 4 files changed, 154 insertions(+), 57 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f3d99577..b466f8162 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- Add support in `VectoCube.download()` to guess output format from extension of a given filename +- Add support in `VectoCube.download()` and `VectorCube.execute_batch()` to guess output format from extension of a given filename ([#401](https://github.com/Open-EO/openeo-python-client/issues/401), [#449](https://github.com/Open-EO/openeo-python-client/issues/449)) - Added `load_stac` for Client Side Processing, based on the [openeo-processes-dask implementation](https://github.com/Open-EO/openeo-processes-dask/pull/127) diff --git a/openeo/rest/datacube.py b/openeo/rest/datacube.py index 6d46bcd37..ac2147213 100644 --- a/openeo/rest/datacube.py +++ b/openeo/rest/datacube.py @@ -1942,7 +1942,7 @@ def download( :param options: Optional, file format options :return: None if the result is stored to disk, or a bytes object returned by the backend. """ - if format is None and outputfile is not None: + if format is None and outputfile: # TODO #401/#449 don't guess/override format if there is already a save_result with format? format = guess_format(outputfile) cube = self._ensure_save_result(format=format, options=options) @@ -2062,7 +2062,7 @@ def execute_batch( """ if "format" in format_options and not out_format: out_format = format_options["format"] # align with 'download' call arg name - if not out_format and outputfile: + if out_format is None and outputfile: # TODO #401/#449 don't guess/override format if there is already a save_result with format? out_format = guess_format(outputfile) diff --git a/openeo/rest/vectorcube.py b/openeo/rest/vectorcube.py index 57e33342b..78ccd6d17 100644 --- a/openeo/rest/vectorcube.py +++ b/openeo/rest/vectorcube.py @@ -139,16 +139,22 @@ def execute(self) -> dict: def download(self, outputfile: Union[str, pathlib.Path], format: Optional[str] = None, options: dict = None): # TODO #401 make outputfile optional (See DataCube.download) # TODO #401/#449 don't guess/override format if there is already a save_result with format? - if format is None and outputfile is not None: + if format is None and outputfile: format = guess_format(outputfile) cube = self._ensure_save_result(format=format, options=options) return self._connection.download(cube.flat_graph(), outputfile) def execute_batch( - self, - outputfile: Union[str, pathlib.Path] = None, out_format: str = None, - print=print, max_poll_interval=60, connection_retry_interval=30, - job_options=None, **format_options) -> BatchJob: + self, + outputfile: Optional[Union[str, pathlib.Path]] = None, + out_format: Optional[str] = None, + print=print, + max_poll_interval: float = 60, + connection_retry_interval: float = 30, + job_options: Optional[dict] = None, + # TODO: avoid using kwargs as format options + **format_options, + ) -> BatchJob: """ Evaluate the process graph by creating a batch job, and retrieving the results when it is finished. 
This method is mostly recommended if the batch job is expected to run in a reasonable amount of time. @@ -159,8 +165,11 @@ def execute_batch( :param outputfile: The path of a file to which a result can be written :param out_format: (optional) Format of the job result. :param format_options: String Parameters for the job result format - """ + if out_format is None and outputfile: + # TODO #401/#449 don't guess/override format if there is already a save_result with format? + out_format = guess_format(outputfile) + job = self.create_job(out_format, job_options=job_options, **format_options) return job.run_synchronous( # TODO #135 support multi file result sets too @@ -193,11 +202,7 @@ def create_job( """ # TODO: avoid using all kwargs as format_options # TODO: centralize `create_job` for `DataCube`, `VectorCube`, `MlModel`, ... - cube = self - if out_format: - # add `save_result` node - # TODO #401: avoid duplicate save_result - cube = cube.save_result(format=out_format, options=format_options) + cube = self._ensure_save_result(format=out_format, options=format_options or None) return self._connection.create_job( process_graph=cube.flat_graph(), title=title, diff --git a/tests/rest/datacube/test_vectorcube.py b/tests/rest/datacube/test_vectorcube.py index 77539cca3..c89ec09da 100644 --- a/tests/rest/datacube/test_vectorcube.py +++ b/tests/rest/datacube/test_vectorcube.py @@ -1,8 +1,9 @@ +import re from pathlib import Path -from typing import List import pytest +from openeo import Connection from openeo.internal.graph_building import PGNode from openeo.rest.vectorcube import VectorCube @@ -13,42 +14,101 @@ def vector_cube(con100) -> VectorCube: return VectorCube(graph=pgnode, connection=con100) -class DownloadSpy: +class DummyBackend: """ - Test helper to track download requests and optionally override next response to return. 
+ Dummy backend that handles sync/batch execution requests + and allows inspection of posted process graphs """ - __slots__ = ["requests", "next_response"] + def __init__(self, requests_mock, connection: Connection): + self.connection = connection + self.sync_requests = [] + self.batch_jobs = {} + self.next_result = b"Result data" + requests_mock.post(connection.build_url("/result"), content=self._handle_post_result) + requests_mock.post(connection.build_url("/jobs"), content=self._handle_post_jobs) + requests_mock.post( + re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), content=self._handle_post_job_results + ) + requests_mock.get(re.compile(connection.build_url(r"/jobs/(job-\d+)$")), json=self._handle_get_job) + requests_mock.get( + re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), json=self._handle_get_job_results + ) + requests_mock.get( + re.compile(connection.build_url("/jobs/(.*?)/results/result.data$")), + content=self._handle_get_job_result_asset, + ) - def __init__(self): - self.requests: List[dict] = [] - self.next_response: bytes = b"Spy data" + def _handle_post_result(self, request, context): + """handler of `POST /result` (synchronous execute)""" + pg = request.json()["process"]["process_graph"] + self.sync_requests.append(pg) + return self.next_result - @property - def only_request(self) -> dict: - """Get progress graph of only request done""" - assert len(self.requests) == 1 - return self.requests[-1] + def _handle_post_jobs(self, request, context): + """handler of `POST /jobs` (create batch job)""" + pg = request.json()["process"]["process_graph"] + job_id = f"job-{len(self.batch_jobs):03d}" + self.batch_jobs[job_id] = {"job_id": job_id, "pg": pg, "status": "created"} + context.status_code = 201 + context.headers["openeo-identifier"] = job_id - @property - def last_request(self) -> dict: - """Get last progress graph""" - assert len(self.requests) > 0 - return self.requests[-1] + def _get_job_id(self, request) -> str: + match = re.match(r"^/jobs/(job-\d+)(/|$)", request.path) + if not match: + raise ValueError(f"Failed to extract job_id from {request.path}") + job_id = match.group(1) + assert job_id in self.batch_jobs + return job_id + def _handle_post_job_results(self, request, context): + """Handler of `POST /job/{job_id}/results` (start batch job).""" + job_id = self._get_job_id(request) + assert self.batch_jobs[job_id]["status"] == "created" + # TODO: support custom status sequence (instead of directly going to status "finished")? 
+ self.batch_jobs[job_id]["status"] = "finished" + context.status_code = 202 -@pytest.fixture -def download_spy(requests_mock, con100) -> DownloadSpy: - """Test fixture to spy on (and mock) `POST /result` (download) requests.""" - spy = DownloadSpy() + def _handle_get_job(self, request, context): + """Handler of `GET /job/{job_id}` (get batch job status and metadata).""" + job_id = self._get_job_id(request) + return {"id": job_id, "status": self.batch_jobs[job_id]["status"]} - def post_result(request, context): - pg = request.json()["process"]["process_graph"] - spy.requests.append(pg) - return spy.next_response + def _handle_get_job_results(self, request, context): + """Handler of `GET /job/{job_id}/results` (list batch job results).""" + job_id = self._get_job_id(request) + assert self.batch_jobs[job_id]["status"] == "finished" + return { + "id": job_id, + "assets": {"result.data": {"href": self.connection.build_url(f"/jobs/{job_id}/results/result.data")}}, + } + + def _handle_get_job_result_asset(self, request, context): + """Handler of `GET /job/{job_id}/results/result.data` (get batch job result asset).""" + job_id = self._get_job_id(request) + assert self.batch_jobs[job_id]["status"] == "finished" + return self.next_result + + def get_sync_pg(self) -> dict: + """Get one and only synchronous process graph""" + assert len(self.sync_requests) == 1 + return self.sync_requests[0] + + def get_batch_pg(self) -> dict: + """Get one and only batch process graph""" + assert len(self.batch_jobs) == 1 + return self.batch_jobs[max(self.batch_jobs.keys())]["pg"] - requests_mock.post(con100.build_url("/result"), content=post_result) - yield spy + def get_pg(self) -> dict: + """Get one and only batch process graph (sync or batch)""" + pgs = self.sync_requests + [b["pg"] for b in self.batch_jobs.values()] + assert len(pgs) == 1 + return pgs[0] + + +@pytest.fixture +def dummy_backend(requests_mock, con100) -> DummyBackend: + yield DummyBackend(requests_mock=requests_mock, connection=con100) def test_raster_to_vector(con100): @@ -91,13 +151,19 @@ def test_raster_to_vector(con100): ], ) @pytest.mark.parametrize("path_class", [str, Path]) +@pytest.mark.parametrize("exec_mode", ["sync", "batch"]) def test_download_auto_save_result_only_file( - vector_cube, download_spy, tmp_path, filename, expected_format, path_class + vector_cube, dummy_backend, tmp_path, filename, expected_format, path_class, exec_mode ): output_path = tmp_path / filename - vector_cube.download(path_class(output_path)) + if exec_mode == "sync": + vector_cube.download(path_class(output_path)) + elif exec_mode == "batch": + vector_cube.execute_batch(outputfile=path_class(output_path)) + else: + raise ValueError(exec_mode) - assert download_spy.only_request == { + assert dummy_backend.get_pg() == { "createvectorcube1": {"process_id": "create_vector_cube", "arguments": {}}, "saveresult1": { "process_id": "save_result", @@ -109,7 +175,7 @@ def test_download_auto_save_result_only_file( "result": True, }, } - assert output_path.read_bytes() == b"Spy data" + assert output_path.read_bytes() == b"Result data" @pytest.mark.parametrize( @@ -126,11 +192,19 @@ def test_download_auto_save_result_only_file( # TODO #449 more formats to autodetect? 
], ) -def test_download_auto_save_result_with_format(vector_cube, download_spy, tmp_path, filename, format, expected_format): +@pytest.mark.parametrize("exec_mode", ["sync", "batch"]) +def test_download_auto_save_result_with_format( + vector_cube, dummy_backend, tmp_path, filename, format, expected_format, exec_mode +): output_path = tmp_path / filename - vector_cube.download(output_path, format=format) + if exec_mode == "sync": + vector_cube.download(output_path, format=format) + elif exec_mode == "batch": + vector_cube.execute_batch(outputfile=output_path, out_format=format) + else: + raise ValueError(exec_mode) - assert download_spy.only_request == { + assert dummy_backend.get_pg() == { "createvectorcube1": {"process_id": "create_vector_cube", "arguments": {}}, "saveresult1": { "process_id": "save_result", @@ -142,14 +216,23 @@ def test_download_auto_save_result_with_format(vector_cube, download_spy, tmp_pa "result": True, }, } - assert output_path.read_bytes() == b"Spy data" + assert output_path.read_bytes() == b"Result data" -def test_download_auto_save_result_with_options(vector_cube, download_spy, tmp_path): +@pytest.mark.parametrize("exec_mode", ["sync", "batch"]) +def test_download_auto_save_result_with_options(vector_cube, dummy_backend, tmp_path, exec_mode): output_path = tmp_path / "result.json" - vector_cube.download(output_path, format="GeoJSON", options={"precision": 7}) + format = "GeoJSON" + options = {"precision": 7} - assert download_spy.only_request == { + if exec_mode == "sync": + vector_cube.download(output_path, format=format, options=options) + elif exec_mode == "batch": + vector_cube.execute_batch(outputfile=output_path, out_format=format, **options) + else: + raise ValueError(exec_mode) + + assert dummy_backend.get_pg() == { "createvectorcube1": {"process_id": "create_vector_cube", "arguments": {}}, "saveresult1": { "process_id": "save_result", @@ -161,7 +244,7 @@ def test_download_auto_save_result_with_options(vector_cube, download_spy, tmp_p "result": True, }, } - assert output_path.read_bytes() == b"Spy data" + assert output_path.read_bytes() == b"Result data" @pytest.mark.parametrize( @@ -173,12 +256,21 @@ def test_download_auto_save_result_with_options(vector_cube, download_spy, tmp_p ("result.nc", "netCDF", "netCDF"), ], ) -def test_save_result_and_download(vector_cube, download_spy, tmp_path, output_file, format, expected_format): +@pytest.mark.parametrize("exec_mode", ["sync", "batch"]) +def test_save_result_and_download( + vector_cube, dummy_backend, tmp_path, output_file, format, expected_format, exec_mode +): """e.g. https://github.com/Open-EO/openeo-geopyspark-driver/issues/477""" vector_cube = vector_cube.save_result(format=format) output_path = tmp_path / output_file - vector_cube.download(output_path) - assert download_spy.only_request == { + if exec_mode == "sync": + vector_cube.download(output_path) + elif exec_mode == "batch": + vector_cube.execute_batch(outputfile=output_path) + else: + raise ValueError(exec_mode) + + assert dummy_backend.get_pg() == { "createvectorcube1": {"process_id": "create_vector_cube", "arguments": {}}, "saveresult1": { "process_id": "save_result", @@ -186,4 +278,4 @@ def test_save_result_and_download(vector_cube, download_spy, tmp_path, output_fi "result": True, }, } - assert output_path.read_bytes() == b"Spy data" + assert output_path.read_bytes() == b"Result data"
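For context, a minimal usage sketch of what this change enables on the client side; the back-end URL and collection id below are placeholders, and the example assumes a back-end that supports `raster_to_vector` and GeoJSON output.

    import openeo

    # Placeholder back-end and collection, purely for illustration.
    connection = openeo.connect("https://openeo.example").authenticate_oidc()
    cube = connection.load_collection("SENTINEL2_L2A", bands=["B04"])
    vector_cube = cube.raster_to_vector()

    # Without format guessing, the output format has to be given explicitly:
    #     vector_cube.execute_batch(outputfile="result.geojson", out_format="GeoJSON")
    # With this change, VectorCube.execute_batch() guesses the format from the
    # filename extension, mirroring what VectorCube.download() already does:
    vector_cube.execute_batch(outputfile="result.geojson")

As the tests in this patch assert, both variants produce the same process graph: a `save_result` node with format "GeoJSON" appended to the vector cube.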