[DISCUSS] Consuming messages with an unsupported compression type #5616

@codelipenghui

Is your feature request related to a problem? Please describe.
Currently, if an old Pulsar consumer (e.g. 2.2.0) reads messages from a newer producer (e.g. 2.4.0) that has compression enabled with a compression type that 2.2.0 does not support, the consumer logs the following errors:

19:26:55.918 [pulsar-client-io-1-1] WARN  org.apache.pulsar.client.impl.ClientCnx - [localhost/127.0.0.1:6650] Got exception IndexOutOfBoundsException : writerIndex: 682963024 (expected: readerIndex(83) <= writerIndex <= capacity(28476))
java.lang.IndexOutOfBoundsException: writerIndex: 682963024 (expected: readerIndex(83) <= writerIndex <= capacity(28476))
	at io.netty.buffer.AbstractByteBuf.writerIndex(AbstractByteBuf.java:118) ~[org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
	at org.apache.pulsar.common.api.Commands.deSerializeSingleMessageInBatch(Commands.java:1070) ~[org.apache.pulsar-pulsar-common-2.2.0.jar:2.2.0]
	at org.apache.pulsar.client.impl.ConsumerImpl.receiveIndividualMessagesFromBatch(ConsumerImpl.java:971) ~[org.apache.pulsar-pulsar-client-original-2.2.0.jar:2.2.0]
	at org.apache.pulsar.client.impl.ConsumerImpl.messageReceived(ConsumerImpl.java:860) ~[org.apache.pulsar-pulsar-client-original-2.2.0.jar:2.2.0]
	at org.apache.pulsar.client.impl.ClientCnx.handleMessage(ClientCnx.java:287) ~[org.apache.pulsar-pulsar-client-original-2.2.0.jar:2.2.0]
	at org.apache.pulsar.common.api.PulsarDecoder.channelRead(PulsarDecoder.java:169) ~[org.apache.pulsar-pulsar-common-2.2.0.jar:2.2.0]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:297) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:413) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1414) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:945) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:806) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:404) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:304) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [org.apache.pulsar-pulsar-functions-metrics-2.2.0.jar:?]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
19:26:55.919 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [id: 0x6515eaab, L:/127.0.0.1:36184 ! R:localhost/127.0.0.1:6650] Disconnected
19:26:55.919 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://sql/stat/position-feature-recommend-smater-observation-byte] [test] Closed connection [id: 0x6515eaab, L:/127.0.0.1:36184 ! R:localhost/127.0.0.1:6650] -- Will try again in 0.1 s

The error log makes the problem difficult to troubleshoot. Only after running some compatibility tests did I find the cause: the producer uses the ZSTD compression type, and the consumer is on version 2.2.0, which does not support ZSTD compression yet.

Describe the solution you'd like
The consumer should log clearly that the read failed because of an unsupported compression type, and then the consumer should be closed.
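
To make the proposal concrete, here is a minimal sketch of the kind of check I have in mind. It assumes the client can read the compression type name from the message metadata before it touches the payload. `CompressionGuard`, `UnsupportedCompressionException`, and the topic name are hypothetical names for illustration only, not existing Pulsar client classes, and the supported set shown is an assumption about a client that predates ZSTD.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch; none of these names are part of the Pulsar client API.
class CompressionGuard {

    /** Hypothetical exception that names the real cause of the read failure. */
    static class UnsupportedCompressionException extends RuntimeException {
        UnsupportedCompressionException(String message) {
            super(message);
        }
    }

    private final Set<String> supported;

    CompressionGuard(String... supportedTypes) {
        this.supported =
            Collections.unmodifiableSet(new HashSet<>(Arrays.asList(supportedTypes)));
    }

    /** Fails with a descriptive error before any payload parsing is attempted. */
    void check(String topic, String compressionType) {
        if (!supported.contains(compressionType)) {
            throw new UnsupportedCompressionException(
                "Cannot read from topic " + topic
                + ": compression type " + compressionType
                + " is not supported by this client (supported: "
                + new TreeSet<>(supported) + ")."
                + " Upgrade the consumer or change the producer's compression type.");
        }
    }

    public static void main(String[] args) {
        // Assumed for illustration: a client that predates ZSTD support.
        CompressionGuard guard = new CompressionGuard("NONE", "LZ4", "ZLIB");

        guard.check("my-topic", "LZ4"); // supported, passes silently

        try {
            guard.check("my-topic", "ZSTD"); // rejected with a clear message
        } catch (UnsupportedCompressionException e) {
            System.out.println(e.getMessage());
            // A real consumer would close itself here instead of reconnecting in a loop.
        }
    }
}
```

With a check like this, the log would name the compression type instead of showing an IndexOutOfBoundsException from the batch deserialization path, and the consumer could be closed instead of retrying the connection every 0.1 s.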

If you have any ideas, please leave a comment here. I look forward to your feedback.


Labels: lifecycle/stale, type/feature
