[FLINK-33045] Make it possible to disable auto-registering schema in Schema Registry #26662

base: master
Conversation
@flinkbot run azure

1 similar comment

@flinkbot run azure

Force-pushed from 94811df to ba6bd2e.
@@ -58,7 +77,7 @@ public ConfluentSchemaRegistryCoder(String subject, SchemaRegistryClient schemaR
     * @param schemaRegistryClient client to connect schema registry
     */
    public ConfluentSchemaRegistryCoder(SchemaRegistryClient schemaRegistryClient) {
-       this.schemaRegistryClient = schemaRegistryClient;
+       this(null, schemaRegistryClient, null);
nit: It is a bit strange to have an optional first parameter; I would expect the mandatory parameters first, i.e. for these constructors to all start with

ConfluentSchemaRegistryCoder(SchemaRegistryClient schemaRegistryClient, ...)

I realise this is existing code, so it is your call if you want to change it.
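For illustration, the suggested shape could look like this (a sketch only; the three-argument signature is assumed from the diff above, and the field types are illustrative):

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema;

public class ConfluentSchemaRegistryCoder {

    private final SchemaRegistryClient schemaRegistryClient;
    private final String subject; // optional, may be null
    private final Schema schema;  // optional, may be null

    // Mandatory dependency first, optional parameters delegated as null.
    public ConfluentSchemaRegistryCoder(SchemaRegistryClient schemaRegistryClient) {
        this(schemaRegistryClient, null, null);
    }

    public ConfluentSchemaRegistryCoder(
            SchemaRegistryClient schemaRegistryClient, String subject, Schema schema) {
        this.schemaRegistryClient = schemaRegistryClient;
        this.subject = subject;
        this.schema = schema;
    }
}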
That's a valid nit, but the original commit that I took over had already changed that, and it is in this PR; see line 70 in 0b8f50b:

this(subject, schemaRegistryClient, null);
<tr>
  <td><h5>auto.register.schemas</h5></td>
  <td>optional</td>
  <td>yes</td>
Why is this line here but not in the Chinese docs?
Because I don't know Chinese :) So I'll follow the process described at https://flink.apache.org/how-to-contribute/contribute-documentation/#chinese-documentation-translation after this PR is merged.
I had assumed we would just add <td>yes</td> here, since the optional above is not translated, as per the note in the link you sent. But it sounds like you have this in hand.
@@ -289,7 +289,7 @@ under the License.
    <!-- Indirectly accessed in pyflink_gateway_server -->
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka</artifactId>
-   <version>3.0.0-1.17</version>
+   <version>4.0.0-2.0</version>
Is this related to the fix?
It's about keeping versions in sync. Ideally the Python dependencies on connector versions should be moved out of the Flink repo and into the connector repos, but we're not there yet.
Force-pushed from 59f3a77 to 7304dde.

Force-pushed from 7304dde to 0b8f50b.
Thanks for working on this feature. It's a great quality of life improvement.
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>7.2.2</version>
AFAIK the serializer is versioned similarly to Kafka; 7.2.2 should correspond to Kafka 3.2. Can we upgrade the serializer to 7.9.0 to be in line with the used Kafka version, 3.9.0?
Yes, good one!
"type": "record", | ||
"name": "record", | ||
"fields": [ | ||
{"name": "name", "type": ["null", "string"], "default": null}, |
Nit: Why did you change the type used in the example? Is this related to registration?
Disabling auto-registration only means that Flink won't try to register the schema in Schema Registry during every run. However, the schema that has been registered in Schema Registry by an external service must still be either exactly what Flink would have registered (so with that specific namespace), or what the user provided via the avro-confluent.schema table property.
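For illustration, a table that relies on an externally registered schema might be declared like this; the auto.register.schemas key is assumed from the docs entry in this PR, and the surrounding Kafka options are just an example:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ExternalSchemaExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // The schema must already exist in Schema Registry (or be supplied via
        // the avro-confluent.schema option), since Flink will only look it up.
        tEnv.executeSql(
                "CREATE TABLE users (name STRING) WITH ("
                        + " 'connector' = 'kafka',"
                        + " 'topic' = 'users',"
                        + " 'properties.bootstrap.servers' = 'localhost:9092',"
                        + " 'format' = 'avro-confluent',"
                        + " 'avro-confluent.url' = 'http://localhost:8081',"
                        + " 'avro-confluent.auto.register.schemas' = 'false'"
                        + ")");
    }
}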
"SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", | ||
INTER_CONTAINER_KAFKA_ALIAS + ":9092") | ||
.dependsOn(KAFKA); | ||
|
You can use @RegisterExtension to avoid manually starting the Flink cluster and let JUnit handle the lifecycle.
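A minimal sketch of that setup, assuming JUnit 5 with Testcontainers and the connector test framework's FlinkContainers (the wiring here is illustrative, not the PR's actual code):

import org.apache.flink.connector.testframe.container.FlinkContainers;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
class AvroConfluentITCase {

    // Started/stopped by the Testcontainers JUnit extension.
    @Container
    static final KafkaContainer KAFKA =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0"));

    // Started/stopped by JUnit itself; no @BeforeAll/@AfterAll needed.
    @RegisterExtension
    static final FlinkContainers FLINK = FlinkContainers.builder().build();
}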
@BeforeAll
public static void setup() throws Exception {
    KAFKA.start();
    SCHEMA_REGISTRY.start();
    FLINK.start();
}

@AfterAll
public static void tearDown() {
    FLINK.stop();
    SCHEMA_REGISTRY.stop();
    KAFKA.stop();
}
These methods should be obsolete, since for the Kafka/SR containers the lifecycle is already handled by the @Container annotation, and for the Flink container see the comment (and sketch) above.
}

@Test
public void testAvroConfluentIntegrationWithManualRegister() throws Exception {
I would have expected to see a test that runs a query and fails if the schema isn't registered. The current test IMO doesn't fully cover the behavior, since it can also pass if the SQL query does the registration.
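Something along these lines, as a sketch (runInsertQuery is a hypothetical helper that creates the avro-confluent table with auto.register.schemas = false and submits an INSERT against it):

import static org.junit.jupiter.api.Assertions.assertThrows;

@Test
void testInsertFailsWhenSchemaNotRegistered() {
    // Nothing was registered for this subject, so the schema lookup must fail
    // instead of the job silently registering the schema itself.
    assertThrows(Exception.class, () -> runInsertQuery("topic_without_schema"));
}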
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level = INFO
Please set this to OFF before merging
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
- <version>3.2.3</version>
+ <version>3.9.0</version>
Nit: Since you also upgraded the Flink connector to be compatible with Kafka 4.0, can you also use Kafka 4.0 here?
I don't think we should, given that the Flink Kafka connector v4.0 itself depends on Kafka client 3.9.0: https://github.com/apache/flink-connector-kafka/blob/v4.0.0/pom.xml#L53
    out.write(schemaIdBytes);
}

private boolean registerSchema() {
Why is registerSchema a method, instead of parsing the value once in the constructor?
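i.e. something like this sketch, assuming the flag arrives through a config map (in the actual PR it may be wired differently):

// Parse the flag once instead of re-reading it on every (de)serialization call.
private final boolean registerSchema;

public ConfluentSchemaRegistryCoder(
        String subject, SchemaRegistryClient schemaRegistryClient, Map<String, ?> configs) {
    this.subject = subject;
    this.schemaRegistryClient = schemaRegistryClient;
    Object flag = configs == null ? null : configs.get("auto.register.schemas");
    // Default to true to preserve the current auto-registering behaviour.
    this.registerSchema = flag == null || Boolean.parseBoolean(String.valueOf(flag));
}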
try {
    registeredId = schemaRegistryClient.getId(subject, schema);
} catch (RestClientException e) {
    throw new IOException("Could not retrieve schema in registry", e);
Does it throw an IOException if the schema is not present? Nit: Can we maybe throw a better exception, e.g. FlinkException, to make sure this is "expected" on a not-found schema?
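As a sketch, the not-found case could at least be surfaced explicitly; getStatus() is the HTTP status carried by RestClientException, and since switching to FlinkException would also mean widening the method signature, this keeps IOException but with a clearer message:

try {
    registeredId = schemaRegistryClient.getId(subject, schema);
} catch (RestClientException e) {
    if (e.getStatus() == 404) {
        // Expected when auto.register.schemas is disabled and nothing was registered.
        throw new IOException(
                "Schema for subject '" + subject + "' is not registered in Schema Registry "
                        + "and auto-registration is disabled", e);
    }
    throw new IOException("Could not retrieve schema from registry", e);
}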
@@ -229,6 +229,74 @@ public void testSerializationSchemaWithInvalidOptionalSchema() {
            null, SCHEMA.toPhysicalRowDataType()));
    }

    @Test
Nit: Can we turn the three tests (or at least the first and third) into a parameterized test to avoid the code duplication?
It's a bit unclear to me why only the second test verifies the behavior for both sinks and sources, while the other tests only look at sinks.
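e.g. a sketch along these lines, where the option key and the createSerializationSchema helper are hypothetical stand-ins for whatever the duplicated tests share:

import static org.junit.jupiter.api.Assertions.assertNotNull;

import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testSerializationSchemaWithAutoRegister(boolean autoRegister) {
    Map<String, String> options = new HashMap<>();
    options.put("auto.register.schemas", String.valueOf(autoRegister));
    // createSerializationSchema is a hypothetical helper standing in for the
    // shared factory call; the per-case assertions would follow here.
    assertNotNull(createSerializationSchema(options));
}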
What is the purpose of the change

This PR is based on #25410 and aims to complete the necessary tasks. It introduces auto.register.schemas as a table option. Compared to the linked PR, it includes unit tests, a new IT case and updated documentation.

Brief change log

- Introduced the auto.register.schemas table option
- Added AvroConfluentITCase

Verifying this change

This change added tests and can be verified as follows:

- AvroConfluentITCase writes to and reads from Kafka using avro-confluent, with the table option set to true (the default) to show that Flink can register the schema, and to false, where it relies on schema registration outside of Flink

Does this pull request potentially affect one of the following parts:
- @Public(Evolving): no

Documentation