-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Schema] Support consume multiple schema types messages by AutoConsumeSchema #10604
[Schema] Support consume multiple schema types messages by AutoConsumeSchema #10604
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice work
it is a good complement for all of the schema related enhancements we delivered in 2.8.0
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have merged #10573
please rebase.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
@@ -61,7 +62,7 @@ public void testGetSchema() throws Exception { | |||
any(TopicName.class), | |||
any(byte[].class))) | |||
.thenReturn(completableFuture); | |||
SchemaInfo schemaInfoByVersion = schemaProvider.getSchemaByVersion(new byte[0]).get(); | |||
SchemaInfo schemaInfoByVersion = schemaProvider.getSchemaByVersion(new LongSchemaVersion(0).bytes()).get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it looks like we are changing the behaviour.
can this be a problem ?
do we need to add some compatibility layer ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change of the class MultiVersionSchemaInfoProvider is not necessary and I revert this test. Thanks.
/pulsarbot run-failure-checks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
I left one suggestion for the test case, PTAL
Assert.assertEquals(genericRecord.get("id"), 10); | ||
break; | ||
case "k_one_v_three_schema_separate": | ||
kv = (KeyValue<?, ?>) nativeObject; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be a KeyValue<GenericRecord, GenericRecord>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I'll fix.
kv = (KeyValue<?, ?>) nativeObject; | ||
jsonNode = ((GenericJsonRecord) kv.getKey()).getJsonNode(); | ||
Assert.assertEquals(jsonNode.get("id").intValue(), 1); | ||
jsonNode = ((GenericJsonRecord) kv.getValue()).getJsonNode(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about kv.getValue().getNativeObject()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that using the getNativeObject still needs to constraint convert to JsonNode.
…ema AutoConsumeSchema
864a188
to
3a801e0
Compare
There are still two problems that need to be resolved when using AutoConsumeSchema to consume key-value schema messages.
Maybe we could adjust the key-value schema encoding logic. Fix these two problems in the next PR. |
@gaoran10 I would postpone fixing those problems to a follow up patch |
@@ -357,6 +358,11 @@ public void handleGetSchema(CommandGetSchema commandGetSchema) { | |||
final long clientRequestId = commandGetSchema.getRequestId(); | |||
String serviceUrl = getBrokerServiceUrl(clientRequestId); | |||
String topic = commandGetSchema.getTopic(); | |||
Optional<SchemaVersion> tempVersion = Optional.empty(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about:
final Optional<SchemaVersion> schemaVersion;
if (commandGetSchema.hasSchemaVersion()) {
schemaVersion = Optional.of(commandGetSchema.getSchemaVersion()).map(BytesSchemaVersion::of);
} else {
schemaVersion = Optional.empty()
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good!
SchemaVersion sv = BytesSchemaVersion.of(schemaVersion); | ||
fetchSchemaIfNeeded(sv); | ||
ensureSchemaInitialized(sv); | ||
return adapt(schemaMap.get(sv).decode(bytes, schemaVersion), schemaVersion); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
may don't need decode(bytes, schemaVersion), only use decode(bytes) best?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, good idea!
Assert.assertEquals(jsonNode.get("age").intValue(), 18); | ||
break; | ||
case "avro_one_schema": | ||
org.apache.avro.generic.GenericRecord genericRecord = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this type is PersonOne
not GenericAvroRecord
is right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that if using the Schema.AUTO_CONSUME()
, the reader doesn't know the Pojo PersonOne
so the decoding result is org.apache.avro.generic.GenericRecord
.
…eSchema (apache#10604) Based on the PR apache#10573 Support consuming multiple schema types messages by AutoConsumeSchema. (cherry picked from commit f5e10a9)
…eSchema (apache#10604) Based on the PR apache#10573 Support consuming multiple schema types messages by AutoConsumeSchema. (cherry picked from commit f5e10a9)
…toConsumeSchema (apache#10604)" This reverts commit 3398e3f.
…eSchema (apache#10604) Based on the PR apache#10573 ### Motivation Support consuming multiple schema types messages by AutoConsumeSchema.
…eSchema (apache#10604) Based on the PR apache#10573 ### Motivation Support consuming multiple schema types messages by AutoConsumeSchema.
Based on the PR #10573
Motivation
Support consuming multiple schema types messages by AutoConsumeSchema.
Modifications
Describe the modifications you've done.
Verifying this change
Add a new unit test to verify consuming multiple schema type messages by AutoConsumeSchema.
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changes