EmptyOperator in dynamically mapped TaskGroups does not respect upstream dependencies #32283

@seeholza

Description

Apache Airflow version

2.6.2

What happened

When an EmptyOperator is used in a dynamically mapped TaskGroup (https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html#mapping-over-a-task-group), the EmptyOperator in every branch starts as soon as the upstream dependency of the EmptyOperator in any one branch completes. As a result, the tasks downstream of the EmptyOperator start prematurely in all branches, which breaks depth-first execution of the mapped TaskGroup.

I have provided a test for this behavior below. It introduces an artificial wait in variable_task, followed by an EmptyOperator named checkpoint and a dependent task named final.

While the test runs, the checkpoint and final tasks are already complete even though the upstream variable_task in the same group is still running.
To turn this into a failure condition, I record the time at which each branch's final task executes and compare the two values. When the difference is too small, the assert_branch_waited task fails, which is what happens here.
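The timing check described above can be sketched in plain Python, outside Airflow. This is only an illustration of the comparison: the helper name `branch_waited` and the sample timestamps are made up for this sketch, and the threshold of 10 seconds mirrors `seconds_difference_expected` from the reproduction code.

```python
from datetime import datetime, timedelta


def branch_waited(times, expected_seconds):
    """Return True when the two `final` timestamps are at least `expected_seconds` apart."""
    gap = abs(times[1] - times[0]).total_seconds()
    return gap >= expected_seconds


# Hypothetical timestamps, chosen only to illustrate both outcomes.
start = datetime(2023, 1, 1, 12, 0, 0)

# Expected behavior: branch 2's final task runs after its 10-second wait.
print(branch_waited([start, start + timedelta(seconds=11)], 10))

# Reported behavior: both final tasks run almost at the same time.
print(branch_waited([start, start + timedelta(seconds=1)], 10))
```

The second call corresponds to the case in which assert_branch_waited raises its RuntimeError.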

Replacing the EmptyOperator with a regular task gives the correct behavior.

What you think should happen instead

In each branch, the EmptyOperator should wait for its own upstream dependency to complete before it starts. This is the same behavior as when a regular task is used for checkpoint.
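The difference between the expected and the reported behavior can be illustrated with a toy model. This is not Airflow's scheduler code: the function names, the state strings, and the dictionary layout are assumptions made purely for this sketch, with map index 0 standing for branch 1 and map index 1 for branch 2.

```python
def ready_per_branch(upstream_states, map_index):
    """Expected rule: an instance waits for the upstream instance with the same map index."""
    return upstream_states[map_index] == "success"


def ready_any_branch(upstream_states, map_index):
    """Reported rule: every instance starts once any upstream instance has succeeded."""
    return any(state == "success" for state in upstream_states.values())


# Branch 1 (map index 0) has finished; branch 2 (map index 1) is still sleeping.
variable_task_states = {0: "success", 1: "running"}

for index in variable_task_states:
    print(
        index,
        ready_per_branch(variable_task_states, index),
        ready_any_branch(variable_task_states, index),
    )
```

Under the per-branch rule, only the checkpoint for map index 0 may start. Under the any-branch rule, both checkpoints start, which matches what the issue describes for the EmptyOperator.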

How to reproduce

Here are test cases in two DAGs: one uses an EmptyOperator and shows the incorrect behavior; the other uses a regular task in place of the EmptyOperator and behaves correctly.

import time
from datetime import datetime

from airflow import DAG
from airflow.decorators import task, task_group
from airflow.models import TaskInstance
from airflow.operators.empty import EmptyOperator

branches = [1, 2]
seconds_difference_expected = 10

for use_empty_operator in [False, True]:
    dag_id = "test-mapped-group"
    if use_empty_operator:
        dag_id += "-with-emptyoperator"
    else:
        dag_id += "-no-emptyoperator"

    with DAG(
        dag_id=dag_id,
        schedule=None,
        catchup=False,
        start_date=datetime(2023, 1, 1),
        default_args={"retries": 0},
    ) as dag:

        @task_group(group_id="branch_run")
        def mapped_group(branch_number):
            """Branch 2 will take > `seconds_difference_expected` seconds, branch 1 will be immediately executed"""

            @task(dag=dag)
            def variable_task(branch_number):
                """Waits `seconds_difference_expected` seconds for branch 2"""
                if branch_number == 2:
                    time.sleep(seconds_difference_expected)
                return branch_number

            variable_task_result = variable_task(branch_number)

            if use_empty_operator:
                # emptyoperator as a checkpoint
                checkpoint_result = EmptyOperator(task_id="checkpoint")
            else:

                @task
                def checkpoint():
                    pass

                checkpoint_result = checkpoint()

            @task(dag=dag)
            def final(ti: TaskInstance = None):
                """Return the time at the task execution"""
                return datetime.now()

            final_result = final()
            variable_task_result >> checkpoint_result >> final_result

            return final_result

        @task(dag=dag)
        def assert_branch_waited(times):
            """Check that the execution times of the final step in the two branches differ by
            at least `seconds_difference_expected` seconds, i.e. branch 2 waited for all of its steps
            """
            seconds_difference = abs(times[1] - times[0]).total_seconds()
            if not seconds_difference >= seconds_difference_expected:
                raise RuntimeError(
                    "Branch 2 completed too fast with respect to branch 1: "
                    + f"observed [seconds difference]: {seconds_difference}; "
                    + f"expected [seconds difference]: >= {seconds_difference_expected}"
                )

        mapping_results = mapped_group.expand(branch_number=branches)
        assert_branch_waited(mapping_results)

Operating System

Debian GNU/Linux 11 (bullseye) on docker (official image)

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
