diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py index d669cb652d153..01fac3a44e5d1 100644 --- a/airflow/timetables/interval.py +++ b/airflow/timetables/interval.py @@ -218,7 +218,7 @@ def _skip_to_latest(self, earliest: Optional[DateTime]) -> DateTime: raise AssertionError("next schedule shouldn't be earlier") if earliest is None: return new_start - return max(new_start, earliest) + return max(new_start, self._align(earliest)) def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval: # Get the last complete period before run_after, e.g. if a DAG run is diff --git a/tests/timetables/test_interval_timetable.py b/tests/timetables/test_interval_timetable.py index 842cc1f234f3c..fe09e0c58969a 100644 --- a/tests/timetables/test_interval_timetable.py +++ b/tests/timetables/test_interval_timetable.py @@ -35,11 +35,32 @@ PREV_DATA_INTERVAL = DataInterval(start=PREV_DATA_INTERVAL_START, end=PREV_DATA_INTERVAL_END) CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE) +YESTERDAY = CURRENT_TIME - datetime.timedelta(days=1) HOURLY_CRON_TIMETABLE = CronDataIntervalTimetable("@hourly", TIMEZONE) HOURLY_TIMEDELTA_TIMETABLE = DeltaDataIntervalTimetable(datetime.timedelta(hours=1)) HOURLY_RELATIVEDELTA_TIMETABLE = DeltaDataIntervalTimetable(dateutil.relativedelta.relativedelta(hours=1)) +CRON_TIMETABLE = CronDataIntervalTimetable("30 16 * * *", TIMEZONE) +DELTA_FROM_MIDNIGHT = datetime.timedelta(minutes=30, hours=16) + + +@pytest.mark.parametrize( + "last_automated_data_interval", + [pytest.param(None, id="first-run"), pytest.param(PREV_DATA_INTERVAL, id="subsequent")], +) +@freezegun.freeze_time(CURRENT_TIME) +def test_no_catchup_first_starts_at_current_time( + last_automated_data_interval: Optional[DataInterval], +) -> None: + """If ``catchup=False`` and start_date is a day before""" + next_info = CRON_TIMETABLE.next_dagrun_info( + last_automated_data_interval=last_automated_data_interval, + restriction=TimeRestriction(earliest=YESTERDAY, latest=None, catchup=False), + ) + expected_start = YESTERDAY + DELTA_FROM_MIDNIGHT + assert next_info == DagRunInfo.interval(start=expected_start, end=CURRENT_TIME + DELTA_FROM_MIDNIGHT) + @pytest.mark.parametrize( "timetable",