From 5a7099c83f5b798134f46438cb8c1ddcd4b4d30e Mon Sep 17 00:00:00 2001
From: Andrew Gibbs
Date: Tue, 2 Aug 2022 22:09:52 +0200
Subject: [PATCH] Dag processor manager queue overhaul

- Existing single queue split into std and priority queues
- Additional tracking of processing of files read from disk
- Additional config property added
- Updated config.yml template to match
- Fixes SLA issues from #20683: previously, SLA callbacks could stop the queue
  from ever emptying and thus prevent the dags being refreshed from disk
- Also prevents a similar, theoretically-possible-but-not-observed-by-me issue
  where a large volume of dag callbacks could have the same effect
---
 airflow/config_templates/config.yml           |  15 ++
 airflow/config_templates/default_airflow.cfg  |  11 +
 airflow/dag_processing/manager.py             | 167 +++++++++++---
 tests/dag_processing/test_manager.py          | 219 +++++++++++++++----
 4 files changed, 340 insertions(+), 72 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index eef9fa0381bdd..e21b37b869eab 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1946,6 +1946,21 @@
       type: string
       example: ~
       default: "30"
+    - name: max_file_process_interval
+      description: |
+        Number of seconds since the previous parse time after which a DAG file will be parsed
+        again. This is only useful if your system contains a lot of dags / slas which update
+        exceedingly frequently, as those updates can otherwise prevent the system from scanning
+        the file system for changes to your dag files. This operates on a best-effort basis; if
+        you have many dags, and it takes 10 minutes to scan and parse them all, this setting will
+        not make them parse any more often.
+        Setting this to <= 0 disables the behaviour, in case it's important to you that those
+        frequently updating dags / slas always take priority, at the cost of delaying updates
+        from disk.
+      version_added: 2.4.0
+      type: integer
+      example: ~
+      default: "120"
     - name: deactivate_stale_dags_interval
       description: |
         How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index d1f7069cbbdbc..625b72bcdf881 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -984,6 +984,17 @@ scheduler_idle_sleep_time = 1
 # this interval. Keeping this number low will increase CPU usage.
 min_file_process_interval = 30
 
+# Number of seconds since the previous parse time after which a DAG file will be parsed
+# again. This is only useful if your system contains a lot of dags / slas which update
+# exceedingly frequently, as those updates can otherwise prevent the system from scanning
+# the file system for changes to your dag files. This operates on a best-effort basis; if
+# you have many dags, and it takes 10 minutes to scan and parse them all, this setting will
+# not make them parse any more often.
+# Setting this to <= 0 disables the behaviour, in case it's important to you that those
+# frequently updating dags / slas always take priority, at the cost of delaying updates
+# from disk.
+max_file_process_interval = 120
+
 # How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
 # the expected files) which should be deactivated.
deactivate_stale_dags_interval = 60
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 87fd61b609fab..e35a4b4ecdd00 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -29,12 +29,13 @@
 import sys
 import time
 import zipfile
-from collections import defaultdict
+from collections import defaultdict, deque
 from datetime import datetime, timedelta
 from importlib import import_module
 from multiprocessing.connection import Connection as MultiprocessingConnection
 from pathlib import Path
-from typing import Any, NamedTuple, cast
+from typing import Any, Deque, List, NamedTuple, Set, cast
 
 from setproctitle import setproctitle
 from sqlalchemy.orm import Session
@@ -380,8 +381,19 @@ def __init__(
         async_mode: bool = True,
     ):
         super().__init__()
-        self._file_paths: list[str] = []
-        self._file_path_queue: list[str] = []
+        self._log = logging.getLogger('airflow.processor_manager')
+
+        # known files; refreshed every `dag_dir_list_interval`, with entries added/removed accordingly
+        self._file_paths: List[str] = []
+
+        # we maintain 2 queues: the priority queue for work needing a rapid response to scheduler
+        # updates, and the std queue for work serviced once the priority items have been worked
+        # through, e.g. periodic dir scans. Additionally there's a set tracking which files found
+        # on disk still haven't been re-parsed since the last dir scan
+        self._priority_file_path_queue: Deque[str] = deque()
+        self._std_file_path_queue: Deque[str] = deque()
+        self._outstanding_std_file_paths: Set[str] = set()
+        self._dag_directory = dag_directory
+        self._max_runs = max_runs
 
         # signal_conn is None for dag_processor_standalone mode.
         self._direct_scheduler_conn = signal_conn
