Skip to content

Commit

Permalink
Merge 9f93047 into a92ee81
Browse files Browse the repository at this point in the history
  • Loading branch information
awicenec committed Jul 31, 2022
2 parents a92ee81 + 9f93047 commit 5ca851d
Show file tree
Hide file tree
Showing 9 changed files with 450 additions and 234 deletions.
93 changes: 23 additions & 70 deletions daliuge-engine/dlg/apps/bash_shell_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ def initialize(self, **kwargs):
self, "No command specified, cannot create BashShellApp"
)

self.appArgs = self._applicationArgs
self._recompute_data = {}

def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE):
Expand All @@ -197,83 +198,43 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE):
output of the process is piped to. If not given it is consumed by this
method and potentially logged.
"""
# we currently only support passing a path for bash apps
inputs_dict = collections.OrderedDict()
for uid, drop in inputs.items():
inputs_dict[uid] = drop.path

outputs_dict = collections.OrderedDict()
for uid, drop in outputs.items():
outputs_dict[uid] = drop.path
logger.debug("Parameters found: %s", self.parameters)
# we only support passing a path for bash apps
fsInputs = {uid: i for uid, i in inputs.items() if droputils.has_path(i)}
fsOutputs = {uid: o for uid, o in outputs.items() if droputils.has_path(o)}
dataURLInputs = {
uid: i for uid, i in inputs.items() if not droputils.has_path(i)
}
dataURLOutputs = {
uid: o for uid, o in outputs.items() if not droputils.has_path(o)
}

session_id = (
self._dlg_session.sessionId if self._dlg_session is not None else ""
)
logger.debug(f"Parameters found: {self.parameters}")
# pargs, keyargs = droputils.serialize_applicationArgs(
# self._applicationArgs, self._argumentPrefix, self._paramValueSeparator
# )
if "applicationArgs" in self.parameters:
appArgs = self.parameters["applicationArgs"]
else:
appArgs ={}
pargs = [arg for arg in appArgs if appArgs[arg]["positional"]]
pargsDict = collections.OrderedDict(zip(pargs,[None]*len(pargs)))
keyargs = {arg:appArgs[arg]["value"] for arg in appArgs if not appArgs[arg]["positional"]}
logger.debug("pargs: %s; keyargs: %s, appArgs: %s",pargs, keyargs, appArgs)
if "inputs" in self.parameters and isinstance(self.parameters['inputs'][0], dict):
pkeyargs = droputils.identify_named_ports(
inputs_dict,
self.parameters["inputs"],
pargs,
pargsDict,
appArgs,
check_len=len(inputs),
mode="inputs")
keyargs.update(pkeyargs)
else:
for i in range(min(len(inputs), len(pargs))):
keyargs.update({pargs[i]: list(inputs.values())[i]})
if "outputs" in self.parameters and isinstance(self.parameters['outputs'][0], dict):
pkeyargs = droputils.identify_named_ports(
outputs_dict,
self.parameters["outputs"],
pargs,
pargsDict,
appArgs,
check_len=len(outputs),
mode="outputs")
keyargs.update(pkeyargs)
else:
for i in range(min(len(outputs), len(pargs))):
keyargs.update({pargs[i]: list(outputs.values())[i]})
keyargs = droputils.serialize_kwargs(keyargs,
prefix=self._argumentPrefix,
# deal with named ports
inport_names = self.parameters['inputs'] \
if "inputs" in self.parameters else []
outport_names = self.parameters['outputs'] \
if "outputs" in self.parameters else []
keyargs, pargs = droputils.replace_named_ports(inputs.items(), outputs.items(),
inport_names, outport_names, self.appArgs, argumentPrefix=self._argumentPrefix,
separator=self._paramValueSeparator)
pargs = list(pargsDict.values())
argumentString = f"{' '.join(pargs + keyargs)}" # add kwargs to end of pargs
# complete command including all additional parameters and optional redirects
cmd = f"{self.command} {argumentString} {self._cmdLineArgs} "
if len(argumentString.strip()) > 0:
# the _cmdLineArgs would very likely make the command line invalid
cmd = f"{self.command} {argumentString} "
else:
cmd = f"{self.command} {argumentString} {self._cmdLineArgs} "
if self._outputRedirect:
cmd = f"{cmd} > {self._outputRedirect}"
if self._inputRedirect:
cmd = f"cat {self._inputRedirect} > {cmd}"
cmd = cmd.strip()

app_uid = self.uid
# self.run_bash(self._command, self.uid, session_id, *args, **kwargs)

# Replace inputs/outputs in command line with paths or data URLs
fsInputs = {uid: i for uid, i in inputs.items() if droputils.has_path(i)}
fsOutputs = {uid: o for uid, o in outputs.items() if droputils.has_path(o)}
cmd = droputils.replace_path_placeholders(cmd, fsInputs, fsOutputs)

dataURLInputs = {
uid: i for uid, i in inputs.items() if not droputils.has_path(i)
}
dataURLOutputs = {
uid: o for uid, o in outputs.items() if not droputils.has_path(o)
}
cmd = droputils.replace_dataurl_placeholders(cmd, dataURLInputs, dataURLOutputs)

# Pass down daliuge-specific information to the subprocesses as environment variables
Expand All @@ -283,14 +244,6 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE):
env.update({"DLG_SESSION_ID": self._dlg_session.sessionId})

env.update({"DLG_ROOT": utils.getDlgDir()})
# # try to change to session directory, else just stay in current
# work_dir = utils.getDlgWorkDir()
# session_dir = f"{work_dir}/{session_id}"
# try:
# os.chdir(session_dir)
# logger.info("Changed to session directory: %s" % session_dir)
# except:
# logger.warning("Changing to session directory %s unsuccessful!" % session_dir)

# Wrap everything inside bash
cmd = ("/bin/bash", "-c", cmd)
Expand Down
63 changes: 33 additions & 30 deletions daliuge-engine/dlg/apps/dockerapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,16 +265,6 @@ def initialize(self, **kwargs):
# able to add any arguments straight after. This requires to use the placeholder string
# "%%" at the start of the command, else it is interpreted as a normal command.

# construct the actual command line from all application parameters
self.pargs, self.keyargs = droputils.serialize_applicationArgs(
self._applicationArgs, self._argumentPrefix,
self._paramValueSeparator
)
# defer construction of complete command to run method
# cmd = f"{self._command} {self._cmdLineArgs} {' '.join(self.pargs)} "+\
# f"{' '.join(self.keyargs)}"
# cmd = cmd.strip()
# self._command = cmd

# The user used to run the process in the docker container is now always the user
# who originally started the DALiuGE process as well. The information is passed through
Expand Down Expand Up @@ -411,7 +401,7 @@ def run(self):
fsInputs = {uid: i for uid, i in iitems if droputils.has_path(i)}
fsOutputs = {uid: o for uid, o in oitems if droputils.has_path(o)}
dockerInputs = {
# uid: DockerPath(utils.getDlgDir() + i.path) for uid, i in fsInputs.items()
# uid: DockerPath(utils.getDlgDir() + i.path) for uid, i in fsInputs.items()
uid: DockerPath(i.path)
for uid, i in fsInputs.items()
}
Expand All @@ -422,18 +412,6 @@ def run(self):
}
dataURLInputs = {uid: i for uid, i in iitems if not droputils.has_path(i)}
dataURLOutputs = {uid: o for uid, o in oitems if not droputils.has_path(o)}
self._command = f"{self._command} {' '.join(self.pargs)} "+\
f"{' '.join(self.keyargs)}"
# TODO: Deal with named inputs
if self._command:
cmd = droputils.replace_path_placeholders(
self._command, dockerInputs, dockerOutputs
)
cmd = droputils.replace_dataurl_placeholders(
cmd, dataURLInputs, dataURLOutputs
)
else:
cmd = ""

# We bind the inputs and outputs inside the docker under the utils.getDlgDir()
# directory, maintaining the rest of their original paths.
Expand Down Expand Up @@ -478,13 +456,6 @@ def run(self):
)
logger.debug(f"port mappings: {portMappings}")

# Wait until the DockerApps this application runtime depends on have
# started, and replace their IP placeholders by the real IPs
for waiter in self._waiters:
uid, ip = waiter.waitForIp()
cmd = cmd.replace("%containerIp[{0}]%".format(uid), ip)
logger.debug("Command after IP replacement is: %s", cmd)

# deal with environment variables
env = {}
env.update({"DLG_UID": self._uid})
Expand Down Expand Up @@ -528,6 +499,38 @@ def run(self):
)
logger.debug(f"Adding environment variables: {env}")

# deal with named ports
appArgs = self._applicationArgs
inport_names = self.parameters['inputs'] \
if "inputs" in self.parameters else []
outport_names = self.parameters['outputs'] \
if "outputs" in self.parameters else []
keyargs, pargs = droputils.replace_named_ports(iitems, oitems,
inport_names, outport_names, appArgs,
argumentPrefix=self._argumentPrefix,
separator=self._paramValueSeparator)

argumentString = f"{' '.join(keyargs + pargs)}"

# complete command including all additional parameters and optional redirects
cmd = f"{self._command} {argumentString} {self._cmdLineArgs} "
if cmd:
cmd = droputils.replace_path_placeholders(
cmd, dockerInputs, dockerOutputs
)
cmd = droputils.replace_dataurl_placeholders(
cmd, dataURLInputs, dataURLOutputs
)
else:
cmd = ""
###############
# Wait until the DockerApps this application runtime depends on have
# started, and replace their IP placeholders by the real IPs
for waiter in self._waiters:
uid, ip = waiter.waitForIp()
cmd = cmd.replace("%containerIp[{0}]%".format(uid), ip)
logger.debug("Command after IP replacement is: %s", cmd)

# Wrap everything inside bash
if len(cmd) > 0 and not self._noBash:
cmd = '/bin/bash -c "%s"' % (
Expand Down
34 changes: 24 additions & 10 deletions daliuge-engine/dlg/apps/pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,10 @@ def optionalEval(x):
pargsDict = collections.OrderedDict(zip(posargs,[None]*len(posargs)))
if "applicationArgs" in self.parameters:
appArgs = self.parameters["applicationArgs"] # we'll pop the identified ones
pargsDict.update({k:self.parameters[k] for k in pargsDict if k in
_dum = [appArgs.pop(k) for k in self.func_def_keywords if k in appArgs]
logger.debug("Identified keyword arguments removed: %s",
[i['text'] for i in _dum])
pargsDict.update({k:self.parameters[k] for k in pargsDict if k in
self.parameters})
# if defined in both we use AppArgs values
pargsDict.update({k:appArgs[k]['value'] for k in pargsDict if k
Expand All @@ -448,12 +451,18 @@ def optionalEval(x):
else:
appArgs = {}

if ('inputs' in self.parameters and isinstance(self.parameters['inputs'][0], dict)):
if ('inputs' in self.parameters and
droputils.check_ports_dict(self.parameters['inputs'])):
check_len = min(len(inputs),self.fn_nargs+
len(self.arguments.kwonlyargs))
inputs_dict = collections.OrderedDict()
for inport in self.parameters['inputs']:
key = list(inport.keys())[0]
inputs_dict[key] = {
'name':inport[key],
'path':inputs[key]}
kwargs.update(droputils.identify_named_ports(
inputs,
self.parameters['inputs'],
inputs_dict,
posargs,
pargsDict,
appArgs,
Expand All @@ -466,12 +475,20 @@ def optionalEval(x):
logger.debug(f"Updating funcargs with input ports {kwargs}")
funcargs.update(kwargs)

if ('outputs' in self.parameters and isinstance(self.parameters['outputs'][0], dict)):
if ('outputs' in self.parameters and
droputils.check_ports_dict(self.parameters['outputs'])):
check_len = min(len(outputs),self.fn_nargs+
len(self.arguments.kwonlyargs))
outputs_dict = collections.OrderedDict()
for outport in self.parameters['outputs']:
key = list(outport.keys())[0]
outputs_dict[key] = {
'name':outport[key],
'path': outputs[key]
}

kwargs.update(droputils.identify_named_ports(
outputs,
self.parameters['outputs'],
outputs_dict,
posargs,
pargsDict,
appArgs,
Expand All @@ -480,9 +497,6 @@ def optionalEval(x):

# Try to get values for still missing positional arguments from Application Args
if "applicationArgs" in self.parameters:
_dum = [appArgs.pop(k) for k in self.func_def_keywords if k in appArgs]
logger.debug("Identified keyword arguments removed: %s",
[i['text'] for i in _dum])
for pa in posargs:
if pa != 'self' and pa not in funcargs:
if pa in appArgs:
Expand Down
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -1686,7 +1686,7 @@ def generate_reproduce_data(self):
# @param tag daliuge
# @param data_volume Data volume/5/Float/ComponentParameter/readwrite//False/False/Estimated size of the data contained in this node
# @param group_end Group end/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the end of a group?
# @param ngsSrv NGAS Server/localhost/String/ComponentParameter/readwrite//False/False/The URL of the NGAS Server
# @param ngasSrv NGAS Server/localhost/String/ComponentParameter/readwrite//False/False/The URL of the NGAS Server
# @param ngasPort NGAS Port/7777/Integer/ComponentParameter/readwrite//False/False/The port of the NGAS Server
# @param ngasFileId File ID//String/ComponentParameter/readwrite//False/False/File ID on NGAS (for retrieval only)
# @param ngasConnectTimeout Connection timeout/2/Integer/ComponentParameter/readwrite//False/False/Timeout for connecting to the NGAS server
Expand Down Expand Up @@ -2386,7 +2386,7 @@ def _generateNamedOutputs(self):
"""
Generates a named mapping of output data drops. Can only be called during run().
"""
named_outputs: OrderedDict[str, DataDROP] = OrderedDict()
named_outputs: OrderedDict[str, DataDROP] = OrderedDict()
if 'outputs' in self.parameters and isinstance(self.parameters['outputs'][0], dict):
for i in range(len(self._outputs)):
key = list(self.parameters['outputs'][i].values())[0]
Expand Down

0 comments on commit 5ca851d

Please sign in to comment.