Skip to content

Commit

Permalink
Merge pull request #734 from jmchilton/cwl_redo_upload_and_timeout
Browse files Browse the repository at this point in the history
Fixes for CWL workflow running.
  • Loading branch information
jmchilton committed Sep 19, 2017
2 parents c58ad3c + 96ba0c7 commit 24dd7de
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 38 deletions.
104 changes: 67 additions & 37 deletions planemo/galaxy/activity.py
Expand Up @@ -20,6 +20,7 @@
from galaxy.tools.parser import get_tool_source


from planemo.galaxy.api import summarize_history
from planemo.io import wait_on
from planemo.runnable import (
ErrorRunResponse,
Expand All @@ -41,25 +42,35 @@ def execute(ctx, config, runnable, job_path, **kwds):
return ErrorRunResponse(str(e))


def _verified_tool_id(runnable, user_gi):
    """Resolve the tool id for ``runnable`` and verify the target Galaxy knows it.

    Raises a wrapped ``Exception`` (via ``ERR_NO_SUCH_TOOL``) when the tool
    cannot be shown on the supplied Galaxy instance.
    """
    resolved_id = _tool_id(runnable.path)
    try:
        user_gi.tools.show_tool(resolved_id)
    except Exception as show_error:
        raise Exception(ERR_NO_SUCH_TOOL % (resolved_id, show_error))
    return resolved_id


def _inputs_representation(runnable):
    """Return the tool API inputs representation for ``runnable``.

    CWL tools use the "cwl" representation; everything else uses "galaxy".
    """
    if runnable.type == RunnableType.cwl_tool:
        return "cwl"
    return "galaxy"


def _execute(ctx, config, runnable, job_path, **kwds):
user_gi = config.user_gi
admin_gi = config.gi

history_id = _history_id(user_gi, **kwds)

galaxy_paths, job_dict, datasets = stage_in(ctx, runnable, config, user_gi, history_id, job_path, **kwds)
galaxy_paths, job_dict, _ = stage_in(ctx, runnable, config, user_gi, history_id, job_path, **kwds)

if runnable.type in [RunnableType.galaxy_tool, RunnableType.cwl_tool]:
response_class = GalaxyToolRunResponse
tool_id = _tool_id(runnable.path)
if runnable.type == RunnableType.cwl_tool:
inputs_representation = "cwl"
else:
inputs_representation = "galaxy"
try:
user_gi.tools.show_tool(tool_id)
except Exception as e:
raise Exception(ERR_NO_SUCH_TOOL % (tool_id, e))
tool_id = _verified_tool_id(runnable)
inputs_representation = _inputs_representation(runnable)
run_tool_payload = dict(
history_id=history_id,
tool_id=tool_id,
Expand All @@ -71,9 +82,14 @@ def _execute(ctx, config, runnable, job_path, **kwds):

job = tool_run_response["jobs"][0]
job_id = job["id"]
final_state = _wait_for_job(user_gi, job_id)
try:
final_state = _wait_for_job(user_gi, job_id)
except Exception:
summarize_history(ctx, user_gi, history_id)
raise
if final_state != "ok":
msg = "Failed to run CWL tool job final job state is [%s]." % final_state
summarize_history(ctx, user_gi, history_id)
with open("errored_galaxy.log", "w") as f:
f.write(config.log_contents)
raise Exception(msg)
Expand Down Expand Up @@ -110,14 +126,16 @@ def _execute(ctx, config, runnable, job_path, **kwds):
invocation_id = invocation["id"]
ctx.vlog("Waiting for invocation [%s]" % invocation_id)
try:
final_invocation_state = _wait_for_invocation(ctx, user_gi, workflow_id, invocation_id)
final_invocation_state = _wait_for_invocation(ctx, user_gi, history_id, workflow_id, invocation_id)
except Exception:
ctx.vlog("Problem waiting on invocation...")
summarize_history(ctx, user_gi, history_id)
raise
ctx.vlog("Final invocation state is [%s]" % final_invocation_state)
final_state = _wait_for_history(user_gi, history_id)
final_state = _wait_for_history(ctx, user_gi, history_id)
if final_state != "ok":
msg = "Failed to run workflow final history state is [%s]." % final_state
summarize_history(ctx, user_gi, history_id)
with open("errored_galaxy.log", "w") as f:
f.write(config.log_contents)
raise Exception(msg)
Expand Down Expand Up @@ -156,15 +174,17 @@ def upload_func(upload_target):
)
name = os.path.basename(file_path)
upload_payload["inputs"]["files_0|auto_decompress"] = False
upload_payload["inputs"]["auto_decompress"] = False
upload_payload["inputs"]["files_0|url_paste"] = "file://%s" % os.path.abspath(file_path)
upload_payload["inputs"]["files_0|NAME"] = name
if upload_target.secondary_files:
upload_payload["files_1|url_paste"] = "file://%s" % os.path.abspath(upload_target.secondary_files)
upload_payload["files_1|type"] = "upload_dataset"
upload_payload["files_1|auto_decompress"] = True
upload_payload["file_count"] = "2"
upload_payload["force_composite"] = "True"
upload_payload["inputs"]["files_1|url_paste"] = "file://%s" % os.path.abspath(upload_target.secondary_files)
upload_payload["inputs"]["files_1|type"] = "upload_dataset"
upload_payload["inputs"]["files_1|auto_decompress"] = True
upload_payload["inputs"]["file_count"] = "2"
upload_payload["inputs"]["force_composite"] = "True"

ctx.vlog("upload_payload is %s" % upload_payload)
return user_gi.tools._tool_post(upload_payload, files_attached=False)
elif isinstance(upload_target, DirectoryUploadTarget):
tar_path = upload_target.tar_path
Expand Down Expand Up @@ -218,7 +238,7 @@ def create_collection_func(element_identifiers, collection_type):
)

