Skip to content

Commit

Permalink
Reindex negative TimeValue fix (#54057) (#54133)
Browse files Browse the repository at this point in the history
Reindex would use timeValueNanos(System.nanoTime()). The intended use
for TimeValue is as a duration, not as absolute time. In particular,
this could result in negative TimeValue's, being unsupported in #53913.
Modified to use the bare long nano-second value.
  • Loading branch information
henningandersen committed Mar 25, 2020
1 parent 958c7cd commit 4b0e231
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu

public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, boolean needsSourceDocumentVersions,
boolean needsSourceDocumentSeqNoAndPrimaryTerm, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, Action mainAction, Request mainRequest,
ThreadPool threadPool, Action mainAction, Request mainRequest,
ActionListener<BulkByScrollResponse> listener) {

this.task = task;
Expand Down Expand Up @@ -158,7 +158,7 @@ public BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>>
// The default script applier executes a no-op
return (request, searchHit) -> request;
}

/**
* Build the {@link RequestWrapper} for a single search hit. This shouldn't handle
* metadata or scripting. That will be handled by copyMetadata and
Expand Down Expand Up @@ -237,19 +237,19 @@ public void start() {
}
try {
startTime.set(System.nanoTime());
scrollSource.start(response -> onScrollResponse(timeValueNanos(System.nanoTime()), 0, response));
scrollSource.start(response -> onScrollResponse(System.nanoTime(), 0, response));
} catch (Exception e) {
finishHim(e);
}
}

/**
* Process a scroll response.
* @param lastBatchStartTime the time when the last batch started. Used to calculate the throttling delay.
* @param lastBatchStartTimeNS the time when the last batch started. Used to calculate the throttling delay.
* @param lastBatchSize the size of the last batch. Used to calculate the throttling delay.
* @param response the scroll response to process
*/
void onScrollResponse(TimeValue lastBatchStartTime, int lastBatchSize, ScrollableHitSource.Response response) {
void onScrollResponse(long lastBatchStartTimeNS, int lastBatchSize, ScrollableHitSource.Response response) {
logger.debug("[{}]: got scroll response with [{}] hits", task.getId(), response.getHits().size());
if (task.isCancelled()) {
logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
Expand All @@ -276,7 +276,7 @@ protected void doRun() throws Exception {
* It is important that the batch start time be calculated from here, scroll response to scroll response. That way the time
* waiting on the scroll doesn't count against this batch in the throttle.
*/
prepareBulkRequest(timeValueNanos(System.nanoTime()), response);
prepareBulkRequest(System.nanoTime(), response);
}

@Override
Expand All @@ -285,15 +285,15 @@ public void onFailure(Exception e) {
}
};
prepareBulkRequestRunnable = (AbstractRunnable) threadPool.getThreadContext().preserveContext(prepareBulkRequestRunnable);
worker.delayPrepareBulkRequest(threadPool, lastBatchStartTime, lastBatchSize, prepareBulkRequestRunnable);
worker.delayPrepareBulkRequest(threadPool, lastBatchStartTimeNS, lastBatchSize, prepareBulkRequestRunnable);
}

/**
* Prepare the bulk request. Called on the generic thread pool after some preflight checks have been done one the SearchResponse and any
* delay has been slept. Uses the generic thread pool because reindex is rare enough not to need its own thread pool and because the
* thread may be blocked by the user script.
*/
void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.Response response) {
void prepareBulkRequest(long thisBatchStartTimeNS, ScrollableHitSource.Response response) {
logger.debug("[{}]: preparing bulk request", task.getId());
if (task.isCancelled()) {
logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
Expand All @@ -318,18 +318,18 @@ void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.Respon
/*
* If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation.
*/
startNextScroll(thisBatchStartTime, timeValueNanos(System.nanoTime()), 0);
startNextScroll(thisBatchStartTimeNS, System.nanoTime(), 0);
return;
}
request.timeout(mainRequest.getTimeout());
request.waitForActiveShards(mainRequest.getWaitForActiveShards());
sendBulkRequest(thisBatchStartTime, request);
sendBulkRequest(thisBatchStartTimeNS, request);
}

/**
* Send a bulk request, handling retries.
*/
void sendBulkRequest(TimeValue thisBatchStartTime, BulkRequest request) {
void sendBulkRequest(long thisBatchStartTimeNS, BulkRequest request) {
if (logger.isDebugEnabled()) {
logger.debug("[{}]: sending [{}] entry, [{}] bulk request", task.getId(), request.requests().size(),
new ByteSizeValue(request.estimatedSizeInBytes()));
Expand All @@ -342,7 +342,7 @@ void sendBulkRequest(TimeValue thisBatchStartTime, BulkRequest request) {
bulkRetry.withBackoff(client::bulk, request, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
onBulkResponse(thisBatchStartTime, response);
onBulkResponse(thisBatchStartTimeNS, response);
}

@Override
Expand All @@ -355,7 +355,7 @@ public void onFailure(Exception e) {
/**
* Processes bulk responses, accounting for failures.
*/
void onBulkResponse(TimeValue thisBatchStartTime, BulkResponse response) {
void onBulkResponse(long thisBatchStartTimeNS, BulkResponse response) {
try {
List<Failure> failures = new ArrayList<Failure>();
Set<String> destinationIndicesThisBatch = new HashSet<>();
Expand Down Expand Up @@ -403,7 +403,7 @@ void onBulkResponse(TimeValue thisBatchStartTime, BulkResponse response) {
return;
}

startNextScroll(thisBatchStartTime, timeValueNanos(System.nanoTime()), response.getItems().length);
startNextScroll(thisBatchStartTimeNS, System.nanoTime(), response.getItems().length);
} catch (Exception t) {
finishHim(t);
}
Expand All @@ -415,15 +415,15 @@ void onBulkResponse(TimeValue thisBatchStartTime, BulkResponse response) {
* @param lastBatchSize the number of requests sent in the last batch. This is used to calculate the throttling values which are applied
* when the scroll returns
*/
void startNextScroll(TimeValue lastBatchStartTime, TimeValue now, int lastBatchSize) {
void startNextScroll(long lastBatchStartTimeNS, long nowNS, int lastBatchSize) {
if (task.isCancelled()) {
logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
finishHim(null);
return;
}
TimeValue extraKeepAlive = worker.throttleWaitTime(lastBatchStartTime, now, lastBatchSize);
TimeValue extraKeepAlive = worker.throttleWaitTime(lastBatchStartTimeNS, nowNS, lastBatchSize);
scrollSource.startNextScroll(extraKeepAlive, response -> {
onScrollResponse(lastBatchStartTime, lastBatchSize, response);
onScrollResponse(lastBatchStartTimeNS, lastBatchSize, response);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@
import static org.apache.lucene.util.TestUtil.randomSimpleString;
import static org.elasticsearch.action.bulk.BackoffPolicy.constantBackoff;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -208,7 +207,7 @@ public void testStartNextScrollRetriesOnRejectionAndSucceeds() throws Exception
client.scrollsToReject = randomIntBetween(0, testRequest.getMaxRetries() - 1);
DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff();
action.setScroll(scrollId());
TimeValue now = timeValueNanos(System.nanoTime());
long now = System.nanoTime();
action.startNextScroll(now, now, 0);
assertBusy(() -> assertEquals(client.scrollsToReject + 1, client.scrollAttempts.get()));
if (listener.isDone()) {
Expand All @@ -223,7 +222,7 @@ public void testStartNextScrollRetriesOnRejectionButFailsOnTooManyRejections() t
client.scrollsToReject = testRequest.getMaxRetries() + randomIntBetween(1, 100);
DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff();
action.setScroll(scrollId());
TimeValue now = timeValueNanos(System.nanoTime());
long now = System.nanoTime();
action.startNextScroll(now, now, 0);
assertBusy(() -> assertEquals(testRequest.getMaxRetries() + 1, client.scrollAttempts.get()));
assertBusy(() -> assertTrue(listener.isDone()));
Expand All @@ -239,7 +238,7 @@ public void testScrollResponseSetsTotal() {

long total = randomIntBetween(0, Integer.MAX_VALUE);
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), total, emptyList(), null);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueSeconds(0), 0, response);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), 0, 0, response);
assertEquals(total, testTask.getStatus().getTotal());
}

Expand All @@ -252,7 +251,7 @@ public void testScrollResponseBatchingBehavior() throws Exception {
Hit hit = new ScrollableHitSource.BasicHit("index", "type", "id", 0);
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 1, singletonList(hit), null);
DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction();
simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 0, response);
simulateScrollResponse(action, System.nanoTime(), 0, response);

// Use assert busy because the update happens on another thread
final int expectedBatches = batches;
Expand Down Expand Up @@ -305,7 +304,7 @@ public void testBulkResponseSetsLotsOfStatus() {
new IndexResponse(shardId, "type", "id" + i, seqNo, primaryTerm, randomInt(), createdResponse);
responses[i] = new BulkItemResponse(i, opType, response);
}
new DummyAsyncBulkByScrollAction().onBulkResponse(timeValueNanos(System.nanoTime()), new BulkResponse(responses, 0));
new DummyAsyncBulkByScrollAction().onBulkResponse(System.nanoTime(), new BulkResponse(responses, 0));
assertEquals(versionConflicts, testTask.getStatus().getVersionConflicts());
assertEquals(updated, testTask.getStatus().getUpdated());
assertEquals(created, testTask.getStatus().getCreated());
Expand Down Expand Up @@ -335,7 +334,7 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String n
}
});
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 0, emptyList(), null);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 10, response);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 10, response);
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
assertThat(e.getCause(), instanceOf(EsRejectedExecutionException.class));
assertThat(e.getCause(), hasToString(containsString("test")));
Expand All @@ -353,7 +352,7 @@ public void testShardFailuresAbortRequest() throws Exception {
SearchFailure shardFailure = new SearchFailure(new RuntimeException("test"));
ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(false, singletonList(shardFailure), 0,
emptyList(), null);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 0, scrollResponse);
BulkByScrollResponse response = listener.get();
assertThat(response.getBulkFailures(), empty());
assertThat(response.getSearchFailures(), contains(shardFailure));
Expand All @@ -367,7 +366,7 @@ public void testShardFailuresAbortRequest() throws Exception {
*/
public void testSearchTimeoutsAbortRequest() throws Exception {
ScrollableHitSource.Response scrollResponse = new ScrollableHitSource.Response(true, emptyList(), 0, emptyList(), null);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 0, scrollResponse);
simulateScrollResponse(new DummyAsyncBulkByScrollAction(), System.nanoTime(), 0, scrollResponse);
BulkByScrollResponse response = listener.get();
assertThat(response.getBulkFailures(), empty());
assertThat(response.getSearchFailures(), empty());
Expand All @@ -384,7 +383,7 @@ public void testBulkFailuresAbortRequest() throws Exception {
DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction();
BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[]
{new BulkItemResponse(0, DocWriteRequest.OpType.CREATE, failure)}, randomLong());
action.onBulkResponse(timeValueNanos(System.nanoTime()), bulkResponse);
action.onBulkResponse(System.nanoTime(), bulkResponse);
BulkByScrollResponse response = listener.get();
assertThat(response.getBulkFailures(), contains(failure));
assertThat(response.getSearchFailures(), empty());
Expand All @@ -404,7 +403,7 @@ protected AbstractAsyncBulkByScrollAction.RequestWrapper<?> buildRequest(Hit doc
ScrollableHitSource.BasicHit hit = new ScrollableHitSource.BasicHit("index", "type", "id", 0);
hit.setSource(new BytesArray("{}"), XContentType.JSON);
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 1, singletonList(hit), null);
simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 0, response);
simulateScrollResponse(action, System.nanoTime(), 0, response);
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
assertThat(e.getCause(), instanceOf(RuntimeException.class));
assertThat(e.getCause().getMessage(), equalTo("surprise"));
Expand Down Expand Up @@ -456,8 +455,8 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String n
// Set throttle to 1 request per second to make the math simpler
worker.rethrottle(1f);
// Make the last batch look nearly instant but have 100 documents
TimeValue lastBatchStartTime = timeValueNanos(System.nanoTime());
TimeValue now = timeValueNanos(lastBatchStartTime.nanos() + 1);
long lastBatchStartTime = System.nanoTime();
long now = lastBatchStartTime + 1;
action.startNextScroll(lastBatchStartTime, now, 100);

// So the next request is going to have to wait an extra 100 seconds or so (base was 10 seconds, so 110ish)
Expand Down Expand Up @@ -503,15 +502,15 @@ private void bulkRetryTestCase(boolean failWithRejection) throws Exception {
CountDownLatch successLatch = new CountDownLatch(1);
DummyAsyncBulkByScrollAction action = new DummyActionWithoutBackoff() {
@Override
void startNextScroll(TimeValue lastBatchStartTime, TimeValue now, int lastBatchSize) {
void startNextScroll(long lastBatchStartTime, long now, int lastBatchSize) {
successLatch.countDown();
}
};
BulkRequest request = new BulkRequest();
for (int i = 0; i < size + 1; i++) {
request.add(new IndexRequest("index", "type", "id" + i));
}
action.sendBulkRequest(timeValueNanos(System.nanoTime()), request);
action.sendBulkRequest(System.nanoTime(), request);
if (failWithRejection) {
BulkByScrollResponse response = listener.get();
assertThat(response.getBulkFailures(), hasSize(1));
Expand Down Expand Up @@ -577,22 +576,22 @@ public void testCancelBeforeInitialSearch() throws Exception {
}

public void testCancelBeforeScrollResponse() throws Exception {
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1,
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> simulateScrollResponse(action, System.nanoTime(), 1,
new ScrollableHitSource.Response(false, emptyList(), between(1, 100000), emptyList(), null)));
}

public void testCancelBeforeSendBulkRequest() throws Exception {
cancelTaskCase((DummyAsyncBulkByScrollAction action) ->
action.sendBulkRequest(timeValueNanos(System.nanoTime()), new BulkRequest()));
action.sendBulkRequest(System.nanoTime(), new BulkRequest()));
}

public void testCancelBeforeOnBulkResponse() throws Exception {
cancelTaskCase((DummyAsyncBulkByScrollAction action) ->
action.onBulkResponse(timeValueNanos(System.nanoTime()), new BulkResponse(new BulkItemResponse[0], 0)));
action.onBulkResponse(System.nanoTime(), new BulkResponse(new BulkItemResponse[0], 0)));
}

public void testCancelBeforeStartNextScroll() throws Exception {
TimeValue now = timeValueNanos(System.nanoTime());
long now = System.nanoTime();
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.startNextScroll(now, now, 0));
}

Expand Down Expand Up @@ -641,7 +640,7 @@ public ScheduledCancellable schedule(Runnable command, TimeValue delay, String n
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), total, emptyList(), null);
// Use a long delay here so the test will time out if the cancellation doesn't reschedule the throttled task
worker.rethrottle(1);
simulateScrollResponse(action, timeValueNanos(System.nanoTime()), 1000, response);
simulateScrollResponse(action, System.nanoTime(), 1000, response);

// Now that we've got our cancel we'll just verify that it all came through all right
assertEquals(reason, listener.get(10, TimeUnit.SECONDS).getReasonCancelled());
Expand Down Expand Up @@ -670,7 +669,7 @@ private void cancelTaskCase(Consumer<DummyAsyncBulkByScrollAction> testMe) throw
/**
* Simulate a scroll response by setting the scroll id and firing the onScrollResponse method.
*/
private void simulateScrollResponse(DummyAsyncBulkByScrollAction action, TimeValue lastBatchTime, int lastBatchSize,
private void simulateScrollResponse(DummyAsyncBulkByScrollAction action, long lastBatchTime, int lastBatchSize,
ScrollableHitSource.Response response) {
action.setScroll(scrollId());
action.onScrollResponse(lastBatchTime, lastBatchSize, response);
Expand Down

0 comments on commit 4b0e231

Please sign in to comment.