Skip to content
This repository has been archived by the owner on Jun 26, 2020. It is now read-only.

Commit

Permalink
Enables db2 server disconnects to be handled pessimistically
Browse files Browse the repository at this point in the history
A checkout listener can be added to sqlalchemy to be able to
handle database disconnect pessimistically and avoid an error in
the event of a DB restart or simply a temp connection issue due
to network glitches. At the moment only mysql is enabled with
this feature. This patch enables db2 as well.

Partial-Bug: #1231657
Change-Id: I5563360b43c4b648f5b035099b0e1d2e71d32503
  • Loading branch information
David Peraza committed Dec 5, 2013
1 parent d2b62b4 commit bb4d7a2
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 9 deletions.
25 changes: 16 additions & 9 deletions openstack/common/db/sqlalchemy/session.py
Expand Up @@ -601,18 +601,24 @@ def _thread_yield(dbapi_con, con_record):
time.sleep(0)


def _ping_listener(dbapi_conn, connection_rec, connection_proxy):
"""Ensures that MySQL connections checked out of the pool are alive.
def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy):
"""Ensures that MySQL and DB2 connections are alive.
Borrowed from:
http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
"""
cursor = dbapi_conn.cursor()
try:
dbapi_conn.cursor().execute('select 1')
except dbapi_conn.OperationalError as ex:
if ex.args[0] in (2006, 2013, 2014, 2045, 2055):
LOG.warning(_('Got mysql server has gone away: %s'), ex)
raise sqla_exc.DisconnectionError("Database server went away")
ping_sql = 'select 1'
if engine.name == 'ibm_db_sa':
# DB2 requires a table expression
ping_sql = 'select 1 from (values (1)) AS t1'
cursor.execute(ping_sql)
except Exception as ex:
if engine.dialect.is_disconnect(ex, dbapi_conn, cursor):
msg = _('Database server has gone away: %s') % ex
LOG.warning(msg)
raise sqla_exc.DisconnectionError(msg)
else:
raise

Expand Down Expand Up @@ -670,8 +676,9 @@ def create_engine(sql_connection, sqlite_fk=False):

sqlalchemy.event.listen(engine, 'checkin', _thread_yield)

if 'mysql' in connection_dict.drivername:
sqlalchemy.event.listen(engine, 'checkout', _ping_listener)
if engine.name in ['mysql', 'ibm_db_sa']:
callback = functools.partial(_ping_listener, engine)
sqlalchemy.event.listen(engine, 'checkout', callback)
elif 'sqlite' in connection_dict.drivername:
if not CONF.sqlite_synchronous:
sqlalchemy.event.listen(engine, 'connect',
Expand Down
88 changes: 88 additions & 0 deletions tests/unit/db/sqlalchemy/test_sqlalchemy.py
Expand Up @@ -17,6 +17,9 @@

"""Unit tests for SQLAlchemy specific code."""

import _mysql_exceptions
import mock
import sqlalchemy
from sqlalchemy import Column, MetaData, Table, UniqueConstraint
from sqlalchemy import DateTime, Integer, String
from sqlalchemy.ext.declarative import declarative_base
Expand Down Expand Up @@ -225,3 +228,88 @@ def test_no_slave_engine_match(self):
def test_slave_backend_nomatch(self):
session.CONF.database.slave_connection = "mysql:///localhost"
self.assertRaises(AssertionError, session._assert_matching_drivers)


class FakeDBAPIConnection():
def cursor(self):
return FakeCursor()


class FakeCursor():
def execute(self, sql):
pass


class FakeConnectionProxy():
pass


class FakeConnectionRec():
pass


class OperationalError(Exception):
pass


class ProgrammingError(Exception):
pass


class FakeDB2Engine(object):

class Dialect():

def is_disconnect(self, e, *args):
expected_error = ('SQL30081N: DB2 Server connection is no longer '
'active')
return (str(e) == expected_error)

dialect = Dialect()
name = 'ibm_db_sa'


class TestDBDisconnected(test.BaseTestCase):

def _test_ping_listener_disconnected(self, connection):
engine_args = {
'pool_recycle': 3600,
'echo': False,
'convert_unicode': True}

engine = sqlalchemy.create_engine(connection, **engine_args)

self.assertRaises(sqlalchemy.exc.DisconnectionError,
session._ping_listener, engine,
FakeDBAPIConnection(), FakeConnectionRec(),
FakeConnectionProxy())

def test_mysql_ping_listener_disconnected(self):
def fake_execute(sql):
raise _mysql_exceptions.OperationalError(self.mysql_error,
('MySQL server has '
'gone away'))
with mock.patch.object(FakeCursor, 'execute',
side_effect=fake_execute):
connection = 'mysql://root:password@fakehost/fakedb?charset=utf8'
for code in [2006, 2013, 2014, 2045, 2055]:
self.mysql_error = code
self._test_ping_listener_disconnected(connection)

def test_db2_ping_listener_disconnected(self):

def fake_execute(sql):
raise OperationalError('SQL30081N: DB2 Server '
'connection is no longer active')
with mock.patch.object(FakeCursor, 'execute',
side_effect=fake_execute):
# TODO(dperaza): Need a fake engine for db2 since ibm_db_sa is not
# in global requirements. Change this code to use real IBM db2
# engine as soon as ibm_db_sa is included in global-requirements
# under openstack/requirements project.
fake_create_engine = lambda *args, **kargs: FakeDB2Engine()
with mock.patch.object(sqlalchemy, 'create_engine',
side_effect=fake_create_engine):
connection = ('ibm_db_sa://db2inst1:openstack@fakehost:50000'
'/fakedab')
self._test_ping_listener_disconnected(connection)

0 comments on commit bb4d7a2

Please sign in to comment.