Skip to content

Commit ce8ba67

Browse files
mohammad-alisafaeejsam
authored andcommitted
feat: explicit input output specification (#598)
* feat: can specify inputs and outputs explicityly * build: refactor and add tests * feat: allow working with dirty repo * review: apply review comments * refactor: minor refactoring * undo allow working with dirty repo * feat: stage all explicit inputs in tool's output directory * feat: stage ALL file and directory inputs in the tools output directory. * chore: fix bad formatting * review: apply review comments
1 parent 52d8989 commit ce8ba67

File tree

6 files changed

+402
-105
lines changed

6 files changed

+402
-105
lines changed

conftest.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,3 +297,30 @@ def zenodo_sandbox(client):
297297
'zenodo', 'access_token',
298298
'HPwXfABPZ7JNiwXMrktL7pevuuo9jt4gsUCkh3Gs2apg65ixa3JPyFukdGup'
299299
)
300+
301+
302+
@pytest.fixture
303+
def cli(client, run):
304+
"""Return a callable Renku CLI.
305+
306+
It returns the exit code and content of the resulting CWL tool.
307+
"""
308+
import yaml
309+
from renku.models.cwl import CWLClass
310+
311+
def renku_cli(*args):
312+
before_cwl_files = set(client.workflow_path.glob('*.cwl'))
313+
exit_code = run(args)
314+
after_cwl_files = set(client.workflow_path.glob('*.cwl'))
315+
new_files = after_cwl_files - before_cwl_files
316+
assert len(new_files) <= 1
317+
if new_files:
318+
cwl_filepath = new_files.pop()
319+
with cwl_filepath.open('r') as f:
320+
content = CWLClass.from_cwl(yaml.safe_load(f))
321+
else:
322+
content = None
323+
324+
return exit_code, content
325+
326+
return renku_cli

renku/cli/run.py

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,16 @@
4949
as an **output**;
5050
* a path is not passed as an argument to ``renku run``.
5151
52+
.. topic:: Specifying auxiliary inputs (``--input``)
53+
54+
You can specify extra inputs to your program explicitly by using the
55+
``--input`` option. This is useful for specifying hidden dependencies
56+
that don't appear on the command line. These input file must exist before
57+
execution of ``renku run`` command. This option is not a replacement for
58+
the arguments that are passed on the command line. Files or directories
59+
specified with this option will not be passed as input arguments to the
60+
script.
61+
5262
Detecting output paths
5363
~~~~~~~~~~~~~~~~~~~~~~
5464
@@ -84,6 +94,13 @@
8494
You can specify the ``--no-output`` option to force tracking of such
8595
an execution.
8696
97+
.. topic:: Specifying outputs explicitly (``--output``)
98+
99+
You can specify expected outputs of your program explicitly by using the
100+
``--output`` option. These output must exist after the execution of the
101+
``renku run`` command. However, they do not need to be modified by
102+
the command.
103+
87104
.. cli-run-std
88105
89106
Detecting standard streams
@@ -134,6 +151,12 @@
134151

135152

136153
@click.command(context_settings=dict(ignore_unknown_options=True, ))
154+
@click.option(
155+
'inputs',
156+
'--input',
157+
multiple=True,
158+
help='Force a path to be considered as an input.',
159+
)
137160
@click.option(
138161
'outputs',
139162
'--output',
@@ -162,12 +185,16 @@
162185
commit=True,
163186
ignore_std_streams=True,
164187
)
165-
def run(client, outputs, no_output, success_codes, isolation, command_line):
188+
def run(
189+
client, inputs, outputs, no_output, success_codes, isolation, command_line
190+
):
166191
"""Tracking work on a specific problem."""
167192
working_dir = client.repo.working_dir
168193
mapped_std = _mapped_std_streams(client.candidate_paths)
169194
factory = CommandLineToolFactory(
170195
command_line=command_line,
196+
explicit_inputs=inputs,
197+
explicit_outputs=outputs,
171198
directory=os.getcwd(),
172199
working_dir=working_dir,
173200
successCodes=success_codes,
@@ -177,9 +204,7 @@ def run(client, outputs, no_output, success_codes, isolation, command_line):
177204
}
178205
)
179206
with client.with_workflow_storage() as wf:
180-
with factory.watch(
181-
client, no_output=no_output, outputs=outputs
182-
) as tool:
207+
with factory.watch(client, no_output=no_output) as tool:
183208
# Don't compute paths if storage is disabled.
184209
if client.has_external_storage:
185210
# Make sure all inputs are pulled from a storage.

