From 64e0c5024b3cb13d2fc53f42b8096c2ae3441553 Mon Sep 17 00:00:00 2001 From: wano <55907021+wanlce@users.noreply.github.com> Date: Mon, 7 Feb 2022 02:02:57 +0800 Subject: [PATCH] Fix the incorrect scheduling time for the first run of dag (#21011) When Catchup_by_default is set to false and start_date in the DAG is the previous day, the first schedule time for this DAG may be incorrect Co-authored-by: wanlce (cherry picked from commit 0bcca55f4881bacc3fbe86f69e71981f5552b398) --- airflow/timetables/interval.py | 2 +- tests/timetables/test_interval_timetable.py | 21 +++++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py index d669cb652d153..01fac3a44e5d1 100644 --- a/airflow/timetables/interval.py +++ b/airflow/timetables/interval.py @@ -218,7 +218,7 @@ def _skip_to_latest(self, earliest: Optional[DateTime]) -> DateTime: raise AssertionError("next schedule shouldn't be earlier") if earliest is None: return new_start - return max(new_start, earliest) + return max(new_start, self._align(earliest)) def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval: # Get the last complete period before run_after, e.g. if a DAG run is diff --git a/tests/timetables/test_interval_timetable.py b/tests/timetables/test_interval_timetable.py index 842cc1f234f3c..fe09e0c58969a 100644 --- a/tests/timetables/test_interval_timetable.py +++ b/tests/timetables/test_interval_timetable.py @@ -35,11 +35,32 @@ PREV_DATA_INTERVAL = DataInterval(start=PREV_DATA_INTERVAL_START, end=PREV_DATA_INTERVAL_END) CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE) +YESTERDAY = CURRENT_TIME - datetime.timedelta(days=1) HOURLY_CRON_TIMETABLE = CronDataIntervalTimetable("@hourly", TIMEZONE) HOURLY_TIMEDELTA_TIMETABLE = DeltaDataIntervalTimetable(datetime.timedelta(hours=1)) HOURLY_RELATIVEDELTA_TIMETABLE = DeltaDataIntervalTimetable(dateutil.relativedelta.relativedelta(hours=1)) +CRON_TIMETABLE = CronDataIntervalTimetable("30 16 * * *", TIMEZONE) +DELTA_FROM_MIDNIGHT = datetime.timedelta(minutes=30, hours=16) + + +@pytest.mark.parametrize( + "last_automated_data_interval", + [pytest.param(None, id="first-run"), pytest.param(PREV_DATA_INTERVAL, id="subsequent")], +) +@freezegun.freeze_time(CURRENT_TIME) +def test_no_catchup_first_starts_at_current_time( + last_automated_data_interval: Optional[DataInterval], +) -> None: + """If ``catchup=False`` and start_date is a day before""" + next_info = CRON_TIMETABLE.next_dagrun_info( + last_automated_data_interval=last_automated_data_interval, + restriction=TimeRestriction(earliest=YESTERDAY, latest=None, catchup=False), + ) + expected_start = YESTERDAY + DELTA_FROM_MIDNIGHT + assert next_info == DagRunInfo.interval(start=expected_start, end=CURRENT_TIME + DELTA_FROM_MIDNIGHT) + @pytest.mark.parametrize( "timetable",