Transactions + assorted improvements #27

Merged: 5 commits into FSX:master

3 participants

@aleksj
  • support for transactions using the TransactionChain
  • general refactoring and code simplification around new_cursor
  • a more graceful protocol based on timeouts for attempting reconnection to DB servers (it used to flood the logs with reconnects)
  • some typo-fixing and formatting
@FSX
Owner

I'll review it once I get a chance. Once it cools down (the weather) I'll be able to do some stuff. ~_~

@FSX FSX commented on the diff
momoko/clients.py
@@ -26,6 +26,9 @@ class BlockingClient(object):
def __init__(self, settings):
self._pool = BlockingPool(**settings)
+ def __del__(self):
@FSX Owner
FSX added a note

Hasn't been added to AsyncClient.

@aleksj
aleksj added a note

I tried it, but it doesn't work. There might be a cyclic relationship somewhere, you might know this better?
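
One way to sidestep the `__del__` question is to not depend on the finalizer at all: give the client an explicit `close()` and context-manager support. On interpreters older than Python 3.4, objects that define `__del__` and take part in a reference cycle are not collected, which would match the guess above, though the actual cause in `AsyncClient` is not established in this thread. A minimal sketch, where `FakePool` and `Client` are hypothetical stand-ins and not momoko's classes:

```python
class FakePool(object):
    """Hypothetical stand-in for a connection pool; not momoko's class."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class Client(object):
    """Hypothetical client that releases its pool deterministically."""

    def __init__(self, pool):
        self._pool = pool

    def close(self):
        # Explicit cleanup: does not depend on when (or whether) the
        # garbage collector finalizes this object.
        self._pool.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # do not swallow exceptions raised in the block


pool = FakePool()
with Client(pool) as client:
    pass  # queries would run here
```

The test change in this pull request takes the same route by calling `self.db.close()` in `tearDown`.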

@FSX FSX commented on the diff
momoko/pools.py
((14 lines not shown))
"""
if len(self._pool) > self.max_conn:
- raise PoolError('connection pool exausted')
- conn = AsyncConnection(self._ioloop)
-
- if new_cursor_args:
- new_cursor_args.append(conn)
- callbacks = [
- partial(self._pool.append, conn),
- partial(self.new_cursor, *new_cursor_args)]
+ self._clean_pool()
+ if len(self._pool) > self.max_conn:
+ raise PoolError('connection pool exhausted')
+ timeout = self._last_reconnect + .25 # 1/4 second delay between reconnection
+ timenow = time.time()
+ if timenow > timeout or len(self._pool) <= self.min_conn:
@FSX Owner
FSX added a note

I don't understand this line. len(self._pool) <= self.min_conn is ok. There's no need to limit the connections that are needed to reach the minimum, but what about timenow > timeout? Are there cases when this is true? Why would the creation of connections be delayed? To run a batch of queries the same number of connections is needed and sometimes new connections need to be created for this.

@aleksj
aleksj added a note

Imagine that a connection goes down (it happened in my testing). Within seconds, there were megabytes of log entries and a barrage of reconnection attempts. I'm not saying the timeout should be very long, but it's good practice. I'm quite open to other protocols, this one is about as simple as one can go.
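
The throttling condition under discussion can be isolated as a pure function to make its two branches easier to reason about. This is only a sketch of the check in the diff; `may_connect_now` is a hypothetical name, and the 0.25 second default is taken from the patch:

```python
def may_connect_now(now, last_reconnect, pool_size, min_conn, delay=0.25):
    """Return True if a new connection may be opened immediately.

    Mirrors the condition in the diff: connect when the delay since the
    last attempt has elapsed, or when the pool is at or below its minimum.
    """
    return now > last_reconnect + delay or pool_size <= min_conn
```

With a pool above `min_conn`, a request arriving 0.1 s after the last attempt is deferred, while one arriving 0.3 s later goes through. A pool at or below `min_conn` is never delayed.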

@FSX Owner
FSX added a note

Hmm, I didn't know that happens. Do you also know the cause of the
reconnection attempts? Maybe the connection times out.

I implemented a maximum amount of 5 attempts to get a new connection
in the rewrite (located in this repo in the rewrite branch).
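
A bounded-retry policy like the one described can be sketched as follows. This is not the code from the rewrite branch; `connect_with_retries` is a hypothetical helper, and `IOError` stands in for whatever exception a failed connection attempt would raise:

```python
def connect_with_retries(connect, max_attempts=5):
    """Call `connect()` until it succeeds or `max_attempts` is reached.

    Returns a (connection, attempts_used) tuple on success.
    """
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return connect(), attempt
        except IOError as error:  # hypothetical failure type for this sketch
            last_error = error
    raise RuntimeError(
        'gave up after %d attempts: %r' % (max_attempts, last_error))
```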

@aleksj
aleksj added a note

I'd prefer progressively increasing timeouts (up to a maximum, say 30s) - rather than timing out and having to restart the server.
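
Progressively increasing timeouts with a ceiling are commonly done as capped exponential backoff. A minimal sketch, assuming a doubling factor and reusing the 0.25 s delay from the patch as the starting point (`backoff_delay` is a hypothetical name, and the 30 s cap is the figure suggested above):

```python
def backoff_delay(attempt, base=0.25, factor=2.0, cap=30.0):
    """Delay in seconds before reconnection attempt `attempt` (0-based)."""
    return min(cap, base * factor ** attempt)


# Delays for the first nine attempts; the cap applies from the eighth on.
delays = [backoff_delay(n) for n in range(9)]
```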

@FSX Owner
FSX added a note

But only for connections that are lost. Not for connections that are requested by execute/callproc/chain/batch.

I'll do some research about keeping connections alive. Sockets can time out and if that's prevented there's no need for a complicated reconnection mechanism.
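
For the keep-alive idea, libpq exposes TCP keepalive settings as connection parameters (`keepalives`, `keepalives_idle`, `keepalives_interval`, `keepalives_count`), which can be included in the DSN string handed to `psycopg2.connect`. The helper below is a hypothetical sketch that only builds such a string; the numeric values are placeholders, the settings apply to TCP connections rather than Unix sockets, and whether they would prevent the dropped connections seen here is untested:

```python
def dsn_with_keepalives(dsn, idle=30, interval=10, count=3):
    """Append libpq TCP keepalive settings to a key=value DSN string."""
    options = (
        ('keepalives', 1),                  # enable TCP keepalives
        ('keepalives_idle', idle),          # idle seconds before the first probe
        ('keepalives_interval', interval),  # seconds between probes
        ('keepalives_count', count),        # lost probes before giving up
    )
    extra = ' '.join('%s=%d' % item for item in options)
    return ('%s %s' % (dsn, extra)).strip()
```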

@aleksj
aleksj added a note
@FSX Owner
FSX added a note

Is it possible to record the exceptions in your log? It sounds like you're using Momoko in a production environment and that would give us a better view on what's happening.

    if connection is not None:
        try:
            connection.cursor(function, function_args, callback, cursor_kwargs)
            return
        except (DatabaseError, InterfaceError) as e:  # Recover from lost connection
            logging.warning('Requested connection was closed; exception: %r'  % e)
            self._pool.remove(connection)

Would be cool. :D

@FSX Owner
FSX added a note

That's true. The reconnect mechanism shouldn't be completely gone. I was just wondering about why reconnect would happen.

The pool cleaner could also be used for checking the health of the connections.
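
A health check inside the periodic cleaner could look roughly like the sketch below. It is an illustration only: `FakeConn` and its `alive` flag are hypothetical, and how liveness would actually be detected on momoko's `AsyncConnection` is left open.

```python
class FakeConn(object):
    """Hypothetical connection with a liveness flag; not momoko's class."""

    def __init__(self, name, alive=True):
        self.name = name
        self.alive = alive


def drop_dead_connections(pool):
    """Remove connections that fail the health check; return the removed ones."""
    # Collect first, then remove, so the list is not mutated while iterating.
    dead = [conn for conn in pool if not conn.alive]
    for conn in dead:
        pool.remove(conn)
    return dead


pool = [FakeConn('a'), FakeConn('b', alive=False), FakeConn('c')]
removed = drop_dead_connections(pool)
```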

@aleksj
aleksj added a note

You want me to push this change into my pull request? Any other changes?

@FSX Owner
FSX added a note

Hmm, no. I'll merge it and test it.

@FSX FSX merged commit 3e487d0 into FSX:master
Commits on Aug 17, 2012
  1. @eggspurt
  2. @eggspurt

    pycharm support

    eggspurt authored
  3. @eggspurt
  4. @eggspurt
Commits on Aug 19, 2012
  1. @eggspurt

    transactions

    eggspurt authored
6 .gitignore
@@ -9,3 +9,9 @@ docs/_build/
*.sublime-workspace
*.bak
modules/
+.idea
+setup.sh
+
+*.pth
+
+tests/database.cfg
1  README.rst
@@ -19,3 +19,4 @@ With pip::
Or manually::
python setup.py install
+
4 docs/_themes/nature-mod/layout.html
@@ -39,7 +39,7 @@
VERSION: '{{ release|e }}',
COLLAPSE_INDEX: false,
FILE_SUFFIX: '{{ '' if no_search_suffix else file_suffix }}',
- HAS_SOURCE: {{ has_source|lower }}
+ HAS_SOURCE: '{{ has_source|lower }}'
};
</script>
{%- for scriptfile in script_files %}
@@ -111,7 +111,7 @@
<li>
{%- if not loop.first %}{{ reldelim2 }}{% endif %}
<a href="{{ pathto(rellink[0]) }}" title="{{ rellink[1]|striptags|e }}"
- {{ accesskey(rellink[2]) }}>{{ rellink[3] }}</a>
+ {{ accesskey(rellink[2]) }}>{{ rellink[3] }}</a>
</li>
{%- endfor %}
</ul>
4 docs/installation.rst
@@ -11,12 +11,12 @@ Psycopg2 must have support for asynchronous connections.
Momoko can be installed with *easy_install* or pip_::
- pip install momoko
+ pip install momoko
The latest sources can be cloned from the `Github repository`_ and
installed with::
- python setup.py install
+ python setup.py install
.. _Tornado: http://www.tornadoweb.org/
52 momoko/clients.py
@@ -14,7 +14,7 @@
from contextlib import contextmanager
from .pools import AsyncPool, BlockingPool
-from .utils import BatchQuery, QueryChain
+from .utils import BatchQuery, QueryChain, TransactionChain
class BlockingClient(object):
@@ -26,6 +26,9 @@ class BlockingClient(object):
def __init__(self, settings):
self._pool = BlockingPool(**settings)
+ def __del__(self):
+ self._pool.close()
+
@property
@contextmanager
def connection(self):
@@ -50,8 +53,8 @@ def connection(self):
class AsyncClient(object):
"""The ``AsyncClient`` class is a wrapper for ``AsyncPool``, ``BatchQuery``
- and ``QueryChain``. It also provides the ``execute`` and ``callproc``
- functions.
+ ``TransactionChain`` and ``QueryChain``. It also provides the ``execute``
+ and ``callproc`` functions.
:param settings: A dictionary that is passed to the ``AsyncPool`` object.
"""
@@ -85,7 +88,33 @@ def batch(self, queries, callback=None, cursor_kwargs={}):
.. _connection.cursor: http://initd.org/psycopg/docs/connection.html#connection.cursor
"""
- return BatchQuery(self, queries, callback)
+ return BatchQuery(self, queries, callback, cursor_kwargs)
+
+ def transaction(self, statements, callback=None, cursor_kwargs={}):
+ """Run a chain of statements in the given order using a single connection.
+ The statements will be wrapped between a "begin;" and a "commit;". The
+ connection will be unavailable while the chain is running.
+
+ A list/tuple with statements looks like this::
+
+ (
+ ['SELECT 42, 12, %s, 11;', (23,)],
+ 'SELECT 1, 2, 3, 4, 5;'
+ )
+
+ A statement with parameters is contained in a list: ``['some sql
+ here %s, %s', ('and some', 'parameters here')]``. A statement
+ without parameters doesn't need to be in a list.
+
+ :param statements: A tuple or list with all the statements.
+ :param callback: The function that needs to be executed once all the
+ queries are finished. Optional.
+ :param cursor_kwargs: A dictionary with Psycopg's `connection.cursor`_ arguments.
+ :return: A list with the resulting cursors.
+
+ .. _connection.cursor: http://initd.org/psycopg/docs/connection.html#connection.cursor
+ """
+ return TransactionChain(self, statements, callback, cursor_kwargs)
def chain(self, queries, callback=None, cursor_kwargs={}):
"""Run a chain of queries in the given order.
@@ -97,9 +126,9 @@ def chain(self, queries, callback=None, cursor_kwargs={}):
'SELECT 1, 2, 3, 4, 5;'
)
- A query with paramaters is contained in a list: ``['some sql
- here %s, %s', ('and some', 'paramaters here')]``. A query
- without paramaters doesn't need to be in a list.
+ A query with parameters is contained in a list: ``['some sql
+ here %s, %s', ('and some', 'parameters here')]``. A query
+ without parameters doesn't need to be in a list.
:param queries: A tuple or list with all the queries.
:param callback: The function that needs to be executed once all the
@@ -109,9 +138,9 @@ def chain(self, queries, callback=None, cursor_kwargs={}):
.. _connection.cursor: http://initd.org/psycopg/docs/connection.html#connection.cursor
"""
- return QueryChain(self, queries, callback)
+ return QueryChain(self, queries, callback, cursor_kwargs)
- def execute(self, operation, parameters=(), callback=None, cursor_kwargs={}):
+ def execute(self, operation, parameters=(), callback=None, cursor_kwargs={}, connection = None):
"""Prepare and execute a database operation (query or command).
Parameters may be provided as sequence or mapping and will be bound to
@@ -130,7 +159,10 @@ def execute(self, operation, parameters=(), callback=None, cursor_kwargs={}):
.. _connection.cursor: http://initd.org/psycopg/docs/connection.html#connection.cursor
"""
- self._pool.new_cursor('execute', (operation, parameters), callback, cursor_kwargs)
+ if connection:
+ self._pool.new_cursor('execute', (operation, parameters), callback, cursor_kwargs, connection, transaction=True)
+ else:
+ self._pool.new_cursor('execute', (operation, parameters), callback, cursor_kwargs)
def callproc(self, procname, parameters=None, callback=None, cursor_kwargs={}):
"""Call a stored database procedure with the given name.
114 momoko/pools.py
@@ -11,7 +11,7 @@
import logging
from functools import partial
-from contextlib import contextmanager
+import time
import psycopg2
from psycopg2 import DatabaseError, InterfaceError
@@ -30,8 +30,6 @@ class BlockingPool(object):
``PoolError`` exception is raised.
:param cleanup_timeout: Time in seconds between pool cleanups. Connections
will be closed until there are ``min_conn`` left.
- :param host: The database host address (defaults to UNIX socket if not provided)
- :param port: The database host port (defaults to 5432 if not provided)
:param database: The database name
:param user: User name used to authenticate
:param password: Password used to authenticate
@@ -63,7 +61,7 @@ def _new_conn(self):
"""Create a new connection.
"""
if len(self._pool) > self.max_conn:
- raise PoolError('connection pool exausted')
+ raise PoolError('connection pool exhausted')
conn = psycopg2.connect(*self._args, **self._kwargs)
self._pool.append(conn)
@@ -103,9 +101,9 @@ def _clean_pool(self):
for conn in self._pool[:]:
if conn.status == STATUS_READY:
conn.close()
- conns = conns - 1
+ conns -= 1
self._pool.remove(conn)
- if conns == 0:
+ if not conns:
break
def close(self):
@@ -150,11 +148,13 @@ def __init__(self, min_conn=1, max_conn=20, cleanup_timeout=10,
self._ioloop = ioloop or IOLoop.instance()
self._args = args
self._kwargs = kwargs
+ self._last_reconnect = 0
self._pool = []
for i in range(self.min_conn):
self._new_conn()
+ self._last_reconnect = time.time()
# Create a periodic callback that tries to close inactive connections
if cleanup_timeout > 0:
@@ -162,30 +162,55 @@ def __init__(self, min_conn=1, max_conn=20, cleanup_timeout=10,
cleanup_timeout * 1000)
self._cleaner.start()
- def _new_conn(self, new_cursor_args=[]):
+ def _new_conn(self, callback=None, callback_args=[]):
"""Create a new connection.
- If `new_cursor_args` is provided a new cursor is created when the
- callback is executed.
-
- :param new_cursor_args: Arguments (dictionary) for a new cursor.
+ :param callback_args: Parameters for the callback - connection will be appended
+ to the parameters
"""
if len(self._pool) > self.max_conn:
- raise PoolError('connection pool exausted')
- conn = AsyncConnection(self._ioloop)
-
- if new_cursor_args:
- new_cursor_args.append(conn)
- callbacks = [
- partial(self._pool.append, conn),
- partial(self.new_cursor, *new_cursor_args)]
+ self._clean_pool()
+ if len(self._pool) > self.max_conn:
+ raise PoolError('connection pool exhausted')
+ timeout = self._last_reconnect + .25 # 1/4 second delay between reconnection
+ timenow = time.time()
+ if timenow > timeout or len(self._pool) <= self.min_conn:
+ self._last_reconnect = timenow
+ conn = AsyncConnection(self._ioloop)
+ callbacks = [partial(self._pool.append, conn)] # add new connection to the pool
+ if callback:
+ callbacks.append(partial(callback, *(callback_args+[conn])))
+
+ conn.open(callbacks, *self._args, **self._kwargs)
+ else:
+ # recursive timeout call, retaining the parameters
+ self._ioloop.add_timeout(timeout,partial(self._new_conn,callback,callback_args))
+
+ def _get_free_conn(self):
+ """Look for a free connection and return it.
+
+ `None` is returned when no free connection can be found.
+ """
+ if self.closed:
+ raise PoolError('connection pool is closed')
+ for conn in self._pool:
+ if not conn.isexecuting():
+ return conn
+ return None
+
+ def get_connection(self, callback = None, callback_args=[]):
+ """Get a connection, trying available ones, and if not available - create a new one;
+
+ Afterwards, the callback will be called
+ """
+ connection = self._get_free_conn()
+ if connection is None:
+ self._new_conn(callback,callback_args)
else:
- callbacks = [partial(self._pool.append, conn)]
+ callback(*(callback_args+[connection]))
- conn.open(callbacks, *self._args, **self._kwargs)
- def new_cursor(self, function, function_args=(), callback=None, cursor_kwargs={},
- connection=None):
+ def new_cursor(self, function, function_args=(), callback=None, cursor_kwargs={}, connection=None, transaction=False):
"""Create a new cursor.
If there's no connection available, a new connection will be created and
@@ -199,35 +224,20 @@ def new_cursor(self, function, function_args=(), callback=None, cursor_kwargs={}
.. _connection.cursor: http://initd.org/psycopg/docs/connection.html#connection.cursor
"""
- if connection is None:
- connection = self._get_free_conn()
- if connection is None:
- self._new_conn([function, function_args, callback, cursor_kwargs])
+ if connection is not None:
+ try:
+ connection.cursor(function, function_args, callback, cursor_kwargs)
return
+ except (DatabaseError, InterfaceError): # Recover from lost connection
+ logging.warning('Requested connection was closed')
+ self._pool.remove(connection)
- try:
- connection.cursor(function, function_args, callback, cursor_kwargs)
- except (DatabaseError, InterfaceError): # Recover from lost connection
- logging.warning('Requested connection was closed')
- self._pool.remove(connection)
- connection = self._get_free_conn()
- if connection is None:
- self._new_conn([function, function_args, callback, cursor_kwargs])
- else:
- self.new_cursor(function, function_args, callback,
- connection, cursor_kwargs)
-
- def _get_free_conn(self):
- """Look for a free connection and return it.
+ # if no connection, or if exception caught
+ if not transaction:
+ self.get_connection(callback=self.new_cursor, callback_args=[function, function_args, callback, cursor_kwargs])
+ else:
+ raise TransactionError
- `None` is returned when no free connection can be found.
- """
- if self.closed:
- raise PoolError('connection pool is closed')
- for conn in self._pool:
- if not conn.isexecuting():
- return conn
- return None
def _clean_pool(self):
"""Close a number of inactive connections when the number of connections
@@ -240,9 +250,9 @@ def _clean_pool(self):
for conn in self._pool[:]:
if not conn.isexecuting():
conn.close()
- conns = conns - 1
+ conns -= 1
self._pool.remove(conn)
- if conns == 0:
+ if not conns:
break
def close(self):
@@ -261,6 +271,8 @@ def close(self):
class PoolError(Exception):
pass
+class TransactionError(Exception):
+ pass
class AsyncConnection(object):
"""An asynchronous connection object.
66 momoko/utils.py
@@ -17,6 +17,58 @@
from tornado.ioloop import IOLoop
+class TransactionChain(object):
+ """Run queries as a transaction
+
+ A list/tuple with queries looks like this::
+
+ (
+ ['SELECT 42, 12, %s, 11;', (23,)],
+ 'SELECT 1, 2, 3, 4, 5;'
+ )
+
+ A query with parameters is contained in a list: ``['some sql
+ here %s, %s', ('and some', 'parameters here')]``. A query
+ without parameters doesn't need to be in a list.
+
+ :param db: A ``momoko.Client`` or ``momoko.AdispClient`` instance.
+ :param statements: A tuple or list with all the statements.
+ :param callback: The function that needs to be executed once all the
+ queries are finished.
+ :param cursor_kwargs: A dictionary with Psycopg's `connection.cursor`_ arguments.
+ :return: A list with the resulting cursors is passed on to the callback.
+
+ .. _connection.cursor: http://initd.org/psycopg/docs/connection.html#connection.cursor
+ """
+ def __init__(self, db, statements, callback, cursor_kwargs={}):
+ self._db = db
+ self._cursors = []
+ self._statements = list(statements)
+ self._statements.reverse()
+ self._callback = callback
+ self._cursor_kwargs = cursor_kwargs
+ self._db._pool.get_connection(self._set_connection)
+
+ def _set_connection(self, conn):
+ self._connection = conn
+ self._db._pool._pool.remove(conn) # don't let other connections mess up the transaction
+ self._collect(None)
+
+ def _collect(self, cursor):
+ if cursor is not None:
+ self._cursors.append(cursor)
+ if not self._statements:
+ if self._callback:
+ self._callback(self._cursors)
+ self._db._pool._pool.append(self._connection)
+ return
+ statement = self._statements.pop()
+ if isinstance(statement, str):
+ statement = [statement]
+ self._db.execute(*statement, callback=self._collect,
+ cursor_kwargs=self._cursor_kwargs, connection=self._connection)
+
+
class QueryChain(object):
"""Run a chain of queries in the given order.
@@ -27,9 +79,9 @@ class QueryChain(object):
'SELECT 1, 2, 3, 4, 5;'
)
- A query with paramaters is contained in a list: ``['some sql
- here %s, %s', ('and some', 'paramaters here')]``. A query
- without paramaters doesn't need to be in a list.
+ A query with parameters is contained in a list: ``['some sql
+ here %s, %s', ('and some', 'parameters here')]``. A query
+ without parameters doesn't need to be in a list.
:param db: A ``momoko.Client`` or ``momoko.AdispClient`` instance.
:param queries: A tuple or with all the queries.
@@ -77,9 +129,9 @@ class BatchQuery(object):
'query3': 'SELECT 465767, 4567, 3454;'
}
- A query with paramaters is contained in a list: ``['some sql
- here %s, %s', ('and some', 'paramaters here')]``. A query
- without paramaters doesn't need to be in a list.
+ A query with parameters is contained in a list: ``['some sql
+ here %s, %s', ('and some', 'parameters here')]``. A query
+ without parameters doesn't need to be in a list.
:param db: A ``momoko.Client`` or ``momoko.AdispClient`` instance.
:param queries: A dictionary with all the queries.
@@ -109,7 +161,7 @@ def __init__(self, db, queries, callback, cursor_kwargs={}):
self._db.execute(*query, cursor_kwargs=self._cursor_kwargs)
def _collect(self, key, cursor):
- self._size = self._size - 1
+ self._size -= 1
self._args[key] = cursor
if not self._size and self._callback:
self._callback(self._args)
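
For reference, the callback chaining that `TransactionChain._collect` relies on can be sketched without a database. `FakeDB` and `run_chain` below are hypothetical names used only for illustration; the fake `execute` calls its callback synchronously, whereas the real client is asynchronous. The class as shown in this diff does not appear to add `begin;` and `commit;` itself, so the sketch passes them explicitly, as the test in this pull request does.

```python
class FakeDB(object):
    """Hypothetical synchronous stand-in for the client; records what it ran."""

    def __init__(self):
        self.log = []

    def execute(self, operation, parameters=(), callback=None):
        self.log.append((operation, parameters))
        callback('cursor for %s' % operation)


def run_chain(db, statements, callback):
    """Run statements one at a time, each started from the previous callback."""
    pending = list(reversed(statements))  # pop() then yields original order
    cursors = []

    def collect(cursor):
        if cursor is not None:
            cursors.append(cursor)
        if not pending:
            callback(cursors)  # all statements finished
            return
        statement = pending.pop()
        if isinstance(statement, str):
            statement = [statement]  # bare SQL string: no parameters
        db.execute(*statement, callback=collect)

    collect(None)


db = FakeDB()
results = []
run_chain(db, ['begin;', ['select %s;', (1,)], 'commit;'], results.append)
```

Each statement starts only after the previous one has reported back, which is what keeps the statements on one connection in order.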
28 tests/async_client.py
@@ -8,6 +8,7 @@
import momoko
import settings
+import psycopg2
class AsyncClientTest(tornado.testing.AsyncTestCase):
@@ -28,6 +29,7 @@ def setUp(self):
})
def tearDown(self):
+ self.db.close()
super(AsyncClientTest, self).tearDown()
def test_single_query(self):
@@ -75,6 +77,32 @@ def test_chain_query(self):
for index, cursor in enumerate(cursors):
self.assertEqual(cursor.fetchall(), expected[index])
+ def test_transaction(self):
+ """Test executing a transaction.
+ """
+ input = (
+ ['begin;'],
+ ['create local temporary table async_test on commit drop as select 42;'],
+ ['select * from async_test;'],
+ ['commit;'],
+ ["select 1 from information_schema.tables where table_name='async_test';"]
+ )
+ expected = (
+ None,
+ None,
+ [(42,)],
+ None,
+ []
+ )
+
+ self.db.transaction(input, callback=self.stop)
+ cursors = self.wait()
+
+ for index, cursor in enumerate(cursors):
+ if expected[index] is not None:
+ self.assertEqual(cursor.fetchall(), expected[index])
+ else:
+ self.assertRaises(psycopg2.ProgrammingError,cursor.fetchall)
if __name__ == '__main__':
unittest.main()