Skip to content

[DON'T MERGE] Go-SDK with Coordinator interface#67032

Draft
jason810496 wants to merge 57 commits into
apache:mainfrom
astronomer:refactor/go-sdk/all
Draft

[DON'T MERGE] Go-SDK with Coordinator interface#67032
jason810496 wants to merge 57 commits into
apache:mainfrom
astronomer:refactor/go-sdk/all

Conversation

@jason810496
Copy link
Copy Markdown
Member

Disclaimer

The coordinator interface of this branch is not up to date with the latest discuss on Dev List.
I will wait until the final decision settle down then make the interface up to date and break down this PR into smaller pieces.

The implementation on this branch already worked for both "multi-lang tasks" and "defining the whole Dag in Go" features anyway.

Try it out

  1. Checkout to this feature branch
  2. in files/airflow-breeze-config/environment_variables.env
AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST='[{"name": "go-bundles", "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", "kwargs": {"path": "/files/go-bundles", "refresh_interval": 20}}, {"name": "multi-lang", "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", "kwargs": {"path": "/files/multi-lang", "refresh_interval": 20}}]'
AIRFLOW__SDK__QUEUE_TO_SDK='{"go-task": "executable"}'
AIRFLOW__EXECUTABLE__BUNDLES_FOLDER='/files/multi-lang-go-bundles'
AIRFLOW_CONN_TEST_HTTP='{"conn_type": "http", "login": "user", "password": "pass", "host": "example.com", "port": 1234, "extra": {"param1": "val1", "param2": "val2"}}'
AIRFLOW_VAR_MY_VARIABLE=123
  1. breeze start-airflow --backend postgres
  2. in Breeze container shell tab
mkdir -p /opt/airflow/go-sdk/bin /files/go-bundles /files/multi-lang-go-bundles /files/multi-lang

# Build #1: simple_dag (pure Go)
cd /opt/airflow/go-sdk/example/bundle && go tool airflow-go-pack \
    --output /opt/airflow/go-sdk/bin/example_dags_simple \
    -- -ldflags "-X main.dagId=simple_dag"
cp /opt/airflow/go-sdk/bin/example_dags_simple /files/go-bundles/example_dags
sudo chown airflow:airflow /files/go-bundles/example_dags

# Build #2: go_multi_lang (Python stub backend)
cd /opt/airflow/go-sdk/example/bundle && go tool airflow-go-pack \
    --output /opt/airflow/go-sdk/bin/example_dags_stub \
    -- -ldflags "-X main.dagId=go_multi_lang"
cp /opt/airflow/go-sdk/bin/example_dags_stub /files/multi-lang-go-bundles/example_dags
sudo chown airflow:airflow /files/multi-lang-go-bundles/example_dags

# Old coordinator interface (not up to date with the latest discussion, but it works)
cd /opt/airflow && pip install providers/sdk/executable

# create Stub Dag if not already created. dag_id must match the dagId baked
# into the /files/multi-lang-go-bundles/ binary above ("go_multi_lang"), and the stub
# must expose one @task.stub per Go task the scheduler should trigger.
cat <<EOF > /files/multi-lang/stub_dag.py
from airflow.sdk import dag, task

@task.stub(queue="go-task")
def extract(): ...

@task.stub(queue="go-task")
def transform(): ...

@task
def load():
    print("This is a Python task that depends on two Go tasks!")

@dag()
def go_multi_lang():
    extract() >> transform() >> load()

go_multi_lang()
EOF
  1. Then restart dag-processor and scheduler (press r at scheduler and dag-processor tab)

Note

The same Go source (go-sdk/example/bundle/main.go) is built twice with a different main.dagId override per build:

  • dagId=simple_dag -> pure-Go bundle, dropped into /files/go-bundles/
  • dagId=go_multi_lang -> stub-mode binary, dropped into /files/multi-lang-go-bundles/ next to stub_dag.py (which declares the matching dag_id on the Python side).

airflow-go-pack forwards everything after -- straight to go build, so we pass -ldflags "-X main.dagId=..." to overwrite the package-level dagId string in the example bundle.

jason810496 and others added 30 commits May 6, 2026 13:36
- Introduced the `apache-airflow-providers-languages-java` package with version 0.1.0.
- Added Java-specific task coordinators and DAG file processors.
- Created documentation including README, changelog, and installation instructions.
- Implemented provider info retrieval and commit tracking.
- Established testing framework with initial unit tests for Java provider components.
- Renamed all instances of "process coordinators" to "runtime coordinators" in the codebase.
- Updated the ProvidersManager and ProvidersManagerTaskRuntime classes to handle runtime coordinators.
- Modified the DagFileProcessorManager to collect file extensions from runtime coordinators.
- Adjusted the Java provider to implement the new runtime coordinator structure.
- Updated tests to reflect changes from process to runtime coordinators.
Tweak coordinator class names, attribute names, and method names to be
shorter and avoid the term 'runtime'.
- Remove Java SDK setup in Dockerfile
- add multi-language extras documentation
- Update TaskInstanceDTO description, and adjust API version in generated files
jason810496 added 16 commits May 6, 2026 14:41
* Add 'sdk' to empty_subpackages in provider_conf so the autoapi-
  generated _api/airflow/providers/sdk/index.rst is excluded the
  same way the other namespace-only directories are. Without this,
  Sphinx warned that the document was not in any toctree.
* Fix the relative include paths in security.rst and installing-
  providers-from-sources.rst. Nested providers (those under a
  namespace package like sdk/) sit one directory deeper than
  flat providers, so the include needs four ../ segments instead
  of three to reach devel-common/src/sphinx_exts/includes/.
- Introduced the `apache-airflow-providers-languages-executable` provider with version 0.1.0.
- Updated `pyproject.toml` to include the new provider in dependencies and mypy paths.
- Modified CI Docker Compose files to include paths for the new provider's source and tests.
- Implemented `BaseRuntimeCoordinator` for handling non-Python DAG file processing and task execution.
- Added selector-based I/O loop utilities for managing socket communication between the supervisor and runtime subprocesses.
- Enhanced task runner to resolve and verify access to DAG bundles for runtime coordinators.
- Add `messages_test.go` to test message decoding and encoding functionalities.
- Introduce `serde.go` for serialization of various data types to Airflow's format.
- Create `serde_test.go` to validate serialization logic and ensure correctness.
- Implement `server.go` to handle communication with the supervisor and manage task execution.
- Add `task_runner.go` to execute tasks based on received startup details and handle success/failure.
…etection and update documentation accordingly
- Added new command `airflow-go-pack` to build a self-contained Airflow bundle from a Go package.
- Introduced `inspect` command to print the embedded manifest and optionally the source from a bundle.
- Implemented `dump-bundle-spec` functionality to output the bundle specification in JSON format.
- Created `bundlefooter` package to manage appending source and metadata to the binary with a defined trailer.
- Added tests for bundle footer operations and manifest rendering to ensure correctness.
- Updated Justfile for building and packing example DAG bundles.
…fter "--" and update example bundle to support dynamic DAG naming
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

Status: Backlog

Development

Successfully merging this pull request may close these issues.

2 participants