
Task group names are duplicated in Task's task_id for MappedOperator #52334

Open
@kacpermuda

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

When a MappedOperator is inside a task group, the task group prefix is duplicated in the task_id of the task object. The task_id on the TaskInstance object is correct. This only happens in Airflow 3; in Airflow 2 it works fine.

Example logs from a MappedOperator task (compare the TI object's task_id with the task_id shown in the task=<Task(...)> repr):

DEBUG - Calling 'on_task_instance_success' with {'previous_state': <TaskInstanceState.RUNNING: 'running'>, 'task_instance': RuntimeTaskInstance(id=UUID('0197b142-c5df-7df3-acea-1bca16fd516c'), task_id='tg1.mapped_task', dag_id='dag_task_group_with_mapped', run_id='manual__2025-06-27T12:00:32.717672+00:00', try_number=1, map_index=0, hostname='51815b4293dc', context_carrier=None, task=<Task(CustomMappedOperator): tg1.tg1.mapped_task>, ...

DEBUG - Running finalizers: ti="RuntimeTaskInstance(id=UUID('0197b142-c5e1-7e6a-9f4c-5a92086314d7'), task_id='tg1.tg2.mapped_task2', dag_id='dag_task_group_with_mapped', run_id='manual__2025-06-27T12:00:32.717672+00:00', try_number=1, map_index=0, hostname='51815b4293dc', context_carrier=None, task=<Task(CustomMappedOperator): tg1.tg2.tg1.tg2.mapped_task2>,
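To make the mismatch in the logs above easier to spot, here is a minimal sketch that pulls both ids out of a log line and compares them. The regular expressions and the `extract_ids` helper are my own assumptions for illustration, not part of Airflow, and the sample line is a shortened excerpt of the second log entry.

```python
import re

# Matches the TaskInstance's task_id='...' field.
TI_TASK_ID = re.compile(r"task_id='([^']+)'")
# Matches the id inside the task repr, e.g. task=<Task(SomeOperator): some.id>
TASK_REPR = re.compile(r"task=<Task\([^)]*\): ([^>]+)>")


def extract_ids(log_line):
    """Return (ti_task_id, task_object_task_id) found in one log line."""
    ti_match = TI_TASK_ID.search(log_line)
    task_match = TASK_REPR.search(log_line)
    if ti_match is None or task_match is None:
        raise ValueError("log line does not contain both ids")
    return ti_match.group(1), task_match.group(1)


# Shortened excerpt of the "Running finalizers" log line.
line = (
    "RuntimeTaskInstance(task_id='tg1.tg2.mapped_task2', map_index=0, "
    "task=<Task(CustomMappedOperator): tg1.tg2.tg1.tg2.mapped_task2>,"
)
ti_id, task_obj_id = extract_ids(line)
print(ti_id, task_obj_id, ti_id == task_obj_id)
```

For the excerpt above, the two ids differ, which is the behavior this issue reports.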

What you think should happen instead?

task_id should be the same on task and task_instance object, without duplicated task group names.
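As an illustration only: the observed ids are exactly what you would get if the task group prefix were applied to an id that already carries it. I have not confirmed that this is the root cause, and the `prefixed` helper below is hypothetical, not an Airflow function.

```python
def prefixed(group_id, label):
    """Hypothetical helper: prepend a task group's id to a label."""
    return f"{group_id}.{label}" if group_id else label


# Applying the prefix once gives the id seen on the TaskInstance.
correct = prefixed("tg1.tg2", "mapped_task2")

# Applying it a second time to the already-prefixed id gives the id
# seen on the task object in the logs.
duplicated = prefixed("tg1.tg2", correct)

print(correct)
print(duplicated)
```

The same pattern reproduces the `tg1` case: prefixing `tg1.mapped_task` with `tg1` again yields `tg1.tg1.mapped_task`.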

How to reproduce

import datetime as dt

from airflow import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

class CustomMappedOperator(BashOperator):

    def execute(self, context):
        # The task object has the wrong task_id; the TaskInstance object has the correct one.
        print(context["task"].task_id)  # Wrong - tg1.tg1.mapped_task or tg1.tg2.tg1.tg2.mapped_task2
        print(context["task_instance"].task_id)  # Correct - tg1.mapped_task or tg1.tg2.mapped_task2
        print(context["task_instance"].task.task_id)  # Wrong - tg1.tg1.mapped_task or tg1.tg2.tg1.tg2.mapped_task2
        return super().execute(context)

with DAG(
    dag_id="dag_task_group_with_mapped",
    start_date=dt.datetime(2025, 6, 25),
    schedule=None,
) as dag:

    with TaskGroup("tg1") as tg:
        mapped_task = CustomMappedOperator.partial(task_id="mapped_task").expand(bash_command=["exit 0;"])
        task = BashOperator(task_id="task", bash_command="exit 0;")
        
        with TaskGroup("tg2", parent_group=tg):
            mapped_task2 = CustomMappedOperator.partial(task_id="mapped_task2").expand(bash_command=["exit 0;"])
            task2 = BashOperator(task_id="task2", bash_command="exit 0;")

Operating System

MacOS with breeze

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

Breeze

Anything else?

I have not had time to dig into it and find the cause, but I will be happy to fix this whenever I have a free moment.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
