Handle pool.size and pool.max_size values safely #11

Merged
merged 9 commits into from May 21, 2012
@@ -13,6 +13,7 @@
sleep = eventlet.sleep
Socket = socket.socket
Select = select.select
+Semaphore = eventlet.semaphore.BoundedSemaphore
class PriorityQueue(queue.PriorityQueue):
@@ -7,10 +7,12 @@
from gevent import select
from gevent import socket
from gevent import queue
+from gevent import coros
from socketpool.pool import ConnectionPool
sleep = gevent.sleep
+Semaphore = gevent.coros.BoundedSemaphore
Socket = socket.socket
Select = select.select
@@ -16,6 +16,7 @@
Select = select.select
Socket = socket.socket
sleep = time.sleep
+Semaphore = threading.BoundedSemaphore
class PriorityQueue(queue.PriorityQueue):
View
@@ -6,6 +6,7 @@
import select
import socket
import time
+import random
class Connector(object):
def matches(self, **match_options):
@@ -33,7 +34,10 @@ def __init__(self, host, port, backend_mod, pool=None):
self.port = port
self.backend_mod = backend_mod
self._connected = True
- self._life = time.time()
+ # use a 'jiggle' value to make sure there is some
+ # randomization to expiry, to avoid many conns expiring very
+ # closely together.
+ self._life = time.time() - random.randint(0, 10)
self._pool = pool
def __del__(self):
View
@@ -39,6 +39,9 @@ def __init__(self, factory,
self.options["backend_mod"] = self.backend_mod
self.options["pool"] = self
+ # bounded semaphore to make self.size 'safe'
+ self._sem = self.backend_mod.Semaphore(1)
+
self._reaper = None
if reap_connections:
self.start_reaper()
@@ -47,21 +50,32 @@ def too_old(self, conn):
return time.time() - conn.get_lifetime() > self.max_lifetime
def murder_connections(self):
- pool = self.pool
- if pool.qsize():
- for priority, candidate in pool:
+ current_pool_size = self.pool.qsize()
+ if current_pool_size > 0:
+ for priority, candidate in self.pool:
+ current_pool_size -= 1
if not self.too_old(candidate):
- pool.put((priority, candidate))
+ self.pool.put((priority, candidate))
+ else:
+ self._reap_connection(candidate)
+ if current_pool_size <= 0:
+ break
def start_reaper(self):
self._reaper = self.backend_mod.ConnectionReaper(self,
delay=self.max_lifetime)
self._reaper.ensure_started()
+ def _reap_connection(self, conn):
+ if conn.is_connected():
+ conn.invalidate()
+ with self._sem:
+ self.size -= 1
+
def release_all(self):
if self.pool.qsize():
for priority, conn in self.pool:
- conn.invalidate()
+ self._reap_connection(conn)
def release_connection(self, conn):
if self._reaper is not None:
@@ -71,53 +85,59 @@ def release_connection(self, conn):
if connected and not self.too_old(conn):
self.pool.put((conn.get_lifetime(), conn))
else:
- conn.invalidate()
+ self._reap_connection(conn)
def get(self, **options):
options.update(self.options)
- # first let's try to find a matching one
found = None
i = self.pool.qsize()
- if self.size >= self.max_size or self.pool.qsize():
- for priority, candidate in self.pool:
- i -= 1
- if self.too_old(candidate):
- # let's drop it
- continue
-
- matches = candidate.matches(**options)
- if not matches:
- # let's put it back
- self.pool.put((priority, candidate))
- else:
- if candidate.is_connected():
- found = candidate
- break
-
- if i <= 0:
- break
-
- # we got one.. we use it
- if found is not None:
- return found
-
-
- # we build a new one and send it back
tries = 0
last_error = None
while tries < self.retry_max:
- self.size += 1
- try:
- new_item = self.factory(**options)
- except Exception, e:
- self.size -= 1
- last_error = e
- else:
- # we should be connected now
- if new_item.is_connected():
- return new_item
+ # first let's try to find a matching one from pool
+ if self.pool.qsize():
+ for priority, candidate in self.pool:
+ i -= 1
+ if self.too_old(candidate):
+ # let's drop it
+ self._reap_connection(candidate)
+ continue
+
+ matches = candidate.matches(**options)
+ if not matches:
+ # let's put it back
+ self.pool.put((priority, candidate))
+ else:
+ if candidate.is_connected():
+ found = candidate
+ break
+ else:
+ # conn is dead for some reason.
+ # reap it.
+ self._reap_connection(candidate)
+
+ if i <= 0:
+ break
+
+ # we got one.. we use it
+ if found is not None:
+ return found
+
+ # didn't get one.
+ # see if we have room to make a new one
+ if self.size < self.max_size:
+ try:
+ new_item = self.factory(**options)
+ except Exception, e:
+ last_error = e
+ else:
+ # we should be connected now
+ if new_item.is_connected():
+ with self._sem:
+ self.size += 1
+ return new_item
tries += 1
self.backend_mod.sleep(self.retry_delay)