renku/errors.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ def __init__(self, repo, unmodified):
185185
for path in unmodified) + '\n'
186186
'\nOnce you have removed the files that should be used as outputs,'
187187
'\nyou can safely rerun the previous command.'
188+
'\nYou can use --output flag to specify outputs explicitly.'
188189
)
189190

190191

@@ -214,6 +215,7 @@ def __init__(self, repo, inputs):
214215
) + '\n\n'
215216
'Once you have removed files that should be used as outputs,\n'
216217
'you can safely rerun the previous command.'
218+
'\nYou can use --output flag to specify outputs explicitly.'
217219
)
218220
else:
219221
msg += (
@@ -224,6 +226,10 @@ def __init__(self, repo, inputs):
224226
super(OutputsNotFound, self).__init__(msg)
225227

226228

229+
class InvalidInputPath(RenkuException, click.ClickException):
230+
"""Raise when input path does not exist or is not in the repository."""
231+
232+
227233
class InvalidSuccessCode(RenkuException, click.ClickException):
228234
"""Raise when the exit-code is not 0 or redefined."""
229235

renku/models/cwl/command_line_tool.py

Lines changed: 146 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,15 @@ class CommandLineToolFactory(object):
162162
if isinstance(cmd, (list, tuple)) else shlex.split(cmd),
163163
)
164164

