From c2c9ea69e967341cac47f96fc6c32bfdb7959813 Mon Sep 17 00:00:00 2001 From: "Boris G. Tsirkin" <367403+dotbg@users.noreply.github.com> Date: Mon, 18 May 2026 23:30:17 +0200 Subject: [PATCH 1/2] 10883. ExternalTaskSensor. execution-date pattern. --- .../standard/sensors/external_task.py | 42 ++++++++++++++++--- .../sensors/test_external_task_sensor.py | 36 ++++++++++++++++ 2 files changed, 73 insertions(+), 5 deletions(-) diff --git a/providers/standard/src/airflow/providers/standard/sensors/external_task.py b/providers/standard/src/airflow/providers/standard/sensors/external_task.py index 1a4684b6c4b41..1842581fccf97 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/external_task.py +++ b/providers/standard/src/airflow/providers/standard/sensors/external_task.py @@ -156,6 +156,9 @@ class ExternalTaskSensor(BaseSensorOperator): :param allowed_states: Iterable of allowed states, default is ``['success']`` :param skipped_states: Iterable of states to make this task mark as skipped, default is ``None`` :param failed_states: Iterable of failed or dis-allowed states, default is ``None`` + :param execution_date supports templated values using either: + ``{{ logical_date }}`` (preferred) + ``{{ execution_date }}`` (legacy) :param execution_delta: time difference with the previous execution to look at, the default is the same logical date as the current task or DAG. For yesterday, use [positive!] datetime.timedelta(days=1). Either @@ -174,7 +177,13 @@ class ExternalTaskSensor(BaseSensorOperator): :param deferrable: Run sensor in deferrable mode """ - template_fields = ["external_dag_id", "external_task_id", "external_task_ids", "external_task_group_id"] + template_fields = [ + "external_dag_id", + "external_task_id", + "external_task_ids", + "external_task_group_id", + "execution_date", + ] ui_color = "#4db7db" operator_extra_links = [ExternalDagLink()] @@ -188,6 +197,7 @@ def __init__( allowed_states: Iterable[str] | None = None, skipped_states: Iterable[str] | None = None, failed_states: Iterable[str] | None = None, + execution_date: str | datetime.datetime | None = None, execution_delta: datetime.timedelta | None = None, execution_date_fn: Callable | None = None, check_existence: bool = False, @@ -248,12 +258,22 @@ def __init__( f"when `external_task_id` and `external_task_group_id` is `None`: {State.dag_states}" ) - if execution_delta is not None and execution_date_fn is not None: + if execution_delta is not None and execution_date_fn is not None and execution_date is not None: raise ValueError( - "Only one of `execution_delta` or `execution_date_fn` may " - "be provided to ExternalTaskSensor; not both." + "Only one of `execution_delta`, `execution_date` or `execution_date_fn` may " + "be provided to ExternalTaskSensor." ) + if execution_delta is not None: + warnings.warn( + "`execution_delta` is deprecated. Use `execution_date`.", DeprecationWarning, stacklevel=1 + ) + + if execution_date_fn is not None: + warnings.warn( + "`execution_date_fn` is deprecated. Use `execution_date`.", DeprecationWarning, stacklevel=1 + ) + self.execution_date = execution_date self.execution_delta = execution_delta self.execution_date_fn = execution_date_fn self.external_dag_id = external_dag_id @@ -267,13 +287,25 @@ def __init__( self.external_dates_filter: str | None = None def _get_dttm_filter(self, context: Context) -> Sequence[datetime.datetime]: + execution_date_value = self.execution_date + + if execution_date_value is not None: + if isinstance(execution_date_value, datetime.datetime): + return [execution_date_value] + + import pendulum + + return [pendulum.parse(execution_date_value)] + logical_date = self._get_logical_date(context) - if self.execution_delta: + if self.execution_delta is not None: return [logical_date - self.execution_delta] + if self.execution_date_fn: result = self._handle_execution_date_fn(context=context) return result if isinstance(result, list) else [result] + return [logical_date] @staticmethod diff --git a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py index fde38e4e2367a..35313152c8667 100644 --- a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py +++ b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py @@ -21,6 +21,7 @@ import logging import re from datetime import time, timedelta +from typing import Any from unittest import mock import pytest @@ -1395,6 +1396,41 @@ def test_external_task_sensor_execution_delta(self, dag_maker): ) assert op.external_dates_filter == expected_date.isoformat() + @pytest.mark.execution_timeout(10) + def test_handle_execution_date(self, dag_maker) -> None: + for param in ["logical_date", "execution_date"]: + with dag_maker("test_dag_child"): + op = ExternalTaskSensor( + task_id=f"test_external_task_sensor_check-{param}", + external_dag_id="test_dag_parent", + external_task_id="test_task", + execution_date=f"{{{{ {param} - macros.timedelta(hours=1) }}}}", + allowed_states=["success"], + ) + + import airflow.macros as macros + + ctx: dict[str, Any] = self.context + ctx["macros"] = macros # ensure template rendering works + + op.render_template_fields(ctx) + ti = self.context["ti"] + ti.get_ti_count.return_value = 1 + op.execute(context=self.context) + + expected_date = DEFAULT_DATE - timedelta(hours=1) + ti.get_ti_count.assert_has_calls( + [ + mock.call( + dag_id="test_dag_parent", + logical_dates=[expected_date], + states=["success"], + task_ids=["test_task"], + ) + ] + ) + assert op.external_dates_filter == expected_date.isoformat() + @pytest.mark.execution_timeout(10) def test_external_task_sensor_duplicate_task_ids(self, dag_maker): with dag_maker("test_dag_child"): From 120e55cf6625c08020080e23b29cdfde160fc1e2 Mon Sep 17 00:00:00 2001 From: "Boris G. Tsirkin" <367403+dotbg@users.noreply.github.com> Date: Tue, 19 May 2026 17:21:54 +0200 Subject: [PATCH 2/2] Code review comments + test fix. --- .../standard/sensors/external_task.py | 7 ++--- .../sensors/test_external_task_sensor.py | 28 ++++++++++++++++++- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/providers/standard/src/airflow/providers/standard/sensors/external_task.py b/providers/standard/src/airflow/providers/standard/sensors/external_task.py index 1842581fccf97..42423e75fddca 100644 --- a/providers/standard/src/airflow/providers/standard/sensors/external_task.py +++ b/providers/standard/src/airflow/providers/standard/sensors/external_task.py @@ -29,6 +29,7 @@ BaseOperatorLink, BaseSensorOperator, conf, + timezone, ) from airflow.providers.standard.exceptions import ( DuplicateStateError, @@ -258,7 +259,7 @@ def __init__( f"when `external_task_id` and `external_task_group_id` is `None`: {State.dag_states}" ) - if execution_delta is not None and execution_date_fn is not None and execution_date is not None: + if sum(x is not None for x in (execution_delta, execution_date_fn, execution_date)) > 1: raise ValueError( "Only one of `execution_delta`, `execution_date` or `execution_date_fn` may " "be provided to ExternalTaskSensor." @@ -293,9 +294,7 @@ def _get_dttm_filter(self, context: Context) -> Sequence[datetime.datetime]: if isinstance(execution_date_value, datetime.datetime): return [execution_date_value] - import pendulum - - return [pendulum.parse(execution_date_value)] + return [timezone.parse(execution_date_value)] logical_date = self._get_logical_date(context) diff --git a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py index 35313152c8667..43210c3a693bf 100644 --- a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py +++ b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py @@ -672,7 +672,7 @@ def test_external_task_sensor_error_delta_and_fn(self): # Test that providing execution_delta and a function raises an error with pytest.raises( ValueError, - match="Only one of `execution_delta` or `execution_date_fn` may be provided to ExternalTaskSensor; not both.", + match="Only one of `execution_delta`, `execution_date` or `execution_date_fn` may be provided to ExternalTaskSensor.", ): ExternalTaskSensor( task_id="test_external_task_sensor_check_delta", @@ -1431,6 +1431,32 @@ def test_handle_execution_date(self, dag_maker) -> None: ) assert op.external_dates_filter == expected_date.isoformat() + @pytest.mark.execution_timeout(3) + def test_external_task_sensor_error_delta_and_execution_date(self) -> None: + override_candidates = [ + ( + "execution_delta", + timedelta(seconds=123), + ), + ("execution_date", "{{ logical_date - macros.timedelta(hours=1) }}"), + ("execution_date_fn", lambda dt: dt), + ] + + for r in range(2, 4): + for overrides in itertools.combinations(override_candidates, r): + with DAG("test_external_task_sensor_error_delta_and_execution_date"): + with pytest.raises( + ValueError, + match="Only one of `execution_delta`, `execution_date` or `execution_date_fn` may be provided to ExternalTaskSensor.", + ): + ExternalTaskSensor( + task_id="test_external_task_sensor_error_delta_and_execution_date", + external_dag_id="test_dag_parent", + external_task_id="test_task", + allowed_states=["success"], + **dict(overrides), + ) + @pytest.mark.execution_timeout(10) def test_external_task_sensor_duplicate_task_ids(self, dag_maker): with dag_maker("test_dag_child"):