Skip to content

Commit

Permalink
Do not explicitly begin a new transaction when getting next_hid
Browse files Browse the repository at this point in the history
There is no transactional state, so it seems unnecessary. Not starting an
explicit transaction means sqlalchemy won't flush the current session,
which means we avoid committing an HDA/HDCA to the database before it
has been assigned a hid.
  • Loading branch information
mvdbeek committed Oct 18, 2021
1 parent cfdd6e9 commit f63dc3b
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 25 deletions.
2 changes: 1 addition & 1 deletion lib/galaxy/managers/collections.py
Expand Up @@ -341,7 +341,7 @@ def copy(self, trans, parent, source, encoded_source_id, copy_elements=False, da
copy_kwds["element_destination"] = parent # e.g. a history
if dataset_instance_attributes is not None:
copy_kwds["dataset_instance_attributes"] = dataset_instance_attributes
new_hdca = source_hdca.copy(**copy_kwds)
new_hdca = source_hdca.copy(flush=False, **copy_kwds)
new_hdca.copy_tags_from(target_user=trans.get_user(), source=source_hdca)
if not copy_elements:
parent.add_dataset_collection(new_hdca)
Expand Down
7 changes: 3 additions & 4 deletions lib/galaxy/model/__init__.py
Expand Up @@ -2566,10 +2566,9 @@ def copy(self, name=None, target_user=None, activatable=False, all_datasets=Fals
else:
hdcas = self.active_dataset_collections
for hdca in hdcas:
new_hdca = hdca.copy()
new_hdca = hdca.copy(flush=False)
new_history.add_dataset_collection(new_hdca, set_hid=False)
db_session.add(new_hdca)
db_session.flush()

if target_user:
new_hdca.copy_item_annotation(db_session, self.user, hdca, target_user, new_hdca)
Expand Down Expand Up @@ -5699,7 +5698,7 @@ def find_implicit_input_collection(self, name):
break
return matching_collection

def copy(self, element_destination=None, dataset_instance_attributes=None):
def copy(self, element_destination=None, dataset_instance_attributes=None, flush=True):
"""
Create a copy of this history dataset collection association. Copy
underlying collection.
Expand Down Expand Up @@ -5729,7 +5728,7 @@ def copy(self, element_destination=None, dataset_instance_attributes=None):
if element_destination:
element_destination.stage_addition(hdca)
element_destination.add_pending_items()
else:
if flush:
object_session(self).flush()
return hdca

Expand Down
20 changes: 7 additions & 13 deletions lib/galaxy/model/mapping.py
def db_next_hid(self, n=1):
    """
    Return the next available ``hid`` for this history and advance the
    stored counter by ``n`` in the database.

    Deliberately does NOT open an explicit transaction: there is no
    transactional state to protect here, and beginning one would make
    sqlalchemy flush the current session, committing HDAs/HDCAs that do
    not have a hid yet.

    :param n: how many hids to reserve (the counter is bumped by ``n``).
    :rtype: int
    :returns: the first reserved hid.
    """
    session = object_session(self)
    table = self.table
    if "postgres" not in session.bind.dialect.name:
        # Non-PostgreSQL path: read the counter under a row lock
        # (SELECT ... FOR UPDATE), then bump it in a second statement.
        # Use cached_id consistently in both statements so neither access
        # triggers a session flush via attribute refresh.
        next_hid = select([table.c.hid_counter], table.c.id == model.cached_id(self)).with_for_update().scalar()
        table.update(table.c.id == model.cached_id(self)).execute(hid_counter=(next_hid + n))
    else:
        # PostgreSQL path: bump and read back atomically in one
        # UPDATE ... RETURNING statement; subtract n to recover the value
        # the counter had before the bump.
        stmt = table.update().where(table.c.id == model.cached_id(self)).values(hid_counter=(table.c.hid_counter + n)).returning(table.c.hid_counter)
        next_hid = session.execute(stmt).scalar() - n
    return next_hid


model.History._next_hid = db_next_hid # type: ignore
Expand Down
12 changes: 5 additions & 7 deletions lib/galaxy/tools/execute.py
Expand Up @@ -111,18 +111,16 @@ def execute_single_job(execution_slice, completed_job):
history = execution_slice.history or history
jobs_executed += 1

if execution_slice:
# a side effect of adding datasets to a history is a commit within db_next_hid (even with flush=False).
history.add_pending_items()
else:
# Make sure collections, implicit jobs etc are flushed even if there are no precreated output datasets
trans.sa_session.flush()

if job_datasets:
for job, datasets in job_datasets.items():
for dataset_instance in datasets:
dataset_instance.dataset.job = job

if execution_slice:
history.add_pending_items()
# Make sure collections, implicit jobs etc are flushed even if there are no precreated output datasets
trans.sa_session.flush()

tool_id = tool.id
for job in execution_tracker.successful_jobs:
# Put the job in the queue if tracking in memory
Expand Down
1 change: 1 addition & 0 deletions test/unit/tools/test_history_imp_exp.py
Expand Up @@ -516,6 +516,7 @@ def test_export_copied_objects_copied_outside_history():

other_h = model.History(name=h.name + "-other", user=h.user)
sa_session.add(other_h)
sa_session.flush()

hc3 = hc2.copy(element_destination=other_h)
other_h.add_pending_items()
Expand Down

0 comments on commit f63dc3b

Please sign in to comment.