Description
Apache Airflow version
main (development)
If "Other Airflow 2 version" selected, which one?
No response
What happened?
When MappedOperator is inside a task group, the task_id on the task object gets duplicated. It looks fine on TaskInstance object. This is only the case for Airflow 3, in Airflow 2 it works fine.
Example logs from MappedOperator task (notice the bolded difference between TI object's task_id and task object's task_id):
DEBUG - Calling 'on_task_instance_success' with {'previous_state': <TaskInstanceState.RUNNING: 'running'>, 'task_instance': RuntimeTaskInstance(id=UUID('0197b142-c5df-7df3-acea-1bca16fd516c'), task_id='tg1.mapped_task', dag_id='dag_task_group_with_mapped', run_id='manual__2025-06-27T12:00:32.717672+00:00', try_number=1, map_index=0, hostname='51815b4293dc', context_carrier=None, task=<Task(CustomMappedOperator): tg1.tg1.mapped_task>, ...
DEBUG - Running finalizers: ti="RuntimeTaskInstance(id=UUID('0197b142-c5e1-7e6a-9f4c-5a92086314d7'), task_id='tg1.tg2.mapped_task2', dag_id='dag_task_group_with_mapped', run_id='manual__2025-06-27T12:00:32.717672+00:00', try_number=1, map_index=0, hostname='51815b4293dc', context_carrier=None, task=<Task(CustomMappedOperator): tg1.tg2.tg1.tg2.mapped_task2>,
What you think should happen instead?
task_id should be the same on task and task_instance object, without duplicated task group names.
How to reproduce
import datetime as dt
from airflow import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
class CustomMappedOperator(BashOperator):
def execute(self, context):
# Task object has wrong task_id. TaskInstance object has correct task_id.
print(context["task"].task_id) # Wrong - tg1.tg1.mapped_task or tg1.tg2.tg1.tg2.mapped_task
print(context["task_instance"].task_id) # Correct - tg1.mapped_task
print(context["task_instance"].task.task_id) # Wrong - tg1.tg1.mapped_task or tg1.tg2.tg1.tg2.mapped_task
return super().execute(context)
with DAG(
dag_id="dag_task_group_with_mapped",
start_date=dt.datetime(2025, 6, 25),
schedule=None,
) as dag:
with TaskGroup("tg1") as tg:
mapped_task = CustomMappedOperator.partial(task_id="mapped_task").expand(bash_command=["exit 0;"])
task = BashOperator(task_id="task", bash_command="exit 0;")
with TaskGroup("tg2", parent_group=tg):
mapped_task2 = CustomMappedOperator.partial(task_id="mapped_task2").expand(bash_command=["exit 0;"])
task2 = BashOperator(task_id="task2", bash_command="exit 0;")
Operating System
MacOS with breeze
Versions of Apache Airflow Providers
No response
Deployment
Other
Deployment details
Breeze
Anything else?
Did not have time to dig into it and find a cause, but will be happy to fix this whenever I have a free moment.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct