Commit 5c54793

Merge pull request #6669 from mvdbeek/fix_cache_tests

Increase robustness and accuracy of job search / cache use

jmchilton committed Sep 6, 2018
2 parents 89405fa + a0982c1

Showing 4 changed files with 32 additions and 22 deletions.
4 changes: 4 additions & 0 deletions lib/galaxy/jobs/handler.py
@@ -457,6 +457,10 @@ def __check_job_state(self, job):
         if state == JOB_READY:
             # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration
             self.increase_running_job_count(job.user_id, job_destination.id)
+            for job_to_input_dataset_association in job.input_datasets:
+                # We record the input dataset version, now that we know the inputs are ready
+                if job_to_input_dataset_association.dataset:
+                    job_to_input_dataset_association.dataset_version = job_to_input_dataset_association.dataset.version
         return state
 
     def __verify_job_ready(self, job, job_wrapper):
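
The handler change above is the half of the fix that records which dataset version a job actually consumed: the association starts at version 0 and is only pinned once the job is known to be ready. A minimal, self-contained sketch of that idea (the Dataset, JobInput, and snapshot_input_versions names are illustrative stand-ins, not Galaxy code):

from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Dataset:
    name: str
    version: int = 0  # bumped whenever the dataset's name or metadata change

@dataclass
class JobInput:
    dataset: Optional[Dataset]
    dataset_version: int = 0  # stays 0 until the job is ready to run

def snapshot_input_versions(inputs: List[JobInput]) -> None:
    # Pin each input to the dataset version the job will actually consume.
    for job_input in inputs:
        if job_input.dataset:
            job_input.dataset_version = job_input.dataset.version

# Usage: the version recorded when the job became ready survives later edits.
reads = Dataset(name="reads.fastq", version=2)
inputs = [JobInput(dataset=reads)]
snapshot_input_versions(inputs)
reads.version = 3  # the user edits the dataset after the job ran
assert inputs[0].dataset_version == 2
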
37 changes: 20 additions & 17 deletions lib/galaxy/managers/jobs.py
@@ -147,25 +147,28 @@ def replace_dataset_ids(path, key, value):
             conditions.append(and_(
                 model.Job.id == a.job_id,
                 a.name == k,
-                a.dataset_id == b.id,
+                a.dataset_id == b.id,  # b is the HDA use for the job
                 c.dataset_id == b.dataset_id,
                 c.id == v,  # c is the requested job input HDA
-                # We can compare input dataset names and metadata for a job
-                # if we know that the input dataset hasn't changed since the job was run,
-                # or if the job recorded a dataset_version and name and metadata of the current
-                # job request matches those that were recorded for the job in question (introduced in release 18.01)
-                or_(and_(
-                    b.update_time < model.Job.create_time,
-                    b.name == c.name,
-                    b.extension == c.extension,
-                    b.metadata == c.metadata,
-                ), and_(
-                    b.id == e.history_dataset_association_id,
-                    a.dataset_version == e.version,
-                    e.name == c.name,
-                    e.extension == c.extension,
-                    e._metadata == c._metadata,
-                )),
+                # We need to make sure that the job we are looking for has been run with identical inputs.
+                # Here we deal with 3 requirements:
+                # - the jobs' input dataset (=b) version is 0, meaning the job's input dataset is not yet ready
+                # - b's update_time is older than the job create time, meaning no changes occured
+                # - the job has a dataset_version recorded, and that versions' metadata matches c's metadata.
+                or_(
+                    and_(or_(a.dataset_version.in_([0, b.version]),
+                             b.update_time < model.Job.create_time),
+                         b.name == c.name,
+                         b.extension == c.extension,
+                         b.metadata == c.metadata,
+                         ),
+                    and_(b.id == e.history_dataset_association_id,
+                         a.dataset_version == e.version,
+                         e.name == c.name,
+                         e.extension == c.extension,
+                         e._metadata == c._metadata,
+                         ),
+                ),
                 or_(b.deleted == false(), c.deleted == false())
             ))
             if identifier:
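
Restated outside of SQLAlchemy, the new or_/and_ clause accepts a previous job as a cache candidate under either of two conditions: its input HDA (b) is still usable as-is, or the job recorded a dataset version whose name, extension and metadata match the requested input (c). A rough plain-Python paraphrase, purely for illustration (the function, its parameters, and the SimpleNamespace fixtures below are hypothetical):

from datetime import datetime
from types import SimpleNamespace

def cached_job_input_matches(recorded_version, b, c, e, job_create_time):
    """Hypothetical restatement of the SQL condition above.

    recorded_version -- JobToInputDatasetAssociation.dataset_version (0 if never recorded)
    b -- the HDA the candidate job used as input
    c -- the HDA requested for the new job
    e -- the recorded dataset-version row the query aliases as e, or None
    """
    unchanged_since_job = (
        (recorded_version in (0, b.version) or b.update_time < job_create_time)
        and b.name == c.name
        and b.extension == c.extension
        and b.metadata == c.metadata
    )
    matches_recorded_version = (
        e is not None
        and e.history_dataset_association_id == b.id
        and recorded_version == e.version
        and e.name == c.name
        and e.extension == c.extension
        and e._metadata == c._metadata
    )
    return unchanged_since_job or matches_recorded_version

# Usage sketch: b was renamed after the candidate job ran, but version 2 was recorded
# for that job and still matches the requested input c, so the job can be reused.
b = SimpleNamespace(id=1, version=3, update_time=datetime(2018, 9, 5),
                    name="new name", extension="tabular", metadata={"columns": 2})
c = SimpleNamespace(name="old name", extension="tabular",
                    metadata={"columns": 2}, _metadata={"columns": 2})
e = SimpleNamespace(history_dataset_association_id=1, version=2,
                    name="old name", extension="tabular", _metadata={"columns": 2})
assert cached_job_input_matches(2, b, c, e, job_create_time=datetime(2018, 9, 1))
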
11 changes: 8 additions & 3 deletions lib/galaxy/model/__init__.py
@@ -34,7 +34,11 @@
     type_coerce,
     types)
 from sqlalchemy.ext import hybrid
-from sqlalchemy.orm import aliased, joinedload, object_session
+from sqlalchemy.orm import (
+    aliased,
+    joinedload,
+    object_session,
+)
 from sqlalchemy.schema import UniqueConstraint
 
 import galaxy.model.metadata
@@ -1110,7 +1114,7 @@ class JobToInputDatasetAssociation(object):
     def __init__(self, name, dataset):
         self.name = name
         self.dataset = dataset
-        self.dataset_version = dataset.version if dataset else None
+        self.dataset_version = 0  # We start with version 0 and update once the job is ready
 
 
 class JobToOutputDatasetAssociation(object):
@@ -2568,7 +2572,8 @@ def __create_version__(self, session):
         state = inspect(self)
         changes = {}
 
-        for attr in state.attrs:
+        for attr in state.mapper.columns:
+            # We only create a new version if columns of the HDA table have changed, and ignore relationships.
             hist = state.get_history(attr.key, True)
 
             if not hist.has_changes():
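
The switch from state.attrs to state.mapper.columns restricts version bumps to column-level changes of the HDA table and leaves relationship attributes out of the comparison. A small standalone SQLAlchemy sketch (toy models, not Galaxy's) of the difference:

from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, inspect
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session, relationship

Base = declarative_base()

class History(Base):
    __tablename__ = 'history'
    id = Column(Integer, primary_key=True)
    datasets = relationship('Dataset', back_populates='history')

class Dataset(Base):
    __tablename__ = 'dataset'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    history_id = Column(Integer, ForeignKey('history.id'))
    history = relationship('History', back_populates='datasets')

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = Session(engine)
dataset = Dataset(name='reads.fastq', history=History())
session.add(dataset)
session.flush()

state = inspect(dataset)
# state.attrs includes the 'history' relationship; state.mapper.columns does not.
print(sorted(attr.key for attr in state.attrs))               # ['history', 'history_id', 'id', 'name']
print(sorted(column.key for column in state.mapper.columns))  # ['history_id', 'id', 'name']

dataset.name = 'renamed'
changed = [column.key for column in state.mapper.columns
           if state.get_history(column.key, True).has_changes()]
print(changed)  # ['name'] -- only column changes count toward a new version
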
2 changes: 0 additions & 2 deletions test/api/test_workflows.py
@@ -1893,7 +1893,6 @@ def test_workflow_rerun_with_use_cached_job(self):
         new_workflow_request['history'] = "hist_id=%s" % history_id_two
         new_workflow_request['use_cached_job'] = True
         # We run the workflow again, it should not produce any new outputs
-        self.dataset_populator.wait_for_history(history_id_one, assert_ok=True)
         new_workflow_response = self._post("workflows", data=new_workflow_request).json()
         first_wf_output = self._get("datasets/%s" % run_workflow_response['outputs'][0]).json()
         second_wf_output = self._get("datasets/%s" % new_workflow_response['outputs'][0]).json()
@@ -1911,7 +1910,6 @@ def test_nested_workflow_rerun_with_use_cached_job(self):
     type: File
 """ % WORKFLOW_NESTED_SIMPLE
         run_jobs_summary = self._run_jobs(workflow_run_description, history_id=history_id_one)
-        self.dataset_populator.wait_for_history(history_id_one, assert_ok=True)
         workflow_request = run_jobs_summary.workflow_request
         # We copy the inputs to a new history and re-run the workflow
         inputs = json.loads(workflow_request['inputs'])
