Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Maxconnection now just limits the concurrent connections to mongodb inst... #45

Open
wants to merge 1 commit into from

1 participant

@ceymard

...ead of raising an exception when the maxconnections are reached.

As the commit title says, I modified ConnectionPool and Cursor so that when maxconnections is reached, instead of raising an exception, we simply queue the requests in the connection pool.

Whenever a connection goes back into the cache, the backlog of requests is checked, and if not empty, a (now) free connection is given to the leftmost callback in the backlog.

I did this patch because I ran into a file descriptor limit problem while using asyncmongo.

I have a system wired to RabbitMQ that sometimes receives ~10000 messages in a few seconds, each of which triggers a request to the MongoDB server. Instead of throttling my own system, I thought it would be interesting to give asyncmongo the ability to throttle itself, since I doubt I will be the only one in such a situation.

While I removed the `raise TooManyConnections` altogether, I could, if you prefer, add a variable to switch between the two behaviours at will?

Christophe Eymard Maxconnection now just limits the concurrent connections to mongodb i…
…nstead of raising an exception when the maxconnections are reached.
7a8e6f6
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Apr 2, 2012
  1. Maxconnection now just limits the concurrent connections to mongodb i…

    Christophe Eymard authored
    …nstead of raising an exception when the maxconnections are reached.
This page is out of date. Refresh to see the latest.
Showing with 159 additions and 134 deletions.
  1. +116 −105 asyncmongo/cursor.py
  2. +43 −29 asyncmongo/pool.py
