ogc api processes subscriber #1313

Merged
6 changes: 6 additions & 0 deletions docs/source/data-publishing/ogcapi-processes.rst
@@ -108,6 +108,12 @@ Processing examples
-H "Prefer: respond-async"
-d "{\"inputs\":{\"name\": \"hi there2\"}}"

# execute a job for the ``hello-world`` process with a success subscriber
curl -X POST http://localhost:5000/processes/hello-world/execution \
-H "Content-Type: application/json" \
-d "{\"inputs\":{\"name\": \"hi there2\"}, \
\"subscriber\": {\"successUri\": \"https://www.example.com/success\"}}"


.. _`OGC API - Processes`: https://ogcapi.ogc.org/processes
.. _`sample`: https://github.com/geopython/pygeoapi/blob/master/pygeoapi/process/hello_world.py
26 changes: 24 additions & 2 deletions pygeoapi/api.py
@@ -86,7 +86,7 @@
TEMPLATES, to_json, get_api_rules, get_base_url,
get_crs_from_uri, get_supported_crs_list,
modify_pygeofilter, CrsTransformSpec,
transform_bbox)
transform_bbox, Subscriber)

LOGGER = logging.getLogger(__name__)

@@ -174,6 +174,7 @@
'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/core',
'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/json',
'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/oas30',
'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/callback',
Member: Given that you introduce the subscriber functionality as an optional property of the manager, I think we cannot assume it by default.

We would need to add this at runtime, by checking if the manager supports it.

What if we simply assume that the subscriber functionality is always supported? (More on this below)
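For illustration, the runtime check suggested here might look roughly like the sketch below (`get_processes_conformance` is a hypothetical helper, not an existing pygeoapi function):

def get_processes_conformance(manager) -> list:
    # sketch of the runtime check: advertise the callback conformance class
    # only when the configured job manager supports subscribers
    classes = [
        'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/core',
        'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/json',
        'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/oas30',
    ]
    if getattr(manager, 'supports_subscribing', False):
        classes.append(
            'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/callback')
    return classes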

],
'edr': [
'http://www.opengis.net/spec/ogcapi-edr-1/1.0/conf/core'
@@ -3490,6 +3491,23 @@ def execute_process(self, request: Union[APIRequest, Any],
data_dict = data.get('inputs', {})
LOGGER.debug(data_dict)

subscriber = None
subscriber_dict = data.get('subscriber')
if subscriber_dict:
try:
success_uri = subscriber_dict['successUri']
except KeyError:
return self.get_exception(
HTTPStatus.BAD_REQUEST, headers, request.format,
'MissingParameterValue', 'Missing successUri')
else:
subscriber = Subscriber(
# NOTE: successUri is mandatory according to the standard
success_uri=success_uri,
in_progress_uri=subscriber_dict.get('inProgressUri'),
failed_uri=subscriber_dict.get('failedUri'),
)

try:
execution_mode = RequestedProcessExecutionMode(
request.headers.get('Prefer', request.headers.get('prefer'))
@@ -3499,7 +3517,11 @@
try:
LOGGER.debug('Executing process')
result = self.manager.execute_process(
process_id, data_dict, execution_mode=execution_mode)
process_id,
data_dict,
execution_mode=execution_mode,
subscriber=subscriber,
)
job_id, mime_type, outputs, status, additional_headers = result
headers.update(additional_headers or {})
headers['Location'] = f'{self.base_url}/jobs/{job_id}'
56 changes: 50 additions & 6 deletions pygeoapi/process/manager/base.py
@@ -38,6 +38,8 @@
from typing import Any, Dict, Tuple, Optional, OrderedDict
import uuid

import requests

from pygeoapi.plugin import load_plugin
from pygeoapi.process.base import (
BaseProcessor,
@@ -50,6 +52,7 @@
JobStatus,
ProcessExecutionMode,
RequestedProcessExecutionMode,
Subscriber,
)

LOGGER = logging.getLogger(__name__)
@@ -70,6 +73,7 @@ def __init__(self, manager_def: dict):

self.name = manager_def['name']
self.is_async = False
self.supports_subscribing = False
self.connection = manager_def.get('connection')
self.output_dir = manager_def.get('output_dir')

@@ -85,7 +89,7 @@ def __init__(self, manager_def: dict):
for id_, process_conf in manager_def.get('processes', {}).items():
self.processes[id_] = dict(process_conf)

def get_processor(self, process_id: str) -> Optional[BaseProcessor]:
def get_processor(self, process_id: str) -> BaseProcessor:
"""Instantiate a processor.

:param process_id: Identifier of the process
@@ -178,7 +182,9 @@ def delete_job(self, job_id: str) -> bool:
raise JobNotFoundError()

def _execute_handler_async(self, p: BaseProcessor, job_id: str,
data_dict: dict) -> Tuple[str, None, JobStatus]:
data_dict: dict,
subscriber: Optional[Subscriber] = None,
) -> Tuple[str, None, JobStatus]:
"""
This private execution handler executes a process in a background
thread using `multiprocessing.dummy`
@@ -194,13 +200,15 @@ def _execute_handler_async(self, p: BaseProcessor, job_id: str,
"""
_process = dummy.Process(
target=self._execute_handler_sync,
args=(p, job_id, data_dict)
args=(p, job_id, data_dict, subscriber)
)
_process.start()
return 'application/json', None, JobStatus.accepted

def _execute_handler_sync(self, p: BaseProcessor, job_id: str,
data_dict: dict) -> Tuple[str, Any, JobStatus]:
data_dict: dict,
subscriber: Optional[Subscriber] = None,
francescoingv (Contributor), Mar 20, 2024: No docstring for the subscriber parameter.

Contributor Author: I don't have the permissions to push to the repo directly, and I'm not sure this warrants a PR. Would you mind adding something like this line to the sync and async execute methods?
:param subscriber: optional `Subscriber` specifying callback URLs

Member: I'll push a fix.
) -> Tuple[str, Any, JobStatus]:
"""
Synchronous execution handler

@@ -233,6 +241,7 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str,
}

self.add_job(job_metadata)
self._send_in_progress_notification(subscriber)

try:
if self.output_dir is not None:
@@ -276,6 +285,7 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str,
}

self.update_job(job_id, job_update_metadata)
self._send_success_notification(subscriber, outputs=outputs)

except Exception as err:
# TODO assess correct exception type and description to help users
@@ -308,13 +318,16 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str,

self.update_job(job_id, job_metadata)

self._send_failed_notification(subscriber)

return jfmt, outputs, current_status

def execute_process(
self,
process_id: str,
data_dict: dict,
execution_mode: Optional[RequestedProcessExecutionMode] = None
execution_mode: Optional[RequestedProcessExecutionMode] = None,
subscriber: Optional[Subscriber] = None,
) -> Tuple[str, Any, JobStatus, Optional[Dict[str, str]]]:
"""
Default process execution handler
@@ -323,6 +336,7 @@ def execute_process(
:param data_dict: `dict` of data parameters
:param execution_mode: `str` optionally specifying sync or async
processing.
:param subscriber: `Subscriber` optionally specifying callback URLs

:raises UnknownProcessError: if the input process_id does not
correspond to a known process
@@ -367,9 +381,39 @@
response_headers = None
# TODO: handler's response could also be allowed to include more HTTP
# headers
mime_type, outputs, status = handler(processor, job_id, data_dict)
mime_type, outputs, status = handler(
processor,
job_id,
data_dict,
# only pass subscriber if supported, otherwise this breaks existing
# managers
**({'subscriber': subscriber} if self.supports_subscribing else {})
)
return job_id, mime_type, outputs, status, response_headers

def _send_in_progress_notification(self, subscriber: Optional[Subscriber]):
if subscriber and subscriber.in_progress_uri:
response = requests.post(subscriber.in_progress_uri, json={})
LOGGER.debug(
f'In progress notification response: {response.status_code}'
)

def _send_success_notification(
self, subscriber: Optional[Subscriber], outputs: Any
):
if subscriber:
response = requests.post(subscriber.success_uri, json=outputs)
LOGGER.debug(
f'Success notification response: {response.status_code}'
)

def _send_failed_notification(self, subscriber: Optional[Subscriber]):
if subscriber and subscriber.failed_uri:
response = requests.post(subscriber.failed_uri, json={})
LOGGER.debug(
f'Failed notification response: {response.status_code}'
)
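The helpers above post without a timeout, and a network error from the callback endpoint propagates out of the helper. A more defensive variant could look like the sketch below (not part of this PR; it reuses the module's `requests`, `LOGGER`, `Optional` and `Any`, and `_notify` is a hypothetical name):

def _notify(url: Optional[str], payload: Any) -> None:
    # skip silently when no callback URL was provided
    if not url:
        return
    try:
        # bound the request so a slow callback endpoint cannot stall the job
        response = requests.post(url, json=payload, timeout=10)
        LOGGER.debug(f'Notification to {url}: {response.status_code}')
    except requests.RequestException as err:
        # log and continue; a broken subscriber must not affect the job
        LOGGER.warning(f'Notification to {url} failed: {err}')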

def __repr__(self):
return f'<BaseManager> {self.name}'

1 change: 1 addition & 0 deletions pygeoapi/process/manager/mongodb_.py
@@ -45,6 +45,7 @@ class MongoDBManager(BaseManager):
def __init__(self, manager_def):
super().__init__(manager_def)
self.is_async = True
self.supports_subscribing = True

def _connect(self):
try:
1 change: 1 addition & 0 deletions pygeoapi/process/manager/tinydb_.py
@@ -61,6 +61,7 @@ def __init__(self, manager_def: dict):

super().__init__(manager_def)
self.is_async = True
self.supports_subscribing = True

@contextmanager
def _db(self):
11 changes: 11 additions & 0 deletions pygeoapi/util.py
@@ -591,6 +591,17 @@ class JobStatus(Enum):
dismissed = 'dismissed'


@dataclass(frozen=True)
class Subscriber:
Member: While I do like the idea of using a dataclass, it seems a bit inconsistent that this introduces a difference in how the various parts of an execute request are parsed, since the main execution inputs are not parsed into a typed data structure.

Ideally we would parse the full request into a typed instance (using pydantic ❤️) and pass that to the inner code base.

This is not to say I am against this change (quite the contrary), just food for thought for @tomkralidis 😄

Contributor Author: I pretty much agree; I prefer a more explicit style here.

"""Store subscriber urls as defined in:

https://schemas.opengis.net/ogcapi/processes/part1/1.0/openapi/schemas/subscriber.yaml # noqa
"""
success_uri: str
in_progress_uri: Optional[str]
failed_uri: Optional[str]
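As a follow-up to the pydantic suggestion in the review thread above, a typed model for the execute request could look roughly like this sketch (pygeoapi does not currently parse requests this way; the model names are illustrative):

from typing import Optional

from pydantic import BaseModel, Field


class SubscriberModel(BaseModel):
    # successUri is mandatory per the OGC subscriber schema
    success_uri: str = Field(alias='successUri')
    in_progress_uri: Optional[str] = Field(None, alias='inProgressUri')
    failed_uri: Optional[str] = Field(None, alias='failedUri')


class ExecuteRequest(BaseModel):
    inputs: dict = {}
    subscriber: Optional[SubscriberModel] = None


# parse_obj is the pydantic v1 spelling; pydantic v2 prefers model_validate
execute_request = ExecuteRequest.parse_obj({
    'inputs': {'name': 'hi there2'},
    'subscriber': {'successUri': 'https://www.example.com/success'},
})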


def read_data(path: Union[Path, str]) -> Union[bytes, str]:
"""
helper function to read data (file or network)
Expand Down
29 changes: 29 additions & 0 deletions tests/test_api.py
@@ -36,6 +36,7 @@
import time
import gzip
from http import HTTPStatus
from unittest import mock

from pyld import jsonld
import pytest
@@ -1738,6 +1739,16 @@ def test_execute_process(config, api_):
'name': None
}
}
req_body_7 = {
'inputs': {
'name': 'Test'
},
'subscriber': {
'successUri': 'https://example.com/success',
'inProgressUri': 'https://example.com/inProgress',
'failedUri': 'https://example.com/failed',
}
}

cleanup_jobs = set()

@@ -1865,6 +1876,24 @@
assert isinstance(response, dict)
assert code == HTTPStatus.CREATED

cleanup_jobs.add(tuple(['hello-world',
rsp_headers['Location'].split('/')[-1]]))

req = mock_request(data=req_body_7)
with mock.patch(
'pygeoapi.process.manager.base.requests.post'
) as post_mocker:
rsp_headers, code, response = api_.execute_process(req, 'hello-world')
assert code == HTTPStatus.OK
post_mocker.assert_any_call(
req_body_7['subscriber']['inProgressUri'], json={}
)
post_mocker.assert_any_call(
req_body_7['subscriber']['successUri'],
json={'id': 'echo', 'value': 'Hello Test!'}
)
assert post_mocker.call_count == 2

cleanup_jobs.add(tuple(['hello-world',
rsp_headers['Location'].split('/')[-1]]))
