Skip to content

This issue was moved to a discussion.

You can continue the conversation there. Go to discussion →

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Low priority tasks are scheduled before high priority tasks #26933

Closed
1 of 2 tasks
christianbrugger opened this issue Oct 7, 2022 · 7 comments
Closed
1 of 2 tasks

Low priority tasks are scheduled before high priority tasks #26933

christianbrugger opened this issue Oct 7, 2022 · 7 comments
Labels
affected_version:2.4 Issues Reported for 2.4 area:core kind:bug This is a clearly a bug

Comments

@christianbrugger
Copy link

christianbrugger commented Oct 7, 2022

Apache Airflow version

2.4.1

What happened

It sometimes happens that low priority tasks are scheduled before high priority tasks. It happens when a tasks finishes with a follow-up tasks that has a high priority. The scheduler is then sometimes scheduling another waiting low priority task instead of the newly available high priority task.

What you think should happen instead

The scheduler should not only consider ready tasks when a task finishes, but also consider the tasks that become ready. So that chains of high priority tasks can stay active without being interrupted by low priority tasks.

How to reproduce

Create a DAG factory that creates 2 DAGs with 3 Python tasks that run for 7 seconds doing a sleep. The weight rule should be set to upstream. Also they should be made part of a pool with capacity 1.

Then start 1 run for the first and then second DAG. In about 5-10% of the cases, you will see the following run pattern D{dag}_T{task}:

D1_T1, ** D2_T1, D1_T2, D1_T3, D2_T1, D2_T2

The correct tasks schedule however should always be:

D1_T1, D1_T2, D1_T3, ** D2_T1, D2_T1, D2_T2

Operating System

Ubuntu 18.04.6, x86_64
mysql-server-5.7.39

Versions of Apache Airflow Providers

apache-airflow-providers-mysql==2.0.0

Deployment

Virtualenv installation

Deployment details

Python 3.8

Notable airflow.cfg changes:

[core]
executor = LocalExecutor
parallelism = 16
[database]
sql_alchemy_conn = mysql://airflow:airflow@localhost...
[webserver]
workers = 1
worker_class = gevent
[scheduler]
parsing_processes = 1

The above behavior shows with both

schedule_after_task_execution = True or False

Anything else

The problem does not happen every time. On my configuration it happens in about 5-10% of the cases.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@christianbrugger christianbrugger added area:core kind:bug This is a clearly a bug labels Oct 7, 2022
@potiuk
Copy link
Member

potiuk commented Oct 10, 2022

Explained likely reason in #26821 - @ashb, @ephraimbuddy likely you can confirm/verify my guesses.

@eladkal eladkal added the affected_version:2.4 Issues Reported for 2.4 label Jan 6, 2023
@eladkal
Copy link
Contributor

eladkal commented Jan 6, 2023

@ephraimbuddy can you clarify if this is a bug or intentional?

@uranusjr
Copy link
Member

uranusjr commented Jan 9, 2023

I think this is kind of intended, from my reading of the implementation. Currently the scheduler checks how many free slots in the pool, and schedule at most that number of task instances in one iteration. So if a task instance finishes very quickly, its dependencies still need to wait until those task instances that the scheduler has already decided to schedule, even if those possibly lower-priority instances have not started.

The only reasonable fix, from I can tell, is for the scheduler to schedule only one task instance per iteration, but personally I am not sure this is a common enough problem to worth slowing down the entire loop.

@ashb
Copy link
Member

ashb commented Jan 9, 2023

This is an "unavoidable" side-effect of wanting high throughput from the scheduler. If you want the scheduler to schedule tasks as fast as possible, then task priority will only come in to effect when there are more scheduled tasks waiting than the queue slots.

If you've ever tried to implement QoS of a broadband connection to have SSH/interactive sessions not get throttled by large downloads then the same thing applies there: if you allow 100% use of the bandwidth then there is no possibility for higher priority traffic to be placed ahead of bulk traffic. But if you only allow bulk traffic to take 90% of the pipe then you have some slack to put higher priorty things ahead.

