From c8a8b61a6c72f073f89284f135ab7034eb3315ad Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Tue, 24 May 2022 12:27:46 +0800 Subject: [PATCH] refactor to popArg --- daliuge-engine/dlg/apps/bash_shell_app.py | 14 ++--- daliuge-engine/dlg/apps/dockerapp.py | 24 ++++----- daliuge-engine/dlg/apps/dynlib.py | 4 +- daliuge-engine/dlg/apps/mpi.py | 8 +-- daliuge-engine/dlg/apps/pyfunc.py | 4 +- daliuge-engine/dlg/apps/simple.py | 4 +- daliuge-engine/dlg/dask_emulation.py | 4 +- daliuge-engine/dlg/drop.py | 33 ++++++------ daliuge-engine/dlg/environmentvar_drop.py | 3 ++ .../test/integrate/chiles/chilesdo.py | 52 +++++++++---------- 10 files changed, 75 insertions(+), 75 deletions(-) diff --git a/daliuge-engine/dlg/apps/bash_shell_app.py b/daliuge-engine/dlg/apps/bash_shell_app.py index 12ee198b3..d479894c7 100644 --- a/daliuge-engine/dlg/apps/bash_shell_app.py +++ b/daliuge-engine/dlg/apps/bash_shell_app.py @@ -167,15 +167,15 @@ def initialize(self, **kwargs): super(BashShellBase, self).initialize(**kwargs) self.proc = None - self._inputRedirect = self._getArg(kwargs, "input_redirection", "") - self._outputRedirect = self._getArg(kwargs, "output_redirection", "") - self._cmdLineArgs = self._getArg(kwargs, "command_line_arguments", "") - self._applicationArgs = self._getArg(kwargs, "applicationArgs", {}) - self._argumentPrefix = self._getArg(kwargs, "argumentPrefix", "--") - self._paramValueSeparator = self._getArg(kwargs, "paramValueSeparator", " ") + self._inputRedirect = self._popArg(kwargs, "input_redirection", "") + self._outputRedirect = self._popArg(kwargs, "output_redirection", "") + self._cmdLineArgs = self._popArg(kwargs, "command_line_arguments", "") + self._applicationArgs = self._popArg(kwargs, "applicationArgs", {}) + self._argumentPrefix = self._popArg(kwargs, "argumentPrefix", "--") + self._paramValueSeparator = self._popArg(kwargs, "paramValueSeparator", " ") if not self.command: - self.command = self._getArg(kwargs, "command", None) + self.command = self._popArg(kwargs, "command", None) if not self.command: raise InvalidDropException( self, "No command specified, cannot create BashShellApp" diff --git a/daliuge-engine/dlg/apps/dockerapp.py b/daliuge-engine/dlg/apps/dockerapp.py index 17a57cf43..6177fc72d 100644 --- a/daliuge-engine/dlg/apps/dockerapp.py +++ b/daliuge-engine/dlg/apps/dockerapp.py @@ -255,12 +255,12 @@ def initialize(self, **kwargs): self._containerLock = multiprocessing.Lock() super().initialize(**kwargs) - self._image = self._getArg(kwargs, "image", None) - self._env = self._getArg(kwargs, "env", None) - self._cmdLineArgs = self._getArg(kwargs, "command_line_arguments", "") - self._applicationArgs = self._getArg(kwargs, "applicationArgs", {}) - self._argumentPrefix = self._getArg(kwargs, "argumentPrefix", "--") - self._paramValueSeparator = self._getArg(kwargs, "paramValueSeparator", " ") + self._image = self._popArg(kwargs, "image", None) + self._env = self._popArg(kwargs, "env", None) + self._cmdLineArgs = self._popArg(kwargs, "command_line_arguments", "") + self._applicationArgs = self._popArg(kwargs, "applicationArgs", {}) + self._argumentPrefix = self._popArg(kwargs, "argumentPrefix", "--") + self._paramValueSeparator = self._popArg(kwargs, "paramValueSeparator", " ") if not self._image: raise InvalidDropException( self, "No docker image specified, cannot create DockerApp" @@ -273,7 +273,7 @@ def initialize(self, **kwargs): self._image, ) - self._command = self._getArg(kwargs, "command", None) + self._command = self._popArg(kwargs, "command", None) self._noBash = False if not self._command or self._command[:2].strip() == "%%": @@ -310,14 +310,14 @@ def initialize(self, **kwargs): # might want to preserve them. # TODO: This might be something that the data lifecycle manager could # handle, but for the time being we do it here - self._removeContainer = self._getArg(kwargs, "removeContainer", True) + self._removeContainer = self._popArg(kwargs, "removeContainer", True) # Ports - a comma seperated list of the host port mappings of form: # "hostport1:containerport1, hostport2:containerport2" - self._portMappings = self._getArg(kwargs, "portMappings", "") + self._portMappings = self._popArg(kwargs, "portMappings", "") logger.info(f"portMappings: {self._portMappings}") - self._shmSize = self._getArg(kwargs, "shmSize", "") + self._shmSize = self._popArg(kwargs, "shmSize", "") logger.info(f"shmSize: {self._shmSize}") # Additional volume bindings can be specified for existing files/dirs @@ -329,7 +329,7 @@ def initialize(self, **kwargs): f"{utils.getDlgDir()}/workspace/settings/passwd:/etc/passwd", f"{utils.getDlgDir()}/workspace/settings/group:/etc/group", ] - additionalBindings = self._getArg(kwargs, "additionalBindings", []) + additionalBindings = self._popArg(kwargs, "additionalBindings", []) additionalBindings = ( additionalBindings.split(",") if isinstance(additionalBindings, str) @@ -379,7 +379,7 @@ def initialize(self, **kwargs): self._sessionId = self._dlg_session.sessionId if self._dlg_session else "" if not self.workdir: default_workingdir = os.path.join(utils.getDlgWorkDir(), self._sessionId) - self.workdir = self._getArg(kwargs, "workingDir", default_workingdir) + self.workdir = self._popArg(kwargs, "workingDir", default_workingdir) c.api.close() diff --git a/daliuge-engine/dlg/apps/dynlib.py b/daliuge-engine/dlg/apps/dynlib.py index f684e2d3c..421974a8c 100644 --- a/daliuge-engine/dlg/apps/dynlib.py +++ b/daliuge-engine/dlg/apps/dynlib.py @@ -385,7 +385,7 @@ class DynlibApp(DynlibAppBase, BarrierAppDROP): def initialize(self, **kwargs): super(DynlibApp, self).initialize(**kwargs) - self.ranks = self._getArg(kwargs, "rank", None) + self.ranks = self._popArg(kwargs, "rank", None) def run(self): input_closers = prepare_c_inputs(self._c_app, self.inputs) @@ -487,7 +487,7 @@ def initialize(self, **kwargs): if "lib" not in kwargs: raise InvalidDropException(self, "library not specified") self.libname = kwargs.pop("lib") - self.timeout = self._getArg(kwargs, "timeout", 600) # 10 minutes + self.timeout = self._popArg(kwargs, "timeout", 600) # 10 minutes self.app_params = kwargs self.proc = None diff --git a/daliuge-engine/dlg/apps/mpi.py b/daliuge-engine/dlg/apps/mpi.py index 1f6ed1867..ee53d84c1 100644 --- a/daliuge-engine/dlg/apps/mpi.py +++ b/daliuge-engine/dlg/apps/mpi.py @@ -78,10 +78,10 @@ class MPIApp(BarrierAppDROP): def initialize(self, **kwargs): super(MPIApp, self).initialize(**kwargs) - self._command = self._getArg(kwargs, "command", None) - self._maxprocs = self._getArg(kwargs, "maxprocs", 1) - self._use_wrapper = self._getArg(kwargs, "use_wrapper", False) - self._args = self._getArg(kwargs, "args", []) + self._command = self._popArg(kwargs, "command", None) + self._maxprocs = self._popArg(kwargs, "maxprocs", 1) + self._use_wrapper = self._popArg(kwargs, "use_wrapper", False) + self._args = self._popArg(kwargs, "args", []) if not self._command: raise InvalidDropException( self, "No command specified, cannot create MPIApp" diff --git a/daliuge-engine/dlg/apps/pyfunc.py b/daliuge-engine/dlg/apps/pyfunc.py index 0deb323e9..2a3b483cc 100644 --- a/daliuge-engine/dlg/apps/pyfunc.py +++ b/daliuge-engine/dlg/apps/pyfunc.py @@ -283,9 +283,9 @@ def initialize(self, **kwargs): """ BarrierAppDROP.initialize(self, **kwargs) - self._applicationArgs = self._getArg(kwargs, "applicationArgs", {}) + self._applicationArgs = self._popArg(kwargs, "applicationArgs", {}) - self.func_code = self._getArg(kwargs, "func_code", None) + self.func_code = self._popArg(kwargs, "func_code", None) # check for function definition arguments in applicationArgs self.func_def_keywords = [ diff --git a/daliuge-engine/dlg/apps/simple.py b/daliuge-engine/dlg/apps/simple.py index fbf6ce627..0256e29d0 100644 --- a/daliuge-engine/dlg/apps/simple.py +++ b/daliuge-engine/dlg/apps/simple.py @@ -779,7 +779,7 @@ class SimpleBranch(BranchAppDrop, NullBarrierApp): """Simple branch app that is told the result of its condition""" def initialize(self, **kwargs): - self.result = self._getArg(kwargs, "result", True) + self.result = self._popArg(kwargs, "result", True) BranchAppDrop.initialize(self, **kwargs) def run(self): @@ -835,7 +835,7 @@ class ListAppendThrashingApp(BarrierAppDROP): ) def initialize(self, **kwargs): - self.size = self._getArg(kwargs, "size", 100) + self.size = self._popArg(kwargs, "size", 100) self.marray = [] super(ListAppendThrashingApp, self).initialize(**kwargs) diff --git a/daliuge-engine/dlg/dask_emulation.py b/daliuge-engine/dlg/dask_emulation.py index 5913080f0..7238e1db7 100644 --- a/daliuge-engine/dlg/dask_emulation.py +++ b/daliuge-engine/dlg/dask_emulation.py @@ -45,8 +45,8 @@ class ResultTransmitter(BarrierAppDROP): def initialize(self, **kwargs): BarrierAppDROP.initialize(self, input_error_threshold=100, **kwargs) - self.host = self._getArg(kwargs, "host", "127.0.0.1") - self.port = self._getArg(kwargs, "port", None) + self.host = self._popArg(kwargs, "host", "127.0.0.1") + self.port = self._popArg(kwargs, "port", None) if self.port is None: raise InvalidDropException(self, "Missing port parameter") diff --git a/daliuge-engine/dlg/drop.py b/daliuge-engine/dlg/drop.py index 5e0d738e6..dd6864ccc 100644 --- a/daliuge-engine/dlg/drop.py +++ b/daliuge-engine/dlg/drop.py @@ -220,24 +220,24 @@ def __init__(self, oid, uid, **kwargs): # The physical graph drop type. This is determined # by the drop category when generating the drop spec - self._type = self._getArg(kwargs, "type", None) + self._type = self._popArg(kwargs, "type", None) # The Session owning this drop, if any # In most real-world situations this attribute will be set, but in # general it cannot be assumed it will (e.g., unit tests create drops # directly outside the context of a session). - self._dlg_session = self._getArg(kwargs, "dlg_session", None) + self._dlg_session = self._popArg(kwargs, "dlg_session", None) # A simple name that the Drop might receive # This is usually set in the Logical Graph Editor, # but is not necessarily always there - self.name = self._getArg(kwargs, "nm", "") + self.name = self._popArg(kwargs, "nm", "") # The key of this drop in the original Logical Graph # This information might or might not be present depending on how the # physical graph was generated (or if this drop is being created as part # of a graph, to begin with), so we default it to an empty value - self.lg_key = self._getArg(kwargs, "lg_key", "") + self.lg_key = self._popArg(kwargs, "lg_key", "") # 1-to-N relationship: one DROP may have many consumers and producers. # The potential consumers and producers are always AppDROPs instances @@ -285,7 +285,7 @@ def __init__(self, oid, uid, **kwargs): # support. A target phase is also set to hint the Data Lifecycle Manager # about the level of resilience that this DROP should achieve. self._phase = DROPPhases.PLASMA - self._targetPhase = self._getArg(kwargs, "targetPhase", DROPPhases.GAS) + self._targetPhase = self._popArg(kwargs, "targetPhase", DROPPhases.GAS) # Calculating the checksum and maintaining the data size internally # implies that the data represented by this DROP is written @@ -329,21 +329,21 @@ def __init__(self, oid, uid, **kwargs): # in the state they currently are. In this case an external entity must # listen to the events and decide when to trigger the execution of the # applications. - self._executionMode = self._getArg(kwargs, "executionMode", ExecutionMode.DROP) + self._executionMode = self._popArg(kwargs, "executionMode", ExecutionMode.DROP) # The physical node where this DROP resides. # This piece of information is mandatory when submitting the physical # graph via the DataIslandManager, but in simpler scenarios such as # tests or graph submissions via the NodeManager it might be # missing. - self._node = self._getArg(kwargs, "node", None) + self._node = self._popArg(kwargs, "node", None) # The host representing the Data Island where this DROP resides # This piece of information is mandatory when submitting the physical # graph via the MasterManager, but in simpler scenarios such as tests or # graphs submissions via the DataIslandManager or NodeManager it might # missing. - self._dataIsland = self._getArg(kwargs, "island", None) + self._dataIsland = self._popArg(kwargs, "island", None) # DROP expiration. # Expiration can be time-driven or usage-driven, which are mutually @@ -357,10 +357,10 @@ def __init__(self, oid, uid, **kwargs): "but they are mutually exclusive" % (self,), ) - self._expireAfterUse = self._getArg(kwargs, "expireAfterUse", False) + self._expireAfterUse = self._popArg(kwargs, "expireAfterUse", False) self._expirationDate = -1 if not self._expireAfterUse: - lifespan = float(self._getArg(kwargs, "lifespan", -1)) + lifespan = float(self._popArg(kwargs, "lifespan", -1)) if lifespan != -1: self._expirationDate = time.time() + lifespan @@ -371,7 +371,7 @@ def __init__(self, oid, uid, **kwargs): self._expectedSize = int(kwargs.pop("expectedSize")) # All DROPs are precious unless stated otherwise; used for replication - self._precious = self._getArg(kwargs, "precious", True) + self._precious = self._popArg(kwargs, "precious", True) # Useful to have access to all EAGLE parameters without a prior knowledge self._parameters = dict(kwargs) @@ -461,16 +461,13 @@ def get_param_value(attr_name, default_value): continue setattr(self, attr_name, value) - def _getArg(self, kwargs, key, default): + def _popArg(self, kwargs, key, default): """ Pops the specified key arg from kwargs else returns the default """ - val = default - if key in kwargs: - val = kwargs.pop(key) - elif logger.isEnabledFor(logging.DEBUG): - logger.debug("Defaulting %s to %s in %r" % (key, str(val), self)) - return val + if key not in kwargs: + logger.debug("Defaulting %s to %s in %r" % (key, str(default), self)) + return kwargs.pop(key, default) def __hash__(self): return hash(self._uid) diff --git a/daliuge-engine/dlg/environmentvar_drop.py b/daliuge-engine/dlg/environmentvar_drop.py index d0f5948c8..993bc40a2 100644 --- a/daliuge-engine/dlg/environmentvar_drop.py +++ b/daliuge-engine/dlg/environmentvar_drop.py @@ -23,10 +23,12 @@ import io import os import json +import logging from dlg.drop import AbstractDROP, DEFAULT_INTERNAL_PARAMETERS from dlg.io import MemoryIO +logger = logging.getLogger(__name__) class KeyValueDROP: @abc.abstractmethod @@ -77,6 +79,7 @@ def initialize(self, **kwargs): """ Runs through all parameters, putting each into this drop's variable dict """ + logger.warning(f"params {self.parameters}") super(EnvironmentVarDROP, self).initialize(**kwargs) self._variables = dict() self._variables.update(_filter_parameters(self.parameters)) diff --git a/daliuge-engine/test/integrate/chiles/chilesdo.py b/daliuge-engine/test/integrate/chiles/chilesdo.py index 1bbf0f3b0..342627cbb 100755 --- a/daliuge-engine/test/integrate/chiles/chilesdo.py +++ b/daliuge-engine/test/integrate/chiles/chilesdo.py @@ -35,8 +35,8 @@ def initialize(self, **kwargs): super(SourceFlux, self).initialize(**kwargs) - self.casapy_path = self._getArg(kwargs, "casapy_path", None) - self.timeout = self._getArg(kwargs, "timeout", 180) + self.casapy_path = self._popArg(kwargs, "casapy_path", None) + self.timeout = self._popArg(kwargs, "timeout", 180) def run(self): inp = self.inputs[0] @@ -65,22 +65,22 @@ def initialize(self, **kwargs): super(Clean, self).initialize(**kwargs) - self.timeout = self._getArg(kwargs, "timeout", 3600) - self.casapy_path = self._getArg(kwargs, "casapy_path", None) + self.timeout = self._popArg(kwargs, "timeout", 3600) + self.casapy_path = self._popArg(kwargs, "casapy_path", None) self.clean_args = { - "field": str(self._getArg(kwargs, "field", None)), - "mode": str(self._getArg(kwargs, "mode", None)), - "restfreq": str(self._getArg(kwargs, "restfreq", None)), - "nchan": self._getArg(kwargs, "nchan", None), - "start": str(self._getArg(kwargs, "start", None)), - "width": str(self._getArg(kwargs, "width", None)), - "interpolation": str(self._getArg(kwargs, "interpolation", None)), - "gain": self._getArg(kwargs, "gain", None), - "imsize": self._getArg(kwargs, "imsize", None), - "cell": [str(x) for x in self._getArg(kwargs, "cell", [])], - "phasecenter": str(self._getArg(kwargs, "phasecenter", None)), - "weighting": str(self._getArg(kwargs, "weighting", None)), + "field": str(self._popArg(kwargs, "field", None)), + "mode": str(self._popArg(kwargs, "mode", None)), + "restfreq": str(self._popArg(kwargs, "restfreq", None)), + "nchan": self._popArg(kwargs, "nchan", None), + "start": str(self._popArg(kwargs, "start", None)), + "width": str(self._popArg(kwargs, "width", None)), + "interpolation": str(self._popArg(kwargs, "interpolation", None)), + "gain": self._popArg(kwargs, "gain", None), + "imsize": self._popArg(kwargs, "imsize", None), + "cell": [str(x) for x in self._popArg(kwargs, "cell", [])], + "phasecenter": str(self._popArg(kwargs, "phasecenter", None)), + "weighting": str(self._popArg(kwargs, "weighting", None)), "usescratch": False, } @@ -132,19 +132,19 @@ def initialize(self, **kwargs): super(Split, self).initialize(**kwargs) - self.timeout = self._getArg(kwargs, "timeout", 3600) - self.casapy_path = self._getArg(kwargs, "casapy_path", False) + self.timeout = self._popArg(kwargs, "timeout", 3600) + self.casapy_path = self._popArg(kwargs, "casapy_path", False) self.transform_args = { - "regridms": self._getArg(kwargs, "regridms", None), - "restfreq": str(self._getArg(kwargs, "restfreq", None)), - "mode": str(self._getArg(kwargs, "mode", None)), - "nchan": self._getArg(kwargs, "nchan", None), - "outframe": str(self._getArg(kwargs, "outframe", None)), - "interpolation": str(self._getArg(kwargs, "interpolation", None)), + "regridms": self._popArg(kwargs, "regridms", None), + "restfreq": str(self._popArg(kwargs, "restfreq", None)), + "mode": str(self._popArg(kwargs, "mode", None)), + "nchan": self._popArg(kwargs, "nchan", None), + "outframe": str(self._popArg(kwargs, "outframe", None)), + "interpolation": str(self._popArg(kwargs, "interpolation", None)), "veltype": "radio", - "start": str(self._getArg(kwargs, "start", None)), - "width": str(self._getArg(kwargs, "width", None)), + "start": str(self._popArg(kwargs, "start", None)), + "width": str(self._popArg(kwargs, "width", None)), "spw": "", "combinespws": True, "nspw": 1,