Skip to content

Commit

Permalink
Reindex should retry on search failures
Browse files Browse the repository at this point in the history
This uses the same backoff policy we use for bulk and just retries until
the request isn't rejected.

Instead of `{"retries": 12}` in the response to count retries this now
looks like `{"retries": {"bulk": 12, "search": 1}}`.

Closes #18059
  • Loading branch information
nik9000 committed May 17, 2016
1 parent 584be0b commit fe4823e
Show file tree
Hide file tree
Showing 22 changed files with 518 additions and 109 deletions.
18 changes: 14 additions & 4 deletions docs/reference/docs/reindex.asciidoc
Expand Up @@ -33,7 +33,10 @@ That will return something like this:
"batches": 1,
"version_conflicts": 0,
"noops": 0,
"retries": 0,
"retries": {
"bulk": 0,
"search": 0
},
"throttled_millis": 0,
"requests_per_second": "unlimited",
"throttled_until_millis": 0,
Expand Down Expand Up @@ -386,7 +389,10 @@ The JSON response looks like this:
"created": 123,
"batches": 1,
"version_conflicts": 2,
"retries": 0,
"retries": {
"bulk": 0,
"search": 0
},
"throttled_millis": 0,
"failures" : [ ]
}
Expand Down Expand Up @@ -414,7 +420,8 @@ The number of version conflicts that reindex hit.

`retries`::

The number of retries that the reindex did in response to a full queue.
The number of retries attempted by reindex. `bulk` is the number of bulk
actions retried and `search` is the number of search actions retried.

`throttled_millis`::

Expand Down Expand Up @@ -468,7 +475,10 @@ The response looks like:
"batches" : 4,
"version_conflicts" : 0,
"noops" : 0,
"retries": 0,
"retries": {
"bulk": 0,
"search": 0
},
"throttled_millis": 0
},
"description" : ""
Expand Down
18 changes: 14 additions & 4 deletions docs/reference/docs/update-by-query.asciidoc
Expand Up @@ -26,7 +26,10 @@ That will return something like this:
"batches": 1,
"version_conflicts": 0,
"noops": 0,
"retries": 0,
"retries": {
"bulk": 0,
"search": 0
},
"throttled_millis": 0,
"requests_per_second": "unlimited",
"throttled_until_millis": 0,
Expand Down Expand Up @@ -220,7 +223,10 @@ The JSON response looks like this:
"updated": 0,
"batches": 1,
"version_conflicts": 2,
"retries": 0,
"retries": {
"bulk": 0,
"search": 0
},
"throttled_millis": 0,
"failures" : [ ]
}
Expand All @@ -244,7 +250,8 @@ The number of version conflicts that the update by query hit.

`retries`::

The number of retries that the update by query did in response to a full queue.
The number of retries attempted by update-by-query. `bulk` is the number of bulk
actions retried and `search` is the number of search actions retried.

`throttled_millis`::

Expand Down Expand Up @@ -299,7 +306,10 @@ The response looks like:
"batches" : 4,
"version_conflicts" : 0,
"noops" : 0,
"retries": 0,
"retries": {
"bulk": 0,
"search": 0
},
"throttled_millis": 0
},
"description" : ""
Expand Down
1 change: 1 addition & 0 deletions docs/reference/migration/migrate_5_0.asciidoc
Expand Up @@ -34,6 +34,7 @@ way to do this is to upgrade to Elasticsearch 2.3 or later and to use the
* <<breaking_50_percolator>>
* <<breaking_50_suggester>>
* <<breaking_50_index_apis>>
* <<breaking_50_document_api_changes>>
* <<breaking_50_settings_changes>>
* <<breaking_50_allocation>>
* <<breaking_50_http_changes>>
Expand Down
33 changes: 33 additions & 0 deletions docs/reference/migration/migrate_5_0/docs.asciidoc
@@ -0,0 +1,33 @@
[[breaking_50_document_api_changes]]
=== Document API changes

==== Reindex and Update By Query
Before 5.0.0 `_reindex` and `_update_by_query` only retried bulk failures so
they used the following response format:

[source,js]
----------------------
{
...
"retries": 10
...
}
----------------------

Where `retries` counts the number of bulk retries. Now they retry on search
failures as well and use this response format:

[source,js]
----------------------
{
...
"retries": {
"bulk": 10,
"search": 1
}
...
}
----------------------

