Skip to content

Commit

Permalink
Introduce mechanism to support multiple executor configuration
Browse files Browse the repository at this point in the history
This commit delivers an isolated component of AIP-61:
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-61+Hybrid+Execution

It updates the executor configuration logic to allow configuring multiple
executors (and an alias per executor) as a comma-delimited list, and adds the
associated methods that core Airflow code uses to load the default executor
(the first executor in the list) or any other configured executor.

Tests are included with the changes, but documentation is not, because the
feature is not yet complete and is currently disabled. User-facing
documentation will be delivered when the entire AIP-61 feature is
released.
  • Loading branch information
o-nikolas committed Feb 22, 2024
1 parent 3c1d051 commit 43b6b7a
Show file tree
Hide file tree
Showing 8 changed files with 523 additions and 57 deletions.
21 changes: 21 additions & 0 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,12 @@ def get_mandatory_value(self, section: str, key: str, **kwargs) -> str:
raise ValueError(f"The value {section}/{key} should be set!")
return value

def get_mandatory_list_value(self, section: str, key: str, **kwargs) -> list[str]:
    """
    Read a list-valued option that is required to be set.

    The lookup itself is delegated to ``getlist`` with the same arguments.

    :param section: name of the configuration section
    :param key: name of the option inside ``section``
    :return: the list returned by ``getlist``
    :raises ValueError: if ``getlist`` returns ``None``
    """
    items = self.getlist(section, key, **kwargs)
    if items is not None:
        return items
    raise ValueError(f"The value {section}/{key} should be set!")

@overload # type: ignore[override]
def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str:
...
Expand Down Expand Up @@ -1176,6 +1182,21 @@ def getfloat(self, section: str, key: str, **kwargs) -> float: # type: ignore[o
f'Current value: "{val}".'
)

def getlist(self, section: str, key: str, delimiter=",", **kwargs) -> list[str]:
    """
    Read an option and split it into a list of whitespace-stripped strings.

    :param section: name of the configuration section
    :param key: name of the option inside ``section``
    :param delimiter: separator placed between items, a comma by default
    :return: the items in their configured order, each stripped of surrounding whitespace
    :raises AirflowConfigException: if the option resolves to ``None`` or the value
        cannot be split into a list
    """
    val = self.get(section, key, **kwargs)
    if val is None:
        raise AirflowConfigException(
            f"Failed to convert value None to list. "
            f'Please check "{key}" key in "{section}" section is set.'
        )
    try:
        return [item.strip() for item in val.split(delimiter)]
    except Exception as err:
        # Chain explicitly so the traceback shows the parsing failure as the direct cause
        # of the configuration error.
        raise AirflowConfigException(
            f'Failed to parse value to a list. Please check "{key}" key in "{section}" section. '
            f'Current value: "{val}".'
        ) from err

