diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index e709c24bc64c7..46284c8ec419e 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -179,7 +179,20 @@ public void onRejection(Exception e) { e, primary, docWriteRequest.opType() == DocWriteRequest.OpType.DELETE, docWriteRequest.version()), context, null); } - finishRequest(); + + // Force the execution to finish the request + executor.execute(new ActionRunnable<>(listener) { + + @Override + protected void doRun() { + finishRequest(); + } + + @Override + public boolean isForceExecution() { + return true; + } + }); } private void finishRequest() { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 5985d6731fdbd..a6f6f3c5ca81e 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; @@ -53,13 +54,18 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Collections; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.Matchers.arrayWithSize; @@ -809,6 +815,105 @@ public void testRetries() throws Exception { latch.await(); } + public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { + TestThreadPool rejectingThreadPool = new TestThreadPool( + "TransportShardBulkActionTests#testForceExecutionOnRejectionAfterMappingUpdate", + Settings.builder() + .put("thread_pool." + ThreadPool.Names.WRITE + ".size", 1) + .put("thread_pool." + ThreadPool.Names.WRITE + ".queue_size", 1) + .build()); + CyclicBarrier cyclicBarrier = new CyclicBarrier(2); + rejectingThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> { + try { + cyclicBarrier.await(); + logger.info("blocking the write executor"); + cyclicBarrier.await(); + logger.info("unblocked the write executor"); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + try { + cyclicBarrier.await(); + // Place a task in the queue to block next enqueue + rejectingThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> {}); + + BulkItemRequest[] items = new BulkItemRequest[2]; + DocWriteRequest writeRequest1 = new IndexRequest("index").id("id") + .source(Requests.INDEX_CONTENT_TYPE, "foo", 1); + DocWriteRequest writeRequest2 = new IndexRequest("index").id("id") + .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); + items[0] = new BulkItemRequest(0, writeRequest1); + items[1] = new BulkItemRequest(1, writeRequest2); + BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + + Engine.IndexResult mappingUpdate = + new Engine.IndexResult(new Mapping(null, mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap())); + Translog.Location resultLocation1 = new Translog.Location(42, 36, 36); + Translog.Location resultLocation2 = new Translog.Location(42, 42, 42); + Engine.IndexResult success1 = new FakeIndexResult(1, 1, 10, true, resultLocation1); + Engine.IndexResult success2 = new FakeIndexResult(1, 1, 13, true, resultLocation2); + + IndexShard shard = mock(IndexShard.class); + when(shard.shardId()).thenReturn(shardId); + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())) + .thenReturn(success1, mappingUpdate, success2); + when(shard.getFailedIndexResult(any(EsRejectedExecutionException.class), anyLong())).thenCallRealMethod(); + when(shard.mapperService()).thenReturn(mock(MapperService.class)); + + randomlySetIgnoredPrimaryResponse(items[0]); + + AtomicInteger updateCalled = new AtomicInteger(); + + final CountDownLatch latch = new CountDownLatch(1); + TransportShardBulkAction.performOnPrimary( + bulkShardRequest, shard, null, rejectingThreadPool::absoluteTimeInMillis, (update, shardId, listener) -> { + // There should indeed be a mapping update + assertNotNull(update); + updateCalled.incrementAndGet(); + listener.onResponse(null); + try { + // Release blocking task now that the continue write execution has been rejected and + // the finishRequest execution has been force enqueued + cyclicBarrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + throw new IllegalStateException(e); + } + }, listener -> listener.onResponse(null), new LatchedActionListener<>( + ActionTestUtils.assertNoFailureListener(result -> + // Assert that we still need to fsync the location that was successfully written + assertThat(((WritePrimaryResult) result).location, + equalTo(resultLocation1))), latch), + rejectingThreadPool); + latch.await(); + + assertThat("mappings were \"updated\" once", updateCalled.get(), equalTo(1)); + + verify(shard, times(2)).applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()); + + BulkItemResponse primaryResponse1 = bulkShardRequest.items()[0].getPrimaryResponse(); + assertThat(primaryResponse1.getItemId(), equalTo(0)); + assertThat(primaryResponse1.getId(), equalTo("id")); + assertThat(primaryResponse1.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); + assertFalse(primaryResponse1.isFailed()); + assertThat(primaryResponse1.getResponse().status(), equalTo(RestStatus.CREATED)); + assertThat(primaryResponse1.getResponse().getSeqNo(), equalTo(10L)); + + BulkItemResponse primaryResponse2 = bulkShardRequest.items()[1].getPrimaryResponse(); + assertThat(primaryResponse2.getItemId(), equalTo(1)); + assertThat(primaryResponse2.getId(), equalTo("id")); + assertThat(primaryResponse2.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); + assertTrue(primaryResponse2.isFailed()); + assertNull(primaryResponse2.getResponse()); + assertEquals(primaryResponse2.status(), RestStatus.TOO_MANY_REQUESTS); + assertThat(primaryResponse2.getFailure().getCause(), instanceOf(EsRejectedExecutionException.class)); + + closeShards(shard); + } finally { + rejectingThreadPool.shutdownNow(); + } + } + private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) { if (randomBoolean()) { // add a response to the request and thereby check that it is ignored for the primary.