
Dynamic task mapping does not correctly handle depends_on_past #28296

Closed
1 of 2 tasks
internetcoffeephone opened this issue Dec 12, 2022 · 2 comments · Fixed by #28379
Labels
affected_version:2.4 Issues Reported for 2.4 area:core area:dynamic-task-mapping AIP-42 kind:bug This is a clearly a bug

Comments

@internetcoffeephone
Contributor

internetcoffeephone commented Dec 12, 2022

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

Using Airflow 2.4.2.

I've got a task that retrieves some filenames, whose output is then used to create dynamically mapped tasks that move the files, one file per task.
I'm using a similar task across multiple DAGs. However, task mapping fails on some DAG runs. It happens inconsistently from run to run, and some DAGs do not seem to be affected at all; these appear to be the DAGs where no task was ever mapped, so the mapped task instance ended up in the Skipped state.
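A simplified, hypothetical model of the expansion step may help when reading the observation above. Per the Airflow documentation on dynamic task mapping, a mapped task whose input is empty creates no new instances and is marked skipped; otherwise the unexpanded placeholder (map_index -1) gives way to one instance per element, indexed from 0. The function `expand_placeholder` and the `(map_index, state)` tuples are illustrative assumptions, not Airflow APIs.

```python
from typing import List, Optional, Tuple

# Hypothetical stand-in for a task instance: (map_index, state).
TaskInstanceTuple = Tuple[int, Optional[str]]


def expand_placeholder(upstream_output: list) -> List[TaskInstanceTuple]:
    """Return the instances that exist after expansion (toy model, not Airflow code)."""
    if not upstream_output:
        # Nothing to map over: the single placeholder (map_index -1) is skipped.
        return [(-1, "skipped")]
    # One instance per element, indexed 0..n-1, with no state yet.
    return [(i, None) for i in range(len(upstream_output))]


print(expand_placeholder([]))  # [(-1, 'skipped')]
print(expand_placeholder([{"file_name": 0}, {"file_name": 1}]))  # [(0, None), (1, None)]
```

In this model, a DAG whose upstream always returns an empty list only ever has a skipped map_index -1 instance, while a DAG that maps files ends each run with instances 0..n-1 and no -1 instance.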

Multiple files are found, but only a single mapped task instance is created. It never starts and has a map_index of -1. It is listed under the "List instances, all runs" menu, but its "Mapped Tasks" tab shows "No Data found."

When I select the mapped task and press the "Run" button, the following error appears:

Could not queue task instance for execution, dependencies not met: Previous Dagrun State: depends_on_past is true for this task's DAG, but the previous task instance has not run yet., Task has been mapped: The task has yet to be mapped!

However, the previous task instance has run, and no errors appeared in my Airflow logs.
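One possible reading of the error message, sketched as a toy model: if the depends_on_past check looks for the previous run's instance with the same map_index, an unexpanded placeholder (map_index -1) finds nothing whenever the previous run was expanded into indices 0..n-1, so it stays blocked before expansion can happen. This is a hypothetical illustration, not Airflow's implementation; `same_index_dep_met` and the dict representation are assumptions.

```python
from typing import Dict, Optional

# Hypothetical stand-in for one task's instances in a DAG run: map_index -> state.
RunInstances = Dict[int, Optional[str]]

# depends_on_past accepts a previous instance that succeeded or was skipped.
DONE_STATES = {"success", "skipped"}


def same_index_dep_met(map_index: int, prev_run: Optional[RunInstances]) -> bool:
    """Toy check against the previous run's instance with the same map_index."""
    if prev_run is None:
        # First DAG run: there is nothing to depend on.
        return True
    if map_index not in prev_run:
        # No instance with this map_index last run, which reads as
        # "the previous task instance has not run yet".
        return False
    return prev_run[map_index] in DONE_STATES


expanded_prev = {i: "success" for i in range(10)}  # previous run mapped 10 files
skipped_prev = {-1: "skipped"}  # previous run had nothing to map

# The current run's instance is still the unexpanded placeholder (map_index -1).
print(same_index_dep_met(-1, None))           # True: first run
print(same_index_dep_met(-1, expanded_prev))  # False: blocked before expansion
print(same_index_dep_met(-1, skipped_prev))   # True
```

Under this model, DAGs whose previous run only has a skipped -1 instance keep passing the check, which would be consistent with some DAGs never being affected.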

What you think should happen instead

The appropriate number of task instances should be created; they should correctly resolve the depends_on_past check and then run normally.
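The expected behaviour could be expressed as a rule like the one below. This is an assumption for illustration only, not necessarily what the eventual fix does: the unexpanded placeholder waits for every instance of the task in the previous run to finish, while an expanded instance only checks the previous run's instance with the same map_index, and has nothing to wait for if that index did not exist. `expected_dep_met` is a hypothetical name.

```python
from typing import Dict, Optional

# Hypothetical stand-in for one task's instances in a DAG run: map_index -> state.
RunInstances = Dict[int, Optional[str]]
DONE_STATES = {"success", "skipped"}


def expected_dep_met(map_index: int, prev_run: Optional[RunInstances]) -> bool:
    """One possible depends_on_past rule for mapped tasks (sketch only)."""
    if prev_run is None:
        # First DAG run: nothing to wait for.
        return True
    if map_index == -1:
        # Unexpanded placeholder: wait for all previous instances, whatever
        # their map_index, so the task can be expanded afterwards.
        return all(state in DONE_STATES for state in prev_run.values())
    if map_index not in prev_run:
        # The previous run produced fewer instances; no predecessor to wait for.
        return True
    return prev_run[map_index] in DONE_STATES


expanded_prev = {i: "success" for i in range(10)}
print(expected_dep_met(-1, expanded_prev))                  # True: can now expand
print(expected_dep_met(3, {**expanded_prev, 3: "failed"}))  # False
print(expected_dep_met(12, expanded_prev))                  # True: no index 12 last run
```

Waiting on all previous instances for the placeholder keeps the depends_on_past ordering intact while still letting the task expand once the previous run has finished.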

How to reproduce

This DAG reliably reproduces the error for me: the mapped tasks of the first DAG run succeed, but those of subsequent runs do not.

```python
import datetime as dt

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator


@task
def get_filenames_kwargs():
    # One op_kwargs dict per mapped task instance.
    return [
        {"file_name": i}
        for i in range(10)
    ]


def print_filename(file_name):
    print(file_name)


with DAG(
    dag_id="dtm_test",
    start_date=dt.datetime(2022, 12, 10),
    default_args={
        "owner": "airflow",
        "depends_on_past": True,
    },
    schedule="@daily",
) as dag:
    get_filenames_task = get_filenames_kwargs.override(task_id="get_filenames_task")()

    # Creates one print_filename_task instance per dict returned upstream.
    print_filename_task = PythonOperator.partial(
        task_id="print_filename_task",
        python_callable=print_filename,
    ).expand(op_kwargs=get_filenames_task)

    # Perhaps redundant: expanding over an XComArg already sets this dependency
    get_filenames_task >> print_filename_task
```

Operating System

Amazon Linux 2

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@internetcoffeephone internetcoffeephone added area:core kind:bug This is a clearly a bug labels Dec 12, 2022
@uranusjr uranusjr changed the title Dynamic Task Mapping task does not start because of the error: "Task has been mapped: The task has yet to be mapped!" Dynamic Task Mapping task errors with depends_on_pask set Dec 12, 2022
@uranusjr uranusjr changed the title Dynamic Task Mapping task errors with depends_on_pask set Dynamic task mapping does not correctly handle depends_on_past Dec 12, 2022
@uranusjr uranusjr self-assigned this Dec 12, 2022
@internetcoffeephone
Contributor Author

@uranusjr I just tested this with Airflow 2.5.1 and the error still appears with the script above. I'm a bit confused, since it appeared to work in 2.5.1rc1; I may have made a mistake while testing 2.5.1rc1.

@internetcoffeephone
Contributor Author

In addition to the error above, a different error now appears on older DAGs that have newly gained a mapped task. The mapped task instances are created in a sense, but their state remains "no status". When I click the "Run" button, the following error appears:

Could not queue task instance for execution, task instance is missing

I'm also observing high CPU usage on the metadata database, and the scheduler slows to a crawl: other tasks take several minutes to be scheduled. My guess is that the scheduler doesn't know what to make of these (several thousand) broken task instances and keeps looping over them.
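To gauge how many placeholders are stuck, a read-only query along these lines could be run against the metadata database. The sketch uses an in-memory SQLite table with a heavily simplified, assumed subset of `task_instance` columns (dag_id, task_id, run_id, map_index, state); check the real schema of your Airflow version before adapting it. Ordinary unmapped tasks also use map_index -1, so on a real database the query should be narrowed to the mapped task's task_id. `stuck_placeholders` is a hypothetical helper.

```python
import sqlite3
from typing import List, Tuple


def stuck_placeholders(conn: sqlite3.Connection) -> List[Tuple[str, str, int]]:
    """Count map_index -1 instances that still have no state ("no status")."""
    query = """
        SELECT dag_id, task_id, COUNT(*) AS stuck
        FROM task_instance
        WHERE map_index = -1 AND state IS NULL
        GROUP BY dag_id, task_id
        ORDER BY stuck DESC
    """
    return conn.execute(query).fetchall()


# In-memory demo with an assumed, simplified task_instance table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance "
    "(dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER, state TEXT)"
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?, ?)",
    [
        ("dtm_test", "print_filename_task", "run_1", 0, "success"),
        ("dtm_test", "print_filename_task", "run_1", 1, "success"),
        ("dtm_test", "print_filename_task", "run_2", -1, None),
        ("dtm_test", "print_filename_task", "run_3", -1, None),
        ("dtm_test", "get_filenames_task", "run_3", -1, "success"),
    ],
)
print(stuck_placeholders(conn))  # [('dtm_test', 'print_filename_task', 2)]
```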