def getimport(self, section: str, key: str, **kwargs) -> Any:
"""
Read options, import the full qualified name, and return the object.
Expand Down
2 changes: 2 additions & 0 deletions airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from airflow.callbacks.base_callback_sink import BaseCallbackSink
from airflow.callbacks.callback_requests import CallbackRequest
from airflow.cli.cli_config import GroupCommand
from airflow.executors.executor_utils import ExecutorName
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey

Expand Down Expand Up @@ -118,6 +119,7 @@ class BaseExecutor(LoggingMixin):
serve_logs: bool = False

job_id: None | int | str = None
name: None | ExecutorName = None
callback_sink: BaseCallbackSink | None = None

def __init__(self, parallelism: int = PARALLELISM):
Expand Down
22 changes: 22 additions & 0 deletions airflow/executors/executor_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@
# under the License.
from __future__ import annotations

from enum import Enum, unique


@unique
class ConnectorSource(Enum):
    """
    Enum of supported executor import sources.

    ``@unique`` guarantees that no two members share the same value.
    """

    # Executor resolved through one of the built-in executor aliases.
    CORE = "core"
    # Executor supplied by an Airflow plugin.
    PLUGIN = "plugin"
    # Executor referenced by an explicit import path.
    CUSTOM_PATH = "custom path"


LOCAL_EXECUTOR = "LocalExecutor"
LOCAL_KUBERNETES_EXECUTOR = "LocalKubernetesExecutor"
SEQUENTIAL_EXECUTOR = "SequentialExecutor"
Expand All @@ -24,3 +36,13 @@
KUBERNETES_EXECUTOR = "KubernetesExecutor"
DEBUG_EXECUTOR = "DebugExecutor"
MOCK_EXECUTOR = "MockExecutor"
# The executor name constants of this module gathered into a set, so that callers can
# check whether a given string is one of these names with a membership test.
CORE_EXECUTOR_NAMES = {
    LOCAL_EXECUTOR,
    LOCAL_KUBERNETES_EXECUTOR,
    SEQUENTIAL_EXECUTOR,
    CELERY_EXECUTOR,
    CELERY_KUBERNETES_EXECUTOR,
    KUBERNETES_EXECUTOR,
    DEBUG_EXECUTOR,
    MOCK_EXECUTOR,
}
203 changes: 163 additions & 40 deletions airflow/executors/executor_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,21 @@
import logging
import os
from contextlib import suppress
from enum import Enum, unique
from typing import TYPE_CHECKING

from airflow.exceptions import AirflowConfigException
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.executors.executor_constants import (
CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR,
CORE_EXECUTOR_NAMES,
DEBUG_EXECUTOR,
KUBERNETES_EXECUTOR,
LOCAL_EXECUTOR,
LOCAL_KUBERNETES_EXECUTOR,
SEQUENTIAL_EXECUTOR,
ConnectorSource,
)
from airflow.executors.executor_utils import ExecutorName
from airflow.utils.module_loading import import_string

log = logging.getLogger(__name__)
Expand All @@ -42,19 +44,18 @@
from airflow.executors.base_executor import BaseExecutor


@unique
class ConnectorSource(Enum):
"""Enum of supported executor import sources."""

CORE = "core"
PLUGIN = "plugin"
CUSTOM_PATH = "custom path"


class ExecutorLoader:
"""Keeps constants for all the currently available executors."""

_default_executor: BaseExecutor | None = None
# Used to lookup an ExecutorName via a string alias or module path. An
# executor may have both so we need two lookup dicts.
_alias_to_executors: dict[str, ExecutorName] = {}
_module_to_executors: dict[str, ExecutorName] = {}
# Used to cache the computed ExecutorNames so that we don't need to read/parse config more than once
_executor_names: list[ExecutorName] = []
# Used to cache executors so that we don't construct executor objects unnecessarily
_loaded_executors: dict[ExecutorName, BaseExecutor] = {}

executors = {
LOCAL_EXECUTOR: "airflow.executors.local_executor.LocalExecutor",
LOCAL_KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes."
Expand All @@ -69,56 +70,182 @@ class ExecutorLoader:
}

@classmethod
def get_default_executor_name(cls) -> str:
"""Return the default executor name from Airflow configuration.
def block_use_of_hybrid_exec(cls, executor_config: list):
"""Raise an exception if the user tries to use multiple executors before the feature is complete.
:return: executor name from Airflow configuration
This check is built into a method so that it can be easily mocked in unit tests.
:param executor_config: core.executor configuration value.
"""
if len(executor_config) > 1 or ":" in "".join(executor_config):
raise AirflowConfigException(
"Configuring multiple executors and executor aliases are not yet supported!: "
f"{executor_config}"
)

@classmethod
def _get_executor_names(cls) -> list[ExecutorName]:
"""Return the executor names from Airflow configuration.
:return: List of executor names from Airflow configuration
"""
from airflow.configuration import conf

return conf.get_mandatory_value("core", "EXECUTOR")
if cls._executor_names:
return cls._executor_names

executor_names_raw = conf.get_mandatory_list_value("core", "EXECUTOR")

# AIP-61 is WIP. Unblock configuring multiple executors when the feature is ready to launch
cls.block_use_of_hybrid_exec(executor_names_raw)

