Skip to content

Commit

Permalink
db_pool: proxy Connection.set_isolation_level()
Browse files Browse the repository at this point in the history
  • Loading branch information
temoto committed Feb 25, 2017
1 parent 4e775e2 commit 885482b
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 144 deletions.
139 changes: 47 additions & 92 deletions eventlet/db_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,98 +327,53 @@ def __exit__(self, exc, value, tb):
def __repr__(self):
return self._base.__repr__()

def affected_rows(self):
return self._base.affected_rows()

def autocommit(self, *args, **kwargs):
return self._base.autocommit(*args, **kwargs)

def begin(self):
return self._base.begin()

def change_user(self, *args, **kwargs):
return self._base.change_user(*args, **kwargs)

def character_set_name(self, *args, **kwargs):
return self._base.character_set_name(*args, **kwargs)

def close(self, *args, **kwargs):
return self._base.close(*args, **kwargs)

def commit(self, *args, **kwargs):
return self._base.commit(*args, **kwargs)

def cursor(self, *args, **kwargs):
return self._base.cursor(*args, **kwargs)

def dump_debug_info(self, *args, **kwargs):
return self._base.dump_debug_info(*args, **kwargs)

def errno(self, *args, **kwargs):
return self._base.errno(*args, **kwargs)

def error(self, *args, **kwargs):
return self._base.error(*args, **kwargs)

def errorhandler(self, *args, **kwargs):
return self._base.errorhandler(*args, **kwargs)

def insert_id(self, *args, **kwargs):
return self._base.insert_id(*args, **kwargs)

def literal(self, *args, **kwargs):
return self._base.literal(*args, **kwargs)

def set_character_set(self, *args, **kwargs):
return self._base.set_character_set(*args, **kwargs)

def set_sql_mode(self, *args, **kwargs):
return self._base.set_sql_mode(*args, **kwargs)

def show_warnings(self):
return self._base.show_warnings()

def warning_count(self):
return self._base.warning_count()

def ping(self, *args, **kwargs):
return self._base.ping(*args, **kwargs)

def query(self, *args, **kwargs):
return self._base.query(*args, **kwargs)

def rollback(self, *args, **kwargs):
return self._base.rollback(*args, **kwargs)

def select_db(self, *args, **kwargs):
return self._base.select_db(*args, **kwargs)

def set_server_option(self, *args, **kwargs):
return self._base.set_server_option(*args, **kwargs)

def server_capabilities(self, *args, **kwargs):
return self._base.server_capabilities(*args, **kwargs)

def shutdown(self, *args, **kwargs):
return self._base.shutdown(*args, **kwargs)

def sqlstate(self, *args, **kwargs):
return self._base.sqlstate(*args, **kwargs)

def stat(self, *args, **kwargs):
return self._base.stat(*args, **kwargs)

def store_result(self, *args, **kwargs):
return self._base.store_result(*args, **kwargs)

def string_literal(self, *args, **kwargs):
return self._base.string_literal(*args, **kwargs)

def thread_id(self, *args, **kwargs):
return self._base.thread_id(*args, **kwargs)

def use_result(self, *args, **kwargs):
return self._base.use_result(*args, **kwargs)
_proxy_funcs = (
'affected_rows',
'autocommit',
'begin',
'change_user',
'character_set_name',
'close',
'commit',
'cursor',
'dump_debug_info',
'errno',
'error',
'errorhandler',
'insert_id',
'literal',
'ping',
'query',
'rollback',
'select_db',
'server_capabilities',
'set_character_set',
'set_isolation_level',
'set_server_option',
'set_sql_mode',
'show_warnings',
'shutdown',
'sqlstate',
'stat',
'store_result',
'string_literal',
'thread_id',
'use_result',
'warning_count',
)
for _proxy_fun in GenericConnectionWrapper._proxy_funcs:
# excess wrapper for early binding (closure by value)
def _wrapper(_proxy_fun=_proxy_fun):
def _proxy_method(self, *args, **kwargs):
return getattr(self._base, _proxy_fun)(*args, **kwargs)
_proxy_method.func_name = _proxy_fun
_proxy_method.__name__ = _proxy_fun
_proxy_method.__qualname__ = 'GenericConnectionWrapper.' + _proxy_fun
return _proxy_method
setattr(GenericConnectionWrapper, _proxy_fun, _wrapper(_proxy_fun))
del GenericConnectionWrapper._proxy_funcs
del _proxy_fun
del _wrapper


class PooledConnectionWrapper(GenericConnectionWrapper):
Expand Down
105 changes: 53 additions & 52 deletions tests/db_pool_test.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
'''Test cases for db_pool
'''
from __future__ import print_function

import sys
import os
import sys
import traceback
from unittest import TestCase, main

from tests import mock, skip_unless, skip_with_pyevent, get_database_auth
from eventlet import event
from eventlet import db_pool
from eventlet.support import six
import eventlet
import eventlet.tpool
import tests
import tests.mock

psycopg2 = None
try:
import psycopg2
import psycopg2.extensions
except ImportError:
pass

MySQLdb = None
try:
import MySQLdb
except ImportError:
pass


class DBTester(object):
Expand Down Expand Up @@ -141,7 +151,7 @@ def test_returns_immediately(self):
curs = conn.cursor()
results = []
SHORT_QUERY = "select * from test_table"
evt = event.Event()
evt = eventlet.Event()

def a_query():
self.assert_cursor_works(curs)
Expand Down Expand Up @@ -282,7 +292,7 @@ def test_waiters_get_woken(self):
self.connection = self.pool.get()
self.assertEqual(self.pool.free(), 0)
self.assertEqual(self.pool.waiting(), 0)
e = event.Event()
e = eventlet.Event()

