Permalink
Browse files

rate-limiting on new connections, typos fixed

  • Loading branch information...
1 parent 0f041d9 commit 5fa72b96eba843ebe8c6362906957788e9409940 @eggspurt eggspurt committed Aug 17, 2012
Showing with 116 additions and 43 deletions.
  1. +29 −5 momoko/clients.py
  2. +34 −32 momoko/pools.py
  3. +53 −6 momoko/utils.py
View
@@ -85,7 +85,31 @@ def batch(self, queries, callback=None, cursor_kwargs={}):
.. _connection.cursor: http://initd.org/psycopg/docs/connection.html#connection.cursor
"""
- return BatchQuery(self, queries, callback)
+ return BatchQuery(self, queries, callback, cursor_kwargs)
+
+ def transaction(self, statements, callback=None, cursor_kwargs={}):
+ """Run a chain of statements in the given order using a single connection.
+
+ A list/tuple with statements looks like this::
+
+ (
+ ['SELECT 42, 12, %s, 11;', (23,)],
+ 'SELECT 1, 2, 3, 4, 5;'
+ )
+
+ A statement with parameters is contained in a list: ``['some sql
+ here %s, %s', ('and some', 'parameters here')]``. A statement
+ without parameters doesn't need to be in a list.
+
+ :param statements: A tuple or list with all the statements.
+ :param callback: The function that needs to be executed once all the
+ queries are finished. Optional.
+ :param cursor_kwargs: A dictionary with Psycopg's `connection.cursor`_ arguments.
+ :return: A list with the resulting cursors.
+
+ .. _connection.cursor: http://initd.org/psycopg/docs/connection.html#connection.cursor
+ """
+ return TransactionChain(self, statements, callback, cursor_kwargs)
def chain(self, queries, callback=None, cursor_kwargs={}):
"""Run a chain of queries in the given order.
@@ -97,9 +121,9 @@ def chain(self, queries, callback=None, cursor_kwargs={}):
'SELECT 1, 2, 3, 4, 5;'
)
- A query with paramaters is contained in a list: ``['some sql
- here %s, %s', ('and some', 'paramaters here')]``. A query
- without paramaters doesn't need to be in a list.
+ A query with parameters is contained in a list: ``['some sql
+ here %s, %s', ('and some', 'parameters here')]``. A query
+ without parameters doesn't need to be in a list.
:param queries: A tuple or list with all the queries.
:param callback: The function that needs to be executed once all the
@@ -109,7 +133,7 @@ def chain(self, queries, callback=None, cursor_kwargs={}):
.. _connection.cursor: http://initd.org/psycopg/docs/connection.html#connection.cursor
"""
- return QueryChain(self, queries, callback)
+ return QueryChain(self, queries, callback, cursor_kwargs)
def execute(self, operation, parameters=(), callback=None, cursor_kwargs={}):
"""Prepare and execute a database operation (query or command).
View
@@ -12,6 +12,7 @@
import logging
from functools import partial
from contextlib import contextmanager
+import time
import psycopg2
from psycopg2 import DatabaseError, InterfaceError
@@ -30,8 +31,6 @@ class BlockingPool(object):
``PoolError`` exception is raised.
:param cleanup_timeout: Time in seconds between pool cleanups. Connections
will be closed until there are ``min_conn`` left.
- :param host: The database host address (defaults to UNIX socket if not provided)
- :param port: The database host port (defaults to 5432 if not provided)
:param database: The database name
:param user: User name used to authenticate
:param password: Password used to authenticate
@@ -63,7 +62,7 @@ def _new_conn(self):
"""Create a new connection.
"""
if len(self._pool) > self.max_conn:
- raise PoolError('connection pool exausted')
+ raise PoolError('connection pool exhausted')
conn = psycopg2.connect(*self._args, **self._kwargs)
self._pool.append(conn)
@@ -103,7 +102,7 @@ def _clean_pool(self):
for conn in self._pool[:]:
if conn.status == STATUS_READY:
conn.close()
- conns = conns - 1
+ conns -= 1
self._pool.remove(conn)
if conns == 0:
break
@@ -150,19 +149,21 @@ def __init__(self, min_conn=1, max_conn=20, cleanup_timeout=10,
self._ioloop = ioloop or IOLoop.instance()
self._args = args
self._kwargs = kwargs
+ self._last_reconnect = 0 # prevents DOS'sing the DB server with connect requests
self._pool = []
for i in range(self.min_conn):
self._new_conn()
+ self._last_reconnect = time.time()
# Create a periodic callback that tries to close inactive connections
if cleanup_timeout > 0:
self._cleaner = PeriodicCallback(self._clean_pool,
cleanup_timeout * 1000)
self._cleaner.start()
- def _new_conn(self, new_cursor_args=[]):
+ def _new_conn(self, callback, callback_args=[]):
"""Create a new connection.
If `new_cursor_args` is provided a new cursor is created when the
@@ -171,18 +172,23 @@ def _new_conn(self, new_cursor_args=[]):
:param new_cursor_args: Arguments (dictionary) for a new cursor.
"""
if len(self._pool) > self.max_conn:
- raise PoolError('connection pool exausted')
- conn = AsyncConnection(self._ioloop)
-
- if new_cursor_args:
- new_cursor_args.append(conn)
- callbacks = [
- partial(self._pool.append, conn),
- partial(self.new_cursor, *new_cursor_args)]
+ self._clean_pool()
+ if len(self._pool) > self.max_conn:
+ raise PoolError('connection pool exhausted')
+ timeout = self._last_reconnect + .1
+ timenow = time.time()
+ if timenow > timeout or len(self._pool) < self.min_conn:
+ self._last_reconnect = timenow
+ conn = AsyncConnection(self._ioloop)
+ callbacks = [partial(self._pool.append, conn)] # add new connection to the pool
+ if callback_args:
+ callback_args.append(conn)
+ callbacks.append(partial(callback, *callback_args))
+
+ conn.open(callbacks, *self._args, **self._kwargs)
else:
- callbacks = [partial(self._pool.append, conn)]
+ self._ioloop.add_timeout(timeout,partial(self._new_conn,callback,callback_args))
- conn.open(callbacks, *self._args, **self._kwargs)
def new_cursor(self, function, function_args=(), callback=None, cursor_kwargs={},
connection=None):
@@ -200,22 +206,18 @@ def new_cursor(self, function, function_args=(), callback=None, cursor_kwargs={}
.. _connection.cursor: http://initd.org/psycopg/docs/connection.html#connection.cursor
"""
if connection is None:
- connection = self._get_free_conn()
- if connection is None:
- self._new_conn([function, function_args, callback, cursor_kwargs])
- return
-
- try:
- connection.cursor(function, function_args, callback, cursor_kwargs)
- except (DatabaseError, InterfaceError): # Recover from lost connection
- logging.warning('Requested connection was closed')
- self._pool.remove(connection)
- connection = self._get_free_conn()
- if connection is None:
- self._new_conn([function, function_args, callback, cursor_kwargs])
- else:
- self.new_cursor(function, function_args, callback,
- connection, cursor_kwargs)
+ self._new_conn(callback=self.new_cursor, callback_args=[function, function_args, callback, cursor_kwargs])
+ else:
+ try:
+ connection.cursor(function, function_args, callback, cursor_kwargs)
+ except (DatabaseError, InterfaceError): # Recover from lost connection
+ logging.warning('Requested connection was closed')
+ self._pool.remove(connection)
+ connection = self._get_free_conn()
+ if connection is None:
+ self._new_conn([function, function_args, callback, cursor_kwargs])
+ else:
+ self.new_cursor(function, function_args, callback, cursor_kwargs, connection)
def _get_free_conn(self):
"""Look for a free connection and return it.
@@ -240,7 +242,7 @@ def _clean_pool(self):
for conn in self._pool[:]:
if not conn.isexecuting():
conn.close()
- conns = conns - 1
+ conns -= 1
self._pool.remove(conn)
if conns == 0:
break
View
@@ -17,6 +17,53 @@
from tornado.ioloop import IOLoop
+class TransactionChain(object):
+ """Run queries as a transaction
+
+ A list/tuple with queries looks like this::
+
+ (
+ ['SELECT 42, 12, %s, 11;', (23,)],
+ 'SELECT 1, 2, 3, 4, 5;'
+ )
+
+ A query with parameters is contained in a list: ``['some sql
+ here %s, %s', ('and some', 'parameters here')]``. A query
+ without parameters doesn't need to be in a list.
+
+ :param db: A ``momoko.Client`` or ``momoko.AdispClient`` instance.
+ :param queries: A tuple or with all the queries.
+ :param callback: The function that needs to be executed once all the
+ queries are finished.
+ :param cursor_kwargs: A dictionary with Psycopg's `connection.cursor`_ arguments.
+ :return: A list with the resulting cursors is passed on to the callback.
+
+ .. _connection.cursor: http://initd.org/psycopg/docs/connection.html#connection.cursor
+ """
+ def __init__(self, db, queries, callback, cursor_kwargs={}):
+ self._db = db
+ self._cursors = []
+ self._queries = list(queries)
+ self._queries.reverse()
+ self._callback = callback
+ self._cursor_kwargs = cursor_kwargs
+ self._connection = self._db._get_free_conn
+ self._collect(None)
+
+ def _collect(self, cursor):
+ if cursor is not None:
+ self._cursors.append(cursor)
+ if not self._queries:
+ if self._callback:
+ self._callback(self._cursors)
+ return
+ query = self._queries.pop()
+ if isinstance(query, str):
+ query = [query]
+ self._db.execute(*query, callback=self._collect,
+ cursor_kwargs=self._cursor_kwargs)
+
+
class QueryChain(object):
"""Run a chain of queries in the given order.
@@ -27,9 +74,9 @@ class QueryChain(object):
'SELECT 1, 2, 3, 4, 5;'
)
- A query with paramaters is contained in a list: ``['some sql
- here %s, %s', ('and some', 'paramaters here')]``. A query
- without paramaters doesn't need to be in a list.
+ A query with parameters is contained in a list: ``['some sql
+ here %s, %s', ('and some', 'parameters here')]``. A query
+ without parameters doesn't need to be in a list.
:param db: A ``momoko.Client`` or ``momoko.AdispClient`` instance.
:param queries: A tuple or with all the queries.
@@ -77,9 +124,9 @@ class BatchQuery(object):
'query3': 'SELECT 465767, 4567, 3454;'
}
- A query with paramaters is contained in a list: ``['some sql
- here %s, %s', ('and some', 'paramaters here')]``. A query
- without paramaters doesn't need to be in a list.
+ A query with parameters is contained in a list: ``['some sql
+ here %s, %s', ('and some', 'parameters here')]``. A query
+ without parameters doesn't need to be in a list.
:param db: A ``momoko.Client`` or ``momoko.AdispClient`` instance.
:param queries: A dictionary with all the queries.

0 comments on commit 5fa72b9

Please sign in to comment.