Skip to content

Commit

Permalink
Merge branch 'named-ports'
Browse files Browse the repository at this point in the history
  • Loading branch information
awicenec committed Jul 4, 2022
2 parents b704466 + 7c7410c commit 90f0170
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 72 deletions.
56 changes: 51 additions & 5 deletions daliuge-engine/dlg/apps/bash_shell_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import threading
import time
import types
import collections

from .. import droputils, utils
from ..ddap_protocol import AppDROPStates, DROPStates
Expand Down Expand Up @@ -196,13 +197,47 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE):
output of the process is piped to. If not given it is consumed by this
method and potentially logged.
"""
# we currently only support passing a path for bash apps
inputs_dict = collections.OrderedDict()
for uid, drop in inputs.items():
inputs_dict[uid] = drop.path

outputs_dict = collections.OrderedDict()
for uid, drop in outputs.items():
outputs_dict[uid] = drop.path

session_id = (
self._dlg_session.sessionId if self._dlg_session is not None else ""
)
argumentString = droputils.serialize_applicationArgs(
self._applicationArgs, self._argumentPrefix, self._paramValueSeparator
)
logger.debug(f"Parameters found: {self.parameters}")
# pargs, keyargs = droputils.serialize_applicationArgs(
# self._applicationArgs, self._argumentPrefix, self._paramValueSeparator
# )
if "applicationArgs" in self.parameters:
appArgs = self.parameters["applicationArgs"]
else:
appArgs ={}
pargs = [arg for arg in appArgs if appArgs[arg]["positional"]]
pargsDict = collections.OrderedDict(zip(pargs,[None]*len(pargs)))
keyargs = {arg:appArgs[arg]["value"] for arg in appArgs if not appArgs[arg]["positional"]}
logger.debug("pargs: %s; keyargs: %s, appArgs: %s",pargs, keyargs, appArgs)
if "inputs" in self.parameters and isinstance(self.parameters['inputs'][0], dict):
keyargs = droputils.identify_named_ports(
inputs_dict,
self.parameters["inputs"],
pargs,
pargsDict,
appArgs,
check_len=len(inputs),
mode="inputs")
else:
for i in range(min(len(inputs), len(pargs))):
keyargs.update({pargs[i]: list(inputs.values())[i]})
keyargs = droputils.serialize_kwargs(keyargs,
prefix=self._argumentPrefix,
separator=self._paramValueSeparator)
pargs = list(pargsDict.values())
argumentString = f"{' '.join(pargs + keyargs)}" # add kwargs to end of pargs
# complete command including all additional parameters and optional redirects
cmd = f"{self.command} {argumentString} {self._cmdLineArgs} "
if self._outputRedirect:
Expand All @@ -229,8 +264,19 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE):

# Pass down daliuge-specific information to the subprocesses as environment variables
env = os.environ.copy()
env["DLG_UID"] = app_uid
env["DLG_SESSION_ID"] = session_id
env.update({"DLG_UID": self._uid})
if self._dlg_session:
env.update({"DLG_SESSION_ID": self._dlg_session.sessionId})

env.update({"DLG_ROOT": utils.getDlgDir()})
# # try to change to session directory, else just stay in current
# work_dir = utils.getDlgWorkDir()
# session_dir = f"{work_dir}/{session_id}"
# try:
# os.chdir(session_dir)
# logger.info("Changed to session directory: %s" % session_dir)
# except:
# logger.warning("Changing to session directory %s unsuccessful!" % session_dir)

# Wrap everything inside bash
cmd = ("/bin/bash", "-c", cmd)
Expand Down
24 changes: 14 additions & 10 deletions daliuge-engine/dlg/apps/dockerapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,22 +276,24 @@ def initialize(self, **kwargs):
self._command = self._getArg(kwargs, "command", None)

self._noBash = False
if not self._command or self._command[:2].strip() == "%%":
if not self._command or self._command.strip()[:2] == "%%":
logger.warning("Assume a default command is executed in the container")
self._command = self._command.strip()[2:].strip() if self._command else ""
self._command = self._command.strip().strip()[:2] if self._command else ""
self._noBash = True
# This makes sure that we can retain any command defined in the image, but still be
# able to add any arguments straight after. This requires to use the placeholder string
# "%%" at the start of the command, else it is interpreted as a normal command.

# construct the actual command line from all application parameters
argumentString = droputils.serialize_applicationArgs(
self._applicationArgs, self._argumentPrefix, self._paramValueSeparator
self.pargs, self.keyargs = droputils.serialize_applicationArgs(
self._applicationArgs, self._argumentPrefix,
self._paramValueSeparator
)
# complete command including all additional parameters and optional redirects
cmd = f"{self._command} {argumentString} {self._cmdLineArgs} "
cmd = cmd.strip()
self._command = cmd
# defer construction of complete command to run method
# cmd = f"{self._command} {self._cmdLineArgs} {' '.join(self.pargs)} "+\
# f"{' '.join(self.keyargs)}"
# cmd = cmd.strip()
# self._command = cmd

# The user used to run the process in the docker container is now always the user
# who originally started the DALiuGE process as well. The information is passed through
Expand Down Expand Up @@ -433,13 +435,15 @@ def run(self):
for uid, i in fsInputs.items()
}
dockerOutputs = {
# uid: DockerPath(utils.getDlgDir() + o.path) for uid, o in fsOutputs.items()
# uid: DockerPath(utils.getDlgDir() + o.path) for uid, o in fsOutputs.items()
uid: DockerPath(o.path)
for uid, o in fsOutputs.items()
}
dataURLInputs = {uid: i for uid, i in iitems if not droputils.has_path(i)}
dataURLOutputs = {uid: o for uid, o in oitems if not droputils.has_path(o)}

self._command = f"{self._command} {' '.join(self.pargs)} "+\
f"{' '.join(self.keyargs)}"
# TODO: Deal with named inputs
if self._command:
cmd = droputils.replace_path_placeholders(
self._command, dockerInputs, dockerOutputs
Expand Down
57 changes: 20 additions & 37 deletions daliuge-engine/dlg/apps/pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,22 +458,16 @@ def optionalEval(x):
appArgs = {}

if ('inputs' in self.parameters and isinstance(self.parameters['inputs'][0], dict)):
logger.debug(f"Using named ports to identify inputs: "+\
f"{self.parameters['inputs']}")
for i in range(min(len(inputs),self.fn_nargs +\
len(self.arguments.kwonlyargs))):
# key for final dict is value in named ports dict
key = list(self.parameters['inputs'][i].values())[0]
# value for final dict is value in inputs dict
value = inputs[list(self.parameters['inputs'][i].keys())[0]]
if not value: value = '' # make sure we are passing NULL drop events
if key in posargs:
pargsDict.update({key:value})
else:
kwargs.update({key:value})
_dum = appArgs.pop(key) if key in appArgs else None
logger.debug("Using input %s for argument %s", value, key)
logger.debug("Argument used as input removed: %s", _dum)
check_len = min(len(inputs),self.fn_nargs+
len(self.arguments.kwonlyargs))
kwargs.update(droputils.identify_named_ports(
inputs,
self.parameters['inputs'],
posargs,
pargsDict,
appArgs,
check_len=check_len,
mode="inputs"))
else:
for i in range(min(len(inputs),self.fn_nargs)):
kwargs.update({self.arguments.args[i]: list(inputs.values())[i]})
Expand All @@ -482,27 +476,16 @@ def optionalEval(x):
funcargs.update(kwargs)

if ('outputs' in self.parameters and isinstance(self.parameters['outputs'][0], dict)):
out_names = [list(i.values())[0] for i in self.parameters['outputs']]
logger.debug(f"Using named ports to remove outputs from arguments: "+\
f"{out_names}")
for i in range(min(len(out_names),self.fn_nargs +\
len(self.arguments.kwonlyargs))):
# key for final dict is value in named ports dict
key = list(self.parameters['outputs'][i].values())[0]
# value for final dict is value in inputs dict
value = outputs[list(self.parameters['outputs'][i].keys())[0]]
if not value: value = '' # make sure we are passing NULL drop events
if key in posargs:
pargsDict.update({key:value})
else:
kwargs.update({key:value})
_dum = appArgs.pop(key) if key in appArgs else None
logger.debug("Using output %s for argument %s", value, key)
logger.debug("Argument used as output removed: %s", _dum)
# _dum = [appArgs.pop(k) for k in out_names if k in appArgs]
# if len(_dum) > 0:
# logger.debug("Application arguments used as outputs removed : %s",
# [i['text'] for i in _dum])
check_len = min(len(outputs),self.fn_nargs+
len(self.arguments.kwonlyargs))
kwargs.update(droputils.identify_named_ports(
outputs,
self.parameters['outputs'],
posargs,
pargsDict,
appArgs,
check_len=check_len,
mode="outputs"))

# Try to get values for still missing positional arguments from Application Args
if "applicationArgs" in self.parameters:
Expand Down
77 changes: 62 additions & 15 deletions daliuge-engine/dlg/droputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

import collections
import io
import json
import time
import logging
import pickle
import re
Expand Down Expand Up @@ -137,16 +137,32 @@ def copyDropContents(source: DataDROP, target: DataDROP, bufsize=4096):
"""
Manually copies data from one DROP into another, in bufsize steps
"""
logger.debug(f"Copying from {repr(source)} to {repr(target)}")
logger.debug(
"Copying from %s to %s", repr(source), repr(target))
desc = source.open()
buf = source.read(desc, bufsize)
logger.debug(f"Read {len(buf)} bytes from {repr(source)}")
logger.debug("Read %d bytes from %s", len(buf),
repr(source))
st = time.time()
tot_w = len(buf)
ofl = True
while buf:
target.write(buf)
logger.debug(f"Wrote {len(buf)} bytes to {repr(target)}")
tot_w += len(buf)
dur = time.time() - st
if int(dur) % 5 == 0 and ofl:
logger.debug("Wrote %.1f MB to %s; rate %.2f MB/s",
tot_w/1024**2, repr(target), tot_w/(1024**2*dur))
ofl = False
elif int(dur) % 5 == 4:
ofl = True
buf = source.read(desc, bufsize)
if buf is not None:
logger.debug(f"Read {len(buf)} bytes from {repr(source)}")
# if buf is not None:
# logger.debug(f"Read {len(buf)} bytes from {repr(source)}")
dur = time.time() - st
logger.debug("Wrote %.1f MB to %s; rate %.2f MB/s",
tot_w/1024**2, repr(target), tot_w/(1024**2*dur))

source.close(desc)


Expand Down Expand Up @@ -513,6 +529,15 @@ def replace_dataurl_placeholders(cmd, inputs, outputs):

return cmd

def serialize_kwargs(keyargs, prefix="--", separator=" "):
    """
    Render keyword arguments as a list of command-line argument strings.

    Single-character names are rendered as short options (``-n value``) when
    ``prefix`` is the default ``--``; in that case a single space is always
    used between name and value and ``separator`` is ignored. Every other
    name is rendered as ``{prefix}{name}{separator}{value}`` with leading and
    trailing whitespace stripped.

    :param keyargs: mapping of argument name to argument value
    :param prefix: string placed in front of each argument name
    :param separator: string placed between argument name and value
    :return: list of strings, one per entry of ``keyargs``, in mapping order
    """

    def _render(name, value):
        # Short form: one-letter names get a single dash when the long
        # prefix is in use.
        if prefix == "--" and len(name) == 1:
            return f"-{name} {value}"
        return f"{prefix}{name}{separator}{value}".strip()

    return [_render(name, value) for name, value in keyargs.items()]


def serialize_applicationArgs(applicationArgs, prefix="--", separator=" "):
"""
Expand All @@ -524,7 +549,7 @@ def serialize_applicationArgs(applicationArgs, prefix="--", separator=" "):
else:
logger.info("ApplicationArgs found %s", applicationArgs)
# construct the actual command line from all application parameters
args = []
kwargs = {}
pargs = []
positional = False
precious = False
Expand All @@ -539,17 +564,39 @@ def serialize_applicationArgs(applicationArgs, prefix="--", separator=" "):
if value in [None, False, ""] and not precious:
continue
positional = vdict["positional"]
# short and long version of keywords
if positional:
pargs.append(str(value).strip())
else:
if prefix == "--" and len(name) == 1:
arg = [f"-{name} {value}"]
else:
arg = [f"{prefix}{name}{separator}{value}".strip()]
args += arg
logger.info('Arguments of bash command: %s %s', args, pargs)
return f"{' '.join(args + pargs)}" # add positional arguments to end of args
kwargs.update({name:value})
kwargs = serialize_kwargs(kwargs, prefix=prefix, separator=separator)
logger.info('Constructed command line arguments: %s %s', pargs, kwargs)
# return f"{' '.join(pargs + kwargs)}" # add kwargs to end of pargs
return (pargs, kwargs)

