Removed mapped tasks block all_success trigger rule #33164

Closed
1 of 2 tasks
tanelk opened this issue Aug 7, 2023 · 5 comments
Labels
area:core area:dynamic-task-mapping AIP-42 Can't Reproduce The problem cannot be reproduced kind:bug This is a clearly a bug priority:medium Bug that should be fixed before next release but would not block a release

Comments

@tanelk
Contributor

tanelk commented Aug 7, 2023

Apache Airflow version

2.6.3

What happened

When rerunning a DAG run with dynamically mapped tasks and the number of mapped task instances decreases, downstream tasks with the all_success trigger rule (and likely some other rules) will not get scheduled, because the removed state is not considered successful. When a regular task is removed, this does not happen, because it gets removed from the DAG structure entirely.

Task dependency message
Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 2 non-success(es). upstream_states=_UpstreamTIStates(success=22, skipped=0, failed=0, upstream_failed=0, removed=2, done=24), upstream_task_ids={'upstream_task'}
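
To spell out the arithmetic from that message (an illustration only, not Airflow source):

# Counts taken from the dependency message above. all_success effectively
# requires every "done" upstream TI to be a success, so the two removed TIs
# count as non-successes and block the rule.
success, removed, done = 22, 2, 24
non_successes = done - success                              # 2 -> rule fails
non_successes_ignoring_removed = done - success - removed   # 0 -> rule would pass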

Previously you could forcefully run it with the "run" button in the UI, but that button was removed in one of the recent releases.

Luckily I could delete the removed TIs and then the downstream tasks got rescheduled, but this is not "scalable".
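
For reference, roughly how that manual cleanup looks (a sketch only, assuming direct access to the Airflow metadata DB; dag_id and run_id below are placeholders):

# Sketch of the manual workaround: delete TIs stuck in the "removed" state
# for a given run, so trigger rule evaluation no longer sees them.
from airflow.models import TaskInstance
from airflow.utils.session import provide_session
from airflow.utils.state import TaskInstanceState

@provide_session
def delete_removed_tis(dag_id: str, run_id: str, session=None):
    session.query(TaskInstance).filter(
        TaskInstance.dag_id == dag_id,
        TaskInstance.run_id == run_id,
        TaskInstance.state == TaskInstanceState.REMOVED,
    ).delete(synchronize_session=False)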

What you think should happen instead

Removed TIs should not block any trigger rules from getting executed.

How to reproduce

Operating System

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@tanelk tanelk added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Aug 7, 2023
@dzhigimont
Contributor

@tanelk Could you please provide an example DAG?

@RNHTTR
Contributor

RNHTTR commented Aug 8, 2023

Can you please provide reproduction steps?

Also, what do you mean by the following:

the number of mapped task instances decreases

How is the number of mapped task instances decreasing during a DAG run?

@tanelk
Contributor Author

tanelk commented Aug 9, 2023

Example DAG

import pendulum

from airflow import DAG
from airflow.decorators import task
from airflow.operators.empty import EmptyOperator

with DAG(
    'removed_mapped_tasks',
    schedule='@daily',
    start_date=pendulum.DateTime(2023, 8, 7),
) as dag:

    @task
    def gen_elements():
        return [1, 2, 3]

    @task
    def mapped_task(element):
        return element * 2

    mapped_task.expand(element=gen_elements()) >> EmptyOperator(task_id='end')

Let it complete and then return one less element from the gen_elements task. Then clear the last DAG run.
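
Concretely, the second deployment of the generator task for the reproduction would be (one element dropped):

# Redeployed gen_elements with one element fewer, so the third mapped_task
# instance from the first run ends up in the "removed" state after clearing.
@task
def gen_elements():
    return [1, 2]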

The end task will not get scheduled because Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_states=_UpstreamTIStates(success=2, skipped=0, failed=0, upstream_failed=0, removed=1, done=3), upstream_task_ids={'mapped_task'}

In this very simple DAG, the run will be marked as failed with: scheduler | [2023-08-09T13:49:56.448+0300] {dagrun.py:651} ERROR - Task deadlock (no runnable tasks); marking run <DagRun removed_mapped_tasks @ 2023-08-06 21:00:00+00:00: scheduled__2023-08-06T21:00:00+00:00, state:running, queued_at: 2023-08-09 10:49:44.110567+00:00. externally triggered: False> failed

On more complex structures the deadlock detection might not kick in.

@shahar1
Contributor

shahar1 commented Jul 28, 2024

I tried to run this example with v2.9.3 (first a run with the original DAG, then removing one element and clearing the last DAG run) and I didn't manage to reproduce it - all tasks succeeded.
I'm closing this issue; please create a new one if you encounter this again in versions 2.9.3+ (with a reproducible example).

@shahar1 shahar1 closed this as not planned Won't fix, can't repro, duplicate, stale Jul 28, 2024
@shahar1 shahar1 added the Can't Reproduce The problem cannot be reproduced label Jul 28, 2024
@nick-brady

nick-brady commented Aug 27, 2024

I don't have time to make a reproducible example, but this happened to me on Airflow 2.6.2. I manually delete the removed task instances for the time being, until I can try this on a newer version.
