Skip to content

Commit

Permalink
Merge pull request #225 from ICRAR/liu-343
Browse files Browse the repository at this point in the history
added support for ConstructParam
  • Loading branch information
awicenec committed Apr 5, 2023
2 parents e01e8c3 + cccd245 commit 2699f71
Show file tree
Hide file tree
Showing 14 changed files with 1,877 additions and 2,435 deletions.
10 changes: 8 additions & 2 deletions daliuge-engine/dlg/apps/app_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ class AppDROP(ContainerDROP):
"""

def initialize(self, **kwargs):

super(AppDROP, self).initialize(**kwargs)

# Inputs and Outputs are the DROPs that get read from and written
Expand Down Expand Up @@ -145,6 +144,10 @@ def _generateNamedInputs(self):
list(self.parameters["inputs"][i].keys())[0]
]
named_inputs[key] = value
else:
for key, field in self.parameters["applicationArgs"].items():
if field["usage"] in ["InputPort", "InputOutput"]:
named_inputs[field["text"]] = field
return named_inputs

def _generateNamedOutputs(self):
Expand All @@ -161,6 +164,10 @@ def _generateNamedOutputs(self):
list(self.parameters["outputs"][i].keys())[0]
]
named_outputs[key] = value
else:
for key, field in self.parameters["applicationArgs"].items():
if field["usage"] in ["OutputPort", "InputOutput"]:
named_outputs[field["text"]] = field
return named_outputs

def handleEvent(self, e):
Expand Down Expand Up @@ -354,7 +361,6 @@ def dropCompleted(self, uid, drop_state):

# We have enough inputs to proceed
if (skipped_len + error_len + ok_len) == n_eff_inputs:

# calculate the number of errors that have already occurred
percent_failed = math.floor(
(error_len / float(n_eff_inputs)) * 100
Expand Down
6 changes: 3 additions & 3 deletions daliuge-engine/dlg/apps/bash_shell_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE):
output of the process is piped to. If not given it is consumed by this
method and potentially logged.
"""
# logger.debug("Parameters found: %s", json.dumps(self.parameters))
logger.debug("Parameters found: %s", json.dumps(self.parameters))
logger.debug("Bash Inputs: %s; Bash Outputs: %s", inputs, outputs)
# we only support passing a path for bash apps
fsInputs = {
uid: i for uid, i in inputs.items() if droputils.has_path(i)
Expand All @@ -216,7 +217,6 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE):
dataURLOutputs = {
uid: o for uid, o in outputs.items() if not droputils.has_path(o)
}

# deal with named ports
inport_names = (
self.parameters["inputs"] if "inputs" in self.parameters else []
Expand Down Expand Up @@ -265,7 +265,7 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE):

# Wrap everything inside bash
cmd = ("/bin/bash", "-c", cmd)
logger.debug("Command after wrapping is: %s", cmd)
logger.info("Command after wrapping is: %s", cmd)

start = time.time()

Expand Down
5 changes: 3 additions & 2 deletions daliuge-engine/dlg/apps/constructs.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
from dlg.apps.app_base import BarrierAppDROP


##
# @brief Scatter
# @details A Scatter template drop
# @par EAGLE_START
# @param category Scatter
# @param tag template
# @param num_of_splits No. of splits/2/Integer/ApplicationArgument/readwrite//False/False/Number of splits
# @param num_of_copies Scatter dimension/4/Integer/ComponentParameter/readwrite//False/False/Specifies the number of replications of the content of the scatter construct
# @par EAGLE_END
class ScatterDrop(BarrierAppDROP):
"""
This only exists to make sure we have a GroupBy in the template palette
This only exists to make sure we have a Scatter in the template palette
"""

pass
Expand Down
15 changes: 8 additions & 7 deletions daliuge-engine/dlg/apps/dockerapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,6 @@ def containerId(self):
return self._containerId

def handleInterest(self, drop):

# The only interest we currently have is the containerIp of other
# DockerApps, and only if our command actually uses this IP
if isinstance(drop, DockerApp):
Expand Down Expand Up @@ -577,17 +576,19 @@ def run(self):

# Wrap everything inside bash
if len(cmd) > 0 and not self._noBash:
cmd = '/bin/bash -c "%s"' % (
utils.escapeQuotes(cmd, singleQuotes=False)
cmd = (
'/bin/bash -c "%s"'
% (utils.escapeQuotes(cmd, singleQuotes=False)).strip()
)
logger.debug(
logger.info(
"Command after user creation and wrapping is: %s", cmd
)
else:
logger.debug(
"executing container with default cmd and wrapped arguments"
cmd = f"{utils.escapeQuotes(cmd, singleQuotes=False)}".strip()
logger.info(
"executing container with default cmd and wrapped arguments: %s",
cmd,
)
cmd = f"{utils.escapeQuotes(cmd, singleQuotes=False)}"

c = DockerApp._get_client()
logger.debug(
Expand Down
Loading

0 comments on commit 2699f71

Please sign in to comment.