-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload #10248
Conversation
…ting it at build time (Sink<GenericObject>)
@@ -78,27 +80,7 @@ public boolean supportSchemaVersioning() { | |||
|
|||
@Override | |||
public GenericRecord decode(byte[] bytes, byte[] schemaVersion) { | |||
if (schema == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have just moved this code into a separated method
new SinkSpec("test-kv-sink-input-kv-avro-json-inl-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8), | ||
Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.INLINE), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build())), | ||
new SinkSpec("test-kv-sink-input-kv-avro-json-sep-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8), | ||
Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.SEPARATED), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the new test case that covers the change
@congbobo184 @codelipenghui now the patch is good to go. I added the test case |
the integration test |
also @linlinnn I have pushed now a change that changes that test. |
...ration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
Outdated
Show resolved
Hide resolved
@linlinnn @codelipenghui CI passed please take a look |
@congbobo184 Could you please also help review this PR? |
this patch also fixes a problem in the integration tests that made CI more flaky. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
thank you @congbobo184 @codelipenghui @lhotari for your review |
…he schema before decoding the payload (apache#10248) (cherry picked from commit 54523bb)
Motivation
I saw this problem while developing integration tests for GenericObject + KeyValue with SEPARATED KeyValueEncoding.
Modifications
Before executing MessageImpl#getValue we are now ensuring that the Schema is really loaded when we call getSchemaInfo.
Initially I added an eager loading of the Schema in getSchemaInfo, but this resulted in lots of deadlocks because we are calling getSchemaInfo in many places.
The code also expects that AutoConsumeSchema returns getSchemaInfo == null in several places and we have to keep this behaviour.
Verifying this change
I added a new case to an existing integration test, the new case reproduced the problem.