Skip to content
Permalink
Browse files

Merge pull request #247 from Pylons/idle-warning-redux-2

warn if there are more pending jobs than idle threads
  • Loading branch information...
bertjwregeer committed Apr 5, 2019
2 parents b1b1d3e + c1b681d commit 5bd69e9545718913bd2be0b4ab04f7da3d9f74c8
Showing with 67 additions and 81 deletions.
  1. +1 −0 CHANGES.txt
  2. +52 −61 waitress/task.py
  3. +14 −20 waitress/tests/test_task.py
@@ -36,6 +36,7 @@ Bugfixes

- Fix the queue depth warnings to only show when all threads are busy.
See https://github.com/Pylons/waitress/pull/243
and https://github.com/Pylons/waitress/pull/247

- Trigger the ``app_iter`` to close as part of shutdown. This will only be
noticeable for users of the internal server api. In more typical operations
@@ -51,56 +51,48 @@ class ThreadedTaskDispatcher(object):
"""A Task Dispatcher that creates a thread for each task.
"""
stop_count = 0 # Number of threads that will stop soon.
active = 0 # Number of currently active threads
active_count = 0 # Number of currently active threads
logger = logger
queue_logger = queue_logger

def __init__(self):
self.threads = set()
self.queue = deque()
self.queue_lock = threading.Condition(threading.Lock())
self.lock = threading.Lock()
self.queue_cv = threading.Condition(self.lock)
self.thread_exit_cv = threading.Condition(self.lock)

def start_new_thread(self, target, args):
t = threading.Thread(target=target, name='waitress', args=args)
t.daemon = True
t.start()

def handler_thread(self, thread_no):
try:
# Upon starting this thread, mark ourselves as active
with self.queue_lock:
self.active += 1

while True:
with self.queue_lock:
while not self.queue and thread_no in self.threads:
# Mark ourselves as not active before waiting to be
# woken up, then we will once again be active
self.active -= 1
self.queue_lock.wait()
self.active += 1

if thread_no not in self.threads:
break

task = self.queue.popleft()

if task is None:
# Special value: kill this thread.
break
try:
task.service()
except Exception:
self.logger.exception(
'Exception when servicing %r', task)
finally:
with self.queue_lock:
self.active -= 1
self.stop_count -= 1
self.threads.discard(thread_no)
while True:
with self.lock:
while not self.queue and self.stop_count == 0:
# Mark ourselves as idle before waiting to be
# woken up, then we will once again be active
self.active_count -= 1
self.queue_cv.wait()
self.active_count += 1

if self.stop_count > 0:
self.active_count -= 1
self.stop_count -= 1
self.threads.discard(thread_no)
self.thread_exit_cv.notify()
break

task = self.queue.popleft()
try:
task.service()
except BaseException:
self.logger.exception(
'Exception when servicing %r', task)

def set_thread_count(self, count):
with self.queue_lock:
with self.lock:
threads = self.threads
thread_no = 0
running = len(threads) - self.stop_count
@@ -111,48 +103,47 @@ def set_thread_count(self, count):
threads.add(thread_no)
running += 1
self.start_new_thread(self.handler_thread, (thread_no,))
self.active_count += 1
thread_no = thread_no + 1
if running > count:
# Stop threads.
to_stop = running - count
self.stop_count += to_stop
for n in range(to_stop):
self.queue.append(None)
running -= 1
self.queue_lock.notify(to_stop)
self.stop_count += running - count
self.queue_cv.notify_all()

def add_task(self, task):
with self.queue_lock:
with self.lock:
self.queue.append(task)
self.queue_lock.notify()
if self.active >= len(self.threads):
self.queue_cv.notify()
queue_size = len(self.queue)
idle_threads = (
len(self.threads) - self.stop_count - self.active_count)
if queue_size > idle_threads:
self.queue_logger.warning(
"Task queue depth is %d",
len(self.queue))
"Task queue depth is %d", queue_size - idle_threads)

