Skip to content

Commit

Permalink
Chunk direct buffer usage by networking layer
Browse files Browse the repository at this point in the history
Today, due to how netty works (both on http layer and transport layer), and even though the buffers sent over to netty are paged (CompositeChannelBuffer), it ends up re-copying the whole buffer into another heap buffer (bad), and then send it over directly to sun.nio which allocates a full thread local direct buffer to send it (which can be repeated if not all message is sent).
  This is problematic for very large messages, aside from the extra heap temporal usage, the large direct buffers will stay around and not released by the JVM.
  This change forces the use of gathering when building a CompositeChannelBuffer, which results in netty using the sun.nio write method that accepts an array of ByteBuffer (so no extra heap copying), and also reduces the amount of direct memory allocated for large messages.
  See the doc on NettyUtils#DEFAULT_GATHERING for more info.
closes #7811
  • Loading branch information
kimchy committed Sep 23, 2014
1 parent c4b2fab commit 9c20c3b
Show file tree
Hide file tree
Showing 7 changed files with 262 additions and 30 deletions.
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.netty.NettyUtils;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
import org.jboss.netty.buffer.ChannelBuffer;
Expand All @@ -42,7 +43,6 @@
public class PagedBytesReference implements BytesReference {

private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE;
private static final int NIO_GATHERING_LIMIT = 524288;

private final BigArrays bigarrays;
protected final ByteArray bytearray;
Expand Down Expand Up @@ -230,9 +230,7 @@ public ChannelBuffer toChannelBuffer() {
// this would indicate that our numBuffer calculation is off by one.
assert (numBuffers == bufferSlot);

// we can use gathering writes from the ChannelBuffers, but only if they are
// moderately small to prevent OOMs due to DirectBuffer allocations.
return ChannelBuffers.wrappedBuffer(length <= NIO_GATHERING_LIMIT, buffers);
return ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING, buffers);
}

@Override
Expand Down
77 changes: 55 additions & 22 deletions src/main/java/org/elasticsearch/common/netty/NettyUtils.java
Expand Up @@ -18,22 +18,64 @@
*/
package org.elasticsearch.common.netty;

import com.google.common.collect.Lists;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.transport.netty.NettyInternalESLoggerFactory;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.buffer.CompositeChannelBuffer;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.ThreadRenamingRunnable;

import java.util.List;

/**
*/
public class NettyUtils {

/**
* Here we go....
* <p/>
* When using the socket or file channel API to write or read using heap ByteBuffer, the sun.nio
* package will convert it to a direct buffer before doing the actual operation. The direct buffer is
* cached on an array of buffers under the nio.ch.Util$BufferCache on a thread local.
* <p/>
* In netty specifically, if we send a single ChannelBuffer that is bigger than
* SocketSendBufferPool#DEFAULT_PREALLOCATION_SIZE (64kb), it will just convert the ChannelBuffer
* to a ByteBuffer and send it. The problem is, that then same size DirectByteBuffer will be
* allocated (or reused) and kept around on a thread local in the sun.nio BufferCache. If very
* large buffer is sent, imagine a 10mb one, then a 10mb direct buffer will be allocated as an
* entry within the thread local buffers.
* <p/>
* In ES, we try and page the buffers allocated, all serialized data uses {@link org.elasticsearch.common.bytes.PagedBytesReference}
* typically generated from {@link org.elasticsearch.common.io.stream.BytesStreamOutput}. When sending it over
* to netty, it creates a {@link org.jboss.netty.buffer.CompositeChannelBuffer} that wraps the relevant pages.
* <p/>
* The idea with the usage of composite channel buffer is that a single large buffer will not be sent over
* to the sun.nio layer. But, this will only happen if the composite channel buffer is created with a gathering
* flag set to true. In such a case, the GatheringSendBuffer is used in netty, resulting in calling the sun.nio
* layer with a ByteBuffer array.
* <p/>
* This, potentially would have been as disastrous if the sun.nio layer would have tried to still copy over
* all of it to a direct buffer. But, the write(ByteBuffer[]) API (see sun.nio.ch.IOUtil), goes one buffer
* at a time, and gets a temporary direct buffer from the BufferCache, up to a limit of IOUtil#IOV_MAX (which
* is 1024 on most OSes). This means that there will be a max of 1024 direct buffer per thread.
* <p/>
* This is still less than optimal to be honest, since it means that if not all data was written successfully
* (1024 paged buffers), then the rest of the data will need to be copied over again to the direct buffer
* and re-transmitted, but its much better than trying to send the full large buffer over and over again.
* <p/>
* In ES, we use by default, in our paged data structures, a page of 16kb, so this is not so terrible.
* <p/>
* Note, on the read size of netty, it uses a single direct buffer that is defined in both the transport
* and http configuration (based on the direct memory available), and the upstream handlers (SizeHeaderFrameDecoder,
* or more specifically the FrameDecoder base class) makes sure to use a cumulation buffer and not copy it
* over all the time.
* <p/>
* TODO: potentially, a more complete solution would be to write a netty channel handler that is the last
* in the pipeline, and if the buffer is composite, verifies that its a gathering one with reasonable
* sized pages, and if its a single one, makes sure that it gets sliced and wrapped in a composite
* buffer.
*/
public static final boolean DEFAULT_GATHERING;

private static EsThreadNameDeterminer ES_THREAD_NAME_DETERMINER = new EsThreadNameDeterminer();

public static class EsThreadNameDeterminer implements ThreadNameDeterminer {
Expand All @@ -53,25 +95,16 @@ public InternalLogger newInstance(String name) {
});

ThreadRenamingRunnable.setThreadNameDeterminer(ES_THREAD_NAME_DETERMINER);

/**
* This is here just to give us an option to rollback the change, if its stable, we should remove
* the option to even set it.
*/
DEFAULT_GATHERING = Booleans.parseBoolean(System.getProperty("es.netty.gathering"), true);
Loggers.getLogger(NettyUtils.class).debug("using gathering [{}]", DEFAULT_GATHERING);
}

