Parse recently modified files even if just parsed (apache#16075)
This commit adds an optimization: recently modified files (detected
by mtime) are parsed even if `min_file_process_interval` has not yet
elapsed since their last parse.

This way you can increase `[scheduler] min_file_process_interval` to
a higher value such as `600` when you have a large number of files, to
avoid unnecessary reparsing of files that haven't changed while still
making sure that modified files are picked up promptly.

(cherry picked from commit add7490)
(cherry picked from commit 19b3f1b)
(cherry picked from commit cb21b0a)
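In short, a file is skipped only while `min_file_process_interval` has not elapsed and the file has not been modified since its last parse. A minimal standalone sketch of that rule (the function name `should_reparse` and its parameters are illustrative stand-ins, not Airflow's actual API):

from typing import Optional
from datetime import datetime, timedelta

def should_reparse(
    now: datetime,
    last_finish_time: Optional[datetime],
    mtime: Optional[datetime],
    process_interval: timedelta,
    mtime_mode: bool,
) -> bool:
    # Never parsed before: always parse.
    if last_finish_time is None:
        return True
    # The interval has elapsed: parse, as before this change.
    if now - last_finish_time >= process_interval:
        return True
    # Still within the interval: parse only when the file changed after the
    # last parse and files are sorted by modification time.
    return mtime_mode and mtime is not None and mtime > last_finish_time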
kaxil committed Jun 2, 2021
1 parent a3e624c commit ebb9c30
Showing 2 changed files with 67 additions and 1 deletion.
8 changes: 7 additions & 1 deletion airflow/utils/dag_processing.py
@@ -1051,14 +1051,20 @@ def prepare_file_path_queue(self):

             if is_mtime_mode:
                 files_with_mtime[file_path] = os.path.getmtime(file_path)
+                file_modified_time = timezone.make_aware(datetime.fromtimestamp(files_with_mtime[file_path]))
             else:
                 file_paths.append(file_path)
+                file_modified_time = None

-            # Find file paths that were recently processed
+            # Find file paths that were recently processed to exclude them
+            # from being added to file_path_queue
+            # unless they were modified recently and parsing mode is "modified_time"
+            # in which case we don't honor "self._file_process_interval" (min_file_process_interval)
             last_finish_time = self.get_last_finish_time(file_path)
             if (
                 last_finish_time is not None
                 and (now - last_finish_time).total_seconds() < self._file_process_interval
+                and not (is_mtime_mode and file_modified_time and (file_modified_time > last_finish_time))
             ):
                 file_paths_recently_processed.append(file_path)

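Note that the new condition only fires when `is_mtime_mode` is true, i.e. when `[scheduler] file_parsing_sort_mode` is `modified_time`. A small sketch, assuming a standard Airflow installation, of how one might check the two settings this change interacts with (the printed format is arbitrary):

from airflow.configuration import conf

# The mtime shortcut only applies under this sort mode.
sort_mode = conf.get("scheduler", "file_parsing_sort_mode")

# With this change, raising the interval (e.g. to 600 seconds, as the commit
# message suggests) no longer delays pickup of files that were just edited.
interval = conf.getint("scheduler", "min_file_process_interval")

print(f"file_parsing_sort_mode={sort_mode}, min_file_process_interval={interval}s")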
60 changes: 60 additions & 0 deletions tests/utils/test_dag_processing.py
@@ -30,6 +30,7 @@
 from unittest.mock import MagicMock, PropertyMock

 import pytest
+from freezegun import freeze_time

 from airflow.configuration import conf
 from airflow.jobs.local_task_job import LocalTaskJob as LJ
@@ -324,6 +325,65 @@ def test_file_paths_in_queue_sorted_by_modified_time(
         manager.prepare_file_path_queue()
         assert manager._file_path_queue == ['file_4.py', 'file_1.py', 'file_3.py', 'file_2.py']

+    @conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"})
+    @mock.patch("zipfile.is_zipfile", return_value=True)
+    @mock.patch("airflow.utils.file.might_contain_dag", return_value=True)
+    @mock.patch("airflow.utils.file.find_path_from_directory", return_value=True)
+    @mock.patch("airflow.utils.file.os.path.isfile", return_value=True)
+    @mock.patch("airflow.utils.file.os.path.getmtime")
+    def test_recently_modified_file_is_parsed_with_mtime_mode(
+        self, mock_getmtime, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile
+    ):
+        """
+        Test recently updated files are processed even if min_file_process_interval is not reached
+        """
+        freezed_base_time = timezone.datetime(2020, 1, 5, 0, 0, 0)
+        initial_file_1_mtime = (freezed_base_time - timedelta(minutes=5)).timestamp()
+        dag_files = ["file_1.py"]
+        mock_getmtime.side_effect = [initial_file_1_mtime]
+        mock_find_path.return_value = dag_files
+
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            max_runs=3,
+            processor_factory=MagicMock().return_value,
+            processor_timeout=timedelta.max,
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        # let's say the DAG was just parsed 10 seconds before the Freezed time
+        last_finish_time = freezed_base_time - timedelta(seconds=10)
+        manager._file_stats = {
+            "file_1.py": DagFileStat(1, 0, last_finish_time, 1.0, 1),
+        }
+        with freeze_time(freezed_base_time):
+            manager.set_file_paths(dag_files)
+            assert manager._file_path_queue == []
+            # File Path Queue will be empty as the "modified time" < "last finish time"
+            manager.prepare_file_path_queue()
+            assert manager._file_path_queue == []
+
+        # Simulate the DAG modification by using a modified_time that is greater
+        # than the last finish time, even though min_file_process_interval has not yet elapsed
+        file_1_new_mtime = freezed_base_time - timedelta(seconds=5)
+        file_1_new_mtime_ts = file_1_new_mtime.timestamp()
+        with freeze_time(freezed_base_time):
+            manager.set_file_paths(dag_files)
+            assert manager._file_path_queue == []
+            # File Path Queue will be empty as the "modified time" < "last finish time"
+            mock_getmtime.side_effect = [file_1_new_mtime_ts]
+            manager.prepare_file_path_queue()
+            # Check that file is added to the queue even though file was just recently parsed
+            assert manager._file_path_queue == ["file_1.py"]
+            assert last_finish_time < file_1_new_mtime
+            assert (
+                manager._file_process_interval
+                > (freezed_base_time - manager.get_last_finish_time("file_1.py")).total_seconds()
+            )
+
     def test_find_zombies(self):
         manager = DagFileProcessorManager(
             dag_directory='directory',
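As a worked restatement of the timeline the new test sets up, assuming the default `min_file_process_interval` of 30 seconds (an assumption; the test only relies on the interval being larger than 10 seconds):

from datetime import timedelta

# All ages are relative to the frozen "now" used in the test.
since_last_parse = timedelta(seconds=10)   # file_1.py last finished parsing 10s ago
since_modification = timedelta(seconds=5)  # file_1.py was modified 5s ago
process_interval = timedelta(seconds=30)   # assumed default min_file_process_interval

# The plain interval check alone would skip the file (10s < 30s)...
assert since_last_parse < process_interval

# ...but the modification is newer than the last parse, so the new mtime
# check queues it anyway.
assert since_modification < since_last_parse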
