Skip to content

Commit

Permalink
- better management of failure states of the spawned processbot
Browse files Browse the repository at this point in the history
  • Loading branch information
afabiani committed Dec 12, 2018
1 parent 8436d3a commit 9abaf9e
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 202 deletions.
221 changes: 107 additions & 114 deletions src/wpsremote/processbot.py
Expand Up @@ -132,8 +132,6 @@ def __init__(self, remote_config_filepath, service_config_filepath, execute_mess
self._input_params_actions = computational_job_input_actions.ComputationalJobInputActions.create_from_config(
action_sections)

# create the concrete bus object
self._lock_bus = thread.allocate_lock()
self.bus = introspection.get_class_four_arg(bus_class_name,
remote_config,
self.service,
Expand All @@ -146,6 +144,12 @@ def __init__(self, remote_config_filepath, service_config_filepath, execute_mess
self.bus.RegisterMessageCallback(busIndipendentMessages.FinishMessage, self.handle_finish)
self.bus.RegisterMessageCallback(busIndipendentMessages.AbortMessage, self.handle_abort)

def exit(self, return_code, exception=None):
# if exception:
# raise Exception(exception)
sys.stdout.write('')
sys.exit(return_code)

def get_resource_file_dir(self):
return self._resource_file_dir

Expand Down Expand Up @@ -200,48 +204,39 @@ def SpawnProcess(self):
# todo: check if cmds contains ","!!! --> pickle?
rc.set_from_processbot(os.getpid(), [invoked_process.pid])
rc.write()

# go to process output synchronuosly
self.process_output_parser(invoked_process)
except Exception as ex:
logging.exception("Process "+str(self._uniqueExeId)+" Exception: " +
str(traceback.format_exc(sys.exc_info())))
error_message = "process failure\n" + str(ex)
self.send_error_message(error_message)
# self.bus.disconnect()
logger.info("after send job-error message to WPS")
thread.interrupt_main()
os._exit(-1)

# go to process output synchronuosly
self.process_output_parser(invoked_process)
self._finished = True
self.exit(-1, exception=ex)

def process_output_parser(self, invoked_process):
logger = logging.getLogger("ProcessBot.process_output_parser")
logger.info("start parsing stdout of created process " + self.service)

with self._lock_bus:
try:
if self.bus.state() != 'connected':
self.bus.xmpp.reconnect()
self.bus.xmpp.send_presence()
if self.bus.state() == 'connected':
self.bus.SendMessage(busIndipendentMessages.
LogMessage(self._remote_wps_endpoint,
"INFO",
"start parsing stdout of created process " + self.service))
else:
try:
self.bus.xmpp.reconnect()
self.bus.xmpp.send_presence()
# self.bus.xmpp.get_roster()

if self.bus.state() == 'connected':
self.bus.SendMessage(busIndipendentMessages.
LogMessage(self._remote_wps_endpoint,
"INFO",
"start parsing stdout of created process " + self.service))
else:
logger.info("[XMPP Disconnected]: Process " +
str(self._uniqueExeId) +
" Could not send info message to GeoServer Endpoint " +
str(self._remote_wps_endpoint))
except BaseException:
logger.info("[XMPP Disconnected]: Process "+str(self._uniqueExeId) +
" Could not send info message to GeoServer Endpoint "+str(self._remote_wps_endpoint))
logger.info("[XMPP Disconnected]: Process " +
str(self._uniqueExeId) +
" Could not send info message to GeoServer Endpoint " +
str(self._remote_wps_endpoint))
except BaseException:
logger.info("[XMPP Disconnected]: Process "+str(self._uniqueExeId) +
" Could not send info message to GeoServer Endpoint "+str(self._remote_wps_endpoint))

# Listen to stdout
stdout_parser_compiled = [re.compile(r) for r in self._stdout_parser]
Expand All @@ -258,52 +253,51 @@ def process_output_parser(self, invoked_process):
res = rgx.match(line)
if (res):
if (action == "progress"):
with self._lock_bus:
if self.bus.state() != 'connected':
try:
self.bus.xmpp.reconnect()
self.bus.xmpp.send_presence()
# self.bus.xmpp.get_roster()
except BaseException:
logger.info("[XMPP Disconnected]: Process " +
str(self._uniqueExeId) +
" Could not send info message to GeoServer Endpoint " +
str(self._remote_wps_endpoint))
if self.bus.state() != 'connected':
try:
self.bus.xmpp.reconnect()
self.bus.xmpp.send_presence()
except BaseException:
logger.info("[XMPP Disconnected]: Process " +
str(self._uniqueExeId) +
" Could not send info message to GeoServer Endpoint " +
str(self._remote_wps_endpoint))
if self.bus.state() == 'connected':
self.bus.SendMessage(busIndipendentMessages.
ProgressMessage(self._remote_wps_endpoint,
float(res.group(1).strip())))
# match = True
break
elif (action == "log"):
with self._lock_bus:
if self.bus.state() != 'connected':
try:
self.bus.xmpp.reconnect()
self.bus.xmpp.send_presence()
# self.bus.xmpp.get_roster()
except BaseException:
logger.info("[XMPP Disconnected]: Process " +
str(self._uniqueExeId) +
" Could not send info message to GeoServer Endpoint " +
str(self._remote_wps_endpoint))
if self.bus.state() != 'connected':
try:
self.bus.xmpp.reconnect()
self.bus.xmpp.send_presence()
# self.bus.xmpp.get_roster()
except BaseException:
logger.info("[XMPP Disconnected]: Process " +
str(self._uniqueExeId) +
" Could not send info message to GeoServer Endpoint " +
str(self._remote_wps_endpoint))
if self.bus.state() == 'connected':
self.bus.SendMessage(busIndipendentMessages.
LogMessage(self._remote_wps_endpoint,
res.group(1).strip(),
res.group(2).strip()))
# match = True
break
elif (action == "abort"):
with self._lock_bus:
if self.bus.state() != 'connected':
try:
self.bus.xmpp.reconnect()
self.bus.xmpp.send_presence()
# self.bus.xmpp.get_roster()
except BaseException:
logger.info("[XMPP Disconnected]: Process " +
str(self._uniqueExeId) +
" Could not send info message to GeoServer Endpoint " +
str(self._remote_wps_endpoint))
if self.bus.state() != 'connected':
try:
self.bus.xmpp.reconnect()
self.bus.xmpp.send_presence()
# self.bus.xmpp.get_roster()
except BaseException:
logger.info("[XMPP Disconnected]: Process " +
str(self._uniqueExeId) +
" Could not send info message to GeoServer Endpoint " +
str(self._remote_wps_endpoint))
if self.bus.state() == 'connected':
self.bus.SendMessage(busIndipendentMessages.
ErrorMessage(self._remote_wps_endpoint,
res.group(2).strip()))
Expand Down Expand Up @@ -338,33 +332,30 @@ def process_output_parser(self, invoked_process):
p.get_publish_default_style(),
p.get_publish_target_workspace(),
p.get_metadata()]
except BaseException:
except BaseException as ex:
logging.exception("Process "+str(self._uniqueExeId)+" Exception: " +
str(traceback.format_exc(sys.exc_info())))
error_message = "process exit code is " + \
str(return_code) + ": failure\n" + "\n".join(str(e) for e in stack_trace)
self.send_error_message(error_message)
# self.bus.disconnect()
self.bus.disconnect()
logger.info("after send job-error message to WPS")
thread.interrupt_main()
os._exit(return_code)
self._finished = True
self.exit(return_code, exception=ex)

logger.info("trying to acquire bus lock...")
with self._lock_bus:
logger.info("bus lock acquired...")
counter = 1
while not self._finished:
if self.bus.state() != 'connected':
try:
self.bus.xmpp.reconnect()
self.bus.xmpp.send_presence()
# self.bus.xmpp.get_roster()
except BaseException:
logger.info("[XMPP Disconnected]: Process " +
str(self._uniqueExeId) +
" Could not send info message to GeoServer Endpoint " +
str(self._remote_wps_endpoint))

counter = 1
while not self._finished:
if self.bus.state() == 'connected':
logger.info("sending 'completed' message tentative #" + str(counter))
self.bus.SendMessage(busIndipendentMessages.
CompletedMessage(self._remote_wps_endpoint,
Expand All @@ -374,76 +365,78 @@ def process_output_parser(self, invoked_process):
sleep(10)
else:
logger.error("Could not contact Remote WPS with. Forcibly shutdown the process...")
thread.interrupt_main()
os._exit(-1)
self._finished = True
self.exit(-1)

logger.info("after send job-completed message to WPS")
else:
error_message = "process exit code is " + \
str(return_code) + ": failure\n" + "\n".join(str(e) for e in stack_trace)
logger.critical("process exit code is " + str(return_code) + ": failure")

# todo: should i wait for finish message here as well? No
self.send_error_message(error_message)

# self.bus.disconnect()
self.bus.disconnect()
logger.info("after send job-error message to WPS")
thread.interrupt_main()
os._exit(return_code)
self._finished = True
self.exit(return_code)

def handle_finish(self, finished_message):
logger = logging.getLogger("ProcessBot.handle_finish")
logger.info("received finish mesasge from WPS")
logger = logging.getLogger("ProcessBot.handle_finish to " + str(self._remote_wps_endpoint))
logger.error("Received finish mesasge from GeoServer WPS...")
self._finished = True
with self._lock_bus:
self.bus.disconnect()
logger.info("disconnected from communication bus")

sys.exit(0)
self.bus.disconnect()
logger.error("disconnected from communication bus")
self._finished = True
self.exit(0)

def handle_abort(self, aborted_message):
logger = logging.getLogger("ProcessBot.handle_abort")
logger.info("received abort mesasge from WPS")
logger = logging.getLogger("ProcessBot.handle_abort to " + str(self._remote_wps_endpoint))
logger.error("Received abort mesasge from GeoServer WPS...")
self._finished = True
with self._lock_bus:
self.bus.disconnect()
logger.info("disconnected from communication bus")

sys.exit(-1)
self.bus.disconnect()
logger.error("disconnected from communication bus")
self._finished = True
self.exit(-1)

def send_error_message(self, msg):
logger = logging.getLogger("ProcessBot.send_error_message to " + str(self._remote_wps_endpoint))
logger.error(msg)
with self._lock_bus:
if self.bus.state() == 'connected':
self.bus.SendMessage(busIndipendentMessages.ErrorMessage(self._remote_wps_endpoint, msg))
else:
try:
self.bus.xmpp.reconnect()
self.bus.xmpp.send_presence()
# self.bus.xmpp.get_roster()
if self.bus.state() != 'connected':
try:
self.bus.xmpp.reconnect()
self.bus.xmpp.send_presence()
except BaseException:
logger.info("[XMPP Disconnected]: Process " +
str(self._uniqueExeId) +
" Could not send info message to GeoServer Endpoint " +
str(self._remote_wps_endpoint))
if self.bus.state() == 'connected':
self.bus.SendMessage(busIndipendentMessages.ErrorMessage(self._remote_wps_endpoint, msg))
else:
try:
self.bus.xmpp.reconnect()
self.bus.xmpp.send_presence()
# self.bus.xmpp.get_roster()

if self.bus.state() == 'connected':
self.bus.SendMessage(busIndipendentMessages.ErrorMessage(self._remote_wps_endpoint, msg))
else:
sys.stdout.write("[XMPP Disconnected]: Process <UID>" +
str(self._uniqueExeId) +
"</UID> Could not send error message to GeoServer Endpoint <JID>" +
str(self._remote_wps_endpoint) +
"</JID> <MSG>" +
msg.replace('\n', ' ').replace('\r', '') + "</MSG>")
except BaseException:
if self.bus.state() == 'connected':
self.bus.SendMessage(busIndipendentMessages.ErrorMessage(self._remote_wps_endpoint, msg))
else:
sys.stdout.write("[XMPP Disconnected]: Process <UID>" +
str(self._uniqueExeId) +
"</UID> Could not send error message to GeoServer Endpoint <JID>" +
str(self._remote_wps_endpoint) +
"</JID> <MSG>" +
msg.replace('\n', ' ').replace('\r', '') + "</MSG>")

except BaseException:
sys.stdout.write("[XMPP Disconnected]: Process <UID>" +
str(self._uniqueExeId) +
"</UID> Could not send error message to GeoServer Endpoint <JID>" +
str(self._remote_wps_endpoint) +
"</JID> <MSG>" +
msg.replace('\n', ' ').replace('\r', '') + "</MSG>")
logger.debug("send error msg complete")
thread.interrupt_main()
os._exit(-1)
self._finished = True
self.exit(-1)

def disconnect(self):
with self._lock_bus:
self.bus.disconnect()
self.bus.disconnect()

0 comments on commit 9abaf9e

Please sign in to comment.