Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/demobuffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from pywps.app.Common import Metadata
from pywps.validator.mode import MODE
from pywps.inout.formats import FORMATS
from pywps.response.status import WPS_STATUS

inpt_vector = ComplexInput(
'vector',
Expand Down Expand Up @@ -100,7 +101,7 @@ def _handler(request, response):
# make buffer for each feature
while index < count:

response.update_status('Buffering feature %s' % index, float(index) / count)
response._update_status(WPS_STATUS.STARTED, 'Buffering feature %s' % index, float(index) / count)

# get the geometry
input_feature = input_layer.GetNextFeature()
Expand Down
114 changes: 56 additions & 58 deletions pywps/app/Process.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@

from pywps import get_ElementMakerForVersion, E, dblog
from pywps.response import get_response
from pywps.response.status import STATUS
from pywps.response.status import WPS_STATUS
from pywps.response.execute import ExecuteResponse
from pywps.app.WPSRequest import WPSRequest
import pywps.configuration as config
from pywps._compat import PY2
Expand Down Expand Up @@ -108,10 +109,10 @@ def execute(self, wps_request, uuid):
if self.status_supported != 'true':
raise OperationNotSupported('Process does not support the updating of status')

wps_response.status = STATUS.STORE_AND_UPDATE_STATUS
wps_response.store_status_file = True
self.async = True
else:
wps_response.status = STATUS.STORE_STATUS
wps_response.store_status_file = False

LOGGER.debug('Check if updating of status is not required then no need to spawn a process')

Expand Down Expand Up @@ -154,22 +155,29 @@ def _execute_process(self, async, wps_request, wps_response):

# run immedietly
if running < maxparallel or maxparallel == -1:
wps_response._update_status(WPS_STATUS.ACCEPTED, u"PyWPS Request accepted", 0)
self._run_async(wps_request, wps_response)

# try to store for later usage
else:
wps_response = self._store_process(stored,
wps_request, wps_response)
maxprocesses = int(config.get_config_value('server', 'maxprocesses'))
if stored >= maxprocesses:
raise ServerBusy('Maximum number of parallel running processes reached. Please try later.')
LOGGER.debug("Store process in job queue, uuid=%s", self.uuid)
dblog.store_process(self.uuid, wps_request)
wps_response._update_status(WPS_STATUS.ACCEPTED, u'PyWPS Process stored in job queue', 0)

# not async
else:
if running < maxparallel or maxparallel == -1:
wps_response = self._run_process(wps_request, wps_response)
else:
if running >= maxparallel and maxparallel != -1:
raise ServerBusy('Maximum number of parallel running processes reached. Please try later.')
wps_response._update_status(WPS_STATUS.ACCEPTED, u"PyWPS Request accepted", 0)
wps_response = self._run_process(wps_request, wps_response)

return wps_response

# This function may not raise exception and must return a valid wps_response
# Failure must be reported as wps_response.status = WPS_STATUS.FAILED
def _run_async(self, wps_request, wps_response):
import pywps.processing
process = pywps.processing.Process(
Expand All @@ -178,21 +186,8 @@ def _run_async(self, wps_request, wps_response):
wps_response=wps_response)
process.start()

def _store_process(self, stored, wps_request, wps_response):
"""Try to store given requests
"""

maxprocesses = int(config.get_config_value('server', 'maxprocesses'))

if stored < maxprocesses:
LOGGER.debug("Store process in job queue, uuid=%s", self.uuid)
dblog.store_process(self.uuid, wps_request)
wps_response.update_status('PyWPS Process stored in job queue', 0)
else:
raise ServerBusy('Maximum number of parallel running processes reached. Please try later.')

return wps_response

# This function may not raise exception and must return a valid wps_response
# Failure must be reported as wps_response.status = WPS_STATUS.FAILED
def _run_process(self, wps_request, wps_response):
try:
self._set_grass(wps_request)
Expand All @@ -201,13 +196,13 @@ def _run_process(self, wps_request, wps_response):
os.environ['HOME'] = self.workdir
LOGGER.info('Setting HOME to current working directory: %s', os.environ['HOME'])
LOGGER.debug('ProcessID=%s, HOME=%s', self.uuid, os.environ.get('HOME'))
wps_response.update_status('PyWPS Process started', 0)
wps_response = self.handler(wps_request, wps_response)

# if (not wps_response.status_percentage) or (wps_response.status_percentage != 100):
LOGGER.debug('Updating process status to 100% if everything went correctly')
wps_response.update_status('PyWPS Process {} finished'.format(self.title),
100, STATUS.DONE_STATUS, clean=self.async)
wps_response._update_status(WPS_STATUS.STARTED, u'PyWPS Process started', 0)
self.handler(wps_request, wps_response) # the user must update the wps_response.
# Ensure process termination
if wps_response.status != WPS_STATUS.SUCCEEDED and wps_response.status != WPS_STATUS.FAILED:
# if (not wps_response.status_percentage) or (wps_response.status_percentage != 100):
LOGGER.debug('Updating process status to 100% if everything went correctly')
wps_response._update_status(WPS_STATUS.SUCCEEDED, 'PyWPS Process {} finished'.format(self.title), 100)
except Exception as e:
traceback.print_exc()
LOGGER.debug('Retrieving file and line number where exception occurred')
Expand All @@ -232,40 +227,43 @@ def _run_process(self, wps_request, wps_response):

msg = 'Process error: %s.%s Line %i %s' % (fname, method_name, exc_tb.tb_lineno, e)
LOGGER.error(msg)

if config.get_config_value("logging", "level") != "DEBUG":
msg = 'Process failed, please check server error log'
wps_response._update_status(WPS_STATUS.FAILED, msg, 100)

if not wps_response:
raise NoApplicableCode('Response is empty. Make sure the _handler method is returning a valid object.')
elif wps_request.raw:
raise
else:
wps_response.update_status(msg, -1, status=STATUS.ERROR_STATUS)

# tr
stored_request = dblog.get_first_stored()
if stored_request:
try:
(uuid, request_json) = (stored_request.uuid, stored_request.request)
if not PY2:
request_json = request_json.decode('utf-8')
new_wps_request = WPSRequest()
new_wps_request.json = json.loads(request_json)
process_identifier = new_wps_request.identifier
process = self.service.prepare_process_for_execution(process_identifier)
process._set_uuid(uuid)
process.async = True
response_cls = get_response("execute")
new_wps_response = response_cls(new_wps_request, process=process, uuid=uuid)
new_wps_response.status = STATUS.STORE_AND_UPDATE_STATUS
process._run_async(new_wps_request, new_wps_response)
dblog.remove_stored(uuid)
except Exception as e:
LOGGER.error("Could not run stored process. %s", e)
finally:
# The run of the next pending request if finished here, weather or not it successfull
self.launch_next_process()

return wps_response

def launch_next_process(self):
"""Look at the queue of async process, if the queue is not empty launch the next pending request.
"""
try:
LOGGER.debug("Checking for stored requests")

stored_request = dblog.get_first_stored()
if not stored_request:
LOGGER.debug("No stored request found")
return

(uuid, request_json) = (stored_request.uuid, stored_request.request)
if not PY2:
request_json = request_json.decode('utf-8')
LOGGER.debug("Launching the stored request %s", str(uuid))
new_wps_request = WPSRequest()
new_wps_request.json = json.loads(request_json)
process_identifier = new_wps_request.identifier
process = self.service.prepare_process_for_execution(process_identifier)
process._set_uuid(uuid)
process.async = True
new_wps_response = ExecuteResponse(new_wps_request, process=process, uuid=uuid)
process._run_async(new_wps_request, new_wps_response)
dblog.remove_stored(uuid)
except Exception as e:
LOGGER.error("Could not run stored process. %s", e)

def clean(self):
"""Clean the process working dir and other temporary files
"""
Expand Down
111 changes: 47 additions & 64 deletions pywps/app/Service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
from pywps.exceptions import MissingParameterValue, NoApplicableCode, InvalidParameterValue, FileSizeExceeded, \
StorageNotSupported, FileURLNotSupported
from pywps.inout.inputs import ComplexInput, LiteralInput, BoundingBoxInput
from pywps.dblog import log_request, update_response
from pywps.dblog import log_request, store_status
from pywps import response
from pywps.response.status import WPS_STATUS

from collections import deque, OrderedDict
import os
Expand Down Expand Up @@ -157,30 +158,7 @@ def _parse_and_execute(self, process, wps_request, uuid):
if outpt.identifier == wps_outpt:
outpt.as_reference = is_reference

# catch error generated by process code
try:
wps_response = process.execute(wps_request, uuid)
except Exception as e:
e_follow = e
if not isinstance(e, NoApplicableCode):
e_follow = NoApplicableCode('Service error: %s' % e)
if wps_request.raw:
resp = Response(e_follow.get_body(), mimetype='application/xml')
resp.call_on_close(process.clean)
return resp
else:
raise e_follow

# get the specified output as raw
if wps_request.raw:
for outpt in wps_request.outputs:
for proc_outpt in process.outputs:
if outpt == proc_outpt.identifier:
return Response(proc_outpt.data)

# if the specified identifier was not found raise error
raise InvalidParameterValue('')

wps_response = process.execute(wps_request, uuid)
return wps_response

def _get_complex_input_handler(self, href):
Expand Down Expand Up @@ -384,59 +362,64 @@ def create_bbox_inputs(self, source, inputs):

return outinputs

# May not raise exceptions, this function must return a valid werkzeug.wrappers.Response.
def call(self, http_request):

request_uuid = uuid.uuid1()

environ_cfg = http_request.environ.get('PYWPS_CFG')
if 'PYWPS_CFG' not in os.environ and environ_cfg:
LOGGER.debug('Setting PYWPS_CFG to %s', environ_cfg)
os.environ['PYWPS_CFG'] = environ_cfg

try:
# This try block handle Exception generated before the request is accepted. Once the request is accepted
# a valid wps_reponse must exist. To report error use the wps_response using
# wps_response._update_status(WPS_STATUS.FAILED, ...).
#
# We need this behaviour to handle the status file correctly, once the request is accepted, a
# status file may be created and failure must be reported in this file instead of a raw ows:ExceptionReport
#
# Exeception from CapabilityResponse and DescribeResponse are always catched by this try ... except close
# because they never have status.

request_uuid = uuid.uuid1()

environ_cfg = http_request.environ.get('PYWPS_CFG')
if 'PYWPS_CFG' not in os.environ and environ_cfg:
LOGGER.debug('Setting PYWPS_CFG to %s', environ_cfg)
os.environ['PYWPS_CFG'] = environ_cfg

wps_request = WPSRequest(http_request)
LOGGER.info('Request: %s', wps_request.operation)
if wps_request.operation in ['getcapabilities',
'describeprocess',
'execute']:
log_request(request_uuid, wps_request)
response = None
if wps_request.operation == 'getcapabilities':
response = self.get_capabilities(wps_request, request_uuid)

elif wps_request.operation == 'describeprocess':
response = self.describe(wps_request, request_uuid, wps_request.identifiers)

elif wps_request.operation == 'execute':
response = self.execute(
wps_request.identifier,
wps_request,
request_uuid
)
update_response(request_uuid, response, close=True)
return response
try:
response = None
if wps_request.operation == 'getcapabilities':
response = self.get_capabilities(wps_request, request_uuid)
response._update_status(WPS_STATUS.SUCCEEDED, u'', 100)

elif wps_request.operation == 'describeprocess':
response = self.describe(wps_request, request_uuid, wps_request.identifiers)
response._update_status(WPS_STATUS.SUCCEEDED, u'', 100)

elif wps_request.operation == 'execute':
response = self.execute(
wps_request.identifier,
wps_request,
request_uuid
)
return response
except Exception as e:
# This ensure that logged request get terminated in case of exception while the request is not
# accepted
store_status(request_uuid, WPS_STATUS.FAILED, u'Request rejected due to exception', 100)
raise e
else:
update_response(request_uuid, response, close=True)
raise RuntimeError("Unknown operation %r"
% wps_request.operation)

except HTTPException as e:
# transform HTTPException to OWS NoApplicableCode exception
if not isinstance(e, NoApplicableCode):
e = NoApplicableCode(e.description, code=e.code)

class FakeResponse:
message = e.locator
status = e.code
status_percentage = 100
try:
update_response(request_uuid, FakeResponse, close=True)
except NoApplicableCode as e:
return e
except NoApplicableCode as e:
return e
except HTTPException as e:
return NoApplicableCode(e.description, code=e.code)
except Exception as e:
e = NoApplicableCode("No applicable error code, please check error log", code=500)
return e
return NoApplicableCode("No applicable error code, please check error log", code=500)

@Request.application
def __call__(self, http_request):
Expand Down
20 changes: 2 additions & 18 deletions pywps/dblog.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,34 +112,18 @@ def get_first_stored():
return request


def update_response(uuid, response, close=False):
def store_status(uuid, wps_status, message=None, status_percentage=None):
"""Writes response to database
"""

session = get_session()
message = None
status_percentage = None
status = None

if hasattr(response, 'message'):
message = response.message
if hasattr(response, 'status_percentage'):
status_percentage = response.status_percentage
if hasattr(response, 'status'):
status = response.status

if status == '200 OK':
status = 3
elif status == 400:
status = 0

requests = session.query(ProcessInstance).filter_by(uuid=str(uuid))
if requests.count():
request = requests.one()
request.time_end = datetime.datetime.now()
request.message = str(message)
request.percent_done = status_percentage
request.status = status
request.status = wps_status
session.commit()
session.close()

Expand Down
6 changes: 4 additions & 2 deletions pywps/processing/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pywps.configuration as config
from pywps.processing.basic import Processing
from pywps.exceptions import SchedulerNotAvailable
from pywps.response.status import WPS_STATUS

import logging
LOGGER = logging.getLogger("PYWPS")
Expand All @@ -22,10 +23,11 @@ class Scheduler(Processing):
"""

def start(self):
self.job.wps_response.update_status('Submitting job ...', 0)
self.job.wps_response._update_status(WPS_STATUS.ACCEPTED, 'Submitting job ...', 0)
# run remote pywps process
jobid = self.run_job()
self.job.wps_response.update_status('Your job has been submitted with ID %s'.format(jobid), 0)
self.job.wps_response._update_status(WPS_STATUS.ACCEPTED,
'Your job has been submitted with ID %s'.format(jobid), 0)

def run_job(self):
LOGGER.info("Submitting job ...")
Expand Down
Loading