View
221 asyncmongo/cursor.py
@@ -1,5 +1,5 @@
#!/bin/env python
-#
+#
# Copyright 2010 bit.ly
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -36,16 +36,16 @@ def __init__(self, dbname, collection, pool):
assert isinstance(dbname, (str, unicode))
assert isinstance(collection, (str, unicode))
assert isinstance(pool, object)
-
+
self.__dbname = dbname
self.__collection = collection
self.__pool = pool
self.__slave_okay = False
-
+
@property
def full_collection_name(self):
return u'%s.%s' % (self.__dbname, self.__collection)
-
+
def drop(self, *args, **kwargs):
raise NotImplemented("patches accepted")
@@ -56,7 +56,7 @@ def save(self, doc, **kwargs):
def insert(self, doc_or_docs,
manipulate=True, safe=True, check_keys=True, callback=None, **kwargs):
"""Insert a document(s) into this collection.
-
+
If `manipulate` is set, the document(s) are manipulated using
any :class:`~pymongo.son_manipulator.SONManipulator` instances
that have been added to this
@@ -64,17 +64,17 @@ def insert(self, doc_or_docs,
the inserted document or a list of ``"_id"`` values of the
inserted documents. If the document(s) does not already
contain an ``"_id"`` one will be added.
-
+
If `safe` is ``True`` then the insert will be checked for
errors, raising :class:`~pymongo.errors.OperationFailure` if
one occurred. Safe inserts wait for a response from the
database, while normal inserts do not.
-
+
Any additional keyword arguments imply ``safe=True``, and
will be used as options for the resultant `getLastError`
command. For example, to wait for replication to 3 nodes, pass
``w=3``.
-
+
:Parameters:
- `doc_or_docs`: a document or list of documents to be
inserted
@@ -87,77 +87,79 @@ def insert(self, doc_or_docs,
- `**kwargs` (optional): any additional arguments imply
``safe=True``, and will be used as options for the
`getLastError` command
-
+
.. mongodoc:: insert
"""
if not isinstance(safe, bool):
raise TypeError("safe must be an instance of bool")
-
+
docs = doc_or_docs
# return_one = False
if isinstance(docs, dict):
# return_one = True
docs = [docs]
-
+
# if manipulate:
# docs = [self.__database._fix_incoming(doc, self) for doc in docs]
-
+
self.__limit = None
if kwargs:
safe = True
-
+
if safe and not callable(callback):
raise TypeError("callback must be callable")
if not safe and callback is not None:
raise TypeError("callback can not be used with safe=False")
-
+
if callback:
callback = functools.partial(self._handle_response, orig_callback=callback)
- connection = self.__pool.connection()
- try:
- connection.send_message(
- message.insert(self.full_collection_name, docs,
- check_keys, safe, kwargs), callback=callback)
- except:
- connection.close()
- raise
-
+ def _handle_connection(connection):
+ try:
+ connection.send_message(
+ message.insert(self.full_collection_name, docs,
+ check_keys, safe, kwargs), callback=callback)
+ except:
+ connection.close()
+ raise
+ self.__pool.connection(_handle_connection)
+
def remove(self, spec_or_id=None, safe=True, callback=None, **kwargs):
if not isinstance(safe, bool):
raise TypeError("safe must be an instance of bool")
-
+
if spec_or_id is None:
spec_or_id = {}
if not isinstance(spec_or_id, dict):
spec_or_id = {"_id": spec_or_id}
-
+
self.__limit = None
if kwargs:
safe = True
-
+
if safe and not callable(callback):
raise TypeError("callback must be callable")
if not safe and callback is not None:
raise TypeError("callback can not be used with safe=False")
-
+
if callback:
callback = functools.partial(self._handle_response, orig_callback=callback)
- connection = self.__pool.connection()
- try:
- connection.send_message(
- message.delete(self.full_collection_name, spec_or_id, safe, kwargs),
- callback=callback)
- except:
- connection.close()
- raise
+ def _handle_connection(connection):
+ try:
+ connection.send_message(
+ message.delete(self.full_collection_name, spec_or_id, safe, kwargs),
+ callback=callback)
+ except:
+ connection.close()
+ raise
+ self.__pool.connection(_handle_connection)
+
-
def update(self, spec, document, upsert=False, manipulate=False,
safe=True, multi=False, callback=None, **kwargs):
"""Update a document(s) in this collection.
-
+
Raises :class:`TypeError` if either `spec` or `document` is
not an instance of ``dict`` or `upsert` is not an instance of
``bool``. If `safe` is ``True`` then the update will be
@@ -166,14 +168,14 @@ def update(self, spec, document, upsert=False, manipulate=False,
occurred. Safe updates require a response from the database,
while normal updates do not - thus, setting `safe` to ``True``
will negatively impact performance.
-
+
There are many useful `update modifiers`_ which can be used
when performing updates. For example, here we use the
``"$set"`` modifier to modify some fields in a matching
document:
-
+
.. doctest::
-
+
>>> db.test.insert({"x": "y", "a": "b"})
ObjectId('...')
>>> list(db.test.find())
@@ -181,15 +183,15 @@ def update(self, spec, document, upsert=False, manipulate=False,
>>> db.test.update({"x": "y"}, {"$set": {"a": "c"}})
>>> list(db.test.find())
[{u'a': u'c', u'x': u'y', u'_id': ObjectId('...')}]
-
+
If `safe` is ``True`` returns the response to the *lastError*
command. Otherwise, returns ``None``.
-
+
# Any additional keyword arguments imply ``safe=True``, and will
# be used as options for the resultant `getLastError`
# command. For example, to wait for replication to 3 nodes, pass
# ``w=3``.
-
+
:Parameters:
- `spec`: a ``dict`` or :class:`~bson.son.SON` instance
specifying elements which must be present for a document
@@ -214,9 +216,9 @@ def update(self, spec, document, upsert=False, manipulate=False,
- `**kwargs` (optional): any additional arguments imply
``safe=True``, and will be used as options for the
`getLastError` command
-
+
.. _update modifiers: http://www.mongodb.org/display/DOCS/Updating
-
+
.. mongodoc:: update
"""
if not isinstance(spec, dict):
@@ -230,32 +232,33 @@ def update(self, spec, document, upsert=False, manipulate=False,
# TODO: apply SON manipulators
# if upsert and manipulate:
# document = self.__database._fix_incoming(document, self)
-
+
if kwargs:
safe = True
-
+
if safe and not callable(callback):
raise TypeError("callback must be callable")
if not safe and callback is not None:
raise TypeError("callback can not be used with safe=False")
-
+
if callback:
callback = functools.partial(self._handle_response, orig_callback=callback)
self.__limit = None
- connection = self.__pool.connection()
- try:
- connection.send_message(
- message.update(self.full_collection_name, upsert, multi,
- spec, document, safe, kwargs), callback=callback)
- except:
- connection.close()
- raise
-
-
+ def _handle_connection(connection):
+ try:
+ connection.send_message(
+ message.update(self.full_collection_name, upsert, multi,
+ spec, document, safe, kwargs), callback=callback)
+ except:
+ connection.close()
+ raise
+ self.__pool.connection(_handle_connection)
+
+
def find_one(self, spec_or_id, **kwargs):
"""Get a single document from the database.
-
+
All arguments to :meth:`find` are also valid arguments for
:meth:`find_one`, although any `limit` argument will be
ignored. Returns a single document, or ``None`` if no matching
@@ -265,29 +268,29 @@ def find_one(self, spec_or_id, **kwargs):
spec_or_id = {"_id": spec_or_id}
kwargs['limit'] = -1
self.find(spec_or_id, **kwargs)
-
+
def find(self, spec=None, fields=None, skip=0, limit=0,
timeout=True, snapshot=False, tailable=False, sort=None,
max_scan=None, slave_okay=False,
_must_use_master=False, _is_command=False, hint=None, debug=False,
callback=None):
"""Query the database.
-
+
The `spec` argument is a prototype document that all results
must match. For example:
-
+
>>> db.test.find({"hello": "world"}, callback=...)
-
+
only matches documents that have a key "hello" with value
"world". Matches can have other keys *in addition* to
"hello". The `fields` argument is used to specify a subset of
fields that should be included in the result documents. By
limiting results to a certain subset of fields you can cut
down on network traffic and decoding time.
-
+
Raises :class:`TypeError` if any of the arguments are of
improper type.
-
+
:Parameters:
- `spec` (optional): a SON object specifying elements which
must be present for a document to be included in the
@@ -325,13 +328,13 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
examined when performing the query
- `slave_okay` (optional): is it okay to connect directly
to and perform queries on a slave instance
-
+
.. mongodoc:: find
"""
-
+
if spec is None:
spec = {}
-
+
if not isinstance(spec, dict):
raise TypeError("spec must be an instance of dict")
if not isinstance(skip, int):
@@ -346,19 +349,19 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
raise TypeError("tailable must be an instance of bool")
if not callable(callback):
raise TypeError("callback must be callable")
-
+
if fields is not None:
if not fields:
fields = {"_id": 1}
if not isinstance(fields, dict):
fields = helpers._fields_list_to_dict(fields)
-
+
self.__spec = spec
self.__fields = fields
self.__skip = skip
self.__limit = limit
self.__batch_size = 0
-
+
self.__timeout = timeout
self.__tailable = tailable
self.__snapshot = snapshot
@@ -372,48 +375,56 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
self.__tz_aware = False #collection.database.connection.tz_aware
self.__must_use_master = _must_use_master
self.__is_command = _is_command
-
- connection = self.__pool.connection()
- try:
- if self.__debug:
- logging.debug('QUERY_SPEC: %r' % self.__query_spec())
-
- connection.send_message(
- message.query(self.__query_options(),
- self.full_collection_name,
- self.__skip,
- self.__limit,
- self.__query_spec(),
- self.__fields),
- callback=functools.partial(self._handle_response, orig_callback=callback))
- except Exception, e:
- logging.error('Error sending query %s' % e)
- connection.close()
- raise
-
+
+ def _handle_connection(connection):
+ try:
+ if self.__debug:
+ logging.debug('QUERY_SPEC: %r' % self.__query_spec())
+
+ connection.send_message(
+ message.query(self.__query_options(),
+ self.full_collection_name,
+ self.__skip,
+ self.__limit,
+ self.__query_spec(),
+ self.__fields),
+ callback=functools.partial(self._handle_response, orig_callback=callback))
+ except Exception, e:
+ logging.error('Error sending query %s' % e)
+ connection.close()
+ raise
+ self.__pool.connection(_handle_connection)
+
def _handle_response(self, result, error=None, orig_callback=None):
- if result and result.get('cursor_id'):
- connection = self.__pool.connection()
+ def _handle_finish(_res=None, _err=None):
+ if error:
+ logging.error('%s %s' % (self.full_collection_name , error))
+ orig_callback(None, error=error)
+ else:
+ if self.__limit == -1 and len(result['data']) == 1:
+ # handle the find_one() call
+ orig_callback(result['data'][0], error=None)
+ else:
+ orig_callback(result['data'], error=None)
+
+ def _close_cursor(connection):
try:
connection.send_message(
message.kill_cursors([result['cursor_id']]),
callback=None)
+ _handle_finish()
except Exception, e:
logging.error('Error killing cursor %s: %s' % (result['cursor_id'], e))
connection.close()
raise
-
- if error:
- logging.error('%s %s' % (self.full_collection_name , error))
- orig_callback(None, error=error)
+
+ if result and result.get('cursor_id'):
+ self.__pool.connection(_close_cursor)
else:
- if self.__limit == -1 and len(result['data']) == 1:
- # handle the find_one() call
- orig_callback(result['data'][0], error=None)
- else:
- orig_callback(result['data'], error=None)
+ _handle_finish()
+
+
-
def __query_options(self):
"""Get the query options string to use for this query."""
options = 0
@@ -424,7 +435,7 @@ def __query_options(self):
if not self.__timeout:
options |= _QUERY_OPTIONS["no_timeout"]
return options
-
+
def __query_spec(self):
"""Get the spec to use for a query."""
spec = self.__spec
@@ -441,5 +452,5 @@ def __query_spec(self):
if self.__max_scan:
spec["$maxScan"] = self.__max_scan
return spec
-
-
+
+
View
72 asyncmongo/pool.py
@@ -1,5 +1,5 @@
#!/bin/env python
-#
+#
# Copyright 2010 bit.ly
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -15,6 +15,7 @@
# under the License.
from threading import Condition
+from collections import deque
import logging
from errors import TooManyConnections, ProgrammingError
from connection import Connection
@@ -35,7 +36,7 @@ def get_connection_pool(self, pool_id, *args, **kwargs):
if pool_id not in self._pools:
self._pools[pool_id] = ConnectionPool(*args, **kwargs)
return self._pools[pool_id]
-
+
@classmethod
def close_idle_connections(self, pool_id=None):
"""close idle connections to mongo"""
@@ -54,7 +55,7 @@ def close_idle_connections(self, pool_id=None):
class ConnectionPool(object):
"""Connection Pool to a single mongo instance.
-
+
:Parameters:
- `mincached` (optional): minimum connections to open on instantiation. 0 to open connections on first use
- `maxcached` (optional): maximum inactive cached connections for this pool. 0 for unlimited
@@ -63,15 +64,15 @@ class ConnectionPool(object):
- `dbname`: mongo database name
- `slave_okay` (optional): is it okay to connect directly to and perform queries on a slave instance
- `**kwargs`: passed to `connection.Connection`
-
+
"""
- def __init__(self,
- mincached=0,
- maxcached=0,
- maxconnections=0,
- maxusage=0,
- dbname=None,
- slave_okay=False,
+ def __init__(self,
+ mincached=0,
+ maxcached=0,
+ maxconnections=0,
+ maxusage=0,
+ dbname=None,
+ slave_okay=False,
*args, **kwargs):
assert isinstance(mincached, int)
assert isinstance(maxcached, int)
@@ -95,33 +96,41 @@ def __init__(self,
self._slave_okay = slave_okay
self._connections = 0
-
+ self._backlog_queue = deque() # The queue of all the clients waiting for
+ # a connection.
+
# Establish an initial number of idle database connections:
- idle = [self.connection() for i in range(mincached)]
- while idle:
- self.cache(idle.pop())
-
+ idle = [self.new_connection() for i in range(mincached)]
+ while idle: self.cache(idle.pop())
+
def new_connection(self):
kwargs = self._kwargs
kwargs['pool'] = self
return Connection(*self._args, **kwargs)
-
- def connection(self):
+
+ def connection(self, callback, from_backlog=False):
""" get a cached connection from the pool """
-
+
+ con = None
self._condition.acquire()
try:
if (self._maxconnections and self._connections >= self._maxconnections):
- raise TooManyConnections("%d connections are already equal to the max: %d" % (self._connections, self._maxconnections))
- # connection limit not reached, get a dedicated connection
- try: # first try to get it from the idle cache
- con = self._idle_cache.pop(0)
- except IndexError: # else get a fresh connection
- con = self.new_connection()
- self._connections += 1
+ if from_backlog: # We requeue the request on top of the backlog if it came from there.
+ self._backlog_queue.appendleft(callback)
+ else: # Otherwise, we just queue it.
+ self._backlog_queue.append(callback)
+ else:
+ # connection limit not reached, get a dedicated connection
+ try: # first try to get it from the idle cache
+ con = self._idle_cache.pop(0)
+ except IndexError: # else get a fresh connection
+ con = self.new_connection()
+ self._connections += 1
finally:
self._condition.release()
- return con
+
+ # If we acquired a connection object, we can call the callback that was supplied.
+ if con: callback(con)
def cache(self, con):
"""Put a dedicated connection back into the idle cache."""
@@ -146,7 +155,12 @@ def cache(self, con):
finally:
self._connections -= 1
self._condition.release()
-
+
+ if self._backlog_queue:
+ # Try to see if we can use a connection if the backlog is not empty.
+ self.connection(self._backlog_queue.popleft(), from_backlog=True)
+
+
def close(self):
"""Close all connections in the pool."""
self._condition.acquire()
@@ -161,5 +175,5 @@ def close(self):
self._condition.notifyAll()
finally:
self._condition.release()
-
+
Something went wrong with that request. Please try again.