Browse files

add support for retrying bulk operations when ES is busy

Elasticsearch bulk operations now default to a bounded (finite) queue.
We now support waiting and retrying within a _bulk_docs request.
The retry count and the wait time between retries are configurable using:
couchbase.bulkIndexRetries (defaults to 1024)
couchbase.bulkIndexRetryWaitMs (in milliseconds, defaults to 1000)
  • Loading branch information...
1 parent 2a7c186 commit 5d853d3e7b432b3422cbd458f72a977544145d3e @mschoch mschoch committed Apr 7, 2014
View
7 src/main/java/org/elasticsearch/transport/couchbase/capi/CouchbaseCAPITransportImpl.java
@@ -69,6 +69,9 @@
private final long maxConcurrentRequests;
+ private long bulkIndexRetries;
+ private long bulkIndexRetryWaitMs;
+
@Inject
public CouchbaseCAPITransportImpl(Settings settings, RestController restController, NetworkService networkService, IndicesService indicesService, MetaDataMappingService metaDataMappingService, Client client) {
super(settings);
@@ -86,6 +89,8 @@ public CouchbaseCAPITransportImpl(Settings settings, RestController restControll
this.dynamicTypePath = settings.get("couchbase.dynamicTypePath");
this.resolveConflicts = settings.getAsBoolean("couchbase.resolveConflicts", true);
this.maxConcurrentRequests = settings.getAsLong("couchbase.maxConcurrentRequests", 1024L);
+ this.bulkIndexRetries = settings.getAsLong("couchbase.bulkIndexRetries", 1024L);
+ this.bulkIndexRetryWaitMs = settings.getAsLong("couchbase.bulkIndexRetryWaitMs", 1000L);
int defaultNumVbuckets = 1024;
if(System.getProperty("os.name").toLowerCase().contains("mac")) {
@@ -118,7 +123,7 @@ protected void doStart() throws ElasticsearchException {
final InetAddress publishAddressHost = publishAddressHostX;
- capiBehavior = new ElasticSearchCAPIBehavior(client, logger, defaultDocumentType, checkpointDocumentType, dynamicTypePath, resolveConflicts.booleanValue(), maxConcurrentRequests);
+ capiBehavior = new ElasticSearchCAPIBehavior(client, logger, defaultDocumentType, checkpointDocumentType, dynamicTypePath, resolveConflicts.booleanValue(), maxConcurrentRequests, bulkIndexRetries, bulkIndexRetryWaitMs);
couchbaseBehavior = new ElasticSearchCouchbaseBehavior(client, logger, checkpointDocumentType);
PortsRange portsRange = new PortsRange(port);
View
72 src/main/java/org/elasticsearch/transport/couchbase/capi/ElasticSearchCAPIBehavior.java
@@ -30,6 +30,7 @@
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequestBuilder;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
@@ -68,8 +69,10 @@
protected CounterMetric totalTooManyConcurrentRequestsErrors;
protected long maxConcurrentRequests;
+ protected long bulkIndexRetries;
+ protected long bulkIndexRetryWaitMs;
- public ElasticSearchCAPIBehavior(Client client, ESLogger logger, String defaultDocumentType, String checkpointDocumentType, String dynamicTypePath, boolean resolveConflicts, long maxConcurrentRequests) {
+ public ElasticSearchCAPIBehavior(Client client, ESLogger logger, String defaultDocumentType, String checkpointDocumentType, String dynamicTypePath, boolean resolveConflicts, long maxConcurrentRequests, long bulkIndexRetries, long bulkIndexRetryWaitMs) {
this.client = client;
this.logger = logger;
this.defaultDocumentType = defaultDocumentType;
@@ -84,6 +87,8 @@ public ElasticSearchCAPIBehavior(Client client, ESLogger logger, String defaultD
this.totalTooManyConcurrentRequestsErrors = new CounterMetric();
this.maxConcurrentRequests = maxConcurrentRequests;
+ this.bulkIndexRetries = bulkIndexRetries;
+ this.bulkIndexRetryWaitMs = bulkIndexRetryWaitMs;
}
@Override
@@ -314,33 +319,68 @@ public boolean ensureFullCommit(String database) {
}
}
- List<Object> result = new ArrayList<Object>();
+ List<Object> result = null;
+ long retriesLeft = this.bulkIndexRetries;
+ int attempt = 0;
- BulkResponse response = bulkBuilder.execute().actionGet();
- if(response != null) {
- for (BulkItemResponse bulkItemResponse : response.getItems()) {
- Map<String, Object> itemResponse = new HashMap<String, Object>();
- String itemId = bulkItemResponse.getId();
- itemResponse.put("id", itemId);
- if(bulkItemResponse.isFailed()) {
- itemResponse.put("error", "failed");
- itemResponse.put("reason", bulkItemResponse.getFailureMessage());
- logger.error("indexing error for id: {} reason: {}", itemId, bulkItemResponse.getFailureMessage());
- throw new RuntimeException("indexing error " + bulkItemResponse.getFailureMessage());
- } else {
- itemResponse.put("rev", revisions.get(itemId));
+ BulkResponse response = null;
+ do {
+ attempt++;
+ result = new ArrayList<Object>();
+ if(response != null) {
+ // at least second time through
+ try {
+ Thread.sleep(this.bulkIndexRetryWaitMs);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ response = bulkBuilder.execute().actionGet();
+ if(response != null) {
+ for (BulkItemResponse bulkItemResponse : response.getItems()) {
+ if(!bulkItemResponse.isFailed()) {
+ String itemId = bulkItemResponse.getId();
+ String itemRev = revisions.get(itemId);
+ Map<String, Object> itemResponse = new HashMap<String, Object>();
+ itemResponse.put("id", itemId);
+ itemResponse.put("rev", itemRev);
+ result.add(itemResponse);
+ } else {
+ Failure failure = bulkItemResponse.getFailure();
+ // if the error is fatal don't retry
+ if(failureMessageAppearsFatal(failure.getMessage())) {
+ throw new RuntimeException("indexing error " + failure.getMessage());
+ }
+ }
}
- result.add(itemResponse);
}
+ retriesLeft--;
+ } while((response != null) && (response.hasFailures()) && (retriesLeft > 0));
+
+ if(response == null) {
+ throw new RuntimeException("indexing error, bulk response was null");
}
+ if(retriesLeft == 0) {
+ throw new RuntimeException("indexing error, bulk failed after all retries");
+ }
+
+ logger.debug("bulk index succeeded after {} tries", attempt);
+
long end = System.currentTimeMillis();
meanBulkDocsRequests.inc(end - start);
activeBulkDocsRequests.dec();
return result;
}
+ public boolean failureMessageAppearsFatal(String failureMessage) { // Classifies a bulk item failure message: returns true when it looks fatal, false when it looks retryable.
+ if(failureMessage.contains("EsRejectedExecutionException")) { // NOTE(review): a null failureMessage would throw NullPointerException here - confirm callers never pass null.
+ return false; // A message naming EsRejectedExecutionException is the only case reported as non-fatal.
+ }
+ return true; // Any other message is reported as fatal; the bulk retry loop in this diff throws instead of retrying when this returns true.
+ }
+
@Override
public Map<String, Object> getDocument(String database, String docId) {
return getDocumentElasticSearch(getElasticSearchIndexNameFromDatabase(database), docId, defaultDocumentType);

0 comments on commit 5d853d3

Please sign in to comment.