diff --git a/openstack/common/db/sqlalchemy/session.py b/openstack/common/db/sqlalchemy/session.py index fb2f51f5c..5391a7fa7 100644 --- a/openstack/common/db/sqlalchemy/session.py +++ b/openstack/common/db/sqlalchemy/session.py @@ -601,18 +601,24 @@ def _thread_yield(dbapi_con, con_record): time.sleep(0) -def _ping_listener(dbapi_conn, connection_rec, connection_proxy): - """Ensures that MySQL connections checked out of the pool are alive. +def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy): + """Ensures that MySQL and DB2 connections are alive. Borrowed from: http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f """ + cursor = dbapi_conn.cursor() try: - dbapi_conn.cursor().execute('select 1') - except dbapi_conn.OperationalError as ex: - if ex.args[0] in (2006, 2013, 2014, 2045, 2055): - LOG.warning(_('Got mysql server has gone away: %s'), ex) - raise sqla_exc.DisconnectionError("Database server went away") + ping_sql = 'select 1' + if engine.name == 'ibm_db_sa': + # DB2 requires a table expression + ping_sql = 'select 1 from (values (1)) AS t1' + cursor.execute(ping_sql) + except Exception as ex: + if engine.dialect.is_disconnect(ex, dbapi_conn, cursor): + msg = _('Database server has gone away: %s') % ex + LOG.warning(msg) + raise sqla_exc.DisconnectionError(msg) else: raise @@ -670,8 +676,9 @@ def create_engine(sql_connection, sqlite_fk=False): sqlalchemy.event.listen(engine, 'checkin', _thread_yield) - if 'mysql' in connection_dict.drivername: - sqlalchemy.event.listen(engine, 'checkout', _ping_listener) + if engine.name in ['mysql', 'ibm_db_sa']: + callback = functools.partial(_ping_listener, engine) + sqlalchemy.event.listen(engine, 'checkout', callback) elif 'sqlite' in connection_dict.drivername: if not CONF.sqlite_synchronous: sqlalchemy.event.listen(engine, 'connect', diff --git a/tests/unit/db/sqlalchemy/test_sqlalchemy.py b/tests/unit/db/sqlalchemy/test_sqlalchemy.py index b29802fbc..4bbce0e76 100644 --- a/tests/unit/db/sqlalchemy/test_sqlalchemy.py +++ b/tests/unit/db/sqlalchemy/test_sqlalchemy.py @@ -17,6 +17,9 @@ """Unit tests for SQLAlchemy specific code.""" +import _mysql_exceptions +import mock +import sqlalchemy from sqlalchemy import Column, MetaData, Table, UniqueConstraint from sqlalchemy import DateTime, Integer, String from sqlalchemy.ext.declarative import declarative_base @@ -225,3 +228,88 @@ def test_no_slave_engine_match(self): def test_slave_backend_nomatch(self): session.CONF.database.slave_connection = "mysql:///localhost" self.assertRaises(AssertionError, session._assert_matching_drivers) + + +class FakeDBAPIConnection(): + def cursor(self): + return FakeCursor() + + +class FakeCursor(): + def execute(self, sql): + pass + + +class FakeConnectionProxy(): + pass + + +class FakeConnectionRec(): + pass + + +class OperationalError(Exception): + pass + + +class ProgrammingError(Exception): + pass + + +class FakeDB2Engine(object): + + class Dialect(): + + def is_disconnect(self, e, *args): + expected_error = ('SQL30081N: DB2 Server connection is no longer ' + 'active') + return (str(e) == expected_error) + + dialect = Dialect() + name = 'ibm_db_sa' + + +class TestDBDisconnected(test.BaseTestCase): + + def _test_ping_listener_disconnected(self, connection): + engine_args = { + 'pool_recycle': 3600, + 'echo': False, + 'convert_unicode': True} + + engine = sqlalchemy.create_engine(connection, **engine_args) + + self.assertRaises(sqlalchemy.exc.DisconnectionError, + session._ping_listener, engine, + FakeDBAPIConnection(), FakeConnectionRec(), + FakeConnectionProxy()) + + def test_mysql_ping_listener_disconnected(self): + def fake_execute(sql): + raise _mysql_exceptions.OperationalError(self.mysql_error, + ('MySQL server has ' + 'gone away')) + with mock.patch.object(FakeCursor, 'execute', + side_effect=fake_execute): + connection = 'mysql://root:password@fakehost/fakedb?charset=utf8' + for code in [2006, 2013, 2014, 2045, 2055]: + self.mysql_error = code + self._test_ping_listener_disconnected(connection) + + def test_db2_ping_listener_disconnected(self): + + def fake_execute(sql): + raise OperationalError('SQL30081N: DB2 Server ' + 'connection is no longer active') + with mock.patch.object(FakeCursor, 'execute', + side_effect=fake_execute): + # TODO(dperaza): Need a fake engine for db2 since ibm_db_sa is not + # in global requirements. Change this code to use real IBM db2 + # engine as soon as ibm_db_sa is included in global-requirements + # under openstack/requirements project. + fake_create_engine = lambda *args, **kargs: FakeDB2Engine() + with mock.patch.object(sqlalchemy, 'create_engine', + side_effect=fake_create_engine): + connection = ('ibm_db_sa://db2inst1:openstack@fakehost:50000' + '/fakedab') + self._test_ping_listener_disconnected(connection)