@@ -414,7 +426,17 @@ def __init__(
         self._parallelism = 1
 
         # Parse and schedule each file no faster than this interval.
-        self._file_process_interval = conf.getint('scheduler', 'min_file_process_interval')
+        self._min_file_process_interval = conf.getint('scheduler', 'min_file_process_interval')
+        # Best-effort attempt to ensure all dag files are parsed at least this often. Inactive if <= 0
+        self._max_file_process_interval = conf.getint('scheduler', 'max_file_process_interval')
+        if 0 < self._max_file_process_interval < self._min_file_process_interval:
+            self.log.warning(
+                "Max file process interval cannot be less than min file process interval, "
+                "increasing max interval to %s",
+                self._min_file_process_interval,
+            )
+            self._max_file_process_interval = self._min_file_process_interval
+
         # How often to print out DAG file processing stats to the log. Default to
         # 30 seconds.
        self.print_stats_interval = conf.getint('scheduler', 'print_stats_interval')
@@ -481,7 +503,10 @@ def start(self):
         set_new_process_group()
 
         self.log.info("Processing files using up to %s processes at a time ", self._parallelism)
-        self.log.info("Process each file at most once every %s seconds", self._file_process_interval)
+        self.log.info("Process each file at most once every %s seconds", self._min_file_process_interval)
+        self.log.info(
+            "Best effort to process each file at least once every %s seconds", self._max_file_process_interval
+        )
         self.log.info(
             "Checking for new files in %s every %s seconds", self._dag_directory, self.dag_dir_list_interval
         )
@@ -544,7 +569,7 @@ def _run_parsing_loop(self):
         poll_time = None
 
         self._refresh_dag_dir()
-        self.prepare_file_path_queue()
+        self.populate_std_file_queue_from_dir()
         max_callbacks_per_loop = conf.getint("scheduler", "max_callbacks_per_loop")
 
         if self._async_mode:
@@ -605,11 +630,14 @@ def _run_parsing_loop(self):
 
             self._kill_timed_out_processors()
 
-            # Generate more file paths to process if we processed all the files
-            # already.
-            if not self._file_path_queue:
+            # Generate more file paths to process if we processed all the files already. Note that for
+            # this set to clear down, we must first have either:
+            # 1. cleared the priority queue, then cleared this set while draining the std queue, or
+            # 2. been unable to clear the priority queue, hit max_file_process_interval, and drained the
+            #    set while clearing overdue files
+            if not self._outstanding_std_file_paths:
                 self.emit_metrics()
-                self.prepare_file_path_queue()
+                self.populate_std_file_queue_from_dir()
 
             self.start_new_processes()
@@ -687,33 +715,32 @@ def _fetch_callbacks(self, max_callbacks: int, session: Session = NEW_SESSION):
 
     def _add_callback_to_queue(self, request: CallbackRequest):
 
         # requests are sent by dag processors. SLAs exist per-dag, but can be generated once per SLA-enabled
-        # task in the dag. If treated like other callbacks, SLAs can cause feedback where a SLA arrives,
-        # goes to the front of the queue, gets processed, triggers more SLAs from the same DAG, which go to
-        # the front of the queue, and we never get round to picking stuff off the back of the queue
+        # task in the dag. If treated like other callbacks, SLAs can easily cause feedback where an SLA
+        # arrives, enters the priority queue, gets processed, triggers more SLAs from the same DAG, which go
+        # to the priority queue, and we never get round to picking work off the std queue except via
+        # max_file_process_interval. Hence SLA callbacks share the std queue.
         if isinstance(request, SlaCallbackRequest):
             if request in self._callback_to_execute[request.full_filepath]:
                 self.log.debug("Skipping already queued SlaCallbackRequest")
                 return
 
             # not already queued, queue the file _at the back_, and add the request to the file's callbacks
+            # note if we have lots of SLAs the std queue may never fully drain; the outstanding set, not
+            # this queue, is what triggers the next dir scan.
+            # Note the callback_to_execute dict is shared by both queues, so a priority callback will pick
+            # up any SLAs for the same file at the same time
            self.log.debug("Queuing SlaCallbackRequest for %s", request.dag_id)
            self._callback_to_execute[request.full_filepath].append(request)
-            if request.full_filepath not in self._file_path_queue:
-                self._file_path_queue.append(request.full_filepath)
+            self._add_paths_to_queue([request.full_filepath], self._std_file_path_queue, "std")
 
        # Other callbacks have a higher priority over DAG Run scheduling, so those callbacks gazump, even if
-       # already in the queue
+       # already in the std queue. Note that these higher priority callbacks go to the _back_ of the
+       # priority queue; this prevents a single busy DAG from stopping other priority callbacks being
+       # processed. Non-SLA callbacks are relatively rare, but a steady stream of them can still stop us
+       # from ever processing the std queue, hence max_file_process_interval
        else:
            self.log.debug("Queuing %s CallbackRequest: %s", type(request).__name__, request)
            self._callback_to_execute[request.full_filepath].append(request)
-            if request.full_filepath in self._file_path_queue:
-                # Remove file paths matching request.full_filepath from self._file_path_queue
-                # Since we are already going to use that filepath to run callback,
-                # there is no need to have same file path again in the queue
-                self._file_path_queue = [
-                    file_path for file_path in self._file_path_queue if file_path != request.full_filepath
-                ]
-            self._file_path_queue.insert(0, request.full_filepath)
+            self._add_paths_to_queue([request.full_filepath], self._priority_file_path_queue, "priority")
 
     def _refresh_dag_dir(self):
         """Refresh file paths from dag dir if we haven't done it for too long."""
@@ -759,6 +786,7 @@ def _refresh_dag_dir(self):
                 from airflow.models.dagcode import DagCode
 
                 DagCode.remove_deleted_code(dag_filelocs)
+            Stats.incr('dag_processing.refresh_dag_dir')
 
     def _print_stat(self):
         """Occasionally print out stats about how fast the files are getting processed"""
@@ -931,8 +959,18 @@ def set_file_paths(self, new_file_paths):
         :param new_file_paths: list of paths to DAG definition files
         :return: None
         """
+        # store the new paths
         self._file_paths = new_file_paths
-        self._file_path_queue = [x for x in self._file_path_queue if x in new_file_paths]
+
+        # clean up the queues; remove anything queued which is no longer in the list, including callbacks
+        self._priority_file_path_queue = deque(
+            x for x in self._priority_file_path_queue if x in new_file_paths
+        )
+        self._std_file_path_queue = deque(x for x in self._std_file_path_queue if x in new_file_paths)
+        callback_paths_to_del = [x for x in self._callback_to_execute if x not in new_file_paths]
+        for path_to_del in callback_paths_to_del:
+            del self._callback_to_execute[path_to_del]
+
         # Stop processors that are working on deleted files
         filtered_processors = {}
         for file_path, processor in self._processors.items():
@@ -994,7 +1032,8 @@ def collect_results(self) -> None:
 
         self.log.debug("%s/%s DAG parsing processes running", len(self._processors), self._parallelism)
 
-        self.log.debug("%s file paths queued for processing", len(self._file_path_queue))
+        self.log.debug("%s file paths queued for priority processing", len(self._priority_file_path_queue))
+        self.log.debug("%s file paths queued for std processing", len(self._std_file_path_queue))
 
     @staticmethod
     def _create_process(file_path, pickle_dags, dag_ids, dag_directory, callback_requests):
@@ -1007,15 +1046,49 @@ def _create_process(file_path, pickle_dags, dag_ids, dag_directory, callback_req
             callback_requests=callback_requests,
         )
 
+    def is_overdue(self, file_path: str, as_of: datetime) -> bool:
+        """Return True if the file's last parse finished more than max_file_process_interval before as_of."""
+        if self._max_file_process_interval <= 0:
+            return False
+        last_finish_time = self.get_last_finish_time(file_path)
+        return (
+            last_finish_time is not None
+            and (as_of - last_finish_time).total_seconds() > self._max_file_process_interval
+        )
+
     def start_new_processes(self):
         """Start more processors if we have enough slots and files to process"""
-        while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
-            file_path = self._file_path_queue.pop(0)
-            # Stop creating duplicate processor i.e. processor with the same filepath
+        # check for files stuck on the std queue for too long; these need to be processed first, as they
+        # suggest that we found files on disk a long time ago and still haven't processed them
+        now = timezone.utcnow()
+        overdue_std_files_list = [p for p in self._std_file_path_queue if self.is_overdue(p, now)]
+        if overdue_std_files_list:
+            # sort, with earliest finish time (i.e. most overdue) first
+            overdue_std_files_list.sort(key=self.get_last_finish_time)
+            self.log.warning(
+                "Have %s overdue dag files. Either max_file_process_interval "
+                "is not long enough to process all your dag files, or your system is unable to "
+                "keep up with the number of dag callbacks",
+                len(overdue_std_files_list),
+            )
+        # always build the deque, even when empty, so chained_deque() and the gauge below can use it
+        overdue_std_files = deque(overdue_std_files_list)
+
+        # there may be dupes across deques, we'll filter them out inside the while loop
+        def chained_deque() -> Deque[str]:
+            if overdue_std_files:
+                return overdue_std_files
+            elif self._priority_file_path_queue:
+                return self._priority_file_path_queue
+            else:
+                return self._std_file_path_queue
+
+        while self._parallelism - len(self._processors) > 0 and chained_deque():
+            file_path = chained_deque().popleft()  # we append to the right side, this is FIFO
+
+            # Skip creating duplicate processor
             if file_path in self._processors.keys():
                 continue
 
+            # attempt to create a processor for this file
             callback_to_execute_for_file = self._callback_to_execute[file_path]
             processor = self._create_process(
                 file_path,
                 self._pickle_dags,
@@ -1024,6 +1097,9 @@
                 callback_to_execute_for_file,
             )
 
+            # processor created ok. Remove from the set of files-from-disk if it's in there. Thus even if
+            # SLA callbacks keep the std queue busy, we'll know when all files need re-reading from disk
+            self._outstanding_std_file_paths.discard(file_path)
             del self._callback_to_execute[file_path]
 
             Stats.incr('dag_processing.processes')
@@ -1032,8 +1108,16 @@
             self._processors[file_path] = processor
             self.waitables[processor.waitable_handle] = processor
 
-    def prepare_file_path_queue(self):
-        """Generate more file paths to process. Result are saved in _file_path_queue."""
+        Stats.gauge('dag_processing.std_path_queue_size', len(self._std_file_path_queue))
+        Stats.gauge('dag_processing.priority_path_queue_size', len(self._priority_file_path_queue))
+        Stats.gauge('dag_processing.overdue_path_queue_size', len(overdue_std_files))
+
+    def populate_std_file_queue_from_dir(self):
+        """
+        Scan dir to generate more file paths to process. Results are saved in _std_file_path_queue.
+
+        Note this method is only called once the outstanding set of files from the previous dir scan
+        (_outstanding_std_file_paths) has drained; the std queue itself may still contain entries
+        queued by SLA callbacks.
+        """
         self._parsing_start_time = time.perf_counter()
         # If the file path is already being processed, or if a file was
         # processed recently, wait until the next batch
@@ -1068,7 +1152,7 @@ def prepare_file_path_queue(self):
             last_finish_time = self.get_last_finish_time(file_path)
             if (
                 last_finish_time is not None
-                and (now - last_finish_time).total_seconds() < self._file_process_interval
+                and (now - last_finish_time).total_seconds() < self._min_file_process_interval
                 and not (is_mtime_mode and file_modified_time and (file_modified_time > last_finish_time))
             ):
                 file_paths_recently_processed.append(file_path)
@@ -1087,8 +1171,9 @@
             file_path for file_path, stat in self._file_stats.items() if stat.run_count == self._max_runs
         ]
 
