Don't check ES cluster health when flushing messages #3927

Merged 2 commits on Jun 23, 2017
Messages.java

@@ -47,7 +47,7 @@
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -109,9 +109,9 @@ public List<String> analyze(String toAnalyze, String index, String analyzer) thr
return terms;
}

public boolean bulkIndex(final List<Map.Entry<IndexSet, Message>> messageList) {
public List<String> bulkIndex(final List<Map.Entry<IndexSet, Message>> messageList) {
if (messageList.isEmpty()) {
return true;
return Collections.emptyList();
}

final Bulk.Builder bulk = new Bulk.Builder();
@@ -125,14 +125,18 @@ public boolean bulkIndex(final List<Map.Entry<IndexSet, Message>> messageList) {
}

final BulkResult result = runBulkRequest(bulk.build(), messageList.size());
final List<BulkResult.BulkResultItem> failedItems = result.getFailedItems();

LOG.debug("Index: Bulk indexed {} messages, took {} ms, failures: {}",
result.getItems().size(), result, result.getFailedItems().size());
if (!result.getFailedItems().isEmpty()) {
propagateFailure(result.getFailedItems(), messageList, result.getErrorMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("Index: Bulk indexed {} messages, took {} ms, failures: {}",
result.getItems().size(), result, failedItems.size());
}

return result.getFailedItems().isEmpty();
if (!failedItems.isEmpty()) {
return propagateFailure(failedItems, messageList, result.getErrorMessage());
} else {
return Collections.emptyList();
}
}

private BulkResult runBulkRequest(final Bulk request, int count) {
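
Editor's note: with the signature change above, callers of bulkIndex no longer get a bare success flag; the method now returns the IDs of the messages Elasticsearch rejected, or an empty list when the whole batch was indexed. A minimal caller-side sketch of the new contract (the helper method and log message are illustrative, not part of this PR):

// Hypothetical caller reacting to per-message failures instead of a boolean.
private void indexBatch(Messages messages, List<Map.Entry<IndexSet, Message>> batch) {
    final List<String> failedIds = messages.bulkIndex(batch);
    if (!failedIds.isEmpty()) {
        // failedIds holds the message IDs collected by propagateFailure() below.
        LOG.warn("Elasticsearch rejected {} of {} messages: {}",
                failedIds.size(), batch.size(), failedIds);
    }
}
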
@@ -149,12 +153,13 @@ private BulkResult runBulkRequest(final Bulk request, int count) {
}
}

private void propagateFailure(List<BulkResult.BulkResultItem> items, List<Map.Entry<IndexSet, Message>> messageList, String errorMessage) {
final List<IndexFailure> indexFailures = new LinkedList<>();
private List<String> propagateFailure(List<BulkResult.BulkResultItem> items, List<Map.Entry<IndexSet, Message>> messageList, String errorMessage) {
final Map<String, Message> messageMap = messageList.stream()
.map(Map.Entry::getValue)
.distinct()
.collect(Collectors.toMap(Message::getId, Function.identity()));
final List<String> failedMessageIds = new ArrayList<>(items.size());
final List<IndexFailure> indexFailures = new ArrayList<>(items.size());
for (BulkResult.BulkResultItem item : items) {
LOG.trace("Failed to index message: {}", item.error);

@@ -169,6 +174,8 @@ private void propagateFailure(List<BulkResult.BulkResultItem> items, List<Map.En
.build();

indexFailures.add(new IndexFailureImpl(doc));

failedMessageIds.add(item.id);
}

LOG.error("Failed to index [{}] messages. Please check the index error log in your web interface for the reason. Error: {}",
@@ -180,6 +187,8 @@ private void propagateFailure(List<BulkResult.BulkResultItem> items, List<Map.En
} catch (InterruptedException e) {
LOG.warn("Couldn't save index failures.", e);
}

return failedMessageIds;
}

public Index prepareIndexRequest(String index, Map<String, Object> source, String id) {
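
Editor's note: the id-to-message lookup built at the top of propagateFailure uses Collectors.toMap, which throws an IllegalStateException if the stream offers the same key twice; the distinct() call deduplicates messages that were routed to more than one index set so each message ID reaches the map only once. A self-contained illustration of that JDK behaviour (example values are made up):

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ToMapDemo {
    public static void main(String[] args) {
        final List<String> ids = Arrays.asList("a", "b", "a");

        // Without distinct() (or a merge function) duplicate keys fail fast:
        // java.lang.IllegalStateException: Duplicate key a
        try {
            ids.stream().collect(Collectors.toMap(Function.identity(), Function.identity()));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // Deduplicating first, as propagateFailure does, builds the map safely.
        final Map<String, String> byId = ids.stream()
                .distinct()
                .collect(Collectors.toMap(Function.identity(), Function.identity()));
        System.out.println(byId); // prints {a=a, b=b}
    }
}
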
BlockingBatchedESOutput.java

@@ -20,12 +20,10 @@
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.cluster.Cluster;
import org.graylog2.indexer.messages.Messages;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.configuration.Configuration;
@@ -35,9 +33,9 @@
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

@@ -47,11 +45,11 @@
// Singleton class
public class BlockingBatchedESOutput extends ElasticSearchOutput {
private static final Logger log = LoggerFactory.getLogger(BlockingBatchedESOutput.class);
private final Cluster cluster;
private final int maxBufferSize;
private final Timer processTime;
private final Histogram batchSize;
private final Meter bufferFlushes;
private final Meter bufferFlushFailures;
private final Meter bufferFlushesRequested;

private volatile List<Map.Entry<IndexSet, Message>> buffer;
@@ -63,30 +61,28 @@ public class BlockingBatchedESOutput extends ElasticSearchOutput {
@AssistedInject
public BlockingBatchedESOutput(MetricRegistry metricRegistry,
Messages messages,
Cluster cluster,
org.graylog2.Configuration serverConfiguration,
Journal journal,
@Assisted Stream stream,
@Assisted Configuration configuration) {
this(metricRegistry, messages, cluster, serverConfiguration, journal);
this(metricRegistry, messages, serverConfiguration, journal);
}

@Inject
public BlockingBatchedESOutput(MetricRegistry metricRegistry,
Messages messages,
Cluster cluster,
org.graylog2.Configuration serverConfiguration,
Journal journal) {
super(metricRegistry, messages, journal);
this.cluster = cluster;
this.maxBufferSize = serverConfiguration.getOutputBatchSize();
outputFlushInterval = serverConfiguration.getOutputFlushInterval();
this.processTime = metricRegistry.timer(name(this.getClass(), "processTime"));
this.batchSize = metricRegistry.histogram(name(this.getClass(), "batchSize"));
this.bufferFlushes = metricRegistry.meter(name(this.getClass(), "bufferFlushes"));
this.bufferFlushFailures = metricRegistry.meter(name(this.getClass(), "bufferFlushFailures"));
this.bufferFlushesRequested = metricRegistry.meter(name(this.getClass(), "bufferFlushesRequested"));

buffer = Lists.newArrayListWithCapacity(maxBufferSize);
buffer = new ArrayList<>(maxBufferSize);

}
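
Editor's note: the constructor no longer takes a Cluster dependency, and it registers a new bufferFlushFailures meter next to the existing ones; flush() below marks it whenever a bulk request throws. For readers unfamiliar with the Dropwizard Metrics calls involved, this is just a registry lookup plus mark(), roughly (standalone sketch, the class name is illustrative):

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;

import static com.codahale.metrics.MetricRegistry.name;

public class MeterDemo {
    public static void main(String[] args) {
        final MetricRegistry registry = new MetricRegistry();
        // Same registration style as bufferFlushFailures above.
        final Meter flushFailures = registry.meter(name(MeterDemo.class, "bufferFlushFailures"));

        flushFailures.mark(); // count one failed flush
        System.out.println(flushFailures.getCount()); // 1
    }
}
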

@@ -104,7 +100,7 @@ public void writeMessageEntry(Map.Entry<IndexSet, Message> entry) throws Excepti

if (buffer.size() >= maxBufferSize) {
flushBatch = buffer;
buffer = Lists.newArrayListWithCapacity(maxBufferSize);
buffer = new ArrayList<>(maxBufferSize);
}
}
// if the current thread found it had to flush any messages, it does so but blocks.
@@ -116,21 +112,17 @@ public void writeMessageEntry(Map.Entry<IndexSet, Message> entry) throws Excepti
}

private void flush(List<Map.Entry<IndexSet, Message>> messages) {
if (!cluster.isConnected() || !cluster.isDeflectorHealthy()) {
try {
cluster.waitForConnectedAndDeflectorHealthy();
} catch (TimeoutException | InterruptedException e) {
log.warn("Error while waiting for healthy Elasticsearch cluster. Not flushing.", e);
return;
}
}
// never try to flush an empty buffer
if (messages.size() == 0) {
if (messages.isEmpty()) {
return;
}
log.debug("Starting flushing {} messages, flush threads active {}",
messages.size(),
activeFlushThreads.incrementAndGet());

activeFlushThreads.incrementAndGet();
if (log.isDebugEnabled()) {
log.debug("Starting flushing {} messages, flush threads active {}",
messages.size(),
activeFlushThreads.get());
}

try (Timer.Context ignored = processTime.time()) {
lastFlushTime.set(System.nanoTime());
@@ -139,18 +131,13 @@ private void flush(List<Map.Entry<IndexSet, Message>> messages) {
bufferFlushes.mark();
} catch (Exception e) {
log.error("Unable to flush message buffer", e);
bufferFlushFailures.mark();
}
activeFlushThreads.decrementAndGet();
log.debug("Flushing {} messages completed", messages.size());
}

public void forceFlushIfTimedout() {
if (!cluster.isConnected() || !cluster.isDeflectorHealthy()) {
// do not actually try to flush, because that will block until the cluster comes back.
// simply check and return.
log.debug("Cluster unavailable, but not blocking for periodic flush attempt. This will try again.");
return;
}
// if we shouldn't flush at all based on the last flush time, no need to synchronize on this.
if (lastFlushTime.get() != 0 &&
outputFlushInterval > NANOSECONDS.toSeconds(System.nanoTime() - lastFlushTime.get())) {
@@ -160,7 +147,7 @@ public void forceFlushIfTimedout() {
final List<Map.Entry<IndexSet, Message>> flushBatch;
synchronized (this) {
flushBatch = buffer;
buffer = Lists.newArrayListWithCapacity(maxBufferSize);
buffer = new ArrayList<>(maxBufferSize);
}
if (flushBatch != null) {
bufferFlushesRequested.mark();
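
Editor's note: with the cluster-health gate removed, both writeMessageEntry and forceFlushIfTimedout reduce to the same pattern: swap the shared buffer for a fresh list while holding the lock, then bulk-index the detached batch outside of it, so producer threads are never parked waiting for Elasticsearch to become healthy. A condensed sketch of that pattern (field and method names follow the class above, but this is not the literal implementation):

// Condensed illustration of the swap-then-flush pattern used above.
private volatile List<Map.Entry<IndexSet, Message>> buffer = new ArrayList<>(maxBufferSize);

public void writeMessageEntry(Map.Entry<IndexSet, Message> entry) throws Exception {
    List<Map.Entry<IndexSet, Message>> flushBatch = null;
    synchronized (this) {
        buffer.add(entry);
        if (buffer.size() >= maxBufferSize) {
            flushBatch = buffer;                     // detach the full batch
            buffer = new ArrayList<>(maxBufferSize); // writers continue on a fresh list
        }
    }
    if (flushBatch != null) {
        flush(flushBatch); // bulk-indexes without any cluster health pre-check
    }
}
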
ElasticSearchOutput.java

@@ -19,8 +19,6 @@
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.Joiner;
import com.google.common.collect.Ordering;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import org.graylog2.indexer.IndexSet;
@@ -36,21 +34,25 @@

import javax.inject.Inject;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static com.codahale.metrics.MetricRegistry.name;

public class ElasticSearchOutput implements MessageOutput {
private static final String WRITES_METRICNAME = name(ElasticSearchOutput.class, "writes");
private static final String FAILURES_METRICNAME = name(ElasticSearchOutput.class, "failures");
private static final String PROCESS_TIME_METRICNAME = name(ElasticSearchOutput.class, "processTime");

private static final String NAME = "ElasticSearch Output";
private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchOutput.class);

private final Meter writes;
private final Meter failures;
private final Timer processTime;
private final Messages messages;
private final Journal journal;
@@ -73,6 +75,7 @@ public ElasticSearchOutput(MetricRegistry metricRegistry,
this.journal = journal;
// Only constructing metrics here. write() get's another Core reference. (because this technically is a plugin)
this.writes = metricRegistry.meter(WRITES_METRICNAME);
this.failures = metricRegistry.meter(FAILURES_METRICNAME);
this.processTime = metricRegistry.timer(PROCESS_TIME_METRICNAME);

// Should be set in initialize once this becomes a real plugin.
@@ -94,19 +97,27 @@ public void write(List<Message> messageList) throws Exception {

public void writeMessageEntries(List<Map.Entry<IndexSet, Message>> messageList) throws Exception {
if (LOG.isTraceEnabled()) {
final List<String> sortedIds = Ordering.natural().sortedCopy(messageList.stream()
.map(entry -> entry.getValue().getId())
.collect(Collectors.toList()));
LOG.trace("Writing message ids to [{}]: <{}>", NAME, Joiner.on(", ").join(sortedIds));
final String sortedIds = messageList.stream()
.map(Map.Entry::getValue)
.map(Message::getId)
.sorted(Comparator.naturalOrder())
.collect(Collectors.joining(", "));
LOG.trace("Writing message ids to [{}]: <{}>", NAME, sortedIds);
}

writes.mark(messageList.size());
final List<String> failedMessageIds;
try (final Timer.Context ignored = processTime.time()) {
messages.bulkIndex(messageList);
}
for (final Map.Entry<IndexSet, Message> entry : messageList) {
journal.markJournalOffsetCommitted(entry.getValue().getJournalOffset());
failedMessageIds = messages.bulkIndex(messageList);
}
failures.mark(failedMessageIds.size());

final Optional<Long> offset = messageList.stream()
.map(Map.Entry::getValue)
.map(Message::getJournalOffset)
.max(Long::compare);

offset.ifPresent(journal::markJournalOffsetCommitted);
}

@Override
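
Editor's note: writeMessageEntries now counts the IDs returned by bulkIndex on the failures meter and commits the journal once per batch, using the highest journal offset it saw, instead of committing after every individual message. The Optional/stream expression above is equivalent to a plain max-scan, roughly (illustrative only):

// Equivalent to the stream above: commit the batch's highest journal offset once.
long maxOffset = Long.MIN_VALUE;
boolean anyMessage = false;
for (final Map.Entry<IndexSet, Message> entry : messageList) {
    maxOffset = Math.max(maxOffset, entry.getValue().getJournalOffset());
    anyMessage = true;
}
if (anyMessage) {
    journal.markJournalOffsetCommitted(maxOffset);
}

Committing the high-water mark once per batch means a single journal call regardless of batch size.
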
BlockingBatchedESOutputTest.java

@@ -21,7 +21,6 @@
import com.google.common.collect.Maps;
import org.graylog2.Configuration;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.cluster.Cluster;
import org.graylog2.indexer.messages.Messages;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.Tools;
@@ -37,12 +36,9 @@
import java.util.Map;

import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class BlockingBatchedESOutputTest {
@Rule
@@ -54,8 +50,6 @@ public class BlockingBatchedESOutputTest {

@Mock
private Messages messages;
@Mock
private Cluster cluster;

@Before
public void setUp() throws Exception {
@@ -71,10 +65,7 @@ public int getOutputBatchSize() {

@Test
public void write() throws Exception {
when(cluster.isConnected()).thenReturn(true);
when(cluster.isDeflectorHealthy()).thenReturn(true);

final BlockingBatchedESOutput output = new BlockingBatchedESOutput(metricRegistry, messages, cluster, config, journal);
final BlockingBatchedESOutput output = new BlockingBatchedESOutput(metricRegistry, messages, config, journal);

final List<Map.Entry<IndexSet, Message>> messageList = buildMessages(config.getOutputBatchSize());

@@ -85,53 +76,9 @@ public void write() throws Exception {
verify(messages, times(1)).bulkIndex(eq(messageList));
}

@Test
public void writeDoesNotFlushIfClusterIsNotConnected() throws Exception {
when(cluster.isConnected()).thenReturn(false);

doThrow(RuntimeException.class).when(cluster).waitForConnectedAndDeflectorHealthy();

final BlockingBatchedESOutput output = new BlockingBatchedESOutput(metricRegistry, messages, cluster, config, journal);

final List<Map.Entry<IndexSet, Message>> messageList = buildMessages(config.getOutputBatchSize());

for (Map.Entry<IndexSet, Message> entry : messageList) {
try {
output.writeMessageEntry(entry);
} catch(RuntimeException ignore) {
}
}

verify(messages, never()).bulkIndex(eq(messageList));
}

@Test
public void writeDoesNotFlushIfClusterIsUnhealthy() throws Exception {
when(cluster.isConnected()).thenReturn(true);
when(cluster.isDeflectorHealthy()).thenReturn(false);

doThrow(RuntimeException.class).when(cluster).waitForConnectedAndDeflectorHealthy();

final BlockingBatchedESOutput output = new BlockingBatchedESOutput(metricRegistry, messages, cluster, config, journal);

final List<Map.Entry<IndexSet, Message>> messageList = buildMessages(config.getOutputBatchSize());

for (Map.Entry<IndexSet, Message> entry : messageList) {
try {
output.writeMessageEntry(entry);
} catch(RuntimeException ignore) {
}
}

verify(messages, never()).bulkIndex(eq(messageList));
}

@Test
public void forceFlushIfTimedOut() throws Exception {
when(cluster.isConnected()).thenReturn(true);
when(cluster.isDeflectorHealthy()).thenReturn(true);

final BlockingBatchedESOutput output = new BlockingBatchedESOutput(metricRegistry, messages, cluster, config, journal);
final BlockingBatchedESOutput output = new BlockingBatchedESOutput(metricRegistry, messages, config, journal);

final List<Map.Entry<IndexSet, Message>> messageList = buildMessages(config.getOutputBatchSize() - 1);

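
Editor's note: because Messages is a Mockito mock in this test class, the switch of bulkIndex from boolean to List<String> needs no extra stubbing: Mockito's default answer returns an empty list for List-typed methods, so writeMessageEntries simply sees a batch with no failures. A test that wanted to exercise the failure path could stub it explicitly, for example (hypothetical test, not part of this PR; it would need the Mockito.when and java.util.Collections imports back):

@Test
public void writeReportsFailedMessages() throws Exception {
    final BlockingBatchedESOutput output = new BlockingBatchedESOutput(metricRegistry, messages, config, journal);
    final List<Map.Entry<IndexSet, Message>> messageList = buildMessages(config.getOutputBatchSize());

    // Pretend Elasticsearch rejected the first message of the batch.
    when(messages.bulkIndex(eq(messageList)))
            .thenReturn(Collections.singletonList(messageList.get(0).getValue().getId()));

    for (Map.Entry<IndexSet, Message> entry : messageList) {
        output.writeMessageEntry(entry);
    }

    verify(messages, times(1)).bulkIndex(eq(messageList));
}
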