[Pulsar SQL] Fix messageQueue release message issue. #16155

Merged
merged 1 commit into from Jun 21, 2022

Conversation

Technoboy-

@Technoboy- Technoboy- commented Jun 21, 2022

Motivation

When PulsarRecordCursor closes, the following errors may occur:

2022-06-08T09:33:33.959423259-04:00 2022-06-08T13:33:33.959Z	INFO	20220608_133129_00353_ydmer.2.0-21-52	org.apache.pulsar.sql.presto.PulsarRecordCursor	Init cacheSizeAllocator with NullCacheSizeAllocator.
2022-06-08T09:33:33.964070819-04:00 2022-06-08T13:33:33.963Z	ERROR	deserialize-thread-split-13	org.apache.pulsar.sql.presto.PulsarRecordCursor	Stop running DeserializeEntries
2022-06-08T09:33:33.964094767-04:00 java.lang.IllegalArgumentException: newPosition > limit: (48825 > 119)
2022-06-08T09:33:33.964101767-04:00 	at java.base/java.nio.Buffer.createPositionException(Buffer.java:318)
2022-06-08T09:33:33.964104898-04:00 	at java.base/java.nio.Buffer.position(Buffer.java:293)
2022-06-08T09:33:33.964123765-04:00 	at java.base/java.nio.ByteBuffer.position(ByteBuffer.java:1094)
2022-06-08T09:33:33.964126988-04:00 	at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:184)
2022-06-08T09:33:33.964130488-04:00 	at io.netty.buffer.ReadOnlyByteBufferBuf.getBytes(ReadOnlyByteBufferBuf.java:200)
2022-06-08T09:33:33.964133500-04:00 	at io.netty.buffer.AbstractUnpooledSlicedByteBuf.getBytes(AbstractUnpooledSlicedByteBuf.java:243)
2022-06-08T09:33:33.964136667-04:00 	at io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:1270)
2022-06-08T09:33:33.964139714-04:00 	at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1246)
2022-06-08T09:33:33.964142670-04:00 	at org.apache.pulsar.common.api.proto.LightProtoCodec.readString(LightProtoCodec.java:250)
2022-06-08T09:33:33.964145457-04:00 	at org.apache.pulsar.common.api.proto.KeyValue.getValue(KeyValue.java:55)
2022-06-08T09:33:33.964148441-04:00 	at org.apache.pulsar.common.api.proto.KeyValue.copyFrom(KeyValue.java:159)
2022-06-08T09:33:33.964150976-04:00 	at org.apache.pulsar.common.api.proto.SingleMessageMetadata.copyFrom(SingleMessageMetadata.java:505)
2022-06-08T09:33:33.964155504-04:00 	at org.apache.pulsar.common.api.raw.RawMessageImpl.get(RawMessageImpl.java:75)
2022-06-08T09:33:33.964158176-04:00 	at org.apache.pulsar.common.api.raw.MessageParser.receiveIndividualMessagesFromBatch(MessageParser.java:176)
2022-06-08T09:33:33.964161161-04:00 	at org.apache.pulsar.common.api.raw.MessageParser.parseMessage(MessageParser.java:112)
2022-06-08T09:33:33.964163699-04:00 	at org.apache.pulsar.sql.presto.PulsarRecordCursor$DeserializeEntries$1.accept(PulsarRecordCursor.java:295)
2022-06-08T09:33:33.964166399-04:00 	at org.apache.pulsar.sql.presto.PulsarRecordCursor$DeserializeEntries$1.accept(PulsarRecordCursor.java:273)
2022-06-08T09:33:33.964174886-04:00 	at org.jctools.queues.SpscArrayQueue.drain(SpscArrayQueue.java:266)
2022-06-08T09:33:33.964177766-04:00 	at org.jctools.queues.SpscArrayQueue.drain(SpscArrayQueue.java:239)
2022-06-08T09:33:33.964180504-04:00 	at org.apache.pulsar.sql.presto.PulsarRecordCursor$DeserializeEntries.run(PulsarRecordCursor.java:273)

2022-06-08T09:33:33.965527031-04:00 2022-06-08T13:33:33.963Z	ERROR	SplitRunner-0-47	io.prestosql.execution.executor.TaskExecutor	Error processing Split 20220608_133129_00352_ydmer.2.0-7 PulsarSplit{splitId=13, connectorId='pulsar', originSchemaName='derived_margin_data', schemaName='qa-ipg/portfolio_finance', tableName='derived_margin_data', splitSize=37, schema='{"type":"record","namespace":"ASC.DerivedMarginData","name":"DerivedMarginDataItem","fields":[{"name":"ASCPermId","type":"int"},{"name":"ASCId","type":["null","string"],"default":null},{"name":"Ubs30AvgDtv","type":["null","double"],"default":null},{"name":"Ubs90Volatility","type":["null","double"],"default":null},{"name":"Jpm20AvgDtv","type":["null","double"],"default":null},{"name":"Jpm20Volatility","type":["null","double"],"default":null},{"name":"Empirical20Volatility","type":["null","double"],"default":null},{"name":"Dbk100AvgDtv","type":["null","double"],"default":null},{"name":"Dbk90AvgDtv","type":["null","double"],"default":null},{"name":"Dbk20AvgDtv","type":["null","double"],"default":null},{"name":"Dbk10AvgDtv","type":["null","double"],"default":null},{"name":"Csf90AvgDtv","type":["null","double"],"default":null},{"name":"Csf60AvgDtv","type":["null","double"],"default":null},{"name":"Csf30AvgDtv","type":["null","double"],"default":null}]}', schemaType=JSON, startPositionEntryId=33, endPositionEntryId=70, startPositionLedgerId=3463486, endPositionLedgerId=3463486, schemaInfoProperties={}} (start = 2.296111546565531E9, wall = 147 ms, cpu = 0 ms, wait = 429 ms, calls = 1)
2022-06-08T09:33:33.965565080-04:00 java.nio.BufferUnderflowException
2022-06-08T09:33:33.965571091-04:00 	at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:182)
2022-06-08T09:33:33.965577921-04:00 	at io.netty.buffer.ReadOnlyByteBufferBuf.getBytes(ReadOnlyByteBufferBuf.java:200)
2022-06-08T09:33:33.965582062-04:00 	at io.netty.buffer.AbstractUnpooledSlicedByteBuf.getBytes(AbstractUnpooledSlicedByteBuf.java:243)
2022-06-08T09:33:33.965586668-04:00 	at io.netty.buffer.AbstractByteBuf.getBytes(AbstractByteBuf.java:490)
2022-06-08T09:33:33.965589527-04:00 	at org.apache.pulsar.common.api.proto.MessageMetadata.getSchemaVersion(MessageMetadata.java:537)
2022-06-08T09:33:33.965592372-04:00 	at org.apache.pulsar.common.api.raw.RawMessageImpl.getSchemaVersion(RawMessageImpl.java:156)
2022-06-08T09:33:33.965595098-04:00 	at org.apache.pulsar.sql.presto.PulsarRecordCursor.getSchemaInfo(PulsarRecordCursor.java:662)
2022-06-08T09:33:33.965597713-04:00 	at org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:530)
2022-06-08T09:33:33.965600538-04:00 	at io.prestosql.$gen.CursorProcessor_20220608_133326_33.process(Unknown Source)
2022-06-08T09:33:33.965603289-04:00 	at io.prestosql.operator.ScanFilterAndProjectOperator$RecordCursorToPages.process(ScanFilterAndProjectOperator.java:323)
2022-06-08T09:33:33.965606919-04:00 	at io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
2022-06-08T09:33:33.965610158-04:00 	at io.prestosql.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:221)
2022-06-08T09:33:33.965612957-04:00 	at io.prestosql.operator.WorkProcessorUtils.access$000(WorkProcessorUtils.java:37)
2022-06-08T09:33:33.965615585-04:00 	at io.prestosql.operator.WorkProcessorUtils$YieldingProcess.process(WorkProcessorUtils.java:181)
2022-06-08T09:33:33.965618223-04:00 	at io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
2022-06-08T09:33:33.965628121-04:00 	at io.prestosql.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:221)
2022-06-08T09:33:33.965631636-04:00 	at io.prestosql.operator.WorkProcessorUtils.lambda$processStateMonitor$2(WorkProcessorUtils.java:200)
2022-06-08T09:33:33.965635773-04:00 	at io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
2022-06-08T09:33:33.965639680-04:00 	at io.prestosql.operator.WorkProcessorUtils.lambda$flatten$6(WorkProcessorUtils.java:277)
2022-06-08T09:33:33.965643475-04:00 	at io.prestosql.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:319)
2022-06-08T09:33:33.965646669-04:00 	at io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
2022-06-08T09:33:33.965649650-04:00 	at io.prestosql.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:306)
2022-06-08T09:33:33.965652535-04:00 	at io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
2022-06-08T09:33:33.965655504-04:00 	at io.prestosql.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:221)
2022-06-08T09:33:33.965859342-04:00 	at io.prestosql.operator.WorkProcessorUtils.lambda$processStateMonitor$2(WorkProcessorUtils.java:200)
2022-06-08T09:33:33.965867541-04:00 	at io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
2022-06-08T09:33:33.965871003-04:00 	at io.prestosql.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:221)
2022-06-08T09:33:33.965877994-04:00 	at io.prestosql.operator.WorkProcessorUtils.lambda$finishWhen$3(WorkProcessorUtils.java:215)
2022-06-08T09:33:33.965882107-04:00 	at io.prestosql.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:372)
2022-06-08T09:33:33.965889844-04:00 	at io.prestosql.operator.WorkProcessorSourceOperatorAdapter.getOutput(WorkProcessorSourceOperatorAdapter.java:149)

This is the same issue as #14379.

The DeserializeEntries thread offers entries into messageQueue and releases the related entries after processing them.
When PulsarRecordCursor closes, it also releases the entries still in messageQueue, so two different threads can release the same entry, which causes the errors above.
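
To illustrate why a second release is a problem, here is a minimal sketch in plain Java. It is not Pulsar or Netty code: `CountedEntry` and `DoubleReleaseSketch` are hypothetical names, and the two threads are collapsed into two sequential calls so the result is deterministic. In this toy model the second release is rejected outright; with a pooled buffer the memory may instead be handed to another reader, which would be consistent with the out-of-range reads in the traces above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; these are not Pulsar or Netty classes.
final class DoubleReleaseSketch {

    /** A toy reference-counted entry that starts with a single owner. */
    static final class CountedEntry {
        private final AtomicInteger refCnt = new AtomicInteger(1);

        /** Drops one reference and rejects a release of an already-freed entry. */
        void release() {
            if (refCnt.getAndDecrement() <= 0) {
                refCnt.incrementAndGet(); // undo the decrement before failing
                throw new IllegalStateException("entry released twice");
            }
        }
    }

    /** Returns true when the second release is detected as a double release. */
    static boolean secondReleaseFails() {
        CountedEntry entry = new CountedEntry();
        entry.release(); // stands in for the deserialize thread finishing its work
        try {
            entry.release(); // stands in for close() releasing what is still queued
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second release detected: " + secondReleaseFails());
    }
}
```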

Modification

  • Release the messages remaining in the queue only after deserializeEntries has closed.
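
The ordering described above can be sketched as follows. This is a simplified model under stated assumptions, not the code in this PR: `Cursor`, `CountedMessage`, `deserializeLoop`, and the `running` flag are all hypothetical names, and a `ConcurrentLinkedQueue` stands in for the real queue. The point it shows is that `close()` first stops the worker and waits for it to exit, and only then drains and releases whatever is left, so no message is released by both threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; names do not match the Pulsar SQL sources.
final class CloseOrderingSketch {

    /** Counts how many times it was released so the test can check "exactly once". */
    static final class CountedMessage {
        final AtomicInteger releases = new AtomicInteger();

        void release() {
            releases.incrementAndGet();
        }
    }

    static final class Cursor {
        private final Queue<CountedMessage> messageQueue = new ConcurrentLinkedQueue<>();
        private volatile boolean running = true;
        private final Thread worker = new Thread(this::deserializeLoop, "deserialize-sketch");

        Cursor(List<CountedMessage> input) {
            messageQueue.addAll(input);
        }

        void start() {
            worker.start();
        }

        /** Worker: releases the head while it is still queued, then removes it. */
        private void deserializeLoop() {
            CountedMessage head;
            while (running && (head = messageQueue.peek()) != null) {
                head.release();
                messageQueue.poll();
            }
        }

        /** Stops the worker, waits for it to exit, then releases the leftovers. */
        void close() {
            running = false;
            try {
                worker.join(); // after this, the worker no longer touches the queue
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while closing", e);
            }
            CountedMessage rest;
            while ((rest = messageQueue.poll()) != null) {
                rest.release();
            }
        }
    }

    /** Runs one start/close cycle and checks that each message was released once. */
    static boolean everyMessageReleasedOnce(int count) {
        List<CountedMessage> messages = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            messages.add(new CountedMessage());
        }
        Cursor cursor = new Cursor(messages);
        cursor.start();
        cursor.close();
        for (CountedMessage message : messages) {
            if (message.releases.get() != 1) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("released exactly once: " + everyMessageReleasedOnce(1000));
    }
}
```

If `close()` drained the queue before `worker.join()` returned, the message the worker is currently holding could be released by both threads, which is the failure mode described in the Motivation section.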

Documentation

  • doc-not-needed

@Technoboy- Technoboy- marked this pull request as ready for review June 21, 2022 06:49
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jun 21, 2022
@Technoboy- Technoboy- added type/bug The PR fixed a bug or issue reported a bug area/sql Pulsar SQL related features release/2.8.4 release/2.10.2 release/2.9.4 labels Jun 21, 2022
@Technoboy- Technoboy- added this to the 2.11.0 milestone Jun 21, 2022
@Technoboy- Technoboy- requested a review from gaoran10 June 21, 2022 08:13

@gaoran10 gaoran10 left a comment

LGTM

@Technoboy- Technoboy- merged commit 141c440 into apache:master Jun 21, 2022
codelipenghui pushed a commit that referenced this pull request Jun 28, 2022
@mattisonchao mattisonchao added the cherry-picked/branch-2.9 Archived: 2.9 is end of life label Jul 2, 2022
mattisonchao pushed a commit that referenced this pull request Jul 2, 2022
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Jul 4, 2022
(cherry picked from commit 141c440)
(cherry picked from commit b68fa32)
BewareMyPower pushed a commit that referenced this pull request Jul 29, 2022
@BewareMyPower BewareMyPower added the cherry-picked/branch-2.8 Archived: 2.8 is end of life label Jul 29, 2022
@Technoboy- Technoboy- deleted the fix-plsql-release-issue branch August 10, 2022 05:52
Labels
area/sql Pulsar SQL related features cherry-picked/branch-2.8 Archived: 2.8 is end of life cherry-picked/branch-2.9 Archived: 2.9 is end of life cherry-picked/branch-2.10 doc-not-needed Your PR changes do not impact docs release/2.8.4 release/2.9.4 release/2.10.2 type/bug The PR fixed a bug or issue reported a bug
6 participants