Fix and enhance job resume functionality #5247

Merged (5 commits, Dec 31, 2017)
1 change: 1 addition & 0 deletions lib/galaxy/jobs/__init__.py
@@ -1030,6 +1030,7 @@ def pause(self, job=None, message=None):
dataset_assoc.dataset.dataset.state = dataset_assoc.dataset.dataset.states.PAUSED
dataset_assoc.dataset.info = message
self.sa_session.add(dataset_assoc.dataset)
log.debug("Pausing Job '%d', %s", job.id, message)
job.set_state(job.states.PAUSED)
self.sa_session.add(job)

7 changes: 7 additions & 0 deletions lib/galaxy/model/__init__.py
@@ -3139,6 +3139,13 @@ def copy(self, destination=None, element_destination=None):
object_session(self).flush()
return new_collection

def replace_failed_elements(self, replacements):
for element in self.elements:
if element.element_object in replacements:
if element.element_type == 'hda':
element.hda = replacements[element.element_object]
# TODO: handle the case where elements are collections

def set_from_dict(self, new_data):
# Nothing currently editable in this class.
return {}
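The new replace_failed_elements helper is how a collection that contained a failed dataset picks up the output of the rerun. A minimal sketch of the intended call pattern, mirroring how _remap_job_on_rerun below builds its mapping (the variable names here are illustrative, not part of the diff):

# Illustrative only: map each failed HDA to the HDA produced by the rerun,
# then let every input collection swap out its failed elements in place.
remapped_hdas = {failed_hda: replacement_hda}
for hdca in input_hdcas:  # the HDCAs that fed the dependent jobs
    hdca.collection.replace_failed_elements(remapped_hdas)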
100 changes: 64 additions & 36 deletions lib/galaxy/tools/actions/__init__.py
@@ -8,7 +8,7 @@
from galaxy import model
from galaxy.exceptions import ObjectInvalid
from galaxy.model import LibraryDatasetDatasetAssociation
from galaxy.tools.parameters import update_param
from galaxy.tools.parameters import update_dataset_ids
from galaxy.tools.parameters.basic import DataCollectionToolParameter, DataToolParameter, RuntimeValue
from galaxy.tools.parameters.wrapped import WrappedParameters
from galaxy.util import ExecutionTimer
@@ -451,41 +451,11 @@ def handle_output(name, output, hidden=None):
# Now that we have a job id, we can remap any outputs if this is a rerun and the user chose to continue dependent jobs
# This functionality requires tracking jobs in the database.
if app.config.track_jobs_in_database and rerun_remap_job_id is not None:
try:
old_job = trans.sa_session.query(app.model.Job).get(rerun_remap_job_id)
assert old_job is not None, '(%s/%s): Old job id is invalid' % (rerun_remap_job_id, job.id)
assert old_job.tool_id == job.tool_id, '(%s/%s): Old tool id (%s) does not match rerun tool id (%s)' % (old_job.id, job.id, old_job.tool_id, job.tool_id)
if trans.user is not None:
assert old_job.user_id == trans.user.id, '(%s/%s): Old user id (%s) does not match rerun user id (%s)' % (old_job.id, job.id, old_job.user_id, trans.user.id)
elif trans.user is None and type(galaxy_session) == trans.model.GalaxySession:
assert old_job.session_id == galaxy_session.id, '(%s/%s): Old session id (%s) does not match rerun session id (%s)' % (old_job.id, job.id, old_job.session_id, galaxy_session.id)
else:
raise Exception('(%s/%s): Remapping via the API is not (yet) supported' % (old_job.id, job.id))
# Duplicate PJAs before remap.
for pjaa in old_job.post_job_actions:
job.add_post_job_action(pjaa.post_job_action)
for jtod in old_job.output_datasets:
for (job_to_remap, jtid) in [(jtid.job, jtid) for jtid in jtod.dataset.dependent_jobs]:
if (trans.user is not None and job_to_remap.user_id == trans.user.id) or (trans.user is None and job_to_remap.session_id == galaxy_session.id):
if job_to_remap.state == job_to_remap.states.PAUSED:
job_to_remap.state = job_to_remap.states.NEW
for hda in [dep_jtod.dataset for dep_jtod in job_to_remap.output_datasets]:
if hda.state == hda.states.PAUSED:
hda.state = hda.states.NEW
hda.info = None
input_values = dict([(p.name, json.loads(p.value)) for p in job_to_remap.parameters])
update_param(jtid.name, input_values, str(out_data[jtod.name].id))
for p in job_to_remap.parameters:
p.value = json.dumps(input_values[p.name])
jtid.dataset = out_data[jtod.name]
jtid.dataset.hid = jtod.dataset.hid
log.info('Job %s input HDA %s remapped to new HDA %s' % (job_to_remap.id, jtod.dataset.id, jtid.dataset.id))
trans.sa_session.add(job_to_remap)
trans.sa_session.add(jtid)
jtod.dataset.visible = False
trans.sa_session.add(jtod)
except Exception:
log.exception('Cannot remap rerun dependencies.')
self._remap_job_on_rerun(trans=trans,
galaxy_session=galaxy_session,
rerun_remap_job_id=rerun_remap_job_id,
current_job=job,
out_data=out_data)

log.info("Setup for job %s complete, ready to flush %s" % (job.log_str(), job_setup_timer))

@@ -518,6 +488,64 @@ def handle_output(name, output, hidden=None):
trans.log_event("Added job to the job queue, id: %s" % str(job.id), tool_id=job.tool_id)
return job, out_data

def _remap_job_on_rerun(self, trans, galaxy_session, rerun_remap_job_id, current_job, out_data):
"""
Re-connect dependent datasets for a job that is being rerun (because it failed initially).

If a job fails, the user has the option to try the job again with changed parameters.
To be able to resume jobs that depend on this job's output datasets, we update the dependent jobs'
input datasets to be the corresponding outputs of the new job.
"""
try:
old_job = trans.sa_session.query(trans.app.model.Job).get(rerun_remap_job_id)
assert old_job is not None, '(%s/%s): Old job id is invalid' % (rerun_remap_job_id, current_job.id)
assert old_job.tool_id == current_job.tool_id, '(%s/%s): Old tool id (%s) does not match rerun tool id (%s)' % (old_job.id, current_job.id, old_job.tool_id, current_job.tool_id)
if trans.user is not None:
assert old_job.user_id == trans.user.id, '(%s/%s): Old user id (%s) does not match rerun user id (%s)' % (old_job.id, current_job.id, old_job.user_id, trans.user.id)
elif trans.user is None and type(galaxy_session) == trans.model.GalaxySession:
assert old_job.session_id == galaxy_session.id, '(%s/%s): Old session id (%s) does not match rerun session id (%s)' % (old_job.id, current_job.id, old_job.session_id, galaxy_session.id)
else:
raise Exception('(%s/%s): Remapping via the API is not (yet) supported' % (old_job.id, current_job.id))
# Duplicate PJAs before remap.
for pjaa in old_job.post_job_actions:
current_job.add_post_job_action(pjaa.post_job_action)
remapped_hdas = {}
input_hdcas = set()
for jtod in old_job.output_datasets:
for (job_to_remap, jtid) in [(jtid.job, jtid) for jtid in jtod.dataset.dependent_jobs]:
if (trans.user is not None and job_to_remap.user_id == trans.user.id) or (
trans.user is None and job_to_remap.session_id == galaxy_session.id):
if job_to_remap.state == job_to_remap.states.PAUSED:
job_to_remap.state = job_to_remap.states.NEW
for hda in [dep_jtod.dataset for dep_jtod in job_to_remap.output_datasets]:
if hda.state == hda.states.PAUSED:
hda.state = hda.states.NEW
hda.info = None
input_values = dict([(p.name, json.loads(p.value)) for p in job_to_remap.parameters])
remapped_hdas[jtod.dataset] = out_data[jtod.name]
for jtidca in job_to_remap.input_dataset_collections:
input_hdcas.add(jtidca.dataset_collection)
old_dataset_id = jtod.dataset_id
new_dataset_id = out_data[jtod.name].id
input_values = update_dataset_ids(input_values, {old_dataset_id: new_dataset_id}, src='hda')
for p in job_to_remap.parameters:
p.value = json.dumps(input_values[p.name])
jtid.dataset = out_data[jtod.name]
jtid.dataset.hid = jtod.dataset.hid
log.info('Job %s input HDA %s remapped to new HDA %s' % (job_to_remap.id, jtod.dataset.id, jtid.dataset.id))
trans.sa_session.add(job_to_remap)
trans.sa_session.add(jtid)
for hdca in input_hdcas:
hdca.collection.replace_failed_elements(remapped_hdas)
if hdca.implicit_collection_jobs:
for job in hdca.implicit_collection_jobs.jobs:
if job.job_id == old_job.id:
job.job_id = current_job.id
jtod.dataset.visible = False
trans.sa_session.add(jtod)
except Exception:
log.exception('Cannot remap rerun dependencies.')

def _wrapped_params(self, trans, tool, incoming, input_datasets=None):
wrapped_params = WrappedParameters(trans, tool, incoming, input_datasets=input_datasets)
return wrapped_params
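For orientation, this code path is reached when a failed tool is rerun with rerun_remap_job_id set in the submitted parameters and job tracking in the database is enabled. A minimal sketch of such a rerun through the test helpers, lifted from the functional tests added below (the concrete tool and parameter values are those of the test, not requirements of the API):

# Rerun the failed tool with corrected inputs; rerun_remap_job_id points at the
# failed job so its paused dependents are remapped onto the new outputs.
inputs = {
    'thebool': 'false',
    'failbool': 'false',
    'rerun_remap_job_id': failed_dataset['creating_job'],
}
self.dataset_populator.run_tool(tool_id='job_properties', inputs=inputs,
                                history_id=history_id, assert_ok=True)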
34 changes: 16 additions & 18 deletions lib/galaxy/tools/parameters/__init__.py
@@ -3,9 +3,10 @@
"""
from __future__ import print_function

import re
from json import dumps

from boltons.iterutils import remap

from galaxy.util.expressions import ExpressionContext
from galaxy.util.json import json_fix
from galaxy.util.json import safe_loads
@@ -252,23 +253,20 @@ def params_to_incoming(incoming, inputs, input_values, app, name_prefix=""):
incoming[name_prefix + input.name] = value


def update_param(prefixed_name, input_values, new_value):
"""
Given a prefixed parameter name, e.g. 'parameter_0|parameter_1', update
the corresponding input value in a nested input values dictionary.
"""
for key in input_values:
match = re.match('^' + key + '_(\d+)\|(.+)', prefixed_name)
if match and not key.endswith("|__identifier__"):
index = int(match.group(1))
if isinstance(input_values[key], list) and len(input_values[key]) > index:
update_param(match.group(2), input_values[key][index], new_value)
else:
match = re.match('^' + key + '\|(.+)', prefixed_name)
if isinstance(input_values[key], dict) and match:
update_param(match.group(1), input_values[key], new_value)
elif prefixed_name == key:
input_values[key] = new_value
def update_dataset_ids(input_values, translate_values, src):

def replace_dataset_ids(path, key, value):
"""Exchanges dataset_ids (HDA, LDA, HDCA, not Dataset) in input_values with dataset ids used in job."""
current_case = input_values
if key == 'id':
for i, p in enumerate(path):
if isinstance(current_case, (list, dict)):
current_case = current_case[p]
if src == current_case.get('src'):
return key, translate_values.get(current_case['id'], value)
return key, value

return remap(input_values, visit=replace_dataset_ids)


def populate_state(request_context, inputs, incoming, state, errors={}, prefix='', context=None, check=True):
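A worked example of what update_dataset_ids does: boltons' remap walks the nested parameter dictionary and rewrites only 'id' values whose enclosing dict carries the matching 'src'. The parameter shape follows the {'values': [{'src': 'hda', 'id': ...}]} structure used by the tests below; the concrete ids are made up for illustration:

from galaxy.tools.parameters import update_dataset_ids

input_values = {
    'input1': {'values': [{'src': 'hda', 'id': 101}]},  # pointed at the failed job's output
    'ref': {'values': [{'src': 'hdca', 'id': 7}]},      # different src, left untouched
}
# Translate the failed output's dataset id (101) to the rerun output's id (555).
updated = update_dataset_ids(input_values, {101: 555}, src='hda')
assert updated['input1']['values'][0]['id'] == 555
assert updated['ref']['values'][0]['id'] == 7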
117 changes: 117 additions & 0 deletions test/api/test_workflows.py
@@ -654,6 +654,123 @@ def test_workflow_run_output_collections(self):
self.dataset_populator.wait_for_history(history_id, assert_ok=True)
self.assertEqual("a\nc\nb\nd\n", self.dataset_populator.get_history_dataset_content(history_id, hid=0))

@skip_without_tool("job_properties")
@skip_without_tool("identifier_multiple_in_conditional")
def test_workflow_resume_from_failed_step(self):
workflow_id = self._upload_yaml_workflow("""
class: GalaxyWorkflow
steps:
- tool_id: job_properties
state:
thebool: true
failbool: true
- tool_id: identifier_multiple_in_conditional
state:
outer_cond:
cond_param_outer: true
inner_cond:
cond_param_inner: true
input1:
$link: 0#out_file1
""")
history_id = self.dataset_populator.new_history()
invocation_id = self.__invoke_workflow(history_id, workflow_id)
self.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id, assert_ok=False)
failed_dataset_one = self.dataset_populator.get_history_dataset_details(history_id, hid=1, wait=True, assert_ok=False)
assert failed_dataset_one['state'] == 'error', failed_dataset_one
paused_dataset = self.dataset_populator.get_history_dataset_details(history_id, hid=5, wait=True, assert_ok=False)
assert paused_dataset['state'] == 'paused', paused_dataset
inputs = {"thebool": "false",
"failbool": "false",
"rerun_remap_job_id": failed_dataset_one['creating_job']}
self.dataset_populator.run_tool(tool_id='job_properties',
inputs=inputs,
history_id=history_id,
assert_ok=True)
unpaused_dataset = self.dataset_populator.get_history_dataset_details(history_id, hid=5, wait=True, assert_ok=False)
assert unpaused_dataset['state'] == 'ok'

@skip_without_tool("job_properties")
@skip_without_tool("identifier_multiple_in_conditional")
def test_workflow_resume_from_failed_step_with_hdca_input(self):
workflow_id = self._upload_yaml_workflow("""
class: GalaxyWorkflow
steps:
- tool_id: job_properties
state:
thebool: true
failbool: true
- tool_id: identifier_collection
state:
input1:
$link: 0#list_output
""")
with self.dataset_populator.test_history() as history_id:
invocation_id = self.__invoke_workflow(history_id, workflow_id)
self.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id, assert_ok=False)
failed_dataset_one = self.dataset_populator.get_history_dataset_details(history_id, hid=1, wait=True, assert_ok=False)
assert failed_dataset_one['state'] == 'error', failed_dataset_one
paused_dataset = self.dataset_populator.get_history_dataset_details(history_id, hid=5, wait=True, assert_ok=False)
assert paused_dataset['state'] == 'paused', paused_dataset
inputs = {"thebool": "false",
"failbool": "false",
"rerun_remap_job_id": failed_dataset_one['creating_job']}
self.dataset_populator.run_tool(tool_id='job_properties',
inputs=inputs,
history_id=history_id,
assert_ok=True)
unpaused_dataset = self.dataset_populator.get_history_dataset_details(history_id, hid=5, wait=True,
assert_ok=False)
assert unpaused_dataset['state'] == 'ok'

@skip_without_tool("fail_identifier")
@skip_without_tool("identifier_multiple_in_conditional")
def test_workflow_resume_with_mapped_over_input(self):
with self.dataset_populator.test_history() as history_id:
job_summary = self._run_jobs("""
class: GalaxyWorkflow
steps:
- label: input_datasets
type: input_collection
- label: fail_identifier_1
tool_id: fail_identifier
state:
input1:
$link: input_datasets
failbool: true
- tool_id: identifier_collection
state:
input1:
$link: fail_identifier_1#out_file1
test_data:
input_datasets:
type: list
elements:
- identifier: fail
value: 1.fastq
type: File
- identifier: success
value: 1.fastq
type: File
""", history_id=history_id, assert_ok=False, wait=False)
self.wait_for_invocation_and_jobs(history_id, job_summary.workflow_id, job_summary.invocation_id, assert_ok=False)
history_contents = self.dataset_populator._get_contents_request(history_id=history_id).json()
paused_dataset = history_contents[-1]
failed_dataset = self.dataset_populator.get_history_dataset_details(history_id, hid=5, assert_ok=False)
assert paused_dataset['state'] == 'paused', paused_dataset
assert failed_dataset['state'] == 'error', failed_dataset
inputs = {"input1": {'values': [{'src': 'hda',
'id': history_contents[0]['id']}]
},
"failbool": "false",
"rerun_remap_job_id": failed_dataset['creating_job']}
self.dataset_populator.run_tool(tool_id='fail_identifier',
inputs=inputs,
history_id=history_id,
assert_ok=True)
unpaused_dataset = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=False)
assert unpaused_dataset['state'] == 'ok'

@skip_without_tool("collection_creates_pair")
def test_workflow_run_output_collection_mapping(self):
workflow_id = self._upload_yaml_workflow("""
10 changes: 5 additions & 5 deletions test/base/populators.py
@@ -275,19 +275,19 @@ def get_history_dataset_content(self, history_id, wait=True, filename=None, **kw
data = {}
if filename:
data["filename"] = filename
display_response = self.__get_contents_request(history_id, "/%s/display" % dataset_id, data=data)
display_response = self._get_contents_request(history_id, "/%s/display" % dataset_id, data=data)
assert display_response.status_code == 200, display_response.content
return display_response.content

def get_history_dataset_details(self, history_id, **kwds):
dataset_id = self.__history_content_id(history_id, **kwds)
details_response = self.__get_contents_request(history_id, "/datasets/%s" % dataset_id)
details_response = self._get_contents_request(history_id, "/datasets/%s" % dataset_id)
assert details_response.status_code == 200
return details_response.json()

def get_history_collection_details(self, history_id, **kwds):
hdca_id = self.__history_content_id(history_id, **kwds)
details_response = self.__get_contents_request(history_id, "/dataset_collections/%s" % hdca_id)
details_response = self._get_contents_request(history_id, "/dataset_collections/%s" % hdca_id)
assert details_response.status_code == 200, details_response.content
return details_response.json()

@@ -320,7 +320,7 @@ def __history_content_id(self, history_id, wait=True, **kwds):
history_content_id = kwds["dataset"]["id"]
else:
hid = kwds.get("hid", None) # If not hid, just grab last dataset
history_contents = self.__get_contents_request(history_id).json()
history_contents = self._get_contents_request(history_id).json()
if hid:
history_content_id = None
for history_item in history_contents:
@@ -333,7 +333,7 @@ def __history_content_id(self, history_id, wait=True, **kwds):
history_content_id = history_contents[-1]["id"]
return history_content_id

def __get_contents_request(self, history_id, suffix="", data={}):
def _get_contents_request(self, history_id, suffix="", data={}):
url = "histories/%s/contents" % history_id
if suffix:
url = "%s%s" % (url, suffix)
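The rename from __get_contents_request to _get_contents_request is what allows the new workflow test to call the helper from outside DatasetPopulator (self.dataset_populator._get_contents_request(...) above); the double-underscore name was mangled and therefore unreachable there. A small self-contained illustration of standard Python name mangling, independent of Galaxy:

class Populator:
    def __get(self):   # stored on the class as _Populator__get
        return 'contents'

    def _get(self):    # single leading underscore: no mangling, callable by collaborators
        return 'contents'


class Caller:
    def use(self, populator):
        # populator.__get() here would be rewritten to populator._Caller__get()
        # and raise AttributeError; the single-underscore helper works.
        return populator._get()


print(Caller().use(Populator()))  # prints 'contents'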