Endless sensor rescheduling if the first sensor run failed to be saved in DB #45050

@morooshka

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.10.3

What happened?

A sensor in reschedule mode is rescheduled endlessly if its first run fails to be saved in the DB.

  1. The sensor was configured in reschedule mode:
@task.sensor(
    task_id="sensor-s3-version",
    poke_interval=5 * 60,
    timeout=50 * 60,
    mode="reschedule",
    soft_fail=True,
)
def sensor_s3_version(connection_id: str, artefact: str) -> PokeReturnValue:
    ...

  1. The first sensor run failed because the PostgreSQL session was terminated, so no entry was added to the task_reschedule table:
...
[2024-12-18, 11:41:11 UTC] {taskinstance.py:340} ▼ Post task execution logs
[2024-12-18, 11:41:11 UTC] {standard_task_runner.py:124} ERROR - Failed to execute job 71468 for task sensor-s3-version ((psycopg2.OperationalError) server closed the connection unexpectedly
	This probably means the server terminated abnormally
	before or while processing the request.
...
  1. Because the task_reschedule table has no record matching try_number == 1, the code here returns no data:
    ...
    return session.scalar(
        select(TaskReschedule)
        .where(
            TaskReschedule.dag_id == dag_id,
            TaskReschedule.task_id == task_id,
            TaskReschedule.run_id == run_id,
            TaskReschedule.map_index == map_index,
            TaskReschedule.try_number == try_number,
        )
        .order_by(TaskReschedule.id.asc())
        .with_only_columns(TaskReschedule.start_date)
        .limit(1)
    )

  1. If no data is returned, the code here sets start_date to the current system time on every run, so the timeout never elapses and the rescheduling cycle never ends:
            ...
            if not start_date:
                start_date = timezone.utcnow()

  1. As a result, I have a task that has been running for 6 hours (with a maximum of 1 hour configured) as of the moment I started debugging:

image
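
The failure mode described in the steps above can be sketched in isolation. This is a simplified stand-in, not Airflow's actual sensor code: the helper names `timed_out` and `pokes_until_timeout` are hypothetical, and the comparison against the timeout is an assumption about how the check behaves. It only illustrates why falling back to the current time on every run keeps the elapsed duration at zero.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

TIMEOUT = timedelta(minutes=50)       # timeout=50 * 60 from the sensor config
POKE_INTERVAL = timedelta(minutes=5)  # poke_interval=5 * 60 from the sensor config


def timed_out(first_start: Optional[datetime], now: datetime) -> bool:
    """Hypothetical, simplified version of the sensor's timeout check."""
    # Mirrors `if not start_date: start_date = timezone.utcnow()`:
    # with no stored start date, the clock restarts at "now" on every run.
    start_date = first_start or now
    return (now - start_date) > TIMEOUT


def pokes_until_timeout(
    first_start: Optional[datetime], began: datetime, max_pokes: int = 100
) -> Optional[int]:
    """Return the poke index at which the timeout fires, or None if it never does."""
    for i in range(max_pokes):
        now = began + i * POKE_INTERVAL
        if timed_out(first_start, now):
            return i
    return None


began = datetime(2024, 12, 18, 11, 41, tzinfo=timezone.utc)

# A stored first start date lets the elapsed time grow until it exceeds the timeout.
with_record = pokes_until_timeout(began, began)

# No stored start date: elapsed time is always zero, so the timeout never fires.
without_record = pokes_until_timeout(None, began)
```

With a stored start date the loop stops once the elapsed time exceeds 50 minutes; without one it runs through all `max_pokes` iterations without ever timing out.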

What you think should happen instead?

In that case we should find the first available task_reschedule record at or after the initially needed try number. This can be done easily by changing the condition from:

TaskReschedule.try_number == first_try_number

to:

TaskReschedule.try_number >= first_try_number

Thanks to the order_by(TaskReschedule.id.asc()) sorting and the limit(1) statement, we would select the first record of the earliest try at or after the one initially needed.

If there are no records at all, we would fall back to timezone.utcnow() through the already implemented logic.
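
To illustrate the difference between the two conditions, here is a minimal sketch that uses the standard-library `sqlite3` module in place of Airflow's SQLAlchemy models. The table is a reduced, hypothetical version of task_reschedule, the timestamps are made up for illustration, and the helper `first_start_date` is not an Airflow function.

```python
import sqlite3
from typing import Optional

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_reschedule ("
    " id INTEGER PRIMARY KEY,"
    " task_id TEXT, try_number INTEGER, start_date TEXT)"
)
# Try 1 left no row (its DB write failed); later tries were recorded.
# The timestamps below are illustrative only.
conn.executemany(
    "INSERT INTO task_reschedule (task_id, try_number, start_date) VALUES (?, ?, ?)",
    [
        ("sensor-s3-version", 2, "2024-12-18T11:46:11"),
        ("sensor-s3-version", 2, "2024-12-18T11:51:11"),
        ("sensor-s3-version", 3, "2024-12-18T11:56:11"),
    ],
)


def first_start_date(task_id: str, first_try_number: int, op: str) -> Optional[str]:
    """Return the earliest start_date whose try_number satisfies `op`, or None."""
    if op not in ("=", ">="):
        raise ValueError("op must be '=' or '>='")
    row = conn.execute(
        "SELECT start_date FROM task_reschedule"
        f" WHERE task_id = ? AND try_number {op} ?"
        " ORDER BY id ASC LIMIT 1",
        (task_id, first_try_number),
    ).fetchone()
    return row[0] if row else None


exact = first_start_date("sensor-s3-version", 1, "=")     # current condition
relaxed = first_start_date("sensor-s3-version", 1, ">=")  # proposed condition
```

Under these assumptions, the exact match finds nothing for try 1, while the relaxed condition returns the start date of the earliest recorded reschedule, which gives the timeout check a fixed reference point.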

How to reproduce

Fail the first try of a sensor and delete the task_reschedule records belonging to the first try.

Operating System

Airflow official docker apache/airflow:2.9.3-python3.8: Debian GNU/Linux 12 (bookworm)

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Labels: area:core, kind:bug, needs-triage
