Add ExecutableCoordinator for native self-contained Dag bundles #67161
Draft
jason810496 wants to merge 137 commits into
- Introduced the `apache-airflow-providers-languages-java` package with version 0.1.0.
- Added Java-specific task coordinators and DAG file processors.
- Created documentation including README, changelog, and installation instructions.
- Implemented provider info retrieval and commit tracking.
- Established testing framework with initial unit tests for Java provider components.
…ng for improved performance and reliability
…e related components
- Renamed all instances of "process coordinators" to "runtime coordinators" in the codebase.
- Updated the ProvidersManager and ProvidersManagerTaskRuntime classes to handle runtime coordinators.
- Modified the DagFileProcessorManager to collect file extensions from runtime coordinators.
- Adjusted the Java provider to implement the new runtime coordinator structure.
- Updated tests to reflect changes from process to runtime coordinators.
…_runtime_mapping] config
Tweak coordinator class names, attribute names, and method names to be shorter and avoid the term 'runtime'.
- Remove Java SDK setup in Dockerfile
- Add multi-language extras documentation
- Update TaskInstanceDTO description, and adjust API version in generated files
- Updated the Airflow issue template to include 'sdk-java' as an option.
- Added unit tests for JavaCoordinator functionality.
- Created a new test file for Java bundle scanning.
- Updated uv.lock to reflect new dependency requirements for tomli.
Replace TaskInstance with TaskInstanceDTO in StartupDetails fixtures and add the required pool_slots, queue, and priority_weight fields.
DagCode.get_code_from_file probes every coordinator's can_handle_dag_file on each fileloc, including .py paths nested inside ZIP DAGs (e.g. test_zip.zip/test_zip.py). The Java coordinator opened these as JAR files, raising NotADirectoryError because the parent path is a ZIP file rather than a directory. Short-circuit on the .jar suffix and add NotADirectoryError to the suppressed exceptions for safety.
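A minimal sketch of the guard described in this commit message, not the provider's actual code. `looks_like_task_jar` is a hypothetical stand-in for the coordinator's `can_handle_dag_file`, and the exact set of suppressed exceptions is an assumption.

```python
import contextlib
import zipfile
from pathlib import Path


def looks_like_task_jar(fileloc: str) -> bool:
    """Hypothetical stand-in for can_handle_dag_file on a Java coordinator."""
    path = Path(fileloc)
    # Short-circuit on the suffix: a path such as test_zip.zip/test_zip.py
    # is rejected before anything on disk is opened.
    if path.suffix != ".jar":
        return False
    # Safety net: a .jar path nested under a regular file raises
    # NotADirectoryError on POSIX, so it is suppressed along with the
    # other "this is not a readable JAR" errors (assumed set).
    with contextlib.suppress(FileNotFoundError, NotADirectoryError, zipfile.BadZipFile):
        with zipfile.ZipFile(path):
            return True
    return False
```

With this shape, `.py` paths inside ZIP DAGs never reach `zipfile.ZipFile` at all, and the suppressed exceptions only matter for malformed or misplaced `.jar` paths.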
The config.yml description duplicated the example field as a literal "Example:" line in the description text. With --include-descriptions this rendered as "# Example:", which trips test_cli_show_config_shows_descriptions. The example is already in the dedicated example field, so remove the duplicate from the description.
apache-airflow-providers-sdk-java requires apache-airflow>=3.3.0, so installing it against the 2.11.1 / 3.0.6 / 3.1.8 / 3.2.1 compat targets fails dependency resolution. Add it to remove-providers for each older-Airflow row in PROVIDERS_COMPATIBILITY_TESTS_MATRIX. Also silence mypy no-redef on dev/registry tomli fallback imports, which now trip the mypy-dev hook because tomli is resolvable in the mypy environment after recent uv.lock updates.
Import TaskInstanceDTO from the same airflow.sdk._shared.workloads namespace that BaseCoordinator uses. The previous import via airflow._shared.workloads pointed at the same physical file via a symlink but mypy treated the two namespaces as distinct types, flagging the override as a Liskov violation.
* Add 'sdk' to empty_subpackages in provider_conf so the autoapi-generated _api/airflow/providers/sdk/index.rst is excluded the same way the other namespace-only directories are. Without this, Sphinx warned that the document was not in any toctree.
* Fix the relative include paths in security.rst and installing-providers-from-sources.rst. Nested providers (those under a namespace package like sdk/) sit one directory deeper than flat providers, so the include needs four ../ segments instead of three to reach devel-common/src/sphinx_exts/includes/.
… timezone awareness in tests
…meters in test_supervisor
…meters in TestCommsDecoder
- Removed the shared workloads dependency from pyproject.toml and related files.
- Deleted the workloads directory and its references in the codebase.
- Refactored imports of TaskInstanceDTO to point to the new location in execution_time.workloads.task.
- Introduced new files for TaskInstanceDTO and its base class in the execution_time module.
- Updated tests to reflect the changes in TaskInstanceDTO imports.
…ample
Add JavaCoordinator, jvm, openjdk, Xmx to the docs spelling wordlist so the rendered configurations-ref doesn't fail Sphinx spellcheck on the [sdk] coordinators example.
Also indent multi-line example/default values by 8 spaces in the shared sections-and-options template so the rendered RST code-block keeps consistent indentation and doesn't break the field list.
Reflecting current AIP-108 scope. We can add this later if we want to.
The base class is still in execution_time since the interface needs to be known by the task runner.
* CI: Fix uv.lock by removing coordinator distribution
* Remove the remaining entry caught by codex bot
A JavaCoordinator can now accept more than one path to look for JARs. This path is also used to populate --class-path when executing the task, so you can now split dependencies and the task-containing JAR into different locations. This should make deployment a bit easier. The already-unused BundleScanner class has been removed. This was for Java-based DAGs, and not used in the current specification.
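As a rough illustration of how several JAR locations could feed `--class-path`, here is a sketch under stated assumptions: `build_class_path`, `java_command`, and the `main_class` argument are hypothetical names, and the real coordinator may discover and order JARs differently.

```python
import os
from pathlib import Path


def build_class_path(jar_dirs: list[str]) -> str:
    """Collect every *.jar in the configured directories into one class path."""
    jars: list[str] = []
    for directory in jar_dirs:
        # sorted() keeps the order deterministic within each directory.
        jars.extend(str(p) for p in sorted(Path(directory).glob("*.jar")))
    # The JVM separates class-path entries with the platform path separator
    # (":" on POSIX, ";" on Windows).
    return os.pathsep.join(jars)


def java_command(jar_dirs: list[str], main_class: str) -> list[str]:
    """Build a hypothetical JVM command line; main_class is illustrative."""
    return ["java", "--class-path", build_class_path(jar_dirs), main_class]
```

Keeping dependency JARs in one directory and the task-containing JAR in another then only requires listing both directories.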
* Drop selector_loop module; new JavaCoordinator does not need it

  The rewritten ``JavaCoordinator`` (``airflow.sdk.coordinators.java``) lets the JVM connect directly to two listening sockets (``comm`` and ``logs``) and uses the accepted sockets as the supervisor's ``stdin`` / log pipes straight up. There is no bytes-bridge between two sockets, so ``make_raw_forwarder`` -- the only helper the extracted ``selector_loop`` module added beyond what was already inline in ``supervisor.py`` -- has no caller.

  ``_JavaActivitySubprocess`` reuses ``_register_pipe_readers`` and ``_close_unused_sockets`` via subclassing ``ActivitySubprocess``; both methods existed before the extraction and remain in ``supervisor.py`` after this revert. The inline selector dispatch loop and ``make_buffered_socket_reader`` come back into ``supervisor.py`` so the existing call sites (including the ``triggerer_job_runner`` re-export) keep working unchanged.

  This reverts commit 56464a8 ("Add common selector loop utilities for socket I/O handling for subprocesses") and deletes ``test_selector_loop.py`` introduced by 4b80753.

* Address copilot's comments
With foreign-language SDKs, the two sides of the supervisor comm channel may run different schema versions. This adds a migration layer on the supervisor (server) side, so that an SDK (client) using an older schema version may still be able to communicate with the server.
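One way such a server-side migration layer could look, as a sketch only: the version numbers, field names, and the `upgrade` helper below are all hypothetical and do not reflect the actual supervisor schema.

```python
from typing import Any, Callable

Message = dict[str, Any]

SERVER_VERSION = 3  # assumed current schema version on the supervisor side


def _v1_to_v2(msg: Message) -> Message:
    # Hypothetical change: v2 renamed "ti_id" to "task_instance_id".
    out = dict(msg)
    out["task_instance_id"] = out.pop("ti_id")
    return out


def _v2_to_v3(msg: Message) -> Message:
    # Hypothetical change: v3 added a "queue" field with a default value.
    return {"queue": "default", **msg}


# Each entry upgrades a message from version N to version N + 1.
UPGRADES: dict[int, Callable[[Message], Message]] = {1: _v1_to_v2, 2: _v2_to_v3}


def upgrade(msg: Message, client_version: int) -> Message:
    """Apply single-step upgrades until the message matches the server schema."""
    if not 1 <= client_version <= SERVER_VERSION:
        raise ValueError(f"unsupported client schema version: {client_version}")
    for version in range(client_version, SERVER_VERSION):
        msg = UPGRADES[version](msg)
    return msg
```

Chaining single-step upgrades means each new schema version needs only one new function, while older clients keep working unchanged.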
…o-sdk/executable-coordinator
Why
The `BaseCoordinator` interface already supports non-Python SDKs (Java is the prior art), but there is no in-tree coordinator for native compiled SDKs (Go, Rust, C++, Zig, ...). This adds one, `ExecutableCoordinator`, together with a language-agnostic on-disk bundle format (`AFBNDL01`) so any compiled SDK produces artifacts the TaskSDK can handle identically.

What
- … and `airflow-metadata.yaml` manifest appended after end-of-file, located by a fixed 32-byte trailer ending in the magic `AFBNDL01`. The file stays directly runnable; detection is by trailer magic, not by filename or extension.
- `ExecutableCoordinator` (subclass of `BaseCoordinator`) that:
  - … `BundleScanner`, reading the trailer to extract the embedded source and metadata without unpacking;
  - … sockets the child connects back to, reusing the `ActivitySubprocess` plumbing introduced for `JavaCoordinator`;
  - … the worker's `$PATH` differs from the build environment.

Was generative AI tooling used to co-author this PR?
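To illustrate the trailer-based detection described under "What", here is a sketch in Python. Only the 32-byte trailer size and the trailing `AFBNDL01` magic come from the description; the field layout (three little-endian 64-bit integers: payload offset, source length, metadata length) and the helper names are assumptions, not the actual format.

```python
import os
import struct
from typing import BinaryIO, NamedTuple, Optional

MAGIC = b"AFBNDL01"
# ASSUMED layout: payload offset, source length, metadata length, magic.
# 3 * 8 bytes + 8-byte magic = 32 bytes.
TRAILER = struct.Struct("<QQQ8s")


class Bundle(NamedTuple):
    source: bytes
    metadata: bytes


def append_bundle(executable: bytes, source: bytes, metadata: bytes) -> bytes:
    """Append source, metadata, and trailer after the end of the executable."""
    trailer = TRAILER.pack(len(executable), len(source), len(metadata), MAGIC)
    return executable + source + metadata + trailer


def read_bundle(f: BinaryIO) -> Optional[Bundle]:
    """Return the embedded payload, or None if the file is not a bundle."""
    size = f.seek(0, os.SEEK_END)
    if size < TRAILER.size:
        return None
    f.seek(size - TRAILER.size)
    offset, src_len, meta_len, magic = TRAILER.unpack(f.read(TRAILER.size))
    # Detection is by trailer magic only; the length check guards against a
    # file that merely happens to end in the magic bytes.
    if magic != MAGIC or offset + src_len + meta_len + TRAILER.size != size:
        return None
    f.seek(offset)
    return Bundle(f.read(src_len), f.read(meta_len))
```

Because the payload sits after the executable's own end-of-file, only the last 32 bytes need to be read to decide whether a file is a bundle, and the embedded parts can be read by offset without unpacking anything.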