-
Notifications
You must be signed in to change notification settings - Fork 273
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
256 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
# ============================================================================= | ||
# Copyright (c) 2021 Tom Kralidis | ||
# | ||
# Author: Tom Kralidis <tomkralidis@gmail.com> | ||
# | ||
# Contact email: tomkralidis@gmail.com | ||
# ============================================================================= | ||
|
||
import logging | ||
|
||
import requests | ||
|
||
from owslib.ogcapi import API | ||
from owslib.util import Authentication | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
class Processes(API): | ||
"""Abstraction for OGC API - Processes | ||
* https://ogcapi.ogc.org/processes/overview.html | ||
* https://docs.opengeospatial.org/DRAFTS/18-062.html | ||
* https://app.swaggerhub.com/apis/geoprocessing/WPS-all-in-one/1.0-draft.6-SNAPSHOT | ||
""" | ||
|
||
def __init__(self, url: str, json_: str = None, timeout: int = 30, | ||
headers: dict = None, auth: Authentication = None): | ||
__doc__ = API.__doc__ # noqa | ||
super().__init__(url, json_, timeout, headers, auth) | ||
|
||
def processes(self) -> dict: | ||
""" | ||
implements: GET /processes | ||
Lists the processes this API offers. | ||
@returns: `dict` of available processes. | ||
""" | ||
|
||
path = 'processes' | ||
return self._request(path) | ||
|
||
def process_description(self, process_id: str) -> dict: | ||
""" | ||
implements: GET /processes/{process-id} | ||
Returns a detailed description of a process. | ||
@returns: `dict` of a process description. | ||
""" | ||
|
||
path = f'processes/{process_id}' | ||
return self._request(path) | ||
|
||
def job_list(self, process_id: str) -> dict: | ||
""" | ||
implements: GET /processes/{process-id}/jobs | ||
Returns the running and finished jobs for a process (optional). | ||
@returns: `dict` of .... | ||
""" | ||
|
||
path = f'processes/{process_id}/jobs' | ||
return self._request(path) | ||
|
||
def execute(self, process_id: str, json: dict) -> dict: | ||
""" | ||
implements: POST /processes/{process-id}/jobs | ||
Executes a process, i.e. creates a new job. Inputs and outputs will have | ||
to be specified in a JSON document that needs to be send in the POST body. | ||
@returns: `dict` of ... | ||
""" | ||
|
||
path = f'processes/{process_id}/jobs' | ||
return self._request_post(path, json) | ||
|
||
def _request_post(self, path: str, json: dict) -> dict: | ||
# TODO: needs to be implemented in base class | ||
url = self._build_url(path) | ||
|
||
response = requests.post(url, json=json) | ||
|
||
if response.status_code != requests.codes.ok: | ||
raise RuntimeError(response.text) | ||
|
||
return response.json() | ||
|
||
def status(self, process_id: str, job_id: str) -> dict: | ||
""" | ||
implements: GET /processes/{process-id}/jobs/{job-id} | ||
Returns the status of a job of a process. | ||
@returns: `dict` of ... | ||
""" | ||
|
||
path = f'processes/{process_id}/jobs/{job_id}' | ||
return self._request(path) | ||
|
||
def cancel(self, process_id: str, job_id: str) -> dict: | ||
""" | ||
implements: DELETE /processes/{process-id}/jobs/{job-id} | ||
Cancel a job execution. | ||
@returns: `dict` of ... | ||
""" | ||
|
||
path = f'processes/{process_id}/jobs/{job_id}' | ||
return self._request(path) | ||
|
||
def result(self, process_id: str, job_id: str) -> dict: | ||
""" | ||
implements: GET /processes/{process-id}/jobs/{job-id}/results | ||
Returns the result of a job of a process. | ||
@returns: `dict` of ... | ||
""" | ||
|
||
path = f'processes/{process_id}/jobs/{job_id}/results' | ||
return self._request(path) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
from tests.utils import service_ok | ||
|
||
import pytest | ||
|
||
from owslib.ogcapi.processes import Processes | ||
|
||
SERVICE_URL = 'http://geoprocessing.demo.52north.org:8080/javaps/rest/' | ||
|
||
|
||
@pytest.mark.online | ||
@pytest.mark.skipif(not service_ok(SERVICE_URL), | ||
reason='service is unreachable') | ||
def test_ogcapi_processes_52n(): | ||
w = Processes(SERVICE_URL) | ||
|
||
assert w.url == 'http://geoprocessing.demo.52north.org:8080/javaps/rest/' | ||
assert w.url_query_string is None | ||
|
||
# TODO: RuntimeError: Did not find service-desc link | ||
# api = w.api() | ||
# assert api['components']['parameters'] is not None | ||
# paths = api['paths'] | ||
# assert paths is not None | ||
# assert paths['/processes/hello-world'] is not None | ||
|
||
conformance = w.conformance() | ||
assert len(conformance['conformsTo']) == 5 | ||
|
||
# list processes | ||
processes = w.processes() | ||
assert len(processes) > 0 | ||
|
||
# process description | ||
echo = w.process_description('org.n52.javaps.test.EchoProcess') | ||
assert echo['id'] == 'org.n52.javaps.test.EchoProcess' | ||
assert echo['title'] == 'org.n52.javaps.test.EchoProcess' | ||
# assert "An example process that takes a name as input" in echo['description'] | ||
|
||
# running jobs | ||
jobs = w.job_list('org.n52.javaps.test.EchoProcess') | ||
assert len(jobs) >= 0 | ||
|
||
# TODO: post request not allowed at 52n? |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
from tests.utils import service_ok | ||
|
||
import pytest | ||
|
||
from owslib.ogcapi.processes import Processes | ||
|
||
SERVICE_URL = 'https://demo.pygeoapi.io/master' | ||
|
||
|
||
@pytest.mark.online | ||
@pytest.mark.skipif(not service_ok(SERVICE_URL), | ||
reason='service is unreachable') | ||
def test_ogcapi_processes_pygeoapi(): | ||
w = Processes(SERVICE_URL) | ||
|
||
assert w.url == 'https://demo.pygeoapi.io/master/' | ||
assert w.url_query_string is None | ||
|
||
api = w.api() | ||
assert api['components']['parameters'] is not None | ||
paths = api['paths'] | ||
assert paths is not None | ||
assert paths['/processes/hello-world'] is not None | ||
|
||
conformance = w.conformance() | ||
assert len(conformance['conformsTo']) == 9 | ||
|
||
# list processes | ||
processes = w.processes() | ||
assert len(processes) > 0 | ||
|
||
# process description | ||
hello = w.process_description('hello-world') | ||
assert hello['id'] == 'hello-world' | ||
assert hello['title'] == 'Hello World' | ||
assert "An example process that takes a name as input" in hello['description'] | ||
|
||
# running jobs | ||
jobs = w.job_list('hello-world') | ||
assert len(jobs) >= 0 | ||
|
||
# execute process in sync mode | ||
request_json = {"inputs": [{"id": "name", "value": "hello"}], "mode": "sync"} | ||
resp = w.execute('hello-world', json=request_json) | ||
assert len(resp['outputs']) == 1 | ||
assert resp['outputs'][0]['id'] == 'echo' | ||
assert resp['outputs'][0]['value'] == 'Hello hello!' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
from tests.utils import service_ok | ||
|
||
import pytest | ||
|
||
from owslib.ogcapi.processes import Processes | ||
|
||
SERVICE_URL = 'https://ogc-ades.crim.ca/ADES/' | ||
|
||
|
||
@pytest.mark.online | ||
@pytest.mark.skipif(not service_ok(SERVICE_URL), | ||
reason='service is unreachable') | ||
def test_ogcapi_processes_weaver(): | ||
w = Processes(SERVICE_URL) | ||
|
||
assert w.url == 'https://ogc-ades.crim.ca/ADES/' | ||
assert w.url_query_string is None | ||
|
||
# TODO: RuntimeError: Did not find service-desc link | ||
# api = w.api() | ||
# assert api['components']['parameters'] is not None | ||
# paths = api['paths'] | ||
# assert paths is not None | ||
# assert paths['/processes/hello-world'] is not None | ||
|
||
conformance = w.conformance() | ||
assert len(conformance['conformsTo']) == 5 | ||
|
||
# list processes | ||
processes = w.processes() | ||
assert len(processes) > 0 | ||
|
||
# process description | ||
# TODO: response not as expected {'process': {'id': 'ColibriFlyingpigeon_SubsetBbox'}} | ||
# should be {'id': 'ColibriFlyingpigeon_SubsetBbox'} | ||
# process = w.process_description('ColibriFlyingpigeon_SubsetBbox') | ||
# print(process) | ||
# assert process['process']['id'] == 'ColibriFlyingpigeon_SubsetBbox' | ||
# assert process['process']['title'] == 'ColibriFlyingpigeon_SubsetBbox' | ||
# assert "An example process that takes a name as input" in process['process']['description'] |