[FLINK-38023][formats] Fix GenericRecord Avro state migration#28161

Open
avi-sanwal wants to merge 5 commits into apache:master from avi-sanwal:FLINK-38023-generic-record-avro-state-restore

@avi-sanwal avi-sanwal commented May 14, 2026

What is the purpose of the change

This pull request fixes GenericRecord Avro state migration when compatible schema evolution changes the record shape, for example by adding a field with a default.

During migration, Flink can deserialize old state bytes with the restored previous serializer. For GenericRecord, the deserialized value can still carry the old Avro schema, while the new serializer must write bytes using the new schema. Writing the old-shaped GenericRecord directly with the new GenericDatumWriter can fail or read fields from the wrong index.
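
The failure mode can be sketched with a toy model. This is not Avro's API: `PositionalWriteSketch`, its methods, and the `id`/`email`/`name` fields are hypothetical names chosen for illustration. The only assumption carried over from the description is that the writer walks the fields of its own schema and reads the record by position.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a writer that indexes record values by field position. Not Avro's API. */
public class PositionalWriteSketch {

    /** Returns the value a positional writer would emit for the given writer-schema field. */
    static Object valueWrittenFor(List<String> writerSchema, Object[] recordValues, String field) {
        int pos = writerSchema.indexOf(field);
        return recordValues[pos];
    }

    /** Emits one value per writer-schema field, reading the record strictly by position. */
    static List<Object> writeByPosition(List<String> writerSchema, Object[] recordValues) {
        List<Object> out = new ArrayList<>();
        for (int pos = 0; pos < writerSchema.size(); pos++) {
            // Throws ArrayIndexOutOfBoundsException when the record has fewer fields.
            out.add(recordValues[pos]);
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical evolution: "email" is inserted between "id" and "name".
        List<String> newSchema = List.of("id", "email", "name");
        Object[] oldShapedValues = {42L, "alice"}; // laid out as (id, name)

        // The old "name" value is emitted in the "email" slot.
        System.out.println("email <- " + valueWrittenFor(newSchema, oldShapedValues, "email"));

        try {
            writeByPosition(newSchema, oldShapedValues);
        } catch (ArrayIndexOutOfBoundsException e) {
            // The third field has no value at that position in the old-shaped record.
            System.out.println("write failed: " + e.getMessage());
        }
    }
}
```

In this model the old `name` value lands in the `email` slot before the write runs out of values, which corresponds to the two symptoms named above.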

Brief change log

  • When serializing GenericRecord values, keep the existing same-schema fast path.
  • If the GenericRecord value schema differs from the serializer runtime schema, resolve the value through Avro writer-schema to reader-schema conversion before writing it with the new serializer.
  • Add serializer snapshot regression coverage for restoring old GenericRecord bytes and then writing them with the evolved serializer, including inserting a field in the middle of the record.
  • Add coverage for same-schema and nullable GenericRecord serialization branches.
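
A minimal sketch of the two branches in the change log, again as a toy model rather than Avro's API or the code in this pull request. `SchemaResolutionSketch`, `resolve`, and the field names are hypothetical. Schemas are modeled as ordered lists of field names, and resolution matches fields by name and fills in defaults for fields the old schema lacks.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Toy model of name-based writer-to-reader schema resolution. Not Avro's API. */
public class SchemaResolutionSketch {

    /**
     * Returns values laid out for runtimeSchema. Values are matched by field name;
     * fields missing from valueSchema take their default.
     */
    static Object[] resolve(
            List<String> valueSchema,
            List<String> runtimeSchema,
            Map<String, Object> defaults,
            Object[] values) {
        if (valueSchema.equals(runtimeSchema)) {
            return values; // same-schema fast path: nothing to convert
        }
        Object[] resolved = new Object[runtimeSchema.size()];
        for (int pos = 0; pos < runtimeSchema.size(); pos++) {
            String field = runtimeSchema.get(pos);
            int oldPos = valueSchema.indexOf(field);
            if (oldPos >= 0) {
                resolved[pos] = values[oldPos]; // carried over by name, not by position
            } else if (defaults.containsKey(field)) {
                resolved[pos] = defaults.get(field); // newly added field with a default
            } else {
                throw new IllegalStateException("No value or default for field: " + field);
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        List<String> oldSchema = List.of("id", "name");
        List<String> newSchema = List.of("id", "email", "name"); // field inserted in the middle
        Object[] restored = {42L, "alice"};

        Object[] resolved = resolve(oldSchema, newSchema, Map.of("email", ""), restored);
        System.out.println(Arrays.toString(resolved));
    }
}
```

After resolution every value sits at the position the runtime schema expects, so a positional writer can emit it safely. The fast path returns the input unchanged, so records that already match the runtime schema pay only for the schema comparison.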

Verifying this change

This change added tests and can be verified as follows:

  • Added AvroSerializerSnapshotTest#genericRecordWithSerializerSchemaShouldBeSerializedAsIs.
  • Added AvroSerializerSnapshotTest#nullGenericRecordShouldBeSerializedWithNullableSchema.
  • Added AvroSerializerSnapshotTest#migratedGenericRecordShouldBeSerializedWithNewSchema.
  • Added AvroSerializerSnapshotTest#migratedGenericRecordShouldBeSerializedWithNewSchemaWhenFieldIsInsertedInMiddle.
  • Ran ./mvnw -pl flink-formats/flink-avro spotless:apply -Dspotless.check.skip=false.
  • Ran ./mvnw -pl flink-formats/flink-avro -am -Dtest=AvroSerializerSnapshotTest -DfailIfNoTests=false -Dsurefire.failIfNoSpecifiedTests=false -Dcheckstyle.skip -Drat.skip -Dspotless.check.skip test.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: yes
  • The runtime per-record code paths (performance sensitive): yes, only GenericRecord serialization when the value schema differs from the serializer schema
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes, state restore/migration
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

Was generative AI tooling used to co-author this PR?
  • Yes: OpenAI Codex GPT-5

Generated-by: OpenAI Codex GPT-5

Resolve GenericRecord values with an older record schema to the serializer runtime schema before writing them during state migration.

Generated-by: OpenAI Codex GPT-5

flinkbot (Collaborator) commented May 14, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

Align GenericRecord restore behavior with SpecificRecord by constructing the restored reader with previous and current schemas instead of rewriting records at serialize time.

Generated-by: OpenAI Codex GPT-5

Handle the nullable previous schema inside the GenericRecord factory helper instead of passing Optional as a parameter.

Generated-by: OpenAI Codex GPT-5

        this.avroData = factory.getAvroData();
    }

    private T resolveGenericRecord(T value) throws IOException {
Contributor

@davidradl davidradl May 18, 2026

It would be useful to include a comment describing the intent of this new method. I assume it handles the case where the generic record does not match the type we expect, due to schema migration.

Author

Thanks, addressed in 85bd596. I added a short method comment clarifying that this handles GenericRecord values restored during state migration that still carry the previous schema, before the writer indexes fields by position.

        final GenericRecord record = (GenericRecord) value;
        final Schema recordSchema = record.getSchema();
        if (Objects.equals(recordSchema, runtimeSchema)) {
            return value;
Contributor

Can we add a test that covers this return?

Author

Thanks, addressed in 85bd596. I added coverage for the same-schema GenericRecord path, and also added a nullable-union GenericRecord roundtrip to exercise the null return branch through public serializer behavior.


3 participants