Browse files

transactions

  • Loading branch information...
1 parent f965bc2 commit 29a69791c944e07a22e502adc96f3f40514f0a0a @eggspurt eggspurt committed Aug 19, 2012
Showing with 103 additions and 57 deletions.
  1. +1 −0 README.rst
  2. +2 −2 docs/_themes/nature-mod/layout.html
  3. +2 −2 docs/installation.rst
  4. +10 −5 momoko/clients.py
  5. +44 −36 momoko/pools.py
  6. +17 −12 momoko/utils.py
  7. +27 −0 tests/async_client.py
View
1 README.rst
@@ -19,3 +19,4 @@ With pip::
Or manually::
python setup.py install
+
View
4 docs/_themes/nature-mod/layout.html
@@ -39,7 +39,7 @@
VERSION: '{{ release|e }}',
COLLAPSE_INDEX: false,
FILE_SUFFIX: '{{ '' if no_search_suffix else file_suffix }}',
- HAS_SOURCE: {{ has_source|lower }}
+ HAS_SOURCE: '{{ has_source|lower }}'
};
</script>
{%- for scriptfile in script_files %}
@@ -111,7 +111,7 @@
<li>
{%- if not loop.first %}{{ reldelim2 }}{% endif %}
<a href="{{ pathto(rellink[0]) }}" title="{{ rellink[1]|striptags|e }}"
- {{ accesskey(rellink[2]) }}>{{ rellink[3] }}</a>
+ {{ accesskey(rellink[2]) }}>{{ rellink[3] }}</a>
</li>
{%- endfor %}
</ul>
View
4 docs/installation.rst
@@ -11,12 +11,12 @@ Psycopg2 must have support for asynchronous connections.
Momoko can be installed with *easy_install* or pip_::
- pip install momoko
+ pip install momoko
The latest sources can be cloned from the `Github repository`_ and
installed with::
- python setup.py install
+ python setup.py install
.. _Tornado: http://www.tornadoweb.org/
View
15 momoko/clients.py
@@ -14,7 +14,7 @@
from contextlib import contextmanager
from .pools import AsyncPool, BlockingPool
-from .utils import BatchQuery, QueryChain
+from .utils import BatchQuery, QueryChain, TransactionChain
class BlockingClient(object):
@@ -53,8 +53,8 @@ def connection(self):
class AsyncClient(object):
"""The ``AsyncClient`` class is a wrapper for ``AsyncPool``, ``BatchQuery``
- and ``QueryChain``. It also provides the ``execute`` and ``callproc``
- functions.
+ ``TransactionChain'' and ``QueryChain``. It also provides the ``execute``
+ and ``callproc`` functions.
:param settings: A dictionary that is passed to the ``AsyncPool`` object.
"""
@@ -92,6 +92,8 @@ def batch(self, queries, callback=None, cursor_kwargs={}):
def transaction(self, statements, callback=None, cursor_kwargs={}):
"""Run a chain of statements in the given order using a single connection.
+ The statements will be wrapped between a "begin;" and a "commit;". The
+ connection will be unavailable while the chain is running.
A list/tuple with statements looks like this::
@@ -138,7 +140,7 @@ def chain(self, queries, callback=None, cursor_kwargs={}):
"""
return QueryChain(self, queries, callback, cursor_kwargs)
- def execute(self, operation, parameters=(), callback=None, cursor_kwargs={}):
+ def execute(self, operation, parameters=(), callback=None, cursor_kwargs={}, connection = None):
"""Prepare and execute a database operation (query or command).
Parameters may be provided as sequence or mapping and will be bound to
@@ -157,7 +159,10 @@ def execute(self, operation, parameters=(), callback=None, cursor_kwargs={}):
.. _connection.cursor: http://initd.org/psycopg/docs/connection.html#connection.cursor
"""
- self._pool.new_cursor('execute', (operation, parameters), callback, cursor_kwargs)
+ if connection:
+ self._pool.new_cursor('execute', (operation, parameters), callback, cursor_kwargs, connection, transaction=True)
+ else:
+ self._pool.new_cursor('execute', (operation, parameters), callback, cursor_kwargs)
def callproc(self, procname, parameters=None, callback=None, cursor_kwargs={}):
"""Call a stored database procedure with the given name.
View
80 momoko/pools.py
@@ -103,7 +103,7 @@ def _clean_pool(self):
conn.close()
conns -= 1
self._pool.remove(conn)
- if conns == 0:
+ if not conns:
break
def close(self):
@@ -148,7 +148,7 @@ def __init__(self, min_conn=1, max_conn=20, cleanup_timeout=10,
self._ioloop = ioloop or IOLoop.instance()
self._args = args
self._kwargs = kwargs
- self._last_reconnect = 0 # prevents DOS'sing the DB server with connect requests
+ self._last_reconnect = 0
self._pool = []
@@ -162,35 +162,55 @@ def __init__(self, min_conn=1, max_conn=20, cleanup_timeout=10,
cleanup_timeout * 1000)
self._cleaner.start()
- def _new_conn(self, callback, callback_args=[]):
+ def _new_conn(self, callback=None, callback_args=[]):
"""Create a new connection.
- If `new_cursor_args` is provided a new cursor is created when the
- callback is executed.
-
- :param new_cursor_args: Arguments (dictionary) for a new cursor.
+ :param callback_args: Parameters for the callback - connection will be appended
+ to the parameters
"""
if len(self._pool) > self.max_conn:
self._clean_pool()
if len(self._pool) > self.max_conn:
raise PoolError('connection pool exhausted')
- timeout = self._last_reconnect + .1
+ timeout = self._last_reconnect + .25 # 1/4 second delay between reconnection
timenow = time.time()
- if timenow > timeout or len(self._pool) < self.min_conn:
+ if timenow > timeout or len(self._pool) <= self.min_conn:
self._last_reconnect = timenow
conn = AsyncConnection(self._ioloop)
callbacks = [partial(self._pool.append, conn)] # add new connection to the pool
- if callback_args:
- callback_args.append(conn)
- callbacks.append(partial(callback, *callback_args))
+ if callback:
+ callbacks.append(partial(callback, *(callback_args+[conn])))
conn.open(callbacks, *self._args, **self._kwargs)
else:
+ # recursive timeout call, retaining the parameters
self._ioloop.add_timeout(timeout,partial(self._new_conn,callback,callback_args))
+ def _get_free_conn(self):
+ """Look for a free connection and return it.
+
+ `None` is returned when no free connection can be found.
+ """
+ if self.closed:
+ raise PoolError('connection pool is closed')
+ for conn in self._pool:
+ if not conn.isexecuting():
+ return conn
+ return None
+
+ def get_connection(self, callback = None, callback_args=[]):
+ """Get a connection, trying available ones, and if not available - create a new one;
+
+ Afterwards, the callback will be called
+ """
+ connection = self._get_free_conn()
+ if connection is None:
+ self._new_conn(callback,callback_args)
+ else:
+ callback(*(callback_args+[connection]))
- def new_cursor(self, function, function_args=(), callback=None, cursor_kwargs={},
- connection=None):
+
+ def new_cursor(self, function, function_args=(), callback=None, cursor_kwargs={}, connection=None, transaction=False):
"""Create a new cursor.
If there's no connection available, a new connection will be created and
@@ -204,34 +224,20 @@ def new_cursor(self, function, function_args=(), callback=None, cursor_kwargs={}
.. _connection.cursor: http://initd.org/psycopg/docs/connection.html#connection.cursor
"""
- if connection is None:
- connection = self._get_free_conn()
-
- if connection is None:
- self._new_conn(callback=self.new_cursor, callback_args=[function, function_args, callback, cursor_kwargs])
- else:
+ if connection is not None:
try:
connection.cursor(function, function_args, callback, cursor_kwargs)
+ return
except (DatabaseError, InterfaceError): # Recover from lost connection
logging.warning('Requested connection was closed')
self._pool.remove(connection)
- connection = self._get_free_conn()
- if connection is None:
- self._new_conn([function, function_args, callback, cursor_kwargs])
- else:
- self.new_cursor(function, function_args, callback, cursor_kwargs, connection)
- def _get_free_conn(self):
- """Look for a free connection and return it.
+ # if no connection, or if exception caught
+ if not transaction:
+ self.get_connection(callback=self.new_cursor, callback_args=[function, function_args, callback, cursor_kwargs])
+ else:
+ raise TransactionError
- `None` is returned when no free connection can be found.
- """
- if self.closed:
- raise PoolError('connection pool is closed')
- for conn in self._pool:
- if not conn.isexecuting():
- return conn
- return None
def _clean_pool(self):
"""Close a number of inactive connections when the number of connections
@@ -246,7 +252,7 @@ def _clean_pool(self):
conn.close()
conns -= 1
self._pool.remove(conn)
- if conns == 0:
+ if not conns:
break
def close(self):
@@ -265,6 +271,8 @@ def close(self):
class PoolError(Exception):
pass
+class TransactionError(Exception):
+ pass
class AsyncConnection(object):
"""An asynchronous connection object.
View
29 momoko/utils.py
@@ -32,36 +32,41 @@ class TransactionChain(object):
without parameters doesn't need to be in a list.
:param db: A ``momoko.Client`` or ``momoko.AdispClient`` instance.
- :param queries: A tuple or with all the queries.
+ :param statements: A tuple or with all the queries.
:param callback: The function that needs to be executed once all the
queries are finished.
:param cursor_kwargs: A dictionary with Psycopg's `connection.cursor`_ arguments.
:return: A list with the resulting cursors is passed on to the callback.
.. _connection.cursor: http://initd.org/psycopg/docs/connection.html#connection.cursor
"""
- def __init__(self, db, queries, callback, cursor_kwargs={}):
+ def __init__(self, db, statements, callback, cursor_kwargs={}):
self._db = db
self._cursors = []
- self._queries = list(queries)
- self._queries.reverse()
+ self._statements = list(statements)
+ self._statements.reverse()
self._callback = callback
self._cursor_kwargs = cursor_kwargs
- self._connection = self._db._get_free_conn
+ self._db._pool.get_connection(self._set_connection)
+
+ def _set_connection(self, conn):
+ self._connection = conn
+ self._db._pool._pool.remove(conn) # don't let other connections mess up the transaction
self._collect(None)
def _collect(self, cursor):
if cursor is not None:
self._cursors.append(cursor)
- if not self._queries:
+ if not self._statements:
if self._callback:
self._callback(self._cursors)
+ self._db._pool._pool.append(self._connection)
return
- query = self._queries.pop()
- if isinstance(query, str):
- query = [query]
- self._db.execute(*query, callback=self._collect,
- cursor_kwargs=self._cursor_kwargs)
+ statement = self._statements.pop()
+ if isinstance(statement, str):
+ statement = [statement]
+ self._db.execute(*statement, callback=self._collect,
+ cursor_kwargs=self._cursor_kwargs, connection=self._connection)
class QueryChain(object):
@@ -156,7 +161,7 @@ def __init__(self, db, queries, callback, cursor_kwargs={}):
self._db.execute(*query, cursor_kwargs=self._cursor_kwargs)
def _collect(self, key, cursor):
- self._size = self._size - 1
+ self._size -= 1
self._args[key] = cursor
if not self._size and self._callback:
self._callback(self._args)
View
27 tests/async_client.py
@@ -8,6 +8,7 @@
import momoko
import settings
+import psycopg2
class AsyncClientTest(tornado.testing.AsyncTestCase):
@@ -76,6 +77,32 @@ def test_chain_query(self):
for index, cursor in enumerate(cursors):
self.assertEqual(cursor.fetchall(), expected[index])
+ def test_transaction(self):
+ """Test executing a chain query.
+ """
+ input = (
+ ['begin;'],
+ ['create local temporary table async_test on commit drop as select 42;'],
+ ['select * from async_test;'],
+ ['commit;'],
+ ["select 1 from information_schema.tables where table_name='async_test';"]
+ )
+ expected = (
+ None,
+ None,
+ [(42,)],
+ None,
+ []
+ )
+
+ self.db.transaction(input, callback=self.stop)
+ cursors = self.wait()
+
+ for index, cursor in enumerate(cursors):
+ if expected[index] is not None:
+ self.assertEqual(cursor.fetchall(), expected[index])
+ else:
+ self.assertRaises(psycopg2.ProgrammingError,cursor.fetchall)
if __name__ == '__main__':
unittest.main()

0 comments on commit 29a6979

Please sign in to comment.