+        # do not include files already in the priority queue, there's no point queuing them twice
         file_paths_to_exclude = set(file_paths_in_progress).union(
-            file_paths_recently_processed, files_paths_at_run_limit
+            file_paths_recently_processed, files_paths_at_run_limit, self._priority_file_path_queue
         )
 
         # Do not convert the following list to set as set does not preserve the order
@@ -1112,7 +1197,19 @@
                 num_dags=0, import_errors=0, last_finish_time=None, last_duration=None, run_count=0
             )
 
-        self._file_path_queue.extend(files_paths_to_queue)
+        Stats.incr('dag_processing.std_path_queue_update_count')
+        self._add_paths_to_queue(files_paths_to_queue, self._std_file_path_queue, "std")
+        self._outstanding_std_file_paths.update(files_paths_to_queue)
+
+    @staticmethod
+    def _add_paths_to_queue(
+        file_paths_to_enqueue: List[str],
+        queue_to_add_to: Deque[str],
+        queue_type: str,
+    ):
+        """Append the given paths to the _back_ of the given queue, skipping any already present"""
+        queue_to_add_to.extend(p for p in file_paths_to_enqueue if p not in queue_to_add_to)
+        Stats.gauge(f'dag_processing.{queue_type}_path_queue_size', len(queue_to_add_to))
 
     def _kill_timed_out_processors(self):
         """Kill any file processors that timeout to defend against process hangs."""
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 084056dad919d..1382d00982be3 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -219,7 +219,7 @@ def test_start_new_processes_with_same_filepath(self):
         file_1 = "file_1.py"
         file_2 = "file_2.py"
         file_3 = "file_3.py"
