Terminate tasks with late acknowledgement on connection loss #6654

Merged
thedrow merged 17 commits into master from terminate-ack-late-tasks-on-connection-loss on Apr 28, 2021

Conversation

thedrow
Member

@thedrow thedrow commented Mar 2, 2021

Note: Before submitting this pull request, please review our contributing guidelines.

Description

Tasks with late acknowledgement keep running after a restart even though the connection is lost, and (to the best of my knowledge) they can no longer be acked.

This results in log errors such as these:

[2021-03-02 18:29:58,355: CRITICAL/MainProcess] Couldn't ack 5, reason:RecoverableConnectionError(None, 'connection already closed', None, '')
Traceback (most recent call last):
  File "/home/thedrow/.virtualenvs/celery/lib/python3.9/site-packages/kombu/message.py", line 128, in ack_log_error
    self.ack(multiple=multiple)
  File "/home/thedrow/.virtualenvs/celery/lib/python3.9/site-packages/kombu/message.py", line 123, in ack
    self.channel.basic_ack(self.delivery_tag, multiple=multiple)
  File "/home/thedrow/.virtualenvs/celery/lib/python3.9/site-packages/amqp/channel.py", line 1391, in basic_ack
    return self.send_method(
  File "/home/thedrow/.virtualenvs/celery/lib/python3.9/site-packages/amqp/abstract_channel.py", line 54, in send_method
    raise RecoverableConnectionError('connection already closed')
amqp.exceptions.RecoverableConnectionError: connection already closed
[2021-03-02 18:29:58,356: CRITICAL/MainProcess] Couldn't ack 12, reason:RecoverableConnectionError(None, 'connection already closed', None, '')

If we cannot recover from this situation, we must terminate all tasks with late acknowledgement.
Any other suggestions would be welcome.
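To make the intent above concrete, here is a rough, hypothetical sketch of the idea rather than the merged diff; `active_requests` and the `cancel()` call are assumed names standing in for the worker's real internals:

```python
# Hypothetical sketch only, not the code merged in this PR.
# `active_requests` is assumed to be the set of requests the worker is
# currently executing when the broker connection is lost.
def on_connection_lost(active_requests, exc):
    for request in active_requests:
        if request.task.acks_late:
            # The message's channel belongs to the dead connection, so a
            # later ack would raise RecoverableConnectionError. Cancel the
            # task (without revoking it) so it can run again once the
            # broker redelivers the message.
            request.cancel(exc)  # assumed cancellation primitive
```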

@codecov

codecov bot commented Mar 2, 2021

Codecov Report

Merging #6654 (028f334) into master (850c62a) will increase coverage by 0.06%.
The diff coverage is 100.00%.


@@            Coverage Diff             @@
##           master    #6654      +/-   ##
==========================================
+ Coverage   70.52%   70.58%   +0.06%     
==========================================
  Files         138      138              
  Lines       16497    16531      +34     
  Branches     2066     2074       +8     
==========================================
+ Hits        11634    11668      +34     
  Misses       4663     4663              
  Partials      200      200              
Flag Coverage Δ
unittests 70.58% <100.00%> (+0.06%) ⬆️


Impacted Files Coverage Δ
celery/app/defaults.py 97.33% <ø> (ø)
celery/bin/amqp.py 0.00% <ø> (ø)
celery/worker/consumer/consumer.py 93.57% <100.00%> (+0.18%) ⬆️
celery/worker/request.py 96.91% <100.00%> (+0.19%) ⬆️


Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@lgtm-com

lgtm-com bot commented Mar 2, 2021

This pull request introduces 1 alert and fixes 2 when merging acc6c1e into cfa1b41 - view on LGTM.com

new alerts:

  • 1 for Unused import

fixed alerts:

  • 1 for Non-exception in 'except' clause
  • 1 for Module is imported with 'import' and 'import from'

@xirdneh xirdneh self-requested a review March 11, 2021 22:06
@xirdneh
Member

xirdneh commented Mar 11, 2021

@thedrow Can we add a test for this?

@thedrow
Member Author

thedrow commented Mar 16, 2021

I'm not sure how, since you would need to cause a disconnection from the broker.
There are other problems with this PR; I'll update you soon.

@xirdneh
Member

xirdneh commented Mar 16, 2021

@thedrow That's what I was thinking. I was trying to figure out a way to mark a connection as disconnected so we can fake this case, but I don't know if it's worth it.
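For what it's worth, faking the disconnected state in a unit test might look roughly like this; the helper name is made up and the mock only reproduces the error seen in the traceback above:

```python
# Hypothetical test helper: build a message whose channel has lost its
# broker connection, so that acking it fails the same way as in the log.
from unittest.mock import Mock

from amqp.exceptions import RecoverableConnectionError


def make_unackable_message():
    message = Mock(name='message')
    message.channel.connection = None
    message.ack.side_effect = RecoverableConnectionError(
        None, 'connection already closed', None, '')
    return message
```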

matusvalo
matusvalo previously approved these changes Mar 22, 2021
Member

@matusvalo matusvalo left a comment


LGTM

@thedrow
Member Author

thedrow commented Mar 23, 2021

@matusvalo Actually, I was waiting for you to review this PR thoroughly.
Is there a way to revive the channel?

The problem is that by the time we restart the connection, the message's channel has its connection set to None.
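In other words, the dead channel can be detected directly on the message; a minimal, hypothetical check (the function name is illustrative):

```python
def channel_is_unusable(message):
    # After the worker reconnects, the old message still references the old
    # channel, whose connection attribute has been set to None.
    channel = getattr(message, 'channel', None)
    return channel is None or channel.connection is None
```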

Is there an alternative here?
I'd rather ack the message since the task is done.

@thedrow
Member Author

thedrow commented Mar 23, 2021

@xirdneh @matusvalo There's also a request from a client of mine to terminate these tasks without revoking them.
Maybe we should introduce a setting for it? Is it always better to revoke these tasks?

@matusvalo
Member

@thedrow Can you also show the Celery part of the stack trace? Kombu is able to recover the connection/channel automatically after a connection failure (see [1]), and it also supports refreshing the channel if needed.

Regarding your second question about revoking tasks: in this PR, when you terminate the request, is the task revoked? (I don't know all of Celery's internals in depth, so this may be a naive question.)

[1] https://docs.celeryproject.org/projects/kombu/en/master/userguide/failover.html#operation-failover
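A minimal sketch of the kombu facilities referenced in [1], assuming placeholder broker URLs; this is illustrative and not taken from the PR:

```python
# Illustrative only: kombu can fail over between broker URLs, re-establish
# the connection with retries, and wrap callables so they are retried on
# recoverable connection errors.
from kombu import Connection

with Connection('amqp://broker1:5672//;amqp://broker2:5672//') as conn:
    conn.ensure_connection(max_retries=3)

    producer = conn.Producer()
    safe_publish = conn.ensure(producer, producer.publish, max_retries=3)
    safe_publish({'hello': 'world'}, exchange='', routing_key='example')
```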

@matusvalo
Member

matusvalo commented Mar 26, 2021

My humble opinion: in distributed systems you can guarantee only one of the following two possibilities:

  1. the message is processed at most once (this is early acknowledgement)
  2. the message is processed at least once (this is late acknowledgement)

So when a user opts for late acknowledgement, they must be aware (and must design their solution accordingly) that data can be processed multiple times, i.e. the processing must be idempotent. Processing a task multiple times is therefore fine in this case, and the user has to take that into account.
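As a concrete (and assumed, not taken from this PR) illustration of that trade-off, a late-acknowledged task should be written so that a redelivery does no extra harm:

```python
# Illustrative idempotent task for late acknowledgement; the in-memory set
# stands in for a durable store keyed on a unique identifier.
from celery import Celery

app = Celery('example', broker='amqp://localhost//')

_settled = set()


@app.task(acks_late=True)
def settle_payment(payment_id):
    """Safe to run more than once: a redelivered message is a no-op."""
    if payment_id in _settled:
        return 'already settled'
    _settled.add(payment_id)
    return 'settled'
```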

@thedrow
Member Author

thedrow commented Apr 1, 2021

> @thedrow Can you also show the Celery part of the stack trace? Kombu is able to recover the connection/channel automatically after a connection failure (see [1]), and it also supports refreshing the channel if needed.

That's the only traceback we get.
The message object has a channel with its connection set to None.

> Regarding your second question about revoking tasks: in this PR, when you terminate the request, is the task revoked? (I don't know all of Celery's internals in depth, so this may be a naive question.)
>
> [1] https://docs.celeryproject.org/projects/kombu/en/master/userguide/failover.html#operation-failover

Yes. A client of mine thinks it's the wrong behavior. According to them, we should terminate the tasks without revoking them.

@thedrow
Member Author

thedrow commented Apr 8, 2021

Yes, we shouldn't revoke, as that would prevent the task from running again.
I feel like we need to introduce a new event.
I also hope that marking the task as retried is good enough and won't have any side effects. I'm testing this now.

What I'm also working on now is marking tasks that have already completed as successful, if that's possible.
If we can do that and check whether the task was done, we can ack it when we receive it again.
@celery/core-developers Does that sound feasible?
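If I understand the idea correctly, it would amount to something like this hypothetical check (the function name is illustrative, not a Celery API):

```python
# Hypothetical sketch: a redelivered message whose task already succeeded
# according to the result backend could be acked without re-executing it.
from celery import states


def can_ack_without_running(app, message, task_id):
    if not message.delivery_info.get('redelivered'):
        return False
    return app.AsyncResult(task_id).state == states.SUCCESS
```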

@lgtm-com

lgtm-com bot commented Apr 8, 2021

This pull request introduces 1 alert and fixes 2 when merging 7149523 into 4f2213a - view on LGTM.com

new alerts:

  • 1 for Unused import

fixed alerts:

  • 1 for Non-exception in 'except' clause
  • 1 for Module is imported with 'import' and 'import from'

@thedrow thedrow force-pushed the terminate-ack-late-tasks-on-connection-loss branch from 85d8e9c to edf6e77 Compare April 8, 2021 12:35
@lgtm-com

lgtm-com bot commented Apr 8, 2021

This pull request introduces 1 alert and fixes 1 when merging cd3bf99 into 4f2213a - view on LGTM.com

new alerts:

  • 1 for Unused import

fixed alerts:

  • 1 for Non-exception in 'except' clause

ahopkins
ahopkins previously approved these changes Apr 9, 2021
@thedrow thedrow force-pushed the terminate-ack-late-tasks-on-connection-loss branch from cd3bf99 to 8342818 Compare April 18, 2021 12:23
There already is a concept of abortable tasks so the term is overloaded.
If the worker already managed to report the task is revoked, there's no need to do it again.
Without this change, the `task-revoked` event and the `task_revoked` signal are sent twice.
worker_cancel_long_running_tasks_on_connection_loss is False by default since it is possibly a breaking change.
In 6.0 it will be True by default.
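Assuming the standard configuration mechanism, enabling the new behaviour would look roughly like this (see the documentation added in this PR for the authoritative reference):

```python
# Assumed usage of the setting described above; it is off by default until 6.0.
from celery import Celery

app = Celery('proj', broker='amqp://localhost//')
app.conf.worker_cancel_long_running_tasks_on_connection_loss = True
```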
@thedrow thedrow force-pushed the terminate-ack-late-tasks-on-connection-loss branch from c4579f1 to 7d5917b Compare April 26, 2021 12:44
@thedrow thedrow marked this pull request as ready for review April 26, 2021 13:02
@thedrow thedrow requested a review from a team April 26, 2021 13:02
@lgtm-com

lgtm-com bot commented Apr 26, 2021

This pull request introduces 1 alert when merging 7d5917b into 850c62a - view on LGTM.com

new alerts:

  • 1 for Module is imported with 'import' and 'import from'

@lgtm-com

lgtm-com bot commented Apr 26, 2021

This pull request introduces 1 alert when merging 028f334 into 850c62a - view on LGTM.com

new alerts:

  • 1 for Unused import

Contributor

@maybe-sybr maybe-sybr left a comment


Overall this lgtm. I'm happy to approve the structure of this and allow @thedrow to address or resolve my minor comments as he sees fit.

Edit: Oops, forgot to do this as a review rather than comments!

@thedrow thedrow merged commit 934a227 into master Apr 28, 2021
@thedrow thedrow deleted the terminate-ack-late-tasks-on-connection-loss branch April 28, 2021 09:53
jeyrce pushed a commit to jeyrce/celery that referenced this pull request Aug 25, 2021

* Terminate tasks with late acknowledgement on connection loss.

* Abort task instead of terminating.

Instead of terminating the task (which revokes it and prevents its execution in the future), abort the task.

* Fix serialization error.

* Remove debugging helpers.

* Avoid revoking the task if it is aborted.

* Rename `abort` to `cancel`.

There already is a concept of abortable tasks so the term is overloaded.

* The revoke flow is no longer called twice.

If the worker already managed to report the task is revoked, there's no need to do it again.
Without this change, the `task-revoked` event and the `task_revoked` signal are sent twice.

* Unify the flow of announcing a task as cancelled.

* Add feature flag.

worker_cancel_long_running_tasks_on_connection_loss is False by default since it is possibly a breaking change.
In 6.0 it will be True by default.

* Add documentation.

* Add unit test for the task cancelling behavior.

* isort.

* Add unit tests for request.cancel().

* isort & autopep8.

* Add test coverage for request.on_failure() changes.

* Add more test coverage.

* Add more test coverage.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment