
Allow to have a custom request #3977

Merged
merged 5 commits into celery:master from the custom-request branch on Aug 18, 2017

Conversation

mvaled
Contributor

@mvaled mvaled commented Apr 13, 2017

Allowing a custom Request eases the task of handling timeouts (even hard timeouts).

Rationale

Some (poorly written) bits of code catch exceptions quite broadly:

  try:
      ...
  except:
      ...

This hurts tasks when a SoftTimeLimitExceeded is raised inside such blocks of
code. Rewriting those smelly bits of code can take a lot of effort, and
sometimes the code belongs to a third-party library, which makes the task even
harder.

Using a custom request allows catching hard time limits.

Your app can be customized like:

    from celery import Task as BaseTask
    from celery.worker.request import Request as BaseRequest

    class Request(BaseRequest):
        def on_timeout(self, soft, timeout):
            super(Request, self).on_timeout(soft, timeout)
            if not soft:
                print('Something hard hit me!')

    class MyTask(BaseTask):
        Request = Request

    @app.task(base=MyTask, bind=True)
    def sometask(self):
        import time
        while True:
            try:
                time.sleep(100)
            except:
                pass
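For the on_timeout hook to fire at all, time limits have to be enabled. A minimal configuration sketch (setting names as in Celery 4.x; `app` is assumed to be your Celery application, not something defined in this PR):

```python
# Sketch: enable time limits so the worker enforces them (Celery 4.x setting names).
app.conf.task_soft_time_limit = 30  # soft limit: raises SoftTimeLimitExceeded in the task
app.conf.task_time_limit = 60       # hard limit: the pool worker process is terminated
```

With both set, a task first gets the soft exception and, if it survives past the hard limit, the worker kills the pool process and `Request.on_timeout` runs with `soft=False`.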

@mvaled mvaled force-pushed the custom-request branch 4 times, most recently from 18e621e to 26a7d86 on April 13, 2017 22:24
@mvaled
Contributor Author

mvaled commented Apr 13, 2017

This PR is related to a question in the mailing list: https://groups.google.com/forum/#!topic/celery-users/0QhWtY7na4M

Without this PR, the current solution I have found is to monkey-patch the create_request_cls function. But monkey-patching is always harder than expected.

This is a fragment of the monkey patching I'm doing right now:

from celery.app.task import Task as BaseTask  # used below to detect support
from celery.worker.request import Request as BaseRequest

class Request(BaseRequest):
    def on_timeout(self, soft, timeout):
        """Handler called if the task times out."""
        super(Request, self).on_timeout(soft, timeout)
        try:
            if not soft:
                _report_failure.delay(self.id)  # Yes! This is another task
        except:
            pass


if not getattr(BaseTask, 'Request', None):
    # So this is a celery that has not accepted our patch
    # (https://github.com/celery/celery/pull/3977).  Let's proceed to
    # monkey-patch the Request.
    from celery.worker import request
    _super_create_request_cls = request.create_request_cls

    def create_request_cls(base, task, pool, hostname, eventer,
                           ref=request.ref,
                           revoked_tasks=request.revoked_tasks,
                           task_ready=request.task_ready,
                           trace=request.trace_task_ret):

        if base is BaseRequest:
            Base = Request
        else:
            class Base(base, Request):
                pass

        class PatchedRequest(Base):
            pass

        return _super_create_request_cls(
            PatchedRequest,
            task,
            pool,
            hostname,
            eventer,
            ref=ref,
            revoked_tasks=revoked_tasks,
            task_ready=task_ready,
            trace=trace
        )

    request.create_request_cls = create_request_cls

@thedrow thedrow self-requested a review April 15, 2017 14:09
Member

@thedrow thedrow left a comment


It's something we might consider and can be useful for instrumentation or other hooks.
This PR is missing the necessary documentation and a test that verifies that we can override the request (just to ensure that there are no shenanigans in the future).

@mvaled
Contributor Author

mvaled commented Apr 17, 2017

Hi @thedrow,

I'll be happy to provide those. In fact, after this PR I extended my own usage to also override the on_failure method. That would help me catch other problems (the OS may kill a PoolWorker).

Which document do you think I should touch? The Request is not documented in any of the narrative documents. In fact, at first I was confused about whether app.Task.request was that request. I think it should go in http://docs.celeryproject.org/en/latest/userguide/extending.html?highlight=worker#worker-api. What do you think?

As for the tests, I will include them in the next push.

@thedrow
Member

thedrow commented Apr 17, 2017

Actually, it seems that some of the hooks are exposed through the Task class. I think that this is where we should be documenting this: http://docs.celeryproject.org/en/latest/userguide/tasks.html#custom-task-classes

The Task class already exposes an on_failure hook. I'm not sure we want to expose them through the Request API.

@mvaled
Contributor Author

mvaled commented Apr 17, 2017

It seems that Task.on_failure runs in the PoolWorker process space, but Request.on_failure runs in the MainProcess.

When I run something like:

$ ps a | grep PoolWorke[r] | awk '{print $1}' | xargs kill -9 

in the middle of a task, Task.on_failure doesn't get a chance to run. If the OS kills the PoolWorker for some reason (maybe an OOM kill), the MainProcess would still be able to react and detect the failure.

That's why I overloaded the on_failure of the Request. The entire code I'm using can be seen in: https://github.com/merchise-autrement/odoo/blob/merchise-develop-8.0/openerp/jobs.py#L715

I killed the PoolWorker several times; with this in place the system can, at least, report the failure. In this case, since I use late acks, the task should run again (I'm not seeing that happen, though).
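The pattern being discussed (reacting to failures from the MainProcess side by overriding the request's on_failure) can be sketched with a minimal, self-contained stand-in. BaseRequest below is a stub playing the role of celery.worker.request.Request, and the hook signature is only an approximation of the real one:

```python
# Stand-in sketch, not the actual celery classes.
reported = []

class BaseRequest(object):
    """Stub standing in for celery.worker.request.Request."""
    def __init__(self, task_id):
        self.id = task_id

    def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
        # The real Request marks the task as failed, emits events, etc.
        pass

class Request(BaseRequest):
    def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
        super(Request, self).on_failure(
            exc_info, send_failed_event=send_failed_event, return_ok=return_ok)
        # In real code this could dispatch another task, as in the
        # _report_failure.delay(self.id) fragment quoted earlier.
        reported.append(self.id)

Request('some-task-id').on_failure(exc_info=None)
print(reported)  # ['some-task-id']
```

Because the request object lives in the MainProcess, this hook still runs even when the pool worker process itself was killed.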

@mvaled
Contributor Author

mvaled commented Apr 17, 2017

Hum. I can't manage to get the tests running locally:

________________________________________________________________ ERROR at setup of test_chain.test_simple_chain ________________________________________________________________

request = <SubRequest 'celery_session_worker' for <Function 'test_simple_chain'>>, celery_session_app = <Celery celery.tests:0x7f33d1e5cdd0>
celery_includes = set(['t.integration.tasks']), celery_worker_pool = 'prefork', celery_worker_parameters = {}

    @pytest.fixture(scope='session')
    def celery_session_worker(request,
                              celery_session_app,
                              celery_includes,
                              celery_worker_pool,
                              celery_worker_parameters):
        # type: (Any, Celery, Sequence[str], str) -> WorkController
        """Session Fixture: Start worker that lives throughout test suite."""
        if not NO_WORKER:
            for module in celery_includes:
                celery_session_app.loader.import_task_module(module)
            with worker.start_worker(celery_session_app,
                                     pool=celery_worker_pool,
>                                    **celery_worker_parameters) as w:

.tox/2.7-integration-redis/local/lib/python2.7/site-packages/celery/contrib/pytest.py:89: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/lib/python2.7/contextlib.py:17: in __enter__
    return self.gen.next()
.tox/2.7-integration-redis/local/lib/python2.7/site-packages/celery/contrib/testing/worker.py:80: in start_worker
    assert ping.delay().get(timeout=ping_task_timeout) == 'pong'
.tox/2.7-integration-redis/local/lib/python2.7/site-packages/celery/result.py:194: in get
    on_message=on_message,
.tox/2.7-integration-redis/local/lib/python2.7/site-packages/celery/backends/async.py:189: in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <celery.backends.redis.ResultConsumer object at 0x7f33d5aeca50>, result = <AsyncResult: 30af2afe-5a66-41eb-a909-2f344da5ea15>, timeout = 10.0
on_interval = <promise@0x7f33d1221348>, on_message = None, kwargs = {}, prev_on_m = None, _ = None

    def _wait_for_pending(self, result,
                          timeout=None, on_interval=None, on_message=None,
                          **kwargs):
        self.on_wait_for_pending(result, timeout=timeout, **kwargs)
        prev_on_m, self.on_message = self.on_message, on_message
        try:
            for _ in self.drain_events_until(
                    result.on_ready, timeout=timeout,
                    on_interval=on_interval):
                yield
                sleep(0)
        except socket.timeout:
>           raise TimeoutError('The operation timed out.')
E           TimeoutError: The operation timed out.

.tox/2.7-integration-redis/local/lib/python2.7/site-packages/celery/backends/async.py:260: TimeoutError
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Interrupted: stopping after 1 failures !!!!!!!!!!!!!!!!!!!!!!!!!!!!

[2017-04-17 14:48:12,998: INFO/PoolWorker-1] Starting new HTTP connection (69): localhost
[2017-04-17 14:48:25,810: INFO/PoolWorker-1] Starting new HTTP connection (70): localhost
[2017-04-17 14:48:25,820: INFO/PoolWorker-1] Starting new HTTP connection (71): localhost
[2017-04-17 14:48:25,873: INFO/PoolWorker-1] Starting new HTTP connection (72): localhost
[2017-04-17 14:48:25,977: INFO/PoolWorker-1] Starting new HTTP connection (73): localhost
[2017-04-17 14:48:26,186: INFO/PoolWorker-1] Starting new HTTP connection (74): localhost
[2017-04-17 14:48:26,594: INFO/PoolWorker-1] Starting new HTTP connection (75): localhost
[2017-04-17 14:48:27,400: INFO/PoolWorker-1] Starting new HTTP connection (76): localhost
[2017-04-17 14:48:29,008: INFO/PoolWorker-1] Starting new HTTP connection (77): localhost
[2017-04-17 14:48:32,217: INFO/PoolWorker-1] Starting new HTTP connection (78): localhost
[2017-04-17 14:48:38,630: INFO/PoolWorker-1] Starting new HTTP connection (79): localhost


It remains there doing nothing.

This is how I'm running the tests:

TEST_BROKER=redis:// tox -re2.7-unit,2.7-integration-redis

@thedrow thedrow mentioned this pull request Apr 18, 2017
@georgepsarakis
Contributor

@mvaled do you have the same test failures on the master branch as well?

@mvaled
Contributor Author

mvaled commented Apr 18, 2017

Hi @georgepsarakis,

Yes, I do get the same errors. This is to be expected, since the change I'm introducing shouldn't affect the current set of tests. However, Travis passes all tests, so it's probably something local to my machine.

So far I can run and pass all sets of unit tests:

$ tox -e{2.7,pypy,3.4,3.5,3.6}-unit

It's the integration tests that are failing.

@georgepsarakis
Contributor

I am seeing a similar error (not the same one, though) in another pull request, so if you happen to find out what may be wrong, please share.

@mvaled
Contributor Author

mvaled commented Apr 18, 2017

The issue is quite slippery. Just now I was able to run all integration tests for Python 2.7, PyPy, 3.4 and 3.5. Same code, no changes.

$ tox -re{2.7,pypy,3.4,3.5}-integration-{rabbitmq,redis,dynamodb}

Before that, I had changed the default ping_task_timeout from 10 to 100 in https://github.com/celery/celery/blob/master/celery/contrib/testing/worker.py#L60. All tests were passing and I was happy, until they hit the timeout while running the '3.6-integration-rabbitmq' set.

@mvaled
Contributor Author

mvaled commented Apr 19, 2017

I was thinking that maybe SoftTimeLimitExceeded (in billiard) should inherit from BaseException instead.
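For illustration, a self-contained sketch of why that inheritance change would matter: an `except Exception:` block cannot swallow a BaseException-derived exception, although a bare `except:` still can. The classes below are stand-ins, not billiard's actual exceptions:

```python
# Stand-ins for the two possible bases of SoftTimeLimitExceeded.
class SoftLimitAsException(Exception):
    """Models the current Exception-derived SoftTimeLimitExceeded."""

class SoftLimitAsBaseException(BaseException):
    """Models the proposed BaseException-derived variant."""

def run(exc_cls):
    """Return True if a broad ``except Exception:`` handler swallowed it."""
    try:
        try:
            raise exc_cls()
        except Exception:        # the common "broad" handler
            return True
    except BaseException:        # the limit escaped the broad handler
        return False

print(run(SoftLimitAsException))      # True: the time limit is swallowed
print(run(SoftLimitAsBaseException))  # False: the time limit escapes

# Caveat: a *bare* ``except:`` is equivalent to ``except BaseException:``,
# so this change would only protect against ``except Exception:`` blocks.
```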

@thedrow
Member

thedrow commented Aug 13, 2017

@mvaled Are you still interested in working on this?

@mvaled
Contributor Author

mvaled commented Aug 14, 2017

@thedrow I had drifted away from it a little, because I was happily using my workaround. But I can give it another try to make this part of the stable Celery API. That would be better.

Basically, what you requested was to include tests and documentation. I think I can get this done by the end of the weekend. Are you planning a feature freeze?

@mvaled
Contributor Author

mvaled commented Aug 14, 2017

I could run the tests on master:

TEST_BROKER=redis:// tox -re2.7-unit,2.7-integration-redis

@mvaled
Contributor Author

mvaled commented Aug 14, 2017

One question about where to put the documentation: as I argued before, Task.on_failure does not get a chance to run on some hard failures, but the Request's may. So, where should the docs go?

@thedrow
Member

thedrow commented Aug 15, 2017

Somewhere in the task customization section of Tasks should be good.

@mvaled
Contributor Author

mvaled commented Aug 16, 2017

I created some minimal documentation. Its placement is a bit odd, I think.

Since what really changed is the default strategy, the provided test only checks that the default strategy creates the custom request. Everything else (on_failure and the like) should be covered by other tests.

Member

@thedrow thedrow left a comment


The test looks good.
The only thing that is missing is a code example in the documentation.

worker process. An application may leverage such a facility to detect failures
which are not detected using `celery.app.task.Task.on_failure`:meth:.


Member


Please provide code examples for how to override the request.

Contributor Author


Done.

@thedrow thedrow merged commit 215376d into celery:master Aug 18, 2017
@thedrow
Member

thedrow commented Aug 18, 2017

Thanks! Awesome work.
Feel free to add yourself to the CONTRIBUTORS file.

@thedrow thedrow modified the milestone: v4.2 Aug 18, 2017
Kxrr pushed a commit to Kxrr/celery that referenced this pull request Aug 18, 2017
* Allow custom Request, aka custom `on_timeout`.


* Check signatures' types have a default Request.

* Test Request is customizable per Task class.

* Document custom requests.

* Exemplify the usage of the custom requests.
@mvaled
Contributor Author

mvaled commented Aug 18, 2017

Thanks! Awesome work.
Feel free to add yourself to the CONTRIBUTORS file.

Thanks! Maybe with my next contribution ;)

@mvaled mvaled deleted the custom-request branch September 5, 2017 14:26
@gwaramadze

We have just hit this problem as well. Great to see that it's merged! Any ETA on a new release?

@thedrow
Member

thedrow commented Dec 1, 2017

Soon. We have a few more things to merge first.

@cjh1
Contributor

cjh1 commented Feb 5, 2018

It's worth noting that a custom Request class can't provide an on_success(...) method; the current implementation prevents this. It would be nice if it were possible to provide a custom implementation, and that is probably the expected behavior. If this is by design, it may be worth updating the docs.

@thedrow
Member

thedrow commented Feb 6, 2018

@cjh1 good catch. Please open an issue about it so we won't forget.
A proper fix would be to call super first if you are up for the task.
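The shadowing that this exchange describes, and the super()-first fix suggested here, can be sketched in isolation. The classes and factory below are stand-ins, not celery's actual create_request_cls:

```python
# Stand-in sketch of the on_success shadowing problem (see #4526).
calls = []

class CustomRequest(object):
    """The user's custom request with its own on_success hook."""
    def on_success(self, retval):
        calls.append('custom')

def create_request_cls(base):
    """Stand-in for the optimized request factory."""
    class Optimized(base):
        def on_success(self, retval):
            # The generated method shadows the custom hook...
            super(Optimized, self).on_success(retval)  # ...unless it calls super first
            calls.append('optimized')
    return Optimized

create_request_cls(CustomRequest)().on_success(None)
print(calls)  # ['custom', 'optimized']
```

Without the super() call inside the generated method, only 'optimized' would be recorded and the custom hook would silently never run, which is the behavior @cjh1 observed.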

@cjh1
Contributor

cjh1 commented Feb 6, 2018

@thedrow I have opened #4526. I am happy to put a fix together.

@cjh1
Contributor

cjh1 commented Feb 6, 2018

@thedrow I have raised #4527
