From 19b3f1bd83e3eaf6fdcdbd428a2e121d07023f80 Mon Sep 17 00:00:00 2001
From: Kaxil Naik
Date: Wed, 26 May 2021 11:29:35 +0100
Subject: [PATCH] Parse recently modified files even if just parsed (#16075)

This commit adds an optimization: recently modified files (detected by
mtime) are parsed even if they have not yet reached
`min_file_process_interval`. This way you can increase
`[scheduler] min_file_process_interval` to a higher value such as `600`
when you have a large number of files, avoiding unnecessary reparsing of
unchanged files while still making sure that modified files are picked up.

(cherry picked from commit add7490145fabd097d605d85a662dccd02b600de)
---
 airflow/utils/dag_processing.py    |  8 +++-
 tests/utils/test_dag_processing.py | 60 ++++++++++++++++++++++++++++++
 2 files changed, 67 insertions(+), 1 deletion(-)

diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 4b852342b91df..676b6105aad0c 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -1054,14 +1054,20 @@ def prepare_file_path_queue(self):
 
             if is_mtime_mode:
                 files_with_mtime[file_path] = os.path.getmtime(file_path)
+                file_modified_time = timezone.make_aware(datetime.fromtimestamp(files_with_mtime[file_path]))
             else:
                 file_paths.append(file_path)
+                file_modified_time = None
 
-            # Find file paths that were recently processed
+            # Find file paths that were recently processed to exclude them
+            # from being added to file_path_queue,
+            # unless they were modified recently and the parsing mode is "modified_time",
+            # in which case "self._file_process_interval" (min_file_process_interval) is not honored
             last_finish_time = self.get_last_finish_time(file_path)
             if (
                 last_finish_time is not None
                 and (now - last_finish_time).total_seconds() < self._file_process_interval
+                and not (is_mtime_mode and file_modified_time and (file_modified_time > last_finish_time))
             ):
                 file_paths_recently_processed.append(file_path)
 
diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py
index 78cd9883234ce..3242cf3df12ed 100644
--- a/tests/utils/test_dag_processing.py
+++ b/tests/utils/test_dag_processing.py
@@ -31,6 +31,7 @@
 from unittest.mock import MagicMock, PropertyMock
 
 import pytest
+from freezegun import freeze_time
 
 from airflow.configuration import conf
 from airflow.jobs.local_task_job import LocalTaskJob as LJ
@@ -324,6 +325,65 @@ def test_file_paths_in_queue_sorted_by_modified_time(
         manager.prepare_file_path_queue()
         assert manager._file_path_queue == ['file_4.py', 'file_1.py', 'file_3.py', 'file_2.py']
 
+    @conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"})
+    @mock.patch("zipfile.is_zipfile", return_value=True)
+    @mock.patch("airflow.utils.file.might_contain_dag", return_value=True)
+    @mock.patch("airflow.utils.file.find_path_from_directory", return_value=True)
+    @mock.patch("airflow.utils.file.os.path.isfile", return_value=True)
+    @mock.patch("airflow.utils.file.os.path.getmtime")
+    def test_recently_modified_file_is_parsed_with_mtime_mode(
+        self, mock_getmtime, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile
+    ):
+        """
+        Test that recently updated files are processed even if min_file_process_interval has not elapsed
+        """
+        freezed_base_time = timezone.datetime(2020, 1, 5, 0, 0, 0)
+        initial_file_1_mtime = (freezed_base_time - timedelta(minutes=5)).timestamp()
+        dag_files = ["file_1.py"]
+        mock_getmtime.side_effect = [initial_file_1_mtime]
+        mock_find_path.return_value = dag_files
+
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            max_runs=3,
+            processor_factory=MagicMock().return_value,
+            processor_timeout=timedelta.max,
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        # Let's say the DAG was last parsed 10 seconds before the frozen time
+        last_finish_time = freezed_base_time - timedelta(seconds=10)
+        manager._file_stats = {
+            "file_1.py": DagFileStat(1, 0, last_finish_time, 1.0, 1),
+        }
+        with freeze_time(freezed_base_time):
+            manager.set_file_paths(dag_files)
+            assert manager._file_path_queue == []
+            # The queue stays empty: the file was processed recently and not modified since
+            manager.prepare_file_path_queue()
+            assert manager._file_path_queue == []
+
+        # Simulate a DAG modification: the new mtime is greater than last_finish_time,
+        # while the time since the last parse is still less than min_file_process_interval
+        file_1_new_mtime = freezed_base_time - timedelta(seconds=5)
+        file_1_new_mtime_ts = file_1_new_mtime.timestamp()
+        with freeze_time(freezed_base_time):
+            manager.set_file_paths(dag_files)
+            assert manager._file_path_queue == []
+            # Return the new, more recent mtime for file_1.py
+            mock_getmtime.side_effect = [file_1_new_mtime_ts]
+            manager.prepare_file_path_queue()
+            # Check that the file is added to the queue even though it was parsed recently
+            assert manager._file_path_queue == ["file_1.py"]
+            assert last_finish_time < file_1_new_mtime
+            assert (
+                manager._file_process_interval
+                > (freezed_base_time - manager.get_last_finish_time("file_1.py")).total_seconds()
+            )
+
     def test_find_zombies(self):
         manager = DagFileProcessorManager(
             dag_directory='directory',
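
Below is a minimal, standalone sketch (not Airflow's actual code) of the skip rule this
patch adds to DagFileProcessorManager.prepare_file_path_queue: a file parsed within
min_file_process_interval is still queued when its mtime is newer than its last finish
time. The helper name should_skip_recently_processed and its parameters are illustrative
assumptions, not part of the Airflow API; the timings mirror the scenario in the test above.

# Minimal sketch of the new skip rule; names here are illustrative, not Airflow's API.
from datetime import datetime, timedelta


def should_skip_recently_processed(
    now: datetime,
    last_finish_time: datetime,
    file_modified_time: datetime,
    file_process_interval: float,
    is_mtime_mode: bool = True,
) -> bool:
    """Return True if the file should be treated as recently processed and skipped."""
    within_interval = (now - last_finish_time).total_seconds() < file_process_interval
    modified_since_last_parse = is_mtime_mode and file_modified_time > last_finish_time
    # Before this patch only within_interval was checked; now a fresh mtime overrides it.
    return within_interval and not modified_since_last_parse


now = datetime(2020, 1, 5)
last_finish_time = now - timedelta(seconds=10)

# Parsed 10s ago, last modified 5 minutes ago: skipped, the 600s interval has not elapsed.
assert should_skip_recently_processed(now, last_finish_time, now - timedelta(minutes=5), 600)

# Parsed 10s ago but modified 5s ago (after the last parse): parsed again despite the interval.
assert not should_skip_recently_processed(now, last_finish_time, now - timedelta(seconds=5), 600)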