ARTEMIS-2205 Netty is used in a more idiomatic way
This takes a lot of pressure off the GC by not creating
as many runnables for each write.

It also fixes some of the issues I would otherwise have hit while refactoring AMQP,
around flushing writes and other asynchronous behaviour.
franz1981 committed Jan 10, 2019
1 parent 7b34b56 commit a40a459
Showing 1 changed file with 35 additions and 90 deletions.
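
For context, the heart of the change below: previously, a write issued from a thread other than the channel's event loop was wrapped in a runnable and handed to eventLoop.execute(), with a separate counter tracking the bytes still pending on the event loop; now the code writes straight to the Channel, which Netty guarantees to be thread-safe and to preserve the order of write calls. A minimal before/after sketch using only stock Netty API (the class and method names are illustrative, not the actual NettyConnection code):

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;

// Illustrative sketch only; see the real diff below for the actual change.
final class WriteSketch {

   // Before: every write from a non-event-loop thread allocated a fresh
   // Runnable, adding GC pressure and delaying the byte accounting until
   // the event loop picked the task up.
   static void writeBefore(Channel channel, ByteBuf bytes) {
      if (!channel.eventLoop().inEventLoop()) {
         channel.eventLoop().execute(() -> channel.writeAndFlush(bytes));
      } else {
         channel.writeAndFlush(bytes);
      }
   }

   // After: rely on the Channel being thread-safe and ordering writes by
   // call order, so no per-write Runnable is needed from any thread.
   static void writeAfter(Channel channel, ByteBuf bytes) {
      channel.writeAndFlush(bytes);
   }
}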
@@ -21,7 +21,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

import io.netty.buffer.ByteBuf;
@@ -60,19 +59,12 @@ public class NettyConnection implements Connection {
* here for when the connection (or Netty Channel) becomes available again.
*/
private final List<ReadyListener> readyListeners = new ArrayList<>();
private final ThreadLocal<ArrayList<ReadyListener>> localListenersPool = ThreadLocal.withInitial(ArrayList::new);
private final ThreadLocal<ArrayList<ReadyListener>> localListenersPool = new ThreadLocal<>();

private final boolean batchingEnabled;
private final int writeBufferHighWaterMark;
private final int batchLimit;

/**
* This counter is splitted in 2 variables to write it with less performance
* impact: no volatile get is required to update its value
*/
private final AtomicLong pendingWritesOnEventLoopView = new AtomicLong();
private long pendingWritesOnEventLoop = 0;

private boolean closed;
private RemotingConnection protocolConnection;
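
The fields removed above implemented a "split counter": a plain long mutated only by the event-loop thread, mirrored into an AtomicLong through lazySet so other threads could read a recent value while the writer avoided a volatile get on every update. A minimal sketch of that idiom, with illustrative names:

import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of the removed "split counter" idiom: one thread owns
// a plain field and publishes it with lazySet; other threads read the view.
final class SplitCounter {

   private long value;                               // touched by the owning thread only
   private final AtomicLong view = new AtomicLong(); // read by any other thread

   void add(long delta) {      // call only from the owning thread
      value += delta;          // plain read-modify-write, no volatile get needed
      view.lazySet(value);     // cheap ordered store publishes the new value
   }

   long read() {               // safe from any thread
      return view.get();
   }
}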

Expand Down Expand Up @@ -129,18 +121,6 @@ public final int pendingWritesOnChannel() {
return batchBufferSize(this.channel, this.writeBufferHighWaterMark);
}

public final long pendingWritesOnEventLoop() {
final EventLoop eventLoop = channel.eventLoop();
final boolean inEventLoop = eventLoop.inEventLoop();
final long pendingWritesOnEventLoop;
if (inEventLoop) {
pendingWritesOnEventLoop = this.pendingWritesOnEventLoop;
} else {
pendingWritesOnEventLoop = pendingWritesOnEventLoopView.get();
}
return pendingWritesOnEventLoop;
}

public final Channel getNettyChannel() {
return channel;
}
@@ -163,38 +143,51 @@ public final boolean isWritable(ReadyListener callback) {

@Override
public final void fireReady(final boolean ready) {
final ArrayList<ReadyListener> readyToCall = localListenersPool.get();
ArrayList<ReadyListener> readyToCall = localListenersPool.get();
if (readyToCall != null) {
localListenersPool.set(null);
}
synchronized (readyListeners) {
this.ready = ready;

if (ready) {
final int size = this.readyListeners.size();
readyToCall.ensureCapacity(size);
if (readyToCall != null) {
readyToCall.ensureCapacity(size);
}
try {
for (int i = 0; i < size; i++) {
final ReadyListener readyListener = readyListeners.get(i);
if (readyListener == null) {
break;
}
if (readyToCall == null) {
readyToCall = new ArrayList<>(size);
}
readyToCall.add(readyListener);
}
} finally {
readyListeners.clear();
}
}
}
try {
final int size = readyToCall.size();
for (int i = 0; i < size; i++) {
try {
final ReadyListener readyListener = readyToCall.get(i);
readyListener.readyForWriting();
} catch (Throwable logOnly) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly);
if (readyToCall != null) {
try {
readyToCall.forEach(readyListener -> {
try {
readyListener.readyForWriting();
} catch (Throwable logOnly) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly);
}
});
} catch (Throwable t) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(t);
} finally {
readyToCall.clear();
if (localListenersPool.get() != null) {
localListenersPool.set(readyToCall);
}
}
} finally {
readyToCall.clear();
}
}
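
The reworked fireReady above borrows a per-thread ArrayList from a ThreadLocal "pool" instead of allocating one per call, copies the registered listeners into it while holding the readyListeners lock, and then invokes them outside the lock before handing the list back. A stripped-down sketch of that pattern with illustrative names (the real code also tolerates a re-entrant call on the same thread, which simply misses the pooled list and allocates):

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: borrow a per-thread list, copy under the lock,
// call back outside the lock, then return the list to the pool.
final class ListenerNotifier {

   private final List<Runnable> listeners = new ArrayList<>();
   private final ThreadLocal<ArrayList<Runnable>> pool = new ThreadLocal<>();

   void add(Runnable listener) {
      synchronized (listeners) {
         listeners.add(listener);
      }
   }

   void fire() {
      ArrayList<Runnable> toCall = pool.get();
      if (toCall != null) {
         pool.set(null);                 // borrow the pooled list for this call
      } else {
         toCall = new ArrayList<>();     // pool is empty: fall back to a new list
      }
      synchronized (listeners) {
         toCall.addAll(listeners);       // copy under the lock...
         listeners.clear();
      }
      try {
         toCall.forEach(Runnable::run);  // ...invoke outside the lock
      } finally {
         toCall.clear();
         pool.set(toCall);               // return (or seed) the per-thread pool
      }
   }
}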

@@ -256,7 +249,7 @@ public ActiveMQBuffer createTransportBuffer(final int size) {
} catch (OutOfMemoryError oom) {
final long totalPendingWriteBytes = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
// I'm not using the ActiveMQLogger framework here, as I wanted the class name to be very specific here
logger.warn("Trying to allocate " + size + " bytes, System is throwing OutOfMemoryError on NettyConnection " + this + ", there are currently " + "pendingWrites: [NETTY] -> " + totalPendingWriteBytes + "[EVENT LOOP] -> " + pendingWritesOnEventLoopView.get() + " causes: " + oom.getMessage(), oom);
logger.warn("Trying to allocate " + size + " bytes, System is throwing OutOfMemoryError on NettyConnection " + this + ", there are currently " + "pendingWrites: [NETTY] -> " + totalPendingWriteBytes + " causes: " + oom.getMessage(), oom);
throw oom;
}
}
@@ -342,10 +335,7 @@ private boolean isAllowedToBlock() {
private boolean canWrite(final int requiredCapacity) {
//evaluate if the write request could be taken:
//there is enough space in the write buffer?
//The pending writes on event loop will eventually go into the Netty write buffer, hence consider them
//as part of the heuristic!
final long pendingWritesOnEventLoop = this.pendingWritesOnEventLoop();
final long totalPendingWrites = pendingWritesOnEventLoop + this.pendingWritesOnChannel();
final long totalPendingWrites = this.pendingWritesOnChannel();
final boolean canWrite;
if (requiredCapacity > this.writeBufferHighWaterMark) {
canWrite = totalPendingWrites == 0;
@@ -369,34 +359,6 @@ public final void write(ActiveMQBuffer buffer,
}
//no need to lock because the Netty's channel is thread-safe
//and the order of write is ensured by the order of the write calls
final EventLoop eventLoop = channel.eventLoop();
final boolean inEventLoop = eventLoop.inEventLoop();
if (!inEventLoop) {
writeNotInEventLoop(buffer, flush, batched, futureListener);
} else {
// OLD COMMENT:
// create a task which will be picked up by the eventloop and trigger the write.
// This is mainly needed as this method is triggered by different threads for the same channel.
// if we not do this we may produce out of order writes.
// NOTE:
// the submitted task does not effect in any way the current written size in the batch
// until the loop will process it, leading to a longer life for the ActiveMQBuffer buffer!!!
// To solve it, will be necessary to manually perform the count of the current batch instead of rely on the
// Channel:Config::writeBufferHighWaterMark value.
this.pendingWritesOnEventLoop += readableBytes;
this.pendingWritesOnEventLoopView.lazySet(pendingWritesOnEventLoop);
eventLoop.execute(() -> {
this.pendingWritesOnEventLoop -= readableBytes;
this.pendingWritesOnEventLoopView.lazySet(pendingWritesOnEventLoop);
writeInEventLoop(buffer, flush, batched, futureListener);
});
}
}

private void writeNotInEventLoop(ActiveMQBuffer buffer,
final boolean flush,
final boolean batched,
final ChannelFutureListener futureListener) {
final Channel channel = this.channel;
final ChannelPromise promise;
if (flush || (futureListener != null)) {
@@ -406,7 +368,6 @@ private void writeNotInEventLoop(ActiveMQBuffer buffer,
}
final ChannelFuture future;
final ByteBuf bytes = buffer.byteBuf();
final int readableBytes = bytes.readableBytes();
assert readableBytes >= 0;
final int writeBatchSize = this.batchLimit;
final boolean batchingEnabled = this.batchingEnabled;
@@ -420,33 +381,17 @@ private void writeNotInEventLoop(ActiveMQBuffer buffer,
}
if (flush) {
//NOTE: this code path seems used only on RemotingConnection::disconnect
waitFor(promise, DEFAULT_WAIT_MILLIS);
flushAndWait(channel, promise);
}
}

private void writeInEventLoop(ActiveMQBuffer buffer,
final boolean flush,
final boolean batched,
final ChannelFutureListener futureListener) {
//no need to lock because the Netty's channel is thread-safe
//and the order of write is ensured by the order of the write calls
final ChannelPromise promise;
if (futureListener != null) {
promise = channel.newPromise();
} else {
promise = channel.voidPromise();
}
final ChannelFuture future;
final ByteBuf bytes = buffer.byteBuf();
final int readableBytes = bytes.readableBytes();
final int writeBatchSize = this.batchLimit;
if (this.batchingEnabled && batched && !flush && readableBytes < writeBatchSize) {
future = writeBatch(bytes, readableBytes, promise);
private static void flushAndWait(final Channel channel, final ChannelPromise promise) {
if (!channel.eventLoop().inEventLoop()) {
waitFor(promise, DEFAULT_WAIT_MILLIS);
} else {
future = channel.writeAndFlush(bytes, promise);
}
if (futureListener != null) {
future.addListener(futureListener);
if (logger.isDebugEnabled()) {
logger.debug("Calling write with flush from a thread where it's not allowed");
}
}
}
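
One detail worth spelling out from the new flushAndWait helper above: it only blocks on the promise when the caller is not running on the channel's event loop. Waiting on the event-loop thread cannot succeed, because that same thread is the one that has to flush the outbound buffer and complete the promise, so from inside the loop the code just logs at debug level. A minimal sketch of that guard, assuming a caller-supplied timeout (the names are illustrative, not the Artemis API):

import java.util.concurrent.TimeUnit;

import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;

// Illustrative sketch: block only when it is safe to block.
final class FlushGuard {

   static void awaitFlush(Channel channel, ChannelPromise promise, long timeoutMillis) {
      if (!channel.eventLoop().inEventLoop()) {
         // Another thread is waiting, so the event loop can still flush the
         // channel and complete the promise within the timeout.
         promise.awaitUninterruptibly(timeoutMillis, TimeUnit.MILLISECONDS);
      } else {
         // Blocking here would park the only thread able to complete the
         // flush, so the wait would never finish before the timeout; the
         // real code logs a debug message instead.
      }
   }
}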