executor_names = []
for name in executor_names_raw:
if len(split_name := name.split(":")) == 1:
name = split_name[0]
# Check if this is an alias for a core airflow executor, module
# paths won't be provided by the user in that case.
if core_executor_module := cls.executors.get(name):
executor_names.append(ExecutorName(alias=name, module_path=core_executor_module))
# Only a module path or plugin name was provided
else:
executor_names.append(ExecutorName(alias=None, module_path=name))
# An alias was provided with the module path
elif len(split_name) == 2:
# Ensure the user is not trying to override the existing aliases of any of the core
# executors by providing an alias along with the existing core airflow executor alias
# (e.g. my_local_exec_alias:LocalExecutor). Allowing this makes things unnecessarily
# complicated. Multiple Executors of the same type will be supported by a future multitenancy
# AIP.
# The module component should always be a module or plugin path.
if not split_name[1] or split_name[1] in CORE_EXECUTOR_NAMES:
raise AirflowConfigException(
f"Incorrectly formatted executor configuration: {name}\n"
"second portion of an executor configuration must be a module path"
)
else:
executor_names.append(ExecutorName(alias=split_name[0], module_path=split_name[1]))
else:
raise AirflowConfigException(f"Incorrectly formatted executor configuration: {name}")

# As of now, we do not allow duplicate executors.
# Add all module paths/plugin names to a set, since the actual code is what is unique
unique_modules = set([exec_name.module_path for exec_name in executor_names])
if len(unique_modules) < len(executor_names):
msg = (
"At least one executor was configured twice. Duplicate executors is not yet supported. "
"Please check your configuration again to correct the issue."
)
raise AirflowConfigException(msg)

# Populate some mappings for fast future lookups
for executor_name in executor_names:
# Executors will not always have aliases
if executor_name.alias:
cls._alias_to_executors[executor_name.alias] = executor_name
# All executors will have a module path
cls._module_to_executors[executor_name.module_path] = executor_name

# Cache the executor names, so the logic of this method only runs once
cls._executor_names = executor_names

return executor_names

@classmethod
def get_default_executor_name(cls) -> ExecutorName:
    """
    Return the name of the default executor.

    :return: the first executor name produced by ``_get_executor_names``
    """
    # Whichever executor is configured first acts as the default one.
    configured_names = cls._get_executor_names()
    return configured_names[0]

@classmethod
def get_default_executor(cls) -> BaseExecutor:
"""Create a new instance of the configured executor if none exists and returns it."""
if cls._default_executor is not None:
return cls._default_executor
default_executor = cls.load_executor(cls.get_default_executor_name())

return default_executor

@classmethod
def init_executors(cls) -> list[BaseExecutor]:
"""Create a new instance of all configured executors if not cached already."""
executor_names = cls._get_executor_names()
loaded_executors = []
for executor_name in executor_names:
loaded_executor = cls.load_executor(executor_name.module_path)
if executor_name.alias:
cls.executors[executor_name.alias] = executor_name.module_path
else:
cls.executors[loaded_executor.__class__.__name__] = executor_name.module_path

loaded_executors.append(loaded_executor)

return cls.load_executor(cls.get_default_executor_name())
return loaded_executors

@classmethod
def lookup_executor_name_by_str(cls, executor_name_str: str) -> ExecutorName:
    """
    Resolve a string to the matching configured ``ExecutorName``.

    The string is tried as an executor alias first and as a module path second.

    :param executor_name_str: alias or module path of a configured executor
    :return: the ``ExecutorName`` registered under that alias or module path
    :raises AirflowException: if the string is found in neither lookup mapping
    """
    # NOTE(review): both mappings are filled in by ``_get_executor_names``; confirm that
    # callers have triggered it before looking up a string here.
    # lookup the executor by alias first, if not check if we're given a module path
    if executor_name := cls._alias_to_executors.get(executor_name_str):
        return executor_name
    if executor_name := cls._module_to_executors.get(executor_name_str):
        return executor_name
    # Report the string that was requested: ``executor_name`` is always falsy on this path,
    # so interpolating it would hide what the caller actually asked for.
    raise AirflowException(f"Unknown executor being loaded: {executor_name_str}")