165+
explicit_inputs = attr.ib(
166+
default=[],
167+
converter=lambda paths: [Path(path).resolve() for path in paths]
168+
)
169+
explicit_outputs = attr.ib(
170+
default=[],
171+
converter=lambda paths: [Path(path).resolve() for path in paths]
172+
)
173+
165174
directory = attr.ib(
166175
default='.',
167176
converter=lambda path: Path(path).resolve(),
@@ -183,7 +192,7 @@ class CommandLineToolFactory(object):
183192
successCodes = attr.ib(default=attr.Factory(list)) # list(int)
184193

185194
def __attrs_post_init__(self):
186-
"""Derive basic informations."""
195+
"""Derive basic information."""
187196
self.baseCommand, detect = self.split_command_and_args()
188197
self.arguments = []
189198
self.inputs = []
@@ -218,6 +227,10 @@ def __attrs_post_init__(self):
218227
else:
219228
self.inputs.append(input_)
220229

230+
if self.explicit_inputs:
231+
for input in self.find_explicit_inputs():
232+
self.inputs.append(input)
233+
221234
def generate_tool(self):
222235
"""Return an instance of command line tool."""
223236
return CommandLineTool(
@@ -232,24 +245,11 @@ def generate_tool(self):
232245
)
233246

234247
@contextmanager
235-
def watch(self, client, no_output=False, outputs=None):
248+
def watch(self, client, no_output=False):
236249
"""Watch a Renku repository for changes to detect outputs."""
237250
tool = self.generate_tool()
238251
repo = client.repo
239252

240-
if outputs:
241-
directories = [
242-
output for output in outputs if Path(output).is_dir()
243-
]
244-
245-
client.repo.git.rm(
246-
*outputs, r=True, force=True, ignore_unmatch=True
247-
)
248-
client.repo.index.commit('renku: automatic removal of outputs')
249-
250-
for directory in directories:
251-
Path(directory).mkdir(parents=True, exist_ok=True)
252-
253253
# NOTE consider to use git index instead
254254
existing_directories = {
255255
str(p.relative_to(client.path))
@@ -261,6 +261,10 @@ def watch(self, client, no_output=False, outputs=None):
261261
if repo:
262262
# List of all output paths.
263263
paths = []
264+
265+
inputs = {input.id: input for input in self.inputs}
266+
outputs = list(tool.outputs)
267+
264268
# Keep track of unmodified output files.
265269
unmodified = set()
266270

@@ -277,9 +281,6 @@ def watch(self, client, no_output=False, outputs=None):
277281
from renku.cli._graph import _safe_path
278282
candidates = {path for path in candidates if _safe_path(path)}
279283

280-
inputs = {input.id: input for input in self.inputs}
281-
outputs = list(tool.outputs)
282-
283284
for output, input, path in self.guess_outputs(candidates):
284285
outputs.append(output)
285286
paths.append(path)
@@ -292,27 +293,48 @@ def watch(self, client, no_output=False, outputs=None):
292293

293294
for stream_name in ('stdout', 'stderr'):
294295
stream = getattr(self, stream_name)
295-
if stream and stream not in candidates:
296+
if (
297+
stream and stream not in candidates and
298+
Path(stream).resolve() not in self.explicit_outputs
299+
):
296300
unmodified.add(stream)
297301
elif stream:
298302
paths.append(stream)
299303

304+
if self.explicit_outputs:
305+
last_output_id = len(outputs)
306+
307+
for output, input, path in self.find_explicit_outputs(
308+
last_output_id
309+
):
310+
outputs.append(output)
311+
paths.append(path)
312+
313+
if input is not None:
314+
if input.id not in inputs: # pragma: no cover
315+
raise RuntimeError('Inconsistent input name.')
316+
317+
inputs[input.id] = input
318+
300319
if unmodified:
301320
raise errors.UnmodifiedOutputs(repo, unmodified)
302321

303322
if not no_output and not paths:
304323
raise errors.OutputsNotFound(repo, inputs.values())
305324

325+
if client.has_external_storage:
326+
client.track_paths_in_storage(*paths)
327+
306328
tool.inputs = list(inputs.values())
307329
tool.outputs = outputs
308330

309-
client.track_paths_in_storage(*paths)
310-
311331
# Requirement detection can be done anytime.
312332
from .process_requirements import InitialWorkDirRequirement, \
313333
InlineJavascriptRequirement
314334
initial_work_dir_requirement = InitialWorkDirRequirement.from_tool(
315-
tool, existing_directories=existing_directories
335+
tool,
336+
existing_directories=existing_directories,
337+
working_dir=self.working_dir
316338
)
317339
if initial_work_dir_requirement:
318340
tool.requirements.extend([
@@ -521,25 +543,28 @@ def guess_outputs(self, paths):
521543
str(input_path / path)
522544
for path in tree.get(input_path, default=[])
523545
}
524-
content = {
525-
str(path)
526-
for path in input_path.rglob('*')
527-
if not path.is_dir() and path.name != '.gitkeep'
528-
}
529-
extra_paths = content - subpaths
530-
if extra_paths:
531-
raise errors.InvalidOutputPath(
532-
'The output directory "{0}" is not empty. \n\n'
533-
'Delete existing files before running the command:'
534-
'\n (use "git rm <file>..." to remove them first)'
535-
'\n\n'.format(input_path) + '\n'.join(
536-
'\t' + click.style(path, fg='yellow')
537-
for path in extra_paths
538-
) + '\n\n'
539-
'Once you have removed files that should be used '
540-
'as outputs,\n'
541-
'you can safely rerun the previous command.'
542-
)
546+
if input_path.resolve() not in self.explicit_outputs:
547+
content = {
548+
str(path)
549+
for path in input_path.rglob('*')
550+
if not path.is_dir() and path.name != '.gitkeep'
551+
}
552+
extra_paths = content - subpaths
553+
if extra_paths:
554+
raise errors.InvalidOutputPath(
555+
'The output directory "{0}" is not empty. \n\n'
556+
'Delete existing files before running the '
557+
'command:'
558+
'\n (use "git rm <file>..." to remove them '
559+
'first)'
560+
'\n\n'.format(input_path) + '\n'.join(
561+
'\t' + click.style(path, fg='yellow')
562+
for path in extra_paths
563+
) + '\n\n'
564+
'Once you have removed files that should be used '
565+
'as outputs,\n'
566+
'you can safely rerun the previous command.'
567+
)
543568

544569
# Remove files from the input directory
545570
paths = [path for path in paths if path not in subpaths]
@@ -611,3 +636,83 @@ def guess_outputs(self, paths):
611636
outputBinding=dict(glob=glob, ),
612637
), None, glob
613638
)
639+
640+
def find_explicit_inputs(self):
641+
"""Yield explicit inputs and command line input bindings if any."""
642+
input_paths = [
643+
input.default.path
644+
for input in self.inputs if input.type in PATH_OBJECTS
645+
]
646+
input_id = len(self.inputs) + len(self.arguments)
647+
648+
for explicit_input in self.explicit_inputs:
649+
if explicit_input in input_paths:
650+
continue
651+
652+
try:
653+
explicit_input.relative_to(self.working_dir)
654+
except ValueError:
655+
raise errors.InvalidInputPath(
656+
'The input file or directory is not in the repository.'
657+
'\n\n\t' + click.style(str(explicit_input), fg='yellow') +
658+
'\n\n'
659+
)
660+
if self.file_candidate(explicit_input) is None:
661+
raise errors.InvalidInputPath(
662+
'The input file or directory does not exist.'
663+
'\n\n\t' + click.style(str(explicit_input), fg='yellow') +
664+
'\n\n'
665+
)
666+
input_id += 1
667+
default, type, _ = self.guess_type(explicit_input)
668+
# Explicit inputs are either File or Directory
669+
assert type in PATH_OBJECTS
670+
# The inputBinging is None because these inputs won't
671+
# appear on command-line
672+
yield CommandInputParameter(
673+
id='input_{0}'.format(input_id),
674+
type=type,
675+
default=default,
676+
inputBinding=None
677+
)
678+
679+
def find_explicit_outputs(self, starting_output_id):
680+
"""Yield explicit output and changed command input parameter."""
681+
inputs = {
682+
str(i.default.path.relative_to(self.working_dir)): i
683+
for i in self.inputs if i.type in PATH_OBJECTS
684+
}
685+
output_id = starting_output_id
686+
687+
for path in self.explicit_outputs:
688+
if self.file_candidate(path) is None:
689+
raise errors.InvalidOutputPath(
690+
'The output file or directory does not exist.'
691+
'\n\n\t' + click.style(str(path), fg='yellow') + '\n\n'
692+
)
693+
694+
output_path = str(path.relative_to(self.working_dir))
695+
type = 'Directory' if path.is_dir() else 'File'
696+
if output_path in inputs:
697+
# change input type to note that it is also an output
698+
input = inputs[output_path]
699+
input = attr.evolve(input, type='string', default=output_path)
700+
yield (
701+
CommandOutputParameter(
702+
id='output_{0}'.format(output_id),
703+
type=type,
704+
outputBinding=dict(
705+
glob='$(inputs.{0})'.format(input.id)
706+
)
707+
), input, output_path
708+
)
709+
else:
710+
yield (
711+
CommandOutputParameter(
712+
id='output_{0}'.format(output_id),
713+
type=type,
714+
outputBinding=dict(glob=str(output_path))
715+
), None, output_path
716+
)
717+
718+
output_id += 1

0 commit comments

Comments
 (0)