Skip to content

Commit

Permalink
Fix incorrect data interval alignment due to assumption on input time…
Browse files Browse the repository at this point in the history
… alignment (#22658)

* Fix incorrect data_interval_start due to DAG's schedule changed

This PR fixes the incorrect first run of data_interval_start after
changing the scheduling time.

* Added _align_to_prev function for scheduling alignment after time change.

* renamed _align to _align_to_next.

Co-authored-by: wanlce <who@foxmail.com>
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
  • Loading branch information
3 people committed Aug 16, 2022
1 parent 026f1bb commit d991d98
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 20 deletions.
13 changes: 12 additions & 1 deletion airflow/timetables/_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def _get_prev(self, current: DateTime) -> DateTime:
delta = naive - scheduled
return convert_to_utc(current.in_timezone(self._timezone) - delta)

def _align(self, current: DateTime) -> DateTime:
def _align_to_next(self, current: DateTime) -> DateTime:
"""Get the next scheduled time.
This is ``current + interval``, unless ``current`` falls right on the
Expand All @@ -126,3 +126,14 @@ def _align(self, current: DateTime) -> DateTime:
if self._get_prev(next_time) != current:
return next_time
return current

def _align_to_prev(self, current: DateTime) -> DateTime:
"""Get the prev scheduled time.
This is ``current - interval``, unless ``current`` falls right on the
interval boundary, when ``current`` is returned.
"""
prev_time = self._get_prev(current)
if self._get_next(prev_time) != current:
return prev_time
return current
50 changes: 34 additions & 16 deletions airflow/timetables/interval.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,36 @@ class _DataIntervalTimetable(Timetable):
instance), and schedule a DagRun at the end of each interval.
"""

def _align(self, current: DateTime) -> DateTime:
"""Align given time to the scheduled.
def _skip_to_latest(self, earliest: Optional[DateTime]) -> DateTime:
"""Bound the earliest time a run can be scheduled.
This is called when ``catchup=False``. See docstring of subclasses for
exact skipping behaviour of a schedule.
"""
raise NotImplementedError()

def _align_to_next(self, current: DateTime) -> DateTime:
"""Align given time to the next scheduled time.
For fixed schedules (e.g. every midnight); this finds the next time that
aligns to the declared time, if the given time does not align. If the
schedule is not fixed (e.g. every hour), the given time is returned.
"""
raise NotImplementedError()

def _align_to_prev(self, current: DateTime) -> DateTime:
"""Align given time to the previous scheduled time.
For fixed schedules (e.g. every midnight); this finds the prev time that
aligns to the declared time, if the given time does not align. If the
schedule is not fixed (e.g. every hour), the given time is returned.
It is not enough to use ``_get_prev(_align_to_next())``, since when a
DAG's schedule changes, this alternative would make the first scheduling
after the schedule change remain the same.
"""
raise NotImplementedError()

def _get_next(self, current: DateTime) -> DateTime:
"""Get the first schedule after the current time."""
raise NotImplementedError()
Expand All @@ -54,14 +75,6 @@ def _get_prev(self, current: DateTime) -> DateTime:
"""Get the last schedule before the current time."""
raise NotImplementedError()

def _skip_to_latest(self, earliest: Optional[DateTime]) -> DateTime:
"""Bound the earliest time a run can be scheduled.
This is called when ``catchup=False``. See docstring of subclasses for
exact skipping behaviour of a schedule.
"""
raise NotImplementedError()

def next_dagrun_info(
self,
*,
Expand All @@ -72,21 +85,23 @@ def next_dagrun_info(
if not restriction.catchup:
earliest = self._skip_to_latest(earliest)
elif earliest is not None:
earliest = self._align(earliest)
earliest = self._align_to_next(earliest)
if last_automated_data_interval is None:
# First run; schedule the run at the first available time matching
# the schedule, and retrospectively create a data interval for it.
if earliest is None:
return None
start = earliest
else: # There's a previous run.
# Alignment is needed when DAG has new schedule interval.
align_last_data_interval_end = self._align_to_prev(last_automated_data_interval.end)
if earliest is not None:
# Catchup is False or DAG has new start date in the future.
# Make sure we get the later one.
start = max(last_automated_data_interval.end, earliest)
start = max(align_last_data_interval_end, earliest)
else:
# Data interval starts from the end of the previous interval.
start = last_automated_data_interval.end
start = align_last_data_interval_end
if restriction.latest is not None and start > restriction.latest:
return None
end = self._get_next(start)
Expand Down Expand Up @@ -138,13 +153,13 @@ def _skip_to_latest(self, earliest: Optional[DateTime]) -> DateTime:
raise AssertionError("next schedule shouldn't be earlier")
if earliest is None:
return new_start
return max(new_start, self._align(earliest))
return max(new_start, self._align_to_next(earliest))

def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
# Get the last complete period before run_after, e.g. if a DAG run is
# scheduled at each midnight, the data interval of a manually triggered
# run at 1am 25th is between 0am 24th and 0am 25th.
end = self._get_prev(self._align(run_after))
end = self._align_to_prev(run_after)
return DataInterval(start=self._get_prev(end), end=end)


Expand Down Expand Up @@ -202,7 +217,10 @@ def _get_next(self, current: DateTime) -> DateTime:
def _get_prev(self, current: DateTime) -> DateTime:
return convert_to_utc(current - self._delta)

def _align(self, current: DateTime) -> DateTime:
def _align_to_next(self, current: DateTime) -> DateTime:
return current

def _align_to_prev(self, current: DateTime) -> DateTime:
return current

def _skip_to_latest(self, earliest: Optional[DateTime]) -> DateTime:
Expand Down
6 changes: 3 additions & 3 deletions airflow/timetables/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ def next_dagrun_info(
if last_automated_data_interval is None:
if restriction.earliest is None:
return None
next_start_time = self._align(restriction.earliest)
next_start_time = self._align_to_next(restriction.earliest)
else:
next_start_time = self._get_next(last_automated_data_interval.end)
else:
current_time = DateTime.utcnow()
if restriction.earliest is not None and current_time < restriction.earliest:
next_start_time = self._align(restriction.earliest)
next_start_time = self._align_to_next(restriction.earliest)
else:
next_start_time = self._align(current_time)
next_start_time = self._align_to_next(current_time)
if restriction.latest is not None and restriction.latest < next_start_time:
return None
return DagRunInfo.interval(next_start_time - self._interval, next_start_time)
93 changes: 93 additions & 0 deletions tests/timetables/test_interval_timetable.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE)
YESTERDAY = CURRENT_TIME - datetime.timedelta(days=1)
OLD_INTERVAL = DataInterval(start=YESTERDAY, end=CURRENT_TIME)

HOURLY_CRON_TIMETABLE = CronDataIntervalTimetable("@hourly", TIMEZONE)
HOURLY_TIMEDELTA_TIMETABLE = DeltaDataIntervalTimetable(datetime.timedelta(hours=1))
Expand All @@ -63,6 +64,29 @@ def test_no_catchup_first_starts_at_current_time(
assert next_info == DagRunInfo.interval(start=expected_start, end=CURRENT_TIME + DELTA_FROM_MIDNIGHT)


@pytest.mark.parametrize(
"earliest",
[pytest.param(None, id="none"), pytest.param(START_DATE, id="start_date")],
)
@pytest.mark.parametrize(
"catchup",
[pytest.param(True, id="catchup_true"), pytest.param(False, id="catchup_false")],
)
@freezegun.freeze_time(CURRENT_TIME)
def test_new_schedule_interval_next_info_starts_at_new_time(
earliest: Optional[pendulum.DateTime],
catchup: bool,
) -> None:
"""First run after DAG has new schedule interval."""
next_info = CRON_TIMETABLE.next_dagrun_info(
last_automated_data_interval=OLD_INTERVAL,
restriction=TimeRestriction(earliest=earliest, latest=None, catchup=catchup),
)
expected_start = YESTERDAY + datetime.timedelta(hours=16, minutes=30)
expected_end = CURRENT_TIME + datetime.timedelta(hours=16, minutes=30)
assert next_info == DagRunInfo.interval(start=expected_start, end=expected_end)


@pytest.mark.parametrize(
"timetable",
[
Expand Down Expand Up @@ -159,3 +183,72 @@ def test_validate_failure(timetable: Timetable, error_message: str) -> None:
def test_cron_interval_timezone_from_string():
timetable = CronDataIntervalTimetable("@hourly", "UTC")
assert timetable.serialize()['timezone'] == 'UTC'


@pytest.mark.parametrize(
"trigger_at, expected_interval",
[
# Arbitrary trigger time.
pytest.param(
pendulum.DateTime(2022, 8, 8, 1, tzinfo=TIMEZONE),
DataInterval(
pendulum.DateTime(2022, 8, 7, tzinfo=TIMEZONE),
pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE),
),
id="adhoc",
),
# Trigger time falls exactly on interval boundary.
pytest.param(
pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE),
DataInterval(
pendulum.DateTime(2022, 8, 7, tzinfo=TIMEZONE),
pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE),
),
id="exact",
),
],
)
def test_cron_infer_manual_data_interval_alignment(
trigger_at: pendulum.DateTime,
expected_interval: DataInterval,
) -> None:
timetable = CronDataIntervalTimetable("@daily", TIMEZONE)
assert timetable.infer_manual_data_interval(run_after=trigger_at) == expected_interval


@pytest.mark.parametrize(
"last_data_interval, expected_info",
[
pytest.param(
DataInterval(
pendulum.DateTime(2022, 8, 7, tzinfo=TIMEZONE),
pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE),
),
DagRunInfo.interval(
pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE),
pendulum.DateTime(2022, 8, 9, tzinfo=TIMEZONE),
),
id="exact",
),
pytest.param(
# Previous data interval does not align with the current timetable.
# This is possible if the user edits a DAG with existing runs.
DataInterval(
pendulum.DateTime(2022, 8, 7, 1, tzinfo=TIMEZONE),
pendulum.DateTime(2022, 8, 8, 1, tzinfo=TIMEZONE),
),
DagRunInfo.interval(
pendulum.DateTime(2022, 8, 8, tzinfo=TIMEZONE),
pendulum.DateTime(2022, 8, 9, tzinfo=TIMEZONE),
),
id="changed",
),
],
)
def test_cron_next_dagrun_info_alignment(last_data_interval: DataInterval, expected_info: DagRunInfo):
timetable = CronDataIntervalTimetable("@daily", TIMEZONE)
info = timetable.next_dagrun_info(
last_automated_data_interval=last_data_interval,
restriction=TimeRestriction(None, None, True),
)
assert info == expected_info

0 comments on commit d991d98

Please sign in to comment.