Skip to content

Commit

Permalink
Improve support for keyword-positional arguments
Browse files Browse the repository at this point in the history
- Split up repeated code to improve readability
- Add flag to identify_named_ports to allow for the addition of positional arguments to keyword argument dicionaries (required for the PyFunc drops to work).
  • Loading branch information
myxie committed May 15, 2024
1 parent bf3a8fc commit 7b89f00
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 119 deletions.
2 changes: 2 additions & 0 deletions daliuge-engine/dlg/apps/pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ def _ports2args(self, inputs, outputs, posargs, pargsDict, keyargsDict) -> dict:
keyargsDict,
check_len=check_len,
mode="inputs",
addPositionalToKeyword=True
)
)
else:
Expand Down Expand Up @@ -490,6 +491,7 @@ def _ports2args(self, inputs, outputs, posargs, pargsDict, keyargsDict) -> dict:
keyargsDict,
check_len=check_len,
mode="outputs",
addPositionalToKeyword=True
)
)
return portargs
Expand Down
254 changes: 135 additions & 119 deletions daliuge-engine/dlg/named_port_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,31 +82,38 @@ def serialize_applicationArgs(applicationArgs, prefix="--", separator=" "):


def identify_named_ports(
port_dict: dict,
posargs: list,
pargsDict: dict,
keyargs: dict,
check_len: int = 0,
mode: str = "inputs",
parser: callable = None,
port_dict: dict,
positionalArgs: list,
positionalPortArgs: dict,
keywordArgs: dict,
check_len: int = 0,
mode: str = "inputs",
parser: callable = None,
addPositionalToKeyword=False
) -> dict:
"""
Checks port names for matches with arguments and returns mapped ports.
Args:
port_dict (dict): ports {uid:name,...}
posargs (list): available positional arguments (will be modified)
pargsDict (dict): mapped arguments (will be modified)
keyargs (dict): keyword arguments
positionalArgs (list): available positional arguments (will be modified)
positionalPortArgs (dict): mapped arguments (will be modified)
keywordArgs (dict): keyword arguments
check_len (int): number of of ports to be checked
mode (str ["inputs"]): mode, used just for logging messages
parser (function): parser function for this port
addPositionalToKeyword (bool): Adds a positional argument to the keyword
argument dictionary. Necessary where you have the optional of a
keyword-positional argument, such as in a python function.
Returns:
dict: port arguments
Side effect:
modifies pargsDict
Modifies:
- positionalArgus
- positionalPortArgs
- keywordArgs
"""
# p_name = [p["name"] for p in port_dict]
logger.debug(
Expand All @@ -115,11 +122,11 @@ def identify_named_ports(
port_dict,
check_len,
)
logger.debug("Checking against keyargs: %s", keyargs)
portargs = {}
posargs = list(posargs)
logger.debug("Checking against keyargs: %s", keywordArgs)
keywordPortArgs = {}
positionalArgs = list(positionalArgs)
keys = list(port_dict.keys())
logger.debug("Checking ports: %s against %s %s", keys, posargs, keyargs)
logger.debug("Checking ports: %s against %s %s", keys, positionalArgs, keywordArgs)
for i in range(check_len):
try:
key = port_dict[keys[i]]["name"]
Expand All @@ -129,34 +136,35 @@ def identify_named_ports(
raise KeyError
if value is None:
value = "" # make sure we are passing NULL drop events
if key in posargs:
if key in positionalArgs:
if parser:
logger.debug("Reading from port using %s", parser.__repr__())
value = parser(port_dict[keys[i]]["drop"])
pargsDict.update({key: value})
positionalPortArgs.update({key: value})
logger.debug("Using %s '%s' for parg %s", mode, value, key)
portargs.update({key: value})
posargs.pop(posargs.index(key))
elif key in keyargs:
positionalArgs.pop(positionalArgs.index(key))
# We have positional argument that is also a keyword
if addPositionalToKeyword:
keywordPortArgs.update({key: value})
elif key in keywordArgs:
if parser:
logger.debug("Reading from port using %s", parser.__repr__())
value = parser(port_dict[keys[i]]["drop"])
# if not found in appArgs we don't put them into portargs either
# pargsDict.update({key: value})
portargs.update({key: value})
keywordPortArgs.update({key: value})
logger.debug("Using %s of type %s for kwarg %s", mode, type(value), key)
_ = keyargs.pop(key) # remove from original arg list
_ = keywordArgs.pop(key) # remove from original arg list
else:
logger.debug(
"No matching argument found for %s key %s, %s, %s",
mode,
key,
keyargs,
posargs,
mode, key,
keywordArgs,
positionalArgs,
)

logger.debug("Returning kw mapped ports: %s", portargs)
return portargs
logger.debug("Returning kw mapped ports: %s", keywordPortArgs)
return keywordPortArgs


def check_ports_dict(ports: list) -> bool:
Expand All @@ -178,14 +186,14 @@ def check_ports_dict(ports: list) -> bool:


def replace_named_ports(
iitems: dict,
oitems: dict,
inport_names: dict,
outport_names: dict,
appArgs: dict,
argumentPrefix: str = "--",
separator: str = " ",
parser: callable = None,
iitems: dict,
oitems: dict,
inport_names: dict,
outport_names: dict,
appArgs: dict,
argumentPrefix: str = "--",
separator: str = " ",
parser: callable = None,
) -> Tuple[str, str]:
"""
Function attempts to identify CLI component arguments that match port names.
Expand All @@ -200,16 +208,18 @@ def replace_named_ports(
separator: character used between keyword and value
parser: reader function for ports
This method is focused on creating two 'sets' of arguments:
- The arguments that are passed to the application (keyword and
positional arguments)
- The arguments that are passed to the application that are derived from
the ports of the drop (keywordPort and positionalPort arguments).
Returns:
tuple of serialized keyword arguments and positional arguments
"""
logger.debug(
"iitems: %s; inport_names: %s; outport_names: %s",
iitems,
inport_names,
outport_names,
iitems, inport_names, outport_names,
)
inputs_dict = collections.OrderedDict()
for uid, drop in iitems:
Expand All @@ -221,100 +231,106 @@ def replace_named_ports(
outputs_dict = collections.OrderedDict()
for uid, drop in oitems:
outputs_dict[uid] = {"path": drop.path if hasattr(drop, "path") else ""}
# logger.debug("appArgs: %s", appArgs)
# get positional args
posargs = [arg for arg in appArgs if appArgs[arg]["positional"]]
# get kwargs
keyargs = {
arg: appArgs[arg]["value"] for arg in appArgs if not appArgs[arg]["positional"]
}

positionalArgs = _get_args(appArgs, positional=True)
keywordArgs = _get_args(appArgs, positional=False)
# we will need an ordered dict for all positional arguments
# thus we create it here and fill it with values
portPosargsDict = collections.OrderedDict(zip(posargs, [None] * len(posargs)))
positionalPortArgs = collections.OrderedDict(
zip(positionalArgs, [None] * len(positionalArgs)))

logger.debug(
"posargs: %s; keyargs: %s, %s",
posargs,
keyargs,
check_ports_dict(inport_names),
)
portkeyargs = {}
ipkeyargs = {}
opkeyargs = {}
if check_ports_dict(inport_names):
for inport in inport_names:
key = list(inport.keys())[0]
inputs_dict[key].update({"name": inport[key]})

ipkeyargs = identify_named_ports(
inputs_dict,
posargs,
portPosargsDict,
keyargs,
check_len=len(iitems),
mode="inputs",
parser=parser,
)
portkeyargs.update(ipkeyargs)
else:
for i in range(min(len(iitems), len(posargs))):
portkeyargs.update({list(posargs)[i]: list(iitems)[i][1]})

if check_ports_dict(outport_names):
for outport in outport_names:
key = list(outport.keys())[0]
outputs_dict[key].update({"name": outport[key]})
opkeyargs = identify_named_ports(
outputs_dict,
posargs,
portPosargsDict,
keyargs,
check_len=len(oitems),
mode="outputs",
)
portkeyargs.update(opkeyargs)
else:
for i in range(min(len(oitems), len(posargs))):
portkeyargs.update({posargs[i]: list(oitems)[i][1]})
# now that we have the mapped ports we can cleanup the appArgs
# and construct the final keyargs and pargs
logger.debug(
"Arguments from ports: %s, %s, %s, %s",
portkeyargs,
portPosargsDict,
ipkeyargs,
opkeyargs,
positionalArgs, keywordArgs, check_ports_dict(inport_names),
)

keywordPortArgs = {}
# Update the argument dictionaries in-place based on the port names.
# This needs to be done for both the input ports and output ports on the drop.
_process_port(inport_names, inputs_dict, keywordPortArgs, positionalArgs,
positionalPortArgs, keywordArgs, iitems, parser, "inputs")

_process_port(outport_names, outputs_dict, keywordPortArgs, positionalArgs,
positionalPortArgs, keywordArgs, oitems, parser, "outputs")

logger.debug("Arguments from ports: %s, %s,",
keywordPortArgs, positionalPortArgs)

# Clean arguments for Docker and Bash applications
appArgs = clean_applicationArgs(appArgs)
# get cleaned positional args
posargs = {
arg: appArgs[arg]["value"] for arg in appArgs if appArgs[arg]["positional"]
}
logger.debug("posargs: %s", posargs)
# get cleaned kwargs
keyargs = {
arg: appArgs[arg]["value"] for arg in appArgs if not appArgs[arg]["positional"]
}
for k, v in portkeyargs.items():
positionalArgs = _get_args(appArgs, positional=True)
keywordArgs = _get_args(appArgs, positional=False)

# Construct the final keywordArguments and positionalPortArguments
for k, v in keywordPortArgs.items():
if v not in [None, ""]:
keyargs.update({k: v})
for k, v in portPosargsDict.items():
keywordArgs.update({k: v})
for k, v in positionalPortArgs.items():
logger.debug("port posarg %s has value %s", k, v)
# logger.debug("default posarg %s has value %s", k, posargs[k])
if k == "input_redirection":
v = f"cat {v} > "
if k == "output_redirection":
v = f"> {v}"
if v not in [None, ""]:
posargs.update({k: v})
keyargs = (
serialize_kwargs(keyargs, prefix=argumentPrefix, separator=separator)
if len(keyargs) > 0
positionalArgs.update({k: v})

keywordArgs = (
serialize_kwargs(keywordArgs, prefix=argumentPrefix, separator=separator)
if len(keywordArgs) > 0
else [""]
)
pargs = list(posargs.values())
pargs = [""] if len(pargs) == 0 or None in pargs else pargs
logger.debug("After port replacement: pargs: %s; keyargs: %s", pargs, keyargs)
return keyargs, pargs
pargs = list(positionalArgs.values())
if not pargs:
pargs = [""]

logger.debug("After port replacement: pargs: %s; keyargs: %s", pargs, keywordArgs)
return keywordArgs, pargs


def _process_port(port_names, ports, keywordPortArgs, positionalArgs,
positionalPortArgs, keywordArgs, iitems, parser, mode):
"""
For the set of port names, perform a backwards compatible update of the:
- Keyword Arguments (application, and port name)
- Positional Arguments (application, and port name)
Note: This performs an IN-PLACE transformation of the dictionaries, through the
identify_named_ports() method.
"""

if check_ports_dict(port_names):
for port in port_names:
key = list(port.keys())[0]
ports[key].update({"name": port[key]})
keywordPortArgs.update(identify_named_ports(
ports,
positionalArgs,
positionalPortArgs,
keywordArgs,
check_len=len(iitems),
mode=mode,
parser=parser,
))
else:
for i in range(min(len(iitems), len(positionalArgs))):
keywordPortArgs.update({list(positionalArgs)[i]: list(iitems)[i][1]})


def _get_args(appArgs, positional=False):
"""
Separate out the arguments dependening on if we want positional or keyword style
"""
args = {
arg: appArgs[arg]["value"] for arg in appArgs
if (appArgs[arg]["positional"] == positional)
}

argType = 'Keyword'
if positional:
argType = 'Positional'
logger.debug("%s arguments: %s", argType, args)
return args


def get_port_reader_function(input_parser: DropParser):
Expand Down

0 comments on commit 7b89f00

Please sign in to comment.