Skip to content

Fix refresh_from_task not applied on retries#65932

Open
aeroyorch wants to merge 2 commits into
apache:mainfrom
aeroyorch:fix-retry-refresh-from-task
Open

Fix refresh_from_task not applied on retries#65932
aeroyorch wants to merge 2 commits into
apache:mainfrom
aeroyorch:fix-retry-refresh-from-task

Conversation

@aeroyorch
Copy link
Copy Markdown
Contributor

@aeroyorch aeroyorch commented Apr 27, 2026

Summary

task_instance_mutation_hook and dynamic PriorityWeightStrategy
subclasses are not re-applied on natural retries: refresh_from_task
is missing from the UP_FOR_RETRY → SCHEDULED transition.

This adds it to DagRun.schedule_tis against the new computed
try_number. First attempts and UP_FOR_RESCHEDULE are unaffected,
per-TI UPDATEs only fire when field values actually change.

Related: #32452, #32471, #20143


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Claude Opus 4.7 for code planning and implementing the new tests.


Verification

Setup

Mutation hook ($AIRFLOW_HOME/plugins/airflow_local_settings.py):

from __future__ import annotations

import logging

from airflow.models.taskinstance import TaskInstance

log = logging.getLogger(__name__)
log.warning("[mutation-hook-demo] airflow_local_settings.py LOADED")


def task_instance_mutation_hook(task_instance: TaskInstance) -> None:
    log.warning(
        "[mutation-hook-demo] hook fired: task=%s try=%s queue_before=%s",
        task_instance.task_id,
        task_instance.try_number,
        task_instance.queue,
    )
    if (task_instance.try_number or 0) >= 1:
        task_instance.queue = "recovery"

Demo DAG:

from __future__ import annotations

from datetime import timedelta

from decreasing_priority_plugin import DecreasingPriorityStrategy

from airflow.exceptions import AirflowException
from airflow.operators.python import PythonOperator
from airflow.sdk import DAG
from airflow.utils import timezone


def fails_until_last_try(**context) -> str:
    ti = context["ti"]
    if ti.try_number < 3:
        raise AirflowException(f"forced failure on try {ti.try_number}")
    return "recovered"


with DAG(
    dag_id="example_mutation_hook_retry_queue",
    schedule=None,
    start_date=timezone.datetime(2026, 1, 1),
    catchup=False,
    tags=["mutation_hook_demo"],
):
    PythonOperator(
        task_id="fail_until_last_try",
        python_callable=fails_until_last_try,
        queue="default",
        retries=2,
        retry_delay=timedelta(seconds=5),
        weight_rule=DecreasingPriorityStrategy(),
    )

Result

try queue priority_weight
1 default 9
2 recovery 8
3 recovery 7

Try 1:

image

Try 2:

image

Try 3:

image

@aeroyorch aeroyorch force-pushed the fix-retry-refresh-from-task branch from b82dc12 to 79efe8d Compare April 27, 2026 08:11
@aeroyorch aeroyorch marked this pull request as ready for review April 27, 2026 09:26
@aeroyorch aeroyorch requested review from XD-DENG and ashb as code owners April 27, 2026 09:26
@aeroyorch aeroyorch force-pushed the fix-retry-refresh-from-task branch 2 times, most recently from e4ca16c to 3a98127 Compare May 2, 2026 19:46
Comment thread airflow-core/src/airflow/models/dagrun.py Outdated
Comment thread airflow-core/docs/administration-and-deployment/cluster-policies.rst Outdated
Copy link
Copy Markdown
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is "too specialized" and the logic about why it only applies to specific fields or in specific cases is not clear to me that it is the behaviour we want.

@aeroyorch
Copy link
Copy Markdown
Contributor Author

This is "too specialized" and the logic about why it only applies to specific fields or in specific cases is not clear to me that it is the behaviour we want.

This is "too specialized" and the logic about why it only applies to specific fields or in specific cases is not clear to me that it is the behaviour we want.

First of all, thanks for the review!

I agree that the current approach is too specialized. The intent behind limiting the fields was to "optimize" the update so that we only refreshed the values that can be changed internally by refresh_from_task.

That said, the desired fix in this PR is to correctly update fields such as priority, queue, etc., which, according to the Airflow docs and existing examples, can be changed per retry attempt. In fact, our specific use case is to use a different queue for retries, backed by KEDA autoscaling through the official Helm chart, which uses the queue SQL column to autoscale groups of workers.

So my understanding is that dynamically modifying some scheduling options per try is still expected Airflow behavior, and not something considered legacy or unintended. Is that correct? I just want to confirm that I am not trying to fix behavior that we actually want to move away from.

@ashb
Copy link
Copy Markdown
Member

ashb commented May 5, 2026

So my understanding is that dynamically modifying some scheduling options per try is still expected Airflow behavior, and not something considered legacy or unintended. Is that correct? I just want to confirm that I am not trying to fix behavior that we actually want to move away from.

Yeah, you are correct - it's documented somewhere (or at least not unheard of in the wild) to do things like "on the second attempt, put this in to a high priority queue" etc. And yes, we want to continue supporting that.

@aeroyorch
Copy link
Copy Markdown
Contributor Author

So my understanding is that dynamically modifying some scheduling options per try is still expected Airflow behavior, and not something considered legacy or unintended. Is that correct? I just want to confirm that I am not trying to fix behavior that we actually want to move away from.

Yeah, you are correct - it's documented somewhere (or at least not unheard of in the wild) to do things like "on the second attempt, put this in to a high priority queue" etc. And yes, we want to continue supporting that.

Ok, thanks.

One additional follow-up: with refresh_from_task now running at the retry transition, a buggy hook or PriorityWeightStrategy will raise and wedge the retry. Worth catching and sending the TI to FAILED with a loud log, or does a different approach make more sense?

@aeroyorch aeroyorch force-pushed the fix-retry-refresh-from-task branch 3 times, most recently from 8c259ac to ddc6130 Compare May 5, 2026 15:01
@potiuk
Copy link
Copy Markdown
Member

potiuk commented May 5, 2026

@aeroyorch — Your unresolved review thread(s) from @ashb appear to have been addressed (post-review commits and/or in-thread replies on every thread, with the latest commit pushed after the most recent thread). I've added the ready for maintainer review label so the PR re-enters the maintainer review queue.

@ashb — could you take another look when you have a chance? If you agree the feedback was addressed, please mark the threads as resolved so the queue signal stays accurate. If a thread still needs work, please reply in-line — @aeroyorch will follow up.


Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.

@potiuk potiuk added the ready for maintainer review Set after triaging when all criteria pass. label May 5, 2026
@aeroyorch aeroyorch force-pushed the fix-retry-refresh-from-task branch from ddc6130 to 7abdd61 Compare May 6, 2026 05:26
@aeroyorch aeroyorch requested a review from ashb May 6, 2026 15:30
@aeroyorch aeroyorch force-pushed the fix-retry-refresh-from-task branch 2 times, most recently from 47d84b1 to 1c1a430 Compare May 12, 2026 04:43
@aeroyorch aeroyorch force-pushed the fix-retry-refresh-from-task branch from 1c1a430 to 0ea03d3 Compare May 15, 2026 05:13
@aeroyorch aeroyorch force-pushed the fix-retry-refresh-from-task branch from 0ea03d3 to 5720871 Compare May 19, 2026 06:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants