diff --git a/.travis.yml b/.travis.yml index fc52f6e..049f16a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,7 +15,7 @@ env: - MOMOKO_PSYCOPG2_IMPL=psycopg2 - MOMOKO_PSYCOPG2_IMPL=psycopg2cffi -install: "pip install --use-mirrors tornado ${MOMOKO_PSYCOPG2_IMPL}" +install: "pip install --use-mirrors tornado ${MOMOKO_PSYCOPG2_IMPL} unittest2" script: python setup.py test matrix: diff --git a/examples/gen_example.py b/examples/gen_example.py index 0ff7dfb..8f9da69 100644 --- a/examples/gen_example.py +++ b/examples/gen_example.py @@ -6,6 +6,8 @@ .. _gen: http://www.tornadoweb.org/documentation/gen.html """ +from __future__ import print_function + import os import tornado.web @@ -23,6 +25,7 @@ db_host = os.environ.get('MOMOKO_TEST_HOST', '') db_port = os.environ.get('MOMOKO_TEST_PORT', 5432) enable_hstore = True if os.environ.get('MOMOKO_TEST_HSTORE', False) == '1' else False +enable_json = True if os.environ.get('MOMOKO_TEST_JSON', False) == '1' else False dsn = 'dbname=%s user=%s password=%s host=%s port=%s' % ( db_database, db_user, db_password, db_host, db_port) @@ -40,16 +43,17 @@ def db(self): class OverviewHandler(BaseHandler): def get(self): - self.write(''' + self.write(""" - ''') + """) self.finish() @@ -57,8 +61,8 @@ class MogrifyHandler(BaseHandler): @gen.coroutine def get(self): try: - sql = yield momoko.Op(self.db.mogrify, 'SELECT %s;', (1,)) - self.write('SQL: %s
' % sql) + sql = yield self.db.mogrify("SELECT %s;", (1,)) + self.write("SQL: %s
" % sql) except Exception as error: self.write(str(error)) @@ -69,8 +73,8 @@ class SingleQueryHandler(BaseHandler): @gen.coroutine def get(self): try: - cursor = yield momoko.Op(self.db.execute, 'SELECT pg_sleep(%s);', (1,)) - self.write('Query results: %s
\n' % cursor.fetchall()) + cursor = yield self.db.execute("SELECT pg_sleep(%s);", (1,)) + self.write("Query results: %s
\n" % cursor.fetchall()) except Exception as error: self.write(str(error)) @@ -82,15 +86,29 @@ class HstoreQueryHandler(BaseHandler): def get(self): if enable_hstore: try: - cursor = yield momoko.Op(self.db.execute, "SELECT 'a=>b, c=>d'::hstore;") - self.write('Query results: %s
' % cursor.fetchall()) - cursor = yield momoko.Op(self.db.execute, "SELECT %s;", - ({'e': 'f', 'g': 'h'},)) - self.write('Query results: %s
' % cursor.fetchall()) + cursor = yield self.db.execute("SELECT 'a=>b, c=>d'::hstore;") + self.write("Query results: %s
" % cursor.fetchall()) + cursor = yield self.db.execute("SELECT %s;", ({"e": "f", "g": "h"},)) + self.write("Query results: %s
" % cursor.fetchall()) + except Exception as error: + self.write(str(error)) + else: + self.write("hstore is not enabled") + + self.finish() + + +class JsonQueryHandler(BaseHandler): + @gen.coroutine + def get(self): + if enable_json: + try: + cursor = yield self.db.execute('SELECT \'{"a": "b", "c": "d"}\'::json;') + self.write("Query results: %s
" % cursor.fetchall()) except Exception as error: self.write(str(error)) else: - self.write('hstore is not enabled') + self.write("json is not enabled") self.finish() @@ -99,14 +117,14 @@ class MultiQueryHandler(BaseHandler): @gen.coroutine def get(self): cursor1, cursor2, cursor3 = yield [ - momoko.Op(self.db.execute, 'SELECT 1;'), - momoko.Op(self.db.mogrify, 'SELECT 2;'), - momoko.Op(self.db.execute, 'SELECT %s;', (3*1,)) + self.db.execute("SELECT 1;"), + self.db.mogrify("SELECT 2;"), + self.db.execute("SELECT %s;", (3*1,)) ] - self.write('Query 1 results: %s
' % cursor1.fetchall()) - self.write('Query 2 results: %s
' % cursor2) - self.write('Query 3 results: %s' % cursor3.fetchall()) + self.write("Query 1 results: %s
" % cursor1.fetchall()) + self.write("Query 2 results: %s
" % cursor2) + self.write("Query 3 results: %s" % cursor3.fetchall()) self.finish() @@ -115,49 +133,23 @@ class TransactionHandler(BaseHandler): @gen.coroutine def get(self): try: - cursors = yield momoko.Op(self.db.transaction, ( - 'SELECT 1, 12, 22, 11;', - 'SELECT 55, 22, 78, 13;', - 'SELECT 34, 13, 12, 34;', - 'SELECT 23, 12, 22, 23;', - 'SELECT 42, 23, 22, 11;', - ('SELECT 49, %s, 23, 11;', ('STR',)), + cursors = yield self.db.transaction(( + "SELECT 1, 12, 22, 11;", + "SELECT 55, 22, 78, 13;", + "SELECT 34, 13, 12, 34;", + "SELECT 23, 12, 22, 23;", + "SELECT 42, 23, 22, 11;", + ("SELECT 49, %s, 23, 11;", ("STR",)), )) for i, cursor in enumerate(cursors): - self.write('Query %s results: %s
' % (i, cursor.fetchall())) + self.write("Query %s results: %s
" % (i, cursor.fetchall())) except Exception as error: self.write(str(error)) self.finish() -class CallbackWaitHandler(BaseHandler): - @gen.coroutine - def get(self): - - self.db.execute('SELECT 42, 12, %s, 11;', (25,), - callback=(yield gen.Callback('q1'))) - self.db.execute('SELECT 42, 12, %s, %s;', (23, 56), - callback=(yield gen.Callback('q2'))) - self.db.execute('SELECT 465767, 4567, 3454;', - callback=(yield gen.Callback('q3'))) - - # Separately... - # cursor1 = yield momoko.WaitOp('q1') - # cursor2 = yield momoko.WaitOp('q2') - # cursor3 = yield momoko.WaitOp('q3') - - # Or all at once - cursor1, cursor2, cursor3 = yield momoko.WaitAllOps(('q1', 'q2', 'q3')) - - self.write('Query 1 results: %s
' % cursor1.fetchall()) - self.write('Query 2 results: %s
' % cursor2.fetchall()) - self.write('Query 3 results: %s' % cursor3.fetchall()) - - self.finish() - - class ConnectionQueryHandler(BaseHandler): def __init__(self, *args, **kwargs): self.http_connection_closed = False @@ -166,12 +158,12 @@ def __init__(self, *args, **kwargs): @gen.coroutine def get(self): try: - connection = yield momoko.Op(self.db.getconn) + connection = yield self.db.getconn() with self.db.manage(connection): for i in range(5): if self.http_connection_closed: break - cursor = yield momoko.Op(connection.execute, 'SELECT pg_sleep(1);') + cursor = yield connection.execute("SELECT pg_sleep(1);") self.write('Query %d results: %s
\n' % (i+1, cursor.fetchall())) self.flush() except Exception as error: @@ -191,26 +183,41 @@ def main(): (r'/mogrify', MogrifyHandler), (r'/query', SingleQueryHandler), (r'/hstore', HstoreQueryHandler), + (r'/json', JsonQueryHandler), (r'/transaction', TransactionHandler), (r'/multi_query', MultiQueryHandler), - (r'/callback_and_wait', CallbackWaitHandler), (r'/connection', ConnectionQueryHandler), ], debug=True) + ioloop = tornado.ioloop.IOLoop.instance() + application.db = momoko.Pool( dsn=dsn, size=1, max_size=3, + ioloop=ioloop, setsession=("SET TIME ZONE UTC",), raise_connect_errors=False, ) + # this is a one way to run ioloop in sync + future = application.db.connect() + ioloop.add_future(future, lambda f: ioloop.stop()) + ioloop.start() + if enable_hstore: - application.db.register_hstore() + future = application.db.register_hstore() + # This is the other way to run ioloop in sync + ioloop.run_sync(lambda: future) + + if enable_json: + future = application.db.register_json() + # This is the other way to run ioloop in sync + ioloop.run_sync(lambda: future) http_server = tornado.httpserver.HTTPServer(application) http_server.listen(8888, 'localhost') - tornado.ioloop.IOLoop.instance().start() + ioloop.start() except KeyboardInterrupt: print('Exit') diff --git a/momoko/__init__.py b/momoko/__init__.py index 196d596..bf79231 100644 --- a/momoko/__init__.py +++ b/momoko/__init__.py @@ -10,11 +10,9 @@ """ import psycopg2 -from psycopg2 import ProgrammingError -from .connection import Pool, Connection -from .exceptions import PoolError -from .utils import Op, WaitOp, WaitAllOps +from .connection import Pool, Connection, connect +from .exceptions import PoolError, PartiallyConnectedError try: diff --git a/momoko/connection.py b/momoko/connection.py index 83c8145..fc7fc1b 100644 --- a/momoko/connection.py +++ b/momoko/connection.py @@ -9,10 +9,13 @@ MIT, see LICENSE for more details. """ +from __future__ import print_function + import sys if sys.version_info[0] >= 3: basestring = str +import logging from functools import partial from collections import deque import datetime @@ -22,161 +25,107 @@ import psycopg2 from psycopg2.extras import register_hstore as _psy_register_hstore from psycopg2.extras import register_json as _psy_register_json -from psycopg2.extensions import POLL_OK, POLL_READ, POLL_WRITE, POLL_ERROR, TRANSACTION_STATUS_IDLE +from psycopg2.extensions import POLL_OK, POLL_READ, POLL_WRITE, POLL_ERROR from tornado import gen from tornado.ioloop import IOLoop -from tornado.stack_context import wrap -from tornado.concurrent import Future - -from .exceptions import PoolError - -from .utils import log +from tornado.concurrent import chain_future, Future +from .exceptions import PoolError, PartiallyConnectedError -# The dummy callback is used to keep the asynchronous cursor alive in case no -# callback has been specified. This will prevent the cursor from being garbage -# collected once, for example, ``Pool.execute`` has finished. -# Symptom: you'll get -# InterfaceError: the asynchronous cursor has disappeared -# exceptions -def _dummy_callback(cursor, error): - pass +log = logging.getLogger('momoko') -class Pool(object): +class ConnectionContainer(object): """ - Asynchronous connection pool. - - The pool manages database connections and passes operations to connections. - - See :py:class:`momoko.Connection` for documentation about the ``dsn``, - ``connection_factory`` and ``cursor_factory`` parameters. - These are used by the connection pool when a new connection is created. - - :param integer size: Amount of connections created upon initialization. Defaults to ``1``. - :param integer max_size: Allow number of connection to grow under load up to given size. Defaults to ``size``. - :param callable callback: - A callable that's called after all the connections are created. Defaults to ``None``. - :param ioloop: An instance of Tornado's IOLoop. Defaults to ``None``, ``IOLoop.instance()`` will be used. - :param bool raise_connect_errors: - Whether to raise exception if database connection fails. Set to ``False`` to enable - automatic reconnection attempts. Defaults to ``True``. - :param integer reconnect_interval: - When using automatic reconnects, set minimum reconnect interval, in milliseconds, - before retrying connection attempt. Don't set this value too low to prevent "banging" - the database server with connection attempts. Defaults to ``500``. - :param list setsession: - List of intial sql commands to be executed once connection is established. - If any of the commands failes, the connection will be closed. - **NOTE:** The commands will be executed as one transaction block. + Helper class that stores connecttions according to their state """ + def __init__(self): + self.empty() + + def empty(self): + self.free = set() + self.busy = set() + self.dead = set() + self.pending = set() + self.waiting_queue = deque() + + def add_free(self, conn): + self.pending.discard(conn) + log.debug("Handling free connection %s", conn.fileno) + + if not self.waiting_queue: + log.debug("No outstanding requests - adding to free pool") + self.free.add(conn) + return - class Connections(object): - def __init__(self, reconnect_interval, ioloop): - self.ioloop = ioloop - - self.reconnect_interval = reconnect_interval - self.last_connect_attempt_ts = ioloop.time() - self.last_connect_attempt_success = False - self.reconnect_in_progress = False - - self.empty() - - def empty(self): - self.free = set() - self.busy = set() - self.dead = set() - self.pending = set() - self.waiting_queue = deque() - - def remove_pending(func): - @wraps(func) - def wrapper(self, conn): - self.pending.discard(conn) - func(self, conn) - return wrapper - - def get_free(self): - if not self.free: - return + log.debug("There are outstanding requests - resumed future from waiting queue") + self.busy.add(conn) + future = self.waiting_queue.pop() + future.set_result(conn) + + def add_dead(self, conn): + log.debug("Adding dead connection") + self.pending.discard(conn) + self.dead.add(conn) + + def acquire(self): + """Occupy free connection""" + future = Future() + if self.free: conn = self.free.pop() self.busy.add(conn) - return conn + future.set_result(conn) + log.debug("Acquired free connection %s", conn.fileno) + return future + elif self.busy: + log.debug("No free connections, and some are busy - put in waiting queue") + self.waiting_queue.appendleft(future) + return future + else: + log.debug("All connections are dead") + return None + + def release(self, conn): + log.debug("About to release connection %s", conn.fileno) + assert conn in self.busy, "Tried to release non-busy connection" + self.busy.remove(conn) + if conn.closed: + self.dead.add(conn) + else: + self.add_free(conn) - def return_busy(self, conn): - if self.waiting_queue: - self.waiting_queue.pop().set_result(conn) - else: - self.busy.remove(conn) - self.free.add(conn) - - @remove_pending - def add_free(self, conn): - if self.waiting_queue: - self.busy.add(conn) - self.waiting_queue.pop().set_result(conn) - else: - self.free.add(conn) + def abort_waiting_queue(self, error): + while self.waiting_queue: + future = self.waiting_queue.pop() + future.set_exception(error) - @remove_pending - def add_dead(self, conn): - self.dead.add(conn) - self.busy.discard(conn) - # free connections are most probably dead by now - while self.free: - self.dead.add(self.free.pop()) - - def add_pending(self, conn): - self.last_connect_attempt_ts = self.ioloop.time() - self.reconnect_in_progress = True - self.pending.add(conn) - - def get_alive(self): - return self.free.union(self.busy) - - def close_alive(self): - for conn in self.get_alive(): - if not conn.closed: - conn.close() - - @property - def total(self): - return len(self.free) + len(self.busy) + len(self.dead) + len(self.pending) - - def is_time_to_reconnect(self): - now = self.ioloop.time() - if not (self.last_connect_attempt_success or - now - self.last_connect_attempt_ts > self.reconnect_interval): - return False - return True - - def on_reconnect_complete(self, connection): - if not connection.closed: - self.add_free(connection) - else: - self.add_dead(connection) - self.last_connect_attempt_success = not connection.closed - self.reconnect_in_progress = False - if not self.last_connect_attempt_success: - self.abort_waiting_queue() + def close_alive(self): + for conn in self.free.union(self.busy): + if not conn.closed: + conn.close() - def abort_waiting_queue(self): - while self.waiting_queue: - future = self.waiting_queue.pop() - future.set_result(None) # Send None to signify that all who waits should abort + @property + def all_dead(self): + return not (self.free or self.busy) + + @property + def total(self): + return len(self.free) + len(self.busy) + len(self.dead) + len(self.pending) + +class Pool(object): def __init__(self, dsn, connection_factory=None, cursor_factory=None, size=1, max_size=None, - callback=None, ioloop=None, raise_connect_errors=True, reconnect_interval=500, - setsession=[]): + setsession=()): + assert size > 0, "The connection pool size must be a number above 0." self.size = size @@ -184,235 +133,166 @@ def __init__(self, assert self.size <= self.max_size, "The connection pool max size must be of at least 'size'." self.dsn = dsn - self.closed = False self.connection_factory = connection_factory self.cursor_factory = cursor_factory - self.raise_connect_errors = raise_connect_errors - - self._ioloop = ioloop or IOLoop.instance() - - reconnect_interval = float(reconnect_interval)/1000 # the parameter is in milliseconds - self._conns = self.Connections(reconnect_interval, self._ioloop) - + self.reconnect_interval = float(reconnect_interval)/1000 # the parameter is in milliseconds self.setsession = setsession - self.connected = False + self.connected = False + self.closed = False self.server_version = None - # Create connections - def after_pool_creation(connection): - if not self._conns.pending: # all connections "connected" on way or the other - if callback: - callback() - - for i in range(self.size): - self._new(after_pool_creation) - - def _new(self, callback=None): - conn = Connection() - self._conns.add_pending(conn) - conn.connect(self.dsn, - connection_factory=self.connection_factory, - cursor_factory=self.cursor_factory, - callback=partial(self._post_connect_callback, callback), - ioloop=self._ioloop, - setsession=self.setsession) - - def _post_connect_callback(self, callback, connection, error): - if error: - if not connection.closed: - connection.close() - if self.raise_connect_errors: - raise error - else: - logger = log.error if self.log_connect_errors else log.info - logger("Failed opening connection to database: %s", error) - else: - self.server_version = connection.connection.server_version + self.ioloop = ioloop or IOLoop.instance() - self._conns.on_reconnect_complete(connection) - log.debug("Connection attempt complete. Success: %s", self._conns.last_connect_attempt_success) + self.conns = ConnectionContainer() - if self._conns.last_connect_attempt_success: - # Connection to db is OK. If we have waiting requests - # and some dead connections, we can serve requests faster - # if we reanimate dead connections - num_conns_to_reconnect = min(len(self._conns.dead), len(self._conns.waiting_queue)) - for i in range(num_conns_to_reconnect): - self._conns.dead.pop() - self._new() + self._last_connect_time = 0 + self._no_conn_availble_error = psycopg2.DatabaseError("No database connection available") - self._stretch_if_needed() + def connect(self): + """ + Returns future that resolves to this Pool object. - if callback: - callback(connection) + If some connection failed to connected, raises :py:meth:`momoko.PartiallyConnectedError` + if self.raise_connect_errors is true. + """ + future = Future() + pending = [self.size-1] - def _get_connection(self): + def on_connect(fut): + if pending[0]: + pending[0] -= 1 + return + # all connection attemts are complete + if self.conns.dead and self.raise_connect_errors: + ecp = PartiallyConnectedError("%s connection(s) failed to connect" % len(self.conns.dead)) + future.set_exception(ecp) + else: + future.set_result(self) + log.debug("All initial connection requests complete") - log.debug("Getting connection") - self.connected = True + for i in range(self.size): + self.ioloop.add_future(self._new_connection(), on_connect) - # if there are free connections - just return one - connection = self._conns.get_free() - if connection: - return connection + return future - # if there are dead connections - try to reanimate them - if self._conns.dead: - if self._conns.is_time_to_reconnect(): - log.debug("Trying to reconnect dead connection") - self._conns.dead.pop() - self._new() - return + def getconn(self, ping=True): + """ + Acquire connection from the pool. - if self._conns.busy: - # We may be maxed out here. Try to stretch if approprate - self._stretch_if_needed(new_request=True) - # At least some connections are alive, so wait for them - log.debug("There are busy connections") - return + You can then use this connection for subsequest queries. + Just use ``connection.execute`` instead of ``Pool.execute``. - if self._conns.reconnect_in_progress: - # We are connecting - wait more - log.debug("Reconnect in progress") - return + Make sure to return connection to the pool by calling :py:meth:`momoko.Pool.putconn`, + otherwise the connection will remain forever-busy and you'll starvate your pool quickly. - log.debug("no connections are available or expected in near future") - self.connected = False + Returns future that resolves to the acquired connection object. - def _stretch_if_needed(self, new_request=False): - if self._conns.total == self.max_size: - return # max size reached - if self._conns.dead or self._conns.free: - return # no point to stretch if we heave free conns to use / dead conns to reanimate - if not (new_request or (self._conns.waiting_queue and self._conns.busy)): - return - if self._conns.pending: - if len(self._conns.pending) >= len(self._conns.waiting_queue) + int(new_request): - return - log.debug("Stretching pool") - self._new() + :param boolean ping: + Whether to ping the connection before returning it by executing :py:meth:`momoko.Connection.ping`. + """ + rv = self.conns.acquire() + if isinstance(rv, Future): + self._reanimate_and_stretch_if_needed() + future = rv + else: + # Else, all connections are dead + assert len(self.conns.pending) == 0, "BUG! should be no pending connection" - def _retry_action(self, method, callback, *args, **kwargs): - action = partial(self._operate, method, callback, *args, **kwargs) - future = Future() - self._conns.waiting_queue.appendleft(future) - - def on_connection_available(future): - connection = future.result() - if not connection: - log.debug("Aborting - as instructed") - raise psycopg2.DatabaseError("No database connection available") - action(connection=connection) - return self._ioloop.add_future(future, on_connection_available) - - def _reconnect_and_retry(self, connection, method, callback, *args, **kwargs): - self._conns.add_dead(connection) - log.debug("Tried over dead connection. Retrying once") - self._retry_action(method, callback, *args, **kwargs) - self._conns.dead.pop() - self._new() - - def _operate(self, method, callback, *args, **kwargs): - - connection = kwargs.pop("connection", None) or self._get_connection() - if not connection: - if self.connected: - log.debug("No connection available right now - will try again later") - return self._retry_action(method, callback, *args, **kwargs) - else: - log.debug("Aborting - not connected") - raise psycopg2.DatabaseError("No database connection available") + future = Future() - log.debug("Connection obtained, proceeding") + def on_reanimate_done(fut): + if self.conns.all_dead: + future.set_exception(self._no_conn_availble_error) + return + f = self.conns.acquire() + assert isinstance(f, Future) + chain_future(f, future) - if kwargs.pop("get_connection_only", False): - return callback(connection, None) + self.ioloop.add_future(self._reanimate(), on_reanimate_done) - the_callback = partial(self._operate_callback, connection, method, callback, args, kwargs) - method(connection, *args, callback=the_callback, **kwargs) + if not ping: + return future + else: + return self._ping_future_connection(future) - def _operate_callback(self, connection, method, orig_callback, args, kwargs, *_args, **_kwargs): - """ - Wrap real callback coming from invoker with our own one - that will check connection status after the end of the call - and recycle connection / retry operation + def putconn(self, connection): """ - if connection.closed: - self._reconnect_and_retry(connection, method, orig_callback, *args, **kwargs) - return - - if not getattr(method, "_keep_connection", False): - self._conns.return_busy(connection) + Retrun busy connection back to the pool. - if orig_callback: - return orig_callback(*_args, **_kwargs) + **NOTE:** This is a synchronous method. - def ping(self, connection, callback=None): + :param Connection connection: + Connection object previously returned by :py:meth:`momoko.Pool.getconn`. """ - Ping given connection object to make sure its alive (involves roundtrip to the database server). - See :py:meth:`momoko.Connection.ping` for documentation about the details. - """ - self._operate(Connection.ping, callback, - connection=connection) + assert connection in self.conns.busy + self.conns.release(connection) - def transaction(self, - statements, - cursor_factory=None, - callback=None): + if self.conns.all_dead: + self.conns.abort_waiting_queue(self._no_conn_availble_error) + + @contextmanager + def manage(self, connection): """ - Run a sequence of SQL queries in a database transaction. + Context manager that automatically returns connection to the pool. + You can use it instead of :py:meth:`momoko.Pool.putconn`:: - See :py:meth:`momoko.Connection.transaction` for documentation about the - parameters. + connection = yield self.db.getconn() + with self.db.manage(connection): + cursor = yield connection.execute("BEGIN") + ... """ - self._operate(Connection.transaction, callback, - statements, cursor_factory=cursor_factory) + assert connection in self.conns.busy, "Can not manage non-busy connection. Where did you get it from?" + try: + yield connection + finally: + self.putconn(connection) - def execute(self, - operation, - parameters=(), - cursor_factory=None, - callback=None): + def execute(self, *args, **kwargs): """ Prepare and execute a database operation (query or command). See :py:meth:`momoko.Connection.execute` for documentation about the parameters. """ - self._operate(Connection.execute, callback, - operation, parameters, cursor_factory=cursor_factory) + return self._operate(Connection.execute, args, kwargs) - def callproc(self, - procname, - parameters=(), - cursor_factory=None, - callback=None): + def callproc(self, *args, **kwargs): """ Call a stored database procedure with the given name. See :py:meth:`momoko.Connection.callproc` for documentation about the parameters. """ - self._operate(Connection.callproc, callback, - procname, parameters=parameters, cursor_factory=cursor_factory) + return self._operate(Connection.callproc, args, kwargs) - def mogrify(self, - operation, - parameters=(), - callback=None): + def transaction(self, *args, **kwargs): + """ + Run a sequence of SQL queries in a database transaction. + + See :py:meth:`momoko.Connection.transaction` for documentation about the + parameters. + """ + return self._operate(Connection.transaction, args, kwargs) + + def mogrify(self, *args, **kwargs): """ Return a query string after arguments binding. + **NOTE:** This is NOT a synchronous method (contary to `momoko.Connection.mogrify`) + - it asynchronously waits for available connection. For performance + reasons, its better to create dedicated :py:meth:`momoko.Connection` + object and use it directly for mogrification, this operation does not + imply any real operation on the database server. + See :py:meth:`momoko.Connection.mogrify` for documentation about the parameters. """ - self._operate(Connection.mogrify, callback, - operation, parameters=parameters) + return self._operate(Connection.mogrify, args, kwargs, async=False) - def register_hstore(self, unicode=False, callback=None): + def register_hstore(self, *args, **kwargs): """ Register adapter and typecaster for ``dict-hstore`` conversions. @@ -420,87 +300,177 @@ def register_hstore(self, unicode=False, callback=None): the parameters. This method has no ``globally`` parameter, because it already registers hstore to all the connections in the pool. """ - self._operate(Connection.register_hstore, callback, - globally=True, unicode=unicode) + kwargs["globally"] = True + return self._operate(Connection.register_hstore, args, kwargs) - def register_json(self, loads=None, callback=None): + def register_json(self, *args, **kwargs): """ - Register adapter and typecaster for ``dict-json`` conversions. + Create and register typecasters converting :sql:`json` type to Python objects. See :py:meth:`momoko.Connection.register_json` for documentation about the parameters. This method has no ``globally`` parameter, because it already registers json to all the connections in the pool. """ - self._operate(Connection.register_json, callback, - globally=True, loads=loads) + kwargs["globally"] = True + return self._operate(Connection.register_json, args, kwargs) - def getconn(self, ping=True, callback=None): + def close(self): """ - Acquire connection from the pool. + Close the connection pool. - You can then use this connection for subsequest queries. - Just supply, for example, ``connection.execute`` instead of ``Pool.execute`` - to ``momoko.Op``. + **NOTE:** This is a synchronous method. + """ + if self.closed: + raise PoolError('connection pool is already closed') - Make sure to return connection to the pool by calling :py:meth:`momoko.Pool.putconn`, - otherwise the connection will remain forever-busy and you'll starvate your pool quickly. + self.conns.close_alive() + self.conns.empty() + self.closed = True - :param boolean ping: - Whether to ping connection before returning it by executing :py:meth:`momoko.Pool.ping`. - """ - def ping_callback(connection, error): - self.ping(connection, callback) - the_callback = ping_callback if ping else callback - self._operate("getconn", the_callback, get_connection_only=True) + def _operate(self, method, args=(), kwargs=None, async=True, keep=False, connection=None): + kwargs = kwargs or {} + future = Future() - def putconn(self, connection): - """ - Retrun busy connection back to the pool. + retry = [] - :param Connection connection: - Connection object previously returned by :py:meth:`momoko.Pool.getconn`. - **NOTE:** This is a synchronous function - """ + def when_avaialble(fut): + try: + conn = fut.result() + except psycopg2.Error as error: + future.set_exc_info(sys.exc_info()) + if retry: + self.putconn(retry[0]) + return - if connection.closed: - self._conns.add_dead(connection) + log.debug("Obtained connection: %s", conn.fileno) + try: + future_or_result = method(conn, *args, **kwargs) + except psycopg2.Error as error: + if conn.closed: + if not retry: + retry.append(conn) + self.ioloop.add_future(conn.connect(), when_avaialble) + return + else: + future.set_exception(self._no_conn_availble_error) + else: + future.set_exc_info(sys.exc_info()) + log.debug(2) + self.putconn(conn) + return + + if not async: + future.set_result(future_or_result) + log.debug(3) + self.putconn(conn) + return + + chain_future(future_or_result, future) + if not keep: + future.add_done_callback(lambda f: self.putconn(conn)) + + if not connection: + self.ioloop.add_future(self.getconn(ping=False), when_avaialble) else: - self._conns.return_busy(connection) + f = Future() + f.set_result(connection) + when_avaialble(f) + return future - @contextmanager - def manage(self, connection): - """ - Context manager that automatically returns connection to the pool. - You can use it instead of :py:meth:`momoko.Pool.putconn`:: + def _reanimate(self): + assert self.conns.dead, "BUG: dont' call reanimate when there is no one to reanimate" - connection = yield momoko.Op(self.db.getconn) - with self.db.manage(connection): - cursor = yield momoko.Op(connection.execute, "BEGIN") - ... - """ - assert connection in self._conns.busy, "Can not manage non-busy connection. Where did you get it from?" - try: - yield connection - finally: - self.putconn(connection) + future = Future() - def close(self): - """ - Close the connection pool. - """ - if self.closed: - raise PoolError('connection pool is already closed') + if self.ioloop.time() - self._last_connect_time < self.reconnect_interval: + log.debug("Not reconnecting - too soon") + future.set_result(None) + return future - self._conns.close_alive() - self._conns.empty() - self.closed = True + pending = [len(self.conns.dead)-1] - log_connect_errors = True # Unittest monkey patches it for silent output + def on_connect(fut): + if pending[0]: + pending[0] -= 1 + return + future.set_result(None) + + while self.conns.dead: + conn = self.conns.dead.pop() + self.ioloop.add_future(self._connect_one(conn), on_connect) + + return future + + def _reanimate_and_stretch_if_needed(self): + if self.conns.dead: + self._reanimate() + return + + if self.conns.total == self.max_size: + return # max size reached + if self.conns.free: + return # no point to stretch if there are free connections + if self.conns.pending: + if len(self.conns.pending) >= len(self.conns.waiting_queue): + return # there are enough outstanding connection requests + + log.debug("Stretching pool") + self._new_connection() + + def _new_connection(self): + log.debug("Spawning new connection") + conn = Connection(self.dsn, + connection_factory=self.connection_factory, + cursor_factory=self.cursor_factory, + ioloop=self.ioloop, + setsession=self.setsession) + return self._connect_one(conn) + + def _connect_one(self, conn): + future = Future() + self.conns.pending.add(conn) + + def on_connect(fut): + try: + fut.result() + except psycopg2.Error as error: + self.conns.add_dead(conn) + else: + self.conns.add_free(conn) + self.server_version = conn.server_version + self._last_connect_time = self.ioloop.time() + future.set_result(conn) + + self.ioloop.add_future(conn.connect(), on_connect) + return future + + def _ping_future_connection(self, conn_future): + ping_future = Future() + + def on_connection_available(fut): + conn = fut.result() + + def on_ping_done(ping_fut): + try: + ping_fut.result() + except psycopg2.Error as error: + ping_future.set_exc_info(error) + self.putconn(conn) + else: + ping_future.set_result(conn) + + f = self._operate(Connection.ping, keep=True, connection=conn) + self.ioloop.add_future(f, on_ping_done) + + self.ioloop.add_future(conn_future, on_connection_available) + + return ping_future class Connection(object): """ - Initiate an asynchronous connect. + Asynchronous connection object. All its methods are + asynchronous unless stated otherwide in method description. :param string dsn: A `Data Source Name`_ string containing one of the following values: @@ -524,11 +494,6 @@ class Connection(object): The class returned should be a subclass of `psycopg2.extensions.cursor`_. See `Connection and cursor factories`_ for details. Defaults to ``None``. - :param callable callback: - A callable that's called after the connection is created. It accepts one - paramater: an instance of :py:class:`momoko.Connection`. Defaults to ``None``. - :param ioloop: An instance of Tornado's IOLoop. Defaults to ``None``. - :param list setsession: List of intial sql commands to be executed once connection is established. If any of the commands failes, the connection will be closed. @@ -539,127 +504,92 @@ class Connection(object): .. _psycopg2.extensions.connection: http://initd.org/psycopg/docs/connection.html#connection .. _Connection and cursor factories: http://initd.org/psycopg/docs/advanced.html#subclassing-cursor """ - def connect(self, - dsn, - connection_factory=None, - cursor_factory=None, - callback=None, - ioloop=None, - setsession=[]): - log.info("Opening new database connection") + def __init__(self, + dsn, + connection_factory=None, + cursor_factory=None, + ioloop=None, + setsession=()): + self.dsn = dsn + self.connection_factory = connection_factory + self.cursor_factory = cursor_factory + self.ioloop = ioloop or IOLoop.instance() + self.setsession = setsession + + def connect(self): + """ + Initiate asynchronous connect. + Returns future that resolves to this connection object. + """ kwargs = {"async": True} - if connection_factory: - kwargs["connection_factory"] = connection_factory - if cursor_factory: - kwargs["cursor_factory"] = cursor_factory + if self.connection_factory: + kwargs["connection_factory"] = self.connection_factory + if self.cursor_factory: + kwargs["cursor_factory"] = self.cursor_factory + + future = Future() self.connection = None try: - self.connection = psycopg2.connect(dsn, **kwargs) + self.connection = psycopg2.connect(self.dsn, **kwargs) except psycopg2.Error as error: - if callback: - callback(self, error) - return - else: - raise + future.set_exc_info(sys.exc_info()) + return future + self.fileno = self.connection.fileno() - self._transaction_status = self.connection.get_transaction_status - self.ioloop = ioloop or IOLoop.instance() - self._on_connect_callback = partial(callback, self) if callback else None + if self.setsession: + on_connect_future = Future() + + def on_connect(on_connect_future): + self.ioloop.add_future(self.transaction(self.setsession), lambda x: future.set_result(self)) - if setsession: - self.callback = self._setsession_callback - self.setsession = setsession + self.ioloop.add_future(on_connect_future, on_connect) + callback = partial(self._io_callback, on_connect_future, self) else: - self.callback = self._on_connect_callback + callback = partial(self._io_callback, future, self) - self.ioloop.add_handler(self.fileno, self.io_callback, IOLoop.WRITE) + self.ioloop.add_handler(self.fileno, callback, IOLoop.WRITE) + self.ioloop.add_future(future, self._set_server_version) - def _setsession_callback(self, error): - """Custom post-connect callback to trigger setsession commands execution in transaction""" - if error: - return self._on_connect_callback(error) - log.debug("Running setsession commands") - return self.transaction(self.setsession, callback=self._setsession_transaction_callback) + return future - def _setsession_transaction_callback(self, cursor, error): - """ - Call back that check results of setsession transaction commands and - call the real post_connect callback. Closes connection if transaction failed. - """ - if error: - log.debug("Closing connection since set session commands failed") - self.close() - self._on_connect_callback(error) + def _set_server_version(self, future): + if future.exception(): + return + self.server_version = self.connection.server_version - def io_callback(self, fd=None, events=None): + def _io_callback(self, future, result, fd=None, events=None): try: state = self.connection.poll() except (psycopg2.Warning, psycopg2.Error) as error: self.ioloop.remove_handler(self.fileno) - if self.callback: - self.callback(error) + future.set_exc_info(sys.exc_info()) else: if state == POLL_OK: self.ioloop.remove_handler(self.fileno) - if self.callback: - self.callback(None) + future.set_result(result) elif state == POLL_READ: self.ioloop.update_handler(self.fileno, IOLoop.READ) elif state == POLL_WRITE: self.ioloop.update_handler(self.fileno, IOLoop.WRITE) else: - raise psycopg2.OperationalError('poll() returned {0}'.format(state)) - - def _catch_early_errors(func): - @wraps(func) - def wrapper(self, *args, **kwargs): - callback = kwargs.get("callback", _dummy_callback) - try: - return func(self, *args, **kwargs) - except Exception as error: - callback(None, error) - return wrapper + future.set_exception(psycopg2.OperationalError('poll() returned {0}'.format(state))) - def _keep_connection(func): - """ - Use this decorator on Connection methods to hint the Pool to not - release connection when operation is complete - """ - func._keep_connection = True - return func - - @_catch_early_errors - @_keep_connection - def ping(self, callback=None): + def ping(self): """ Make sure this connection is alive by executing SELECT 1 statement - i.e. roundtrip to the database. - **NOTE:** On the contrary to other methods, callback function signature is - ``callback(self, error)`` and not ``callback(cursor, error)``. - - **NOTE:** `callback` should always passed as keyword argument - + Returns future. If it resolves sucessfully - the connection is alive (or dead otherwise). """ - cursor = self.connection.cursor() - cursor.execute("SELECT 1 AS ping") - self.callback = partial(self._ping_callback, callback or _dummy_callback, cursor) - self.ioloop.add_handler(self.fileno, self.io_callback, IOLoop.WRITE) - - def _ping_callback(self, callback, cursor, error): - if not error: - cursor.fetchall() - return callback(self, error) + return self.execute("SELECT 1 AS ping") - @_catch_early_errors def execute(self, operation, parameters=(), - cursor_factory=None, - callback=None): + cursor_factory=None): """ Prepare and execute a database operation (query or command). @@ -671,12 +601,8 @@ def execute(self, The ``cursor_factory`` argument can be used to create non-standard cursors. The class returned must be a subclass of `psycopg2.extensions.cursor`_. See `Connection and cursor factories`_ for details. Defaults to ``None``. - :param callable callback: - A callable that is executed when the query has finished. It must accept - two positional parameters. The first one being the cursor and the second - one ``None`` or an instance of an exception if an error has occurred, - in that case the first parameter will be ``None``. Defaults to ``None``. - **NOTE:** `callback` should always passed as keyword argument + + Returns future that resolves to cursor object containing result. .. _Passing parameters to SQL queries: http://initd.org/psycopg/docs/usage.html#query-parameters .. _psycopg2.extensions.cursor: http://initd.org/psycopg/docs/extensions.html#psycopg2.extensions.cursor @@ -685,15 +611,16 @@ def execute(self, kwargs = {"cursor_factory": cursor_factory} if cursor_factory else {} cursor = self.connection.cursor(**kwargs) cursor.execute(operation, parameters) - self.callback = partial(callback or _dummy_callback, cursor) - self.ioloop.add_handler(self.fileno, self.io_callback, IOLoop.WRITE) - @_catch_early_errors + future = Future() + callback = partial(self._io_callback, future, cursor) + self.ioloop.add_handler(self.fileno, callback, IOLoop.WRITE) + return future + def callproc(self, procname, parameters=(), - cursor_factory=None, - callback=None): + cursor_factory=None): """ Call a stored database procedure with the given name. @@ -713,12 +640,8 @@ def callproc(self, The ``cursor_factory`` argument can be used to create non-standard cursors. The class returned must be a subclass of `psycopg2.extensions.cursor`_. See `Connection and cursor factories`_ for details. Defaults to ``None``. - :param callable callback: - A callable that is executed when the query has finished. It must accept - two positional parameters. The first one being the cursor and the second - one ``None`` or an instance of an exception if an error has occurred, - in that case the first parameter will be ``None``. Defaults to ``None``. - **NOTE:** `callback` should always passed as keyword argument + + Returns future that resolves to cursor object containing result. .. _fetch*(): http://initd.org/psycopg/docs/cursor.html#fetch .. _Passing parameters to SQL queries: http://initd.org/psycopg/docs/usage.html#query-parameters @@ -728,42 +651,36 @@ def callproc(self, kwargs = {"cursor_factory": cursor_factory} if cursor_factory else {} cursor = self.connection.cursor(**kwargs) cursor.callproc(procname, parameters) - self.callback = partial(callback or _dummy_callback, cursor) - self.ioloop.add_handler(self.fileno, self.io_callback, IOLoop.WRITE) - @_catch_early_errors - def mogrify(self, operation, parameters=(), callback=None): + future = Future() + callback = partial(self._io_callback, future, cursor) + self.ioloop.add_handler(self.fileno, callback, IOLoop.WRITE) + return future + + def mogrify(self, operation, parameters=()): """ Return a query string after arguments binding. The string returned is exactly the one that would be sent to the database running the execute() method or similar. + **NOTE:** This is a synchronous method. + :param string operation: An SQL query. :param tuple/list parameters: A list or tuple with query parameters. See `Passing parameters to SQL queries`_ for more information. Defaults to an empty tuple. - :param callable callback: - A callable that is executed when the query has finished. It must accept - two positional parameters. The first one being the resulting query as - a byte string and the second one ``None`` or an instance of an exception - if an error has occurred. Defaults to ``None``. - **NOTE:** `callback` should always passed as keyword argument .. _Passing parameters to SQL queries: http://initd.org/psycopg/docs/usage.html#query-parameters .. _Connection and cursor factories: http://initd.org/psycopg/docs/advanced.html#subclassing-cursor """ cursor = self.connection.cursor() - try: - result = cursor.mogrify(operation, parameters) - self.ioloop.add_callback(partial(callback or _dummy_callback, result, None)) - except (psycopg2.Warning, psycopg2.Error) as error: - self.ioloop.add_callback(partial(callback or _dummy_callback, b'', error)) + return cursor.mogrify(operation, parameters) def transaction(self, statements, cursor_factory=None, - callback=None): + auto_rollback=True): """ Run a sequence of SQL queries in a database transaction. @@ -777,85 +694,75 @@ def transaction(self, The ``cursor_factory`` argument can be used to create non-standard cursors. The class returned must be a subclass of `psycopg2.extensions.cursor`_. See `Connection and cursor factories`_ for details. Defaults to ``None``. - :param callable callback: - A callable that is executed when the transaction has finished. It must accept - two positional parameters. The first one being a list of cursors in the same - order as the given statements and the second one ``None`` or an instance of - an exception if an error has occurred, in that case the first parameter is - an empty list. Defaults to ``None``. - **NOTE:** `callback` should always passed as keyword argument + :param bool auto_rollback: + If one of the transaction statements failes, try to automatically + execute ROLLBACK to abort the transaction. If ROLLBACK fails, it would + not be raised, but only logged. + + Returns future that resolves to ``list`` of cursors. Each cursor contains the result + of the corresponding transaction statement. .. _Passing parameters to SQL queries: http://initd.org/psycopg/docs/usage.html#query-parameters .. _psycopg2.extensions.cursor: http://initd.org/psycopg/docs/extensions.html#psycopg2.extensions.cursor .. _Connection and cursor factories: http://initd.org/psycopg/docs/advanced.html#subclassing-cursor """ cursors = [] - queue = deque() - callback = callback or _dummy_callback - - for statement in statements: - if isinstance(statement, basestring): - queue.append((statement, ())) - else: - queue.append(statement[:2]) - - queue.appendleft(('BEGIN;', ())) - queue.append(('COMMIT;', ())) + transaction_future = Future() - def error_callback(statement_error, cursor, rollback_error): - callback(None, rollback_error or statement_error) + queue = self._statement_generator(statements) - def exec_statement(cursor=None, error=None): - if error: - try: - self.execute('ROLLBACK;', callback=partial(error_callback, error)) - except psycopg2.Error as rollback_error: - error_callback(error, cursor, rollback_error) - return - if cursor: + def exec_statements(future): + try: + cursor = future.result() cursors.append(cursor) - if not queue: - callback(cursors[1:-1], None) + except Exception as error: + if not auto_rollback: + transaction_future.set_exc_info(sys.exc_info()) + else: + self._rollback(transaction_future, error) return - operation, parameters = queue.popleft() - self.execute(operation, parameters, cursor_factory, callback=exec_statement) - - self.ioloop.add_callback(exec_statement) - - @_catch_early_errors - def register_json(self, globally=False, loads=None, callback=None): - """ - Register adapter and typecaster for ``dict-json`` conversions. - - More information on the json datatype can be found on the - Psycopg2 |regjsondoc|_. + try: + operation, parameters = next(queue) + except StopIteration: + transaction_future.set_result(cursors[1:-1]) + return - :param boolean globally: - Register the adapter globally, not only on this connection. - :param function loads: - The function used to parse the data into a Python object. If ``None`` - use ``json.loads()``, where ``json`` is the module chosen according to - the Python version. See psycopg2.extra docs. + f = self.execute(operation, parameters, cursor_factory) + self.ioloop.add_future(f, exec_statements) - **NOTE:** `callback` should always passed as keyword argument + self.ioloop.add_future(self.execute("BEGIN;"), exec_statements) + return transaction_future - .. |regjsondoc| replace:: documentation + def _statement_generator(self, statements): + for statement in statements: + if isinstance(statement, basestring): + yield (statement, ()) + else: + yield statement[:2] + yield ('COMMIT;', ()) - .. _regjsondoc: http://initd.org/psycopg/docs/extras.html#json-adaptation - """ - def _json_callback(cursor, error): - oid, array_oid = cursor.fetchone() - _psy_register_json(None, globally, loads, oid, array_oid) + def _rollback(self, transaction_future, error): + def rollback_callback(rb_future): + try: + rb_future.result() + except Exception as rb_error: + log.warn("Failed to ROLLBACK transaction %s", rb_error) + transaction_future.set_exception(error) + self.ioloop.add_future(self.execute("ROLLBACK;"), rollback_callback) - if callback: - callback(None, error) + def _register(self, future, registrator, fut): + try: + cursor = fut.result() + except Exception as error: + future.set_exc_info(sys.exc_info()) + return - self.execute( - "SELECT 'json'::regtype::oid, 'json[]'::regtype::oid", - callback=_json_callback) + oid, array_oid = cursor.fetchone() + registrator(oid, array_oid) + future.set_result(None) - def register_hstore(self, globally=False, unicode=False, callback=None): + def register_hstore(self, globally=False, unicode=False): """ Register adapter and typecaster for ``dict-hstore`` conversions. @@ -868,29 +775,48 @@ def register_hstore(self, globally=False, unicode=False, callback=None): If ``True``, keys and values returned from the database will be ``unicode`` instead of ``str``. The option is not available on Python 3. - **NOTE:** `callback` should always passed as keyword argument + Returns future that resolves to ``None``. .. |hstoredoc| replace:: documentation .. _hstoredoc: http://initd.org/psycopg/docs/extras.html#hstore-data-type """ - def _hstore_callback(cursor, error): - oid, array_oid = cursor.fetchone() - _psy_register_hstore(None, globally, unicode, oid, array_oid) - - if callback: - callback(None, error) - - self.execute( + future = Future() + registrator = partial(_psy_register_hstore, None, globally, unicode) + callback = partial(self._register, future, registrator) + self.ioloop.add_future(self.execute( "SELECT 'hstore'::regtype::oid, 'hstore[]'::regtype::oid", - callback=_hstore_callback) + ), callback) - def busy(self): + return future + + def register_json(self, globally=False, loads=None): """ - **(Deprecated)** Check if the connection is busy or not. + Create and register typecasters converting :sql:`json` type to Python objects. + + More information on the json datatype can be found on the Psycopg2 |regjsondoc|_. + + :param boolean globally: + Register the adapter globally, not only on this connection. + :param function loads: + The function used to parse the data into a Python object. If ``None`` + use ``json.loads()``, where ``json`` is the module chosen according to + the Python version. See psycopg2.extra docs. + + Returns future that resolves to ``None``. + + .. |regjsondoc| replace:: documentation + + .. _regjsondoc: http://initd.org/psycopg/docs/extras.html#json-adaptation """ - return self.connection.isexecuting() or (self.connection.closed == 0 and - self._transaction_status() != TRANSACTION_STATUS_IDLE) + future = Future() + registrator = partial(_psy_register_json, None, globally, loads) + callback = partial(self._register, future, registrator) + self.ioloop.add_future(self.execute( + "SELECT 'json'::regtype::oid, 'json[]'::regtype::oid" + ), callback) + + return future @property def closed(self): @@ -902,7 +828,18 @@ def closed(self): def close(self): """ - Remove the connection from the IO loop and close it. + Closes the connection. + + **NOTE:** This is a synchronous method. """ if self.connection: self.connection.close() + + +def connect(*args, **kwargs): + """ + Connection factory. + See :py:meth:`momoko.Connection` for documentation about the + Returns future that resolves to :py:meth:`momoko.Connection` object or raises exception + """ + return Connection(*args, **kwargs).connect() diff --git a/momoko/exceptions.py b/momoko/exceptions.py index 23310ed..588c224 100644 --- a/momoko/exceptions.py +++ b/momoko/exceptions.py @@ -12,7 +12,12 @@ class PoolError(Exception): """ - The ``PoolError`` exception is raised when something goes wrong in the connection - pool. When the maximum amount is exceeded for example. + Raised when something goes wrong in the connection pool. """ pass + + +class PartiallyConnectedError(PoolError): + """ + Raised when :py:meth:`momoko.Pool` can initialize all requested connections. + """ \ No newline at end of file diff --git a/momoko/utils.py b/momoko/utils.py deleted file mode 100644 index f8361f0..0000000 --- a/momoko/utils.py +++ /dev/null @@ -1,119 +0,0 @@ -# -*- coding: utf-8 -*- -""" -momoko.utils -============ - -Utilities that make life easier. - -Copyright 2011-2014, Frank Smit & Zaar Hai. -MIT, see LICENSE for more details. -""" - -import sys -import logging -from tornado import gen -from functools import partial -from collections import deque - - -if sys.version_info[0] < 3: - is_python_3k = False -else: - is_python_3k = True - - -log = logging.getLogger('momoko') - - -class Task(gen.YieldPoint): - """Runs a single asynchronous operation. - - Takes a function (and optional additional arguments) and runs it with - those arguments plus a ``callback`` keyword argument. The argument passed - to the callback is returned as the result of the yield expression. - - A `Task` is equivalent to a `Callback`/`Wait` pair (with a unique - key generated automatically):: - - result = yield gen.Task(func, args) - - func(args, callback=(yield gen.Callback(key))) - result = yield gen.Wait(key) - """ - def __init__(self, func, *args, **kwargs): - assert "callback" not in kwargs - self.args = args - self.kwargs = kwargs - self.func = func - - def start(self, runner): - self.runner = runner - self.key = object() - runner.register_callback(self.key) - self.kwargs["callback"] = runner.result_callback(self.key) - self.func(*self.args, **self.kwargs) - - def is_ready(self): - return self.runner.is_ready(self.key) - - def get_result(self): - return self.runner.pop_result(self.key) - - -class Op(Task): - """ - Run a single asynchronous operation. - - Behaves like `tornado.gen.Task`_, but raises an exception (one of Psycop2's - exceptions_) when an error occurs related to Psycopg2 or PostgreSQL. - - .. _exceptions: http://initd.org/psycopg/docs/module.html#exceptions - .. _tornado.gen.Task: http://www.tornadoweb.org/documentation/gen.html#tornado.gen.Task - """ - def get_result(self): - (result, error), _ = super(Op, self).get_result() - if error: - raise error - return result - - -class WaitOp(gen.Wait): - """ - Return the argument passed to the result of a previous `tornado.gen.Callback`_. - - Behaves like `tornado.gen.Wait`_, but raises an exception (one of Psycop2's - exceptions_) when an error occurs related to Psycopg2 or PostgreSQL. - - .. _exceptions: http://initd.org/psycopg/docs/module.html#exceptions - .. _tornado.gen.Callback: http://www.tornadoweb.org/documentation/gen.html#tornado.gen.Callback - .. _tornado.gen.Wait: http://www.tornadoweb.org/documentation/gen.html#tornado.gen.Wait - """ - def get_result(self): - (result, error), _ = super(WaitOp, self).get_result() - if error: - raise error - return result - - -class WaitAllOps(gen.WaitAll): - """ - Return the results of multiple previous `tornado.gen.Callback`_. - - Behaves like `tornado.gen.WaitAll`_, but raises an exception (one of Psycop2's - exceptions_) when an error occurs related to Psycopg2 or PostgreSQL. - - .. _exceptions: http://initd.org/psycopg/docs/module.html#exceptions - .. _tornado.gen.Callback: http://www.tornadoweb.org/documentation/gen.html#tornado.gen.Callback - .. _tornado.gen.WaitAll: http://www.tornadoweb.org/documentation/gen.html#tornado.gen.WaitAll - """ - def get_result(self): - super_results = super(WaitAllOps, self).get_result() - - results = [] - for (result, error), _ in super_results: - if error: - raise error - else: - results.append(result) - - return results diff --git a/perf_test.py b/perf_test.py new file mode 100644 index 0000000..c6badf0 --- /dev/null +++ b/perf_test.py @@ -0,0 +1,104 @@ +from __future__ import print_function +from __future__ import absolute_import + +import threading + +from tests import * + +""" +Quick and dirty performance test - async vs threads. +By default Postgresql is configured to support up 100 connections. +Change max_connections to 1100 in /etc/postgresql/9.4/main/postgresql.conf +to run this example. + +NOTE: This benchmark is purely synthetic. In real life you'll be bound by database +query throughput! + +So far, on psycopg2, momoko pool "context switching" is about 4 times slower compared to +python thread contenxt switching when issueing queries that do nothing (SELECT 1). +On my laptop momoko.Pool can do only about 8000 qps, but simple consequtive threading +can do over 30,000. + +On psycopg2cffi, things are bit better - momoko.Pool only about 50% slower. +But that's only because psycopg2cffi is less performant. + +After chaning queries to actually do something (SELECT pg_sleep(0.002)) I was able to +actually measure thread (or Pool) overhead more accurately. + +This is the typical run: + +Threads(1): 27.04 seconds +Pool(1): 28.72 seconds +Threads(10): 2.34 seconds +Pool(10): 2.67 seconds +Threads(100): 0.62 seconds +Pool(100): 1.38 seconds +Threads(1000): 1.16 seconds +Pool(1000): 1.50 seconds + +Looks like threads are at their best when their number is about 100. +At concurrency of 1000, threads get 87% penalty while pool takes only 8%. +""" + + +class MomokoPoolPerformanceTest(PoolBaseTest): + pool_size = 1 + amount = 10000 + query = "SELECT pg_sleep(0.002)" + + def run_thread_queries(self, amount=amount, thread_num=1): + conns = [] + for i in range(thread_num): + conn = psycopg2.connect(good_dsn) + conns.append(conn) + + def runner(conn): + for i in range(int(amount/thread_num)): + with conn.cursor() as cur: + cur.execute(self.query) + cur.fetchall() + + start = time.time() + threads = [] + for conn in conns: + thread = threading.Thread(target=runner, args=(conn,)) + thread.start() + threads.append(thread) + for thread in threads: + thread.join() + delta = time.time() - start + for conn in conns: + conn.close() + return delta + + def run_pool_queries(self, amount=amount, thread_num=1): + + db = self.build_pool_sync(dsn=good_dsn, size=thread_num) + + start = time.time() + + def runner(x): + futures = [] + for j in range(amount): + futures.append(db.execute(self.query)) + yield futures + + gen_test(timeout=300)(runner)(self) + delta = time.time() - start + db.close() + return delta + + def test_perf(self): + print("\n") + for threads in (1, 10, 100, 1000): + print("Threads(%s): %.2f seconds" % (threads, self.run_thread_queries(thread_num=threads))) + print("Pool(%s): %.2f seconds" % (threads, self.run_pool_queries(thread_num=threads))) + + +if __name__ == '__main__': + if debug: + FORMAT = '%(asctime)-15s %(levelname)s:%(name)s %(funcName)-15s: %(message)s' + logging.basicConfig(format=FORMAT) + logging.getLogger("momoko").setLevel(logging.DEBUG) + logging.getLogger("unittest").setLevel(logging.DEBUG) + unittest.main() diff --git a/setup.py b/setup.py index 1cbb315..b1182bd 100644 --- a/setup.py +++ b/setup.py @@ -18,7 +18,7 @@ from distutils.core import setup, Extension, Command -dependencies = ['tornado'] +dependencies = ['tornado >= 4.0'] psycopg2_impl = os.environ.get('MOMOKO_PSYCOPG2_IMPL', 'psycopg2') if psycopg2_impl == 'psycopg2cffi': @@ -44,6 +44,7 @@ license='MIT', test_suite='tests', install_requires=dependencies, + test_require=dependencies + ["unittest2"], classifiers = [ 'Development Status :: 5 - Production/Stable', 'Intended Audience :: Developers', diff --git a/tests.py b/tests.py index 9efc786..4c9a4cf 100644 --- a/tests.py +++ b/tests.py @@ -1,17 +1,24 @@ +from __future__ import print_function + import os import string import random import time -import unittest from collections import deque +from itertools import chain +import inspect +import logging from tornado import gen -from tornado.testing import AsyncTestCase +from tornado.testing import unittest, AsyncTestCase, gen_test import sys if sys.version_info[0] >= 3: unicode = str +log = logging.getLogger("unittest") + +debug = os.environ.get('MOMOKO_TEST_DEBUG', None) db_database = os.environ.get('MOMOKO_TEST_DB', 'momoko_test') db_user = os.environ.get('MOMOKO_TEST_USER', 'postgres') db_password = os.environ.get('MOMOKO_TEST_PASSWORD', '') @@ -31,7 +38,6 @@ 'variables: MOMOKO_TEST_DB, MOMOKO_TEST_USER, MOMOKO_TEST_PASSWORD, ' 'MOMOKO_TEST_HOST, MOMOKO_TEST_PORT') - psycopg2_impl = os.environ.get('MOMOKO_PSYCOPG2_IMPL', 'psycopg2') if psycopg2_impl == 'psycopg2cffi': @@ -43,6 +49,7 @@ import momoko +import momoko.exceptions import psycopg2 from psycopg2.extras import RealDictConnection, RealDictCursor, NamedTupleCursor @@ -51,467 +58,551 @@ class BaseTest(AsyncTestCase): - pool_size = 3 - max_size = None - raise_connect_errors = True dsn = good_dsn - def __init__(self, *args, **kwargs): - self.assert_equal = self.assertEqual - self.assert_raises = self.assertRaises - self.assert_is_instance = lambda object, classinfo: self.assertTrue(isinstance(object, classinfo)) - super(BaseTest, self).__init__(*args, **kwargs) - - if not hasattr(AsyncTestCase, "assertLess"): - def assertLess(self, a, b, msg): - return self.assertTrue(a < b, msg=msg) + # This is a hack to overcome lack of "yield from" in Python < 3.3. + # The goal is to support several set_up methods in inheriatnace chain + # So we just name them set_up_X and run them sequentially. + # Our heirs needs to define them carefully to not to step on over other, + # but its good enough for unit tests. + # TIP: Use set_up_X where X is between 10 and 99. X80 Basic is back :) + def get_methods(self, starting_with, reverse=False): + methods = [] + members = inspect.getmembers(self, predicate=inspect.ismethod) + members = sorted(members, key=lambda m: m[0], reverse=reverse) + for m in members: + name, method = m + if name.startswith(starting_with): + methods.append(method) + return methods def setUp(self): super(BaseTest, self).setUp() - self.set_up() + for method in self.get_methods("set_up"): + method() def tearDown(self): - self.tear_down() + for method in self.get_methods("tear_down", reverse=True): + method() super(BaseTest, self).tearDown() - def set_up(self): - pass + def build_transaction_query(self, ucode=False): + return ( + unicode('SELECT 1, 2, 3, 4;') if ucode else 'SELECT 1, 2, 3, 4;', + unicode('SELECT 5, 6, 7, 8;') if ucode else 'SELECT 5, 6, 7, 8;', + 'SELECT 9, 10, 11, 12;', + ('SELECT %s+10, %s+10, %s+10, %s+10;', (3, 4, 5, 6)), + 'SELECT 17, 18, 19, 20;', + ('SELECT %s+20, %s+20, %s+20, %s+20;', (1, 2, 3, 4)), + ) + + def compare_transaction_cursors(self, cursors): + self.assertEqual(len(cursors), 6) + self.assertEqual(cursors[0].fetchone(), (1, 2, 3, 4)) + self.assertEqual(cursors[1].fetchone(), (5, 6, 7, 8)) + self.assertEqual(cursors[2].fetchone(), (9, 10, 11, 12)) + self.assertEqual(cursors[3].fetchone(), (13, 14, 15, 16)) + self.assertEqual(cursors[4].fetchone(), (17, 18, 19, 20)) + self.assertEqual(cursors[5].fetchone(), (21, 22, 23, 24)) + + +class BaseDataTest(BaseTest): + def clean_db(self): + yield self.conn.execute("DROP TABLE IF EXISTS unit_test_large_query;") + yield self.conn.execute("DROP TABLE IF EXISTS unit_test_transaction;") + yield self.conn.execute("DROP TABLE IF EXISTS unit_test_int_table;") + yield self.conn.execute("DROP FUNCTION IF EXISTS unit_test_callproc(integer);") - def tear_down(self): - pass + def prepare_db(self): + yield self.conn.execute( + "CREATE TABLE unit_test_large_query (" + "id serial NOT NULL, name character varying, data text);" + ) + yield self.conn.execute( + "CREATE TABLE unit_test_transaction (" + "id serial NOT NULL, name character varying, data text);", + ) + yield self.conn.execute("CREATE TABLE unit_test_int_table (id integer);") + yield self.conn.execute( + "CREATE OR REPLACE FUNCTION unit_test_callproc(n integer)\n" + "RETURNS integer AS $BODY$BEGIN\n" + "RETURN n*n;\n" + "END;$BODY$ LANGUAGE plpgsql VOLATILE;" + ) + + def fill_int_data(self, amount=1000): + return self.conn.transaction([ + "INSERT INTO unit_test_int_table VALUES %s" % ",".join("(%s)" % i for i in range(amount)), + ]) + + @gen_test + def set_up_10(self): + self.conn = yield momoko.connect(self.dsn, ioloop=self.io_loop) + for g in chain(self.clean_db(), self.prepare_db()): + yield g + + @gen_test + def tear_down_10(self): + for g in self.clean_db(): + yield g + + +class MomokoConnectionTest(BaseTest): + @gen_test + def test_connect(self): + """Test that Connection can connect to the database""" + conn = yield momoko.connect(good_dsn, ioloop=self.io_loop) + self.assertIsInstance(conn, momoko.Connection) + + @gen_test + def test_bad_connect(self): + """Test that Connection raises connection errors""" + try: + conn = yield momoko.connect(bad_dsn, ioloop=self.io_loop) + except Exception as error: + self.assertIsInstance(error, psycopg2.OperationalError) + + @gen_test + def test_bad_connect_local(self): + """Test that Connection raises connection errors when using local socket""" + try: + conn = yield momoko.connect(local_bad_dsn, ioloop=self.io_loop) + except Exception as error: + self.assertIsInstance(error, psycopg2.OperationalError) + + +class MomokoConnectionDataTest(BaseDataTest): + @gen_test + def test_execute(self): + """Testing simple SELECT""" + cursor = yield self.conn.execute("SELECT 1, 2, 3") + self.assertEqual(cursor.fetchall(), [(1, 2, 3)]) + + @gen_test + def test_large_query(self): + """Testing support for large queries""" + query_size = 100000 + chars = string.ascii_letters + string.digits + string.punctuation + + for n in range(5): + random_data = ''.join([random.choice(chars) for i in range(query_size)]) + cursor = yield self.conn.execute("INSERT INTO unit_test_large_query (data) VALUES (%s) " + "RETURNING data;", (random_data,)) + self.assertEqual(cursor.fetchone(), (random_data,)) + + cursor = yield self.conn.execute("SELECT COUNT(*) FROM unit_test_large_query;") + self.assertEqual(cursor.fetchone(), (5,)) + + @gen_test + def test_transaction(self): + """Testing transaction on standalone connection""" + cursors = yield self.conn.transaction(self.build_transaction_query()) + self.compare_transaction_cursors(cursors) + + @gen_test + def test_unicode_transaction(self): + """Testing transaction on standalone connection, as unicode string""" + cursors = yield self.conn.transaction(self.build_transaction_query(True)) + self.compare_transaction_cursors(cursors) + + @gen_test + def test_transaction_rollback(self): + """Testing transaction auto-rollback functionality""" + chars = string.ascii_letters + string.digits + string.punctuation + data = ''.join([random.choice(chars) for i in range(100)]) + + try: + yield self.conn.transaction(( + ("INSERT INTO unit_test_transaction (data) VALUES (%s);", (data,)), + "SELECT DOES NOT WORK!;" + ), auto_rollback=True) + except psycopg2.ProgrammingError: + pass + + cursor = yield self.conn.execute("SELECT COUNT(*) FROM unit_test_transaction;") + self.assertEqual(cursor.fetchone(), (0,)) + + @gen_test + def test_hstore(self): + """Testing hstore""" + if not test_hstore: + self.skipTest("skiping test as requested") + + yield self.conn.register_hstore() + + cursor = yield self.conn.execute("SELECT 'a=>b, c=>d'::hstore;") + self.assertEqual(cursor.fetchall(), [({"a": "b", "c": "d"},)]) + + cursor = yield self.conn.execute("SELECT %s;", ({'e': 'f', 'g': 'h'},)) + self.assertEqual(cursor.fetchall(), [({"e": "f", "g": "h"},)]) + + @gen_test + def test_json(self): + """Testing json""" + if not test_json: + self.skipTest("skiping test as requested") + if self.conn.server_version < 90400: + self.skipTest("skiping test - server too old. At least 9.4 is required") + + yield self.conn.register_json() + + cursor = yield self.conn.execute('SELECT \'{"a": "b", "c": "d"}\'::json;') + self.assertEqual(cursor.fetchall(), [({"a": "b", "c": "d"},)]) + + @gen_test + def test_callproc(self): + """Testing callproc""" + cursor = yield self.conn.callproc("unit_test_callproc", (64,)) + self.assertEqual(cursor.fetchone(), (4096,)) + + @gen_test + def test_query_error(self): + """Testing that execute method propages exception properly""" + try: + yield self.conn.execute('SELECT DOES NOT WORK!;') + except psycopg2.ProgrammingError: + pass + + @gen_test + def test_mogrify(self): + """Testing mogrify""" + sql = self.conn.mogrify("SELECT %s, %s;", ('\'"test"\'', "SELECT 1;")) + if self.conn.server_version < 90100: + self.assertEqual(sql, b"SELECT E'''\"test\"''', E'SELECT 1;';") + else: + self.assertEqual(sql, b"SELECT '''\"test\"''', 'SELECT 1;';") + + yield self.conn.execute(sql) + + def test_mogrify_error(self): + """Testing that mogrify propagates exception properly""" + try: + self.conn.mogrify("SELECT %(foos;", {"foo": "bar"}) + except psycopg2.ProgrammingError: + pass + + +class MomokoConnectionServerSideCursorTest(BaseDataTest): + @gen_test + def test_server_side_cursor(self): + """Testing server side cursors support""" + int_count = 1000 + offset = 0 + chunk = 10 + yield self.fill_int_data(int_count) + + yield self.conn.execute("BEGIN") + yield self.conn.execute("DECLARE all_ints CURSOR FOR SELECT * FROM unit_test_int_table") + while offset < int_count: + cursor = yield self.conn.execute("FETCH %s FROM all_ints", (chunk,)) + self.assertEqual(cursor.fetchall(), [(i, ) for i in range(offset, offset+chunk)]) + offset += chunk + yield self.conn.execute("CLOSE all_ints") + yield self.conn.execute("COMMIT") + + +class MomokoConnectionSetsessionTest(BaseTest): + @gen_test + def test_setsession(self): + """Testing that setssion parameter is honoured""" + setsession = deque([None, "SELECT 1", "SELECT 2"]) + time_zones = ["UTC", "Israel", "Australia/Melbourne"] + + for i in range(len(time_zones)): + setsession[i] = "SET TIME ZONE '%s'" % time_zones[i] + conn = yield momoko.connect(self.dsn, ioloop=self.io_loop, setsession=setsession) + cursor = yield conn.execute("SELECT current_setting('TIMEZONE');") + self.assertEqual(cursor.fetchall(), [(time_zones[i],)]) + conn.close() + setsession.rotate(1) + + +class MomokoConnectionFactoriesTest(BaseTest): + @gen_test + def test_cursor_factory(self): + """Testing that cursor_factory parameter is properly propagated""" + conn = yield momoko.connect(self.dsn, ioloop=self.io_loop, cursor_factory=RealDictCursor) + cursor = yield conn.execute("SELECT 1 AS a") + self.assertEqual(cursor.fetchone(), {"a": 1}) + + @gen_test + def test_connection_factory(self): + """Testing that connection_factory parameter is properly propagated""" + conn = yield momoko.connect(self.dsn, ioloop=self.io_loop, connection_factory=RealDictConnection) + cursor = yield conn.execute("SELECT 1 AS a") + self.assertEqual(cursor.fetchone(), {"a": 1}) - def stop_callback(self, result, error): - self.stop((result, error)) + @gen_test + def test_ping_with_named_cursor(self): + """Test whether Connection.ping works fine with named cursors. Issue #74""" + conn = yield momoko.connect(self.dsn, ioloop=self.io_loop, cursor_factory=RealDictCursor) + yield conn.ping() +# +# Pool tests +# - def run_gen(self, func): - func() - self.wait() - def wait_for_result(self): - cursor, error = self.wait() - if error: - raise error - return cursor +class PoolBaseTest(BaseTest): + pool_size = 3 + max_size = None + raise_connect_errors = False - def build_pool(self, dsn=None, setsession=[], con_factory=None, cur_factory=None): + def build_pool(self, dsn=None, setsession=(), con_factory=None, cur_factory=None, size=None): db = momoko.Pool( dsn=(dsn or self.dsn), - size=self.pool_size, + size=(size or self.pool_size), max_size=self.max_size, - callback=self.stop, ioloop=self.io_loop, setsession=setsession, raise_connect_errors=self.raise_connect_errors, connection_factory=con_factory, cursor_factory=cur_factory, ) - self.wait() - return db + return db.connect() + + def build_pool_sync(self, *args, **kwargs): + f = self.build_pool(*args, **kwargs) + + # could use gen_test(lambda x: (yield f))(self) + # but it does not work in Python 2.6 for some reason + def runner(x): + yield f + gen_test(timeout=30)(runner)(self) + return f.result() def kill_connections(self, db, amount=None): - amount = amount or len(db._conns.free) - for conn in db._conns.free: + amount = amount or (len(db.conns.free) + len(db.conns.busy)) + for conn in db.conns.free.union(db.conns.busy): if not amount: break if not conn.closed: conn.close() amount -= 1 + @gen_test def run_and_check_query(self, db): - db.execute('SELECT 6, 19, 24;', callback=self.stop_callback) - cursor = self.wait_for_result() - self.assert_equal(cursor.fetchall(), [(6, 19, 24)]) - - -class MomokoBaseTest(BaseTest): + cursor = yield db.execute("SELECT 6, 19, 24;") + self.assertEqual(cursor.fetchall(), [(6, 19, 24)]) - def set_up(self): - self.db = self.build_pool() + @gen_test + def set_up_20(self): + self.db = yield self.build_pool() - def tear_down(self): + @gen_test + def tear_down_00(self): # closing pool is the last thing that should run self.db.close() -class MomokoBaseDataTest(MomokoBaseTest): - def clean_db(self): - self.db.execute('DROP TABLE IF EXISTS unit_test_large_query;', - callback=self.stop_callback) - self.wait_for_result() - self.db.execute('DROP TABLE IF EXISTS unit_test_transaction;', - callback=self.stop_callback) - self.wait_for_result() - self.db.execute('DROP TABLE IF EXISTS unit_test_int_table;', - callback=self.stop_callback) - self.wait_for_result() - self.db.execute('DROP FUNCTION IF EXISTS unit_test_callproc(integer);', - callback=self.stop_callback) - self.wait_for_result() +class PoolBaseDataTest(PoolBaseTest, BaseDataTest): + pass - def prepare_db(self): - self.clean_db() - - self.db.execute( - 'CREATE TABLE unit_test_large_query (' - 'id serial NOT NULL, name character varying, data text);', - callback=self.stop_callback) - self.wait_for_result() - - self.db.execute( - 'CREATE TABLE unit_test_transaction (' - 'id serial NOT NULL, name character varying, data text);', - callback=self.stop_callback) - self.wait_for_result() - - self.db.execute( - 'CREATE TABLE unit_test_int_table (id integer);', - callback=self.stop_callback) - self.wait_for_result() - - self.db.execute( - 'CREATE OR REPLACE FUNCTION unit_test_callproc(n integer)\n' - 'RETURNS integer AS $BODY$BEGIN\n' - 'RETURN n*n;\n' - 'END;$BODY$ LANGUAGE plpgsql VOLATILE;', - callback=self.stop_callback) - self.wait_for_result() - def fill_int_data(self, amount=1000): - self.db.transaction([ - "INSERT INTO unit_test_int_table VALUES %s" % ",".join("(%s)" % i for i in range(amount)), - ], callback=self.stop_callback) - self.wait_for_result() +class MomokoPoolTest(PoolBaseTest): + @gen_test + def test_connect(self): + db = yield self.build_pool() + self.assertIsInstance(db, momoko.Pool) - def set_up(self): - super(MomokoBaseDataTest, self).set_up() - self.prepare_db() - def tear_down(self): - self.clean_db() - super(MomokoBaseDataTest, self).tear_down() +class MomokoPoolSetsessionTest(PoolBaseTest): + pool_size = 1 + @gen_test + def test_setsession(self): + """Testing that setssion parameter is honoured""" + setsession = deque([None, "SELECT 1", "SELECT 2"]) + time_zones = ["UTC", "Israel", "Australia/Melbourne"] -class MomokoTest(MomokoBaseDataTest): + for i in range(len(time_zones)): + setsession[i] = "SET TIME ZONE '%s'" % time_zones[i] + db = yield self.build_pool(setsession=setsession) + cursor = yield db.execute("SELECT current_setting('TIMEZONE');") + self.assertEqual(cursor.fetchall(), [(time_zones[i],)]) + db.close() + setsession.rotate(1) - def test_single_query(self): - """Testing single query""" - self.run_and_check_query(self.db) - def test_large_query(self): - """Testing support for large queries""" - query_size = 100000 - chars = string.ascii_letters + string.digits + string.punctuation +class MomokoPoolDataTest(PoolBaseDataTest, MomokoConnectionDataTest): + @gen_test + def set_up_30(self): + self.conn = self.db - for n in range(5): - random_data = ''.join([random.choice(chars) for i in range(query_size)]) - self.db.execute('INSERT INTO unit_test_large_query (data) VALUES (%s) ' - 'RETURNING data;', (random_data,), callback=self.stop_callback) - cursor = self.wait_for_result() - self.assert_equal(cursor.fetchone(), (random_data,)) - - self.db.execute('SELECT COUNT(*) FROM unit_test_large_query;', - callback=self.stop_callback) - cursor = self.wait_for_result() - self.assert_equal(cursor.fetchone(), (5,)) - - if test_hstore: - def test_hstore(self): - """Testing hstore""" - self.db.register_hstore(callback=self.stop_callback) - self.wait() - - self.db.execute('SELECT \'a=>b, c=>d\'::hstore;', callback=self.stop_callback) - cursor = self.wait_for_result() - self.assert_equal(cursor.fetchall(), [({'a': 'b', 'c': 'd'},)]) - - self.db.execute('SELECT %s;', ({'e': 'f', 'g': 'h'},), callback=self.stop_callback) - cursor = self.wait_for_result() - self.assert_equal(cursor.fetchall(), [({'e': 'f', 'g': 'h'},)]) - - if test_json: - def test_json(self): - """Testing json""" - self.db.register_json(callback=self.stop_callback) - self.wait() - - self.db.execute('SELECT \'{"a": "b", "c": "d"}\'::json;', callback=self.stop_callback) - cursor = self.wait_for_result() - self.assert_equal(cursor.fetchall(), [({'a': 'b', 'c': 'd'},)]) - - self.db.execute('SELECT %s;', ({'e': 'f', 'g': 'h'},), callback=self.stop_callback) - cursor = self.wait_for_result() - self.assert_equal(cursor.fetchall(), [({'e': 'f', 'g': 'h'},)]) - - def test_callproc(self): - """Testing callproc""" - self.db.callproc('unit_test_callproc', (64,), callback=self.stop_callback) - cursor = self.wait_for_result() - self.assert_equal(cursor.fetchone(), (4096,)) - - def test_query_error(self): - """Testing that execute method propages exception properly""" - self.db.execute('SELECT DOES NOT WORK!;', callback=self.stop_callback) - _, error = self.wait() - self.assert_is_instance(error, psycopg2.ProgrammingError) + def tear_down_30(self): + self.assertEqual(len(self.db.conns.busy), 0, msg="Some connections were not recycled") + # Pool's mogirify is async -> copy/paste + @gen_test def test_mogrify(self): """Testing mogrify""" - self.db.mogrify('SELECT %s, %s;', ('\'"test"\'', 'SELECT 1;'), - callback=self.stop_callback) - sql = self.wait_for_result() - if self.db.server_version < 90100: - self.assert_equal(sql, b'SELECT E\'\'\'"test"\'\'\', E\'SELECT 1;\';') + sql = yield self.conn.mogrify("SELECT %s, %s;", ('\'"test"\'', "SELECT 1;")) + if self.conn.server_version < 90100: + self.assertEqual(sql, b"SELECT E'''\"test\"''', E'SELECT 1;';") else: - self.assert_equal(sql, b'SELECT \'\'\'"test"\'\'\', \'SELECT 1;\';') + self.assertEqual(sql, b"SELECT '''\"test\"''', 'SELECT 1;';") - self.db.execute(sql, callback=self.stop_callback) - _, error = self.wait() - self.assert_equal(error, None) + yield self.conn.execute(sql) + # Pool's mogirify is async -> copy/paste + @gen_test def test_mogrify_error(self): - """Testing that mogri propagates exception properly""" - self.db.mogrify('SELECT %(foos;', {'foo': 'bar'}, - callback=self.stop_callback) - _, error = self.wait() - self.assert_is_instance(error, psycopg2.ProgrammingError) - - def build_transaction_query(self, ucode=False): - return ( - unicode('SELECT 1, 2, 3, 4;') if ucode else 'SELECT 1, 2, 3, 4;', - unicode('SELECT 5, 6, 7, 8;') if ucode else 'SELECT 5, 6, 7, 8;', - 'SELECT 9, 10, 11, 12;', - ('SELECT %s+10, %s+10, %s+10, %s+10;', (3, 4, 5, 6)), - 'SELECT 17, 18, 19, 20;', - ('SELECT %s+20, %s+20, %s+20, %s+20;', (1, 2, 3, 4)), - ) - - def compare_transaction_cursors(self, cursors): - self.assert_equal(len(cursors), 6) - self.assert_equal(cursors[0].fetchone(), (1, 2, 3, 4)) - self.assert_equal(cursors[1].fetchone(), (5, 6, 7, 8)) - self.assert_equal(cursors[2].fetchone(), (9, 10, 11, 12)) - self.assert_equal(cursors[3].fetchone(), (13, 14, 15, 16)) - self.assert_equal(cursors[4].fetchone(), (17, 18, 19, 20)) - self.assert_equal(cursors[5].fetchone(), (21, 22, 23, 24)) - - def test_transaction(self): - """Testing transaction functionality""" - self.db.transaction(self.build_transaction_query(), callback=self.stop_callback) - cursors = self.wait_for_result() - self.compare_transaction_cursors(cursors) - - def test_unicode_transaction(self): - """Testing transaction functionality""" - self.db.transaction(self.build_transaction_query(True), callback=self.stop_callback) - cursors = self.wait_for_result() - self.compare_transaction_cursors(cursors) - - def test_transaction_rollback(self): - """Testing transaction auto-rollback functionality""" - chars = string.ascii_letters + string.digits + string.punctuation - data = ''.join([random.choice(chars) for i in range(100)]) - - self.db.transaction(( - ('INSERT INTO unit_test_transaction (data) VALUES (%s);', (data,)), - 'SELECT DOES NOT WORK!;' - ), callback=self.stop_callback) - _, error = self.wait() - self.assert_is_instance(error, psycopg2.ProgrammingError) - - self.db.execute('SELECT COUNT(*) FROM unit_test_transaction;', - callback=self.stop_callback) - cursor = self.wait_for_result() - self.assert_equal(cursor.fetchone(), (0,)) - - def test_op(self): - """Testing Op""" - @gen.engine - def func(): - cursor = yield momoko.Op(self.db.execute, 'SELECT 1;') - self.assert_equal(cursor.fetchone(), (1,)) - self.stop() - - self.run_gen(func) - - def test_op_exception(self): - """Testing that Op propagates exception properly""" - @gen.engine - def func(): - cursor = yield momoko.Op(self.db.execute, 'SELECT DOES NOT WORK!;') - self.stop() - - self.assert_raises(psycopg2.ProgrammingError, self.run_gen, func) - - def test_op_early_exception(self): - """Testing that Op propagates early exceptions properly""" - @gen.engine - def func(): - cursor = yield momoko.Op(self.db.execute, 'SELECT %s FROM %s', ()) - self.stop() - - self.assert_raises(IndexError, self.run_gen, func) - self.assertFalse(self.db._conns.busy, msg="Busy connction was not returned to pool after exception") - - def test_wait_op(self): - """Testing WaitOp""" - @gen.engine - def func(): - self.db.execute('SELECT 1;', callback=(yield gen.Callback('q1'))) - cursor = yield momoko.WaitOp('q1') - self.assert_equal(cursor.fetchone(), (1,)) - self.stop() - - self.run_gen(func) - - def test_wait_op_exception(self): - """Testing that WaitOp propagates exception properly""" - @gen.engine - def func(): - self.db.execute('SELECT DOES NOT WORK!;', callback=(yield gen.Callback('q1'))) - cursor = yield momoko.WaitOp('q1') - self.stop() - - self.assert_raises(psycopg2.ProgrammingError, self.run_gen, func) - - def test_wait_all_ops(self): - """Testing WaitAllOps""" - @gen.engine - def func(): - self.db.execute('SELECT 1;', callback=(yield gen.Callback('q1'))) - self.db.execute('SELECT 2;', callback=(yield gen.Callback('q2'))) - self.db.execute('SELECT 3;', callback=(yield gen.Callback('q3'))) - - cursor1, cursor2, cursor3 = yield momoko.WaitAllOps(('q1', 'q2', 'q3')) - - self.assert_equal(cursor1.fetchone(), (1,)) - self.assert_equal(cursor2.fetchone(), (2,)) - self.assert_equal(cursor3.fetchone(), (3,)) - self.stop() - - self.run_gen(func) - - def test_wait_all_ops_exception(self): - """Testing that WaitAllOps propagates exception properly""" - @gen.engine - def func(): - self.db.execute('SELECT asfdsfe;', callback=(yield gen.Callback('q1'))) - self.db.execute('SELECT DOES NOT WORK!;', callback=(yield gen.Callback('q2'))) - self.db.execute('SELECT 1;', callback=(yield gen.Callback('q3'))) - - cursor1, cursor2, cursor3 = yield momoko.WaitAllOps(('q1', 'q2', 'q3')) - - self.stop() - - self.assert_raises(psycopg2.ProgrammingError, self.run_gen, func) + """Testing that mogrify propagates exception properly""" + try: + yield self.conn.mogrify("SELECT %(foos;", {"foo": "bar"}) + except psycopg2.ProgrammingError: + pass + @gen_test def test_transaction_with_reconnect(self): """Test whether transaction works after reconnect""" # Added result counting, since there was a bug in retry mechanism that caused # double-execution of query after reconnect self.kill_connections(self.db) - self.db.transaction(("INSERT INTO unit_test_int_table VALUES (1)",), - callback=self.stop_callback) - self.wait_for_result() - self.db.execute("SELECT COUNT(1) FROM unit_test_int_table", callback=self.stop_callback) - cursor = self.wait_for_result() - self.assert_equal(cursor.fetchall(), [(1,)]) + yield self.db.transaction(("INSERT INTO unit_test_int_table VALUES (1)",)) + cursor = yield self.db.execute("SELECT COUNT(1) FROM unit_test_int_table") + self.assertEqual(cursor.fetchall(), [(1,)]) + @gen_test def test_getconn_putconn(self): """Testing getconn/putconn functionality""" - for i in range(self.pool_size * 2): + for i in range(self.pool_size * 5): + # Run many times to check that connections get recycled properly + conn = yield self.db.getconn() + for j in range(10): + cursor = yield conn.execute("SELECT %s", (j,)) + self.assertEqual(cursor.fetchall(), [(j, )]) + self.db.putconn(conn) + + @gen_test + def test_getconn_putconn_with_reconnect(self): + """Testing getconn/putconn functionality with reconnect""" + for i in range(self.pool_size * 5): # Run many times to check that connections get recycled properly - self.db.getconn(callback=self.stop_callback) - connection = self.wait_for_result() + self.kill_connections(self.db) + conn = yield self.db.getconn() for j in range(10): - connection.execute("SELECT %s", (j,), callback=self.stop_callback) - cursor = self.wait_for_result() - self.assert_equal(cursor.fetchall(), [(j, )]) - self.db.putconn(connection) + cursor = yield conn.execute("SELECT %s", (j,)) + self.assertEqual(cursor.fetchall(), [(j, )]) + self.db.putconn(conn) + @gen_test def test_getconn_manage(self): - """Testing getconn + context manager functionality""" - for i in range(self.pool_size * 2): + """Testing getcontest_getconn_putconn_with_reconnectn + context manager functionality""" + for i in range(self.pool_size * 5): # Run many times to check that connections get recycled properly - self.db.getconn(callback=self.stop_callback) - connection = self.wait_for_result() - with self.db.manage(connection): + conn = yield self.db.getconn() + with self.db.manage(conn): for j in range(10): - connection.execute("SELECT %s", (j,), callback=self.stop_callback) - cursor = self.wait_for_result() - self.assert_equal(cursor.fetchall(), [(j, )]) + cursor = yield conn.execute("SELECT %s", (j,)) + self.assertEqual(cursor.fetchall(), [(j, )]) + @gen_test + def test_getconn_manage_with_exception(self): + """Testing getconn + context manager functionality + deliberate exception""" + self.kill_connections(self.db) + conn = yield self.db.getconn(ping=False) + with self.db.manage(conn): + try: + cursor = yield conn.execute("SELECT 1") + except psycopg2.Error as error: + pass + self.assertEqual(len(self.db.conns.busy), 0, msg="Some connections were not recycled") -class MomokoServerSideCursorTest(MomokoBaseDataTest): - def execute(self, connection, query, params=()): - connection.execute(query, params, callback=self.stop_callback) - return self.wait_for_result() +class MomokoPoolServerSideCursorTest(PoolBaseDataTest): + @gen_test def test_server_side_cursor(self): """Testing server side cursors support""" int_count = 1000 offset = 0 chunk = 10 - self.fill_int_data(int_count) + yield self.fill_int_data(int_count) - self.db.getconn(callback=self.stop_callback) - connection = self.wait_for_result() - with self.db.manage(connection): - self.execute(connection, "BEGIN") - self.execute(connection, "DECLARE all_ints CURSOR FOR SELECT * FROM unit_test_int_table") + conn = yield self.db.getconn() + with self.db.manage(conn): + yield conn.execute("BEGIN") + yield conn.execute("DECLARE all_ints CURSOR FOR SELECT * FROM unit_test_int_table") while offset < int_count: - cursor = self.execute(connection, "FETCH %s FROM all_ints", (chunk,)) - self.assert_equal(cursor.fetchall(), [(i, ) for i in range(offset, offset+chunk)]) + cursor = yield conn.execute("FETCH %s FROM all_ints", (chunk,)) + self.assertEqual(cursor.fetchall(), [(i, ) for i in range(offset, offset+chunk)]) offset += chunk - self.execute(connection, "CLOSE all_ints") - self.execute(connection, "COMMIT") + yield conn.execute("CLOSE all_ints") + yield conn.execute("COMMIT") -class MomokoParallelTest(MomokoBaseTest): - def test_parallel_queries(self, jobs=None): - """Testing that pool queries database in parallel""" - sleep_time = 2 +class MomokoPoolFactoriesTest(PoolBaseTest): + @gen_test + def test_cursor_factory(self): + """Testing that cursor_factory parameter is properly propagated""" + db = yield self.build_pool(cur_factory=RealDictCursor) + cursor = yield db.execute("SELECT 1 AS a") + self.assertEqual(cursor.fetchone(), {"a": 1}) + + @gen_test + def test_connection_factory(self): + """Testing that connection_factory parameter is properly propagated""" + db = yield self.build_pool(con_factory=RealDictConnection) + cursor = yield db.execute("SELECT 1 AS a") + self.assertEqual(cursor.fetchone(), {"a": 1}) + - @gen.engine - def func(): - qnum = jobs or max(self.pool_size, self.max_size if self.max_size else 0) - for i in range(qnum): - self.db.execute('SELECT pg_sleep(%s);' % sleep_time, - callback=(yield gen.Callback('q%s' % i))) +class MomokoPoolParallelTest(PoolBaseTest): + pool_size = 1 - yield momoko.WaitAllOps(["q%s" % i for i in range(qnum)]) - self.stop() + def run_parallel_queries(self, jobs=None): + """Testing that pool queries database in parallel""" + jobs = jobs or max(self.pool_size, self.max_size if self.max_size else 0) + query_sleep = 2 + sleep_time = query_sleep * float(jobs/self.pool_size) + + def func(self): + to_yield = [] + for i in range(jobs): + to_yield.append(self.db.execute('SELECT pg_sleep(%s);' % query_sleep)) + yield to_yield start_time = time.time() - self.run_gen(func) + gen_test(func)(self) execution_time = time.time() - start_time self.assertLess(execution_time, sleep_time*1.10, msg="Query execution was too long") + def test_parallel_queries(self): + """Testing that pool queries database in parallel""" + self.run_parallel_queries() + # and once again to test that connections can be reused properly + self.run_parallel_queries() + + def test_request_queueing(self): + """Test that pool queues outstaning requests when all connections are busy""" + self.run_parallel_queries(self.pool_size*2) + def test_parallel_queries_after_reconnect_all(self): """Testing that pool still queries database in parallel after ALL connections were killed""" self.kill_connections(self.db) - self.test_parallel_queries() + self.run_parallel_queries() def test_parallel_queries_after_reconnect_some(self): """Testing that pool still queries database in parallel after SOME connections were killed""" - self.kill_connections(self.db) self.kill_connections(self.db, amount=self.pool_size/2) - self.test_parallel_queries() + self.run_parallel_queries() -class MomokoStretchTest(MomokoParallelTest): +class MomokoPoolStretchTest(MomokoPoolParallelTest): pool_size = 1 max_size = 5 def test_parallel_queries(self): """Run parallel queies and check that pool size matches number of jobs""" jobs = self.max_size - 1 - super(MomokoStretchTest, self).test_parallel_queries(jobs) - self.assert_equal(self.db._conns.total, jobs) + super(MomokoPoolStretchTest, self).run_parallel_queries(jobs) + self.assertEqual(self.db.conns.total, jobs) def test_dont_stretch(self): """Testing that we do not stretch unless needed""" self.run_and_check_query(self.db) - self.assert_equal(self.db._conns.total, self.pool_size) + self.run_and_check_query(self.db) + self.run_and_check_query(self.db) + self.assertEqual(self.db.conns.total, self.pool_size+1) def test_dont_stretch_after_reconnect(self): """Testing that reconnecting dead connection does not trigger pool stretch""" @@ -523,116 +614,116 @@ def test_stretch_after_disonnect(self): self.kill_connections(self.db) self.test_parallel_queries() + @gen_test def test_stretch_genconn(self): """Testing that stretch works together with get/putconn""" - @gen.engine - def func(): - self.db.getconn(callback=(yield gen.Callback('q1'))) - self.db.getconn(callback=(yield gen.Callback('q2'))) - self.db.getconn(callback=(yield gen.Callback('q3'))) - - conn1, conn2, conn3 = yield momoko.WaitAllOps(('q1', 'q2', 'q3')) + f1 = self.db.getconn() + f2 = self.db.getconn() + f3 = self.db.getconn() + yield [f1, f2, f3] - conn1.execute('SELECT 1;', callback=(yield gen.Callback('q1'))) - conn2.execute('SELECT 2;', callback=(yield gen.Callback('q2'))) - conn3.execute('SELECT 3;', callback=(yield gen.Callback('q3'))) + conn1 = f1.result() + conn2 = f2.result() + conn3 = f3.result() - cursor1, cursor2, cursor3 = yield momoko.WaitAllOps(('q1', 'q2', 'q3')) + f1 = conn1.execute("SELECT 1;") + f2 = conn2.execute("SELECT 2;") + f3 = conn3.execute("SELECT 3;") - self.assert_equal(cursor1.fetchone(), (1,)) - self.assert_equal(cursor2.fetchone(), (2,)) - self.assert_equal(cursor3.fetchone(), (3,)) + yield [f1, f2, f3] - for conn in conn1, conn2, conn3: - self.db.putconn(conn) + cursor1 = f1.result() + cursor2 = f2.result() + cursor3 = f3.result() - self.stop() + self.assertEqual(cursor1.fetchone(), (1,)) + self.assertEqual(cursor2.fetchone(), (2,)) + self.assertEqual(cursor3.fetchone(), (3,)) - self.run_gen(func) - self.assert_equal(self.db._conns.total, 3) + for conn in conn1, conn2, conn3: + self.db.putconn(conn) - -class MomokoSetsessionTest(BaseTest): - pool_size = 1 - - def test_setsession(self): - """Testing that setssion parameter is honoured""" - setsession = deque([None, "SELECT 1", "SELECT 2"]) - time_zones = ["UTC", "Israel", "Europe/London"] - - for i in range(len(time_zones)): - setsession[i] = "SET TIME ZONE '%s'" % time_zones[i] - db = self.build_pool(setsession=setsession) - db.execute("SELECT current_setting('TIMEZONE');", callback=self.stop_callback) - cursor = self.wait_for_result() - self.assert_equal(cursor.fetchall(), [(time_zones[i],)]) - db.close() - setsession.rotate(1) + self.assertEqual(self.db.conns.total, 3) -class MomokoVolatileDbTest(BaseTest): - raise_connect_errors = False +class MomokoPoolVolatileDbTest(PoolBaseTest): pool_size = 3 + @gen_test def test_startup(self): """Testing that all connections are dead after pool init with bad dsn""" - db = self.build_pool(dsn=bad_dsn) - self.assert_equal(self.pool_size, len(db._conns.dead)) + db = yield self.build_pool(dsn=bad_dsn) + self.assertEqual(self.pool_size, len(db.conns.dead)) + @gen_test def test_startup_local(self): """Testing that we catch early exeception with local connections""" - db = self.build_pool(dsn=local_bad_dsn) - self.assert_equal(self.pool_size, len(db._conns.dead)) + db = yield self.build_pool(dsn=local_bad_dsn) + self.assertEqual(self.pool_size, len(db.conns.dead)) def test_reconnect(self): - """Testing if we can reconnect if connections die""" - db = self.build_pool(dsn=good_dsn) + """Testing if we can reconnect if connections dies""" + db = self.build_pool_sync(dsn=good_dsn) self.kill_connections(db) self.run_and_check_query(db) def test_reconnect_interval_good_path(self): """Testing that we can recover if database was down during startup""" - db = self.build_pool(dsn=bad_dsn) - db.dsn = good_dsn - time.sleep(db._conns.reconnect_interval) + db = self.build_pool_sync(dsn=bad_dsn) + self.assertEqual(self.pool_size, len(db.conns.dead)) + for conn in db.conns.dead: + conn.dsn = good_dsn + time.sleep(db.reconnect_interval) self.run_and_check_query(db) def test_reconnect_interval_bad_path(self): """Testing that pool does not try to reconnect right after last connection attempt failed""" - db = self.build_pool(dsn=bad_dsn) + db = self.build_pool_sync(dsn=bad_dsn) + self.assertEqual(self.pool_size, len(db.conns.dead)) + for conn in db.conns.dead: + conn.dsn = good_dsn try: self.run_and_check_query(db) except psycopg2.DatabaseError: pass + @gen_test + def test_abort_waiting_queue(self): + """Testing that waiting queue is aborted properly when all connections are dead""" + db = yield self.build_pool(dsn=good_dsn, size=1) + f1 = db.execute("SELECT 1") + f2 = db.execute("SELECT 1") -class MomokoFactoriesTest(BaseTest): - def run_and_check_dict(self, db): - db.execute("SELECT 1 AS a", callback=self.stop_callback) - cursor = self.wait_for_result() - self.assert_equal(cursor.fetchone(), {"a": 1}) + self.assertEqual(len(db.conns.waiting_queue), 1) - def test_cursor_factory(self): - """Testing that cursor_factory parameter is properly propagated""" - if psycopg2_impl == "psycopg2ct": - # Known bug: https://github.com/mvantellingen/psycopg2-ctypes/issues/31 - return - db = self.build_pool(cur_factory=RealDictCursor) - self.run_and_check_dict(db) + def total_kill(f): + self.kill_connections(db) + for conn in db.conns.dead: + conn.dsn = bad_dsn - def test_connection_factory(self): - """Testing that connection_factory parameter is properly propagated""" - db = self.build_pool(con_factory=RealDictConnection) - self.run_and_check_dict(db) + f1.add_done_callback(total_kill) + + try: + yield [f1, f2] + except psycopg2.DatabaseError: + pass + self.assertEqual(len(db.conns.waiting_queue), 0) + + +class MomokoPoolPartiallyConnectedTest(PoolBaseTest): + raise_connect_errors = True + pool_size = 3 - def test_connection_manager_with_named_cursor(self): - """Test whether connection pinger works fine with named cursors. Issue #74""" - #db = self.build_pool() - db = self.build_pool(cur_factory=NamedTupleCursor) - db.getconn(callback=self.stop_callback) - connection = self.wait_for_result() - db.putconn(connection) + def test_partially_connected(self): + """Test that PartiallyConnected is raised properly""" + exp = momoko.exceptions.PartiallyConnectedError + self.assertRaises(exp, self.build_pool_sync, dsn=bad_dsn) if __name__ == '__main__': + if debug: + FORMAT = '%(asctime)-15s %(levelname)s:%(name)s %(funcName)-15s: %(message)s' + logging.basicConfig(format=FORMAT) + logging.getLogger("momoko").setLevel(logging.DEBUG) + logging.getLogger("unittest").setLevel(logging.DEBUG) unittest.main()