Skip to content

Commit

Permalink
Issue #401/#449 support format guessing in VectorCube.execute_batch
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Jul 18, 2023
1 parent 8d1d947 commit 31766cb
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 57 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Expand Up @@ -8,7 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Add support in `VectoCube.download()` to guess output format from extension of a given filename
- Add support in `VectoCube.download()` and `VectorCube.execute_batch()` to guess output format from extension of a given filename
([#401](https://github.com/Open-EO/openeo-python-client/issues/401), [#449](https://github.com/Open-EO/openeo-python-client/issues/449))
- Added `load_stac` for Client Side Processing, based on the [openeo-processes-dask implementation](https://github.com/Open-EO/openeo-processes-dask/pull/127)

Expand Down
4 changes: 2 additions & 2 deletions openeo/rest/datacube.py
Expand Up @@ -1942,7 +1942,7 @@ def download(
:param options: Optional, file format options
:return: None if the result is stored to disk, or a bytes object returned by the backend.
"""
if format is None and outputfile is not None:
if format is None and outputfile:
# TODO #401/#449 don't guess/override format if there is already a save_result with format?
format = guess_format(outputfile)
cube = self._ensure_save_result(format=format, options=options)
Expand Down Expand Up @@ -2062,7 +2062,7 @@ def execute_batch(
"""
if "format" in format_options and not out_format:
out_format = format_options["format"] # align with 'download' call arg name
if not out_format and outputfile:
if out_format is None and outputfile:
# TODO #401/#449 don't guess/override format if there is already a save_result with format?
out_format = guess_format(outputfile)

Expand Down
27 changes: 16 additions & 11 deletions openeo/rest/vectorcube.py
Expand Up @@ -139,16 +139,22 @@ def execute(self) -> dict:
def download(self, outputfile: Union[str, pathlib.Path], format: Optional[str] = None, options: dict = None):
# TODO #401 make outputfile optional (See DataCube.download)
# TODO #401/#449 don't guess/override format if there is already a save_result with format?
if format is None and outputfile is not None:
if format is None and outputfile:
format = guess_format(outputfile)
cube = self._ensure_save_result(format=format, options=options)
return self._connection.download(cube.flat_graph(), outputfile)

def execute_batch(
self,
outputfile: Union[str, pathlib.Path] = None, out_format: str = None,
print=print, max_poll_interval=60, connection_retry_interval=30,
job_options=None, **format_options) -> BatchJob:
self,
outputfile: Optional[Union[str, pathlib.Path]] = None,
out_format: Optional[str] = None,
print=print,
max_poll_interval: float = 60,
connection_retry_interval: float = 30,
job_options: Optional[dict] = None,
# TODO: avoid using kwargs as format options
**format_options,
) -> BatchJob:
"""
Evaluate the process graph by creating a batch job, and retrieving the results when it is finished.
This method is mostly recommended if the batch job is expected to run in a reasonable amount of time.
Expand All @@ -159,8 +165,11 @@ def execute_batch(
:param outputfile: The path of a file to which a result can be written
:param out_format: (optional) Format of the job result.
:param format_options: String Parameters for the job result format
"""
if out_format is None and outputfile:
# TODO #401/#449 don't guess/override format if there is already a save_result with format?
out_format = guess_format(outputfile)

job = self.create_job(out_format, job_options=job_options, **format_options)
return job.run_synchronous(
# TODO #135 support multi file result sets too
Expand Down Expand Up @@ -193,11 +202,7 @@ def create_job(
"""
# TODO: avoid using all kwargs as format_options
# TODO: centralize `create_job` for `DataCube`, `VectorCube`, `MlModel`, ...
cube = self
if out_format:
# add `save_result` node
# TODO #401: avoid duplicate save_result
cube = cube.save_result(format=out_format, options=format_options)
cube = self._ensure_save_result(format=out_format, options=format_options or None)
return self._connection.create_job(
process_graph=cube.flat_graph(),
title=title,
Expand Down
178 changes: 135 additions & 43 deletions tests/rest/datacube/test_vectorcube.py
@@ -1,8 +1,9 @@
import re
from pathlib import Path
from typing import List

import pytest

from openeo import Connection
from openeo.internal.graph_building import PGNode
from openeo.rest.vectorcube import VectorCube

Expand All @@ -13,42 +14,101 @@ def vector_cube(con100) -> VectorCube:
return VectorCube(graph=pgnode, connection=con100)


class DownloadSpy:
class DummyBackend:
"""
Test helper to track download requests and optionally override next response to return.
Dummy backend that handles sync/batch execution requests
and allows inspection of posted process graphs
"""

__slots__ = ["requests", "next_response"]
def __init__(self, requests_mock, connection: Connection):
self.connection = connection
self.sync_requests = []
self.batch_jobs = {}
self.next_result = b"Result data"
requests_mock.post(connection.build_url("/result"), content=self._handle_post_result)
requests_mock.post(connection.build_url("/jobs"), content=self._handle_post_jobs)
requests_mock.post(
re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), content=self._handle_post_job_results
)
requests_mock.get(re.compile(connection.build_url(r"/jobs/(job-\d+)$")), json=self._handle_get_job)
requests_mock.get(
re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), json=self._handle_get_job_results
)
requests_mock.get(
re.compile(connection.build_url("/jobs/(.*?)/results/result.data$")),
content=self._handle_get_job_result_asset,
)

def __init__(self):
self.requests: List[dict] = []
self.next_response: bytes = b"Spy data"
def _handle_post_result(self, request, context):
"""handler of `POST /result` (synchronous execute)"""
pg = request.json()["process"]["process_graph"]
self.sync_requests.append(pg)
return self.next_result

@property
def only_request(self) -> dict:
"""Get progress graph of only request done"""
assert len(self.requests) == 1
return self.requests[-1]
def _handle_post_jobs(self, request, context):
"""handler of `POST /jobs` (create batch job)"""
pg = request.json()["process"]["process_graph"]
job_id = f"job-{len(self.batch_jobs):03d}"
self.batch_jobs[job_id] = {"job_id": job_id, "pg": pg, "status": "created"}
context.status_code = 201
context.headers["openeo-identifier"] = job_id

@property
def last_request(self) -> dict:
"""Get last progress graph"""
assert len(self.requests) > 0
return self.requests[-1]
def _get_job_id(self, request) -> str:
match = re.match(r"^/jobs/(job-\d+)(/|$)", request.path)
if not match:
raise ValueError(f"Failed to extract job_id from {request.path}")
job_id = match.group(1)
assert job_id in self.batch_jobs
return job_id

def _handle_post_job_results(self, request, context):
"""Handler of `POST /job/{job_id}/results` (start batch job)."""
job_id = self._get_job_id(request)
assert self.batch_jobs[job_id]["status"] == "created"
# TODO: support custom status sequence (instead of directly going to status "finished")?
self.batch_jobs[job_id]["status"] = "finished"
context.status_code = 202

@pytest.fixture
def download_spy(requests_mock, con100) -> DownloadSpy:
"""Test fixture to spy on (and mock) `POST /result` (download) requests."""
spy = DownloadSpy()
def _handle_get_job(self, request, context):
"""Handler of `GET /job/{job_id}` (get batch job status and metadata)."""
job_id = self._get_job_id(request)
return {"id": job_id, "status": self.batch_jobs[job_id]["status"]}

def post_result(request, context):
pg = request.json()["process"]["process_graph"]
spy.requests.append(pg)
return spy.next_response
def _handle_get_job_results(self, request, context):
"""Handler of `GET /job/{job_id}/results` (list batch job results)."""
job_id = self._get_job_id(request)
assert self.batch_jobs[job_id]["status"] == "finished"
return {
"id": job_id,
"assets": {"result.data": {"href": self.connection.build_url(f"/jobs/{job_id}/results/result.data")}},
}

def _handle_get_job_result_asset(self, request, context):
"""Handler of `GET /job/{job_id}/results/result.data` (get batch job result asset)."""
job_id = self._get_job_id(request)
assert self.batch_jobs[job_id]["status"] == "finished"
return self.next_result

def get_sync_pg(self) -> dict:
"""Get one and only synchronous process graph"""
assert len(self.sync_requests) == 1
return self.sync_requests[0]

def get_batch_pg(self) -> dict:
"""Get one and only batch process graph"""
assert len(self.batch_jobs) == 1
return self.batch_jobs[max(self.batch_jobs.keys())]["pg"]

requests_mock.post(con100.build_url("/result"), content=post_result)
yield spy
def get_pg(self) -> dict:
"""Get one and only batch process graph (sync or batch)"""
pgs = self.sync_requests + [b["pg"] for b in self.batch_jobs.values()]
assert len(pgs) == 1
return pgs[0]


@pytest.fixture
def dummy_backend(requests_mock, con100) -> DummyBackend:
yield DummyBackend(requests_mock=requests_mock, connection=con100)


def test_raster_to_vector(con100):
Expand Down Expand Up @@ -91,13 +151,19 @@ def test_raster_to_vector(con100):
],
)
@pytest.mark.parametrize("path_class", [str, Path])
@pytest.mark.parametrize("exec_mode", ["sync", "batch"])
def test_download_auto_save_result_only_file(
vector_cube, download_spy, tmp_path, filename, expected_format, path_class
vector_cube, dummy_backend, tmp_path, filename, expected_format, path_class, exec_mode
):
output_path = tmp_path / filename
vector_cube.download(path_class(output_path))
if exec_mode == "sync":
vector_cube.download(path_class(output_path))
elif exec_mode == "batch":
vector_cube.execute_batch(outputfile=path_class(output_path))
else:
raise ValueError(exec_mode)

assert download_spy.only_request == {
assert dummy_backend.get_pg() == {
"createvectorcube1": {"process_id": "create_vector_cube", "arguments": {}},
"saveresult1": {
"process_id": "save_result",
Expand All @@ -109,7 +175,7 @@ def test_download_auto_save_result_only_file(
"result": True,
},
}
assert output_path.read_bytes() == b"Spy data"
assert output_path.read_bytes() == b"Result data"


@pytest.mark.parametrize(
Expand All @@ -126,11 +192,19 @@ def test_download_auto_save_result_only_file(
# TODO #449 more formats to autodetect?
],
)
def test_download_auto_save_result_with_format(vector_cube, download_spy, tmp_path, filename, format, expected_format):
@pytest.mark.parametrize("exec_mode", ["sync", "batch"])
def test_download_auto_save_result_with_format(
vector_cube, dummy_backend, tmp_path, filename, format, expected_format, exec_mode
):
output_path = tmp_path / filename
vector_cube.download(output_path, format=format)
if exec_mode == "sync":
vector_cube.download(output_path, format=format)
elif exec_mode == "batch":
vector_cube.execute_batch(outputfile=output_path, out_format=format)
else:
raise ValueError(exec_mode)

assert download_spy.only_request == {
assert dummy_backend.get_pg() == {
"createvectorcube1": {"process_id": "create_vector_cube", "arguments": {}},
"saveresult1": {
"process_id": "save_result",
Expand All @@ -142,14 +216,23 @@ def test_download_auto_save_result_with_format(vector_cube, download_spy, tmp_pa
"result": True,
},
}
assert output_path.read_bytes() == b"Spy data"
assert output_path.read_bytes() == b"Result data"


def test_download_auto_save_result_with_options(vector_cube, download_spy, tmp_path):
@pytest.mark.parametrize("exec_mode", ["sync", "batch"])
def test_download_auto_save_result_with_options(vector_cube, dummy_backend, tmp_path, exec_mode):
output_path = tmp_path / "result.json"
vector_cube.download(output_path, format="GeoJSON", options={"precision": 7})
format = "GeoJSON"
options = {"precision": 7}

assert download_spy.only_request == {
if exec_mode == "sync":
vector_cube.download(output_path, format=format, options=options)
elif exec_mode == "batch":
vector_cube.execute_batch(outputfile=output_path, out_format=format, **options)
else:
raise ValueError(exec_mode)

assert dummy_backend.get_pg() == {
"createvectorcube1": {"process_id": "create_vector_cube", "arguments": {}},
"saveresult1": {
"process_id": "save_result",
Expand All @@ -161,7 +244,7 @@ def test_download_auto_save_result_with_options(vector_cube, download_spy, tmp_p
"result": True,
},
}
assert output_path.read_bytes() == b"Spy data"
assert output_path.read_bytes() == b"Result data"


@pytest.mark.parametrize(
Expand All @@ -173,17 +256,26 @@ def test_download_auto_save_result_with_options(vector_cube, download_spy, tmp_p
("result.nc", "netCDF", "netCDF"),
],
)
def test_save_result_and_download(vector_cube, download_spy, tmp_path, output_file, format, expected_format):
@pytest.mark.parametrize("exec_mode", ["sync", "batch"])
def test_save_result_and_download(
vector_cube, dummy_backend, tmp_path, output_file, format, expected_format, exec_mode
):
"""e.g. https://github.com/Open-EO/openeo-geopyspark-driver/issues/477"""
vector_cube = vector_cube.save_result(format=format)
output_path = tmp_path / output_file
vector_cube.download(output_path)
assert download_spy.only_request == {
if exec_mode == "sync":
vector_cube.download(output_path)
elif exec_mode == "batch":
vector_cube.execute_batch(outputfile=output_path)
else:
raise ValueError(exec_mode)

assert dummy_backend.get_pg() == {
"createvectorcube1": {"process_id": "create_vector_cube", "arguments": {}},
"saveresult1": {
"process_id": "save_result",
"arguments": {"data": {"from_node": "createvectorcube1"}, "format": expected_format, "options": {}},
"result": True,
},
}
assert output_path.read_bytes() == b"Spy data"
assert output_path.read_bytes() == b"Result data"

0 comments on commit 31766cb

Please sign in to comment.