diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/diskusage/IndexDiskUsageAnalyzerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/diskusage/IndexDiskUsageAnalyzerIT.java index 424a06dddf84c..135b02305c67a 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/diskusage/IndexDiskUsageAnalyzerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/diskusage/IndexDiskUsageAnalyzerIT.java @@ -66,12 +66,12 @@ public static class EngineTestPlugin extends Plugin implements EnginePlugin { public Optional getEngineFactory(IndexSettings indexSettings) { return Optional.of(config -> new InternalEngine(config) { @Override - public void flush(boolean force, boolean waitIfOngoing) throws EngineException { + public boolean flush(boolean force, boolean waitIfOngoing) throws EngineException { final ShardId shardId = config.getShardId(); if (failOnFlushShards.contains(shardId)) { throw new EngineException(shardId, "simulated IO"); } - super.flush(force, waitIfOngoing); + return super.flush(force, waitIfOngoing); } }); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 0b1e5902370d9..952ef783a2ea5 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1015,8 +1015,10 @@ public boolean refreshNeeded() { * @param force if true a lucene commit is executed even if no changes need to be committed. * @param waitIfOngoing if true this call will block until all currently running flushes have finished. * Otherwise this call will return without blocking. + * @return false if waitIfOngoing==false and an ongoing request is detected, else true. + * If false is returned, no flush happened. */ - public abstract void flush(boolean force, boolean waitIfOngoing) throws EngineException; + public abstract boolean flush(boolean force, boolean waitIfOngoing) throws EngineException; /** * Flushes the state of the engine including the transaction log, clearing memory and persisting diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 7a70b5ae6b8cc..bfe94bbb11bc5 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1895,7 +1895,7 @@ public boolean shouldPeriodicallyFlush() { } @Override - public void flush(boolean force, boolean waitIfOngoing) throws EngineException { + public boolean flush(boolean force, boolean waitIfOngoing) throws EngineException { ensureOpen(); if (force && waitIfOngoing == false) { assert false : "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing; @@ -1908,7 +1908,8 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { if (flushLock.tryLock() == false) { // if we can't get the lock right away we block if needed otherwise barf if (waitIfOngoing == false) { - return; + logger.trace("detected an in-flight flush, not blocking to wait for it's completion"); + return false; } logger.trace("waiting for in-flight flush to finish"); flushLock.lock(); @@ -1960,6 +1961,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { throw ex; } finally { flushLock.unlock(); + logger.trace("released flush lock"); } } // We don't have to do this here; we do it defensively to make sure that even if wall clock time is misbehaving @@ -1967,6 +1969,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { if (engineConfig.isEnableGcDeletes()) { pruneDeletedTombstones(); } + return true; } private void refreshLastCommittedSegmentInfos() { diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index fbb7ef439d027..e47c3cf984c2b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -437,8 +437,8 @@ public boolean shouldPeriodicallyFlush() { } @Override - public void flush(boolean force, boolean waitIfOngoing) throws EngineException { - // noop + public boolean flush(boolean force, boolean waitIfOngoing) throws EngineException { + return true; // noop } @Override diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index cb24bd2b9c6e1..e3f2ea350a8c6 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1343,8 +1343,10 @@ public BulkStats bulkStats() { * Executes the given flush request against the engine. * * @param request the flush request + * @return false if waitIfOngoing==false and an ongoing request is detected, else true. + * If false is returned, no flush happened. */ - public void flush(FlushRequest request) { + public boolean flush(FlushRequest request) { final boolean waitIfOngoing = request.waitIfOngoing(); final boolean force = request.force(); logger.trace("flush with {}", request); @@ -1355,8 +1357,9 @@ public void flush(FlushRequest request) { */ verifyNotClosed(); final long time = System.nanoTime(); - getEngine().flush(force, waitIfOngoing); + boolean flushHappened = getEngine().flush(force, waitIfOngoing); flushMetric.inc(System.nanoTime() - time); + return flushHappened; } /** @@ -2188,7 +2191,11 @@ public void onFailure(Exception e) { @Override protected void doRun() { - flush(new FlushRequest().waitIfOngoing(false).force(false)); + if (flush(new FlushRequest().waitIfOngoing(false).force(false)) == false) { + // In case an ongoing flush was detected, revert active flag so that a next flushOnIdle request + // will retry (#87888) + active.set(true); + } periodicFlushMetric.inc(); } }); diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index 51db5b4f7c648..8dd0e89e1cbbe 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -62,7 +62,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -170,7 +170,7 @@ private void executeOnPrimaryOrReplica(boolean phase1) throws Throwable { public void testShardIsFlushed() throws Throwable { final ArgumentCaptor flushRequest = ArgumentCaptor.forClass(FlushRequest.class); - doNothing().when(indexShard).flush(flushRequest.capture()); + doReturn(true).when(indexShard).flush(flushRequest.capture()); executeOnPrimaryOrReplica(); verify(indexShard, times(1)).flush(any(FlushRequest.class)); assertThat(flushRequest.getValue().force(), is(true)); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 91f7d262cec7a..1ce2685952d25 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -14,6 +14,7 @@ import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.Term; import org.apache.lucene.search.TermQuery; @@ -116,6 +117,7 @@ import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.FieldMaskingReader; import org.elasticsearch.test.MockLogAppender; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.store.MockFSDirectoryFactory; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -3897,6 +3899,90 @@ public void testFlushOnIdle() throws Exception { closeShards(shard); } + @TestLogging(reason = "testing traces of concurrent flushes", value = "org.elasticsearch.index.engine.Engine:TRACE") + public void testFlushOnIdleConcurrentFlushDoesNotWait() throws Exception { + final MockLogAppender mockLogAppender = new MockLogAppender(); + try { + CountDownLatch readyToCompleteFlushLatch = new CountDownLatch(1); + IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) { + @Override + protected void commitIndexWriter(final IndexWriter writer, final Translog translog) throws IOException { + try { + readyToCompleteFlushLatch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + super.commitIndexWriter(writer, translog); + } + }); + + for (int i = 0; i < 3; i++) { + indexDoc(shard, "_doc", Integer.toString(i)); + } + + mockLogAppender.start(); + Loggers.addAppender(LogManager.getLogger(Engine.class), mockLogAppender); + + // Issue the first flushOnIdle request. The flush happens in the background using the flush threadpool. + shard.flushOnIdle(0); + assertFalse(shard.isActive()); + + // Wait for log message that flush acquired lock immediately + mockLogAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "should see first flush getting lock immediately", + Engine.class.getCanonicalName(), + Level.TRACE, + "acquired flush lock immediately" + ) + ); + assertBusy(mockLogAppender::assertAllExpectationsMatched); + + // While the first flush is happening, index one more doc (to turn the shard's active flag to true), + // and issue a second flushOnIdle request which should not wait for the ongoing flush + indexDoc(shard, "_doc", Integer.toString(3)); + assertTrue(shard.isActive()); + mockLogAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "should see second flush returning since it will not wait for the ongoing flush", + Engine.class.getCanonicalName(), + Level.TRACE, + "detected an in-flight flush, not blocking to wait for it's completion" + ) + ); + shard.flushOnIdle(0); + assertBusy(mockLogAppender::assertAllExpectationsMatched); + + // A direct call to flush (with waitIfOngoing=false) should not wait and return false immediately + assertFalse(shard.flush(new FlushRequest().waitIfOngoing(false).force(false))); + + // Allow first flushOnIdle to complete + readyToCompleteFlushLatch.countDown(); + + // Wait for first flushOnIdle to log a message that it released the flush lock + mockLogAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "should see first flush releasing lock", + Engine.class.getCanonicalName(), + Level.TRACE, + "released flush lock" + ) + ); + assertBusy(mockLogAppender::assertAllExpectationsMatched); + + // The second flushOnIdle (that did not happen) should have turned the active flag to true + assertTrue(shard.isActive()); + + // After all the previous flushes are done, issue a final flush (for any remaining documents) that should return true + assertTrue(shard.flush(new FlushRequest())); + + closeShards(shard); + } finally { + Loggers.removeAppender(LogManager.getLogger(Engine.class), mockLogAppender); + mockLogAppender.stop(); + } + } + public void testOnCloseStats() throws IOException { final IndexShard indexShard = newStartedShard(true);