def shutdown(self, cancel_pending=True, timeout=5):
self.set_thread_count(0)
# Ensure the threads shut down.
threads = self.threads
expiration = time.time() + timeout
while threads:
if time.time() >= expiration:
self.logger.warning(
"%d thread(s) still running" %
len(threads))
break
time.sleep(0.1)
if cancel_pending:
# Cancel remaining tasks.
with self.queue_lock:
with self.lock:
while threads:
if time.time() >= expiration:
self.logger.warning(
"%d thread(s) still running", len(threads))
break
self.thread_exit_cv.wait(0.1)
if cancel_pending:
# Cancel remaining tasks.
queue = self.queue
if len(queue) > 0:
self.logger.warning(
"Canceling %d pending task(s)", len(queue))
while queue:
task = queue.popleft()
if task is not None:
task.cancel()
threads.clear()
self.queue_lock.notify_all()
return True
task.cancel()
self.queue_cv.notify_all()
return True
return False

class Task(object):
@@ -7,36 +7,25 @@ def _makeOne(self):
from waitress.task import ThreadedTaskDispatcher
return ThreadedTaskDispatcher()

def test_handler_thread_task_is_None(self):
inst = self._makeOne()
inst.threads.add(0)
inst.queue.append(None)
inst.handler_thread(0)
self.assertEqual(inst.stop_count, -1)
self.assertEqual(inst.threads, set())

def test_handler_thread_task_raises(self):
inst = self._makeOne()
inst.threads.add(0)
inst.logger = DummyLogger()
class BadDummyTask(DummyTask):
def service(self):
super(BadDummyTask, self).service()
inst.threads.clear()
inst.stop_count += 1
raise Exception
task = BadDummyTask()
inst.logger = DummyLogger()
inst.queue.append(task)
inst.active_count += 1
inst.handler_thread(0)
self.assertEqual(inst.stop_count, -1)
self.assertEqual(inst.stop_count, 0)
self.assertEqual(inst.active_count, 0)
self.assertEqual(inst.threads, set())
self.assertEqual(len(inst.logger.logged), 1)

def test_handler_thread_exits_if_threadno_cleared(self):
inst = self._makeOne()
inst.handler_thread(0)
self.assertEqual(inst.stop_count, -1)

def test_set_thread_count_increase(self):
inst = self._makeOne()
L = []
@@ -56,8 +45,7 @@ def test_set_thread_count_decrease(self):
inst = self._makeOne()
inst.threads = {0, 1}
inst.set_thread_count(1)
self.assertEqual(len(inst.queue), 1)
self.assertEqual(inst.queue.popleft(), None)
self.assertEqual(inst.stop_count, 1)

def test_set_thread_count_same(self):
inst = self._makeOne()
@@ -67,13 +55,16 @@ def test_set_thread_count_same(self):
inst.set_thread_count(1)
self.assertEqual(L, [])

def test_add_task(self):
def test_add_task_with_idle_threads(self):
task = DummyTask()
inst = self._makeOne()
inst.threads.add(0)
inst.queue_logger = DummyLogger()
inst.add_task(task)
self.assertEqual(len(inst.queue), 1)
self.assertEqual(len(inst.queue_logger.logged), 0)

def test_log_queue_depth(self):
def test_add_task_with_all_busy_threads(self):
task = DummyTask()
inst = self._makeOne()
inst.queue_logger = DummyLogger()
@@ -89,7 +80,10 @@ def test_shutdown_one_thread(self):
task = DummyTask()
inst.queue.append(task)
self.assertEqual(inst.shutdown(timeout=.01), True)
self.assertEqual(inst.logger.logged, ['1 thread(s) still running'])
self.assertEqual(inst.logger.logged, [
'1 thread(s) still running',
'Canceling 1 pending task(s)',
])
self.assertEqual(task.cancelled, True)

def test_shutdown_no_threads(self):

0 comments on commit 5bd69e9

Please sign in to comment.
You can’t perform that action at this time.