diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index 4b852342b91df..676b6105aad0c 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -1054,14 +1054,20 @@ def prepare_file_path_queue(self): if is_mtime_mode: files_with_mtime[file_path] = os.path.getmtime(file_path) + file_modified_time = timezone.make_aware(datetime.fromtimestamp(files_with_mtime[file_path])) else: file_paths.append(file_path) + file_modified_time = None - # Find file paths that were recently processed + # Find file paths that were recently processed to exclude them + # from being added to file_path_queue + # unless they were modified recently and parsing mode is "modified_time" + # in which case we don't honor "self._file_process_interval" (min_file_process_interval) last_finish_time = self.get_last_finish_time(file_path) if ( last_finish_time is not None and (now - last_finish_time).total_seconds() < self._file_process_interval + and not (is_mtime_mode and file_modified_time and (file_modified_time > last_finish_time)) ): file_paths_recently_processed.append(file_path) diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py index 78cd9883234ce..3242cf3df12ed 100644 --- a/tests/utils/test_dag_processing.py +++ b/tests/utils/test_dag_processing.py @@ -31,6 +31,7 @@ from unittest.mock import MagicMock, PropertyMock import pytest +from freezegun import freeze_time from airflow.configuration import conf from airflow.jobs.local_task_job import LocalTaskJob as LJ @@ -324,6 +325,65 @@ def test_file_paths_in_queue_sorted_by_modified_time( manager.prepare_file_path_queue() assert manager._file_path_queue == ['file_4.py', 'file_1.py', 'file_3.py', 'file_2.py'] + @conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"}) + @mock.patch("zipfile.is_zipfile", return_value=True) + @mock.patch("airflow.utils.file.might_contain_dag", return_value=True) + 
@mock.patch("airflow.utils.file.find_path_from_directory", return_value=True) + @mock.patch("airflow.utils.file.os.path.isfile", return_value=True) + @mock.patch("airflow.utils.file.os.path.getmtime") + def test_recently_modified_file_is_parsed_with_mtime_mode( + self, mock_getmtime, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile + ): + """ + Test recently updated files are processed even if min_file_process_interval is not reached + """ + freezed_base_time = timezone.datetime(2020, 1, 5, 0, 0, 0) + initial_file_1_mtime = (freezed_base_time - timedelta(minutes=5)).timestamp() + dag_files = ["file_1.py"] + mock_getmtime.side_effect = [initial_file_1_mtime] + mock_find_path.return_value = dag_files + + manager = DagFileProcessorManager( + dag_directory='directory', + max_runs=3, + processor_factory=MagicMock().return_value, + processor_timeout=timedelta.max, + signal_conn=MagicMock(), + dag_ids=[], + pickle_dags=False, + async_mode=True, + ) + + # let's say the DAG was just parsed 10 seconds before the frozen time + last_finish_time = freezed_base_time - timedelta(seconds=10) + manager._file_stats = { + "file_1.py": DagFileStat(1, 0, last_finish_time, 1.0, 1), + } + with freeze_time(freezed_base_time): + manager.set_file_paths(dag_files) + assert manager._file_path_queue == [] + # File Path Queue will be empty as the "modified time" < "last finish time" + manager.prepare_file_path_queue() + assert manager._file_path_queue == [] + + # Simulate the DAG modification by using a modified_time which is greater + # than last_finish_time while the last parse is still within min_file_process_interval + file_1_new_mtime = freezed_base_time - timedelta(seconds=5) + file_1_new_mtime_ts = file_1_new_mtime.timestamp() + with freeze_time(freezed_base_time): + manager.set_file_paths(dag_files) + assert manager._file_path_queue == [] + # Make getmtime report the new (more recent) modification time on its next call + mock_getmtime.side_effect = [file_1_new_mtime_ts] + 
manager.prepare_file_path_queue() + # Check that file is added to the queue even though file was just recently parsed + assert manager._file_path_queue == ["file_1.py"] + assert last_finish_time < file_1_new_mtime + assert ( + manager._file_process_interval + > (freezed_base_time - manager.get_last_finish_time("file_1.py")).total_seconds() + ) + def test_find_zombies(self): manager = DagFileProcessorManager( dag_directory='directory',