Add depends_on_previous_task_ids parameter to allow tasks to depend on specific tasks from previous DAG run #60385
This metric was accidentally disabled in Airflow 3.0 with a TODO comment referencing AIP-66. The dag_processing.last_run.seconds_ago metric continued to work, but dag_processing.last_duration was completely missing.
This fix:
- Uncomments the Stats.timing() calls for the last_duration metric
- Updates the code to use the correct relative_fileloc parameter
- Re-enables the previously skipped test
Closes apache#60325
…pend on specific tasks from previous DAG run
This commit adds the depends_on_previous_task_ids parameter to BaseOperator, allowing a task to depend on the successful completion of multiple specific tasks from the previous DAG run (within the same DAG). Previously, depends_on_past=True only allowed a task to depend on the same task in the previous dag_run. This enhancement enables more flexible cross-task dependencies on previous runs.
Changes:
- Added depends_on_previous_task_ids parameter to BaseOperator
- Validation ensures depends_on_past=True when using depends_on_previous_task_ids
- Extended PrevDagrunDep to check dependencies on specified tasks
- Added comprehensive unit tests
Closes apache#60328
Hey, a review is highly appreciated.
Honestly, I'm not sure we want to support this -- it seems like quite an uncommon use case, and I worry about the maintainability of yet another feature, in terms of code, API surface area and proliferation of options for users. I don't see from your usage example why this is actually needed, and why depends_on_past won't fit?
Indeed @bujji8411 - as the author of the issue - can you please add more explanation about the concrete use case you had? That would make it easier to understand why it's needed and why the current depends_on_past is not enough. On Slack you mentioned that you have very concrete examples of such functionality being needed.
This pull request has been automatically marked as stale because the author has not responded to a request for more information. It will be closed in 7 days if no further activity occurs. Thank you for your contributions.
Concrete use case and why
Thanks for the feedback, let me clarify the concrete use case and why
Pipeline scenario:
This is a stateful file-based ingestion workflow:
Key characteristics:
Failure scenario
dag_run_1
Key points:
If we set: stp.depends_on_past = True
Then in dag_run_2:
What we actually need is:
This is a group-level previous-run dependency, not a single-task dependency.
Summary
I am closing all your PRs @Arunodoy18 - despite earlier warnings, you are not looking at your PRs, and you submit multiple unrelated, AI-generated changes without even looking at them. If this continues to happen, we will ask ASF infra to block your user for any ASF contributions.
This PR implements the `depends_on_previous_task_ids` parameter for tasks, allowing a task to depend on the successful completion of multiple specific tasks from the previous DAG run (within the same DAG).
Problem Statement
Previously, `depends_on_past=True` only allowed a task to depend on the same task in the previous dag_run. There was no native way for a task to depend on multiple specific tasks from the previous dag_run.
Solution
Added a new parameter `depends_on_previous_task_ids` to the `BaseOperator` that accepts a list of task IDs. When set (along with `depends_on_past=True`), the task will only run if all specified tasks in the previous DAG run have succeeded.
Changes Made
1. task-sdk/src/airflow/sdk/bases/operator.py
- Added `depends_on_previous_task_ids` parameter to `BaseOperator.__init__()`
- Added validation requiring `depends_on_past=True` when using `depends_on_previous_task_ids`
- Added it to `template_fields` for templating support
2. airflow-core/src/airflow/ti_deps/deps/prev_dagrun_dep.py
- Extended the `PrevDagrunDep._get_dep_statuses()` method to check dependencies on specified tasks from previous DAG run
- Checks that the tasks in `depends_on_previous_task_ids` exist and have succeeded in the previous run
3. airflow-core/tests/ti_deps/deps/test_prev_dagrun_dep_specific_tasks.py
Usage Example
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# `schedule` replaces `schedule_interval`, which was removed in Airflow 3.
with DAG('example_dag', start_date=datetime(2022, 1, 1), schedule='@daily') as dag:
    # Both tasks are registered on the DAG by the context manager.
    task_a = PythonOperator(task_id='task_a', python_callable=lambda: print('A'))
    task_b = PythonOperator(task_id='task_b', python_callable=lambda: print('B'))
```
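The check described in this PR (a task runs only if all listed tasks succeeded in the previous DAG run) can be illustrated without Airflow. The following is a minimal sketch in plain Python, not the code from `prev_dagrun_dep.py`: the `State` enum, the `previous_run_allows` function, and the dict of previous-run states are all hypothetical names made up for illustration. It also assumes that a missing previous run (the first run of a DAG) lets the task through, which the PR description does not state.

```python
from enum import Enum


class State(str, Enum):
    """Hypothetical, simplified task states (not Airflow's own state class)."""

    SUCCESS = "success"
    FAILED = "failed"
    RUNNING = "running"


def previous_run_allows(prev_states, depends_on_previous_task_ids):
    """Decide whether a task may run, given the previous run's task states.

    prev_states: dict mapping task_id -> State for the previous DAG run,
        or None when there is no previous run.
    depends_on_previous_task_ids: task IDs that must have succeeded.

    Returns (ok, reasons), where reasons lists every blocking task.
    """
    if prev_states is None:
        # Assumption: with no previous run there is nothing to wait for.
        return True, []

    reasons = []
    for task_id in depends_on_previous_task_ids:
        state = prev_states.get(task_id)
        if state is None:
            # The listed task has no task instance in the previous run.
            reasons.append(f"{task_id}: no task instance in previous run")
        elif state != State.SUCCESS:
            # Only a successful task satisfies the dependency in this sketch.
            reasons.append(f"{task_id}: state is {state.value}")
    return not reasons, reasons


# Usage: task_b failed in the previous run, so the dependent task is blocked.
ok, reasons = previous_run_allows(
    {"task_a": State.SUCCESS, "task_b": State.FAILED},
    ["task_a", "task_b"],
)
print(ok, reasons)
```

Collecting every blocking task, instead of stopping at the first one, is a design choice of this sketch; it makes the reason for a blocked task easier to report.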
Behavior
- Raises `AirflowException` if `depends_on_previous_task_ids` is set without `depends_on_past=True`
Testing
- `test_prev_dagrun_dep_specific_tasks.py`
- … `depends_on_past=True`)
Closes
Closes #60328
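The validation rule listed under Behavior can be sketched the same way. This is an illustration only, not the code added to `operator.py`: `validate_previous_task_deps` and `ConfigError` are hypothetical names, with `ConfigError` standing in for `AirflowException` so the snippet runs without Airflow installed.

```python
class ConfigError(ValueError):
    """Hypothetical stand-in for AirflowException."""


def validate_previous_task_deps(depends_on_past, depends_on_previous_task_ids=None):
    """Return the list of task IDs, or raise if the combination is invalid."""
    # Treat None and an empty list the same way: no extra dependencies.
    task_ids = list(depends_on_previous_task_ids or [])
    if task_ids and not depends_on_past:
        # Mirrors the rule in the PR: the new parameter needs depends_on_past=True.
        raise ConfigError(
            "depends_on_previous_task_ids requires depends_on_past=True"
        )
    return task_ids


# Usage: a valid combination returns the task IDs unchanged.
print(validate_previous_task_deps(True, ["task_a", "task_b"]))
```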