Skip to content

Commit

Permalink
Merge pull request #115 from mikebonnet/nested-txns
Browse files Browse the repository at this point in the history
use nested transactions for adding Package and User objects
  • Loading branch information
cverna committed Apr 12, 2019
2 parents c153ef8 + 65fb9c7 commit 4a20e21
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 37 deletions.
2 changes: 1 addition & 1 deletion .travis-runtests.sh
@@ -1,6 +1,6 @@
#!/bin/bash -e

echo "Installing all packages in development mode"
echo "Running tests for all packages"
for package in datanommer.{models,consumer,commands}; do
echo "[$package] Testing..."
pushd $package
Expand Down
42 changes: 30 additions & 12 deletions datanommer.models/datanommer/models/__init__.py
Expand Up @@ -13,7 +13,7 @@
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
from sqlalchemy import create_engine
from sqlalchemy import create_engine, event
from sqlalchemy import (
Column,
DateTime,
Expand Down Expand Up @@ -75,6 +75,20 @@ def init(uri=None, alembic_ini=None, engine=None, create=False):
if uri and not engine:
engine = create_engine(uri)

if 'sqlite' in engine.driver:
# Enable nested transaction support under SQLite
# See https://stackoverflow.com/questions/1654857/nested-transactions-with-sqlalchemy-and-sqlite
@event.listens_for(engine, "connect")
def do_connect(dbapi_connection, connection_record):
# disable pysqlite's emitting of the BEGIN statement entirely.
# also stops it from emitting COMMIT before any DDL.
dbapi_connection.isolation_level = None

@event.listens_for(engine, "begin")
def do_begin(conn):
# emit our own BEGIN
conn.execute("BEGIN")

# We need to hang our own attribute on the sqlalchemy session to stop
# ourselves from initializing twice. That is only a problem is the code
# calling us isn't consistent.
Expand Down Expand Up @@ -136,8 +150,8 @@ def add(envelope):
session.add(obj)
session.flush()
except IntegrityError:
log.warn('Skipping message from %s with duplicate id: %s',
message['topic'], msg_id)
log.warning('Skipping message from %s with duplicate id: %s',
message['topic'], msg_id)
session.rollback()
return

Expand Down Expand Up @@ -283,15 +297,19 @@ def get_or_create(cls, name):
Return the instance of the class with the specified name. If it doesn't
already exist, create it.
"""
# Use an INSERT ... SELECT to guarantee we don't get unique constraint
# violations if multiple instances of datanommer are trying to insert the same
# value at the same time.
not_exists = ~exists(select([cls.__table__.c.name]).where(cls.name == name))
insert = cls.__table__.insert(inline=True).\
from_select([cls.__table__.c.name],
select([literal(name)]).where(not_exists))
session.execute(insert)
return cls.query.filter_by(name=name).one()
obj = cls.query.filter_by(name=name).one_or_none()
if obj:
return obj
try:
with session.begin_nested():
obj = cls(name=name)
session.add(obj)
session.flush()
return obj
except IntegrityError:
log.debug('Collision when adding %s(name="%s"), returning existing object',
cls.__name__, name)
return cls.query.filter_by(name=name).one()


class User(DeclarativeBase, Singleton):
Expand Down
6 changes: 2 additions & 4 deletions datanommer.models/setup.py
Expand Up @@ -14,10 +14,7 @@
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
from setuptools import setup, find_packages
import sys

import multiprocessing
import logging

f = open('README.rst')
long_description = f.read().strip()
Expand All @@ -39,14 +36,15 @@
include_package_data=True,
zip_safe=False,
install_requires=[
"sqlalchemy>=0.7",
"sqlalchemy>=1.0.9",
"fedmsg",
"alembic",
"six",
],
tests_require=[
"nose",
"fedmsg_meta_fedora_infrastructure",
"mock",
],
classifiers=[
'Programming Language :: Python :: 2',
Expand Down
51 changes: 31 additions & 20 deletions datanommer.models/tests/test_model.py
Expand Up @@ -23,13 +23,14 @@

from sqlalchemy.orm import scoped_session

from mock import patch
from nose.tools import raises
from nose.tools import eq_

import datanommer.models
import six

# Set this to false to use faitout
# Set this to false to use a local postgres instance
USE_SQLITE = True # False

filename = ":memory:"
Expand Down Expand Up @@ -190,36 +191,25 @@ def setUpClass(cls):
if USE_SQLITE:
fname = "sqlite:///%s" % filename
else:
response = requests.get('http://faitout.fedorainfracloud.org/new',
headers=dict(accept='application/json'))
details = response.json()
fname = "postgres://{username}:{password}@{host}:{port}/{dbname}"\
.format(**details)
cls.dbname = details['dbname']
# Test against a local postgresql instance.
# You'll need to have the "psycopg2" module available in your environment.
fname = "postgresql://datanommer:datanommer@localhost/datanommer"

config['datanommer.sqlalchemy.url'] = cls.fname = fname
fedmsg.meta.make_processors(**config)

@classmethod
def tearDownClass(cls):
if not USE_SQLITE:
requests.get('http://faitout.fedorainfracloud.org/drop/{dbname}'\
.format(dbname=cls.dbname))

def setUp(self):
if not USE_SQLITE:
response = requests.get('http://faitout.fedorainfracloud.org/clean/{dbname}'.\
format(dbname=self.dbname))
# We only have to do this so that we can do it over
# and over again for each test.
datanommer.models.session = scoped_session(datanommer.models.maker)
datanommer.models.init(self.fname, create=True)


def tearDown(self):
if USE_SQLITE:
engine = datanommer.models.session.get_bind()
datanommer.models.DeclarativeBase.metadata.drop_all(engine)
datanommer.models.session.rollback()
engine = datanommer.models.session.get_bind()
datanommer.models.DeclarativeBase.metadata.drop_all(engine)
datanommer.models.session.close()

# These contain objects bound to the old session, so we have to flush.
Expand Down Expand Up @@ -288,12 +278,18 @@ def track(conn, cursor, statement, param, ctx, many):

# Add it to the db and check how many queries we made
datanommer.models.add(msg)
eq_(len(statements), 7)
if 'sqlite' in datanommer.models.session.get_bind().driver:
eq_(len(statements), 12)
else:
eq_(len(statements), 11)

# Add it again and check again
datanommer.models.add(msg)
pprint.pprint(statements)
eq_(len(statements), 10)
if 'sqlite' in datanommer.models.session.get_bind().driver:
eq_(len(statements), 16)
else:
eq_(len(statements), 14)

def test_add_missing_cert(self):
msg = copy.deepcopy(scm_message)
Expand Down Expand Up @@ -440,3 +436,18 @@ def test_Package_get_or_create(self):
eq_(datanommer.models.Package.query.count(), 1)
datanommer.models.Package.get_or_create(six.u('foo'))
eq_(datanommer.models.Package.query.count(), 1)

@patch('datanommer.models.log')
@patch('sqlalchemy.orm.query.Query.filter_by')
def test_singleton_nested_txns(self, filter_by, log):
# Hide existing instances from get_or_create(), which forces a duplicate insert
# and constraint violation.
filter_by.return_value.one_or_none.return_value = None
datanommer.models.Package.get_or_create(six.u('foo'))
datanommer.models.Package.get_or_create(six.u('foo'))
eq_(datanommer.models.Package.query.count(), 1)
log.debug.assert_called_once_with(
'Collision when adding %s(name="%s"), returning existing object',
'Package',
'foo'
)

0 comments on commit 4a20e21

Please sign in to comment.