diff --git a/netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java b/netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java index e72ecbb7d4e0..f493e18d1983 100644 --- a/netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java +++ b/netty/src/main/java/io/grpc/netty/NettyAdaptiveCumulator.java @@ -106,8 +106,10 @@ static boolean shouldCompose(CompositeByteBuf composite, ByteBuf in, int compose if (composite.numComponents() == 0) { return true; } - int tailSize = composite.capacity() - composite.toByteIndex(componentCount - 1); - return tailSize + in.readableBytes() >= composeMinSize; + int inputSize = in.readableBytes(); + int tailStart = composite.toByteIndex(componentCount - 1); + int tailSize = composite.writerIndex() - tailStart; + return tailSize + inputSize >= composeMinSize; } /** @@ -127,23 +129,20 @@ static boolean shouldCompose(CompositeByteBuf composite, ByteBuf in, int compose * is verified in unit tests for this method. */ @VisibleForTesting - static void mergeWithCompositeTail(ByteBufAllocator alloc, CompositeByteBuf composite, - ByteBuf in) { - - int newBytes = in.readableBytes(); - int tailIndex = composite.numComponents() - 1; - int tailStart = composite.toByteIndex(tailIndex); - int tailBytes = composite.capacity() - tailStart; - int totalBytes = newBytes + tailBytes; - - ByteBuf tail = composite.component(tailIndex); - ByteBuf merged = null; - + static void mergeWithCompositeTail( + ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) { + int inputSize = in.readableBytes(); + int tailComponentIndex = composite.numComponents() - 1; + int tailStart = composite.toByteIndex(tailComponentIndex); + int tailSize = composite.writerIndex() - tailStart; + int newTailSize = inputSize + tailSize; + ByteBuf tail = composite.component(tailComponentIndex); + ByteBuf newTail = null; try { - if (tail.refCnt() == 1 && !tail.isReadOnly() && totalBytes <= tail.maxCapacity()) { + if (tail.refCnt() == 1 && !tail.isReadOnly() && newTailSize <= tail.maxCapacity()) { // Ideal case: the tail isn't shared, and can be expanded to the required capacity. // Take ownership of the tail. - merged = tail.retain(); + newTail = tail.retain(); /* * The tail is a readable non-composite buffer, so writeBytes() handles everything for us. * @@ -156,33 +155,37 @@ static void mergeWithCompositeTail(ByteBufAllocator alloc, CompositeByteBuf comp * Unwrapping buffers is unsafe, and potential benefit of fast writes may not be * as pronounced because the capacity is doubled with each reallocation. */ - merged.writeBytes(in); + newTail.writeBytes(in); } else { // The tail is shared, or not expandable. Replace it with a new buffer of desired capacity. - merged = alloc.buffer(alloc.calculateNewCapacity(totalBytes, Integer.MAX_VALUE)); - merged.setBytes(0, composite, tailStart, tailBytes) - .setBytes(tailBytes, in, in.readerIndex(), newBytes) - .writerIndex(totalBytes); + newTail = alloc.buffer(alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE)); + newTail.setBytes(0, composite, tailStart, tailSize) + .setBytes(tailSize, in, in.readerIndex(), inputSize) + .writerIndex(newTailSize); in.readerIndex(in.writerIndex()); } // Store readerIndex to avoid out of bounds writerIndex during component replacement. int prevReader = composite.readerIndex(); - // Remove the tail, reset writer index, add merged component. - composite.removeComponent(tailIndex).setIndex(0, tailStart) - .addFlattenedComponents(true, merged); - merged = null; + // Remove the old tail, reset writer index. + composite.removeComponent(tailComponentIndex).setIndex(0, tailStart); + // Add back the new tail. + composite.addFlattenedComponents(true, newTail); + // New tail's ownership transferred to the composite buf. + newTail = null; in.release(); in = null; - // Restore the reader. + // Restore the reader. In case it fails we restore the reader after releasing/forgetting + // the input and the new tail so that finally block can handles them properly. composite.readerIndex(prevReader); } finally { // Input buffer was merged with the tail. if (in != null) { in.release(); } - // If merge's ownership isn't transferred to the composite buf, release it to prevent a leak. - if (merged != null) { - merged.release(); + // If new tail's ownership isn't transferred to the composite buf. + // Release it to prevent a leak. + if (newTail != null) { + newTail.release(); } } } diff --git a/netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java b/netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java index 09cc88ff1f34..34ef9244a418 100644 --- a/netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java @@ -374,7 +374,7 @@ public void mergeWithCompositeTail_tailExpandable_fastWrite() { public void mergeWithCompositeTail_tailExpandable_reallocateInMemory() { int tailFastCapacity = tail.writerIndex() + tail.maxFastWritableBytes(); String inSuffixOverFastBytes = Strings.repeat("a", tailFastCapacity + 1); - int totalBytes = tail.readableBytes() + inSuffixOverFastBytes.length(); + int newTailSize = tail.readableBytes() + inSuffixOverFastBytes.length(); composite.addFlattenedComponents(true, tail); // Make input larger than tailFastCapacity @@ -384,7 +384,7 @@ public void mergeWithCompositeTail_tailExpandable_reallocateInMemory() { assertThat(in.readableBytes()).isAtMost(tail.maxWritableBytes()); // Confirm the assumption that new capacity is produced by alloc.calculateNewCapacity(). - int expectedTailCapacity = alloc.calculateNewCapacity(totalBytes, Integer.MAX_VALUE); + int expectedTailCapacity = alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE); assertTailExpanded(EXPECTED_TAIL_DATA.concat(inSuffixOverFastBytes), expectedTailCapacity); } @@ -566,10 +566,10 @@ public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex, }; // Return our instance of the new buffer to ensure it's released. - int totalBytes = tail.readableBytes() + in.readableBytes(); - ByteBuf merged = alloc.buffer(alloc.calculateNewCapacity(totalBytes, Integer.MAX_VALUE)); + int newTailSize = tail.readableBytes() + in.readableBytes(); + ByteBuf newTail = alloc.buffer(alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE)); ByteBufAllocator mockAlloc = mock(ByteBufAllocator.class); - when(mockAlloc.buffer(anyInt())).thenReturn(merged); + when(mockAlloc.buffer(anyInt())).thenReturn(newTail); try { NettyAdaptiveCumulator.mergeWithCompositeTail(mockAlloc, compositeRo, in); @@ -579,7 +579,7 @@ public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex, // Input must be released unless its ownership has been to the composite cumulation. assertEquals(0, in.refCnt()); // New buffer released - assertEquals(0, merged.refCnt()); + assertEquals(0, newTail.refCnt()); // Composite cumulation is retained assertEquals(1, compositeRo.refCnt()); // Composite cumulation loses the tail