
branch operators inside expand/expand_kwargs-mapped TaskGroups in 3.2.x — non-selected branch siblings run instead of being skipped #67265

@joshOberhaus

Description


Under which category would you file this issue?

Airflow Core

Apache Airflow version

3.2.1

What happened and how to reproduce it?

In the following minimal example, run_optional_step runs for every mapped instance (item_type "full" and item_type "partial"), even though it is intended to be skipped for "partial":


```python
"""Minimal example: @task.branch inside a dynamically mapped task group.
This example replicates the structure of a real-world DAG, where:
- One outer task group (process_items) fetches items and expands
    an inner task group (handle_item) via expand_kwargs.
- Inside handle_item, a @task.branch decides whether to run
    run_optional_step based on item_type.

Expected behavior
-----------------
item_type == "full"    → branch returns absolute task ID → run_optional_step should run
item_type == "partial" → branch returns None             → run_optional_step should be skipped

Actual behavior
---------------
run_optional_step runs for ALL items.
"""

from __future__ import annotations

import pendulum
from airflow.sdk import DAG, task, task_group
from airflow.providers.standard.operators.empty import EmptyOperator

with DAG(
    dag_id="branch_in_mapped_taskgroup_example",
    schedule=None,
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
) as dag:

    # ------------------------------------------------------------------ #
    # Inner task group: handles one mapped item.                           #
    # The branch should skip or run an optional step per mapped instance.  #
    # ------------------------------------------------------------------ #
    @task_group(group_id="handle_item")
    def tg_handle_item(item_id: str, item_type: str):
        """Process one item.  Only 'full' items need the extra step."""

        @task(task_id="process")
        def process(item_id: str, item_type: str) -> str:
            print(f"Processing {item_id=} {item_type=}")
            return item_type

        result = process(item_id, item_type)

        @task.branch(task_id="select_optional_step")
        def select_optional_step(item_type: str):
            if item_type == "full":
                return ["process_items.handle_item.run_optional_step"]
            return None

        branch = select_optional_step(result)
        run_optional_step = EmptyOperator(task_id="run_optional_step")
        branch >> run_optional_step

    # ------------------------------------------------------------------ #
    # Outer task group: loads items and expands the inner group once per  #
    # item. There is only one outer group instance in the DAG.            #
    # ------------------------------------------------------------------ #
    @task_group(group_id="process_items")
    def tg_process_items():
        @task(task_id="get_items")
        def get_items() -> list[dict]:
            return [
                {"item_id": "item_1", "item_type": "full"},     # optional step should run
                {"item_id": "item_2", "item_type": "partial"},  # optional step should be skipped
            ]

        tg_handle_item.expand_kwargs(get_items())

    tg_process_items()
```

What you think should happen instead?

item_type == "full" → branch returns absolute task ID → run_optional_step should run
item_type == "partial" → branch returns None → run_optional_step should be skipped

This is the behavior in versions up to and including 3.1.7.
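To pin down the expected outcome per mapped instance, here is a plain-Python sketch (no Airflow needed) that reuses the branch logic from the reproduction and derives the state each run_optional_step instance should end in. It assumes map index i corresponds to the i-th element returned by get_items(); expected_states is a hypothetical helper for illustration only.

```python
from __future__ import annotations

# Hypothetical helper mirroring the branch callable from the reproduction;
# it only computes expected outcomes and does not touch Airflow.
OPTIONAL_STEP = "process_items.handle_item.run_optional_step"


def select_optional_step(item_type: str) -> list[str] | None:
    # Same decision as the @task.branch callable in the DAG.
    if item_type == "full":
        return [OPTIONAL_STEP]
    return None


def expected_states(items: list[dict]) -> dict[int, str]:
    # Assumes map_index i corresponds to items[i] (list order from get_items()).
    states: dict[int, str] = {}
    for map_index, item in enumerate(items):
        followed = select_optional_step(item["item_type"]) or []
        states[map_index] = "success" if OPTIONAL_STEP in followed else "skipped"
    return states


items = [
    {"item_id": "item_1", "item_type": "full"},
    {"item_id": "item_2", "item_type": "partial"},
]
print(expected_states(items))  # {0: 'success', 1: 'skipped'}
```

Per the report, 3.2.1 instead lets both instances of run_optional_step run.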

Operating System

No response

Deployment

None

Apache Airflow Provider(s)

No response

Versions of Apache Airflow Providers

No response

Official Helm Chart version

Not Applicable

Kubernetes Version

No response

Helm Chart configuration

No response

Docker Image customizations

No response

Anything else?

This was originally discussed in #65745; however, that issue appears to be either not reproducible or describing a different problem.

Further context from @martijn-exads:
We think we've traced it to #62287, specifically to this line in NotPreviouslySkippedDep:
xcom_map_index = ti.map_index if parent.is_mapped else -1

For a branch operator inside a mapped @task_group, the operator itself has no .expand(), so parent.is_mapped is False. Its TIs nevertheless run with map_index >= 0, and SkipMixin writes XCOM_SKIPMIXIN_KEY at the parent TI's per-map map_index. The dep then queries map_indexes=-1, finds nothing, returns "not skipped", and the non-selected sibling proceeds.
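The lookup mismatch described above can be reproduced in isolation with a toy model. Everything here is a simplified stand-in, not Airflow code: Parent, branch_pushes, is_skipped and the in_mapped_task_group flag are hypothetical names, the key's string value is illustrative, and a branch returning None is modeled as an empty "followed" list. The consider_group switch sketches one possible direction (also using the TI's map_index when the parent sits inside an expanded task group); it is not a confirmed fix.

```python
from __future__ import annotations

from dataclasses import dataclass

# Simplified, hypothetical stand-ins for the objects named in the report.
# They are NOT Airflow's real classes; they only model the XCom lookup.
XCOM_SKIPMIXIN_KEY = "skipmixin_key"  # illustrative value


@dataclass
class Parent:
    task_id: str
    is_mapped: bool             # True only if the operator itself was .expand()-ed
    in_mapped_task_group: bool  # hypothetical flag: parent sits inside an expanded task group


# (task_id, map_index, key) -> value, mimicking per-map-index XCom rows
xcom: dict[tuple[str, int, str], dict] = {}


def branch_pushes(parent: Parent, map_index: int, followed: list[str]) -> None:
    # The branch TI records its decision at its own per-map map_index.
    xcom[(parent.task_id, map_index, XCOM_SKIPMIXIN_KEY)] = {"followed": followed}


def is_skipped(parent: Parent, task_id: str, map_index: int, consider_group: bool) -> bool:
    mapped = parent.is_mapped or (consider_group and parent.in_mapped_task_group)
    # Same shape as the quoted line: fall back to -1 when the parent looks unmapped.
    xcom_map_index = map_index if mapped else -1
    decision = xcom.get((parent.task_id, xcom_map_index, XCOM_SKIPMIXIN_KEY))
    if decision is None:
        return False  # nothing found, so the child is treated as "not skipped"
    return task_id not in decision["followed"]


branch = Parent(
    "process_items.handle_item.select_optional_step",
    is_mapped=False,
    in_mapped_task_group=True,
)
child = "process_items.handle_item.run_optional_step"
branch_pushes(branch, 0, [child])  # item_1: "full"
branch_pushes(branch, 1, [])       # item_2: "partial" (branch returned None)

print([is_skipped(branch, child, i, consider_group=False) for i in (0, 1)])  # [False, False]
print([is_skipped(branch, child, i, consider_group=True) for i in (0, 1)])   # [False, True]
```

With the lookup pinned to -1, neither instance finds the branch decision, which matches the reported symptom; reading at the TI's own map_index recovers the per-instance skip.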

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
