New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add blocking and timeout params to Pool.add #1032

Closed
wants to merge 9 commits into
base: master
from

Conversation

Projects
None yet
2 participants
@RonRothman
Contributor

RonRothman commented Nov 8, 2017

This PR adds a timeout (and blocking) parameter to Pool.add.

This allows clients of Pool to decide whether they want to fail if a pool is at capacity, and (for example) apply some backpressure to whomever is, in turn, calling them.

Notes:

  • I've tried running tox but I wasn't able to get it working reliably, so apologies in advance if I've missed some tests that should have been run.

RonRothman added some commits Nov 8, 2017

@jamadden

I like the idea! Thanks for your work on this. While it's not directly applicable to the servers that come with gevent (by the time they're ready to spawn, it's really too late to quit without losing a request, and there's no out-of-band defined mechanisms to communicate with clients that situation---well, there are some HTTP response codes in that special case, but usually a reverse proxy is better at managing that sort of stuff), but I can see it being useful in other areas.

I left some review comments. This will also need tests added to src/greentest/test__pool.py and an entry in CHANGES.rst. If you'd like, I can tackle these items (so long as the checkbox to allow maintainers to access the PR was checked.)

"""
Begin tracking the greenlet.
:keyword bool blocking: and :keyword bool timeout: are ignored

This comment has been minimized.

@jamadden

jamadden Nov 8, 2017

Member

This method probably needs to return True (but see below about return values; this may change), and the docstring needs a .. versionadded:: section explaining the changes.

I have to think about whether the unused keywords are desirable here or not, but I'm leaning towards not. (Pool and Group are not fully equivalent to each other. According to the substitution principle a Pool is (should be) substitutable for a Group, but the reverse doesn't have to be true.)

This comment has been minimized.

@RonRothman

RonRothman Nov 8, 2017

Contributor

You're right, I missed returning True here. In any case, I agree with your broader point, so I'm going to remove the changes to Group.add.

.. seealso:: :meth:`Group.add`
"""
self._semaphore.acquire()
try:
# NOTE: The docs for Semaphore.acquire indicate that it may raise a Timeout rather

This comment has been minimized.

@jamadden

jamadden Nov 8, 2017

Member

A Timeout error can be raised if a higher caller had set a timer and it expired. The Semaphore itself won't raise a Timeout. So that must be ignored here and just allowed to propagate (that is, the try/except wrapping can go away).

The pattern for working with timeouts is:

timeout = Timeout(...)
try:
   do_some_stuff()
except Timeout as e:
   if e is not timeout: 
     # Somebody that called us had a timer set. It expired, not ours. 
     # They need to know about it, so this must raise
      raise
  # Our timer expired. Do whatever we do in that case.

This comment has been minimized.

@RonRothman

RonRothman Nov 8, 2017

Contributor

Ah, thanks for the explanation of Semaphore's timeout logic. :)

try:
Group.add(self, greenlet)
except:
self._semaphore.release()
raise
return True

This comment has been minimized.

@jamadden

jamadden Nov 8, 2017

Member

I know Semaphore has the pattern of returning True/False to indicate that it was acquired or that the timeout elapsed. I'm not sure that's the right thing to do in general, though. Return values are easy to ignore or miss.

