diff --git a/planemo/galaxy/activity.py b/planemo/galaxy/activity.py
index 5e7578b44..2aac4f84f 100644
--- a/planemo/galaxy/activity.py
+++ b/planemo/galaxy/activity.py
@@ -20,6 +20,7 @@ from galaxy.tools.parser import get_tool_source
 
+from planemo.galaxy.api import summarize_history
 from planemo.io import wait_on
 from planemo.runnable import (
     ErrorRunResponse,
@@ -41,25 +42,35 @@ def execute(ctx, config, runnable, job_path, **kwds):
         return ErrorRunResponse(str(e))
 
 
+def _verified_tool_id(runnable, user_gi):
+    tool_id = _tool_id(runnable.path)
+    try:
+        user_gi.tools.show_tool(tool_id)
+    except Exception as e:
+        raise Exception(ERR_NO_SUCH_TOOL % (tool_id, e))
+    return tool_id
+
+
+def _inputs_representation(runnable):
+    if runnable.type == RunnableType.cwl_tool:
+        inputs_representation = "cwl"
+    else:
+        inputs_representation = "galaxy"
+    return inputs_representation
+
+
 def _execute(ctx, config, runnable, job_path, **kwds):
     user_gi = config.user_gi
     admin_gi = config.gi
 
     history_id = _history_id(user_gi, **kwds)
 
-    galaxy_paths, job_dict, datasets = stage_in(ctx, runnable, config, user_gi, history_id, job_path, **kwds)
+    galaxy_paths, job_dict, _ = stage_in(ctx, runnable, config, user_gi, history_id, job_path, **kwds)
 
     if runnable.type in [RunnableType.galaxy_tool, RunnableType.cwl_tool]:
         response_class = GalaxyToolRunResponse
-        tool_id = _tool_id(runnable.path)
-        if runnable.type == RunnableType.cwl_tool:
-            inputs_representation = "cwl"
-        else:
-            inputs_representation = "galaxy"
-        try:
-            user_gi.tools.show_tool(tool_id)
-        except Exception as e:
-            raise Exception(ERR_NO_SUCH_TOOL % (tool_id, e))
+        tool_id = _verified_tool_id(runnable, user_gi)
+        inputs_representation = _inputs_representation(runnable)
         run_tool_payload = dict(
             history_id=history_id,
             tool_id=tool_id,
@@ -71,9 +82,14 @@ def _execute(ctx, config, runnable, job_path, **kwds):
 
         job = tool_run_response["jobs"][0]
         job_id = job["id"]
-        final_state = _wait_for_job(user_gi, job_id)
+        try:
+            final_state = _wait_for_job(user_gi, job_id)
+        except Exception:
+            summarize_history(ctx, user_gi, history_id)
+            raise
         if final_state != "ok":
             msg = "Failed to run CWL tool job final job state is [%s]." % final_state
+            summarize_history(ctx, user_gi, history_id)
             with open("errored_galaxy.log", "w") as f:
                 f.write(config.log_contents)
             raise Exception(msg)
@@ -110,14 +126,16 @@ def _execute(ctx, config, runnable, job_path, **kwds):
         invocation_id = invocation["id"]
         ctx.vlog("Waiting for invocation [%s]" % invocation_id)
         try:
-            final_invocation_state = _wait_for_invocation(ctx, user_gi, workflow_id, invocation_id)
+            final_invocation_state = _wait_for_invocation(ctx, user_gi, history_id, workflow_id, invocation_id)
         except Exception:
             ctx.vlog("Problem waiting on invocation...")
+            summarize_history(ctx, user_gi, history_id)
             raise
         ctx.vlog("Final invocation state is [%s]" % final_invocation_state)
-        final_state = _wait_for_history(user_gi, history_id)
+        final_state = _wait_for_history(ctx, user_gi, history_id)
         if final_state != "ok":
             msg = "Failed to run workflow final history state is [%s]." % final_state
+            summarize_history(ctx, user_gi, history_id)
             with open("errored_galaxy.log", "w") as f:
                 f.write(config.log_contents)
             raise Exception(msg)
@@ -156,15 +174,17 @@ def upload_func(upload_target):
             )
             name = os.path.basename(file_path)
             upload_payload["inputs"]["files_0|auto_decompress"] = False
+            upload_payload["inputs"]["auto_decompress"] = False
             upload_payload["inputs"]["files_0|url_paste"] = "file://%s" % os.path.abspath(file_path)
             upload_payload["inputs"]["files_0|NAME"] = name
             if upload_target.secondary_files:
-                upload_payload["files_1|url_paste"] = "file://%s" % os.path.abspath(upload_target.secondary_files)
-                upload_payload["files_1|type"] = "upload_dataset"
-                upload_payload["files_1|auto_decompress"] = True
-                upload_payload["file_count"] = "2"
-                upload_payload["force_composite"] = "True"
+                upload_payload["inputs"]["files_1|url_paste"] = "file://%s" % os.path.abspath(upload_target.secondary_files)
+                upload_payload["inputs"]["files_1|type"] = "upload_dataset"
+                upload_payload["inputs"]["files_1|auto_decompress"] = True
+                upload_payload["inputs"]["file_count"] = "2"
+                upload_payload["inputs"]["force_composite"] = "True"
 
+            ctx.vlog("upload_payload is %s" % upload_payload)
             return user_gi.tools._tool_post(upload_payload, files_attached=False)
         elif isinstance(upload_target, DirectoryUploadTarget):
             tar_path = upload_target.tar_path
@@ -218,7 +238,7 @@ def create_collection_func(element_identifiers, collection_type):
     )
 
     if datasets:
