15 changes: 15 additions & 0 deletions airflow/config_templates/config.yml
@@ -1946,6 +1946,21 @@
type: string
example: ~
default: "30"
- name: max_file_process_interval
description: |
Number of seconds since the previous parse time after which a DAG file will be parsed
again. This is only useful if your system contains a lot of DAGs or SLAs which update
exceedingly frequently, since a constant stream of such updates can otherwise prevent the
system from ever scanning the file system for changes to your DAG files. This operates on
a best-effort basis; if you have many DAGs and it takes 10 minutes to scan and parse them
all, this setting will not make them parse any more often.
Setting this to <= 0 disables the behaviour, in case it is important to you that those
frequently updating DAGs / SLAs always take priority, at the cost of delaying updates
from disk.
version_added: 2.4.0
type: integer
example: ~
default: "120"
- name: deactivate_stale_dags_interval
description: |
How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
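A minimal sketch (not part of the change) of how the two intervals described above are intended to interact, assuming a file's eligibility is derived purely from its last parse finish time; min_interval, max_interval and classify are names local to this illustration:

from datetime import datetime, timedelta, timezone

min_interval = 30    # seconds, mirrors min_file_process_interval
max_interval = 120   # seconds, mirrors the new max_file_process_interval

def classify(last_finish: datetime, now: datetime) -> str:
    """Describe how a DAG file would be treated given its last parse finish time."""
    age = (now - last_finish).total_seconds()
    if max_interval > 0 and age > max_interval:
        return "overdue: parsed ahead of priority callbacks, on a best-effort basis"
    if age >= min_interval:
        return "eligible: picked up by the next directory scan"
    return "too fresh: skipped until min_file_process_interval has elapsed"

now = datetime.now(timezone.utc)
for seconds_ago in (10, 60, 300):
    print(seconds_ago, classify(now - timedelta(seconds=seconds_ago), now))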
11 changes: 11 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -984,6 +984,17 @@ scheduler_idle_sleep_time = 1
# this interval. Keeping this number low will increase CPU usage.
min_file_process_interval = 30

# Number of seconds since the previous parse time after which a DAG file will be parsed
# again. This is only useful if your system contains a lot of DAGs or SLAs which update
# exceedingly frequently, since a constant stream of such updates can otherwise prevent the
# system from ever scanning the file system for changes to your DAG files. This operates on
# a best-effort basis; if you have many DAGs and it takes 10 minutes to scan and parse them
# all, this setting will not make them parse any more often.
# Setting this to <= 0 disables the behaviour, in case it is important to you that those
# frequently updating DAGs / SLAs always take priority, at the cost of delaying updates
# from disk.
max_file_process_interval = 120

# How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
# the expected files) which should be deactivated.
deactivate_stale_dags_interval = 60
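As with other Airflow settings, the new option should also be controllable through the standard AIRFLOW__&lt;SECTION&gt;__&lt;KEY&gt; environment variable; the exact variable name below assumes the option lands in the [scheduler] section as shown above, so treat it as an inference rather than something confirmed by this diff:

# The env var name is inferred from Airflow's AIRFLOW__<SECTION>__<KEY> convention
# and the [scheduler] placement above; it is not taken from this change.
import os

os.environ["AIRFLOW__SCHEDULER__MAX_FILE_PROCESS_INTERVAL"] = "300"

from airflow.configuration import conf  # read the config only after the override is in place

print(conf.getint("scheduler", "max_file_process_interval"))  # expected: 300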
167 changes: 132 additions & 35 deletions airflow/dag_processing/manager.py
@@ -29,12 +29,13 @@
import sys
import time
import zipfile
from collections import defaultdict
from collections import defaultdict, deque
from datetime import datetime, timedelta
from importlib import import_module
from multiprocessing.connection import Connection as MultiprocessingConnection
from pathlib import Path
from typing import Any, NamedTuple, cast
from typing import TYPE_CHECKING, Any, Deque, Dict, List, NamedTuple, Optional, Set, Union, cast

from setproctitle import setproctitle
from sqlalchemy.orm import Session
@@ -380,8 +381,19 @@ def __init__(
async_mode: bool = True,
):
super().__init__()
self._file_paths: list[str] = []
self._file_path_queue: list[str] = []
self._log = logging.getLogger('airflow.processor_manager')

# known files; this is updated every `dag_dir_list_interval` and entries added/removed accordingly
self._file_paths: List[str] = []

# we maintain 2 queues: one for work requiring a rapid response to scheduler updates, and one for
# work that should be serviced once the priority items have all been worked through, e.g. periodic
# dir scans. Additionally there's a set tracking which files found on disk haven't been re-parsed yet
self._priority_file_path_queue: Deque[str] = deque()
self._std_file_path_queue: Deque[str] = deque()
self._outstanding_std_file_paths: Set[str] = set()
self._dag_directory = dag_directory

self._max_runs = max_runs
# signal_conn is None for dag_processor_standalone mode.
self._direct_scheduler_conn = signal_conn
@@ -414,7 +426,17 @@ def __init__(
self._parallelism = 1

# Parse and schedule each file no faster than this interval.
self._file_process_interval = conf.getint('scheduler', 'min_file_process_interval')
self._min_file_process_interval = conf.getint('scheduler', 'min_file_process_interval')
# Best-effort attempt to ensure all dags are processed at least this often. Inactive if <= 0
self._max_file_process_interval = conf.getint('scheduler', 'max_file_process_interval')
if 0 < self._max_file_process_interval < self._min_file_process_interval:
self.log.warning(
"Max file process interval cannot be less than min file process interval, "
"increasing max interval to %s",
self._min_file_process_interval,
)
self._max_file_process_interval = self._min_file_process_interval

# How often to print out DAG file processing stats to the log. Default to
# 30 seconds.
self.print_stats_interval = conf.getint('scheduler', 'print_stats_interval')
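A standalone sketch of the interval validation added above, assuming the same semantics (values <= 0 keep the feature disabled and are left untouched); effective_max_interval is a name invented for this illustration:

def effective_max_interval(min_interval: int, max_interval: int) -> int:
    """Return the max interval the manager would actually use."""
    if 0 < max_interval < min_interval:
        # max cannot undercut min, so it is raised to match (the real code also logs a warning)
        return min_interval
    return max_interval

assert effective_max_interval(30, 120) == 120  # normal case
assert effective_max_interval(30, 10) == 30    # clamped up to min
assert effective_max_interval(30, 0) == 0      # <= 0 keeps the feature disabled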
@@ -481,7 +503,10 @@ def start(self):
set_new_process_group()

self.log.info("Processing files using up to %s processes at a time ", self._parallelism)
self.log.info("Process each file at most once every %s seconds", self._file_process_interval)
self.log.info("Process each file at most once every %s seconds", self._min_file_process_interval)
self.log.info(
"Best efforts process each file at least once every %s seconds", self._max_file_process_interval
)
self.log.info(
"Checking for new files in %s every %s seconds", self._dag_directory, self.dag_dir_list_interval
)
@@ -544,7 +569,7 @@ def _run_parsing_loop(self):
poll_time = None

self._refresh_dag_dir()
self.prepare_file_path_queue()
self.populate_std_file_queue_from_dir()
max_callbacks_per_loop = conf.getint("scheduler", "max_callbacks_per_loop")

if self._async_mode:
@@ -605,11 +630,14 @@

self._kill_timed_out_processors()

# Generate more file paths to process if we processed all the files
# already.
if not self._file_path_queue:
# Generate more file paths to process if we have processed all the files found by the last
# scan. Note that for the outstanding set to empty, we must first have either:
# 1. cleared the priority queue and then cleared the set while draining the std queue, or
# 2. been unable to clear the priority queue, hit max_file_process_interval, and drained the
# set while clearing overdue files
if not self._outstanding_std_file_paths:
self.emit_metrics()
self.prepare_file_path_queue()
self.populate_std_file_queue_from_dir()

self.start_new_processes()
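The refill condition above keys off the set of outstanding files rather than the std queue itself, because SLA callbacks can keep re-filling that queue; a toy model of the bookkeeping (the names here are illustrative, not the manager's API):

from collections import deque

std_queue: deque[str] = deque()
outstanding: set[str] = set()

def scan_directory(paths: list[str]) -> None:
    """Mimic populate_std_file_queue_from_dir: enqueue new paths and mark them outstanding."""
    std_queue.extend(p for p in paths if p not in std_queue)
    outstanding.update(paths)

def process(path: str) -> None:
    """Mimic starting a processor: the file is no longer outstanding, however it got picked."""
    outstanding.discard(path)

scan_directory(["dags/a.py", "dags/b.py"])
process("dags/a.py")
print(bool(outstanding))  # True  -> no re-scan yet, dags/b.py is still outstanding
process("dags/b.py")
print(bool(outstanding))  # False -> time to emit metrics and repopulate from the dag dir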

@@ -687,33 +715,32 @@ def _fetch_callbacks(self, max_callbacks: int, session: Session = NEW_SESSION):
def _add_callback_to_queue(self, request: CallbackRequest):

# requests are sent by dag processors. SLAs exist per-dag, but can be generated once per SLA-enabled
# task in the dag. If treated like other callbacks, SLAs can cause feedback where a SLA arrives,
# goes to the front of the queue, gets processed, triggers more SLAs from the same DAG, which go to
# the front of the queue, and we never get round to picking stuff off the back of the queue
# task in the dag. If treated like other callbacks, SLAs can easily cause a feedback loop where an
# SLA arrives, enters the priority queue, gets processed, and triggers more SLAs from the same DAG,
# which go to the priority queue in turn, so we never get round to picking anything off the std
# queue except via max_file_process_interval. Hence SLA callbacks share the std queue.
if isinstance(request, SlaCallbackRequest):
if request in self._callback_to_execute[request.full_filepath]:
self.log.debug("Skipping already queued SlaCallbackRequest")
return

# not already queued, queue the file _at the back_, and add the request to the file's callbacks
# note if we have lots of SLAs this will prevent the std queue from draining
# Note the callback_to_execute dict is shared by both queues, so a priority callback will pick up
# any SLAs at the same time
self.log.debug("Queuing SlaCallbackRequest for %s", request.dag_id)
self._callback_to_execute[request.full_filepath].append(request)
if request.full_filepath not in self._file_path_queue:
self._file_path_queue.append(request.full_filepath)
self._add_paths_to_queue([request.full_filepath], self._std_file_path_queue, "std")

# Other callbacks take priority over DAG Run scheduling, so they jump ahead of anything
# already in the std queue. Note that these higher priority callbacks go to the _back_ of the
# priority queue; this prevents a single busy DAG from stopping other priority callbacks being
# processed. Non-SLA callbacks are relatively rare, but a sustained stream of them can still stop
# us from ever processing the std queue, hence max_file_process_interval
else:
self.log.debug("Queuing %s CallbackRequest: %s", type(request).__name__, request)
self._callback_to_execute[request.full_filepath].append(request)
if request.full_filepath in self._file_path_queue:
# Remove file paths matching request.full_filepath from self._file_path_queue
# Since we are already going to use that filepath to run callback,
# there is no need to have same file path again in the queue
self._file_path_queue = [
file_path for file_path in self._file_path_queue if file_path != request.full_filepath
]
self._file_path_queue.insert(0, request.full_filepath)
self._add_paths_to_queue([request.full_filepath], self._priority_file_path_queue, "priority")
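A condensed sketch of the routing described in the comments above: SLA callbacks go to the back of the std queue, everything else to the back of the priority queue, and neither queue accepts duplicate paths. Request and the queue names below are stand-ins invented for this example, not the manager's API:

from collections import deque
from dataclasses import dataclass

@dataclass
class Request:
    full_filepath: str
    is_sla: bool

priority_queue: deque[str] = deque()
std_queue: deque[str] = deque()

def route(request: Request) -> None:
    queue = std_queue if request.is_sla else priority_queue
    if request.full_filepath not in queue:
        queue.append(request.full_filepath)  # always the back of the queue: FIFO within each queue

route(Request("dags/etl.py", is_sla=True))
route(Request("dags/etl.py", is_sla=True))      # duplicate, ignored
route(Request("dags/report.py", is_sla=False))
print(list(std_queue), list(priority_queue))    # -> ['dags/etl.py'] ['dags/report.py']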

def _refresh_dag_dir(self):
"""Refresh file paths from dag dir if we haven't done it for too long."""
@@ -759,6 +786,7 @@ def _refresh_dag_dir(self):
from airflow.models.dagcode import DagCode

DagCode.remove_deleted_code(dag_filelocs)
Stats.incr('dag_processing.refresh_dag_dir')

def _print_stat(self):
"""Occasionally print out stats about how fast the files are getting processed"""
@@ -931,8 +959,18 @@ def set_file_paths(self, new_file_paths):
:param new_file_paths: list of paths to DAG definition files
:return: None
"""
# store the new paths
self._file_paths = new_file_paths
self._file_path_queue = [x for x in self._file_path_queue if x in new_file_paths]

# clean up the queues; remove anything queued which is no longer in the list, including callbacks
self._priority_file_path_queue = deque(
x for x in self._priority_file_path_queue if x in new_file_paths
)
self._std_file_path_queue = deque(x for x in self._std_file_path_queue if x in new_file_paths)
callback_paths_to_del = list(x for x in self._callback_to_execute.keys() if x not in new_file_paths)
for path_to_del in callback_paths_to_del:
del self._callback_to_execute[path_to_del]

# Stop processors that are working on deleted files
filtered_processors = {}
for file_path, processor in self._processors.items():
@@ -994,7 +1032,8 @@ def collect_results(self) -> None:

self.log.debug("%s/%s DAG parsing processes running", len(self._processors), self._parallelism)

self.log.debug("%s file paths queued for processing", len(self._file_path_queue))
self.log.debug("%s file paths queued for priority processing", len(self._priority_file_path_queue))
self.log.debug("%s file paths queued for std processing", len(self._std_file_path_queue))

@staticmethod
def _create_process(file_path, pickle_dags, dag_ids, dag_directory, callback_requests):
@@ -1007,15 +1046,49 @@ def _create_process(file_path, pickle_dags, dag_ids, dag_directory, callback_req
callback_requests=callback_requests,
)

def is_overdue(self, file_path: str, as_of: datetime) -> bool:
if self._max_file_process_interval <= 0:
return False
last_finish_time = self.get_last_finish_time(file_path)
return (
last_finish_time is not None
and (as_of - last_finish_time).total_seconds() > self._max_file_process_interval
)
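A brief, self-contained illustration of how the overdue scan at the top of start_new_processes below is meant to behave, using a plain dict in place of the manager's file stats; the file names and timings are invented:

from collections import deque
from datetime import datetime, timedelta, timezone

max_interval = 120  # seconds, as in max_file_process_interval
now = datetime.now(timezone.utc)

# Hypothetical last parse finish times for files currently sitting on the std queue.
last_finish = {
    "dags/a.py": now - timedelta(seconds=30),   # recent, not overdue
    "dags/b.py": now - timedelta(seconds=400),  # most overdue
    "dags/c.py": now - timedelta(seconds=150),  # overdue
}
std_queue = deque(last_finish)

overdue = [p for p in std_queue if (now - last_finish[p]).total_seconds() > max_interval]
overdue.sort(key=last_finish.get)  # earliest finish time first, i.e. most overdue first
print(overdue)  # -> ['dags/b.py', 'dags/c.py']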

def start_new_processes(self):
"""Start more processors if we have enough slots and files to process"""
while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
file_path = self._file_path_queue.pop(0)
# Stop creating duplicate processor i.e. processor with the same filepath
# check for files stuck on the std queue for too long; these need to be processed first, as they
# indicate that we found files on disk a long time ago and still haven't processed them
now = timezone.utcnow()
overdue_std_files_list = [p for p in self._std_file_path_queue if self.is_overdue(p, now)]
# sort, with earliest finish time (i.e. most overdue) first; every overdue file has a last
# finish time, so the sort key is never None
overdue_std_files_list.sort(key=self.get_last_finish_time)
overdue_std_files: Deque[str] = deque(overdue_std_files_list)
if overdue_std_files_list:
self.log.warning(
"Have %d overdue dag files. Either max_file_process_interval is not long enough to "
"process all your dag files, or your system is unable to keep up with the number of "
"dag callbacks",
len(overdue_std_files_list),
)

# there may be dupes across the deques; the duplicate-processor check inside the while loop filters them out
def chained_deque() -> Deque[str]:
if overdue_std_files:
return overdue_std_files
elif self._priority_file_path_queue:
return self._priority_file_path_queue
else:
return self._std_file_path_queue

while self._parallelism - len(self._processors) > 0 and chained_deque():
file_path = chained_deque().popleft() # we append to the right side, this is FIFO

# Skip creating duplicate processor
if file_path in self._processors.keys():
continue

callback_to_execute_for_file = self._callback_to_execute[file_path]
# attempt to create a processor for this file
callback_to_execute_for_file = self._callback_to_execute[file_path]
processor = self._create_process(
file_path,
self._pickle_dags,
@@ -1024,6 +1097,9 @@ def start_new_processes(self):
callback_to_execute_for_file,
)

# processor created ok. Remove the file from the outstanding files-from-disk set if it's in
# there; thus even if SLA callbacks keep the std queue busy, we can still tell when every file
# has been processed and the dag dir is due a fresh read from disk
self._outstanding_std_file_paths.discard(file_path)
del self._callback_to_execute[file_path]
Stats.incr('dag_processing.processes')

@@ -1032,8 +1108,16 @@
self._processors[file_path] = processor
self.waitables[processor.waitable_handle] = processor

def prepare_file_path_queue(self):
"""Generate more file paths to process. Result are saved in _file_path_queue."""
Stats.gauge('dag_processing.std_path_queue_size', len(self._std_file_path_queue))
Stats.gauge('dag_processing.priority_path_queue_size', len(self._priority_file_path_queue))
Stats.gauge('dag_processing.overdue_path_queue_size', len(overdue_std_files))
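A compact sketch of the precedence that chained_deque above encodes: overdue files first, then priority callbacks, then the regular std queue, with the choice re-evaluated on every pop; the queue contents are illustrative only:

from collections import deque

overdue = deque(["dags/very_old.py"])
priority = deque(["dags/callback.py"])
std = deque(["dags/fresh.py"])

def next_queue() -> deque:
    # overdue files win, then priority callbacks, then the regular scan queue
    return overdue or priority or std

order = []
while next_queue():
    order.append(next_queue().popleft())
print(order)  # -> ['dags/very_old.py', 'dags/callback.py', 'dags/fresh.py']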

def populate_std_file_queue_from_dir(self):
"""
Scan the dag dir to generate more file paths to process. Results are saved in
_std_file_path_queue.

Note this method is only called at startup and once every file found by the previous scan
has been processed, i.e. when _outstanding_std_file_paths is empty.
"""
self._parsing_start_time = time.perf_counter()
# If the file path is already being processed, or if a file was
# processed recently, wait until the next batch
@@ -1068,7 +1152,7 @@ def prepare_file_path_queue(self):
last_finish_time = self.get_last_finish_time(file_path)
if (
last_finish_time is not None
and (now - last_finish_time).total_seconds() < self._file_process_interval
and (now - last_finish_time).total_seconds() < self._min_file_process_interval
and not (is_mtime_mode and file_modified_time and (file_modified_time > last_finish_time))
):
file_paths_recently_processed.append(file_path)
@@ -1087,8 +1171,9 @@
file_path for file_path, stat in self._file_stats.items() if stat.run_count == self._max_runs
]

# do not include files already in the priority queue, there's no point
file_paths_to_exclude = set(file_paths_in_progress).union(
file_paths_recently_processed, files_paths_at_run_limit
file_paths_recently_processed, files_paths_at_run_limit, self._priority_file_path_queue
)

# Do not convert the following list to set as set does not preserve the order
@@ -1112,7 +1197,19 @@ def prepare_file_path_queue(self):
num_dags=0, import_errors=0, last_finish_time=None, last_duration=None, run_count=0
)

self._file_path_queue.extend(files_paths_to_queue)
Stats.incr('dag_processing.std_path_queue_update_count')
self._add_paths_to_queue(files_paths_to_queue, self._std_file_path_queue, "std")
self._outstanding_std_file_paths.update(files_paths_to_queue)

@staticmethod
def _add_paths_to_queue(
file_paths_to_enqueue: List[str],
queue_to_add_to: Deque[str],
queue_type: str,
):
"""Adds stuff to the _back_ of the given queue, unless it's already present"""
queue_to_add_to.extend(p for p in file_paths_to_enqueue if p not in queue_to_add_to)
Stats.gauge(f'dag_processing.{queue_type}_path_queue_size', len(queue_to_add_to))
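The helper above relies on deque.extend consuming its generator one element at a time, so a path appended earlier in the same call is already visible to the membership check on later elements; a quick demonstration with a bare deque:

from collections import deque

queue = deque(["dags/a.py"])
new_paths = ["dags/a.py", "dags/b.py", "dags/b.py"]

# Same pattern as _add_paths_to_queue: only paths not already queued get appended, and the
# second "dags/b.py" is filtered out because the first one has been appended by the time it
# is checked.
queue.extend(p for p in new_paths if p not in queue)
print(list(queue))  # -> ['dags/a.py', 'dags/b.py']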

def _kill_timed_out_processors(self):
"""Kill any file processors that timeout to defend against process hangs."""