diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index 57ebd56c0e9eb..d9baaf3270d2f 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -372,17 +372,29 @@ def deactivate_stale_dags(
     ):
         """Detect and deactivate DAGs which are no longer present in files."""
         to_deactivate = set()
-        bundle_names = {b.name for b in self._dag_bundles}
+        inactive_bundles = set(  # names of bundles whose DagBundleModel row has active = False
+            session.scalars(select(DagBundleModel.name).where(DagBundleModel.active.is_(False))).all()
+        )
         query = select(
             DagModel.dag_id,
             DagModel.bundle_name,
             DagModel.fileloc,
             DagModel.last_parsed_time,
             DagModel.relative_fileloc,
-        ).where(~DagModel.is_stale, DagModel.bundle_name.in_(bundle_names))
+        ).where(~DagModel.is_stale)  # no bundle filter, so Dags of inactive bundles are selected too
         dags_parsed = session.execute(query)
 
         for dag in dags_parsed:
+            # Dags whose bundle has been removed from config (bundle no longer active) are stale —
+            # the processor has stopped parsing their files, so the time-based check below would never fire.
+            if dag.bundle_name in inactive_bundles:
+                self.log.info(
+                    "Deactivating Dag %s. Its bundle %s is no longer active.",
+                    dag.dag_id,
+                    dag.bundle_name,
+                )
+                to_deactivate.add(dag.dag_id)
+                continue
             # When the Dag's last_parsed_time is more than the stale_dag_threshold older than the
             # Dag file's last_finish_time, the Dag is considered stale as has apparently been removed from the file,
             # This is especially relevant for Dag files that generate Dags in a dynamic manner.
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index 15899a3c7ad1e..06bdc26cb456b 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -778,6 +778,46 @@ def test_scan_stale_dags(self, session):
         # SerializedDagModel gives history about Dags
         assert serialized_dag_count == 1
 
+    @pytest.mark.usefixtures("testing_dag_bundle")
+    def test_deactivate_stale_dags_marks_dags_in_inactive_bundles(self, session):
+        """Dags whose bundle is no longer active should be marked stale even without a parse signal."""
+        session.add(DagBundleModel(name="gone-bundle"))
+        session.flush()  # emit the INSERT so the UPDATE below has a row to match
+        session.execute(
+            DagBundleModel.__table__.update().where(DagBundleModel.name == "gone-bundle").values(active=False)
+        )
+        session.add(
+            DagModel(
+                dag_id="dag_in_inactive_bundle",
+                bundle_name="gone-bundle",
+                relative_fileloc="some_file.py",
+                last_parsed_time=timezone.utcnow(),
+                is_stale=False,
+            )
+        )
+        session.add(
+            DagModel(
+                dag_id="dag_in_active_bundle",
+                bundle_name="testing",  # presumably created by the testing_dag_bundle fixture — confirm
+                relative_fileloc="other_file.py",
+                last_parsed_time=timezone.utcnow(),
+                is_stale=False,
+            )
+        )
+        session.flush()  # write both Dag rows before running the manager
+
+        manager = DagFileProcessorManager(max_runs=1, processor_timeout=10 * 60)
+        manager.deactivate_stale_dags(last_parsed={})  # empty mapping: no parse results supplied
+
+        is_stale_by_dag = dict(
+            session.execute(
+                select(DagModel.dag_id, DagModel.is_stale).where(
+                    DagModel.dag_id.in_(["dag_in_inactive_bundle", "dag_in_active_bundle"])
+                )
+            ).all()
+        )
+        assert is_stale_by_dag == {"dag_in_inactive_bundle": True, "dag_in_active_bundle": False}
+
     @mock.patch("airflow.dag_processing.manager.BundleUsageTrackingManager")
     def test_cleanup_stale_bundle_versions_interval(self, mock_bundle_manager):
         manager = DagFileProcessorManager(max_runs=1)