Skip to content
Merged

Cwl #93

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 80 additions & 2 deletions renga/cli/_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from renga._compat import Path
from renga.models.cwl.command_line_tool import CommandLineTool
from renga.models.cwl.workflow import Workflow


@attr.s
Expand Down Expand Up @@ -72,7 +73,8 @@ def find_latest(self, start, path):
def iter_file_inputs(self, tool, basedir):
"""Yield path of tool file inputs."""
if tool.stdin:
raise NotImplemented(tool.stdin)
if tool.stdin[0] != '$': # pragma: no cover
raise NotImplemented(tool.stdin)
for input_ in tool.inputs:
if input_.type == 'File' and input_.default:
yield os.path.relpath(
Expand Down Expand Up @@ -105,9 +107,85 @@ def add_file(self, path, revision='HEAD'):
file_key = self.add_node(commit, path)
tool_key = self.add_tool(commit, cwl)
#: Edge from a tool to the output.
self.G.add_edge(tool_key, file_key)
tool = self.G.nodes[tool_key]['tool']
output_id = tool.get_output_id(path)
self.G.add_edge(tool_key, file_key, id=output_id)
return file_key

if file_commits:
#: Does not have a parent CWL.
return self.add_node(file_commits[0], path)

@property
def _output_keys(self):
"""Return a list of the output keys."""
return [n for n, d in self.G.out_degree() if d == 0]

def _source_name(self, key):
"""Find source name for a node."""
if self.G.in_degree(key) == 0:
return None

assert self.G.in_degree(key) == 1

tool_key, attr = list(self.G.pred[key].items())[0]
step = self.G.nodes[tool_key]['step']['id']
return '{0}/{1}'.format(step, attr['id'])

@property
def _tool_nodes(self):
"""Yield topologically sorted tools."""
for key in nx.topological_sort(self.G):
node = self.G.nodes[key]
tool = node.get('tool')
if tool is not None:
yield key, node

def ascwl(self):
"""Serialize graph to CWL workflow."""
workflow = Workflow()

input_index = 1

for tool_index, (key, node) in enumerate(self._tool_nodes, 1):
_, path = key
tool = node['tool']
step_id = 'step_{0}'.format(tool_index)
node['step'] = {'id': step_id}

ins = {
edge_id: self._source_name(target_id)
for target_id, _, edge_id in self.G.in_edges(key, data='id')
}
outs = [
edge_id for _, _, edge_id in self.G.out_edges(key, data='id')
]

for input_ in tool.inputs:
input_mapping = ins.get(input_.id)
if input_mapping is None:
input_id = 'input_{0}'.format(input_index)
workflow.inputs.append({
'id': input_id,
'type': input_.type,
# 'default': input_.default,
})
input_index += 1
ins[input_.id] = input_id

workflow.add_step(
run=Path(path),
id=step_id,
in_=ins,
out=outs,
)

for index, key in enumerate(self._output_keys):
output_id = 'output_{0}'.format(index)
workflow.outputs.append({
'id': output_id,
'type': 'File',
'outputSource': self._source_name(key),
})

return workflow
14 changes: 12 additions & 2 deletions renga/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"""Track provenance of data created by executing programs."""

import os
import sys
from subprocess import call

import click
Expand All @@ -39,11 +40,20 @@ def run(repo, no_output, command_line):
"""Tracking work on a specific problem."""
candidates = [x[0] for x in repo.git.index.entries] + \
repo.git.untracked_files
mapped_std = _mapped_std_streams(candidates)
factory = CommandLineToolFactory(
command_line=command_line,
**_mapped_std_streams(candidates))
**mapped_std)

with repo.with_workflow_storage() as wf:
with factory.watch(repo.git, no_output=no_output) as tool:
call(factory.command_line, cwd=os.getcwd())
call(
factory.command_line,
cwd=os.getcwd(),
**{key: getattr(sys, key) for key in mapped_std.keys()},
)

sys.stdout.flush()
sys.stderr.flush()

wf.add_step(run=tool)
52 changes: 52 additions & 0 deletions renga/cli/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
#
# Copyright 2018 - Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Workflow operations."""

import os

import click
import yaml

from renga.models.cwl._ascwl import ascwl

from ._graph import Graph
from ._repo import pass_repo


@click.group()
def workflow():
"""Workflow operations."""


@workflow.command()
@click.option('--revision', default='HEAD')
@click.argument('path', type=click.Path(exists=True, dir_okay=False), nargs=-1)
@pass_repo
def create(repo, revision, path):
"""Create a workflow description for a file."""
graph = Graph(repo)
for p in path:
graph.add_file(p, revision=revision)

click.echo(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we save the workflow? give an option to specify the name so it's easier to re-execute?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure. but later. I need to solve issue with detecting basedir.

yaml.dump(ascwl(
graph.ascwl(),
filter=lambda _, x: x is not None,
# basedir=repo.workflow_path,
basedir='.',
), default_flow_style=False))
15 changes: 8 additions & 7 deletions renga/models/cwl/_ascwl.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,18 @@ def convert_value(v):
return v

