From 85760df1711c697f64b94b76634f6c55cc4e3c3a Mon Sep 17 00:00:00 2001 From: Andreas Wicenec Date: Tue, 5 Jul 2022 12:38:29 +0800 Subject: [PATCH 01/15] fixed a few corner cases with parameter handling --- daliuge-engine/dlg/apps/bash_shell_app.py | 9 ++++++--- daliuge-engine/dlg/apps/pyfunc.py | 6 +++--- daliuge-engine/dlg/droputils.py | 17 +++++++++++------ 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/daliuge-engine/dlg/apps/bash_shell_app.py b/daliuge-engine/dlg/apps/bash_shell_app.py index 7d6e3d3f7..b07928f43 100644 --- a/daliuge-engine/dlg/apps/bash_shell_app.py +++ b/daliuge-engine/dlg/apps/bash_shell_app.py @@ -219,10 +219,12 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE): appArgs ={} pargs = [arg for arg in appArgs if appArgs[arg]["positional"]] pargsDict = collections.OrderedDict(zip(pargs,[None]*len(pargs))) + for arg in pargs: + pargsDict.update({arg:appArgs[arg]}) keyargs = {arg:appArgs[arg]["value"] for arg in appArgs if not appArgs[arg]["positional"]} - logger.debug("pargs: %s; keyargs: %s, appArgs: %s",pargs, keyargs, appArgs) + logger.debug("pargs: %s; keyargs: %s, appArgs: %s",pargsDict, keyargs, appArgs) if "inputs" in self.parameters and isinstance(self.parameters['inputs'][0], dict): - keyargs = droputils.identify_named_ports( + portargs = droputils.identify_named_ports( inputs_dict, self.parameters["inputs"], pargs, @@ -230,13 +232,14 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE): appArgs, check_len=len(inputs), mode="inputs") + keyargs.update(portargs) else: for i in range(min(len(inputs), len(pargs))): keyargs.update({pargs[i]: list(inputs.values())[i]}) keyargs = droputils.serialize_kwargs(keyargs, prefix=self._argumentPrefix, separator=self._paramValueSeparator) - pargs = list(pargsDict.values()) + pargs = [pargsDict[arg]['value'] for arg in pargsDict] argumentString = f"{' '.join(pargs + keyargs)}" # add kwargs to end of pargs # complete command including all additional parameters and optional redirects cmd = f"{self.command} {argumentString} {self._cmdLineArgs} " diff --git a/daliuge-engine/dlg/apps/pyfunc.py b/daliuge-engine/dlg/apps/pyfunc.py index af10b84dd..5ea695020 100644 --- a/daliuge-engine/dlg/apps/pyfunc.py +++ b/daliuge-engine/dlg/apps/pyfunc.py @@ -448,6 +448,9 @@ def optionalEval(x): pargsDict = collections.OrderedDict(zip(posargs,[None]*len(posargs))) if "applicationArgs" in self.parameters: appArgs = self.parameters["applicationArgs"] # we'll pop the identified ones + _dum = [appArgs.pop(k) for k in self.func_def_keywords if k in appArgs] + logger.debug("Identified keyword arguments removed: %s", + [i['text'] for i in _dum]) pargsDict.update({k:self.parameters[k] for k in pargsDict if k in self.parameters}) # if defined in both we use AppArgs values @@ -489,9 +492,6 @@ def optionalEval(x): # Try to get values for still missing positional arguments from Application Args if "applicationArgs" in self.parameters: - _dum = [appArgs.pop(k) for k in self.func_def_keywords if k in appArgs] - logger.debug("Identified keyword arguments removed: %s", - [i['text'] for i in _dum]) for pa in posargs: if pa != 'self' and pa not in funcargs: if pa in appArgs: diff --git a/daliuge-engine/dlg/droputils.py b/daliuge-engine/dlg/droputils.py index 1781dd0a7..d2c8f45f7 100644 --- a/daliuge-engine/dlg/droputils.py +++ b/daliuge-engine/dlg/droputils.py @@ -535,7 +535,8 @@ def serialize_kwargs(keyargs, prefix="--", separator=" "): if prefix == "--" and len(name) == 1: kwargs += [f"-{name} {value}"] else: - kwargs += [f"{prefix}{name}{separator}{value}".strip()] + kwargs += [f"{prefix.strip()}{name.strip()}{separator.strip()}{str(value).strip()}"] + logger.debug("kwargs after serialization: %s",kwargs) return kwargs @@ -549,7 +550,9 @@ def serialize_applicationArgs(applicationArgs, prefix="--", separator=" "): else: logger.info("ApplicationArgs found %s", applicationArgs) # construct the actual command line from all application parameters - kwargs = {} + kwargs = {arg:applicationArgs[arg]["value"] for arg in applicationArgs + if not applicationArgs[arg]["positional"]} + # kwargs = {} pargs = [] positional = False precious = False @@ -579,7 +582,9 @@ def identify_named_ports(ports, port_dict, posargs, pargsDict, appArgs, check_le logger.debug("Using named ports to remove %s from arguments: %s %d", mode, port_dict, check_len) # pargsDict = collections.OrderedDict(zip(posargs,[None]*len(posargs))) - kwargs = {} + # kwargs = {arg:appArgs[arg]["value"] for arg in appArgs + # if not appArgs[arg]["positional"]} + portargs = {} for i in range(check_len): # key for final dict is value in named ports dict key = list(port_dict[i].values())[0] @@ -591,12 +596,12 @@ def identify_named_ports(ports, port_dict, posargs, pargsDict, appArgs, check_le logger.debug("Using %s '%s' for parg %s", mode, value, key) posargs.pop(posargs.index(key)) else: - kwargs.update({key:value}) + portargs.update({key:value}) logger.debug("Using %s '%s' for kwarg %s", mode, value, key) _dum = appArgs.pop(key) if key in appArgs else None logger.debug("Argument used as %s removed: %s", mode, _dum) - logger.debug("Returning mapped ports: %s", kwargs) - return kwargs + logger.debug("Returning mapped ports: %s", portargs) + return portargs # Easing the transition from single- to multi-package From 9b7997de32567f965cda2dbca481f02736f8c7ca Mon Sep 17 00:00:00 2001 From: Andreas Wicenec Date: Tue, 5 Jul 2022 15:00:12 +0800 Subject: [PATCH 02/15] fixed positional args for bash --- daliuge-engine/dlg/apps/bash_shell_app.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/daliuge-engine/dlg/apps/bash_shell_app.py b/daliuge-engine/dlg/apps/bash_shell_app.py index b07928f43..230d2cb43 100644 --- a/daliuge-engine/dlg/apps/bash_shell_app.py +++ b/daliuge-engine/dlg/apps/bash_shell_app.py @@ -217,29 +217,30 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE): appArgs = self.parameters["applicationArgs"] else: appArgs ={} - pargs = [arg for arg in appArgs if appArgs[arg]["positional"]] - pargsDict = collections.OrderedDict(zip(pargs,[None]*len(pargs))) - for arg in pargs: + pargNames = [arg for arg in appArgs if appArgs[arg]["positional"]] + pargsDict = collections.OrderedDict(zip(pargNames,[None]*len(pargNames))) + for arg in pargNames: pargsDict.update({arg:appArgs[arg]}) + # pargNames = [arg for arg in pargsDict] keyargs = {arg:appArgs[arg]["value"] for arg in appArgs if not appArgs[arg]["positional"]} - logger.debug("pargs: %s; keyargs: %s, appArgs: %s",pargsDict, keyargs, appArgs) if "inputs" in self.parameters and isinstance(self.parameters['inputs'][0], dict): portargs = droputils.identify_named_ports( inputs_dict, self.parameters["inputs"], - pargs, + pargNames, pargsDict, appArgs, check_len=len(inputs), mode="inputs") keyargs.update(portargs) else: - for i in range(min(len(inputs), len(pargs))): - keyargs.update({pargs[i]: list(inputs.values())[i]}) + for i in range(min(len(inputs), len(pargNames))): + keyargs.update({pargNames[i]: list(inputs.values())[i]}) keyargs = droputils.serialize_kwargs(keyargs, prefix=self._argumentPrefix, separator=self._paramValueSeparator) - pargs = [pargsDict[arg]['value'] for arg in pargsDict] + logger.debug("pargNames: %s; pargsDict: %s; keyargs: %s, appArgs: %s", pargNames, pargsDict, keyargs, appArgs) + pargs = [pargsDict[arg] for arg in pargsDict] argumentString = f"{' '.join(pargs + keyargs)}" # add kwargs to end of pargs # complete command including all additional parameters and optional redirects cmd = f"{self.command} {argumentString} {self._cmdLineArgs} " From 5ee34c5b24934b4d746170cc7a8c44ad52e7926d Mon Sep 17 00:00:00 2001 From: Andreas Wicenec Date: Fri, 22 Jul 2022 13:02:43 +0800 Subject: [PATCH 03/15] Get named ports into bash and docker apps --- daliuge-engine/dlg/apps/bash_shell_app.py | 4 +- daliuge-engine/dlg/apps/dockerapp.py | 61 ++++++++++++++++++++--- 2 files changed, 58 insertions(+), 7 deletions(-) diff --git a/daliuge-engine/dlg/apps/bash_shell_app.py b/daliuge-engine/dlg/apps/bash_shell_app.py index 230d2cb43..3d329878b 100644 --- a/daliuge-engine/dlg/apps/bash_shell_app.py +++ b/daliuge-engine/dlg/apps/bash_shell_app.py @@ -204,7 +204,9 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE): outputs_dict = collections.OrderedDict() for uid, drop in outputs.items(): - outputs_dict[uid] = drop.path + # if drop does not have a path we assume it is just passing the event + # Bash does not support memory drops anyway + outputs_dict[uid] = drop.path if hasattr(drop, 'path') else '' session_id = ( self._dlg_session.sessionId if self._dlg_session is not None else "" diff --git a/daliuge-engine/dlg/apps/dockerapp.py b/daliuge-engine/dlg/apps/dockerapp.py index 75cd0ced0..cb9f11800 100644 --- a/daliuge-engine/dlg/apps/dockerapp.py +++ b/daliuge-engine/dlg/apps/dockerapp.py @@ -290,10 +290,6 @@ def initialize(self, **kwargs): self._paramValueSeparator ) # defer construction of complete command to run method - # cmd = f"{self._command} {self._cmdLineArgs} {' '.join(self.pargs)} "+\ - # f"{' '.join(self.keyargs)}" - # cmd = cmd.strip() - # self._command = cmd # The user used to run the process in the docker container is now always the user # who originally started the DALiuGE process as well. The information is passed through @@ -430,7 +426,7 @@ def run(self): fsInputs = {uid: i for uid, i in iitems if droputils.has_path(i)} fsOutputs = {uid: o for uid, o in oitems if droputils.has_path(o)} dockerInputs = { - # uid: DockerPath(utils.getDlgDir() + i.path) for uid, i in fsInputs.items() + # uid: DockerPath(utils.getDlgDir() + i.path) for uid, i in fsInputs.items() uid: DockerPath(i.path) for uid, i in fsInputs.items() } @@ -443,7 +439,6 @@ def run(self): dataURLOutputs = {uid: o for uid, o in oitems if not droputils.has_path(o)} self._command = f"{self._command} {' '.join(self.pargs)} "+\ f"{' '.join(self.keyargs)}" - # TODO: Deal with named inputs if self._command: cmd = droputils.replace_path_placeholders( self._command, dockerInputs, dockerOutputs @@ -547,6 +542,60 @@ def run(self): ) logger.debug(f"Adding environment variables: {env}") + ########### + # deal with named ports + # TODO: This needs to be moved to droputils as well. + # Almost exact same code as for bash + inputs_dict = collections.OrderedDict() + for uid, drop in iitems: + inputs_dict[uid] = drop.path if hasattr(drop, 'path') else '' + + outputs_dict = collections.OrderedDict() + for uid, drop in oitems: + # if drop does not have a path we assume it is just passing the event + # Bash does not support memory drops anyway + outputs_dict[uid] = drop.path if hasattr(drop, 'path') else '' + + appArgs = self._applicationArgs + pargs = [arg for arg in appArgs if appArgs[arg]["positional"]] + pargsDict = collections.OrderedDict(zip(pargs,[None]*len(pargs))) + pkeyargs = {} + logger.debug("pargs: %s; pkeyargs: %s, appArgs: %s",pargs, pkeyargs, appArgs) + if "inputs" in self.parameters and isinstance(self.parameters['inputs'][0], dict): + ipkeyargs = droputils.identify_named_ports( + inputs_dict, + self.parameters["inputs"], + pargs, + pargsDict, + appArgs, + check_len=len(iitems), + mode="inputs") + pkeyargs.update(ipkeyargs) + else: + for i in range(min(len(iitems), len(pargs))): + pkeyargs.update({pargs[i]: list(self._inputs.values())[i]}) + if "outputs" in self.parameters and isinstance(self.parameters['outputs'][0], dict): + opkeyargs = droputils.identify_named_ports( + outputs_dict, + self.parameters["outputs"], + pargs, + pargsDict, + appArgs, + check_len=len(oitems), + mode="outputs") + pkeyargs.update(opkeyargs) + else: + for i in range(min(len(oitems), len(pargs))): + pkeyargs.update({pargs[i]: list(self._outputs.values())[i]}) + pkeyargs = droputils.serialize_kwargs(pkeyargs, + prefix=self._argumentPrefix, + separator=self._paramValueSeparator) + pargs = list(pargsDict.values()) + argumentString = f"{' '.join(pkeyargs + pargs)}" # add kwargs to end of pargs + # complete command including all additional parameters and optional redirects + cmd = f"{cmd} {argumentString} {self._cmdLineArgs} " + ############### + # Wrap everything inside bash if len(cmd) > 0 and not self._noBash: cmd = '/bin/bash -c "%s"' % ( From a4dca2667fd284bd564600c126466cec3470d849 Mon Sep 17 00:00:00 2001 From: Andreas Wicenec Date: Fri, 22 Jul 2022 13:52:18 +0800 Subject: [PATCH 04/15] Merge with master --- daliuge-engine/dlg/apps/bash_shell_app.py | 1 + 1 file changed, 1 insertion(+) diff --git a/daliuge-engine/dlg/apps/bash_shell_app.py b/daliuge-engine/dlg/apps/bash_shell_app.py index 8fa5e8842..82770a04d 100644 --- a/daliuge-engine/dlg/apps/bash_shell_app.py +++ b/daliuge-engine/dlg/apps/bash_shell_app.py @@ -221,6 +221,7 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE): appArgs ={} pargNames = [arg for arg in appArgs if appArgs[arg]["positional"]] pargsDict = collections.OrderedDict(zip(pargNames,[None]*len(pargNames))) + pargs = pargsDict.keys() for arg in pargNames: pargsDict.update({arg:appArgs[arg]}) # pargNames = [arg for arg in pargsDict] From fca319b73d7a02e40e156324166ee8351a1bb5ea Mon Sep 17 00:00:00 2001 From: Andreas Wicenec Date: Thu, 28 Jul 2022 00:12:55 +0800 Subject: [PATCH 05/15] dockerapps working with named ports --- daliuge-engine/dlg/apps/dockerapp.py | 79 +++++++++++++++------------- daliuge-engine/dlg/drop.py | 2 +- daliuge-engine/dlg/droputils.py | 49 ++++++++++++----- 3 files changed, 78 insertions(+), 52 deletions(-) diff --git a/daliuge-engine/dlg/apps/dockerapp.py b/daliuge-engine/dlg/apps/dockerapp.py index cb9f11800..e7c18c0ec 100644 --- a/daliuge-engine/dlg/apps/dockerapp.py +++ b/daliuge-engine/dlg/apps/dockerapp.py @@ -285,9 +285,13 @@ def initialize(self, **kwargs): # "%%" at the start of the command, else it is interpreted as a normal command. # construct the actual command line from all application parameters - self.pargs, self.keyargs = droputils.serialize_applicationArgs( - self._applicationArgs, self._argumentPrefix, - self._paramValueSeparator + # self.pargs, self.keyargs = droputils.serialize_applicationArgs( + # self._applicationArgs, prefix=self._argumentPrefix, + # separator=self._paramValueSeparator + # ) + self.appArgs = droputils.clean_applicationArgs( + self._applicationArgs, prefix=self._argumentPrefix, + separator=self._paramValueSeparator ) # defer construction of complete command to run method @@ -437,17 +441,6 @@ def run(self): } dataURLInputs = {uid: i for uid, i in iitems if not droputils.has_path(i)} dataURLOutputs = {uid: o for uid, o in oitems if not droputils.has_path(o)} - self._command = f"{self._command} {' '.join(self.pargs)} "+\ - f"{' '.join(self.keyargs)}" - if self._command: - cmd = droputils.replace_path_placeholders( - self._command, dockerInputs, dockerOutputs - ) - cmd = droputils.replace_dataurl_placeholders( - cmd, dataURLInputs, dataURLOutputs - ) - else: - cmd = "" # We bind the inputs and outputs inside the docker under the utils.getDlgDir() # directory, maintaining the rest of their original paths. @@ -555,45 +548,57 @@ def run(self): # if drop does not have a path we assume it is just passing the event # Bash does not support memory drops anyway outputs_dict[uid] = drop.path if hasattr(drop, 'path') else '' - - appArgs = self._applicationArgs - pargs = [arg for arg in appArgs if appArgs[arg]["positional"]] - pargsDict = collections.OrderedDict(zip(pargs,[None]*len(pargs))) - pkeyargs = {} - logger.debug("pargs: %s; pkeyargs: %s, appArgs: %s",pargs, pkeyargs, appArgs) + logger.debug("appArgs: %s", self.appArgs) + posargs = [arg for arg in self.appArgs if self.appArgs[arg]["positional"]] + keyargs = {key:self.appArgs[key]["value"] for (key, value) in self.appArgs.items() if not self.appArgs[key]["positional"]} + posargsDict = collections.OrderedDict(zip(posargs,[None]*len(posargs))) + portkeyargs = {} + logger.debug("posargs: %s; keyargs: %s",posargs, keyargs) if "inputs" in self.parameters and isinstance(self.parameters['inputs'][0], dict): ipkeyargs = droputils.identify_named_ports( inputs_dict, self.parameters["inputs"], - pargs, - pargsDict, - appArgs, + posargs, + posargsDict, + self.appArgs, check_len=len(iitems), mode="inputs") - pkeyargs.update(ipkeyargs) + portkeyargs.update(ipkeyargs) else: - for i in range(min(len(iitems), len(pargs))): - pkeyargs.update({pargs[i]: list(self._inputs.values())[i]}) + for i in range(min(len(iitems), len(posargs))): + portkeyargs.update({posargs[i]: list(self._inputs.values())[i]}) if "outputs" in self.parameters and isinstance(self.parameters['outputs'][0], dict): opkeyargs = droputils.identify_named_ports( outputs_dict, self.parameters["outputs"], - pargs, - pargsDict, - appArgs, + posargs, + posargsDict, + self.appArgs, check_len=len(oitems), mode="outputs") - pkeyargs.update(opkeyargs) + portkeyargs.update(opkeyargs) else: - for i in range(min(len(oitems), len(pargs))): - pkeyargs.update({pargs[i]: list(self._outputs.values())[i]}) - pkeyargs = droputils.serialize_kwargs(pkeyargs, + for i in range(min(len(oitems), len(posargs))): + portkeyargs.update({posargs[i]: list(self._outputs.values())[i]}) + keyargs.update(portkeyargs) + keyargs = droputils.serialize_kwargs(keyargs, prefix=self._argumentPrefix, - separator=self._paramValueSeparator) - pargs = list(pargsDict.values()) - argumentString = f"{' '.join(pkeyargs + pargs)}" # add kwargs to end of pargs + separator=self._paramValueSeparator) if len(keyargs) > 0 else [''] + pargs = list(posargsDict.values()) + pargs = [''] if len(pargs) == 0 or None in pargs else pargs + logger.debug("After port replacement: pargs: %s; keyargs: %s",pargs, keyargs) + argumentString = f"{' '.join(keyargs + pargs)}" # add kwargs to end of pargs # complete command including all additional parameters and optional redirects - cmd = f"{cmd} {argumentString} {self._cmdLineArgs} " + cmd = f"{self._command} {argumentString} {self._cmdLineArgs} " + if cmd: + cmd = droputils.replace_path_placeholders( + cmd, dockerInputs, dockerOutputs + ) + cmd = droputils.replace_dataurl_placeholders( + cmd, dataURLInputs, dataURLOutputs + ) + else: + cmd = "" ############### # Wrap everything inside bash diff --git a/daliuge-engine/dlg/drop.py b/daliuge-engine/dlg/drop.py index 3d44bf19a..5094c4290 100644 --- a/daliuge-engine/dlg/drop.py +++ b/daliuge-engine/dlg/drop.py @@ -2422,7 +2422,7 @@ def _generateNamedOutputs(self): """ Generates a named mapping of output data drops. Can only be called during run(). """ - named_outputs: OrderedDict[str, DataDROP] = OrderedDict() + named_outputs: OrderedDict[str, DataDROP] = OrderedDict() if 'outputs' in self.parameters and isinstance(self.parameters['outputs'][0], dict): for i in range(len(self._outputs)): key = list(self.parameters['outputs'][i].values())[0] diff --git a/daliuge-engine/dlg/droputils.py b/daliuge-engine/dlg/droputils.py index 5050afb19..fad9c0c93 100644 --- a/daliuge-engine/dlg/droputils.py +++ b/daliuge-engine/dlg/droputils.py @@ -535,16 +535,17 @@ def serialize_kwargs(keyargs, prefix="--", separator=" "): if prefix == "--" and len(name) == 1: kwargs += [f"-{name} {value}"] else: - kwargs += [f"{prefix.strip()}{name.strip()}{separator.strip()}{str(value).strip()}"] + kwargs += [f"{prefix.strip()}{name.strip()}{separator}{str(value).strip()}"] logger.debug("kwargs after serialization: %s",kwargs) return kwargs - -def serialize_applicationArgs(applicationArgs, prefix="--", separator=" "): +def clean_applicationArgs(applicationArgs, prefix="--", separator=" "): """ - Unpacks the applicationArgs dictionary and returns a string - that can be used as command line parameters. + Removes arguments with None and False values, if not precious. + + Returns a dictionary with the relevant arguments only. """ + cleanedArgs = {} if not isinstance(applicationArgs, dict): logger.info("applicationArgs are not passed as a dict. Ignored!") else: @@ -560,21 +561,37 @@ def serialize_applicationArgs(applicationArgs, prefix="--", separator=" "): if vdict in [None, False, ""]: continue elif isinstance(vdict, bool): - value = "" + vdict = {"precious": False, "value": "", "positional": False} elif isinstance(vdict, dict): precious = vdict["precious"] - value = vdict["value"] - if value in [None, False, ""] and not precious: + if vdict["value"] in [None, False, ""] and not precious: continue - positional = vdict["positional"] + cleanedArgs.update({name: vdict}) + return cleanedArgs + +def serialize_applicationArgs(applicationArgs, prefix="--", separator=" "): + """ + Unpacks the applicationArgs dictionary and returns a string + that can be used as command line parameters. + """ + applicationArgs = clean_applicationArgs(applicationArgs, + prefix=prefix, separator=separator) + pargs = [] + kwargs = {} + for (name, vdict) in applicationArgs.items(): + precious = vdict["precious"] + value = vdict["value"] + if value in [None, False, ""] and not precious: + continue + positional = vdict["positional"] if positional: pargs.append(str(value).strip()) else: kwargs.update({name:value}) - kwargs = serialize_kwargs(kwargs, prefix=prefix, separator=separator) + skwargs = serialize_kwargs(kwargs, prefix=prefix, separator=separator) logger.info('Constructed command line arguments: %s %s', pargs, kwargs) # return f"{' '.join(pargs + kwargs)}" # add kwargs to end of pargs - return (pargs, kwargs) + return (pargs, skwargs) def identify_named_ports(ports, port_dict, posargs, pargsDict, appArgs, check_len=0, mode="inputs"): """ @@ -585,6 +602,7 @@ def identify_named_ports(ports, port_dict, posargs, pargsDict, appArgs, check_le # kwargs = {arg:appArgs[arg]["value"] for arg in appArgs # if not appArgs[arg]["positional"]} portargs = {} + posargs = list(posargs) for i in range(check_len): # key for final dict is value in named ports dict key = list(port_dict[i].values())[0] @@ -595,11 +613,14 @@ def identify_named_ports(ports, port_dict, posargs, pargsDict, appArgs, check_le pargsDict.update({key:value}) logger.debug("Using %s '%s' for parg %s", mode, value, key) posargs.pop(posargs.index(key)) - else: + elif key in appArgs: + # if not found in appArgs we don't put them into portargs either portargs.update({key:value}) logger.debug("Using %s '%s' for kwarg %s", mode, value, key) - _dum = appArgs.pop(key) if key in appArgs else None - logger.debug("Argument used as %s removed: %s", mode, _dum) + _dum = appArgs.pop(key) + logger.debug("Argument used as %s removed: %s", mode, _dum) + else: + logger.debug("No matching argument found for %s key %s", mode, key) logger.debug("Returning mapped ports: %s", portargs) return portargs From 6cc6501e6ab471df7128c340ae084fafafcc5673 Mon Sep 17 00:00:00 2001 From: Andreas Wicenec Date: Thu, 28 Jul 2022 10:14:10 +0800 Subject: [PATCH 06/15] Moved evaluation of initial cmd. Fixed type in NgasDROP --- daliuge-engine/dlg/apps/dockerapp.py | 13 ++++++------- daliuge-engine/dlg/drop.py | 2 +- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/daliuge-engine/dlg/apps/dockerapp.py b/daliuge-engine/dlg/apps/dockerapp.py index e7c18c0ec..ea6686148 100644 --- a/daliuge-engine/dlg/apps/dockerapp.py +++ b/daliuge-engine/dlg/apps/dockerapp.py @@ -485,13 +485,6 @@ def run(self): ) logger.debug(f"port mappings: {portMappings}") - # Wait until the DockerApps this application runtime depends on have - # started, and replace their IP placeholders by the real IPs - for waiter in self._waiters: - uid, ip = waiter.waitForIp() - cmd = cmd.replace("%containerIp[{0}]%".format(uid), ip) - logger.debug("Command after IP replacement is: %s", cmd) - # deal with environment variables env = {} env.update({"DLG_UID": self._uid}) @@ -600,6 +593,12 @@ def run(self): else: cmd = "" ############### + # Wait until the DockerApps this application runtime depends on have + # started, and replace their IP placeholders by the real IPs + for waiter in self._waiters: + uid, ip = waiter.waitForIp() + cmd = cmd.replace("%containerIp[{0}]%".format(uid), ip) + logger.debug("Command after IP replacement is: %s", cmd) # Wrap everything inside bash if len(cmd) > 0 and not self._noBash: diff --git a/daliuge-engine/dlg/drop.py b/daliuge-engine/dlg/drop.py index 59120b34c..44074571d 100644 --- a/daliuge-engine/dlg/drop.py +++ b/daliuge-engine/dlg/drop.py @@ -1693,7 +1693,7 @@ def generate_reproduce_data(self): # \~English Estimated size of the data contained in this node # @param[in] cparam/group_end Group end/False/Boolean/readwrite/False//False/ # \~English Is this node the end of a group? -# @param[in] cparam/ngsSrv NGAS Server/localhost/String/readwrite/False//False/ +# @param[in] cparam/ngasSrv NGAS Server/localhost/String/readwrite/False//False/ # \~English The URL of the NGAS Server # @param[in] cparam/ngasPort NGAS Port/7777/Integer/readwrite/False//False/ # \~English The port of the NGAS Server From 1ec4aba0dfb634eac971d55bc3c74c774007db53 Mon Sep 17 00:00:00 2001 From: Andreas Wicenec Date: Thu, 28 Jul 2022 14:56:48 +0800 Subject: [PATCH 07/15] Refactored treatment of ports (now in droputils) --- daliuge-engine/dlg/apps/bash_shell_app.py | 62 +++----------------- daliuge-engine/dlg/apps/dockerapp.py | 68 +++------------------- daliuge-engine/dlg/droputils.py | 69 ++++++++++++++++++++--- 3 files changed, 77 insertions(+), 122 deletions(-) diff --git a/daliuge-engine/dlg/apps/bash_shell_app.py b/daliuge-engine/dlg/apps/bash_shell_app.py index 82770a04d..c4a2e41d8 100644 --- a/daliuge-engine/dlg/apps/bash_shell_app.py +++ b/daliuge-engine/dlg/apps/bash_shell_app.py @@ -182,6 +182,10 @@ def initialize(self, **kwargs): self, "No command specified, cannot create BashShellApp" ) + self.appArgs = droputils.clean_applicationArgs( + self._applicationArgs, prefix=self._argumentPrefix, + separator=self._paramValueSeparator + ) self._recompute_data = {} def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE): @@ -198,65 +202,15 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE): method and potentially logged. """ # we currently only support passing a path for bash apps - inputs_dict = collections.OrderedDict() - for uid, drop in inputs.items(): - inputs_dict[uid] = drop.path - - outputs_dict = collections.OrderedDict() - for uid, drop in outputs.items(): - # if drop does not have a path we assume it is just passing the event - # Bash does not support memory drops anyway - outputs_dict[uid] = drop.path if hasattr(drop, 'path') else '' - session_id = ( self._dlg_session.sessionId if self._dlg_session is not None else "" ) logger.debug(f"Parameters found: {self.parameters}") - # pargs, keyargs = droputils.serialize_applicationArgs( - # self._applicationArgs, self._argumentPrefix, self._paramValueSeparator - # ) - if "applicationArgs" in self.parameters: - appArgs = self.parameters["applicationArgs"] - else: - appArgs ={} - pargNames = [arg for arg in appArgs if appArgs[arg]["positional"]] - pargsDict = collections.OrderedDict(zip(pargNames,[None]*len(pargNames))) - pargs = pargsDict.keys() - for arg in pargNames: - pargsDict.update({arg:appArgs[arg]}) - # pargNames = [arg for arg in pargsDict] - keyargs = {arg:appArgs[arg]["value"] for arg in appArgs if not appArgs[arg]["positional"]} - if "inputs" in self.parameters and isinstance(self.parameters['inputs'][0], dict): - pkeyargs = droputils.identify_named_ports( - inputs_dict, - self.parameters["inputs"], - pargNames, - pargsDict, - appArgs, - check_len=len(inputs), - mode="inputs") - keyargs.update(pkeyargs) - else: - for i in range(min(len(inputs), len(pargs))): - keyargs.update({pargs[i]: list(inputs.values())[i]}) - if "outputs" in self.parameters and isinstance(self.parameters['outputs'][0], dict): - pkeyargs = droputils.identify_named_ports( - outputs_dict, - self.parameters["outputs"], - pargs, - pargsDict, - appArgs, - check_len=len(outputs), - mode="outputs") - keyargs.update(pkeyargs) - else: - for i in range(min(len(outputs), len(pargs))): - keyargs.update({pargs[i]: list(outputs.values())[i]}) - keyargs = droputils.serialize_kwargs(keyargs, - prefix=self._argumentPrefix, + + # deal with named ports + keyargs, pargs = droputils.replace_named_ports(inputs.items(), outputs.items(), + self.parameters, self.appArgs, argumentPrefix=self._argumentPrefix, separator=self._paramValueSeparator) - logger.debug("pargNames: %s; pargsDict: %s; keyargs: %s, appArgs: %s", pargNames, pargsDict, keyargs, appArgs) - pargs = [pargsDict[arg] for arg in pargsDict] argumentString = f"{' '.join(pargs + keyargs)}" # add kwargs to end of pargs # complete command including all additional parameters and optional redirects cmd = f"{self.command} {argumentString} {self._cmdLineArgs} " diff --git a/daliuge-engine/dlg/apps/dockerapp.py b/daliuge-engine/dlg/apps/dockerapp.py index ea6686148..81d2f1953 100644 --- a/daliuge-engine/dlg/apps/dockerapp.py +++ b/daliuge-engine/dlg/apps/dockerapp.py @@ -273,6 +273,10 @@ def initialize(self, **kwargs): self._image, ) + self.appArgs = droputils.clean_applicationArgs( + self._applicationArgs, prefix=self._argumentPrefix, + separator=self._paramValueSeparator + ) self._command = self._getArg(kwargs, "command", None) self._noBash = False @@ -284,16 +288,6 @@ def initialize(self, **kwargs): # able to add any arguments straight after. This requires to use the placeholder string # "%%" at the start of the command, else it is interpreted as a normal command. - # construct the actual command line from all application parameters - # self.pargs, self.keyargs = droputils.serialize_applicationArgs( - # self._applicationArgs, prefix=self._argumentPrefix, - # separator=self._paramValueSeparator - # ) - self.appArgs = droputils.clean_applicationArgs( - self._applicationArgs, prefix=self._argumentPrefix, - separator=self._paramValueSeparator - ) - # defer construction of complete command to run method # The user used to run the process in the docker container is now always the user # who originally started the DALiuGE process as well. The information is passed through @@ -528,59 +522,11 @@ def run(self): ) logger.debug(f"Adding environment variables: {env}") - ########### # deal with named ports - # TODO: This needs to be moved to droputils as well. - # Almost exact same code as for bash - inputs_dict = collections.OrderedDict() - for uid, drop in iitems: - inputs_dict[uid] = drop.path if hasattr(drop, 'path') else '' - - outputs_dict = collections.OrderedDict() - for uid, drop in oitems: - # if drop does not have a path we assume it is just passing the event - # Bash does not support memory drops anyway - outputs_dict[uid] = drop.path if hasattr(drop, 'path') else '' - logger.debug("appArgs: %s", self.appArgs) - posargs = [arg for arg in self.appArgs if self.appArgs[arg]["positional"]] - keyargs = {key:self.appArgs[key]["value"] for (key, value) in self.appArgs.items() if not self.appArgs[key]["positional"]} - posargsDict = collections.OrderedDict(zip(posargs,[None]*len(posargs))) - portkeyargs = {} - logger.debug("posargs: %s; keyargs: %s",posargs, keyargs) - if "inputs" in self.parameters and isinstance(self.parameters['inputs'][0], dict): - ipkeyargs = droputils.identify_named_ports( - inputs_dict, - self.parameters["inputs"], - posargs, - posargsDict, - self.appArgs, - check_len=len(iitems), - mode="inputs") - portkeyargs.update(ipkeyargs) - else: - for i in range(min(len(iitems), len(posargs))): - portkeyargs.update({posargs[i]: list(self._inputs.values())[i]}) - if "outputs" in self.parameters and isinstance(self.parameters['outputs'][0], dict): - opkeyargs = droputils.identify_named_ports( - outputs_dict, - self.parameters["outputs"], - posargs, - posargsDict, - self.appArgs, - check_len=len(oitems), - mode="outputs") - portkeyargs.update(opkeyargs) - else: - for i in range(min(len(oitems), len(posargs))): - portkeyargs.update({posargs[i]: list(self._outputs.values())[i]}) - keyargs.update(portkeyargs) - keyargs = droputils.serialize_kwargs(keyargs, - prefix=self._argumentPrefix, - separator=self._paramValueSeparator) if len(keyargs) > 0 else [''] - pargs = list(posargsDict.values()) - pargs = [''] if len(pargs) == 0 or None in pargs else pargs - logger.debug("After port replacement: pargs: %s; keyargs: %s",pargs, keyargs) + keyargs, pargs = droputils.replace_named_ports(iitems, oitems, self.parameters, + self.appArgs, argumentPrefix=self._argumentPrefix, separator=self._paramValueSeparator) argumentString = f"{' '.join(keyargs + pargs)}" # add kwargs to end of pargs + # complete command including all additional parameters and optional redirects cmd = f"{self._command} {argumentString} {self._cmdLineArgs} " if cmd: diff --git a/daliuge-engine/dlg/droputils.py b/daliuge-engine/dlg/droputils.py index fad9c0c93..2fd7c52dc 100644 --- a/daliuge-engine/dlg/droputils.py +++ b/daliuge-engine/dlg/droputils.py @@ -541,7 +541,9 @@ def serialize_kwargs(keyargs, prefix="--", separator=" "): def clean_applicationArgs(applicationArgs, prefix="--", separator=" "): """ - Removes arguments with None and False values, if not precious. + Removes arguments with None and False values, if not precious. This + is in particular used for Bash and Docker app command lines, else + we would have empty values for command line arguments. Returns a dictionary with the relevant arguments only. """ @@ -571,18 +573,16 @@ def clean_applicationArgs(applicationArgs, prefix="--", separator=" "): def serialize_applicationArgs(applicationArgs, prefix="--", separator=" "): """ - Unpacks the applicationArgs dictionary and returns a string - that can be used as command line parameters. + Unpacks the applicationArgs dictionary and returns two strings, one for + positional arguments and one for kw arguments that can be used to construct + the final command line. """ applicationArgs = clean_applicationArgs(applicationArgs, prefix=prefix, separator=separator) pargs = [] kwargs = {} for (name, vdict) in applicationArgs.items(): - precious = vdict["precious"] value = vdict["value"] - if value in [None, False, ""] and not precious: - continue positional = vdict["positional"] if positional: pargs.append(str(value).strip()) @@ -590,7 +590,6 @@ def serialize_applicationArgs(applicationArgs, prefix="--", separator=" "): kwargs.update({name:value}) skwargs = serialize_kwargs(kwargs, prefix=prefix, separator=separator) logger.info('Constructed command line arguments: %s %s', pargs, kwargs) - # return f"{' '.join(pargs + kwargs)}" # add kwargs to end of pargs return (pargs, skwargs) def identify_named_ports(ports, port_dict, posargs, pargsDict, appArgs, check_len=0, mode="inputs"): @@ -624,6 +623,62 @@ def identify_named_ports(ports, port_dict, posargs, pargsDict, appArgs, check_le logger.debug("Returning mapped ports: %s", portargs) return portargs +def replace_named_ports(iitems, oitems, parameters, appArgs, argumentPrefix="--", + separator=" "): + """ + """ + inputs_dict = collections.OrderedDict() + for uid, drop in iitems: + inputs_dict[uid] = drop.path if hasattr(drop, 'path') else '' + + outputs_dict = collections.OrderedDict() + for uid, drop in oitems: + # if drop does not have a path we assume it is just passing the event + # Bash does not support memory drops anyway + outputs_dict[uid] = drop.path if hasattr(drop, 'path') else '' + logger.debug("appArgs: %s", appArgs) + # get positional args + posargs = [arg for arg in appArgs if appArgs[arg]["positional"]] + # get kwargs + keyargs = {arg:appArgs[arg]["value"] for arg in appArgs + if not appArgs[arg]["positional"]} + posargsDict = collections.OrderedDict(zip(posargs,[None]*len(posargs))) + portkeyargs = {} + logger.debug("posargs: %s; keyargs: %s",posargs, keyargs) + if "inputs" in parameters and isinstance(parameters['inputs'][0], dict): + ipkeyargs = identify_named_ports( + inputs_dict, + parameters["inputs"], + posargs, + posargsDict, + appArgs, + check_len=len(iitems), + mode="inputs") + portkeyargs.update(ipkeyargs) + else: + for i in range(min(len(iitems), len(posargs))): + portkeyargs.update({posargs[i]: iitems[i][1]}) + if "outputs" in parameters and isinstance(parameters['outputs'][0], dict): + opkeyargs = identify_named_ports( + outputs_dict, + parameters["outputs"], + posargs, + posargsDict, + appArgs, + check_len=len(oitems), + mode="outputs") + portkeyargs.update(opkeyargs) + else: + for i in range(min(len(oitems), len(posargs))): + portkeyargs.update({posargs[i]: oitems[i][1]}) + keyargs.update(portkeyargs) + keyargs = serialize_kwargs(keyargs, + prefix=argumentPrefix, + separator=separator) if len(keyargs) > 0 else [''] + pargs = list(posargsDict.values()) + pargs = [''] if len(pargs) == 0 or None in pargs else pargs + logger.debug("After port replacement: pargs: %s; keyargs: %s",pargs, keyargs) + return keyargs, pargs # Easing the transition from single- to multi-package get_leaves = common.get_leaves From 381a62404a63cb7d421a0c5d81c304a5850f197a Mon Sep 17 00:00:00 2001 From: Andreas Wicenec Date: Fri, 29 Jul 2022 15:58:32 +0800 Subject: [PATCH 08/15] Cleanup --- daliuge-engine/dlg/apps/bash_shell_app.py | 21 +++++++++------------ daliuge-engine/dlg/droputils.py | 22 ++++++++++++++-------- 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/daliuge-engine/dlg/apps/bash_shell_app.py b/daliuge-engine/dlg/apps/bash_shell_app.py index c4a2e41d8..8d081be44 100644 --- a/daliuge-engine/dlg/apps/bash_shell_app.py +++ b/daliuge-engine/dlg/apps/bash_shell_app.py @@ -202,10 +202,15 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE): method and potentially logged. """ # we currently only support passing a path for bash apps - session_id = ( - self._dlg_session.sessionId if self._dlg_session is not None else "" - ) - logger.debug(f"Parameters found: {self.parameters}") + fsInputs = {uid: i for uid, i in inputs.items() if droputils.has_path(i)} + fsOutputs = {uid: o for uid, o in outputs.items() if droputils.has_path(o)} + dataURLInputs = { + uid: i for uid, i in inputs.items() if not droputils.has_path(i) + } + dataURLOutputs = { + uid: o for uid, o in outputs.items() if not droputils.has_path(o) + } + # deal with named ports keyargs, pargs = droputils.replace_named_ports(inputs.items(), outputs.items(), @@ -224,16 +229,8 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE): # self.run_bash(self._command, self.uid, session_id, *args, **kwargs) # Replace inputs/outputs in command line with paths or data URLs - fsInputs = {uid: i for uid, i in inputs.items() if droputils.has_path(i)} - fsOutputs = {uid: o for uid, o in outputs.items() if droputils.has_path(o)} cmd = droputils.replace_path_placeholders(cmd, fsInputs, fsOutputs) - dataURLInputs = { - uid: i for uid, i in inputs.items() if not droputils.has_path(i) - } - dataURLOutputs = { - uid: o for uid, o in outputs.items() if not droputils.has_path(o) - } cmd = droputils.replace_dataurl_placeholders(cmd, dataURLInputs, dataURLOutputs) # Pass down daliuge-specific information to the subprocesses as environment variables diff --git a/daliuge-engine/dlg/droputils.py b/daliuge-engine/dlg/droputils.py index 2fd7c52dc..a7ddeb095 100644 --- a/daliuge-engine/dlg/droputils.py +++ b/daliuge-engine/dlg/droputils.py @@ -595,8 +595,8 @@ def serialize_applicationArgs(applicationArgs, prefix="--", separator=" "): def identify_named_ports(ports, port_dict, posargs, pargsDict, appArgs, check_len=0, mode="inputs"): """ """ - logger.debug("Using named ports to remove %s from arguments: %s %d", mode, - port_dict, check_len) + logger.debug("Using named ports to remove %s from arguments (ports, port_dict, check_len): %s %s %d", mode, + ports, port_dict, check_len) # pargsDict = collections.OrderedDict(zip(posargs,[None]*len(posargs))) # kwargs = {arg:appArgs[arg]["value"] for arg in appArgs # if not appArgs[arg]["positional"]} @@ -626,16 +626,22 @@ def identify_named_ports(ports, port_dict, posargs, pargsDict, appArgs, check_le def replace_named_ports(iitems, oitems, parameters, appArgs, argumentPrefix="--", separator=" "): """ + Function attempts to identify component arguments that match port names. + + Inputs: + iitems: itemized input port dictionary + oitems: itemized output port dictionary + parameters: """ - inputs_dict = collections.OrderedDict() + inputs_path_dict = collections.OrderedDict() for uid, drop in iitems: - inputs_dict[uid] = drop.path if hasattr(drop, 'path') else '' + inputs_path_dict[uid] = drop.path if hasattr(drop, 'path') else '' - outputs_dict = collections.OrderedDict() + outputs_path_dict = collections.OrderedDict() for uid, drop in oitems: # if drop does not have a path we assume it is just passing the event # Bash does not support memory drops anyway - outputs_dict[uid] = drop.path if hasattr(drop, 'path') else '' + outputs_path_dict[uid] = drop.path if hasattr(drop, 'path') else '' logger.debug("appArgs: %s", appArgs) # get positional args posargs = [arg for arg in appArgs if appArgs[arg]["positional"]] @@ -647,7 +653,7 @@ def replace_named_ports(iitems, oitems, parameters, appArgs, argumentPrefix="--" logger.debug("posargs: %s; keyargs: %s",posargs, keyargs) if "inputs" in parameters and isinstance(parameters['inputs'][0], dict): ipkeyargs = identify_named_ports( - inputs_dict, + inputs_path_dict, parameters["inputs"], posargs, posargsDict, @@ -660,7 +666,7 @@ def replace_named_ports(iitems, oitems, parameters, appArgs, argumentPrefix="--" portkeyargs.update({posargs[i]: iitems[i][1]}) if "outputs" in parameters and isinstance(parameters['outputs'][0], dict): opkeyargs = identify_named_ports( - outputs_dict, + outputs_path_dict, parameters["outputs"], posargs, posargsDict, From becdc37520d0b6cd10f203ba7fa93de7679f3b5f Mon Sep 17 00:00:00 2001 From: Andreas Wicenec Date: Fri, 29 Jul 2022 19:46:13 +0800 Subject: [PATCH 09/15] More cleanup --- daliuge-engine/dlg/apps/bash_shell_app.py | 10 ++-- daliuge-engine/dlg/apps/dockerapp.py | 12 +++-- daliuge-engine/dlg/droputils.py | 59 ++++++++++++++--------- 3 files changed, 51 insertions(+), 30 deletions(-) diff --git a/daliuge-engine/dlg/apps/bash_shell_app.py b/daliuge-engine/dlg/apps/bash_shell_app.py index 8d081be44..cc5068be6 100644 --- a/daliuge-engine/dlg/apps/bash_shell_app.py +++ b/daliuge-engine/dlg/apps/bash_shell_app.py @@ -201,7 +201,8 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE): output of the process is piped to. If not given it is consumed by this method and potentially logged. """ - # we currently only support passing a path for bash apps + logger.debug("Parameters found: %s", self.parameters) + # we only support passing a path for bash apps fsInputs = {uid: i for uid, i in inputs.items() if droputils.has_path(i)} fsOutputs = {uid: o for uid, o in outputs.items() if droputils.has_path(o)} dataURLInputs = { @@ -211,10 +212,13 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE): uid: o for uid, o in outputs.items() if not droputils.has_path(o) } - # deal with named ports + inport_names = self.parameters['inputs'] \ + if "inputs" in self.parameters else [] + outport_names = self.parameters['outputs'] \ + if "outputs" in self.parameters else [] keyargs, pargs = droputils.replace_named_ports(inputs.items(), outputs.items(), - self.parameters, self.appArgs, argumentPrefix=self._argumentPrefix, + inport_names, outport_names, self.appArgs, argumentPrefix=self._argumentPrefix, separator=self._paramValueSeparator) argumentString = f"{' '.join(pargs + keyargs)}" # add kwargs to end of pargs # complete command including all additional parameters and optional redirects diff --git a/daliuge-engine/dlg/apps/dockerapp.py b/daliuge-engine/dlg/apps/dockerapp.py index 81d2f1953..fbf6958ca 100644 --- a/daliuge-engine/dlg/apps/dockerapp.py +++ b/daliuge-engine/dlg/apps/dockerapp.py @@ -523,9 +523,15 @@ def run(self): logger.debug(f"Adding environment variables: {env}") # deal with named ports - keyargs, pargs = droputils.replace_named_ports(iitems, oitems, self.parameters, - self.appArgs, argumentPrefix=self._argumentPrefix, separator=self._paramValueSeparator) - argumentString = f"{' '.join(keyargs + pargs)}" # add kwargs to end of pargs + inport_names = self.parameters['inputs'] \ + if "inputs" in self.parameters else [] + outport_names = self.parameters['outputs'] \ + if "outputs" in self.parameters else [] + keyargs, pargs = droputils.replace_named_ports(iitems, oitems, + inport_names, outport_names, self.appArgs, + argumentPrefix=self._argumentPrefix, + separator=self._paramValueSeparator) + argumentString = f"{' '.join(keyargs + pargs)}" # complete command including all additional parameters and optional redirects cmd = f"{self._command} {argumentString} {self._cmdLineArgs} " diff --git a/daliuge-engine/dlg/droputils.py b/daliuge-engine/dlg/droputils.py index a7ddeb095..7d77ac8bd 100644 --- a/daliuge-engine/dlg/droputils.py +++ b/daliuge-engine/dlg/droputils.py @@ -592,21 +592,18 @@ def serialize_applicationArgs(applicationArgs, prefix="--", separator=" "): logger.info('Constructed command line arguments: %s %s', pargs, kwargs) return (pargs, skwargs) -def identify_named_ports(ports, port_dict, posargs, pargsDict, appArgs, check_len=0, mode="inputs"): +def identify_named_ports(port_dict, posargs, pargsDict, + appArgs, check_len=0, mode="inputs"): """ """ - logger.debug("Using named ports to remove %s from arguments (ports, port_dict, check_len): %s %s %d", mode, - ports, port_dict, check_len) - # pargsDict = collections.OrderedDict(zip(posargs,[None]*len(posargs))) - # kwargs = {arg:appArgs[arg]["value"] for arg in appArgs - # if not appArgs[arg]["positional"]} + logger.debug("Using named ports to remove %s from arguments port_dict, check_len): %s %d", + mode, port_dict, check_len) portargs = {} posargs = list(posargs) + keys = list(port_dict.keys()) for i in range(check_len): - # key for final dict is value in named ports dict - key = list(port_dict[i].values())[0] - # value for final dict is value in ports dict - value = ports[list(port_dict[i].keys())[0]] + key = port_dict[keys[i]]['name'] + value = port_dict[keys[i]]['path'] if not value: value = '' # make sure we are passing NULL drop events if key in posargs: pargsDict.update({key:value}) @@ -623,8 +620,17 @@ def identify_named_ports(ports, port_dict, posargs, pargsDict, appArgs, check_le logger.debug("Returning mapped ports: %s", portargs) return portargs -def replace_named_ports(iitems, oitems, parameters, appArgs, argumentPrefix="--", - separator=" "): +def check_ports_dict(ports:list) -> bool: + """ + Checks whether all ports in ports list are of type dict + + Input: list of ports + """ + return all(isinstance(p, dict) for p in ports) + + +def replace_named_ports(iitems, oitems, inport_names, outport_names, + appArgs, argumentPrefix="--", separator=" "): """ Function attempts to identify component arguments that match port names. @@ -633,15 +639,15 @@ def replace_named_ports(iitems, oitems, parameters, appArgs, argumentPrefix="--" oitems: itemized output port dictionary parameters: """ - inputs_path_dict = collections.OrderedDict() + logger.debug("iitems: %s; inport_names: %s; outport_names: %s", + iitems, inport_names, outport_names) + inputs_dict = {} for uid, drop in iitems: - inputs_path_dict[uid] = drop.path if hasattr(drop, 'path') else '' + inputs_dict[uid] = {'path': drop.path if hasattr(drop, 'path') else ''} - outputs_path_dict = collections.OrderedDict() + outputs_dict = {} for uid, drop in oitems: - # if drop does not have a path we assume it is just passing the event - # Bash does not support memory drops anyway - outputs_path_dict[uid] = drop.path if hasattr(drop, 'path') else '' + outputs_dict[uid] = {'path': drop.path if hasattr(drop, 'path') else ''} logger.debug("appArgs: %s", appArgs) # get positional args posargs = [arg for arg in appArgs if appArgs[arg]["positional"]] @@ -651,10 +657,12 @@ def replace_named_ports(iitems, oitems, parameters, appArgs, argumentPrefix="--" posargsDict = collections.OrderedDict(zip(posargs,[None]*len(posargs))) portkeyargs = {} logger.debug("posargs: %s; keyargs: %s",posargs, keyargs) - if "inputs" in parameters and isinstance(parameters['inputs'][0], dict): + if check_ports_dict(inport_names): + for inport in inport_names: + key = list(inport.keys())[0] + inputs_dict[key].update({'name':inport[key]}) ipkeyargs = identify_named_ports( - inputs_path_dict, - parameters["inputs"], + inputs_dict, posargs, posargsDict, appArgs, @@ -664,10 +672,13 @@ def replace_named_ports(iitems, oitems, parameters, appArgs, argumentPrefix="--" else: for i in range(min(len(iitems), len(posargs))): portkeyargs.update({posargs[i]: iitems[i][1]}) - if "outputs" in parameters and isinstance(parameters['outputs'][0], dict): + + if check_ports_dict(outport_names): + for outport in outport_names: + key = list(outport.keys())[0] + outputs_dict[key].update({'name':outport[key]}) opkeyargs = identify_named_ports( - outputs_path_dict, - parameters["outputs"], + outputs_dict, posargs, posargsDict, appArgs, From 61820cef0fd397f3abdf2972f54537ef2fa4029c Mon Sep 17 00:00:00 2001 From: Andreas Wicenec Date: Sat, 30 Jul 2022 14:18:24 +0800 Subject: [PATCH 10/15] Adjusted pyfunc to named ports updates --- daliuge-engine/dlg/apps/pyfunc.py | 27 +++++++++++++++++++++------ daliuge-engine/dlg/droputils.py | 18 +++++++++++++----- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/daliuge-engine/dlg/apps/pyfunc.py b/daliuge-engine/dlg/apps/pyfunc.py index 2653d23d3..978b01b5a 100644 --- a/daliuge-engine/dlg/apps/pyfunc.py +++ b/daliuge-engine/dlg/apps/pyfunc.py @@ -38,6 +38,7 @@ from dlg import droputils, utils from dlg.drop import BarrierAppDROP +from pyparsing import col from dlg.exceptions import InvalidDropException from dlg.meta import ( dlg_string_param, @@ -461,12 +462,18 @@ def optionalEval(x): else: appArgs = {} - if ('inputs' in self.parameters and isinstance(self.parameters['inputs'][0], dict)): + if ('inputs' in self.parameters and + droputils.check_ports_dict(self.parameters['inputs'])): check_len = min(len(inputs),self.fn_nargs+ len(self.arguments.kwonlyargs)) + inputs_dict = collections.OrderedDict() + for inport in self.parameters['inputs']: + key = list(inport.keys())[0] + inputs_dict[key] = { + 'name':inport[key], + 'path':inputs[key]} kwargs.update(droputils.identify_named_ports( - inputs, - self.parameters['inputs'], + inputs_dict, posargs, pargsDict, appArgs, @@ -479,12 +486,20 @@ def optionalEval(x): logger.debug(f"Updating funcargs with input ports {kwargs}") funcargs.update(kwargs) - if ('outputs' in self.parameters and isinstance(self.parameters['outputs'][0], dict)): + if ('outputs' in self.parameters and + droputils.check_ports_dict(self.parameters['outputs'])): check_len = min(len(outputs),self.fn_nargs+ len(self.arguments.kwonlyargs)) + outputs_dict = collections.OrderedDict() + for outport in self.parameters['outputs']: + key = list(outport.keys())[0] + outputs_dict[key] = { + 'name':outport[key], + 'path': outputs[key] + } + kwargs.update(droputils.identify_named_ports( - outputs, - self.parameters['outputs'], + outputs_dict, posargs, pargsDict, appArgs, diff --git a/daliuge-engine/dlg/droputils.py b/daliuge-engine/dlg/droputils.py index 7d77ac8bd..e961f3967 100644 --- a/daliuge-engine/dlg/droputils.py +++ b/daliuge-engine/dlg/droputils.py @@ -602,8 +602,12 @@ def identify_named_ports(port_dict, posargs, pargsDict, posargs = list(posargs) keys = list(port_dict.keys()) for i in range(check_len): - key = port_dict[keys[i]]['name'] - value = port_dict[keys[i]]['path'] + try: + key = port_dict[keys[i]]['name'] + value = port_dict[keys[i]]['path'] + except KeyError: + print(port_dict) + raise KeyError if not value: value = '' # make sure we are passing NULL drop events if key in posargs: pargsDict.update({key:value}) @@ -626,7 +630,11 @@ def check_ports_dict(ports:list) -> bool: Input: list of ports """ - return all(isinstance(p, dict) for p in ports) + # all returns true if list is empty! + if len(ports) > 0: + return all(isinstance(p, dict) for p in ports) + else: + return False def replace_named_ports(iitems, oitems, inport_names, outport_names, @@ -641,11 +649,11 @@ def replace_named_ports(iitems, oitems, inport_names, outport_names, """ logger.debug("iitems: %s; inport_names: %s; outport_names: %s", iitems, inport_names, outport_names) - inputs_dict = {} + inputs_dict = collections.OrderedDict() for uid, drop in iitems: inputs_dict[uid] = {'path': drop.path if hasattr(drop, 'path') else ''} - outputs_dict = {} + outputs_dict = collections.OrderedDict() for uid, drop in oitems: outputs_dict[uid] = {'path': drop.path if hasattr(drop, 'path') else ''} logger.debug("appArgs: %s", appArgs) From 6eef43c06e12571bf6fdc2a2507e690586c7d8bb Mon Sep 17 00:00:00 2001 From: Andreas Wicenec Date: Sat, 30 Jul 2022 15:19:31 +0800 Subject: [PATCH 11/15] Cleanup and doc strings --- daliuge-engine/dlg/apps/bash_shell_app.py | 9 ---- daliuge-engine/dlg/droputils.py | 50 +++++++++++++++++++---- 2 files changed, 42 insertions(+), 17 deletions(-) diff --git a/daliuge-engine/dlg/apps/bash_shell_app.py b/daliuge-engine/dlg/apps/bash_shell_app.py index cc5068be6..b0d6826e5 100644 --- a/daliuge-engine/dlg/apps/bash_shell_app.py +++ b/daliuge-engine/dlg/apps/bash_shell_app.py @@ -230,7 +230,6 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE): cmd = cmd.strip() app_uid = self.uid - # self.run_bash(self._command, self.uid, session_id, *args, **kwargs) # Replace inputs/outputs in command line with paths or data URLs cmd = droputils.replace_path_placeholders(cmd, fsInputs, fsOutputs) @@ -244,14 +243,6 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE): env.update({"DLG_SESSION_ID": self._dlg_session.sessionId}) env.update({"DLG_ROOT": utils.getDlgDir()}) - # # try to change to session directory, else just stay in current - # work_dir = utils.getDlgWorkDir() - # session_dir = f"{work_dir}/{session_id}" - # try: - # os.chdir(session_dir) - # logger.info("Changed to session directory: %s" % session_dir) - # except: - # logger.warning("Changing to session directory %s unsuccessful!" % session_dir) # Wrap everything inside bash cmd = ("/bin/bash", "-c", cmd) diff --git a/daliuge-engine/dlg/droputils.py b/daliuge-engine/dlg/droputils.py index e961f3967..65d475e4c 100644 --- a/daliuge-engine/dlg/droputils.py +++ b/daliuge-engine/dlg/droputils.py @@ -31,7 +31,7 @@ import re import threading import traceback -from typing import IO, Any, AsyncIterable, BinaryIO, Dict, Iterable, overload +from typing import IO, Any, AsyncIterable, BinaryIO, Dict, Iterable, OrderedDict, Tuple, overload import numpy as np from dlg.ddap_protocol import DROPStates @@ -592,9 +592,24 @@ def serialize_applicationArgs(applicationArgs, prefix="--", separator=" "): logger.info('Constructed command line arguments: %s %s', pargs, kwargs) return (pargs, skwargs) -def identify_named_ports(port_dict, posargs, pargsDict, - appArgs, check_len=0, mode="inputs"): +def identify_named_ports( + port_dict:dict, + posargs:list, + pargsDict:dict, + appArgs:dict, + check_len: int=0, + mode: str="inputs" + ) -> dict: """ + Checks port names for matches with arguments and returns mapped ports. + + Args: + port_dict (dict): ports {uid:name,...} + posargs (list): available positional arguments (will be modified) + pargsDict (dict): mapped arguments (will be modified) + + Returns: + dict: port arguments """ logger.debug("Using named ports to remove %s from arguments port_dict, check_len): %s %d", mode, port_dict, check_len) @@ -626,9 +641,14 @@ def identify_named_ports(port_dict, posargs, pargsDict, def check_ports_dict(ports:list) -> bool: """ - Checks whether all ports in ports list are of type dict + Checks whether all ports in ports list are of type dict. This is + for backwards compatibility. + + Args: + ports (list): - Input: list of ports + Returns: + bool: True if all ports are dict, else False """ # all returns true if list is empty! if len(ports) > 0: @@ -637,15 +657,28 @@ def check_ports_dict(ports:list) -> bool: return False -def replace_named_ports(iitems, oitems, inport_names, outport_names, - appArgs, argumentPrefix="--", separator=" "): +def replace_named_ports( + iitems:dict, + oitems:dict, + inport_names:dict, + outport_names:dict, + appArgs:dict, argumentPrefix="--", + separator=" " + ) -> Tuple[str, str]: """ Function attempts to identify component arguments that match port names. Inputs: iitems: itemized input port dictionary oitems: itemized output port dictionary - parameters: + inport_names: dictionary of input port names (key: uid) + outport_names: dictionary of output port names (key: uid) + appArgs: dictionary of all arguments + argumentPrefix: prefix for keyword arguments + separator: character used between keyword and value + + Returns: + tuple of serialized keyword arguments and positional arguments """ logger.debug("iitems: %s; inport_names: %s; outport_names: %s", iitems, inport_names, outport_names) @@ -669,6 +702,7 @@ def replace_named_ports(iitems, oitems, inport_names, outport_names, for inport in inport_names: key = list(inport.keys())[0] inputs_dict[key].update({'name':inport[key]}) + ipkeyargs = identify_named_ports( inputs_dict, posargs, From fe1853aef43c6a731fb42814f9a36e983b9f3cb5 Mon Sep 17 00:00:00 2001 From: Andreas Wicenec Date: Sat, 30 Jul 2022 22:42:13 +0800 Subject: [PATCH 12/15] Cleanup of args after matching ports --- daliuge-engine/dlg/apps/bash_shell_app.py | 4 +- daliuge-engine/dlg/apps/dockerapp.py | 8 ++-- daliuge-engine/dlg/droputils.py | 58 ++++++++++++++--------- 3 files changed, 39 insertions(+), 31 deletions(-) diff --git a/daliuge-engine/dlg/apps/bash_shell_app.py b/daliuge-engine/dlg/apps/bash_shell_app.py index ab717d468..b91845ccb 100644 --- a/daliuge-engine/dlg/apps/bash_shell_app.py +++ b/daliuge-engine/dlg/apps/bash_shell_app.py @@ -183,9 +183,7 @@ def initialize(self, **kwargs): ) self.appArgs = droputils.clean_applicationArgs( - self._applicationArgs, prefix=self._argumentPrefix, - separator=self._paramValueSeparator - ) + self._applicationArgs) self._recompute_data = {} def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE): diff --git a/daliuge-engine/dlg/apps/dockerapp.py b/daliuge-engine/dlg/apps/dockerapp.py index 2c357a6c6..a1a7e3b93 100644 --- a/daliuge-engine/dlg/apps/dockerapp.py +++ b/daliuge-engine/dlg/apps/dockerapp.py @@ -254,10 +254,6 @@ def initialize(self, **kwargs): self._image, ) - self.appArgs = droputils.clean_applicationArgs( - self._applicationArgs, prefix=self._argumentPrefix, - separator=self._paramValueSeparator - ) self._command = self._getArg(kwargs, "command", None) self._noBash = False @@ -504,14 +500,16 @@ def run(self): logger.debug(f"Adding environment variables: {env}") # deal with named ports + appArgs = self._applicationArgs inport_names = self.parameters['inputs'] \ if "inputs" in self.parameters else [] outport_names = self.parameters['outputs'] \ if "outputs" in self.parameters else [] keyargs, pargs = droputils.replace_named_ports(iitems, oitems, - inport_names, outport_names, self.appArgs, + inport_names, outport_names, appArgs, argumentPrefix=self._argumentPrefix, separator=self._paramValueSeparator) + argumentString = f"{' '.join(keyargs + pargs)}" # complete command including all additional parameters and optional redirects diff --git a/daliuge-engine/dlg/droputils.py b/daliuge-engine/dlg/droputils.py index 65d475e4c..9eacc5d06 100644 --- a/daliuge-engine/dlg/droputils.py +++ b/daliuge-engine/dlg/droputils.py @@ -539,26 +539,23 @@ def serialize_kwargs(keyargs, prefix="--", separator=" "): logger.debug("kwargs after serialization: %s",kwargs) return kwargs -def clean_applicationArgs(applicationArgs, prefix="--", separator=" "): +def clean_applicationArgs(applicationArgs:dict) -> dict: """ Removes arguments with None and False values, if not precious. This is in particular used for Bash and Docker app command lines, else we would have empty values for command line arguments. - Returns a dictionary with the relevant arguments only. + Args: + applicationsArgs (dict): the complete set of arguments + + Returns: + dict: a dictionary with the relevant arguments only. """ cleanedArgs = {} if not isinstance(applicationArgs, dict): logger.info("applicationArgs are not passed as a dict. Ignored!") else: logger.info("ApplicationArgs found %s", applicationArgs) - # construct the actual command line from all application parameters - kwargs = {arg:applicationArgs[arg]["value"] for arg in applicationArgs - if not applicationArgs[arg]["positional"]} - # kwargs = {} - pargs = [] - positional = False - precious = False for (name, vdict) in applicationArgs.items(): if vdict in [None, False, ""]: continue @@ -595,8 +592,8 @@ def serialize_applicationArgs(applicationArgs, prefix="--", separator=" "): def identify_named_ports( port_dict:dict, posargs:list, - pargsDict:dict, - appArgs:dict, + pargsDict:dict, + keyargs: dict, check_len: int=0, mode: str="inputs" ) -> dict: @@ -610,6 +607,9 @@ def identify_named_ports( Returns: dict: port arguments + + Side effect: + modifies the pargsDict OrderedDict """ logger.debug("Using named ports to remove %s from arguments port_dict, check_len): %s %d", mode, port_dict, check_len) @@ -621,22 +621,21 @@ def identify_named_ports( key = port_dict[keys[i]]['name'] value = port_dict[keys[i]]['path'] except KeyError: - print(port_dict) + logger.debug("portDict: %s", port_dict) raise KeyError if not value: value = '' # make sure we are passing NULL drop events if key in posargs: pargsDict.update({key:value}) logger.debug("Using %s '%s' for parg %s", mode, value, key) posargs.pop(posargs.index(key)) - elif key in appArgs: + elif key in keyargs: # if not found in appArgs we don't put them into portargs either portargs.update({key:value}) logger.debug("Using %s '%s' for kwarg %s", mode, value, key) - _dum = appArgs.pop(key) - logger.debug("Argument used as %s removed: %s", mode, _dum) + _dum = keyargs.pop(key) # remove from original arg list else: logger.debug("No matching argument found for %s key %s", mode, key) - logger.debug("Returning mapped ports: %s", portargs) + logger.debug("Returning kw mapped ports: %s", portargs) return portargs def check_ports_dict(ports:list) -> bool: @@ -695,7 +694,9 @@ def replace_named_ports( # get kwargs keyargs = {arg:appArgs[arg]["value"] for arg in appArgs if not appArgs[arg]["positional"]} - posargsDict = collections.OrderedDict(zip(posargs,[None]*len(posargs))) + # we will need an ordered dict for all positional arguments + # thus we create it here and fill it with values + portPosargsDict = collections.OrderedDict(zip(posargs,[None]*len(posargs))) portkeyargs = {} logger.debug("posargs: %s; keyargs: %s",posargs, keyargs) if check_ports_dict(inport_names): @@ -706,8 +707,8 @@ def replace_named_ports( ipkeyargs = identify_named_ports( inputs_dict, posargs, - posargsDict, - appArgs, + portPosargsDict, + keyargs, check_len=len(iitems), mode="inputs") portkeyargs.update(ipkeyargs) @@ -722,19 +723,30 @@ def replace_named_ports( opkeyargs = identify_named_ports( outputs_dict, posargs, - posargsDict, - appArgs, + portPosargsDict, + keyargs, check_len=len(oitems), mode="outputs") portkeyargs.update(opkeyargs) else: for i in range(min(len(oitems), len(posargs))): portkeyargs.update({posargs[i]: oitems[i][1]}) - keyargs.update(portkeyargs) + # now that we have the mapped ports we can cleanup the appArgs + # and construct the final keyargs and pargs + appArgs = clean_applicationArgs(appArgs) + # get cleaned positional args + posargs = {arg:appArgs[arg]["value"] for arg in appArgs + if appArgs[arg]["positional"]} + # get cleaned kwargs + keyargs = {arg:appArgs[arg]["value"] for arg in appArgs + if not appArgs[arg]["positional"]} + # update port dictionaries + portkeyargs.update(keyargs) + portPosargsDict.update(posargs) keyargs = serialize_kwargs(keyargs, prefix=argumentPrefix, separator=separator) if len(keyargs) > 0 else [''] - pargs = list(posargsDict.values()) + pargs = list(portPosargsDict.values()) pargs = [''] if len(pargs) == 0 or None in pargs else pargs logger.debug("After port replacement: pargs: %s; keyargs: %s",pargs, keyargs) return keyargs, pargs From bf568f60019f5bb2aa77e0800f56ad69845ae194 Mon Sep 17 00:00:00 2001 From: Andreas Wicenec Date: Sat, 30 Jul 2022 23:46:32 +0800 Subject: [PATCH 13/15] Dummy argument not required to have value anymore --- daliuge-engine/dlg/apps/bash_shell_app.py | 9 ++++++--- daliuge-engine/dlg/droputils.py | 9 +++++++-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/daliuge-engine/dlg/apps/bash_shell_app.py b/daliuge-engine/dlg/apps/bash_shell_app.py index b91845ccb..583124ef2 100644 --- a/daliuge-engine/dlg/apps/bash_shell_app.py +++ b/daliuge-engine/dlg/apps/bash_shell_app.py @@ -182,8 +182,7 @@ def initialize(self, **kwargs): self, "No command specified, cannot create BashShellApp" ) - self.appArgs = droputils.clean_applicationArgs( - self._applicationArgs) + self.appArgs = self._applicationArgs self._recompute_data = {} def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE): @@ -220,7 +219,11 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE): separator=self._paramValueSeparator) argumentString = f"{' '.join(pargs + keyargs)}" # add kwargs to end of pargs # complete command including all additional parameters and optional redirects - cmd = f"{self.command} {argumentString} {self._cmdLineArgs} " + if len(argumentString.strip()) > 0: + # the _cmdLineArgs would very likely make the command line invalid + cmd = f"{self.command} {argumentString} " + else: + cmd = f"{self.command} {argumentString} {self._cmdLineArgs} " if self._outputRedirect: cmd = f"{cmd} > {self._outputRedirect}" if self._inputRedirect: diff --git a/daliuge-engine/dlg/droputils.py b/daliuge-engine/dlg/droputils.py index 9eacc5d06..6e4651428 100644 --- a/daliuge-engine/dlg/droputils.py +++ b/daliuge-engine/dlg/droputils.py @@ -733,6 +733,7 @@ def replace_named_ports( portkeyargs.update({posargs[i]: oitems[i][1]}) # now that we have the mapped ports we can cleanup the appArgs # and construct the final keyargs and pargs + logger.debug("Arguments from ports: %s %s", portkeyargs, portPosargsDict) appArgs = clean_applicationArgs(appArgs) # get cleaned positional args posargs = {arg:appArgs[arg]["value"] for arg in appArgs @@ -741,8 +742,12 @@ def replace_named_ports( keyargs = {arg:appArgs[arg]["value"] for arg in appArgs if not appArgs[arg]["positional"]} # update port dictionaries - portkeyargs.update(keyargs) - portPosargsDict.update(posargs) + # portkeyargs.update({key:arg for key, arg in keyargs.items() + # if key not in portkeyargs}) + # portPosargsDict.update({key:arg for key, arg in posargs.items() + # if key not in portPosargsDict}) + keyargs.update(portkeyargs) + posargs.update(portPosargsDict) keyargs = serialize_kwargs(keyargs, prefix=argumentPrefix, separator=separator) if len(keyargs) > 0 else [''] From 18a776dd630c311e638be18e4a8a79750c565fa0 Mon Sep 17 00:00:00 2001 From: Andreas Wicenec Date: Sun, 31 Jul 2022 12:50:12 +0800 Subject: [PATCH 14/15] Skip multi-processing test and fix relative loading of graphs --- daliuge-engine/test/apps/test_simple.py | 7 +- .../test/reproducibility/testSingle.graph | 85 ++++++++++ .../test/reproducibility/test_lg_blockdag.py | 152 +++++++++--------- .../test/reproducibility/test_toposort.py | 17 +- 4 files changed, 175 insertions(+), 86 deletions(-) create mode 100644 daliuge-engine/test/reproducibility/testSingle.graph diff --git a/daliuge-engine/test/apps/test_simple.py b/daliuge-engine/test/apps/test_simple.py index 826919678..f9b5027ee 100644 --- a/daliuge-engine/test/apps/test_simple.py +++ b/daliuge-engine/test/apps/test_simple.py @@ -317,9 +317,10 @@ def test_multi_listappendthrashing(self, size=1000, parallel=True): # Must be called to unlink all shared memory memory_manager.shutdown_all() - @unittest.skipIf( - sys.version_info < (3, 8), "Multiprocessing not compatible with Python < 3.8" - ) + # @unittest.skipIf( + # sys.version_info < (3, 8), "Multiprocessing not compatible with Python < 3.8" + # ) + @unittest.skip def test_speedup(self): """ Run serial and parallel test and report speedup. diff --git a/daliuge-engine/test/reproducibility/testSingle.graph b/daliuge-engine/test/reproducibility/testSingle.graph new file mode 100644 index 000000000..8382b4eb2 --- /dev/null +++ b/daliuge-engine/test/reproducibility/testSingle.graph @@ -0,0 +1,85 @@ +{ + "linkDataArray": [], + "modelData": { + "filePath": "testSingle.graph", + "fileType": "graph", + "git_url": "", + "repo": "", + "repoBranch": "", + "repoService": "Unknown", + "sha": "" + }, + "nodeDataArray": [ + { + "canHaveInputs": true, + "canHaveOutputs": true, + "category": "BashShellApp", + "categoryType": "Application", + "collapsed": false, + "color": "#1C2833", + "description": "An application component run within the Bash Shell", + "drawOrderHint": 0, + "exitApplicationName": "", + "exitApplicationType": "None", + "expanded": false, + "fields": [ + { + "description": "", + "name": "execution_time", + "text": "Execution time", + "value": "5" + }, + { + "description": "", + "name": "num_cpus", + "text": "Num CPUs", + "value": "1" + }, + { + "description": "", + "name": "group_start", + "text": "Group start", + "value": "0" + }, + { + "description": "The command line to be executed", + "name": "Arg01", + "text": "Arg01", + "value": "" + } + ], + "height": 200, + "inputAppFields": [], + "inputApplicationName": "", + "inputApplicationType": "None", + "inputLocalPorts": [], + "inputPorts": [ + { + "Id": "e9c700b7-778e-406e-a124-c3ab86903ef6", + "IdText": "event" + } + ], + "isData": false, + "isGroup": false, + "key": -1, + "outputAppFields": [], + "outputApplicationName": "", + "outputApplicationType": "None", + "outputLocalPorts": [], + "outputPorts": [ + { + "Id": "a2dc769f-9745-4440-851d-c8a9f7f66720", + "IdText": "event" + } + ], + "selected": true, + "showPorts": false, + "streaming": false, + "subject": null, + "text": "Bash Shell App", + "width": 200, + "x": 1044, + "y": 208 + } + ] +} \ No newline at end of file diff --git a/daliuge-engine/test/reproducibility/test_lg_blockdag.py b/daliuge-engine/test/reproducibility/test_lg_blockdag.py index ed66f29ea..02a39add6 100644 --- a/daliuge-engine/test/reproducibility/test_lg_blockdag.py +++ b/daliuge-engine/test/reproducibility/test_lg_blockdag.py @@ -29,6 +29,7 @@ import json import unittest +import pkg_resources from dlg.common.reproducibility.constants import ReproducibilityFlags, ALL_RMODES from dlg.common.reproducibility.reproducibility import ( @@ -39,9 +40,8 @@ def _init_graph(filename): - file = open(filename) - lgt = json.load(file) - file.close() + with pkg_resources.resource_stream("test.reproducibility", filename) as file: + lgt = json.load(file) return lgt @@ -58,7 +58,9 @@ def test_single(self): Tests a single drop A """ - lgt = _init_graph("test/reproducibility/topoGraphs/testSingle.graph") + # f = pkg_resources.resource_stream("test.reproducibility","topoGraphs/testSingle.graph") + # lgt = json.load(f) + lgt = _init_graph("topoGraphs/testSingle.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -71,7 +73,7 @@ def test_twostart(self): C B --> """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoStart.graph") + lgt = _init_graph("topoGraphs/testTwoStart.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -91,7 +93,7 @@ def test_twoend(self): A --> C """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoEnd.graph") + lgt = _init_graph("topoGraphs/testTwoEnd.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -103,7 +105,7 @@ def test_twolines(self): A --> B C --> D """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoLines.graph") + lgt = _init_graph("topoGraphs/testTwoLines.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -113,7 +115,7 @@ def test_data_fan(self): """ Tests that a single data source scatters its signature to downstream data drops. """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataFan.graph") + lgt = _init_graph("topoGraphs/dataFan.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -130,7 +132,7 @@ def test_data_funnel(self): """ Tests that two data sources are collected in a single downstream data drop """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataFunnel.graph") + lgt = _init_graph("topoGraphs/dataFunnel.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -145,7 +147,7 @@ def test_data_sandwich(self): Tests two data drops with an interim computing drop :return: """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataSandwich.graph") + lgt = _init_graph("topoGraphs/dataSandwich.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -159,7 +161,7 @@ def test_computation_sandwich(self): """ Tests that an internal data drop surrounded by computing drops is handled correctly. """ - lgt = _init_graph("test/reproducibility/topoGraphs/computationSandwich.graph") + lgt = _init_graph("topoGraphs/computationSandwich.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -183,7 +185,7 @@ def test_single(self): Tests a single drop A """ - lgt = _init_graph("test/reproducibility/topoGraphs/testSingle.graph") + lgt = _init_graph("topoGraphs/testSingle.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -196,7 +198,7 @@ def test_twostart(self): C B --> """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoStart.graph") + lgt = _init_graph("topoGraphs/testTwoStart.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -216,7 +218,7 @@ def test_twoend(self): A --> C """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoEnd.graph") + lgt = _init_graph("topoGraphs/testTwoEnd.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -228,7 +230,7 @@ def test_twolines(self): A --> B C --> D """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoLines.graph") + lgt = _init_graph("topoGraphs/testTwoLines.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -238,7 +240,7 @@ def test_data_fan(self): """ Tests that a single data source scatters its signature to downstream data drops. """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataFan.graph") + lgt = _init_graph("topoGraphs/dataFan.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -255,7 +257,7 @@ def test_data_funnel(self): """ Tests that two data sources are collected in a single downstream data drop """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataFunnel.graph") + lgt = _init_graph("topoGraphs/dataFunnel.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -270,7 +272,7 @@ def test_data_sandwich(self): Tests two data drops with an interim computing drop :return: """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataSandwich.graph") + lgt = _init_graph("topoGraphs/dataSandwich.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -284,7 +286,7 @@ def test_computation_sandwich(self): """ Tests that an internal data drop surrounded by computing drops is handled correctly. """ - lgt = _init_graph("test/reproducibility/topoGraphs/computationSandwich.graph") + lgt = _init_graph("topoGraphs/computationSandwich.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -308,7 +310,7 @@ def test_single(self): Tests a single drop A """ - lgt = _init_graph("test/reproducibility/topoGraphs/testSingle.graph") + lgt = _init_graph("topoGraphs/testSingle.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -321,7 +323,7 @@ def test_twostart(self): C B --> """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoStart.graph") + lgt = _init_graph("topoGraphs/testTwoStart.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -341,7 +343,7 @@ def test_twoend(self): A --> C """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoEnd.graph") + lgt = _init_graph("topoGraphs/testTwoEnd.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -353,7 +355,7 @@ def test_twolines(self): A --> B C --> D """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoLines.graph") + lgt = _init_graph("topoGraphs/testTwoLines.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -363,7 +365,7 @@ def test_data_fan(self): """ Tests that a single data source scatters its signature to downstream data drops. """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataFan.graph") + lgt = _init_graph("topoGraphs/dataFan.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -380,7 +382,7 @@ def test_data_funnel(self): """ Tests that two data sources are collected in a single downstream data drop """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataFunnel.graph") + lgt = _init_graph("topoGraphs/dataFunnel.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -395,7 +397,7 @@ def test_data_sandwich(self): Tests two data drops with an interim computing drop :return: """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataSandwich.graph") + lgt = _init_graph("topoGraphs/dataSandwich.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -409,7 +411,7 @@ def test_computation_sandwich(self): """ Tests that an internal data drop surrounded by computing drops is handled correctly. """ - lgt = _init_graph("test/reproducibility/topoGraphs/computationSandwich.graph") + lgt = _init_graph("topoGraphs/computationSandwich.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -434,7 +436,7 @@ def test_single(self): Tests a single drop A """ - lgt = _init_graph("test/reproducibility/topoGraphs/testSingle.graph") + lgt = _init_graph("topoGraphs/testSingle.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -447,7 +449,7 @@ def test_twostart(self): C B --> """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoStart.graph") + lgt = _init_graph("topoGraphs/testTwoStart.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -471,7 +473,7 @@ def test_twoend(self): A --> C """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoEnd.graph") + lgt = _init_graph("topoGraphs/testTwoEnd.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -483,7 +485,7 @@ def test_twolines(self): A --> B C --> D """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoLines.graph") + lgt = _init_graph("topoGraphs/testTwoLines.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -493,7 +495,7 @@ def test_data_fan(self): """ Tests that a single data source scatters its signature to downstream data drops. """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataFan.graph") + lgt = _init_graph("topoGraphs/dataFan.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -510,7 +512,7 @@ def test_data_funnel(self): """ Tests that two data sources are collected in a single downstream data drop """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataFunnel.graph") + lgt = _init_graph("topoGraphs/dataFunnel.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -528,7 +530,7 @@ def test_data_sandwich(self): Tests two data drops with an interim computing drop :return: """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataSandwich.graph") + lgt = _init_graph("topoGraphs/dataSandwich.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -542,7 +544,7 @@ def test_computation_sandwich(self): """ Tests that an internal data drop surrounded by computing drops is handled correctly. """ - lgt = _init_graph("test/reproducibility/topoGraphs/computationSandwich.graph") + lgt = _init_graph("topoGraphs/computationSandwich.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -566,7 +568,7 @@ def test_single(self): Tests a single drop A """ - lgt = _init_graph("test/reproducibility/topoGraphs/testSingle.graph") + lgt = _init_graph("topoGraphs/testSingle.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -579,7 +581,7 @@ def test_twostart(self): C B --> """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoStart.graph") + lgt = _init_graph("topoGraphs/testTwoStart.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -599,7 +601,7 @@ def test_twoend(self): A --> C """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoEnd.graph") + lgt = _init_graph("topoGraphs/testTwoEnd.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -611,7 +613,7 @@ def test_twolines(self): A --> B C --> D """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoLines.graph") + lgt = _init_graph("topoGraphs/testTwoLines.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -621,7 +623,7 @@ def test_data_fan(self): """ Tests that a single data source scatters its signature to downstream data drops. """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataFan.graph") + lgt = _init_graph("topoGraphs/dataFan.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -638,7 +640,7 @@ def test_data_funnel(self): """ Tests that two data sources are collected in a single downstream data drop """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataFunnel.graph") + lgt = _init_graph("topoGraphs/dataFunnel.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -653,7 +655,7 @@ def test_data_sandwich(self): Tests two data drops with an interim computing drop :return: """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataSandwich.graph") + lgt = _init_graph("topoGraphs/dataSandwich.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -667,7 +669,7 @@ def test_computation_sandwich(self): """ Tests that an internal data drop surrounded by computing drops is handled correctly. """ - lgt = _init_graph("test/reproducibility/topoGraphs/computationSandwich.graph") + lgt = _init_graph("topoGraphs/computationSandwich.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -691,7 +693,7 @@ def test_single(self): Tests a single drop A """ - lgt = _init_graph("test/reproducibility/topoGraphs/testSingle.graph") + lgt = _init_graph("topoGraphs/testSingle.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -704,7 +706,7 @@ def test_twostart(self): C B --> """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoStart.graph") + lgt = _init_graph("topoGraphs/testTwoStart.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -724,7 +726,7 @@ def test_twoend(self): A --> C """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoEnd.graph") + lgt = _init_graph("topoGraphs/testTwoEnd.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -736,7 +738,7 @@ def test_twolines(self): A --> B C --> D """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoLines.graph") + lgt = _init_graph("topoGraphs/testTwoLines.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -746,7 +748,7 @@ def test_data_fan(self): """ Tests that a single data source scatters its signature to downstream data drops. """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataFan.graph") + lgt = _init_graph("topoGraphs/dataFan.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -763,7 +765,7 @@ def test_data_funnel(self): """ Tests that two data sources are collected in a single downstream data drop """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataFunnel.graph") + lgt = _init_graph("topoGraphs/dataFunnel.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -778,7 +780,7 @@ def test_data_sandwich(self): Tests two data drops with an interim computing drop :return: """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataSandwich.graph") + lgt = _init_graph("topoGraphs/dataSandwich.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -792,7 +794,7 @@ def test_computation_sandwich(self): """ Tests that an internal data drop surrounded by computing drops is handled correctly. """ - lgt = _init_graph("test/reproducibility/topoGraphs/computationSandwich.graph") + lgt = _init_graph("topoGraphs/computationSandwich.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -816,7 +818,7 @@ def test_single(self): Tests a single drop A """ - lgt = _init_graph("test/reproducibility/topoGraphs/testSingle.graph") + lgt = _init_graph("topoGraphs/testSingle.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -829,7 +831,7 @@ def test_twostart(self): C B --> """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoStart.graph") + lgt = _init_graph("topoGraphs/testTwoStart.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -849,7 +851,7 @@ def test_twoend(self): A --> C """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoEnd.graph") + lgt = _init_graph("topoGraphs/testTwoEnd.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -861,7 +863,7 @@ def test_twolines(self): A --> B C --> D """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoLines.graph") + lgt = _init_graph("topoGraphs/testTwoLines.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -871,7 +873,7 @@ def test_data_fan(self): """ Tests that a single data source scatters its signature to downstream data drops. """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataFan.graph") + lgt = _init_graph("topoGraphs/dataFan.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -888,7 +890,7 @@ def test_data_funnel(self): """ Tests that two data sources are collected in a single downstream data drop """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataFunnel.graph") + lgt = _init_graph("topoGraphs/dataFunnel.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -903,7 +905,7 @@ def test_data_sandwich(self): Tests two data drops with an interim computing drop :return: """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataSandwich.graph") + lgt = _init_graph("topoGraphs/dataSandwich.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -917,7 +919,7 @@ def test_computation_sandwich(self): """ Tests that an internal data drop surrounded by computing drops is handled correctly. """ - lgt = _init_graph("test/reproducibility/topoGraphs/computationSandwich.graph") + lgt = _init_graph("topoGraphs/computationSandwich.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -940,7 +942,7 @@ def test_single(self): Tests a single drop A """ - lgt = _init_graph("test/reproducibility/topoGraphs/testSingle.graph") + lgt = _init_graph("topoGraphs/testSingle.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -953,7 +955,7 @@ def test_twostart(self): C B --> """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoStart.graph") + lgt = _init_graph("topoGraphs/testTwoStart.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -977,7 +979,7 @@ def test_twoend(self): A --> C """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoEnd.graph") + lgt = _init_graph("topoGraphs/testTwoEnd.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -989,7 +991,7 @@ def test_twolines(self): A --> B C --> D """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoLines.graph") + lgt = _init_graph("topoGraphs/testTwoLines.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) leaves = lg_build_blockdag(lgt)[0] @@ -999,7 +1001,7 @@ def test_data_fan(self): """ Tests that a single data source scatters its signature to downstream data drops. """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataFan.graph") + lgt = _init_graph("topoGraphs/dataFan.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -1022,7 +1024,7 @@ def test_data_funnel(self): """ Tests that two data sources are collected in a single downstream data drop """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataFunnel.graph") + lgt = _init_graph("topoGraphs/dataFunnel.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -1044,7 +1046,7 @@ def test_data_sandwich(self): Tests two data drops with an interim computing drop :return: """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataSandwich.graph") + lgt = _init_graph("topoGraphs/dataSandwich.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -1066,7 +1068,7 @@ def test_computation_sandwich(self): """ Tests that an internal data drop surrounded by computing drops is handled correctly. """ - lgt = _init_graph("test/reproducibility/topoGraphs/computationSandwich.graph") + lgt = _init_graph("topoGraphs/computationSandwich.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) lg_build_blockdag(lgt) @@ -1096,7 +1098,7 @@ def test_single(self): Tests a single drop A """ - lgt = _init_graph("test/reproducibility/topoGraphs/testSingle.graph") + lgt = _init_graph("topoGraphs/testSingle.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) for rmode in ALL_RMODES: @@ -1110,7 +1112,7 @@ def test_twostart(self): C B --> """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoStart.graph") + lgt = _init_graph("topoGraphs/testTwoStart.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) for rmode in ALL_RMODES: @@ -1134,7 +1136,7 @@ def test_twoend(self): A --> C """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoEnd.graph") + lgt = _init_graph("topoGraphs/testTwoEnd.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) for rmode in ALL_RMODES: @@ -1147,7 +1149,7 @@ def test_twolines(self): A --> B C --> D """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoLines.graph") + lgt = _init_graph("topoGraphs/testTwoLines.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) for rmode in ALL_RMODES: @@ -1158,7 +1160,7 @@ def test_data_fan(self): """ Tests that a single data source scatters its signature to downstream data drops. """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataFan.graph") + lgt = _init_graph("topoGraphs/dataFan.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) for rmode in ALL_RMODES: @@ -1184,7 +1186,7 @@ def test_data_funnel(self): """ Tests that two data sources are collected in a single downstream data drop """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataFunnel.graph") + lgt = _init_graph("topoGraphs/dataFunnel.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) for rmode in ALL_RMODES: @@ -1209,7 +1211,7 @@ def test_data_sandwich(self): Tests two data drops with an interim computing drop :return: """ - lgt = _init_graph("test/reproducibility/topoGraphs/dataSandwich.graph") + lgt = _init_graph("topoGraphs/dataSandwich.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) for rmode in ALL_RMODES: @@ -1232,7 +1234,7 @@ def test_computation_sandwich(self): """ Tests that an internal data drop surrounded by computing drops is handled correctly. """ - lgt = _init_graph("test/reproducibility/topoGraphs/computationSandwich.graph") + lgt = _init_graph("topoGraphs/computationSandwich.graph") init_lgt_repro_data(lgt, rmode=str(self.rmode.value)) init_lg_repro_data(lgt) for rmode in ALL_RMODES: diff --git a/daliuge-engine/test/reproducibility/test_toposort.py b/daliuge-engine/test/reproducibility/test_toposort.py index 62c54c2a9..717c3869b 100644 --- a/daliuge-engine/test/reproducibility/test_toposort.py +++ b/daliuge-engine/test/reproducibility/test_toposort.py @@ -29,6 +29,8 @@ import json import unittest +import pkg_resources + from dlg.common.reproducibility.reproducibility import ( init_lgt_repro_data, init_lg_repro_data, @@ -54,9 +56,8 @@ def _init_graph(filename): - file = open(filename) - lgt = json.load(file) - file.close() + with pkg_resources.resource_stream("test.reproducibility", filename) as file: + lgt = json.load(file) for drop in lgt["nodeDataArray"]: drop["reprodata"] = {} drop["reprodata"]["lg_parenthashes"] = [] @@ -108,7 +109,7 @@ def test_lg_blockdag_single(self): Tests a single drop A """ - lgt = _init_graph("test/reproducibility/topoGraphs/testSingle.graph") + lgt = _init_graph("topoGraphs/testSingle.graph") init_lgt_repro_data(lgt, "1") init_lg_repro_data(lgt) visited = lg_build_blockdag(lgt)[1] @@ -121,7 +122,7 @@ def test_lg_blockdag_twostart(self): C B --> """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoStart.graph") + lgt = _init_graph("topoGraphs/testTwoStart.graph") init_lgt_repro_data(lgt, "1") init_lg_repro_data(lgt) visited = lg_build_blockdag(lgt)[1] @@ -134,7 +135,7 @@ def test_lg_blockdag_twoend(self): A --> C """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoEnd.graph") + lgt = _init_graph("topoGraphs/testTwoEnd.graph") init_lgt_repro_data(lgt, "1") init_lg_repro_data(lgt) visited = lg_build_blockdag(lgt)[1] @@ -146,7 +147,7 @@ def test_lg_blockdag_twolines(self): A --> B C --> D """ - lgt = _init_graph("test/reproducibility/topoGraphs/testTwoLines.graph") + lgt = _init_graph("topoGraphs/testTwoLines.graph") init_lgt_repro_data(lgt, "1") init_lg_repro_data(lgt) visited = lg_build_blockdag(lgt)[1] @@ -156,7 +157,7 @@ def test_lg_blockdag_empty(self): """ Tests an empty graph. Should fail gracefully. """ - lgt = _init_graph("test/reproducibility/topoGraphs/testEmpty.graph") + lgt = _init_graph("topoGraphs/testEmpty.graph") init_lgt_repro_data(lgt, "1") init_lg_repro_data(lgt) visited = lg_build_blockdag(lgt)[1] From 9f93047e0c65fe1beb79c4cebfc932832dd7c15e Mon Sep 17 00:00:00 2001 From: Andreas Wicenec Date: Sun, 31 Jul 2022 17:09:22 +0800 Subject: [PATCH 15/15] removed stale import --- daliuge-engine/dlg/apps/pyfunc.py | 1 - 1 file changed, 1 deletion(-) diff --git a/daliuge-engine/dlg/apps/pyfunc.py b/daliuge-engine/dlg/apps/pyfunc.py index af7ef2567..5ffdd8f23 100644 --- a/daliuge-engine/dlg/apps/pyfunc.py +++ b/daliuge-engine/dlg/apps/pyfunc.py @@ -38,7 +38,6 @@ from dlg import droputils, utils from dlg.drop import BarrierAppDROP -from pyparsing import col from dlg.exceptions import InvalidDropException from dlg.meta import ( dlg_string_param,