Force execution of finish shard bulk request
Currently the shard bulk request can be rejected by the write threadpool
after a mapping update. In that scenario the thread that runs the mapping
update listener attempts to finish the request and fsync, and that thread
can be a transport thread, which should not block on disk I/O. This commit
fixes the issue by forcing the finish action to execute on the write
threadpool.

Fixes elastic#51904.
Tim-Brooks committed Feb 5, 2020
1 parent e856734 commit 5a02d1a
Showing 2 changed files with 119 additions and 1 deletion.
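
Since the fix hinges on the write threadpool's rejection behaviour, here is a minimal, self-contained sketch of the idea using only JDK classes. The names (ForceableRunnable, SoftBoundedQueue) are made up for illustration; this is not the Elasticsearch EsThreadPoolExecutor or SizeBlockingQueue implementation. Ordinary tasks are rejected once the soft queue capacity is reached, while a task that reports isForceExecution() == true is accepted regardless, which is the property the "finish request" step relies on.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Minimal sketch of the "force execution" idea using only JDK classes; illustrative names,
// not the Elasticsearch implementation.
public class ForceExecutionSketch {

    interface ForceableRunnable extends Runnable {
        default boolean isForceExecution() {
            return false;
        }
    }

    // An unbounded queue with a soft capacity: offer() fails once that capacity is reached,
    // unless the task asks to be force-executed, in which case it is enqueued anyway.
    static final class SoftBoundedQueue extends LinkedBlockingQueue<Runnable> {
        private final int softCapacity;

        SoftBoundedQueue(int softCapacity) {
            this.softCapacity = softCapacity;
        }

        @Override
        public boolean offer(Runnable task) {
            boolean force = task instanceof ForceableRunnable
                && ((ForceableRunnable) task).isForceExecution();
            if (force == false && size() >= softCapacity) {
                return false; // ThreadPoolExecutor hands the task to its rejection handler
            }
            return super.offer(task);
        }
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor writeExecutor =
            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new SoftBoundedQueue(1));

        // Saturate the single worker and the single queue slot.
        writeExecutor.execute(() -> sleep(500));
        writeExecutor.execute(() -> {});

        // An ordinary task is rejected while the pool is saturated.
        try {
            writeExecutor.execute(() -> System.out.println("ordinary task ran"));
        } catch (RejectedExecutionException e) {
            System.out.println("ordinary task rejected: " + e);
        }

        // A task that opts into force execution is accepted despite the full queue.
        writeExecutor.execute(new ForceableRunnable() {
            @Override
            public void run() {
                System.out.println("forced task ran despite the full queue");
            }

            @Override
            public boolean isForceExecution() {
                return true;
            }
        });

        writeExecutor.shutdown();
        writeExecutor.awaitTermination(5, TimeUnit.SECONDS);
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}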
@@ -179,7 +179,20 @@ public void onRejection(Exception e) {
e, primary, docWriteRequest.opType() == DocWriteRequest.OpType.DELETE, docWriteRequest.version()),
context, null);
}
finishRequest();

// Force the execution to finish the request
executor.execute(new ActionRunnable<>(listener) {

@Override
protected void doRun() {
finishRequest();
}

@Override
public boolean isForceExecution() {
return true;
}
});
}

private void finishRequest() {
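In the hunk above, the anonymous ActionRunnable carries the caller's listener, so a failure inside doRun() is reported through the listener instead of being lost on the worker thread, and the isForceExecution() override keeps the saturated write queue from rejecting the finish step as well. A rough stand-alone sketch of that shape, with hypothetical names rather than the real org.elasticsearch.action.ActionRunnable API:

// Hypothetical shape of the wrapper used in the hunk above; illustrative only.
interface Listener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

abstract class ForcedListenerRunnable<T> implements Runnable {
    private final Listener<T> listener;

    protected ForcedListenerRunnable(Listener<T> listener) {
        this.listener = listener;
    }

    // The actual work, e.g. finishRequest() above.
    protected abstract void doRun() throws Exception;

    // Consulted by the executor's queue so a saturated write pool cannot reject the
    // finish step after the mapping update has already completed.
    public boolean isForceExecution() {
        return true;
    }

    @Override
    public final void run() {
        try {
            doRun();
        } catch (Exception e) {
            // Route the failure back to the caller instead of losing it on the worker thread.
            listener.onFailure(e);
        }
    }
}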
@@ -40,6 +40,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
@@ -53,13 +54,18 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.Matchers.arrayWithSize;
@@ -809,6 +815,105 @@ public void testRetries() throws Exception {
latch.await();
}

public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception {
TestThreadPool rejectingThreadPool = new TestThreadPool(
"TransportShardBulkActionTests#testForceExecutionOnRejectionAfterMappingUpdate",
Settings.builder()
.put("thread_pool." + ThreadPool.Names.WRITE + ".size", 1)
.put("thread_pool." + ThreadPool.Names.WRITE + ".queue_size", 1)
.build());
CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
rejectingThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> {
try {
cyclicBarrier.await();
logger.info("blocking the write executor");
cyclicBarrier.await();
logger.info("unblocked the write executor");
} catch (Exception e) {
throw new RuntimeException(e);
}
});
try {
cyclicBarrier.await();
// Place a task in the queue to block next enqueue
rejectingThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> {});

BulkItemRequest[] items = new BulkItemRequest[2];
DocWriteRequest<IndexRequest> writeRequest1 = new IndexRequest("index").id("id")
.source(Requests.INDEX_CONTENT_TYPE, "foo", 1);
DocWriteRequest<IndexRequest> writeRequest2 = new IndexRequest("index").id("id")
.source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
items[0] = new BulkItemRequest(0, writeRequest1);
items[1] = new BulkItemRequest(1, writeRequest2);
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);

Engine.IndexResult mappingUpdate =
new Engine.IndexResult(new Mapping(null, mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()));
Translog.Location resultLocation1 = new Translog.Location(42, 36, 36);
Translog.Location resultLocation2 = new Translog.Location(42, 42, 42);
Engine.IndexResult success1 = new FakeIndexResult(1, 1, 10, true, resultLocation1);
Engine.IndexResult success2 = new FakeIndexResult(1, 1, 13, true, resultLocation2);

IndexShard shard = mock(IndexShard.class);
when(shard.shardId()).thenReturn(shardId);
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
.thenReturn(success1, mappingUpdate, success2);
when(shard.getFailedIndexResult(any(EsRejectedExecutionException.class), anyLong())).thenCallRealMethod();
when(shard.mapperService()).thenReturn(mock(MapperService.class));

randomlySetIgnoredPrimaryResponse(items[0]);

AtomicInteger updateCalled = new AtomicInteger();

final CountDownLatch latch = new CountDownLatch(1);
TransportShardBulkAction.performOnPrimary(
bulkShardRequest, shard, null, rejectingThreadPool::absoluteTimeInMillis, (update, shardId, listener) -> {
// There should indeed be a mapping update
assertNotNull(update);
updateCalled.incrementAndGet();
listener.onResponse(null);
try {
// Release blocking task now that the continue write execution has been rejected and
// the finishRequest execution has been force enqueued
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new IllegalStateException(e);
}
}, listener -> listener.onResponse(null), new LatchedActionListener<>(
ActionTestUtils.assertNoFailureListener(result ->
// Assert that we still need to fsync the location that was successfully written
assertThat(((WritePrimaryResult<BulkShardRequest, BulkShardResponse>) result).location,
equalTo(resultLocation1))), latch),
rejectingThreadPool);
latch.await();

assertThat("mappings were \"updated\" once", updateCalled.get(), equalTo(1));

verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean());

BulkItemResponse primaryResponse1 = bulkShardRequest.items()[0].getPrimaryResponse();
assertThat(primaryResponse1.getItemId(), equalTo(0));
assertThat(primaryResponse1.getId(), equalTo("id"));
assertThat(primaryResponse1.getOpType(), equalTo(DocWriteRequest.OpType.INDEX));
assertFalse(primaryResponse1.isFailed());
assertThat(primaryResponse1.getResponse().status(), equalTo(RestStatus.CREATED));
assertThat(primaryResponse1.getResponse().getSeqNo(), equalTo(10L));

BulkItemResponse primaryResponse2 = bulkShardRequest.items()[1].getPrimaryResponse();
assertThat(primaryResponse2.getItemId(), equalTo(1));
assertThat(primaryResponse2.getId(), equalTo("id"));
assertThat(primaryResponse2.getOpType(), equalTo(DocWriteRequest.OpType.INDEX));
assertTrue(primaryResponse2.isFailed());
assertNull(primaryResponse2.getResponse());
assertEquals(primaryResponse2.status(), RestStatus.TOO_MANY_REQUESTS);
assertThat(primaryResponse2.getFailure().getCause(), instanceOf(EsRejectedExecutionException.class));

closeShards(shard);
} finally {
rejectingThreadPool.shutdownNow();
}
}
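
The new test saturates a one-thread, one-slot write pool so that the follow-up execution after the mapping update is guaranteed to be rejected. The same setup can be reproduced with plain JDK classes; the sketch below is a hypothetical reconstruction of that pattern (the names and the pool itself are illustrative, not the Elasticsearch TestThreadPool): one worker is parked on a CyclicBarrier, a no-op task fills the single queue slot, and the next ordinary submission fails with a RejectedExecutionException.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Stand-alone illustration of the saturation trick used in
// testForceExecutionOnRejectionAfterMappingUpdate, built from JDK classes only.
public class SaturatedPoolSketch {
    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool =
            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CyclicBarrier barrier = new CyclicBarrier(2);

        // Block the only worker thread until the test decides to release it.
        pool.execute(() -> {
            try {
                barrier.await(); // rendezvous with the test: the only worker is now occupied
                barrier.await(); // stay parked until the test releases the pool again
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        barrier.await(); // wait until the worker has actually started

        // Fill the single queue slot so that the pool is fully saturated.
        pool.execute(() -> {});

        // Any further ordinary submission is rejected, which is the condition the
        // bulk-shard test needs to provoke after the simulated mapping update.
        try {
            pool.execute(() -> System.out.println("never runs"));
        } catch (RejectedExecutionException expected) {
            System.out.println("submission rejected as expected: " + expected);
        }

        barrier.await(); // unblock the worker and let the queued task drain
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}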

private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) {
if (randomBoolean()) {
// add a response to the request and thereby check that it is ignored for the primary.
