Serialize outbound messages on netty buffers (#80111)
Currently we use BigArrays for serializing outbound messages. Since
these messages are only handled at the network layer, it is better to
use the Netty allocator when using the netty transport. This commit
implements this serialization method when netty is enabled.
Tim-Brooks committed Nov 4, 2021
1 parent 255bf50 commit 34d9cbe
Showing 20 changed files with 1,269 additions and 63 deletions.
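
For context: the Recycler&lt;BytesRef&gt; threaded through these changes hands out fixed-size pages of bytes. Below is a minimal sketch of what a recycler backed by Netty's pooled allocator could look like; the class name NettyBackedRecycler and the 16 KiB page size are assumptions for illustration, not the actual implementation behind NettyAllocator.getRecycler().

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.recycler.Recycler;

// Hypothetical sketch: each obtain() leases a pooled heap buffer from Netty and
// exposes its backing array as a fixed-size BytesRef page; close() returns the
// buffer to the pool instead of leaving it for the GC.
final class NettyBackedRecycler implements Recycler<BytesRef> {

    private static final int PAGE_SIZE = 16 * 1024; // assumed page size

    @Override
    public Recycler.V<BytesRef> obtain() {
        final ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(PAGE_SIZE, PAGE_SIZE);
        final BytesRef page = new BytesRef(buf.array(), buf.arrayOffset(), PAGE_SIZE);
        return new Recycler.V<BytesRef>() {
            @Override
            public BytesRef v() {
                return page;
            }

            @Override
            public boolean isRecycled() {
                return true; // pages come from, and return to, the Netty pool
            }

            @Override
            public void close() {
                buf.release();
            }
        };
    }
}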

Netty4MessageChannelHandler.java
@@ -15,10 +15,12 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

+import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
+import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.InboundPipeline;
@@ -42,14 +44,14 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
private WriteOperation currentWrite;
private final InboundPipeline pipeline;

-Netty4MessageChannelHandler(Netty4Transport transport) {
+Netty4MessageChannelHandler(Netty4Transport transport, Recycler<BytesRef> recycler) {
this.transport = transport;
final ThreadPool threadPool = transport.getThreadPool();
final Transport.RequestHandlers requestHandlers = transport.getRequestHandlers();
this.pipeline = new InboundPipeline(
transport.getVersion(),
transport.getStatsTracker(),
-NettyAllocator.getRecycler(),
+recycler,
threadPool::relativeTimeInMillis,
transport.getInflightBreaker(),
requestHandlers::getHandler,

Netty4Transport.java
@@ -25,11 +25,13 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
+import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
@@ -124,6 +126,14 @@ public Netty4Transport(
}
}

+@Override
+protected Recycler<BytesRef> createRecycler(Settings settings, PageCacheRecycler pageCacheRecycler) {
+    // If this method is called by super ctor the processors will not be set. Accessing NettyAllocator initializes netty's internals
+    // setting the processors. We must do it ourselves first just in case.
+    Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings));
+    return NettyAllocator.getRecycler();
+}

@Override
protected void doStart() {
boolean success = false;
@@ -326,7 +336,7 @@ protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast("byte_buf_sizer", NettyByteBufSizer.INSTANCE);
ch.pipeline().addLast("logging", new ESLoggingHandler());
// using a dot as a prefix means this cannot come from any settings parsed
-ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this));
+ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, recycler));
}

@Override
@@ -353,7 +363,7 @@ protected void initChannel(Channel ch) throws Exception {
ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
ch.pipeline().addLast("byte_buf_sizer", NettyByteBufSizer.INSTANCE);
ch.pipeline().addLast("logging", new ESLoggingHandler());
-ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this));
+ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, recycler));
serverAcceptedChannel(nettyTcpChannel);
}


NioTransport.java
@@ -29,7 +29,6 @@
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.ServerChannelContext;
import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.BytesRefRecycler;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportSettings;

@@ -48,7 +47,6 @@ public class NioTransport extends TcpTransport {
private static final Logger logger = LogManager.getLogger(NioTransport.class);

protected final PageAllocator pageAllocator;
-protected final BytesRefRecycler bytesRefRecycler;
private final ConcurrentMap<String, TcpChannelFactory> profileToChannelFactory = newConcurrentMap();
private final NioGroupFactory groupFactory;
private volatile NioGroup nioGroup;
@@ -67,7 +65,6 @@ protected NioTransport(
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
this.pageAllocator = new PageAllocator(pageCacheRecycler);
this.groupFactory = groupFactory;
-this.bytesRefRecycler = new BytesRefRecycler(pageCacheRecycler);
}

@Override
@@ -168,7 +165,7 @@ private TcpChannelFactoryImpl(ProfileSettings profileSettings, boolean isClient)
public NioTcpChannel createChannel(NioSelector selector, SocketChannel channel, Config.Socket socketConfig) {
NioTcpChannel nioChannel = new NioTcpChannel(isClient == false, profileName, channel);
Consumer<Exception> exceptionHandler = (e) -> onException(nioChannel, e);
-TcpReadWriteHandler handler = new TcpReadWriteHandler(nioChannel, bytesRefRecycler, NioTransport.this);
+TcpReadWriteHandler handler = new TcpReadWriteHandler(nioChannel, recycler, NioTransport.this);
BytesChannelContext context = new BytesChannelContext(
nioChannel,
selector,

TcpReadWriteHandler.java
@@ -8,17 +8,18 @@

package org.elasticsearch.transport.nio;

+import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
+import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.nio.BytesWriteHandler;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.Page;
import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.BytesRefRecycler;
import org.elasticsearch.transport.InboundPipeline;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
@@ -31,7 +32,7 @@ public class TcpReadWriteHandler extends BytesWriteHandler {
private final NioTcpChannel channel;
private final InboundPipeline pipeline;

-public TcpReadWriteHandler(NioTcpChannel channel, BytesRefRecycler recycler, TcpTransport transport) {
+public TcpReadWriteHandler(NioTcpChannel channel, Recycler<BytesRef> recycler, TcpTransport transport) {
this.channel = channel;
final ThreadPool threadPool = transport.getThreadPool();
final Supplier<CircuitBreaker> breaker = transport.getInflightBreaker();

TransportBulkAction.java
@@ -192,7 +192,7 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
-threadPool.executor(executorName).execute(new ActionRunnable<>(releasingListener) {
+threadPool.executor(Names.WRITE).execute(new ActionRunnable<>(releasingListener) {
@Override
protected void doRun() {
doInternalExecute(task, bulkRequest, executorName, releasingListener);

BytesStreamOutput.java
@@ -17,6 +17,7 @@
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.core.Nullable;

+import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
@@ -127,7 +128,7 @@ public void close() {
*
* @return the value of the <code>count</code> field, which is the number of valid
* bytes in this output stream.
-* @see java.io.ByteArrayOutputStream#count
+* @see ByteArrayOutputStream#size()
*/
public int size() {
return count;

RecyclerBytesStreamOutput.java (new file)
@@ -0,0 +1,222 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.io.stream;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.nio.ByteOrder;
import java.util.ArrayList;

/**
* A {@link StreamOutput} that uses {@link Recycler.V<BytesRef>} to acquire pages of bytes, which
* avoids frequent reallocation &amp; copying of the internal data. When {@link #close()} is called,
* the bytes will be released.
*/
public class RecyclerBytesStreamOutput extends BytesStream implements Releasable {

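// Big-endian views over byte[]: writeInt/writeLong use these to store a whole
// primitive in a single operation when it fits within the current page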
static final VarHandle VH_BE_INT = MethodHandles.byteArrayViewVarHandle(int[].class, ByteOrder.BIG_ENDIAN);
static final VarHandle VH_BE_LONG = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.BIG_ENDIAN);

private final ArrayList<Recycler.V<BytesRef>> pages = new ArrayList<>();
private final Recycler<BytesRef> recycler;
private final int pageSize;
private int pageIndex = -1;
private int currentCapacity = 0;
private int currentPageOffset;

public RecyclerBytesStreamOutput(Recycler<BytesRef> recycler) {
this.recycler = recycler;
try (Recycler.V<BytesRef> obtain = recycler.obtain()) {
pageSize = obtain.v().length;
}
this.currentPageOffset = pageSize;
}

@Override
public long position() {
return ((long) pageSize * pageIndex) + currentPageOffset;
}

@Override
public void writeByte(byte b) {
ensureCapacity(1);
BytesRef currentPage = pages.get(pageIndex).v();
currentPage.bytes[currentPage.offset + currentPageOffset] = b;
currentPageOffset++;
}

@Override
public void writeBytes(byte[] b, int offset, int length) {
// nothing to copy
if (length == 0) {
return;
}

// illegal args: offset and/or length exceed array size
if (b.length < (offset + length)) {
throw new IllegalArgumentException("Illegal offset " + offset + "/length " + length + " for byte[] of length " + b.length);
}

// get enough pages for new size
ensureCapacity(length);

// bulk copy
int bytesToCopy = length;
int srcOff = offset;
int j = 0;
while (true) {
BytesRef currentPage = pages.get(pageIndex + j).v();
int toCopyThisLoop = Math.min(pageSize - currentPageOffset, bytesToCopy);
System.arraycopy(b, srcOff, currentPage.bytes, currentPage.offset + currentPageOffset, toCopyThisLoop);
srcOff += toCopyThisLoop;
bytesToCopy -= toCopyThisLoop;
if (bytesToCopy > 0) {
currentPageOffset = 0;
} else {
currentPageOffset += toCopyThisLoop;
break;
}
j++;
}

// advance
pageIndex += j;
}

@Override
public void writeInt(int i) throws IOException {
if (4 > (pageSize - currentPageOffset)) {
super.writeInt(i);
} else {
BytesRef currentPage = pages.get(pageIndex).v();
VH_BE_INT.set(currentPage.bytes, currentPage.offset + currentPageOffset, i);
currentPageOffset += 4;
}
}

@Override
public void writeLong(long i) throws IOException {
if (8 > (pageSize - currentPageOffset)) {
super.writeLong(i);
} else {
BytesRef currentPage = pages.get(pageIndex).v();
VH_BE_LONG.set(currentPage.bytes, currentPage.offset + currentPageOffset, i);
currentPageOffset += 8;
}
}

@Override
public void reset() {
Releasables.close(pages);
pages.clear();
pageIndex = -1;
currentPageOffset = pageSize;
}

@Override
public void flush() {
// nothing to do
}

@Override
public void seek(long position) {
ensureCapacityFromPosition(position);
this.pageIndex = (int) position / pageSize;
this.currentPageOffset = (int) position % pageSize;
}

public void skip(int length) {
seek(position() + length);
}

@Override
public void close() {
try {
Releasables.close(pages);
} finally {
pages.clear();
}
}

/**
* Returns the current size of the buffer.
*
* @return the value of the <code>count</code> field, which is the number of valid
* bytes in this output stream.
* @see ByteArrayOutputStream#size()
*/
public int size() {
return Math.toIntExact(position());
}

@Override
public BytesReference bytes() {
int position = (int) position();
if (position == 0) {
return BytesArray.EMPTY;
} else {
final int adjustment;
final int bytesInLastPage;
final int remainder = position % pageSize;
if (remainder != 0) {
adjustment = 1;
bytesInLastPage = remainder;
} else {
adjustment = 0;
bytesInLastPage = pageSize;
}
final int pageCount = (position / pageSize) + adjustment;
if (pageCount == 1) {
BytesRef page = pages.get(0).v();
return new BytesArray(page.bytes, page.offset, bytesInLastPage);
} else {
BytesReference[] references = new BytesReference[pageCount];
for (int i = 0; i < pageCount - 1; ++i) {
references[i] = new BytesArray(this.pages.get(i).v());
}
BytesRef last = this.pages.get(pageCount - 1).v();
references[pageCount - 1] = new BytesArray(last.bytes, last.offset, bytesInLastPage);
return CompositeBytesReference.of(references);
}
}
}

private void ensureCapacity(int bytesNeeded) {
if (bytesNeeded > pageSize - currentPageOffset) {
ensureCapacityFromPosition(position() + bytesNeeded);
}
}

private void ensureCapacityFromPosition(long newPosition) {
while (newPosition > currentCapacity) {
if (newPosition > Integer.MAX_VALUE) {
throw new IllegalArgumentException(getClass().getSimpleName() + " cannot hold more than 2GB of data");
}
Recycler.V<BytesRef> newPage = recycler.obtain();
assert pageSize == newPage.v().length;
pages.add(newPage);
// We are at the end of the current page, increment page index
if (currentPageOffset == pageSize) {
pageIndex++;
currentPageOffset = 0;
}
currentCapacity += pageSize;
}
}
}
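
A usage sketch for the new stream (not part of the commit): pages are acquired lazily as bytes are written, bytes() exposes a view over the acquired pages without copying, and close() releases them back to the recycler. The NON_POOLING recycler below is a hypothetical stand-in that makes the example self-contained; any Recycler<BytesRef> that hands out fixed-size pages works.

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
import org.elasticsearch.common.recycler.Recycler;

public class RecyclerBytesStreamOutputExample {

    // Trivial recycler for illustration only: hands out fresh 16 KiB pages and
    // releases nothing on close(). A real transport would pool these pages.
    private static final Recycler<BytesRef> NON_POOLING = new Recycler<BytesRef>() {
        @Override
        public Recycler.V<BytesRef> obtain() {
            final BytesRef page = new BytesRef(new byte[16 * 1024]);
            return new Recycler.V<BytesRef>() {
                @Override
                public BytesRef v() {
                    return page;
                }

                @Override
                public boolean isRecycled() {
                    return false; // freshly allocated, not taken from a pool
                }

                @Override
                public void close() {}
            };
        }
    };

    public static void main(String[] args) throws Exception {
        try (RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(NON_POOLING)) {
            out.writeInt(42); // 4 bytes, big-endian
            out.writeLong(1_234_567L); // 8 bytes
            BytesReference bytes = out.bytes(); // view over the pages, no copy
            System.out.println(bytes.length()); // prints 12
        } // close() releases every acquired page back to the recycler
    }
}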
