Skip to content

Commit

Permalink
Merge f7da88e into c8b2df5
Browse files Browse the repository at this point in the history
  • Loading branch information
calgray committed Aug 15, 2022
2 parents c8b2df5 + f7da88e commit 7874664
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 77 deletions.
14 changes: 7 additions & 7 deletions daliuge-engine/dlg/apps/bash_shell_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,15 @@ def initialize(self, **kwargs):
super(BashShellBase, self).initialize(**kwargs)

self.proc = None
self._inputRedirect = self._getArg(kwargs, "input_redirection", "")
self._outputRedirect = self._getArg(kwargs, "output_redirection", "")
self._cmdLineArgs = self._getArg(kwargs, "command_line_arguments", "")
self._applicationArgs = self._getArg(kwargs, "applicationArgs", {})
self._argumentPrefix = self._getArg(kwargs, "argumentPrefix", "--")
self._paramValueSeparator = self._getArg(kwargs, "paramValueSeparator", " ")
self._inputRedirect = self._popArg(kwargs, "input_redirection", "")
self._outputRedirect = self._popArg(kwargs, "output_redirection", "")
self._cmdLineArgs = self._popArg(kwargs, "command_line_arguments", "")
self._applicationArgs = self._popArg(kwargs, "applicationArgs", {})
self._argumentPrefix = self._popArg(kwargs, "argumentPrefix", "--")
self._paramValueSeparator = self._popArg(kwargs, "paramValueSeparator", " ")

if not self.command:
self.command = self._getArg(kwargs, "command", None)
self.command = self._popArg(kwargs, "command", None)
if not self.command:
raise InvalidDropException(
self, "No command specified, cannot create BashShellApp"
Expand Down
24 changes: 12 additions & 12 deletions daliuge-engine/dlg/apps/dockerapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,12 @@ def initialize(self, **kwargs):
self._containerLock = multiprocessing.Lock()
super().initialize(**kwargs)

self._image = self._getArg(kwargs, "image", None)
self._env = self._getArg(kwargs, "env", None)
self._cmdLineArgs = self._getArg(kwargs, "command_line_arguments", "")
self._applicationArgs = self._getArg(kwargs, "applicationArgs", {})
self._argumentPrefix = self._getArg(kwargs, "argumentPrefix", "--")
self._paramValueSeparator = self._getArg(kwargs, "paramValueSeparator", " ")
self._image = self._popArg(kwargs, "image", None)
self._env = self._popArg(kwargs, "env", None)
self._cmdLineArgs = self._popArg(kwargs, "command_line_arguments", "")
self._applicationArgs = self._popArg(kwargs, "applicationArgs", {})
self._argumentPrefix = self._popArg(kwargs, "argumentPrefix", "--")
self._paramValueSeparator = self._popArg(kwargs, "paramValueSeparator", " ")
if not self._image:
raise InvalidDropException(
self, "No docker image specified, cannot create DockerApp"
Expand All @@ -254,7 +254,7 @@ def initialize(self, **kwargs):
self._image,
)

self._command = self._getArg(kwargs, "command", None)
self._command = self._popArg(kwargs, "command", None)

self._noBash = False
if not self._command or self._command.strip()[:2] == "%%":
Expand Down Expand Up @@ -283,14 +283,14 @@ def initialize(self, **kwargs):
# might want to preserve them.
# TODO: This might be something that the data lifecycle manager could
# handle, but for the time being we do it here
self._removeContainer = self._getArg(kwargs, "removeContainer", True)
self._removeContainer = self._popArg(kwargs, "removeContainer", True)

# Ports - a comma seperated list of the host port mappings of form:
# "hostport1:containerport1, hostport2:containerport2"
self._portMappings = self._getArg(kwargs, "portMappings", "")
self._portMappings = self._popArg(kwargs, "portMappings", "")
logger.info(f"portMappings: {self._portMappings}")

self._shmSize = self._getArg(kwargs, "shmSize", "")
self._shmSize = self._popArg(kwargs, "shmSize", "")
logger.info(f"shmSize: {self._shmSize}")

# Additional volume bindings can be specified for existing files/dirs
Expand All @@ -302,7 +302,7 @@ def initialize(self, **kwargs):
f"{utils.getDlgDir()}/workspace/settings/passwd:/etc/passwd",
f"{utils.getDlgDir()}/workspace/settings/group:/etc/group",
]
additionalBindings = self._getArg(kwargs, "additionalBindings", [])
additionalBindings = self._popArg(kwargs, "additionalBindings", [])
additionalBindings = (
additionalBindings.split(",")
if isinstance(additionalBindings, str)
Expand Down Expand Up @@ -352,7 +352,7 @@ def initialize(self, **kwargs):
self._sessionId = self._dlg_session.sessionId if self._dlg_session else ""
if not self.workdir:
default_workingdir = os.path.join(utils.getDlgWorkDir(), self._sessionId)
self.workdir = self._getArg(kwargs, "workingDir", default_workingdir)
self.workdir = self._popArg(kwargs, "workingDir", default_workingdir)

c.api.close()

Expand Down
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/apps/dynlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ class DynlibApp(DynlibAppBase, BarrierAppDROP):

def initialize(self, **kwargs):
super(DynlibApp, self).initialize(**kwargs)
self.ranks = self._getArg(kwargs, "rank", None)
self.ranks = self._popArg(kwargs, "rank", None)

def run(self):
input_closers = prepare_c_inputs(self._c_app, self.inputs)
Expand Down Expand Up @@ -475,7 +475,7 @@ def initialize(self, **kwargs):
if "lib" not in kwargs:
raise InvalidDropException(self, "library not specified")
self.libname = kwargs.pop("lib")
self.timeout = self._getArg(kwargs, "timeout", 600) # 10 minutes
self.timeout = self._popArg(kwargs, "timeout", 600) # 10 minutes
self.app_params = kwargs
self.proc = None

Expand Down
8 changes: 4 additions & 4 deletions daliuge-engine/dlg/apps/mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ class MPIApp(BarrierAppDROP):
def initialize(self, **kwargs):
super(MPIApp, self).initialize(**kwargs)

self._command = self._getArg(kwargs, "command", None)
self._maxprocs = self._getArg(kwargs, "maxprocs", 1)
self._use_wrapper = self._getArg(kwargs, "use_wrapper", False)
self._args = self._getArg(kwargs, "args", [])
self._command = self._popArg(kwargs, "command", None)
self._maxprocs = self._popArg(kwargs, "maxprocs", 1)
self._use_wrapper = self._popArg(kwargs, "use_wrapper", False)
self._args = self._popArg(kwargs, "args", [])
if not self._command:
raise InvalidDropException(
self, "No command specified, cannot create MPIApp"
Expand Down
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/apps/pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,9 @@ def initialize(self, **kwargs):
"""
BarrierAppDROP.initialize(self, **kwargs)

self._applicationArgs = self._getArg(kwargs, "applicationArgs", {})
self._applicationArgs = self._popArg(kwargs, "applicationArgs", {})

self.func_code = self._getArg(kwargs, "func_code", None)
self.func_code = self._popArg(kwargs, "func_code", None)

# check for function definition arguments in applicationArgs
self.func_def_keywords = [
Expand Down
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/apps/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ class SimpleBranch(BranchAppDrop, NullBarrierApp):
"""Simple branch app that is told the result of its condition"""

def initialize(self, **kwargs):
self.result = self._getArg(kwargs, "result", True)
self.result = self._popArg(kwargs, "result", True)
BranchAppDrop.initialize(self, **kwargs)

def run(self):
Expand Down Expand Up @@ -749,7 +749,7 @@ class ListAppendThrashingApp(BarrierAppDROP):
)

def initialize(self, **kwargs):
self.size = self._getArg(kwargs, "size", 100)
self.size = self._popArg(kwargs, "size", 100)
self.marray = []
super(ListAppendThrashingApp, self).initialize(**kwargs)

Expand Down
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/dask_emulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ class ResultTransmitter(BarrierAppDROP):

def initialize(self, **kwargs):
BarrierAppDROP.initialize(self, input_error_threshold=100, **kwargs)
self.host = self._getArg(kwargs, "host", "127.0.0.1")
self.port = self._getArg(kwargs, "port", None)
self.host = self._popArg(kwargs, "host", "127.0.0.1")
self.port = self._popArg(kwargs, "port", None)
if self.port is None:
raise InvalidDropException(self, "Missing port parameter")

Expand Down
32 changes: 15 additions & 17 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,24 +216,24 @@ def __init__(self, oid, uid, **kwargs):

# The physical graph drop type. This is determined
# by the drop category when generating the drop spec
self._type = self._getArg(kwargs, "type", None)
self._type = self._popArg(kwargs, "type", None)

# The Session owning this drop, if any
# In most real-world situations this attribute will be set, but in
# general it cannot be assumed it will (e.g., unit tests create drops
# directly outside the context of a session).
self._dlg_session = self._getArg(kwargs, "dlg_session", None)
self._dlg_session = self._popArg(kwargs, "dlg_session", None)

# A simple name that the Drop might receive
# This is usually set in the Logical Graph Editor,
# but is not necessarily always there
self.name = self._getArg(kwargs, "nm", "")
self.name = self._popArg(kwargs, "nm", "")

# The key of this drop in the original Logical Graph
# This information might or might not be present depending on how the
# physical graph was generated (or if this drop is being created as part
# of a graph, to begin with), so we default it to an empty value
self.lg_key = self._getArg(kwargs, "lg_key", "")
self.lg_key = self._popArg(kwargs, "lg_key", "")

# 1-to-N relationship: one DROP may have many consumers and producers.
# The potential consumers and producers are always AppDROPs instances
Expand Down Expand Up @@ -281,7 +281,7 @@ def __init__(self, oid, uid, **kwargs):
# support. A target phase is also set to hint the Data Lifecycle Manager
# about the level of resilience that this DROP should achieve.
self._phase = DROPPhases.PLASMA
self._targetPhase = self._getArg(kwargs, "targetPhase", DROPPhases.GAS)
self._targetPhase = self._popArg(kwargs, "targetPhase", DROPPhases.GAS)

# Calculating the checksum and maintaining the data size internally
# implies that the data represented by this DROP is written
Expand Down Expand Up @@ -325,21 +325,21 @@ def __init__(self, oid, uid, **kwargs):
# in the state they currently are. In this case an external entity must
# listen to the events and decide when to trigger the execution of the
# applications.
self._executionMode = self._getArg(kwargs, "executionMode", ExecutionMode.DROP)
self._executionMode = self._popArg(kwargs, "executionMode", ExecutionMode.DROP)

# The physical node where this DROP resides.
# This piece of information is mandatory when submitting the physical
# graph via the DataIslandManager, but in simpler scenarios such as
# tests or graph submissions via the NodeManager it might be
# missing.
self._node = self._getArg(kwargs, "node", None)
self._node = self._popArg(kwargs, "node", None)

# The host representing the Data Island where this DROP resides
# This piece of information is mandatory when submitting the physical
# graph via the MasterManager, but in simpler scenarios such as tests or
# graphs submissions via the DataIslandManager or NodeManager it might
# missing.
self._dataIsland = self._getArg(kwargs, "island", None)
self._dataIsland = self._popArg(kwargs, "island", None)

# DROP expiration.
# Expiration can be time-driven or usage-driven, which are mutually
Expand All @@ -353,10 +353,10 @@ def __init__(self, oid, uid, **kwargs):
"but they are mutually exclusive" % (self,),
)

self._expireAfterUse = self._getArg(kwargs, "expireAfterUse", False)
self._expireAfterUse = self._popArg(kwargs, "expireAfterUse", False)
self._expirationDate = -1
if not self._expireAfterUse:
lifespan = float(self._getArg(kwargs, "lifespan", -1))
lifespan = float(self._popArg(kwargs, "lifespan", -1))
if lifespan != -1:
self._expirationDate = time.time() + lifespan

Expand All @@ -367,7 +367,7 @@ def __init__(self, oid, uid, **kwargs):
self._expectedSize = int(kwargs.pop("expectedSize"))

# All DROPs are precious unless stated otherwise; used for replication
self._precious = self._getArg(kwargs, "precious", True)
self._precious = self._popArg(kwargs, "precious", True)

# Useful to have access to all EAGLE parameters without a prior knowledge
self._parameters = dict(kwargs)
Expand Down Expand Up @@ -463,15 +463,13 @@ def get_param_value(attr_name, default_value):
continue
setattr(self, attr_name, value)

def _getArg(self, kwargs, key, default):
def _popArg(self, kwargs, key, default):
"""
Pops the specified key arg from kwargs else returns the default
"""
val = default
if key in kwargs:
val = kwargs.pop(key)
logger.debug("Defaulting %s to %s in %r", key, str(val), self)
return val
if key not in kwargs:
logger.debug("Defaulting %s to %s in %r", key, str(default), self)
return kwargs.pop(key, default)

def __hash__(self):
return hash(self._uid)
Expand Down
52 changes: 26 additions & 26 deletions daliuge-engine/test/integrate/chiles/chilesdo.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ def initialize(self, **kwargs):

super(SourceFlux, self).initialize(**kwargs)

self.casapy_path = self._getArg(kwargs, "casapy_path", None)
self.timeout = self._getArg(kwargs, "timeout", 180)
self.casapy_path = self._popArg(kwargs, "casapy_path", None)
self.timeout = self._popArg(kwargs, "timeout", 180)

def run(self):
inp = self.inputs[0]
Expand Down Expand Up @@ -65,22 +65,22 @@ def initialize(self, **kwargs):

super(Clean, self).initialize(**kwargs)

self.timeout = self._getArg(kwargs, "timeout", 3600)
self.casapy_path = self._getArg(kwargs, "casapy_path", None)
self.timeout = self._popArg(kwargs, "timeout", 3600)
self.casapy_path = self._popArg(kwargs, "casapy_path", None)

self.clean_args = {
"field": str(self._getArg(kwargs, "field", None)),
"mode": str(self._getArg(kwargs, "mode", None)),
"restfreq": str(self._getArg(kwargs, "restfreq", None)),
"nchan": self._getArg(kwargs, "nchan", None),
"start": str(self._getArg(kwargs, "start", None)),
"width": str(self._getArg(kwargs, "width", None)),
"interpolation": str(self._getArg(kwargs, "interpolation", None)),
"gain": self._getArg(kwargs, "gain", None),
"imsize": self._getArg(kwargs, "imsize", None),
"cell": [str(x) for x in self._getArg(kwargs, "cell", [])],
"phasecenter": str(self._getArg(kwargs, "phasecenter", None)),
"weighting": str(self._getArg(kwargs, "weighting", None)),
"field": str(self._popArg(kwargs, "field", None)),
"mode": str(self._popArg(kwargs, "mode", None)),
"restfreq": str(self._popArg(kwargs, "restfreq", None)),
"nchan": self._popArg(kwargs, "nchan", None),
"start": str(self._popArg(kwargs, "start", None)),
"width": str(self._popArg(kwargs, "width", None)),
"interpolation": str(self._popArg(kwargs, "interpolation", None)),
"gain": self._popArg(kwargs, "gain", None),
"imsize": self._popArg(kwargs, "imsize", None),
"cell": [str(x) for x in self._popArg(kwargs, "cell", [])],
"phasecenter": str(self._popArg(kwargs, "phasecenter", None)),
"weighting": str(self._popArg(kwargs, "weighting", None)),
"usescratch": False,
}

Expand Down Expand Up @@ -132,19 +132,19 @@ def initialize(self, **kwargs):

super(Split, self).initialize(**kwargs)

self.timeout = self._getArg(kwargs, "timeout", 3600)
self.casapy_path = self._getArg(kwargs, "casapy_path", False)
self.timeout = self._popArg(kwargs, "timeout", 3600)
self.casapy_path = self._popArg(kwargs, "casapy_path", False)

self.transform_args = {
"regridms": self._getArg(kwargs, "regridms", None),
"restfreq": str(self._getArg(kwargs, "restfreq", None)),
"mode": str(self._getArg(kwargs, "mode", None)),
"nchan": self._getArg(kwargs, "nchan", None),
"outframe": str(self._getArg(kwargs, "outframe", None)),
"interpolation": str(self._getArg(kwargs, "interpolation", None)),
"regridms": self._popArg(kwargs, "regridms", None),
"restfreq": str(self._popArg(kwargs, "restfreq", None)),
"mode": str(self._popArg(kwargs, "mode", None)),
"nchan": self._popArg(kwargs, "nchan", None),
"outframe": str(self._popArg(kwargs, "outframe", None)),
"interpolation": str(self._popArg(kwargs, "interpolation", None)),
"veltype": "radio",
"start": str(self._getArg(kwargs, "start", None)),
"width": str(self._getArg(kwargs, "width", None)),
"start": str(self._popArg(kwargs, "start", None)),
"width": str(self._popArg(kwargs, "width", None)),
"spw": "",
"combinespws": True,
"nspw": 1,
Expand Down
9 changes: 6 additions & 3 deletions tools/xml2palette/xml2palette.py
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,9 @@ def process_compounddef_default(compounddef, language):
if child.tag == "detaileddescription" and len(child) > 0 and casa_mode:
# for child in ggchild:
dStr = child[0][0].text
descDict = parseCasaDocs(dStr)
descDict, comp_description = parseCasaDocs(dStr)
member["params"].append({"key": "description", "direction": None, "value": comp_description})

pkeys = {p["key"]:i for i,p in enumerate(member["params"])}
for p in descDict.keys():
if p in pkeys:
Expand Down Expand Up @@ -1027,7 +1029,7 @@ def parseCasaDocs(dStr:str) -> dict:
end_ind = [idx for idx, s in enumerate(dList) if '-- example' in s][0]
except IndexError:
logging.debug('Problems finding start or end index for task: {task}')
return {}
return {}, ""
paramsList = dList[start_ind:end_ind]
paramsSidx = [idx+1 for idx, p in enumerate(paramsList) if len(p) > 0 and p[0] != ' ']
paramsEidx = paramsSidx[1:] + [len(paramsList) - 1]
Expand All @@ -1040,7 +1042,8 @@ def parseCasaDocs(dStr:str) -> dict:
pl = [p.strip() for p in paramsList[paramsSidx[i]:paramsEidx[i]-1] if len(p.strip()) > 0]
paramDocs[i] = paramDocs[i] + ' '+' '.join(pl)
params = dict(zip(paramNames, paramDocs))
return params
comp_description = "\n".join(dList[:start_ind-1]) # return main description as well
return params, comp_description


if __name__ == "__main__":
Expand Down

0 comments on commit 7874664

Please sign in to comment.