diff --git a/airflow/timetables/_cron.py b/airflow/timetables/_cron.py index 7337478c2f43d..b18fc3eaf84da 100644 --- a/airflow/timetables/_cron.py +++ b/airflow/timetables/_cron.py @@ -116,7 +116,7 @@ def _get_prev(self, current: DateTime) -> DateTime: delta = naive - scheduled return convert_to_utc(current.in_timezone(self._timezone) - delta) - def _align(self, current: DateTime) -> DateTime: + def _align_to_next(self, current: DateTime) -> DateTime: """Get the next scheduled time. This is ``current + interval``, unless ``current`` falls right on the @@ -126,3 +126,14 @@ def _align(self, current: DateTime) -> DateTime: if self._get_prev(next_time) != current: return next_time return current + + def _align_to_prev(self, current: DateTime) -> DateTime: + """Get the prev scheduled time. + + This is ``current - interval``, unless ``current`` falls right on the + interval boundary, when ``current`` is returned. + """ + prev_time = self._get_prev(current) + if self._get_next(prev_time) != current: + return prev_time + return current diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py index 4e162dc15b9a1..1b3e639c15e9c 100644 --- a/airflow/timetables/interval.py +++ b/airflow/timetables/interval.py @@ -37,8 +37,16 @@ class _DataIntervalTimetable(Timetable): instance), and schedule a DagRun at the end of each interval. """ - def _align(self, current: DateTime) -> DateTime: - """Align given time to the scheduled. + def _skip_to_latest(self, earliest: Optional[DateTime]) -> DateTime: + """Bound the earliest time a run can be scheduled. + + This is called when ``catchup=False``. See docstring of subclasses for + exact skipping behaviour of a schedule. + """ + raise NotImplementedError() + + def _align_to_next(self, current: DateTime) -> DateTime: + """Align given time to the next scheduled time. For fixed schedules (e.g. every midnight); this finds the next time that aligns to the declared time, if the given time does not align. If the @@ -46,6 +54,19 @@ def _align(self, current: DateTime) -> DateTime: """ raise NotImplementedError() + def _align_to_prev(self, current: DateTime) -> DateTime: + """Align given time to the previous scheduled time. + + For fixed schedules (e.g. every midnight); this finds the prev time that + aligns to the declared time, if the given time does not align. If the + schedule is not fixed (e.g. every hour), the given time is returned. + + It is not enough to use ``_get_prev(_align_to_next())``, since when a + DAG's schedule changes, this alternative would make the first scheduling + after the schedule change remain the same. + """ + raise NotImplementedError() + def _get_next(self, current: DateTime) -> DateTime: """Get the first schedule after the current time.""" raise NotImplementedError() @@ -54,14 +75,6 @@ def _get_prev(self, current: DateTime) -> DateTime: """Get the last schedule before the current time.""" raise NotImplementedError() - def _skip_to_latest(self, earliest: Optional[DateTime]) -> DateTime: - """Bound the earliest time a run can be scheduled. - - This is called when ``catchup=False``. See docstring of subclasses for - exact skipping behaviour of a schedule. - """ - raise NotImplementedError() - def next_dagrun_info( self, *, @@ -72,7 +85,7 @@ def next_dagrun_info( if not restriction.catchup: earliest = self._skip_to_latest(earliest) elif earliest is not None: - earliest = self._align(earliest) + earliest = self._align_to_next(earliest) if last_automated_data_interval is None: # First run; schedule the run at the first available time matching # the schedule, and retrospectively create a data interval for it. @@ -80,13 +93,15 @@ def next_dagrun_info( return None start = earliest else: # There's a previous run. + # Alignment is needed when DAG has new schedule interval. + align_last_data_interval_end = self._align_to_prev(last_automated_data_interval.end) if earliest is not None: # Catchup is False or DAG has new start date in the future. # Make sure we get the later one. - start = max(last_automated_data_interval.end, earliest) + start = max(align_last_data_interval_end, earliest) else: # Data interval starts from the end of the previous interval. - start = last_automated_data_interval.end + start = align_last_data_interval_end if restriction.latest is not None and start > restriction.latest: return None end = self._get_next(start) @@ -138,13 +153,13 @@ def _skip_to_latest(self, earliest: Optional[DateTime]) -> DateTime: raise AssertionError("next schedule shouldn't be earlier") if earliest is None: return new_start - return max(new_start, self._align(earliest)) + return max(new_start, self._align_to_next(earliest)) def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval: # Get the last complete period before run_after, e.g. if a DAG run is # scheduled at each midnight, the data interval of a manually triggered # run at 1am 25th is between 0am 24th and 0am 25th. - end = self._get_prev(self._align(run_after)) + end = self._align_to_prev(run_after) return DataInterval(start=self._get_prev(end), end=end) @@ -202,7 +217,10 @@ def _get_next(self, current: DateTime) -> DateTime: def _get_prev(self, current: DateTime) -> DateTime: return convert_to_utc(current - self._delta) - def _align(self, current: DateTime) -> DateTime: + def _align_to_next(self, current: DateTime) -> DateTime: + return current + + def _align_to_prev(self, current: DateTime) -> DateTime: return current def _skip_to_latest(self, earliest: Optional[DateTime]) -> DateTime: diff --git a/airflow/timetables/trigger.py b/airflow/timetables/trigger.py index 4a3ffe18e40c1..7d5eaa022d305 100644 --- a/airflow/timetables/trigger.py +++ b/airflow/timetables/trigger.py @@ -86,15 +86,15 @@ def next_dagrun_info( if last_automated_data_interval is None: if restriction.earliest is None: return None - next_start_time = self._align(restriction.earliest) + next_start_time = self._align_to_next(restriction.earliest) else: next_start_time = self._get_next(last_automated_data_interval.end) else: current_time = DateTime.utcnow() if restriction.earliest is not None and current_time < restriction.earliest: - next_start_time = self._align(restriction.earliest) + next_start_time = self._align_to_next(restriction.earliest) else: - next_start_time = self._align(current_time) + next_start_time = self._align_to_next(current_time) if restriction.latest is not None and restriction.latest < next_start_time: return None return DagRunInfo.interval(next_start_time - self._interval, next_start_time) diff --git a/tests/timetables/test_interval_timetable.py b/tests/timetables/test_interval_timetable.py index 13ebb5a87cb3c..8f73a32909441 100644 --- a/tests/timetables/test_interval_timetable.py +++ b/tests/timetables/test_interval_timetable.py @@ -37,6 +37,7 @@ CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE) YESTERDAY = CURRENT_TIME - datetime.timedelta(days=1) +OLD_INTERVAL = DataInterval(start=YESTERDAY, end=CURRENT_TIME) HOURLY_CRON_TIMETABLE = CronDataIntervalTimetable("@hourly", TIMEZONE) HOURLY_TIMEDELTA_TIMETABLE = DeltaDataIntervalTimetable(datetime.timedelta(hours=1)) @@ -63,6 +64,29 @@ def test_no_catchup_first_starts_at_current_time( assert next_info == DagRunInfo.interval(start=expected_start, end=CURRENT_TIME + DELTA_FROM_MIDNIGHT) +@pytest.mark.parametrize( + "earliest", + [pytest.param(None, id="none"), pytest.param(START_DATE, id="start_date")], +) +@pytest.mark.parametrize( + "catchup", + [pytest.param(True, id="catchup_true"), pytest.param(False, id="catchup_false")], +) +@freezegun.freeze_time(CURRENT_TIME) +def test_new_schedule_interval_next_info_starts_at_new_time( + earliest: Optional[pendulum.DateTime], + catchup: bool, +) -> None: + """First run after DAG has new schedule interval.""" + next_info = CRON_TIMETABLE.next_dagrun_info( + last_automated_data_interval=OLD_INTERVAL, + restriction=TimeRestriction(earliest=earliest, latest=None, catchup=catchup), + ) + expected_start = YESTERDAY + datetime.timedelta(hours=16, minutes=30) + expected_end = CURRENT_TIME + datetime.timedelta(hours=16, minutes=30) + assert next_info == DagRunInfo.interval(start=expected_start, end=expected_end) + + @pytest.mark.parametrize( "timetable", [ @@ -159,3 +183,72 @@ def test_validate_failure(timetable: Timetable, error_message: str) -> None: def test_cron_interval_timezone_from_string(): timetable = CronDataIntervalTimetable("@hourly", "UTC") assert timetable.serialize()['timezone'] == 'UTC' + + +@pytest.mark.parametrize( + "trigger_at, expected_interval", + [ + # Arbitrary trigger time. + pytest.param( + pendulum.DateTime(2022, 8, 8, 1, tzinfo=TIMEZONE), + DataInterval( + pendulum.DateTime(2022, 8, 7, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), + ), + id="adhoc", + ), + # Trigger time falls exactly on interval boundary. + pytest.param( + pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), + DataInterval( + pendulum.DateTime(2022, 8, 7, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), + ), + id="exact", + ), + ], +) +def test_cron_infer_manual_data_interval_alignment( + trigger_at: pendulum.DateTime, + expected_interval: DataInterval, +) -> None: + timetable = CronDataIntervalTimetable("@daily", TIMEZONE) + assert timetable.infer_manual_data_interval(run_after=trigger_at) == expected_interval + + +@pytest.mark.parametrize( + "last_data_interval, expected_info", + [ + pytest.param( + DataInterval( + pendulum.DateTime(2022, 8, 7, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), + ), + DagRunInfo.interval( + pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 9, tzinfo=TIMEZONE), + ), + id="exact", + ), + pytest.param( + # Previous data interval does not align with the current timetable. + # This is possible if the user edits a DAG with existing runs. + DataInterval( + pendulum.DateTime(2022, 8, 7, 1, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 8, 1, tzinfo=TIMEZONE), + ), + DagRunInfo.interval( + pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE), + pendulum.DateTime(2022, 8, 9, tzinfo=TIMEZONE), + ), + id="changed", + ), + ], +) +def test_cron_next_dagrun_info_alignment(last_data_interval: DataInterval, expected_info: DagRunInfo): + timetable = CronDataIntervalTimetable("@daily", TIMEZONE) + info = timetable.next_dagrun_info( + last_automated_data_interval=last_data_interval, + restriction=TimeRestriction(None, None, True), + ) + assert info == expected_info