From 64652c28cf7ea88195b1c9008c46137c3bb3644a Mon Sep 17 00:00:00 2001
From: mvdbeek
Date: Wed, 5 Sep 2018 12:10:30 +0200
Subject: [PATCH 1/3] Drop wait from cached job tests

---
 test/api/test_workflows.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/test/api/test_workflows.py b/test/api/test_workflows.py
index 817161eb7a22..f74f258986ad 100644
--- a/test/api/test_workflows.py
+++ b/test/api/test_workflows.py
@@ -1893,7 +1893,6 @@ def test_workflow_rerun_with_use_cached_job(self):
         new_workflow_request['history'] = "hist_id=%s" % history_id_two
         new_workflow_request['use_cached_job'] = True
         # We run the workflow again, it should not produce any new outputs
-        self.dataset_populator.wait_for_history(history_id_one, assert_ok=True)
         new_workflow_response = self._post("workflows", data=new_workflow_request).json()
         first_wf_output = self._get("datasets/%s" % run_workflow_response['outputs'][0]).json()
         second_wf_output = self._get("datasets/%s" % new_workflow_response['outputs'][0]).json()
@@ -1911,7 +1910,6 @@ def test_nested_workflow_rerun_with_use_cached_job(self):
     type: File
 """ % WORKFLOW_NESTED_SIMPLE
         run_jobs_summary = self._run_jobs(workflow_run_description, history_id=history_id_one)
-        self.dataset_populator.wait_for_history(history_id_one, assert_ok=True)
         workflow_request = run_jobs_summary.workflow_request
         # We copy the inputs to a new history and re-run the workflow
         inputs = json.loads(workflow_request['inputs'])

From 31d850b64484dd4f418a27c9313a954f89e37d7d Mon Sep 17 00:00:00 2001
From: mvdbeek
Date: Wed, 29 Aug 2018 15:11:40 +0200
Subject: [PATCH 2/3] Make discovery of cacheable jobs more robust

We update the actually used dataset version once the job inputs are
ready. This allows finding equivalent jobs even when a dataset was new
or still being updated while the job was new. This is also more
accurate, since we record the dataset version that is actually being
used when the tool is evaluated. This should allow caching when
simultaneously running workflows that share steps.
---
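Notes (this section is ignored by "git am"): a rough standalone sketch
of the scheme, using hypothetical InputAssociation and
stamp_input_versions names rather than the actual Galaxy classes. The
association starts at a placeholder version of 0 and is only stamped
with the real dataset version once the handler has decided the job is
ready:

    class InputAssociation:
        def __init__(self, name, dataset):
            self.name = name
            self.dataset = dataset
            # Placeholder; the real version is unknown until the job is ready.
            self.dataset_version = 0

    def stamp_input_versions(job):
        # Called when the job reaches JOB_READY: every input is now in its
        # final state, so the recorded version is the one the tool
        # evaluation will actually see.
        for association in job.input_datasets:
            if association.dataset:
                association.dataset_version = association.dataset.version

The cached-job lookup can then treat a recorded version of 0 as "inputs
were not yet ready when the job was created" and fall back to comparing
the current dataset state, as the managers/jobs.py hunk below does.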
 lib/galaxy/jobs/handler.py   |  4 ++++
 lib/galaxy/managers/jobs.py  | 37 +++++++++++++++++++-----------------
 lib/galaxy/model/__init__.py |  8 ++++++--
 3 files changed, 30 insertions(+), 19 deletions(-)

diff --git a/lib/galaxy/jobs/handler.py b/lib/galaxy/jobs/handler.py
index 114af0d5aab5..ffc1796ee3b4 100644
--- a/lib/galaxy/jobs/handler.py
+++ b/lib/galaxy/jobs/handler.py
@@ -457,6 +457,10 @@ def __check_job_state(self, job):
         if state == JOB_READY:
             # PASS.  increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration
             self.increase_running_job_count(job.user_id, job_destination.id)
+            for job_to_input_dataset_association in job.input_datasets:
+                # We record the input dataset version now that we know the inputs are ready
+                if job_to_input_dataset_association.dataset:
+                    job_to_input_dataset_association.dataset_version = job_to_input_dataset_association.dataset.version
         return state

     def __verify_job_ready(self, job, job_wrapper):
diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py
index 8e2fcb4baa9e..b76709022204 100644
--- a/lib/galaxy/managers/jobs.py
+++ b/lib/galaxy/managers/jobs.py
@@ -147,25 +147,28 @@ def replace_dataset_ids(path, key, value):
             conditions.append(and_(
                 model.Job.id == a.job_id,
                 a.name == k,
-                a.dataset_id == b.id,
+                a.dataset_id == b.id,  # b is the HDA used for the job
                 c.dataset_id == b.dataset_id,
                 c.id == v,  # c is the requested job input HDA
-                # We can compare input dataset names and metadata for a job
-                # if we know that the input dataset hasn't changed since the job was run,
-                # or if the job recorded a dataset_version and name and metadata of the current
-                # job request matches those that were recorded for the job in question (introduced in release 18.01)
-                or_(and_(
-                    b.update_time < model.Job.create_time,
-                    b.name == c.name,
-                    b.extension == c.extension,
-                    b.metadata == c.metadata,
-                ), and_(
-                    b.id == e.history_dataset_association_id,
-                    a.dataset_version == e.version,
-                    e.name == c.name,
-                    e.extension == c.extension,
-                    e._metadata == c._metadata,
-                )),
+                # We need to make sure that the job we are looking for has been run with identical inputs.
+                # Here we deal with three cases:
+                #   - the job's input dataset (=b) version is 0, meaning the job's input dataset was not ready yet
+                #   - b's update_time is older than the job's create time, meaning no changes have occurred
+                #   - the job has a dataset_version recorded, and that version's name, extension and metadata match c's
+                or_(
+                    and_(or_(a.dataset_version.in_([0, b.version]),
+                             b.update_time < model.Job.create_time),
+                         b.name == c.name,
+                         b.extension == c.extension,
+                         b.metadata == c.metadata,
+                         ),
+                    and_(b.id == e.history_dataset_association_id,
+                         a.dataset_version == e.version,
+                         e.name == c.name,
+                         e.extension == c.extension,
+                         e._metadata == c._metadata,
+                         ),
+                ),
                 or_(b.deleted == false(), c.deleted == false())
             ))
         if identifier:
diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py
index 8a4d3b73576e..91c5739b1a35 100644
--- a/lib/galaxy/model/__init__.py
+++ b/lib/galaxy/model/__init__.py
@@ -34,7 +34,11 @@
     type_coerce,
     types)
 from sqlalchemy.ext import hybrid
-from sqlalchemy.orm import aliased, joinedload, object_session
+from sqlalchemy.orm import (
+    aliased,
+    joinedload,
+    object_session,
+)
 from sqlalchemy.schema import UniqueConstraint

 import galaxy.model.metadata
@@ -1110,7 +1114,7 @@ class JobToInputDatasetAssociation(object):
     def __init__(self, name, dataset):
         self.name = name
         self.dataset = dataset
-        self.dataset_version = dataset.version if dataset else None
+        self.dataset_version = 0  # We start with version 0 and update once the job is ready


 class JobToOutputDatasetAssociation(object):

From a0982c1b900111d0b2a4add372f0caef0686a000 Mon Sep 17 00:00:00 2001
From: mvdbeek
Date: Thu, 6 Sep 2018 18:31:04 +0200
Subject: [PATCH 3/3] Only create HDA history if HDA columns have changed

This creates fewer versions because we ignore relationship changes.
---
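Notes (this section is ignored by "git am"): a minimal sketch of the
column-only change detection, using the same SQLAlchemy inspection API
the hunk below relies on; changed_columns and the idea of returning a
dict of changes are illustrative, not the real __create_version__:

    from sqlalchemy import inspect

    def changed_columns(obj):
        # state.attrs would also cover relationship attributes, so a
        # change in a related collection alone could trigger a new
        # version row; iterating state.mapper.columns diffs only the
        # columns of the mapped table itself.
        state = inspect(obj)
        changes = {}
        for column in state.mapper.columns:
            hist = state.get_history(column.key, True)
            if hist.has_changes():
                changes[column.key] = hist.added
        return changes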
 lib/galaxy/model/__init__.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py
index 91c5739b1a35..8a97c4587944 100644
--- a/lib/galaxy/model/__init__.py
+++ b/lib/galaxy/model/__init__.py
@@ -2570,7 +2570,8 @@ def __create_version__(self, session):
         state = inspect(self)
         changes = {}

-        for attr in state.attrs:
+        for attr in state.mapper.columns:
+            # Only create a new version if columns of the HDA table have changed; relationship changes are ignored.
             hist = state.get_history(attr.key, True)

             if not hist.has_changes():