Skip to content

Commit

Permalink
Async search should retry updates on version conflict (#63652)
Browse files Browse the repository at this point in the history
* Async search should retry updates on version conflict

The _async_search APIs can throw version conflict exception when the internal response
is updated concurrently. That can happen if the final response is written while the user
extends the expiration time. That scenario should be rare but it happened in Kibana for
several users so this change ensures that updates are retried at least 5 times. That
should resolve the transient errors for Kibana. This change also preserves the version
conflict exception in case the retry didn't work instead of returning a confusing 404.
This commit also ensures that we don't delete the response if the search was cancelled
internally and not deleted explicitly by the user.

Closes #63213
  • Loading branch information
jimczi committed Oct 16, 2020
1 parent eb6cd20 commit c4138e5
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -415,4 +417,37 @@ public void testSearchPhaseFailureNoCause() throws Exception {
assertNotNull(response.getFailure());
ensureTaskNotRunning(response.getId());
}

public void testRetryVersionConflict() throws Exception {
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indexName);
request.setWaitForCompletionTimeout(TimeValue.timeValueMinutes(10));
request.setKeepOnCompletion(true);
AsyncSearchResponse response = submitAsyncSearch(request);
assertNotNull(response.getSearchResponse());
assertFalse(response.isRunning());

List<Thread> threads = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(1);
List<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
for (int i = 0; i < 2; i++) {
Runnable runnable = () -> {
for (int j = 0; j < 10; j++) {
try {
latch.await();
getAsyncSearch(response.getId(), TimeValue.timeValueMinutes(10));
} catch (Exception exc) {
exceptions.add(exc);
}
}
};
Thread thread = new Thread(runnable);
thread.start();
threads.add(thread);
}
latch.countDown();
for (Thread thread : threads) {
thread.join();
}
assertTrue(exceptions.toString(), exceptions.isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,24 +173,12 @@ private void onFatalFailure(AsyncSearchTask task, Exception error, boolean shoul
private void onFinalResponse(AsyncSearchTask searchTask,
AsyncSearchResponse response,
Runnable nextAction) {
if (searchTask.isCancelled()) {
// the task was cancelled so we ensure that there is nothing stored in the response index.
store.deleteResponse(searchTask.getExecutionId(), ActionListener.wrap(
resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
exc -> {
logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]",
searchTask.getExecutionId().getEncoded()), exc);
unregisterTaskAndMoveOn(searchTask, nextAction);
}));
return;
}

store.updateResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response,
ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
exc -> {
Throwable cause = ExceptionsHelper.unwrapCause(exc);
if (cause instanceof DocumentMissingException == false &&
cause instanceof VersionConflictEngineException == false) {
cause instanceof VersionConflictEngineException == false) {
logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]",
searchTask.getExecutionId().getEncoded()), exc);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,16 @@ public void retrieveResult(GetAsyncResultRequest request, ActionListener<Respons
ActionListener.wrap(
p -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener),
exc -> {
//don't log when: the async search document or its index is not found. That can happen if an invalid
//search id is provided or no async search initial response has been stored yet.
RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc));
if (status != RestStatus.NOT_FOUND) {
logger.error(() -> new ParameterizedMessage("failed to update expiration time for async-search [{}]",
searchId.getEncoded()), exc);
listener.onFailure(exc);
} else {
//the async search document or its index is not found.
//That can happen if an invalid/deleted search id is provided.
listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
}
listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
}
));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ public void updateResponse(String docId,
UpdateRequest request = new UpdateRequest()
.index(index)
.id(docId)
.doc(source, XContentType.JSON);
.doc(source, XContentType.JSON)
.retryOnConflict(5);
client.update(request, listener);
} catch(Exception e) {
listener.onFailure(e);
Expand All @@ -210,7 +211,8 @@ public void updateExpirationTime(String docId,
Map<String, Object> source = Collections.singletonMap(EXPIRATION_TIME_FIELD, expirationTimeMillis);
UpdateRequest request = new UpdateRequest().index(index)
.id(docId)
.doc(source, XContentType.JSON);
.doc(source, XContentType.JSON)
.retryOnConflict(5);
client.update(request, listener);
}

Expand Down

0 comments on commit c4138e5

Please sign in to comment.