Tasks with 'none_failed_min_one_success' trigger_rule skipping before Dynamic Task Group is fully expanded #39801

Open
gavinhonl opened this issue May 24, 2024 · 14 comments · May be fixed by #40428

@gavinhonl
gavinhonl commented May 24, 2024

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.8.3 and 2.9.1

What happened?

If a DAG is run for the first time, a task with a 'none_failed_min_one_success' trigger rule is almost immediately skipped, before the dependent upstream tasks are complete, resulting in:
[screenshot]

If I clear the same DAG run, with the process_items task group fully expanded, the DAG completes as expected:
[screenshot]

What you think should happen instead?

If the a_end task has a 'none_failed_min_one_success' trigger rule, it should only run when its upstream dependent tasks are complete, none of them is in a failed or upstream_failed state, and at least one upstream task has succeeded.
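
For reference, a rough sketch of the documented semantics of this trigger rule (illustrative pseudo-logic only, not Airflow's actual implementation):

def none_failed_min_one_success(upstream_states: list[str]) -> str:
    # Illustrative only: summarizes the documented rule, not the scheduler's code.
    if any(s in ("failed", "upstream_failed") for s in upstream_states):
        return "upstream_failed"  # a failure upstream propagates downstream
    if not all(s in ("success", "skipped") for s in upstream_states):
        return "wait"             # some upstream tasks have not finished yet
    if all(s == "skipped" for s in upstream_states):
        return "skipped"          # everything upstream was skipped
    return "run"                  # none failed and at least one succeeded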

How to reproduce

DAG Code:

from datetime import datetime, timedelta

from airflow.decorators import task, task_group
from airflow import DAG, AirflowException
from airflow.operators.empty import EmptyOperator

default_args = {
    'owner': 'insight_techops',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 1),
    'provide_context': True,
    'retries': 0,
    'retry_delay': timedelta(seconds=30)
}

@task
def parse_csv_schedule():
    items_dict = {'A': '1', 'B': '2', 'C': '3', 'D': '4'}
    return items_dict

@task_group(group_id="process_items")
def process_items(items_dict: dict):
    @task
    def retrieve_item_metadata(items: dict):
        media_asset = items[0]
        print(media_asset)
        return media_asset

    @task_group(group_id="a_process")
    def a_process():
        @task.branch(retries=0)
        def a_start():
            a_handling = 'none'
            if a_handling == 'bypass':
                return "process_items.a_process.a_bypass"
            else:
                return "process_items.a_process.a_end"
            
        a_start = a_start()
        a_bypass = EmptyOperator(task_id='a_bypass')
        a_end = EmptyOperator(task_id='a_end', trigger_rule='none_failed_min_one_success')
        a_start >> a_bypass >> a_end
        a_start >> a_end

    @task
    def mark_item_as_done():
        try:
            print(f"Marking item as Done")
        except Exception as error:
            raise AirflowException(error)
    
    item_dict = retrieve_item_metadata(items=items_dict)
    mark_item_as_done = mark_item_as_done()
    a_process = a_process()
    item_dict >> a_process >> mark_item_as_done

with DAG(dag_id='af055_Debugger', default_args=default_args, max_active_runs=1, schedule_interval=None, tags=['sales']):
    end = EmptyOperator(task_id='end')
    items_dict = parse_csv_schedule()
    items_dict >> process_items.expand(items_dict=items_dict) >> end

I'm pretty sure it is related to 'none_failed_min_one_success': if I cause an upstream task within the TaskGroup to fail, the task with the trigger rule in question (and its downstream tasks) is skipped:
[screenshot]

Compared to when a_end uses the default trigger rule:
[screenshot]

Operating System

macOS 13.6 and Ubuntu 22.04

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

Dev:
Docker Desktop, Helm, Kubernetes

Prod:
EKS, Helm

Anything else?

100% reproducible

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@gavinhonl added the area:core, kind:bug, and needs-triage labels on May 24, 2024

boring-cyborg bot commented May 24, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@RNHTTR
Contributor

RNHTTR commented May 25, 2024

Here's a slightly simpler reproduction without nested task groups or branching:

from airflow.decorators import dag, task, task_group
from pendulum import datetime


@dag(
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
    doc_md=__doc__,
    tags=["example"],
)
def repro():

    @task
    def init():
        return ["seize", "the", "day"]

    @task_group(group_id="tg")
    def tg(message):
        @task
        def imsleepy(message):
            return message

        @task(trigger_rule="none_failed_min_one_success")
        def imawake(message):
            print(message)

        imawake(imsleepy(message))

    tg.expand(message=init())


repro()

@RNHTTR added the area:dynamic-task-mapping (AIP-42) label and removed the area:core and needs-triage labels on May 25, 2024
@mateuslatrova
Contributor

Hi, could I take this issue?

@gavinhonl
Author

Please do. It would be great if you can confirm that you are able to reproduce the issue.

@mateuslatrova
Contributor

> Please do. It would be great if you can confirm that you are able to reproduce the issue.

I have just reproduced the issue using @RNHTTR 's example code.

Does anyone have an idea of which parts of the codebase I should look at first to find this bug?

In the meantime, I will try to figure it out.

@RNHTTR
Contributor

RNHTTR commented Jun 1, 2024

> Please do. It would be great if you can confirm that you are able to reproduce the issue.
>
> I have just reproduced the issue using @RNHTTR 's example code.
>
> Does anyone have an idea of which parts of the codebase I should look at first to find this bug?
>
> In the meantime, I will try to figure it out.

Some candidate starting points:

One good way to look into what's going on would be to review the unit tests for none_failed_min_one_success and see the expected behavior.

@mateuslatrova
Contributor

> Please do. It would be great if you can confirm that you are able to reproduce the issue.
>
> I have just reproduced the issue using @RNHTTR 's example code.
> Does anyone have an idea of which parts of the codebase I should look at first to find this bug?
> In the meantime, I will try to figure it out.
>
> Some candidate starting points:
>
> One good way to look into what's going on would be to review the unit tests for none_failed_min_one_success and see the expected behavior.

Nice, thank you so much! I will take a look.

@le-chartreux
Contributor

Hello, I am also facing this issue. It would be awesome if someone could find a solution (or at least a workaround).

Best regards and thank you for developing Airflow!

@mateuslatrova
Contributor

mateuslatrova commented Jun 18, 2024

Hi @le-chartreux! I have found the issue and I am working on a solution. I will post an explanation here soon.

@le-chartreux
Contributor

Awesome, thank you @mateuslatrova!

@gavinhonl
Author

@mateuslatrova - Great to hear there's progress with this!

@le-chartreux
Contributor

Hello Airflow Community,

While waiting for a resolution to the trigger rule issue, I wanted to share a workaround that I've implemented.
While it may not address every scenario, it could be beneficial for similar use cases.

Problem Context

In scenarios involving branching, where only one (or a subset) of several tasks is executed and its result is needed downstream, the ideal trigger rule for the downstream task that consumes the output would be NONE_FAILED_MIN_ONE_SUCCESS.

For example:

branching ─┬─> task_a ─┐
           OR          ├─> task_to_get_the_result_of_the_task_that_ran
           └─> task_b ─┘

Unfortunately, this trigger rule doesn't work as expected within dynamic task groups, leading to the downstream task being erroneously skipped.

Workaround Strategy

My workaround involves using the NONE_FAILED trigger rule, which does not exhibit the skipping behavior.
To emulate the desired NONE_FAILED_MIN_ONE_SUCCESS logic, I've incorporated the use of AirflowSkipException to skip the downstream task when all preceding tasks are skipped (i.e., when all outputs are None).

Below is an example of using NONE_FAILED instead of NONE_FAILED_MIN_ONE_SUCCESS:

from airflow.decorators import dag, task, task_group
from pendulum import datetime
from airflow.utils.trigger_rule import TriggerRule
from airflow.exceptions import AirflowSkipException


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def workaround():
    directions = get_directions()
    get_message_from_direction.expand(direction=directions)


@task
def get_directions() -> list[str]:
    return ["left", "right", "bottom"]


@task_group
def get_message_from_direction(direction: str) -> str | None:
    message_from_left = left_task()
    message_from_right = right_task()
    choose_next_task_from_direction(direction) >> [message_from_left, message_from_right]
    return get_message(message_from_left, message_from_right)


@task.branch
def choose_next_task_from_direction(direction: str) -> str | None:
    this_group = "get_message_from_direction"
    if direction == "left":
        return f"{this_group}.left_task"
    if direction == "right":
        return f"{this_group}.right_task"
    # return None, nothing follows


@task
def left_task():
    return "message from left"


@task
def right_task():
    return "message from right"


# Neither NONE_FAILED_MIN_ONE_SUCCESS nor ONE_SUCCESS is used here because they don't work
# inside dynamic task groups: see https://github.com/apache/airflow/issues/39801
# As a workaround, NONE_FAILED is used, which is not a problem:
# - If no previous task failed and at least one of the previous tasks returns a message
#   (i.e., at least one succeeded), everything is fine and the message is returned.
# - If at least one of the previous tasks failed, this task will not be triggered.
# - If all the previous tasks are skipped, no message will be provided and this task
#   will be skipped with the AirflowSkipException.
@task(trigger_rule=TriggerRule.NONE_FAILED)
def get_message(message_from_left: str | None, message_from_right: str | None) -> str:
    # Please note that the output of a successful task must not be falsy for the `or`
    # below to work! Otherwise, use something like 'if message_from_left is not None: ...'
    message = message_from_left or message_from_right
    if message:
        # at least one task succeeded
        return message
    raise AirflowSkipException("Both skipped.")


workaround()

The get_message task will effectively mimic the behavior of NONE_FAILED_MIN_ONE_SUCCESS:

  • Retrieve the message from the task that was executed and succeeded.
  • Be skipped if both preceding tasks were skipped.
  • Not be triggered if any preceding task failed.
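
For anyone who wants to sanity-check the workaround locally, here is a minimal sketch (assuming a recent Airflow version, 2.5+, where DAG.test() is available and handles mapped task groups):

if __name__ == "__main__":
    # Run the DAG in-process, without a scheduler, and confirm in the logs that
    # get_message returns the branch's message instead of being skipped prematurely.
    dag_object = workaround()
    dag_object.test()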

I hope this workaround proves useful to others facing the same issue.
Your feedback and suggestions for improvement are welcome.

Best regards,
Nathan Rousseau

@mateuslatrova
Contributor

mateuslatrova commented Jun 19, 2024

The issue

I will use @RNHTTR 's example DAG to make the explanation simpler. So, we have the following tasks:

init >> tg (imsleepy >> imawake)

When the init task instance terminates successfully, both task instances, imsleepy and imawake, are in None state and they are also not expanded yet (meaning, their map_index is -1).

Then, the scheduler gets the next task instance to be run, which is imsleepy, checks that its dependencies are met, and expands it into three task instances with None state (and map_indexes 0, 1 and 2).

Next, the scheduler gets the following task instance to be run, which is imawake. At this moment, only the init task instance has finished running. So, when checking the imawake task instance's dependencies inside the _evaluate_direct_relatives method of the TriggerRuleDep class, we reach the following code that calculates how many upstream tasks it has:

if not any(t.get_needs_expansion() for t in upstream_tasks.values()):
    upstream = len(upstream_tasks)
    upstream_setup = sum(1 for x in upstream_tasks.values() if x.is_setup)
else:
    task_id_counts = session.execute(
        select(TaskInstance.task_id, func.count(TaskInstance.task_id))
        .where(TaskInstance.dag_id == ti.dag_id, TaskInstance.run_id == ti.run_id)
        .where(or_(*_iter_upstream_conditions(relevant_tasks=upstream_tasks)))
        .group_by(TaskInstance.task_id)
    ).all()
    upstream = sum(count for _, count in task_id_counts)
    upstream_setup = sum(c for t, c in task_id_counts if upstream_tasks[t].is_setup)

When this code runs with ti being imawake's task instance, task_id_counts comes back empty from that query, so upstream is set to 0, which is completely wrong. It should actually be 1, because imawake has one upstream task, imsleepy.

Since the imawake task instance's trigger rule is NONE_FAILED_MIN_ONE_SUCCESS and upstream is set to 0, the following condition on line 403 is met (because skipped is also 0, since no upstream task was skipped), and the imawake task instance has its state changed to SKIPPED even before being expanded (unwanted behavior):

elif trigger_rule == TR.NONE_FAILED_MIN_ONE_SUCCESS:
    if upstream_failed or failed:
        new_state = TaskInstanceState.UPSTREAM_FAILED
    elif skipped == upstream:
        new_state = TaskInstanceState.SKIPPED
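
Plugging in the values from this example makes the premature skip obvious (a worked sketch of the arithmetic above, not the real code):

# Values at the moment imawake's dependencies are evaluated (per the query above):
upstream = 0          # wrongly computed: the query matched no imsleepy rows
skipped = 0           # no upstream task instance was actually skipped
failed = upstream_failed = 0

if upstream_failed or failed:
    new_state = "upstream_failed"
elif skipped == upstream:   # 0 == 0 evaluates to True
    new_state = "skipped"   # imawake is marked SKIPPED before it is even expanded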

Now, why did that query return 0? Because the call to _iter_upstream_conditions returned the condition task_id == imsleepy and map_index < 0, and at this moment there is no TaskInstance of task imsleepy with map_index < 0, because it was expanded into three task instances with map_index 0, 1 and 2.

So the ROOT CAUSE is: the database is already updated with imsleepy's expansion data (otherwise that query would not return 0), but the dependency check does not take that into account! In this example, TaskInstance.get_relevant_upstream_map_indexes is called inside _iter_upstream_conditions and returns -1, which produces the condition mentioned above.
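
To make the mismatch concrete, here is a simplified picture of the task_instance rows at that moment and the condition the query ends up with (hypothetical values, just to illustrate why the count is 0):

# Simplified task_instance rows (task_id, map_index, state) at evaluation time:
task_instances = [
    ("init",        -1, "success"),
    ("tg.imsleepy",  0, None),
    ("tg.imsleepy",  1, None),
    ("tg.imsleepy",  2, None),
    ("tg.imawake",  -1, None),
]
# The generated upstream condition is roughly: task_id == "tg.imsleepy" AND map_index < 0,
# which matches none of the imsleepy rows above, so upstream ends up counted as 0.
matching = [t for t in task_instances if t[0] == "tg.imsleepy" and t[1] < 0]
assert len(matching) == 0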

Possible solution

  1. Insert a specific (not sure which) condition in the TaskInstance.get_relevant_upstream_map_indexes method to cover this edge case.

I invite all of you to review and question this explanation to guarantee it is correct, and also, if anyone has suggestions for solutions, they are very welcome!

@shahar1
Contributor

shahar1 commented Jul 8, 2024

I'd just like to point out that this issue seems related to #34023 which I'm trying to tackle (for now, without much success).
@mateuslatrova - if you're still working on this issue, I'd be happy if you could also take a look there.