-        manager._file_path_queue = [file_1, file_2, file_3]
+        manager._std_file_path_queue.extend([file_1, file_2, file_3])
 
         # Mock that only one processor exists. This processor runs with 'file_1'
         manager._processors[file_1] = MagicMock()
@@ -234,7 +234,7 @@
         assert file_1 in manager._processors.keys()
         assert file_2 in manager._processors.keys()
-        assert [file_3] == manager._file_path_queue
+        assert [file_3] == list(manager._std_file_path_queue)
 
     def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
         manager = DagFileProcessorManager(
@@ -300,9 +300,10 @@ def test_file_paths_in_queue_sorted_alphabetically(
         )
 
         manager.set_file_paths(dag_files)
-        assert manager._file_path_queue == []
-        manager.prepare_file_path_queue()
-        assert manager._file_path_queue == ["file_1.py", "file_2.py", "file_3.py", "file_4.py"]
+        assert list(manager._std_file_path_queue) == []
+        manager.populate_std_file_queue_from_dir()
+        assert list(manager._std_file_path_queue) == ["file_1.py", "file_2.py", "file_3.py", "file_4.py"]
+        assert manager._outstanding_std_file_paths == {"file_4.py", "file_1.py", "file_3.py", "file_2.py"}
 
     @conf_vars({("scheduler", "file_parsing_sort_mode"): "random_seeded_by_host"})
     @mock.patch("zipfile.is_zipfile", return_value=True)
@@ -327,17 +328,18 @@ def test_file_paths_in_queue_sorted_random_seeded_by_host(
         )
 
         manager.set_file_paths(dag_files)
-        assert manager._file_path_queue == []
-        manager.prepare_file_path_queue()
+        assert list(manager._std_file_path_queue) == []
+        manager.populate_std_file_queue_from_dir()
 
         expected_order = dag_files
         random.Random(get_hostname()).shuffle(expected_order)
-        assert manager._file_path_queue == expected_order
+        assert list(manager._std_file_path_queue) == expected_order
 
         # Verify running it again produces same order
         manager._file_paths = []
-        manager.prepare_file_path_queue()
-        assert manager._file_path_queue == expected_order
+        manager.populate_std_file_queue_from_dir()
+        assert list(manager._std_file_path_queue) == expected_order
+        assert manager._outstanding_std_file_paths == {'file_4.py', 'file_1.py', 'file_3.py', 'file_2.py'}
 
     @conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"})
     @mock.patch("zipfile.is_zipfile", return_value=True)
@@ -365,9 +367,10 @@ def test_file_paths_in_queue_sorted_by_modified_time(
         )
 
         manager.set_file_paths(dag_files)
-        assert manager._file_path_queue == []
-        manager.prepare_file_path_queue()
-        assert manager._file_path_queue == ["file_4.py", "file_1.py", "file_3.py", "file_2.py"]
+        assert list(manager._std_file_path_queue) == []
+        manager.populate_std_file_queue_from_dir()
+        assert list(manager._std_file_path_queue) == ["file_4.py", "file_1.py", "file_3.py", "file_2.py"]
+        assert manager._outstanding_std_file_paths == {"file_4.py", "file_1.py", "file_3.py", "file_2.py"}
 
     @conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"})
     @mock.patch("zipfile.is_zipfile", return_value=True)
@@ -394,8 +397,9 @@ def test_file_paths_in_queue_excludes_missing_file(
         )
 
         manager.set_file_paths(dag_files)
-        manager.prepare_file_path_queue()
-        assert manager._file_path_queue == ["file_2.py", "file_3.py"]
+        manager.populate_std_file_queue_from_dir()
+        assert list(manager._std_file_path_queue) == ["file_2.py", "file_3.py"]
+        assert manager._outstanding_std_file_paths == {"file_2.py", "file_3.py"}
 
     @conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"})
     @mock.patch("zipfile.is_zipfile", return_value=True)
@@ -432,10 +436,10 @@ def test_recently_modified_file_is_parsed_with_mtime_mode(
         }
         with freeze_time(freezed_base_time):
             manager.set_file_paths(dag_files)
-            assert manager._file_path_queue == []
+            assert list(manager._std_file_path_queue) == []
             # File Path Queue will be empty as the "modified time" < "last finish time"
-            manager.prepare_file_path_queue()
-            assert manager._file_path_queue == []
+            manager.populate_std_file_queue_from_dir()
+            assert list(manager._std_file_path_queue) == []
 
         # Simulate the DAG modification by using modified_time which is greater
         # than the last_parse_time but still less than now - min_file_process_interval
         file_1_new_mtime = freezed_base_time - timedelta(seconds=5)
         file_1_new_mtime_ts = file_1_new_mtime.timestamp()
         with freeze_time(freezed_base_time):
             manager.set_file_paths(dag_files)
-            assert manager._file_path_queue == []
+            assert list(manager._std_file_path_queue) == []
             # File Path Queue will be empty as the "modified time" < "last finish time"
             mock_getmtime.side_effect = [file_1_new_mtime_ts]
-            manager.prepare_file_path_queue()
+            manager.populate_std_file_queue_from_dir()
             # Check that file is added to the queue even though file was just recently passed
-            assert manager._file_path_queue == ["file_1.py"]
+            assert list(manager._std_file_path_queue) == ["file_1.py"]
             assert last_finish_time < file_1_new_mtime
             assert (
-                manager._file_process_interval
+                manager._min_file_process_interval
                 > (freezed_base_time - manager.get_last_finish_time("file_1.py")).total_seconds()
             )
 
@@ -995,15 +999,23 @@ def test_callback_queue(self, tmpdir):
             pickle_dags=False,
             async_mode=True,
         )
+        file1_path = "/green_eggs/ham/file1.py"
+        file2_path = "/green_eggs/ham/file2.py"
+        file3_path = "/green_eggs/ham/file3.py"
+        file4_path = "/green_eggs/ham/file4.py"
+        file5_path = "/green_eggs/ham/file5.py"
+        manager._std_file_path_queue.extend([file4_path, file5_path])
+        manager._priority_file_path_queue.extend([file3_path])
 
         dag1_req1 = DagCallbackRequest(
-            full_filepath="/green_eggs/ham/file1.py",
+            full_filepath=file1_path,
             dag_id="dag1",
             run_id="run1",
             is_failure_callback=False,
             processor_subdir=tmpdir,
             msg=None,
         )
+
         dag1_req2 = DagCallbackRequest(
             full_filepath="/green_eggs/ham/file1.py",
             dag_id="dag1",
@@ -1024,7 +1036,7 @@
         )
 
         dag2_req1 = DagCallbackRequest(
-            full_filepath="/green_eggs/ham/file2.py",
+            full_filepath=file2_path,
             dag_id="dag2",
             run_id="run1",
             is_failure_callback=False,
@@ -1034,28 +1046,161 @@
         # when
         manager._add_callback_to_queue(dag1_req1)
-        manager._add_callback_to_queue(dag1_sla1)
         manager._add_callback_to_queue(dag2_req1)
-        # then - requests should be in manager's queue, with dag2 ahead of dag1 (because it was added last)
-        assert manager._file_path_queue == [dag2_req1.full_filepath, dag1_req1.full_filepath]
-        assert set(manager._callback_to_execute.keys()) == {dag1_req1.full_filepath, dag2_req1.full_filepath}
-        assert manager._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1]
-        assert manager._callback_to_execute[dag2_req1.full_filepath] == [dag2_req1]
+        # then - requests should be in manager's prio queue, with dag2 behind dag1 (because it was added last)
+        assert list(manager._priority_file_path_queue) == [file3_path, file1_path, file2_path]
+        assert list(manager._std_file_path_queue) == [file4_path, file5_path]
+        assert set(manager._callback_to_execute.keys()) == {file1_path, file2_path}
+        assert manager._callback_to_execute[file1_path] == [dag1_req1]
+        assert manager._callback_to_execute[file2_path] == [dag2_req1]
+
+        # when
+        manager._add_callback_to_queue(dag1_sla1)
+
+        # then - sla requests should be in manager's std queue, at the back
+        assert list(manager._priority_file_path_queue) == [file3_path, file1_path, file2_path]
+        assert list(manager._std_file_path_queue) == [file4_path, file5_path, file1_path]
+        assert set(manager._callback_to_execute.keys()) == {file1_path, file2_path}
+        assert manager._callback_to_execute[file1_path] == [dag1_req1, dag1_sla1]
+        assert manager._callback_to_execute[file2_path] == [dag2_req1]
 
         # when
         manager._add_callback_to_queue(dag1_sla2)