def retrieve(pool, ev):
c = pool.get()
Expand Down Expand Up @@ -337,14 +347,13 @@ def create_pool(self, min_size=0, max_size=1, max_idle=10, max_age=10,
connect_timeout=connect_timeout,
**self._auth)

@skip_with_pyevent
@tests.skip_with_pyevent
def setUp(self):
super(TpoolConnectionPool, self).setUp()

def tearDown(self):
super(TpoolConnectionPool, self).tearDown()
from eventlet import tpool
tpool.killall()
eventlet.tpool.killall()


class RawConnectionPool(DBConnectionPool):
Expand Down Expand Up @@ -373,7 +382,7 @@ def test_raw_pool_issue_125():


def test_raw_pool_custom_cleanup_ok():
cleanup_mock = mock.Mock()
cleanup_mock = tests.mock.Mock()
pool = db_pool.RawConnectionPool(DummyDBModule(), cleanup=cleanup_mock)
conn = pool.get()
pool.put(conn)
Expand All @@ -385,7 +394,7 @@ def test_raw_pool_custom_cleanup_ok():


def test_raw_pool_custom_cleanup_arg_error():
cleanup_mock = mock.Mock(side_effect=NotImplementedError)
cleanup_mock = tests.mock.Mock(side_effect=NotImplementedError)
pool = db_pool.RawConnectionPool(DummyDBModule())
conn = pool.get()
pool.put(conn, cleanup=cleanup_mock)
Expand Down Expand Up @@ -427,27 +436,23 @@ def test_raw_pool_clear_update_current_size():
assert len(pool.free_items) == 0


get_auth = get_database_auth


def mysql_requirement(_f):
verbose = os.environ.get('eventlet_test_mysql_verbose')
try:
import MySQLdb
try:
auth = get_auth()['MySQLdb'].copy()
MySQLdb.connect(**auth)
return True
except MySQLdb.OperationalError:
if verbose:
print(">> Skipping mysql tests, error when connecting:", file=sys.stderr)
traceback.print_exc()
return False
except ImportError:
if MySQLdb is None:
if verbose:
print(">> Skipping mysql tests, MySQLdb not importable", file=sys.stderr)
return False

try:
auth = tests.get_database_auth()['MySQLdb'].copy()
MySQLdb.connect(**auth)
return True
except MySQLdb.OperationalError:
if verbose:
print(">> Skipping mysql tests, error when connecting:", file=sys.stderr)
traceback.print_exc()
return False


class MysqlConnectionPool(object):
dummy_table_sql = """CREATE TEMPORARY TABLE test_table
Expand All @@ -463,11 +468,10 @@ class MysqlConnectionPool(object):
created TIMESTAMP
) ENGINE=InnoDB;"""

@skip_unless(mysql_requirement)
@tests.skip_unless(mysql_requirement)
def setUp(self):
import MySQLdb
self._dbmodule = MySQLdb
self._auth = get_auth()['MySQLdb']
self._auth = tests.get_database_auth()['MySQLdb']
super(MysqlConnectionPool, self).setUp()

def tearDown(self):
Expand All @@ -493,28 +497,27 @@ def drop_db(self):
del db


class Test01MysqlTpool(MysqlConnectionPool, TpoolConnectionPool, TestCase):
class Test01MysqlTpool(MysqlConnectionPool, TpoolConnectionPool, tests.LimitedTestCase):
__test__ = True


class Test02MysqlRaw(MysqlConnectionPool, RawConnectionPool, TestCase):
class Test02MysqlRaw(MysqlConnectionPool, RawConnectionPool, tests.LimitedTestCase):
__test__ = True


def postgres_requirement(_f):
try:
import psycopg2
try:
auth = get_auth()['psycopg2'].copy()
psycopg2.connect(**auth)
return True
except psycopg2.OperationalError:
print("Skipping postgres tests, error when connecting")
return False
except ImportError:
if psycopg2 is None:
print("Skipping postgres tests, psycopg2 not importable")
return False

try:
auth = tests.get_database_auth()['psycopg2'].copy()
psycopg2.connect(**auth)
return True
except psycopg2.OperationalError:
print("Skipping postgres tests, error when connecting")
return False


class Psycopg2ConnectionPool(object):
dummy_table_sql = """CREATE TEMPORARY TABLE test_table
Expand All @@ -529,11 +532,10 @@ class Psycopg2ConnectionPool(object):
created TIMESTAMP
);"""

@skip_unless(postgres_requirement)
@tests.skip_unless(postgres_requirement)
def setUp(self):
import psycopg2
self._dbmodule = psycopg2
self._auth = get_auth()['psycopg2']
self._auth = tests.get_database_auth()['psycopg2']
super(Psycopg2ConnectionPool, self).setUp()

def tearDown(self):
Expand Down Expand Up @@ -566,7 +568,7 @@ def drop_db(self):
conn.close()


class TestPsycopg2Base(TestCase):
class TestPsycopg2Base(tests.LimitedTestCase):
__test__ = False

def test_cursor_works_as_context_manager(self):
Expand All @@ -575,14 +577,13 @@ def test_cursor_works_as_context_manager(self):
row = c.fetchone()
assert row == (1,)

def test_set_isolation_level(self):
self.connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)


class Test01Psycopg2Tpool(Psycopg2ConnectionPool, TpoolConnectionPool, TestPsycopg2Base):
__test__ = True


class Test02Psycopg2Raw(Psycopg2ConnectionPool, RawConnectionPool, TestPsycopg2Base):
__test__ = True


if __name__ == '__main__':
main()

0 comments on commit 885482b

Please sign in to comment.