Add byte array pooling to nio http transport #31349

Merged · 11 commits · Jun 15, 2018
@@ -27,6 +27,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/**
@@ -41,6 +42,7 @@ public final class InboundChannelBuffer implements AutoCloseable {
private static final int PAGE_MASK = PAGE_SIZE - 1;
private static final int PAGE_SHIFT = Integer.numberOfTrailingZeros(PAGE_SIZE);
private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0];
private static final Page[] EMPTY_BYTE_PAGE_ARRAY = new Page[0];


private final ArrayDeque<Page> pages;
@@ -152,6 +154,46 @@ public ByteBuffer[] sliceBuffersTo(long to) {
return buffers;
}

/**
* This method will return an array of {@link Page} representing the bytes from the beginning of
* this buffer up through the index argument that was passed. The pages and buffers will be duplicates of
* the internal components, so any modifications to the markers {@link ByteBuffer#position()},
* {@link ByteBuffer#limit()}, etc. will not modify this class. Additionally, this will internally
* retain the underlying pages, so the pages returned by this method must be closed.
*
* @param to the index to slice up to
* @return the pages
*/
public Page[] sliceAndRetainPagesTo(long to) {
if (to > capacity) {
throw new IndexOutOfBoundsException("can't slice a channel buffer with capacity [" + capacity +
"], with slice parameters to [" + to + "]");
} else if (to == 0) {
return EMPTY_BYTE_PAGE_ARRAY;
}
long indexWithOffset = to + offset;
int pageCount = pageIndex(indexWithOffset);
int finalLimit = indexInPage(indexWithOffset);
if (finalLimit != 0) {
pageCount += 1;
}

Page[] pages = new Page[pageCount];
Iterator<Page> pageIterator = this.pages.iterator();
Page firstPage = pageIterator.next().duplicate();
ByteBuffer firstBuffer = firstPage.byteBuffer;
firstBuffer.position(firstBuffer.position() + offset);
pages[0] = firstPage;
for (int i = 1; i < pages.length; i++) {
pages[i] = pageIterator.next().duplicate();
}
if (finalLimit != 0) {
pages[pages.length - 1].byteBuffer.limit(finalLimit);
}

return pages;
}
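
For illustration, here is a minimal sketch of a hypothetical caller (not part of this change) honoring the contract above, using the existing allocatingInstance() and incrementIndex() helpers:

    // Hypothetical caller: slice off everything received so far, read it,
    // then close each page so the underlying byte arrays can be released.
    InboundChannelBuffer channelBuffer = InboundChannelBuffer.allocatingInstance();
    channelBuffer.ensureCapacity(1024);
    channelBuffer.incrementIndex(1024); // pretend 1024 bytes arrived from the wire
    InboundChannelBuffer.Page[] pages = channelBuffer.sliceAndRetainPagesTo(channelBuffer.getIndex());
    try {
        for (InboundChannelBuffer.Page page : pages) {
            ByteBuffer slice = page.getByteBuffer();
            // position/limit changes on `slice` do not affect the channel
            // buffer's internal state, because the page is a duplicate
        }
    } finally {
        for (InboundChannelBuffer.Page page : pages) {
            page.close(); // releases this slice's reference to the underlying page
        }
    }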

/**
* This method will return an array of {@link ByteBuffer} representing the bytes from the index passed
* through the end of this buffer. The buffers will be duplicates of the internal buffers, so any
@@ -232,15 +274,34 @@ public static class Page implements AutoCloseable {

private final ByteBuffer byteBuffer;
private final Runnable closeable;
private final AtomicInteger referenceCount;

public Page(ByteBuffer byteBuffer, Runnable closeable) {
this(byteBuffer, closeable, new AtomicInteger(1));
}

private Page(ByteBuffer byteBuffer, Runnable closeable, AtomicInteger referenceCount) {
this.byteBuffer = byteBuffer;
this.closeable = closeable;
this.referenceCount = referenceCount;
}

private Page duplicate() {
referenceCount.incrementAndGet();
Contributor: I am a little concerned about this. What if the referenceCount is already 0? I think this should implement AbstractRefCounted? I guess that is not available in this lib? I would love to move that class to core and replace the AlreadyClosedException with IllegalStateException and use it here?!

Contributor: I do wonder why we need to refCount this. I think it would be good to have some explanation of why this is needed. I also assume that if we miss returning a page, we will fail tests because not all byte blocks are released, right?

Contributor (author):

> I do wonder why we need to refCount this.

It's to integrate with netty. Netty can hold onto bytes internally, and we don't know when it is done with them without overriding its buffer.

> I also assume if we miss returning a page we will fail tests because not all byte blocks are released right?

Yeah, normal mock page recycler stuff.

> I think this should implement AbstractRefCounted?

I will do this, but it will introduce more layers of objects. Can Page be reference counted? Yes. But when we return a Page, it needs to be a different Page object because the ByteBuffer needs to be different. So internally we will need to create something like a Releaser object that is reference counted and passed around to the different pages.

return new Page(byteBuffer.duplicate(), closeable, referenceCount);
}

public ByteBuffer getByteBuffer() {
return byteBuffer;
}

@Override
public void close() {
closeable.run();
int newReferenceCount = referenceCount.decrementAndGet();
assert newReferenceCount >= 0 : "Reference count should never be less than 0. Found: [" + newReferenceCount + "].";
if (newReferenceCount == 0) {
closeable.run();
}
}
}
}
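
The net effect of the shared AtomicInteger is that the recycling callback runs exactly once, when the last holder (the buffer or a retained page) is closed. A minimal lifecycle sketch, mirroring testCloseRetainedPages below:

    AtomicBoolean recycled = new AtomicBoolean(false);
    Supplier<InboundChannelBuffer.Page> pageSupplier = () ->
        new InboundChannelBuffer.Page(ByteBuffer.allocate(PAGE_SIZE), () -> recycled.set(true));

    InboundChannelBuffer buffer = new InboundChannelBuffer(pageSupplier);
    buffer.ensureCapacity(PAGE_SIZE);                                 // one page, reference count 1

    InboundChannelBuffer.Page[] retained = buffer.sliceAndRetainPagesTo(PAGE_SIZE); // count 1 -> 2
    buffer.close();       // count 2 -> 1: recycled.get() is still false
    retained[0].close();  // count 1 -> 0: the closeable runs, recycled.get() is now true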
@@ -31,7 +31,8 @@ public class InboundChannelBufferTests extends ESTestCase {

private static final int PAGE_SIZE = BigArrays.PAGE_SIZE_IN_BYTES;
private final Supplier<InboundChannelBuffer.Page> defaultPageSupplier = () ->
-            new InboundChannelBuffer.Page(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> {});
+            new InboundChannelBuffer.Page(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> {
+            });

public void testNewBufferHasSinglePage() {
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier);
@@ -167,6 +168,49 @@ public void testClose() {
expectThrows(IllegalStateException.class, () -> channelBuffer.ensureCapacity(1));
}

public void testCloseRetainedPages() {
ConcurrentLinkedQueue<AtomicBoolean> queue = new ConcurrentLinkedQueue<>();
Supplier<InboundChannelBuffer.Page> supplier = () -> {
AtomicBoolean atomicBoolean = new AtomicBoolean();
queue.add(atomicBoolean);
return new InboundChannelBuffer.Page(ByteBuffer.allocate(PAGE_SIZE), () -> atomicBoolean.set(true));
};
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(supplier);
channelBuffer.ensureCapacity(PAGE_SIZE * 4);

assertEquals(4, queue.size());

for (AtomicBoolean closedRef : queue) {
assertFalse(closedRef.get());
}

InboundChannelBuffer.Page[] pages = channelBuffer.sliceAndRetainPagesTo(PAGE_SIZE * 2);

pages[1].close();

for (AtomicBoolean closedRef : queue) {
assertFalse(closedRef.get());
}

channelBuffer.close();

int i = 0;
for (AtomicBoolean closedRef : queue) {
if (i < 1) {
assertFalse(closedRef.get());
} else {
assertTrue(closedRef.get());
}
++i;
}

pages[0].close();

for (AtomicBoolean closedRef : queue) {
assertTrue(closedRef.get());
}
}

public void testAccessByteBuffers() {
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier);

@@ -87,8 +87,8 @@ public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadP

@Override
public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NamedXContentRegistry xContentRegistry,
NetworkService networkService,
HttpServerTransport.Dispatcher dispatcher) {
@@ -78,7 +78,7 @@ public class HttpReadWriteHandler implements ReadWriteHandler {

@Override
public int consumeReads(InboundChannelBuffer channelBuffer) throws IOException {
-        int bytesConsumed = adaptor.read(channelBuffer.sliceBuffersTo(channelBuffer.getIndex()));
+        int bytesConsumed = adaptor.read(channelBuffer.sliceAndRetainPagesTo(channelBuffer.getIndex()));
Object message;
while ((message = adaptor.pollInboundMessage()) != null) {
handleRequest(message);
@@ -29,6 +29,7 @@
import io.netty.channel.embedded.EmbeddedChannel;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.nio.FlushOperation;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.WriteOperation;

import java.nio.ByteBuffer;
@@ -97,6 +98,13 @@ public int read(ByteBuffer[] buffers) {
return byteBuf.readerIndex() - initialReaderIndex;
}

public int read(InboundChannelBuffer.Page[] pages) {
ByteBuf byteBuf = PagedByteBuf.byteBufFromPages(pages);
int readableBytes = byteBuf.readableBytes();
nettyChannel.writeInbound(byteBuf);
return readableBytes;
}

public Object pollInboundMessage() {
return nettyChannel.readInbound();
}
@@ -32,12 +32,14 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.AbstractHttpServerTransport;
@@ -63,6 +65,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
@@ -103,6 +106,8 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
(s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2),
(s) -> Setting.parseInt(s, 1, "http.nio.worker_count"), Setting.Property.NodeScope);

private final PageCacheRecycler pageCacheRecycler;

private final boolean tcpNoDelay;
private final boolean tcpKeepAlive;
private final boolean reuseAddress;
@@ -115,9 +120,11 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
private HttpChannelFactory channelFactory;
private final NioCorsConfig corsConfig;

-    public NioHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
-                                  NamedXContentRegistry xContentRegistry, HttpServerTransport.Dispatcher dispatcher) {
+    public NioHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays,
+                                  PageCacheRecycler pageCacheRecycler, ThreadPool threadPool, NamedXContentRegistry xContentRegistry,
+                                  HttpServerTransport.Dispatcher dispatcher) {
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher);
this.pageCacheRecycler = pageCacheRecycler;

ByteSizeValue maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
ByteSizeValue maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
@@ -329,11 +336,15 @@ private HttpChannelFactory() {
@Override
public NioHttpChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException {
NioHttpChannel nioChannel = new NioHttpChannel(channel);
java.util.function.Supplier<InboundChannelBuffer.Page> pageSupplier = () -> {
Recycler.V<byte[]> bytes = pageCacheRecycler.bytePage(false);
return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close);
};
HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(nioChannel, NioHttpServerTransport.this,
handlingSettings, corsConfig);
Consumer<Exception> exceptionHandler = (e) -> exceptionCaught(nioChannel, e);
SocketChannelContext context = new BytesChannelContext(nioChannel, selector, exceptionHandler, httpReadWritePipeline,
-                InboundChannelBuffer.allocatingInstance());
+                new InboundChannelBuffer(pageSupplier));
nioChannel.setContext(context);
return nioChannel;
}
@@ -0,0 +1,75 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.http.nio;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.buffer.UnpooledHeapByteBuf;
import org.elasticsearch.nio.InboundChannelBuffer;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class PagedByteBuf extends UnpooledHeapByteBuf {

private final Runnable releasable;

private PagedByteBuf(byte[] array, Runnable releasable) {
super(UnpooledByteBufAllocator.DEFAULT, array, array.length);
this.releasable = releasable;
}

static ByteBuf byteBufFromPages(InboundChannelBuffer.Page[] pages) {
int componentCount = pages.length;
if (componentCount == 0) {
return Unpooled.EMPTY_BUFFER;
} else if (componentCount == 1) {
return byteBufFromPage(pages[0]);
} else {
int maxComponents = Math.max(16, componentCount);
final List<ByteBuf> components = new ArrayList<>(componentCount);
for (InboundChannelBuffer.Page page : pages) {
components.add(byteBufFromPage(page));
}
return new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, false, maxComponents, components);
}
}

private static ByteBuf byteBufFromPage(InboundChannelBuffer.Page page) {
ByteBuffer buffer = page.getByteBuffer();
assert !buffer.isDirect() && buffer.hasArray() : "Must be a heap buffer with an array";
Contributor: please use == false

int offset = buffer.arrayOffset() + buffer.position();
PagedByteBuf newByteBuf = new PagedByteBuf(buffer.array(), page::close);
return newByteBuf.slice(offset, buffer.remaining());
}


@Override
protected void deallocate() {
try {
super.deallocate();
} finally {
releasable.run();
}
}
}
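
Putting the pieces together, an illustrative sketch (not code from the PR, and assuming pages obtained via sliceAndRetainPagesTo) of how a netty release propagates back to the page recycler:

    // The ByteBuf takes ownership of the page references it wraps.
    InboundChannelBuffer.Page[] pages = channelBuffer.sliceAndRetainPagesTo(channelBuffer.getIndex());
    ByteBuf byteBuf = PagedByteBuf.byteBufFromPages(pages);
    // ... netty consumes the request; it may keep the buffer past this point ...
    byteBuf.release(); // refCnt reaches 0: deallocate() runs, each page's close() fires,
                       // the Page reference count drops, and the byte[] can go back to the recycler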
@@ -67,12 +67,13 @@ public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadP

@Override
public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NamedXContentRegistry xContentRegistry,
NetworkService networkService,
HttpServerTransport.Dispatcher dispatcher) {
return Collections.singletonMap(NIO_HTTP_TRANSPORT_NAME,
-            () -> new NioHttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher));
+            () -> new NioHttpServerTransport(settings, networkService, bigArrays, pageCacheRecycler, threadPool, xContentRegistry,
+                dispatcher));
}
}