public static void setup() {

}

public static ChannelBuffer buildComposite(boolean useGathering, ChannelBuffer... buffers) {
if (buffers == null || buffers.length == 0) {
return ChannelBuffers.EMPTY_BUFFER;
}
List<ChannelBuffer> list = Lists.newArrayList();
for (ChannelBuffer buffer : buffers) {
if (buffer instanceof CompositeChannelBuffer) {
CompositeChannelBuffer compBuffer = (CompositeChannelBuffer) buffer;
list.addAll(compBuffer.decompose(0, compBuffer.readableBytes()));
} else {
list.add(buffer);
}
}
return new CompositeChannelBuffer(buffers[0].order(), list, useGathering);
}
}
@@ -0,0 +1,52 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.http.netty;

import org.elasticsearch.common.netty.NettyUtils;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.buffer.CompositeChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;

import java.util.List;

/**
* Wraps a netty {@link HttpResponseEncoder} and makes sure that if the resulting
* channel buffer is composite, it will use the correct gathering flag. See more
* at {@link NettyUtils#DEFAULT_GATHERING}.
*/
public class ESHttpResponseEncoder extends HttpResponseEncoder {

@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
Object retVal = super.encode(ctx, channel, msg);
if (retVal instanceof CompositeChannelBuffer) {
CompositeChannelBuffer ccb = (CompositeChannelBuffer) retVal;
if (ccb.useGathering() != NettyUtils.DEFAULT_GATHERING) {
List<ChannelBuffer> decompose = ccb.decompose(ccb.readerIndex(), ccb.readableBytes());
return ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING,
decompose.toArray(new ChannelBuffer[decompose.size()]));
}
}
return retVal;
}
}
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.netty.NettyUtils;
import org.elasticsearch.common.netty.ReleaseChannelFutureListener;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.rest.RestResponse;
Expand Down Expand Up @@ -148,7 +149,7 @@ public void sendResponse(RestResponse response) {
final BytesRef callbackBytes = new BytesRef(callback);
callbackBytes.bytes[callbackBytes.length] = '(';
callbackBytes.length++;
buffer = ChannelBuffers.wrappedBuffer(
buffer = ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING,
ChannelBuffers.wrappedBuffer(callbackBytes.bytes, callbackBytes.offset, callbackBytes.length),
buffer,
ChannelBuffers.wrappedBuffer(END_JSONP)
Expand Down
Expand Up @@ -46,7 +46,6 @@
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpContentCompressor;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.timeout.ReadTimeoutException;

import java.io.IOException;
Expand Down Expand Up @@ -367,7 +366,7 @@ public ChannelPipeline getPipeline() throws Exception {
httpChunkAggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
}
pipeline.addLast("aggregator", httpChunkAggregator);
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast("encoder", new ESHttpResponseEncoder());
if (transport.compression) {
pipeline.addLast("encoder_compress", new HttpContentCompressor(transport.compressionLevel));
}
Expand Down
Expand Up @@ -56,6 +56,7 @@
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
Expand Down Expand Up @@ -551,7 +552,7 @@ public void sendRequest(final DiscoveryNode node, final long requestId, final St
bytes = bStream.bytes();
ChannelBuffer headerBuffer = bytes.toChannelBuffer();
ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer();
buffer = NettyUtils.buildComposite(false, headerBuffer, contentBuffer);
buffer = ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING, headerBuffer, contentBuffer);
} else {
request.writeTo(stream);
stream.close();
Expand Down
148 changes: 148 additions & 0 deletions src/test/java/org/elasticsearch/network/DirectBufferNetworkTests.java
@@ -0,0 +1,148 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.network;

import org.apache.http.impl.client.HttpClients;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.rest.client.http.HttpRequestBuilder;
import org.hamcrest.Matchers;
import org.junit.Test;

import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;

/**
*/
public class DirectBufferNetworkTests extends ElasticsearchIntegrationTest {

/**
* This test validates that using large data sets (large docs + large API requests) don't
* cause a large direct byte buffer to be allocated internally in the sun.nio buffer cache.
* <p/>
* See {@link org.elasticsearch.common.netty.NettyUtils#DEFAULT_GATHERING} for more info.
*/
@Test
public void verifySaneDirectBufferAllocations() throws Exception {
createIndex("test");

int estimatedBytesSize = scaledRandomIntBetween(ByteSizeValue.parseBytesSizeValue("1.1mb").bytesAsInt(), ByteSizeValue.parseBytesSizeValue("1.5mb").bytesAsInt());
byte[] data = new byte[estimatedBytesSize];
getRandom().nextBytes(data);

ByteArrayOutputStream docOut = new ByteArrayOutputStream();
// we use smile to automatically use the binary mapping
XContentBuilder doc = XContentFactory.smileBuilder(docOut).startObject().startObject("doc").field("value", data).endObject();
doc.close();
byte[] docBytes = docOut.toByteArray();

int numDocs = randomIntBetween(2, 5);
logger.info("indexing [{}] docs, each with size [{}]", numDocs, estimatedBytesSize);
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
for (int i = 0; i < numDocs; ++i) {
builders[i] = client().prepareIndex("test", "type").setSource(docBytes);
}
indexRandom(true, builders);
logger.info("done indexing");

logger.info("executing random client search for all docs");
assertHitCount(client().prepareSearch("test").setFrom(0).setSize(numDocs).get(), numDocs);
logger.info("executing transport client search for all docs");
assertHitCount(internalCluster().transportClient().prepareSearch("test").setFrom(0).setSize(numDocs).get(), numDocs);

logger.info("executing HTTP search for all docs");
// simulate large HTTP call as well
httpClient().method("GET").path("/test/_search").addParam("size", Integer.toString(numDocs)).execute();

logger.info("validating large direct buffer not allocated");
validateNoLargeDirectBufferAllocated();
}

private static HttpRequestBuilder httpClient() {
HttpServerTransport httpServerTransport = internalCluster().getDataNodeInstance(HttpServerTransport.class);
InetSocketAddress address = ((InetSocketTransportAddress) httpServerTransport.boundAddress().publishAddress()).address();
return new HttpRequestBuilder(HttpClients.createDefault()).host(address.getHostName()).port(address.getPort());
}

/**
* Validates that all the thread local allocated ByteBuffer in sun.nio under the Util$BufferCache
* are not greater than 1mb.
*/
private void validateNoLargeDirectBufferAllocated() throws Exception {
// Make the fields in the Thread class that store ThreadLocals
// accessible
Field threadLocalsField = Thread.class.getDeclaredField("threadLocals");
threadLocalsField.setAccessible(true);
// Make the underlying array of ThreadLoad.ThreadLocalMap.Entry objects
// accessible
Class<?> tlmClass = Class.forName("java.lang.ThreadLocal$ThreadLocalMap");
Field tableField = tlmClass.getDeclaredField("table");
tableField.setAccessible(true);

for (Thread thread : Thread.getAllStackTraces().keySet()) {
if (thread == null) {
continue;
}
Object threadLocalMap = threadLocalsField.get(thread);
if (threadLocalMap == null) {
continue;
}
Object[] table = (Object[]) tableField.get(threadLocalMap);
if (table == null) {
continue;
}
for (Object entry : table) {
if (entry == null) {
continue;
}
Field valueField = entry.getClass().getDeclaredField("value");
valueField.setAccessible(true);
Object value = valueField.get(entry);
if (value == null) {
continue;
}
if (!value.getClass().getName().equals("sun.nio.ch.Util$BufferCache")) {
continue;
}
Field buffersField = value.getClass().getDeclaredField("buffers");
buffersField.setAccessible(true);
Object[] buffers = (Object[]) buffersField.get(value);
for (Object buffer : buffers) {
if (buffer == null) {
continue;
}
assertThat(((ByteBuffer) buffer).capacity(), Matchers.lessThan(1 * 1024 * 1024));
}
}
}

}
}

0 comments on commit 9c20c3b

Please sign in to comment.