
Trigger Rule ONE_FAILED does not work in task group with mapped tasks #34023

Open

benbuckman opened this issue Sep 1, 2023 · 15 comments · Fixed by #34337 · May be fixed by #36462 or #40460
Labels
affected_version:2.7 · area:core · area:dynamic-task-mapping · kind:bug

Comments

@benbuckman

benbuckman commented Sep 1, 2023

Apache Airflow version

2.7.0

What happened

I have the following DAG:

from __future__ import annotations
from datetime import datetime

from airflow.decorators import dag, task, task_group
from airflow.utils.trigger_rule import TriggerRule

@task
def get_records() -> list[str]:
    return ["a", "b", "c"]


@task
def submit_job(record: str) -> None:
    pass

@task
def fake_sensor(record: str) -> bool:
    raise RuntimeError("boo")


@task
def deliver_record(record: str) -> None:
    pass


@task(trigger_rule=TriggerRule.ONE_FAILED)
def handle_failed_delivery(record: str) -> None:
    pass


@task_group(group_id="deliver_records")
def deliver_record_task_group(record: str):
    (
        submit_job(record=record)
        >> fake_sensor(record=record)
        >> deliver_record(record=record)
        >> handle_failed_delivery(record=record)
    )

@dag(
    dag_id="demo_trigger_one_failed",
    schedule=None,
    start_date=datetime(2023, 1, 1),
)
def demo_trigger_one_failed() -> None:
    records = get_records()
    deliver_record_task_group.expand(record=records)


demo_trigger_one_failed()
  • fake_sensor is simulating a task that raises an exception. (It could be a @task.sensor raising an AirflowSensorTimeout; it doesn't matter, the behavior is the same.)
  • handle_failed_delivery's TriggerRule.ONE_FAILED means it is supposed to run whenever any task upstream fails. So when fake_sensor fails, handle_failed_delivery should run.

But this does not work. handle_failed_delivery is skipped, and (based on the UI) it's skipped very early, before it can know if the upstream tasks have completed successfully or errored.

Here's what I see, progressively (see How to reproduce below for how I got this):

Screenshots, in order: started ... skipped too early ... fake sensor about to fail ... done, didn't run.

If I remove the task group and instead do,

@dag(
    dag_id="demo_trigger_one_failed",
    schedule=None,
    start_date=datetime(2023, 1, 1),
)
def demo_trigger_one_failed() -> None:
    records = get_records()
    (
        submit_job(record=records)
        >> fake_sensor.expand(record=records)
        >> deliver_record.expand(record=records)
        >> handle_failed_delivery.expand(record=records)
    )

then it does the right thing:

Screenshots, in order: started ... waiting ... done, triggered correctly.

What you think should happen instead

The behavior with the task group should be the same as without the task group: the handle_failed_delivery task with trigger_rule=TriggerRule.ONE_FAILED should be run when the upstream fake_sensor task fails.

How to reproduce

  1. Put the above DAG at a local path, /tmp/dags/demo_trigger_one_failed.py.

  2. docker run -it --rm --mount type=bind,source="/tmp/dags",target=/opt/airflow/dags -p 8080:8080 apache/airflow:2.7.0-python3.10 bash

  3. In the container:

    airflow db init
    airflow users create --role Admin --username airflow --email airflow --firstname airflow --lastname airflow --password airflow
    airflow scheduler --daemon
    airflow webserver
    
  4. Open http://localhost:8080 on the host. Login with airflow / airflow. Run the DAG.

I tested this with:

  • apache/airflow:2.6.2-python3.10
  • apache/airflow:2.6.3-python3.10
  • apache/airflow:2.7.0-python3.10

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

n/a

Deployment

Other Docker-based deployment

Deployment details

This can be reproduced using standalone Docker images, see Repro steps above.

Anything else

I wonder if this is related to (or fixed by?) #33446 -> #33732 ? (The latter was "added to the Airflow 2.7.1 milestone 3 days ago." I can try to install that pre-release code in the container and see if it's fixed.)
edit: nope, not fixed

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@benbuckman added the area:core, kind:bug, and needs-triage labels Sep 1, 2023
@benbuckman
Author

I was curious if #33732 (fe27031) fixes the same issue I'm describing here. That fix is on main.

  1. In my git clone of this repo:

     git checkout main
     git pull

     (I'm at 3ae6b4e86fe807c00bd736c59df58733df2b9bf9)

     docker build . -f Dockerfile --pull --tag airflow-trigger-rule-test:0.0.1
     docker run -it --rm --mount type=bind,source="/tmp/dags",target=/opt/airflow/dags -p 8080:8080 airflow-trigger-rule-test:0.0.1 bash

  2. "In the container:" step 3 from above, then run the DAG ...

Nope – it exhibits the same incorrect behavior of skipping handle_failed_delivery before the task group has finished, and not respecting the trigger_rule.

@RNHTTR added the area:dynamic-task-mapping and affected_version:2.7 labels and removed the needs-triage label Sep 2, 2023
@RNHTTR
Collaborator

RNHTTR commented Sep 2, 2023

Seems as though it only affects mapped tasks. That is, it runs as expected if you replace deliver_record_task_group.expand(record=records) with deliver_record_task_group(record=records).
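
For reference, a minimal sketch of that unmapped variant (assuming the same tasks and task group as defined in the issue body; only the DAG body changes, and the dag_id is renamed here purely for illustration):

@dag(
    dag_id="demo_trigger_one_failed_unmapped",  # hypothetical id, renamed for illustration
    schedule=None,
    start_date=datetime(2023, 1, 1),
)
def demo_trigger_one_failed_unmapped() -> None:
    records = get_records()
    # Calling the task group directly (no .expand) creates a single, unmapped
    # group instance; ONE_FAILED then fires as expected when fake_sensor fails.
    deliver_record_task_group(record=records)


demo_trigger_one_failed_unmapped()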

@RNHTTR
Collaborator

RNHTTR commented Sep 2, 2023

Curiously, the Gantt chart shows the task group has been queued for 24 hours.


@RNHTTR
Collaborator

RNHTTR commented Sep 2, 2023

Like @benbuckman reported, the task was skipped before submit_job or fake_sensor ran.

debug logs:

[2023-09-02T23:21:23.108+0000] {retries.py:92} DEBUG - Running SchedulerJobRunner._schedule_all_dag_runs with retries. Try 1 of 3
[2023-09-02T23:21:23.110+0000] {scheduler_job_runner.py:1485} DEBUG - DAG demo_trigger_one_failed not changed structure, skipping dagrun.verify_integrity
[2023-09-02T23:21:23.111+0000] {dagrun.py:740} DEBUG - number of tis tasks for <DagRun demo_trigger_one_failed @ 2023-09-02 23:21:21.860553+00:00: manual__2023-09-02T23:21:21.860553+00:00, state:running, queued_at: 2023-09-02 23:21:21.885385+00:00. externally triggered: True>: 6 task(s)
[2023-09-02T23:21:23.112+0000] {dagrun.py:761} DEBUG - number of scheduleable tasks for <DagRun demo_trigger_one_failed @ 2023-09-02 23:21:21.860553+00:00: manual__2023-09-02T23:21:21.860553+00:00, state:running, queued_at: 2023-09-02 23:21:21.885385+00:00. externally triggered: True>: 2 task(s)
[2023-09-02T23:21:23.116+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-09-02T23:21:23.116+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-09-02T23:21:23.116+0000] {taskinstance.py:1159} DEBUG - Dependencies all met for dep_context=None ti=<TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 [None]>
[2023-09-02T23:21:23.210+0000] {abstractoperator.py:573} DEBUG - Updated in place to become <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [None]>
[2023-09-02T23:21:23.215+0000] {abstractoperator.py:598} DEBUG - Expanding TIs upserted <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [None]>
[2023-09-02T23:21:23.218+0000] {abstractoperator.py:598} DEBUG - Expanding TIs upserted <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [None]>
[2023-09-02T23:21:23.224+0000] {taskinstance.py:956} DEBUG - Setting task state for <TaskInstance: demo_trigger_one_failed.deliver_records.handle_failed_delivery manual__2023-09-02T23:21:21.860553+00:00 [None]> to skipped
[2023-09-02T23:21:23.224+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.handle_failed_delivery manual__2023-09-02T23:21:21.860553+00:00 [skipped]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'one_failed' requires one upstream task failure, but none were found. upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.fake_sensor'}
[2023-09-02T23:21:23.224+0000] {taskinstance.py:1149} DEBUG - Dependencies not met for <TaskInstance: demo_trigger_one_failed.deliver_records.handle_failed_delivery manual__2023-09-02T23:21:21.860553+00:00 [skipped]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'one_failed' requires one upstream task failure, but none were found. upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.fake_sensor'}
[2023-09-02T23:21:23.224+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.handle_failed_delivery manual__2023-09-02T23:21:21.860553+00:00 [skipped]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-09-02T23:21:23.224+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.handle_failed_delivery manual__2023-09-02T23:21:21.860553+00:00 [skipped]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-09-02T23:21:23.226+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.226+0000] {taskinstance.py:1149} DEBUG - Dependencies not met for <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.226+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-09-02T23:21:23.226+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-09-02T23:21:23.229+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.229+0000] {taskinstance.py:1149} DEBUG - Dependencies not met for <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.229+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-09-02T23:21:23.229+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-09-02T23:21:23.232+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.232+0000] {taskinstance.py:1149} DEBUG - Dependencies not met for <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=1, skipped=0, failed=0, upstream_failed=0, removed=0, done=1, success_setup=0, skipped_setup=0), upstream_task_ids={'get_records', 'deliver_records.submit_job'}
[2023-09-02T23:21:23.232+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-09-02T23:21:23.232+0000] {taskinstance.py:1168} DEBUG - <TaskInstance: demo_trigger_one_failed.deliver_records.fake_sensor manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [None]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-09-02T23:21:23.236+0000] {scheduler_job_runner.py:1512} DEBUG - Skipping SLA check for <DAG: demo_trigger_one_failed> because no tasks in DAG have SLAs
[2023-09-02T23:21:23.236+0000] {scheduler_job_runner.py:1504} DEBUG - callback is empty
[2023-09-02T23:21:23.241+0000] {scheduler_job_runner.py:414} INFO - 3 tasks up for execution:
	<TaskInstance: demo_trigger_one_failed.deliver_records.submit_job manual__2023-09-02T23:21:21.860553+00:00 map_index=0 [scheduled]>
	<TaskInstance: demo_trigger_one_failed.deliver_records.submit_job manual__2023-09-02T23:21:21.860553+00:00 map_index=1 [scheduled]>
	<TaskInstance: demo_trigger_one_failed.deliver_records.submit_job manual__2023-09-02T23:21:21.860553+00:00 map_index=2 [scheduled]>

@benbuckman changed the title from "Trigger Rule ONE_FAILED does not work in task group" to "Trigger Rule ONE_FAILED does not work in task group with mapped tasks" Sep 2, 2023
@benbuckman
Author

Seems as though it only affects mapped tasks.

Yes, thanks @RNHTTR for that clarification. I updated the issue title accordingly.

@ephraimbuddy self-assigned this Sep 3, 2023
@hussein-awala
Member

@ephraimbuddy the problem is in the method get_relevant_upstream_map_indexes: when we try to get the relevant map index for the upstream deliver_record, we call this method

ti.get_relevant_upstream_map_indexes(
    upstream=ti.task.dag.task_dict[upstream_id],
    ti_count=expanded_ti_count,
    session=session,
)

we call it with these values:

ti.get_relevant_upstream_map_indexes(
    upstream="deliver_records.deliver_record",
    ti_count=3, # we have 3 tis because it's a mapped task group
    session=session,
)

and this method doesn't take the mapped task group into account, so it returns -1 instead of the map index of the checked TI.

So we have two options:

  1. Update get_relevant_upstream_map_indexes to handle mapped task groups.
  2. Detect that the two tasks are in a mapped task group without calling this method; in that case we can return the map index of the checked TI (sketched below).
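
A rough, hypothetical sketch of option 2 (the function below is illustrative only and uses plain values rather than Airflow's actual TaskInstance API): if the checked TI and the upstream task sit inside the same mapped task group, the relevant upstream map index is simply the checked TI's own map index.

def relevant_upstream_map_index(
    ti_mapped_group_id: str | None,
    upstream_mapped_group_id: str | None,
    ti_map_index: int,
) -> int:
    # Both tasks belong to the same mapped task group: the upstream TI that
    # matters is the one expanded with the same map index as the checked TI.
    if ti_mapped_group_id is not None and ti_mapped_group_id == upstream_mapped_group_id:
        return ti_map_index
    # Unrelated tasks: fall back to the existing behaviour (-1 here is only a
    # stand-in for "no specific map index", i.e. depend on all upstream TIs).
    return -1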

I'm already working on refactoring some queries in this class, including the one that has this bug:

                task_id_counts = session.execute(
                    select(TaskInstance.task_id, func.count(TaskInstance.task_id))
                    .where(TaskInstance.dag_id == ti.dag_id, TaskInstance.run_id == ti.run_id)
                    .where(or_(*_iter_upstream_conditions(relevant_tasks=upstream_tasks)))
                    .group_by(TaskInstance.task_id)
                ).all()

@benbuckman
Author

Thanks @hussein-awala for digging into the fix so quickly.

Something else that's worth looking into and fixing here is why unit tests with DebugExecutor behave differently. Take this unit test for example, simplified again for demonstration:

import unittest
from datetime import datetime, timezone

from airflow.exceptions import BackfillUnfinished
from airflow.executors.debug_executor import DebugExecutor
from airflow.models import DagBag
from airflow.models.taskinstance import TaskInstance

from .demo_trigger_one_failed import demo_trigger_one_failed

class TestDemoDag(unittest.TestCase):
    def test_handle_failed_delivery(self):
        dagbag = DagBag(include_examples=False, safe_mode=False)
        demo_dag = dagbag.get_dag("demo_trigger_one_failed")
        now = datetime.now(timezone.utc)

        # We need to use the slow DebugExecutor (not `dag.test()`) to run this
        # b/c of https://github.com/apache/airflow/discussions/32831
        demo_dag.clear()
        with self.assertRaises(BackfillUnfinished):
            demo_dag.run(
                start_date=now,
                end_date=now,
                executor=DebugExecutor(),
                run_at_least_once=True,
                verbose=True,
                disable_retry=True,
            )

        downstream_task_ids = list(demo_dag.task_group_dict["deliver_records"].children.keys())
        print(f"downstream_task_ids: {downstream_task_ids}")

        task_instance_states: dict[str, str | None] = {}  # task_id => state

        for task_id in downstream_task_ids:
            # (demo simplified w/ hard-coded 0 for single mapped task)
            ti = TaskInstance(demo_dag.task_dict[task_id], execution_date=now, map_index=0)
            task_instance_states[task_id] = ti.current_state()

        print(f"task_instance_states: {task_instance_states}")

        self.assertEqual("success", task_instance_states["deliver_records.submit_job"])
        self.assertEqual("failed", task_instance_states["deliver_records.fake_sensor"])
        self.assertEqual("upstream_failed", task_instance_states["deliver_records.deliver_record"])

        # Test says this ran and succeeded - but in actual scheduler/UI,
        # that's not true!
        self.assertEqual("success", task_instance_states["deliver_records.handle_failed_delivery"])

Put that in a file test_demo_trigger_one_failed.py next to the file with the DAG above, and run it with python -m unittest path/to/test_demo_trigger_one_failed.py.

Note,

  • As commented inline, this uses DebugExecutor not dag.test() because the latter cannot test error cases.
  • The error in fake_sensor is in the DAG itself (contrived for the demo); in a real test this would be stubbed.

task_instance_states at the end is shown to be:

task_instance_states: 
{'deliver_records.submit_job': 'success',
 'deliver_records.fake_sensor': 'failed',
 'deliver_records.deliver_record': 'upstream_failed',
 'deliver_records.handle_failed_delivery': 'success'
}

and the test passes, asserting that handle_failed_delivery succeeded. This is misleading, because as we saw above, in the real scheduler handle_failed_delivery doesn't run at all! (Its ti.current_state() should be None, not success.)

We have unit tests like this in our application; we were confident that our actual task with trigger_rule=ONE_FAILED would work correctly, and we were very surprised when it broke in production.

As part of fixing this for the "real" executors (SequentialExecutor in the simple demo above; KubernetesExecutor et al. in a real production application), it would be great if DebugExecutor behaved at parity with the others, so that users can rely on their test coverage.

Thank you!

@ephraimbuddy
Contributor

@hussein-awala thanks for debugging. I was busy with releases for the past 2 days; getting to look at it now.

@ephraimbuddy
Contributor

Ok. Found a fix for this but still need to reason more about it:

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 82282eb39d..31fc1bfa9b 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2859,7 +2859,7 @@ class TaskInstance(Base, LoggingMixin):
         # and "ti_count == ancestor_ti_count" does not work, since the further
         # expansion may be of length 1.
         if not _is_further_mapped_inside(upstream, common_ancestor):
-            return ancestor_map_index
+            return abs(ancestor_map_index)
 
         # Otherwise we need a partial aggregation for values from selected task
         # instances in the ancestor's expansion context.

@ephraimbuddy
Contributor

Update:
This case is similar to the one I raised a PR for just now: #34138

I can reproduce this in a test and am working on it.

@ephraimbuddy
Contributor

Ok. Found a fix for this but still need to reason more about it:

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 82282eb39d..31fc1bfa9b 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2859,7 +2859,7 @@ class TaskInstance(Base, LoggingMixin):
         # and "ti_count == ancestor_ti_count" does not work, since the further
         # expansion may be of length 1.
         if not _is_further_mapped_inside(upstream, common_ancestor):
-            return ancestor_map_index
+            return abs(ancestor_map_index)
 
         # Otherwise we need a partial aggregation for values from selected task
         # instances in the ancestor's expansion context.

Hi @uranusjr, can you chime in on this?
I can't find a way to test this, but it actually solves this case for a mapped task group with ONE_FAILED.
When we calculate ancestor_map_index, we multiply by self.map_index, which can be -1. Is there a reason we chose not to care whether the map_index is negative?

@uranusjr
Member

uranusjr commented Sep 8, 2023

The variable comes from here:

ancestor_ti_count = common_ancestor.get_mapped_ti_count(self.run_id, session=session)
ancestor_map_index = self.map_index * ancestor_ti_count // ti_count

So there are only two scenarios where abs would make a difference:

  1. Either ancestor_ti_count or ti_count is negative
  2. self.map_index is negative

The first seems unlikely since both of those count something and should be non-negative integers. Of course it's possible we implemented something wrong, but in that case that bug should be fixed so the two values are always zero or positive.

The second is more viable, and the only possible negative map_index value is -1, i.e. if the task instance is not expanded. This in turn is not right since this function should only be called on a mapped task instance in the first place. A task instance with map index of -1 should not appear here. And if it does, making -1 become 1 does not make sense as a fix.
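
A quick worked example of why abs() masks the symptom rather than fixing it (values chosen to match the three-record DAG above):

ancestor_ti_count = 3  # three mapped instances of the task group
ti_count = 3

map_index = 2                                     # a properly expanded TI
print(map_index * ancestor_ti_count // ti_count)  # 2 -- keeps its own index

map_index = -1                                    # an unexpanded TI
print(map_index * ancestor_ti_count // ti_count)  # -1
# abs(-1) == 1 would silently point at the *second* mapped instance, so
# flipping the sign hides the real problem: a TI with map_index -1 should
# not reach this code path at all.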

@benbuckman
Author

Any updates on this?

@ephraimbuddy
Contributor

Any updates on this?

The PR is awaiting reviews

ephraimbuddy added a commit to astronomer/airflow that referenced this issue Nov 1, 2023
Getting the relevant upstream indexes of a task instance in a mapped task group
should only be done when the task has expanded. If the task has not expanded yet,
we should return None so that the task can wait for the upstreams before trying
to run.
This issue is more noticeable when the trigger rule is ONE_FAILED because then,
the task instance is marked as SKIPPED.
This commit fixes this issue.
closes: apache#34023
ephraimbuddy added a commit that referenced this issue Nov 1, 2023
* Fix pre-mature evaluation of tasks in mapped task group

Getting the relevant upstream indexes of a task instance in a mapped task group
should only be done when the task has expanded. If the task has not expanded yet,
we should return None so that the task can wait for the upstreams before trying
to run.
This issue is more noticeable when the trigger rule is ONE_FAILED because then,
the task instance is marked as SKIPPED.
This commit fixes this issue.
closes: #34023

* fixup! Fix pre-mature evaluation of tasks in mapped task group

* fixup! fixup! Fix pre-mature evaluation of tasks in mapped task group

* fixup! fixup! fixup! Fix pre-mature evaluation of tasks in mapped task group

* Fix tests
ephraimbuddy added a commit that referenced this issue Nov 1, 2023
* Fix pre-mature evaluation of tasks in mapped task group

Getting the relevant upstream indexes of a task instance in a mapped task group
should only be done when the task has expanded. If the task has not expanded yet,
we should return None so that the task can wait for the upstreams before trying
to run.
This issue is more noticeable when the trigger rule is ONE_FAILED because then,
the task instance is marked as SKIPPED.
This commit fixes this issue.
closes: #34023

* fixup! Fix pre-mature evaluation of tasks in mapped task group

* fixup! fixup! Fix pre-mature evaluation of tasks in mapped task group

* fixup! fixup! fixup! Fix pre-mature evaluation of tasks in mapped task group

* Fix tests

(cherry picked from commit 69938fd)
romsharon98 pushed a commit to romsharon98/airflow that referenced this issue Nov 10, 2023
* Fix pre-mature evaluation of tasks in mapped task group

Getting the relevant upstream indexes of a task instance in a mapped task group
should only be done when the task has expanded. If the task has not expanded yet,
we should return None so that the task can wait for the upstreams before trying
to run.
This issue is more noticeable when the trigger rule is ONE_FAILED because then,
the task instance is marked as SKIPPED.
This commit fixes this issue.
closes: apache#34023

* fixup! Fix pre-mature evaluation of tasks in mapped task group

* fixup! fixup! Fix pre-mature evaluation of tasks in mapped task group

* fixup! fixup! fixup! Fix pre-mature evaluation of tasks in mapped task group

* Fix tests
ahidalgob pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue May 15, 2024
* Fix pre-mature evaluation of tasks in mapped task group

Getting the relevant upstream indexes of a task instance in a mapped task group
should only be done when the task has expanded. If the task has not expanded yet,
we should return None so that the task can wait for the upstreams before trying
to run.
This issue is more noticeable when the trigger rule is ONE_FAILED because then,
the task instance is marked as SKIPPED.
This commit fixes this issue.
closes: apache/airflow#34023

* fixup! Fix pre-mature evaluation of tasks in mapped task group

* fixup! fixup! Fix pre-mature evaluation of tasks in mapped task group

* fixup! fixup! fixup! Fix pre-mature evaluation of tasks in mapped task group

* Fix tests

(cherry picked from commit 69938fd163045d750b8c218500d79bc89858f9c1)

GitOrigin-RevId: e6662d0a59ea5277e0244bdae48cd670aa90bf95
@shahar1
Collaborator

shahar1 commented Jun 15, 2024

I'm reopening this issue, as the original PR (#34337) that should have solved it was reverted and no other fix has been merged since then.
I'd like to take the chance to improve @ephraimbuddy's fix so that issues like #35541 won't recur.

@shahar1 reopened this Jun 15, 2024
@shahar1 self-assigned this Jun 15, 2024
shahar1 pushed a commit to shahar1/airflow that referenced this issue Jun 27, 2024
fixup! fixup! fixup! Fix pre-mature evaluation of tasks in mapped task group

fixup! fixup! Fix pre-mature evaluation of tasks in mapped task group

fixup! Fix pre-mature evaluation of tasks in mapped task group

Fix pre-mature evaluation of tasks in mapped task group

Getting the relevant upstream indexes of a task instance in a mapped task group
should only be done when the task has expanded. If the task has not expanded yet,
we should return None so that the task can wait for the upstreams before trying
to run.
This issue is more noticeable when the trigger rule is ONE_FAILED because then,
the task instance is marked as SKIPPED.
This commit fixes this issue.
closes: apache#34023