Skip to content

Commit

Permalink
Merge c8a8b61 into 046c159
Browse files Browse the repository at this point in the history
  • Loading branch information
calgray committed May 24, 2022
2 parents 046c159 + c8a8b61 commit 6d99051
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 75 deletions.
14 changes: 7 additions & 7 deletions daliuge-engine/dlg/apps/bash_shell_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,15 @@ def initialize(self, **kwargs):
super(BashShellBase, self).initialize(**kwargs)

self.proc = None
self._inputRedirect = self._getArg(kwargs, "input_redirection", "")
self._outputRedirect = self._getArg(kwargs, "output_redirection", "")
self._cmdLineArgs = self._getArg(kwargs, "command_line_arguments", "")
self._applicationArgs = self._getArg(kwargs, "applicationArgs", {})
self._argumentPrefix = self._getArg(kwargs, "argumentPrefix", "--")
self._paramValueSeparator = self._getArg(kwargs, "paramValueSeparator", " ")
self._inputRedirect = self._popArg(kwargs, "input_redirection", "")
self._outputRedirect = self._popArg(kwargs, "output_redirection", "")
self._cmdLineArgs = self._popArg(kwargs, "command_line_arguments", "")
self._applicationArgs = self._popArg(kwargs, "applicationArgs", {})
self._argumentPrefix = self._popArg(kwargs, "argumentPrefix", "--")
self._paramValueSeparator = self._popArg(kwargs, "paramValueSeparator", " ")

if not self.command:
self.command = self._getArg(kwargs, "command", None)
self.command = self._popArg(kwargs, "command", None)
if not self.command:
raise InvalidDropException(
self, "No command specified, cannot create BashShellApp"
Expand Down
24 changes: 12 additions & 12 deletions daliuge-engine/dlg/apps/dockerapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,12 @@ def initialize(self, **kwargs):
self._containerLock = multiprocessing.Lock()
super().initialize(**kwargs)

self._image = self._getArg(kwargs, "image", None)
self._env = self._getArg(kwargs, "env", None)
self._cmdLineArgs = self._getArg(kwargs, "command_line_arguments", "")
self._applicationArgs = self._getArg(kwargs, "applicationArgs", {})
self._argumentPrefix = self._getArg(kwargs, "argumentPrefix", "--")
self._paramValueSeparator = self._getArg(kwargs, "paramValueSeparator", " ")
self._image = self._popArg(kwargs, "image", None)
self._env = self._popArg(kwargs, "env", None)
self._cmdLineArgs = self._popArg(kwargs, "command_line_arguments", "")
self._applicationArgs = self._popArg(kwargs, "applicationArgs", {})
self._argumentPrefix = self._popArg(kwargs, "argumentPrefix", "--")
self._paramValueSeparator = self._popArg(kwargs, "paramValueSeparator", " ")
if not self._image:
raise InvalidDropException(
self, "No docker image specified, cannot create DockerApp"
Expand All @@ -273,7 +273,7 @@ def initialize(self, **kwargs):
self._image,
)

self._command = self._getArg(kwargs, "command", None)
self._command = self._popArg(kwargs, "command", None)

self._noBash = False
if not self._command or self._command[:2].strip() == "%%":
Expand Down Expand Up @@ -310,14 +310,14 @@ def initialize(self, **kwargs):
# might want to preserve them.
# TODO: This might be something that the data lifecycle manager could
# handle, but for the time being we do it here
self._removeContainer = self._getArg(kwargs, "removeContainer", True)
self._removeContainer = self._popArg(kwargs, "removeContainer", True)

# Ports - a comma seperated list of the host port mappings of form:
# "hostport1:containerport1, hostport2:containerport2"
self._portMappings = self._getArg(kwargs, "portMappings", "")
self._portMappings = self._popArg(kwargs, "portMappings", "")
logger.info(f"portMappings: {self._portMappings}")

self._shmSize = self._getArg(kwargs, "shmSize", "")
self._shmSize = self._popArg(kwargs, "shmSize", "")
logger.info(f"shmSize: {self._shmSize}")

# Additional volume bindings can be specified for existing files/dirs
Expand All @@ -329,7 +329,7 @@ def initialize(self, **kwargs):
f"{utils.getDlgDir()}/workspace/settings/passwd:/etc/passwd",
f"{utils.getDlgDir()}/workspace/settings/group:/etc/group",
]
additionalBindings = self._getArg(kwargs, "additionalBindings", [])
additionalBindings = self._popArg(kwargs, "additionalBindings", [])
additionalBindings = (
additionalBindings.split(",")
if isinstance(additionalBindings, str)
Expand Down Expand Up @@ -379,7 +379,7 @@ def initialize(self, **kwargs):
self._sessionId = self._dlg_session.sessionId if self._dlg_session else ""
if not self.workdir:
default_workingdir = os.path.join(utils.getDlgWorkDir(), self._sessionId)
self.workdir = self._getArg(kwargs, "workingDir", default_workingdir)
self.workdir = self._popArg(kwargs, "workingDir", default_workingdir)

c.api.close()

Expand Down
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/apps/dynlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ class DynlibApp(DynlibAppBase, BarrierAppDROP):

def initialize(self, **kwargs):
super(DynlibApp, self).initialize(**kwargs)
self.ranks = self._getArg(kwargs, "rank", None)
self.ranks = self._popArg(kwargs, "rank", None)

def run(self):
input_closers = prepare_c_inputs(self._c_app, self.inputs)
Expand Down Expand Up @@ -487,7 +487,7 @@ def initialize(self, **kwargs):
if "lib" not in kwargs:
raise InvalidDropException(self, "library not specified")
self.libname = kwargs.pop("lib")
self.timeout = self._getArg(kwargs, "timeout", 600) # 10 minutes
self.timeout = self._popArg(kwargs, "timeout", 600) # 10 minutes
self.app_params = kwargs
self.proc = None

Expand Down
8 changes: 4 additions & 4 deletions daliuge-engine/dlg/apps/mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ class MPIApp(BarrierAppDROP):
def initialize(self, **kwargs):
super(MPIApp, self).initialize(**kwargs)

self._command = self._getArg(kwargs, "command", None)
self._maxprocs = self._getArg(kwargs, "maxprocs", 1)
self._use_wrapper = self._getArg(kwargs, "use_wrapper", False)
self._args = self._getArg(kwargs, "args", [])
self._command = self._popArg(kwargs, "command", None)
self._maxprocs = self._popArg(kwargs, "maxprocs", 1)
self._use_wrapper = self._popArg(kwargs, "use_wrapper", False)
self._args = self._popArg(kwargs, "args", [])
if not self._command:
raise InvalidDropException(
self, "No command specified, cannot create MPIApp"
Expand Down
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/apps/pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,9 @@ def initialize(self, **kwargs):
"""
BarrierAppDROP.initialize(self, **kwargs)

self._applicationArgs = self._getArg(kwargs, "applicationArgs", {})
self._applicationArgs = self._popArg(kwargs, "applicationArgs", {})

self.func_code = self._getArg(kwargs, "func_code", None)
self.func_code = self._popArg(kwargs, "func_code", None)

# check for function definition arguments in applicationArgs
self.func_def_keywords = [
Expand Down
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/apps/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ class SimpleBranch(BranchAppDrop, NullBarrierApp):
"""Simple branch app that is told the result of its condition"""

def initialize(self, **kwargs):
self.result = self._getArg(kwargs, "result", True)
self.result = self._popArg(kwargs, "result", True)
BranchAppDrop.initialize(self, **kwargs)

def run(self):
Expand Down Expand Up @@ -835,7 +835,7 @@ class ListAppendThrashingApp(BarrierAppDROP):
)

def initialize(self, **kwargs):
self.size = self._getArg(kwargs, "size", 100)
self.size = self._popArg(kwargs, "size", 100)
self.marray = []
super(ListAppendThrashingApp, self).initialize(**kwargs)

Expand Down
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/dask_emulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ class ResultTransmitter(BarrierAppDROP):

def initialize(self, **kwargs):
BarrierAppDROP.initialize(self, input_error_threshold=100, **kwargs)
self.host = self._getArg(kwargs, "host", "127.0.0.1")
self.port = self._getArg(kwargs, "port", None)
self.host = self._popArg(kwargs, "host", "127.0.0.1")
self.port = self._popArg(kwargs, "port", None)
if self.port is None:
raise InvalidDropException(self, "Missing port parameter")

Expand Down
33 changes: 15 additions & 18 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,24 +220,24 @@ def __init__(self, oid, uid, **kwargs):

# The physical graph drop type. This is determined
# by the drop category when generating the drop spec
self._type = self._getArg(kwargs, "type", None)
self._type = self._popArg(kwargs, "type", None)

# The Session owning this drop, if any
# In most real-world situations this attribute will be set, but in
# general it cannot be assumed it will (e.g., unit tests create drops
# directly outside the context of a session).
self._dlg_session = self._getArg(kwargs, "dlg_session", None)
self._dlg_session = self._popArg(kwargs, "dlg_session", None)

# A simple name that the Drop might receive
# This is usually set in the Logical Graph Editor,
# but is not necessarily always there
self.name = self._getArg(kwargs, "nm", "")
self.name = self._popArg(kwargs, "nm", "")

# The key of this drop in the original Logical Graph
# This information might or might not be present depending on how the
# physical graph was generated (or if this drop is being created as part
# of a graph, to begin with), so we default it to an empty value
self.lg_key = self._getArg(kwargs, "lg_key", "")
self.lg_key = self._popArg(kwargs, "lg_key", "")

# 1-to-N relationship: one DROP may have many consumers and producers.
# The potential consumers and producers are always AppDROPs instances
Expand Down Expand Up @@ -285,7 +285,7 @@ def __init__(self, oid, uid, **kwargs):
# support. A target phase is also set to hint the Data Lifecycle Manager
# about the level of resilience that this DROP should achieve.
self._phase = DROPPhases.PLASMA
self._targetPhase = self._getArg(kwargs, "targetPhase", DROPPhases.GAS)
self._targetPhase = self._popArg(kwargs, "targetPhase", DROPPhases.GAS)

# Calculating the checksum and maintaining the data size internally
# implies that the data represented by this DROP is written
Expand Down Expand Up @@ -329,21 +329,21 @@ def __init__(self, oid, uid, **kwargs):
# in the state they currently are. In this case an external entity must
# listen to the events and decide when to trigger the execution of the
# applications.
self._executionMode = self._getArg(kwargs, "executionMode", ExecutionMode.DROP)
self._executionMode = self._popArg(kwargs, "executionMode", ExecutionMode.DROP)

# The physical node where this DROP resides.
# This piece of information is mandatory when submitting the physical
# graph via the DataIslandManager, but in simpler scenarios such as
# tests or graph submissions via the NodeManager it might be
# missing.
self._node = self._getArg(kwargs, "node", None)
self._node = self._popArg(kwargs, "node", None)

# The host representing the Data Island where this DROP resides
# This piece of information is mandatory when submitting the physical
# graph via the MasterManager, but in simpler scenarios such as tests or
# graphs submissions via the DataIslandManager or NodeManager it might
# missing.
self._dataIsland = self._getArg(kwargs, "island", None)
self._dataIsland = self._popArg(kwargs, "island", None)

# DROP expiration.
# Expiration can be time-driven or usage-driven, which are mutually
Expand All @@ -357,10 +357,10 @@ def __init__(self, oid, uid, **kwargs):
"but they are mutually exclusive" % (self,),
)

self._expireAfterUse = self._getArg(kwargs, "expireAfterUse", False)
self._expireAfterUse = self._popArg(kwargs, "expireAfterUse", False)
self._expirationDate = -1
if not self._expireAfterUse:
lifespan = float(self._getArg(kwargs, "lifespan", -1))
lifespan = float(self._popArg(kwargs, "lifespan", -1))
if lifespan != -1:
self._expirationDate = time.time() + lifespan

Expand All @@ -371,7 +371,7 @@ def __init__(self, oid, uid, **kwargs):
self._expectedSize = int(kwargs.pop("expectedSize"))

# All DROPs are precious unless stated otherwise; used for replication
self._precious = self._getArg(kwargs, "precious", True)
self._precious = self._popArg(kwargs, "precious", True)

# Useful to have access to all EAGLE parameters without a prior knowledge
self._parameters = dict(kwargs)
Expand Down Expand Up @@ -461,16 +461,13 @@ def get_param_value(attr_name, default_value):
continue
setattr(self, attr_name, value)

def _getArg(self, kwargs, key, default):
def _popArg(self, kwargs, key, default):
"""
Pops the specified key arg from kwargs else returns the default
"""
val = default
if key in kwargs:
val = kwargs.pop(key)
elif logger.isEnabledFor(logging.DEBUG):
logger.debug("Defaulting %s to %s in %r" % (key, str(val), self))
return val
if key not in kwargs:
logger.debug("Defaulting %s to %s in %r" % (key, str(default), self))
return kwargs.pop(key, default)

def __hash__(self):
return hash(self._uid)
Expand Down
3 changes: 3 additions & 0 deletions daliuge-engine/dlg/environmentvar_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import io
import os
import json
import logging

from dlg.drop import AbstractDROP, DEFAULT_INTERNAL_PARAMETERS
from dlg.io import MemoryIO

logger = logging.getLogger(__name__)

class KeyValueDROP:
@abc.abstractmethod
Expand Down Expand Up @@ -77,6 +79,7 @@ def initialize(self, **kwargs):
"""
Runs through all parameters, putting each into this drop's variable dict
"""
logger.warning(f"params {self.parameters}")
super(EnvironmentVarDROP, self).initialize(**kwargs)
self._variables = dict()
self._variables.update(_filter_parameters(self.parameters))
Expand Down
52 changes: 26 additions & 26 deletions daliuge-engine/test/integrate/chiles/chilesdo.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ def initialize(self, **kwargs):

super(SourceFlux, self).initialize(**kwargs)

self.casapy_path = self._getArg(kwargs, "casapy_path", None)
self.timeout = self._getArg(kwargs, "timeout", 180)
self.casapy_path = self._popArg(kwargs, "casapy_path", None)
self.timeout = self._popArg(kwargs, "timeout", 180)

def run(self):
inp = self.inputs[0]
Expand Down Expand Up @@ -65,22 +65,22 @@ def initialize(self, **kwargs):

super(Clean, self).initialize(**kwargs)

self.timeout = self._getArg(kwargs, "timeout", 3600)
self.casapy_path = self._getArg(kwargs, "casapy_path", None)
self.timeout = self._popArg(kwargs, "timeout", 3600)
self.casapy_path = self._popArg(kwargs, "casapy_path", None)

self.clean_args = {
"field": str(self._getArg(kwargs, "field", None)),
"mode": str(self._getArg(kwargs, "mode", None)),
"restfreq": str(self._getArg(kwargs, "restfreq", None)),
"nchan": self._getArg(kwargs, "nchan", None),
"start": str(self._getArg(kwargs, "start", None)),
"width": str(self._getArg(kwargs, "width", None)),
"interpolation": str(self._getArg(kwargs, "interpolation", None)),
"gain": self._getArg(kwargs, "gain", None),
"imsize": self._getArg(kwargs, "imsize", None),
"cell": [str(x) for x in self._getArg(kwargs, "cell", [])],
"phasecenter": str(self._getArg(kwargs, "phasecenter", None)),
"weighting": str(self._getArg(kwargs, "weighting", None)),
"field": str(self._popArg(kwargs, "field", None)),
"mode": str(self._popArg(kwargs, "mode", None)),
"restfreq": str(self._popArg(kwargs, "restfreq", None)),
"nchan": self._popArg(kwargs, "nchan", None),
"start": str(self._popArg(kwargs, "start", None)),
"width": str(self._popArg(kwargs, "width", None)),
"interpolation": str(self._popArg(kwargs, "interpolation", None)),
"gain": self._popArg(kwargs, "gain", None),
"imsize": self._popArg(kwargs, "imsize", None),
"cell": [str(x) for x in self._popArg(kwargs, "cell", [])],
"phasecenter": str(self._popArg(kwargs, "phasecenter", None)),
"weighting": str(self._popArg(kwargs, "weighting", None)),
"usescratch": False,
}

Expand Down Expand Up @@ -132,19 +132,19 @@ def initialize(self, **kwargs):

super(Split, self).initialize(**kwargs)

self.timeout = self._getArg(kwargs, "timeout", 3600)
self.casapy_path = self._getArg(kwargs, "casapy_path", False)
self.timeout = self._popArg(kwargs, "timeout", 3600)
self.casapy_path = self._popArg(kwargs, "casapy_path", False)

self.transform_args = {
"regridms": self._getArg(kwargs, "regridms", None),
"restfreq": str(self._getArg(kwargs, "restfreq", None)),
"mode": str(self._getArg(kwargs, "mode", None)),
"nchan": self._getArg(kwargs, "nchan", None),
"outframe": str(self._getArg(kwargs, "outframe", None)),
"interpolation": str(self._getArg(kwargs, "interpolation", None)),
"regridms": self._popArg(kwargs, "regridms", None),
"restfreq": str(self._popArg(kwargs, "restfreq", None)),
"mode": str(self._popArg(kwargs, "mode", None)),
"nchan": self._popArg(kwargs, "nchan", None),
"outframe": str(self._popArg(kwargs, "outframe", None)),
"interpolation": str(self._popArg(kwargs, "interpolation", None)),
"veltype": "radio",
"start": str(self._getArg(kwargs, "start", None)),
"width": str(self._getArg(kwargs, "width", None)),
"start": str(self._popArg(kwargs, "start", None)),
"width": str(self._popArg(kwargs, "width", None)),
"spw": "",
"combinespws": True,
"nspw": 1,
Expand Down

0 comments on commit 6d99051

Please sign in to comment.