Skip to content
Permalink
Browse files

select: make Select.add() handle multiple buffered items.

Previously given something like:

    l = mitogen.core.Latch()
    l.put(1)
    l.put(2)

    s = mitogen.select.Select([l], oneshot=False)
    assert 1 == s.get(block=False)
    assert 2 == s.get(block=False)

The second call would throw TimeoutError, because Select.add() only
queued the receiver/latch once if it was non-empty, rather than once for
each item as should happen.
  • Loading branch information...
dw committed Aug 8, 2019
1 parent 49a6446 commit ecc570cbdacdd4a8741d4f88444327161b47f152
Showing with 33 additions and 3 deletions.
  1. +11 −3 mitogen/select.py
  2. +22 −0 tests/select_test.py
@@ -224,8 +224,15 @@ def add(self, recv):
raise Error(self.owned_msg)

recv.notify = self._put
# Avoid race by polling once after installation.
if not recv.empty():
# After installing the notify function, _put() will potentially begin
# receiving calls from other threads immediately, but not for items
# they already had buffered. For those we call _put(), possibly
# duplicating the effect of other _put() being made concurrently, such
# that the Select ends up with more items in its buffer than exist in
# the underlying receivers. We handle the possibility of receivers
# marked notified yet empty inside Select.get(), so this should be
# robust.
for _ in range(recv.size()):
self._put(recv)

not_present_msg = 'Instance is not a member of this Select'
@@ -335,5 +342,6 @@ def get_event(self, timeout=None, block=True):
# A receiver may have been queued with no result if another
# thread drained it before we woke up, or because another
# thread drained it between add() calling recv.empty() and
# self._put(). In this case just sleep again.
# self._put(), or because Select.add() caused duplicate _put()
# calls. In this case simply retry.
continue
@@ -358,6 +358,18 @@ def test_nonempty_before_add(self):
msg = select.get()
self.assertEquals('123', msg.unpickle())

def test_nonempty_multiple_items_before_add(self):
recv = mitogen.core.Receiver(self.router)
recv._on_receive(mitogen.core.Message.pickled('123'))
recv._on_receive(mitogen.core.Message.pickled('234'))
select = self.klass([recv], oneshot=False)
msg = select.get()
self.assertEquals('123', msg.unpickle())
msg = select.get()
self.assertEquals('234', msg.unpickle())
self.assertRaises(mitogen.core.TimeoutError,
lambda: select.get(block=False))

def test_nonempty_after_add(self):
recv = mitogen.core.Receiver(self.router)
select = self.klass([recv])
@@ -415,6 +427,16 @@ def test_nonempty_before_add(self):
select = self.klass([latch])
self.assertEquals(123, select.get())

def test_nonempty_multiple_items_before_add(self):
latch = mitogen.core.Latch()
latch.put(123)
latch.put(234)
select = self.klass([latch], oneshot=False)
self.assertEquals(123, select.get())
self.assertEquals(234, select.get())
self.assertRaises(mitogen.core.TimeoutError,
lambda: select.get(block=False))

def test_nonempty_after_add(self):
latch = mitogen.core.Latch()
select = self.klass([latch])

0 comments on commit ecc570c

Please sign in to comment.
You can’t perform that action at this time.