Permalink
Browse files

Initial implementation of cwl_run command.

Usage:

```
    planemo cwl_run [--conformance-test] tool.cwl job.json
```

This will load a Galaxy instance configured to run CWL tools with the specified tool and then submit the job described by job.json.

Add cwl-runner implementation wrapper.

To mimic the interface established by the reference implementation cwltool.

See https://github.com/common-workflow-language/cwltool/tree/master/cwl-runner for more information.

Known issues

 - The test case doesn't work even though the command works fine from the command line.
 - --conformance-test prints extra junk to the screen it shouldn't.
 - Files aren't staged back out of Galaxy as would be expected by reference implementation.
  • Loading branch information...
jmchilton committed Oct 21, 2015
1 parent be54e4e commit 49c5c1e68901decc30ef5237ae31825999cef681
@@ -19,6 +19,9 @@ develop-eggs
lib
lib64

cwl-runner/dist
cwl-runner/build

# Installer logs
pip-log.txt

@@ -0,0 +1,9 @@
This an optional companion package to planemo which provides provides
an additional entry point under the alias "cwl-runner", which is the
implementation-agnostic name for the default CWL interpreter installed
on a host.

This package is based on the CWL reference implementation package of
the same name, see
https://github.com/common-workflow-language/cwltool/tree/master/cwl-runner
for more information.
@@ -0,0 +1,3 @@
#!/bin/sh

planemo cwl_run "$@"
@@ -0,0 +1,29 @@
#!/usr/bin/env python

import os
import sys
import setuptools.command.egg_info as egg_info_cmd
import shutil

from setuptools import setup, find_packages

SETUP_DIR = os.path.dirname(__file__)
README = os.path.join(SETUP_DIR, 'README')

scripts = ["cwl-runner"]

setup(name='cwl_runner',
version='1.0',
description='Common workflow language interpreter implementation (for Galaxy + Planemo)',
long_description=open(README).read(),
author='John Chilton',
author_email='jmchilton@gmail.com',
url="https://github.com/galaxyproject/planemo",
download_url="https://github.com/galaxyproject/planemo",
license="AFL",
install_requires=[
'planemo'
],
scripts=scripts,
zip_safe=True
)
@@ -0,0 +1,38 @@
import click
from planemo.cli import pass_context
from planemo.io import conditionally_captured_io
from planemo import options
from planemo import galaxy_serve
from planemo import cwl
from planemo import io


@click.command('cwl_run')
@options.required_tool_arg()
@options.required_job_arg()
@options.galaxy_serve_options()
@options.galaxy_cwl_root_option()
@options.cwl_conformance_test()
@pass_context
def cli(ctx, path, job_path, **kwds):
"""Planemo command for running CWL tools and jobs.
::
% planemo cwl_run cat1-tool.cwl cat-job.json
"""
# TODO: serve options aren't exactly right - don't care about
# port for instance.
kwds["cwl"] = True
conformance_test = kwds.get("conformance_test", False)
with conditionally_captured_io(conformance_test):
with galaxy_serve.serve_daemon(ctx, [path], **kwds) as config:
try:
cwl_run = cwl.run_cwl_tool(path, job_path, config, **kwds)
except Exception:
io.warn("Problem running cwl tool...")
print(config.log_contents)
raise

print(cwl_run.cwl_command_state)
return 0
@@ -0,0 +1,5 @@
from .client import run_cwl_tool

__all__ = [
'run_cwl_tool',
]
@@ -0,0 +1,119 @@
""" High-level client sitting on top of bioblend for running CWL
stuff in Galaxy.
"""
from __future__ import print_function

import json
import os

from planemo.io import wait_on


DEFAULT_HISTORY_NAME = "CWL Target History"


def run_cwl_tool(tool_path, job_path, config, **kwds):
user_gi = config.user_gi
admin_gi = config.gi

tool_id = _tool_id(tool_path)
history_id = _history_id(user_gi, **kwds)
job_dict = _galactic_job_json(job_path, user_gi, history_id)
final_state = _wait_for_history(user_gi, history_id)
if final_state != "ok":
msg = "Failed to run CWL job final job state is [%s]." % final_state
with open("errored_galaxy.log", "w") as f:
f.write(config.log_contents)
raise Exception(msg)
run_tool_payload = dict(
history_id=history_id,
tool_id=tool_id,
tool_inputs=job_dict,
inputs_representation="cwl",
)
run_response = user_gi.tools._tool_post(run_tool_payload)
job = run_response["jobs"][0]
job_id = job["id"]
final_state = _wait_for_job(user_gi, job_id)
if final_state != "ok":
msg = "Failed to run CWL job final job state is [%s]." % final_state
with open("errored_galaxy.log", "w") as f:
f.write(config.log_contents)
raise Exception(msg)

job_info = admin_gi.jobs.show_job(job_id)
cwl_command_state = job_info["cwl_command_state"]

return CwlRunResponse(cwl_command_state, run_response)


class CwlRunResponse(object):

def __init__(self, cwl_command_state, api_run_response):
self.cwl_command_state = cwl_command_state
self.api_run_response = api_run_response


def _history_id(gi, **kwds):
history_id = kwds.get("history_id", None)
if history_id is None:
history_name = kwds.get("history_name", DEFAULT_HISTORY_NAME)
history_id = gi.histories.create_history(history_name)["id"]
return history_id


def _tool_id(tool_path):
tool_id, _ = os.path.splitext(os.path.basename(tool_path))
return tool_id


