Skip to content

Commit

Permalink
ARTEMIS-3045 ReplicationManager can batch sent replicated packets
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 authored and clebertsuconic committed Feb 19, 2021
1 parent 8611095 commit 6126d92
Show file tree
Hide file tree
Showing 12 changed files with 280 additions and 157 deletions.
Expand Up @@ -84,6 +84,13 @@ public interface Channel {
*/
boolean sendBatched(Packet packet);

/**
* Similarly to {@code flushConnection} on {@link #send(Packet, boolean)}, it requests
* any un-flushed previous sent packets to be flushed to the underlying connection.<br>
* It can be a no-op in case of InVM transports, because they would likely to flush already on each send.
*/
void flushConnection();

/**
* Sends a packet on this channel, but request it to be flushed (along with the un-flushed previous ones) only iff
* {@code flushConnection} is {@code true}.
Expand Down
Expand Up @@ -135,10 +135,9 @@ default boolean isVersionNewFQQN() {

/**
*
* @param size size we are trying to write
* @param timeout
* @return
* @throws IllegalStateException if the connection is closed
*/
boolean blockUntilWritable(int size, long timeout);
boolean blockUntilWritable(long timeout);
}
Expand Up @@ -1039,19 +1039,21 @@ private int sendSessionSendContinuationMessage(Channel channel,
} else {
chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse || confirmationWindow != -1, messageBodySize, messageHandler);
}
final int expectedEncodeSize = chunkPacket.expectedEncodeSize();
//perform a weak form of flow control to avoid OOM on tight loops
final CoreRemotingConnection connection = channel.getConnection();
final long blockingCallTimeoutMillis = Math.max(0, connection.getBlockingCallTimeout());
final long startFlowControl = System.nanoTime();
try {
final boolean isWritable = connection.blockUntilWritable(expectedEncodeSize, blockingCallTimeoutMillis);
final boolean isWritable = connection.blockUntilWritable(blockingCallTimeoutMillis);
if (!isWritable) {
final long endFlowControl = System.nanoTime();
final long elapsedFlowControl = endFlowControl - startFlowControl;
final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl);
ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage();
logger.debug("try to write " + expectedEncodeSize + " bytes after blocked " + elapsedMillis + " ms on a not writable connection: [" + connection.getID() + "]");
if (logger.isDebugEnabled()) {
logger.debugf("try to write %d bytes after blocked %d ms on a not writable connection: [%s]",
chunkPacket.expectedEncodeSize(), elapsedMillis, connection.getID());
}
}
if (requiresResponse) {
// When sending it blocking, only the last chunk will be blocking.
Expand Down
Expand Up @@ -231,6 +231,11 @@ public void returnBlocking(Throwable cause) {
}
}

@Override
public void flushConnection() {
connection.getTransportConnection().flush();
}

