Skip to content

Commit

Permalink
Merge pull request #27 from kopatsy/release-reaper
Browse files Browse the repository at this point in the history
Release reaper thread
  • Loading branch information
benoitc committed Apr 24, 2014
2 parents 932d68f + 490132f commit 8f342ca
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 40 deletions.
2 changes: 0 additions & 2 deletions socketpool/backend_gevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
from gevent import socket
from gevent import queue

from socketpool.pool import ConnectionPool

try:
from gevent import lock
except ImportError:
Expand Down
14 changes: 10 additions & 4 deletions socketpool/backend_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import socket
import threading
import time
import weakref

try:
import Queue as queue
Expand Down Expand Up @@ -37,9 +38,10 @@ class ConnectionReaper(threading.Thread):
connections after a delay """

running = False
forceStop = False

def __init__(self, pool, delay=600):
self.pool = pool
self.pool = weakref.ref(pool)
self.delay = delay
threading.Thread.__init__(self)
self.setDaemon(True)
Expand All @@ -48,10 +50,14 @@ def run(self):
self.running = True
while True:
time.sleep(self.delay)
self.pool.murder_connections()
pool = self.pool()
if pool is not None:
pool.murder_connections()

if self.forceStop:
self.running = False
break

def ensure_started(self):
if not self.running and not self.isAlive():
self.start()


14 changes: 10 additions & 4 deletions socketpool/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
# See the NOTICE for more information.

import contextlib
import sys
import time

from socketpool.util import load_backend
Expand Down Expand Up @@ -64,12 +63,10 @@ def __init__(self, factory,
self.timeout = timeout
self.max_lifetime = max_lifetime
if options is None:
self.options = {"backend_mod": self.backend_mod,
"pool": self}
self.options = {"backend_mod": self.backend_mod}
else:
self.options = options
self.options["backend_mod"] = self.backend_mod
self.options["pool"] = self

# bounded semaphore to make self._alive 'safe'
self._sem = self.backend_mod.Semaphore(1)
Expand All @@ -79,6 +76,9 @@ def __init__(self, factory,
self.reap_delay = reap_delay
self.start_reaper()

def __del__(self):
self.stop_reaper()

def too_old(self, conn):
return time.time() - conn.get_lifetime() > self.max_lifetime

Expand All @@ -99,6 +99,9 @@ def start_reaper(self):
delay=self.reap_delay)
self._reaper.ensure_started()

def stop_reaper(self):
self._reaper.forceStop = True

def _reap_connection(self, conn):
if conn.is_connected():
conn.invalidate()
Expand Down Expand Up @@ -128,6 +131,9 @@ def release_connection(self, conn):

def get(self, **options):
options.update(self.options)
# Do not set this in self.options so we don't keep a persistent
# reference on the pool which would prevent garbage collection.
options["pool"] = self

found = None
i = self.pool.qsize()
Expand Down
40 changes: 16 additions & 24 deletions tests/test_backend_finding.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,23 @@
import pytest
from socketpool.pool import ConnectionPool
from socketpool.conn import TcpConnector

def make_and_check_backend(expected_name, **kw):
pool = ConnectionPool(TcpConnector, **kw)
assert pool.backend == expected_name
return pool
def pytest_generate_tests(metafunc):
if 'backend' in metafunc.fixturenames:
metafunc.parametrize('backend', ['thread', 'gevent', 'eventlet'])

class Test_Backend(object):
def make_and_check_backend(self, expected_name, **kw):
pool = ConnectionPool(TcpConnector, **kw)
assert pool.backend == expected_name
return pool

def test_default_backend():
make_and_check_backend('thread')
def test_default_backend(self):
self.make_and_check_backend('thread')

def test_backend(self, backend):
self.make_and_check_backend(backend, backend=backend)

def test_thread_backend():
make_and_check_backend('thread', backend='thread')


def test_gevent_backend():
pytest.importorskip('gevent')
make_and_check_backend('gevent', backend='gevent')


def test_eventlet_backend():
pytest.importorskip('eventlet')
make_and_check_backend('eventlet', backend='eventlet')


def test_thread_backend_as_module():
from socketpool import backend_thread
make_and_check_backend('socketpool.backend_thread', backend=backend_thread)
def test_thread_backend_as_module(self):
from socketpool import backend_thread
self.make_and_check_backend('socketpool.backend_thread',
backend=backend_thread)
45 changes: 39 additions & 6 deletions tests/test_pool_01.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
# This file is part of socketpool.
# See the NOTICE for more information.

import unittest
import time

import pytest

from socketpool import ConnectionPool, Connector
from socketpool.pool import MaxTriesError
Expand All @@ -19,10 +21,41 @@ def is_connected(self):
def invalidate(self):
pass


class PoolTestCase(unittest.TestCase):

class Test_Pool(object):
def test_size_on_isconnected_failure(self):
pool = ConnectionPool(MessyConnector)
self.assert_(pool.size == 0)
self.assertRaises(MaxTriesError, pool.get)
assert pool.size == 0
pytest.raises(MaxTriesError, pool.get)

def test_stop_reaper_thread(self):
"""Verify that calling stop_reaper will terminate the reaper thread.
"""
pool = ConnectionPool(MessyConnector, backend='thread')
assert pool._reaper.running
pool.stop_reaper()

for i in xrange(1000):
if not pool._reaper.running:
return
time.sleep(0.01)

assert False, 'Reaper thread not terminated in time.'

def test_del(self):
"""Verify that garbage collection of the pool will release the reaper
thread.
"""
pool = ConnectionPool(MessyConnector, backend='thread')
reaper = pool._reaper

assert reaper.running

# Remove reference.
pool = None

for i in xrange(1000):
if not reaper.running:
return
time.sleep(0.01)

assert False, 'Reaper thread not terminated in time.'

0 comments on commit 8f342ca

Please sign in to comment.