-        # then - since sla2 == sla1, should not have brought dag1 to the fore
-        assert manager._file_path_queue == [dag2_req1.full_filepath, dag1_req1.full_filepath]
-        assert manager._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1]
+        # then - dupe sla shouldn't have been re-added
+        assert list(manager._std_file_path_queue) == [file4_path, file5_path, file1_path]
+        assert set(manager._callback_to_execute.keys()) == {file1_path, file2_path}
+        assert manager._callback_to_execute[file1_path] == [dag1_req1, dag1_sla1]
+
+    @conf_vars({('scheduler', 'max_file_process_interval'): '120'})
+    def test_is_overdue(self):
+        # given
+        manager = DagFileProcessorManager(
+            dag_directory=TEST_DAG_FOLDER,
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        file1_path = "/green_eggs/ham/file1.py"
+        file2_path = "/green_eggs/ham/file2.py"
+        file3_path = "/green_eggs/ham/file3.py"
+
+        freezed_base_time = timezone.datetime(2020, 1, 5, 0, 0, 0)
+
+        last_finish_time1 = freezed_base_time - timedelta(seconds=119)
+        last_finish_time2 = freezed_base_time - timedelta(seconds=120)
+        last_finish_time3 = freezed_base_time - timedelta(seconds=121)
+
+        manager._file_stats[file1_path] = DagFileStat(1, 0, last_finish_time1, timedelta(seconds=1.0), 1)
+        manager._file_stats[file2_path] = DagFileStat(1, 0, last_finish_time2, timedelta(seconds=1.0), 1)
+        manager._file_stats[file3_path] = DagFileStat(1, 0, last_finish_time3, timedelta(seconds=1.0), 1)
+
+        # when / then
+        assert not manager.is_overdue(file1_path, freezed_base_time)
+        assert not manager.is_overdue(file2_path, freezed_base_time)
+        assert manager.is_overdue(file3_path, freezed_base_time)
+
+    @conf_vars(
+        {
+            ('scheduler', 'max_file_process_interval'): '120',
+            ('scheduler', 'parsing_processes'): '1',
+            ("scheduler", "file_parsing_sort_mode"): "modified_time",
+        }
+    )
+    def test_pick_overdue_std_queue(self):
+        """If there is a stale std file, check that it is picked over other priority and std queue entries"""
+
+        # given
+        manager = DagFileProcessorManager(
+            dag_directory=TEST_DAG_FOLDER,
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        file1_path = "/green_eggs/ham/file1.py"
+        file2_path = "/green_eggs/ham/file2.py"
+        file3_path = "/green_eggs/ham/file3.py"
+
+        freezed_base_time = timezone.datetime(2020, 1, 5, 0, 0, 0)
+
+        last_finish_time1 = freezed_base_time - timedelta(seconds=121)
+        last_finish_time3 = freezed_base_time - timedelta(seconds=10)
+
+        manager._file_stats[file1_path] = DagFileStat(1, 0, last_finish_time1, timedelta(seconds=1.0), 1)
+        manager._file_stats[file3_path] = DagFileStat(1, 0, last_finish_time3, timedelta(seconds=1.0), 1)
+
+        manager._file_paths.extend([file1_path, file2_path, file3_path])
+        manager._std_file_path_queue.extend([file3_path, file1_path])
+        manager._outstanding_std_file_paths.update([file3_path, file1_path])
+        manager._priority_file_path_queue.append(file2_path)
+
+        # when
+        with freeze_time(freezed_base_time):
+            manager.start_new_processes()
+
+        # then - even though file1 was at the back of the std queue, it was chosen due to its age
+        assert set(manager._processors.keys()) == {file1_path}
+
+    @conf_vars(
+        {
+            ('scheduler', 'max_file_process_interval'): '120',
+            ('scheduler', 'parsing_processes'): '1',
+            ("scheduler", "file_parsing_sort_mode"): "modified_time",
+            ('scheduler', 'dag_dir_list_interval'): '300',
+        }
+    )
+    @mock.patch("airflow.utils.file.os.path.getmtime")
+    def test_populate_std_queue_even_if_not_empty(self, mock_getmtime, tmpdir):
+        """Test that the std queue is topped up even when not empty, once the outstanding set is empty"""
+        # given
+        child_pipe, parent_pipe = multiprocessing.Pipe()
+
+        manager = DagFileProcessorManager(
+            dag_directory=tmpdir,
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=child_pipe,
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=False,
+        )
+
+        file1_path = "file1.py"
+        file2_path = "file2.py"
+        file3_path = "file3.py"
+        file4_path = "file4.py"
+
+        freezed_base_time = timezone.datetime(2020, 1, 5, 0, 0, 0)
+
+        paths_with_mtime = {file1_path: 5.0, file2_path: 4.0, file3_path: 3.0, file4_path: 2.0}
+        mock_getmtime.side_effect = list(paths_with_mtime.values())
+
+        manager.last_dag_dir_refresh_time = freezed_base_time - timedelta(seconds=1)
+        manager._file_paths.extend([file1_path, file2_path, file3_path, file4_path])
+        manager._std_file_path_queue.append(file3_path)
+        manager._priority_file_path_queue.append(file2_path)
 
         # when
-        manager._add_callback_to_queue(dag1_req2)
+        parent_pipe.send(DagParsingSignal.AGENT_RUN_ONCE)
+        parent_pipe.send(DagParsingSignal.TERMINATE_MANAGER)
+        with freeze_time(freezed_base_time):
+            manager._run_parsing_loop()
+
+        child_pipe.close()
+        parent_pipe.close()
 
-        # then - non-sla callback should have brought dag1 to the fore
-        assert manager._file_path_queue == [dag1_req1.full_filepath, dag2_req1.full_filepath]
-        assert manager._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1, dag1_req2]
+        # then - even though the std queue was not empty, file1 and file4 were appended
+        assert list(manager._std_file_path_queue) == [file3_path, file1_path, file4_path]
+        assert manager._outstanding_std_file_paths == {file1_path, file3_path, file4_path}
 
 
 class TestDagFileProcessorAgent: