Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): add explicit parameters to renku run #2583

Merged
merged 9 commits into from Jan 20, 2022
17 changes: 17 additions & 0 deletions renku/cli/run.py
Expand Up @@ -91,6 +91,18 @@
the arguments that are passed on the command line. Files or directories
specified with this option will not be passed as input arguments to the
script.
You can specify ``--input name=path`` or just `--input path``, the former
of which would also set the name of the input on the resulting Plan.

.. topic:: Specifying auxiliary parameters (``--param``)

You can specify extra parameters to your program explicitly by using the
``--param`` option. This is useful for getting Renku to consider a
parameter as just a string even if it matches a file name in the project.
This option is not a replacement for the arguments that are passed on the
command line.
You can specify ``--param name=value`` or just `--param value``, the former
of which would also set the name of the parameter on the resulting Plan.

.. topic:: Disabling input detection (``--no-input-detection``)

Expand Down Expand Up @@ -148,6 +160,8 @@
``--output`` option. These output must exist after the execution of the
``renku run`` command. However, they do not need to be modified by
the command.
You can specify ``--output name=path`` or just `--output path``, the former
of which would also set the name of the output on the resulting Plan.

.. topic:: Disabling output detection (``--no-output-detection``)

Expand Down Expand Up @@ -240,6 +254,7 @@
@click.option("--keyword", multiple=True, help="List of keywords for the workflow.")
@click.option("explicit_inputs", "--input", multiple=True, help="Force a path to be considered as an input.")
@click.option("explicit_outputs", "--output", multiple=True, help="Force a path to be considered an output.")
@click.option("explicit_parameters", "--param", multiple=True, help="Force a string to be considered a parameter.")
@click.option("--no-output", is_flag=True, default=False, help="Allow command without output files.")
@click.option("--no-input-detection", is_flag=True, default=False, help="Disable auto-detection of inputs.")
@click.option("--no-output-detection", is_flag=True, default=False, help="Disable auto-detection of outputs.")
Expand All @@ -259,6 +274,7 @@ def run(
keyword,
explicit_inputs,
explicit_outputs,
explicit_parameters,
no_output,
no_input_detection,
no_output_detection,
Expand All @@ -281,6 +297,7 @@ def run(
keyword=keyword,
explicit_inputs=explicit_inputs,
explicit_outputs=explicit_outputs,
explicit_parameters=explicit_parameters,
no_output=no_output,
no_input_detection=no_input_detection,
no_output_detection=no_output_detection,
Expand Down
25 changes: 25 additions & 0 deletions renku/core/commands/run.py
Expand Up @@ -48,6 +48,7 @@ def _run_command(
keyword,
explicit_inputs,
explicit_outputs,
explicit_parameters,
no_output,
no_input_detection,
no_output_detection,
Expand Down Expand Up @@ -119,10 +120,34 @@ def _run_command(
sys.stderr = system_stderr

working_dir = str(client.repository.path)

def parse_explicit_definition(entries, type):
result = [tuple(e.split("=", maxsplit=1)[::-1]) if "=" in e else (e, None) for e in entries]

if not result:
return result

values, names = zip(*result)

if len(values) != len(set(values)):
raise errors.UsageError(f"Cannot specify the same explicit {type} value twice.")

names = [n for n in names if n]

if names and len(names) != len(set(names)):
raise errors.UsageError(f"Cannot specify the same explicit {type} name twice.")

return result

explicit_inputs = parse_explicit_definition(explicit_inputs, "input")
explicit_outputs = parse_explicit_definition(explicit_outputs, "output")
explicit_parameters = parse_explicit_definition(explicit_parameters, "param")

factory = PlanFactory(
command_line=command_line,
explicit_inputs=explicit_inputs,
explicit_outputs=explicit_outputs,
explicit_parameters=explicit_parameters,
directory=os.getcwd(),
working_dir=working_dir,
no_input_detection=no_input_detection,
Expand Down
103 changes: 74 additions & 29 deletions renku/core/management/workflow/plan_factory.py
Expand Up @@ -61,8 +61,9 @@ class PlanFactory:
def __init__(
self,
command_line: str,
explicit_inputs: List[str] = None,
explicit_outputs: List[str] = None,
explicit_inputs: Optional[List[Tuple[str, Optional[str]]]] = None,
explicit_outputs: Optional[List[Tuple[str, Optional[str]]]] = None,
explicit_parameters: Optional[List[Tuple[str, Optional[str]]]] = None,
directory: Optional[str] = None,
working_dir: Optional[str] = None,
no_input_detection: bool = False,
Expand Down Expand Up @@ -99,8 +100,13 @@ def __init__(

self.success_codes = success_codes or []

self.explicit_inputs = [Path(os.path.abspath(p)) for p in explicit_inputs] if explicit_inputs else []
self.explicit_outputs = [Path(os.path.abspath(p)) for p in explicit_outputs] if explicit_outputs else []
self.explicit_inputs = (
[(Path(os.path.abspath(path)), name) for path, name in explicit_inputs] if explicit_inputs else []
)
self.explicit_outputs = (
[(Path(os.path.abspath(path)), name) for path, name in explicit_outputs] if explicit_outputs else []
)
self.explicit_parameters = explicit_parameters if explicit_parameters else []

self.stdin = stdin
self.stdout = stdout
Expand Down Expand Up @@ -248,10 +254,11 @@ def add_inputs_and_parameters(self, *arguments):
assert isinstance(default, File)
self.add_command_input(default_value=str(default), encoding_format=default.mime_type, position=position)

def add_outputs(self, candidates: Set[str]):
def add_outputs(self, candidates: Set[Tuple[Union[Path, str], Optional[str]]]):
"""Yield detected output and changed command input parameter."""
# TODO what to do with duplicate paths & inputs with same defaults
tree = DirectoryTree.from_list(candidates)
candidate_paths = list(map(lambda x: x[0], candidates))
tree = DirectoryTree.from_list(candidate_paths)

input_candidates = {}
parameter_candidates = {}
Expand All @@ -268,6 +275,9 @@ def add_outputs(self, candidates: Set[str]):

for parameter in self.parameters:
# NOTE: find parameters that might actually be outputs
if any(parameter.default_value == value for value, _ in self.explicit_parameters):
continue

try:
path = self.directory / str(parameter.default_value)
input_path = Path(os.path.abspath(path)).relative_to(self.working_dir)
Expand All @@ -281,7 +291,7 @@ def add_outputs(self, candidates: Set[str]):
parameter_candidates[str(input_path)] = parameter
parameter_candidates[str(input_path)] = parameter

for path in candidates:
for path, name in candidates:
candidate = self._resolve_existing_subpath(self.working_dir / path)

if candidate is None:
Expand All @@ -292,22 +302,22 @@ def add_outputs(self, candidates: Set[str]):
if glob in input_candidates:
input = input_candidates[glob]

self.add_command_output_from_input(input)
self.add_command_output_from_input(input, name=name)
elif glob in parameter_candidates:
param = parameter_candidates[glob]

self.add_command_output_from_parameter(param)
self.add_command_output_from_parameter(param, name=name)
else:
encoding_format = [DIRECTORY_MIME_TYPE] if candidate.is_dir() else self._get_mimetype(candidate)
self.add_command_output(default_value=glob, encoding_format=encoding_format)
self.add_command_output(default_value=glob, encoding_format=encoding_format, name=name)

def _check_potential_output_directory(
self, input_path: Path, candidates: Set[str], tree: DirectoryTree
) -> Set[str]:
self, input_path: Path, candidates: Set[Tuple[str, Optional[str]]], tree: DirectoryTree
) -> Set[Tuple[str, Optional[str]]]:
"""Check an input/parameter for being a potential output directory."""
subpaths = {str(input_path / path) for path in tree.get(input_path, default=[])}
absolute_path = os.path.abspath(input_path)
if Path(absolute_path) not in self.explicit_outputs:
if all(Path(absolute_path) != path for path, _ in self.explicit_outputs):
content = {str(path) for path in input_path.rglob("*") if not path.is_dir() and path.name != ".gitkeep"}
preexisting_paths = content - subpaths
if preexisting_paths:
Expand All @@ -326,9 +336,9 @@ def _check_potential_output_directory(
)

# Remove files from the input directory
candidates = {path for path in candidates if path not in subpaths}
candidates = {(path, name) for path, name in candidates if path not in subpaths}
# Include input path in the candidates to check
candidates.add(str(input_path))
candidates.add((str(input_path), None))

return candidates

Expand All @@ -340,7 +350,7 @@ def _get_mimetype(file: Path) -> List[str]:

def guess_type(self, value: Union[Path, str], ignore_filenames: Set[str] = None) -> Tuple[Any, str]:
"""Return new value and CWL parameter type."""
if not self._is_ignored_path(value, ignore_filenames):
if not self._is_ignored_path(value, ignore_filenames) and all(value != v for v, _ in self.explicit_parameters):
candidate = self._resolve_existing_subpath(value)
if candidate:
if candidate.is_dir():
Expand All @@ -364,10 +374,11 @@ def add_command_input(
prefix: Optional[str] = None,
position: Optional[int] = None,
postfix: Optional[str] = None,
name: Optional[str] = None,
encoding_format: List[str] = None,
):
"""Create a CommandInput."""
if self.no_input_detection and Path(default_value).resolve() not in self.explicit_inputs:
if self.no_input_detection and all(Path(default_value).resolve() != path for path, _ in self.explicit_inputs):
return

mapped_stream = self.get_stream_mapping_for_value(default_value)
Expand All @@ -385,6 +396,7 @@ def add_command_input(
mapped_to=mapped_stream,
encoding_format=encoding_format,
postfix=postfix,
name=name,
)
)

Expand All @@ -395,9 +407,10 @@ def add_command_output(
position: Optional[int] = None,
postfix: Optional[str] = None,
encoding_format: List[str] = None,
name: Optional[str] = None,
):
"""Create a CommandOutput."""
if self.no_output_detection and Path(default_value).resolve() not in self.explicit_outputs:
if self.no_output_detection and all(Path(default_value).resolve() != path for path, _ in self.explicit_outputs):
return

create_folder = False
Expand Down Expand Up @@ -431,10 +444,11 @@ def add_command_output(
encoding_format=encoding_format,
postfix=postfix,
create_folder=create_folder,
name=name,
)
)

def add_command_output_from_input(self, input: CommandInput):
def add_command_output_from_input(self, input: CommandInput, name):
"""Create a CommandOutput from an input."""
self.inputs.remove(input)
self.outputs.append(
Expand All @@ -445,10 +459,11 @@ def add_command_output_from_input(self, input: CommandInput):
position=input.position,
mapped_to=input.mapped_to,
encoding_format=input.encoding_format,
name=name,
)
)

def add_command_output_from_parameter(self, parameter: CommandParameter):
def add_command_output_from_parameter(self, parameter: CommandParameter, name):
"""Create a CommandOutput from a parameter."""
self.parameters.remove(parameter)
value = Path(self._path_relative_to_root(parameter.default_value))
Expand All @@ -458,6 +473,7 @@ def add_command_output_from_parameter(self, parameter: CommandParameter):
prefix=parameter.prefix,
position=parameter.position,
encoding_format=encoding_format,
name=name,
)

def add_command_parameter(
Expand All @@ -483,7 +499,7 @@ def add_explicit_inputs(self):
input_paths = [input.default_value for input in self.inputs]
input_id = len(self.inputs) + len(self.parameters)

for explicit_input in self.explicit_inputs:
for explicit_input, name in self.explicit_inputs:
try:
relative_explicit_input = str(explicit_input.relative_to(self.working_dir))
except ValueError:
Expand All @@ -493,6 +509,11 @@ def add_explicit_inputs(self):
)

if relative_explicit_input in input_paths:
if name:
existing_inputs = [i for i in self.inputs if i.default_value == relative_explicit_input]

for existing_input in existing_inputs:
existing_input.name = name
continue

input_paths.append(explicit_input)
Expand All @@ -510,8 +531,26 @@ def add_explicit_inputs(self):
default_value=str(default),
postfix=str(input_id),
encoding_format=[DIRECTORY_MIME_TYPE] if type == "Directory" else default.mime_type,
name=name,
)

def add_explicit_parameters(self):
"""Add explicit parameters."""
parameter_names = [parameter.name for parameter in self.parameters]

for explicit_parameter, name in self.explicit_parameters:
if name and name in parameter_names:
continue

existing_parameters = [p for p in self.parameters if p.default_value == explicit_parameter]

if existing_parameters and name:
# NOTE: Update names of existing parameters
for p in existing_parameters:
p.name = name
elif not existing_parameters:
self.add_command_parameter(explicit_parameter, name=name)

@contextmanager
@inject.autoparams()
def watch(self, client_dispatcher: IClientDispatcher, no_output=False):
Expand Down Expand Up @@ -555,21 +594,25 @@ def watch(self, client_dispatcher: IClientDispatcher, no_output=False):
if not self.no_output_detection:
# Calculate possible output paths.
# Capture newly created files through redirects.
candidates |= {file_ for file_ in repository.untracked_files}
candidates |= {(file_, None) for file_ in repository.untracked_files}

# Capture modified files through redirects.
candidates |= {o.b_path for o in repository.unstaged_changes if not o.deleted}
candidates |= {(o.b_path, None) for o in repository.unstaged_changes if not o.deleted}

# Include explicit outputs
candidates |= {str(path.relative_to(self.working_dir)) for path in self.explicit_outputs}
candidates |= {(str(path.relative_to(self.working_dir)), name) for path, name in self.explicit_outputs}

candidates = {path for path in candidates if is_path_safe(path)}
candidates = {(path, name) for path, name in candidates if is_path_safe(path)}

self.add_outputs(candidates)

for stream_name in ("stdout", "stderr"):
stream = getattr(self, stream_name)
if stream and stream not in candidates and Path(os.path.abspath(stream)) not in self.explicit_outputs:
if (
stream
and all(stream != path for path, _ in candidates)
and (Path(os.path.abspath(stream)) != path for path, _ in self.explicit_outputs)
):
unmodified.add(stream)
elif stream:
output_paths.append(stream)
Expand Down Expand Up @@ -600,7 +643,9 @@ def _include_indirect_parameters(self):
run_parameters = read_indirect_parameters(self.working_dir)

for k, v in run_parameters.items():
self.add_command_parameter(name=k, default_value=str(v))
self.explicit_parameters.append((str(v), k))

self.add_explicit_parameters()

def add_indirect_inputs(self):
"""Read indirect inputs list and add them to explicit inputs."""
Expand All @@ -609,7 +654,7 @@ def add_indirect_inputs(self):
for indirect_input in self._read_files_list(indirect_inputs_list):
# treat indirect inputs like explicit inputs
path = Path(os.path.abspath(indirect_input))
self.explicit_inputs.append(path)
self.explicit_inputs.append((path, None))

# add new explicit inputs (if any) to inputs
self.add_explicit_inputs()
Expand All @@ -621,7 +666,7 @@ def add_indirect_outputs(self):
for indirect_output in self._read_files_list(indirect_outputs_list):
# treat indirect outputs like explicit outputs
path = Path(os.path.abspath(indirect_output))
self.explicit_outputs.append(path)
self.explicit_outputs.append((path, None))

def iter_input_files(self, basedir):
"""Yield tuples with input id and path."""
Expand Down