Permalink
Browse files

make distinct the max number of connections handled from the pool size.

Before this change, the maximum number of connections created was equal
to the maximum number of connections you could have in the pool. So if a
connection was checked out longer than expected, it wasn't possible to
create more connections. This change makes sure that we can do that.

Defaults are 10 connections max kept in the pool and 150 maximum
connections that can be emitted. To get the old behaviour you can use
max_conn = max_size.
Also while I'm here make sure the con
  • Loading branch information...
benoitc committed Jun 5, 2012
1 parent 2cbec12 commit 21c7a10a3e71e24f4dd89fb1b242c849d71fe59a
Showing with 71 additions and 22 deletions.
  1. +20 −6 examples/test_gevent.py
  2. +13 −2 examples/test_threaded.py
  3. +38 −14 socketpool/pool.py
View
@@ -26,23 +26,37 @@ def echo(sock, address):
import time
options = {'host': 'localhost', 'port': 6000}
- pool = ConnectionPool(factory=TcpConnector, backend="gevent")
+ pool = ConnectionPool(factory=TcpConnector, max_conn=20,
+ backend="gevent")
server = StreamServer(('localhost', 6000), echo)
gevent.spawn(server.serve_forever)
def runpool(data):
- print 'ok'
with pool.connection(**options) as conn:
- print 'sending'
+ print ("conn: alive connections: %s" % pool.alive())
+ print ("conn: pool size: %s" % pool.size())
+
sent = conn.send(data)
- print 'send %d bytes' % sent
echo_data = conn.recv(1024)
- print "got %s" % data
assert data == echo_data
start = time.time()
- jobs = [gevent.spawn(runpool, "blahblah") for _ in xrange(20)]
+ jobs = [gevent.spawn(runpool, "blahblah") for _ in xrange(50)]
gevent.joinall(jobs)
delay = time.time() - start
+
+ print ("final alive connections: %s" % pool.alive())
+ print ("final pool size: %s" % pool.size())
+
+ with pool.connection(**options) as conn:
+ print ("conn: alive connections: %s" % pool.alive())
+ print ("conn: pool size: %s" % pool.size())
+
+ sent = conn.send("hello")
+ echo_data = conn.recv(1024)
+ assert "hello" == echo_data
+
+ print ("final alive connections: %s" % pool.alive())
+ print ("final pool size: %s" % pool.size())
View
@@ -53,14 +53,22 @@ def runpool():
print 'ok'
try:
with pool.connection() as conn:
- print 'sending'
+ print ("conn: alive connections: %s" % pool.alive)
+ print ("conn: pool size: %s" % pool.size())
sent = conn.send(data)
- print 'send %d bytes' % sent
echo = conn.recv(1024)
print "got %s" % data
assert data == echo
+
+
+
+
+
finally:
q.task_done()
+ print ("alive connections: %s" % pool.alive)
+ print ("pool size: %s" % pool.size())
+
for i in xrange(20):
q.put("Hello World %s" % i)
@@ -73,3 +81,6 @@ def runpool():
q.join()
server.shutdown()
+ print ("final alive connections: %s" % pool.alive)
+ print ("final pool size: %s" % pool.size())
+
View
@@ -12,20 +12,25 @@
class MaxTriesError(Exception):
pass
+class MaxConnectionsError(Exception):
+ pass
+
class ConnectionPool(object):
def __init__(self, factory,
retry_max=3, retry_delay=.1,
timeout=-1, max_lifetime=600.,
- max_size=10, options=None,
- reap_connections=True,
+ max_size=10, max_conn=150,
+ options=None, reap_connections=True,
backend="thread"):
self.backend_mod = load_backend(backend)
self.backend = backend
self.max_size = max_size
+ self.max_conn = max_conn
self.pool = self.backend_mod.PriorityQueue()
- self.size = 0
+ self._alive = 0
+ self._free_conns = 0
self.factory = factory
self.retry_max = retry_max
self.retry_delay = retry_delay
@@ -39,7 +44,7 @@ def __init__(self, factory,
self.options["backend_mod"] = self.backend_mod
self.options["pool"] = self
- # bounded semaphore to make self.size 'safe'
+ # bounded semaphore to make self._alive 'safe'
self._sem = self.backend_mod.Semaphore(1)
self._reaper = None
@@ -69,8 +74,15 @@ def start_reaper(self):
def _reap_connection(self, conn):
if conn.is_connected():
conn.invalidate()
- with self._sem:
- self.size -= 1
+
+ def num_connections(self):
+ return (self.alive() + self.size())
+
+ def alive(self):
+ return self._alive
+
+ def size(self):
+ return self.pool.qsize()
def release_all(self):
if self.pool.qsize():
@@ -81,11 +93,18 @@ def release_connection(self, conn):
if self._reaper is not None:
self._reaper.ensure_started()
- connected = conn.is_connected()
- if connected and not self.too_old(conn):
- self.pool.put((conn.get_lifetime(), conn))
- else:
- self._reap_connection(conn)
+ with self._sem:
+ if self.pool.qsize() < self.max_size:
+ connected = conn.is_connected()
+ if connected and not self.too_old(conn):
+ self.pool.put((conn.get_lifetime(), conn))
+ else:
+ self._reap_connection(conn)
+ else:
+ self._reap_connection(conn)

This comment has been minimized.

Show comment Hide comment
@cactus

cactus Jun 5, 2012

Contributor

This block puts the self release/reap operation entirely in the semaphore lock. Previously just the counter handling was semaphore protected. Not sure that a pool connection releasing itself needs to block/lock for safety. Just something that modifies the pool counters.

Was there a particular reason or bug that prompted a larger scope of semaphore lock?

@cactus

cactus Jun 5, 2012

Contributor

This block puts the self release/reap operation entirely in the semaphore lock. Previously just the counter handling was semaphore protected. Not sure that a pool connection releasing itself needs to block/lock for safety. Just something that modifies the pool counters.

Was there a particular reason or bug that prompted a larger scope of semaphore lock?

This comment has been minimized.

Show comment Hide comment
@benoitc

benoitc Jun 5, 2012

Owner

the pool size was always 0. Imo the len(self.queue) isn't threadsafe.

@benoitc

benoitc Jun 5, 2012

Owner

the pool size was always 0. Imo the len(self.queue) isn't threadsafe.

This comment has been minimized.

Show comment Hide comment
@cactus

cactus Jun 6, 2012

Contributor

yeah. you could pull out the size retrieval (wrap in sem) and then just use it in the test outside of it.
It just seems expensive to block for the reap operation if you don't need to (though probably only relevant for threading since only one greenlet can
run at a time).