There isn't quite a direct analogy in terms of the scheduler, but something might be achievable right now with using Queues. And it's possible the scheduler could be extended to do some kind of "look ahead" to see what high priority tasks might be coming up soon, but that is a decidedly non-trivial amount of work.

So I think I'm tempted to close this as "Sorry, can't fix"/docs need expanding to clarify this behaviour.

@christianbrugger
Copy link
Author

christianbrugger commented Jan 9, 2023

I am not talking about a resource conflict. That indeed would be tricky to implement.

The problem I encountered is much more basic. It happens within a high priority DAG with multiple tasks. After the first task is finished instead of scheduling the next one in this DAG, which has high priority and is ready to start, a low priority task from a different DAG is scheduled. I presume just because of the fact that it is already waiting and the dependent tasks has not been marked for scheduling.

We need to somehow make sure hat the dependent tasks of a finishing tasks are put into the waiting queue, before scheduling any new tasks for the now free slot. So that 4 always happens before 3 or even 2. The goal being that Task 1.2 is scheduled in the freed slot, instead of Task 2.2.

image

@Taragolis
Copy link
Contributor

@christianbrugger Do you have an idea or prototype how it would work with different Airflow Deployment: Executors, HA Schedulers and DB backends?

@potiuk
Copy link
Member

potiuk commented Jan 9, 2023

We need to somehow make sure hat the dependent tasks of a finishing tasks are put into the waiting queue, before scheduling any new tasks for the now free slot

In case you missed it @christianbrugger - this is precisely what @ashb described as:

some kind of "look ahead"

What you described is PRECISELY "look ahead". I think there are many complex problems involved if you try do do such look ahead. Basically whatever approach you come up with you end up with some capacity loss (also as @ashb explained). Another similar case to look at is what procesors do where they execute mutiple branches at the same time hoping that one of them will succeed. Here is the same - we do not KNOW if the task that is currrently running will succeed or not and processing and scheduling further DAG runs will depend on that. And it can get super complex when there are complex dags with multiple dependencies.

One of the problems to solve (and this is one of many) - you have to find a way to create those Task Instances in multiple variants (because previous tasks might succeed/fail/skip and each different state will likely trigger different set of DagRuns to create) - this is your queue. And to discard those DAGRuns that are not needed - when the conditions that you assumed will be fulfilled, won't be actually fullfilled. And it can propagate further and further. if your DAG has multiple "layers" this might quickly become really complex. The number of potential DAGRun combination to consider grows fast - even exponentially fast - with each layer - and you need to do proper house-keeping on those already created Task instances.
Even if you just focus on "all green" scenario you need to at the very least housekeep those task instances waiting in the queue.

This is ONE of the problems. There are many more where you have to decide about priorities of processing those tasks - some ways of handling the queues might lead to starvation of other dags and eventually you might slow-down entire system significantly if you want to handle it at scale.

There are a number of papers written on similar algorithms, and If anything, I believe a good search on possible science and math there should be taken into account, because there are many problems that might occur during such "look ahead".

So yeah. I agree with @ashb (and I think this is also what @Taragolis hinted us). At this stage, it is a discussion at most, unless we have a very concrete proposal in the form of Airflow Improvement Proposal that would explain all the details of such a propsal. In which case we can scrutinize it and comment (as all other AIPs), discuss, vote and then posisbly someone can implement it.

This is neither "bug" nor "feature" - it's a property of the current scheduler implementation and we might improve it in the future, but not without a detailed and well thought proposal that will discuss edge cases and consider scale.

@apache apache locked and limited conversation to collaborators Jan 9, 2023
@potiuk potiuk converted this issue into discussion #28809 Jan 9, 2023

This issue was moved to a discussion.

You can continue the conversation there. Go to discussion →

Labels
affected_version:2.4 Issues Reported for 2.4 area:core kind:bug This is a clearly a bug
Projects
None yet
Development

No branches or pull requests

6 participants