Skip to content

Commit

Permalink
Merge pull request #218 from ICRAR/liu-332
Browse files Browse the repository at this point in the history
Liu 332
  • Loading branch information
awicenec committed Feb 1, 2023
2 parents 1b63f3f + 2ae3f65 commit ead220b
Show file tree
Hide file tree
Showing 17 changed files with 1,701 additions and 987 deletions.
14 changes: 10 additions & 4 deletions daliuge-common/dlg/common/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ def check_port(host, port, timeout=0, checking_open=True, return_socket=False):
"""

if return_socket and not checking_open:
raise ValueError("If return_socket is True then checking_open must be True")
raise ValueError(
"If return_socket is True then checking_open must be True"
)

start = time.time()
while True:
Expand Down Expand Up @@ -74,9 +76,13 @@ def check_port(host, port, timeout=0, checking_open=True, return_socket=False):
s.connect((host, port))
if not return_socket:
s.close()
except socket.error:
s.close()
raise
except (OSError, socket.error):
if socket.error:
s.close()
raise
elif OSError:
logger.error("Unable to connect to %s:%s", host, port)
return not checking_open

# Success if we were checking for an open port!
if checking_open:
Expand Down
64 changes: 45 additions & 19 deletions daliuge-engine/dlg/apps/bash_shell_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import threading
import time
import types
import collections
import json

from .. import droputils, utils
from ..ddap_protocol import AppDROPStates, DROPStates
Expand Down Expand Up @@ -139,7 +139,9 @@ def prepare_input_channel(data):
chan = lambda: None # a simple object where attributes can be set!
chan.read = pipe.read
chan.fileno = pipe.fileno
chan.close = types.MethodType(lambda s: close_and_remove(pipe, pipe_name), chan)
chan.close = types.MethodType(
lambda s: close_and_remove(pipe, pipe_name), chan
)
return chan

elif data.startswith(b"tcp://"):
Expand Down Expand Up @@ -173,7 +175,9 @@ def initialize(self, **kwargs):
self._cmdLineArgs = self._popArg(kwargs, "command_line_arguments", "")
self._applicationArgs = self._popArg(kwargs, "applicationArgs", {})
self._argumentPrefix = self._popArg(kwargs, "argumentPrefix", "--")
self._paramValueSeparator = self._popArg(kwargs, "paramValueSeparator", " ")
self._paramValueSeparator = self._popArg(
kwargs, "paramValueSeparator", " "
)

if not self.command:
self.command = self._popArg(kwargs, "command", None)
Expand All @@ -198,10 +202,14 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE):
output of the process is piped to. If not given it is consumed by this
method and potentially logged.
"""
logger.debug("Parameters found: %s", self.parameters)
# logger.debug("Parameters found: %s", json.dumps(self.parameters))
# we only support passing a path for bash apps
fsInputs = {uid: i for uid, i in inputs.items() if droputils.has_path(i)}
fsOutputs = {uid: o for uid, o in outputs.items() if droputils.has_path(o)}
fsInputs = {
uid: i for uid, i in inputs.items() if droputils.has_path(i)
}
fsOutputs = {
uid: o for uid, o in outputs.items() if droputils.has_path(o)
}
dataURLInputs = {
uid: i for uid, i in inputs.items() if not droputils.has_path(i)
}
Expand All @@ -210,14 +218,22 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE):
}

# deal with named ports
inport_names = self.parameters['inputs'] \
if "inputs" in self.parameters else []
outport_names = self.parameters['outputs'] \
if "outputs" in self.parameters else []
keyargs, pargs = droputils.replace_named_ports(inputs.items(), outputs.items(),
inport_names, outport_names, self.appArgs, argumentPrefix=self._argumentPrefix,
separator=self._paramValueSeparator)
argumentString = f"{' '.join(pargs + keyargs)}" # add kwargs to end of pargs
inport_names = (
self.parameters["inputs"] if "inputs" in self.parameters else []
)
outport_names = (
self.parameters["outputs"] if "outputs" in self.parameters else []
)
keyargs, pargs = droputils.replace_named_ports(
inputs.items(),
outputs.items(),
inport_names,
outport_names,
self.appArgs,
argumentPrefix=self._argumentPrefix,
separator=self._paramValueSeparator,
)
argumentString = f"{' '.join(map(str,pargs + keyargs))}" # add kwargs to end of pargs
# complete command including all additional parameters and optional redirects
if len(argumentString.strip()) > 0:
# the _cmdLineArgs would very likely make the command line invalid
Expand All @@ -235,7 +251,9 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE):
# Replace inputs/outputs in command line with paths or data URLs
cmd = droputils.replace_path_placeholders(cmd, fsInputs, fsOutputs)

cmd = droputils.replace_dataurl_placeholders(cmd, dataURLInputs, dataURLOutputs)
cmd = droputils.replace_dataurl_placeholders(
cmd, dataURLInputs, dataURLOutputs
)

# Pass down daliuge-specific information to the subprocesses as environment variables
env = os.environ.copy()
Expand Down Expand Up @@ -270,18 +288,26 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE):
pstdout = b"<piped-out>"
pcode = process.returncode
end = time.time()
logger.info("Finished in %.3f [s] with exit code %d", (end - start), pcode)
logger.info(
"Finished in %.3f [s] with exit code %d", (end - start), pcode
)

logger.info("Finished in %.3f [s] with exit code %d", (end - start), pcode)
logger.info(
"Finished in %.3f [s] with exit code %d", (end - start), pcode
)
self._recompute_data["stdout"] = str(pstdout)
self._recompute_data["stderr"] = str(pstderr)
self._recompute_data["status"] = str(pcode)
if pcode == 0 and logger.isEnabledFor(logging.DEBUG):
logger.debug(
mesage_stdouts("Command finished successfully", pstdout, pstderr)
mesage_stdouts(
"Command finished successfully", pstdout, pstderr
)
)
elif pcode != 0:
message = "Command didn't finish successfully (exit code %d)" % (pcode,)
message = "Command didn't finish successfully (exit code %d)" % (
pcode,
)
logger.error(mesage_stdouts(message, pstdout, pstderr))
raise Exception(message)

Expand Down
102 changes: 102 additions & 0 deletions daliuge-engine/dlg/apps/constructs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from dlg.drop import BarrierAppDROP

##
# @brief Loop
# @details A loop placeholder drop
# @par EAGLE_START
# @param category Loop
# @param tag template
# @param appclass Application Class//String/ComponentParameter/readonly//False/False/Application class
# @param execution_time Execution Time//Float/ComponentParameter/readonly//False/False/Estimated execution time
# @param num_cpus No. of CPUs//Integer/ComponentParameter/readonly//False/False/Number of cores used
# @param group_start Group start/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the start of a group?
# @param num_of_iter No. of iterations/2/Integer/ComponentParameter/readwrite//False/False/Number of iterations
# @par EAGLE_END
class LoopDrop(BarrierAppDROP):
"""
This only exists to make sure we have a loop in the template palette
"""

pass


##
# @brief MKN
# @details A MKN placeholder drop
# @par EAGLE_START
# @param category MKN
# @param tag template
# @param appclass Application Class//String/ComponentParameter/readonly//False/False/Application class
# @param execution_time Execution Time//Float/ComponentParameter/readonly//False/False/Estimated execution time
# @param num_cpus No. of CPUs//Integer/ComponentParameter/readonly//False/False/Number of cores used
# @param group_start Group start/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the start of a group?
# @par EAGLE_END
class MKNDrop(BarrierAppDROP):
"""
This only exists to make sure we have a MKN in the template palette
"""

pass


##
# @brief SubGraph
# @details A SubGraph placeholder drop
# @par EAGLE_START
# @param category SubGraph
# @param tag template
# @param appclass Application Class//String/ComponentParameter/readonly//False/False/Application class
# @param execution_time Execution Time//Float/ComponentParameter/readonly//False/False/Estimated execution time
# @param num_cpus No. of CPUs//Integer/ComponentParameter/readonly//False/False/Number of cores used
# @par EAGLE_END
class SubGraphDrop(BarrierAppDROP):
"""
This only exists to make sure we have a SubGraph in the template palette
"""

pass


##
# @brief Comment
# @details A comment placeholder drop
# @par EAGLE_START
# @param category Comment
# @param tag template
# @par EAGLE_END
class CommentDrop(BarrierAppDROP):
"""
This only exists to make sure we have a comment in the template palette
"""

pass


##
# @brief Description
# @details A loop placeholder drop
# @par EAGLE_START
# @param category Description
# @param tag template
# @par EAGLE_END
class DescriptionDrop(BarrierAppDROP):
"""
This only exists to make sure we have a description in the template palette
"""

pass


##
# @brief Exclusive Force Node
# @details An Exclusive Force Node placeholder
# @par EAGLE_START
# @param category ExclusiveForceNode
# @param tag template
# @par EAGLE_END
class ExclusiveForceDrop(BarrierAppDROP):
"""
This only exists to make sure we have an exclusive force node in the template palette
"""

pass

0 comments on commit ead220b

Please sign in to comment.