Make standalone dag file processor work in DB isolation mode (#40916)
There were a few missing DB operations in DagFileProcessor that
prevented it from running in DB isolation mode. Those have been
refactored and exposed as internal API calls.

A bug was fixed in scheduler_job_runner that caused next_event to be
used before it was declared (which occurred when the standalone dag
processor is used together with DB isolation mode).
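
The bug class here is an UnboundLocalError: a variable assigned only inside a
conditional branch and then read unconditionally afterwards. A minimal
illustration (not the actual scheduler_job_runner code):

```python
# Illustration only - not the scheduler code. When the branch that assigns
# `next_event` is skipped (standalone dag processor plus DB isolation mode),
# reading it later raises UnboundLocalError; declaring it up front fixes it.
def parsing_loop(standalone_dag_processor: bool) -> None:
    next_event: float | None = None  # the fix: declare before the conditional
    if not standalone_dag_processor:
        next_event = 1.5  # stand-in for e.g. timers.run(blocking=False)
    if next_event is not None:
        print(f"next timed event in {next_event}s")
```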

The DB retry will now correctly use the logger when it is applied as a
decorator on a class method.
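
A simplified sketch of the pattern (assumed shape, not Airflow's actual
retry_db_transaction implementation): because the retry decorator sits below
@classmethod, its wrapper receives `cls` rather than `self` as the first
argument, so the logger has to be resolved from either one.

```python
# Simplified sketch - not Airflow's actual implementation.
import functools
import logging


def retry_db_transaction(func):
    @functools.wraps(func)
    def wrapper(owner, *args, **kwargs):
        # `owner` is `cls` for class methods and `self` for instance methods;
        # derive the logger from it instead of assuming `self.log` exists.
        cls_ = owner if isinstance(owner, type) else type(owner)
        log = logging.getLogger(f"{cls_.__module__}.{cls_.__qualname__}")
        for attempt in range(1, 4):
            try:
                return func(owner, *args, **kwargs)
            except Exception:  # the real code retries only DB errors
                if attempt == 3:
                    raise
                log.warning("DB transaction failed (attempt %d), retrying", attempt)
    return wrapper
```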

The "main" code that removes DB connection from configuration (mostly in
case of Breeze) when untrusted components are used has been improved
to handle the case where DAGFile Processor forks parsing subprocesses.
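
Setting the variable to a "none://" sentinel, rather than merely deleting it,
matters because parsing subprocesses are forked and inherit the parent's
environment. A minimal POSIX sketch of that inheritance (not Airflow code):

```python
# Minimal sketch (not Airflow code): a forked child inherits os.environ, so
# the sentinel set in the parent survives into the parsing subprocess and
# keeps the real DB connection string out of reach.
import os

os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = "none://"

pid = os.fork()
if pid == 0:
    print(os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"])  # -> none://
    os._exit(0)
os.waitpid(pid, 0)
```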

The tmux configuration has been improved so that both non-isolation
and isolation modes distribute panels better.

Simplified InternalApiConfig - "main" now directly sets DB/internal-API
use in db_isolation mode depending on the component.
potiuk committed Jul 24, 2024
1 parent eca0555 commit 913395d
Showing 24 changed files with 427 additions and 276 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/basic-tests.yml
@@ -52,6 +52,12 @@ on:  # yamllint disable-line rule:truthy
description: "Whether to run only latest version checks (true/false)"
required: true
type: string
enable-aip-44:
description: "Whether to enable AIP-44 (true/false)"
required: true
type: string
env:
AIRFLOW_ENABLE_AIP_44: "${{ inputs.enable-aip-44 }}"
jobs:
run-breeze-tests:
timeout-minutes: 10
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
@@ -173,6 +173,7 @@ jobs:
skip-pre-commits: ${{needs.build-info.outputs.skip-pre-commits}}
canary-run: ${{needs.build-info.outputs.canary-run}}
latest-versions-only: ${{needs.build-info.outputs.latest-versions-only}}
enable-aip-44: "false"

build-ci-images:
name: >
28 changes: 20 additions & 8 deletions airflow/__main__.py
@@ -22,6 +22,7 @@
from __future__ import annotations

import os
from argparse import Namespace

import argcomplete

@@ -35,7 +36,8 @@
# any possible import cycles with settings downstream.
from airflow import configuration
from airflow.cli import cli_parser
from airflow.configuration import write_webserver_configuration_if_needed
from airflow.configuration import AirflowConfigParser, write_webserver_configuration_if_needed
from airflow.exceptions import AirflowException


def main():
@@ -55,23 +57,33 @@ def main():
conf = write_default_airflow_configuration_if_needed()
if args.subcommand in ["webserver", "internal-api", "worker"]:
write_webserver_configuration_if_needed(conf)
configure_internal_api(args, conf)

args.func(args)


def configure_internal_api(args: Namespace, conf: AirflowConfigParser):
if conf.getboolean("core", "database_access_isolation", fallback=False):
if args.subcommand in ["worker", "dag-processor", "triggerer", "run"]:
# Untrusted components
if "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN" in os.environ:
# make sure that the DB is not available for the components that should not access it
del os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"]
os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = "none://"
conf.set("database", "sql_alchemy_conn", "none://")
from airflow.settings import force_traceback_session_for_untrusted_components
from airflow.api_internal.internal_api_call import InternalApiConfig

force_traceback_session_for_untrusted_components()
InternalApiConfig.set_use_internal_api(args.subcommand)
else:
# Trusted components
# Trusted components (this setting is mostly for Breeze where db_isolation and DB are both set)
db_connection_url = conf.get("database", "sql_alchemy_conn")
if not db_connection_url or db_connection_url == "none://":
raise AirflowException(
f"Running trusted components {args.subcommand} in db isolation mode "
f"requires connection to be configured via database/sql_alchemy_conn."
)
from airflow.api_internal.internal_api_call import InternalApiConfig

InternalApiConfig.force_database_direct_access("Running " + args.subcommand + " command")

args.func(args)
InternalApiConfig.set_use_database_access(args.subcommand)


if __name__ == "__main__":
16 changes: 13 additions & 3 deletions airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -36,6 +36,7 @@
from airflow.api_connexion.exceptions import PermissionDenied
from airflow.configuration import conf
from airflow.jobs.job import Job, most_recent_job
from airflow.models.dagcode import DagCode
from airflow.models.taskinstance import _record_task_map_for_downstreams
from airflow.models.xcom_arg import _get_task_map_length
from airflow.sensors.base import _orig_start_date
@@ -89,13 +90,21 @@ def initialize_method_map() -> dict[str, Callable]:
_add_log,
_xcom_pull,
_record_task_map_for_downstreams,
DagFileProcessor.update_import_errors,
DagFileProcessor.manage_slas,
DagFileProcessorManager.deactivate_stale_dags,
DagCode.remove_deleted_code,
DagModel.deactivate_deleted_dags,
DagModel.get_paused_dag_ids,
DagModel.get_current,
DagFileProcessor._execute_task_callbacks,
DagFileProcessor.execute_callbacks,
DagFileProcessor.execute_callbacks_without_dag,
DagFileProcessor.manage_slas,
DagFileProcessor.save_dag_to_db,
DagFileProcessor.update_import_errors,
DagFileProcessor._validate_task_pools_and_update_dag_warnings,
DagFileProcessorManager._fetch_callbacks,
DagFileProcessorManager._get_priority_filelocs,
DagFileProcessorManager.clear_nonexistent_import_errors,
DagFileProcessorManager.deactivate_stale_dags,
DagWarning.purge_inactive_dag_warnings,
DatasetManager.register_dataset_change,
FileTaskHandler._render_filename_db_access,
@@ -124,6 +133,7 @@ def initialize_method_map() -> dict[str, Callable]:
DagRun._get_log_template,
RenderedTaskInstanceFields._update_runtime_evaluated_template_fields,
SerializedDagModel.get_serialized_dag,
SerializedDagModel.remove_deleted_dags,
SkipMixin._skip,
SkipMixin._skip_all_except,
TaskInstance._check_and_change_state_before_execution,
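The map above acts as the server-side allowlist: only callables registered in
it can be invoked over the RPC endpoint. A much-simplified sketch of the
dispatch idea (`handle_rpc` is a name invented for illustration; the real
endpoint also verifies a signed JWT and (de)serializes arguments and results):

```python
# Much-simplified dispatch sketch - `handle_rpc` is a hypothetical name.
from typing import Any, Callable


def handle_rpc(method_map: dict[str, Callable], method: str, params: dict[str, Any]) -> Any:
    func = method_map.get(method)
    if func is None:
        raise ValueError(f"Unrecognized method: {method}")
    return func(**params)
```
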
63 changes: 24 additions & 39 deletions airflow/api_internal/internal_api_call.py
@@ -30,7 +30,7 @@

from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.settings import _ENABLE_AIP_44
from airflow.settings import _ENABLE_AIP_44, force_traceback_session_for_untrusted_components
from airflow.typing_compat import ParamSpec
from airflow.utils.jwt_signer import JWTSigner

@@ -43,67 +43,52 @@
class InternalApiConfig:
"""Stores and caches configuration for Internal API."""

_initialized = False
_use_internal_api = False
_internal_api_endpoint = ""

@staticmethod
def force_database_direct_access(message: str):
def set_use_database_access(component: str):
"""
Block current component from using Internal API.
All methods decorated with internal_api_call will always be executed locally.
This mode is needed for "trusted" components like the Scheduler, Webserver and Internal API server.
"""
InternalApiConfig._initialized = True
InternalApiConfig._use_internal_api = False
if _ENABLE_AIP_44:
logger.info("Forcing database direct access. %s", message)
if not _ENABLE_AIP_44:
raise RuntimeError("The AIP_44 is not enabled so you cannot use it.")
logger.info(
"DB isolation mode. But this is a trusted component and DB connection is set. "
"Using database direct access when running %s.",
component,
)

@staticmethod
def force_api_access(api_endpoint: str):
"""
Force using Internal API with provided endpoint.
All methods decorated with internal_api_call will always be executed remote/via API.
This mode is needed for remote setups/remote executor.
"""
InternalApiConfig._initialized = True
def set_use_internal_api(component: str):
if not _ENABLE_AIP_44:
raise RuntimeError("The AIP_44 is not enabled so you cannot use it.")
internal_api_url = conf.get("core", "internal_api_url")
url_conf = urlparse(internal_api_url)
api_path = url_conf.path
if api_path in ["", "/"]:
# Add the default path if not given in the configuration
api_path = "/internal_api/v1/rpcapi"
if url_conf.scheme not in ["http", "https"]:
raise AirflowConfigException("[core]internal_api_url must start with http:// or https://")
internal_api_endpoint = f"{url_conf.scheme}://{url_conf.netloc}{api_path}"
InternalApiConfig._use_internal_api = True
InternalApiConfig._internal_api_endpoint = api_endpoint
InternalApiConfig._internal_api_endpoint = internal_api_endpoint
logger.info("DB isolation mode. Using internal_api when running %s.", component)
force_traceback_session_for_untrusted_components()

@staticmethod
def get_use_internal_api():
if not InternalApiConfig._initialized:
InternalApiConfig._init_values()
return InternalApiConfig._use_internal_api

@staticmethod
def get_internal_api_endpoint():
if not InternalApiConfig._initialized:
InternalApiConfig._init_values()
return InternalApiConfig._internal_api_endpoint

@staticmethod
def _init_values():
use_internal_api = conf.getboolean("core", "database_access_isolation", fallback=False)
if use_internal_api and not _ENABLE_AIP_44:
raise RuntimeError("The AIP_44 is not enabled so you cannot use it.")
internal_api_endpoint = ""
if use_internal_api:
url_conf = urlparse(conf.get("core", "internal_api_url"))
api_path = url_conf.path
if api_path in ["", "/"]:
# Add the default path if not given in the configuration
api_path = "/internal_api/v1/rpcapi"
if url_conf.scheme not in ["http", "https"]:
raise AirflowConfigException("[core]internal_api_url must start with http:// or https://")
internal_api_endpoint = f"{url_conf.scheme}://{url_conf.netloc}{api_path}"

InternalApiConfig._initialized = True
InternalApiConfig._use_internal_api = use_internal_api
InternalApiConfig._internal_api_endpoint = internal_api_endpoint


def internal_api_call(func: Callable[PS, RT]) -> Callable[PS, RT]:
"""
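The decorator body is elided above; conceptually it consults
InternalApiConfig.get_use_internal_api() and either runs the function locally
or proxies the call to the configured endpoint. A rough sketch of that shape
(assumed, not the exact implementation, which also serializes arguments with
Airflow serde and signs requests with a JWT):

```python
# Rough sketch only - `internal_api_call_sketch` is a hypothetical stand-in.
# Assumes InternalApiConfig from this module, as shown in the diff above.
import functools

import requests


def internal_api_call_sketch(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if not InternalApiConfig.get_use_internal_api():
            # Trusted component: execute locally, straight against the DB.
            return func(*args, **kwargs)
        # Untrusted component: forward the call to the internal API server.
        response = requests.post(
            InternalApiConfig.get_internal_api_endpoint(),
            json={"method": f"{func.__module__}.{func.__qualname__}", "params": kwargs},
        )
        response.raise_for_status()
        return response.json()
    return wrapper
```
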
4 changes: 4 additions & 0 deletions airflow/cli/commands/dag_processor_command.py
@@ -22,6 +22,7 @@
from datetime import timedelta
from typing import Any

from airflow.api_internal.internal_api_call import InternalApiConfig
from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.configuration import conf
from airflow.dag_processing.manager import DagFileProcessorManager, reload_configuration_for_dag_processing
@@ -37,6 +38,9 @@ def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner:
"""Create DagFileProcessorProcess instance."""
processor_timeout_seconds: int = conf.getint("core", "dag_file_processor_timeout")
processor_timeout = timedelta(seconds=processor_timeout_seconds)
if InternalApiConfig.get_use_internal_api():
from airflow.models.renderedtifields import RenderedTaskInstanceFields # noqa: F401
from airflow.models.trigger import Trigger # noqa: F401
return DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
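The two `noqa: F401` lines are imports for side effect: the names are never
referenced, loading the modules is the point (presumably so the ORM models
and their relationships are registered before the processor starts in
internal API mode). The generic pattern, for illustration:

```python
# Generic illustration of the import-for-side-effect pattern;
# `preload_modules` is a hypothetical helper, not an Airflow API.
import importlib


def preload_modules(*dotted_paths: str) -> None:
    """Import modules solely for their registration side effects."""
    for path in dotted_paths:
        importlib.import_module(path)


# e.g. preload_modules("airflow.models.trigger")
```
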
7 changes: 6 additions & 1 deletion airflow/cli/commands/internal_api_command.py
@@ -222,7 +222,12 @@ def create_app(config=None, testing=False):
if "SQLALCHEMY_ENGINE_OPTIONS" not in flask_app.config:
flask_app.config["SQLALCHEMY_ENGINE_OPTIONS"] = settings.prepare_engine_args()

InternalApiConfig.force_database_direct_access("Gunicorn worker initialization")
if conf.getboolean("core", "database_access_isolation", fallback=False):
InternalApiConfig.set_use_database_access("Gunicorn worker initialization")
else:
raise AirflowConfigException(
"The internal-api component should only be run when database_access_isolation is enabled."
)

csrf = CSRFProtect()
csrf.init_app(flask_app)
68 changes: 47 additions & 21 deletions airflow/dag_processing/manager.py
@@ -618,7 +618,10 @@ def _run_parsing_loop(self):
self._processors.pop(processor.file_path)

if self.standalone_dag_processor:
self._fetch_callbacks(max_callbacks_per_loop)
for callback in DagFileProcessorManager._fetch_callbacks(
max_callbacks_per_loop, self.standalone_dag_processor, self.get_dag_directory()
):
self._add_callback_to_queue(callback)
self._scan_stale_dags()
DagWarning.purge_inactive_dag_warnings()
refreshed_dag_dir = self._refresh_dag_dir()
@@ -707,30 +710,46 @@ def _run_parsing_loop(self):
else:
poll_time = 0.0

@classmethod
@internal_api_call
@provide_session
def _fetch_callbacks(self, max_callbacks: int, session: Session = NEW_SESSION):
self._fetch_callbacks_with_retries(max_callbacks, session)
def _fetch_callbacks(
cls,
max_callbacks: int,
standalone_dag_processor: bool,
dag_directory: str,
session: Session = NEW_SESSION,
) -> list[CallbackRequest]:
return cls._fetch_callbacks_with_retries(
max_callbacks, standalone_dag_processor, dag_directory, session
)

@classmethod
@retry_db_transaction
def _fetch_callbacks_with_retries(self, max_callbacks: int, session: Session):
def _fetch_callbacks_with_retries(
cls, max_callbacks: int, standalone_dag_processor: bool, dag_directory: str, session: Session
) -> list[CallbackRequest]:
"""Fetch callbacks from database and add them to the internal queue for execution."""
self.log.debug("Fetching callbacks from the database.")
cls.logger().debug("Fetching callbacks from the database.")

callback_queue: list[CallbackRequest] = []
with prohibit_commit(session) as guard:
query = select(DbCallbackRequest)
if self.standalone_dag_processor:
if standalone_dag_processor:
query = query.where(
DbCallbackRequest.processor_subdir == self.get_dag_directory(),
DbCallbackRequest.processor_subdir == dag_directory,
)
query = query.order_by(DbCallbackRequest.priority_weight.asc()).limit(max_callbacks)
query = with_row_locks(query, of=DbCallbackRequest, session=session, skip_locked=True)
callbacks = session.scalars(query)
for callback in callbacks:
try:
self._add_callback_to_queue(callback.get_callback_request())
callback_queue.append(callback.get_callback_request())
session.delete(callback)
except Exception as e:
self.log.warning("Error adding callback for execution: %s, %s", callback, e)
cls.logger().warning("Error adding callback for execution: %s, %s", callback, e)
guard.commit()
return callback_queue

def _add_callback_to_queue(self, request: CallbackRequest):
# requests are sent by dag processors. SLAs exist per-dag, but can be generated once per SLA-enabled
@@ -768,23 +787,30 @@ def _add_callback_to_queue(self, request: CallbackRequest):
self._add_paths_to_queue([request.full_filepath], True)
Stats.incr("dag_processing.other_callback_count")

@provide_session
def _refresh_requested_filelocs(self, session=NEW_SESSION) -> None:
def _refresh_requested_filelocs(self) -> None:
"""Refresh filepaths from dag dir as requested by users via APIs."""
# Get values from DB table
filelocs = DagFileProcessorManager._get_priority_filelocs()
for fileloc in filelocs:
# Try removing the fileloc if already present
try:
self._file_path_queue.remove(fileloc)
except ValueError:
pass
# enqueue fileloc to the start of the queue.
self._file_path_queue.appendleft(fileloc)

@classmethod
@internal_api_call
@provide_session
def _get_priority_filelocs(cls, session: Session = NEW_SESSION):
"""Get filelocs from DB table."""
filelocs: list[str] = []
requests = session.scalars(select(DagPriorityParsingRequest))
for request in requests:
# Check if fileloc is in valid file paths. Parsing any
# filepaths can be a security issue.
if request.fileloc in self._file_paths:
# Try removing the fileloc if already present
try:
self._file_path_queue.remove(request.fileloc)
except ValueError:
pass
# enqueue fileloc to the start of the queue.
self._file_path_queue.appendleft(request.fileloc)
filelocs.append(request.fileloc)
session.delete(request)
return filelocs

def _refresh_dag_dir(self) -> bool:
"""Refresh file paths from dag dir if we haven't done it for too long."""
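Both refactors in this file follow the same recipe for making a method safe
behind the internal API: convert it to a classmethod that receives everything
it needs as arguments and returns plain data, leaving instance-state mutation
(the file path queue) to the caller. Note the decorator order: @classmethod
outermost, @internal_api_call in the middle, @provide_session innermost, so
the session is created wherever the code actually runs. An illustrative
sketch of the recipe (not the exact Airflow code):

```python
# Illustrative sketch - `ManagerSketch` is hypothetical, not Airflow code.
class ManagerSketch:
    def __init__(self) -> None:
        self._queue: list[str] = []

    def refresh(self) -> None:
        # Local state changes stay on the instance...
        for fileloc in ManagerSketch._get_priority_filelocs():
            self._queue.insert(0, fileloc)

    @classmethod
    # @internal_api_call   # proxied over RPC when DB isolation is enabled
    # @provide_session     # innermost: the session is injected server-side
    def _get_priority_filelocs(cls) -> list[str]:
        # ...while the DB round-trip takes no instance state, returns data.
        return ["dags/example.py"]  # stand-in for the DB query
```
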
(The diff is truncated here; the remaining 16 changed files are not shown.)