if datasets:
final_state = _wait_for_history(user_gi, history_id)
final_state = _wait_for_history(ctx, user_gi, history_id)

for (dataset, path) in datasets:
dataset_details = user_gi.histories.show_dataset(
Expand All @@ -233,6 +253,7 @@ def create_collection_func(element_identifiers, collection_type):
ctx.vlog("final state is %s" % final_state)
if final_state != "ok":
msg = "Failed to run CWL job final job state is [%s]." % final_state
summarize_history(ctx, user_gi, history_id)
with open("errored_galaxy.log", "w") as f:
f.write(config.log_contents)
raise Exception(msg)
Expand Down Expand Up @@ -516,49 +537,58 @@ def _history_id(gi, **kwds):
return history_id


def _wait_for_invocation(ctx, gi, history_id, workflow_id, invocation_id):
    """Poll workflow invocation ``invocation_id`` until it reaches a final state.

    Fails fast by raising as soon as any job in ``history_id`` has errored
    or been deleted, instead of waiting out the whole invocation. Galaxy
    status calls are wrapped in ``_retry_on_timeouts`` because the server
    sometimes hangs on these requests.
    """

    def state_func():
        # Abort early if the history already contains failed/deleted jobs.
        if _retry_on_timeouts(ctx, gi, lambda gi: has_jobs_in_states(gi, history_id, ["error", "deleted", "deleted_new"])):
            raise Exception("Problem running workflow, one or more jobs failed.")

        return _retry_on_timeouts(ctx, gi, lambda gi: gi.workflows.show_invocation(workflow_id, invocation_id))

    return _wait_on_state(state_func)


def _retry_on_timeouts(ctx, gi, f):
gi.timeout = 60
try_count = 5
try:
for try_num in range(try_count):
start_time = time.time()
try:
rval = gi.workflows.show_invocation(workflow_id, invocation_id)
return f(gi)
except Exception:
end_time = time.time()
if end_time - start_time > 45 and (try_num + 1) < try_count:
ctx.vlog("Galaxy seems to have timedout, retrying to fetch status.")
continue
else:
raise
finally:
gi.timeout = None
return rval

return _wait_on_state(state_func)

def has_jobs_in_states(gi, history_id, states):
    """Return True if any job in ``history_id`` is currently in one of ``states``.

    Uses the raw jobs API via bioblend's ``Client._get`` because the
    installed bioblend jobs client does not expose history filtering.
    """
    params = {"history_id": history_id}
    jobs_url = gi._make_url(gi.jobs)
    jobs = Client._get(gi.jobs, params=params, url=jobs_url)

    target_jobs = [j for j in jobs if j["state"] in states]

    return len(target_jobs) > 0

def _wait_for_history(ctx, gi, history_id):
    """Wait for all jobs in ``history_id`` to leave active states, then poll history state.

    Returns the final history state string (e.g. "ok" or "error").
    """

    def has_active_jobs(gi):
        # NOTE(review): returns True while jobs are still active and None once
        # none remain -- relies on planemo.io.wait_on's polling convention;
        # confirm the True/None contract against that helper.
        if has_jobs_in_states(gi, history_id, ["new", "upload", "waiting", "queued", "running"]):
            return True
        else:
            return None

    wait_on(lambda: _retry_on_timeouts(ctx, gi, has_active_jobs), "active jobs", timeout=60 * 60 * 24)

    def state_func():
        return _retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id))

    return _wait_on_state(state_func)

