Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class TriggerDAGRunPayload(StrictBaseModel):
"""Schema for Trigger DAG Run API request."""

logical_date: UtcDateTime | None = None
run_after: UtcDateTime | None = None
conf: dict = Field(default_factory=dict)
reset_dag_run: bool = False
partition_key: str | None = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def trigger_dag_run(
run_type=DagRunType.OPERATOR_TRIGGERED,
conf=payload.conf,
logical_date=payload.logical_date,
run_after=payload.run_after,
triggered_by=DagRunTriggeredByType.OPERATOR,
replace_microseconds=False,
partition_key=payload.partition_key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,17 @@
MovePreviousRunEndpoint,
RemoveUpstreamMapIndexesField,
)
from airflow.api_fastapi.execution_api.versions.v2026_04_17 import AddTeamNameField
from airflow.api_fastapi.execution_api.versions.v2026_04_17 import (
AddRunAfterToTriggerPayload,
AddTeamNameField,
)

bundle = VersionBundle(
HeadVersion(),
Version(
"2026-04-17",
AddTeamNameField,
AddRunAfterToTriggerPayload,
),
Version(
"2026-04-06",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, schema

from airflow.api_fastapi.execution_api.datamodels.dagrun import TriggerDAGRunPayload
from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun, TIRunContext


Expand All @@ -34,3 +35,13 @@ def remove_team_name_field(response: ResponseInfo) -> None: # type: ignore[misc
"""Remove the ``team_name`` field from dag_run for older API versions."""
if "dag_run" in response.body and isinstance(response.body["dag_run"], dict):
response.body["dag_run"].pop("team_name", None)


class AddRunAfterToTriggerPayload(VersionChange):
"""Add the ``run_after`` field to TriggerDAGRunPayload."""

description = __doc__

instructions_to_migrate_to_previous_version = (
schema(TriggerDAGRunPayload).field("run_after").didnt_exist,
)
6 changes: 5 additions & 1 deletion airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2421,7 +2421,11 @@ def _schedule_dag_run(
)
return callback_to_execute

if dag_run.logical_date and dag_run.logical_date > timezone.utcnow():
if (
dag_run.logical_date
and dag_run.logical_date > timezone.utcnow()
and dag_run.run_type not in (DagRunType.MANUAL, DagRunType.OPERATOR_TRIGGERED)
):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the only place where logical_date is being used as an invariant. If you look at other parts of scheduler_job_runner.py (specifically _executable_task_instances_to_queued), you will see that task instances are ordered by logical_date before starvation filters are applied. This means that if we allow a manually triggered run with a future logical_date to start executing early, it can still be deprioritized relative to earlier logical dates during scheduling.

So instead of being fully unblocked, these runs may end up in a state where they start but then make slower or inconsistent progress depending on what other runs exist. More generally, we now allow execution to violate logical ordering, while the scheduler still uses logical_date to enforce it.

Copy link
Copy Markdown
Author

@ZhaoMJ ZhaoMJ Apr 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback, interesting observation! I haven't sought alignment from the maintainers in this regard. From my (limited) understanding, the direction is to pivot from logical_date to run_after entirely in scheduling, but I think it's a separate concern from this PR.

The goal of this PR is to add a non-breaking and non-confusing option to allow immediate execution of future logical date. I'm afraid changing TI scheduling order to use run_after instead of logical_date would affect existing workflows, and would be too impactful, risky and breaking.

The mismatch between run_after affecting dagrun scheduling decisions and logical_date affecting task prioritization already exists today — I'm just extending it to future logical dates.

That said, decoupling logical_date from scheduling and execution semantics feels like a separate meaningful discussion if the community wants to pursue it, and I'm happy to follow up on it.


Regarding the specific code here — I'm not quite sure how this changes anything regarding logical ordering violation. I believe you can already specify mismatched run_after and logical_date using the API today and potentially allow a dagrun with an earlier logical_date to execute later (than another one with a later logical_date but an earlier run_after.)

Besides, per our current definition of logical_date:

A date-time that logically identifies the current Dag run. This value does not contain any semantics, but is simply a value for identification.

I'd argue that the "logical" is really to the user rather than to Airflow internals, and it shouldn't be applied to ordering if not for backward compatibility reasons.

That said, looking at this part again and considering that run_after is defined as "A pendulum.DateTime instance that tells the scheduler when the Dag run can be scheduled", I feel like maybe we shouldn't be checking logical_date here at all.

self.log.error("Logical date is in future: %s", dag_run.logical_date)
return callback

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from airflow._shared.timezones import timezone
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.utils.session import provide_session
from airflow.utils.types import DagRunType


class RunnableExecDateDep(BaseTIDep):
Expand All @@ -30,13 +31,17 @@ class RunnableExecDateDep(BaseTIDep):

@provide_session
def _get_dep_statuses(self, ti, session, dep_context):
logical_date = ti.get_dagrun(session).logical_date
dagrun = ti.get_dagrun(session)
logical_date = dagrun.logical_date
if logical_date is None:
return

cur_date = timezone.utcnow()

if logical_date > cur_date:
if logical_date > cur_date and dagrun.run_type not in (
DagRunType.MANUAL,
DagRunType.OPERATOR_TRIGGERED,
):
yield self._failing_status(
reason=(
f"Logical date {logical_date.isoformat()} is in the future "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@
"loading": "جارٍ تحميل معلومات Dag...",
"loadingFailed": "فشل تحميل معلومات Dag. يرجى المحاولة مرة أخرى.",
"runIdHelp": "اختياري - سيتم توليده تلقائيًا إذا لم يتم توفيره.",
"runImmediately": "تشغيل فوري (التاريخ المنطقي في المستقبل)",
"selectDescription": "تشغيل عملية واحدة من هذا Dag",
"selectLabel": "تشغيلة واحدة",
"title": "تشغيل Dag",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
"loadingFailed": "No s'ha pogut carregar la informació del Dag. Si us plau, torneu-ho a intentar.",
"manualRunDenied": "Aquest Dag no permet execucions manuals",
"runIdHelp": "Opcional - es generarà si no es proporciona",
"runImmediately": "Executar immediatament (la data lògica és en el futur)",
"selectDescription": "Executar una única execució d'aquest Dag",
"selectLabel": "Execució única",
"title": "Executar Dag",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
"loadingFailed": "Das Laden der Dag Informationen ist fehlgeschlagen. Bitte versuchen Sie es noch einmal.",
"manualRunDenied": "Manuelles Auslösen dieses Dags ist nicht erlaubt.",
"runIdHelp": "Optional - wird automatisch erzeugt wenn nicht angegeben",
"runImmediately": "Sofort ausführen (logisches Datum liegt in der Zukunft)",
"selectDescription": "Einen einzelnen Lauf dieses Dags auslösen",
"selectLabel": "Einzelner Lauf",
"title": "Dag Auslösen",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
"loading": "Φόρτωση πληροφοριών Dag...",
"loadingFailed": "Αποτυχία φόρτωσης πληροφοριών Dag. Παρακαλώ δοκιμάστε ξανά.",
"runIdHelp": "Προαιρετικό - θα δημιουργηθεί αν δεν δοθεί",
"runImmediately": "Εκτέλεση αμέσως (η λογική ημερομηνία είναι στο μέλλον)",
"selectDescription": "Ενεργοποιήστε μία εκτέλεση αυτού του Dag",
"selectLabel": "Μονή Εκτέλεση",
"title": "Ενεργοποίηση Dag",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
"loadingFailed": "Failed to load Dag information. Please try again.",
"manualRunDenied": "Manual runs are not allowed for this Dag",
"runIdHelp": "Optional - will be generated if not provided",
"runImmediately": "Run immediately (logical date is in the future)",
"selectDescription": "Trigger a single run of this Dag",
"selectLabel": "Single Run",
"title": "Trigger Dag",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
"loading": "Cargando información del Dag...",
"loadingFailed": "Error al cargar la información del Dag. Por favor, inténtelo de nuevo.",
"runIdHelp": "Opcional - se generará si no se proporciona",
"runImmediately": "Ejecutar inmediatamente (la fecha lógica está en el futuro)",
"selectDescription": "Activar una ejecución única de este Dag",
"selectLabel": "Ejecución Única",
"title": "Activar Dag",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
"loading": "Chargement des informations du Dag...",
"loadingFailed": "Échec du chargement des informations du Dag. Veuillez réessayer.",
"runIdHelp": "Optionnel – sera généré s'il n'est pas fourni",
"runImmediately": "Exécuter immédiatement (la date logique est dans le futur)",
"selectDescription": "Déclencher une exécution unique de ce Dag",
"selectLabel": "Exécution unique",
"title": "Déclencher un Dag",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@
"loading": "טוען מידע Dag...",
"loadingFailed": "טעינת מידע ה-Dag נכשלה. אנא נסו שוב.",
"runIdHelp": "אופציונלי - ייווצר אוטומטית אם לא יסופק",
"runImmediately": "הפעלה מיידית (התאריך הלוגי בעתיד)",
"selectDescription": "הפעל ריצה בודדת של Dag זה",
"selectLabel": "ריצה בודדת",
"title": "הפעל Dag",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
"loading": "डैग जानकारी लोड हो रही है...",
"loadingFailed": "डैग जानकारी लोड करने में विफल। कृपया पुनः प्रयास करें।",
"runIdHelp": "वैकल्पिक - प्रदान न किए जाने पर जेनरेट किया जाएगा",
"runImmediately": "तुरंत चलाएँ (लॉजिकल तारीख भविष्य में है)",
"selectDescription": "इस डैग का एक रन ट्रिगर करें",
"selectLabel": "सिंगल रन",
"title": "डैग ट्रिगर करें",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
"loadingFailed": "A Dag információk betöltése sikertelen. Kérem, próbálja újra.",
"manualRunDenied": "Kézi futtatás nem engedélyezett ennél a Dag-nél",
"runIdHelp": "Opcionális – ha nincs megadva, automatikusan generálódik",
"runImmediately": "Azonnali futtatás (a logikai dátum a jövőben van)",
"selectDescription": "Indítsd el ennek a Dag-nek egyetlen futását",
"selectLabel": "Egyszeri futás",
"title": "Dag indítása",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@
"loading": "Caricamento informazioni del Dag...",
"loadingFailed": "Impossibile caricare le informazioni del Dag. Per favore, riprova.",
"runIdHelp": "Opzionale - verrà generato se non fornito",
"runImmediately": "Esegui immediatamente (la data logica è nel futuro)",
"selectDescription": "Triggera un singolo run di questo Dag",
"selectLabel": "Singolo Run",
"title": "Trigger del Dag",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
"loading": "Dag 情報を読込中...",
"loadingFailed": "情報読込に失敗しました。もう一度試してください",
"runIdHelp": "オプション- 指定しない場合自動的に生成されます",
"runImmediately": "すぐに実行(論理日付が未来の日付です)",
"selectDescription": "この Dag の単体での実行をトリガーします",
"selectLabel": "単体での実行",
"title": "Dag のトリガー",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
"loadingFailed": "Dag 정보를 로드하지 못했습니다. 다시 시도해주세요.",
"manualRunDenied": "이 Dag는 수동으로 실행할 수 없습니다.",
"runIdHelp": "선택 사항 - 제공되지 않으면 생성됩니다.",
"runImmediately": "즉시 실행 (논리적 날짜가 미래입니다)",
"selectDescription": "이 Dag을(를) 단일 실행 트리거",
"selectLabel": "단일 실행",
"title": "Dag 트리거",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
"loadingFailed": "Mislukt om Dag informatie te laden. Probeer het opnieuw.",
"manualRunDenied": "Handmatige runs zijn niet toegestaan voor deze Dag",
"runIdHelp": "Optioneel - wordt gegenereerd indien niet opgegeven",
"runImmediately": "Direct uitvoeren (logische datum ligt in de toekomst)",
"selectDescription": "Trigger een enkele run van deze Dag",
"selectLabel": "Enkele Run",
"title": "Trigger Dag",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
"loadingFailed": "Nie udało się załadować informacji o Dagach. Spróbuj ponownie.",
"manualRunDenied": "Ręczne uruchamianie nie jest dozwolone dla tego Daga",
"runIdHelp": "Opcjonalne - zostanie wygenerowane, jeśli nie podano",
"runImmediately": "Uruchom natychmiast (data logiczna jest w przyszłości)",
"selectDescription": "Wyzwól pojedyncze wykonanie",
"selectLabel": "Pojedyncze wykonanie",
"title": "Uruchom Daga",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@
"loadingFailed": "Falha ao carregar informações do Dag. Por favor, tente novamente.",
"manualRunDenied": "Execuções manuais não são permitidas para este Dag",
"runIdHelp": "Opcional - será gerado se não for fornecido",
"runImmediately": "Executar imediatamente (a data lógica está no futuro)",
"selectDescription": "Acionar uma única execução deste Dag",
"selectLabel": "Execução Única",
"title": "Acionar Dag",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@
"loadingFailed": "Не удалось загрузить информацию о Dag-е. Пожалуйста, попробуйте снова.",
"manualRunDenied": "Ручные запуски не разрешены для этого Dag-а",
"runIdHelp": "Необязательно - будет сгенерирован, если не предоставлен",
"runImmediately": "Запустить немедленно (логическая дата в будущем)",
"selectDescription": "Запустить один процесс этого Dag-а",
"selectLabel": "Единичный запуск",
"title": "Запуск Dag-а",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
"loading": "กำลังโหลดข้อมูล Dag...",
"loadingFailed": "โหลดข้อมูล Dag ไม่สำเร็จ กรุณาลองใหม่",
"runIdHelp": "ไม่บังคับ - จะถูกสร้างอัตโนมัติหากไม่ระบุ",
"runImmediately": "รันทันที (วันที่ลอจิคัลอยู่ในอนาคต)",
"selectDescription": "ทริกเกอร์ Dag นี้สำหรับการทำงานครั้งเดียว",
"selectLabel": "ทำงานครั้งเดียว",
"title": "ทริกเกอร์ Dag",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
"loadingFailed": "Dag bilgisi yüklenemedi. Lütfen tekrar deneyin.",
"manualRunDenied": "El ile tetikleme bu Dag için devre dışı bırakıldı.",
"runIdHelp": "İsteğe bağlı - sağlanmazsa oluşturulacaktır",
"runImmediately": "Hemen çalıştır (mantıksal tarih gelecekte)",
"selectDescription": "Bu Dag'ın tek bir çalıştırmasını tetikle",
"selectLabel": "Tek Çalıştırma",
"title": "Dag Tetikle",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
"loadingFailed": "加载 Dag 信息失败,请重试。",
"manualRunDenied": "此 Dag 不允许手动执行",
"runIdHelp": "选填 - 若未提供将会自动生成",
"runImmediately": "立即运行(逻辑日期在未来)",
"selectDescription": "触发此 Dag 单次执行",
"selectLabel": "单次执行",
"title": "触发 Dag",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
"loadingFailed": "載入 Dag 資訊失敗,請重試。",
"manualRunDenied": "此 Dag 不允許手動執行",
"runIdHelp": "選填 - 若未提供將會自動產生",
"runImmediately": "立即執行(邏輯日期在未來)",
"selectDescription": "觸發此 Dag 單次執行",
"selectLabel": "單次執行",
"title": "觸發 Dag",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ const TriggerDAGForm = ({
logicalDate: isPartitioned ? "" : dayjs().format(DEFAULT_DATETIME_FORMAT),
note: "",
partitionKey: undefined,
runImmediately: false,
},
});

Expand All @@ -105,6 +106,7 @@ const TriggerDAGForm = ({
logicalDate: isPartitioned ? "" : dayjs().format(DEFAULT_DATETIME_FORMAT),
note: "",
partitionKey: undefined,
runImmediately: false,
});
// Also update the param store to keep it in sync.
// Wait until we have the initial params so section ordering stays consistent.
Expand Down Expand Up @@ -135,6 +137,9 @@ const TriggerDAGForm = ({

const resetDateError = () => setErrors((prev) => ({ ...prev, date: undefined }));

const logicalDate = watch("logicalDate");
const isFutureDate = Boolean(logicalDate) && dayjs(logicalDate).isAfter(dayjs());

const dataIntervalMode = watch("dataIntervalMode");
const dataIntervalStart = watch("dataIntervalStart");
const dataIntervalEnd = watch("dataIntervalEnd");
Expand Down Expand Up @@ -171,6 +176,22 @@ const TriggerDAGForm = ({
</Field.Root>
)}
/>
{isFutureDate ? (
<Controller
control={control}
name="runImmediately"
render={({ field }) => (
<Checkbox
checked={field.value}
colorPalette="brand"
onChange={(e) => field.onChange((e.target as HTMLInputElement).checked)}
wordBreak="break-all"
>
{translate("components:triggerDag.runImmediately")}
</Checkbox>
)}
/>
) : undefined}
<Spacer />
{hasSchedule ? (
<Box>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export type DagRunTriggerParams = {
logicalDate: string;
note: string;
partitionKey: string | undefined;
runImmediately: boolean;
};

export const dataIntervalModeOptions = [
Expand Down
7 changes: 7 additions & 0 deletions airflow-core/src/airflow/ui/src/queries/useTrigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ export const useTrigger = ({ dagId, onSuccessConfirm }: { dagId: string; onSucce
// eslint-disable-next-line unicorn/no-null
const formattedLogicalDate = logicalDate?.toISOString() ?? null;

// When the logical date is in the future and the user does NOT want to run immediately,
// set run_after to the logical date so the scheduler waits until that time.
const isFutureLogicalDate = logicalDate ? logicalDate > new Date() : false;
const runAfter =
isFutureLogicalDate && !dagRunRequestBody.runImmediately ? formattedLogicalDate : undefined;

const dataIntervalStart = dagRunRequestBody.dataIntervalStart
? new Date(dagRunRequestBody.dataIntervalStart)
: undefined;
Expand All @@ -96,6 +102,7 @@ export const useTrigger = ({ dagId, onSuccessConfirm }: { dagId: string; onSucce
logical_date: formattedLogicalDate,
note: checkNote,
partition_key: dagRunRequestBody.partitionKey ?? null,
run_after: runAfter,
},
});
};
Expand Down
Loading
Loading