Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
ed5f577
Add Task Coordinators and Dag File Processor
jason810496 Apr 8, 2026
77433cd
Add initial Java provider for Apache Airflow
jason810496 Apr 8, 2026
072b7b2
Add common selector loop utilities for socket I/O handling for subpro…
jason810496 Apr 10, 2026
748108e
Implement Java DAG file processor with TCP communication bridge
jason810496 Apr 9, 2026
b277387
Make JavaDagFileProcessor.can_handle aware of jar file content
jason810496 Apr 9, 2026
19312c3
Fix java process startup issue
jason810496 Apr 9, 2026
bdf8944
Fix sockets bidning
jason810496 Apr 9, 2026
74954ee
Refactor Java DAG file processor to use selector-based I/O multiplexi…
jason810496 Apr 10, 2026
4b47a06
Add BaseLocaleCoordinator for non-Python DAG file processing and task…
jason810496 Apr 10, 2026
76e5139
Implement JavaLocaleCoordinator
jason810496 Apr 10, 2026
eace240
Add Java task coordinator and entrypoint for locale-specific execution
jason810496 Apr 13, 2026
1a5b97c
Refactor Java provider to with generic process coordinators and updat…
jason810496 Apr 13, 2026
db944c3
Fix Coordinator by getting the correct dag bundle and dag path
jason810496 Apr 14, 2026
2ebc2a8
Make @task.stub(language=java) works
jason810496 Apr 15, 2026
1e2243c
Make coordinator respect Jar bundle based on TI workload type
jason810496 Apr 16, 2026
a19e3c4
Add java_sdk_setup script for Breeze
jason810496 Apr 16, 2026
3756294
Add get_code_from_file interface for BaseLocaleCoordinator
jason810496 Apr 16, 2026
c77a48d
Fix the 'Pure Java Dag' disappear in metadata DB issue
jason810496 Apr 17, 2026
8f03d1d
Refactor process coordinators to runtime coordinators
jason810496 Apr 17, 2026
df206a0
Rename stub operator language field as sdk
jason810496 Apr 23, 2026
40421cd
Rename languages.java provider to sdk.java
jason810496 Apr 23, 2026
cf3f1c5
Add unit tests for socket handling and selector loop functionality
jason810496 Apr 23, 2026
9518f4a
Move TaskInstanceDTO to share to make task_runner retrieve TI.queue
jason810496 Apr 21, 2026
95628d5
Add [workers/queue_to_runtime_mapping]
jason810496 Apr 22, 2026
0edd77c
Remove the sdk field from stub operator and respect [workers/queue_to…
jason810496 Apr 23, 2026
9a1695f
Rename `[workers] queue_to_runtime_mapping` to `[sdk] queue_to_sdk`
jason810496 Apr 23, 2026
294e1ee
Simplify coordinator-related names (#1569)
uranusjr Apr 24, 2026
d7ad62f
CI: Add mypy and unit tests for shared/workloads
jason810496 Apr 28, 2026
130e168
CI: Fix DB migration and breeze images
jason810496 Apr 28, 2026
597fd5d
CI: Fix failing items
jason810496 Apr 28, 2026
57f54d0
CI: Fix failing items
jason810496 Apr 28, 2026
c427a4b
CI: Add compat for create_runtime_ti pytest fixture
jason810496 Apr 28, 2026
0bb870d
CI: Fix Java provider test to include configuration options
jason810496 Apr 28, 2026
b6a05e1
CI: Fix self-review nits
jason810496 Apr 28, 2026
94aa31c
Revert MappedOperator change
jason810496 Apr 28, 2026
e6779be
CI: Fix failing items
jason810496 Apr 28, 2026
40109d8
CI: Fix Task SDK test_task_runner failures using TaskInstanceDTO
jason810496 Apr 28, 2026
238274c
CI: Skip non-JAR paths in JavaCoordinator.can_handle_dag_file
jason810496 Apr 28, 2026
15044f1
CI: Drop literal Example: line from queue_to_sdk config description
jason810496 Apr 28, 2026
6323035
CI: Skip sdk-java provider in compat tests for older Airflow
jason810496 Apr 28, 2026
45a60e3
CI: Fix MyPy Liskov violation in JavaCoordinator.task_execution_cmd
jason810496 Apr 28, 2026
e957aab
CI: Fix sdk-java docs build warnings
jason810496 Apr 28, 2026
336cb46
Add executable provider and runtime coordinator
jason810496 Apr 29, 2026
f40646a
Add bundle specification documentation and metadata schema
jason810496 Apr 30, 2026
d90e34a
Add ADRs for bundle packing options and Go tool directive implementation
jason810496 Apr 30, 2026
1c586c6
Fix base coordinator name
jason810496 Apr 30, 2026
345251d
Add ADR for dual-mode bundle binary supporting msgpack-over-IPC coord…
jason810496 May 4, 2026
e3fb68c
Rename ExecutableRuntimeCoordinator to ExecutableCoordinator across d…
jason810496 May 4, 2026
3f4aac5
Refactor bundle spec to use self-contained executable with embedded m…
jason810496 May 5, 2026
695ced0
Implement execution server and task runner for coordinator protocol
jason810496 May 5, 2026
2e1f481
Refactor bundle scanning logic to support self-contained executable d…
jason810496 May 5, 2026
ec0e71c
Add airflow-go-pack for building self-contained Airflow bundles
jason810496 May 5, 2026
4817e88
Refactor DAG file discovery by respecting coordinator
jason810496 May 5, 2026
857b0e1
Enhance task and DAG registration with optional specifications for im…
jason810496 May 11, 2026
14cfafb
Support setting downstream at AddTask method
jason810496 May 12, 2026
efc25ef
Enhance argument validation in airflow-go-pack to allow build flags a…
jason810496 May 12, 2026
8612c45
Fix command resolution in ExecutableCoordinator for improved task exe…
jason810496 May 12, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/1-airflow_bug_report.yml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ body:
- redis
- salesforce
- samba
- sdk-java
- segment
- sendgrid
- sftp
Expand Down
6 changes: 6 additions & 0 deletions .github/boring-cyborg.yml
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ labelPRBasedOnFilePath:
provider:keycloak:
- providers/keycloak/**

provider:sdk-java:
- providers/sdk/java/**

provider:microsoft-azure:
- providers/microsoft/azure/**

Expand Down Expand Up @@ -261,6 +264,9 @@ labelPRBasedOnFilePath:
provider:samba:
- providers/samba/**

provider:sdk-executable:
- providers/sdk/executable/**

provider:segment:
- providers/segment/**

Expand Down
11 changes: 11 additions & 0 deletions airflow-core/docs/extra-packages-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,17 @@ all the ``airflow`` packages together - similarly to what happened in Airflow 2.
``airflow-task-sdk`` separately, if you want to install providers, you need to install them separately as
``apache-airflow-providers-*`` distribution packages.

Multi-Language extras
=====================

These are extras that add dependencies needed for integration with other languages runtimes. Currently we have only Java SDK related extra, but in the future we might add more extras related to other languages runtimes.

+----------+------------------------------------------+------------------------------------------------------------------+
| extra | install command | enables |
+==========+==========================================+==================================================================+
| sdk.java | ``pip install apache-airflow[sdk.java]`` | JavaCoordinator for both dag processing and workload execution. |
+----------+------------------------------------------+------------------------------------------------------------------+

Apache Software extras
======================

Expand Down
2 changes: 2 additions & 0 deletions airflow-core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ exclude = [
"../shared/serialization/src/airflow_shared/serialization" = "src/airflow/_shared/serialization"
"../shared/state/src/airflow_shared/state" = "src/airflow/_shared/state"
"../shared/timezones/src/airflow_shared/timezones" = "src/airflow/_shared/timezones"
"../shared/workloads/src/airflow_shared/workloads" = "src/airflow/_shared/workloads"
"../shared/listeners/src/airflow_shared/listeners" = "src/airflow/_shared/listeners"
"../shared/plugins_manager/src/airflow_shared/plugins_manager" = "src/airflow/_shared/plugins_manager"
"../shared/providers_discovery/src/airflow_shared/providers_discovery" = "src/airflow/_shared/providers_discovery"
Expand Down Expand Up @@ -337,6 +338,7 @@ shared_distributions = [
"apache-airflow-shared-serialization",
"apache-airflow-shared-state",
"apache-airflow-shared-timezones",
"apache-airflow-shared-workloads",
"apache-airflow-shared-plugins-manager",
"apache-airflow-shared-providers-discovery",
]
1 change: 1 addition & 0 deletions airflow-core/src/airflow/_shared/workloads
15 changes: 15 additions & 0 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1967,6 +1967,21 @@ workers:
type: integer
example: ~
default: "60"
sdk:
description: Settings for non-Python SDK runtime coordination
options:
queue_to_sdk:
description: |
JSON mapping of queue names to SDK runtime coordinator names.

When a task's ``language`` field is not set, this mapping is checked
to route the task to a non-Python runtime coordinator based on its
queue. This is useful when queues are used as environment or
isolation identifiers (e.g. ``foo``, ``bar``).
version_added: 3.1.7
type: string
example: '{"foo": "java", "bar": "java", "go-queue": "go"}'
default: "{{}}"
api_auth:
description: Settings relating to authentication on the Airflow APIs
options:
Expand Down
78 changes: 74 additions & 4 deletions airflow-core/src/airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
from airflow.sdk import SecretCache
from airflow.sdk.log import init_log_file, logging_processors
from airflow.typing_compat import assert_never
from airflow.utils.file import list_py_file_paths, might_contain_dag
from airflow.utils.file import might_contain_dag
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
from airflow.utils.process_utils import (
Expand All @@ -88,6 +88,9 @@
from airflow.sdk.api.client import Client


log = logging.getLogger(__name__)


class DagParsingStat(NamedTuple):
"""Information on processing progress."""

Expand Down Expand Up @@ -158,6 +161,62 @@ def utc_epoch() -> datetime:
return result


def discover_dag_file_paths(
directory: str | os.PathLike[str] | None,
bundle_name: str = "",
safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE", fallback=True),
) -> list[str]:
"""
Discover paths of DAG files within a directory.

Walks ``directory`` (honouring ``.airflowignore``) and returns each file that is
either a Python DAG candidate (``.py`` source or ZIP archive that passes
:func:`~airflow.utils.file.might_contain_dag`) or accepted by a registered coordinator's
:meth:`~airflow.sdk.execution_time.coordinator.BaseCoordinator.can_handle_dag_file`
(e.g. a ``.jar`` for the Java SDK, a self-contained executable for the Go SDK).

Coordinator handling takes precedence over the generic ZIP heuristic so that,
for example, a ``.jar`` is delegated to its coordinator rather than being
scanned for embedded ``.py`` modules.

:param directory: Directory to scan, or a single file path. ``None`` returns
an empty list. A single file is returned as-is without filtering.
:param bundle_name: Bundle name forwarded to ``can_handle_dag_file``.
:param safe_mode: Whether to apply the Python DAG heuristic; see
:func:`~airflow.utils.file.might_contain_dag`.
:return: Absolute paths discovered as DAG sources.
"""
if directory is None:
return []
if os.path.isfile(directory):
return [str(directory)]
if not os.path.isdir(directory):
return []

from airflow._shared.module_loading.file_discovery import find_path_from_directory
from airflow.providers_manager import ProvidersManager

coordinators = ProvidersManager().coordinators
ignore_file_syntax = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="glob")

file_paths: list[str] = []
for file_path in find_path_from_directory(directory, ".airflowignore", ignore_file_syntax):
path = Path(file_path)
try:
if not path.is_file():
continue
if path.suffix == ".py":
if might_contain_dag(file_path, safe_mode):
file_paths.append(file_path)
elif any(c.can_handle_dag_file(bundle_name, file_path) for c in coordinators):
file_paths.append(file_path)
elif zipfile.is_zipfile(path) and might_contain_dag(file_path, safe_mode):
file_paths.append(file_path)
except Exception:
log.exception("Error while examining %s", file_path)
return file_paths


class _StubSelector(selectors.BaseSelector):
"""
Stub to stand in until the real selector is created.
Expand Down Expand Up @@ -808,9 +867,11 @@ def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):

def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]:
"""Get relative paths for dag files from bundle dir."""
# Build up a list of Python files that could contain DAGs
self.log.info("Searching for files in %s at %s", bundle.name, bundle.path)
rel_paths = [Path(x).relative_to(bundle.path) for x in list_py_file_paths(bundle.path)]
rel_paths = [
Path(x).relative_to(bundle.path)
for x in discover_dag_file_paths(bundle.path, bundle_name=bundle.name)
]
self.log.info("Found %s files for bundle %s", len(rel_paths), bundle.name)

return rel_paths
Expand All @@ -822,7 +883,13 @@ def _get_observed_filelocs(self, present: set[DagFileInfo]) -> set[str]:
For regular files this includes the relative file path.
For ZIP archives this includes DAG-like inner paths such as
``archive.zip/dag.py``.

Files claimed by a registered runtime coordinator (e.g. ``.jar``)
are treated as opaque files rather than ZIP archives.
"""
from airflow.providers_manager import ProvidersManager

coordinators = ProvidersManager().coordinators

def find_zipped_dags(abs_path: os.PathLike) -> Iterator[str]:
"""Yield absolute paths for DAG-like files inside a ZIP archive."""
Expand All @@ -837,7 +904,10 @@ def find_zipped_dags(abs_path: os.PathLike) -> Iterator[str]:
observed_filelocs: set[str] = set()
for info in present:
abs_path = str(info.absolute_path)
if abs_path.endswith(".py") or not zipfile.is_zipfile(abs_path):
handled_by_coordinator = any(
c.can_handle_dag_file(info.bundle_name, abs_path) for c in coordinators
)
if abs_path.endswith(".py") or handled_by_coordinator or not zipfile.is_zipfile(abs_path):
observed_filelocs.add(str(info.rel_path))
else:
if TYPE_CHECKING:
Expand Down
62 changes: 58 additions & 4 deletions airflow-core/src/airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from __future__ import annotations

import contextlib
import functools
import importlib
import logging
import os
Expand Down Expand Up @@ -76,8 +77,6 @@
from airflow.utils.state import TaskInstanceState

if TYPE_CHECKING:
from socket import socket

from structlog.typing import FilteringBoundLogger

from airflow.api_fastapi.execution_api.app import InProcessExecutionAPI
Expand All @@ -86,6 +85,7 @@
from airflow.sdk.definitions.context import Context
from airflow.sdk.definitions.dag import DAG
from airflow.sdk.definitions.mappedoperator import MappedOperator
from airflow.sdk.execution_time.supervisor import SelectorCallback
from airflow.typing_compat import Self


Expand Down Expand Up @@ -553,7 +553,14 @@ def start( # type: ignore[override]
) -> Self:
logger = kwargs["logger"]

_pre_import_airflow_modules(os.fspath(path), logger)
# Check if a provider-registered runtime coordinator should handle this file
logger.debug("Checking for provider-registered runtime coordinator entrypoint for file", path=path)
resolved_target = cls._resolve_processor_target(path, bundle_name, bundle_path, logger)
if resolved_target is not None:
target = resolved_target
logger.debug("Resolved provider-registered runtime coordinator entrypoint for file", path=path)
else:
_pre_import_airflow_modules(os.fspath(path), logger)

proc: Self = super().start(
target=target,
Expand All @@ -566,6 +573,53 @@ def start( # type: ignore[override]
proc._on_child_started(callbacks, path, bundle_path, bundle_name)
return proc

@staticmethod
def _resolve_processor_target(
path: str | os.PathLike[str],
bundle_name: str,
bundle_path: Path,
log: FilteringBoundLogger,
) -> Callable[[], None] | None:
"""
Return the entrypoint of the first provider runtime coordinator that can handle *path*.

The returned callable is a ``functools.partial`` that binds *path*, *bundle_name*
and *bundle_path* so the supervisor can pass it as a no-arg ``target`` to
``WatchedSubprocess.start``.
"""
from airflow.providers_manager import ProvidersManager

for coordinator_cls in ProvidersManager().coordinators:
try:
log.debug(
"Checking runtime coordinator %s for file %s",
coordinator_cls,
path,
)
if coordinator_cls.can_handle_dag_file(bundle_name, path):
log.debug(
"Using runtime coordinator %s for file %s",
coordinator_cls,
path,
)
return functools.partial(
coordinator_cls.run_dag_parsing,
path=os.fspath(path),
bundle_name=bundle_name,
bundle_path=os.fspath(bundle_path),
)
log.debug(
"Runtime coordinator %s cannot handle file %s with bundle name %s",
coordinator_cls,
path,
bundle_name,
)
except Exception:
log.warning("Failed to check runtime coordinator %s", coordinator_cls, exc_info=True)

log.debug("No runtime coordinator found for file %s, using default processor", path)
return None

def _on_child_started(
self,
callbacks: list[CallbackRequest],
Expand All @@ -591,7 +645,7 @@ def _get_target_loggers(self) -> tuple[FilteringBoundLogger, ...]:

def _create_log_forwarder(
self, loggers: tuple[FilteringBoundLogger, ...], name: str, log_level: int = logging.INFO
) -> Callable[[socket], bool]:
) -> SelectorCallback:
return super()._create_log_forwarder(loggers, name.replace("task.", "dag_processor.", 1), log_level)

def _handle_request(self, msg: ToManager, log: FilteringBoundLogger, req_id: int) -> None:
Expand Down
4 changes: 1 addition & 3 deletions airflow-core/src/airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -651,10 +651,8 @@ def run_workload(
if isinstance(workload, ExecuteTask):
from airflow.sdk.execution_time.supervisor import supervise_task

# workload.ti is a TaskInstanceDTO which duck-types as TaskInstance.
# TODO: Create a protocol for this.
return supervise_task(
ti=workload.ti, # type: ignore[arg-type]
ti=workload.ti,
bundle_info=workload.bundle_info,
dag_rel_path=workload.dag_rel_path,
token=workload.token,
Expand Down
12 changes: 9 additions & 3 deletions airflow-core/src/airflow/executors/workloads/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
from pathlib import Path
from typing import TYPE_CHECKING, Literal

from pydantic import BaseModel, Field
from pydantic import Field

from airflow._shared.workloads import TaskInstanceDTO as _BaseTaskInstanceDTO
from airflow.executors.workloads.base import BaseDagBundleWorkload, BundleInfo
from airflow.utils.state import TaskInstanceState

Expand All @@ -33,8 +34,13 @@
from airflow.models.taskinstancekey import TaskInstanceKey


class TaskInstanceDTO(BaseModel):
"""Schema for TaskInstance with minimal required fields needed for Executors and Task SDK."""
class TaskInstanceDTO(_BaseTaskInstanceDTO):
"""
TaskInstanceDTO with executor-specific ``key`` property.

Extends the shared :class:`~airflow._shared.workloads.TaskInstanceDTO`
to add the :attr:`key` property used by executors for workload tracking.
"""

id: uuid.UUID
dag_version_id: uuid.UUID
Expand Down
10 changes: 10 additions & 0 deletions airflow-core/src/airflow/models/dagcode.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@ def code(cls, dag_id, session: Session = NEW_SESSION) -> str:

@staticmethod
def get_code_from_file(fileloc):
# Try from runtime coordinator first (classes are pre-loaded by ProvidersManager)
from airflow.providers_manager import ProvidersManager

for coordinator_cls in ProvidersManager().coordinators:
# TODO: Perhaps the `can_handle_dag_file` interface should just accept `path` only?
# Or maybe we can have different granularity for this. that 1 with bundle + path, another with just path
if coordinator_cls.can_handle_dag_file("", fileloc):
return coordinator_cls.get_code_from_file(fileloc)

# Then fallback to python native
try:
with open_maybe_zipped(fileloc, "r") as f:
code = f.read()
Expand Down
7 changes: 7 additions & 0 deletions airflow-core/src/airflow/provider.yaml.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,13 @@
}
}
},
"coordinators": {
"type": "array",
"description": "Runtime Coordinator class names (BaseCoordinator subclasses)",
"items": {
"type": "string"
}
},
"source-date-epoch": {
"type": "integer",
"description": "Source date epoch - seconds since epoch (gmtime) when the release documentation was prepared. Used to generate reproducible package builds with flint.",
Expand Down
7 changes: 7 additions & 0 deletions airflow-core/src/airflow/provider_info.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,13 @@
"type": "string"
}
}
},
"coordinators": {
"type": "array",
"description": "Runtime Coordinator class names (BaseCoordinator subclasses)",
"items": {
"type": "string"
}
}
},
"definitions": {
Expand Down
Loading