Skip to content

Commit

Permalink
Use cached database identity to avoid certain DB requests when creati…
Browse files Browse the repository at this point in the history
…ng jobs.

When creating thousands of jobs for large database transactions - previously Galaxy would query the database thousands of times for the same history id, user id, and session id.
  • Loading branch information
jmchilton committed Jul 23, 2018
1 parent 3b629f0 commit dd6b643
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 16 deletions.
28 changes: 27 additions & 1 deletion lib/galaxy/model/__init__.py
Expand Up @@ -160,6 +160,32 @@ def seconds_since_created(self):
return (galaxy.model.orm.now.now() - create_time).total_seconds()


def cached_id(galaxy_model_object):
"""Get model object id attribute without a firing a database query.
Useful to fetching the id of a typical Galaxy model object after a flush,
where SA is going to mark the id attribute as unloaded but we know the
id is immutable and so we can use the database identity to fetch.
With Galaxy's default SA initialization - any flush marks all attributes
as unloaded - even objects completely unrelated to the flushed changes
and even attributes we know to be immutable like id. See test_galaxy_mapping.py
for verification of this behavior. This method is a workaround that
uses the fact we know all Galaxy objects use the id attribute as identity
and SA internals to infer the previously loaded ID value. I tried
digging into the SA internals extensively and couldn't find a way to get
the previously loaded values after a flush to allow a generalization of this
for other attributes but I couldn't find anything.
"""
if hasattr(galaxy_model_object, "_sa_instance_state"):
identity = galaxy_model_object._sa_instance_state.identity
if identity:
assert len(identity) == 1
return identity[0]

return galaxy_model_object.id


class JobLike(object):

MAX_NUMERIC = 10**(JOB_METRIC_PRECISION - JOB_METRIC_SCALE) - 1
Expand Down Expand Up @@ -1434,7 +1460,7 @@ def __add_datasets_optimized(self, datasets, genome_build=None):
if set_genome:
self.genome_build = genome_build
for dataset in datasets:
dataset.history_id = self.id
dataset.history_id = cached_id(self)
return datasets

def add_dataset_collection(self, history_dataset_collection, set_hid=True):
Expand Down
15 changes: 6 additions & 9 deletions lib/galaxy/tools/actions/__init__.py
Expand Up @@ -642,10 +642,10 @@ def _new_job_for_session(self, trans, tool, history):
galaxy_session = trans.get_galaxy_session()
# If we're submitting from the API, there won't be a session.
if type(galaxy_session) == trans.model.GalaxySession:
job.session_id = galaxy_session.id
job.session_id = model.cached_id(galaxy_session)
if trans.user is not None:
job.user_id = trans.user.id
job.history_id = history.id
job.user_id = model.cached_id(trans.user)
job.history_id = model.cached_id(history)
job.tool_id = tool.id
try:
# For backward compatibility, some tools may not have versions yet.
Expand Down Expand Up @@ -685,9 +685,9 @@ def restore_reduction_visitor(input, value, prefix, parent=None, prefixed_name=N
target_dict[input.name] = []
for reduced_collection in reductions[prefixed_name]:
if hasattr(reduced_collection, "child_collection"):
target_dict[input.name].append({'id': reduced_collection.id, 'src': 'dce'})
target_dict[input.name].append({'id': model.cached_id(reduced_collection), 'src': 'dce'})
else:
target_dict[input.name].append({'id': reduced_collection.id, 'src': 'hdca'})
target_dict[input.name].append({'id': model.cached_id(reduced_collection), 'src': 'hdca'})

if reductions:
tool.visit_inputs(incoming, restore_reduction_visitor)
Expand All @@ -713,10 +713,7 @@ def _check_input_data_access(self, trans, job, inp_data, current_user_roles):
if dataset:
if not trans.app.security_agent.can_access_dataset(current_user_roles, dataset.dataset):
raise Exception("User does not have permission to use a dataset (%s) provided for input." % dataset.id)
if dataset in trans.sa_session:
job.add_input_dataset(name, dataset=dataset)
else:
job.add_input_dataset(name, dataset_id=dataset.id)
job.add_input_dataset(name, dataset=dataset)
else:
job.add_input_dataset(name, None)
job_str = job.log_str()
Expand Down
9 changes: 7 additions & 2 deletions lib/galaxy/tools/parameters/basic.py
Expand Up @@ -1521,10 +1521,15 @@ def single_to_json(value):
src = 'dce'
elif isinstance(value, app.model.HistoryDatasetCollectionAssociation):
src = 'hdca'
elif hasattr(value, 'id'):
elif isinstance(value, app.model.HistoryDatasetAssociation) or hasattr(value, 'id'):
# hasattr 'id' fires a query on persistent objects after a flush so better
# to do the isinstance check. Not sure we need the hasattr check anymore - it'd be
# nice to drop it.
src = 'hda'
if src is not None:
return {'id' : app.security.encode_id(value.id) if use_security else value.id, 'src' : src}
object_id = galaxy.model.cached_id(value)
return {'id' : app.security.encode_id(object_id) if use_security else object_id, 'src' : src}

if value not in [None, '', 'None']:
if isinstance(value, list) and len(value) > 0:
values = [single_to_json(v) for v in value]
Expand Down
9 changes: 5 additions & 4 deletions lib/galaxy/web/framework/webapp.py
Expand Up @@ -316,10 +316,11 @@ def is_allowed_origin(origin):

def get_user(self):
"""Return the current user if logged in or None."""
if self.galaxy_session:
return self.galaxy_session.user
else:
return self.__user
user = self.__user
if not user and self.galaxy_session:
user = self.galaxy_session.user
self.__user = user
return user

def set_user(self, user):
"""Set the current user."""
Expand Down
82 changes: 82 additions & 0 deletions test/unit/test_galaxy_mapping.py
Expand Up @@ -4,6 +4,7 @@
import uuid

from six import text_type
from sqlalchemy import inspect

import galaxy.datatypes.registry
import galaxy.model
Expand Down Expand Up @@ -377,6 +378,87 @@ def contents_iter_names(**kwds):

assert contents_iter_names(ids=[d1.id, d3.id]) == ["1", "3"]

def _non_empty_flush(self):
model = self.model
lf = model.LibraryFolder(name="RootFolder")
session = self.session()
session.add(lf)
session.flush()

def test_flush_refreshes(self):
# Normally I don't believe in unit testing library code, but the behaviors around attribute
# states and flushing in SQL Alchemy is very subtle and it is good to have a executable
# reference for how it behaves in the context of Galaxy objects.
model = self.model
user = model.User(
email="testworkflows@bx.psu.edu",
password="password"
)
galaxy_session = model.GalaxySession()
galaxy_session_other = model.GalaxySession()
galaxy_session.user = user
galaxy_session_other.user = user
self.persist(user, galaxy_session_other, galaxy_session)
galaxy_session_id = galaxy_session.id

self.expunge()
session = self.session()
galaxy_model_object = self.query(model.GalaxySession).get(galaxy_session_id)
expected_id = galaxy_model_object.id

# id loaded as part of the object query, could be any non-deferred attribute.
assert 'id' not in inspect(galaxy_model_object).unloaded

# Perform an empty flush, verify empty flush doesn't reload all attributes.
session.flush()
assert 'id' not in inspect(galaxy_model_object).unloaded

# However, flushing anything non-empty - even unrelated object will invalidate
# the session ID.
self._non_empty_flush()
assert 'id' in inspect(galaxy_model_object).unloaded

# Fetch the ID loads the value from the database
assert expected_id == galaxy_model_object.id
assert 'id' not in inspect(galaxy_model_object).unloaded

# Using cached_id instead does not exhibit this behavior.
self._non_empty_flush()
assert expected_id == galaxy.model.cached_id(galaxy_model_object)
assert 'id' in inspect(galaxy_model_object).unloaded

# Keeping the following failed experiments here for future reference,
# I probed the internals of the attribute tracking and couldn't find an
# alternative, generalized way to get the previously loaded value for unloaded
# attributes.
# print(galaxy_model_object._sa_instance_state.attrs.id)
# print(dir(galaxy_model_object._sa_instance_state.attrs.id))
# print(galaxy_model_object._sa_instance_state.attrs.id.loaded_value)
# print(galaxy_model_object._sa_instance_state.attrs.id.state)
# print(galaxy_model_object._sa_instance_state.attrs.id.load_history())
# print(dir(galaxy_model_object._sa_instance_state.attrs.id.load_history()))
# print(galaxy_model_object._sa_instance_state.identity)
# print(dir(galaxy_model_object._sa_instance_state))
# print(galaxy_model_object._sa_instance_state.expired_attributes)
# print(galaxy_model_object._sa_instance_state.expired)
# print(galaxy_model_object._sa_instance_state._instance_dict().keys())
# print(dir(galaxy_model_object._sa_instance_state._instance_dict))
# assert False

# Verify cached_id works even immeidately after an initial flush, prevents a second SELECT
# query that would be executed if object.id was used.
galaxy_model_object_new = model.GalaxySession()
session.add(galaxy_model_object_new)
session.flush()
assert galaxy.model.cached_id(galaxy_model_object_new)
assert 'id' in inspect(galaxy_model_object_new).unloaded

# Verify a targetted flush prevent expiring unrelated objects.
galaxy_model_object_new.id
assert 'id' not in inspect(galaxy_model_object_new).unloaded
session.flush(model.GalaxySession())
assert 'id' not in inspect(galaxy_model_object_new).unloaded

def test_workflows(self):
model = self.model
user = model.User(
Expand Down

0 comments on commit dd6b643

Please sign in to comment.