diff --git a/airflow-core/src/airflow/cli/cli_config.py b/airflow-core/src/airflow/cli/cli_config.py index 57b2aa9e41cbb..ab5cea7c76ad6 100644 --- a/airflow-core/src/airflow/cli/cli_config.py +++ b/airflow-core/src/airflow/cli/cli_config.py @@ -188,7 +188,9 @@ def string_lower_type(val): ARG_PARTITION_DATE_START = Arg( ("--partition-date-start",), help=( - "Inclusive lower bound of the partition_date window (matched against DagRun.partition_date). " + "Inclusive lower bound of the partition_date window. Matched at local calendar-day " + "granularity: the start of the given local calendar day in the Dag's timetable timezone " + "(any time-of-day component is ignored). " "Accepts the same datetime formats as --start-date." ), type=parsedate, @@ -196,7 +198,9 @@ def string_lower_type(val): ARG_PARTITION_DATE_END = Arg( ("--partition-date-end",), help=( - "Inclusive upper bound of the partition_date window (matched against DagRun.partition_date). " + "Inclusive upper bound of the partition_date window. Matched at local calendar-day " + "granularity: all runs whose partition_date falls on the given local calendar day in the " + "Dag's timetable timezone are included (any time-of-day component is ignored). " "Accepts the same datetime formats as --end-date." ), type=parsedate, @@ -1190,7 +1194,8 @@ class GroupCommand(NamedTuple): "Clear Dag runs of the given dag_id and re-queue them for reprocessing. Exactly one " "of the following selectors must be provided: --run-id (single run); --partition-key " "(every run with that exact partition_key); or a partition_date window via " - "--partition-date-start and/or --partition-date-end (inclusive on both ends). " + "--partition-date-start and/or --partition-date-end (both bounds are inclusive local " + "calendar days, anchored in the Dag's timetable timezone). " "Intended for partitioned Dags, whose runs are keyed by partition_date / " "partition_key instead of logical_date. For traditional, non-partitioned Dags, use " "`airflow tasks clear --start-date / --end-date`." diff --git a/airflow-core/src/airflow/cli/commands/dag_command.py b/airflow-core/src/airflow/cli/commands/dag_command.py index 267fcad6a22e5..bfe94261bab05 100644 --- a/airflow-core/src/airflow/cli/commands/dag_command.py +++ b/airflow-core/src/airflow/cli/commands/dag_command.py @@ -126,7 +126,15 @@ def dag_delete(args) -> None: @providers_configuration_loaded @provide_session def dag_clear(args, *, session: Session = NEW_SESSION) -> None: - """Clear Dag runs selected by run_id, partition_key, or a partition_date window.""" + """ + Clear Dag runs selected by run_id, partition_key, or a partition_date window. + + When a partition_date window is given, both bounds are **day-granular** and + anchored in the timetable's timezone for tz-aware partitioned timetables. + --partition-date-start is the inclusive start local calendar day; + --partition-date-end is the inclusive end local calendar day (any + time-of-day component in either value is ignored). + """ has_range = args.partition_date_start is not None or args.partition_date_end is not None selectors_used = sum([args.run_id is not None, args.partition_key is not None, has_range]) if selectors_used == 0: @@ -157,10 +165,52 @@ def dag_clear(args, *, session: Session = NEW_SESSION) -> None: query = query.where(DagRun.partition_key == args.partition_key) else: query = query.where(DagRun.partition_date.is_not(None)) - if args.partition_date_start is not None: - query = query.where(DagRun.partition_date >= args.partition_date_start) - if args.partition_date_end is not None: - query = query.where(DagRun.partition_date <= args.partition_date_end) + tt_tz = getattr(dag.timetable, "timezone", None) if dag.timetable.partitioned else None + if tt_tz is not None: + # Partitioned runs are stored as local-midnight UTC instants; compare at day + # granularity in the timetable's timezone rather than at the raw UTC instant. + if args.partition_date_start is not None: + start_label = args.partition_date_start.date() + lower_utc = timezone.convert_to_utc( + timezone.make_aware( + datetime.datetime(start_label.year, start_label.month, start_label.day), + tt_tz, + ) + ) + query = query.where(DagRun.partition_date >= lower_utc) + if args.partition_date_end is not None: + end_label = args.partition_date_end.date() + # Half-open upper bound: include all of the end local calendar day. + next_day = datetime.date(end_label.year, end_label.month, end_label.day) + datetime.timedelta( + days=1 + ) + upper_utc = timezone.convert_to_utc( + timezone.make_aware( + datetime.datetime(next_day.year, next_day.month, next_day.day), + tt_tz, + ) + ) + query = query.where(DagRun.partition_date < upper_utc) + else: + # No timetable timezone: partition_date values are midnight-anchored UTC dates, + # so time-of-day on the CLI flags is not meaningful — truncate to calendar day. + if args.partition_date_start is not None: + start_day = args.partition_date_start.date() + query = query.where( + DagRun.partition_date + >= datetime.datetime( + start_day.year, start_day.month, start_day.day, tzinfo=datetime.timezone.utc + ) + ) + if args.partition_date_end is not None: + end_day = args.partition_date_end.date() + next_day = end_day + datetime.timedelta(days=1) + query = query.where( + DagRun.partition_date + < datetime.datetime( + next_day.year, next_day.month, next_day.day, tzinfo=datetime.timezone.utc + ) + ) query = query.order_by(DagRun.partition_date, DagRun.run_id) runs = list(session.execute(query).all()) diff --git a/airflow-core/src/airflow/timetables/_cron.py b/airflow-core/src/airflow/timetables/_cron.py index f48593dea4ad4..c040e6d80eb86 100644 --- a/airflow-core/src/airflow/timetables/_cron.py +++ b/airflow-core/src/airflow/timetables/_cron.py @@ -137,6 +137,10 @@ def __hash__(self): def summary(self) -> str: return self._expression + @property + def timezone(self) -> Timezone | FixedTimezone: + return self._timezone + def validate(self) -> None: try: croniter(self._expression) diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py b/airflow-core/tests/unit/cli/commands/test_dag_command.py index 37178a85ef522..204f366bb0cef 100644 --- a/airflow-core/tests/unit/cli/commands/test_dag_command.py +++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py @@ -49,6 +49,7 @@ from airflow.sdk.execution_time.comms import _RequestFrame, _ResponseFrame from airflow.serialization.serialized_objects import DagSerialization, LazyDeserializedDAG from airflow.triggers.base import TriggerEvent +from airflow.utils.cli import get_db_dag from airflow.utils.session import create_session from airflow.utils.state import DagRunState, TaskInstanceState from airflow.utils.types import DagRunType @@ -1461,6 +1462,326 @@ def test_missing_dag_raises(self, parser): with pytest.raises(AirflowException, match="could not be found in the database"): dag_command.dag_clear(args) + @pytest.fixture + def seeded_taipei_runs(self, dag_maker): + """Seed DagRuns for a Asia/Taipei (UTC+8) CronPartitionTimetable. + + Local midnight in Taipei is stored as UTC-8h in partition_date. + Written as explicit UTC instants so the oracle is independent of the + timetable under test: + + local 2026-02-18 midnight → datetime(2026, 2, 17, 16, 0, 0, UTC) + local 2026-02-19 midnight → datetime(2026, 2, 18, 16, 0, 0, UTC) + local 2026-02-20 midnight → datetime(2026, 2, 19, 16, 0, 0, UTC) (outside window) + """ + with dag_maker( + self.DAG_ID, + schedule=CronPartitionTimetable("0 0 * * *", timezone="Asia/Taipei"), + start_date=datetime(2026, 2, 1, tzinfo=pendulum.UTC), + catchup=True, + serialized=True, + ): + EmptyOperator(task_id="t1") + dag_maker.create_dagrun( + run_id="taipei_2026_02_18", + state=DagRunState.SUCCESS, + logical_date=None, + partition_date=datetime(2026, 2, 17, 16, 0, 0, tzinfo=pendulum.UTC), + partition_key="2026-02-18T00:00:00", + ) + dag_maker.create_dagrun( + run_id="taipei_2026_02_19", + state=DagRunState.SUCCESS, + logical_date=None, + partition_date=datetime(2026, 2, 18, 16, 0, 0, tzinfo=pendulum.UTC), + partition_key="2026-02-19T00:00:00", + ) + dag_maker.create_dagrun( + run_id="taipei_2026_02_20", + state=DagRunState.SUCCESS, + logical_date=None, + partition_date=datetime(2026, 2, 19, 16, 0, 0, tzinfo=pendulum.UTC), + partition_key="2026-02-20T00:00:00", + ) + dag_maker.sync_dagbag_to_db() + + @pytest.mark.usefixtures("seeded_taipei_runs") + def test_taipei_lower_bound_selects_correct_partition(self, parser): + """--partition-date-start 2026-02-19 must match the run stored at 2026-02-18T16Z. + + Without the timezone fix, parsedate("2026-02-19") yields 2026-02-19T00:00:00Z + under the UTC default timezone. The old filter compares + partition_date >= 2026-02-19T00:00Z; the run for the local 2026-02-19 partition + is stored as 2026-02-18T16:00Z, which is *before* that UTC boundary, so the run + is NOT selected and the requested boundary day is silently missed. Converting + the bound through the timetable timezone fixes the off-by-one: the run whose + partition_date is the UTC representation of Taipei 2026-02-19 midnight is + selected, the earlier run is not. + """ + args = parser.parse_args( + [ + "dags", + "clear", + self.DAG_ID, + "--partition-date-start", + "2026-02-19", + "--yes", + ] + ) + dag_command.dag_clear(args) + + states = self._get_run_states() + # 2026-02-19 local midnight stored as 2026-02-18T16Z — must be cleared. + assert states["taipei_2026_02_19"] == DagRunState.QUEUED + # 2026-02-20 local midnight stored as 2026-02-19T16Z — also in window (no upper bound). + assert states["taipei_2026_02_20"] == DagRunState.QUEUED + # 2026-02-18 local midnight stored as 2026-02-17T16Z — before the start, must NOT be cleared. + assert states["taipei_2026_02_18"] == DagRunState.SUCCESS + + @pytest.mark.usefixtures("seeded_taipei_runs") + def test_taipei_upper_bound_at_cap(self, parser): + """--partition-date-end 2026-02-19 must include the run stored at 2026-02-18T16Z (at-cap).""" + args = parser.parse_args( + [ + "dags", + "clear", + self.DAG_ID, + "--partition-date-end", + "2026-02-19", + "--yes", + ] + ) + dag_command.dag_clear(args) + + states = self._get_run_states() + # Both 2026-02-18 and 2026-02-19 local dates are within the window. + assert states["taipei_2026_02_18"] == DagRunState.QUEUED + assert states["taipei_2026_02_19"] == DagRunState.QUEUED + # 2026-02-20 is outside the half-open upper bound — must NOT be cleared. + assert states["taipei_2026_02_20"] == DagRunState.SUCCESS + + @pytest.mark.usefixtures("seeded_taipei_runs") + def test_taipei_upper_bound_over_cap(self, parser): + """--partition-date-end 2026-02-18 must NOT include the run stored at 2026-02-18T16Z (over-cap). + + The half-open upper bound is strictly less than 2026-02-19 midnight in + Taipei (= 2026-02-18T16Z), so the run for local 2026-02-19 falls outside + the window. + """ + args = parser.parse_args( + [ + "dags", + "clear", + self.DAG_ID, + "--partition-date-end", + "2026-02-18", + "--yes", + ] + ) + dag_command.dag_clear(args) + + states = self._get_run_states() + # Only the 2026-02-18 local date run is within the window. + assert states["taipei_2026_02_18"] == DagRunState.QUEUED + # 2026-02-19 and 2026-02-20 are outside — must NOT be cleared. + assert states["taipei_2026_02_19"] == DagRunState.SUCCESS + assert states["taipei_2026_02_20"] == DagRunState.SUCCESS + + @pytest.fixture + def seeded_ny_runs(self, dag_maker): + """Seed DagRuns for an America/New_York (UTC-5, EST in February) CronPartitionTimetable. + + Local midnight in New York is stored as UTC+5h in partition_date, so for a + west-of-UTC timetable the pre-fix off-by-one lands on the *upper* bound (the + end day's run sits later in the UTC day than the user-typed boundary) rather + than the lower bound exercised by the Asia/Taipei cases. Explicit UTC instants + keep the oracle independent of the timetable under test: + + local 2026-02-18 midnight → datetime(2026, 2, 18, 5, 0, 0, UTC) + local 2026-02-19 midnight → datetime(2026, 2, 19, 5, 0, 0, UTC) + local 2026-02-20 midnight → datetime(2026, 2, 20, 5, 0, 0, UTC) (outside window) + """ + with dag_maker( + self.DAG_ID, + schedule=CronPartitionTimetable("0 0 * * *", timezone="America/New_York"), + start_date=datetime(2026, 2, 1, tzinfo=pendulum.UTC), + catchup=True, + serialized=True, + ): + EmptyOperator(task_id="t1") + dag_maker.create_dagrun( + run_id="ny_2026_02_18", + state=DagRunState.SUCCESS, + logical_date=None, + partition_date=datetime(2026, 2, 18, 5, 0, 0, tzinfo=pendulum.UTC), + partition_key="2026-02-18T00:00:00", + ) + dag_maker.create_dagrun( + run_id="ny_2026_02_19", + state=DagRunState.SUCCESS, + logical_date=None, + partition_date=datetime(2026, 2, 19, 5, 0, 0, tzinfo=pendulum.UTC), + partition_key="2026-02-19T00:00:00", + ) + dag_maker.create_dagrun( + run_id="ny_2026_02_20", + state=DagRunState.SUCCESS, + logical_date=None, + partition_date=datetime(2026, 2, 20, 5, 0, 0, tzinfo=pendulum.UTC), + partition_key="2026-02-20T00:00:00", + ) + dag_maker.sync_dagbag_to_db() + + @pytest.mark.usefixtures("seeded_ny_runs") + def test_ny_upper_bound_includes_end_day(self, parser): + """--partition-date-end 2026-02-19 must include the local 2026-02-19 run (stored 2026-02-19T05Z). + + parsedate("2026-02-19") yields 2026-02-19T00:00Z, so the pre-fix filter compares + partition_date <= 2026-02-19T00:00Z. The local 2026-02-19 run is stored at + 2026-02-19T05:00Z, which is *after* that UTC boundary, so the old code wrongly + drops the requested end day. The timezone-aware half-open bound includes it. + """ + args = parser.parse_args( + [ + "dags", + "clear", + self.DAG_ID, + "--partition-date-end", + "2026-02-19", + "--yes", + ] + ) + dag_command.dag_clear(args) + + states = self._get_run_states() + # local 2026-02-18 and 2026-02-19 are within the window (no lower bound). + assert states["ny_2026_02_18"] == DagRunState.QUEUED + # The end-day run: dropped by the pre-fix code, included after the fix. + assert states["ny_2026_02_19"] == DagRunState.QUEUED + # local 2026-02-20 is outside the half-open upper bound. + assert states["ny_2026_02_20"] == DagRunState.SUCCESS + + # ------------------------------------------------------------------ + # No-tz fallback: partitioned=True but timetable has no timezone attr + # ------------------------------------------------------------------ + + @pytest.fixture + def seeded_no_tz_runs(self, dag_maker): + """Seed runs with UTC midnight partition_dates for the no-tz fallback path. + + The Dag is created with CronPartitionTimetable(timezone=UTC) so that + serialization works normally. The tests monkeypatch get_db_dag to swap + the timetable for a stub that has partitioned=True but no timezone + attribute, forcing the else-branch in dag_clear. + + Stored partition_dates are plain UTC midnights: + 2026-03-08 → datetime(2026, 3, 8, 0, 0, 0, UTC) + 2026-03-09 → datetime(2026, 3, 9, 0, 0, 0, UTC) + """ + with dag_maker( + self.DAG_ID, + schedule=CronPartitionTimetable("0 0 * * *", timezone=pendulum.UTC), + start_date=datetime(2026, 3, 1, tzinfo=pendulum.UTC), + catchup=True, + serialized=True, + ): + EmptyOperator(task_id="t1") + dag_maker.create_dagrun( + run_id="no_tz_2026_03_08", + state=DagRunState.SUCCESS, + logical_date=None, + partition_date=datetime(2026, 3, 8, tzinfo=pendulum.UTC), + partition_key="2026-03-08T00:00:00", + ) + dag_maker.create_dagrun( + run_id="no_tz_2026_03_09", + state=DagRunState.SUCCESS, + logical_date=None, + partition_date=datetime(2026, 3, 9, tzinfo=pendulum.UTC), + partition_key="2026-03-09T00:00:00", + ) + dag_maker.sync_dagbag_to_db() + + @pytest.mark.usefixtures("seeded_no_tz_runs") + def test_no_tz_lower_bound_truncates_time_of_day(self, parser, monkeypatch): + """--partition-date-start with a non-midnight time-of-day must still match the day. + + With no timetable timezone the else-branch truncates the CLI value to + date() and compares against UTC midnight. A start of 2026-03-08T12:00:00 + truncates to 2026-03-08, giving lower = 2026-03-08T00:00Z, which is <= + the stored 2026-03-08T00:00Z run (so it is included). Without truncation + the raw-instant comparison would be 2026-03-08T12:00Z > 2026-03-08T00:00Z + and the run would be missed. + """ + + class _NoTzTimetable: + partitioned = True + # no timezone attribute — triggers the no-tz else-branch + + def _patched(*, bundle_names, dag_id): + dag = get_db_dag(bundle_names=bundle_names, dag_id=dag_id) + dag.timetable = _NoTzTimetable() + return dag + + monkeypatch.setattr(dag_command, "get_db_dag", _patched) + + args = parser.parse_args( + [ + "dags", + "clear", + self.DAG_ID, + "--partition-date-start", + "2026-03-08T12:00:00", + "--yes", + ] + ) + dag_command.dag_clear(args) + + states = self._get_run_states() + # 2026-03-08T12 truncates to 2026-03-08 → lower = 2026-03-08T00Z; run is included. + assert states["no_tz_2026_03_08"] == DagRunState.QUEUED + # 2026-03-09 is also >= 2026-03-08T00Z (no upper bound). + assert states["no_tz_2026_03_09"] == DagRunState.QUEUED + + @pytest.mark.usefixtures("seeded_no_tz_runs") + def test_no_tz_upper_bound_is_half_open(self, parser, monkeypatch): + """--partition-date-end 2026-03-08T00:00:00 must include 2026-03-08 and exclude 2026-03-09. + + The end date truncates to 2026-03-08; next_day = 2026-03-09, upper = + 2026-03-09T00:00Z (half-open). The 2026-03-08T00Z run satisfies + partition_date < 2026-03-09T00Z (included). The 2026-03-09T00Z run + does NOT satisfy partition_date < 2026-03-09T00Z (excluded). + """ + + class _NoTzTimetable: + partitioned = True + # no timezone attribute — triggers the no-tz else-branch + + def _patched(*, bundle_names, dag_id): + dag = get_db_dag(bundle_names=bundle_names, dag_id=dag_id) + dag.timetable = _NoTzTimetable() + return dag + + monkeypatch.setattr(dag_command, "get_db_dag", _patched) + + args = parser.parse_args( + [ + "dags", + "clear", + self.DAG_ID, + "--partition-date-end", + "2026-03-08T00:00:00", + "--yes", + ] + ) + dag_command.dag_clear(args) + + states = self._get_run_states() + # 2026-03-08T00Z < 2026-03-09T00Z (upper, half-open) → included. + assert states["no_tz_2026_03_08"] == DagRunState.QUEUED + # 2026-03-09T00Z is NOT < 2026-03-09T00Z → excluded. + assert states["no_tz_2026_03_09"] == DagRunState.SUCCESS + class TestDagDetailsIsBackfillable: """Tests for the is_backfillable computation in _get_dagbag_dag_details."""