@classmethod
def load_executor(cls, executor_name: str) -> BaseExecutor:
def load_executor(cls, executor_name: ExecutorName | str) -> BaseExecutor:
"""
Load the executor.
This supports the following formats:
* by executor name for core executor
* by ``{plugin_name}.{class_name}`` for executor from plugins
* by import path.
* by ExecutorName object specification
:return: an instance of executor class via executor_name
"""
if executor_name == CELERY_KUBERNETES_EXECUTOR:
return cls.__load_celery_kubernetes_executor()
elif executor_name == LOCAL_KUBERNETES_EXECUTOR:
return cls.__load_local_kubernetes_executor()
if isinstance(executor_name, str):
_executor_name = cls.lookup_executor_name_by_str(executor_name)
else:
_executor_name = executor_name

# Check if the executor has been previously loaded. Avoid constructing a new object
if _executor_name in cls._loaded_executors:
return cls._loaded_executors[_executor_name]

try:
executor_cls, import_source = cls.import_executor_cls(executor_name)
log.debug("Loading executor %s from %s", executor_name, import_source.value)
if _executor_name.alias == CELERY_KUBERNETES_EXECUTOR:
executor = cls.__load_celery_kubernetes_executor()
elif _executor_name.alias == LOCAL_KUBERNETES_EXECUTOR:
executor = cls.__load_local_kubernetes_executor()
else:
executor_cls, import_source = cls.import_executor_cls(_executor_name)
log.debug("Loading executor %s from %s", _executor_name, import_source.value)
executor = executor_cls()

except ImportError as e:
log.error(e)
raise AirflowConfigException(
f'The module/attribute could not be loaded. Please check "executor" key in "core" section. '
f'Current value: "{executor_name}".'
f'Current value: "{_executor_name}".'
)
log.info("Loaded executor: %s", executor_name)
log.info("Loaded executor: %s", _executor_name)

return executor_cls()
# Store the executor name we've built for this executor in the
# instance. This makes it easier for the Scheduler, Backfill, etc to
# know how we refer to this executor.
executor.name = _executor_name
# Cache this executor by name here, so we can look it up later if it is
# requested again, and not have to construct a new object
cls._loaded_executors[_executor_name] = executor

return executor

@classmethod
def import_executor_cls(
cls, executor_name: str, validate: bool = True
cls, executor_name: ExecutorName, validate: bool = True
) -> tuple[type[BaseExecutor], ConnectorSource]:
"""
Import the executor class.
Expand All @@ -137,22 +264,18 @@ def _import_and_validate(path: str) -> type[BaseExecutor]:
cls.validate_database_executor_compatibility(executor)
return executor

if executor_name in cls.executors:
return _import_and_validate(cls.executors[executor_name]), ConnectorSource.CORE
if executor_name.count(".") == 1:
log.debug(
"The executor name looks like the plugin path (executor_name=%s). Trying to import a "
"executor from a plugin",
executor_name,
)
if executor_name.connector_source == ConnectorSource.PLUGIN:
with suppress(ImportError, AttributeError):
# Load plugins here for executors as at that time the plugins might not have been
# initialized yet
from airflow import plugins_manager

plugins_manager.integrate_executor_plugins()
return _import_and_validate(f"airflow.executors.{executor_name}"), ConnectorSource.PLUGIN
return _import_and_validate(executor_name), ConnectorSource.CUSTOM_PATH
return (
_import_and_validate(f"airflow.executors.{executor_name.module_path}"),
ConnectorSource.PLUGIN,
)
return _import_and_validate(executor_name.module_path), executor_name.connector_source

@classmethod
def import_default_executor_cls(cls, validate: bool = True) -> tuple[type[BaseExecutor], ConnectorSource]:
Expand Down

0 comments on commit 43b6b7a

Please sign in to comment.