def _galactic_job_json(job_path, gi, history_id):
with open(job_path, "r") as f:
job_as_dict = json.load( f )

replace_keys = {}
for key, value in job_as_dict.iteritems():
if isinstance( value, dict ):
type_class = value.get("class", None)
if type_class != "File":
continue

file_path = value.get("path", None)
if file_path is None:
continue

if not os.path.isabs(file_path):
directory = os.path.dirname(job_path)
file_path = os.path.join(directory, file_path)

upload_response = gi.tools.upload_file(file_path, history_id)
dataset_id = upload_response["outputs"][0]["id"]

replace_keys[key] = {"src": "hda", "id": dataset_id}

job_as_dict.update(replace_keys)
return job_as_dict


def _wait_for_history(gi, history_id):
state_func = lambda: gi.histories.show_history(history_id)
return _wait_on_state(state_func)


def _wait_for_job(gi, job_id):
state_func = lambda: gi.jobs.show_job(job_id)
return _wait_on_state(state_func)


def _wait_on_state(state_func):

def get_state():
response = state_func()
state = response[ "state" ]
if str(state) not in [ "running", "queued", "new", "ready" ]:
return state
else:
return None

final_state = wait_on(get_state, "state", timeout=100)
return final_state
@@ -5,7 +5,6 @@
import os
import random
import shutil
import time
from six.moves.urllib.request import urlopen
from six import iteritems
from string import Template
@@ -19,6 +18,7 @@
from planemo.io import shell
from planemo.io import write_file
from planemo.io import kill_pid_file
from planemo.io import wait_on
from planemo import git
from planemo.shed import tool_shed_url
from planemo.bioblend import (
@@ -154,7 +154,7 @@ def config_join(*args):

os.makedirs(shed_tools_path)
server_name = "planemo%d" % random.randint(0, 100000)
port = kwds.get("port", 9090)
port = int(kwds.get("port", 9090))
template_args = dict(
port=port,
host=kwds.get("host", "127.0.0.1"),
@@ -276,6 +276,7 @@ def __init__(
self.port = port
self.server_name = server_name
self.master_api_key = master_api_key
self._user_api_key = None

def kill(self):
kill_pid_file(self.pid_file)
@@ -288,10 +289,33 @@ def pid_file(self):
def gi(self):
ensure_module(galaxy)
return galaxy.GalaxyInstance(
url="http://localhost:%d" % self.port,
url="http://localhost:%d" % int(self.port),
key=self.master_api_key
)

@property
def user_gi(self):
# TODO: thread-safe
if self._user_api_key is None:
users = self.gi.users
# Allow override with --user_api_key.
user_response = users.create_local_user(
"planemo",
"planemo@galaxyproject.org",
"planemo",
)
user_id = user_response["id"]

self._user_api_key = users.create_user_apikey(user_id)
return self._gi_for_key(self._user_api_key)

def _gi_for_key(self, key):
ensure_module(galaxy)
return galaxy.GalaxyInstance(
url="http://localhost:%d" % self.port,
key=key
)

def install_repo(self, *args, **kwds):
self.tool_shed_client.install_repository_revision(
*args, **kwds
@@ -305,33 +329,30 @@ def wait_for_all_installed(self):
def status_ready(repo):
status = repo["status"]
if status in ["Installing", "New"]:
return False
return None
if status == "Installed":
return True
raise Exception("Error installing repo status is %s" % status)

def not_ready():
def ready():
repos = self.tool_shed_client.get_repositories()
return not all(map(status_ready, repos))

self._wait_for(not_ready)

# Taken from Galaxy's twilltestcase.
def _wait_for(self, func, **kwd):
sleep_amount = 0.2
slept = 0
walltime_exceeded = 1086400
while slept <= walltime_exceeded:
result = func()
if result:
time.sleep(sleep_amount)
slept += sleep_amount
sleep_amount *= 1.25
if slept + sleep_amount > walltime_exceeded:
sleep_amount = walltime_exceeded - slept
else:
break
assert slept < walltime_exceeded, "Action taking too long."
ready = all(map(status_ready, repos))
return ready or None

wait_on(ready)

@property
def log_file(self):
""" Not actually used by this module, but galaxy_serve will
respect it.
"""
file_name = "%s.log" % self.server_name
return os.path.join(self.galaxy_root, file_name)

@property
def log_contents(self):
with open(self.log_file, "r") as f:
return f.read()

def cleanup(self):
shutil.rmtree(self.config_directory)
@@ -41,9 +41,8 @@ def serve(ctx, paths, **kwds):

@contextlib.contextmanager
def shed_serve(ctx, install_args_list, **kwds):
config = serve(ctx, [], daemon=True, **kwds)
install_deps = not kwds.get("skip_dependencies", False)
try:
with serve_daemon(ctx, **kwds) as config:
install_deps = not kwds.get("skip_dependencies", False)
io.info("Installing repositories - this may take some time...")
for install_args in install_args_list:
install_args["install_tool_dependencies"] = install_deps
@@ -54,6 +53,13 @@ def shed_serve(ctx, install_args_list, **kwds):
)
config.wait_for_all_installed()
yield config


@contextlib.contextmanager
def serve_daemon(ctx, paths=[], **kwds):
try:
config = serve(ctx, paths, daemon=True, **kwds)
yield config
finally:
config.kill()
if not kwds.get("no_cleanup", False):
Oops, something went wrong.

0 comments on commit 49c5c1e

Please sign in to comment.