Skip to content

Commit

Permalink
let the default output block instead of being time limited
Browse files Browse the repository at this point in the history
* only the default output can block, the stream outputs still have a time limit so they don't block the system writing messages
  • Loading branch information
kroepke committed Dec 30, 2014
1 parent af205ba commit 7c5632a
Showing 1 changed file with 56 additions and 8 deletions.
Expand Up @@ -22,10 +22,12 @@
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.inject.Inject;
import com.lmax.disruptor.WorkHandler;
import org.graylog2.Configuration;
import org.graylog2.outputs.CachedOutputRouter;
import org.graylog2.outputs.DefaultMessageOutput;
import org.graylog2.outputs.OutputRouter;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.ServerStatus;
Expand All @@ -38,6 +40,7 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -62,17 +65,20 @@ public class OutputBufferProcessor implements WorkHandler<MessageEvent> {
private final Timer processTime;

private final OutputRouter outputRouter;
private final MessageOutput defaultMessageOutput;

@Inject
public OutputBufferProcessor(Configuration configuration,
MetricRegistry metricRegistry,
ThroughputStats throughputStats,
ServerStatus serverStatus,
CachedOutputRouter outputRouter) {
CachedOutputRouter outputRouter,
@DefaultMessageOutput MessageOutput defaultMessageOutput) {
this.configuration = configuration;
this.throughputStats = throughputStats;
this.serverStatus = serverStatus;
this.outputRouter = outputRouter;
this.defaultMessageOutput = defaultMessageOutput;

final String nameFormat = "outputbuffer-processor-executor-%d";
final int corePoolSize = configuration.getOutputBufferProcessorThreadsCorePoolSize();
Expand All @@ -95,6 +101,24 @@ private ExecutorService executorService(final MetricRegistry metricRegistry, fin
name(this.getClass(), "executor-service"));
}

/**
* Each message will be written to one or more outputs.
* <p>
* The default output is always used for every message, but optionally the message can also be routed to additional
* outputs, currently based on the stream outputs that are configured in the system.
* </p>
* <p>
* The stream outputs are time limited so one bad output does not impact throughput too much. Essentially this means
* that the work of writing to the outputs is performed, but the writer threads will not wait forever for stream
* outputs to finish their work. <b>This might lead to increased memory usage!</b>
* </p>
* <p>
* The default output, however, is allowed to block and is not subject to time limiting. This is important because it
* can exert back pressure on the processing pipeline this way, making sure we don't run into excessive heap usage.
* </p>
* @param event the message to write to outputs
* @throws Exception
*/
@Override
public void onEvent(MessageEvent event) throws Exception {
incomingMessages.mark();
Expand All @@ -108,7 +132,11 @@ public void onEvent(MessageEvent event) throws Exception {

final Set<MessageOutput> messageOutputs = outputRouter.getOutputsForMessage(msg);
msg.recordCounter(serverStatus, "matched-outputs", messageOutputs.size());
final CountDownLatch doneSignal = new CountDownLatch(messageOutputs.size());

// minus one, because the default output does not count against the time limited outputs, and is always included
final CountDownLatch streamOutputsDoneSignal = new CountDownLatch(messageOutputs.size() - 1);

Future<?> defaultOutputCompletion = null;
for (final MessageOutput output : messageOutputs) {
if (output == null) {
LOG.error("Got null output!");
Expand All @@ -120,33 +148,53 @@ public void onEvent(MessageEvent event) throws Exception {
}
continue;
}

final boolean isDefaultOutput = defaultMessageOutput.equals(output);

try {
LOG.debug("Writing message to [{}].", output.getClass());
if (LOG.isTraceEnabled()) {
LOG.trace("Message id for [{}]: <{}>", output.getClass(), msg.getId());
}
executor.submit(new Runnable() {
final Future<?> future = executor.submit(new Runnable() {
@Override
public void run() {
try (Timer.Context ignored = processTime.time()) {
output.write(msg);
} catch (Exception e) {
LOG.error("Error in output [" + output.getClass() + "].", e);
} finally {
doneSignal.countDown();
// do not touch the latch if this is the default output!
// we use the returned future to block on its completion.
if (!isDefaultOutput) {
streamOutputsDoneSignal.countDown();
}
}
}
});
if (isDefaultOutput) {
// save the future so we can wait for its completion below, this implements the blocking behavior
defaultOutputCompletion = future;
}

} catch (Exception e) {
LOG.error("Could not write message batch to output [" + output.getClass() + "].", e);
doneSignal.countDown();
streamOutputsDoneSignal.countDown();
}
}

// Wait until all writer threads have finished or timeout is reached.
if (!doneSignal.await(configuration.getOutputModuleTimeout(), TimeUnit.MILLISECONDS)) {
LOG.warn("Timeout reached. Not waiting any longer for writer threads to complete.");
// Wait until all writer threads for stream outputs have finished or timeout is reached.
if (!streamOutputsDoneSignal.await(configuration.getOutputModuleTimeout(), TimeUnit.MILLISECONDS)) {
LOG.warn("Timeout reached. Not waiting any longer for stream output writer threads to complete.");
}

// now block until the default output has finished. most batching outputs will already be done because their
// fast path is really fast (usually an insert into a queue), but the slow flush path might block for a long time.
// this exerts the back pressure on the system
if (defaultOutputCompletion != null) {
Uninterruptibles.getUninterruptibly(defaultOutputCompletion);
} else {
LOG.error("The default output future was null, this is a bug!");
}

if (msg.hasRecordings()) {
Expand Down

0 comments on commit 7c5632a

Please sign in to comment.