diff --git a/docs/demobuffer.py b/docs/demobuffer.py index 06b186a4d..d3c6af379 100644 --- a/docs/demobuffer.py +++ b/docs/demobuffer.py @@ -29,6 +29,7 @@ from pywps.app.Common import Metadata from pywps.validator.mode import MODE from pywps.inout.formats import FORMATS +from pywps.response.status import WPS_STATUS inpt_vector = ComplexInput( 'vector', @@ -100,7 +101,7 @@ def _handler(request, response): # make buffer for each feature while index < count: - response.update_status('Buffering feature %s' % index, float(index) / count) + response._update_status(WPS_STATUS.STARTED, 'Buffering feature %s' % index, float(index) / count) # get the geometry input_feature = input_layer.GetNextFeature() diff --git a/pywps/app/Process.py b/pywps/app/Process.py index 8d869e17c..083e3994f 100644 --- a/pywps/app/Process.py +++ b/pywps/app/Process.py @@ -13,7 +13,8 @@ from pywps import get_ElementMakerForVersion, E, dblog from pywps.response import get_response -from pywps.response.status import STATUS +from pywps.response.status import WPS_STATUS +from pywps.response.execute import ExecuteResponse from pywps.app.WPSRequest import WPSRequest import pywps.configuration as config from pywps._compat import PY2 @@ -108,10 +109,10 @@ def execute(self, wps_request, uuid): if self.status_supported != 'true': raise OperationNotSupported('Process does not support the updating of status') - wps_response.status = STATUS.STORE_AND_UPDATE_STATUS + wps_response.store_status_file = True self.async = True else: - wps_response.status = STATUS.STORE_STATUS + wps_response.store_status_file = False LOGGER.debug('Check if updating of status is not required then no need to spawn a process') @@ -154,22 +155,29 @@ def _execute_process(self, async, wps_request, wps_response): # run immedietly if running < maxparallel or maxparallel == -1: + wps_response._update_status(WPS_STATUS.ACCEPTED, u"PyWPS Request accepted", 0) self._run_async(wps_request, wps_response) # try to store for later usage else: - wps_response = self._store_process(stored, - wps_request, wps_response) + maxprocesses = int(config.get_config_value('server', 'maxprocesses')) + if stored >= maxprocesses: + raise ServerBusy('Maximum number of parallel running processes reached. Please try later.') + LOGGER.debug("Store process in job queue, uuid=%s", self.uuid) + dblog.store_process(self.uuid, wps_request) + wps_response._update_status(WPS_STATUS.ACCEPTED, u'PyWPS Process stored in job queue', 0) # not async else: - if running < maxparallel or maxparallel == -1: - wps_response = self._run_process(wps_request, wps_response) - else: + if running >= maxparallel and maxparallel != -1: raise ServerBusy('Maximum number of parallel running processes reached. Please try later.') + wps_response._update_status(WPS_STATUS.ACCEPTED, u"PyWPS Request accepted", 0) + wps_response = self._run_process(wps_request, wps_response) return wps_response + # This function may not raise exception and must return a valid wps_response + # Failure must be reported as wps_response.status = WPS_STATUS.FAILED def _run_async(self, wps_request, wps_response): import pywps.processing process = pywps.processing.Process( @@ -178,21 +186,8 @@ def _run_async(self, wps_request, wps_response): wps_response=wps_response) process.start() - def _store_process(self, stored, wps_request, wps_response): - """Try to store given requests - """ - - maxprocesses = int(config.get_config_value('server', 'maxprocesses')) - - if stored < maxprocesses: - LOGGER.debug("Store process in job queue, uuid=%s", self.uuid) - dblog.store_process(self.uuid, wps_request) - wps_response.update_status('PyWPS Process stored in job queue', 0) - else: - raise ServerBusy('Maximum number of parallel running processes reached. Please try later.') - - return wps_response - + # This function may not raise exception and must return a valid wps_response + # Failure must be reported as wps_response.status = WPS_STATUS.FAILED def _run_process(self, wps_request, wps_response): try: self._set_grass(wps_request) @@ -201,13 +196,13 @@ def _run_process(self, wps_request, wps_response): os.environ['HOME'] = self.workdir LOGGER.info('Setting HOME to current working directory: %s', os.environ['HOME']) LOGGER.debug('ProcessID=%s, HOME=%s', self.uuid, os.environ.get('HOME')) - wps_response.update_status('PyWPS Process started', 0) - wps_response = self.handler(wps_request, wps_response) - - # if (not wps_response.status_percentage) or (wps_response.status_percentage != 100): - LOGGER.debug('Updating process status to 100% if everything went correctly') - wps_response.update_status('PyWPS Process {} finished'.format(self.title), - 100, STATUS.DONE_STATUS, clean=self.async) + wps_response._update_status(WPS_STATUS.STARTED, u'PyWPS Process started', 0) + self.handler(wps_request, wps_response) # the user must update the wps_response. + # Ensure process termination + if wps_response.status != WPS_STATUS.SUCCEEDED and wps_response.status != WPS_STATUS.FAILED: + # if (not wps_response.status_percentage) or (wps_response.status_percentage != 100): + LOGGER.debug('Updating process status to 100% if everything went correctly') + wps_response._update_status(WPS_STATUS.SUCCEEDED, 'PyWPS Process {} finished'.format(self.title), 100) except Exception as e: traceback.print_exc() LOGGER.debug('Retrieving file and line number where exception occurred') @@ -232,40 +227,43 @@ def _run_process(self, wps_request, wps_response): msg = 'Process error: %s.%s Line %i %s' % (fname, method_name, exc_tb.tb_lineno, e) LOGGER.error(msg) - if config.get_config_value("logging", "level") != "DEBUG": msg = 'Process failed, please check server error log' + wps_response._update_status(WPS_STATUS.FAILED, msg, 100) - if not wps_response: - raise NoApplicableCode('Response is empty. Make sure the _handler method is returning a valid object.') - elif wps_request.raw: - raise - else: - wps_response.update_status(msg, -1, status=STATUS.ERROR_STATUS) - - # tr - stored_request = dblog.get_first_stored() - if stored_request: - try: - (uuid, request_json) = (stored_request.uuid, stored_request.request) - if not PY2: - request_json = request_json.decode('utf-8') - new_wps_request = WPSRequest() - new_wps_request.json = json.loads(request_json) - process_identifier = new_wps_request.identifier - process = self.service.prepare_process_for_execution(process_identifier) - process._set_uuid(uuid) - process.async = True - response_cls = get_response("execute") - new_wps_response = response_cls(new_wps_request, process=process, uuid=uuid) - new_wps_response.status = STATUS.STORE_AND_UPDATE_STATUS - process._run_async(new_wps_request, new_wps_response) - dblog.remove_stored(uuid) - except Exception as e: - LOGGER.error("Could not run stored process. %s", e) + finally: + # The run of the next pending request if finished here, weather or not it successfull + self.launch_next_process() return wps_response + def launch_next_process(self): + """Look at the queue of async process, if the queue is not empty launch the next pending request. + """ + try: + LOGGER.debug("Checking for stored requests") + + stored_request = dblog.get_first_stored() + if not stored_request: + LOGGER.debug("No stored request found") + return + + (uuid, request_json) = (stored_request.uuid, stored_request.request) + if not PY2: + request_json = request_json.decode('utf-8') + LOGGER.debug("Launching the stored request %s", str(uuid)) + new_wps_request = WPSRequest() + new_wps_request.json = json.loads(request_json) + process_identifier = new_wps_request.identifier + process = self.service.prepare_process_for_execution(process_identifier) + process._set_uuid(uuid) + process.async = True + new_wps_response = ExecuteResponse(new_wps_request, process=process, uuid=uuid) + process._run_async(new_wps_request, new_wps_response) + dblog.remove_stored(uuid) + except Exception as e: + LOGGER.error("Could not run stored process. %s", e) + def clean(self): """Clean the process working dir and other temporary files """ diff --git a/pywps/app/Service.py b/pywps/app/Service.py index 3fd6dd69a..c7e43c7ff 100755 --- a/pywps/app/Service.py +++ b/pywps/app/Service.py @@ -17,8 +17,9 @@ from pywps.exceptions import MissingParameterValue, NoApplicableCode, InvalidParameterValue, FileSizeExceeded, \ StorageNotSupported, FileURLNotSupported from pywps.inout.inputs import ComplexInput, LiteralInput, BoundingBoxInput -from pywps.dblog import log_request, update_response +from pywps.dblog import log_request, store_status from pywps import response +from pywps.response.status import WPS_STATUS from collections import deque, OrderedDict import os @@ -157,30 +158,7 @@ def _parse_and_execute(self, process, wps_request, uuid): if outpt.identifier == wps_outpt: outpt.as_reference = is_reference - # catch error generated by process code - try: - wps_response = process.execute(wps_request, uuid) - except Exception as e: - e_follow = e - if not isinstance(e, NoApplicableCode): - e_follow = NoApplicableCode('Service error: %s' % e) - if wps_request.raw: - resp = Response(e_follow.get_body(), mimetype='application/xml') - resp.call_on_close(process.clean) - return resp - else: - raise e_follow - - # get the specified output as raw - if wps_request.raw: - for outpt in wps_request.outputs: - for proc_outpt in process.outputs: - if outpt == proc_outpt.identifier: - return Response(proc_outpt.data) - - # if the specified identifier was not found raise error - raise InvalidParameterValue('') - + wps_response = process.execute(wps_request, uuid) return wps_response def _get_complex_input_handler(self, href): @@ -384,59 +362,64 @@ def create_bbox_inputs(self, source, inputs): return outinputs + # May not raise exceptions, this function must return a valid werkzeug.wrappers.Response. def call(self, http_request): - - request_uuid = uuid.uuid1() - - environ_cfg = http_request.environ.get('PYWPS_CFG') - if 'PYWPS_CFG' not in os.environ and environ_cfg: - LOGGER.debug('Setting PYWPS_CFG to %s', environ_cfg) - os.environ['PYWPS_CFG'] = environ_cfg - try: + # This try block handle Exception generated before the request is accepted. Once the request is accepted + # a valid wps_reponse must exist. To report error use the wps_response using + # wps_response._update_status(WPS_STATUS.FAILED, ...). + # + # We need this behaviour to handle the status file correctly, once the request is accepted, a + # status file may be created and failure must be reported in this file instead of a raw ows:ExceptionReport + # + # Exeception from CapabilityResponse and DescribeResponse are always catched by this try ... except close + # because they never have status. + + request_uuid = uuid.uuid1() + + environ_cfg = http_request.environ.get('PYWPS_CFG') + if 'PYWPS_CFG' not in os.environ and environ_cfg: + LOGGER.debug('Setting PYWPS_CFG to %s', environ_cfg) + os.environ['PYWPS_CFG'] = environ_cfg + wps_request = WPSRequest(http_request) LOGGER.info('Request: %s', wps_request.operation) if wps_request.operation in ['getcapabilities', 'describeprocess', 'execute']: log_request(request_uuid, wps_request) - response = None - if wps_request.operation == 'getcapabilities': - response = self.get_capabilities(wps_request, request_uuid) - - elif wps_request.operation == 'describeprocess': - response = self.describe(wps_request, request_uuid, wps_request.identifiers) - - elif wps_request.operation == 'execute': - response = self.execute( - wps_request.identifier, - wps_request, - request_uuid - ) - update_response(request_uuid, response, close=True) - return response + try: + response = None + if wps_request.operation == 'getcapabilities': + response = self.get_capabilities(wps_request, request_uuid) + response._update_status(WPS_STATUS.SUCCEEDED, u'', 100) + + elif wps_request.operation == 'describeprocess': + response = self.describe(wps_request, request_uuid, wps_request.identifiers) + response._update_status(WPS_STATUS.SUCCEEDED, u'', 100) + + elif wps_request.operation == 'execute': + response = self.execute( + wps_request.identifier, + wps_request, + request_uuid + ) + return response + except Exception as e: + # This ensure that logged request get terminated in case of exception while the request is not + # accepted + store_status(request_uuid, WPS_STATUS.FAILED, u'Request rejected due to exception', 100) + raise e else: - update_response(request_uuid, response, close=True) raise RuntimeError("Unknown operation %r" % wps_request.operation) - except HTTPException as e: - # transform HTTPException to OWS NoApplicableCode exception - if not isinstance(e, NoApplicableCode): - e = NoApplicableCode(e.description, code=e.code) - - class FakeResponse: - message = e.locator - status = e.code - status_percentage = 100 - try: - update_response(request_uuid, FakeResponse, close=True) - except NoApplicableCode as e: - return e + except NoApplicableCode as e: return e + except HTTPException as e: + return NoApplicableCode(e.description, code=e.code) except Exception as e: - e = NoApplicableCode("No applicable error code, please check error log", code=500) - return e + return NoApplicableCode("No applicable error code, please check error log", code=500) @Request.application def __call__(self, http_request): diff --git a/pywps/dblog.py b/pywps/dblog.py index 4f75a3f54..326de3b27 100644 --- a/pywps/dblog.py +++ b/pywps/dblog.py @@ -112,26 +112,10 @@ def get_first_stored(): return request -def update_response(uuid, response, close=False): +def store_status(uuid, wps_status, message=None, status_percentage=None): """Writes response to database """ - session = get_session() - message = None - status_percentage = None - status = None - - if hasattr(response, 'message'): - message = response.message - if hasattr(response, 'status_percentage'): - status_percentage = response.status_percentage - if hasattr(response, 'status'): - status = response.status - - if status == '200 OK': - status = 3 - elif status == 400: - status = 0 requests = session.query(ProcessInstance).filter_by(uuid=str(uuid)) if requests.count(): @@ -139,7 +123,7 @@ def update_response(uuid, response, close=False): request.time_end = datetime.datetime.now() request.message = str(message) request.percent_done = status_percentage - request.status = status + request.status = wps_status session.commit() session.close() diff --git a/pywps/processing/scheduler.py b/pywps/processing/scheduler.py index e352dfab9..6c1c1fc62 100644 --- a/pywps/processing/scheduler.py +++ b/pywps/processing/scheduler.py @@ -7,6 +7,7 @@ import pywps.configuration as config from pywps.processing.basic import Processing from pywps.exceptions import SchedulerNotAvailable +from pywps.response.status import WPS_STATUS import logging LOGGER = logging.getLogger("PYWPS") @@ -22,10 +23,11 @@ class Scheduler(Processing): """ def start(self): - self.job.wps_response.update_status('Submitting job ...', 0) + self.job.wps_response._update_status(WPS_STATUS.ACCEPTED, 'Submitting job ...', 0) # run remote pywps process jobid = self.run_job() - self.job.wps_response.update_status('Your job has been submitted with ID %s'.format(jobid), 0) + self.job.wps_response._update_status(WPS_STATUS.ACCEPTED, + 'Your job has been submitted with ID %s'.format(jobid), 0) def run_job(self): LOGGER.info("Submitting job ...") diff --git a/pywps/response/__init__.py b/pywps/response/__init__.py index 389e0ef49..24b00ef42 100644 --- a/pywps/response/__init__.py +++ b/pywps/response/__init__.py @@ -1,5 +1,5 @@ -from pywps.dblog import update_response -from pywps.response.status import STATUS +from pywps.dblog import store_status +from pywps.response.status import WPS_STATUS from jinja2 import Environment, PackageLoader import os @@ -31,7 +31,7 @@ def __init__(self, wps_request, uuid=None, version="1.0.0"): self.wps_request = wps_request self.uuid = uuid self.message = '' - self.status = STATUS.NO_STATUS + self.status = WPS_STATUS.ACCEPTED self.status_percentage = 0 self.doc = None self.version = version @@ -40,28 +40,19 @@ def __init__(self, wps_request, uuid=None, version="1.0.0"): trim_blocks=True, lstrip_blocks=True ) - self.update_status(message="Request accepted", status_percentage=0, status=self.status) - - def update_status(self, message=None, status_percentage=None, status=None, clean=True): + def _update_status(self, status, message, status_percentage): """ Update status report of currently running process instance :param str message: Message you need to share with the client :param int status_percentage: Percent done (number betwen <0-100>) - :param pywps.response.status.STATUS status: process status - user should usually + :param pywps.response.status.WPS_STATUS status: process status - user should usually ommit this parameter """ - - if message: - self.message = message - - if status is not None: - self.status = status - - if status_percentage is not None: - self.status_percentage = status_percentage - - update_response(self.uuid, self) + self.message = message + self.status = status + self.status_percentage = status_percentage + store_status(self.uuid, self.status, self.message, self.status_percentage) def get_response_doc(self): try: @@ -71,10 +62,10 @@ def get_response_doc(self): msg = e.description else: msg = e - self.update_status(message=msg, status_percentage=100, status=STATUS.ERROR_STATUS) + self._update_status(WPS_STATUS.FAILED, msg, 100) raise e else: - self.update_status(message="Response generated", status_percentage=100, status=STATUS.DONE_STATUS) + self._update_status(WPS_STATUS.SUCCEEDED, u"Response generated", 100) return self.doc diff --git a/pywps/response/capabilities.py b/pywps/response/capabilities.py index 3e69498b5..9346e011b 100644 --- a/pywps/response/capabilities.py +++ b/pywps/response/capabilities.py @@ -2,8 +2,8 @@ import pywps.configuration as config from pywps.app.basic import xml_response from pywps.response import WPSResponse -from pywps.response.status import STATUS from pywps import __version__ +from pywps.exceptions import NoApplicableCode import os @@ -69,5 +69,11 @@ def _construct_doc(self): @Request.application def __call__(self, request): - doc = self.get_response_doc() - return xml_response(doc) + # This function must return a valid response. + try: + doc = self.get_response_doc() + return xml_response(doc) + except NoApplicableCode as e: + return e + except Exception as e: + return NoApplicableCode(str(e)) diff --git a/pywps/response/describe.py b/pywps/response/describe.py index 7702bd8a2..5445dc3b9 100644 --- a/pywps/response/describe.py +++ b/pywps/response/describe.py @@ -5,7 +5,6 @@ from pywps.exceptions import MissingParameterValue from pywps.exceptions import InvalidParameterValue from pywps.response import WPSResponse -from pywps.response.status import STATUS from pywps import __version__ import os @@ -55,5 +54,11 @@ def _construct_doc(self): @Request.application def __call__(self, request): - doc = self.get_response_doc() - return xml_response(doc) + # This function must return a valid response. + try: + doc = self.get_response_doc() + return xml_response(doc) + except NoApplicableCode as e: + return e + except Exception as e: + return NoApplicableCode(str(e)) diff --git a/pywps/response/execute.py b/pywps/response/execute.py index a6ba23c6c..aecc1093c 100755 --- a/pywps/response/execute.py +++ b/pywps/response/execute.py @@ -13,9 +13,9 @@ from pywps.app.basic import xml_response from pywps.exceptions import NoApplicableCode import pywps.configuration as config -from pywps.dblog import update_response +from werkzeug.wrappers import Response -from pywps.response.status import STATUS +from pywps.response.status import WPS_STATUS from pywps.response import WPSResponse from pywps._compat import PY2 @@ -46,55 +46,41 @@ def __init__(self, wps_request, uuid, **kwargs): self.process = kwargs["process"] self.outputs = {o.identifier: o for o in self.process.outputs} + self.store_status_file = False - def update_status(self, message=None, status_percentage=None, status=None, - clean=True): + # override WPSResponse._update_status + def _update_status(self, status, message, status_percentage, clean=True): + super(ExecuteResponse, self)._update_status(status, message, status_percentage) + if self.store_status_file: + self.update_status_file(clean) + + def update_status(self, message, status_percentage=None): """ Update status report of currently running process instance :param str message: Message you need to share with the client :param int status_percentage: Percent done (number betwen <0-100>) - :param pywps.app.WPSResponse.STATUS status: process status - user should usually - ommit this parameter """ + if status_percentage is None: + status_percentage = self.status_percentage + self._update_status(WPS_STATUS.ACCEPTED, message, status_percentage) - if message: - self.message = message - - if status is not None: - self.status = status - - if status_percentage is not None: - self.status_percentage = status_percentage - - # check if storing of the status is requested - if self.status >= STATUS.STORE_AND_UPDATE_STATUS or self.status == STATUS.ERROR_STATUS: - # rebuild the doc and update the status xml file - self.write_response_doc(clean) - - update_response(self.uuid, self) - - def write_response_doc(self, clean=True): + def update_status_file(self, clean): # TODO: check if file/directory is still present, maybe deleted in mean time - - # check if storing of the status is requested - if self.status >= STATUS.STORE_AND_UPDATE_STATUS or \ - self.status == STATUS.ERROR_STATUS: - + try: # rebuild the doc and update the status xml file self.doc = self._construct_doc() - try: - with open(self.process.status_location, 'w') as f: - f.write(self.doc) - f.flush() - os.fsync(f.fileno()) + with open(self.process.status_location, 'w') as f: + f.write(self.doc) + f.flush() + os.fsync(f.fileno()) - if self.status >= STATUS.DONE_STATUS and clean: - self.process.clean() + if (self.status == WPS_STATUS.SUCCEEDED or self.status == WPS_STATUS.FAILED) and clean: + self.process.clean() - except IOError as e: - raise NoApplicableCode('Writing Response Document failed with : %s' % e) + except Exception as e: + raise NoApplicableCode('Writing Response Document failed with : %s' % e) def _process_accepted(self): return { @@ -158,28 +144,28 @@ def json(self): data["service_instance"] = self._get_serviceinstance() data["process"] = self.process.json - if self.status >= STATUS.STORE_STATUS: + if self.store_status_file: if self.process.status_location: data["status_location"] = self.process.status_url - if self.status == STATUS.STORE_AND_UPDATE_STATUS: - if self.status_percentage == 0: - self.message = 'PyWPS Process %s accepted' % self.process.identifier - data["status"] = self._process_accepted() - return data - elif self.status_percentage > 0: - data["percent_done"] = self.status_percentage - data["status"] = self._process_started() - return data + if self.status == WPS_STATUS.ACCEPTED: + self.message = 'PyWPS Process %s accepted' % self.process.identifier + data["status"] = self._process_accepted() + return data + + if self.status == WPS_STATUS.STARTED: + data["percent_done"] = self.status_percentage + data["status"] = self._process_started() + return data # check if process failed and display fail message - if self.status_percentage == -1 or self.status == STATUS.ERROR_STATUS: + if self.status == WPS_STATUS.FAILED: data["status"] = self._process_failed() return data # TODO: add paused status - if self.status == STATUS.DONE_STATUS: + if self.status == WPS_STATUS.SUCCEEDED: data["status"] = self._process_succeeded() # DataInputs and DataOutputs definition XML if lineage=true @@ -199,24 +185,32 @@ def json(self): return data def _construct_doc(self): - template = self.template_env.get_template(os.path.join(self.version, 'execute', 'main.xml')) - doc = template.render(**self.json) - return doc @Request.application def __call__(self, request): - doc = None - try: - doc = self._construct_doc() - except HTTPException as httpexp: - raise httpexp - except Exception as exp: - raise NoApplicableCode(exp) - - if self.status >= STATUS.DONE_STATUS: - self.process.clean() + if self.wps_request.raw: + if self.status == WPS_STATUS.FAILED: + return NoApplicableCode(self.message) + else: + wps_output_identifier = next(iter(self.wps_request.outputs)) # get the first key only + wps_output_value = self.outputs[wps_output_identifier] + if wps_output_value.source_type is None: + return NoApplicableCode("Expected output was not generated") + return Response(wps_output_value.data, + mimetype=self.wps_request.outputs[wps_output_identifier]['mimetype']) + else: + doc = None + try: + doc = self._construct_doc() + if self.store_status_file: + self.process.clean() + # TODO: If an exception occur here we must generate a valid status file + except HTTPException as httpexp: + return httpexp + except Exception as exp: + return NoApplicableCode(exp) - return xml_response(doc) + return Response(doc, mimetype='text/xml') diff --git a/pywps/response/status.py b/pywps/response/status.py index 04535257b..4a3606e04 100644 --- a/pywps/response/status.py +++ b/pywps/response/status.py @@ -1,6 +1,4 @@ from collections import namedtuple -_STATUS = namedtuple('Status', 'ERROR_STATUS, NO_STATUS, STORE_STATUS,' - 'STORE_AND_UPDATE_STATUS, DONE_STATUS') - -STATUS = _STATUS(0, 10, 20, 30, 40) +_WPS_STATUS = namedtuple('WPSStatus', ['UNKNOWN', 'ACCEPTED', 'STARTED', 'PAUSED', 'SUCCEEDED', 'FAILED']) +WPS_STATUS = _WPS_STATUS(0, 1, 2, 3, 4, 5)