Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix multiple transfer corruption issues when TLS is enabled #22760

Closed
wants to merge 10 commits into from

Conversation

lhotari
Copy link
Member

@lhotari lhotari commented May 22, 2024

UPDATE: This PR has been replaced by #22810

Fixes #22601 #21892 #19460

Motivation

In Pulsar, there are multiple reported issues where the transferred output gets corrupted and fails with exceptions around invalid reader and writer index. One source of these issues are the ones which occur only when TLS is enabled.

I found these Netty issues that provide a lot of context:

It seems that this is a long time issue in Netty and it has been partially fixed. However, it's not fixed for many locations in the Netty code base and it's not safe to share ByteBuf instances in all cases.

In Pulsar, the sharing of ByteBuf instance happens in this case at least via the broker cache (RangeEntryCacheManagerImpl) and the pending reads manager (PendingReadsManager).

The SslHandler related issue was originally reported in Pulsar in 2018 with #2401 . The fix that time was #2464.
The ByteBuf .copy() method was used to copy the ByteBuf. The problem with this change is that .copy() itself isn't thread safe and accesses the internalNioBuffer instance directly.

This happens at least when the ByteBuf instance contains a ReadOnlyByteBufferBuf wrapper. This can be seen in the code https://github.com/netty/netty/blob/243de91df2e9a9bf0ad938f54f76063c14ba6e3d/buffer/src/main/java/io/netty/buffer/ReadOnlyByteBufferBuf.java#L412-L433 .

As a result of this, exceptions such as these ones occur:

java.lang.IllegalArgumentException: newPosition > limit: (2094 > 88)
    at java.base/java.nio.Buffer.createPositionException(Buffer.java:341)
    at java.base/java.nio.Buffer.position(Buffer.java:316)
    at java.base/java.nio.ByteBuffer.position(ByteBuffer.java:1516)
    at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:185)
    at io.netty.buffer.UnpooledHeapByteBuf.setBytes(UnpooledHeapByteBuf.java:268)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1113)
    at io.netty.buffer.ReadOnlyByteBufferBuf.copy(ReadOnlyByteBufferBuf.java:431)
    at io.netty.buffer.DuplicatedByteBuf.copy(DuplicatedByteBuf.java:210)
    at io.netty.buffer.AbstractByteBuf.copy(AbstractByteBuf.java:1194)
    at org.apache.pulsar.common.protocol.ByteBufPair$CopyingEncoder.write(ByteBufPair.java:149)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:893)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868)
    at org.apache.pulsar.broker.service.PulsarCommandSenderImpl.lambda$sendMessagesToConsumer$1(PulsarCommandSenderImpl.java:277)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:840)
java.nio.BufferUnderflowException
    at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:183)
    at io.netty.buffer.UnpooledHeapByteBuf.setBytes(UnpooledHeapByteBuf.java:268)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1113)
    at io.netty.buffer.ReadOnlyByteBufferBuf.copy(ReadOnlyByteBufferBuf.java:431)
    at io.netty.buffer.DuplicatedByteBuf.copy(DuplicatedByteBuf.java:210)
    at io.netty.buffer.AbstractByteBuf.copy(AbstractByteBuf.java:1194)
    at org.apache.pulsar.common.protocol.ByteBufPair$CopyingEncoder.write(ByteBufPair.java:149)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:893)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868)
    at org.apache.pulsar.broker.service.PulsarCommandSenderImpl.lambda$sendMessagesToConsumer$1(PulsarCommandSenderImpl.java:277)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:840)

It is likely that Failed to peek sticky key from the message metadata java.lang.IllegalArgumentException: Invalid unknonwn tag type: 4 exceptions are also caused by the same root cause.
java.lang.IndexOutOfBoundsException: readerIndex: 31215, writerIndex: 21324 (expected: 0 <= readerIndex <= writerIndex <= capacity(65536)) type of exceptions on the broker side are possibly caused by the same root cause as well.

The root cause of such exceptions could also be different. A shared Netty ByteBuf must have at least have an independent view created with duplicate, slice or retainedDuplicate if the readerIndex is mutated.
The ByteBuf instance must also be properly shared in a thread safe way. Failing to do that could result in similar symptoms and this PR doesn't fix that.

