Force execution of finish shard bulk request (#51957) #52484

Merged · 3 commits · Feb 25, 2020
@@ -168,16 +168,29 @@ protected void doRun() throws Exception {

@Override
public void onRejection(Exception e) {
// Fail all operations after a bulk rejection hit an action that waited for a mapping update and finish the request
while (context.hasMoreOperationsToExecute()) {
context.setRequestToExecute(context.getCurrent());
final DocWriteRequest<?> docWriteRequest = context.getRequestToExecute();
onComplete(
exceptionToResult(
e, primary, docWriteRequest.opType() == DocWriteRequest.OpType.DELETE, docWriteRequest.version()),
context, null);
}
finishRequest();
// We must finish the outstanding request. Finishing the outstanding request can include
//refreshing and fsyncing. Therefore, we must force execution on the WRITE thread.
executor.execute(new ActionRunnable<PrimaryResult<BulkShardRequest, BulkShardResponse>>(listener) {

@Override
protected void doRun() {
// Fail all operations after a bulk rejection hit an action that waited for a mapping update and finish the request
while (context.hasMoreOperationsToExecute()) {
context.setRequestToExecute(context.getCurrent());
final DocWriteRequest<?> docWriteRequest = context.getRequestToExecute();
onComplete(
exceptionToResult(
e, primary, docWriteRequest.opType() == DocWriteRequest.OpType.DELETE, docWriteRequest.version()),
context, null);
}
finishRequest();
}

@Override
public boolean isForceExecution() {
return true;
}
});
}

private void finishRequest() {
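The change above stops running the fail-loop and finishRequest() directly inside onRejection (i.e. not necessarily on a WRITE thread) and instead resubmits that work to the WRITE executor as an ActionRunnable whose isForceExecution() returns true, because, as the new comment notes, finishing the request can include refreshing and fsyncing. Below is a minimal, framework-free sketch of the same force-execution idea using only JDK classes; ForceExecutionSketch, ForceExecutionRunnable and SoftBoundedQueue are hypothetical names, a plain try/catch stands in for AbstractRunnable's onRejection callback, and none of this is the actual Elasticsearch implementation.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Sketch of the "force execution" idea: a bounded executor rejects ordinary work
 * when its queue is full, but tasks marked as force-execution bypass the capacity
 * check. Illustrative names only; not the Elasticsearch implementation.
 */
public class ForceExecutionSketch {

    /** A task that must not be dropped even when the queue is full. */
    interface ForceExecutionRunnable extends Runnable {}

    /** Unbounded queue with a soft capacity enforced in offer(); forced puts skip it. */
    static final class SoftBoundedQueue extends LinkedBlockingQueue<Runnable> {
        private final int softLimit;

        SoftBoundedQueue(int softLimit) {
            this.softLimit = softLimit;
        }

        @Override
        public boolean offer(Runnable r) {
            // Ordinary submissions respect the soft limit (racy check; fine for a sketch).
            return size() < softLimit && super.offer(r);
        }

        void forcePut(Runnable r) {
            super.offer(r); // the backing queue is unbounded, so this cannot fail
        }
    }

    /** Rejection policy: force-enqueue marked tasks, reject everything else. */
    static final class ForceAwareAbortPolicy implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (r instanceof ForceExecutionRunnable) {
                ((SoftBoundedQueue) executor.getQueue()).forcePut(r);
            } else {
                throw new RejectedExecutionException("write queue is full");
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor writeExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
            new SoftBoundedQueue(1), new ForceAwareAbortPolicy());

        // Saturate the pool: block the single worker, then fill the only queue slot.
        CountDownLatch workerBusy = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        writeExecutor.execute(() -> {
            workerBusy.countDown();
            await(release);
        });
        workerBusy.await();
        writeExecutor.execute(() -> {});

        // The pattern from the diff above, in miniature: the ordinary follow-up task is
        // rejected under load, so the "finish" step is resubmitted as force-execution
        // work that cannot be rejected again.
        try {
            writeExecutor.execute(() -> System.out.println("resume bulk execution"));
        } catch (RejectedExecutionException e) {
            writeExecutor.execute((ForceExecutionRunnable) () ->
                System.out.println("fail pending operations, then finish the request"));
        }

        release.countDown();
        writeExecutor.shutdown();
        writeExecutor.awaitTermination(5, TimeUnit.SECONDS);
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}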
@@ -40,6 +40,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
@@ -53,13 +54,18 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.Matchers.arrayWithSize;
@@ -818,6 +824,105 @@ public void testRetries() throws Exception {
latch.await();
}

public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception {
TestThreadPool rejectingThreadPool = new TestThreadPool(
"TransportShardBulkActionTests#testForceExecutionOnRejectionAfterMappingUpdate",
Settings.builder()
.put("thread_pool." + ThreadPool.Names.WRITE + ".size", 1)
.put("thread_pool." + ThreadPool.Names.WRITE + ".queue_size", 1)
.build());
CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
rejectingThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> {
try {
cyclicBarrier.await();
logger.info("blocking the write executor");
cyclicBarrier.await();
logger.info("unblocked the write executor");
} catch (Exception e) {
throw new RuntimeException(e);
}
});
try {
cyclicBarrier.await();
// Place a task in the queue to block next enqueue
rejectingThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> {});

BulkItemRequest[] items = new BulkItemRequest[2];
DocWriteRequest<IndexRequest> writeRequest1 = new IndexRequest("index").id("id")
.source(Requests.INDEX_CONTENT_TYPE, "foo", 1);
DocWriteRequest<IndexRequest> writeRequest2 = new IndexRequest("index").id("id")
.source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
items[0] = new BulkItemRequest(0, writeRequest1);
items[1] = new BulkItemRequest(1, writeRequest2);
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);

Engine.IndexResult mappingUpdate =
new Engine.IndexResult(new Mapping(null, mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()));
Translog.Location resultLocation1 = new Translog.Location(42, 36, 36);
Translog.Location resultLocation2 = new Translog.Location(42, 42, 42);
Engine.IndexResult success1 = new FakeIndexResult(1, 1, 10, true, resultLocation1);
Engine.IndexResult success2 = new FakeIndexResult(1, 1, 13, true, resultLocation2);

IndexShard shard = mock(IndexShard.class);
when(shard.shardId()).thenReturn(shardId);
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
.thenReturn(success1, mappingUpdate, success2);
when(shard.getFailedIndexResult(any(EsRejectedExecutionException.class), anyLong())).thenCallRealMethod();
when(shard.mapperService()).thenReturn(mock(MapperService.class));

randomlySetIgnoredPrimaryResponse(items[0]);

AtomicInteger updateCalled = new AtomicInteger();

final CountDownLatch latch = new CountDownLatch(1);
TransportShardBulkAction.performOnPrimary(
bulkShardRequest, shard, null, rejectingThreadPool::absoluteTimeInMillis, (update, shardId, type, listener) -> {
// There should indeed be a mapping update
assertNotNull(update);
updateCalled.incrementAndGet();
listener.onResponse(null);
try {
// Release blocking task now that the continue write execution has been rejected and
// the finishRequest execution has been force enqueued
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new IllegalStateException(e);
}
}, listener -> listener.onResponse(null), new LatchedActionListener<>(
ActionTestUtils.assertNoFailureListener(result ->
// Assert that we still need to fsync the location that was successfully written
assertThat(((WritePrimaryResult<BulkShardRequest, BulkShardResponse>) result).location,
equalTo(resultLocation1))), latch),
rejectingThreadPool);
latch.await();

assertThat("mappings were \"updated\" once", updateCalled.get(), equalTo(1));

verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean());

BulkItemResponse primaryResponse1 = bulkShardRequest.items()[0].getPrimaryResponse();
assertThat(primaryResponse1.getItemId(), equalTo(0));
assertThat(primaryResponse1.getId(), equalTo("id"));
assertThat(primaryResponse1.getOpType(), equalTo(DocWriteRequest.OpType.INDEX));
assertFalse(primaryResponse1.isFailed());
assertThat(primaryResponse1.getResponse().status(), equalTo(RestStatus.CREATED));
assertThat(primaryResponse1.getResponse().getSeqNo(), equalTo(10L));

BulkItemResponse primaryResponse2 = bulkShardRequest.items()[1].getPrimaryResponse();
assertThat(primaryResponse2.getItemId(), equalTo(1));
assertThat(primaryResponse2.getId(), equalTo("id"));
assertThat(primaryResponse2.getOpType(), equalTo(DocWriteRequest.OpType.INDEX));
assertTrue(primaryResponse2.isFailed());
assertNull(primaryResponse2.getResponse());
assertEquals(primaryResponse2.status(), RestStatus.TOO_MANY_REQUESTS);
assertThat(primaryResponse2.getFailure().getCause(), instanceOf(EsRejectedExecutionException.class));

closeShards(shard);
} finally {
rejectingThreadPool.shutdownNow();
}
}

private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) {
if (randomBoolean()) {
// add a response to the request and thereby check that it is ignored for the primary.
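The new test above saturates a WRITE pool configured with one thread and a one-slot queue: the single worker is parked on a CyclicBarrier, a no-op fills the queue, so the continuation scheduled after the mapping update is rejected with EsRejectedExecutionException, while the force-executed finish still runs and the listener observes the translog location of the first, successful write. The stand-alone sketch below reproduces just that saturation choreography with plain JDK classes; SaturatedPoolSketch and its inline tasks are illustrative names and not part of the Elasticsearch test framework.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Sketch of the saturation technique used by the test: block a 1-thread / 1-slot
 * executor so the next ordinary submission is rejected.
 */
public class SaturatedPoolSketch {
    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor write = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy());

        CyclicBarrier barrier = new CyclicBarrier(2);
        // Occupy the single worker thread until the barrier is tripped a second time.
        write.execute(() -> {
            try {
                barrier.await(); // 1st await: signal "the worker is busy"
                barrier.await(); // 2nd await: stay busy until released below
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        barrier.await();         // wait until the worker is actually blocked
        write.execute(() -> {}); // fill the only queue slot

        try {
            write.execute(() -> System.out.println("never runs"));
        } catch (RejectedExecutionException expected) {
            // This is the situation onRejection(...) handles in the production code.
            System.out.println("submission rejected while the pool is saturated");
        }

        barrier.await();         // release the blocked worker, mirroring the mapping updater callback
        write.shutdown();
        write.awaitTermination(5, TimeUnit.SECONDS);
    }
}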