Connections queue support #46

Open
wants to merge 1 commit into
from
View
Oops, something went wrong.
View
@@ -1,5 +1,5 @@
#!/bin/env python
-#
+#
# Copyright 2010 bit.ly
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -15,6 +15,7 @@
# under the License.
from threading import Condition
+from collections import deque
import logging
from errors import TooManyConnections, ProgrammingError
from connection import Connection
@@ -35,7 +36,7 @@ def get_connection_pool(self, pool_id, *args, **kwargs):
if pool_id not in self._pools:
self._pools[pool_id] = ConnectionPool(*args, **kwargs)
return self._pools[pool_id]
-
+
@classmethod
def close_idle_connections(self, pool_id=None):
"""close idle connections to mongo"""
@@ -54,7 +55,7 @@ def close_idle_connections(self, pool_id=None):
class ConnectionPool(object):
"""Connection Pool to a single mongo instance.
-
+
:Parameters:
- `mincached` (optional): minimum connections to open on instantiation. 0 to open connections on first use
- `maxcached` (optional): maximum inactive cached connections for this pool. 0 for unlimited
@@ -63,15 +64,15 @@ class ConnectionPool(object):
- `dbname`: mongo database name
- `slave_okay` (optional): is it okay to connect directly to and perform queries on a slave instance
- `**kwargs`: passed to `connection.Connection`
-
+
"""
- def __init__(self,
- mincached=0,
- maxcached=0,
- maxconnections=0,
- maxusage=0,
- dbname=None,
- slave_okay=False,
+ def __init__(self,
+ mincached=0,
+ maxcached=0,
+ maxconnections=0,
+ maxusage=0,
+ dbname=None,
+ slave_okay=False,
*args, **kwargs):
assert isinstance(mincached, int)
assert isinstance(maxcached, int)
@@ -90,25 +91,32 @@ def __init__(self,
self._maxcached = maxcached
self._maxconnections = maxconnections
self._idle_cache = [] # the actual connections that can be used
+ self._queue = deque()
self._condition = Condition()
self._dbname = dbname
self._slave_okay = slave_okay
self._connections = 0
-
+
# Establish an initial number of idle database connections:
idle = [self.connection() for i in range(mincached)]
while idle:
self.cache(idle.pop())
-
+
def new_connection(self):
kwargs = self._kwargs
kwargs['pool'] = self
return Connection(*self._args, **kwargs)
-
+
+ def add_to_queue(self, callback):
+ try:
+ callback(self.connection())
+ except TooManyConnections:
+ self._queue.append(callback)
+
def connection(self):
""" get a cached connection from the pool """
-
+
self._condition.acquire()
try:
if (self._maxconnections and self._connections >= self._maxconnections):
@@ -135,6 +143,10 @@ def cache(self, con):
# called via socket close on a connection in the idle cache
self._condition.release()
return
+ if self._queue:
+ waiting = self._queue.popleft()
+ waiting(con)
+ return
try:
if not self._maxcached or len(self._idle_cache) < self._maxcached:
# the idle cache is not full, so put it there
@@ -146,7 +158,7 @@ def cache(self, con):
finally:
self._connections -= 1
self._condition.release()
-
+
def close(self):
"""Close all connections in the pool."""
self._condition.acquire()
@@ -161,5 +173,5 @@ def close(self):
self._condition.notifyAll()
finally:
self._condition.release()
-
+