Semaphore gets away with it because of how low-level speed critical it is (it's compiled with Cython, and not having the possibility of raising exceptions at all makes a big difference), but I don't think our constraints are that tight here. What if this did what subprocess.Popen.communicate does and raises an exception if the semaphore can't be acquired (class PoolFull(Queue.Full))?

This comment has been minimized.

@RonRothman

RonRothman Nov 8, 2017

Contributor

Yep, I agree that it's cleaner to have Pool.add raise a Timeout rather than return True/False. Will modify.

This comment has been minimized.

@jamadden

jamadden Nov 8, 2017

Member

I don't think Timeout is correct. If blocking is False, we could return immediately. I think we need a new exception, which is why I suggested class PoolFull(Queue.Full). Timeout is a BaseException, not an Exception (the better to bubble up to the caller who is expecting it), but I think we want this exception to be caught by a generic except Exception: block.

This comment has been minimized.

@RonRothman

RonRothman Nov 8, 2017

Contributor

Yep, makes sense. Couple of questions:

  1. The name Full makes more sense to me than PoolFull, since it's already qualified by the module name (pool.Full).
  2. It feels unnatural to have pool.Full (or whatever we decide to call it) derive from Queue.Full, since classes Pool and Queue have no relationship. Does it confer some benefit to associate the two? If not, then I'd just as soon do class Full(Exception).

@jamadden

This comment has been minimized.

@jamadden

jamadden Nov 8, 2017

Member

OK, so here's what I was thinking. I'm not committed to it, but it seemed to make a certain amount of sense.

For inheritance:

A stdlib Queue (which shares the same API as a gevent.queue.Queue) and a Pool have similar APIs. They share full, join and possibly some other methods I'm not remembering. One of the supported use cases of a Queue is as a task queue, where daemon threads are blocking and waiting on tasks to finish; it provides the task_done callback method for this purpose. Conceptually, that's what a Pool is, and this PR makes them even more similar by adding blocking and timeout parameters to Pool.add, in much the same way that Queue.put has them. Queue.put raises Queue.Full when it fails given those parameters.

It wouldn't be hard to transition from a stdlib Queue + threads as a task queue to a gevent Pool.

If that's under a layer or two of abstraction, it would be nice if we had exception compatibility too so that higher level code could continue catching the old Queue.Full exception even if the implementation changes.

Is that a real use case? I don't really know. But it didn't seem to hurt much to try to support it.

On naming:

It's not uncommon to use both a Queue and a Pool at the same time. You might have 1000 websites to scrape but want to limit the concurrency to 20 at a time. So you put all the websites in a Queue and use a Pool to spin up 20 workers who each draw from that Queue. It would be nice to be able to import classes from both modules without having to manually worry about renaming conflicting classes.

Plus, on Python 2, only the local name of an exception is printed in a traceback, so if this exception ever got logged or bubbled all the way to the top, it would be good if the name was unambiguous without the module.

This comment has been minimized.

@RonRothman

RonRothman Nov 10, 2017

Contributor

Sounds good, I find that convincing. Making the changes now.

This comment has been minimized.

@RonRothman

RonRothman Nov 10, 2017

Contributor

FYI I've coded gevent.pool to import Full from gevent.queue rather than from the stdlib, since that module already handles the Python2/Python3 juggling. Let me know how the implementation looks to you. Will commit once the tests complete.

Btw, while I was in there, I noticed that gevent.queue.__all__ does not contain Full or Empty, whereas the stdlib Queue.__all__ does. I'm going to submit a separate PR where I'll propose bringing gevent's queue.__all__ to parity with the stdlib's.

:keyword float timeout: The maximum number of seconds this method will block.
:return: True if the greenlet was added; False if a Timeout occured or
if blocking is True and the pool is full.

This comment has been minimized.

@jamadden

jamadden Nov 8, 2017

Member

The docstring needs .. versionchanged:: explaining the changes.

This comment has been minimized.

@RonRothman

RonRothman Nov 8, 2017

Contributor

Wasn't sure whether this would make the cut for 1.3.0 so I omitted it all together. :) Will add.

RonRothman added some commits Nov 8, 2017

PR comments:
    * revert Group.add to original state (no new params)
    * add new exception "Full", and return it when blocking==False and
    pool is full
    * no longer return True/False from Pool.add
@RonRothman

This comment has been minimized.

Contributor

RonRothman commented Nov 8, 2017

@jamadden Updated per our conversation. Can you take another look when you have a moment?

@jamadden

Looking better! There's one test failure to address (see comments) and the decisions around the exception.

Vaguely related, it occurs to me that it would be nice for Pool.spawn to be able to pass these parameters down (Pool.spawn ->Pool.start->Pool.add) but given the existing API that's non-trivial and better left for later.

self._semaphore.acquire()
if not self._semaphore.acquire(blocking=blocking, timeout=timeout):
# We failed to acquire the semaphore. Presumably, blocking was False, because had it
# been True, we would have either acquired the semaphore or encountered a Timeout.

This comment has been minimized.

@jamadden

jamadden Nov 8, 2017

Member

The comment is incorrect. The Semaphore will never itself raise a Timeout, regardless of the blocking parameter. That's the cause of this test failure

When you acquire a semaphore, if it is available you immediately get it (it returns True). If it's not available and blocking was False, the semaphore immediately returns False. The only remaining case is that it wasn't available, and blocking was True; in that case, the semaphore will wait until it gets it and return True (or, if you specified a timeout that elapses before that happens, return False).

This comment has been minimized.

@RonRothman

RonRothman Nov 8, 2017

Contributor

Addressed.

second = gevent.spawn(gevent.sleep, 1000)
try:
p.add(first)
try:

This comment has been minimized.

@jamadden

jamadden Nov 8, 2017

Member

This (and possibly the next test) can be written more clearly using with self.assertRaises(...): I think.

This comment has been minimized.

@RonRothman

RonRothman Nov 8, 2017

Contributor

Perfect. I never use unittest (in favor of pytest), wasn't aware that it supported assertRaises. Thanks!

@RonRothman

This comment has been minimized.

Contributor

RonRothman commented Nov 8, 2017

it would be nice for Pool.spawn to be able to pass these parameters down

Yep, I had the same thought--in fact that's exactly the way I originally wanted to use this from my own code.

But yes, the API for spawn makes this non-trivial to do in a backward-compatible way. (If only spawn took fn_args and fn_kwargs params, differentiating the target's params from spawn's own params! Alas.)

@jamadden

Thanks for making those changes. There's a test failure and an exception handling issue to address. If you rebase on master, the builds on Travis should go green.

from gevent.timeout import Timeout
from gevent.event import Event
from gevent.lock import Semaphore, DummySemaphore
__all__ = ['Group', 'Pool']
__all__ = ['Group', 'Pool', 'Full']

This comment has been minimized.

@jamadden

jamadden Nov 10, 2017

Member

This causes a test failure. You'll need to update the name to match.

Raised when a Pool is full and an attempt was made to
add a new greenlet to it.
"""
pass

This comment has been minimized.

@jamadden

jamadden Nov 10, 2017

Member

Minor style: pass is not necessary here, the docstring alone is sufficient.

# We failed to acquire the semaphore.
# If blocking was True, then there was a timeout. If blocking was
# False, then there was no capacity.
raise Timeout() if blocking else PoolFull()

This comment has been minimized.

@jamadden

jamadden Nov 10, 2017

Member

This should simply be raise PoolFull() (with corresponding test and docstring changes). I don't think we should be raising Timeout here. Not only does it make the API more complicated (a caller has to catch two kinds of exceptions), it breaks timeout handling completely. Because the caller didn't create the timeout object, the usual way of telling if the timeout that elapsed was yours or someone else's is broken:

def do_something(blocking, timeout):
   greenlet = gevent.spawn(...)
   try:
     pool.add(greenlet, blocking, timeout)
   except PoolFull:
     # pool was full
     greenlet.kill()
  except Timeout:
     # UH-OH! We don't know if this is because the pool was full,
     # or because our caller had established a timeout of its own

This comment has been minimized.

@RonRothman

RonRothman Nov 10, 2017

Contributor

Hmm, I was trying to preserve the ability to distinguish timeouts from full pools, but after reading your comment and thinking it over, I agree that this is unnecessary - the caller, by virtue of choosing what to specify for blocking and timeout in the first place - already has enough information to determine whether a raise PoolFull was due to a timeout or a nonblocking full pool.

Will make the changes to code/docs/tests.

@jamadden

This comment has been minimized.

Member

jamadden commented Nov 10, 2017

Hmm, when I was reviewing exceptions, the example that I had to write made me have second thoughts about this. Here's that example:

def do_something(blocking, timeout):
   greenlet = gevent.spawn(...)
   try:
     pool.add(greenlet, blocking, timeout)
   except PoolFull:
     # pool was full
     greenlet.kill()

The problem I see is that this API invites a race condition. Unless the caller is very careful to not start the greenlet before calling add, there's a chance that while the pool was blocking the greenlet had already run to completion, thus defeating the point.

Now, that race condition already exists if you call add, so this doesn't really change that: add is just an inherently fragile method, and people really should be using spawn or, at most start. This PR just makes using add more attractive.

I think two things need to happen:

  1. Pool needs to add the same parameters to its start method. That should be the preferred way to access this functionality.
  2. The documentation needs to be more clear about this case.

Both of those things can happen after this PR, I'm only writing it here because this is when I noticed it.

@RonRothman

This comment has been minimized.

Contributor

RonRothman commented Nov 10, 2017

Pool needs to add the same parameters to its start method

Typo? You meant "spawn," not "start," yes?

@RonRothman

This comment has been minimized.

Contributor

RonRothman commented Nov 10, 2017

Agreed that the ideal interface would be to add blocking and timeout to Pool.spawn.

But because Pool.spawn simply passes all of its *args and **kwargs to the greenlet constructor, it's difficult to make this change in a non-breaking way.

Nevertheless, I'd love to find a way to solve this, not only to avoid the potential race condition you pointed out, but also because my own particular use case would be made easier (my code already uses Pool.spawn).

I'm ambivalent about this proposal, but: what if we added two "reserved" kwargs to Pool.spawn, and we relied on a documented naming convention to separate them from the greenlet's kwargs?

def spawn(gevent_pool_blocking=True, gevent_pool_timeout=None, *args, **kwargs):

The likelihood that some existing code is passing an arg named gevent_pool_blocking is near zero - but not zero. Not sure how uncomfortable that makes you. :)

@jamadden

This comment has been minimized.

Member

jamadden commented Nov 10, 2017

Typo? You meant "spawn," not "start," yes?

Nope, I meant start 😄

The spawn method creates an unstarted greenlet and passes it to start, which calls add and then starts the greenlet.

Calling add with a started greenlet isn't safe for this use case, but calling start with an unstarted greenlet is what you're supposed to do.

@jamadden

This comment has been minimized.

Member

jamadden commented Nov 10, 2017

But because Pool.spawn simply passes all of its *args and **kwargs to the greenlet constructor, it's difficult to make this change in a non-breaking way.

Right.

I'm ambivalent about this proposal, but: what if we added two "reserved" kwargs to Pool.spawn, and we relied on a documented naming convention to separate them from the greenlet's kwargs?

I gave that some consideration, but I'm currently thinking that if someone needs Pool.spawn to have those options, then they're better off either using a subclass or a trivial wrapper function (that calls start).

@RonRothman

This comment has been minimized.

Contributor

RonRothman commented Nov 10, 2017

Fair enough, a subclass will work. We can reconsider adding the args in gevent 2.0. ;)

Just pushed a fix for your Timeout/PoolFull comment above.

@jamadden

This comment has been minimized.

Member

jamadden commented Nov 10, 2017

Thanks! I've rebased and tested locally and merged to master.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment