From ea305631895ddd2ccfa4c7d4782ae362fbde958f Mon Sep 17 00:00:00 2001 From: Jochen Schalanda Date: Wed, 21 Jun 2017 14:29:24 +0200 Subject: [PATCH 1/2] Don't check ES cluster health when flushing messages Checking the ES cluster health before flushing messages was important while using the embedded Elasticsearch client node because it would block processing until the cluster was available and healthy (YELLOW or GREEN). After migrating to an HTTP-based Elasticsearch client, this isn't necessary anymore. The client will simply fail to index the messages. Additionally, this changeset marks only those messages as committed that were successfully indexed. Before this change, all messages of a batch were marked as committed and removed from the journal, whether or not they had been indexed. --- .../graylog2/indexer/messages/Messages.java | 29 ++++++---- .../outputs/BlockingBatchedESOutput.java | 45 ++++++--------- .../graylog2/outputs/ElasticSearchOutput.java | 26 ++++++--- .../outputs/BlockingBatchedESOutputTest.java | 57 +------------------ 4 files changed, 55 insertions(+), 102 deletions(-) diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java b/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java index a9db4c517991..456c716d8c06 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java @@ -47,7 +47,7 @@ import java.io.IOException; import java.net.SocketTimeoutException; import java.util.ArrayList; -import java.util.LinkedList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; @@ -109,9 +109,9 @@ public List analyze(String toAnalyze, String index, String analyzer) thr return terms; } - public boolean bulkIndex(final List> messageList) { + public List bulkIndex(final List> messageList) { if (messageList.isEmpty()) { - return true; + return
Collections.emptyList(); } final Bulk.Builder bulk = new Bulk.Builder(); @@ -125,14 +125,18 @@ public boolean bulkIndex(final List> messageList) { } final BulkResult result = runBulkRequest(bulk.build(), messageList.size()); + final List failedItems = result.getFailedItems(); - LOG.debug("Index: Bulk indexed {} messages, took {} ms, failures: {}", - result.getItems().size(), result, result.getFailedItems().size()); - if (!result.getFailedItems().isEmpty()) { - propagateFailure(result.getFailedItems(), messageList, result.getErrorMessage()); + if (LOG.isDebugEnabled()) { + LOG.debug("Index: Bulk indexed {} messages, took {} ms, failures: {}", + result.getItems().size(), result, failedItems.size()); } - return result.getFailedItems().isEmpty(); + if (!failedItems.isEmpty()) { + return propagateFailure(failedItems, messageList, result.getErrorMessage()); + } else { + return Collections.emptyList(); + } } private BulkResult runBulkRequest(final Bulk request, int count) { @@ -149,12 +153,13 @@ private BulkResult runBulkRequest(final Bulk request, int count) { } } - private void propagateFailure(List items, List> messageList, String errorMessage) { - final List indexFailures = new LinkedList<>(); + private List propagateFailure(List items, List> messageList, String errorMessage) { final Map messageMap = messageList.stream() .map(Map.Entry::getValue) .distinct() .collect(Collectors.toMap(Message::getId, Function.identity())); + final List failedMessageIds = new ArrayList<>(items.size()); + final List indexFailures = new ArrayList<>(items.size()); for (BulkResult.BulkResultItem item : items) { LOG.trace("Failed to index message: {}", item.error); @@ -169,6 +174,8 @@ private void propagateFailure(List items, List items, List source, String id) { diff --git a/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java b/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java index 8ac89a3d24c4..aa4dac2b3360 100644 --- 
a/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java +++ b/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java @@ -20,12 +20,10 @@ import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.AssistedInject; import org.graylog2.indexer.IndexSet; -import org.graylog2.indexer.cluster.Cluster; import org.graylog2.indexer.messages.Messages; import org.graylog2.plugin.Message; import org.graylog2.plugin.configuration.Configuration; @@ -35,9 +33,9 @@ import org.slf4j.LoggerFactory; import javax.inject.Inject; +import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -47,11 +45,11 @@ // Singleton class public class BlockingBatchedESOutput extends ElasticSearchOutput { private static final Logger log = LoggerFactory.getLogger(BlockingBatchedESOutput.class); - private final Cluster cluster; private final int maxBufferSize; private final Timer processTime; private final Histogram batchSize; private final Meter bufferFlushes; + private final Meter bufferFlushFailures; private final Meter bufferFlushesRequested; private volatile List> buffer; @@ -63,30 +61,28 @@ public class BlockingBatchedESOutput extends ElasticSearchOutput { @AssistedInject public BlockingBatchedESOutput(MetricRegistry metricRegistry, Messages messages, - Cluster cluster, org.graylog2.Configuration serverConfiguration, Journal journal, @Assisted Stream stream, @Assisted Configuration configuration) { - this(metricRegistry, messages, cluster, serverConfiguration, journal); + this(metricRegistry, messages, serverConfiguration, journal); } @Inject public 
BlockingBatchedESOutput(MetricRegistry metricRegistry, Messages messages, - Cluster cluster, org.graylog2.Configuration serverConfiguration, Journal journal) { super(metricRegistry, messages, journal); - this.cluster = cluster; this.maxBufferSize = serverConfiguration.getOutputBatchSize(); outputFlushInterval = serverConfiguration.getOutputFlushInterval(); this.processTime = metricRegistry.timer(name(this.getClass(), "processTime")); this.batchSize = metricRegistry.histogram(name(this.getClass(), "batchSize")); this.bufferFlushes = metricRegistry.meter(name(this.getClass(), "bufferFlushes")); + this.bufferFlushFailures = metricRegistry.meter(name(this.getClass(), "bufferFlushFailures")); this.bufferFlushesRequested = metricRegistry.meter(name(this.getClass(), "bufferFlushesRequested")); - buffer = Lists.newArrayListWithCapacity(maxBufferSize); + buffer = new ArrayList<>(maxBufferSize); } @@ -104,7 +100,7 @@ public void writeMessageEntry(Map.Entry entry) throws Excepti if (buffer.size() >= maxBufferSize) { flushBatch = buffer; - buffer = Lists.newArrayListWithCapacity(maxBufferSize); + buffer = new ArrayList<>(maxBufferSize); } } // if the current thread found it had to flush any messages, it does so but blocks. @@ -116,21 +112,17 @@ public void writeMessageEntry(Map.Entry entry) throws Excepti } private void flush(List> messages) { - if (!cluster.isConnected() || !cluster.isDeflectorHealthy()) { - try { - cluster.waitForConnectedAndDeflectorHealthy(); - } catch (TimeoutException | InterruptedException e) { - log.warn("Error while waiting for healthy Elasticsearch cluster. 
Not flushing.", e); - return; - } - } // never try to flush an empty buffer - if (messages.size() == 0) { + if (messages.isEmpty()) { return; } - log.debug("Starting flushing {} messages, flush threads active {}", - messages.size(), - activeFlushThreads.incrementAndGet()); + + activeFlushThreads.incrementAndGet(); + if (log.isDebugEnabled()) { + log.debug("Starting flushing {} messages, flush threads active {}", + messages.size(), + activeFlushThreads.get()); + } try (Timer.Context ignored = processTime.time()) { lastFlushTime.set(System.nanoTime()); @@ -139,18 +131,13 @@ private void flush(List> messages) { bufferFlushes.mark(); } catch (Exception e) { log.error("Unable to flush message buffer", e); + bufferFlushFailures.mark(); } activeFlushThreads.decrementAndGet(); log.debug("Flushing {} messages completed", messages.size()); } public void forceFlushIfTimedout() { - if (!cluster.isConnected() || !cluster.isDeflectorHealthy()) { - // do not actually try to flush, because that will block until the cluster comes back. - // simply check and return. - log.debug("Cluster unavailable, but not blocking for periodic flush attempt. This will try again."); - return; - } // if we shouldn't flush at all based on the last flush time, no need to synchronize on this. 
if (lastFlushTime.get() != 0 && outputFlushInterval > NANOSECONDS.toSeconds(System.nanoTime() - lastFlushTime.get())) { @@ -160,7 +147,7 @@ public void forceFlushIfTimedout() { final List> flushBatch; synchronized (this) { flushBatch = buffer; - buffer = Lists.newArrayListWithCapacity(maxBufferSize); + buffer = new ArrayList<>(maxBufferSize); } if (flushBatch != null) { bufferFlushesRequested.mark(); diff --git a/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java b/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java index 07cdb850a314..d4304f7df45c 100644 --- a/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java +++ b/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java @@ -19,8 +19,6 @@ import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; -import com.google.common.base.Joiner; -import com.google.common.collect.Ordering; import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.AssistedInject; import org.graylog2.indexer.IndexSet; @@ -36,6 +34,7 @@ import javax.inject.Inject; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,12 +44,14 @@ public class ElasticSearchOutput implements MessageOutput { private static final String WRITES_METRICNAME = name(ElasticSearchOutput.class, "writes"); + private static final String FAILURES_METRICNAME = name(ElasticSearchOutput.class, "failures"); private static final String PROCESS_TIME_METRICNAME = name(ElasticSearchOutput.class, "processTime"); private static final String NAME = "ElasticSearch Output"; private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchOutput.class); private final Meter writes; + private final Meter failures; private final Timer processTime; private final Messages messages; private final Journal 
journal; @@ -73,6 +74,7 @@ public ElasticSearchOutput(MetricRegistry metricRegistry, this.journal = journal; // Only constructing metrics here. write() get's another Core reference. (because this technically is a plugin) this.writes = metricRegistry.meter(WRITES_METRICNAME); + this.failures = metricRegistry.meter(FAILURES_METRICNAME); this.processTime = metricRegistry.timer(PROCESS_TIME_METRICNAME); // Should be set in initialize once this becomes a real plugin. @@ -94,18 +96,26 @@ public void write(List messageList) throws Exception { public void writeMessageEntries(List> messageList) throws Exception { if (LOG.isTraceEnabled()) { - final List sortedIds = Ordering.natural().sortedCopy(messageList.stream() - .map(entry -> entry.getValue().getId()) - .collect(Collectors.toList())); - LOG.trace("Writing message ids to [{}]: <{}>", NAME, Joiner.on(", ").join(sortedIds)); + final String sortedIds = messageList.stream() + .map(Map.Entry::getValue) + .map(Message::getId) + .sorted(Comparator.naturalOrder()) + .collect(Collectors.joining(", ")); + LOG.trace("Writing message ids to [{}]: <{}>", NAME, sortedIds); } writes.mark(messageList.size()); + final List failedMessageIds; try (final Timer.Context ignored = processTime.time()) { - messages.bulkIndex(messageList); + failedMessageIds = messages.bulkIndex(messageList); } + failures.mark(failedMessageIds.size()); + for (final Map.Entry entry : messageList) { - journal.markJournalOffsetCommitted(entry.getValue().getJournalOffset()); + final Message message = entry.getValue(); + if (!failedMessageIds.contains(message.getId())) { + journal.markJournalOffsetCommitted(message.getJournalOffset()); + } } } diff --git a/graylog2-server/src/test/java/org/graylog2/outputs/BlockingBatchedESOutputTest.java b/graylog2-server/src/test/java/org/graylog2/outputs/BlockingBatchedESOutputTest.java index e61aa7baef77..5c686ce6ac20 100644 --- a/graylog2-server/src/test/java/org/graylog2/outputs/BlockingBatchedESOutputTest.java +++ 
b/graylog2-server/src/test/java/org/graylog2/outputs/BlockingBatchedESOutputTest.java @@ -21,7 +21,6 @@ import com.google.common.collect.Maps; import org.graylog2.Configuration; import org.graylog2.indexer.IndexSet; -import org.graylog2.indexer.cluster.Cluster; import org.graylog2.indexer.messages.Messages; import org.graylog2.plugin.Message; import org.graylog2.plugin.Tools; @@ -37,12 +36,9 @@ import java.util.Map; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; public class BlockingBatchedESOutputTest { @Rule @@ -54,8 +50,6 @@ public class BlockingBatchedESOutputTest { @Mock private Messages messages; - @Mock - private Cluster cluster; @Before public void setUp() throws Exception { @@ -71,10 +65,7 @@ public int getOutputBatchSize() { @Test public void write() throws Exception { - when(cluster.isConnected()).thenReturn(true); - when(cluster.isDeflectorHealthy()).thenReturn(true); - - final BlockingBatchedESOutput output = new BlockingBatchedESOutput(metricRegistry, messages, cluster, config, journal); + final BlockingBatchedESOutput output = new BlockingBatchedESOutput(metricRegistry, messages, config, journal); final List> messageList = buildMessages(config.getOutputBatchSize()); @@ -85,53 +76,9 @@ public void write() throws Exception { verify(messages, times(1)).bulkIndex(eq(messageList)); } - @Test - public void writeDoesNotFlushIfClusterIsNotConnected() throws Exception { - when(cluster.isConnected()).thenReturn(false); - - doThrow(RuntimeException.class).when(cluster).waitForConnectedAndDeflectorHealthy(); - - final BlockingBatchedESOutput output = new BlockingBatchedESOutput(metricRegistry, messages, cluster, config, journal); - - final List> messageList = buildMessages(config.getOutputBatchSize()); - - for 
(Map.Entry entry : messageList) { - try { - output.writeMessageEntry(entry); - } catch(RuntimeException ignore) { - } - } - - verify(messages, never()).bulkIndex(eq(messageList)); - } - - @Test - public void writeDoesNotFlushIfClusterIsUnhealthy() throws Exception { - when(cluster.isConnected()).thenReturn(true); - when(cluster.isDeflectorHealthy()).thenReturn(false); - - doThrow(RuntimeException.class).when(cluster).waitForConnectedAndDeflectorHealthy(); - - final BlockingBatchedESOutput output = new BlockingBatchedESOutput(metricRegistry, messages, cluster, config, journal); - - final List> messageList = buildMessages(config.getOutputBatchSize()); - - for (Map.Entry entry : messageList) { - try { - output.writeMessageEntry(entry); - } catch(RuntimeException ignore) { - } - } - - verify(messages, never()).bulkIndex(eq(messageList)); - } - @Test public void forceFlushIfTimedOut() throws Exception { - when(cluster.isConnected()).thenReturn(true); - when(cluster.isDeflectorHealthy()).thenReturn(true); - - final BlockingBatchedESOutput output = new BlockingBatchedESOutput(metricRegistry, messages, cluster, config, journal); + final BlockingBatchedESOutput output = new BlockingBatchedESOutput(metricRegistry, messages, config, journal); final List> messageList = buildMessages(config.getOutputBatchSize() - 1); From cfb787f04b06fe08b6b539a5abef96c96cebd919 Mon Sep 17 00:00:00 2001 From: Dennis Oelkers Date: Fri, 23 Jun 2017 10:40:32 +0200 Subject: [PATCH 2/2] Only committing highest journal offset. 
--- .../org/graylog2/outputs/ElasticSearchOutput.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java b/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java index d4304f7df45c..c6aef12d1df6 100644 --- a/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java +++ b/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java @@ -37,6 +37,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -111,12 +112,12 @@ public void writeMessageEntries(List> messageList) } failures.mark(failedMessageIds.size()); - for (final Map.Entry entry : messageList) { - final Message message = entry.getValue(); - if (!failedMessageIds.contains(message.getId())) { - journal.markJournalOffsetCommitted(message.getJournalOffset()); - } - } + final Optional offset = messageList.stream() + .map(Map.Entry::getValue) + .map(Message::getJournalOffset) + .max(Long::compare); + + offset.ifPresent(journal::markJournalOffsetCommitted); } @Override