-        final_state = _wait_for_history(user_gi, history_id)
+        final_state = _wait_for_history(ctx, user_gi, history_id)
 
         for (dataset, path) in datasets:
             dataset_details = user_gi.histories.show_dataset(
@@ -233,6 +253,7 @@ def create_collection_func(element_identifiers, collection_type):
     ctx.vlog("final state is %s" % final_state)
     if final_state != "ok":
         msg = "Failed to run CWL job final job state is [%s]." % final_state
+        summarize_history(ctx, user_gi, history_id)
         with open("errored_galaxy.log", "w") as f:
             f.write(config.log_contents)
         raise Exception(msg)
@@ -516,18 +537,25 @@ def _history_id(gi, **kwds):
     return history_id
 
 
-def _wait_for_invocation(ctx, gi, workflow_id, invocation_id):
+def _wait_for_invocation(ctx, gi, history_id, workflow_id, invocation_id):
 
     def state_func():
-        # TODO: Hack gi to work around Galaxy simply handing on this request
-        # sometimes.
-        gi.timeout = 60
-        rval = None
-        try_count = 5
+        if _retry_on_timeouts(ctx, gi, lambda gi: has_jobs_in_states(gi, history_id, ["error", "deleted", "deleted_new"])):
+            raise Exception("Problem running workflow, one or more jobs failed.")
+
+        return _retry_on_timeouts(ctx, gi, lambda gi: gi.workflows.show_invocation(workflow_id, invocation_id))
+
+    return _wait_on_state(state_func)
+
+
+def _retry_on_timeouts(ctx, gi, f):
+    gi.timeout = 60
+    try_count = 5
+    try:
         for try_num in range(try_count):
             start_time = time.time()
             try:
-                rval = gi.workflows.show_invocation(workflow_id, invocation_id)
+                return f(gi)
             except Exception:
                 end_time = time.time()
                 if end_time - start_time > 45 and (try_num + 1) < try_count:
@@ -535,30 +563,32 @@ def state_func():
                     continue
                 else:
                     raise
+    finally:
         gi.timeout = None
-        return rval
 
-    return _wait_on_state(state_func)
+
+def has_jobs_in_states(gi, history_id, states):
+    params = {"history_id": history_id}
+    jobs_url = gi._make_url(gi.jobs)
+    jobs = Client._get(gi.jobs, params=params, url=jobs_url)
+
+    target_jobs = [j for j in jobs if j["state"] in states]
 
-def _wait_for_history(gi, history_id):
+    return len(target_jobs) > 0
 
-    def has_active_jobs():
-        params = {"history_id": history_id}
-        jobs_url = gi._make_url(gi.jobs)
-        jobs = Client._get(gi.jobs, params=params, url=jobs_url)
 
-        active_jobs = [j for j in jobs if j["state"] in ["new", "upload", "waiting", "queued", "running"]]
+def _wait_for_history(ctx, gi, history_id):
 
-        if len(active_jobs) == 0:
+    def has_active_jobs(gi):
+        if has_jobs_in_states(gi, history_id, ["new", "upload", "waiting", "queued", "running"]):
             return True
         else:
             return None
 
-    wait_on(has_active_jobs, "active jobs", timeout=60 * 60 * 24)
+    wait_on(lambda: _retry_on_timeouts(ctx, gi, has_active_jobs), "active jobs", timeout=60 * 60 * 24)
 
     def state_func():
-        return gi.histories.show_history(history_id)
+        return _retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id))
 
     return _wait_on_state(state_func)
diff --git a/planemo/galaxy/api.py b/planemo/galaxy/api.py
index e432bbfe7..97523e708 100644
--- a/planemo/galaxy/api.py
+++ b/planemo/galaxy/api.py
@@ -1,4 +1,6 @@
 """A high-level interface to local Galaxy instances using bioblend."""
+from six import StringIO
+
 from planemo.bioblend import ensure_module
 from planemo.bioblend import galaxy
@@ -47,6 +49,60 @@ def user_api_key(admin_gi):
     return users.create_user_apikey(user_id)
 
 
+def summarize_history(ctx, gi, history_id):
+    """Summarize a history with print() based on similar code in Galaxy for populators.
+ """ + if not ctx.verbose: + return + + if history_id is None: + raise ValueError("summarize_history passed empty history_id") + try: + history_contents = gi.histories.show_history(history_id, contents=True) + except Exception: + print("Failed to fetch history contents in summarize_history.") + return + + for history_content in history_contents: + history_content_id = history_content.get('id', None) + print("| %d - %s (HID - NAME) " % (int(history_content['hid']), history_content['name'])) + if history_content['history_content_type'] == 'dataset_collection': + history_contents_json = gi.histories.show_dataset_collection(history_id, history_content["id"]) + print("| Dataset Collection: %s" % history_contents_json) + continue + try: + dataset_info = gi.histories.show_dataset(history_id, history_content_id) + print("| Dataset State:") + print(_format_for_summary(dataset_info.get("state"), "Dataset state is unknown.")) + print("| Dataset Blurb:") + print(_format_for_summary(dataset_info.get("misc_blurb", ""), "Dataset blurb was empty.")) + print("| Dataset Info:") + print(_format_for_summary(dataset_info.get("misc_info", ""), "Dataset info is empty.")) + print("| Peek:") + print(_format_for_summary(dataset_info.get("peek", ""), "Peek unavilable.")) + except Exception: + print("| *PLANEMO ERROR FETCHING DATASET DETAILS*") + try: + provenance_info = _dataset_provenance(gi, history_id, history_content_id) + print("| Dataset Job Standard Output:") + print(_format_for_summary(provenance_info.get("stdout", ""), "Standard output was empty.")) + print("| Dataset Job Standard Error:") + print(_format_for_summary(provenance_info.get("stderr", ""), "Standard error was empty.")) + except Exception: + print("| *PLANEMO ERROR FETCHING JOB DETAILS*") + print("|") + + +def _format_for_summary(blob, empty_message, prefix="| "): + contents = "\n".join(["%s%s" % (prefix, line.strip()) for line in StringIO(blob).readlines() if line.rstrip("\n\r")]) + return contents or "%s*%s*" % (prefix, empty_message) + + +def _dataset_provenance(gi, history_id, id): + provenance = gi.histories.show_dataset_provenance(history_id, id) + return provenance + + __all__ = ( "DEFAULT_MASTER_API_KEY", "gi", diff --git a/planemo/galaxy/config.py b/planemo/galaxy/config.py index 736491ad7..82f90d3a8 100644 --- a/planemo/galaxy/config.py +++ b/planemo/galaxy/config.py @@ -442,6 +442,7 @@ def config_join(*args): ftp_upload_purge="False", ftp_upload_dir=test_data_dir or os.path.abspath('.'), ftp_upload_site="Test Data", + check_upload_content="False", allow_path_paste="True", tool_dependency_dir=dependency_dir, file_path=file_path, diff --git a/requirements.txt b/requirements.txt index 2485b09fb..2ae7890b6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,6 +13,6 @@ virtualenv lxml gxformat2>=0.1.1 ephemeris>=0.2.0 -galaxy-lib>=17.9.6 +galaxy-lib>=17.9.7 html5lib>=0.9999999,!=0.99999999,!=0.999999999,!=1.0b10,!=1.0b09 cwltool==1.0.20170828135420