-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix Pulsar SQL query compression data #9663
Fix Pulsar SQL query compression data #9663
Conversation
+1 on this, facing same issue working with presto connector. |
@@ -94,7 +94,7 @@ public static void parseMessage(TopicName topicName, long ledgerId, long entryId | |||
throw new IOException("Cannot parse encrypted message " + msgMetadata + " on topic " + topicName); | |||
} | |||
|
|||
uncompressedPayload = uncompressPayloadIfNeeded(topicName, msgMetadata, headersAndPayload, ledgerId, | |||
uncompressedPayload = uncompressPayloadIfNeeded(topicName, msgMetadata, headersAndPayload.retain(), ledgerId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably better to let ReferenceCountedMessageMetadata
do both retain
and release
, else caller forget to call retain
might cause trouble when ReferenceCountedMessageMetadata
try to do release
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, updated.
(cherry picked from commit 5ff2c0033684a3fcdb56dae9ddf5545452f2ee75)
0f8ec13
to
2362dd3
Compare
/pulsarbot run-failure-checks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
### Motivation In the master branch, If enable data compression, Pulsar SQL will occurs following errors: ``` io.netty.util.IllegalReferenceCountException: refCnt: 0 at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1489) at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1418) at io.netty.buffer.AbstractPooledDerivedByteBuf$PooledNonRetainedSlicedByteBuf.slice(AbstractPooledDerivedByteBuf.java:298) at org.apache.pulsar.common.api.proto.MessageMetadata.getSchemaVersionSlice(MessageMetadata.java:545) at org.apache.pulsar.common.api.proto.MessageMetadata.getSchemaVersion(MessageMetadata.java:535) at org.apache.pulsar.common.api.raw.RawMessageImpl.getSchemaVersion(RawMessageImpl.java:147) at org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:454) at io.prestosql.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:90) at io.prestosql.operator.TableScanOperator.getOutput(TableScanOperator.java:302) at io.prestosql.operator.Driver.processInternal(Driver.java:379) at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283) at io.prestosql.operator.Driver.tryWithLock(Driver.java:675) at io.prestosql.operator.Driver.processFor(Driver.java:276) at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075) at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163) at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484) at io.prestosql.$gen.Presto_332__testversion____20210221_151218_2.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) ``` ### Modifications 1. Retain the original buffer for RawMessage metadata peek since we reuse the data buffer to get the schema version etc. 2. Release the original data buffer while deallocating the `ReferenceCountedMessageMetadata.java` ### Verifying this change Unit test added. Integration test added. ### Does this pull request potentially affect one of the following parts: *If `yes` was chosen, please highlight the changes* - Dependencies (does it add or upgrade a dependency): (no) - The public API: (no) - The schema: (no) - The default values of configurations: (no) - The wire protocol: (no) - The rest endpoints: (no) - The admin cli options: (no) - Anything that affects deployment: (no) ### Documentation - Does this pull request introduce a new feature? (no)
### Motivation In the master branch, If enable data compression, Pulsar SQL will occurs following errors: ``` io.netty.util.IllegalReferenceCountException: refCnt: 0 at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1489) at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1418) at io.netty.buffer.AbstractPooledDerivedByteBuf$PooledNonRetainedSlicedByteBuf.slice(AbstractPooledDerivedByteBuf.java:298) at org.apache.pulsar.common.api.proto.MessageMetadata.getSchemaVersionSlice(MessageMetadata.java:545) at org.apache.pulsar.common.api.proto.MessageMetadata.getSchemaVersion(MessageMetadata.java:535) at org.apache.pulsar.common.api.raw.RawMessageImpl.getSchemaVersion(RawMessageImpl.java:147) at org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:454) at io.prestosql.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:90) at io.prestosql.operator.TableScanOperator.getOutput(TableScanOperator.java:302) at io.prestosql.operator.Driver.processInternal(Driver.java:379) at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283) at io.prestosql.operator.Driver.tryWithLock(Driver.java:675) at io.prestosql.operator.Driver.processFor(Driver.java:276) at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075) at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163) at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484) at io.prestosql.$gen.Presto_332__testversion____20210221_151218_2.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) ``` ### Modifications 1. Retain the original buffer for RawMessage metadata peek since we reuse the data buffer to get the schema version etc. 2. Release the original data buffer while deallocating the `ReferenceCountedMessageMetadata.java` ### Verifying this change Unit test added. Integration test added. ### Does this pull request potentially affect one of the following parts: *If `yes` was chosen, please highlight the changes* - Dependencies (does it add or upgrade a dependency): (no) - The public API: (no) - The schema: (no) - The default values of configurations: (no) - The wire protocol: (no) - The rest endpoints: (no) - The admin cli options: (no) - Anything that affects deployment: (no) ### Documentation - Does this pull request introduce a new feature? (no)
Motivation
In the master branch, If enable data compression, Pulsar SQL will occurs following errors:
Modifications
ReferenceCountedMessageMetadata.java
Verifying this change
Unit test added.
Integration test added.
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation