Skip to content

Commit

Permalink
Fix queue blocking (#464)
Browse files Browse the repository at this point in the history
* Unblock QueueMessageHandler.handle_message

The thread was holding the lock while pushing to a potentially blocking
function.  Rewrite the logic and use a deque while we're at it.

* Add test for subscription queue behavior

Guarantee that the queue drops messages when blocked.
  • Loading branch information
mvollrath committed Apr 7, 2020
1 parent 3a154de commit 22e3652
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

from collections import deque
from threading import Thread, Condition
from time import time
import traceback
Expand Down Expand Up @@ -107,7 +108,7 @@ def __init__(self, previous_handler):
Thread.__init__(self)
MessageHandler.__init__(self, previous_handler)
self.daemon = True
self.queue = []
self.queue = deque(maxlen=self.queue_length)
self.c = Condition()
self.alive = True
self.start()
Expand All @@ -116,8 +117,6 @@ def handle_message(self, msg):
with self.c:
should_notify = len(self.queue) == 0
self.queue.append(msg)
if len(self.queue) > self.queue_length:
del self.queue[0:len(self.queue) - self.queue_length]
if should_notify:
self.c.notify()

Expand All @@ -130,8 +129,10 @@ def transition(self):
return ThrottleMessageHandler(self)
else:
with self.c:
if len(self.queue) > self.queue_length:
del self.queue[0:len(self.queue) - self.queue_length]
old_queue = self.queue
self.queue = deque(maxlen=self.queue_length)
while len(old_queue) > 0:
self.queue.append(old_queue.popleft())
self.c.notify()
return self

Expand All @@ -146,21 +147,21 @@ def finish(self):

def run(self):
while self.alive:
msg = None
with self.c:
while self.alive and (self.time_remaining() > 0 or len(self.queue) == 0):
if len(self.queue) == 0:
self.c.wait()
else:
self.c.wait(self.time_remaining())
if len(self.queue) == 0:
self.c.wait()
else:
self.c.wait(self.time_remaining())
if self.alive and self.time_remaining() == 0 and len(self.queue) > 0:
try:
MessageHandler.handle_message(self, self.queue[0])
except:
traceback.print_exc(file=sys.stderr)
del self.queue[0]
msg = self.queue.popleft()
if msg is not None:
try:
MessageHandler.handle_message(self, msg)
except:
traceback.print_exc(file=sys.stderr)
while self.time_remaining() == 0 and len(self.queue) > 0:
try:
MessageHandler.handle_message(self, self.queue[0])
MessageHandler.handle_message(self, self.queue.popleft())
except:
traceback.print_exc(file=sys.stderr)
del self.queue[0]
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,41 @@ def cb(msg):

handler.finish()

def test_queue_message_handler_dropping(self):
received = {"msgs": []}

def cb(msg):
received["msgs"].append(msg)
time.sleep(1)

queue_length = 5
msgs = range(queue_length * 5)

handler = subscribe.MessageHandler(None, cb)

handler = handler.set_queue_length(queue_length)
self.assertIsInstance(handler, subscribe.QueueMessageHandler)

# send all messages at once.
# only the first and the last queue_length should get through,
# because the callbacks are blocked.
for x in msgs:
handler.handle_message(x)
# yield the thread so the first callback can append,
# otherwise the first handled value is non-deterministic.
time.sleep(0)

# wait long enough for all the callbacks, and then some.
time.sleep(queue_length + 3)

try:
self.assertEqual([msgs[0]] + msgs[-queue_length:], received["msgs"])
except:
handler.finish()
raise

handler.finish()

def test_queue_message_handler_rate(self):
handler = subscribe.MessageHandler(None, self.dummy_cb)
self.help_test_queue_rate(handler, 50, 10)
Expand Down

0 comments on commit 22e3652

Please sign in to comment.