Where `bulk` counts the number of bulk retries and `search` counts the number
of search retries.
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.index.reindex;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
Expand Down Expand Up @@ -57,6 +58,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static java.lang.Math.max;
import static java.lang.Math.min;
Expand Down Expand Up @@ -91,7 +93,8 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
private final ThreadPool threadPool;
private final SearchRequest firstSearchRequest;
private final ActionListener<Response> listener;
private final Retry retry;
private final BackoffPolicy backoffPolicy;
private final Retry bulkRetry;

public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, ESLogger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, Request mainRequest, SearchRequest firstSearchRequest, ActionListener<Response> listener) {
Expand All @@ -102,7 +105,8 @@ public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, ESLogger logger, P
this.mainRequest = mainRequest;
this.firstSearchRequest = firstSearchRequest;
this.listener = listener;
retry = Retry.on(EsRejectedExecutionException.class).policy(wrapBackoffPolicy(backoffPolicy()));
backoffPolicy = buildBackoffPolicy();
bulkRetry = Retry.on(EsRejectedExecutionException.class).policy(wrapBackoffPolicy(backoffPolicy));
}

protected abstract BulkRequest buildBulk(Iterable<SearchHit> docs);
Expand Down Expand Up @@ -131,21 +135,14 @@ public void start() {
firstSearchRequest.types() == null || firstSearchRequest.types().length == 0 ? ""
: firstSearchRequest.types());
}
client.search(firstSearchRequest, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse response) {
logger.debug("[{}] documents match query", response.getHits().getTotalHits());
onScrollResponse(timeValueSeconds(0), response);
}

@Override
public void onFailure(Throwable e) {
finishHim(e);
}
});
} catch (Throwable t) {
finishHim(t);
return;
}
searchWithRetry(listener -> client.search(firstSearchRequest, listener), (SearchResponse response) -> {
logger.debug("[{}] documents match query", response.getHits().getTotalHits());
onScrollResponse(timeValueSeconds(0), response);
});
}

/**
Expand Down Expand Up @@ -239,7 +236,7 @@ void sendBulkRequest(BulkRequest request) {
finishHim(null);
return;
}
retry.withAsyncBackoff(client, request, new ActionListener<BulkResponse>() {
bulkRetry.withAsyncBackoff(client, request, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
onBulkResponse(response);
Expand Down Expand Up @@ -322,16 +319,8 @@ void startNextScroll(int lastBatchSize) {
SearchScrollRequest request = new SearchScrollRequest();
// Add the wait time into the scroll timeout so it won't timeout while we wait for throttling
request.scrollId(scroll.get()).scroll(timeValueNanos(firstSearchRequest.scroll().keepAlive().nanos() + waitTime));
client.searchScroll(request, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse response) {
onScrollResponse(timeValueNanos(max(0, earliestNextBatchStartTime - System.nanoTime())), response);
}

@Override
public void onFailure(Throwable e) {
finishHim(e);
}
searchWithRetry(listener -> client.searchScroll(request, listener), (SearchResponse response) -> {
onScrollResponse(timeValueNanos(max(0, earliestNextBatchStartTime - System.nanoTime())), response);
});
}

Expand Down Expand Up @@ -434,9 +423,9 @@ public void onFailure(Throwable e) {
}

/**
* Build the backoff policy for use with retries.
* Get the backoff policy for use with retries.
*/
BackoffPolicy backoffPolicy() {
BackoffPolicy buildBackoffPolicy() {
return exponentialBackoff(mainRequest.getRetryBackoffInitialTime(), mainRequest.getMaxRetries());
}

Expand Down Expand Up @@ -470,7 +459,7 @@ long getLastBatchStartTime() {
}

/**
* Wraps a backoffPolicy in another policy that counts the number of backoffs acquired.
* Wraps a backoffPolicy in another policy that counts the number of backoffs acquired. Used to count bulk backoffs.
*/
private BackoffPolicy wrapBackoffPolicy(BackoffPolicy backoffPolicy) {
return new BackoffPolicy() {
Expand All @@ -488,11 +477,52 @@ public TimeValue next() {
if (false == delegate.hasNext()) {
return null;
}
task.countRetry();
task.countBulkRetry();
return delegate.next();
}
};
}
};
}

/**
 * Run a search action and call onResponse when the response comes in, retrying if the action fails with an exception caused by
 * rejected execution.
 *
 * @param action consumes a listener and starts the action. The listener it consumes is rigged to retry on failure.
 * @param onResponse consumes the response from the action
 */
