Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Connections queue support #46

Open
wants to merge 1 commit into from

2 participants

@emdagon

Instead of raise the TooManyConnections exception when maxconnections is reached, the outgoing requests will be queued until a slot become free.

I faced a problem regarding this issue on a high concurrency scenario, I did some tests and seems to work properly.

@emdagon

This commit avoid the TooManyConnections exception in favor to manage a waiting list of connections. Instead of raise the exception, it will take the next cached connection and run the operation (find, update, ...) on it.

@FlorianLudwig

Isn't this the same as pull request 45, #45 ?

@emdagon

@FlorianLudwig It solve the same problem, but the implementation it's a little bit different (In this one, the connection is taken on the releasing event, IMHO it's a little bit efficient)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on May 11, 2012
  1. Added connection queue support

    Emilio González authored
This page is out of date. Refresh to see the latest.
Showing with 134 additions and 112 deletions.
  1. +105 −95 asyncmongo/cursor.py
  2. +29 −17 asyncmongo/pool.py
View
200 asyncmongo/cursor.py
@@ -1,5 +1,5 @@
#!/bin/env python
-#
+#
# Copyright 2010 bit.ly
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -36,16 +36,16 @@ def __init__(self, dbname, collection, pool):
assert isinstance(dbname, (str, unicode))
assert isinstance(collection, (str, unicode))
assert isinstance(pool, object)
-
+
self.__dbname = dbname
self.__collection = collection
self.__pool = pool
self.__slave_okay = False
-
+
@property
def full_collection_name(self):
return u'%s.%s' % (self.__dbname, self.__collection)
-
+
def drop(self, *args, **kwargs):
raise NotImplemented("patches accepted")
@@ -56,7 +56,7 @@ def save(self, doc, **kwargs):
def insert(self, doc_or_docs,
manipulate=True, safe=True, check_keys=True, callback=None, **kwargs):
"""Insert a document(s) into this collection.
-
+
If `manipulate` is set, the document(s) are manipulated using
any :class:`~pymongo.son_manipulator.SONManipulator` instances
that have been added to this
@@ -64,17 +64,17 @@ def insert(self, doc_or_docs,
the inserted document or a list of ``"_id"`` values of the
inserted documents. If the document(s) does not already
contain an ``"_id"`` one will be added.
-
+
If `safe` is ``True`` then the insert will be checked for
errors, raising :class:`~pymongo.errors.OperationFailure` if
one occurred. Safe inserts wait for a response from the
database, while normal inserts do not.
-
+
Any additional keyword arguments imply ``safe=True``, and
will be used as options for the resultant `getLastError`
command. For example, to wait for replication to 3 nodes, pass
``w=3``.
-
+
:Parameters:
- `doc_or_docs`: a document or list of documents to be
inserted
@@ -87,77 +87,82 @@ def insert(self, doc_or_docs,
- `**kwargs` (optional): any additional arguments imply
``safe=True``, and will be used as options for the
`getLastError` command
-
+
.. mongodoc:: insert
"""
if not isinstance(safe, bool):
raise TypeError("safe must be an instance of bool")
-
+
docs = doc_or_docs
# return_one = False
if isinstance(docs, dict):
# return_one = True
docs = [docs]
-
+
# if manipulate:
# docs = [self.__database._fix_incoming(doc, self) for doc in docs]
-
+
self.__limit = None
if kwargs:
safe = True
-
+
if safe and not callable(callback):
raise TypeError("callback must be callable")
if not safe and callback is not None:
raise TypeError("callback can not be used with safe=False")
-
+
if callback:
callback = functools.partial(self._handle_response, orig_callback=callback)
- connection = self.__pool.connection()
- try:
- connection.send_message(
- message.insert(self.full_collection_name, docs,
- check_keys, safe, kwargs), callback=callback)
- except:
- connection.close()
- raise
-
+ #connection = self.__pool.connection()
+ def on_connect(connection):
+ try:
+ connection.send_message(
+ message.insert(self.full_collection_name, docs,
+ check_keys, safe, kwargs), callback=callback)
+ except:
+ connection.close()
+ raise
+
+ self.__pool.add_to_queue(on_connect)
+
def remove(self, spec_or_id=None, safe=True, callback=None, **kwargs):
if not isinstance(safe, bool):
raise TypeError("safe must be an instance of bool")
-
+
if spec_or_id is None:
spec_or_id = {}
if not isinstance(spec_or_id, dict):
spec_or_id = {"_id": spec_or_id}
-
+
self.__limit = None
if kwargs:
safe = True
-
+
if safe and not callable(callback):
raise TypeError("callback must be callable")
if not safe and callback is not None:
raise TypeError("callback can not be used with safe=False")
-
+
if callback:
callback = functools.partial(self._handle_response, orig_callback=callback)
- connection = self.__pool.connection()
- try:
- connection.send_message(
- message.delete(self.full_collection_name, spec_or_id, safe, kwargs),
- callback=callback)
- except:
- connection.close()
- raise
+ #connection = self.__pool.connection()
+ def on_connect(connection):
+ try:
+ connection.send_message(
+ message.delete(self.full_collection_name, spec_or_id, safe, kwargs),
+ callback=callback)
+ except:
+ connection.close()
+ raise
+
+ self.__pool.add_to_queue(on_connect)
-
def update(self, spec, document, upsert=False, manipulate=False,
safe=True, multi=False, callback=None, **kwargs):
"""Update a document(s) in this collection.
-
+
Raises :class:`TypeError` if either `spec` or `document` is
not an instance of ``dict`` or `upsert` is not an instance of
``bool``. If `safe` is ``True`` then the update will be
@@ -166,14 +171,14 @@ def update(self, spec, document, upsert=False, manipulate=False,
occurred. Safe updates require a response from the database,
while normal updates do not - thus, setting `safe` to ``True``
will negatively impact performance.
-
+
There are many useful `update modifiers`_ which can be used
when performing updates. For example, here we use the
``"$set"`` modifier to modify some fields in a matching
document:
-
+
.. doctest::
-
+
>>> db.test.insert({"x": "y", "a": "b"})
ObjectId('...')
>>> list(db.test.find())
@@ -181,15 +186,15 @@ def update(self, spec, document, upsert=False, manipulate=False,
>>> db.test.update({"x": "y"}, {"$set": {"a": "c"}})
>>> list(db.test.find())
[{u'a': u'c', u'x': u'y', u'_id': ObjectId('...')}]
-
+
If `safe` is ``True`` returns the response to the *lastError*
command. Otherwise, returns ``None``.
-
+
# Any additional keyword arguments imply ``safe=True``, and will
# be used as options for the resultant `getLastError`
# command. For example, to wait for replication to 3 nodes, pass
# ``w=3``.
-
+
:Parameters:
- `spec`: a ``dict`` or :class:`~bson.son.SON` instance
specifying elements which must be present for a document
@@ -214,9 +219,9 @@ def update(self, spec, document, upsert=False, manipulate=False,
- `**kwargs` (optional): any additional arguments imply
``safe=True``, and will be used as options for the
`getLastError` command
-
+
.. _update modifiers: http://www.mongodb.org/display/DOCS/Updating
-
+
.. mongodoc:: update
"""
if not isinstance(spec, dict):
@@ -230,32 +235,35 @@ def update(self, spec, document, upsert=False, manipulate=False,
# TODO: apply SON manipulators
# if upsert and manipulate:
# document = self.__database._fix_incoming(document, self)
-
+
if kwargs:
safe = True
-
+
if safe and not callable(callback):
raise TypeError("callback must be callable")
if not safe and callback is not None:
raise TypeError("callback can not be used with safe=False")
-
+
if callback:
callback = functools.partial(self._handle_response, orig_callback=callback)
self.__limit = None
- connection = self.__pool.connection()
- try:
- connection.send_message(
- message.update(self.full_collection_name, upsert, multi,
- spec, document, safe, kwargs), callback=callback)
- except:
- connection.close()
- raise
-
-
+ #connection = self.__pool.connection()
+ def on_connect(connection):
+ try:
+ connection.send_message(
+ message.update(self.full_collection_name, upsert, multi,
+ spec, document, safe, kwargs), callback=callback)
+ except:
+ connection.close()
+ raise
+
+ self.__pool.add_to_queue(on_connect)
+
+
def find_one(self, spec_or_id, **kwargs):
"""Get a single document from the database.
-
+
All arguments to :meth:`find` are also valid arguments for
:meth:`find_one`, although any `limit` argument will be
ignored. Returns a single document, or ``None`` if no matching
@@ -265,29 +273,29 @@ def find_one(self, spec_or_id, **kwargs):
spec_or_id = {"_id": spec_or_id}
kwargs['limit'] = -1
self.find(spec_or_id, **kwargs)
-
+
def find(self, spec=None, fields=None, skip=0, limit=0,
timeout=True, snapshot=False, tailable=False, sort=None,
max_scan=None, slave_okay=False,
_must_use_master=False, _is_command=False, hint=None, debug=False,
callback=None):
"""Query the database.
-
+
The `spec` argument is a prototype document that all results
must match. For example:
-
+
>>> db.test.find({"hello": "world"}, callback=...)
-
+
only matches documents that have a key "hello" with value
"world". Matches can have other keys *in addition* to
"hello". The `fields` argument is used to specify a subset of
fields that should be included in the result documents. By
limiting results to a certain subset of fields you can cut
down on network traffic and decoding time.
-
+
Raises :class:`TypeError` if any of the arguments are of
improper type.
-
+
:Parameters:
- `spec` (optional): a SON object specifying elements which
must be present for a document to be included in the
@@ -325,13 +333,13 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
examined when performing the query
- `slave_okay` (optional): is it okay to connect directly
to and perform queries on a slave instance
-
+
.. mongodoc:: find
"""
-
+
if spec is None:
spec = {}
-
+
if not isinstance(spec, dict):
raise TypeError("spec must be an instance of dict")
if not isinstance(skip, int):
@@ -346,19 +354,19 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
raise TypeError("tailable must be an instance of bool")
if not callable(callback):
raise TypeError("callback must be callable")
-
+
if fields is not None:
if not fields:
fields = {"_id": 1}
if not isinstance(fields, dict):
fields = helpers._fields_list_to_dict(fields)
-
+
self.__spec = spec
self.__fields = fields
self.__skip = skip
self.__limit = limit
self.__batch_size = 0
-
+
self.__timeout = timeout
self.__tailable = tailable
self.__snapshot = snapshot
@@ -372,25 +380,27 @@ def find(self, spec=None, fields=None, skip=0, limit=0,
self.__tz_aware = False #collection.database.connection.tz_aware
self.__must_use_master = _must_use_master
self.__is_command = _is_command
-
- connection = self.__pool.connection()
- try:
- if self.__debug:
- logging.debug('QUERY_SPEC: %r' % self.__query_spec())
-
- connection.send_message(
- message.query(self.__query_options(),
- self.full_collection_name,
- self.__skip,
- self.__limit,
- self.__query_spec(),
- self.__fields),
- callback=functools.partial(self._handle_response, orig_callback=callback))
- except Exception, e:
- logging.error('Error sending query %s' % e)
- connection.close()
- raise
-
+
+ #connection = self.__pool.connection()
+ def on_connect(connection):
+ try:
+ if self.__debug:
+ logging.debug('QUERY_SPEC: %r' % self.__query_spec())
+
+ connection.send_message(
+ message.query(self.__query_options(),
+ self.full_collection_name,
+ self.__skip,
+ self.__limit,
+ self.__query_spec(),
+ self.__fields),
+ callback=functools.partial(self._handle_response, orig_callback=callback))
+ except Exception, e:
+ logging.error('Error sending query %s' % e)
+ connection.close()
+ raise
+ self.__pool.add_to_queue(on_connect)
+
def _handle_response(self, result, error=None, orig_callback=None):
if result and result.get('cursor_id'):
connection = self.__pool.connection()
@@ -402,7 +412,7 @@ def _handle_response(self, result, error=None, orig_callback=None):
logging.error('Error killing cursor %s: %s' % (result['cursor_id'], e))
connection.close()
raise
-
+
if error:
logging.error('%s %s' % (self.full_collection_name , error))
orig_callback(None, error=error)
@@ -413,7 +423,7 @@ def _handle_response(self, result, error=None, orig_callback=None):
else:
orig_callback(result['data'], error=None)
-
+
def __query_options(self):
"""Get the query options string to use for this query."""
options = 0
@@ -424,7 +434,7 @@ def __query_options(self):
if not self.__timeout:
options |= _QUERY_OPTIONS["no_timeout"]
return options
-
+
def __query_spec(self):
"""Get the spec to use for a query."""
spec = self.__spec
@@ -441,5 +451,5 @@ def __query_spec(self):
if self.__max_scan:
spec["$maxScan"] = self.__max_scan
return spec
-
-
+
+
View
46 asyncmongo/pool.py
@@ -1,5 +1,5 @@
#!/bin/env python
-#
+#
# Copyright 2010 bit.ly
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -15,6 +15,7 @@
# under the License.
from threading import Condition
+from collections import deque
import logging
from errors import TooManyConnections, ProgrammingError
from connection import Connection
@@ -35,7 +36,7 @@ def get_connection_pool(self, pool_id, *args, **kwargs):
if pool_id not in self._pools:
self._pools[pool_id] = ConnectionPool(*args, **kwargs)
return self._pools[pool_id]
-
+
@classmethod
def close_idle_connections(self, pool_id=None):
"""close idle connections to mongo"""
@@ -54,7 +55,7 @@ def close_idle_connections(self, pool_id=None):
class ConnectionPool(object):
"""Connection Pool to a single mongo instance.
-
+
:Parameters:
- `mincached` (optional): minimum connections to open on instantiation. 0 to open connections on first use
- `maxcached` (optional): maximum inactive cached connections for this pool. 0 for unlimited
@@ -63,15 +64,15 @@ class ConnectionPool(object):
- `dbname`: mongo database name
- `slave_okay` (optional): is it okay to connect directly to and perform queries on a slave instance
- `**kwargs`: passed to `connection.Connection`
-
+
"""
- def __init__(self,
- mincached=0,
- maxcached=0,
- maxconnections=0,
- maxusage=0,
- dbname=None,
- slave_okay=False,
+ def __init__(self,
+ mincached=0,
+ maxcached=0,
+ maxconnections=0,
+ maxusage=0,
+ dbname=None,
+ slave_okay=False,
*args, **kwargs):
assert isinstance(mincached, int)
assert isinstance(maxcached, int)
@@ -90,25 +91,32 @@ def __init__(self,
self._maxcached = maxcached
self._maxconnections = maxconnections
self._idle_cache = [] # the actual connections that can be used
+ self._queue = deque()
self._condition = Condition()
self._dbname = dbname
self._slave_okay = slave_okay
self._connections = 0
-
+
# Establish an initial number of idle database connections:
idle = [self.connection() for i in range(mincached)]
while idle:
self.cache(idle.pop())
-
+
def new_connection(self):
kwargs = self._kwargs
kwargs['pool'] = self
return Connection(*self._args, **kwargs)
-
+
+ def add_to_queue(self, callback):
+ try:
+ callback(self.connection())
+ except TooManyConnections:
+ self._queue.append(callback)
+
def connection(self):
""" get a cached connection from the pool """
-
+
self._condition.acquire()
try:
if (self._maxconnections and self._connections >= self._maxconnections):
@@ -135,6 +143,10 @@ def cache(self, con):
# called via socket close on a connection in the idle cache
self._condition.release()
return
+ if self._queue:
+ waiting = self._queue.popleft()
+ waiting(con)
+ return
try:
if not self._maxcached or len(self._idle_cache) < self._maxcached:
# the idle cache is not full, so put it there
@@ -146,7 +158,7 @@ def cache(self, con):
finally:
self._connections -= 1
self._condition.release()
-
+
def close(self):
"""Close all connections in the pool."""
self._condition.acquire()
@@ -161,5 +173,5 @@ def close(self):
self._condition.notifyAll()
finally:
self._condition.release()
-
+
Something went wrong with that request. Please try again.