HTTP/1.x and HTTP/2.0 decoder Buffer visibility and data corruption #898

Merged · 1 commit · Dec 17, 2019

@@ -83,10 +83,8 @@ public void setup() {
responseBuffer.writeShort(CRLF_SHORT);
responseByteBuf = toByteBuf(responseBuffer.slice());

final HttpResponseDecoder decoder = new HttpResponseDecoder(new ArrayDeque<>(),
DefaultHttpHeadersFactory.INSTANCE, 8192, 8192);
decoder.setDiscardAfterReads(1);
channel = new EmbeddedChannel(decoder);
channel = new EmbeddedChannel(new HttpResponseDecoder(new ArrayDeque<>(),
DefaultHttpHeadersFactory.INSTANCE, 8192, 8192));
}

@Benchmark

@@ -45,7 +45,6 @@ final class ServiceTalkBufferAllocator extends AbstractByteBufAllocator implemen
private final boolean noZeroing;

ServiceTalkBufferAllocator(boolean preferDirect, boolean tryNoZeroing) {

super(preferDirect);
this.noZeroing = tryNoZeroing && useDirectBufferWithoutZeroing();
}

@@ -24,6 +24,12 @@ final class UnreleasableCompositeByteBuf extends CompositeByteBuf {

UnreleasableCompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents) {
super(alloc, direct, maxNumComponents);
// ServiceTalk buffers are unreleasable. Some optimizations in Netty use `refCnt() > 1` to judge whether a
// ByteBuf may be shared; if it is not shared, Netty may assume it is safe to modify the underlying storage
// of the ByteBuf (e.g. reallocate on write, compact data in place), which may lead to visibility issues
// across threads and data corruption. We retain() here to imply the ByteBuf may be shared and that these
// optimizations are not safe.
super.retain();
}

@Override

@@ -26,10 +26,22 @@ final class UnreleasableDirectByteBuf extends UnpooledDirectByteBuf {

UnreleasableDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
// ServiceTalk buffers are unreleasable. Some optimizations in Netty use `refCnt() > 1` to judge whether a
// ByteBuf may be shared; if it is not shared, Netty may assume it is safe to modify the underlying storage
// of the ByteBuf (e.g. reallocate on write, compact data in place), which may lead to visibility issues
// across threads and data corruption. We retain() here to imply the ByteBuf may be shared and that these
// optimizations are not safe.
super.retain();
}

UnreleasableDirectByteBuf(ByteBufAllocator alloc, ByteBuffer initialBuffer, int maxCapacity) {
super(alloc, initialBuffer, maxCapacity);
// ServiceTalk buffers are unreleasable. Some optimizations in Netty use `refCnt() > 1` to judge whether a
// ByteBuf may be shared; if it is not shared, Netty may assume it is safe to modify the underlying storage
// of the ByteBuf (e.g. reallocate on write, compact data in place), which may lead to visibility issues
// across threads and data corruption. We retain() here to imply the ByteBuf may be shared and that these
// optimizations are not safe.
super.retain();
}

@Override

@@ -24,10 +24,22 @@ class UnreleasableHeapByteBuf extends UnpooledHeapByteBuf {

UnreleasableHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
// ServiceTalk buffers are unreleasable. Some optimizations in Netty use `refCnt() > 1` to judge whether a
// ByteBuf may be shared; if it is not shared, Netty may assume it is safe to modify the underlying storage
// of the ByteBuf (e.g. reallocate on write, compact data in place), which may lead to visibility issues
// across threads and data corruption. We retain() here to imply the ByteBuf may be shared and that these
// optimizations are not safe.
super.retain();
}

UnreleasableHeapByteBuf(ByteBufAllocator alloc, byte[] initialArray, int maxCapacity) {
super(alloc, initialArray, maxCapacity);
// ServiceTalk buffers are unreleasable. Some optimizations in Netty use `refCnt() > 1` to judge whether a
// ByteBuf may be shared; if it is not shared, Netty may assume it is safe to modify the underlying storage
// of the ByteBuf (e.g. reallocate on write, compact data in place), which may lead to visibility issues
// across threads and data corruption. We retain() here to imply the ByteBuf may be shared and that these
// optimizations are not safe.
super.retain();
}

@Override

@@ -25,10 +25,22 @@
class UnreleasableUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf {
UnreleasableUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
// ServiceTalk buffers are unreleasable. Some optimizations in Netty use `refCnt() > 1` to judge whether a
// ByteBuf may be shared; if it is not shared, Netty may assume it is safe to modify the underlying storage
// of the ByteBuf (e.g. reallocate on write, compact data in place), which may lead to visibility issues
// across threads and data corruption. We retain() here to imply the ByteBuf may be shared and that these
// optimizations are not safe.
super.retain();
}

UnreleasableUnsafeDirectByteBuf(ByteBufAllocator alloc, ByteBuffer initialBuffer, int maxCapacity) {
super(alloc, initialBuffer, maxCapacity);
// ServiceTalk buffers are unreleasable. Some optimizations in Netty use `refCnt() > 1` to judge whether a
// ByteBuf may be shared; if it is not shared, Netty may assume it is safe to modify the underlying storage
// of the ByteBuf (e.g. reallocate on write, compact data in place), which may lead to visibility issues
// across threads and data corruption. We retain() here to imply the ByteBuf may be shared and that these
// optimizations are not safe.
super.retain();
}

@Override
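
Every Unreleasable* constructor in this PR applies the same fix: bump the reference count at construction time so that Netty's `refCnt() > 1` heuristic treats the buffer as potentially shared and skips in-place storage optimizations. A condensed sketch of the pattern, assuming Netty's public UnpooledHeapByteBuf as a base; the class name and the release() overrides shown here are illustrative, not the actual ServiceTalk implementation:

```java
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledHeapByteBuf;

// Illustrative only: a heap buffer whose storage Netty will never release or treat as exclusively owned.
final class ExampleUnreleasableHeapByteBuf extends UnpooledHeapByteBuf {

    ExampleUnreleasableHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(alloc, initialCapacity, maxCapacity);
        // refCnt() becomes 2, so any "refCnt() > 1" based optimization assumes the buffer may be shared
        // and avoids reallocating or compacting the underlying storage in place.
        super.retain();
    }

    @Override
    public boolean release() {
        return false; // never deallocate
    }

    @Override
    public boolean release(int decrement) {
        return false; // never deallocate
    }
}
```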

@@ -409,9 +409,11 @@ protected final void decode(ChannelHandlerContext ctx, ByteBuf buffer) {
}

@Override
protected final ByteBuf swapCumulation(ByteBuf cumulation, ByteBufAllocator allocator) {
protected final ByteBuf swapAndCopyCumulation(final ByteBufAllocator alloc,
final ByteBuf cumulation,
final ByteBuf in) {
final int readerIndex = cumulation.readerIndex();
ByteBuf newCumulation = super.swapCumulation(cumulation, allocator);
ByteBuf newCumulation = super.swapAndCopyCumulation(alloc, cumulation, in);
cumulationIndex -= readerIndex - newCumulation.readerIndex();
return newCumulation;
}
@@ -525,8 +527,8 @@ private boolean skipControlCharacters(ByteBuf buffer) {
cumulationIndex = buffer.writerIndex();
return false;
} else {
cumulationIndex = i;
buffer.readerIndex(i);
cumulationIndex = i;
return true;
}
}

@@ -410,10 +410,8 @@ public void acceptShortRequestMethod() {
}

private static EmbeddedChannel newEmbeddedChannel() {
HttpRequestDecoder decoder = new HttpRequestDecoder(new ArrayDeque<>(),
DefaultHttpHeadersFactory.INSTANCE, 8192, 8192);
decoder.setDiscardAfterReads(1);
return new EmbeddedChannel(decoder);
return new EmbeddedChannel(new HttpRequestDecoder(new ArrayDeque<>(),
DefaultHttpHeadersFactory.INSTANCE, 8192, 8192));
}

private static void validateHttpRequest(EmbeddedChannel channel, int expectedContentLength) {

@@ -473,9 +473,7 @@ private static void assertStandardHeaders(HttpHeaders headers) {
}

private static EmbeddedChannel newEmbeddedChannel() {
HttpResponseDecoder decoder = new HttpResponseDecoder(new ArrayDeque<>(), DefaultHttpHeadersFactory.INSTANCE,
8192, 8192);
decoder.setDiscardAfterReads(1);
return new EmbeddedChannel(decoder);
return new EmbeddedChannel(new HttpResponseDecoder(new ArrayDeque<>(), DefaultHttpHeadersFactory.INSTANCE,
8192, 8192));
}
}

@@ -28,7 +28,6 @@
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.handler.codec.ByteToMessageDecoder.Cumulator;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
@@ -44,8 +43,7 @@
import javax.annotation.Nullable;

import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
import static io.netty.handler.codec.ByteToMessageDecoder.MERGE_CUMULATOR;
import static java.lang.Math.min;
import static java.lang.Integer.MAX_VALUE;

/**
* {@link ChannelInboundHandlerAdapter} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an
@@ -92,11 +90,9 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
private static final byte STATE_INIT = 0;
private static final byte STATE_CALLING_CHILD_DECODE = 1;
private static final byte STATE_HANDLER_REMOVED_PENDING = 2;
private static final int INTEGER_MAX_DOUBLEABLE_VALUE = Integer.MAX_VALUE >>> 1;

@Nullable
private ByteBuf cumulation;
private Cumulator cumulator = MERGE_CUMULATOR;
private final CtxWrapper ctxWrapper = new CtxWrapper();
private boolean decodeWasNull;
/**
@@ -108,8 +104,6 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
* </ul>
*/
private byte decodeState = STATE_INIT;
private int discardAfterReads = 16;
private int numReads;

/**
* Create a new instance.
@@ -118,22 +112,6 @@ protected ByteToMessageDecoder() {
ensureNotSharable();
}

/**
* Set the number of reads after which {@link ByteBuf#discardSomeReadBytes()} are called and so free up memory.
* The default is {@code 16}.
* <p>
* It is assumed this method is only called in the associated {@link Channel}'s {@link EventLoop} thread, otherwise
* external synchronization must be provided.
* @param discardAfterReads The number of calls to {@link ChannelHandlerContext#fireChannelRead(Object)} before
* attempting to discard bytes from the buffer cumulator.
*/
public final void setDiscardAfterReads(int discardAfterReads) {
if (discardAfterReads <= 0) {
throw new IllegalArgumentException("discardAfterReads must be > 0");
}
this.discardAfterReads = discardAfterReads;
}

@Override
public final void handlerRemoved(ChannelHandlerContext ctx) {
if (decodeState == STATE_CALLING_CHILD_DECODE) {
@@ -151,12 +129,10 @@ public final void handlerRemoved(ChannelHandlerContext ctx) {
ByteBuf bytes = buf.readBytes(readable);
buf.release();
ctx.fireChannelRead(bytes);
ctx.fireChannelReadComplete();
} else {
buf.release();
}

numReads = 0;
ctx.fireChannelReadComplete();
}
handlerRemoved0(ctx);
}
@@ -179,7 +155,17 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (cumulation == null) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
final int required = data.readableBytes();
if (required > cumulation.maxWritableBytes() ||
(required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1)) {
// Expand cumulation (by replacing it) under the following conditions:
// - cumulation cannot be resized to accommodate the additional data
// - cumulation can be expanded via a reallocation operation to accommodate the additional data, but the
// buffer is assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
cumulation = swapAndCopyCumulation(ctx.alloc(), cumulation, data);
} else {
cumulation.writeBytes(data);
}
}
callDecode(ctxWrapper, cumulation);
} catch (DecoderException e) {
@@ -188,13 +174,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
throw new DecoderException(e);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
releaseCumulation();
} else if (++numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
tryDiscardSomeReadBytes(ctx.alloc());
}
decodeWasNull = firedChannelReadCount == ctxWrapper.getFireChannelReadCount();
ctxWrapper.resetFireChannelReadCount();
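
The branch above hinges on two Netty capacity notions: maxWritableBytes() is how much could ever fit (up to maxCapacity), while maxFastWritableBytes() is how much fits without reallocating the backing storage. A small standalone demonstration of the difference; the buffer sizes are arbitrary and the printed values assume a plain unpooled heap buffer:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public final class WritableBytesDemo {
    public static void main(String[] args) {
        // 16-byte initial capacity, 1024-byte max capacity.
        ByteBuf buf = Unpooled.buffer(16, 1024);
        buf.writeZero(10);

        // Bytes that fit in the current backing array without any reallocation (typically 6 here).
        System.out.println("maxFastWritableBytes = " + buf.maxFastWritableBytes());
        // Bytes that could fit after growing the buffer up to its maxCapacity (1014 here).
        System.out.println("maxWritableBytes     = " + buf.maxWritableBytes());

        // Writing more than maxFastWritableBytes() forces a reallocation of the backing storage,
        // which is exactly what channelRead() above avoids when the cumulation may be shared (refCnt() > 1).
        buf.release();
    }
}
```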
@@ -206,8 +186,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
numReads = 0;
tryDiscardSomeReadBytes(ctx.alloc());
if (decodeWasNull) {
decodeWasNull = false;
if (!ctx.channel().config().isAutoRead()) {
@@ -217,39 +195,30 @@ public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.fireChannelReadComplete();
}

private void tryDiscardSomeReadBytes(ByteBufAllocator allocator) {
// Avoid using discardSomeReadBytes because this will modify the underlying storage of the ByteBuf.
// Modifying the underlying storage of the ByteBuf means that any slices or views on this data will become
// corrupted. This is particularly a problem when offloading user code to a non-EventLoop thread to avoid
// blocking the EventLoop. In this case the users code will be executed asynchronously, which maybe a slice
// of this cumulation, and if we use discardSomeReadBytes the user code will see corrupted data.
if (cumulation != null && cumulation.readerIndex() >= cumulation.capacity() >>> 1) {
cumulation = swapCumulation(cumulation, allocator);
}
}

/**
* Swap the existing {@code cumulation} {@link ByteBuf} for a new {@link ByteBuf}. This method is called when a
* heuristic determines the amount of unused bytes is sufficiently high that a resize / defragmentation of the
* bytes from {@code cumulation} is beneficial.
* Swap the existing {@code cumulation} {@link ByteBuf} for a new {@link ByteBuf} and copy {@code in}. This method
* is called when a heuristic determines the amount of unused bytes is sufficiently high that a
* resize / defragmentation of the bytes from {@code cumulation} is beneficial.
* <p>
* {@link ByteBuf#discardReadBytes()} is generally avoided in this method because it changes the underlying data
* structure. If others have slices of this {@link ByteBuf} their view on the data will become corrupted. This is
* commonly a problem when processing data asynchronously to avoid blocking the EventLoop thread.
*
* commonly a problem when processing data asynchronously to avoid blocking the {@link EventLoop} thread.
* @param alloc Used to allocate a new {@link ByteBuf} if necessary.
* @param cumulation The {@link ByteBuf} that accumulates across socket read operations.
* @param allocator Used to allocate a new {@link ByteBuf} if necessary.
* @return the {@link ByteBuf} that is responsible for accumulated socket reads.
* @param in The bytes to copy.
* @return the result of the swap and copy operation.
*/
protected ByteBuf swapCumulation(ByteBuf cumulation, ByteBufAllocator allocator) {
protected ByteBuf swapAndCopyCumulation(final ByteBufAllocator alloc, final ByteBuf cumulation, final ByteBuf in) {
ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity(
cumulation.readableBytes() + in.readableBytes(), MAX_VALUE));
Review comment (Member):
Should we take care about int overflow here? Otherwise, users may get IllegalArgumentException from calculateNewCapacity.

Reply (Member, Author):
If we overflow int then we wouldn't be able to accommodate all the data in a single buffer. We could attempt to fall back to a multi-buffer approach, or provide a pre-check for a more informative exception, but since this case is unlikely to occur I think the current "check in a single spot" approach is sufficient for now.

ByteBuf toRelease = newCumulation;
try {
ByteBuf newCumulation = allocator.buffer(allocator.calculateNewCapacity(cumulation.capacity(),
min(INTEGER_MAX_DOUBLEABLE_VALUE, cumulation.capacity()) << 1));
newCumulation.writeBytes(cumulation);
newCumulation.writeBytes(in);
toRelease = cumulation;
return newCumulation;
} finally {
cumulation.release();
// no need to call cumulationReset, folks can override this method instead.
toRelease.release();
}
}
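
The javadoc's warning about discardReadBytes() is easy to reproduce: a slice handed off for asynchronous processing observes different bytes once the parent compacts its storage in place. A minimal reproduction, not part of this PR, using only public Netty ByteBuf APIs:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.nio.charset.StandardCharsets;

public final class DiscardReadBytesCorruption {
    public static void main(String[] args) {
        ByteBuf cumulation = Unpooled.buffer(16);
        cumulation.writeBytes("HEADERBODY".getBytes(StandardCharsets.US_ASCII));

        // Consume "HEADER" and hand the remaining "BODY" off as a zero-copy slice,
        // e.g. to user code that runs on a non-EventLoop thread.
        cumulation.skipBytes(6);
        ByteBuf body = cumulation.slice();
        System.out.println(body.toString(StandardCharsets.US_ASCII)); // BODY

        // Compacting the parent moves the remaining bytes to index 0 of the shared storage
        // and frees up room for the next socket read...
        cumulation.discardReadBytes();
        cumulation.writeBytes("MORE".getBytes(StandardCharsets.US_ASCII));

        // ...but the slice still points at the old offsets and now reads corrupted data.
        System.out.println(body.toString(StandardCharsets.US_ASCII)); // not "BODY" anymore

        cumulation.release();
    }
}
```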

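On the int-overflow exchange above: `cumulation.readableBytes() + in.readableBytes()` wraps negative once the combined size exceeds Integer.MAX_VALUE, and calculateNewCapacity then rejects the negative value with IllegalArgumentException. The PR intentionally keeps that single check; purely for illustration, a hypothetical pre-check that surfaces the condition with a clearer message could look like this:

```java
import io.netty.buffer.ByteBuf;

final class CumulationSizeCheck {

    private CumulationSizeCheck() {
        // no instances
    }

    // Hypothetical guard, not part of this PR: compute the combined readable size in long arithmetic
    // and fail explicitly instead of letting calculateNewCapacity throw on a negative int.
    static int totalReadableBytes(ByteBuf cumulation, ByteBuf in) {
        long total = (long) cumulation.readableBytes() + in.readableBytes();
        if (total > Integer.MAX_VALUE) {
            throw new IllegalStateException("cumulation would exceed Integer.MAX_VALUE bytes: " + total);
        }
        return (int) total;
    }
}
```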