Airflow 2: when a BranchPythonOperator skips a branch inside a TaskGroup, all downstream tasks are skipped #32793

@keomgak

Description

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

I'm testing TaskGroups in Airflow 2. My DAG is structured like this:

workflow_start_task
child_taskgroup
\first_task
\decision(BranchPythonOperator)
\start_job_task
\grandchild1_taskgroup
 \\first_task
 \\decision(BranchPythonOperator) <--------- point1
 \\start_job_task
 \\run_job_task
 \\success_job_task
 \\skip_end                 <---------------- point2                      
\grandchild2_taskgroup
 \\first_task
 \\decision(BranchPythonOperator)
 \\start_job_task
 \\run_job_task
 \\success_job_task
 \\skip_end
\success_job_task
\skip_end
workflow_end_task

When the BranchPythonOperator (point 1) takes the 'skip_end' branch (point 2), all downstream tasks are skipped.
The result is shown in the screenshot below.

[screenshot_taskgroup: Graph view with the downstream tasks marked as skipped]

I would expect grandchild2_taskgroup and the remaining tasks of child_taskgroup to run, but I don't understand why they are skipped.
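My guess at what is happening (an assumption on my part, not a confirmed diagnosis): `grandchild1_task >> grandchild2_task` wires every leaf of the first group, including both success_job_task and skip_end, to every root of the second group, so grandchild2's first_task sees the skipped success_job_task as an upstream. With the default trigger rule `all_success`, a skipped upstream skips the task too, and the skip cascades. A rough pure-Python model of the two trigger rules (simplified, not Airflow's actual implementation):

```python
def evaluate(upstream_states, trigger_rule="all_success"):
    """Rough model of how Airflow decides a task's fate from its
    upstream task states (simplified; failures and skips collapsed)."""
    if trigger_rule == "all_success":
        # Default rule: any non-success upstream (e.g. a skipped sibling
        # branch) prevents the task from running, so one skip cascades.
        if any(s != "success" for s in upstream_states):
            return "skipped"
        return "runs"
    if trigger_rule == "none_failed_min_one_success":
        # Tolerates skipped upstreams as long as nothing failed and at
        # least one upstream actually succeeded.
        if any(s == "failed" for s in upstream_states):
            return "skipped"  # (Airflow would mark this upstream_failed)
        if any(s == "success" for s in upstream_states):
            return "runs"
        return "skipped"
    raise ValueError(f"unknown trigger rule: {trigger_rule}")

# grandchild2's first_task sees both leaves of grandchild1_taskgroup:
# skip_end (success) and success_job_task (skipped).
print(evaluate(["success", "skipped"]))                                 # skipped
print(evaluate(["success", "skipped"], "none_failed_min_one_success"))  # runs
```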

What you think should happen instead

No response

How to reproduce

Here is my test code.

test_taskgroup.py

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

from test_child_taskgroup import Child_Dag

with DAG(
    dag_id="test_taskgroup",
    schedule_interval="@daily",
    start_date=datetime(2023, 7, 1),
    end_date=datetime(2023, 7, 1),
    default_args={"retries": 0},
) as dag:
    start_task = BashOperator(
        task_id='workflow_start',
        bash_command='echo 1',
    )

    child_task = Child_Dag(dag.dag_id).get_dag()

    end_task = BashOperator(
        task_id='workflow_end',
        bash_command='echo 1',
    )

    start_task >> child_task >> end_task

test_child_taskgroup.py

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.task_group import TaskGroup

from test_grandchild_taskgroup import Grandchild_Dag

def start_decision():
    return "child.start_job_task"

class Child_Dag:
    def __init__(self, parent_dag_id):
        self.parent_dag_id = parent_dag_id
        self.workflow_id = "child"

    def get_dag(self):
        dag = DAG(
            '%s.%s' % (self.parent_dag_id, self.workflow_id),
            )
        with TaskGroup(self.workflow_id) as child_dag:
            first_task = BashOperator(
                task_id='first_task',
                bash_command='echo 1'
            )

            start_job_task = BashOperator(
                task_id='start_job_task',
                bash_command='echo 1'
            )

            grandchild1_id = 'grandchild1_task'
            grandchild1_task = Grandchild_Dag(parent_dag_id=self.workflow_id, dataset_id=grandchild1_id).get_dag()

            grandchild2_id = 'grandchild2_task'
            grandchild2_task = Grandchild_Dag(parent_dag_id=self.workflow_id, dataset_id=grandchild2_id).get_dag()

            success_job_task = BashOperator(
                task_id='success_job_task',
                bash_command='echo 1'
            )

            skip_end = BashOperator(
                task_id='skip_end',
                bash_command='echo 1',
            )

            decision = BranchPythonOperator(
                task_id='decision',
                python_callable=start_decision,
            )

            first_task >> decision >> start_job_task >> grandchild1_task >> grandchild2_task >> success_job_task
            decision >> skip_end

        return child_dag

test_grandchild_taskgroup.py

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.task_group import TaskGroup


def start_decision(dataset_id, **kwargs):
    return 'child.' + dataset_id + '.skip_end'

class Grandchild_Dag:
    def __init__(self, parent_dag_id, dataset_id):
        self.parent_dag_id = parent_dag_id
        self.dataset_id = dataset_id

    def get_dag(self):
        dag = DAG(
            '%s.%s' % (self.parent_dag_id, self.dataset_id),
            )
        with TaskGroup(self.dataset_id) as grandchild_dag:
            first_task = BashOperator(
                task_id='first_task',
                bash_command='echo 1'
            )

            start_job_task = BashOperator(
                task_id='start_job_task',
                bash_command='echo 1'
            )

            run_job_task = BashOperator(
                task_id='run_job_task',
                bash_command='echo 1'
            )

            success_job_task = BashOperator(
                task_id='success_job_task',
                bash_command='echo 1'
            )

            skip_end = BashOperator(
                task_id='skip_end',
                bash_command='echo 1',
            )

            decision = BranchPythonOperator(
                task_id='decision',
                python_callable=start_decision,
                op_kwargs={'dataset_id': self.dataset_id},
            )

            first_task >> decision >> start_job_task >> run_job_task >> success_job_task
            decision >> skip_end

        return grandchild_dag
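For what it's worth, a possible workaround I am considering (my assumption, not a verified fix): give the tasks that join back after a branch, such as grandchild2's first_task or the outer success_job_task, a more permissive trigger rule like `none_failed_min_one_success`, so a skipped sibling branch does not cascade. A sketch of the change:

```python
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule

# Workaround sketch: let a join task run even when a sibling branch
# upstream of it was skipped, as long as nothing failed and at least
# one upstream succeeded.
first_task = BashOperator(
    task_id='first_task',
    bash_command='echo 1',
    trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)
```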

Operating System

ubuntu 22.04

Versions of Apache Airflow Providers

composer 2.1.8 + airflow 2.4.3
composer 2.3.3 + airflow 2.5.1

Deployment

Google Cloud Composer

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
