Optionally allow a task pool to count tasks in the 'deferred' state as occupying slots in that pool #29416

Closed
2 tasks done
james-seymour-cubiko opened this issue Feb 8, 2023 · 6 comments

james-seymour-cubiko commented Feb 8, 2023

Description

Optionally allow a task pool to count tasks in the 'deferred' state as occupying slots in that pool. I'm not sure what the best way of implementing this is, but currently my very hacky solution is to patch the airflow.models.pool.Pool.slots_stats method so that deferred tasks are counted as running in each pool.

Use case/motivation

The prototypical use case here is using Airflow to limit the number of concurrent queries executing against a database while keeping the benefit of waiting for those queries to complete on a triggerer (where a proxy is used to execute queries instead of a direct connection to the db).

In our case, we use Airflow to orchestrate an Azure Data Factory that executes queries against a database and moves the resulting data.

We have an airflow task trigger a single pipeline run in that data factory, which then defers and waits for that pipeline run to complete in the triggerer (for efficiency) before continuing the dag run.

However, we have ~100 tasks that all execute a pipeline run on the same factory - ideally we would execute all of these pipelines concurrently, but the database is quickly overwhelmed by that many queries at the same time, resulting in timeouts. Therefore, the next best option is to limit the concurrency of those queries with a task pool in Airflow.

This can currently be achieved with Airflow's task pools, but only if we keep each of those tasks in the running state while waiting for each query to complete (as deferred tasks do not occupy slots in the task pool). Otherwise, if we defer the tasks while waiting, we lose the concurrency limits of the pool, as all ~100 tasks are free to defer at the same time, so it's currently an either/or solution.
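To make the either/or trade-off concrete, here is a rough toy simulation. It is not Airflow code: the function name, the tick-based model, and the query duration are all assumptions made purely for illustration. Each task holds a slot while running, submits its query, and then defers until the query finishes.

```python
from collections import deque


def peak_in_flight(num_tasks, slots, count_deferred, query_ticks=20):
    """Return the peak number of external queries in flight at once.

    Toy model (hypothetical, not Airflow's scheduler): a task runs for one
    tick to submit its query, then sits in 'deferred' for query_ticks ticks.
    """
    waiting = deque(range(num_tasks))
    running = set()
    deferred = {}  # task id -> ticks left until its query completes
    peak = 0

    while waiting or running or deferred:
        # Queries whose wait is over complete and leave the system.
        for task in [t for t, left in deferred.items() if left == 0]:
            del deferred[task]
        # Tasks that were running have submitted their query; they defer now.
        for task in list(running):
            running.remove(task)
            deferred[task] = query_ticks
        # Admit waiting tasks while the pool reports open slots.
        occupied = len(running) + (len(deferred) if count_deferred else 0)
        while waiting and occupied < slots:
            running.add(waiting.popleft())
            occupied += 1
        # A query loads the database from submission until completion.
        peak = max(peak, len(running) + len(deferred))
        for task in deferred:
            deferred[task] -= 1
    return peak


print(peak_in_flight(100, 10, count_deferred=False))
print(peak_in_flight(100, 10, count_deferred=True))
```

Under these assumptions, ignoring deferred tasks lets every task get its query in flight at once, while counting them keeps the database load at the pool size.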

I am aware that in this specific case ADF does support a maximum pipeline run concurrency setting, which is a much quicker way to solve this problem, but we have other extraction tools that we can't rely on to limit concurrency in this way, and I thought I would just throw this idea out here anyway in case others might find it helpful :)

Related issues

Somewhat related - #15082

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

james-seymour-cubiko added the kind:feature (Feature Requests) label Feb 8, 2023

boring-cyborg bot commented Feb 8, 2023

Thanks for opening your first issue here! Be sure to follow the issue template!

Member

potiuk commented Feb 8, 2023

Yes. The small problem is that this must be a global feature due to the way pools work for now, and it would be a potentially breaking behaviour, and one that has the potential to deadlock in some circumstances.

But I think we could likely do it in such a way that Pool has an "include_deferred" flag that is false by default, and I think the query in the scheduler could efficiently use it to include/exclude deferred tasks. That would be a nice "feature" - but it has a few consequences across the board (modifying CLI, UI, etc.).
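A minimal sketch of what such a per-pool flag could look like at the query level, using an in-memory SQLite database. The table layout, column names, pool names, and the open_slots helper are simplified stand-ins invented for this example; they are not Airflow's actual schema or scheduler query.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    -- Simplified, hypothetical tables; not Airflow's real schema.
    CREATE TABLE pool (
        name TEXT PRIMARY KEY,
        slots INTEGER NOT NULL,
        include_deferred INTEGER NOT NULL DEFAULT 0  -- off by default
    );
    CREATE TABLE task_instance (
        task_id TEXT,
        pool TEXT,
        state TEXT,
        pool_slots INTEGER NOT NULL DEFAULT 1
    );
    """
)
conn.executemany(
    "INSERT INTO pool (name, slots, include_deferred) VALUES (?, ?, ?)",
    [("db_default", 5, 0), ("db_strict", 5, 1)],
)
rows = []
for pool_name in ("db_default", "db_strict"):
    rows.append((f"{pool_name}_run", pool_name, "running"))
    rows += [(f"{pool_name}_def{i}", pool_name, "deferred") for i in range(3)]
conn.executemany(
    "INSERT INTO task_instance (task_id, pool, state) VALUES (?, ?, ?)", rows
)


def open_slots(connection):
    """Return {pool name: open slots}, honouring each pool's flag."""
    query = """
        SELECT p.name,
               p.slots - COALESCE(SUM(
                   CASE
                       WHEN ti.state IN ('queued', 'running') THEN ti.pool_slots
                       WHEN ti.state = 'deferred' AND p.include_deferred = 1
                           THEN ti.pool_slots
                       ELSE 0
                   END), 0) AS open_slots
        FROM pool AS p
        LEFT JOIN task_instance AS ti ON ti.pool = p.name
        GROUP BY p.name, p.slots
        ORDER BY p.name
    """
    return dict(connection.execute(query).fetchall())


print(open_slots(conn))
```

Both pools hold one running and three deferred tasks; only the pool with the flag set treats the deferred ones as occupying slots, so existing pools keep their current behaviour.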

Overall, it's a nice feature to implement for someone who would like to get more insight into some Airflow internals.

Contributor

tanelk commented Feb 9, 2023

There is one more thing to keep in mind.

The lifecycle of a deferrable task is something like this:
none -> scheduled -> queued -> running -> deferred -> scheduled -> queued -> running -> success
Currently it only consumes pool slots in the queued and running states, and you'd like to add deferred to that list. In that chain there is still a scheduled state (the second one) that splits the execution into two parts. Handling that could be a little more work than just updating Pool.slots_stats. TI.next_method could be used to distinguish the two scheduled states, but that does not jibe with the scheduler.
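The gap can be seen by walking the lifecycle above and marking where the slot is given up between the first and last slot-holding steps. This is only an illustrative sketch: the helper name is made up, and the state names are plain strings rather than Airflow's state enum.

```python
# The lifecycle of a deferrable task, as listed above.
LIFECYCLE = [
    "none", "scheduled", "queued", "running", "deferred",
    "scheduled", "queued", "running", "success",
]


def released_mid_run(occupying_states):
    """Return (step index, state) pairs where the task gives up its slot
    between the first and the last step at which it holds one."""
    held = [state in occupying_states for state in LIFECYCLE]
    first = held.index(True)
    last = len(held) - 1 - held[::-1].index(True)
    return [(i, LIFECYCLE[i]) for i in range(first, last + 1) if not held[i]]


# Current behaviour: only queued and running occupy a slot.
print(released_mid_run({"queued", "running"}))
# Adding deferred still leaves the second 'scheduled' step uncovered.
print(released_mid_run({"queued", "running", "deferred"}))
```

Simply adding scheduled to the occupying set would also count the first scheduled step, before the task has been admitted at all, which is why the two scheduled states need to be told apart.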

@pvsmpraveen

Similar use case, but with Sensor mode='reschedule'

Contributor

Usiel commented Nov 10, 2023

I think this issue is solved by #32709 already.

Member

potiuk commented Nov 10, 2023

Ah yes, indeed.

potiuk closed this as completed Nov 10, 2023