Permalink
Browse files

Unify all the database initialization code

Bodhi was littered with database initialization code and engines were
being created for every single request. This unifies all the
initialization code and uses a plain old scoped session rather than
zope.sqlalchemy. This is better because Bodhi uses the database
extensively outside the request/response cycle and zope.sqlalchemy gets
in the way more than it helps there.

All the code should be using the scoped session now rather than the
transaction provided by zope and zope.sqlalchemy is no longer a required
dependency.

Previously, fedmsgs used zope's transaction management to send messages
after a commit occurred. This simplifies the code by using an
after_commit listener. Messages are stashed on the session's ``info`` dict
attribute by the existing ``publish`` function.

Signed-off-by: Jeremy Cline <jeremy@jcline.org>
  • Loading branch information...
jeremycline committed Apr 4, 2017
1 parent 4c08c2f commit e9a26042b694e6202dccee8db494b43ce3832244
View
@@ -25,9 +25,8 @@
from pyramid.renderers import JSONP
from pyramid.security import unauthenticated_userid
from pyramid.settings import asbool
from sqlalchemy import engine_from_config
from sqlalchemy import engine_from_config, event
from sqlalchemy.orm import scoped_session, sessionmaker
from zope.sqlalchemy import ZopeTransactionExtension
from bodhi.server import bugs, buildsys, ffmarkdown
@@ -45,19 +44,26 @@
def get_db_session_for_request(request=None):
"""
This function returns a database session that is meant to be used for the given request. It sets
up the Zope transaction manager and configures the request to close the session when it is
completed. If you need a database session that is not tied to a request, you can use
bodhi.server.models.get_db_factory() to return a session generator.
This function returns a database session that is meant to be used for the given request.
It handles rolling back or committing the session based on whether an exception occurred or
not. To get a database session that's not tied to the request/response cycle, just use the
:data:`Session` scoped session in this module.
Args:
request (pyramid.request): The request object to create a session for.
Returns:
sqlalchemy.orm.session.Session: A database session.
"""
engine = engine_from_config(request.registry.settings, 'sqlalchemy.')
Sess = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))
Sess.configure(bind=engine)
session = Sess()
session = request.registry.sessionmaker()
def cleanup(request):
# No need to do rollback/commit ourselves. the zope transaction manager takes care of that
# for us. However, we want to explicitly close the session we opened
"""A post-request hook that commits the database changes if no exceptions occurred."""
if request.exception is not None:
session.rollback()
else:
session.commit()
session.close()
request.add_finished_callback(cleanup)
@@ -121,6 +127,38 @@ def exception_filter(response, request):
# Bodhi initialization
#
#: An SQLAlchemy scoped session with an engine configured using the settings in Bodhi's server
#: configuration file. Note that you *must* call :func:`initialize_db` before you can use this.
Session = scoped_session(sessionmaker())
def initialize_db(config):
"""
Initialize the database using the given configuration.
This *must* be called before you can use the :data:`Session` object.
Args:
config (dict): The Bodhi server configuration dictionary.
Returns:
sqlalchemy.engine: The database engine created from the configuration.
"""
#: The SQLAlchemy database engine. This is constructed using the value of
#: ``DB_URL`` in :data:`config``.
engine = engine_from_config(config, 'sqlalchemy.')
# When using SQLite we need to make sure foreign keys are enabled:
# http://docs.sqlalchemy.org/en/latest/dialects/sqlite.html#foreign-key-support
if config['sqlalchemy.url'].startswith('sqlite:'):
event.listen(
engine,
'connect',
lambda db_con, con_record: db_con.execute('PRAGMA foreign_keys=ON')
)
Session.configure(bind=engine)
return engine
def main(global_config, testing=None, session=None, **settings):
""" This function returns a WSGI application """
# Setup our bugtracker and buildsystem
@@ -147,6 +185,10 @@ def main(global_config, testing=None, session=None, **settings):
config.include('pyramid_mako')
config.include('cornice')
# Initialize the database scoped session
initialize_db(settings)
config.registry.sessionmaker = Session.session_factory
# Lazy-loaded memoized request properties
if session:
config.add_request_method(lambda _: session, 'db', reify=True)
@@ -36,11 +36,10 @@
import fedmsg.consumers
from bodhi.server import bugs as bug_module
from bodhi.server import initialize_db, util, bugs as bug_module
from bodhi.server.config import config
from bodhi.server.exceptions import BodhiException
from bodhi.server.models import Bug, Update, UpdateType
from bodhi.server import models
log = logging.getLogger('bodhi')
@@ -55,7 +54,8 @@ class UpdatesHandler(fedmsg.consumers.FedmsgConsumer):
config_key = 'updates_handler'
def __init__(self, hub, *args, **kwargs):
self.db_factory = models.get_db_factory()
engine = initialize_db(config)
self.db_factory = util.transactional_session_maker(engine)
prefix = hub.config.get('topic_prefix')
env = hub.config.get('environment')
@@ -12,18 +12,23 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import copy
import collections
import logging
import socket
from sqlalchemy import event
import fedmsg
import fedmsg.config
import fedmsg.encoding
import transaction
from bodhi.server import Session
import bodhi.server
import bodhi.server.config
_log = logging.getLogger(__name__)
def init(active=None, cert_prefix=None):
if not bodhi.server.config.config.get('fedmsg_enabled'):
bodhi.server.log.warn("fedmsg disabled. not initializing.")
@@ -46,6 +51,34 @@ def init(active=None, cert_prefix=None):
bodhi.server.log.info("fedmsg initialized")
@event.listens_for(Session, 'after_commit')
def send_fedmsgs_after_commit(session):
"""
An SQLAlchemy event listener to send fedmsgs after a database commit.
This relies on the session ``info`` dictionary being populated. At the moment,
this is done by calling the :func:`publish` function. In the future it should
be done automatically using SQLAlchemy listeners.
Args:
session (sqlalchemy.orm.session.Session): The session that was committed.
"""
if 'fedmsg' in session.info:
# Initialize right before we try to publish, but only if we haven't
# initialized for this thread already.
if not fedmsg_is_initialized():
init()
for topic, messages in session.info['fedmsg'].items():
_log.info('emitting {n} fedmsgs to the "{topic}" topic.'.format(
n=len(messages), topic=topic))
for msg in messages:
fedmsg.publish(topic=topic, msg=msg)
# Tidy up after ourselves so a second call to commit on this session won't
# send the same messages again.
del session.info['fedmsg'][topic]
def publish(topic, msg, force=False):
""" Publish a message to fedmsg.
@@ -70,9 +103,13 @@ def publish(topic, msg, force=False):
bodhi.server.log.debug("fedmsg skipping transaction and sending %r" % topic)
fedmsg.publish(topic=topic, msg=msg)
else:
# This gives us the thread-local session which we'll use to stash the fedmsg.
# When commit is called on it, the :func:`send_fedmsgs_after_commit` is triggered.
session = Session()
if 'fedmsg' not in session.info:
session.info['fedmsg'] = collections.defaultdict(list)
session.info['fedmsg'][topic].append(msg)
bodhi.server.log.debug("fedmsg enqueueing %r" % topic)
manager = _managers_map.get_current_data_manager()
manager.enqueue(topic, msg)
def fedmsg_is_initialized():
@@ -83,122 +120,3 @@ def fedmsg_is_initialized():
# Ensure that fedmsg has an endpoint to publish to.
context = getattr(local, '__context')
return hasattr(context, 'publisher')
class ManagerMapping(object):
""" Maintain a two-way one-to-one mapping between transaction managers and
data managers (for different wsgi threads in the same process). """
def __init__(self):
self._left = {}
self._right = {}
def get_current_data_manager(self):
current_transaction_manager = transaction.get()
if current_transaction_manager not in self:
current_data_manager = FedmsgDataManager()
self.add(current_transaction_manager, current_data_manager)
current_transaction_manager.join(current_data_manager)
else:
current_data_manager = self.get(current_transaction_manager)
return current_data_manager
def add(self, transaction_manager, data_manager):
self._left[transaction_manager] = data_manager
self._right[data_manager] = transaction_manager
def __contains__(self, item):
return item in self._left or item in self._right
def get(self, transaction_manager):
return self._left[transaction_manager]
def remove(self, data_manager):
transaction_manager = self._right[data_manager]
del self._left[transaction_manager]
del self._right[data_manager]
def __repr__(self):
return "<ManagerMapping: left(%i) right(%i)>" % (
len(self._left),
len(self._right),
)
# This is a global object we'll maintain to keep track of the relationship
# between transaction managers and our data managers. It ensures that we don't
# create multiple data managers per transaction and that we don't join the same
# data manager to a transaction multiple times. Our data manager should clean
# up after itself and remove old tm/dm pairs from this mapping in the event of
# abort or commit.
_managers_map = ManagerMapping()
class FedmsgDataManager(object):
transaction_manager = transaction.manager
def __init__(self):
self.uncommitted = []
self.committed = []
def enqueue(self, topic, msg):
self.uncommitted.append((topic, msg,))
def __repr__(self):
return self.uncommitted.__repr__()
def abort(self, transaction):
self.uncommitted = copy.copy(self.committed)
if self in _managers_map:
_managers_map.remove(self)
def tpc_begin(self, transaction):
pass
def commit(self, transaction):
pass
def tpc_vote(self, transaction):
# This ensures two things:
# 1) that all the objects we're about to publish are JSONifiable.
# 2) that we convert them from sqlalchemy objects to dicts *before* the
# transaction enters its final phase, at which point our objects
# will be detached from their session.
self.uncommitted = [
(topic, fedmsg.encoding.loads(fedmsg.encoding.dumps(msg)))
for topic, msg in self.uncommitted
]
# Ensure that fedmsg has already been initialized.
assert fedmsg_is_initialized(), "fedmsg is not initialized"
def tpc_abort(self, transaction):
self.abort(transaction)
self._finish('aborted')
def tpc_finish(self, transaction):
for topic, msg in self.uncommitted:
bodhi.server.log.debug("fedmsg sending %r" % topic)
fedmsg.publish(topic=topic, msg=msg)
self.committed = copy.copy(self.uncommitted)
_managers_map.remove(self)
self._finish('committed')
def _finish(self, state):
self.state = state
def sortKey(self):
""" Use a 'z' to make fedmsg come last, after the db is done. """
return 'z_fedmsgdm' + str(id(self))
def savepoint(self):
return FedmsgSavepoint(self)
class FedmsgSavepoint(object):
def __init__(self, dm):
self.dm = dm
self.saved_committed = copy.copy(self.dm.uncommitted)
def rollback(self):
self.dm.uncommitted = copy.copy(self.saved_committed)
@@ -22,14 +22,10 @@
import sys
from pyramid.paster import get_appsettings
from sqlalchemy import engine_from_config
from sqlalchemy.orm import scoped_session, sessionmaker
from zope.sqlalchemy import ZopeTransactionExtension
import transaction
from ..models import Update, UpdateStatus
from ..config import config
from bodhi.server import Session, initialize_db
def usage(argv):
@@ -62,9 +58,11 @@ def main(argv=sys.argv):
if len(argv) != 2:
usage(argv)
db = _get_db_session(argv[1])
settings = get_appsettings(argv[1])
initialize_db(settings)
db = Session()
with transaction.manager:
try:
testing = db.query(Update).filter_by(status=UpdateStatus.testing,
request=None)
for update in testing:
@@ -96,19 +94,9 @@ def main(argv=sys.argv):
config.get('testing_approval_msg') % update.release.mandatory_days_in_testing)
update.comment(db, text, author=u'bodhi')
def _get_db_session(config_uri):
"""
Construct and return a database session using settings from the given config_uri.
:param config_uri: A path to a config file to use to get the db settings.
:type config_uri: basestring
:return: A database session
"""
# There are many blocks of code like this in the codebase. We should consolidate them into a
# single utility function as described in https://github.com/fedora-infra/bodhi/issues/1028
settings = get_appsettings(config_uri)
engine = engine_from_config(settings, 'sqlalchemy.')
Session = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))
Session.configure(bind=engine)
return Session()
db.commit()
except Exception as e:
print(str(e))
db.rollback()
Session.remove()
sys.exit(1)
Oops, something went wrong.

0 comments on commit e9a2604

Please sign in to comment.