diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java b/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java index 39cd5d6eac38..98677b594d3b 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java @@ -16,7 +16,13 @@ */ package org.graylog2.indexer.messages; +import com.github.joschi.jadconfig.util.Duration; +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.RetryerBuilder; +import com.github.rholder.retry.WaitStrategies; +import com.google.common.base.Predicate; import com.google.common.collect.Lists; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.admin.indices.analyze.AnalyzeRequestBuilder; import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse; @@ -28,7 +34,6 @@ import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.support.replication.ReplicationType; import org.elasticsearch.client.Client; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.indices.IndexMissingException; @@ -45,21 +50,30 @@ import javax.inject.Singleton; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; @Singleton public class Messages { - private static final Logger log = LoggerFactory.getLogger(Messages.class); + private static final Logger LOG = LoggerFactory.getLogger(Messages.class); + private static final Predicate ES_TIMEOUT_EXCEPTION_PREDICATE = new Predicate() { + @Override + public boolean apply(Throwable t) { + return t instanceof ElasticsearchTimeoutException; + } + }; + private static final Duration MAX_WAIT_TIME = Duration.seconds(30L); private final Client c; - private final ElasticsearchConfiguration configuration; + private final String deflectorName; private LinkedBlockingQueue> deadLetterQueue; @Inject - public Messages(Client client, ElasticsearchConfiguration configuration) { - this.configuration = configuration; + public Messages(Client client, ElasticsearchConfiguration configuration) { this.c = client; this.deadLetterQueue = new LinkedBlockingQueue<>(1000); + this.deflectorName = Deflector.buildName(configuration.getIndexPrefix()); } public LinkedBlockingQueue> getDeadLetterQueue() { @@ -67,48 +81,59 @@ public LinkedBlockingQueue> getDeadLetterQueue() { } public ResultMessage get(String messageId, String index) throws IndexMissingException, DocumentNotFoundException { - GetRequestBuilder grb = new GetRequestBuilder(c, index); - grb.setId(messageId); - - GetResponse r = c.get(grb.request()).actionGet(); - - if (!r.isExists()) { - throw new DocumentNotFoundException(); - } - - return ResultMessage.parseFromSource(r); - } - - public List analyze(String string, String index) throws IndexMissingException { - List tokens = Lists.newArrayList(); - AnalyzeRequestBuilder arb = new AnalyzeRequestBuilder(c.admin().indices(), index, string); - AnalyzeResponse r = c.admin().indices().analyze(arb.request()).actionGet(); - - for (AnalyzeToken token : r.getTokens()) { - tokens.add(token.getTerm()); - } - - return tokens; - } + GetRequestBuilder grb = new GetRequestBuilder(c, index); + grb.setId(messageId); + + GetResponse r = c.get(grb.request()).actionGet(); + + if (!r.isExists()) { + throw new DocumentNotFoundException(); + } + + return ResultMessage.parseFromSource(r); + } + + public List analyze(String string, String index) throws IndexMissingException { + List tokens = Lists.newArrayList(); + AnalyzeRequestBuilder arb = new AnalyzeRequestBuilder(c.admin().indices(), index, string); + AnalyzeResponse r = c.admin().indices().analyze(arb.request()).actionGet(); + + for (AnalyzeToken token : r.getTokens()) { + tokens.add(token.getTerm()); + } + + return tokens; + } public boolean bulkIndex(final List messages) { if (messages.isEmpty()) { return true; } - final BulkRequestBuilder request = c.prepareBulk(); + final BulkRequestBuilder requestBuilder = c.prepareBulk().setConsistencyLevel(WriteConsistencyLevel.ONE); for (Message msg : messages) { - request.add(buildIndexRequest(Deflector.buildName(configuration.getIndexPrefix()), - msg.toElasticSearchObject(), - msg.getId())); // Main index. + requestBuilder.add(buildIndexRequest(deflectorName, msg.toElasticSearchObject(), msg.getId())); } - request.setConsistencyLevel(WriteConsistencyLevel.ONE); - - final BulkResponse response = c.bulk(request.request()).actionGet(); + final BulkResponse response; + try { + response = RetryerBuilder.newBuilder() + .retryIfException(ES_TIMEOUT_EXCEPTION_PREDICATE) + .withWaitStrategy(WaitStrategies.exponentialWait(MAX_WAIT_TIME.getQuantity(), MAX_WAIT_TIME.getUnit())) + .build() + .call(new Callable() { + @Override + public BulkResponse call() throws Exception { + return c.bulk(requestBuilder.request()).actionGet(); + } + }); + } catch (ExecutionException | RetryException e) { + LOG.error("Couldn't bulk index " + messages.size() + " messages.", e); + return false; + } - log.debug("Deflector index: Bulk indexed {} messages, took {} ms, failures: {}", - response.getItems().length, response.getTookInMillis(), response.hasFailures()); + LOG.debug("Deflector index: Bulk indexed {} messages, took {} ms, failures: {}", + response.getItems().length, response.getTookInMillis(), response.hasFailures()); if (response.hasFailures()) { propagateFailure(response.getItems(), messages, response.buildFailureMessage()); @@ -118,7 +143,7 @@ public boolean bulkIndex(final List messages) { } private void propagateFailure(BulkItemResponse[] items, List messages, String errorMessage) { - log.error( + LOG.error( "Failed to index [{}] messages. Please check the index error log in your web interface for the reason. Error: {}", items.length, errorMessage); @@ -133,22 +158,19 @@ private void propagateFailure(BulkItemResponse[] items, List messages, boolean r = deadLetterQueue.offer(deadLetters); - if(!r) { - log.debug("Could not propagate failure to failure queue. Queue is full."); + if (!r) { + LOG.debug("Could not propagate failure to failure queue. Queue is full."); } } private IndexRequestBuilder buildIndexRequest(String index, Map source, String id) { - final IndexRequestBuilder b = new IndexRequestBuilder(c); - - b.setId(id); - b.setSource(source); - b.setIndex(index); - b.setContentType(XContentType.JSON); - b.setOpType(IndexRequest.OpType.INDEX); - b.setType(IndexMapping.TYPE_MESSAGE); - b.setConsistencyLevel(WriteConsistencyLevel.ONE); - - return b; + return new IndexRequestBuilder(c) + .setId(id) + .setSource(source) + .setIndex(index) + .setContentType(XContentType.JSON) + .setOpType(IndexRequest.OpType.INDEX) + .setType(IndexMapping.TYPE_MESSAGE) + .setConsistencyLevel(WriteConsistencyLevel.ONE); } }