Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2585,6 +2585,10 @@ scheduler:
Notably, for **CronTriggerTimetable**, the logical date is the same as the time the DAG Run will
try to schedule, while for **CronDataIntervalTimetable**, the logical date is the beginning of
the data interval, but the DAG Run will try to schedule at the end of the data interval.

When a DAG is switched from **CronTriggerTimetable** to **CronDataIntervalTimetable** (for example,
by flipping this setting from ``False`` to ``True``), the next scheduled run skips one period past
the most recent **CronTriggerTimetable** run to avoid colliding with its logical date.
version_added: 2.9.0
type: boolean
example: ~
Expand Down
14 changes: 14 additions & 0 deletions airflow-core/src/airflow/timetables/interval.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,20 @@ def next_dagrun_info(
else:
# Data interval starts from the end of the previous interval.
start = align_last_data_interval_end

# CronTriggerTimetable stores its runs as point-in-time intervals
# (start == end == logical_date). After a switch to a
# CronDataIntervalTimetable the aligned `start` lands back on that
# same logical_date, so without this guard we'd propose a run
# identical to the existing one — which collides with the
# (dag_id, logical_date) unique constraint and leaves the scheduler
# looping on "run already exists; skipping dagrun creation" until
# the next period elapses. Advance one period to skip past it.
if (
last_automated_data_interval.start == last_automated_data_interval.end
and start == last_automated_data_interval.start
):
start = self._get_next(start)
if restriction.latest is not None and start > restriction.latest:
return None
end = self._get_next(start)
Expand Down
13 changes: 7 additions & 6 deletions airflow-core/tests/unit/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ def test_bulk_write_to_db_max_active_runs(self, testing_dag_bundle, state, catch
logical_date=model.next_dagrun,
run_type=DagRunType.SCHEDULED,
session=session,
data_interval=(model.next_dagrun, model.next_dagrun),
data_interval=(model.next_dagrun, model.next_dagrun + timedelta(days=1)),
run_after=model.next_dagrun_create_after,
triggered_by=DagRunTriggeredByType.TEST,
)
Expand All @@ -638,7 +638,7 @@ def test_bulk_write_to_db_max_active_runs(self, testing_dag_bundle, state, catch
assert model.exceeds_max_non_backfill is True

if catchup is True:
assert model.next_dagrun_create_after == DEFAULT_DATE + timedelta(days=1)
assert model.next_dagrun_create_after == DEFAULT_DATE + timedelta(days=2)
else:
assert model.next_dagrun_create_after > current_time + timedelta(days=-2) # allow for fuzz

Expand Down Expand Up @@ -967,7 +967,10 @@ def test_dag_handle_callback_with_removed_task(self, dag_maker, session, testing
dag_run.execute_dag_callbacks(dag=dag, success=True)

@time_machine.travel(timezone.datetime(2025, 11, 11))
@pytest.mark.parametrize(("catchup", "expected_next_dagrun"), [(True, DEFAULT_DATE), (False, None)])
@pytest.mark.parametrize(
("catchup", "expected_next_dagrun"),
[(True, DEFAULT_DATE + datetime.timedelta(hours=1)), (False, None)],
)
def test_next_dagrun_after_fake_scheduled_previous(
self, catchup, expected_next_dagrun, testing_dag_bundle
):
Expand Down Expand Up @@ -995,7 +998,7 @@ def test_next_dagrun_after_fake_scheduled_previous(
run_type=DagRunType.SCHEDULED,
logical_date=DEFAULT_DATE,
state=State.SUCCESS,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
data_interval=(DEFAULT_DATE, DEFAULT_DATE + delta),
)
sync_dag_to_db(dag)
with create_session() as session:
Expand All @@ -1008,8 +1011,6 @@ def test_next_dagrun_after_fake_scheduled_previous(
# Verify next_dagrun_create_after is scheduled after next_dagrun
assert model.next_dagrun_create_after > model.next_dagrun
else:
# For catchup=True, even though there is a run for this date already,
# it is marked as manual/external, so we should create a scheduled one anyway!
assert model.next_dagrun == expected_next_dagrun
assert model.next_dagrun_create_after == expected_next_dagrun + delta

Expand Down
1 change: 0 additions & 1 deletion airflow-core/tests/unit/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -2260,7 +2260,6 @@ def test_schedule_tis_only_one_scheduler_update_succeeds_when_competing(dag_make
assert refreshed_ti.try_number == 1


@pytest.mark.xfail(reason="We can't keep this behaviour with remote workers where scheduler can't reach xcom")
@pytest.mark.need_serialized_dag
def test_schedule_tis_start_trigger(dag_maker, session):
"""
Expand Down
25 changes: 25 additions & 0 deletions airflow-core/tests/unit/timetables/test_interval_timetable.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,31 @@ def test_no_catchup_first_starts_at_current_time(
assert next_info == DagRunInfo.interval(start=expected_start, end=CURRENT_TIME + DELTA_FROM_MIDNIGHT)


@pytest.mark.parametrize(
    "catchup",
    [pytest.param(True, id="catchup_true"), pytest.param(False, id="catchup_false")],
)
@time_machine.travel(pendulum.DateTime(2021, 9, 7, 15, tzinfo=utc))
def test_zero_length_last_interval_does_not_re_emit_logical_date(catchup: bool) -> None:
    """A previous run whose ``data_interval`` is zero-length (``start == end``)
    must not make ``next_dagrun_info`` propose that same logical_date again.

    Such intervals are left behind when a DAG scheduled by
    ``CronTriggerTimetable`` is later switched to ``CronDataIntervalTimetable``.
    Without the guard the scheduler loops on "run already exists; skipping
    dagrun creation".
    """
    # The prior run's point-in-time interval, as CronTriggerTimetable records it.
    previous_logical_date = pendulum.DateTime(2021, 9, 5, 17, tzinfo=utc)
    zero_length_interval = DataInterval(start=previous_logical_date, end=previous_logical_date)

    timetable = CronDataIntervalTimetable("0 17 * * *", utc)
    info = timetable.next_dagrun_info(
        last_automated_data_interval=zero_length_interval,
        restriction=TimeRestriction(earliest=None, latest=None, catchup=catchup),
    )

    # The next proposed interval must start one period AFTER the old logical
    # date, for both catchup settings.
    assert info == DagRunInfo.interval(
        start=pendulum.DateTime(2021, 9, 6, 17, tzinfo=utc),
        end=pendulum.DateTime(2021, 9, 7, 17, tzinfo=utc),
    )


@pytest.mark.parametrize(
"earliest",
[pytest.param(None, id="none"), pytest.param(START_DATE, id="start_date")],
Expand Down
Loading