Skip to content

Add schema migration to supervisor-child comm#67235

Open
uranusjr wants to merge 1 commit into
apache:mainfrom
astronomer:supervisor-schema-migration
Open

Add schema migration to supervisor-child comm#67235
uranusjr wants to merge 1 commit into
apache:mainfrom
astronomer:supervisor-schema-migration

Conversation

@uranusjr
Copy link
Copy Markdown
Member

With foreign language SDKs, it may be possible the two sides of supervisor comm have different versions. This adds a migration layer at the supervisor (server) side, so an SDK (client) using a lower version of the schema may be able to communicate to the server.

This adds: (subpoints describe what each component is for, but those are not covered in this PR)

  1. A JSON schema file to describe the models used by the supervisor. This is automatically kept up-to-date with pre-commit hooks.
    • This should be published with versions for SDKs to use.
    • An SDK should use a version of this schema to talk to the supervisor.
  2. A structure to host Cadwyn migration files inside execution_time.
    • Whenever the supervisor schema changes, a migration should be added to the SDK.
  3. The supervisor use the migrations to upgrade the SDK’s message to match the schema version used by the supervisor, and downgrade messages before they are sent to the SDK to match the schema version used by the SDK.

@boring-cyborg boring-cyborg Bot added area:dev-tools area:task-sdk backport-to-v3-2-test Mark PR with this label to backport to v3-2-test branch labels May 20, 2026
@uranusjr uranusjr force-pushed the supervisor-schema-migration branch from 7cd6fc3 to ec65643 Compare May 20, 2026 09:36
@uranusjr uranusjr added AIP-108: Coordinator Change this to an 'area:' label after AIP acceptance. and removed backport-to-v3-2-test Mark PR with this label to backport to v3-2-test branch labels May 20, 2026
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this file committed? I'd been liking the fact that we don't have generated files committed in the execution API to date.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I guess we don’t since we would publish this somewhere for the Java SDK to consume.

On the other hand it’d be a while for us to get this merged and then have the infrastructure ready. Maybe have this for now before we set things up on airflow.apache.org so the Java SDK has somewhere to point to, and remove it after setup and Java SDK points to it instead?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's JSON schema for Dag serialization format airflow-core/src/airflow/serialization/schema.json, no harm to introduce explicit JSON schema snapshot for the migration schema IMHO.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I sort of see why we have this, but it feels weird to have a the version history "duplicated" on the client side.

Don't a lot of the schemas/models implicitly have a dependency on the Exec API side versioning too?

Copy link
Copy Markdown
Member Author

@uranusjr uranusjr May 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but there’s not an established way to forward those migrations here, especially since there are additional supervisor-only models that may change as well but aren’t tracked in the Execution API spec.

I’m open to ideas; the currently way doesn’t really provide a good way to keep shared migrations in sync.

Copy link
Copy Markdown
Member

@jason810496 jason810496 May 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

especially since there are additional supervisor-only models that may change as well but aren’t tracked in the Execution API spec

Exactly, this is what the migrator and the versions bundle here trying to resolve.

Yes, there're partial of data models are directly from Execution API spec. but most of them aren't define in Execution API, like all the data models defined in task-sdk/src/airflow/sdk/execution_time/comms.py.

From my perspective, no matter the data models define in Execution API or not, all the data models consumed by CommsDecoder should be versioned somewhere.

@uranusjr uranusjr force-pushed the supervisor-schema-migration branch from ec65643 to d057c20 Compare May 20, 2026 10:52
Copy link
Copy Markdown
Member

@jason810496 jason810496 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, LGTM to introduce the automated mechanism to snapshot the schema between supervisor and the subprocesses and the migrator to the main branch first.

Comment thread task-sdk/src/airflow/sdk/execution_time/supervisor.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/supervisor.py
Comment thread task-sdk/src/airflow/sdk/execution_time/schema/migrator.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/schema/migrator.py Outdated
# mode="json")`` -- an unexpected kwarg that Pydantic rejects. The
# corrected version below calls ``downgrade`` with no extra args and
# lets the resulting versioned model handle serialisation.
def _corrected_serialize_response(self, msg, **dump_opts):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This monkeypatch documents the supervisor bug rather than testing the real code path.

The fixture installs a corrected _serialize_response that calls m.downgrade(msg, version) without the broken dump_kwargs=dump_opts kwarg (see comment on supervisor.py:743). With this patch in place the integration test suite never exercises the actual _serialize_response -- so the production code can ship broken and the suite stays green.

Once the supervisor call site is fixed (**dump_opts instead of dump_kwargs=dump_opts), delete the monkeypatch.setattr(WatchedSubprocess, "_serialize_response", ...) and the _corrected_serialize_response definition above. The fixture still needs the migrator + registry patches, just not the _serialize_response override.

Comment thread task-sdk/src/airflow/sdk/execution_time/schema/migrator.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/schema/migrator.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/schema/migrator.py Outdated
Comment thread task-sdk/.pre-commit-config.yaml Outdated
Comment thread uv.lock
@github-actions

This comment was marked as resolved.

@uranusjr uranusjr force-pushed the supervisor-schema-migration branch from d057c20 to 71e177d Compare May 21, 2026 12:37
With foreign language SDKs, it may be possible the two sides of
supervisor comm have different versions. This adds a migration layer at
the supervisor (server) side, so an SDK (client) using a lower version
of the schema may be able to communicate to the server.
@uranusjr uranusjr force-pushed the supervisor-schema-migration branch from 71e177d to 6ea9922 Compare May 21, 2026 12:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AIP-108: Coordinator Change this to an 'area:' label after AIP acceptance. area:dev-tools area:task-sdk

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants