Skip to content

Commit

Permalink
Fix the incorrect scheduling time for the first run of dag (#21011)
Browse files Browse the repository at this point in the history
When Catchup_by_default is set to false and start_date in the DAG is the
previous day, the first schedule time for this DAG may be incorrect

Co-authored-by: wanlce <who@foxmail.com>
(cherry picked from commit 0bcca55)
  • Loading branch information
wanlce authored and jedcunningham committed Feb 10, 2022
1 parent 160d879 commit 9e806aa
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 1 deletion.
2 changes: 1 addition & 1 deletion airflow/timetables/interval.py
Expand Up @@ -218,7 +218,7 @@ def _skip_to_latest(self, earliest: Optional[DateTime]) -> DateTime:
raise AssertionError("next schedule shouldn't be earlier")
if earliest is None:
return new_start
return max(new_start, earliest)
return max(new_start, self._align(earliest))

def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
# Get the last complete period before run_after, e.g. if a DAG run is
Expand Down
21 changes: 21 additions & 0 deletions tests/timetables/test_interval_timetable.py
Expand Up @@ -35,11 +35,32 @@
PREV_DATA_INTERVAL = DataInterval(start=PREV_DATA_INTERVAL_START, end=PREV_DATA_INTERVAL_END)

CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE)
YESTERDAY = CURRENT_TIME - datetime.timedelta(days=1)

HOURLY_CRON_TIMETABLE = CronDataIntervalTimetable("@hourly", TIMEZONE)
HOURLY_TIMEDELTA_TIMETABLE = DeltaDataIntervalTimetable(datetime.timedelta(hours=1))
HOURLY_RELATIVEDELTA_TIMETABLE = DeltaDataIntervalTimetable(dateutil.relativedelta.relativedelta(hours=1))

CRON_TIMETABLE = CronDataIntervalTimetable("30 16 * * *", TIMEZONE)
DELTA_FROM_MIDNIGHT = datetime.timedelta(minutes=30, hours=16)


@pytest.mark.parametrize(
"last_automated_data_interval",
[pytest.param(None, id="first-run"), pytest.param(PREV_DATA_INTERVAL, id="subsequent")],
)
@freezegun.freeze_time(CURRENT_TIME)
def test_no_catchup_first_starts_at_current_time(
last_automated_data_interval: Optional[DataInterval],
) -> None:
"""If ``catchup=False`` and start_date is a day before"""
next_info = CRON_TIMETABLE.next_dagrun_info(
last_automated_data_interval=last_automated_data_interval,
restriction=TimeRestriction(earliest=YESTERDAY, latest=None, catchup=False),
)
expected_start = YESTERDAY + DELTA_FROM_MIDNIGHT
assert next_info == DagRunInfo.interval(start=expected_start, end=CURRENT_TIME + DELTA_FROM_MIDNIGHT)


@pytest.mark.parametrize(
"timetable",
Expand Down

0 comments on commit 9e806aa

Please sign in to comment.