@cactus

cactus Jun 6, 2012

Contributor

yeah. you could pull out the size retrieval (wrap in sem) and then just use it in the test outside of it.
It just seems expensive to block for the reap operation if you don't need to (though probably only relevant for threading since only one greenlet can
run at a time).

This comment has been minimized.

Show comment Hide comment
@benoitc

benoitc Jun 6, 2012

Owner

Did you try it? I would be interested by your results. Here I tried to wrap the size method with the semaphore, but it wasn't enough. Sound logic since the put or delete need to be synchronized I guess. But let me know.

@benoitc

benoitc Jun 6, 2012

Owner

Did you try it? I would be interested by your results. Here I tried to wrap the size method with the semaphore, but it wasn't enough. Sound logic since the put or delete need to be synchronized I guess. But let me know.

This comment has been minimized.

Show comment Hide comment
@cactus

cactus Jun 6, 2012

Contributor

Interesting! I haven't tried it yet, but I will make an attempt at it in the next couple of days and see what falls out. Thanks!

@cactus

cactus Jun 6, 2012

Contributor

Interesting! I haven't tried it yet, but I will make an attempt at it in the next couple of days and see what falls out. Thanks!

+
+ with self._sem:
+ self._alive -= 1
def get(self, **options):
options.update(self.options)
@@ -97,6 +116,7 @@ def get(self, **options):
while tries < self.retry_max:
# first let's try to find a matching one from pool
+
if self.pool.qsize():
for priority, candidate in self.pool:
i -= 1
@@ -123,11 +143,13 @@ def get(self, **options):
# we got one.. we use it
if found is not None:
+ with self._sem:
+ self._alive += 1
return found
# didn't get one.
# see if we have room to make a new one
- if self.size < self.max_size:
+ if self._alive < self.max_conn or not self.max_conn:
try:
new_item = self.factory(**options)
except Exception, e:
@@ -136,8 +158,10 @@ def get(self, **options):
# we should be connected now
if new_item.is_connected():
with self._sem:
- self.size += 1
- return new_item
+ self._alive += 1
+ return new_item
+ else:
+ last_error = MaxConnectionsError()
tries += 1
self.backend_mod.sleep(self.retry_delay)

0 comments on commit 21c7a10

Please sign in to comment.