Description
Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.7.3
What happened?
Dag Information
- generate_records_task will generate a list of integers based on the attempt number - 1 less record per attempt (4 -> 3 -> 2); see the sketch after this list
- The list of records returned is dynamic and serves as input to the mapped task operator, generating the intended number of mapped tasks, one per record
- depends_on_past is true for this DAG
- We are not using task flow decorators
- Business use for this pattern: identify cities/regions for which we need to run the pipeline dynamically. The source of truth for this can change in real time.
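As a standalone illustration (plain Python, no Airflow required; the full repro DAG is under "How to reproduce" below), this sketch shows how the attempt-dependent list length translates into the number of mapped task instances:

# Standalone sketch of the record-count logic described above: the upstream
# task returns 5 - try_number records, and dynamic task mapping creates one
# mapped task instance per record.
def records_for_attempt(try_number: int) -> list:
    return [{"i": i} for i in range(5 - try_number)]

for attempt in (1, 2, 3):
    records = records_for_attempt(attempt)
    # attempt 1 -> 4 mapped task instances, attempt 2 -> 3, attempt 3 -> 2
    print(f"attempt {attempt}: {len(records)} mapped task instances -> {records}")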
Previous Dag Run
- When we perform actions like clear dag or clear task and downstream, the generate_records_task task will generate a different number of records for each attempt
- When a mapped task like mapped_task_instance was executed a second time for the same dag run, one of its mapped task instances was marked as removed, while the mapped task as a whole was marked as success (a way to inspect the per-map-index states is sketched below)
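For reference, a minimal sketch of how the per-map-index states can be inspected in the metadata database; this assumes a machine where the Airflow package and metadata DB connection are configured, and the run_id shown is only an example value:

# Sketch: list the state of every mapped task instance of mapped_task_instance
# for one dag run, straight from the metadata DB.
from airflow.models.taskinstance import TaskInstance
from airflow.utils.session import create_session

with create_session() as session:
    tis = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == "mapped_task_issue",
            TaskInstance.task_id == "mapped_task_instance",
            TaskInstance.run_id == "scheduled__2024-02-01T00:00:00+00:00",  # example run_id
        )
        .order_by(TaskInstance.map_index)
        .all()
    )
    for ti in tis:
        # After the rerun with fewer records, one row shows up in "removed" state.
        print(ti.map_index, ti.state)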
Subsequent Dag Run
- We tried 2 scenarios for the subsequent Dag Run:
  - Clear Dag Run
  - Delete Dag Run (similar to having a new Dag Run)
- In both scenarios, the mapped_task_instance task did not schedule and the dag run was stuck in running state
- For Clear Dag Run: We can see that there are 3 mapped task instances with no_status and 1 with removed.
- For Delete Dag Run: No mapped tasks were generated
Temporary Resolution
When we deleted the removed mapped task instance of mapped_task_instance in the previous dag run, scheduling resumed for mapped_task_instance in the current dag run (a sketch of the equivalent cleanup is below).
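We performed the deletion manually; purely as a sketch of the equivalent cleanup, assuming direct metadata-DB access and a backup taken first, it would look roughly like this:

# Sketch: delete the mapped task instance rows of the previous dag run that
# are stuck in "removed" state, which is what unblocked scheduling for us.
from airflow.models.taskinstance import TaskInstance
from airflow.utils.session import create_session
from airflow.utils.state import TaskInstanceState

with create_session() as session:
    deleted = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == "mapped_task_issue",
            TaskInstance.task_id == "mapped_task_instance",
            TaskInstance.run_id == "scheduled__2024-02-01T00:00:00+00:00",  # previous run_id (example)
            TaskInstance.state == TaskInstanceState.REMOVED,
        )
        .delete(synchronize_session=False)
    )
    print(f"Deleted {deleted} removed mapped task instance row(s)")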
Note: We have also observed the following log, likely because we run multiple schedulers:
{scheduler_job_runner.py:1433} INFO - DAG mapped_task_issue scheduling was skipped, probably because the DAG record was locked
What you think should happen instead?
When depends_on_past is set to True, a mapped task in the subsequent Dag Run should only depend on the overall status of the mapped task in the previous Dag Run.
A mapped task instance in removed state may be an intended outcome of operational changes that lead to a rerun of the previous Dag Run, so it should not block scheduling of the next run (an illustrative sketch of the expected check follows).
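To make the expectation concrete, here is a hypothetical sketch of the semantics we would expect (this is not Airflow's actual dependency-check code): the depends_on_past check for a mapped task would look at the aggregate outcome of the previous run's mapped task instances and ignore map indexes in removed state.

# Hypothetical sketch of the expected depends_on_past semantics for a mapped
# task: the previous run's mapped task counts as satisfied if every
# non-removed mapped task instance finished successfully (or was skipped),
# rather than requiring a terminal state for every historical map index.
from airflow.utils.state import TaskInstanceState

def previous_mapped_task_satisfied(prev_run_states: list) -> bool:
    """prev_run_states: states of all map indexes of the task in the previous run."""
    relevant = [s for s in prev_run_states if s != TaskInstanceState.REMOVED]
    return all(
        s in (TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED) for s in relevant
    )

# Example from this report: the previous run reran with 3 records instead of 4,
# so one map index ended up removed - the run should still count as complete.
states = [
    TaskInstanceState.SUCCESS,
    TaskInstanceState.SUCCESS,
    TaskInstanceState.SUCCESS,
    TaskInstanceState.REMOVED,
]
print(previous_mapped_task_satisfied(states))  # True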
How to reproduce
DAG code for reproducing the error (originally shared as a Word document):
from datetime import date, datetime, timedelta

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    "owner": "joseph.ang",
    "depends_on_past": True,
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
    "max_active_runs": 1,
}

dag = DAG(
    "mapped_task_issue",
    description="Testing mapped task issue in Airflow v2.7.3",
    schedule_interval="0 0 * * *",
    catchup=True,
    start_date=datetime(2024, 2, 1),
    default_args=default_args,
)


def generate_records(**context):
    # 1 less record per retry attempt: 4 -> 3 -> 2
    try_number = context['task_instance'].try_number
    try_number = 5 - try_number
    generated_list = [{'i': i} for i in range(try_number)]
    print(f'Generated list: {generated_list}')
    return generated_list


def mapped_task_method(i: int):
    print(i)


generate_records_task = PythonOperator(
    task_id='generate_records_task',
    python_callable=generate_records,
    provide_context=True,  # legacy argument; in Airflow 2 the context is always passed
    dag=dag,
)

# One mapped task instance is created per record returned by generate_records_task
mapped_task = PythonOperator.partial(
    task_id='mapped_task_instance',
    python_callable=mapped_task_method,
    dag=dag,
).expand(op_kwargs=generate_records_task.output)

Operating System
Debian GNU/Linux 11 (bullseye)
Versions of Apache Airflow Providers
No response
Deployment
Other Docker-based deployment
Deployment details
Running 2 Schedulers
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct




