Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce mechanism to support multiple executor configuration #37635

Merged
merged 8 commits into from
Mar 9, 2024
21 changes: 21 additions & 0 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,12 @@ def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
raise ValueError(f"The value {section}/{key} should be set!")
return value

def get_mandatory_list_value(self, section: str, key: str, **kwargs) -> list[str]:
    """
    Return the list stored under ``section``/``key``, failing if it is unset.

    Extra keyword arguments are forwarded unchanged to ``getlist``.

    :param section: configuration section to read from
    :param key: option name inside the section
    :return: the list produced by ``getlist``
    :raises ValueError: if ``getlist`` yields ``None``
    """
    items = self.getlist(section, key, **kwargs)
    if items is not None:
        return items
    raise ValueError(f"The value {section}/{key} should be set!")

@overload # type: ignore[override]
def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str:
...
Expand Down Expand Up @@ -1176,6 +1182,21 @@ def getfloat(self, section: str, key: str, **kwargs) -> float: # type: ignore[o
f'Current value: "{val}".'
)

def getlist(self, section: str, key: str, delimiter=",", **kwargs) -> list[str]:
    """
    Read an option and split it into a list of whitespace-stripped strings.

    :param section: configuration section to read from
    :param key: option name inside the section
    :param delimiter: separator the raw value is split on
    :param kwargs: forwarded unchanged to ``get``
    :return: the pieces of the raw value, each stripped of surrounding whitespace
    :raises AirflowConfigException: if the option resolves to ``None`` or cannot be split
    """
    val = self.get(section, key, **kwargs)
    if val is None:
        raise AirflowConfigException(
            f"Failed to convert value None to list. "
            f'Please check "{key}" key in "{section}" section is set.'
        )
    try:
        return [item.strip() for item in val.split(delimiter)]
    except Exception as err:
        # Chain the original error so the root cause stays visible in the traceback.
        raise AirflowConfigException(
            f'Failed to parse value to a list. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}".'
        ) from err

def getimport(self, section: str, key: str, **kwargs) -> Any:
"""
Read options, import the full qualified name, and return the object.
Expand Down
2 changes: 2 additions & 0 deletions airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from airflow.callbacks.base_callback_sink import BaseCallbackSink
from airflow.callbacks.callback_requests import CallbackRequest
from airflow.cli.cli_config import GroupCommand
from airflow.executors.executor_utils import ExecutorName
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey

Expand Down Expand Up @@ -118,6 +119,7 @@ class BaseExecutor(LoggingMixin):
serve_logs: bool = False

job_id: None | int | str = None
name: None | ExecutorName = None
callback_sink: BaseCallbackSink | None = None

def __init__(self, parallelism: int = PARALLELISM):
Expand Down
22 changes: 22 additions & 0 deletions airflow/executors/executor_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@
# under the License.
from __future__ import annotations

from enum import Enum, unique


@unique
class ConnectorSource(Enum):
    """Enum of supported executor import sources."""

    # Executor known to the loader's built-in alias table.
    CORE = "core"
    # Executor provided by a plugin and imported from under ``airflow.executors``.
    PLUGIN = "plugin"
    # Executor imported directly from a user-supplied module path.
    CUSTOM_PATH = "custom path"


LOCAL_EXECUTOR = "LocalExecutor"
LOCAL_KUBERNETES_EXECUTOR = "LocalKubernetesExecutor"
SEQUENTIAL_EXECUTOR = "SequentialExecutor"
Expand All @@ -24,3 +36,13 @@
KUBERNETES_EXECUTOR = "KubernetesExecutor"
DEBUG_EXECUTOR = "DebugExecutor"
MOCK_EXECUTOR = "MockExecutor"
# Collects the executor alias constants defined above into a set for membership tests.
CORE_EXECUTOR_NAMES = {
LOCAL_EXECUTOR,
LOCAL_KUBERNETES_EXECUTOR,
SEQUENTIAL_EXECUTOR,
CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR,
KUBERNETES_EXECUTOR,
DEBUG_EXECUTOR,
MOCK_EXECUTOR,
}
199 changes: 161 additions & 38 deletions airflow/executors/executor_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,21 @@
import logging
import os
from contextlib import suppress
from enum import Enum, unique
from typing import TYPE_CHECKING

from airflow.exceptions import AirflowConfigException
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.executors.executor_constants import (
CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR,
CORE_EXECUTOR_NAMES,
DEBUG_EXECUTOR,
KUBERNETES_EXECUTOR,
LOCAL_EXECUTOR,
LOCAL_KUBERNETES_EXECUTOR,
SEQUENTIAL_EXECUTOR,
ConnectorSource,
)
from airflow.executors.executor_utils import ExecutorName
from airflow.utils.module_loading import import_string

log = logging.getLogger(__name__)
Expand All @@ -42,19 +44,19 @@
from airflow.executors.base_executor import BaseExecutor


@unique
class ConnectorSource(Enum):
"""Enum of supported executor import sources."""

# Reported by import_executor_cls for names found in ExecutorLoader.executors.
CORE = "core"
# Reported when the executor is imported from under ``airflow.executors`` after plugin integration.
PLUGIN = "plugin"
# Reported when the given name is imported directly as a module path.
CUSTOM_PATH = "custom path"
# Used to look up an ExecutorName via a string alias or module path. An
# executor may have both, so we need two lookup dicts.
_alias_to_executors: dict[str, ExecutorName] = {}
_module_to_executors: dict[str, ExecutorName] = {}
# Used to cache the computed ExecutorNames so that we don't need to read/parse config more than once
_executor_names: list[ExecutorName] = []
# Used to cache executors so that we don't construct executor objects unnecessarily
_loaded_executors: dict[ExecutorName, BaseExecutor] = {}


class ExecutorLoader:
"""Keeps constants for all the currently available executors."""

_default_executor: BaseExecutor | None = None
executors = {
LOCAL_EXECUTOR: "airflow.executors.local_executor.LocalExecutor",
LOCAL_KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes."
Expand All @@ -69,56 +71,181 @@ class ExecutorLoader:
}

@classmethod
def get_default_executor_name(cls) -> str:
"""Return the default executor name from Airflow configuration.
def block_use_of_hybrid_exec(cls, executor_config: list):
"""Raise an exception if the user tries to use multiple executors before the feature is complete.

:return: executor name from Airflow configuration
This check is built into a method so that it can be easily mocked in unit tests.

:param executor_config: core.executor configuration value.
"""
if len(executor_config) > 1 or ":" in "".join(executor_config):
raise AirflowConfigException(
"Configuring multiple executors and executor aliases are not yet supported!: "
f"{executor_config}"
)

@classmethod
def _get_executor_names(cls) -> list[ExecutorName]:
"""Return the executor names from Airflow configuration.

:return: List of executor names from Airflow configuration
"""
from airflow.configuration import conf

return conf.get_mandatory_value("core", "EXECUTOR")
if _executor_names:
return _executor_names

executor_names_raw = conf.get_mandatory_list_value("core", "EXECUTOR")

# AIP-61 is WIP. Unblock configuring multiple executors when the feature is ready to launch
cls.block_use_of_hybrid_exec(executor_names_raw)

executor_names = []
for name in executor_names_raw:
if len(split_name := name.split(":")) == 1:
name = split_name[0]
# Check if this is an alias for a core airflow executor, module
# paths won't be provided by the user in that case.
if core_executor_module := cls.executors.get(name):
executor_names.append(ExecutorName(alias=name, module_path=core_executor_module))
# Only a module path or plugin name was provided
else:
executor_names.append(ExecutorName(alias=None, module_path=name))
# An alias was provided with the module path
elif len(split_name) == 2:
# Ensure the user is not trying to override the existing aliases of any of the core
# executors by providing an alias along with the existing core airflow executor alias
# (e.g. my_local_exec_alias:LocalExecutor). Allowing this makes things unnecessarily
# complicated. Multiple Executors of the same type will be supported by a future multitenancy
# AIP.
# The module component should always be a module or plugin path.
if not split_name[1] or split_name[1] in CORE_EXECUTOR_NAMES:
raise AirflowConfigException(
f"Incorrectly formatted executor configuration: {name}\n"
"second portion of an executor configuration must be a module path"
)
else:
executor_names.append(ExecutorName(alias=split_name[0], module_path=split_name[1]))
else:
raise AirflowConfigException(f"Incorrectly formatted executor configuration: {name}")

# As of now, we do not allow duplicate executors.
# Add all module paths/plugin names to a set, since the actual code is what is unique
unique_modules = set([exec_name.module_path for exec_name in executor_names])
if len(unique_modules) < len(executor_names):
msg = (
"At least one executor was configured twice. Duplicate executors are not yet supported. "
"Please check your configuration again to correct the issue."
)
raise AirflowConfigException(msg)

# Populate some mappings for fast future lookups
for executor_name in executor_names:
# Executors will not always have aliases
if executor_name.alias:
_alias_to_executors[executor_name.alias] = executor_name
# All executors will have a module path
_module_to_executors[executor_name.module_path] = executor_name
# Cache the executor names, so the logic of this method only runs once
_executor_names.append(executor_name)

return executor_names

@classmethod
def get_default_executor_name(cls) -> ExecutorName:
    """Return the name of the default executor.

    The default executor is whichever one is listed first in the configuration.

    :return: first executor name from Airflow configuration
    """
    configured_names = cls._get_executor_names()
    return configured_names[0]

@classmethod
def get_default_executor(cls) -> BaseExecutor:
"""Create a new instance of the configured executor if none exists and returns it."""
if cls._default_executor is not None:
return cls._default_executor
default_executor = cls.load_executor(cls.get_default_executor_name())

return default_executor

