Inactive shard flush should wait for ongoing one (#89430)

org.elasticsearch.indices.flush.FlushIT#testFlushOnInactive would
sometimes fail in the following case:
* SHARD_MEMORY_INTERVAL_TIME_SETTING is set very low, e.g., 10ms
* Multiple regularly scheduled flushes proceed to
  org.elasticsearch.index.shard.IndexShard#flushOnIdle
* There, the first flush handles, e.g., the first document that was
  indexed. The second flush arrives shortly after, before the first
  flush finishes.
* The second flush finds wasActive = true (due to the indexing of the
  remaining documents) and sets it to false.
* However, the second flush is not executed, because
  waitIfOngoing = false and the first flush is still ongoing.
* No further flush is scheduled (any subsequently scheduled flush
  finds wasActive = false), so the remaining documents are never
  flushed.

Solution: if the flush does not actually happen, revert the active
flag so that a subsequent flush request can trigger another flush.
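
For illustration, a minimal sketch of the idea behind the fix (class,
field, and method names are assumed here, not the actual IndexShard
code): the periodic idle check only flushes when the shard saw writes
since the last check, so a flush that is skipped because another one
is in flight must re-arm the active flag.

import java.util.concurrent.atomic.AtomicBoolean;

class IdleFlushSketch {
    private final AtomicBoolean active = new AtomicBoolean(true);

    // Called by the periodic checker; flushes only if the shard saw writes since the last check.
    void flushOnIdle() {
        if (active.getAndSet(false)) {
            boolean flushHappened = tryFlush(false /* force */, false /* waitIfOngoing */);
            if (flushHappened == false) {
                // An in-flight flush prevented this one from running; re-arm the flag so the
                // next periodic check retries instead of leaving the shard "inactive" forever.
                active.set(true);
            }
        }
    }

    // Stand-in for Engine#flush: returns false when waitIfOngoing == false and a flush is already running.
    boolean tryFlush(boolean force, boolean waitIfOngoing) {
        return true;
    }
}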

Fixes #87888
kingherc committed Aug 22, 2022
1 parent 58fafe2 commit 824bfd0
Showing 7 changed files with 110 additions and 12 deletions.
@@ -66,12 +66,12 @@ public static class EngineTestPlugin extends Plugin implements EnginePlugin {
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
return Optional.of(config -> new InternalEngine(config) {
@Override
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
public boolean flush(boolean force, boolean waitIfOngoing) throws EngineException {
final ShardId shardId = config.getShardId();
if (failOnFlushShards.contains(shardId)) {
throw new EngineException(shardId, "simulated IO");
}
super.flush(force, waitIfOngoing);
return super.flush(force, waitIfOngoing);
}
});
}
@@ -1015,8 +1015,10 @@ public boolean refreshNeeded() {
* @param force if <code>true</code> a lucene commit is executed even if no changes need to be committed.
* @param waitIfOngoing if <code>true</code> this call will block until all currently running flushes have finished.
* Otherwise this call will return without blocking.
* @return <code>false</code> if <code>waitIfOngoing==false</code> and an ongoing request is detected, else <code>true</code>.
* If <code>false</code> is returned, no flush happened.
*/
public abstract void flush(boolean force, boolean waitIfOngoing) throws EngineException;
public abstract boolean flush(boolean force, boolean waitIfOngoing) throws EngineException;

/**
* Flushes the state of the engine including the transaction log, clearing memory and persisting
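
A hypothetical caller sketch (the helper itself is assumed, not part
of this change) showing what the new return value lets callers of the
flush contract distinguish:

boolean tryNonBlockingFlush(Engine engine) {
    // waitIfOngoing == false never blocks: false means an ongoing flush was detected
    // and nothing was committed.
    final boolean flushed = engine.flush(false /* force */, false /* waitIfOngoing */);
    if (flushed == false) {
        // Callers that rely on the flush actually happening (such as IndexShard#flushOnIdle)
        // must arrange to retry later.
    }
    return flushed;
}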
@@ -1895,7 +1895,7 @@ public boolean shouldPeriodicallyFlush() {
}

@Override
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
public boolean flush(boolean force, boolean waitIfOngoing) throws EngineException {
ensureOpen();
if (force && waitIfOngoing == false) {
assert false : "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing;
@@ -1908,7 +1908,8 @@ public void flush(boolean force, boolean waitIfOngoing) {
if (flushLock.tryLock() == false) {
// if we can't get the lock right away we block if needed otherwise barf
if (waitIfOngoing == false) {
return;
logger.trace("detected an in-flight flush, not blocking to wait for it's completion");
return false;
}
logger.trace("waiting for in-flight flush to finish");
flushLock.lock();
@@ -1960,13 +1961,15 @@ public void flush(boolean force, boolean waitIfOngoing) {
throw ex;
} finally {
flushLock.unlock();
logger.trace("released flush lock");
}
}
// We don't have to do this here; we do it defensively to make sure that even if wall clock time is misbehaving
// (e.g., moves backwards) we will at least still sometimes prune deleted tombstones:
if (engineConfig.isEnableGcDeletes()) {
pruneDeletedTombstones();
}
return true;
}

private void refreshLastCommittedSegmentInfos() {
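
The locking behaviour above, reduced to a minimal sketch (field and
method names assumed; only the control flow matters):

import java.util.concurrent.locks.ReentrantLock;

class FlushLockSketch {
    private final ReentrantLock flushLock = new ReentrantLock();

    boolean flush(boolean waitIfOngoing) {
        if (flushLock.tryLock() == false) {
            if (waitIfOngoing == false) {
                return false; // another flush is in flight; report that nothing was flushed
            }
            flushLock.lock(); // block until the in-flight flush completes
        }
        try {
            // ... commit the IndexWriter, roll the translog generation, refresh commit infos ...
            return true;
        } finally {
            flushLock.unlock();
        }
    }
}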
@@ -437,8 +437,8 @@ public boolean shouldPeriodicallyFlush() {
}

@Override
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
// noop
public boolean flush(boolean force, boolean waitIfOngoing) throws EngineException {
return true; // noop
}

@Override
13 changes: 10 additions & 3 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1343,8 +1343,10 @@ public BulkStats bulkStats() {
* Executes the given flush request against the engine.
*
* @param request the flush request
* @return <code>false</code> if <code>waitIfOngoing==false</code> and an ongoing request is detected, else <code>true</code>.
* If <code>false</code> is returned, no flush happened.
*/
public void flush(FlushRequest request) {
public boolean flush(FlushRequest request) {
final boolean waitIfOngoing = request.waitIfOngoing();
final boolean force = request.force();
logger.trace("flush with {}", request);
@@ -1355,8 +1357,9 @@ public void flush(FlushRequest request) {
*/
verifyNotClosed();
final long time = System.nanoTime();
getEngine().flush(force, waitIfOngoing);
boolean flushHappened = getEngine().flush(force, waitIfOngoing);
flushMetric.inc(System.nanoTime() - time);
return flushHappened;
}

/**
@@ -2188,7 +2191,11 @@ public void onFailure(Exception e) {

@Override
protected void doRun() {
flush(new FlushRequest().waitIfOngoing(false).force(false));
if (flush(new FlushRequest().waitIfOngoing(false).force(false)) == false) {
// In case an ongoing flush was detected, revert active flag so that a next flushOnIdle request
// will retry (#87888)
active.set(true);
}
periodicFlushMetric.inc();
}
});
@@ -62,7 +62,7 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -170,7 +170,7 @@ private void executeOnPrimaryOrReplica(boolean phase1) throws Throwable {

public void testShardIsFlushed() throws Throwable {
final ArgumentCaptor<FlushRequest> flushRequest = ArgumentCaptor.forClass(FlushRequest.class);
doNothing().when(indexShard).flush(flushRequest.capture());
doReturn(true).when(indexShard).flush(flushRequest.capture());
executeOnPrimaryOrReplica();
verify(indexShard, times(1)).flush(any(FlushRequest.class));
assertThat(flushRequest.getValue().force(), is(true));
@@ -14,6 +14,7 @@
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermQuery;
@@ -116,6 +117,7 @@
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.FieldMaskingReader;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.store.MockFSDirectoryFactory;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.NamedXContentRegistry;
@@ -3897,6 +3899,90 @@ public void testFlushOnIdle() throws Exception {
closeShards(shard);
}

@TestLogging(reason = "testing traces of concurrent flushes", value = "org.elasticsearch.index.engine.Engine:TRACE")
public void testFlushOnIdleConcurrentFlushDoesNotWait() throws Exception {
final MockLogAppender mockLogAppender = new MockLogAppender();
try {
CountDownLatch readyToCompleteFlushLatch = new CountDownLatch(1);
IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) {
@Override
protected void commitIndexWriter(final IndexWriter writer, final Translog translog) throws IOException {
try {
readyToCompleteFlushLatch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
super.commitIndexWriter(writer, translog);
}
});

for (int i = 0; i < 3; i++) {
indexDoc(shard, "_doc", Integer.toString(i));
}

mockLogAppender.start();
Loggers.addAppender(LogManager.getLogger(Engine.class), mockLogAppender);

// Issue the first flushOnIdle request. The flush happens in the background using the flush threadpool.
shard.flushOnIdle(0);
assertFalse(shard.isActive());

// Wait for log message that flush acquired lock immediately
mockLogAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"should see first flush getting lock immediately",
Engine.class.getCanonicalName(),
Level.TRACE,
"acquired flush lock immediately"
)
);
assertBusy(mockLogAppender::assertAllExpectationsMatched);

// While the first flush is happening, index one more doc (to turn the shard's active flag to true),
// and issue a second flushOnIdle request which should not wait for the ongoing flush
indexDoc(shard, "_doc", Integer.toString(3));
assertTrue(shard.isActive());
mockLogAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"should see second flush returning since it will not wait for the ongoing flush",
Engine.class.getCanonicalName(),
Level.TRACE,
"detected an in-flight flush, not blocking to wait for it's completion"
)
);
shard.flushOnIdle(0);
assertBusy(mockLogAppender::assertAllExpectationsMatched);

// A direct call to flush (with waitIfOngoing=false) should not wait and return false immediately
assertFalse(shard.flush(new FlushRequest().waitIfOngoing(false).force(false)));

// Allow first flushOnIdle to complete
readyToCompleteFlushLatch.countDown();

// Wait for first flushOnIdle to log a message that it released the flush lock
mockLogAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"should see first flush releasing lock",
Engine.class.getCanonicalName(),
Level.TRACE,
"released flush lock"
)
);
assertBusy(mockLogAppender::assertAllExpectationsMatched);

// The second flushOnIdle (that did not happen) should have turned the active flag to true
assertTrue(shard.isActive());

// After all the previous flushes are done, issue a final flush (for any remaining documents) that should return true
assertTrue(shard.flush(new FlushRequest()));

closeShards(shard);
} finally {
Loggers.removeAppender(LogManager.getLogger(Engine.class), mockLogAppender);
mockLogAppender.stop();
}
}

public void testOnCloseStats() throws IOException {
final IndexShard indexShard = newStartedShard(true);