@Override
public boolean send(Packet packet, boolean flushConnection) {
if (invokeInterceptors(packet, interceptors, connection) != null) {
Expand Down Expand Up @@ -557,7 +562,7 @@ public Packet sendBlocking(final Packet packet,
public static String invokeInterceptors(final Packet packet,
final List<Interceptor> interceptors,
final RemotingConnection connection) {
if (interceptors != null) {
if (interceptors != null && !interceptors.isEmpty()) {
for (final Interceptor interceptor : interceptors) {
try {
boolean callNext = interceptor.intercept(packet, connection);
Expand Down
Expand Up @@ -244,8 +244,8 @@ public void destroy() {
}

@Override
public boolean blockUntilWritable(int size, long timeout) {
return transportConnection.blockUntilWritable(size, timeout, TimeUnit.MILLISECONDS);
public boolean blockUntilWritable(long timeout) {
return transportConnection.blockUntilWritable(timeout, TimeUnit.MILLISECONDS);
}

@Override
Expand Down
Expand Up @@ -29,6 +29,7 @@
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.FastThreadLocal;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
Expand All @@ -47,7 +48,6 @@ public class NettyConnection implements Connection {

private static final Logger logger = Logger.getLogger(NettyConnection.class);

private static final int DEFAULT_BATCH_BYTES = Integer.getInteger("io.netty.batch.bytes", 8192);
private static final int DEFAULT_WAIT_MILLIS = 10_000;

protected final Channel channel;
Expand All @@ -59,11 +59,9 @@ public class NettyConnection implements Connection {
* here for when the connection (or Netty Channel) becomes available again.
*/
private final List<ReadyListener> readyListeners = new ArrayList<>();
private final ThreadLocal<ArrayList<ReadyListener>> localListenersPool = new ThreadLocal<>();
private final FastThreadLocal<ArrayList<ReadyListener>> localListenersPool = new FastThreadLocal<>();

private final boolean batchingEnabled;
private final int writeBufferHighWaterMark;
private final int batchLimit;

private boolean closed;
private RemotingConnection protocolConnection;
Expand All @@ -84,10 +82,6 @@ public NettyConnection(final Map<String, Object> configuration,
this.directDeliver = directDeliver;

this.batchingEnabled = batchingEnabled;

this.writeBufferHighWaterMark = this.channel.config().getWriteBufferHighWaterMark();

this.batchLimit = batchingEnabled ? Math.min(this.writeBufferHighWaterMark, DEFAULT_BATCH_BYTES) : 0;
}

private static void waitFor(ChannelPromise promise, long millis) {
Expand All @@ -103,22 +97,9 @@ private static void waitFor(ChannelPromise promise, long millis) {

/**
* Returns an estimation of the current size of the write buffer in the channel.
* To obtain a more precise value is necessary to use the unsafe API of the channel to
* call the {@link io.netty.channel.ChannelOutboundBuffer#totalPendingWriteBytes()}.
* Anyway, both these values are subject to concurrent modifications.
*/
private static int batchBufferSize(Channel channel, int writeBufferHighWaterMark) {
//Channel::bytesBeforeUnwritable is performing a volatile load
//this is the reason why writeBufferHighWaterMark is passed as an argument
final int bytesBeforeUnwritable = (int) channel.bytesBeforeUnwritable();
assert bytesBeforeUnwritable >= 0;
final int writtenBytes = writeBufferHighWaterMark - bytesBeforeUnwritable;
assert writtenBytes >= 0;
return writtenBytes;
}

public final int pendingWritesOnChannel() {
return batchBufferSize(this.channel, this.writeBufferHighWaterMark);
private static long batchBufferSize(Channel channel) {
return channel.unsafe().outboundBuffer().totalPendingWriteBytes();
}

public final Channel getNettyChannel() {
Expand Down Expand Up @@ -252,7 +233,7 @@ public ActiveMQBuffer createTransportBuffer(final int size) {
try {
return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
} catch (OutOfMemoryError oom) {
final long totalPendingWriteBytes = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
final long totalPendingWriteBytes = batchBufferSize(this.channel);
// I'm not using the ActiveMQLogger framework here, as I wanted the class name to be very specific here
logger.warn("Trying to allocate " + size + " bytes, System is throwing OutOfMemoryError on NettyConnection " + this + ", there are currently " + "pendingWrites: [NETTY] -> " + totalPendingWriteBytes + " causes: " + oom.getMessage(), oom);
throw oom;
Expand All @@ -268,9 +249,8 @@ public final Object getID() {
@Override
public final void checkFlushBatchBuffer() {
if (this.batchingEnabled) {
//perform the flush only if necessary
final int batchBufferSize = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
if (batchBufferSize > 0) {
// perform the flush only if necessary
if (batchBufferSize(this.channel) > 0 && !channel.isWritable()) {
this.channel.flush();
}
}
Expand All @@ -292,6 +272,12 @@ public void write(ActiveMQBuffer buffer, boolean requestFlush) {
}
}

@Override
public void flush() {
checkConnectionState();
this.channel.flush();
}

@Override
public final void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched) {
write(buffer, flush, batched, null);
Expand All @@ -304,22 +290,22 @@ private void checkConnectionState() {
}

@Override
public final boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) {
public final boolean blockUntilWritable(final long timeout, final TimeUnit timeUnit) {
checkConnectionState();
final boolean isAllowedToBlock = isAllowedToBlock();
if (!isAllowedToBlock) {
if (timeout > 0) {
if (Env.isTestEnv()) {
// this will only show when inside the testsuite.
// we may great the log for FAILURE
logger.warn("FAILURE! The code is using blockUntilWritable inside a Netty worker, which would block. " + "The code will probably need fixing!", new Exception("trace"));
}

if (Env.isTestEnv()) {
// this will only show when inside the testsuite.
// we may great the log for FAILURE
logger.warn("FAILURE! The code is using blockUntilWritable inside a Netty worker, which would block. " +
"The code will probably need fixing!", new Exception("trace"));
}

if (logger.isDebugEnabled()) {
logger.debug("Calling blockUntilWritable using a thread where it's not allowed");
if (logger.isDebugEnabled()) {
logger.debug("Calling blockUntilWritable using a thread where it's not allowed");
}
}
return canWrite(requiredCapacity);
return channel.isWritable();
} else {
final long timeoutNanos = timeUnit.toNanos(timeout);
final long deadline = System.nanoTime() + timeoutNanos;
Expand All @@ -333,7 +319,7 @@ public final boolean blockUntilWritable(final int requiredCapacity, final long t
parkNanos = 1000L;
}
boolean canWrite;
while (!(canWrite = canWrite(requiredCapacity)) && (System.nanoTime() - deadline) < 0) {
while (!(canWrite = channel.isWritable()) && (System.nanoTime() - deadline) < 0) {
//periodically check the connection state
checkConnectionState();
LockSupport.parkNanos(parkNanos);
Expand All @@ -348,31 +334,12 @@ private boolean isAllowedToBlock() {
return !inEventLoop;
}

private boolean canWrite(final int requiredCapacity) {
//evaluate if the write request could be taken:
//there is enough space in the write buffer?
final long totalPendingWrites = this.pendingWritesOnChannel();
final boolean canWrite;
if (requiredCapacity > this.writeBufferHighWaterMark) {
canWrite = totalPendingWrites == 0;
} else {
canWrite = (totalPendingWrites + requiredCapacity) <= this.writeBufferHighWaterMark;
}
return canWrite;
}

@Override
public final void write(ActiveMQBuffer buffer,
final boolean flush,
final boolean batched,
final ChannelFutureListener futureListener) {
final int readableBytes = buffer.readableBytes();
if (logger.isDebugEnabled()) {
final int remainingBytes = this.writeBufferHighWaterMark - readableBytes;
if (remainingBytes < 0) {
logger.debug("a write request is exceeding by " + (-remainingBytes) + " bytes the writeBufferHighWaterMark size [ " + this.writeBufferHighWaterMark + " ] : consider to set it at least of " + readableBytes + " bytes");
}
}
//no need to lock because the Netty's channel is thread-safe
//and the order of write is ensured by the order of the write calls
final Channel channel = this.channel;
Expand All @@ -385,10 +352,9 @@ public final void write(ActiveMQBuffer buffer,
final ChannelFuture future;
final ByteBuf bytes = buffer.byteBuf();
assert readableBytes >= 0;
final int writeBatchSize = this.batchLimit;
final boolean batchingEnabled = this.batchingEnabled;
if (batchingEnabled && batched && !flush && readableBytes < writeBatchSize) {
future = writeBatch(bytes, readableBytes, promise);
if (batchingEnabled && batched && !flush && channel.isWritable()) {
future = channel.write(bytes, promise);
} else {
future = channel.writeAndFlush(bytes, promise);
}
Expand All @@ -411,22 +377,6 @@ private static void flushAndWait(final Channel channel, final ChannelPromise pro
}
}

private ChannelFuture writeBatch(final ByteBuf bytes, final int readableBytes, final ChannelPromise promise) {
final int batchBufferSize = batchBufferSize(channel, this.writeBufferHighWaterMark);
final int nextBatchSize = batchBufferSize + readableBytes;
if (nextBatchSize > batchLimit) {
//request to flush before writing, to create the chance to make the channel writable again
channel.flush();
//let netty use its write batching ability
return channel.write(bytes, promise);
} else if (nextBatchSize == batchLimit) {
return channel.writeAndFlush(bytes, promise);
} else {
//let netty use its write batching ability
return channel.write(bytes, promise);
}
}

@Override
public final String getRemoteAddress() {
SocketAddress address = channel.remoteAddress();
Expand Down
Expand Up @@ -46,18 +46,17 @@ public interface Connection {
boolean isOpen();

/**
* Causes the current thread to wait until the connection can enqueue the required capacity unless the specified waiting time elapses.
* Causes the current thread to wait until the connection is writable unless the specified waiting time elapses.
* The available capacity of the connection could change concurrently hence this method is suitable to perform precise flow-control
* only in a single writer case, while its precision decrease inversely proportional with the rate and the number of concurrent writers.
* If the current thread is not allowed to block the timeout will be ignored dependently on the connection type.
*
* @param requiredCapacity the capacity in bytes to be enqueued
* @param timeout the maximum time to wait
* @param timeUnit the time unit of the timeout argument
* @return {@code true} if the connection can enqueue {@code requiredCapacity} bytes, {@code false} otherwise
* @return {@code true} if the connection is writable, {@code false} otherwise
* @throws IllegalStateException if the connection is closed
*/
default boolean blockUntilWritable(final int requiredCapacity, final long timeout, final TimeUnit timeUnit) {
default boolean blockUntilWritable(final long timeout, final TimeUnit timeUnit) {
return true;
}

Expand Down Expand Up @@ -85,6 +84,13 @@ default boolean blockUntilWritable(final int requiredCapacity, final long timeou
*/
void write(ActiveMQBuffer buffer, boolean requestFlush);

/**
* Request to flush any previous written buffers into the wire.
*/
default void flush() {

}

/**
* writes the buffer to the connection and if flush is true returns only when the buffer has been physically written to the connection.
*
Expand Down
Expand Up @@ -237,7 +237,7 @@ public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
}

@Override
public boolean blockUntilWritable(int size, long timeout) {
public boolean blockUntilWritable(long timeout) {
return false;
}

Expand Down
Expand Up @@ -21,7 +21,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.ArrayDeque;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -80,7 +80,6 @@
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;

import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
Expand Down Expand Up @@ -134,7 +133,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon

private List<Interceptor> outgoingInterceptors = null;

private final ArrayList<Packet> pendingPackets;
private final ArrayDeque<Packet> pendingPackets;


// Constructors --------------------------------------------------
Expand All @@ -146,7 +145,7 @@ public ReplicationEndpoint(final ActiveMQServerImpl server,
this.criticalErrorListener = criticalErrorListener;
this.wantedFailBack = wantedFailBack;
this.activation = activation;
this.pendingPackets = new ArrayList<>();
this.pendingPackets = new ArrayDeque<>();
this.supportResponseBatching = false;
}

Expand Down Expand Up @@ -262,18 +261,14 @@ public void handlePacket(final Packet packet) {

@Override
public void endOfBatch() {
final ArrayList<Packet> pendingPackets = this.pendingPackets;
final ArrayDeque<Packet> pendingPackets = this.pendingPackets;
if (pendingPackets.isEmpty()) {
return;
}
try {
for (int i = 0, size = pendingPackets.size(); i < size; i++) {
final Packet packet = pendingPackets.get(i);
final boolean isLast = i == (size - 1);
channel.send(packet, isLast);
}
} finally {
pendingPackets.clear();
for (int i = 0, size = pendingPackets.size(); i < size; i++) {
final Packet packet = pendingPackets.poll();
final boolean isLast = i == (size - 1);
channel.send(packet, isLast);
}
}

Expand Down

0 comments on commit 6126d92

Please sign in to comment.