Handle Request Entity Too Large errors in ElasticSearchOutput
If we try to bulk index a batch of messages that exceeds the
Elasticsearch `bulk_max_body_size` setting (default: 100MB),
Elasticsearch will respond with an HTTP 413 Request Entity Too Large
error.

In this case we retry the request by splitting the message batch
in half.
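
In essence, the retry strategy looks like this (a sketch with illustrative
names, not the exact code from the diff below; `indexInChunks` stands in for
the real chunking helper):

    void bulkIndexWithRetry(List<Message> messageList) throws Exception {
        int chunkSize = messageList.size();
        int tries;
        for (tries = 0; tries < 3; tries++) {
            try {
                indexInChunks(messageList, chunkSize); // one bulk request per chunk
                break; // success
            } catch (EntityTooLargeException e) {
                chunkSize /= 2; // retry with half-sized bulk requests
            }
        }
        if (tries == 3) {
            throw new Exception("Still no success after splitting up output batch three times.");
        }
    }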

When responding with an HTTP 413 error, the server is allowed to close the
connection immediately. This means that our HTTP client (Jest) will simply
report an IOException (Broken pipe) instead of the actual error.
This can be avoided by sending the request with an Expect-Continue
header, which also avoids transmitting data that the server would only
discard anyway.
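
The client-side fix, shown standalone (the RequestConfig call appears verbatim
in the diff below; `client` and `request` stand in for the Jest client and the
Bulk request used there):

    // With Expect-Continue, the client first sends only the request headers and
    // waits for the server's "100 Continue" before transmitting the bulk payload,
    // so a server that would reject the payload can answer with 413 right away.
    final RequestConfig requestConfig = RequestConfig.custom()
            .setExpectContinueEnabled(true)
            .build();
    // JestHttpClient accepts a per-request RequestConfig:
    final BulkResult result = ((JestHttpClient) client).execute(request, requestConfig);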

Fixes #5091
mpfz0r committed Jan 2, 2020
1 parent: c7b61f9 · commit: 4a88d56
Showing 3 changed files with 146 additions and 28 deletions.
[File 1 of 3]
@@ -27,14 +27,17 @@
 import com.github.rholder.retry.RetryerBuilder;
 import com.github.rholder.retry.WaitStrategies;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import io.searchbox.client.JestClient;
 import io.searchbox.client.JestResult;
+import io.searchbox.client.http.JestHttpClient;
 import io.searchbox.core.Bulk;
 import io.searchbox.core.BulkResult;
 import io.searchbox.core.DocumentResult;
 import io.searchbox.core.Get;
 import io.searchbox.core.Index;
 import io.searchbox.indices.Analyze;
+import org.apache.http.client.config.RequestConfig;
 import org.graylog2.indexer.IndexFailure;
 import org.graylog2.indexer.IndexFailureImpl;
 import org.graylog2.indexer.IndexMapping;
@@ -52,6 +55,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -128,49 +132,98 @@
         return terms;
     }
 
-    public List<String> bulkIndex(final List<Map.Entry<IndexSet, Message>> messageList) {
+    public List<String> bulkIndex(final List<Map.Entry<IndexSet, Message>> messageList) throws Exception {
         return bulkIndex(messageList, false);
     }
 
-    public List<String> bulkIndex(final List<Map.Entry<IndexSet, Message>> messageList, boolean isSystemTraffic) {
+    public List<String> bulkIndex(final List<Map.Entry<IndexSet, Message>> messageList, boolean isSystemTraffic) throws Exception {
         if (messageList.isEmpty()) {
             return Collections.emptyList();
         }
 
-        final Bulk.Builder bulk = new Bulk.Builder();
-        for (Map.Entry<IndexSet, Message> entry : messageList) {
-            final Message message = entry.getValue();
-            if (isSystemTraffic) {
-                systemTrafficCounter.inc(message.getSize());
-            } else {
-                outputByteCounter.inc(message.getSize());
+        int chunkSize = messageList.size();
+        List<BulkResult.BulkResultItem> failedItems = new ArrayList<>();
+        int tries;
+        for (tries = 0; tries < 3; tries++) {
+            try {
+                failedItems = bulkIndexChunked(messageList, isSystemTraffic, chunkSize);
+                break; // on success
+            } catch (EntityTooLargeException e) {
+                LOG.warn("Bulk index failed with 'Request Entity Too Large' error. Retrying by splitting up batch size <{}>.", chunkSize);
+                if (tries == 0) {
+                    LOG.warn("Consider lowering the \"output_batch_size\" setting.");
+                }
+                chunkSize /= 2;
             }
-
-            bulk.addAction(new Index.Builder(message.toElasticSearchObject(invalidTimestampMeter))
-                    .index(entry.getKey().getWriteIndexAlias())
-                    .type(IndexMapping.TYPE_MESSAGE)
-                    .id(message.getId())
-                    .build());
         }
-
-        final BulkResult result = runBulkRequest(bulk.build(), messageList.size());
-        final List<BulkResult.BulkResultItem> failedItems = result.getFailedItems();
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Index: Bulk indexed {} messages, took {} ms, failures: {}",
-                    result.getItems().size(), result, failedItems.size());
+        if (tries == 3) {
+            throw new Exception("Still no success after splitting up output batch three times.");
         }
 
         if (!failedItems.isEmpty()) {
             final Set<String> failedIds = failedItems.stream().map(item -> item.id).collect(Collectors.toSet());
             recordTimestamp(messageList, failedIds);
-            return propagateFailure(failedItems, messageList, result.getErrorMessage());
+            return propagateFailure(failedItems, messageList);
         } else {
             recordTimestamp(messageList, Collections.emptySet());
             return Collections.emptyList();
         }
     }
 
+    private List<BulkResult.BulkResultItem> bulkIndexChunked(final List<Map.Entry<IndexSet, Message>> messageList, boolean isSystemTraffic, int chunkSize) throws EntityTooLargeException {
+        chunkSize = Math.min(messageList.size(), chunkSize);
+
+        final List<BulkResult.BulkResultItem> failedItems = new ArrayList<>();
+        final Iterable<List<Map.Entry<IndexSet, Message>>> partition = Iterables.partition(messageList, chunkSize);
+        int partitionCount = 1;
+        for (List<Map.Entry<IndexSet, Message>> subMessageList: partition) {
+            Bulk.Builder bulk = new Bulk.Builder();
+
+            long messageSizes = 0;
+            for (Map.Entry<IndexSet, Message> entry : subMessageList) {
+                final Message message = entry.getValue();
+                messageSizes += message.getSize();
+
+                bulk.addAction(new Index.Builder(message.toElasticSearchObject(invalidTimestampMeter))
+                        .index(entry.getKey().getWriteIndexAlias())
+                        .type(IndexMapping.TYPE_MESSAGE)
+                        .id(message.getId())
+                        .build());
+            }
+
+            final BulkResult result = runBulkRequest(bulk.build(), subMessageList.size());
+
+            if (result.getResponseCode() == 413) {
+                throw new EntityTooLargeException();
+            }
+
+            // TODO should we check result.isSucceeded()?
+
+            if (isSystemTraffic) {
+                systemTrafficCounter.inc(messageSizes);
+            } else {
+                outputByteCounter.inc(messageSizes);
+            }
+            failedItems.addAll(result.getFailedItems());
+            if (LOG.isDebugEnabled()) {
+                String chunkInfo = "";
+                if (chunkSize != messageList.size()) {
+                    chunkInfo = String.format(Locale.ROOT, " (chunk %d/%d)", partitionCount,
+                            (int) Math.ceil((double)messageList.size() / chunkSize));
+                }
+                LOG.debug("Index: Bulk indexed {} messages{}, failures: {}",
+                        result.getItems().size(), chunkInfo, failedItems.size());
+            }
+            if (!result.getFailedItems().isEmpty()) {
+                LOG.error("Failed to index [{}] messages. Please check the index error log in your web interface for the reason. Error: {}",
+                        result.getFailedItems().size(), result.getErrorMessage());
+            }
+            partitionCount++;
+        }
+        return failedItems;
+    }
+
     private void recordTimestamp(List<Map.Entry<IndexSet, Message>> messageList, Set<String> failedIds) {
         for (final Map.Entry<IndexSet, Message> entry : messageList) {
             final Message message = entry.getValue();
@@ -185,7 +238,9 @@
 
     private BulkResult runBulkRequest(final Bulk request, int count) {
         try {
-            return BULK_REQUEST_RETRYER.call(() -> client.execute(request));
+            // Enable Expect-Continue to catch 413 errors before we send the actual data
+            final RequestConfig requestConfig = RequestConfig.custom().setExpectContinueEnabled(true).build();
+            return BULK_REQUEST_RETRYER.call(() -> ((JestHttpClient)client).execute(request, requestConfig));
         } catch (ExecutionException | RetryException e) {
             if (e instanceof RetryException) {
                 LOG.error("Could not bulk index {} messages. Giving up after {} attempts.", count, ((RetryException) e).getNumberOfFailedAttempts());
@@ -196,7 +251,7 @@
         }
     }
 
-    private List<String> propagateFailure(List<BulkResult.BulkResultItem> items, List<Map.Entry<IndexSet, Message>> messageList, String errorMessage) {
+    private List<String> propagateFailure(List<BulkResult.BulkResultItem> items, List<Map.Entry<IndexSet, Message>> messageList) {
         final Map<String, Message> messageMap = messageList.stream()
                 .map(Map.Entry::getValue)
                 .distinct()
@@ -221,9 +276,6 @@
             failedMessageIds.add(item.id);
         }
 
-        LOG.error("Failed to index [{}] messages. Please check the index error log in your web interface for the reason. Error: {}",
-                indexFailures.size(), errorMessage);
-
         try {
             // TODO: Magic number
             indexFailureQueue.offer(indexFailures, 25, TimeUnit.MILLISECONDS);
@@ -247,4 +299,7 @@
     public LinkedBlockingQueue<List<IndexFailure>> getIndexFailureQueue() {
         return indexFailureQueue;
     }
+
+    private class EntityTooLargeException extends Exception {
+    }
 }
[File 2 of 3]
@@ -112,6 +112,7 @@
         }
         failures.mark(failedMessageIds.size());
 
+        // TODO why doesn't this exclude the failedMessageIds?
         final Optional<Long> offset = messageList.stream()
                 .map(Map.Entry::getValue)
                 .map(Message::getJournalOffset)
[File 3 of 3]
@@ -17,17 +17,32 @@
 package org.graylog2.indexer.messages;
 
 import com.codahale.metrics.MetricRegistry;
+import com.google.common.collect.Maps;
 import io.searchbox.client.JestResult;
+import io.searchbox.core.Count;
+import io.searchbox.core.CountResult;
 import io.searchbox.core.DocumentResult;
 import io.searchbox.core.Index;
+import joptsimple.internal.Strings;
 import org.graylog.testing.elasticsearch.ElasticsearchBaseTest;
+import org.graylog2.indexer.IndexSet;
+import org.graylog2.indexer.TestIndexSet;
+import org.graylog2.indexer.indexset.IndexSetConfig;
 import org.graylog2.indexer.results.ResultMessage;
+import org.graylog2.indexer.retention.strategies.DeletionRetentionStrategy;
+import org.graylog2.indexer.retention.strategies.DeletionRetentionStrategyConfig;
+import org.graylog2.indexer.rotation.strategies.MessageCountRotationStrategy;
+import org.graylog2.indexer.rotation.strategies.MessageCountRotationStrategyConfig;
 import org.graylog2.plugin.Message;
 import org.graylog2.system.processing.InMemoryProcessingStatusRecorder;
+import org.joda.time.DateTime;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
@@ -41,6 +56,25 @@
         messages = new Messages(new MetricRegistry(), jestClient(), new InMemoryProcessingStatusRecorder());
     }
 
+    private static final IndexSetConfig indexSetConfig = IndexSetConfig.builder()
+            .id("index-set-1")
+            .title("Index set 1")
+            .description("For testing")
+            .indexPrefix("graylog")
+            .creationDate(ZonedDateTime.now())
+            .shards(1)
+            .replicas(0)
+            .rotationStrategyClass(MessageCountRotationStrategy.class.getCanonicalName())
+            .rotationStrategy(MessageCountRotationStrategyConfig.createDefault())
+            .retentionStrategyClass(DeletionRetentionStrategy.class.getCanonicalName())
+            .retentionStrategy(DeletionRetentionStrategyConfig.createDefault())
+            .indexAnalyzer("standard")
+            .indexTemplateName("template-1")
+            .indexOptimizationMaxNumSegments(1)
+            .indexOptimizationDisabled(false)
+            .build();
+    private static final IndexSet indexSet = new TestIndexSet(indexSetConfig);
+
     @Test
     public void getResultDoesNotContainJestMetadataFields() throws Exception {
         final String index = UUID.randomUUID().toString();
@@ -59,4 +93,32 @@
         assertThat(message.hasField(JestResult.ES_METADATA_ID)).isFalse();
         assertThat(message.hasField(JestResult.ES_METADATA_VERSION)).isFalse();
     }
+
+    @Test
+    public void testIfTooLargeBatchesGetSplitUp() throws Exception {
+        // This test assumes that ES is configured with bulk_max_body_size set to 100MB
+        // Check if we can index about 300MB of messages (once the large batch gets split up)
+        final int MESSAGECOUNT = 303;
+        final ArrayList<Map.Entry<IndexSet, Message>> largeMessageBatch = createMessageBatch(MESSAGECOUNT);
+        final List<String> failedItems = this.messages.bulkIndex(largeMessageBatch);
+
+        assertThat(failedItems).isEmpty();
+
+        Thread.sleep(2000); // wait for ES to finish indexing
+        final Count count = new Count.Builder().build();
+        final CountResult result = jestClient().execute(count);
+
+        assertThat(result.getCount()).isEqualTo(MESSAGECOUNT);
+    }
+
+    private ArrayList<Map.Entry<IndexSet, Message>> createMessageBatch(int size) {
+        final ArrayList<Map.Entry<IndexSet, Message>> messageList = new ArrayList<>();
+
+        // Each Message is about 1 MB
+        final String message = Strings.repeat('A', 1024 * 1024);
+        for (int i = 0; i < size; i++) {
+            messageList.add(Maps.immutableEntry(indexSet, new Message(i + message, "source", DateTime.now())));
+        }
+        return messageList;
+    }
 }
