Skip to content
This repository has been archived by the owner on Sep 24, 2022. It is now read-only.

Commit

Permalink
Added documentation and cleaned up.
Browse files Browse the repository at this point in the history
  • Loading branch information
FSX committed Jul 22, 2012
1 parent 86b1365 commit ea4d79a
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 28 deletions.
3 changes: 2 additions & 1 deletion ChangeLog
Original file line number Original file line Diff line number Diff line change
@@ -1,9 +1,10 @@
News/Changelog News/Changelog
============== ==============


0.4.1 (2012-01-??) 0.5.1 (2012-07-??)
------------------ ------------------


* Refactored connection pool and connection polling.
* Just pass all unspecified arguments to ``BlockingPool`` and ``AsyncPool``. So * Just pass all unspecified arguments to ``BlockingPool`` and ``AsyncPool``. So
``connection_factory`` can be used again. ``connection_factory`` can be used again.


Expand Down
70 changes: 43 additions & 27 deletions momoko/pools.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
""" """


import logging import logging
import functools from functools import partial
from contextlib import contextmanager from contextlib import contextmanager


import psycopg2 import psycopg2
Expand Down Expand Up @@ -162,7 +162,7 @@ def __init__(self, min_conn=1, max_conn=20, cleanup_timeout=10,
cleanup_timeout * 1000) cleanup_timeout * 1000)
self._cleaner.start() self._cleaner.start()


def _new_conn(self, new_cursor_args={}): def _new_conn(self, new_cursor_args=[]):
"""Create a new connection. """Create a new connection.
If `new_cursor_args` is provided a new cursor is created when the If `new_cursor_args` is provided a new cursor is created when the
Expand All @@ -173,50 +173,43 @@ def _new_conn(self, new_cursor_args={}):
if len(self._pool) > self.max_conn: if len(self._pool) > self.max_conn:
raise PoolError('connection pool exausted') raise PoolError('connection pool exausted')
conn = AsyncConnection(self._ioloop) conn = AsyncConnection(self._ioloop)
callbacks = [functools.partial(self._pool.append, conn)]


if new_cursor_args: if new_cursor_args:
new_cursor_args['connection'] = conn new_cursor_args.append(conn)
new_cursor = functools.partial(self.new_cursor, **new_cursor_args) callbacks = [
callbacks.append(new_cursor) partial(self._pool.append, conn),
partial(self.new_cursor, *new_cursor_args)]
else:
callbacks = [partial(self._pool.append, conn)]


conn.open(callbacks, *self._args, **self._kwargs) conn.open(callbacks, *self._args, **self._kwargs)


def new_cursor(self, function, func_args=(), callback=None, connection=None): def new_cursor(self, function, args=(), callback=None, connection=None):
"""Create a new cursor. """Create a new cursor.
If there's no connection available, a new connection will be created and If there's no connection available, a new connection will be created and
`new_cursor` will be called again after the connection has been made. `new_cursor` will be called again after the connection has been made.
:param function: ``execute``, ``executemany`` or ``callproc``. :param function: ``execute``, ``executemany`` or ``callproc``.
:param func_args: A tuple with the arguments for the specified function. :param args: A tuple with the arguments for the specified function.
:param callback: A callable that is executed once the operation is done. :param callback: A callable that is executed once the operation is done.
""" """
if not connection: if connection is None:
connection = self._get_free_conn() connection = self._get_free_conn()
if not connection: if connection is None:
self._new_conn({ self._new_conn([function, args, callback])
'function': function,
'func_args': func_args,
'callback': callback
})
return return


try: try:
connection.exec_cursor(function, func_args, callback) connection.cursor(function, args, callback)
except (DatabaseError, InterfaceError): except (DatabaseError, InterfaceError): # Recover from lost connection
# Recover from lost connection
logging.warning('Requested connection was closed') logging.warning('Requested connection was closed')
self._pool.remove(connection) self._pool.remove(connection)
connection = self._get_free_conn() connection = self._get_free_conn()
if not connection: if connection is None:
self._new_conn({ self._new_conn([function, args, callback])
'function': function,
'func_args': func_args,
'callback': callback
})
else: else:
self.new_cursor(function, func_args, callback, connection) self.new_cursor(function, args, callback, connection)


def _get_free_conn(self): def _get_free_conn(self):
"""Look for a free connection and return it. """Look for a free connection and return it.
Expand Down Expand Up @@ -264,24 +257,45 @@ class PoolError(Exception):




class AsyncConnection(object): class AsyncConnection(object):
"""An asynchronous connection object.
:param ioloop: An instance of Tornado's IOLoop.
"""
def __init__(self, ioloop): def __init__(self, ioloop):
self._conn = None self._conn = None
self._fileno = -1 self._fileno = -1
self._ioloop = ioloop self._ioloop = ioloop
self._callbacks = [] self._callbacks = []


def open(self, callbacks, *args, **kwargs): def open(self, callbacks, *args, **kwargs):
"""Open the connection to the database,
:param host: The database host address (defaults to UNIX socket if not provided)
:param port: The database host port (defaults to 5432 if not provided)
:param database: The database name
:param user: User name used to authenticate
:param password: Password used to authenticate
:param connection_factory: Using the connection_factory parameter a different
class or connections factory can be specified. It
should be a callable object taking a dsn argument.
"""
self._conn = psycopg2.connect(async=1, *args, **kwargs) self._conn = psycopg2.connect(async=1, *args, **kwargs)
self._fileno = self._conn.fileno() self._fileno = self._conn.fileno()
self._callbacks = callbacks self._callbacks = callbacks


# Connection state should be 2 (write) # Connection state should be 2 (write)
self._ioloop.add_handler(self._fileno, self._io_callback, IOLoop.WRITE) self._ioloop.add_handler(self._fileno, self._io_callback, IOLoop.WRITE)


def exec_cursor(self, function, args, callback): def cursor(self, function, args, callback):
"""Get a cursor and execute the requested function
:param function: ``execute``, ``executemany`` or ``callproc``.
:param args: A tuple with the arguments for the specified function.
:param callback: A callable that is executed once the operation is done.
"""
cursor = self._conn.cursor() cursor = self._conn.cursor()
getattr(cursor, function)(*args) getattr(cursor, function)(*args)
self._callbacks = [functools.partial(callback, cursor)] self._callbacks = [partial(callback, cursor)]


# Connection state should be 1 (write) # Connection state should be 1 (write)
self._ioloop.update_handler(self._fileno, IOLoop.READ) self._ioloop.update_handler(self._fileno, IOLoop.READ)
Expand All @@ -298,6 +312,8 @@ def _io_callback(self, fd, events):
self._ioloop.update_handler(self._fileno, IOLoop.WRITE) self._ioloop.update_handler(self._fileno, IOLoop.WRITE)


def close(self): def close(self):
"""Close connection.
"""
self._ioloop.remove_handler(self._fileno) self._ioloop.remove_handler(self._fileno)
return self._conn.close() return self._conn.close()


Expand Down

0 comments on commit ea4d79a

Please sign in to comment.