From fa3a37751e799a8157a85818bb7555eaed65fd89 Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Wed, 11 Jul 2018 11:43:55 +0200 Subject: [PATCH 1/2] Replace the current management of ExecuteResponse.status The current status management is a custom list of status that also define the behaviour of the storage of the status file. This newer version of status management drop the old scheme and reuse the status defined by WPS standard, i.e. accepted, started, succeeded, and failed. It also separate the management of behaviour of the status file. --- docs/demobuffer.py | 3 +- pywps/app/Process.py | 19 ++++---- pywps/app/Service.py | 13 ++---- pywps/dblog.py | 20 +------- pywps/processing/scheduler.py | 6 ++- pywps/response/__init__.py | 31 +++++-------- pywps/response/capabilities.py | 1 - pywps/response/describe.py | 1 - pywps/response/execute.py | 85 ++++++++++++++-------------------- pywps/response/status.py | 6 +-- 10 files changed, 71 insertions(+), 114 deletions(-) diff --git a/docs/demobuffer.py b/docs/demobuffer.py index 06b186a4d..d3c6af379 100644 --- a/docs/demobuffer.py +++ b/docs/demobuffer.py @@ -29,6 +29,7 @@ from pywps.app.Common import Metadata from pywps.validator.mode import MODE from pywps.inout.formats import FORMATS +from pywps.response.status import WPS_STATUS inpt_vector = ComplexInput( 'vector', @@ -100,7 +101,7 @@ def _handler(request, response): # make buffer for each feature while index < count: - response.update_status('Buffering feature %s' % index, float(index) / count) + response._update_status(WPS_STATUS.STARTED, 'Buffering feature %s' % index, float(index) / count) # get the geometry input_feature = input_layer.GetNextFeature() diff --git a/pywps/app/Process.py b/pywps/app/Process.py index 8d869e17c..ca2b245cc 100644 --- a/pywps/app/Process.py +++ b/pywps/app/Process.py @@ -13,7 +13,7 @@ from pywps import get_ElementMakerForVersion, E, dblog from pywps.response import get_response -from pywps.response.status import STATUS +from pywps.response.status import WPS_STATUS from pywps.app.WPSRequest import WPSRequest import pywps.configuration as config from pywps._compat import PY2 @@ -108,10 +108,10 @@ def execute(self, wps_request, uuid): if self.status_supported != 'true': raise OperationNotSupported('Process does not support the updating of status') - wps_response.status = STATUS.STORE_AND_UPDATE_STATUS + wps_response.store_status_file = True self.async = True else: - wps_response.status = STATUS.STORE_STATUS + wps_response.store_status_file = False LOGGER.debug('Check if updating of status is not required then no need to spawn a process') @@ -172,6 +172,9 @@ def _execute_process(self, async, wps_request, wps_response): def _run_async(self, wps_request, wps_response): import pywps.processing + # required to return correct status in the main process, otherwise the status file may be invalid until the + # fork is perfomed. + wps_response._update_status(WPS_STATUS.ACCEPTED, u"PyWPS Request accepted", 0) process = pywps.processing.Process( process=self, wps_request=wps_request, @@ -187,7 +190,7 @@ def _store_process(self, stored, wps_request, wps_response): if stored < maxprocesses: LOGGER.debug("Store process in job queue, uuid=%s", self.uuid) dblog.store_process(self.uuid, wps_request) - wps_response.update_status('PyWPS Process stored in job queue', 0) + wps_response._update_status(WPS_STATUS.ACCEPTED, u'PyWPS Process stored in job queue', 0) else: raise ServerBusy('Maximum number of parallel running processes reached. Please try later.') @@ -201,13 +204,12 @@ def _run_process(self, wps_request, wps_response): os.environ['HOME'] = self.workdir LOGGER.info('Setting HOME to current working directory: %s', os.environ['HOME']) LOGGER.debug('ProcessID=%s, HOME=%s', self.uuid, os.environ.get('HOME')) - wps_response.update_status('PyWPS Process started', 0) + wps_response._update_status(WPS_STATUS.STARTED, u'PyWPS Process started', 0) wps_response = self.handler(wps_request, wps_response) # if (not wps_response.status_percentage) or (wps_response.status_percentage != 100): LOGGER.debug('Updating process status to 100% if everything went correctly') - wps_response.update_status('PyWPS Process {} finished'.format(self.title), - 100, STATUS.DONE_STATUS, clean=self.async) + wps_response._update_status(WPS_STATUS.SUCCEEDED, 'PyWPS Process {} finished'.format(self.title), 100) except Exception as e: traceback.print_exc() LOGGER.debug('Retrieving file and line number where exception occurred') @@ -241,7 +243,7 @@ def _run_process(self, wps_request, wps_response): elif wps_request.raw: raise else: - wps_response.update_status(msg, -1, status=STATUS.ERROR_STATUS) + wps_response._update_status(WPS_STATUS.FAILLED, msg, -1) # tr stored_request = dblog.get_first_stored() @@ -258,7 +260,6 @@ def _run_process(self, wps_request, wps_response): process.async = True response_cls = get_response("execute") new_wps_response = response_cls(new_wps_request, process=process, uuid=uuid) - new_wps_response.status = STATUS.STORE_AND_UPDATE_STATUS process._run_async(new_wps_request, new_wps_response) dblog.remove_stored(uuid) except Exception as e: diff --git a/pywps/app/Service.py b/pywps/app/Service.py index 3fd6dd69a..7340c9673 100755 --- a/pywps/app/Service.py +++ b/pywps/app/Service.py @@ -17,8 +17,9 @@ from pywps.exceptions import MissingParameterValue, NoApplicableCode, InvalidParameterValue, FileSizeExceeded, \ StorageNotSupported, FileURLNotSupported from pywps.inout.inputs import ComplexInput, LiteralInput, BoundingBoxInput -from pywps.dblog import log_request, update_response +from pywps.dblog import log_request, store_status from pywps import response +from pywps.response.status import WPS_STATUS from collections import deque, OrderedDict import os @@ -403,9 +404,11 @@ def call(self, http_request): response = None if wps_request.operation == 'getcapabilities': response = self.get_capabilities(wps_request, request_uuid) + response._update_status(WPS_STATUS.SUCCEEDED, u'', 100) elif wps_request.operation == 'describeprocess': response = self.describe(wps_request, request_uuid, wps_request.identifiers) + response._update_status(WPS_STATUS.SUCCEEDED, u'', 100) elif wps_request.operation == 'execute': response = self.execute( @@ -413,10 +416,8 @@ def call(self, http_request): wps_request, request_uuid ) - update_response(request_uuid, response, close=True) return response else: - update_response(request_uuid, response, close=True) raise RuntimeError("Unknown operation %r" % wps_request.operation) @@ -425,12 +426,8 @@ def call(self, http_request): if not isinstance(e, NoApplicableCode): e = NoApplicableCode(e.description, code=e.code) - class FakeResponse: - message = e.locator - status = e.code - status_percentage = 100 try: - update_response(request_uuid, FakeResponse, close=True) + store_status(request_uuid, WPS_STATUS.FAILED, e.locator, 100) except NoApplicableCode as e: return e return e diff --git a/pywps/dblog.py b/pywps/dblog.py index 4f75a3f54..326de3b27 100644 --- a/pywps/dblog.py +++ b/pywps/dblog.py @@ -112,26 +112,10 @@ def get_first_stored(): return request -def update_response(uuid, response, close=False): +def store_status(uuid, wps_status, message=None, status_percentage=None): """Writes response to database """ - session = get_session() - message = None - status_percentage = None - status = None - - if hasattr(response, 'message'): - message = response.message - if hasattr(response, 'status_percentage'): - status_percentage = response.status_percentage - if hasattr(response, 'status'): - status = response.status - - if status == '200 OK': - status = 3 - elif status == 400: - status = 0 requests = session.query(ProcessInstance).filter_by(uuid=str(uuid)) if requests.count(): @@ -139,7 +123,7 @@ def update_response(uuid, response, close=False): request.time_end = datetime.datetime.now() request.message = str(message) request.percent_done = status_percentage - request.status = status + request.status = wps_status session.commit() session.close() diff --git a/pywps/processing/scheduler.py b/pywps/processing/scheduler.py index e352dfab9..6c1c1fc62 100644 --- a/pywps/processing/scheduler.py +++ b/pywps/processing/scheduler.py @@ -7,6 +7,7 @@ import pywps.configuration as config from pywps.processing.basic import Processing from pywps.exceptions import SchedulerNotAvailable +from pywps.response.status import WPS_STATUS import logging LOGGER = logging.getLogger("PYWPS") @@ -22,10 +23,11 @@ class Scheduler(Processing): """ def start(self): - self.job.wps_response.update_status('Submitting job ...', 0) + self.job.wps_response._update_status(WPS_STATUS.ACCEPTED, 'Submitting job ...', 0) # run remote pywps process jobid = self.run_job() - self.job.wps_response.update_status('Your job has been submitted with ID %s'.format(jobid), 0) + self.job.wps_response._update_status(WPS_STATUS.ACCEPTED, + 'Your job has been submitted with ID %s'.format(jobid), 0) def run_job(self): LOGGER.info("Submitting job ...") diff --git a/pywps/response/__init__.py b/pywps/response/__init__.py index 389e0ef49..24b00ef42 100644 --- a/pywps/response/__init__.py +++ b/pywps/response/__init__.py @@ -1,5 +1,5 @@ -from pywps.dblog import update_response -from pywps.response.status import STATUS +from pywps.dblog import store_status +from pywps.response.status import WPS_STATUS from jinja2 import Environment, PackageLoader import os @@ -31,7 +31,7 @@ def __init__(self, wps_request, uuid=None, version="1.0.0"): self.wps_request = wps_request self.uuid = uuid self.message = '' - self.status = STATUS.NO_STATUS + self.status = WPS_STATUS.ACCEPTED self.status_percentage = 0 self.doc = None self.version = version @@ -40,28 +40,19 @@ def __init__(self, wps_request, uuid=None, version="1.0.0"): trim_blocks=True, lstrip_blocks=True ) - self.update_status(message="Request accepted", status_percentage=0, status=self.status) - - def update_status(self, message=None, status_percentage=None, status=None, clean=True): + def _update_status(self, status, message, status_percentage): """ Update status report of currently running process instance :param str message: Message you need to share with the client :param int status_percentage: Percent done (number betwen <0-100>) - :param pywps.response.status.STATUS status: process status - user should usually + :param pywps.response.status.WPS_STATUS status: process status - user should usually ommit this parameter """ - - if message: - self.message = message - - if status is not None: - self.status = status - - if status_percentage is not None: - self.status_percentage = status_percentage - - update_response(self.uuid, self) + self.message = message + self.status = status + self.status_percentage = status_percentage + store_status(self.uuid, self.status, self.message, self.status_percentage) def get_response_doc(self): try: @@ -71,10 +62,10 @@ def get_response_doc(self): msg = e.description else: msg = e - self.update_status(message=msg, status_percentage=100, status=STATUS.ERROR_STATUS) + self._update_status(WPS_STATUS.FAILED, msg, 100) raise e else: - self.update_status(message="Response generated", status_percentage=100, status=STATUS.DONE_STATUS) + self._update_status(WPS_STATUS.SUCCEEDED, u"Response generated", 100) return self.doc diff --git a/pywps/response/capabilities.py b/pywps/response/capabilities.py index 3e69498b5..abde7c58d 100644 --- a/pywps/response/capabilities.py +++ b/pywps/response/capabilities.py @@ -2,7 +2,6 @@ import pywps.configuration as config from pywps.app.basic import xml_response from pywps.response import WPSResponse -from pywps.response.status import STATUS from pywps import __version__ import os diff --git a/pywps/response/describe.py b/pywps/response/describe.py index 7702bd8a2..1354d0ef9 100644 --- a/pywps/response/describe.py +++ b/pywps/response/describe.py @@ -5,7 +5,6 @@ from pywps.exceptions import MissingParameterValue from pywps.exceptions import InvalidParameterValue from pywps.response import WPSResponse -from pywps.response.status import STATUS from pywps import __version__ import os diff --git a/pywps/response/execute.py b/pywps/response/execute.py index a6ba23c6c..9da896ceb 100755 --- a/pywps/response/execute.py +++ b/pywps/response/execute.py @@ -13,9 +13,8 @@ from pywps.app.basic import xml_response from pywps.exceptions import NoApplicableCode import pywps.configuration as config -from pywps.dblog import update_response -from pywps.response.status import STATUS +from pywps.response.status import WPS_STATUS from pywps.response import WPSResponse from pywps._compat import PY2 @@ -46,55 +45,41 @@ def __init__(self, wps_request, uuid, **kwargs): self.process = kwargs["process"] self.outputs = {o.identifier: o for o in self.process.outputs} + self.store_status_file = False - def update_status(self, message=None, status_percentage=None, status=None, - clean=True): + # override WPSResponse._update_status + def _update_status(self, status, message, status_percentage, clean=True): + super(ExecuteResponse, self)._update_status(status, message, status_percentage) + if self.store_status_file: + self.update_status_file(clean) + + def update_status(self, message, status_percentage=None): """ Update status report of currently running process instance :param str message: Message you need to share with the client :param int status_percentage: Percent done (number betwen <0-100>) - :param pywps.app.WPSResponse.STATUS status: process status - user should usually - ommit this parameter """ + if status_percentage is None: + status_percentage = self.status_percentage + self._update_status(WPS_STATUS.ACCEPTED, message, status_percentage) - if message: - self.message = message - - if status is not None: - self.status = status - - if status_percentage is not None: - self.status_percentage = status_percentage - - # check if storing of the status is requested - if self.status >= STATUS.STORE_AND_UPDATE_STATUS or self.status == STATUS.ERROR_STATUS: - # rebuild the doc and update the status xml file - self.write_response_doc(clean) - - update_response(self.uuid, self) - - def write_response_doc(self, clean=True): + def update_status_file(self, clean): # TODO: check if file/directory is still present, maybe deleted in mean time - - # check if storing of the status is requested - if self.status >= STATUS.STORE_AND_UPDATE_STATUS or \ - self.status == STATUS.ERROR_STATUS: - + try: # rebuild the doc and update the status xml file self.doc = self._construct_doc() - try: - with open(self.process.status_location, 'w') as f: - f.write(self.doc) - f.flush() - os.fsync(f.fileno()) + with open(self.process.status_location, 'w') as f: + f.write(self.doc) + f.flush() + os.fsync(f.fileno()) - if self.status >= STATUS.DONE_STATUS and clean: - self.process.clean() + if (self.status == WPS_STATUS.SUCCEEDED or self.status == WPS_STATUS.FAILED) and clean: + self.process.clean() - except IOError as e: - raise NoApplicableCode('Writing Response Document failed with : %s' % e) + except Exception as e: + raise NoApplicableCode('Writing Response Document failed with : %s' % e) def _process_accepted(self): return { @@ -158,28 +143,28 @@ def json(self): data["service_instance"] = self._get_serviceinstance() data["process"] = self.process.json - if self.status >= STATUS.STORE_STATUS: + if self.store_status_file: if self.process.status_location: data["status_location"] = self.process.status_url - if self.status == STATUS.STORE_AND_UPDATE_STATUS: - if self.status_percentage == 0: - self.message = 'PyWPS Process %s accepted' % self.process.identifier - data["status"] = self._process_accepted() - return data - elif self.status_percentage > 0: - data["percent_done"] = self.status_percentage - data["status"] = self._process_started() - return data + if self.status == WPS_STATUS.ACCEPTED: + self.message = 'PyWPS Process %s accepted' % self.process.identifier + data["status"] = self._process_accepted() + return data + + if self.status == WPS_STATUS.STARTED: + data["percent_done"] = self.status_percentage + data["status"] = self._process_started() + return data # check if process failed and display fail message - if self.status_percentage == -1 or self.status == STATUS.ERROR_STATUS: + if self.status == WPS_STATUS.FAILED: data["status"] = self._process_failed() return data # TODO: add paused status - if self.status == STATUS.DONE_STATUS: + if self.status == WPS_STATUS.SUCCEEDED: data["status"] = self._process_succeeded() # DataInputs and DataOutputs definition XML if lineage=true @@ -216,7 +201,7 @@ def __call__(self, request): except Exception as exp: raise NoApplicableCode(exp) - if self.status >= STATUS.DONE_STATUS: + if self.store_status_file: self.process.clean() return xml_response(doc) diff --git a/pywps/response/status.py b/pywps/response/status.py index 04535257b..4a3606e04 100644 --- a/pywps/response/status.py +++ b/pywps/response/status.py @@ -1,6 +1,4 @@ from collections import namedtuple -_STATUS = namedtuple('Status', 'ERROR_STATUS, NO_STATUS, STORE_STATUS,' - 'STORE_AND_UPDATE_STATUS, DONE_STATUS') - -STATUS = _STATUS(0, 10, 20, 30, 40) +_WPS_STATUS = namedtuple('WPSStatus', ['UNKNOWN', 'ACCEPTED', 'STARTED', 'PAUSED', 'SUCCEEDED', 'FAILED']) +WPS_STATUS = _WPS_STATUS(0, 1, 2, 3, 4, 5) From 7c6c972d2bc3d9073a7a189644f0c4acd3042211 Mon Sep 17 00:00:00 2001 From: Benoit Gschwind Date: Fri, 13 Jul 2018 15:39:43 +0200 Subject: [PATCH 2/2] Define and implement the rational of exception and failure This patch defineand implement the rational of exception handle in Service.call function. --- pywps/app/Process.py | 107 ++++++++++++++++----------------- pywps/app/Service.py | 104 ++++++++++++++------------------ pywps/response/capabilities.py | 11 +++- pywps/response/describe.py | 10 ++- pywps/response/execute.py | 39 +++++++----- 5 files changed, 138 insertions(+), 133 deletions(-) diff --git a/pywps/app/Process.py b/pywps/app/Process.py index ca2b245cc..083e3994f 100644 --- a/pywps/app/Process.py +++ b/pywps/app/Process.py @@ -14,6 +14,7 @@ from pywps import get_ElementMakerForVersion, E, dblog from pywps.response import get_response from pywps.response.status import WPS_STATUS +from pywps.response.execute import ExecuteResponse from pywps.app.WPSRequest import WPSRequest import pywps.configuration as config from pywps._compat import PY2 @@ -154,48 +155,39 @@ def _execute_process(self, async, wps_request, wps_response): # run immedietly if running < maxparallel or maxparallel == -1: + wps_response._update_status(WPS_STATUS.ACCEPTED, u"PyWPS Request accepted", 0) self._run_async(wps_request, wps_response) # try to store for later usage else: - wps_response = self._store_process(stored, - wps_request, wps_response) + maxprocesses = int(config.get_config_value('server', 'maxprocesses')) + if stored >= maxprocesses: + raise ServerBusy('Maximum number of parallel running processes reached. Please try later.') + LOGGER.debug("Store process in job queue, uuid=%s", self.uuid) + dblog.store_process(self.uuid, wps_request) + wps_response._update_status(WPS_STATUS.ACCEPTED, u'PyWPS Process stored in job queue', 0) # not async else: - if running < maxparallel or maxparallel == -1: - wps_response = self._run_process(wps_request, wps_response) - else: + if running >= maxparallel and maxparallel != -1: raise ServerBusy('Maximum number of parallel running processes reached. Please try later.') + wps_response._update_status(WPS_STATUS.ACCEPTED, u"PyWPS Request accepted", 0) + wps_response = self._run_process(wps_request, wps_response) return wps_response + # This function may not raise exception and must return a valid wps_response + # Failure must be reported as wps_response.status = WPS_STATUS.FAILED def _run_async(self, wps_request, wps_response): import pywps.processing - # required to return correct status in the main process, otherwise the status file may be invalid until the - # fork is perfomed. - wps_response._update_status(WPS_STATUS.ACCEPTED, u"PyWPS Request accepted", 0) process = pywps.processing.Process( process=self, wps_request=wps_request, wps_response=wps_response) process.start() - def _store_process(self, stored, wps_request, wps_response): - """Try to store given requests - """ - - maxprocesses = int(config.get_config_value('server', 'maxprocesses')) - - if stored < maxprocesses: - LOGGER.debug("Store process in job queue, uuid=%s", self.uuid) - dblog.store_process(self.uuid, wps_request) - wps_response._update_status(WPS_STATUS.ACCEPTED, u'PyWPS Process stored in job queue', 0) - else: - raise ServerBusy('Maximum number of parallel running processes reached. Please try later.') - - return wps_response - + # This function may not raise exception and must return a valid wps_response + # Failure must be reported as wps_response.status = WPS_STATUS.FAILED def _run_process(self, wps_request, wps_response): try: self._set_grass(wps_request) @@ -205,11 +197,12 @@ def _run_process(self, wps_request, wps_response): LOGGER.info('Setting HOME to current working directory: %s', os.environ['HOME']) LOGGER.debug('ProcessID=%s, HOME=%s', self.uuid, os.environ.get('HOME')) wps_response._update_status(WPS_STATUS.STARTED, u'PyWPS Process started', 0) - wps_response = self.handler(wps_request, wps_response) - - # if (not wps_response.status_percentage) or (wps_response.status_percentage != 100): - LOGGER.debug('Updating process status to 100% if everything went correctly') - wps_response._update_status(WPS_STATUS.SUCCEEDED, 'PyWPS Process {} finished'.format(self.title), 100) + self.handler(wps_request, wps_response) # the user must update the wps_response. + # Ensure process termination + if wps_response.status != WPS_STATUS.SUCCEEDED and wps_response.status != WPS_STATUS.FAILED: + # if (not wps_response.status_percentage) or (wps_response.status_percentage != 100): + LOGGER.debug('Updating process status to 100% if everything went correctly') + wps_response._update_status(WPS_STATUS.SUCCEEDED, 'PyWPS Process {} finished'.format(self.title), 100) except Exception as e: traceback.print_exc() LOGGER.debug('Retrieving file and line number where exception occurred') @@ -234,39 +227,43 @@ def _run_process(self, wps_request, wps_response): msg = 'Process error: %s.%s Line %i %s' % (fname, method_name, exc_tb.tb_lineno, e) LOGGER.error(msg) - if config.get_config_value("logging", "level") != "DEBUG": msg = 'Process failed, please check server error log' + wps_response._update_status(WPS_STATUS.FAILED, msg, 100) - if not wps_response: - raise NoApplicableCode('Response is empty. Make sure the _handler method is returning a valid object.') - elif wps_request.raw: - raise - else: - wps_response._update_status(WPS_STATUS.FAILLED, msg, -1) - - # tr - stored_request = dblog.get_first_stored() - if stored_request: - try: - (uuid, request_json) = (stored_request.uuid, stored_request.request) - if not PY2: - request_json = request_json.decode('utf-8') - new_wps_request = WPSRequest() - new_wps_request.json = json.loads(request_json) - process_identifier = new_wps_request.identifier - process = self.service.prepare_process_for_execution(process_identifier) - process._set_uuid(uuid) - process.async = True - response_cls = get_response("execute") - new_wps_response = response_cls(new_wps_request, process=process, uuid=uuid) - process._run_async(new_wps_request, new_wps_response) - dblog.remove_stored(uuid) - except Exception as e: - LOGGER.error("Could not run stored process. %s", e) + finally: + # The run of the next pending request if finished here, weather or not it successfull + self.launch_next_process() return wps_response + def launch_next_process(self): + """Look at the queue of async process, if the queue is not empty launch the next pending request. + """ + try: + LOGGER.debug("Checking for stored requests") + + stored_request = dblog.get_first_stored() + if not stored_request: + LOGGER.debug("No stored request found") + return + + (uuid, request_json) = (stored_request.uuid, stored_request.request) + if not PY2: + request_json = request_json.decode('utf-8') + LOGGER.debug("Launching the stored request %s", str(uuid)) + new_wps_request = WPSRequest() + new_wps_request.json = json.loads(request_json) + process_identifier = new_wps_request.identifier + process = self.service.prepare_process_for_execution(process_identifier) + process._set_uuid(uuid) + process.async = True + new_wps_response = ExecuteResponse(new_wps_request, process=process, uuid=uuid) + process._run_async(new_wps_request, new_wps_response) + dblog.remove_stored(uuid) + except Exception as e: + LOGGER.error("Could not run stored process. %s", e) + def clean(self): """Clean the process working dir and other temporary files """ diff --git a/pywps/app/Service.py b/pywps/app/Service.py index 7340c9673..c7e43c7ff 100755 --- a/pywps/app/Service.py +++ b/pywps/app/Service.py @@ -158,30 +158,7 @@ def _parse_and_execute(self, process, wps_request, uuid): if outpt.identifier == wps_outpt: outpt.as_reference = is_reference - # catch error generated by process code - try: - wps_response = process.execute(wps_request, uuid) - except Exception as e: - e_follow = e - if not isinstance(e, NoApplicableCode): - e_follow = NoApplicableCode('Service error: %s' % e) - if wps_request.raw: - resp = Response(e_follow.get_body(), mimetype='application/xml') - resp.call_on_close(process.clean) - return resp - else: - raise e_follow - - # get the specified output as raw - if wps_request.raw: - for outpt in wps_request.outputs: - for proc_outpt in process.outputs: - if outpt == proc_outpt.identifier: - return Response(proc_outpt.data) - - # if the specified identifier was not found raise error - raise InvalidParameterValue('') - + wps_response = process.execute(wps_request, uuid) return wps_response def _get_complex_input_handler(self, href): @@ -385,55 +362,64 @@ def create_bbox_inputs(self, source, inputs): return outinputs + # May not raise exceptions, this function must return a valid werkzeug.wrappers.Response. def call(self, http_request): - - request_uuid = uuid.uuid1() - - environ_cfg = http_request.environ.get('PYWPS_CFG') - if 'PYWPS_CFG' not in os.environ and environ_cfg: - LOGGER.debug('Setting PYWPS_CFG to %s', environ_cfg) - os.environ['PYWPS_CFG'] = environ_cfg - try: + # This try block handle Exception generated before the request is accepted. Once the request is accepted + # a valid wps_reponse must exist. To report error use the wps_response using + # wps_response._update_status(WPS_STATUS.FAILED, ...). + # + # We need this behaviour to handle the status file correctly, once the request is accepted, a + # status file may be created and failure must be reported in this file instead of a raw ows:ExceptionReport + # + # Exeception from CapabilityResponse and DescribeResponse are always catched by this try ... except close + # because they never have status. + + request_uuid = uuid.uuid1() + + environ_cfg = http_request.environ.get('PYWPS_CFG') + if 'PYWPS_CFG' not in os.environ and environ_cfg: + LOGGER.debug('Setting PYWPS_CFG to %s', environ_cfg) + os.environ['PYWPS_CFG'] = environ_cfg + wps_request = WPSRequest(http_request) LOGGER.info('Request: %s', wps_request.operation) if wps_request.operation in ['getcapabilities', 'describeprocess', 'execute']: log_request(request_uuid, wps_request) - response = None - if wps_request.operation == 'getcapabilities': - response = self.get_capabilities(wps_request, request_uuid) - response._update_status(WPS_STATUS.SUCCEEDED, u'', 100) - - elif wps_request.operation == 'describeprocess': - response = self.describe(wps_request, request_uuid, wps_request.identifiers) - response._update_status(WPS_STATUS.SUCCEEDED, u'', 100) - - elif wps_request.operation == 'execute': - response = self.execute( - wps_request.identifier, - wps_request, - request_uuid - ) - return response + try: + response = None + if wps_request.operation == 'getcapabilities': + response = self.get_capabilities(wps_request, request_uuid) + response._update_status(WPS_STATUS.SUCCEEDED, u'', 100) + + elif wps_request.operation == 'describeprocess': + response = self.describe(wps_request, request_uuid, wps_request.identifiers) + response._update_status(WPS_STATUS.SUCCEEDED, u'', 100) + + elif wps_request.operation == 'execute': + response = self.execute( + wps_request.identifier, + wps_request, + request_uuid + ) + return response + except Exception as e: + # This ensure that logged request get terminated in case of exception while the request is not + # accepted + store_status(request_uuid, WPS_STATUS.FAILED, u'Request rejected due to exception', 100) + raise e else: raise RuntimeError("Unknown operation %r" % wps_request.operation) - except HTTPException as e: - # transform HTTPException to OWS NoApplicableCode exception - if not isinstance(e, NoApplicableCode): - e = NoApplicableCode(e.description, code=e.code) - - try: - store_status(request_uuid, WPS_STATUS.FAILED, e.locator, 100) - except NoApplicableCode as e: - return e + except NoApplicableCode as e: return e + except HTTPException as e: + return NoApplicableCode(e.description, code=e.code) except Exception as e: - e = NoApplicableCode("No applicable error code, please check error log", code=500) - return e + return NoApplicableCode("No applicable error code, please check error log", code=500) @Request.application def __call__(self, http_request): diff --git a/pywps/response/capabilities.py b/pywps/response/capabilities.py index abde7c58d..9346e011b 100644 --- a/pywps/response/capabilities.py +++ b/pywps/response/capabilities.py @@ -3,6 +3,7 @@ from pywps.app.basic import xml_response from pywps.response import WPSResponse from pywps import __version__ +from pywps.exceptions import NoApplicableCode import os @@ -68,5 +69,11 @@ def _construct_doc(self): @Request.application def __call__(self, request): - doc = self.get_response_doc() - return xml_response(doc) + # This function must return a valid response. + try: + doc = self.get_response_doc() + return xml_response(doc) + except NoApplicableCode as e: + return e + except Exception as e: + return NoApplicableCode(str(e)) diff --git a/pywps/response/describe.py b/pywps/response/describe.py index 1354d0ef9..5445dc3b9 100644 --- a/pywps/response/describe.py +++ b/pywps/response/describe.py @@ -54,5 +54,11 @@ def _construct_doc(self): @Request.application def __call__(self, request): - doc = self.get_response_doc() - return xml_response(doc) + # This function must return a valid response. + try: + doc = self.get_response_doc() + return xml_response(doc) + except NoApplicableCode as e: + return e + except Exception as e: + return NoApplicableCode(str(e)) diff --git a/pywps/response/execute.py b/pywps/response/execute.py index 9da896ceb..aecc1093c 100755 --- a/pywps/response/execute.py +++ b/pywps/response/execute.py @@ -13,6 +13,7 @@ from pywps.app.basic import xml_response from pywps.exceptions import NoApplicableCode import pywps.configuration as config +from werkzeug.wrappers import Response from pywps.response.status import WPS_STATUS from pywps.response import WPSResponse @@ -184,24 +185,32 @@ def json(self): return data def _construct_doc(self): - template = self.template_env.get_template(os.path.join(self.version, 'execute', 'main.xml')) - doc = template.render(**self.json) - return doc @Request.application def __call__(self, request): - doc = None - try: - doc = self._construct_doc() - except HTTPException as httpexp: - raise httpexp - except Exception as exp: - raise NoApplicableCode(exp) - - if self.store_status_file: - self.process.clean() - - return xml_response(doc) + if self.wps_request.raw: + if self.status == WPS_STATUS.FAILED: + return NoApplicableCode(self.message) + else: + wps_output_identifier = next(iter(self.wps_request.outputs)) # get the first key only + wps_output_value = self.outputs[wps_output_identifier] + if wps_output_value.source_type is None: + return NoApplicableCode("Expected output was not generated") + return Response(wps_output_value.data, + mimetype=self.wps_request.outputs[wps_output_identifier]['mimetype']) + else: + doc = None + try: + doc = self._construct_doc() + if self.store_status_file: + self.process.clean() + # TODO: If an exception occur here we must generate a valid status file + except HTTPException as httpexp: + return httpexp + except Exception as exp: + return NoApplicableCode(exp) + + return Response(doc, mimetype='text/xml')