Skip to content

Commit

Permalink
Split up large HTTP responses in outbound pipeline (#62666)
Browse files Browse the repository at this point in the history
Currently Netty will batch compression an entire HTTP response
regardless of its content size. It allocates a byte array at least of
the same size as the uncompressed content. This causes issues with our
attempts to remove humungous G1GC allocations. This commit resolves the
issue by split responses into 128KB chunks.

This has the side-effect of making large outbound HTTP responses that
are compressed be send as chunked transfer-encoding.
  • Loading branch information
Tim-Brooks committed Sep 24, 2020
1 parent 43a4882 commit 59dd889
Show file tree
Hide file tree
Showing 11 changed files with 323 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.http.netty4;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.compression.JdkZlibEncoder;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpResponse;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.transport.NettyAllocator;

import java.util.List;

/**
* Split up large responses to prevent batch compression {@link JdkZlibEncoder} down the pipeline.
*/
@ChannelHandler.Sharable
class Netty4HttpResponseCreator extends MessageToMessageEncoder<Netty4HttpResponse> {

private static final String DO_NOT_SPLIT = "es.unsafe.do_not_split_http_responses";

private static final boolean DO_NOT_SPLIT_HTTP_RESPONSES;
private static final int SPLIT_THRESHOLD;

static {
DO_NOT_SPLIT_HTTP_RESPONSES = Booleans.parseBoolean(System.getProperty(DO_NOT_SPLIT), false);
// Netty will add some header bytes if it compresses this message. So we downsize slightly.
SPLIT_THRESHOLD = (int) (NettyAllocator.suggestedMaxAllocationSize() * 0.99);
}

@Override
protected void encode(ChannelHandlerContext ctx, Netty4HttpResponse msg, List<Object> out) {
if (DO_NOT_SPLIT_HTTP_RESPONSES || msg.content().readableBytes() <= SPLIT_THRESHOLD) {
out.add(msg.retain());
} else {
HttpResponse response = new DefaultHttpResponse(msg.protocolVersion(), msg.status(), msg.headers());
out.add(response);
ByteBuf content = msg.content();
while (content.readableBytes() > SPLIT_THRESHOLD) {
out.add(new DefaultHttpContent(content.readRetainedSlice(SPLIT_THRESHOLD)));
}
out.add(new DefaultLastHttpContent(content.readRetainedSlice(content.readableBytes())));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,15 @@ protected static class HttpChannelHandler extends ChannelInitializer<Channel> {
private final Netty4HttpServerTransport transport;
private final Netty4HttpRequestCreator requestCreator;
private final Netty4HttpRequestHandler requestHandler;
private final Netty4HttpResponseCreator responseCreator;
private final HttpHandlingSettings handlingSettings;

protected HttpChannelHandler(final Netty4HttpServerTransport transport, final HttpHandlingSettings handlingSettings) {
this.transport = transport;
this.handlingSettings = handlingSettings;
this.requestCreator = new Netty4HttpRequestCreator();
this.requestHandler = new Netty4HttpRequestHandler(transport);
this.responseCreator = new Netty4HttpResponseCreator();
}

@Override
Expand All @@ -314,6 +316,7 @@ protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
}
ch.pipeline().addLast("request_creator", requestCreator);
ch.pipeline().addLast("response_creator", responseCreator);
ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
ch.pipeline().addLast("handler", requestHandler);
transport.serverAcceptedChannel(nettyHttpChannel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,17 @@ private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int old
private static void copyBytes(ByteBuffer[] source, int nioBufferCnt, ByteBuffer destination) {
for (int i = 0; i < nioBufferCnt && destination.hasRemaining(); i++) {
ByteBuffer buffer = source[i];
assert buffer.hasArray() : "Buffer must have heap array";
int nBytesToCopy = Math.min(destination.remaining(), buffer.remaining());
destination.put(buffer.array(), buffer.arrayOffset() + buffer.position(), nBytesToCopy);
if (buffer.hasArray()) {
destination.put(buffer.array(), buffer.arrayOffset() + buffer.position(), nBytesToCopy);
} else {
int initialLimit = buffer.limit();
int initialPosition = buffer.position();
buffer.limit(buffer.position() + nBytesToCopy);
destination.put(buffer);
buffer.position(initialPosition);
buffer.limit(initialLimit);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class NettyAllocator {
private static final Logger logger = LogManager.getLogger(NettyAllocator.class);
private static final AtomicBoolean descriptionLogged = new AtomicBoolean(false);

private static final long SUGGESTED_MAX_ALLOCATION_SIZE;
private static final ByteBufAllocator ALLOCATOR;
private static final String DESCRIPTION;

Expand All @@ -50,7 +51,9 @@ public class NettyAllocator {
static {
if (Booleans.parseBoolean(System.getProperty(USE_NETTY_DEFAULT), false)) {
ALLOCATOR = ByteBufAllocator.DEFAULT;
DESCRIPTION = "[name=netty_default, factors={es.unsafe.use_netty_default_allocator=true}]";
SUGGESTED_MAX_ALLOCATION_SIZE = 1024 * 1024;
DESCRIPTION = "[name=netty_default, suggested_max_allocation_size=" + new ByteSizeValue(SUGGESTED_MAX_ALLOCATION_SIZE)
+ ", factors={es.unsafe.use_netty_default_allocator=true}]";
} else {
final long heapSizeInBytes = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes();
final boolean g1gcEnabled = Boolean.parseBoolean(JvmInfo.jvmInfo().useG1GC());
Expand All @@ -62,7 +65,15 @@ public class NettyAllocator {
ByteBufAllocator delegate;
if (useUnpooled(heapSizeInBytes, g1gcEnabled, g1gcRegionSizeIsKnown, g1gcRegionSizeInBytes)) {
delegate = UnpooledByteBufAllocator.DEFAULT;
DESCRIPTION = "[name=unpooled, factors={es.unsafe.use_unpooled_allocator=" + userForcedUnpooled()
if (g1gcEnabled && g1gcRegionSizeIsKnown) {
// Suggested max allocation size 1/4 of region size. Guard against unknown edge cases
// where this value would be less than 256KB.
SUGGESTED_MAX_ALLOCATION_SIZE = Math.max(g1gcRegionSizeInBytes >> 2, 256 * 1024);
} else {
SUGGESTED_MAX_ALLOCATION_SIZE = 1024 * 1024;
}
DESCRIPTION = "[name=unpooled, suggested_max_allocation_size=" + new ByteSizeValue(SUGGESTED_MAX_ALLOCATION_SIZE)
+ ", factors={es.unsafe.use_unpooled_allocator=" + System.getProperty(USE_UNPOOLED)
+ ", g1gc_enabled=" + g1gcEnabled
+ ", g1gc_region_size=" + g1gcRegionSize
+ ", heap_size=" + heapSize + "}]";
Expand Down Expand Up @@ -92,8 +103,11 @@ public class NettyAllocator {
boolean useCacheForAllThreads = PooledByteBufAllocator.defaultUseCacheForAllThreads();
delegate = new PooledByteBufAllocator(false, nHeapArena, 0, pageSize, maxOrder, tinyCacheSize,
smallCacheSize, normalCacheSize, useCacheForAllThreads);
ByteSizeValue chunkSize = new ByteSizeValue(pageSize << maxOrder);
int chunkSizeInBytes = pageSize << maxOrder;
ByteSizeValue chunkSize = new ByteSizeValue(chunkSizeInBytes);
SUGGESTED_MAX_ALLOCATION_SIZE = chunkSizeInBytes;
DESCRIPTION = "[name=elasticsearch_configured, chunk_size=" + chunkSize
+ ", suggested_max_allocation_size=" + new ByteSizeValue(SUGGESTED_MAX_ALLOCATION_SIZE)
+ ", factors={es.unsafe.use_netty_default_chunk_and_page_size=" + useDefaultChunkAndPageSize()
+ ", g1gc_enabled=" + g1gcEnabled
+ ", g1gc_region_size=" + g1gcRegionSize + "}]";
Expand All @@ -112,6 +126,10 @@ public static ByteBufAllocator getAllocator() {
return ALLOCATOR;
}

public static long suggestedMaxAllocationSize() {
return SUGGESTED_MAX_ALLOCATION_SIZE;
}

public static String getAllocatorDescription() {
return DESCRIPTION;
}
Expand All @@ -135,6 +153,8 @@ public static Class<? extends ServerChannel> getServerChannelType() {
private static boolean useUnpooled(long heapSizeInBytes, boolean g1gcEnabled, boolean g1gcRegionSizeIsKnown, long g1RegionSize) {
if (userForcedUnpooled()) {
return true;
} else if (userForcedPooled()) {
return true;
} else if (heapSizeInBytes <= 1 << 30) {
// If the heap is 1GB or less we use unpooled
return true;
Expand All @@ -155,6 +175,14 @@ private static boolean userForcedUnpooled() {
}
}

private static boolean userForcedPooled() {
if (System.getProperty(USE_UNPOOLED) != null) {
return Booleans.parseBoolean(System.getProperty(USE_UNPOOLED)) == false;
} else {
return false;
}
}

private static boolean useDefaultChunkAndPageSize() {
if (System.getProperty(USE_NETTY_DEFAULT_CHUNK) != null) {
return Booleans.parseBoolean(System.getProperty(USE_NETTY_DEFAULT_CHUNK));
Expand All @@ -163,7 +191,7 @@ private static boolean useDefaultChunkAndPageSize() {
}
}

private static class NoDirectBuffers implements ByteBufAllocator {
public static class NoDirectBuffers implements ByteBufAllocator {

private final ByteBufAllocator delegate;

Expand Down Expand Up @@ -271,5 +299,9 @@ public boolean isDirectBufferPooled() {
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
return delegate.calculateNewCapacity(minNewCapacity, maxCapacity);
}

public ByteBufAllocator getDelegate() {
return delegate;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObject;
Expand Down Expand Up @@ -91,8 +92,8 @@ static Collection<String> returnOpaqueIds(Collection<FullHttpResponse> responses
.group(new NioEventLoopGroup(1));
}

public Collection<FullHttpResponse> get(SocketAddress remoteAddress, String... uris) throws InterruptedException {
Collection<HttpRequest> requests = new ArrayList<>(uris.length);
public List<FullHttpResponse> get(SocketAddress remoteAddress, String... uris) throws InterruptedException {
List<HttpRequest> requests = new ArrayList<>(uris.length);
for (int i = 0; i < uris.length; i++) {
final HttpRequest httpRequest = new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, uris[i]);
httpRequest.headers().add(HOST, "localhost");
Expand All @@ -107,20 +108,20 @@ public final Collection<FullHttpResponse> post(SocketAddress remoteAddress, List
return processRequestsWithBody(HttpMethod.POST, remoteAddress, urisAndBodies);
}

public final FullHttpResponse post(SocketAddress remoteAddress, FullHttpRequest httpRequest) throws InterruptedException {
Collection<FullHttpResponse> responses = sendRequests(remoteAddress, Collections.singleton(httpRequest));
public final FullHttpResponse send(SocketAddress remoteAddress, FullHttpRequest httpRequest) throws InterruptedException {
List<FullHttpResponse> responses = sendRequests(remoteAddress, Collections.singleton(httpRequest));
assert responses.size() == 1 : "expected 1 and only 1 http response";
return responses.iterator().next();
return responses.get(0);
}

public final Collection<FullHttpResponse> put(SocketAddress remoteAddress, List<Tuple<String, CharSequence>> urisAndBodies)
throws InterruptedException {
return processRequestsWithBody(HttpMethod.PUT, remoteAddress, urisAndBodies);
}

private Collection<FullHttpResponse> processRequestsWithBody(HttpMethod method, SocketAddress remoteAddress, List<Tuple<String,
private List<FullHttpResponse> processRequestsWithBody(HttpMethod method, SocketAddress remoteAddress, List<Tuple<String,
CharSequence>> urisAndBodies) throws InterruptedException {
Collection<HttpRequest> requests = new ArrayList<>(urisAndBodies.size());
List<HttpRequest> requests = new ArrayList<>(urisAndBodies.size());
for (Tuple<String, CharSequence> uriAndBody : urisAndBodies) {
ByteBuf content = Unpooled.copiedBuffer(uriAndBody.v2(), StandardCharsets.UTF_8);
HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, uriAndBody.v1(), content);
Expand All @@ -132,11 +133,11 @@ private Collection<FullHttpResponse> processRequestsWithBody(HttpMethod method,
return sendRequests(remoteAddress, requests);
}

private synchronized Collection<FullHttpResponse> sendRequests(
private synchronized List<FullHttpResponse> sendRequests(
final SocketAddress remoteAddress,
final Collection<HttpRequest> requests) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(requests.size());
final Collection<FullHttpResponse> content = Collections.synchronizedList(new ArrayList<>(requests.size()));
final List<FullHttpResponse> content = Collections.synchronizedList(new ArrayList<>(requests.size()));

clientBootstrap.handler(new CountDownLatchHandler(latch, content));

Expand Down Expand Up @@ -180,16 +181,20 @@ private static class CountDownLatchHandler extends ChannelInitializer<SocketChan
}

@Override
protected void initChannel(SocketChannel ch) throws Exception {
protected void initChannel(SocketChannel ch) {
final int maxContentLength = new ByteSizeValue(100, ByteSizeUnit.MB).bytesAsInt();
ch.pipeline().addLast(new HttpResponseDecoder());
ch.pipeline().addLast(new HttpRequestEncoder());
ch.pipeline().addLast(new HttpContentDecompressor());
ch.pipeline().addLast(new HttpObjectAggregator(maxContentLength));
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpObject>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
final FullHttpResponse response = (FullHttpResponse) msg;
content.add(response.copy());
// We copy the buffer manually to avoid a huge allocation on a pooled allocator. We have
// a test that tracks huge allocations, so we want to avoid them in this test code.
ByteBuf newContent = Unpooled.copiedBuffer(((FullHttpResponse) msg).content());
content.add(response.replace(newContent));
latch.countDown();
}

Expand Down

0 comments on commit 59dd889

Please sign in to comment.