diff --git a/setup.py b/setup.py index 26b2319..7698c19 100644 --- a/setup.py +++ b/setup.py @@ -6,20 +6,20 @@ """ To deploy a new version of the package on PyPi - - 1. Update "version" + + 1. Update "version" 2. Update REAMDE.md Change Log and pip install version - + 3. Follow the instruction at http://peterdowns.com/posts/first-time-with-pypi.html - + 4. Deploy on pypitest: - + python setup.py sdist upload -r pypitest python setup.py bdist --format=gztar upload -r pypitest python setup.py bdist_wheel upload -r pypitest 5. Deploy on pypi: - + python setup.py sdist upload -r pypi python setup.py bdist --format=gztar upload -r pypi python setup.py bdist_wheel upload -r pypi @@ -33,7 +33,7 @@ setup( name = "wps-remote", - version = "2.14.1", + version = "2.14.2", author = "GeoServer Developers", author_email = "geoserver-devel@lists.sourceforge.net", description = "A library that allows users to publish their executables as GeoServer WPS Processes through the XMPP protocol", @@ -55,16 +55,16 @@ packages = find_packages('src'), package_data = { '': [ - 'xmpp_data/*.*', - 'xmpp_data/configs/*.*', + 'xmpp_data/*.*', + 'xmpp_data/configs/*.*', 'xmpp_data/configs/myservice/*.*', 'xmpp_data/configs/myservice/code/*.*', - 'xmpp_data/output/*.*', - 'xmpp_data/resource_dir/*.*', - 'xmpp_data/resource_dir/srtm_39_04/*.*', - 'xmpp_data/share/placemark', - 'xmpp_data/ssl/*.*', - 'xmpp_data/test/*.*', + 'xmpp_data/output/*.*', + 'xmpp_data/resource_dir/*.*', + 'xmpp_data/resource_dir/srtm_39_04/*.*', + 'xmpp_data/share/placemark', + 'xmpp_data/ssl/*.*', + 'xmpp_data/test/*.*', ] }, include_package_data = True, diff --git a/src/wpsremote/processbot.py b/src/wpsremote/processbot.py index 218deca..972e08e 100644 --- a/src/wpsremote/processbot.py +++ b/src/wpsremote/processbot.py @@ -36,8 +36,8 @@ class ProcessBot(object): """ - This script starts when the user call a new WPS execution. - His task is to call the proper external executable/scripts according to the service.config file (provided in the cmd line with -s option) and send back to the WPS logging/progress + This script starts when the user call a new WPS execution. + His task is to call the proper external executable/scripts according to the service.config file (provided in the cmd line with -s option) and send back to the WPS logging/progress information and error information if something unexpected happens. All the output including the log file is generated in a sand box directory created with joint information from service.config and external process start-up information. """ @@ -45,7 +45,7 @@ def __init__(self, remote_config_filepath, service_config_filepath, execute_mess self._uniqueExeId = execute_message.UniqueId() self._remote_wps_endpoint = execute_message.originator() self._remote_wps_baseurl = execute_message.BaseURL() - self._input_values = execute_message.variables() + self._input_values = execute_message.variables() #read remote config remote_config = configInstance.create(remote_config_filepath) @@ -64,9 +64,9 @@ def __init__(self, remote_config_filepath, service_config_filepath, execute_mess self._wps_execution_shared_dir.mkdir() else: self._wps_execution_shared_dir = None - + #the config file is read with raw=False because the unique_exe_id value will be used (interpolated) in the config - serviceConfig = configInstance.create(service_config_filepath, case_sensitive=True, variables = {'unique_exe_id' : self._uniqueExeId, 'wps_execution_shared_dir' : self._wps_execution_shared_dir}, raw=False) + serviceConfig = configInstance.create(service_config_filepath, case_sensitive=True, variables = {'unique_exe_id' : self._uniqueExeId, 'wps_execution_shared_dir' : self._wps_execution_shared_dir}, raw=False) self.service = serviceConfig.get("DEFAULT", "service") #todo: what is? self.namespace = serviceConfig.get("DEFAULT", "namespace") @@ -99,7 +99,7 @@ def __init__(self, remote_config_filepath, service_config_filepath, execute_mess privatekey = open(uploader_private_rsa_key, "r") rsa_key = RSA.importKey(privatekey, passphrase=uploader_passphrase) uploader_password = rsa_key.decrypt(base64.b64decode(uploader_password)) - + self._uploader = introspection.get_class_four_arg(uploader_class_name, uploader_host, uploader_username, uploader_password, self._uniqueExeId) else: self._uploader = None @@ -166,12 +166,12 @@ def SpawnProcess(self): #prepare cmd line cmd = self._executable_cmd + " " + self._input_params_actions.get_cmd_line() - + #spawn the computational job process - invoked_process = subprocess.Popen(args=cmd.split(), cwd=self._executable_path, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False) + invoked_process = subprocess.Popen(args=cmd.split(), cwd=self._executable_path, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False) logger.info("process " + self.service + " created with PId " + str(invoked_process.pid) + " and command line: " + cmd) - #read the resource file + #read the resource file rc = resource_cleaner.Resource.create_from_file(self._uniqueExeId, os.getpid()) #add the pid of the computational job to the resource file rc.set_from_processbot( os.getpid(), [ invoked_process.pid ] ) #todo: check if cmds contains ","!!! --> pickle? @@ -186,7 +186,7 @@ def SpawnProcess(self): os._exit(return_code) #go to process output synchronuosly - self.process_output_parser( invoked_process ) + self.process_output_parser( invoked_process ) def process_output_parser(self, invoked_process): logger = logging.getLogger("ProcessBot.process_output_parser") @@ -258,7 +258,7 @@ def process_output_parser(self, invoked_process): self.bus.SendMessage( busIndipendentMessages.ErrorMessage( self._remote_wps_endpoint, res.group(2).strip() ) ) match=True break - elif (action=="ignore"): + elif (action=="ignore"): match=True break else: @@ -270,7 +270,7 @@ def process_output_parser(self, invoked_process): #wait for process exit code return_code = invoked_process.wait() logger.info( "process exit code is " + str(return_code)) - + if return_code==0: #success logger.info("process exit code is " + str(return_code) + ": success") @@ -320,7 +320,7 @@ def process_output_parser(self, invoked_process): self.send_error_message( error_message ) #self.bus.disconnect() - logger.info( "after send job-error message to WPS") + logger.info( "after send job-error message to WPS") thread.interrupt_main() os._exit(return_code) @@ -336,7 +336,7 @@ def handle_finish(self, finished_message): def send_error_message(self, msg): logger = logging.getLogger("ProcessBot.send_error_message to " + str(self._remote_wps_endpoint)) - logger.error( msg ) + logger.error( msg ) with self._lock_bus: if self.bus.state() == 'connected': self.bus.SendMessage( busIndipendentMessages.ErrorMessage( self._remote_wps_endpoint, msg ) ) @@ -353,7 +353,7 @@ def send_error_message(self, msg): except: sys.stdout.write( "[XMPP Disconnected]: Process "+str(self._uniqueExeId)+" Could not send error message to GeoServer Endpoint "+str(self._remote_wps_endpoint)+" "+msg.replace('\n', ' ').replace('\r', '')+"") - logger.debug( "send error msg complete" ) + logger.debug( "send error msg complete" ) thread.interrupt_main() os._exit(-1) @@ -364,5 +364,3 @@ def disconnect(self): def handle_abort(self): #todo pass - - diff --git a/src/wpsremote/run_threads.sh b/src/wpsremote/run_threads.sh index de9dc5c..06fb8d2 100755 --- a/src/wpsremote/run_threads.sh +++ b/src/wpsremote/run_threads.sh @@ -1,2 +1,2 @@ -python wpsagent.py -r /share/xmpp_data/configs/remote.config -s /share/xmpp_data/configs/myservice/service.config service & +python wpsagent.py -r ./xmpp_data/configs/remote.config -s ./xmpp_data/configs/myservice/service.config service & diff --git a/src/wpsremote/xmppBus.py b/src/wpsremote/xmppBus.py index 3d46ce5..a08c4d9 100644 --- a/src/wpsremote/xmppBus.py +++ b/src/wpsremote/xmppBus.py @@ -32,7 +32,7 @@ def __init__(self, config, service_name, service_name_namespace, id="master"): self.username=config.get("DEFAULT", "user") self.password=config.get("DEFAULT", "password") self.nameSpacePassword = config.get("DEFAULT", "mucServicePassword") - + self._service_name = service_name self._service_name_namespace = service_name_namespace self._fully_qualified_service_name = self._service_name_namespace + "." + self._service_name @@ -65,7 +65,7 @@ def _get_MUC_JId(self): return self._service_name_namespace + "@" + self.MUC_name def _MUC_service_nickname(self): - return '%s@%s' % (self._service_name, str(self.xmpp.boundjid).split('@', 1).pop()) + return '%s@%s' % (self._service_name, str(self.xmpp.boundjid).split('@', 1).pop()) def CheckServerIdentity(self, serverId): @@ -108,7 +108,7 @@ def _handleXMPPSignal(self, msg): def CreateIndipendentMessage(self, msg): if msg['type'] in ('normal', 'chat'): - payload = msg['body'] + payload = msg['body'] origin = msg['from'] logging.info('Received XMPP bus signal from %s: "%s"' % (origin, payload)) @@ -128,13 +128,16 @@ def CreateIndipendentMessage(self, msg): variables = dict([tuple(each.strip().split('=')) for each in payload.split('&')]) requestParams=pickle.loads(urllib.unquote(variables['message'])) if variables['message'] is not None else None #print str(variables) - return busIndipendentMessages.ExecuteMessage( msg['from'], variables['id'], variables['baseURL'], requestParams ) + return busIndipendentMessages.ExecuteMessage( msg['from'], variables['id'], variables['baseURL'], requestParams ) elif ("topic=invite" in payload): logging.info("handle invite message from WPS " + str(payload)) return busIndipendentMessages.InviteMessage(payload, msg['from']) elif ("topic=finish" in payload): logging.info("handle finish message from WPS " + str(payload)) return busIndipendentMessages.FinishMessage(payload, msg['from']) + elif ("topic=abort" in payload): + logging.info("handle abort message from WPS " + str(payload)) + return busIndipendentMessages.AbortMessage(payload, msg['from']) elif ("topic=getloadavg" in payload): return busIndipendentMessages.GetLoadAverageMessage(payload, msg['from']) else: @@ -154,10 +157,10 @@ def _register(self, iq): def Convert(self, busIndipendentMsg): if (type(busIndipendentMsg) is busIndipendentMessages.RegisterMessage): return xmppMessages.XMPPRegisterMessage(self, busIndipendentMsg.originator(), busIndipendentMsg.service, busIndipendentMsg.namespace, busIndipendentMsg.description, busIndipendentMsg.input_parameters(), busIndipendentMsg.output) - + if (type(busIndipendentMsg) is busIndipendentMessages.ProgressMessage): return xmppMessages.XMPPProgressMessage( busIndipendentMsg.originator, self, busIndipendentMsg.progress ) - + if (type(busIndipendentMsg) is busIndipendentMessages.LogMessage): return xmppMessages.XMPPLogMessage(busIndipendentMsg.originator, self, busIndipendentMsg.level, busIndipendentMsg.msg) @@ -176,6 +179,6 @@ def Convert(self, busIndipendentMsg): def disconnect(self): self.xmpp.disconnect(wait=True) - + def state(self): return self.xmpp.state.current_state() diff --git a/src/wpsremote/xmpp_data/configs/myservice/service.config b/src/wpsremote/xmpp_data/configs/myservice/service.config index e54fb60..9728653 100644 --- a/src/wpsremote/xmpp_data/configs/myservice/service.config +++ b/src/wpsremote/xmpp_data/configs/myservice/service.config @@ -97,17 +97,17 @@ publish_layer_name = contour # . Optionally it is possible to specify a root folder if the uploader class supports it. # upload_data_root = /remote-wps/default -[Output2] -name = result2 -type = application/x-netcdf -output_mime_type = application/x-netcdf -description = NetCDF Binary File -title = flexpart -filepath = %(output_dir)s/flexpart.nc -publish_as_layer = true -publish_default_style = raster -publish_target_workspace = it.geosolutions -publish_layer_name = flexpart +#[Output2] +#name = result2 +#type = application/x-netcdf +#output_mime_type = application/x-netcdf +#description = NetCDF Binary File +#title = flexpart +#filepath = %(output_dir)s/flexpart.nc +#publish_as_layer = true +#publish_default_style = raster +#publish_target_workspace = it.geosolutions +#publish_layer_name = flexpart # . Enable this option in order to perform a backup of this output # . before sending it to GeoServer. @@ -122,15 +122,15 @@ publish_layer_name = flexpart # . Optionally it is possible to specify a root folder if the uploader class supports it. # upload_data_root = /remote-wps/default -[Output3] -name = result3 -type = application/owc -output_mime_type = application/xml -description = WPS OWC Json MapContext -layers_to_publish = result2 -publish_as_layer = true -publish_layer_name = owc_json_ctx -publish_metadata = ./xmpp_data/resource_dir/owc_json_ctx.json +#[Output3] +#name = result3 +#type = application/owc +#output_mime_type = application/xml +#description = WPS OWC Json MapContext +#layers_to_publish = result2 +#publish_as_layer = true +#publish_layer_name = owc_json_ctx +#publish_metadata = ./xmpp_data/resource_dir/owc_json_ctx.json # ########################################### # # Logging Options Declaration #