Skip to content

Commit

Permalink
Add support for parameter Outputs of OGC API Processes (#1602)
Browse files Browse the repository at this point in the history
* Solve issue #1420

Add support for OGC API Processes Outputs

* Resolve Jan, 3 2024 totycro comments

* Solve issue 1420 with full backward compatibility for Processors.

* changed formattings

* Some additional formatting changes

* Update api.py

missing line in api

* Update base.py

To resolve conflict with #1603

* Update base.py

Added subscriber inline doc

* After Ricardo Silva comments on 13 Apr.

Included all the suggested changes on code format andparams name.

* Changed line length

* fixed trailing spaces.

* Update formatting base.py

* Update base.py

---------

Co-authored-by: FrancescoIngv <FrancescoIngv@users.noreply.github.com>
  • Loading branch information
francescoingv and francescoingv committed May 10, 2024
1 parent 17080d8 commit 6a6ec9c
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 12 deletions.
6 changes: 6 additions & 0 deletions pygeoapi/api/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
# Colin Blackburn <colb@bgs.ac.uk>
# Ricardo Garcia Silva <ricardo.garcia.silva@geobeyond.it>
# Bernhard Mallinger <bernhard.mallinger@eox.at>
# Francesco Martinelli <francesco.martinelli@ingv.it>
#
# Copyright (c) 2024 Tom Kralidis
# Copyright (c) 2022 Francesco Bartoli
# Copyright (c) 2022 John A Stevenson and Colin Blackburn
# Copyright (c) 2023 Ricardo Garcia Silva
# Copyright (c) 2024 Bernhard Mallinger
# Copyright (c) 2024 Francesco Martinelli
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
Expand Down Expand Up @@ -374,6 +376,9 @@ def execute_process(api: API, request: APIRequest,
data_dict = data.get('inputs', {})
LOGGER.debug(data_dict)

requested_outputs = data.get('outputs')
LOGGER.debug(f'outputs: {requested_outputs}')

subscriber = None
subscriber_dict = data.get('subscriber')
if subscriber_dict:
Expand Down Expand Up @@ -401,6 +406,7 @@ def execute_process(api: API, request: APIRequest,
LOGGER.debug('Executing process')
result = api.manager.execute_process(
process_id, data_dict, execution_mode=execution_mode,
requested_outputs=requested_outputs,
subscriber=subscriber)
job_id, mime_type, outputs, status, additional_headers = result
headers.update(additional_headers or {})
Expand Down
13 changes: 10 additions & 3 deletions pygeoapi/process/base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# =================================================================
#
# Authors: Tom Kralidis <tomkralidis@gmail.com>
# Francesco Martinelli <francesco.martinelli@ingv.it>
#
# Copyright (c) 2022 Tom Kralidis
# Copyright (c) 2024 Francesco Martinelli
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
Expand All @@ -28,7 +30,7 @@
# =================================================================

import logging
from typing import Any, Tuple
from typing import Any, Tuple, Optional

from pygeoapi.error import GenericError

Expand All @@ -49,14 +51,19 @@ def __init__(self, processor_def: dict, process_metadata: dict):
"""
self.name = processor_def['name']
self.metadata = process_metadata
self.supports_outputs = False

def execute(self, data: dict) -> Tuple[str, Any]:
def execute(self, data: dict, outputs: Optional[dict] = None
) -> Tuple[str, Any]:
"""
execute the process
:param data: Dict with the input data that the process needs in order
to execute
:param outputs: `dict` optionally specify the subset of required
outputs - defaults to all outputs.
The value of any key may be an object and include the property
`transmissionMode` - defauts to `value`.
:returns: tuple of MIME type and process response
(string or bytes, or dict)
"""
Expand Down
18 changes: 11 additions & 7 deletions pygeoapi/process/hello_world.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# =================================================================
#
# Authors: Tom Kralidis <tomkralidis@gmail.com>
# Francesco Martinelli <francesco.martinelli@ingv.it>
#
# Copyright (c) 2022 Tom Kralidis
# Copyright (c) 2024 Francesco Martinelli
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
Expand Down Expand Up @@ -115,9 +117,9 @@ def __init__(self, processor_def):
"""

super().__init__(processor_def, PROCESS_METADATA)
self.supports_outputs = True

def execute(self, data):

def execute(self, data, outputs=None):
mimetype = 'application/json'
name = data.get('name')

Expand All @@ -127,12 +129,14 @@ def execute(self, data):
message = data.get('message', '')
value = f'Hello {name}! {message}'.strip()

outputs = {
'id': 'echo',
'value': value
}
produced_outputs = {}
if outputs is None or 'echo' in outputs:
produced_outputs = {
'id': 'echo',
'value': value
}

return mimetype, outputs
return mimetype, produced_outputs

def __repr__(self):
return f'<HelloWorldProcessor> {self.name}'
32 changes: 30 additions & 2 deletions pygeoapi/process/manager/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
#
# Authors: Tom Kralidis <tomkralidis@gmail.com>
# Ricardo Garcia Silva <ricardo.garcia.silva@geobeyond.it>
# Francesco Martinelli <francesco.martinelli@ingv.it>
#
# Copyright (c) 2024 Tom Kralidis
# (c) 2023 Ricardo Garcia Silva
# (c) 2024 Francesco Martinelli
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
Expand Down Expand Up @@ -183,6 +185,7 @@ def delete_job(self, job_id: str) -> bool:

def _execute_handler_async(self, p: BaseProcessor, job_id: str,
data_dict: dict,
requested_outputs: Optional[dict] = None,
subscriber: Optional[Subscriber] = None,
) -> Tuple[str, None, JobStatus]:
"""
Expand All @@ -194,20 +197,26 @@ def _execute_handler_async(self, p: BaseProcessor, job_id: str,
:param p: `pygeoapi.process` object
:param job_id: job identifier
:param data_dict: `dict` of data parameters
:param requested_outputs: `dict` specify the subset of required
outputs - defaults to all outputs.
The value of any key may be an object and include the property
`transmissionMode` - defauts to `value`.
Note: 'optional' is for backward compatibility.
:param subscriber: optional `Subscriber` specifying callback URLs
:returns: tuple of None (i.e. initial response payload)
and JobStatus.accepted (i.e. initial job status)
"""
_process = dummy.Process(
target=self._execute_handler_sync,
args=(p, job_id, data_dict, subscriber)
args=(p, job_id, data_dict, requested_outputs, subscriber)
)
_process.start()
return 'application/json', None, JobStatus.accepted

def _execute_handler_sync(self, p: BaseProcessor, job_id: str,
data_dict: dict,
requested_outputs: Optional[dict] = None,
subscriber: Optional[Subscriber] = None,
) -> Tuple[str, Any, JobStatus]:
"""
Expand All @@ -220,6 +229,12 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str,
:param p: `pygeoapi.process` object
:param job_id: job identifier
:param data_dict: `dict` of data parameters
:param requested_outputs: `dict` specify the subset of required
outputs - defaults to all outputs.
The value of any key may be an object and include the property
`transmissionMode` - defauts to `value`.
Note: 'optional' is for backward compatibility.
:param subscriber: optional `Subscriber` specifying callback URLs
:returns: tuple of MIME type, response payload and status
"""
Expand Down Expand Up @@ -252,7 +267,13 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str,
job_filename = None

current_status = JobStatus.running
jfmt, outputs = p.execute(data_dict)
jfmt, outputs = p.execute(
data_dict,
# only pass requested_outputs if supported,
# otherwise this breaks existing processes
**({'outputs': requested_outputs}
if p.supports_outputs else {})
)

self.update_job(job_id, {
'status': current_status.value,
Expand Down Expand Up @@ -327,6 +348,7 @@ def execute_process(
process_id: str,
data_dict: dict,
execution_mode: Optional[RequestedProcessExecutionMode] = None,
requested_outputs: Optional[dict] = None,
subscriber: Optional[Subscriber] = None
) -> Tuple[str, Any, JobStatus, Optional[Dict[str, str]]]:
"""
Expand All @@ -336,6 +358,11 @@ def execute_process(
:param data_dict: `dict` of data parameters
:param execution_mode: `str` optionally specifying sync or async
processing.
:param requested_outputs: `dict` optionally specify the subset of
required outputs - defaults to all outputs.
The value of any key may be an object and include the property
`transmissionMode` - defauts to `value`.
Note: 'optional' is for backward compatibility.
:param subscriber: `Subscriber` optionally specifying callback urls
:raises UnknownProcessError: if the input process_id does not
Expand Down Expand Up @@ -385,6 +412,7 @@ def execute_process(
processor,
job_id,
data_dict,
requested_outputs,
# only pass subscriber if supported, otherwise this breaks existing
# managers
**({'subscriber': subscriber} if self.supports_subscribing else {})
Expand Down

0 comments on commit 6a6ec9c

Please sign in to comment.