From dd83b1dc5686d528f5481413cd053abb6d11703a Mon Sep 17 00:00:00 2001 From: geosolutions Date: Mon, 19 Jun 2017 19:12:52 +0100 Subject: [PATCH] close #8 : An unhandled exception might crash wpsagent and related WPS process --- src/wpsremote/servicebot.py | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/src/wpsremote/servicebot.py b/src/wpsremote/servicebot.py index 0e1d11b..36a85f2 100644 --- a/src/wpsremote/servicebot.py +++ b/src/wpsremote/servicebot.py @@ -162,7 +162,7 @@ def handle_execute(self, execute_message): logger.info("created process " + self.service + " with PId " + str(invoked_process.pid) + " and cmd: " + cmd ) #use a parallel thread to wait the end of the request handler process and get the exit code of the just created asynchronous process computation - thread.start_new_thread(self.output_parser_verbose, (invoked_process,)) + thread.start_new_thread(self.output_parser_verbose, (invoked_process, param_filepath,)) logger.info("end of execute message handler, going back in listening mode") @@ -209,7 +209,7 @@ def handle_getloadavg(self, getloadavg_message): # if return_code != 0: # logger.critical("Process " + self.service + " PId " + str(invoked_process.pid) + " terminated with exit code " + str(return_code)) - def output_parser_verbose(self, invoked_process): + def output_parser_verbose(self, invoked_process, param_filepath): logger = logging.getLogger("servicebot.output_parser_verbose") logger.info("wait for end of execution of created process " + self.service + ", PId " + str(invoked_process.pid) ) @@ -245,8 +245,23 @@ def output_parser_verbose(self, invoked_process): msg = "Process " + self.service + " PId " + str(invoked_process.pid) + " terminated with exit code " + str(return_code) logger.critical(msg) + logger.debug("gs_UID[%s] / gs_JID[%s]" % (gs_UID, gs_JID)) if gs_UID and gs_JID: self.bus.SendMessage( busIndipendentMessages.ErrorMessage( gs_JID, msg + " Exception: " + str(gs_MSG), gs_UID ) ) + elif self._remote_wps_endpoint: + self.bus.SendMessage( busIndipendentMessages.ErrorMessage( self._remote_wps_endpoint, msg ) ) + else: + exe_msg = None + try: + logger.debug("Trying to recover Originator from Process Params!") + exe_msg = busIndipendentMessages.ExecuteMessage.deserialize( param_filepath ) + if exe_msg.originator(): + self.bus.SendMessage( busIndipendentMessages.ErrorMessage( exe_msg.originator(), msg + " Exception: remote process exception. Please check outputs!", exe_msg.UniqueId() ) ) + except: + pass + if not exe_msg: + msg = "Process " + self.service + " PId " + str(invoked_process.pid) + " STALLED! Don't know who to send ERROR Message..." + logger.error(msg) else: msg = "Process " + self.service + " PId " + str(invoked_process.pid) + " terminated successfully!" logger.debug(msg) @@ -261,7 +276,11 @@ def send_error_message(self, msg): # self.bus.xmpp.get_roster() except: logger.info( "[XMPP Disconnected]: Service "+str(self.service)+" Could not send error message to GeoServer Endpoint "+str(self._remote_wps_endpoint)) - self.bus.SendMessage( busIndipendentMessages.ErrorMessage( self._remote_wps_endpoint, msg ) ) + if self._remote_wps_endpoint: + self.bus.SendMessage( busIndipendentMessages.ErrorMessage( self._remote_wps_endpoint, msg ) ) + else: + msg = "Process " + str(self.service) + " STALLED! Don't know who to send ERROR Message..." + logger.error(msg) def disconnect(self): self.bus.disconnect()