Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Chunk direct buffer usage by networking layer
Today, due to how netty works (on both the http layer and the transport layer), and even though the buffers handed to netty are paged (CompositeChannelBuffer), netty ends up re-copying the whole buffer into another heap buffer (bad), and then sends it directly to sun.nio, which allocates a full thread-local direct buffer to send it (and may do so repeatedly if the whole message is not sent in one write). This is problematic for very large messages: aside from the extra temporary heap usage, the large direct buffers stay around and are not released by the JVM. This change forces the use of gathering when building a CompositeChannelBuffer, which makes netty use the sun.nio write method that accepts an array of ByteBuffer (so no extra heap copying), and also reduces the amount of direct memory allocated for large messages. See the doc on NettyUtils#DEFAULT_GATHERING for more info. closes #7811
- Loading branch information
Showing
7 changed files
with
262 additions
and
30 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
52 changes: 52 additions & 0 deletions
52
src/main/java/org/elasticsearch/http/netty/ESHttpResponseEncoder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.http.netty; | ||
|
||
import org.elasticsearch.common.netty.NettyUtils; | ||
import org.jboss.netty.buffer.ChannelBuffer; | ||
import org.jboss.netty.buffer.ChannelBuffers; | ||
import org.jboss.netty.buffer.CompositeChannelBuffer; | ||
import org.jboss.netty.channel.Channel; | ||
import org.jboss.netty.channel.ChannelHandlerContext; | ||
import org.jboss.netty.handler.codec.http.HttpResponseEncoder; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* Wraps a netty {@link HttpResponseEncoder} and makes sure that if the resulting | ||
* channel buffer is composite, it will use the correct gathering flag. See more | ||
* at {@link NettyUtils#DEFAULT_GATHERING}. | ||
*/ | ||
public class ESHttpResponseEncoder extends HttpResponseEncoder { | ||
|
||
@Override | ||
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { | ||
Object retVal = super.encode(ctx, channel, msg); | ||
if (retVal instanceof CompositeChannelBuffer) { | ||
CompositeChannelBuffer ccb = (CompositeChannelBuffer) retVal; | ||
if (ccb.useGathering() != NettyUtils.DEFAULT_GATHERING) { | ||
List<ChannelBuffer> decompose = ccb.decompose(ccb.readerIndex(), ccb.readableBytes()); | ||
return ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING, | ||
decompose.toArray(new ChannelBuffer[decompose.size()])); | ||
} | ||
} | ||
return retVal; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
148 changes: 148 additions & 0 deletions
148
src/test/java/org/elasticsearch/network/DirectBufferNetworkTests.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.network; | ||
|
||
import org.apache.http.impl.client.HttpClients; | ||
import org.apache.lucene.util.TestUtil; | ||
import org.elasticsearch.action.index.IndexRequestBuilder; | ||
import org.elasticsearch.common.Strings; | ||
import org.elasticsearch.common.transport.InetSocketTransportAddress; | ||
import org.elasticsearch.common.unit.ByteSizeValue; | ||
import org.elasticsearch.common.xcontent.XContentBuilder; | ||
import org.elasticsearch.common.xcontent.XContentFactory; | ||
import org.elasticsearch.http.HttpServerTransport; | ||
import org.elasticsearch.test.ElasticsearchIntegrationTest; | ||
import org.elasticsearch.test.rest.client.http.HttpRequestBuilder; | ||
import org.hamcrest.Matchers; | ||
import org.junit.Test; | ||
|
||
import java.io.ByteArrayOutputStream; | ||
import java.lang.reflect.Field; | ||
import java.net.InetSocketAddress; | ||
import java.nio.ByteBuffer; | ||
|
||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; | ||
|
||
/** | ||
*/ | ||
/**
 * Integration test guarding against the JVM caching oversized thread-local
 * direct byte buffers when large messages travel through the networking layer.
 */
public class DirectBufferNetworkTests extends ElasticsearchIntegrationTest {

    /**
     * This test validates that using large data sets (large docs + large API requests) don't
     * cause a large direct byte buffer to be allocated internally in the sun.nio buffer cache.
     * <p/>
     * See {@link org.elasticsearch.common.netty.NettyUtils#DEFAULT_GATHERING} for more info.
     */
    @Test
    public void verifySaneDirectBufferAllocations() throws Exception {
        createIndex("test");

        // Payloads are sized above 1mb on purpose: the assertion below fails
        // if any cached direct buffer reaches 1mb, so a non-gathering write of
        // one of these messages would trip it.
        int estimatedBytesSize = scaledRandomIntBetween(ByteSizeValue.parseBytesSizeValue("1.1mb").bytesAsInt(), ByteSizeValue.parseBytesSizeValue("1.5mb").bytesAsInt());
        byte[] data = new byte[estimatedBytesSize];
        getRandom().nextBytes(data);

        ByteArrayOutputStream docOut = new ByteArrayOutputStream();
        // we use smile to automatically use the binary mapping
        XContentBuilder doc = XContentFactory.smileBuilder(docOut).startObject().startObject("doc").field("value", data).endObject();
        doc.close();
        byte[] docBytes = docOut.toByteArray();

        int numDocs = randomIntBetween(2, 5);
        logger.info("indexing [{}] docs, each with size [{}]", numDocs, estimatedBytesSize);
        IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
        for (int i = 0; i < numDocs; ++i) {
            builders[i] = client().prepareIndex("test", "type").setSource(docBytes);
        }
        indexRandom(true, builders);
        logger.info("done indexing");

        // Exercise all three client paths — node client, transport client and
        // HTTP — so each networking code path pushes a large response.
        logger.info("executing random client search for all docs");
        assertHitCount(client().prepareSearch("test").setFrom(0).setSize(numDocs).get(), numDocs);
        logger.info("executing transport client search for all docs");
        assertHitCount(internalCluster().transportClient().prepareSearch("test").setFrom(0).setSize(numDocs).get(), numDocs);

        logger.info("executing HTTP search for all docs");
        // simulate large HTTP call as well
        httpClient().method("GET").path("/test/_search").addParam("size", Integer.toString(numDocs)).execute();

        logger.info("validating large direct buffer not allocated");
        validateNoLargeDirectBufferAllocated();
    }

    // Builds an HTTP client pointed at the publish address of a data node's
    // HTTP transport.
    private static HttpRequestBuilder httpClient() {
        HttpServerTransport httpServerTransport = internalCluster().getDataNodeInstance(HttpServerTransport.class);
        InetSocketAddress address = ((InetSocketTransportAddress) httpServerTransport.boundAddress().publishAddress()).address();
        return new HttpRequestBuilder(HttpClients.createDefault()).host(address.getHostName()).port(address.getPort());
    }

    /**
     * Validates that all the thread local allocated ByteBuffer in sun.nio under the Util$BufferCache
     * are not greater than 1mb.
     * <p/>
     * NOTE(review): this walks private JDK internals ({@code Thread.threadLocals},
     * {@code ThreadLocal$ThreadLocalMap.table}, {@code sun.nio.ch.Util$BufferCache.buffers})
     * via reflection; field names are JDK-implementation-specific — confirm they
     * exist on the JDKs this test runs against.
     */
    private void validateNoLargeDirectBufferAllocated() throws Exception {
        // Make the fields in the Thread class that store ThreadLocals
        // accessible
        Field threadLocalsField = Thread.class.getDeclaredField("threadLocals");
        threadLocalsField.setAccessible(true);
        // Make the underlying array of ThreadLoad.ThreadLocalMap.Entry objects
        // accessible
        Class<?> tlmClass = Class.forName("java.lang.ThreadLocal$ThreadLocalMap");
        Field tableField = tlmClass.getDeclaredField("table");
        tableField.setAccessible(true);

        // Inspect every live thread's ThreadLocal map, skipping threads that
        // have no thread-locals at all.
        for (Thread thread : Thread.getAllStackTraces().keySet()) {
            if (thread == null) {
                continue;
            }
            Object threadLocalMap = threadLocalsField.get(thread);
            if (threadLocalMap == null) {
                continue;
            }
            Object[] table = (Object[]) tableField.get(threadLocalMap);
            if (table == null) {
                continue;
            }
            for (Object entry : table) {
                if (entry == null) {
                    continue;
                }
                Field valueField = entry.getClass().getDeclaredField("value");
                valueField.setAccessible(true);
                Object value = valueField.get(entry);
                if (value == null) {
                    continue;
                }
                // Only the JDK's per-thread temporary direct-buffer cache is
                // of interest; everything else in the map is ignored.
                if (!value.getClass().getName().equals("sun.nio.ch.Util$BufferCache")) {
                    continue;
                }
                Field buffersField = value.getClass().getDeclaredField("buffers");
                buffersField.setAccessible(true);
                Object[] buffers = (Object[]) buffersField.get(value);
                for (Object buffer : buffers) {
                    if (buffer == null) {
                        continue;
                    }
                    // Fail if any cached direct buffer reached 1mb — evidence
                    // that a large message was written through a single
                    // monolithic buffer rather than gathered chunks.
                    assertThat(((ByteBuffer) buffer).capacity(), Matchers.lessThan(1 * 1024 * 1024));
                }
            }
        }

    }
}