Modifications

  • Remove the ByteBufPair.CopyingEncoder and make the ByteBufPair.Encoder suitable for both use cases
    • A read-only ByteBuf needs to be passed to SslHandler so that it doesn't get mutated. A deep copy isn't required. This solution is also more performant since there will be less memory copies.
  • Workaround an IntelliJ issue where it shows a red mark in PulsarChannelInitializer classes (casting to ChannelHandler fixes the issue).

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@lhotari lhotari force-pushed the lh-fix-tls-corruption-issue branch from de75e14 to 1ffda4c Compare May 22, 2024 09:41
@lhotari
Copy link
Member Author

lhotari commented May 22, 2024

On Netty side, PooledByteBuf.getBytes was made thread safe by making a copy of the internalByteBuffer with netty/netty#9120 changes. However this change is not reflected in the ReadOnlyByteBufferBuf implementation. (this doesn't have an impact on this PR, just sharing the observation)

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM
great work

@lhotari lhotari marked this pull request as ready for review May 22, 2024 17:18
@lhotari lhotari requested a review from eolivelli May 22, 2024 17:18
@lhotari lhotari marked this pull request as draft May 22, 2024 18:17
// If the buffer is already read-only, .asReadOnly() will return the same buffer.
// That's why the additional .retainedDuplicate() is needed to ensure that the returned buffer
// has independent readIndex and writeIndex.
return buf.asReadOnly().retainedDuplicate();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    private static ByteBuf readOnlyRetainedDuplicate(ByteBuf buf) {
        if (buf == null || buf.readableBytes() <= 0) {
            return Unpooled.EMPTY_BUFFER;
        }
        
        if (buf.isReadOnly()) {
            return buf.retainedDuplicate().asReadOnly();
        }
        return buf.asReadOnly().retain();
    }

Is this should be better? I think we don't need to call duplicate() after call asReadOnly

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need to call duplicate() after call asReadOnly

It is needed. The comment explains it. please check the comment for the reason.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean if the input buf is not readOnly, asReadOnly will create a new instance, so we just need to retain() after asReadOnly.
It also has independent read/write index.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (buf == null || buf.readableBytes() <= 0) {
return Unpooled.EMPTY_BUFFER;
}

I don't think that nulls should be tolerated when nulls shouldn't be passed as input. For the readableBytes() == 0 optimization, is that needed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just defensive programming, if we are sure that the input buf cannot be null, this line can be removed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I checked the source code, it looks a little strange that I didn't understand at the first time.

    private static boolean attemptCopyToCumulation(ByteBuf cumulation, ByteBuf next, int wrapDataSize) {
        final int inReadableBytes = next.readableBytes();
        final int cumulationCapacity = cumulation.capacity();
        if (wrapDataSize - cumulation.readableBytes() >= inReadableBytes &&
                // Avoid using the same buffer if next's data would make cumulation exceed the wrapDataSize.
                // Only copy if there is enough space available and the capacity is large enough, and attempt to
                // resize if the capacity is small.
                (cumulation.isWritable(inReadableBytes) && cumulationCapacity >= wrapDataSize ||
                        cumulationCapacity < wrapDataSize &&
                                ensureWritableSuccess(cumulation.ensureWritable(inReadableBytes, false)))) {
            cumulation.writeBytes(next);
            next.release();
            return true;
        }
        return false;
    }

We should return false immediately if cumulation is not writable

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach is currently blocked by another Netty bug where SslHandler doesn't support a read only buffer. That seems to be the reason why a deep copy is currently required. Fixing that issue in netty/netty#14071.

Yes, the fix looks good

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should return false immediately if cumulation is not writable

In this case, isWritable returns true for a buffer for a wrapped read only buffer. This is surprising and the reason why this read only buffers aren't supported if there's another wrapper. It correctly returns true for isReadOnly.

One possible workaround could be to use Unpooled.unmodifiableBuffer to add the readonly wrapper so that it's always the "top most" wrapper. That would also avoid the need for the extra duplicate() wrapper.

Copy link
Member

@dao-jun dao-jun May 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, isWritable returns true for a buffer for a wrapped read only buffer.

It sounds very strange, can you please point me where?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds very strange, can you please point me where?

Yes, it's very surprising behavior. You can try it out with a debugger by running for example TlsProducerConsumerTest and checking what happens in the attemptCopyToCumulation if the .asReadOnly().retainedDuplicate() solution is used.

The io.netty.buffer.AbstractByteBuf#isWritable(int) method gets called: https://github.com/netty/netty/blob/70d6a3f40d7e8fd3f5ced7600ed209c58944f673/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java#L171-L174
It will evaluate to true since it's implementation doesn't check for isReadOnly():

    @Override
    public boolean isWritable(int numBytes) {
        return capacity() - writerIndex >= numBytes;
    }

