Skip to content

Commit

Permalink
Merge 39cbf8f into 87cf261
Browse files Browse the repository at this point in the history
  • Loading branch information
Dan Torrey committed Oct 2, 2019
2 parents 87cf261 + 39cbf8f commit 484642c
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
* An abstract {@link GelfTransport} implementation serving as parent for the concrete implementations.
Expand All @@ -40,6 +41,10 @@ public abstract class AbstractGelfTransport implements GelfTransport {

private final EventLoopGroup workerGroup;

// Use an AtomicReference to manage thread safe access to the senderThread.
// A new senderThread instance is set each time a reconnect is attempted.
final AtomicReference<GelfSenderThread> senderThreadReference;

/**
* Creates a new GELF transport with the given configuration and {@link java.util.concurrent.BlockingQueue}.
*
Expand All @@ -50,6 +55,7 @@ public AbstractGelfTransport(final GelfConfiguration config, final BlockingQueue
this.config = config;
this.queue = queue;
this.workerGroup = new NioEventLoopGroup(config.getThreads(), new DefaultThreadFactory(getClass(), true));
this.senderThreadReference = new AtomicReference<>();
createBootstrap(workerGroup);
}

Expand Down Expand Up @@ -111,4 +117,16 @@ public boolean trySend(final GelfMessage message) {
public void stop() {
workerGroup.shutdownGracefully().syncUninterruptibly();
}

/**
* {@inheritDoc}
*/
@Override
public void flushAndStopSynchronously(int waitDuration, TimeUnit timeUnit, int retries) {

if (senderThreadReference != null) {
senderThreadReference.get().flushSynchronously(waitDuration, timeUnit, retries);
}
stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,28 @@
*/
public class GelfSenderThread {
private static final Logger LOG = LoggerFactory.getLogger(GelfSenderThread.class);

private final ReentrantLock lock;
private final Condition connectedCond;
private final AtomicBoolean keepRunning = new AtomicBoolean(true);
private final Thread senderThread;
private Channel channel;
private final int maxInflightSends;
private final BlockingQueue<GelfMessage> queue;
private final AtomicInteger inflightSends;

/**
* Creates a new sender thread with the given {@link BlockingQueue} as source of messages.
*
* @param queue the {@link BlockingQueue} used as source of {@link GelfMessage}s
* @param queue the {@link BlockingQueue} used as source of {@link GelfMessage}s
* @param maxInflightSends the maximum number of outstanding network writes/flushes before the sender spins
*/
public GelfSenderThread(final BlockingQueue<GelfMessage> queue, int maxInflightSends) {
this.maxInflightSends = maxInflightSends;
this.lock = new ReentrantLock();
this.connectedCond = lock.newCondition();
this.inflightSends = new AtomicInteger(0);
this.queue = queue;

if (maxInflightSends <= 0) {
throw new IllegalArgumentException("maxInflightSends must be larger than 0");
Expand All @@ -64,7 +69,6 @@ public GelfSenderThread(final BlockingQueue<GelfMessage> queue, int maxInflightS
@Override
public void run() {
GelfMessage gelfMessage = null;
final AtomicInteger inflightSends = new AtomicInteger(0);
final ChannelFutureListener inflightListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Expand Down Expand Up @@ -139,4 +143,43 @@ public void stop() {
keepRunning.set(false);
senderThread.interrupt();
}

/***
* Block and wait for all messages in the queue to send until the indicated {@code waitDuration}, {@code timeUnit} and
* {@code retries} have elapsed. Each retry waits for the indicated {@code waitDuration} and {@code timeUnit} again.
* @param waitDuration the wait duration.
* @param timeUnit the time unit for the {@code waitDuration}.
* @param retries the number of times to retry and wait for messages to flush.
*/
void flushSynchronously(int waitDuration, TimeUnit timeUnit, int retries) {

LOG.debug("Attempting to flush messages in [{}/{}] with [{}] retries", waitDuration, timeUnit, retries);

for (int i = 0; i <= retries; i++) {
if (!flushingInProgress()) {
LOG.debug("Successfully flushed messages. Shutting down now.");
return;
}

LOG.debug("Flushing in progress. [{}] messages are still enqueued, and [{}] messages are still in-flight.",
queue.size(), inflightSends.get());

try {
timeUnit.sleep(waitDuration);
} catch (InterruptedException e) {
LOG.error("Interrupted message flushing during shutdown after [{}}] attempts.", i);
Thread.currentThread().interrupt();
return;
}
}
LOG.error("Failed to flush messages in [{}] attempts. Shutting down anyway.", retries);
}

/**
* @return {@code true} if messages are queued or in-flight.
*/
private boolean flushingInProgress() {

return (inflightSends != null && inflightSends.get() != 0) || !queue.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public GelfTcpTransport(GelfConfiguration config) {
@Override
protected void createBootstrap(final EventLoopGroup workerGroup) {
final Bootstrap bootstrap = new Bootstrap();
final GelfSenderThread senderThread = new GelfSenderThread(queue, config.getMaxInflightSends());
senderThreadReference.set(new GelfSenderThread(queue, config.getMaxInflightSends()));

bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
Expand Down Expand Up @@ -102,13 +102,13 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Excep

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
senderThread.start(ctx.channel());
senderThreadReference.get().start(ctx.channel());
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOG.info("Channel disconnected!");
senderThread.stop();
senderThreadReference.get().stop();
scheduleReconnect(ctx.channel().eventLoop());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import org.graylog2.gelfclient.GelfMessage;

import java.util.concurrent.TimeUnit;

/**
* A common interface for all GELF network transports.
*/
Expand All @@ -42,7 +44,23 @@ public interface GelfTransport {
boolean trySend(GelfMessage message);

/**
* Stops the transport. Should be used to gracefully shutdown the backend.
* Stops the transport. Can be used to gracefully shutdown the backend.
*/
void stop();

/**
* Blocks and stops the transport after flushing/sending all enqueued messages.
* Blocking occurs until either all messages are flushed, or the indicated {@code waitDuration}, {@code timeUnit}
* and {@code retries} have elapsed.
*
* Each retry waits for the indicated {@code waitDuration} and {@code timeUnit} again.
*
* This can be used to gracefully shutdown the backend.
*
* @param waitDuration the wait duration.
* @param timeUnit the time unit for the {@code waitDuration}.
* @param retries the number of times to retry and wait for messages to flush. Zero retries indicates that
* one initial attempt will be made.
*/
void flushAndStopSynchronously(int waitDuration, TimeUnit timeUnit, int retries);
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public GelfUdpTransport(final GelfConfiguration config) {
@Override
protected void createBootstrap(final EventLoopGroup workerGroup) {
final Bootstrap bootstrap = new Bootstrap();
final GelfSenderThread senderThread = new GelfSenderThread(queue, config.getMaxInflightSends());
senderThreadReference.set(new GelfSenderThread(queue, config.getMaxInflightSends()));

bootstrap.group(workerGroup)
.channel(NioDatagramChannel.class)
Expand Down Expand Up @@ -81,12 +81,12 @@ protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throw

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
senderThread.start(ctx.channel());
senderThreadReference.get().start(ctx.channel());
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
senderThread.stop();
senderThreadReference.get().stop();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.graylog2.gelfclient.GelfConfiguration;
import org.testng.annotations.Test;

import java.util.concurrent.TimeUnit;

import static org.testng.Assert.assertEquals;

public class AbstractGelfTransportTest {
Expand All @@ -34,6 +36,11 @@ protected void createBootstrap(EventLoopGroup workerGroup) {
final NioEventLoopGroup eventLoopGroup = (NioEventLoopGroup) workerGroup;
assertEquals(threads, eventLoopGroup.executorCount());
}

@Override
public void flushAndStopSynchronously(int waitDuration, TimeUnit timeUnit, int retries) {
super.flushAndStopSynchronously(waitDuration, timeUnit, retries);
}
};
transport.stop();
}
Expand Down

0 comments on commit 484642c

Please sign in to comment.