private <T> void searchWithRetry(Consumer<ActionListener<T>> action, Consumer<T> onResponse) {
    /*
     * RetryHelper is both the listener for the action and the runnable that (re)starts it, so the same instance can be handed to
     * threadPool.schedule to retry after a backoff delay.
     */
    class RetryHelper extends AbstractRunnable implements ActionListener<T> {
        private final Iterator<TimeValue> retries = backoffPolicy.iterator();
        /** Number of retries attempted so far, kept so the give-up message can report an actual count. */
        private int retryCount = 0;

        @Override
        public void onResponse(T response) {
            onResponse.accept(response);
        }

        @Override
        protected void doRun() throws Exception {
            action.accept(this);
        }

        @Override
        public void onFailure(Throwable e) {
            // Only rejected-execution failures are retryable; anything else fails the whole request immediately.
            if (ExceptionsHelper.unwrap(e, EsRejectedExecutionException.class) != null) {
                if (retries.hasNext()) {
                    retryCount += 1;
                    logger.trace("retrying rejected search", e);
                    // Count the retry before scheduling so the task's status can't be read with an undercount while the
                    // scheduled attempt is already running.
                    task.countSearchRetry();
                    threadPool.schedule(retries.next(), ThreadPool.Names.SAME, this);
                } else {
                    // Was logging the Iterator itself, which renders as an object reference rather than a count.
                    logger.warn("giving up on search because we retried [{}] times without success", e, retryCount);
                    finishHim(e);
                }
            } else {
                logger.warn("giving up on search because it failed with a non-retryable exception", e);
                finishHim(e);
            }
        }
    }
    new RetryHelper().run();
}
}
Expand Up @@ -19,9 +19,6 @@

package org.elasticsearch.index.reindex;

import java.io.IOException;
import java.util.Arrays;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.WriteConsistencyLevel;
Expand All @@ -34,6 +31,9 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.Arrays;

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
Expand Down
Expand Up @@ -107,6 +107,23 @@ public Self consistency(WriteConsistencyLevel consistency) {
return self();
}

/**
 * Set the initial delay after a rejection before retrying a bulk request. With the default maxRetries the total backoff for retrying
 * rejections is about one minute per bulk request. Once the entire bulk request is successful the retry counter resets.
 *
 * @param retryBackoffInitialTime the wait before the first retry; later waits grow according to the backoff policy
 * @return this builder, for request chaining
 */
public Self setRetryBackoffInitialTime(TimeValue retryBackoffInitialTime) {
// Pure delegate: store on the underlying request and return this builder.
request.setRetryBackoffInitialTime(retryBackoffInitialTime);
return self();
}

/**
 * Set the total number of retries attempted for rejections. There is no way to ask for unlimited retries.
 *
 * @param maxRetries maximum number of times a rejected request is retried before giving up
 * @return this builder, for request chaining
 */
public Self setMaxRetries(int maxRetries) {
// Pure delegate: store on the underlying request and return this builder.
request.setMaxRetries(maxRetries);
return self();
}

/**
* Set the throttle for this request in sub-requests per second. {@link Float#POSITIVE_INFINITY} means set no throttle and that is the
* default. Throttling is done between batches, as we start the next scroll requests. That way we can increase the scroll's timeout to
Expand Down
Expand Up @@ -26,12 +26,11 @@

public abstract class AbstractBulkIndexByScrollRequestBuilder<
Request extends AbstractBulkIndexByScrollRequest<Request>,
Response extends BulkIndexByScrollResponse,
Self extends AbstractBulkIndexByScrollRequestBuilder<Request, Response, Self>>
extends AbstractBulkByScrollRequestBuilder<Request, Response, Self> {
Self extends AbstractBulkIndexByScrollRequestBuilder<Request, Self>>
extends AbstractBulkByScrollRequestBuilder<Request, BulkIndexByScrollResponse, Self> {

protected AbstractBulkIndexByScrollRequestBuilder(ElasticsearchClient client,
Action<Request, Response, Self> action, SearchRequestBuilder search, Request request) {
Action<Request, BulkIndexByScrollResponse, Self> action, SearchRequestBuilder search, Request request) {
super(client, action, search, request);
}

Expand Down

0 comments on commit fe4823e

Please sign in to comment.