Expand Down
56 changes: 56 additions & 0 deletions planemo/galaxy/api.py
@@ -1,4 +1,6 @@
"""A high-level interface to local Galaxy instances using bioblend."""
from six import StringIO

from planemo.bioblend import ensure_module
from planemo.bioblend import galaxy

Expand Down Expand Up @@ -47,6 +49,60 @@ def user_api_key(admin_gi):
return users.create_user_apikey(user_id)


def summarize_history(ctx, gi, history_id):
    """Summarize a history with print() based on similar code in Galaxy for populators.

    Only emits output when ``ctx`` is verbose. Every remote fetch is
    best-effort so a failure while summarizing never masks the original
    error that triggered the summary.

    :raises ValueError: if ``history_id`` is ``None``.
    """
    if not ctx.verbose:
        return

    if history_id is None:
        raise ValueError("summarize_history passed empty history_id")
    try:
        history_contents = gi.histories.show_history(history_id, contents=True)
    except Exception:
        print("Failed to fetch history contents in summarize_history.")
        return

    for history_content in history_contents:
        history_content_id = history_content.get('id', None)
        print("| %d - %s (HID - NAME) " % (int(history_content['hid']), history_content['name']))
        if history_content['history_content_type'] == 'dataset_collection':
            # Use the id fetched defensively above instead of re-indexing the dict.
            history_contents_json = gi.histories.show_dataset_collection(history_id, history_content_id)
            print("| Dataset Collection: %s" % history_contents_json)
            continue
        try:
            dataset_info = gi.histories.show_dataset(history_id, history_content_id)
            print("| Dataset State:")
            print(_format_for_summary(dataset_info.get("state"), "Dataset state is unknown."))
            print("| Dataset Blurb:")
            print(_format_for_summary(dataset_info.get("misc_blurb", ""), "Dataset blurb was empty."))
            print("| Dataset Info:")
            print(_format_for_summary(dataset_info.get("misc_info", ""), "Dataset info is empty."))
            print("| Peek:")
            # Fixed typo in user-facing message ("unavilable" -> "unavailable").
            print(_format_for_summary(dataset_info.get("peek", ""), "Peek unavailable."))
        except Exception:
            print("| *PLANEMO ERROR FETCHING DATASET DETAILS*")
        try:
            provenance_info = _dataset_provenance(gi, history_id, history_content_id)
            print("| Dataset Job Standard Output:")
            print(_format_for_summary(provenance_info.get("stdout", ""), "Standard output was empty."))
            print("| Dataset Job Standard Error:")
            print(_format_for_summary(provenance_info.get("stderr", ""), "Standard error was empty."))
        except Exception:
            print("| *PLANEMO ERROR FETCHING JOB DETAILS*")
        print("|")


def _format_for_summary(blob, empty_message, prefix="| "):
contents = "\n".join(["%s%s" % (prefix, line.strip()) for line in StringIO(blob).readlines() if line.rstrip("\n\r")])
return contents or "%s*%s*" % (prefix, empty_message)


def _dataset_provenance(gi, history_id, id):
provenance = gi.histories.show_dataset_provenance(history_id, id)
return provenance


__all__ = (
"DEFAULT_MASTER_API_KEY",
"gi",
Expand Down
1 change: 1 addition & 0 deletions planemo/galaxy/config.py
Expand Up @@ -442,6 +442,7 @@ def config_join(*args):
ftp_upload_purge="False",
ftp_upload_dir=test_data_dir or os.path.abspath('.'),
ftp_upload_site="Test Data",
check_upload_content="False",
allow_path_paste="True",
tool_dependency_dir=dependency_dir,
file_path=file_path,
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Expand Up @@ -13,6 +13,6 @@ virtualenv
lxml
gxformat2>=0.1.1
ephemeris>=0.2.0
galaxy-lib>=17.9.6
galaxy-lib>=17.9.7
html5lib>=0.9999999,!=0.99999999,!=0.999999999,!=1.0b10,!=1.0b09
cwltool==1.0.20170828135420

0 comments on commit 24dd7de

Please sign in to comment.