Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
137 commits
Select commit Hold shift + click to select a range
625b083
Add Task Coordinators and Dag File Processor
jason810496 Apr 8, 2026
dda69f2
Add initial Java provider for Apache Airflow
jason810496 Apr 8, 2026
56464a8
Add common selector loop utilities for socket I/O handling for subpro…
jason810496 Apr 10, 2026
67affe7
Implement Java DAG file processor with TCP communication bridge
jason810496 Apr 9, 2026
128e2bb
Make JavaDagFileProcessor.can_handle aware of jar file content
jason810496 Apr 9, 2026
7964c99
Fix java process startup issue
jason810496 Apr 9, 2026
57dbe6a
Fix sockets bidning
jason810496 Apr 9, 2026
740964b
Refactor Java DAG file processor to use selector-based I/O multiplexi…
jason810496 Apr 10, 2026
4db208b
Add BaseLocaleCoordinator for non-Python DAG file processing and task…
jason810496 Apr 10, 2026
a911913
Implement JavaLocaleCoordinator
jason810496 Apr 10, 2026
847c2a2
Add Java task coordinator and entrypoint for locale-specific execution
jason810496 Apr 13, 2026
1089edc
Refactor Java provider to with generic process coordinators and updat…
jason810496 Apr 13, 2026
40c10b9
Fix Coordinator by getting the correct dag bundle and dag path
jason810496 Apr 14, 2026
00078b6
Make @task.stub(language=java) works
jason810496 Apr 15, 2026
217b6e5
Make coordinator respect Jar bundle based on TI workload type
jason810496 Apr 16, 2026
f0b13ad
Add java_sdk_setup script for Breeze
jason810496 Apr 16, 2026
65a0410
Add get_code_from_file interface for BaseLocaleCoordinator
jason810496 Apr 16, 2026
c307f19
Fix the 'Pure Java Dag' disappear in metadata DB issue
jason810496 Apr 17, 2026
7cfbea4
Refactor process coordinators to runtime coordinators
jason810496 Apr 17, 2026
b5aa4c8
Rename stub operator language field as sdk
jason810496 Apr 23, 2026
ec2977b
Rename languages.java provider to sdk.java
jason810496 Apr 23, 2026
4b80753
Add unit tests for socket handling and selector loop functionality
jason810496 Apr 23, 2026
2b830b6
Move TaskInstanceDTO to share to make task_runner retrieve TI.queue
jason810496 Apr 21, 2026
90f0385
Add [workers/queue_to_runtime_mapping]
jason810496 Apr 22, 2026
8a9ac46
Remove the sdk field from stub operator and respect [workers/queue_to…
jason810496 Apr 23, 2026
fbcefa2
Rename `[workers] queue_to_runtime_mapping` to `[sdk] queue_to_sdk`
jason810496 Apr 23, 2026
1ccf3a5
Simplify coordinator-related names (#1569)
uranusjr Apr 24, 2026
6056355
CI: Add mypy and unit tests for shared/workloads
jason810496 Apr 28, 2026
8cf3c57
CI: Fix DB migration and breeze images
jason810496 Apr 28, 2026
05531e6
CI: Fix failing items
jason810496 Apr 28, 2026
3e3054a
CI: Fix failing items
jason810496 Apr 28, 2026
64b6595
CI: Add compat for create_runtime_ti pytest fixture
jason810496 Apr 28, 2026
12083e0
CI: Fix Java provider test to include configuration options
jason810496 Apr 28, 2026
da3a5d9
CI: Fix self-review nits
jason810496 Apr 28, 2026
da74c1a
Revert MappedOperator change
jason810496 Apr 28, 2026
738ea08
CI: Fix failing items
jason810496 Apr 28, 2026
ed081ec
CI: Fix Task SDK test_task_runner failures using TaskInstanceDTO
jason810496 Apr 28, 2026
e9d4a99
CI: Skip non-JAR paths in JavaCoordinator.can_handle_dag_file
jason810496 Apr 28, 2026
3b864c7
CI: Drop literal Example: line from queue_to_sdk config description
jason810496 Apr 28, 2026
f22a4ff
CI: Skip sdk-java provider in compat tests for older Airflow
jason810496 Apr 28, 2026
3677e81
CI: Fix MyPy Liskov violation in JavaCoordinator.task_execution_cmd
jason810496 Apr 28, 2026
a8c16a1
CI: Fix sdk-java docs build warnings
jason810496 Apr 28, 2026
3f4ce29
CI: Update SDK Java configuration and documentation references
jason810496 Apr 29, 2026
5902653
CI: Update map_index handling and add fixture to restore process cont…
jason810496 Apr 29, 2026
bd9f497
CI: Refactor map_index handling and update time travel decorators for…
jason810496 Apr 30, 2026
baf094f
CI: Replace TaskInstance with TaskInstanceDTO and add additional para…
jason810496 Apr 30, 2026
9d49792
CI: Update test cases to use -1 for map_index and add additional para…
jason810496 Apr 30, 2026
cd52b3e
Remove shared workloads dependency and refactor TaskInstanceDTO usage
jason810496 Apr 30, 2026
5e34653
Move sdk.java out of provider as apache-airflow-coordinators-java dis…
jason810496 May 6, 2026
80488ea
Enhance documentation for BaseCoordinator lifecycle methods and IPC m…
jason810496 May 7, 2026
ccd02c6
CI: Fix docs spellcheck and code-block indent for sdk coordinators ex…
jason810496 May 7, 2026
186d621
Remove pure dag parsing from coordinator (#1578)
uranusjr May 14, 2026
26d292d
Move coordinator implementation back into SDK (#1580)
uranusjr May 15, 2026
ebe061c
CI: Fix uv.lock by removing coordinator distribution (#1585)
jason810496 May 18, 2026
cea4017
Fix import ignore test
uranusjr May 19, 2026
6f10fc7
Rewrite Java coordinator
uranusjr May 18, 2026
3c053ef
Accept [sdk] coordinators as dict and lazy init
uranusjr May 19, 2026
e4b5125
Redo JAR lookup
uranusjr May 19, 2026
5abb82e
Drop selector_loop module (#1587)
jason810496 May 20, 2026
0c97b89
Add schema migration to supervisor-child comm
uranusjr May 20, 2026
6709ba7
Redo 16332b2e3be427e614a1ddc7c7bb610f4d284b22
uranusjr May 20, 2026
6b4f5ca
Add executable provider and runtime coordinator
jason810496 Apr 29, 2026
a2fc8b7
Add bundle specification documentation and metadata schema
jason810496 Apr 30, 2026
ac75f52
Fix base coordinator name
jason810496 Apr 30, 2026
4fdbaf5
Rename ExecutableRuntimeCoordinator to ExecutableCoordinator across d…
jason810496 May 4, 2026
24baf05
Refactor bundle spec to use self-contained executable with embedded m…
jason810496 May 5, 2026
01103ba
Refactor bundle scanning logic to support self-contained executable d…
jason810496 May 5, 2026
bb14ef1
Refactor DAG file discovery by respecting coordinator
jason810496 May 5, 2026
48c767b
Fix command resolution in ExecutableCoordinator for improved task exe…
jason810496 May 12, 2026
7d57137
Move executable coordinator back to SDK
jason810496 May 19, 2026
d58b330
Align the latest coordinator interface for executable coordinator
jason810496 May 19, 2026
04a6420
Cleanup coordinator distribution
jason810496 May 21, 2026
7468413
refactor: consolidate bundle_scanner with ExecutableCoordinator
jason810496 May 21, 2026
a908f3e
Redo executable lookup
jason810496 May 21, 2026
3dd0513
Add Task Coordinators and Dag File Processor
jason810496 Apr 8, 2026
f7f0aa3
Add initial Java provider for Apache Airflow
jason810496 Apr 8, 2026
c5c118f
Add common selector loop utilities for socket I/O handling for subpro…
jason810496 Apr 10, 2026
bf47596
Implement Java DAG file processor with TCP communication bridge
jason810496 Apr 9, 2026
8bfd9f6
Make JavaDagFileProcessor.can_handle aware of jar file content
jason810496 Apr 9, 2026
663985b
Fix java process startup issue
jason810496 Apr 9, 2026
b2eeba2
Fix sockets bidning
jason810496 Apr 9, 2026
6a15f34
Refactor Java DAG file processor to use selector-based I/O multiplexi…
jason810496 Apr 10, 2026
bb47453
Add BaseLocaleCoordinator for non-Python DAG file processing and task…
jason810496 Apr 10, 2026
b49384a
Implement JavaLocaleCoordinator
jason810496 Apr 10, 2026
aaf74a7
Add Java task coordinator and entrypoint for locale-specific execution
jason810496 Apr 13, 2026
df48bd8
Refactor Java provider to with generic process coordinators and updat…
jason810496 Apr 13, 2026
0e92379
Fix Coordinator by getting the correct dag bundle and dag path
jason810496 Apr 14, 2026
fef5d07
Make @task.stub(language=java) works
jason810496 Apr 15, 2026
1f35ad3
Make coordinator respect Jar bundle based on TI workload type
jason810496 Apr 16, 2026
1d60dae
Add java_sdk_setup script for Breeze
jason810496 Apr 16, 2026
a432533
Add get_code_from_file interface for BaseLocaleCoordinator
jason810496 Apr 16, 2026
fec62dc
Fix the 'Pure Java Dag' disappear in metadata DB issue
jason810496 Apr 17, 2026
2d113a1
Refactor process coordinators to runtime coordinators
jason810496 Apr 17, 2026
bc42d0b
Rename stub operator language field as sdk
jason810496 Apr 23, 2026
8b96a21
Rename languages.java provider to sdk.java
jason810496 Apr 23, 2026
f58c12e
Add unit tests for socket handling and selector loop functionality
jason810496 Apr 23, 2026
16a975c
Move TaskInstanceDTO to share to make task_runner retrieve TI.queue
jason810496 Apr 21, 2026
9a082f5
Add [workers/queue_to_runtime_mapping]
jason810496 Apr 22, 2026
8ccbed1
Remove the sdk field from stub operator and respect [workers/queue_to…
jason810496 Apr 23, 2026
66d328c
Rename `[workers] queue_to_runtime_mapping` to `[sdk] queue_to_sdk`
jason810496 Apr 23, 2026
245d796
Simplify coordinator-related names (#1569)
uranusjr Apr 24, 2026
4cfa393
CI: Add mypy and unit tests for shared/workloads
jason810496 Apr 28, 2026
52420e4
CI: Fix DB migration and breeze images
jason810496 Apr 28, 2026
81ec073
CI: Fix failing items
jason810496 Apr 28, 2026
749002e
CI: Fix failing items
jason810496 Apr 28, 2026
de70320
CI: Add compat for create_runtime_ti pytest fixture
jason810496 Apr 28, 2026
f0f2ef5
CI: Fix Java provider test to include configuration options
jason810496 Apr 28, 2026
34e50ae
CI: Fix self-review nits
jason810496 Apr 28, 2026
e5d8c94
Revert MappedOperator change
jason810496 Apr 28, 2026
cb0a02b
CI: Fix failing items
jason810496 Apr 28, 2026
610739d
CI: Fix Task SDK test_task_runner failures using TaskInstanceDTO
jason810496 Apr 28, 2026
7110d00
CI: Skip non-JAR paths in JavaCoordinator.can_handle_dag_file
jason810496 Apr 28, 2026
4869769
CI: Drop literal Example: line from queue_to_sdk config description
jason810496 Apr 28, 2026
68396a6
CI: Skip sdk-java provider in compat tests for older Airflow
jason810496 Apr 28, 2026
3a25a0b
CI: Fix MyPy Liskov violation in JavaCoordinator.task_execution_cmd
jason810496 Apr 28, 2026
8c480a4
CI: Fix sdk-java docs build warnings
jason810496 Apr 28, 2026
eeb9e8f
CI: Update SDK Java configuration and documentation references
jason810496 Apr 29, 2026
d949f5e
CI: Update map_index handling and add fixture to restore process cont…
jason810496 Apr 29, 2026
08a8daa
CI: Refactor map_index handling and update time travel decorators for…
jason810496 Apr 30, 2026
454f6ce
CI: Replace TaskInstance with TaskInstanceDTO and add additional para…
jason810496 Apr 30, 2026
d75d6c8
CI: Update test cases to use -1 for map_index and add additional para…
jason810496 Apr 30, 2026
4f1ac0b
Remove shared workloads dependency and refactor TaskInstanceDTO usage
jason810496 Apr 30, 2026
59cf866
Move sdk.java out of provider as apache-airflow-coordinators-java dis…
jason810496 May 6, 2026
c5a34cb
Enhance documentation for BaseCoordinator lifecycle methods and IPC m…
jason810496 May 7, 2026
9d063b6
CI: Fix docs spellcheck and code-block indent for sdk coordinators ex…
jason810496 May 7, 2026
f756944
Remove pure dag parsing from coordinator (#1578)
uranusjr May 14, 2026
12e7760
Move coordinator implementation back into SDK (#1580)
uranusjr May 15, 2026
c1ab610
CI: Fix uv.lock by removing coordinator distribution (#1585)
jason810496 May 18, 2026
a6e9b50
Fix import ignore test
uranusjr May 19, 2026
efc71d1
Rewrite Java coordinator
uranusjr May 18, 2026
b23e701
Accept [sdk] coordinators as dict and lazy init
uranusjr May 19, 2026
6efd212
Redo JAR lookup
uranusjr May 19, 2026
27b3f5a
Drop selector_loop module (#1587)
jason810496 May 20, 2026
95f9dbd
Add schema migration to supervisor-child comm
uranusjr May 20, 2026
74dfb40
Redo 16332b2e3be427e614a1ddc7c7bb610f4d284b22
uranusjr May 20, 2026
b18171e
CI: Fix uv lock
jason810496 May 22, 2026
d8bd51e
Merge branch 'task-sdk/feature/coordinator-interface' into refactor/g…
jason810496 May 22, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1147,6 +1147,20 @@ repos:
pass_filenames: true
files: ^airflow-core/src/airflow/api_fastapi/execution_api/(datamodels|versions)/.*\.py$
require_serial: true
- id: generate-supervisor-schemas-snapshot
name: Regenerate supervisor schema snapshot
entry: ./scripts/ci/prek/generate_supervisor_schemas_snapshot.py
language: python
pass_filenames: false
files: ^(task-sdk/src/airflow/sdk/execution_time/(comms\.py|schema/.*\.py)|airflow-core/src/airflow/dag_processing/processor\.py)$
require_serial: true
- id: check-supervisor-schemas-versions
name: Check supervisor schema changes have corresponding version updates
entry: ./scripts/ci/prek/check_supervisor_schemas_versions.py
language: python
pass_filenames: true
files: ^(task-sdk/src/airflow/sdk/execution_time/(comms\.py|schema/.*\.py)|airflow-core/src/airflow/dag_processing/processor\.py)$
require_serial: true
- id: generate-tasksdk-datamodels
name: Generate Datamodels for TaskSDK client
language: python
Expand Down
37 changes: 37 additions & 0 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2004,6 +2004,43 @@ workers:
type: integer
example: ~
default: "60"
sdk:
description: Settings for non-Python SDK runtime coordination
options:
coordinators:
description: |
JSON object mapping of coordinator keys to coordinator definitions.

Each value is an object with ``classpath`` and optional ``kwargs``.
``classpath`` is resolved via ``import_string`` and constructed with
``kwargs`` on first use. Entries are
independent instances, so the same ``classpath`` can be configured
multiple times under different names with different ``kwargs`` (for
example, two ``JavaCoordinator`` instances pinned to different JDK
versions).
version_added: 3.3.0
type: string
example: |
{
"jdk-17": {
"classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
"kwargs": {"java_executable": "/usr/lib/jvm/java-17-openjdk/bin/java", "jvm_args": ["-Xmx1024m"]}
}
}
default: ~
queue_to_coordinator:
description: |
JSON mapping of queue names to a coordinator key from
``[sdk] coordinators``.

When a task's ``language`` field is not set, this mapping is checked
to route the task to a configured coordinator instance based on its
queue. This is useful when queues are used as environment or
isolation identifiers (e.g. ``legacy-java``, ``modern-java``).
version_added: 3.3.0
type: string
example: '{"legacy-java": "jdk-11", "modern-java": "jdk-17"}'
default: ~
api_auth:
description: Settings relating to authentication on the Airflow APIs
options:
Expand Down
5 changes: 2 additions & 3 deletions airflow-core/src/airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,11 +652,10 @@ def run_workload(

if isinstance(workload, ExecuteTask):
from airflow.sdk.execution_time.supervisor import supervise_task
from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO as SDKTaskInstanceDTO

# workload.ti is a TaskInstanceDTO which duck-types as TaskInstance.
# TODO: Create a protocol for this.
return supervise_task(
ti=workload.ti, # type: ignore[arg-type]
ti=SDKTaskInstanceDTO.model_validate(workload.ti, from_attributes=True),
bundle_info=workload.bundle_info,
dag_rel_path=workload.dag_rel_path,
token=workload.token,
Expand Down
2 changes: 2 additions & 0 deletions airflow-core/src/airflow/executors/workloads/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
from airflow.utils.state import TaskInstanceState

if TYPE_CHECKING:
import uuid

from airflow.api_fastapi.auth.tokens import JWTGenerator
from airflow.models.taskinstance import TaskInstance as TIModel
from airflow.models.taskinstancekey import TaskInstanceKey
Expand Down
17 changes: 14 additions & 3 deletions devel-common/src/tests_common/pytest_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2519,7 +2519,6 @@ def execute(self, context):
from uuid6 import uuid7

from airflow.sdk import DAG
from airflow.sdk.api.datamodels._generated import TaskInstance
from airflow.sdk.execution_time.comms import BundleInfo, StartupDetails
from airflow.timetables.base import TimeRestriction

Expand Down Expand Up @@ -2547,6 +2546,15 @@ def _create_task_instance(
should_retry: bool | None = None,
max_tries: int | None = None,
) -> RuntimeTaskInstance:
from tests_common.test_utils.version_compat import AIRFLOW_V_3_3_PLUS

if AIRFLOW_V_3_3_PLUS:
from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO
else:
from airflow.sdk.api.datamodels._generated import ( # type: ignore[no-redef,assignment]
TaskInstance as TaskInstanceDTO,
)

from airflow.sdk.api.datamodels._generated import DagRun, DagRunState, TIRunContext
from airflow.utils.types import DagRunType

Expand Down Expand Up @@ -2624,14 +2632,17 @@ def _create_task_instance(
}

startup_details = StartupDetails(
ti=TaskInstance(
ti=TaskInstanceDTO(
id=ti_id,
task_id=task.task_id,
dag_id=dag_id,
run_id=run_id,
try_number=try_number,
map_index=map_index,
map_index=map_index, # type: ignore[arg-type]
dag_version_id=uuid7(),
pool_slots=1,
queue="default",
priority_weight=1,
),
dag_rel_path="",
bundle_info=BundleInfo(name="anything", version="any"),
Expand Down
4 changes: 4 additions & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,7 @@ iTerm
iterm
itertools
Jarek
JavaCoordinator
javascript
jaydebeapi
Jdbc
Expand Down Expand Up @@ -897,6 +898,7 @@ jsonl
juli
Jupyter
jupyter
jvm
jwks
JWT
jwt
Expand Down Expand Up @@ -1130,6 +1132,7 @@ openai
openapi
openfaas
OpenID
openjdk
openlineage
OpenSearch
opensearch
Expand Down Expand Up @@ -1860,6 +1863,7 @@ XComs
Xiaodong
xlarge
xml
Xmx
xpath
XSS
xyz
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ def stub(
Stub tasks exist in the Dag graph only, but the execution must happen in an external
environment via the Task Execution Interface.
"""
return task_decorator_factory(
decorated_operator_class=_StubOperator,
Expand Down
204 changes: 204 additions & 0 deletions scripts/ci/prek/check_supervisor_schemas_versions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# /// script
# requires-python = ">=3.10,<3.11"
# dependencies = [
# "rich>=13.6.0",
# ]
# ///
"""
Fail when a supervisor schema has changed without a matching
``VersionChange`` entry under
``task-sdk/src/airflow/sdk/execution_time/schema/versions/``.

Mirrors :mod:`scripts.ci.prek.check_execution_api_versions` for the
supervisor bundle. The check is per-commit: every PR that mutates a
registered supervisor schema must add an instruction to the in-progress head
``v<YYYY>_<MM>_<DD>.py`` file. The release-time version-file bump itself
is one-per-release; this hook is what keeps the in-progress file
honest between releases.

The comparison is done by dumping the snapshot JSON in this worktree
and in a temporary worktree of the upstream target branch, then
diffing them. Both sides invoke the sibling ``dump_supervisor_schemas.py``
script so the comparison is dump-version stable.
"""

from __future__ import annotations

import os
import subprocess
import sys
import tempfile
from pathlib import Path

from common_prek_utils import console, get_remote_for_main

SUPERVISOR_SCHEMAS_PREFIX = "task-sdk/src/airflow/sdk/execution_time/schema/"
VERSIONS_PREFIX = SUPERVISOR_SCHEMAS_PREFIX + "versions/"
TASK_SDK_COMMS_PATH = "task-sdk/src/airflow/sdk/execution_time/comms.py"
CORE_PROCESSOR_PATH = "airflow-core/src/airflow/dag_processing/processor.py"

DUMP_SCRIPT = Path(__file__).parent / "dump_supervisor_schemas.py"


# TODO: We should consolidte the common logic with check_execution_api_versions.py into common_prek_utils
def get_target_branch() -> str:
"""Branch to compare against. GITHUB_BASE_REF for PRs, DEFAULT_BRANCH in CI, else main."""
return os.environ.get("GITHUB_BASE_REF") or os.environ.get("DEFAULT_BRANCH") or "main"


def get_changed_files(filenames: list[str]) -> list[str]:
"""Get changed files. Uses filenames from prek when provided, else staged files for local runs."""
if filenames:
return filenames
result = subprocess.run(
["git", "diff", "--cached", "--name-only"],
capture_output=True,
text=True,
check=True,
)
return [f for f in result.stdout.strip().splitlines() if f]


def dump_snapshot(cwd: Path) -> str:
"""Run ``dump_supervisor_schemas.py`` in *cwd* and return its stdout."""
result = subprocess.run(
[
"uv",
"run",
"-p",
"3.12",
"--no-progress",
"--project",
"task-sdk",
"-s",
str(DUMP_SCRIPT),
],
cwd=cwd,
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
raise RuntimeError(f"Snapshot dump failed: {result.stderr}")
return result.stdout


def _upstream_has_schema() -> bool:
"""Return True if the target branch carries the schema package."""
target_branch = get_target_branch()
remote = get_remote_for_main()
ref = f"{remote}/{target_branch}"
subprocess.run(["git", "fetch", remote, target_branch], capture_output=True, check=False)
# ``git cat-file -e`` exits zero iff the path exists at the ref.
result = subprocess.run(
["git", "cat-file", "-e", f"{ref}:{VERSIONS_PREFIX}__init__.py"],
capture_output=True,
check=False,
)
return result.returncode == 0


def dump_snapshot_from_main() -> str:
"""Dump snapshot from target branch using a temporary worktree."""
target_branch = get_target_branch()
remote = get_remote_for_main()
ref = f"{remote}/{target_branch}"
worktree_path = Path(tempfile.mkdtemp()) / "airflow-main"
subprocess.run(["git", "fetch", remote, target_branch], capture_output=True, check=False)
subprocess.run(["git", "worktree", "add", str(worktree_path), ref], capture_output=True, check=True)
try:
return dump_snapshot(worktree_path)
finally:
subprocess.run(
["git", "worktree", "remove", "--force", str(worktree_path)],
capture_output=True,
check=False,
)


def main() -> int:
changed_files = get_changed_files(sys.argv[1:])

# Files under schema/ that reference the bundle's
# registered models. Schema changes in those models' homes
# (``comms.py``, ``processor.py``) trigger this hook too because
# the snapshot embeds their head shape.
schema_source_files = [
f
for f in changed_files
if f.startswith(SUPERVISOR_SCHEMAS_PREFIX) or f == TASK_SDK_COMMS_PATH or f == CORE_PROCESSOR_PATH
]
version_files = [f for f in changed_files if f.startswith(VERSIONS_PREFIX)]

if not schema_source_files:
return 0
if version_files:
# Contributor added a version-change entry: trust them.
return 0

if not _upstream_has_schema():
# The package is being introduced in this PR -- nothing on the
# target branch to compare against. The check will start firing
# normally once the package is on the target branch.
console.print(
"[yellow]Skipping supervisor-schemas version check:[/] target branch "
"has no schema package yet. The check activates once "
"this PR merges."
)
return 0

try:
main_snapshot = dump_snapshot_from_main()
except Exception as e:
console.print(f"[bold red]ERROR:[/] Failed to generate upstream snapshot for comparison: {e}")
return 1

try:
current_snapshot = dump_snapshot(Path.cwd())
except Exception as e:
console.print(f"[bold red]ERROR:[/] Failed to generate current snapshot: {e}")
return 1

if current_snapshot != main_snapshot:
console.print("[bold red]ERROR:[/] Supervisor schema has changed but no version file was updated.")
console.print("")
console.print("The following files were changed:")
for f in schema_source_files:
console.print(f" - [magenta]{f}[/]")
console.print("")
remote = get_remote_for_main()
target_branch = get_target_branch()
console.print(
f"Snapshot diff against [cyan]{remote}/{target_branch}[/] detected differences.\n"
"\n"
"Append a ``VersionChange`` subclass to the in-progress head "
"``v<YYYY>_<MM>_<DD>.py`` file under:\n"
f" [cyan]{VERSIONS_PREFIX}[/]\n"
"\n"
"See [cyan]task-sdk/src/airflow/sdk/execution_time/schema/AGENTS.md[/]."
)
return 1
console.print("[green]Snapshot unchanged:[/] Source changes do not affect the supervisor schema.")

return 0


if __name__ == "__main__":
sys.exit(main())
Loading
Loading