@lhotari
Copy link
Member Author

lhotari commented May 22, 2024

I'll add background to the PR description to help reviewers understand it quickly.

@dao-jun Please share that as a comment instead. This is my PR and not yours. :) I'll update the description based on the feedback.

@dao-jun
Copy link
Member

dao-jun commented May 22, 2024

Just add additional context to help understand it quickly.

CopyingEncoder is used to handle the case of SslHandler. Because SslHandler could compose ByteBufs to one(The first input ByteBuf).
For instance, we write buf1,buf2,buf3 to the handler, it will write buf2,buf3 to buf1. If we are enable EntryCache, this will pollute the entries in the EntryCache. So we use buf.copy to make a new instance and copy the bytes to handle the case.

However, buf.copy is not thread safe, multi thread copying may lead to data corruption.

The solution is pass a ReadOnlyByteBuf to SslHandler to disable ByteBuf compose(write buf2,buf3 to buf1), although netty considered the input ByteBuf can be not writable, but it seems there are some problems with the code:

    private static boolean attemptCopyToCumulation(ByteBuf cumulation, ByteBuf next, int wrapDataSize) {
        final int inReadableBytes = next.readableBytes();
        final int cumulationCapacity = cumulation.capacity();
        if (wrapDataSize - cumulation.readableBytes() >= inReadableBytes &&
                // Avoid using the same buffer if next's data would make cumulation exceed the wrapDataSize.
                // Only copy if there is enough space available and the capacity is large enough, and attempt to
                // resize if the capacity is small.
                (cumulation.isWritable(inReadableBytes) && cumulationCapacity >= wrapDataSize ||
                        cumulationCapacity < wrapDataSize &&
                                ensureWritableSuccess(cumulation.ensureWritable(inReadableBytes, false)))) {
            cumulation.writeBytes(next);
            next.release();
            return true;
        }
        return false;
    }

it should return false immediately if cumulation.isWritable(inReadableBytes)== false or isReadOnly == true
Waiting the fix PR netty/netty#14071 merged.

@dao-jun
Copy link
Member

dao-jun commented May 22, 2024

Yes, of cause, my bad, since I didn't explain myself correctly.

@lhotari
Copy link
Member Author

lhotari commented May 22, 2024

Yes, of cause, my bad, since I didn't explain myself correctly.

Thanks for the useful summary @dao-jun. I'll revisit the description of this PR later once this comes to a conclusion.

@lhotari
Copy link
Member Author

lhotari commented May 22, 2024

it should return false immediately if cumulation.isWritable(inReadableBytes)== false or isReadOnly == true
Waiting the fix PR netty/netty#14071 merged.

there might be a workaround (explained in #22760 (comment)). will test that.

@lhotari lhotari marked this pull request as ready for review May 22, 2024 20:53
@lhotari lhotari requested a review from dao-jun May 22, 2024 20:54
@lhotari lhotari marked this pull request as draft May 22, 2024 21:04
@lhotari
Copy link
Member Author

lhotari commented May 22, 2024

I think I'll need to add a repro test to the Pulsar code base. While testing the recent changes, I can see that the problem occurs even when there's the read only wrapper.

@lhotari lhotari marked this pull request as ready for review May 22, 2024 22:13
@lhotari lhotari marked this pull request as draft May 23, 2024 07:31
@lhotari
Copy link
Member Author

lhotari commented May 23, 2024

There are also other bugs in this area.

When TLS is enabled between Broker and Bookies, the Bookkeeper V3 protocol is used.
In the Bookkeeper client, there's a bug related reference counts when V3 protocol is used.
The PR to fix that issue is apache/bookkeeper#4293 .

@lhotari
Copy link
Member Author

lhotari commented May 27, 2024

Closing this PR since the transfer corruption issues will be fixed by changes in Netty 4.1.111.Final (netty/netty#14072, netty/netty#14071, netty/netty#14076 and netty/netty#14078) and Bookkeeper 4.16.6 (apache/bookkeeper#4289 and apache/bookkeeper#4293).

@lhotari lhotari closed this May 27, 2024
@lhotari
Copy link
Member Author

lhotari commented May 29, 2024

Fix in Bookkeeper is apache/bookkeeper#4404

@lhotari
Copy link
Member Author

lhotari commented May 30, 2024

UPDATE: This PR has been replaced by #22810 .

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Bug] parseMessageMetadata error when broker entry metadata enable with high loading
3 participants