Skip to content

Commit

Permalink
Retry bulk indexing messages on timeout
Browse files Browse the repository at this point in the history
Closes #1270
  • Loading branch information
Jochen Schalanda committed Aug 24, 2015
1 parent 48c097c commit 2bae0ff
Showing 1 changed file with 73 additions and 51 deletions.
Expand Up @@ -16,7 +16,13 @@
*/ */
package org.graylog2.indexer.messages; package org.graylog2.indexer.messages;


import com.github.joschi.jadconfig.util.Duration;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.base.Predicate;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.admin.indices.analyze.AnalyzeRequestBuilder; import org.elasticsearch.action.admin.indices.analyze.AnalyzeRequestBuilder;
import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse; import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse;
Expand All @@ -28,7 +34,6 @@
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.indices.IndexMissingException;
Expand All @@ -45,70 +50,90 @@
import javax.inject.Singleton; import javax.inject.Singleton;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;


@Singleton @Singleton
public class Messages { public class Messages {
private static final Logger log = LoggerFactory.getLogger(Messages.class); private static final Logger LOG = LoggerFactory.getLogger(Messages.class);
private static final Predicate<Throwable> ES_TIMEOUT_EXCEPTION_PREDICATE = new Predicate<Throwable>() {
@Override
public boolean apply(Throwable t) {
return t instanceof ElasticsearchTimeoutException;
}
};
private static final Duration MAX_WAIT_TIME = Duration.seconds(30L);


private final Client c; private final Client c;
private final ElasticsearchConfiguration configuration; private final String deflectorName;
private LinkedBlockingQueue<List<DeadLetter>> deadLetterQueue; private LinkedBlockingQueue<List<DeadLetter>> deadLetterQueue;


@Inject @Inject
public Messages(Client client, ElasticsearchConfiguration configuration) { public Messages(Client client, ElasticsearchConfiguration configuration) {
this.configuration = configuration;
this.c = client; this.c = client;
this.deadLetterQueue = new LinkedBlockingQueue<>(1000); this.deadLetterQueue = new LinkedBlockingQueue<>(1000);
this.deflectorName = Deflector.buildName(configuration.getIndexPrefix());
} }


public LinkedBlockingQueue<List<DeadLetter>> getDeadLetterQueue() { public LinkedBlockingQueue<List<DeadLetter>> getDeadLetterQueue() {
return deadLetterQueue; return deadLetterQueue;
} }


public ResultMessage get(String messageId, String index) throws IndexMissingException, DocumentNotFoundException { public ResultMessage get(String messageId, String index) throws IndexMissingException, DocumentNotFoundException {
GetRequestBuilder grb = new GetRequestBuilder(c, index); GetRequestBuilder grb = new GetRequestBuilder(c, index);
grb.setId(messageId); grb.setId(messageId);


GetResponse r = c.get(grb.request()).actionGet(); GetResponse r = c.get(grb.request()).actionGet();

if (!r.isExists()) { if (!r.isExists()) {
throw new DocumentNotFoundException(); throw new DocumentNotFoundException();
} }

return ResultMessage.parseFromSource(r); return ResultMessage.parseFromSource(r);
} }

public List<String> analyze(String string, String index) throws IndexMissingException { public List<String> analyze(String string, String index) throws IndexMissingException {
List<String> tokens = Lists.newArrayList(); List<String> tokens = Lists.newArrayList();
AnalyzeRequestBuilder arb = new AnalyzeRequestBuilder(c.admin().indices(), index, string); AnalyzeRequestBuilder arb = new AnalyzeRequestBuilder(c.admin().indices(), index, string);
AnalyzeResponse r = c.admin().indices().analyze(arb.request()).actionGet(); AnalyzeResponse r = c.admin().indices().analyze(arb.request()).actionGet();

for (AnalyzeToken token : r.getTokens()) { for (AnalyzeToken token : r.getTokens()) {
tokens.add(token.getTerm()); tokens.add(token.getTerm());
} }

return tokens; return tokens;
} }


public boolean bulkIndex(final List<Message> messages) { public boolean bulkIndex(final List<Message> messages) {
if (messages.isEmpty()) { if (messages.isEmpty()) {
return true; return true;
} }


final BulkRequestBuilder request = c.prepareBulk(); final BulkRequestBuilder requestBuilder = c.prepareBulk().setConsistencyLevel(WriteConsistencyLevel.ONE);
for (Message msg : messages) { for (Message msg : messages) {
request.add(buildIndexRequest(Deflector.buildName(configuration.getIndexPrefix()), requestBuilder.add(buildIndexRequest(deflectorName, msg.toElasticSearchObject(), msg.getId()));
msg.toElasticSearchObject(),
msg.getId())); // Main index.
} }


request.setConsistencyLevel(WriteConsistencyLevel.ONE); final BulkResponse response;

try {
final BulkResponse response = c.bulk(request.request()).actionGet(); response = RetryerBuilder.<BulkResponse>newBuilder()
.retryIfException(ES_TIMEOUT_EXCEPTION_PREDICATE)
.withWaitStrategy(WaitStrategies.exponentialWait(MAX_WAIT_TIME.getQuantity(), MAX_WAIT_TIME.getUnit()))
.build()
.call(new Callable<BulkResponse>() {
@Override
public BulkResponse call() throws Exception {
return c.bulk(requestBuilder.request()).actionGet();
}
});
} catch (ExecutionException | RetryException e) {
LOG.error("Couldn't bulk index " + messages.size() + " messages.", e);
return false;
}


log.debug("Deflector index: Bulk indexed {} messages, took {} ms, failures: {}", LOG.debug("Deflector index: Bulk indexed {} messages, took {} ms, failures: {}",
response.getItems().length, response.getTookInMillis(), response.hasFailures()); response.getItems().length, response.getTookInMillis(), response.hasFailures());


if (response.hasFailures()) { if (response.hasFailures()) {
propagateFailure(response.getItems(), messages, response.buildFailureMessage()); propagateFailure(response.getItems(), messages, response.buildFailureMessage());
Expand All @@ -118,7 +143,7 @@ public boolean bulkIndex(final List<Message> messages) {
} }


private void propagateFailure(BulkItemResponse[] items, List<Message> messages, String errorMessage) { private void propagateFailure(BulkItemResponse[] items, List<Message> messages, String errorMessage) {
log.error( LOG.error(
"Failed to index [{}] messages. Please check the index error log in your web interface for the reason. Error: {}", "Failed to index [{}] messages. Please check the index error log in your web interface for the reason. Error: {}",
items.length, items.length,
errorMessage); errorMessage);
Expand All @@ -133,22 +158,19 @@ private void propagateFailure(BulkItemResponse[] items, List<Message> messages,


boolean r = deadLetterQueue.offer(deadLetters); boolean r = deadLetterQueue.offer(deadLetters);


if(!r) { if (!r) {
log.debug("Could not propagate failure to failure queue. Queue is full."); LOG.debug("Could not propagate failure to failure queue. Queue is full.");
} }
} }


private IndexRequestBuilder buildIndexRequest(String index, Map<String, Object> source, String id) { private IndexRequestBuilder buildIndexRequest(String index, Map<String, Object> source, String id) {
final IndexRequestBuilder b = new IndexRequestBuilder(c); return new IndexRequestBuilder(c)

.setId(id)
b.setId(id); .setSource(source)
b.setSource(source); .setIndex(index)
b.setIndex(index); .setContentType(XContentType.JSON)
b.setContentType(XContentType.JSON); .setOpType(IndexRequest.OpType.INDEX)
b.setOpType(IndexRequest.OpType.INDEX); .setType(IndexMapping.TYPE_MESSAGE)
b.setType(IndexMapping.TYPE_MESSAGE); .setConsistencyLevel(WriteConsistencyLevel.ONE);
b.setConsistencyLevel(WriteConsistencyLevel.ONE);

return b;
} }
} }

0 comments on commit 2bae0ff

Please sign in to comment.