
Consumer often discards received messages #2401

Closed
massakam opened this issue Aug 20, 2018 · 10 comments · Fixed by #2464
Labels: help wanted, type/bug

Comments

massakam (Contributor) commented Aug 20, 2018:

When TLS is enabled, the consumer often discards received messages. The following is the consumer log output when a message is discarded:

17:53:52.113 [pulsar-client-io-1-1] ERROR o.a.pulsar.client.impl.ConsumerImpl - [persistent://xxx/xxx/xxx/xxx][xxx] Discarding corrupted message at 242595:3896

At that point, an IndexOutOfBoundsException is thrown at this line in Commands.parseMessageMetadata:

java.lang.IndexOutOfBoundsException: writerIndex: 1680879946 (expected: readerIndex(23) <= writerIndex <= capacity(870))
        at io.netty.buffer.AbstractByteBuf.writerIndex(AbstractByteBuf.java:118) ~[netty-all-4.1.22.Final.jar:4.1.22.Final]
        at org.apache.pulsar.common.api.Commands.parseMessageMetadata(Commands.java:271) ~[pulsar-common-2.1.0-incubating.jar:2.1.0-incubating]
        at org.apache.pulsar.client.impl.ConsumerImpl.messageReceived(ConsumerImpl.java:713) ~[pulsar-client-original-2.1.0-incubating.jar:2.1.0-incubating]
        at org.apache.pulsar.client.impl.ClientCnx.handleMessage(ClientCnx.java:275) [pulsar-client-original-2.1.0-incubating.jar:2.1.0-incubating]
        at org.apache.pulsar.common.api.PulsarDecoder.channelRead(PulsarDecoder.java:166) [pulsar-common-2.1.0-incubating.jar:2.1.0-incubating]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:297) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:413) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1389) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1171) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1205) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1414) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:945) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:806) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:404) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:304) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.22.Final.jar:4.1.22.Final]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]

If the discarded messages are received again by resetting the cursor, this error may not occur.
When TLS is disabled, this phenomenon does not seem to occur.

System configuration

Pulsar version: 2.1.0-incubating

massakam (Contributor, Author) commented:

It seems that this error does not occur if I revert the following change on the broker side:

#1203

However, I don't understand the reason...

merlimat added the type/bug label Aug 20, 2018
massakam (Contributor, Author) commented:

@merlimat Do you have any thoughts about what the cause might be?

massakam (Contributor, Author) commented:

This phenomenon seems to happen when the consumer is receiving a lot of messages. For example, it is prone to happen when the consumer connects to a topic with a large number of messages in the backlog.

ivankelly (Contributor) commented:

Looks like a ByteBuf issue. When reading a ByteBuf, we often do things like reading the size from the packet and setting the writerIndex to the current readerIndex plus that size, so that the buffer can be passed into the protobuf parser. This is why the writer index seems so messed up. Do you have a reliable repro, or do you just see it happening regularly in production?

Is TLS auth enabled, or just TLS transport? This is going to be a ByteBuf pooling issue of some sort with TLS.
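
A minimal sketch of that parsing pattern, with a hypothetical helper method (this is not the actual Commands.parseMessageMetadata code): if the buffer content has been corrupted, the size prefix read from the wire is garbage, and moving writerIndex by it fails exactly as in the log above.

    import io.netty.buffer.ByteBuf;

    public class SizePrefixedParsing {
        // Hypothetical method illustrating the pattern described above.
        static void parseWithSizePrefix(ByteBuf buffer) {
            // Size prefix read from the wire; garbage if the payload is corrupted.
            int metadataSize = (int) buffer.readUnsignedInt();
            int originalWriterIndex = buffer.writerIndex();
            // Bound the readable region for the protobuf parser. With a corrupted
            // size this throws IndexOutOfBoundsException (writerIndex far beyond
            // capacity), matching the stack trace above.
            buffer.writerIndex(buffer.readerIndex() + metadataSize);
            // ... hand the bounded region to the protobuf parser ...
            buffer.writerIndex(originalWriterIndex); // restore the real bounds
        }
    }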

ivankelly (Contributor) commented:

At what rate are messages arriving at the topic in question?
Is the client processing messages for just this topic, or other topics also?

I'm trying to get a feel for how we could repro this locally.

massakam (Contributor, Author) commented:

@ivankelly

> Do you have a reliable repro, or do you just see it happening regularly in production?

In our environment, it can be reproduced by using the OpenMessaging benchmark tool and applying the following load:

topics: 1
partitionsPerTopic: 1
messageSize: 1024
payloadFile: "payload/payload-1Kb.data"
subscriptionsPerTopic: 100
producersPerTopic: 1
producerRate: 1000
consumerBacklogSizeGB: 0
testDurationMinutes: 2

It seems that it tends to occur when there are many subscriptions. It also occurred several times in production.

> Is TLS auth enabled, or just TLS transport?

We are not using TLS auth.

ivankelly (Contributor) commented:

I have a local repro; will look into it more in the morning.

massakam (Contributor, Author) commented:

Sorry, the OpenMessaging benchmark tool does not yet support TLS...
We are using a modified version of the tool with TLS enabled.

ivankelly (Contributor) commented:

Yes, it's only a two-line change to make it work.
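
For reference, a sketch of what enabling TLS in the Pulsar client generally looks like (illustrative broker URL and certificate path; the actual benchmark patch is not shown in this thread):

    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;

    public class TlsClientSketch {
        public static void main(String[] args) throws PulsarClientException {
            // Placeholder broker address and CA path -- not the real patch.
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar+ssl://broker.example.com:6651")
                    .tlsTrustCertsFilePath("/path/to/ca.cert.pem")
                    .build();
            client.close();
        }
    }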

ivankelly (Contributor) commented:

Spent some time on this today; still no root cause, but it looks very much like something is holding on to a ByteBuf that has already been released back to the pool and is then writing to it. Will continue looking.
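
The hazard being described, as a minimal sketch (illustrative code, not anything in Pulsar): once a pooled buffer is released, the pool can hand its memory to the next allocation, so a stale reference that keeps writing corrupts unrelated data.

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.PooledByteBufAllocator;

    public class UseAfterReleaseSketch {
        public static void main(String[] args) {
            ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(128);
            buf.writeLong(42L);
            buf.release(); // refCnt -> 0: the memory returns to the pool

            // The pool may now hand the same memory region (and even the same
            // recycled ByteBuf object) to the next allocation:
            ByteBuf reused = PooledByteBufAllocator.DEFAULT.buffer(128);

            // A component still holding 'buf' and writing through it would
            // silently corrupt whatever 'reused' now contains. Kept commented
            // out; with Netty's default access checks the write may instead
            // throw IllegalReferenceCountException.
            // buf.setLong(0, 0xDEADBEEFL);

            reused.release();
        }
    }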

ivankelly added a commit that referenced this issue Aug 28, 2018
The netty SSL handler uses a coalescing buffer queue, which modifies
the buffers used to queue the writes so that SSL_write can be given
larger chunks, thereby increasing the 'goodput'.

If we pass in a retained duplicate as we have been doing until now,
then later clients will be passed junk, as SSL will have modified cached
entry buffers.

This patch introduces a copying ByteBufPair encoder, which is only
used with SSL connections.
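
A sketch of the distinction the fix relies on (hypothetical method names; the actual change is the copying ByteBufPair encoder in #2464):

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    public class TlsEncodeSketch {
        // Shares the underlying memory with the cached entry. If the SSL
        // handler's coalescing buffer queue later mutates this buffer, the
        // cached entry is corrupted too -- the behavior described above.
        static ByteBuf forPlaintext(ByteBuf cachedEntry) {
            return cachedEntry.retainedDuplicate();
        }

        // Deep copy: the SSL handler can coalesce and modify it freely without
        // touching the cached entry, at the cost of an extra allocation and copy.
        static ByteBuf forTls(ByteBuf cachedEntry) {
            return Unpooled.copiedBuffer(cachedEntry);
        }
    }
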
ivankelly added a commit to ivankelly/pulsar that referenced this issue Aug 28, 2018
merlimat added this to the 2.1.1-incubating milestone Aug 28, 2018
merlimat pushed a commit that referenced this issue Aug 31, 2018
merlimat pushed a commit that referenced this issue Sep 5, 2018