def identify_named_ports(
    ports, port_dict, posargs, pargsDict, appArgs, check_len=0, mode="inputs"
):
    """
    Map port values onto application arguments using named ports.

    Each entry of ``port_dict`` is a dict whose first key is looked up in
    ``ports`` and whose corresponding value is the argument name. For the
    first ``check_len`` entries the port value is assigned either to the
    positional argument of that name (in ``pargsDict``) or to the returned
    keyword-argument dict.

    Note that ``posargs``, ``pargsDict`` and ``appArgs`` are modified in
    place: matched names are removed from ``posargs`` and ``appArgs`` and
    their values are stored in ``pargsDict``.

    :param ports: mapping from port key to the value to pass as argument
    :param port_dict: sequence of single-entry dicts ``{port key: arg name}``
    :param posargs: list of positional argument names still unassigned
    :param pargsDict: mapping of positional argument name to value
    :param appArgs: mapping of application arguments; entries whose name
        matches a port are removed
    :param check_len: maximum number of ``port_dict`` entries to process;
        values larger than ``len(port_dict)`` are clamped
    :param mode: label used in the log messages only (e.g. "inputs")
    :return: dict of argument name to value for ports that did not match a
        positional argument
    """
    logger.debug(
        "Using named ports to remove %s from arguments: %s %d",
        mode,
        port_dict,
        check_len,
    )
    kwargs = {}
    # Slicing clamps to the available entries, so a check_len larger than
    # port_dict no longer raises IndexError; max() keeps a negative
    # check_len equivalent to "process nothing".
    for port in port_dict[: max(check_len, 0)]:
        # The first key of the entry selects the value in ``ports``; the
        # value stored under that key is the name of the target argument.
        uid = list(port)[0]
        key = port[uid]
        value = ports[uid]
        # make sure we are passing NULL drop events
        # NOTE(review): every falsy value (0, False, empty containers) is
        # replaced by '' here as well -- confirm this is intended.
        if not value:
            value = ""
        if key in posargs:
            pargsDict[key] = value
            logger.debug("Using %s '%s' for parg %s", mode, value, key)
            posargs.remove(key)
        else:
            kwargs[key] = value
            logger.debug("Using %s '%s' for kwarg %s", mode, value, key)
        # An argument fed by a port must not also be taken from appArgs.
        _dum = appArgs.pop(key, None)
        logger.debug("Argument used as %s removed: %s", mode, _dum)
    logger.debug("Returning mapped ports: %s", kwargs)
    return kwargs


# Easing the transition from single- to multi-package
Expand Down
11 changes: 6 additions & 5 deletions daliuge-engine/test/apps/test_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ def test_multi_listappendthrashing(self, size=1000, parallel=True):
_ = [d.addOutput(m) for d, m in zip(drops, mdrops)]
_ = [X.addInput(m) for m in mdrops]
X.addOutput(Z)
print(f"Number of inputs/outputs: {len(X.inputs)}, {len(X.outputs)}")
logger.info(f"Number of inputs/outputs: {len(X.inputs)}, {len(X.outputs)}")
self._test_graph_runs([S, X, Z] + drops + mdrops, S, Z, timeout=200)
# Need to run our 'copy' of the averaging APP
num_array = []
Expand All @@ -326,16 +326,17 @@ def test_speedup(self):
NOTE: In order to get the stdout you need to run pytest with
--capture=tee-sys
"""
size = 2000
print("Starting serial test..")
size = 3000
logger.info("Starting serial test..")
st = time.time()
self.test_multi_listappendthrashing(size=size, parallel=False)
t1 = time.time() - st
print("Starting parallel test..")
time.sleep(5)
logger.info("Starting parallel test..")
st = time.time()
self.test_multi_listappendthrashing(size=size, parallel=True)
t2 = time.time() - st
print(f"Speedup: {t1 / t2:.2f} from {cpu_count(logical=False)} cores")
logger.info(f"Speedup: {t1 / t2:.2f} from {cpu_count(logical=False)} cores")
# TODO: This is unpredictable, but maybe we can do something meaningful.
# self.assertAlmostEqual(t1/cpu_count(logical=False), t2, 1)
# How about this? We only need to see some type of speedup
Expand Down

0 comments on commit 90f0170

Please sign in to comment.