Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions airflow-core/src/airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,17 +372,29 @@ def deactivate_stale_dags(
):
"""Detect and deactivate DAGs which are no longer present in files."""
to_deactivate = set()
bundle_names = {b.name for b in self._dag_bundles}
inactive_bundles = set(
session.scalars(select(DagBundleModel.name).where(DagBundleModel.active.is_(False))).all()
)
query = select(
DagModel.dag_id,
DagModel.bundle_name,
DagModel.fileloc,
DagModel.last_parsed_time,
DagModel.relative_fileloc,
).where(~DagModel.is_stale, DagModel.bundle_name.in_(bundle_names))
).where(~DagModel.is_stale)
dags_parsed = session.execute(query)

for dag in dags_parsed:
# Dags whose bundle has been removed from config (bundle no longer active) are stale —
# the processor has stopped parsing their files, so the time-based check below would never fire.
if dag.bundle_name in inactive_bundles:
self.log.info(
"Deactivating Dag %s. Its bundle %s is no longer active.",
dag.dag_id,
dag.bundle_name,
)
to_deactivate.add(dag.dag_id)
continue
# When the Dag's last_parsed_time is more than the stale_dag_threshold older than the
# Dag file's last_finish_time, the Dag is considered stale as has apparently been removed from the file,
# This is especially relevant for Dag files that generate Dags in a dynamic manner.
Expand Down
40 changes: 40 additions & 0 deletions airflow-core/tests/unit/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,46 @@ def test_scan_stale_dags(self, session):
# SerializedDagModel gives history about Dags
assert serialized_dag_count == 1

@pytest.mark.usefixtures("testing_dag_bundle")
def test_deactivate_stale_dags_marks_dags_in_inactive_bundles(self, session):
"""Dags whose bundle is no longer active should be marked stale even without a parse signal."""
session.add(DagBundleModel(name="gone-bundle"))
session.flush()
session.execute(
DagBundleModel.__table__.update().where(DagBundleModel.name == "gone-bundle").values(active=False)
)
session.add(
DagModel(
dag_id="dag_in_inactive_bundle",
bundle_name="gone-bundle",
relative_fileloc="some_file.py",
last_parsed_time=timezone.utcnow(),
is_stale=False,
)
)
session.add(
DagModel(
dag_id="dag_in_active_bundle",
bundle_name="testing",
relative_fileloc="other_file.py",
last_parsed_time=timezone.utcnow(),
is_stale=False,
)
)
session.flush()

manager = DagFileProcessorManager(max_runs=1, processor_timeout=10 * 60)
manager.deactivate_stale_dags(last_parsed={})

is_stale_by_dag = dict(
session.execute(
select(DagModel.dag_id, DagModel.is_stale).where(
DagModel.dag_id.in_(["dag_in_inactive_bundle", "dag_in_active_bundle"])
)
).all()
)
assert is_stale_by_dag == {"dag_in_inactive_bundle": True, "dag_in_active_bundle": False}

@mock.patch("airflow.dag_processing.manager.BundleUsageTrackingManager")
def test_cleanup_stale_bundle_versions_interval(self, mock_bundle_manager):
manager = DagFileProcessorManager(max_runs=1)
Expand Down
Loading