for a in attrs:
a_name = a.name.rstrip('_')
v = getattr(inst, a.name)
if filter is not None and not filter(a, v):
continue
if recurse is True:
if has(v.__class__):
rv[a.name] = ascwl(v, recurse=True, filter=filter,
rv[a_name] = ascwl(v, recurse=True, filter=filter,
dict_factory=dict_factory, basedir=basedir)

elif isinstance(v, (tuple, list, set)):
cf = v.__class__ if retain_collection_types is True else list
rv[a.name] = cf([
rv[a_name] = cf([
ascwl(i, recurse=True, filter=filter,
dict_factory=dict_factory, basedir=basedir)
if has(i.__class__) else i
Expand All @@ -99,23 +100,23 @@ def convert_value(v):
k = a.metadata['jsonldPredicate'].get('mapSubject')
if k:
vv = dict_factory()
for i in rv[a.name]:
for i in rv[a_name]:
kk = i.pop(k)
vv[kk] = i
rv[a.name] = vv
rv[a_name] = vv

elif isinstance(v, dict):
df = dict_factory
rv[a.name] = df((
rv[a_name] = df((
ascwl(kk, dict_factory=df, basedir=basedir)
if has(kk.__class__) else kk,
ascwl(vv, dict_factory=df, basedir=basedir)
if has(vv.__class__) else vv)
for kk, vv in iteritems(v))
else:
rv[a.name] = convert_value(v)
rv[a_name] = convert_value(v)
else:
rv[a.name] = convert_value(v)
rv[a_name] = convert_value(v)

if isinstance(inst, CWLClass):
rv['class'] = inst.__class__.__name__
Expand Down
37 changes: 33 additions & 4 deletions renga/models/cwl/command_line_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# limitations under the License.
"""Represent a ``CommandLineTool`` from the Common Workflow Language."""

import fnmatch
import re
import shlex
from contextlib import contextmanager
Expand Down Expand Up @@ -48,8 +49,7 @@ class CommandLineTool(Process, CWLClass):
cmd, (list, tuple)) else shlex.split(cmd),
) # list(string, Expression, CommandLineBinding)

stdin = attr.ib(default=None, converter=attr.converters.optional(Path))
# null, str, Expression
stdin = attr.ib(default=None)
stdout = attr.ib(default=None)
stderr = attr.ib(default=None)

Expand All @@ -60,6 +60,24 @@ class CommandLineTool(Process, CWLClass):
temporaryFailCodes = attr.ib(default=attr.Factory(list)) # list(int)
permanentFailCodes = attr.ib(default=attr.Factory(list)) # list(int)

def get_output_id(self, path):
"""Return an id of the matching path from default values."""
for output in self.outputs:
if output.type in {'stdout', 'stderr'}:
stream = getattr(self, output.type)
if stream == path:
return output.id
elif output.type == 'File':
glob = output.outputBinding.glob
# TODO better support for Expression
if glob.startswith('$(inputs.'):
input_id = glob[len('$(inputs.'):-1]
for input_ in self.inputs:
if input_.id == input_id and input_.default == path:
return output.id
elif fnmatch.fnmatch(path, glob):
return output.id


@attr.s
class CommandLineToolFactory(object):
Expand Down Expand Up @@ -94,6 +112,13 @@ def __attrs_post_init__(self):
self.inputs = []
self.outputs = []

if self.stdin:
input_ = next(self.guess_inputs(self.stdin))
assert input_.type == 'File'
input_.id = 'input_stdin'
self.inputs.append(input_)
self.stdin = '$(inputs.{0}.path)'.format(input_.id)

for stream_name in ('stdout', 'stderr'):
stream = getattr(self, stream_name)
if stream and self.file_candidate(stream):
Expand Down Expand Up @@ -151,7 +176,7 @@ def watch(self, git=None, no_output=False):
'Output file was not created or changed.'
)

if not tool.outputs:
if not outputs:
raise RuntimeError('No output was detected')

tool.inputs = list(inputs.values())
Expand Down Expand Up @@ -184,6 +209,10 @@ def split_command_and_args(self):
cmd = [self.command_line[0]]
args = list(self.command_line[1:])

if len(args) < 2:
# only guess subcommand for more arguments
return cmd, args

while args and re.match(self._RE_SUBCOMMAND, args[0]) \
and not self.file_candidate(args[0]):
cmd.append(args.pop(0))
Expand All @@ -210,7 +239,7 @@ def guess_type(self, value):
# TODO suggest that the file should be imported to the repo
pass

if ',' in value:
if len(value) > 1 and ',' in value:
return value.split(','), 'string[]', ','

return value, 'string', None
Expand Down
13 changes: 11 additions & 2 deletions renga/models/cwl/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,30 @@
# limitations under the License.
"""Represent workflows from the Common Workflow Language."""

import uuid

import attr

from ._ascwl import CWLClass, mapped
from .process import Process


@attr.s
class WorkflowStep(object):
"""Define an executable element of a workflow."""

run = attr.ib() # string, Process
id = attr.ib(default=attr.Factory(uuid.uuid4))

in_ = attr.ib(default=None)
out = attr.ib(default=None)


@attr.s
class Workflow(object):
class Workflow(Process, CWLClass):
"""Define a workflow representation."""

steps = attr.ib(default=attr.Factory(list))
steps = mapped(WorkflowStep)

def add_step(self, **kwargs):
"""Add a workflow step."""
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
'log=renga.cli.log:log',
'run=renga.cli.run:run',
'workon=renga.cli.workon:workon',
'workflow=renga.cli.workflow:workflow',
],
},
extras_require=extras_require,
Expand Down
Loading