
[FLINK-33045] Make it possible to disable auto-registering schema in Schema Registry #26662


Open

MartijnVisser wants to merge 7 commits into master from FLINK-33045_support-auto-register-schema

Conversation

MartijnVisser
Contributor

What is the purpose of the change

This PR is based on #25410 and completes the remaining work. It introduces auto.register.schemas as a table option. Compared to the linked PR, it adds unit tests, a new IT case, and updated documentation.
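For illustration, a minimal sketch of how the new option could be used from the Table API. The table schema, connector properties, and the exact option key ('avro-confluent.auto.register.schemas', assuming the usual format-option prefix) are placeholders, not the final syntax from this PR:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AutoRegisterSchemasSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical table: topic, bootstrap servers and Schema Registry URL are
        // placeholders; the option key assumes the usual 'avro-confluent.' prefix.
        tEnv.executeSql(
                "CREATE TABLE user_behavior (\n"
                        + "  name STRING,\n"
                        + "  score INT\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'user_behavior',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'format' = 'avro-confluent',\n"
                        + "  'avro-confluent.url' = 'http://localhost:8081',\n"
                        + "  'avro-confluent.auto.register.schemas' = 'false'\n"
                        + ")");
    }
}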

Brief change log

  • Introduces new table option auto.register.schemas
  • Adds unit tests
  • Adds a new AvroConfluentITCase
  • Removes previous (currently disabled) bash-based tests
  • Bumps certain dependencies

Verifying this change

This change added tests and can be verified as follows:

  • Run AvroConfluentITCase, which writes to and reads from Kafka using avro-confluent, with the table option set to true (the default) to show that Flink can register the schema, and to false, where it relies on schema registration outside of Flink

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): yes
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs / JavaDocs

@flinkbot
Collaborator

flinkbot commented Jun 10, 2025

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@MartijnVisser
Contributor Author

@flinkbot run azure


@MartijnVisser force-pushed the FLINK-33045_support-auto-register-schema branch from 94811df to ba6bd2e on June 11, 2025 06:52
@@ -58,7 +77,7 @@ public ConfluentSchemaRegistryCoder(String subject, SchemaRegistryClient schemaR
* @param schemaRegistryClient client to connect schema registry
*/
public ConfluentSchemaRegistryCoder(SchemaRegistryClient schemaRegistryClient) {
- this.schemaRegistryClient = schemaRegistryClient;
+ this(null, schemaRegistryClient, null);
Contributor

Nit: it is a bit strange to have an optional first parameter; I would expect the mandatory parameters first for all these methods, i.e. for the constructors to all start with
ConfluentSchemaRegistryCoder(SchemaRegistryClient schemaRegistryClient ...
I realise this is existing code, so it is your call if you want to change it.

Contributor Author

That's a valid nit, but the original commit that I took over had already changed that, and it's included in this PR, see

<tr>
<td><h5>auto.register.schemas</h5></td>
<td>optional</td>
<td>yes</td>
Contributor

Why is this line here but not in the Chinese docs?

Contributor Author

Because I don't know Chinese :) So I'll follow the process that's at https://flink.apache.org/how-to-contribute/contribute-documentation/#chinese-documentation-translation after this PR is merged

Contributor

@davidradl Jun 12, 2025

I had assumed we would just add <td>yes</td> here, since the 'optional' above is not translated, as per the note in the link you sent. But it sounds like you have this in hand.

@@ -289,7 +289,7 @@ under the License.
<!-- Indirectly accessed in pyflink_gateway_server -->
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka</artifactId>
- <version>3.0.0-1.17</version>
+ <version>4.0.0-2.0</version>
Contributor

is this related to the fix?

Contributor Author

It's about keeping versions in sync. Ideally the Python dependencies on connector versions should be moved out of the Flink repo and into the connector repos, but we're not there yet.

@MartijnVisser force-pushed the FLINK-33045_support-auto-register-schema branch from 59f3a77 to 7304dde on June 11, 2025 18:03
@MartijnVisser force-pushed the FLINK-33045_support-auto-register-schema branch from 7304dde to 0b8f50b on June 12, 2025 07:39
Contributor

@fapaul left a comment

Thanks for working on this feature. It's a great quality of life improvement.

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.2.2</version>
Contributor

AFAIK the serializer is versioned similarly to Kafka; 7.2.2 should correspond to Kafka 3.2. Can we upgrade the serializer to 7.9.0 to be in line with the used Kafka version 3.9.0?

Contributor Author

Yes, good one!

"type": "record",
"name": "record",
"fields": [
{"name": "name", "type": ["null", "string"], "default": null},
Contributor

Nit: Why did you change the type used in the example? Is this related to registration?

Contributor Author

@MartijnVisser Jun 20, 2025

Disabling auto registration only means that Flink won't try to register the schema in Schema Registry during every run. However, the schema that an external service has registered in Schema Registry must still be either exactly how Flink would have registered it (so with that specific namespace), or how the user provided it via the avro-confluent.schema table property.
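As a rough illustration of the "registered outside of Flink" path, a sketch that registers the exact schema out of band through the Schema Registry REST API; the subject name, schema string, and registry URL are placeholders, not taken from this PR:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RegisterSchemaOutOfBand {
    public static void main(String[] args) throws Exception {
        // Placeholder subject and schema; in practice these must match exactly what
        // Flink expects (including the record namespace) or what is configured via
        // the avro-confluent.schema table property.
        String subject = "user_behavior-value";
        String schema =
                "{\"type\":\"record\",\"name\":\"record\",\"fields\":["
                        + "{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
        String body = "{\"schema\": " + toJsonString(schema) + "}";

        HttpRequest request =
                HttpRequest.newBuilder(
                                URI.create("http://localhost:8081/subjects/" + subject + "/versions"))
                        .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                        .POST(HttpRequest.BodyPublishers.ofString(body))
                        .build();

        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // e.g. {"id":1}
    }

    // Minimal JSON string escaping for the embedded Avro schema.
    private static String toJsonString(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }
}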

"SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS",
INTER_CONTAINER_KAFKA_ALIAS + ":9092")
.dependsOn(KAFKA);

Contributor

You can use

Suggested change:
@RegisterExtension

to avoid manually starting the Flink cluster and let JUnit handle the lifecycle.

Comment on lines +131 to +143
@BeforeAll
public static void setup() throws Exception {
KAFKA.start();
SCHEMA_REGISTRY.start();
FLINK.start();
}

@AfterAll
public static void tearDown() {
FLINK.stop();
SCHEMA_REGISTRY.stop();
KAFKA.stop();
}
Contributor

These methods should be obsolete, since for the Kafka/SR containers the lifecycle is already handled by the @Container annotation, and for the Flink container see the comment above.
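A sketch of what that could look like; class names, image tags, and the Flink test resource are placeholders, and the actual IT case may wire this differently:

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
class AvroConfluentITCaseSketch {

    // Started/stopped by the Testcontainers JUnit 5 extension; no manual
    // @BeforeAll/@AfterAll lifecycle methods are needed.
    @Container
    static final KafkaContainer KAFKA =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0"));

    @Container
    static final GenericContainer<?> SCHEMA_REGISTRY =
            new GenericContainer<>(DockerImageName.parse("confluentinc/cp-schema-registry:7.9.0"))
                    .withExposedPorts(8081)
                    .dependsOn(KAFKA);

    // If the Flink test resource implements the JUnit 5 extension callbacks,
    // it can be registered with @RegisterExtension instead of being started manually:
    // @RegisterExtension static final FlinkContainers FLINK = ...;
}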

}

@Test
public void testAvroConfluentIntegrationWithManualRegister() throws Exception {
Contributor

I would have expected to see a test that runs a query and fails if the schema isn't registered.

The current test IMO doesn't fully cover the behavior, since it can also pass if the SQL query does the registration.
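For instance, something along these lines; the table names and the expected message are placeholders, and the surrounding container/DDL setup from the IT case is assumed:

import static org.assertj.core.api.Assertions.assertThatThrownBy;

import org.apache.flink.table.api.TableEnvironment;

class MissingSchemaNegativeTestSketch {

    // With auto.register.schemas = false and no schema registered out of band,
    // the insert job should fail rather than silently registering the schema.
    void assertInsertFailsWithoutRegisteredSchema(TableEnvironment tEnv) {
        assertThatThrownBy(
                        () ->
                                tEnv.executeSql(
                                                "INSERT INTO sink_without_registered_schema SELECT * FROM source")
                                        .await())
                .hasMessageContaining("subject"); // placeholder assertion on the error message
    }
}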


# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level = INFO
Contributor

Please set this to OFF before merging

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
- <version>3.2.3</version>
+ <version>3.9.0</version>
Contributor

Nit: Since you also upgraded the Flink connector to be compatible with Kafka 4.0, can you also use Kafka 4.0 here?

Contributor Author

I don't think we should, given that the Flink Kafka connector v4.0 itself depends on Kafka Client 3.9.0: https://github.com/apache/flink-connector-kafka/blob/v4.0.0/pom.xml#L53

out.write(schemaIdBytes);
}

private boolean registerSchema() {
Contributor

Why is registerSchema a method, instead of parsing the value once in the ctor?
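For illustration, the shape of the suggested refactor; names are placeholders, not the actual coder:

// Sketch: evaluate the flag once at construction time instead of re-checking
// it on every serialized record.
public class SchemaRegistryCoderSketch {

    private final boolean registerSchema;

    public SchemaRegistryCoderSketch(boolean registerSchema) {
        this.registerSchema = registerSchema; // resolved once in the ctor
    }

    int resolveSchemaId(/* subject, schema, client ... */) {
        if (registerSchema) {
            // register the schema and return the assigned id
        } else {
            // only look up the id of an already registered schema
        }
        return -1; // placeholder
    }
}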

try {
registeredId = schemaRegistryClient.getId(subject, schema);
} catch (RestClientException e) {
throw new IOException("Could not retrieve schema in registry", e);
Contributor

Does it throw an IOException if the schema is not present?

Nit: Can we maybe throw a better exception, e.g. FlinkException, to make it clear that this is "expected" when the schema is not found?

@@ -229,6 +229,74 @@ public void testSerializationSchemaWithInvalidOptionalSchema() {
null, SCHEMA.toPhysicalRowDataType()));
}

@Test
Contributor

@fapaul Jun 20, 2025

Nit: Can we turn the three tests (or at least the first and third) into a parameterized test to avoid the code duplication?

It's a bit unclear to me why only the second test verifies the behavior for both sinks and sources while the other tests only look at sinks.
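For reference, a minimal shape of such a parameterized test; the helper and assertion are placeholders:

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class AutoRegisterSchemasOptionSketchTest {

    // One test body exercised with both values of the option instead of
    // near-duplicate test methods.
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void createsSerializationSchemaForEitherSetting(boolean autoRegister) {
        Object serializationSchema = createSerializationSchema(autoRegister);
        assertThat(serializationSchema).isNotNull();
    }

    // Placeholder for whatever builds the avro-confluent (de)serialization
    // schema with auto.register.schemas = autoRegister in the real test.
    private Object createSerializationSchema(boolean autoRegister) {
        return new Object();
    }
}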
