Automatically reschedule stuck queued tasks in CeleryExecutor #23432
Conversation
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
airflow/executors/celery_executor.py
One issue we had before was load on the scheduler. Is the UI complaining about the scheduler? Also, how often do you get the AirflowTaskTimeout error?
No, the UI isn't complaining at all. As for the timeout, I've never seen it - I only put that code in because you had it in the original PR here (32d7060)... but I guess I did change the query to be a decent amount faster than the previous implementation, so I really doubt it's necessary anymore.
ashb left a comment:
A few code changes, and this needs documenting -- since with this change, if you have too many items in the Celery queue then they will get cleared, even if they aren't lost.
For that reason I'd like you to look at whether we can see if the taskmeta is actually lost, to separate "lost" from "just in a queue behind n long-running tasks."
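For context on "is the taskmeta actually lost": the only broker-agnostic signal Celery exposes here is the result backend state, and it is ambiguous. A minimal sketch of probing it (the helper name is mine, and the import path assumes the `app` object defined in `celery_executor.py`):

```python
from celery.result import AsyncResult

# Assumption: Airflow's configured Celery app, as defined in
# airflow/executors/celery_executor.py at the time of this PR.
from airflow.executors.celery_executor import app


def result_backend_state(celery_task_id: str) -> str:
    """Ask the Celery result backend what it knows about a task id.

    Note the ambiguity that makes "actually lost" hard to answer: PENDING is
    returned both for tasks still waiting in the broker queue and for ids the
    backend has simply never heard of (e.g. a lost message).
    """
    return AsyncResult(celery_task_id, app=app).state
```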
airflow/config_templates/config.yml
Does this need a new setting, or are the existing "orphan" settings enough here?
The orphan/adoption stuff doesn't really have an appropriate setting to use instead, afaik. The "adopted task timeout" check runs on every heartbeat, which is fine because it's very cheap unless a task has actually timed out... in contrast, this check will (minimally) issue a database query every time it runs, which we probably don't want to do on every heartbeat.
Alternatively, instead of issuing the query to find these tasks, we could internally track "sent to celery time" and "last known state" per task... then we wouldn't need to issue the query and could run on every heartbeat instead (a rough sketch of that idea follows below). I kinda like this option, but if we're talking about separating "lost" from "just in a long queue" then it starts to get sketchy (see separate comment below).
In theory this feature also largely makes _check_for_stalled_adopted_tasks() unnecessary, as "stalled adopted tasks" and "lost tasks" are effectively the same thing. I didn't remove _check_for_stalled_adopted_tasks(), though, as I figured removing it added unnecessary risk, and I was trying to stay true to the design of the original PR by ephraimbuddy. A stalled adopted task also has a slight behavioural difference: it fails the task rather than re-scheduling it (which I'm not entirely convinced is really correct).
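A rough sketch of that in-memory bookkeeping idea; everything here (class and attribute names, the PENDING check) is hypothetical illustration, not code from this PR:

```python
import time
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class QueuedTaskTracker:
    """Hypothetical per-task bookkeeping so a stalled check needs no DB query."""

    timeout: float  # seconds a task may sit queued before we treat it as stuck
    sent_at: Dict[str, float] = field(default_factory=dict)    # key -> time sent to Celery
    last_state: Dict[str, str] = field(default_factory=dict)   # key -> last known Celery state

    def record_sent(self, key: str) -> None:
        self.sent_at[key] = time.monotonic()

    def record_state(self, key: str, state: str) -> None:
        self.last_state[key] = state

    def forget(self, key: str) -> None:
        self.sent_at.pop(key, None)
        self.last_state.pop(key, None)

    def stalled_keys(self) -> List[str]:
        """Keys that have waited longer than the timeout and never left PENDING."""
        now = time.monotonic()
        return [
            key
            for key, sent in self.sent_at.items()
            if now - sent > self.timeout
            and self.last_state.get(key, "PENDING") == "PENDING"
        ]
```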
airflow/executors/celery_executor.py
Wrong timer.
It actually is using this task_adoption_timeout setting for the "max queued time"... it only has its own setting for the "run frequency".
1743e4a to 29f7644 (force-pushed)
@ashb Yes, this would be great, I agree. I'm far from a celery expert, but as far as I can tell celery does not provide any capability to do this. Any implementation would need to be broker-specific and directly access celery internals, reading pending work items from the queue directly. This is not possible on SQS or rabbit afaik... but I've only witnessed this problem on redis, so maybe the fix only needs to work on redis.
Even on redis the solution isn't pretty, though... The redis queue in celery is implemented using a redis "list" data type, which internally is a linked list. So there's no great way to check if it contains a given task - we need to scan the entire list, O(n), to check if a single task is in there. Redis does provide an ...
Is rescheduling a "just-in-a-long-queue" task really that bad? Maybe I don't fully grasp the impacts of it, but revoking and re-enqueuing a task which was healthy and waiting seems like a pretty good option compared to the alternative 😄
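To make the O(n) objection concrete, here is a hedged sketch of what such a broker-specific check could look like with redis-py; the queue name, Redis URL, and the assumption that the task id sits in the message headers (Celery's protocol 2 layout) are all illustrative, not taken from this PR:

```python
import json

import redis  # redis-py


def task_in_redis_queue(task_id: str,
                        queue: str = "default",
                        url: str = "redis://localhost:6379/0") -> bool:
    """Scan the Redis list backing a Celery queue for a given task id.

    LRANGE walks the entire list, so this is O(n) in the number of queued
    messages - the very cost discussed above.
    """
    client = redis.Redis.from_url(url)
    for raw in client.lrange(queue, 0, -1):
        message = json.loads(raw)
        if message.get("headers", {}).get("id") == task_id:
            return True
    return False
```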
@ashb As another option, I could stop using the task_adoption_timeout and instead add an explicit ...
First, thank you so much for working on this. We've run into a very similar issue and are eagerly looking forward to a fix. We also now have an isolated environment where we can reproduce this, so I'd be happy to test some version of this patch if you'd like (may have turnarounds of a week or so, sorry in advance).
I went ahead and tested your current code with our setup and it works perfectly for us - the stuck task is revoked in Celery, so there is no chance of it running twice, and is sent back to the scheduler.
I'm also not a celery expert, but I believe tasks lost due to a worker shutdown like celery/celery#7266 have already gone from the broker queue to the celery worker consumer. This means that you would be able to find the tasks on celery rather than on the broker. You might try to look at ...
We experience this behavior due to a different bug in Celery. You can see a longer description of our experience here.
I'm either misunderstanding this or it's not accurate. At least, there is a ...
As a sidenote, for tasks lost by shut-down workers like celery/celery#7266, you might give this config in celery a shot.
Oooh yes, you're absolutely correct. I had the blinders on and was only looking at the "adopted task timeout" functionality of the celery executor - I totally forgot about the actual orphaned task adoption functionality of the scheduler 😂
Thanks for the tip... I'll look into this a little further, but I'm pretty sure they won't be there. My understanding is that reserved tasks will eventually get re-delivered to a different worker after 6 hours, but I've had tasks stuck for far longer than that... and I did also verify that the stuck tasks did not exist in the ...
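For reference, checking what live workers have prefetched can be done with Celery's inspection API. A minimal sketch (again assuming the `app` object from `celery_executor.py`); note that `inspect()` only reaches workers that are still alive, so a task lost to a dead worker will not show up here:

```python
from typing import Optional, Tuple

# Assumption: Airflow's Celery app as defined in airflow/executors/celery_executor.py
from airflow.executors.celery_executor import app


def find_task_on_workers(task_id: str) -> Optional[Tuple[str, str]]:
    """Return (worker, category) if any live worker has the task reserved or active."""
    inspector = app.control.inspect()
    for category, by_worker in (
        ("reserved", inspector.reserved() or {}),
        ("active", inspector.active() or {}),
    ):
        for worker, tasks in by_worker.items():
            if any(task.get("id") == task_id for task in tasks):
                return worker, category
    return None
```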
FYI I have re-implemented this feature as an expansion of the existing "adopted task timeout" capability in #23690. I believe the new implementation is superior as it is cleaner (IMHO), lighter-weight, and introduces minimal behavioural change unless it's explicitly enabled in the config.
e5a587e to fec8bb1 (force-pushed)
Closing in favour of #23690
Celery can lose tasks on worker shutdown, causing Airflow to just wait on them indefinitely (may be related to celery/celery#7266). This PR detects these "hung" tasks and sets them back to the SCHEDULED state so the scheduler can queue them up again.
Closes: #19699
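In broad strokes, the change amounts to something like the sketch below. This is a simplified illustration, not the PR's actual code: the two-hour default, the session helper, and the columns reset are assumptions, and the real change also revokes the task in Celery first so it cannot run twice.

```python
from datetime import timedelta

from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import State


def reschedule_stuck_queued_tasks(max_queued_time: timedelta = timedelta(hours=2)) -> int:
    """Flip task instances stuck in QUEUED for too long back to SCHEDULED."""
    cutoff = timezone.utcnow() - max_queued_time
    with create_session() as session:
        return (
            session.query(TaskInstance)
            .filter(
                TaskInstance.state == State.QUEUED,
                TaskInstance.queued_dttm < cutoff,
            )
            .update(
                {TaskInstance.state: State.SCHEDULED, TaskInstance.queued_dttm: None},
                synchronize_session=False,
            )
        )
```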
This is basically a resurrection of PR #19769, which was reverted because no one could reproduce/test it. We can reproduce the problem reliably in an isolated test environment, so we were able to test this fairly thoroughly.
cc: @ephraimbuddy @kristoffern @kaxil