Skip to content

Commit

Permalink
Unify and simplify the implementation of Queue.get and Queue.put. Fixes
Browse files Browse the repository at this point in the history
#647 (add a test case for that scenario).
  • Loading branch information
jamadden committed Sep 4, 2015
1 parent 2c858dc commit ea62d6c
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 87 deletions.
3 changes: 2 additions & 1 deletion changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
1.1b5 (unreleased)
==================

- Nothing yet
- Fix a possible ``ValueError`` from ``gevent.queue.Queue:peek``.
Reported in :issue:`647` by Kevin Chen.

1.1b4 (Sep 4, 2015)
===================
Expand Down
124 changes: 59 additions & 65 deletions gevent/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@

__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue', 'Channel']

def _safe_remove(deq, item):
# For when the item may have been removed by
# Queue._unlock
try:
deq.remove(item)
except ValueError:
pass

class Queue(object):
"""
Expand Down Expand Up @@ -208,10 +215,7 @@ def put(self, item, block=True, timeout=None):
finally:
if timeout is not None:
timeout.cancel()
try:
self.putters.remove(waiter)
except ValueError:
pass # removed by unlock
_safe_remove(self.putters, waiter)
else:
raise Full

Expand All @@ -223,6 +227,43 @@ def put_nowait(self, item):
"""
self.put(item, False)

def __get_or_peek(self, method, block, timeout):
# Internal helper method. The `method` should be either
# self._get when called from self.get() or self._peek when
# called from self.peek(). Call this after the initial check
# to see if there are items in the queue.

if self.hub is getcurrent():
# special case to make get_nowait() or peek_nowait() runnable in the mainloop greenlet
# there are no items in the queue; try to fix the situation by unlocking putters
while self.putters:
# Note: get() used popleft(), peek used pop(); popleft
# is almost certainly correct.
self.putters.popleft().put_and_switch()
if self.qsize():
return method()
raise Empty()

if not block:
# We can't block, we're not the hub, and we have nothing
# to return. No choice...
raise Empty()

waiter = Waiter()
timeout = Timeout.start_new(timeout, Empty) if timeout is not None else None
try:
self.getters.append(waiter)
if self.putters:
self._schedule_unlock()
result = waiter.get()
if result is not waiter:
raise InvalidSwitchError('Invalid switch into Queue.get: %r' % (result, ))
return method()
finally:
if timeout is not None:
timeout.cancel()
_safe_remove(self.getters, waiter)

