Permalink
Browse files

Merge pull request #27 from aleksj/master

Transactions + assorted improvements.
  • Loading branch information...
FSX committed Aug 20, 2012
2 parents 0f041d9 + 29a6979 commit 3e487d0be21ad3b0c739cc63926a935c97dfdf52
Showing with 203 additions and 72 deletions.
  1. +6 −0 .gitignore
  2. +1 −0 README.rst
  3. +2 −2 docs/_themes/nature-mod/layout.html
  4. +2 −2 docs/installation.rst
  5. +42 −10 momoko/clients.py
  6. +63 −51 momoko/pools.py
  7. +59 −7 momoko/utils.py
  8. +28 −0 tests/async_client.py
View
@@ -9,3 +9,9 @@ docs/_build/
*.sublime-workspace
*.bak
modules/
+.idea
+setup.sh
+
+*.pth
+
+tests/database.cfg
View
@@ -19,3 +19,4 @@ With pip::
Or manually::
python setup.py install
+
@@ -39,7 +39,7 @@
VERSION: '{{ release|e }}',
COLLAPSE_INDEX: false,
FILE_SUFFIX: '{{ '' if no_search_suffix else file_suffix }}',
- HAS_SOURCE: {{ has_source|lower }}
+ HAS_SOURCE: '{{ has_source|lower }}'
};
</script>
{%- for scriptfile in script_files %}
@@ -111,7 +111,7 @@ <h3>{{ _('Navigation') }}</h3>
<li>
{%- if not loop.first %}{{ reldelim2 }}{% endif %}
<a href="{{ pathto(rellink[0]) }}" title="{{ rellink[1]|striptags|e }}"
- {{ accesskey(rellink[2]) }}>{{ rellink[3] }}</a>
+ {{ accesskey(rellink[2]) }}>{{ rellink[3] }}</a>
</li>
{%- endfor %}
</ul>
View
@@ -11,12 +11,12 @@ Psycopg2 must have support for asynchronous connections.
Momoko can be installed with *easy_install* or pip_::
- pip install momoko
+ pip install momoko
The latest sources can be cloned from the `Github repository`_ and
installed with::
- python setup.py install
+ python setup.py install
.. _Tornado: http://www.tornadoweb.org/
View
@@ -14,7 +14,7 @@
from contextlib import contextmanager
from .pools import AsyncPool, BlockingPool
-from .utils import BatchQuery, QueryChain
+from .utils import BatchQuery, QueryChain, TransactionChain
class BlockingClient(object):
@@ -26,6 +26,9 @@ class BlockingClient(object):
def __init__(self, settings):
self._pool = BlockingPool(**settings)
+ def __del__(self):
+ self._pool.close()
+
@property
@contextmanager
def connection(self):
@@ -50,8 +53,8 @@ def connection(self):
class AsyncClient(object):
"""The ``AsyncClient`` class is a wrapper for ``AsyncPool``, ``BatchQuery``
- and ``QueryChain``. It also provides the ``execute`` and ``callproc``
- functions.
+ ``TransactionChain'' and ``QueryChain``. It also provides the ``execute``
+ and ``callproc`` functions.
:param settings: A dictionary that is passed to the ``AsyncPool`` object.
"""
@@ -85,7 +88,33 @@ def batch(self, queries, callback=None, cursor_kwargs={}):
.. _connection.cursor: http://initd.org/psycopg/docs/connection.html#connection.cursor
"""
- return BatchQuery(self, queries, callback)
+ return BatchQuery(self, queries, callback, cursor_kwargs)
+
+ def transaction(self, statements, callback=None, cursor_kwargs={}):
+ """Run a chain of statements in the given order using a single connection.
+ The statements will be wrapped between a "begin;" and a "commit;". The
+ connection will be unavailable while the chain is running.
+
+ A list/tuple with statements looks like this::
+
+ (
+ ['SELECT 42, 12, %s, 11;', (23,)],
+ 'SELECT 1, 2, 3, 4, 5;'
+ )
+
+ A statement with parameters is contained in a list: ``['some sql
+ here %s, %s', ('and some', 'parameters here')]``. A statement
+ without parameters doesn't need to be in a list.
+
+ :param statements: A tuple or list with all the statements.
+ :param callback: The function that needs to be executed once all the
+ queries are finished. Optional.
+ :param cursor_kwargs: A dictionary with Psycopg's `connection.cursor`_ arguments.
+ :return: A list with the resulting cursors.
+
+ .. _connection.cursor: http://initd.org/psycopg/docs/connection.html#connection.cursor
+ """
+ return TransactionChain(self, statements, callback, cursor_kwargs)
def chain(self, queries, callback=None, cursor_kwargs={}):
"""Run a chain of queries in the given order.
@@ -97,9 +126,9 @@ def chain(self, queries, callback=None, cursor_kwargs={}):
'SELECT 1, 2, 3, 4, 5;'
)
- A query with paramaters is contained in a list: ``['some sql
- here %s, %s', ('and some', 'paramaters here')]``. A query
- without paramaters doesn't need to be in a list.
+ A query with parameters is contained in a list: ``['some sql
+ here %s, %s', ('and some', 'parameters here')]``. A query
+ without parameters doesn't need to be in a list.
:param queries: A tuple or list with all the queries.
:param callback: The function that needs to be executed once all the
@@ -109,9 +138,9 @@ def chain(self, queries, callback=None, cursor_kwargs={}):
.. _connection.cursor: http://initd.org/psycopg/docs/connection.html#connection.cursor
"""
- return QueryChain(self, queries, callback)
+ return QueryChain(self, queries, callback, cursor_kwargs)
- def execute(self, operation, parameters=(), callback=None, cursor_kwargs={}):
+ def execute(self, operation, parameters=(), callback=None, cursor_kwargs={}, connection = None):
"""Prepare and execute a database operation (query or command).
Parameters may be provided as sequence or mapping and will be bound to
@@ -130,7 +159,10 @@ def execute(self, operation, parameters=(), callback=None, cursor_kwargs={}):
.. _connection.cursor: http://initd.org/psycopg/docs/connection.html#connection.cursor
"""
- self._pool.new_cursor('execute', (operation, parameters), callback, cursor_kwargs)
+ if connection:
+ self._pool.new_cursor('execute', (operation, parameters), callback, cursor_kwargs, connection, transaction=True)
+ else:
+ self._pool.new_cursor('execute', (operation, parameters), callback, cursor_kwargs)
def callproc(self, procname, parameters=None, callback=None, cursor_kwargs={}):
"""Call a stored database procedure with the given name.
View
@@ -11,7 +11,7 @@
import logging
from functools import partial
-from contextlib import contextmanager
+import time
import psycopg2
from psycopg2 import DatabaseError, InterfaceError
@@ -30,8 +30,6 @@ class BlockingPool(object):
``PoolError`` exception is raised.
:param cleanup_timeout: Time in seconds between pool cleanups. Connections
will be closed until there are ``min_conn`` left.
- :param host: The database host address (defaults to UNIX socket if not provided)
- :param port: The database host port (defaults to 5432 if not provided)
:param database: The database name
:param user: User name used to authenticate
:param password: Password used to authenticate
@@ -63,7 +61,7 @@ def _new_conn(self):
"""Create a new connection.
"""
if len(self._pool) > self.max_conn:
- raise PoolError('connection pool exausted')
+ raise PoolError('connection pool exhausted')
conn = psycopg2.connect(*self._args, **self._kwargs)
self._pool.append(conn)
@@ -103,9 +101,9 @@ def _clean_pool(self):
for conn in self._pool[:]:
if conn.status == STATUS_READY:
conn.close()
- conns = conns - 1
+ conns -= 1
self._pool.remove(conn)
- if conns == 0:
+ if not conns:
break
def close(self):
@@ -150,42 +148,69 @@ def __init__(self, min_conn=1, max_conn=20, cleanup_timeout=10,
self._ioloop = ioloop or IOLoop.instance()
self._args = args
self._kwargs = kwargs
+ self._last_reconnect = 0
self._pool = []
for i in range(self.min_conn):
self._new_conn()
+ self._last_reconnect = time.time()
# Create a periodic callback that tries to close inactive connections
if cleanup_timeout > 0:
self._cleaner = PeriodicCallback(self._clean_pool,
cleanup_timeout * 1000)
self._cleaner.start()
- def _new_conn(self, new_cursor_args=[]):
+ def _new_conn(self, callback=None, callback_args=[]):
"""Create a new connection.
- If `new_cursor_args` is provided a new cursor is created when the
- callback is executed.
-
- :param new_cursor_args: Arguments (dictionary) for a new cursor.
+ :param callback_args: Parameters for the callback - connection will be appended
+ to the parameters
"""
if len(self._pool) > self.max_conn:
- raise PoolError('connection pool exausted')
- conn = AsyncConnection(self._ioloop)
-
- if new_cursor_args:
- new_cursor_args.append(conn)
- callbacks = [
- partial(self._pool.append, conn),
- partial(self.new_cursor, *new_cursor_args)]
+ self._clean_pool()
+ if len(self._pool) > self.max_conn:
+ raise PoolError('connection pool exhausted')
+ timeout = self._last_reconnect + .25 # 1/4 second delay between reconnection
+ timenow = time.time()
+ if timenow > timeout or len(self._pool) <= self.min_conn:
+ self._last_reconnect = timenow
+ conn = AsyncConnection(self._ioloop)
+ callbacks = [partial(self._pool.append, conn)] # add new connection to the pool
+ if callback:
+ callbacks.append(partial(callback, *(callback_args+[conn])))
+
+ conn.open(callbacks, *self._args, **self._kwargs)
+ else:
+ # recursive timeout call, retaining the parameters
+ self._ioloop.add_timeout(timeout,partial(self._new_conn,callback,callback_args))
+
+ def _get_free_conn(self):
+ """Look for a free connection and return it.
+
+ `None` is returned when no free connection can be found.
+ """
+ if self.closed:
+ raise PoolError('connection pool is closed')
+ for conn in self._pool:
+ if not conn.isexecuting():
+ return conn
+ return None
+
+ def get_connection(self, callback = None, callback_args=[]):
+ """Get a connection, trying available ones, and if not available - create a new one;
+
+ Afterwards, the callback will be called
+ """
+ connection = self._get_free_conn()
+ if connection is None:
+ self._new_conn(callback,callback_args)
else:
- callbacks = [partial(self._pool.append, conn)]
+ callback(*(callback_args+[connection]))
- conn.open(callbacks, *self._args, **self._kwargs)
- def new_cursor(self, function, function_args=(), callback=None, cursor_kwargs={},
- connection=None):
+ def new_cursor(self, function, function_args=(), callback=None, cursor_kwargs={}, connection=None, transaction=False):
"""Create a new cursor.
If there's no connection available, a new connection will be created and
@@ -199,35 +224,20 @@ def new_cursor(self, function, function_args=(), callback=None, cursor_kwargs={}
.. _connection.cursor: http://initd.org/psycopg/docs/connection.html#connection.cursor
"""
- if connection is None:
- connection = self._get_free_conn()
- if connection is None:
- self._new_conn([function, function_args, callback, cursor_kwargs])
+ if connection is not None:
+ try:
+ connection.cursor(function, function_args, callback, cursor_kwargs)
return
+ except (DatabaseError, InterfaceError): # Recover from lost connection
+ logging.warning('Requested connection was closed')
+ self._pool.remove(connection)
- try:
- connection.cursor(function, function_args, callback, cursor_kwargs)
- except (DatabaseError, InterfaceError): # Recover from lost connection
- logging.warning('Requested connection was closed')
- self._pool.remove(connection)
- connection = self._get_free_conn()
- if connection is None:
- self._new_conn([function, function_args, callback, cursor_kwargs])
- else:
- self.new_cursor(function, function_args, callback,
- connection, cursor_kwargs)
-
- def _get_free_conn(self):
- """Look for a free connection and return it.
+ # if no connection, or if exception caught
+ if not transaction:
+ self.get_connection(callback=self.new_cursor, callback_args=[function, function_args, callback, cursor_kwargs])
+ else:
+ raise TransactionError
- `None` is returned when no free connection can be found.
- """
- if self.closed:
- raise PoolError('connection pool is closed')
- for conn in self._pool:
- if not conn.isexecuting():
- return conn
- return None
def _clean_pool(self):
"""Close a number of inactive connections when the number of connections
@@ -240,9 +250,9 @@ def _clean_pool(self):
for conn in self._pool[:]:
if not conn.isexecuting():
conn.close()
- conns = conns - 1
+ conns -= 1
self._pool.remove(conn)
- if conns == 0:
+ if not conns:
break
def close(self):
@@ -261,6 +271,8 @@ def close(self):
class PoolError(Exception):
pass
+class TransactionError(Exception):
+ pass
class AsyncConnection(object):
"""An asynchronous connection object.
Oops, something went wrong.

0 comments on commit 3e487d0

Please sign in to comment.