@classmethod
def init_executors(cls) -> list[BaseExecutor]:
o-nikolas marked this conversation as resolved.
Show resolved Hide resolved
"""Create a new instance of all configured executors if not cached already."""
executor_names = cls._get_executor_names()
loaded_executors = []
for executor_name in executor_names:
loaded_executor = cls.load_executor(executor_name.module_path)
if executor_name.alias:
cls.executors[executor_name.alias] = executor_name.module_path
else:
cls.executors[loaded_executor.__class__.__name__] = executor_name.module_path

return cls.load_executor(cls.get_default_executor_name())
loaded_executors.append(loaded_executor)

return loaded_executors

@classmethod
def lookup_executor_name_by_str(cls, executor_name_str: str) -> ExecutorName:
    """
    Resolve a string to the ``ExecutorName`` registered for it.

    The string is tried as an alias first and as a module path second.

    :param executor_name_str: executor alias or module path
    :return: the matching ``ExecutorName``
    :raises AirflowException: if the string matches neither lookup table
    """
    # lookup the executor by alias first, if not check if we're given a module path
    if executor_name := _alias_to_executors.get(executor_name_str):
        return executor_name
    if executor_name := _module_to_executors.get(executor_name_str):
        return executor_name
    # Report the string the caller passed in; ``executor_name`` is always falsy here.
    raise AirflowException(f"Unknown executor being loaded: {executor_name_str}")

@classmethod
def load_executor(cls, executor_name: str) -> BaseExecutor:
def load_executor(cls, executor_name: ExecutorName | str) -> BaseExecutor:
"""
Load the executor.

This supports the following formats:
* by executor name for core executor
* by ``{plugin_name}.{class_name}`` for executor from plugins
* by import path.
* by ExecutorName object specification

:return: an instance of executor class via executor_name
"""
if executor_name == CELERY_KUBERNETES_EXECUTOR:
return cls.__load_celery_kubernetes_executor()
elif executor_name == LOCAL_KUBERNETES_EXECUTOR:
return cls.__load_local_kubernetes_executor()
if isinstance(executor_name, str):
_executor_name = cls.lookup_executor_name_by_str(executor_name)
else:
_executor_name = executor_name

# Check if the executor has been previously loaded. Avoid constructing a new object
if _executor_name in _loaded_executors:
return _loaded_executors[_executor_name]

try:
executor_cls, import_source = cls.import_executor_cls(executor_name)
log.debug("Loading executor %s from %s", executor_name, import_source.value)
if _executor_name.alias == CELERY_KUBERNETES_EXECUTOR:
o-nikolas marked this conversation as resolved.
Show resolved Hide resolved
executor = cls.__load_celery_kubernetes_executor()
elif _executor_name.alias == LOCAL_KUBERNETES_EXECUTOR:
executor = cls.__load_local_kubernetes_executor()
else:
executor_cls, import_source = cls.import_executor_cls(_executor_name)
log.debug("Loading executor %s from %s", _executor_name, import_source.value)
executor = executor_cls()

except ImportError as e:
log.error(e)
raise AirflowConfigException(
f'The module/attribute could not be loaded. Please check "executor" key in "core" section. '
f'Current value: "{executor_name}".'
f'Current value: "{_executor_name}".'
)
log.info("Loaded executor: %s", executor_name)
log.info("Loaded executor: %s", _executor_name)

# Store the executor name we've built for this executor in the
# instance. This makes it easier for the Scheduler, Backfill, etc to
# know how we refer to this executor.
executor.name = _executor_name
# Cache this executor by name here, so we can look it up later if it is
# requested again, and not have to construct a new object
_loaded_executors[_executor_name] = executor

return executor_cls()
return executor

@classmethod
def import_executor_cls(
cls, executor_name: str, validate: bool = True
cls, executor_name: ExecutorName, validate: bool = True
) -> tuple[type[BaseExecutor], ConnectorSource]:
"""
Import the executor class.
Expand All @@ -137,22 +264,18 @@ def _import_and_validate(path: str) -> type[BaseExecutor]:
cls.validate_database_executor_compatibility(executor)
return executor

if executor_name in cls.executors:
return _import_and_validate(cls.executors[executor_name]), ConnectorSource.CORE
if executor_name.count(".") == 1:
log.debug(
"The executor name looks like the plugin path (executor_name=%s). Trying to import a "
"executor from a plugin",
executor_name,
)
if executor_name.connector_source == ConnectorSource.PLUGIN:
with suppress(ImportError, AttributeError):
# Load plugins here for executors as at that time the plugins might not have been
# initialized yet
from airflow import plugins_manager

plugins_manager.integrate_executor_plugins()
return _import_and_validate(f"airflow.executors.{executor_name}"), ConnectorSource.PLUGIN
return _import_and_validate(executor_name), ConnectorSource.CUSTOM_PATH
return (
_import_and_validate(f"airflow.executors.{executor_name.module_path}"),
ConnectorSource.PLUGIN,
)
return _import_and_validate(executor_name.module_path), executor_name.connector_source

@classmethod
def import_default_executor_cls(cls, validate: bool = True) -> tuple[type[BaseExecutor], ConnectorSource]:
Expand Down