def get(self, block=True, timeout=None):
"""Remove and return an item from the queue.
Expand All @@ -237,34 +278,8 @@ def get(self, block=True, timeout=None):
if self.putters:
self._schedule_unlock()
return self._get()
elif self.hub is getcurrent():
# special case to make get_nowait() runnable in the mainloop greenlet
# there are no items in the queue; try to fix the situation by unlocking putters
while self.putters:
self.putters.popleft().put_and_switch()
if self.qsize():
return self._get()
raise Empty
elif block:
waiter = Waiter()
timeout = Timeout.start_new(timeout, Empty) if timeout is not None else None
try:
self.getters.append(waiter)
if self.putters:
self._schedule_unlock()
result = waiter.get()
if result is not waiter:
raise InvalidSwitchError('Invalid switch into Queue.get: %r' % (result, ))
return self._get()
finally:
if timeout is not None:
timeout.cancel()
try:
self.getters.remove(waiter)
except ValueError:
pass # Removed by _unlock
else:
raise Empty

return self.__get_or_peek(self._get, block, timeout)

def get_nowait(self):
"""Remove and return an item from the queue without blocking.
Expand All @@ -285,33 +300,17 @@ def peek(self, block=True, timeout=None):
(*timeout* is ignored in that case).
"""
if self.qsize():
# XXX: Why doesn't this schedule an unlock like get() does?
return self._peek()
elif self.hub is getcurrent():
# special case to make peek(False) runnable in the mainloop greenlet
# there are no items in the queue; try to fix the situation by unlocking putters
while self.putters:
self.putters.pop().put_and_switch()
if self.qsize():
return self._peek()
raise Empty
elif block:
waiter = Waiter()
timeout = Timeout.start_new(timeout, Empty)
try:
self.getters.append(waiter)
if self.putters:
self._schedule_unlock()
result = waiter.get()
if result is not waiter:
raise InvalidSwitchError('Invalid switch into Queue.peek: %r' % (result, ))
return self._peek()
finally:
self.getters.remove(waiter)
timeout.cancel()
else:
raise Empty

return self.__get_or_peek(self._peek, block, timeout)

def peek_nowait(self):
"""Return an item from the queue without blocking.
Only return an item if one is immediately available. Otherwise
raise the :class:`Empty` exception.
"""
return self.peek(False)

def _unlock(self):
Expand Down Expand Up @@ -519,24 +518,19 @@ def put(self, item, block=True, timeout=None):
waiter = Waiter()
item = (item, waiter)
self.putters.append(item)
timeout = Timeout.start_new(timeout, Full)
timeout = Timeout.start_new(timeout, Full) if timeout is not None else None
try:
if self.getters:
self._schedule_unlock()
result = waiter.get()
if result is not waiter:
raise InvalidSwitchError("Invalid switch into Channel.put: %r" % (result, ))
except:
self._discard(item)
_safe_remove(self.putters, item)
raise
finally:
timeout.cancel()

def _discard(self, item):
try:
self.putters.remove(item)
except ValueError:
pass
if timeout is not None:
timeout.cancel()

def put_nowait(self, item):
self.put(item, False)
Expand Down
52 changes: 31 additions & 21 deletions greentest/test__queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ def test_send_first(self):
self.switch_expected = False
q = queue.Queue()
q.put('hi')
self.assertEquals(q.peek(), 'hi')
self.assertEquals(q.get(), 'hi')
self.assertEqual(q.peek(), 'hi')
self.assertEqual(q.get(), 'hi')

def test_peek_empty(self):
q = queue.Queue()
Expand All @@ -27,12 +27,22 @@ def waiter(q):
gevent.sleep(0.1)
g.join()

def test_peek_multi_greenlet(self):
q = queue.Queue()
g = gevent.spawn(q.peek)
g.start()
gevent.sleep(0)
q.put(1)
g.join()
self.assertTrue(g.exception is None)
self.assertEqual(q.peek(), 1)

def test_send_last(self):
q = queue.Queue()

def waiter(q):
with gevent.Timeout(0.1):
self.assertEquals(q.get(), 'hi2')
self.assertEqual(q.get(), 'hi2')
return "OK"

p = gevent.spawn(waiter, q)
Expand All @@ -56,12 +66,12 @@ def putter(q):

p = gevent.spawn(putter, q)
gevent.sleep(0)
self.assertEquals(results, ['a', 'b'])
self.assertEquals(q.get(), 'a')
self.assertEqual(results, ['a', 'b'])
self.assertEqual(q.get(), 'a')
gevent.sleep(0)
self.assertEquals(results, ['a', 'b', 'c'])
self.assertEquals(q.get(), 'b')
self.assertEquals(q.get(), 'c')
self.assertEqual(results, ['a', 'b', 'c'])
self.assertEqual(q.get(), 'b')
self.assertEqual(q.get(), 'c')
assert p.get(timeout=0) == "OK"

def test_zero_max_size(self):
Expand All @@ -82,8 +92,8 @@ def receiver(evt, q):
gevent.sleep(0.001)
self.assertTrue(not e1.ready())
p2 = gevent.spawn(receiver, e2, q)
self.assertEquals(e2.get(), 'hi')
self.assertEquals(e1.get(), 'done')
self.assertEqual(e2.get(), 'hi')
self.assertEqual(e1.get(), 'done')
with gevent.Timeout(0):
gevent.joinall([p1, p2])

Expand Down Expand Up @@ -111,12 +121,12 @@ def collect_pending_results():
return len(results)

q.put(sendings[0])
self.assertEquals(collect_pending_results(), 1)
self.assertEqual(collect_pending_results(), 1)
q.put(sendings[1])
self.assertEquals(collect_pending_results(), 2)
self.assertEqual(collect_pending_results(), 2)
q.put(sendings[2])
q.put(sendings[3])
self.assertEquals(collect_pending_results(), 4)
self.assertEqual(collect_pending_results(), 4)

def test_waiters_that_cancel(self):
q = queue.Queue()
Expand All @@ -131,10 +141,10 @@ def do_receive(q, evt):

evt = AsyncResult()
gevent.spawn(do_receive, q, evt)
self.assertEquals(evt.get(), 'timed out')
self.assertEqual(evt.get(), 'timed out')

q.put('hi')
self.assertEquals(q.get(), 'hi')
self.assertEqual(q.get(), 'hi')

def test_senders_that_die(self):
q = queue.Queue()
Expand All @@ -143,7 +153,7 @@ def do_send(q):
q.put('sent')

gevent.spawn(do_send, q)
self.assertEquals(q.get(), 'sent')
self.assertEqual(q.get(), 'sent')

def test_two_waiters_one_dies(self):

Expand All @@ -165,8 +175,8 @@ def do_receive(q, evt):
gevent.spawn(waiter, q, waiting_evt)
gevent.sleep(0.1)
q.put('hi')
self.assertEquals(dying_evt.get(), 'timed out')
self.assertEquals(waiting_evt.get(), 'hi')
self.assertEqual(dying_evt.get(), 'timed out')
self.assertEqual(waiting_evt.get(), 'hi')

def test_two_bogus_waiters(self):
def do_receive(q, evt):
Expand All @@ -184,9 +194,9 @@ def do_receive(q, evt):
gevent.spawn(do_receive, q, e2)
gevent.sleep(0.1)
q.put('sent')
self.assertEquals(e1.get(), 'timed out')
self.assertEquals(e2.get(), 'timed out')
self.assertEquals(q.get(), 'sent')
self.assertEqual(e1.get(), 'timed out')
self.assertEqual(e2.get(), 'timed out')
self.assertEqual(q.get(), 'sent')


class TestChannel(TestCase):
Expand Down

0 comments on commit ea62d6c

Please sign in to comment.