diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py index 8619901f717f7..dd1f1aeb86665 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/dagrun.py @@ -28,6 +28,7 @@ class TriggerDAGRunPayload(StrictBaseModel): """Schema for Trigger DAG Run API request.""" logical_date: UtcDateTime | None = None + run_after: UtcDateTime | None = None conf: dict = Field(default_factory=dict) reset_dag_run: bool = False partition_key: str | None = None diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py index 448d0b6945abf..d4a0d24024bf9 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py @@ -128,6 +128,7 @@ def trigger_dag_run( run_type=DagRunType.OPERATOR_TRIGGERED, conf=payload.conf, logical_date=payload.logical_date, + run_after=payload.run_after, triggered_by=DagRunTriggeredByType.OPERATOR, replace_microseconds=False, partition_key=payload.partition_key, diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py index 6863c64efe5a0..88464b10df602 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py @@ -39,13 +39,17 @@ MovePreviousRunEndpoint, RemoveUpstreamMapIndexesField, ) -from airflow.api_fastapi.execution_api.versions.v2026_04_17 import AddTeamNameField +from airflow.api_fastapi.execution_api.versions.v2026_04_17 import ( + AddRunAfterToTriggerPayload, + AddTeamNameField, +) bundle = VersionBundle( HeadVersion(), Version( "2026-04-17", AddTeamNameField, + AddRunAfterToTriggerPayload, ), Version( "2026-04-06", diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_17.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_17.py index e7cd9d331a591..3effb1f25d298 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_17.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_17.py @@ -19,6 +19,7 @@ from cadwyn import ResponseInfo, VersionChange, convert_response_to_previous_version_for, schema +from airflow.api_fastapi.execution_api.datamodels.dagrun import TriggerDAGRunPayload from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun, TIRunContext @@ -34,3 +35,13 @@ def remove_team_name_field(response: ResponseInfo) -> None: # type: ignore[misc """Remove the ``team_name`` field from dag_run for older API versions.""" if "dag_run" in response.body and isinstance(response.body["dag_run"], dict): response.body["dag_run"].pop("team_name", None) + + +class AddRunAfterToTriggerPayload(VersionChange): + """Add the ``run_after`` field to TriggerDAGRunPayload.""" + + description = __doc__ + + instructions_to_migrate_to_previous_version = ( + schema(TriggerDAGRunPayload).field("run_after").didnt_exist, + ) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 67115ba757805..ee21d482d0d98 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2421,7 +2421,11 @@ def _schedule_dag_run( ) return callback_to_execute - if dag_run.logical_date and dag_run.logical_date > timezone.utcnow(): + if ( + dag_run.logical_date + and dag_run.logical_date > timezone.utcnow() + and dag_run.run_type not in (DagRunType.MANUAL, DagRunType.OPERATOR_TRIGGERED) + ): self.log.error("Logical date is in future: %s", dag_run.logical_date) return callback diff --git a/airflow-core/src/airflow/ti_deps/deps/runnable_exec_date_dep.py b/airflow-core/src/airflow/ti_deps/deps/runnable_exec_date_dep.py index 396f8180d1be1..0c9361716a8e4 100644 --- a/airflow-core/src/airflow/ti_deps/deps/runnable_exec_date_dep.py +++ b/airflow-core/src/airflow/ti_deps/deps/runnable_exec_date_dep.py @@ -20,6 +20,7 @@ from airflow._shared.timezones import timezone from airflow.ti_deps.deps.base_ti_dep import BaseTIDep from airflow.utils.session import provide_session +from airflow.utils.types import DagRunType class RunnableExecDateDep(BaseTIDep): @@ -30,13 +31,17 @@ class RunnableExecDateDep(BaseTIDep): @provide_session def _get_dep_statuses(self, ti, session, dep_context): - logical_date = ti.get_dagrun(session).logical_date + dagrun = ti.get_dagrun(session) + logical_date = dagrun.logical_date if logical_date is None: return cur_date = timezone.utcnow() - if logical_date > cur_date: + if logical_date > cur_date and dagrun.run_type not in ( + DagRunType.MANUAL, + DagRunType.OPERATOR_TRIGGERED, + ): yield self._failing_status( reason=( f"Logical date {logical_date.isoformat()} is in the future " diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/ar/components.json b/airflow-core/src/airflow/ui/public/i18n/locales/ar/components.json index c6ceca45e879e..47ed6a49fe8d6 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/ar/components.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/ar/components.json @@ -168,6 +168,7 @@ "loading": "جارٍ تحميل معلومات Dag...", "loadingFailed": "فشل تحميل معلومات Dag. يرجى المحاولة مرة أخرى.", "runIdHelp": "اختياري - سيتم توليده تلقائيًا إذا لم يتم توفيره.", + "runImmediately": "تشغيل فوري (التاريخ المنطقي في المستقبل)", "selectDescription": "تشغيل عملية واحدة من هذا Dag", "selectLabel": "تشغيلة واحدة", "title": "تشغيل Dag", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/ca/components.json b/airflow-core/src/airflow/ui/public/i18n/locales/ca/components.json index c22560c862e24..d82ebf2997fb4 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/ca/components.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/ca/components.json @@ -132,6 +132,7 @@ "loadingFailed": "No s'ha pogut carregar la informació del Dag. Si us plau, torneu-ho a intentar.", "manualRunDenied": "Aquest Dag no permet execucions manuals", "runIdHelp": "Opcional - es generarà si no es proporciona", + "runImmediately": "Executar immediatament (la data lògica és en el futur)", "selectDescription": "Executar una única execució d'aquest Dag", "selectLabel": "Execució única", "title": "Executar Dag", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/de/components.json b/airflow-core/src/airflow/ui/public/i18n/locales/de/components.json index 126709f247b4e..fd354b410a3a1 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/de/components.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/de/components.json @@ -133,6 +133,7 @@ "loadingFailed": "Das Laden der Dag Informationen ist fehlgeschlagen. Bitte versuchen Sie es noch einmal.", "manualRunDenied": "Manuelles Auslösen dieses Dags ist nicht erlaubt.", "runIdHelp": "Optional - wird automatisch erzeugt wenn nicht angegeben", + "runImmediately": "Sofort ausführen (logisches Datum liegt in der Zukunft)", "selectDescription": "Einen einzelnen Lauf dieses Dags auslösen", "selectLabel": "Einzelner Lauf", "title": "Dag Auslösen", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/el/components.json b/airflow-core/src/airflow/ui/public/i18n/locales/el/components.json index aae983deabb1e..421e4fec25991 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/el/components.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/el/components.json @@ -103,6 +103,7 @@ "loading": "Φόρτωση πληροφοριών Dag...", "loadingFailed": "Αποτυχία φόρτωσης πληροφοριών Dag. Παρακαλώ δοκιμάστε ξανά.", "runIdHelp": "Προαιρετικό - θα δημιουργηθεί αν δεν δοθεί", + "runImmediately": "Εκτέλεση αμέσως (η λογική ημερομηνία είναι στο μέλλον)", "selectDescription": "Ενεργοποιήστε μία εκτέλεση αυτού του Dag", "selectLabel": "Μονή Εκτέλεση", "title": "Ενεργοποίηση Dag", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/components.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/components.json index 64a78bee04b2a..e97c47908acb5 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/components.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/components.json @@ -134,6 +134,7 @@ "loadingFailed": "Failed to load Dag information. Please try again.", "manualRunDenied": "Manual runs are not allowed for this Dag", "runIdHelp": "Optional - will be generated if not provided", + "runImmediately": "Run immediately (logical date is in the future)", "selectDescription": "Trigger a single run of this Dag", "selectLabel": "Single Run", "title": "Trigger Dag", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/es/components.json b/airflow-core/src/airflow/ui/public/i18n/locales/es/components.json index de515551a737e..4f1b37192009f 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/es/components.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/es/components.json @@ -120,6 +120,7 @@ "loading": "Cargando información del Dag...", "loadingFailed": "Error al cargar la información del Dag. Por favor, inténtelo de nuevo.", "runIdHelp": "Opcional - se generará si no se proporciona", + "runImmediately": "Ejecutar inmediatamente (la fecha lógica está en el futuro)", "selectDescription": "Activar una ejecución única de este Dag", "selectLabel": "Ejecución Única", "title": "Activar Dag", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/fr/components.json b/airflow-core/src/airflow/ui/public/i18n/locales/fr/components.json index 1f9d2e6e8b010..ad6b2b96a9cd7 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/fr/components.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/fr/components.json @@ -120,6 +120,7 @@ "loading": "Chargement des informations du Dag...", "loadingFailed": "Échec du chargement des informations du Dag. Veuillez réessayer.", "runIdHelp": "Optionnel – sera généré s'il n'est pas fourni", + "runImmediately": "Exécuter immédiatement (la date logique est dans le futur)", "selectDescription": "Déclencher une exécution unique de ce Dag", "selectLabel": "Exécution unique", "title": "Déclencher un Dag", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/he/components.json b/airflow-core/src/airflow/ui/public/i18n/locales/he/components.json index d7619be736bac..24172ad651f8a 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/he/components.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/he/components.json @@ -141,6 +141,7 @@ "loading": "טוען מידע Dag...", "loadingFailed": "טעינת מידע ה-Dag נכשלה. אנא נסו שוב.", "runIdHelp": "אופציונלי - ייווצר אוטומטית אם לא יסופק", + "runImmediately": "הפעלה מיידית (התאריך הלוגי בעתיד)", "selectDescription": "הפעל ריצה בודדת של Dag זה", "selectLabel": "ריצה בודדת", "title": "הפעל Dag", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/hi/components.json b/airflow-core/src/airflow/ui/public/i18n/locales/hi/components.json index e50acde366a61..2533efd4bcc60 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/hi/components.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/hi/components.json @@ -132,6 +132,7 @@ "loading": "डैग जानकारी लोड हो रही है...", "loadingFailed": "डैग जानकारी लोड करने में विफल। कृपया पुनः प्रयास करें।", "runIdHelp": "वैकल्पिक - प्रदान न किए जाने पर जेनरेट किया जाएगा", + "runImmediately": "तुरंत चलाएँ (लॉजिकल तारीख भविष्य में है)", "selectDescription": "इस डैग का एक रन ट्रिगर करें", "selectLabel": "सिंगल रन", "title": "डैग ट्रिगर करें", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/hu/components.json b/airflow-core/src/airflow/ui/public/i18n/locales/hu/components.json index 92a7ea6dac9bf..1166cd1565e56 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/hu/components.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/hu/components.json @@ -133,6 +133,7 @@ "loadingFailed": "A Dag információk betöltése sikertelen. Kérem, próbálja újra.", "manualRunDenied": "Kézi futtatás nem engedélyezett ennél a Dag-nél", "runIdHelp": "Opcionális – ha nincs megadva, automatikusan generálódik", + "runImmediately": "Azonnali futtatás (a logikai dátum a jövőben van)", "selectDescription": "Indítsd el ennek a Dag-nek egyetlen futását", "selectLabel": "Egyszeri futás", "title": "Dag indítása", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/it/components.json b/airflow-core/src/airflow/ui/public/i18n/locales/it/components.json index 6490aa57f9283..d74f3e12a7d4e 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/it/components.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/it/components.json @@ -129,6 +129,7 @@ "loading": "Caricamento informazioni del Dag...", "loadingFailed": "Impossibile caricare le informazioni del Dag. Per favore, riprova.", "runIdHelp": "Opzionale - verrà generato se non fornito", + "runImmediately": "Esegui immediatamente (la data logica è nel futuro)", "selectDescription": "Triggera un singolo run di questo Dag", "selectLabel": "Singolo Run", "title": "Trigger del Dag", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/ja/components.json b/airflow-core/src/airflow/ui/public/i18n/locales/ja/components.json index 899f71a489af2..7c09b542ac99b 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/ja/components.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/ja/components.json @@ -116,6 +116,7 @@ "loading": "Dag 情報を読込中...", "loadingFailed": "情報読込に失敗しました。もう一度試してください", "runIdHelp": "オプション- 指定しない場合自動的に生成されます", + "runImmediately": "すぐに実行(論理日付が未来の日付です)", "selectDescription": "この Dag の単体での実行をトリガーします", "selectLabel": "単体での実行", "title": "Dag のトリガー", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/ko/components.json b/airflow-core/src/airflow/ui/public/i18n/locales/ko/components.json index d228b382b4e30..b6654bd0b621a 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/ko/components.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/ko/components.json @@ -131,6 +131,7 @@ "loadingFailed": "Dag 정보를 로드하지 못했습니다. 다시 시도해주세요.", "manualRunDenied": "이 Dag는 수동으로 실행할 수 없습니다.", "runIdHelp": "선택 사항 - 제공되지 않으면 생성됩니다.", + "runImmediately": "즉시 실행 (논리적 날짜가 미래입니다)", "selectDescription": "이 Dag을(를) 단일 실행 트리거", "selectLabel": "단일 실행", "title": "Dag 트리거", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/nl/components.json b/airflow-core/src/airflow/ui/public/i18n/locales/nl/components.json index fb0a3f77950e4..1fd8be784c5b5 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/nl/components.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/nl/components.json @@ -132,6 +132,7 @@ "loadingFailed": "Mislukt om Dag informatie te laden. Probeer het opnieuw.", "manualRunDenied": "Handmatige runs zijn niet toegestaan voor deze Dag", "runIdHelp": "Optioneel - wordt gegenereerd indien niet opgegeven", + "runImmediately": "Direct uitvoeren (logische datum ligt in de toekomst)", "selectDescription": "Trigger een enkele run van deze Dag", "selectLabel": "Enkele Run", "title": "Trigger Dag", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/pl/components.json b/airflow-core/src/airflow/ui/public/i18n/locales/pl/components.json index 2b8815ea925e1..6d5eca21c4ac0 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/pl/components.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/pl/components.json @@ -149,6 +149,7 @@ "loadingFailed": "Nie udało się załadować informacji o Dagach. Spróbuj ponownie.", "manualRunDenied": "Ręczne uruchamianie nie jest dozwolone dla tego Daga", "runIdHelp": "Opcjonalne - zostanie wygenerowane, jeśli nie podano", + "runImmediately": "Uruchom natychmiast (data logiczna jest w przyszłości)", "selectDescription": "Wyzwól pojedyncze wykonanie", "selectLabel": "Pojedyncze wykonanie", "title": "Uruchom Daga", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/pt/components.json b/airflow-core/src/airflow/ui/public/i18n/locales/pt/components.json index c582684237443..db2ef5d41436e 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/pt/components.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/pt/components.json @@ -151,6 +151,7 @@ "loadingFailed": "Falha ao carregar informações do Dag. Por favor, tente novamente.", "manualRunDenied": "Execuções manuais não são permitidas para este Dag", "runIdHelp": "Opcional - será gerado se não for fornecido", + "runImmediately": "Executar imediatamente (a data lógica está no futuro)", "selectDescription": "Acionar uma única execução deste Dag", "selectLabel": "Execução Única", "title": "Acionar Dag", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/ru/components.json b/airflow-core/src/airflow/ui/public/i18n/locales/ru/components.json index 8178275fb2c57..cab6fa092616e 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/ru/components.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/ru/components.json @@ -140,6 +140,7 @@ "loadingFailed": "Не удалось загрузить информацию о Dag-е. Пожалуйста, попробуйте снова.", "manualRunDenied": "Ручные запуски не разрешены для этого Dag-а", "runIdHelp": "Необязательно - будет сгенерирован, если не предоставлен", + "runImmediately": "Запустить немедленно (логическая дата в будущем)", "selectDescription": "Запустить один процесс этого Dag-а", "selectLabel": "Единичный запуск", "title": "Запуск Dag-а", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/th/components.json b/airflow-core/src/airflow/ui/public/i18n/locales/th/components.json index 6d56cc625e466..7d83b7bbc62d7 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/th/components.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/th/components.json @@ -101,6 +101,7 @@ "loading": "กำลังโหลดข้อมูล Dag...", "loadingFailed": "โหลดข้อมูล Dag ไม่สำเร็จ กรุณาลองใหม่", "runIdHelp": "ไม่บังคับ - จะถูกสร้างอัตโนมัติหากไม่ระบุ", + "runImmediately": "รันทันที (วันที่ลอจิคัลอยู่ในอนาคต)", "selectDescription": "ทริกเกอร์ Dag นี้สำหรับการทำงานครั้งเดียว", "selectLabel": "ทำงานครั้งเดียว", "title": "ทริกเกอร์ Dag", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/tr/components.json b/airflow-core/src/airflow/ui/public/i18n/locales/tr/components.json index 57eeb22473eee..01e0ba7d773e5 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/tr/components.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/tr/components.json @@ -133,6 +133,7 @@ "loadingFailed": "Dag bilgisi yüklenemedi. Lütfen tekrar deneyin.", "manualRunDenied": "El ile tetikleme bu Dag için devre dışı bırakıldı.", "runIdHelp": "İsteğe bağlı - sağlanmazsa oluşturulacaktır", + "runImmediately": "Hemen çalıştır (mantıksal tarih gelecekte)", "selectDescription": "Bu Dag'ın tek bir çalıştırmasını tetikle", "selectLabel": "Tek Çalıştırma", "title": "Dag Tetikle", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/zh-CN/components.json b/airflow-core/src/airflow/ui/public/i18n/locales/zh-CN/components.json index ceb20f95a9894..ea255843c59ee 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/zh-CN/components.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/zh-CN/components.json @@ -131,6 +131,7 @@ "loadingFailed": "加载 Dag 信息失败,请重试。", "manualRunDenied": "此 Dag 不允许手动执行", "runIdHelp": "选填 - 若未提供将会自动生成", + "runImmediately": "立即运行(逻辑日期在未来)", "selectDescription": "触发此 Dag 单次执行", "selectLabel": "单次执行", "title": "触发 Dag", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/components.json b/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/components.json index 0f84213efb83b..a48acca703482 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/components.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/components.json @@ -131,6 +131,7 @@ "loadingFailed": "載入 Dag 資訊失敗,請重試。", "manualRunDenied": "此 Dag 不允許手動執行", "runIdHelp": "選填 - 若未提供將會自動產生", + "runImmediately": "立即執行(邏輯日期在未來)", "selectDescription": "觸發此 Dag 單次執行", "selectLabel": "單次執行", "title": "觸發 Dag", diff --git a/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx b/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx index 91b2cd1daa051..6e6d062a6746a 100644 --- a/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx +++ b/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx @@ -87,6 +87,7 @@ const TriggerDAGForm = ({ logicalDate: isPartitioned ? "" : dayjs().format(DEFAULT_DATETIME_FORMAT), note: "", partitionKey: undefined, + runImmediately: false, }, }); @@ -105,6 +106,7 @@ const TriggerDAGForm = ({ logicalDate: isPartitioned ? "" : dayjs().format(DEFAULT_DATETIME_FORMAT), note: "", partitionKey: undefined, + runImmediately: false, }); // Also update the param store to keep it in sync. // Wait until we have the initial params so section ordering stays consistent. @@ -135,6 +137,9 @@ const TriggerDAGForm = ({ const resetDateError = () => setErrors((prev) => ({ ...prev, date: undefined })); + const logicalDate = watch("logicalDate"); + const isFutureDate = Boolean(logicalDate) && dayjs(logicalDate).isAfter(dayjs()); + const dataIntervalMode = watch("dataIntervalMode"); const dataIntervalStart = watch("dataIntervalStart"); const dataIntervalEnd = watch("dataIntervalEnd"); @@ -171,6 +176,22 @@ const TriggerDAGForm = ({ )} /> + {isFutureDate ? ( + ( + field.onChange((e.target as HTMLInputElement).checked)} + wordBreak="break-all" + > + {translate("components:triggerDag.runImmediately")} + + )} + /> + ) : undefined} {hasSchedule ? ( diff --git a/airflow-core/src/airflow/ui/src/components/TriggerDag/types.ts b/airflow-core/src/airflow/ui/src/components/TriggerDag/types.ts index 53897dd6d3307..032f385640394 100644 --- a/airflow-core/src/airflow/ui/src/components/TriggerDag/types.ts +++ b/airflow-core/src/airflow/ui/src/components/TriggerDag/types.ts @@ -28,6 +28,7 @@ export type DagRunTriggerParams = { logicalDate: string; note: string; partitionKey: string | undefined; + runImmediately: boolean; }; export const dataIntervalModeOptions = [ diff --git a/airflow-core/src/airflow/ui/src/queries/useTrigger.ts b/airflow-core/src/airflow/ui/src/queries/useTrigger.ts index 447b843def45e..cb63f484d9bbe 100644 --- a/airflow-core/src/airflow/ui/src/queries/useTrigger.ts +++ b/airflow-core/src/airflow/ui/src/queries/useTrigger.ts @@ -71,6 +71,12 @@ export const useTrigger = ({ dagId, onSuccessConfirm }: { dagId: string; onSucce // eslint-disable-next-line unicorn/no-null const formattedLogicalDate = logicalDate?.toISOString() ?? null; + // When the logical date is in the future and the user does NOT want to run immediately, + // set run_after to the logical date so the scheduler waits until that time. + const isFutureLogicalDate = logicalDate ? logicalDate > new Date() : false; + const runAfter = + isFutureLogicalDate && !dagRunRequestBody.runImmediately ? formattedLogicalDate : undefined; + const dataIntervalStart = dagRunRequestBody.dataIntervalStart ? new Date(dagRunRequestBody.dataIntervalStart) : undefined; @@ -96,6 +102,7 @@ export const useTrigger = ({ dagId, onSuccessConfirm }: { dagId: string; onSucce logical_date: formattedLogicalDate, note: checkNote, partition_key: dagRunRequestBody.partitionKey ?? null, + run_after: runAfter, }, }); }; diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py index b84b993f1d63d..c41689c865527 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py @@ -184,6 +184,56 @@ def test_trigger_dag_run_manual_denied_for_operator(self, client, session, dag_m } } + def test_trigger_dag_run_with_run_after(self, client, session, dag_maker): + """Test that run_after is passed through to the created DagRun.""" + dag_id = "test_trigger_dag_run_run_after" + run_id = "test_run_id" + logical_date = timezone.datetime(2025, 6, 1) + run_after = timezone.datetime(2025, 6, 1) + + with dag_maker(dag_id=dag_id, session=session, serialized=True): + EmptyOperator(task_id="test_task") + + session.commit() + + response = client.post( + f"/execution/dag-runs/{dag_id}/{run_id}", + json={ + "logical_date": logical_date.isoformat(), + "run_after": run_after.isoformat(), + }, + ) + + assert response.status_code == 204 + + dag_run = session.scalars(select(DagRun).where(DagRun.run_id == run_id)).one() + assert dag_run.logical_date == logical_date + assert dag_run.run_after == run_after + + @time_machine.travel("2025-01-01 00:00:00", tick=False) + def test_trigger_dag_run_without_run_after_defaults_to_utcnow(self, client, session, dag_maker): + """When run_after is not provided, trigger_dag() defaults run_after to utcnow().""" + dag_id = "test_trigger_dag_run_no_run_after" + run_id = "test_run_id" + logical_date = timezone.datetime(2025, 6, 1) + + with dag_maker(dag_id=dag_id, session=session, serialized=True): + EmptyOperator(task_id="test_task") + + session.commit() + + response = client.post( + f"/execution/dag-runs/{dag_id}/{run_id}", + json={"logical_date": logical_date.isoformat()}, + ) + + assert response.status_code == 204 + + dag_run = session.scalars(select(DagRun).where(DagRun.run_id == run_id)).one() + assert dag_run.logical_date == logical_date + # run_after defaults to utcnow() via trigger_dag() when not provided + assert dag_run.run_after == timezone.datetime(2025, 1, 1) + def test_trigger_dag_run_already_exists(self, client, session, dag_maker): """Test that error is raised when a DAG Run already exists.""" diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 061a87e3aa420..e1ac86f4f6906 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -976,6 +976,102 @@ def test_schedule_dag_run_with_asset_event(self, session: Session, dag_maker: Da assert events[0].asset is not None assert events[0].source_aliases is not None + @pytest.mark.usefixtures("testing_dag_bundle") + def test_schedule_dag_run_manual_with_future_logical_date(self, session, dag_maker): + """Manual DAG runs with a future logical_date should be scheduled immediately.""" + with dag_maker( + dag_id="test_manual_future_logical_date", + schedule=None, + session=session, + ): + EmptyOperator(task_id="dummy") + + future_date = timezone.utcnow() + timedelta(days=1) + dr = dag_maker.create_dagrun( + run_type=DagRunType.MANUAL, + logical_date=future_date, + state=DagRunState.RUNNING, + start_date=timezone.utcnow(), + ) + + executor = MockExecutor(do_update=False) + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(scheduler_job, executors=[executor]) + + self.job_runner._schedule_dag_run(dr, session) + session.flush() + + session.refresh(dr) + # The run should not have been skipped due to future logical_date + # Tasks should have been processed (scheduled or completed) + tis = dr.get_task_instances(session=session) + assert len(tis) == 1 + assert tis[0].state is not None + + @pytest.mark.usefixtures("testing_dag_bundle") + def test_schedule_dag_run_scheduled_with_future_logical_date_skipped(self, session, dag_maker): + """Scheduled DAG runs with a future logical_date should be skipped by the scheduler.""" + with dag_maker( + dag_id="test_scheduled_future_logical_date", + schedule=timedelta(days=1), + session=session, + ): + EmptyOperator(task_id="dummy") + + future_date = timezone.utcnow() + timedelta(days=1) + dr = dag_maker.create_dagrun( + run_type=DagRunType.SCHEDULED, + logical_date=future_date, + state=DagRunState.RUNNING, + start_date=timezone.utcnow(), + ) + + executor = MockExecutor(do_update=False) + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(scheduler_job, executors=[executor]) + + self.job_runner._schedule_dag_run(dr, session) + session.flush() + + session.refresh(dr) + # The run should still be RUNNING — but no tasks scheduled (early return) + assert dr.state == DagRunState.RUNNING + tis = dr.get_task_instances(session=session) + assert len(tis) == 1 + # Task should NOT have been scheduled — the scheduler returned early + assert tis[0].state is None + + @pytest.mark.usefixtures("testing_dag_bundle") + def test_schedule_dag_run_operator_triggered_with_future_logical_date(self, session, dag_maker): + """Operator-triggered DAG runs with a future logical_date should be scheduled immediately.""" + with dag_maker( + dag_id="test_operator_triggered_future_logical_date", + schedule=None, + session=session, + ): + EmptyOperator(task_id="dummy") + + future_date = timezone.utcnow() + timedelta(days=1) + dr = dag_maker.create_dagrun( + run_type=DagRunType.OPERATOR_TRIGGERED, + logical_date=future_date, + state=DagRunState.RUNNING, + start_date=timezone.utcnow(), + ) + + executor = MockExecutor(do_update=False) + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(scheduler_job, executors=[executor]) + + self.job_runner._schedule_dag_run(dr, session) + session.flush() + + session.refresh(dr) + # The run should not have been skipped due to future logical_date + tis = dr.get_task_instances(session=session) + assert len(tis) == 1 + assert tis[0].state is not None + def test_execute_task_instances_is_paused_wont_execute(self, session, dag_maker): dag_id = "SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute" task_id_1 = "dummy_task" diff --git a/airflow-core/tests/unit/ti_deps/deps/test_runnable_exec_date_dep.py b/airflow-core/tests/unit/ti_deps/deps/test_runnable_exec_date_dep.py index 771d55b57a8d5..2f15748f219a9 100644 --- a/airflow-core/tests/unit/ti_deps/deps/test_runnable_exec_date_dep.py +++ b/airflow-core/tests/unit/ti_deps/deps/test_runnable_exec_date_dep.py @@ -42,11 +42,11 @@ def clean_db(session): @pytest.mark.parametrize( ("logical_date", "is_met"), [ - (datetime(2016, 11, 3), False), + (datetime(2016, 11, 3), True), (datetime(2016, 11, 1), True), ], ) -def test_logical_date_dep( +def test_logical_date_dep_manual_run( dag_maker, session, create_dummy_dag, @@ -54,7 +54,7 @@ def test_logical_date_dep( is_met, ): """ - If the dag's logical date is in the future, this dep should fail + Manual runs should be allowed even if the logical date is in the future. """ create_dummy_dag( "test_localtaskjob_heartbeat", @@ -68,10 +68,14 @@ def test_logical_date_dep( assert RunnableExecDateDep().is_met(ti=ti) == is_met -@time_machine.travel("2016-01-01") -def test_logical_date_after_end_date(session, dag_maker, create_dummy_dag): +@time_machine.travel("2016-11-01") +def test_logical_date_dep_scheduled_run_future_fails( + dag_maker, + session, + create_dummy_dag, +): """ - If the dag's logical date is in the future this dep should fail + Scheduled runs with a future logical date should still fail. """ create_dummy_dag( "test_localtaskjob_heartbeat", @@ -81,14 +85,45 @@ def test_logical_date_after_end_date(session, dag_maker, create_dummy_dag): with_dagrun_type=DagRunType.MANUAL, session=session, ) - (ti,) = dag_maker.create_dagrun(logical_date=datetime(2016, 11, 2)).task_instances + (ti,) = dag_maker.create_dagrun( + run_id="scheduled", + run_type=DagRunType.SCHEDULED, + logical_date=datetime(2016, 11, 3), + ).task_instances assert not RunnableExecDateDep().is_met(ti=ti) +@time_machine.travel("2016-11-01") +def test_logical_date_dep_operator_triggered_run( + dag_maker, + session, + create_dummy_dag, +): + """ + Operator-triggered runs should be allowed even if the logical date is in the future. + """ + create_dummy_dag( + "test_localtaskjob_heartbeat", + start_date=datetime(2015, 1, 1), + end_date=datetime(2016, 11, 5), + schedule=None, + with_dagrun_type=DagRunType.MANUAL, + session=session, + ) + (ti,) = dag_maker.create_dagrun( + run_id="operator_triggered", + run_type=DagRunType.OPERATOR_TRIGGERED, + logical_date=datetime(2016, 11, 3), + ).task_instances + assert RunnableExecDateDep().is_met(ti=ti) + + class TestRunnableExecDateDep: - def _get_task_instance(self, logical_date, dag_end_date=None, task_end_date=None): + def _get_task_instance( + self, logical_date, dag_end_date=None, task_end_date=None, run_type=DagRunType.MANUAL + ): dag = Mock(end_date=dag_end_date) - dagrun = Mock(logical_date=logical_date) + dagrun = Mock(logical_date=logical_date, run_type=run_type) task = Mock(dag=dag, end_date=task_end_date) return Mock(task=task, get_dagrun=Mock(return_value=dagrun)) diff --git a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py index 26b990f9c9e3e..a7a969ae4701e 100644 --- a/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py +++ b/providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py @@ -127,6 +127,14 @@ class TriggerDagRunOperator(BaseOperator): If not provided, a run ID will be automatically generated. :param conf: Configuration for the DAG run (templated). :param logical_date: Logical date for the triggered DAG (templated). + :param run_after: Earliest time at which the triggered DAG run may be + scheduled (templated). When set, the scheduler will not start the run + before this timestamp. If not provided and ``logical_date`` is + explicitly set, ``run_after`` defaults to the same value as + ``logical_date`` so the run waits until that time. If neither is + provided, ``run_after`` defaults to ``utcnow()`` on the API server + side. Set explicitly to ``None`` to force immediate execution even + with a future ``logical_date``. :param reset_dag_run: Whether clear existing DAG run if already exists. This is useful when backfill or rerun an existing DAG run. This only resets (not recreates) the DAG run. @@ -158,6 +166,7 @@ class TriggerDagRunOperator(BaseOperator): "trigger_dag_id", "trigger_run_id", "logical_date", + "run_after", "conf", "wait_for_completion", "skip_when_already_exists", @@ -173,6 +182,7 @@ def __init__( trigger_run_id: str | None = None, conf: dict | None = None, logical_date: str | datetime.datetime | None | ArgNotSet = NOTSET, + run_after: str | datetime.datetime | None | ArgNotSet = NOTSET, reset_dag_run: bool = False, wait_for_completion: bool = False, poke_interval: int = 60, @@ -215,6 +225,16 @@ def __init__( f"Expected str, datetime.datetime, or None for parameter 'logical_date'. Got {type(logical_date).__name__}" ) + self.run_after = run_after + if run_after is NOTSET: + self.run_after = NOTSET + elif run_after is None or isinstance(run_after, (str, datetime.datetime)): + self.run_after = run_after + else: + raise TypeError( + f"Expected str, datetime.datetime, or None for parameter 'run_after'. Got {type(run_after).__name__}" + ) + if fail_when_dag_is_paused and AIRFLOW_V_3_0_PLUS: raise NotImplementedError("Setting `fail_when_dag_is_paused` not yet supported for Airflow 3.x") @@ -227,6 +247,19 @@ def execute(self, context: Context): elif isinstance(self.logical_date, str): parsed_logical_date = timezone.parse(self.logical_date) + if self.run_after is NOTSET: + # When the user explicitly set a (future) logical_date, default run_after + # to that value so the run waits until that time — preserving the + # pre-existing behaviour. When logical_date was also left unset, + # pass None so trigger_dag() uses its own default (utcnow()). + parsed_run_after = parsed_logical_date if self.logical_date is not NOTSET else None + elif self.run_after is None: + parsed_run_after = None + elif isinstance(self.run_after, datetime.datetime): + parsed_run_after = self.run_after + elif isinstance(self.run_after, str): + parsed_run_after = timezone.parse(self.run_after) + try: if self.conf and isinstance(self.conf, str): self.conf = json.loads(self.conf) @@ -267,14 +300,17 @@ def execute(self, context: Context): if AIRFLOW_V_3_0_PLUS: self._trigger_dag_af_3( - context=context, run_id=self.trigger_run_id, parsed_logical_date=parsed_logical_date + context=context, + run_id=self.trigger_run_id, + parsed_logical_date=parsed_logical_date, + parsed_run_after=parsed_run_after, ) else: self._trigger_dag_af_2( context=context, run_id=self.trigger_run_id, parsed_logical_date=parsed_logical_date ) - def _trigger_dag_af_3(self, context, run_id, parsed_logical_date): + def _trigger_dag_af_3(self, context, run_id, parsed_logical_date, parsed_run_after=None): from airflow.providers.common.compat.sdk import DagRunTriggerException kwargs_accepted = dict( @@ -294,6 +330,9 @@ def _trigger_dag_af_3(self, context, run_id, parsed_logical_date): if self.note and "note" in inspect.signature(DagRunTriggerException.__init__).parameters: kwargs_accepted["note"] = self.note + if parsed_run_after and "run_after" in inspect.signature(DagRunTriggerException.__init__).parameters: + kwargs_accepted["run_after"] = parsed_run_after + raise DagRunTriggerException(**kwargs_accepted) def _trigger_dag_af_2(self, context, run_id, parsed_logical_date): diff --git a/task-sdk/src/airflow/sdk/api/client.py b/task-sdk/src/airflow/sdk/api/client.py index d0362ce12aa61..7b1859490c493 100644 --- a/task-sdk/src/airflow/sdk/api/client.py +++ b/task-sdk/src/airflow/sdk/api/client.py @@ -713,10 +713,15 @@ def trigger( logical_date: datetime | None = None, reset_dag_run: bool = False, note: str | None = None, + run_after: datetime | None = None, ) -> OKResponse | ErrorResponse: """Trigger a Dag run via the API server.""" body = TriggerDAGRunPayload( - logical_date=logical_date, conf=conf or {}, reset_dag_run=reset_dag_run, note=note + logical_date=logical_date, + run_after=run_after, + conf=conf or {}, + reset_dag_run=reset_dag_run, + note=note, ) try: diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index 138fe8a9fd8a8..bafd6890f46bd 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -371,6 +371,7 @@ class TriggerDAGRunPayload(BaseModel): extra="forbid", ) logical_date: Annotated[AwareDatetime | None, Field(title="Logical Date")] = None + run_after: Annotated[AwareDatetime | None, Field(title="Run After")] = None conf: Annotated[dict[str, Any] | None, Field(title="Conf")] = None reset_dag_run: Annotated[bool | None, Field(title="Reset Dag Run")] = False partition_key: Annotated[str | None, Field(title="Partition Key")] = None diff --git a/task-sdk/src/airflow/sdk/exceptions.py b/task-sdk/src/airflow/sdk/exceptions.py index b69abe6226575..1f581fe225784 100644 --- a/task-sdk/src/airflow/sdk/exceptions.py +++ b/task-sdk/src/airflow/sdk/exceptions.py @@ -241,6 +241,7 @@ def __init__( dag_run_id: str, conf: dict | None, logical_date=None, + run_after=None, reset_dag_run: bool, skip_when_already_exists: bool, wait_for_completion: bool, @@ -255,6 +256,7 @@ def __init__( self.dag_run_id = dag_run_id self.conf = conf self.logical_date = logical_date + self.run_after = run_after self.reset_dag_run = reset_dag_run self.skip_when_already_exists = skip_when_already_exists self.wait_for_completion = wait_for_completion diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index aa70bd9967520..9126fb2638825 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -1523,7 +1523,13 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id: dump_opts = {"exclude_unset": True} elif isinstance(msg, TriggerDagRun): resp = self.client.dag_runs.trigger( - msg.dag_id, msg.run_id, msg.conf, msg.logical_date, msg.reset_dag_run, msg.note + msg.dag_id, + msg.run_id, + msg.conf, + msg.logical_date, + msg.reset_dag_run, + msg.note, + msg.run_after, ) elif isinstance(msg, GetDagRun): dr_resp = self.client.dag_runs.get_detail(msg.dag_id, msg.run_id) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 6210797b55853..08ad31c3ed242 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1445,6 +1445,7 @@ def _handle_trigger_dag_run( dag_id=drte.trigger_dag_id, run_id=drte.dag_run_id, logical_date=drte.logical_date, + run_after=drte.run_after, conf=drte.conf, reset_dag_run=drte.reset_dag_run, note=drte.note, diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index 334dfee4e10ff..f86634f1044e2 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -2161,7 +2161,7 @@ class RequestTestCase: expected_body={"ok": True, "type": "OKResponse"}, client_mock=ClientMock( method_path="dag_runs.trigger", - args=("test_dag", "test_run", {"key": "value"}, timezone.datetime(2025, 1, 1), True, None), + args=("test_dag", "test_run", {"key": "value"}, timezone.datetime(2025, 1, 1), True, None, None), response=OKResponse(ok=True), ), test_id="dag_run_trigger", @@ -2178,17 +2178,50 @@ class RequestTestCase: expected_body={"ok": True, "type": "OKResponse"}, client_mock=ClientMock( method_path="dag_runs.trigger", - args=("test_dag", "test_run", {"key": "value"}, timezone.datetime(2025, 1, 1), True, "Test Note"), + args=( + "test_dag", + "test_run", + {"key": "value"}, + timezone.datetime(2025, 1, 1), + True, + "Test Note", + None, + ), response=OKResponse(ok=True), ), test_id="dag_run_trigger", ), + RequestTestCase( + message=TriggerDagRun( + dag_id="test_dag", + run_id="test_run", + conf={"key": "value"}, + logical_date=timezone.datetime(2025, 6, 1), + reset_dag_run=False, + run_after=timezone.datetime(2025, 6, 1), + ), + expected_body={"ok": True, "type": "OKResponse"}, + client_mock=ClientMock( + method_path="dag_runs.trigger", + args=( + "test_dag", + "test_run", + {"key": "value"}, + timezone.datetime(2025, 6, 1), + False, + None, + timezone.datetime(2025, 6, 1), + ), + response=OKResponse(ok=True), + ), + test_id="dag_run_trigger_with_run_after", + ), RequestTestCase( message=TriggerDagRun(dag_id="test_dag", run_id="test_run"), expected_body={"error": "DAGRUN_ALREADY_EXISTS", "detail": None, "type": "ErrorResponse"}, client_mock=ClientMock( method_path="dag_runs.trigger", - args=("test_dag", "test_run", None, None, False, None), + args=("test_dag", "test_run", None, None, False, None, None), response=ErrorResponse(error=ErrorType.DAGRUN_ALREADY_EXISTS), ), test_id="dag_run_trigger_already_exists", diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 5aeb009bd33da..31673f897bed5 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -4387,7 +4387,7 @@ def test_handle_trigger_dag_run(self, create_runtime_ti, mock_supervisor_comms): expected_calls = [ mock.call.send( - msg=TriggerDagRun( + TriggerDagRun( dag_id="test_dag", run_id="test_run_id", reset_dag_run=False, @@ -4395,7 +4395,7 @@ def test_handle_trigger_dag_run(self, create_runtime_ti, mock_supervisor_comms): ), ), mock.call.send( - msg=SetXCom( + SetXCom( key="trigger_run_id", value="test_run_id", dag_id="test_handle_trigger_dag_run", @@ -4438,7 +4438,7 @@ def test_handle_trigger_dag_run_conflict( expected_calls = [ mock.call.send( - msg=TriggerDagRun( + TriggerDagRun( dag_id="test_dag", logical_date=datetime(2025, 1, 1, 0, 0, 0, tzinfo=timezone.utc), run_id="test_run_id", @@ -4511,14 +4511,14 @@ def test_handle_trigger_dag_run_wait_for_completion( expected_calls = [ mock.call.send( - msg=TriggerDagRun( + TriggerDagRun( dag_id="test_dag", run_id="test_run_id", logical_date=datetime(2025, 1, 1, 0, 0, 0, tzinfo=timezone.utc), ), ), mock.call.send( - msg=SetXCom( + SetXCom( key="trigger_run_id", value="test_run_id", dag_id="test_handle_trigger_dag_run_wait_for_completion", @@ -4528,13 +4528,13 @@ def test_handle_trigger_dag_run_wait_for_completion( ), ), mock.call.send( - msg=GetDagRunState( + GetDagRunState( dag_id="test_dag", run_id="test_run_id", ), ), mock.call.send( - msg=GetDagRunState( + GetDagRunState( dag_id="test_dag", run_id="test_run_id", ), @@ -4632,6 +4632,117 @@ def test_handle_trigger_dag_run_deferred_with_reset_uses_run_id_only( # Also verify it was sent to supervisor mock_supervisor_comms.send.assert_any_call(msg) + @time_machine.travel("2025-01-01 00:00:00", tick=False) + def test_trigger_dag_run_default_does_not_set_run_after(self, create_runtime_ti, mock_supervisor_comms): + """When neither logical_date nor run_after is set, run_after should be None in the message.""" + from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator + + task = TriggerDagRunOperator( + task_id="test_task", + trigger_dag_id="test_dag", + trigger_run_id="test_run_id", + ) + ti = create_runtime_ti(dag_id="test_trigger_default_no_run_after", run_id="test_run", task=task) + + log = mock.MagicMock() + run(ti, ti.get_template_context(), log) + + mock_supervisor_comms.send.assert_any_call( + TriggerDagRun( + dag_id="test_dag", + run_id="test_run_id", + reset_dag_run=False, + logical_date=datetime(2025, 1, 1, 0, 0, 0, tzinfo=timezone.utc), + ), + ) + + @time_machine.travel("2025-01-01 00:00:00", tick=False) + def test_trigger_dag_run_with_future_logical_date_defaults_run_after_to_logical_date( + self, create_runtime_ti, mock_supervisor_comms + ): + """When logical_date is explicitly set but run_after is not, run_after defaults to logical_date.""" + from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator + + future = datetime(2025, 6, 1, 0, 0, 0, tzinfo=dt_timezone.utc) + task = TriggerDagRunOperator( + task_id="test_task", + trigger_dag_id="test_dag", + trigger_run_id="test_run_id", + logical_date=future, + ) + ti = create_runtime_ti(dag_id="test_trigger_future_logical_date", run_id="test_run", task=task) + + log = mock.MagicMock() + run(ti, ti.get_template_context(), log) + + mock_supervisor_comms.send.assert_any_call( + TriggerDagRun( + dag_id="test_dag", + run_id="test_run_id", + reset_dag_run=False, + logical_date=future, + run_after=future, + ), + ) + + @time_machine.travel("2025-01-01 00:00:00", tick=False) + def test_trigger_dag_run_with_explicit_run_after(self, create_runtime_ti, mock_supervisor_comms): + """When run_after is explicitly set, it should be used as-is.""" + from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator + + future_logical = datetime(2025, 6, 1, 0, 0, 0, tzinfo=dt_timezone.utc) + explicit_run_after = datetime(2025, 3, 1, 0, 0, 0, tzinfo=dt_timezone.utc) + task = TriggerDagRunOperator( + task_id="test_task", + trigger_dag_id="test_dag", + trigger_run_id="test_run_id", + logical_date=future_logical, + run_after=explicit_run_after, + ) + ti = create_runtime_ti(dag_id="test_trigger_explicit_run_after", run_id="test_run", task=task) + + log = mock.MagicMock() + run(ti, ti.get_template_context(), log) + + mock_supervisor_comms.send.assert_any_call( + TriggerDagRun( + dag_id="test_dag", + run_id="test_run_id", + reset_dag_run=False, + logical_date=future_logical, + run_after=explicit_run_after, + ), + ) + + @time_machine.travel("2025-01-01 00:00:00", tick=False) + def test_trigger_dag_run_with_run_after_none_forces_immediate( + self, create_runtime_ti, mock_supervisor_comms + ): + """When run_after is explicitly None with a future logical_date, run_after should be None.""" + from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator + + future = datetime(2025, 6, 1, 0, 0, 0, tzinfo=dt_timezone.utc) + task = TriggerDagRunOperator( + task_id="test_task", + trigger_dag_id="test_dag", + trigger_run_id="test_run_id", + logical_date=future, + run_after=None, + ) + ti = create_runtime_ti(dag_id="test_trigger_run_after_none", run_id="test_run", task=task) + + log = mock.MagicMock() + run(ti, ti.get_template_context(), log) + + mock_supervisor_comms.send.assert_any_call( + TriggerDagRun( + dag_id="test_dag", + run_id="test_run_id", + reset_dag_run=False, + logical_date=future, + ), + ) + class TestTaskInstanceMetrics: def test_ti_start_metric_emitted(self, create_runtime_ti, mock_supervisor_comms):