diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index d77503a588519..1be727b14846a 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -1244,10 +1244,10 @@ def process_parse_results( run_count=run_count + 1, ) - # TODO: AIP-66 emit metrics - # file_name = Path(dag_file.path).stem - # Stats.timing(f"dag_processing.last_duration.{file_name}", stat.last_duration) - # Stats.timing("dag_processing.last_duration", stat.last_duration, tags={"file_name": file_name}) + if relative_fileloc is not None and stat.last_duration is not None: + file_name = Path(relative_fileloc).stem + Stats.timing("dag_processing.last_duration", stat.last_duration, tags={"file_name": file_name}) + Stats.timing(f"dag_processing.last_duration.{file_name}", stat.last_duration) if parsing_result is None: # No DAGs were parsed - this happens for callback-only processing diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 4eab5a5f455f8..aa64b796851b6 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -767,7 +767,6 @@ def test_dag_with_system_exit(self, configure_testing_dag_bundle): @conf_vars({("core", "load_examples"): "False"}) @mock.patch("airflow.dag_processing.manager.Stats.timing") - @pytest.mark.skip("AIP-66: stats are not implemented yet") def test_send_file_processing_statsd_timing( self, statsd_timing_mock, tmp_path, configure_testing_dag_bundle ): @@ -784,7 +783,14 @@ def test_send_file_processing_statsd_timing( manager = DagFileProcessorManager(max_runs=1) manager.run() - last_runtime = manager._file_stats[os.fspath(path_to_parse)].last_duration + dag_file_info = next( + key + for key in manager._file_stats.keys() + if isinstance(key, DagFileInfo) and key.rel_path == Path("temp_dag.py") + ) + + last_runtime = manager._file_stats[dag_file_info].last_duration + statsd_timing_mock.assert_has_calls( [ mock.call("dag_processing.last_duration.temp_dag", last_runtime),