diff --git a/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java b/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java
index 13f55226483..3b8c595a12e 100644
--- a/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java
+++ b/netty/src/main/java/io/grpc/netty/GrpcHttp2ConnectionHandler.java
@@ -18,6 +18,7 @@
import static com.google.common.base.Preconditions.checkState;
+import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.Internal;
@@ -27,6 +28,7 @@
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Settings;
+import io.netty.util.Version;
import javax.annotation.Nullable;
/**
@@ -41,6 +43,30 @@ public abstract class GrpcHttp2ConnectionHandler extends Http2ConnectionHandler
@Nullable
protected final ChannelPromise channelUnused;
private final ChannelLogger negotiationLogger;
+ private static final boolean usingPre4_1_111_Netty;
+
+ static {
+ // Netty 4.1.111 introduced a change in the behavior of duplicate() method
+ // that breaks the assumption of the cumulator. We need to detect this version
+ // and adjust the behavior accordingly.
+
+ boolean identifiedOldVersion = false;
+ try {
+ Version version = Version.identify().get("netty-buffer");
+ if (version != null) {
+ String[] split = version.artifactVersion().split("\\.");
+ if (split.length >= 3
+ && Integer.parseInt(split[0]) == 4
+ && Integer.parseInt(split[1]) <= 1
+ && Integer.parseInt(split[2]) < 111) {
+ identifiedOldVersion = true;
+ }
+ }
+ } catch (Exception e) {
+ // Ignore, we'll assume it's a new version.
+ }
+ usingPre4_1_111_Netty = identifiedOldVersion;
+ }
protected GrpcHttp2ConnectionHandler(
ChannelPromise channelUnused,
@@ -51,7 +77,16 @@ protected GrpcHttp2ConnectionHandler(
super(decoder, encoder, initialSettings);
this.channelUnused = channelUnused;
this.negotiationLogger = negotiationLogger;
- setCumulator(ADAPTIVE_CUMULATOR);
+ if (usingPre4_1_111_Netty()) {
+ // We need to use the adaptive cumulator only if we're using a version of Netty that
+ // doesn't have the behavior that breaks it.
+ setCumulator(ADAPTIVE_CUMULATOR);
+ }
+ }
+
+ @VisibleForTesting
+ static boolean usingPre4_1_111_Netty() {
+ return usingPre4_1_111_Netty;
}
/**
diff --git a/netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java b/netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java
index e56b97b799d..a1fe3fc2c38 100644
--- a/netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java
+++ b/netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java
@@ -27,12 +27,6 @@
/**
* "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and
* compose strategies.
- *
- *
- *
Avoid using - * {@link CompositeByteBuf#addFlattenedComponents(boolean, ByteBuf)} as it can lead - * to corruption, where the components' readable area are not equal to the Composite's capacity - * (see https://github.com/netty/netty/issues/12844). */ class NettyAdaptiveCumulator implements Cumulator { @@ -95,7 +89,8 @@ public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBu composite.capacity(composite.writerIndex()); } } else { - composite = alloc.compositeBuffer(Integer.MAX_VALUE).addComponent(true, cumulation); + composite = alloc.compositeBuffer(Integer.MAX_VALUE) + .addFlattenedComponents(true, cumulation); } addInput(alloc, composite, in); in = null; @@ -115,7 +110,7 @@ public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBu @VisibleForTesting void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) { if (shouldCompose(composite, in, composeMinSize)) { - composite.addComponent(true, in); + composite.addFlattenedComponents(true, in); } else { // The total size of the new data and the last component are below the threshold. Merge them. mergeWithCompositeTail(alloc, composite, in); @@ -161,13 +156,32 @@ static void mergeWithCompositeTail( ByteBuf tail = composite.component(tailComponentIndex); ByteBuf newTail = null; try { - if (tail.refCnt() == 1 && !tail.isReadOnly() && newTailSize <= tail.maxCapacity() - && !isCompositeOrWrappedComposite(tail)) { + if (tail.refCnt() == 1 && !tail.isReadOnly() && newTailSize <= tail.maxCapacity()) { // Ideal case: the tail isn't shared, and can be expanded to the required capacity. // Take ownership of the tail. newTail = tail.retain(); + // TODO(https://github.com/netty/netty/issues/12844): remove when we use Netty with + // the issue fixed. + // In certain cases, removing the CompositeByteBuf component, and then adding it back + // isn't idempotent. An example is provided in https://github.com/netty/netty/issues/12844. + // This happens because the buffer returned by composite.component() has out-of-sync + // indexes. Under the hood the CompositeByteBuf returns a duplicate() of the underlying + // buffer, but doesn't set the indexes. + // + // To get the right indexes we use the fact that composite.internalComponent() returns + // the slice() into the readable portion of the underlying buffer. + // We use this implementation detail (internalComponent() returning a *SlicedByteBuf), + // and combine it with the fact that SlicedByteBuf duplicates have their indexes + // adjusted so they correspond to the to the readable portion of the slice. + // + // Hence composite.internalComponent().duplicate() returns a buffer with the + // indexes that should've been on the composite.component() in the first place. + // Until the issue is fixed, we manually adjust the indexes of the removed component. + ByteBuf sliceDuplicate = composite.internalComponent(tailComponentIndex).duplicate(); + newTail.setIndex(sliceDuplicate.readerIndex(), sliceDuplicate.writerIndex()); + /* * The tail is a readable non-composite buffer, so writeBytes() handles everything for us. * @@ -183,11 +197,7 @@ static void mergeWithCompositeTail( newTail.writeBytes(in); } else { - // The tail satisfies one or more criteria: - // - Shared - // - Not expandable - // - Composite - // - Wrapped Composite + // The tail is shared, or not expandable. Replace it with a new buffer of desired capacity. newTail = alloc.buffer(alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE)); newTail.setBytes(0, composite, tailStart, tailSize) .setBytes(tailSize, in, in.readerIndex(), inputSize) @@ -200,7 +210,7 @@ static void mergeWithCompositeTail( // Remove the old tail, reset writer index. composite.removeComponent(tailComponentIndex).setIndex(0, tailStart); // Add back the new tail. - composite.addComponent(true, newTail); + composite.addFlattenedComponents(true, newTail); // New tail's ownership transferred to the composite buf. newTail = null; composite.readerIndex(prevReader); @@ -215,12 +225,4 @@ static void mergeWithCompositeTail( } } } - - private static boolean isCompositeOrWrappedComposite(ByteBuf tail) { - ByteBuf cur = tail; - while (cur.unwrap() != null) { - cur = cur.unwrap(); - } - return cur instanceof CompositeByteBuf; - } } diff --git a/netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java b/netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java index a09e2e08e56..1037dab4712 100644 --- a/netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java @@ -81,7 +81,7 @@ public void setUp() { @Override void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) { // To limit the testing scope to NettyAdaptiveCumulator.cumulate(), always compose - composite.addComponent(true, in); + composite.addFlattenedComponents(true, in); } }; @@ -122,7 +122,7 @@ public void cumulate_contiguousCumulation_newCompositeFromContiguousAndInput() { @Test public void cumulate_compositeCumulation_inputAppendedAsANewComponent() { - CompositeByteBuf composite = alloc.compositeBuffer().addComponent(true, contiguous); + CompositeByteBuf composite = alloc.compositeBuffer().addFlattenedComponents(true, contiguous); assertSame(composite, cumulator.cumulate(alloc, composite, in)); assertEquals(DATA_INITIAL, composite.component(0).toString(US_ASCII)); assertEquals(DATA_INCOMING, composite.component(1).toString(US_ASCII)); @@ -136,7 +136,7 @@ public void cumulate_compositeCumulation_inputAppendedAsANewComponent() { @Test public void cumulate_compositeCumulation_inputReleasedOnError() { - CompositeByteBuf composite = alloc.compositeBuffer().addComponent(true, contiguous); + CompositeByteBuf composite = alloc.compositeBuffer().addFlattenedComponents(true, contiguous); try { throwingCumulator.cumulate(alloc, composite, in); fail("Cumulator didn't throw"); @@ -208,8 +208,8 @@ public void setUp() { in = ByteBufUtil.writeAscii(alloc, inData); tail = ByteBufUtil.writeAscii(alloc, tailData); composite = alloc.compositeBuffer(Integer.MAX_VALUE); - // Note that addComponent() will not add a new component when tail is not readable. - composite.addComponent(true, tail); + // Note that addFlattenedComponents() will not add a new component when tail is not readable. + composite.addFlattenedComponents(true, tail); } @After @@ -345,7 +345,7 @@ public void mergeWithCompositeTail_tailExpandable_write() { assertThat(in.readableBytes()).isAtMost(tail.writableBytes()); // All fits, so tail capacity must stay the same. - composite.addComponent(true, tail); + composite.addFlattenedComponents(true, tail); assertTailExpanded(EXPECTED_TAIL_DATA, fitCapacity); } @@ -362,7 +362,7 @@ public void mergeWithCompositeTail_tailExpandable_fastWrite() { alloc.calculateNewCapacity(EXPECTED_TAIL_DATA.length(), Integer.MAX_VALUE); // Tail capacity is extended to its fast capacity. - composite.addComponent(true, tail); + composite.addFlattenedComponents(true, tail); assertTailExpanded(EXPECTED_TAIL_DATA, tailFastCapacity); } @@ -372,7 +372,7 @@ public void mergeWithCompositeTail_tailExpandable_reallocateInMemory() { @SuppressWarnings("InlineMeInliner") // Requires Java 11 String inSuffixOverFastBytes = Strings.repeat("a", tailFastCapacity + 1); int newTailSize = tail.readableBytes() + inSuffixOverFastBytes.length(); - composite.addComponent(true, tail); + composite.addFlattenedComponents(true, tail); // Make input larger than tailFastCapacity in.writeCharSequence(inSuffixOverFastBytes, US_ASCII); @@ -386,6 +386,9 @@ public void mergeWithCompositeTail_tailExpandable_reallocateInMemory() { } private void assertTailExpanded(String expectedTailReadableData, int expectedNewTailCapacity) { + if (!GrpcHttp2ConnectionHandler.usingPre4_1_111_Netty()) { + return; // Netty 4.1.111 doesn't work with NettyAdaptiveCumulator + } int originalNumComponents = composite.numComponents(); // Handle the case when reader index is beyond all readable bytes of the cumulation. @@ -435,21 +438,21 @@ public void mergeWithCompositeTail_tailNotExpandable_maxCapacityReached() { @SuppressWarnings("InlineMeInliner") // Requires Java 11 String tailSuffixFullCapacity = Strings.repeat("a", tail.maxWritableBytes()); tail.writeCharSequence(tailSuffixFullCapacity, US_ASCII); - composite.addComponent(true, tail); + composite.addFlattenedComponents(true, tail); assertTailReplaced(); } @Test public void mergeWithCompositeTail_tailNotExpandable_shared() { tail.retain(); - composite.addComponent(true, tail); + composite.addFlattenedComponents(true, tail); assertTailReplaced(); tail.release(); } @Test public void mergeWithCompositeTail_tailNotExpandable_readOnly() { - composite.addComponent(true, tail.asReadOnly()); + composite.addFlattenedComponents(true, tail.asReadOnly()); assertTailReplaced(); } @@ -527,7 +530,8 @@ public void mergeWithCompositeTail_tailExpandable_mergedReleaseOnThrow() { CompositeByteBuf compositeThrows = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE, tail) { @Override - public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) { + public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex, + ByteBuf buffer) { throw expectedError; } }; @@ -560,7 +564,8 @@ public void mergeWithCompositeTail_tailNotExpandable_mergedReleaseOnThrow() { CompositeByteBuf compositeRo = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE, tail.asReadOnly()) { @Override - public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) { + public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex, + ByteBuf buffer) { throw expectedError; } }; @@ -614,16 +619,20 @@ public void mergeWithCompositeTail_outOfSyncComposite() { ByteBuf buf = alloc.buffer(32).writeBytes("---01234".getBytes(US_ASCII)); // Start with a regular cumulation and add the buf as the only component. - CompositeByteBuf composite1 = alloc.compositeBuffer(8).addComponent(true, buf); + CompositeByteBuf composite1 = alloc.compositeBuffer(8).addFlattenedComponents(true, buf); // Read composite1 buf to the beginning of the numbers. assertThat(composite1.readCharSequence(3, US_ASCII).toString()).isEqualTo("---"); // Wrap composite1 into another cumulation. This is similar to // what NettyAdaptiveCumulator.cumulate() does in the case the cumulation has refCnt != 1. CompositeByteBuf composite2 = - alloc.compositeBuffer(8).addComponent(true, composite1); + alloc.compositeBuffer(8).addFlattenedComponents(true, composite1); assertThat(composite2.toString(US_ASCII)).isEqualTo("01234"); + if (!GrpcHttp2ConnectionHandler.usingPre4_1_111_Netty()) { + return; // Netty 4.1.111 doesn't work with NettyAdaptiveCumulator + } + // The previous operation does not adjust the read indexes of the underlying buffers, // only the internal Component offsets. When the cumulator attempts to append the input to // the tail buffer, it extracts it from the cumulation, writes to it, and then adds it back. @@ -637,27 +646,13 @@ public void mergeWithCompositeTail_outOfSyncComposite() { CompositeByteBuf cumulation = (CompositeByteBuf) cumulator.cumulate(alloc, composite2, ByteBufUtil.writeAscii(alloc, "56789")); assertThat(cumulation.toString(US_ASCII)).isEqualTo("0123456789"); - } - - @Test - public void mergeWithNonCompositeTail() { - NettyAdaptiveCumulator cumulator = new NettyAdaptiveCumulator(1024); - ByteBufAllocator alloc = new PooledByteBufAllocator(); - ByteBuf buf = alloc.buffer().writeBytes("tail".getBytes(US_ASCII)); - ByteBuf in = alloc.buffer().writeBytes("-012345".getBytes(US_ASCII)); - - CompositeByteBuf composite = alloc.compositeBuffer().addComponent(true, buf); - - CompositeByteBuf cumulation = (CompositeByteBuf) cumulator.cumulate(alloc, composite, in); - assertEquals("tail-012345", cumulation.toString(US_ASCII)); - assertEquals(0, in.refCnt()); - assertEquals(1, cumulation.numComponents()); - - buf.setByte(2, '*').setByte(7, '$'); - assertEquals("ta*l-01$345", cumulation.toString(US_ASCII)); - - composite.release(); + // Correctness check: we still have a single component, and this component is still the + // original underlying buffer. + assertThat(cumulation.numComponents()).isEqualTo(1); + // Replace '2' with '*', and '8' with '$'. + buf.setByte(5, '*').setByte(11, '$'); + assertThat(cumulation.toString(US_ASCII)).isEqualTo("01*34567$9"); } } }