Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inactive shard flush should wait for ongoing one #89430

Merged
merged 8 commits into from
Aug 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ public static class EngineTestPlugin extends Plugin implements EnginePlugin {
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
return Optional.of(config -> new InternalEngine(config) {
@Override
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
public boolean flush(boolean force, boolean waitIfOngoing) throws EngineException {
final ShardId shardId = config.getShardId();
if (failOnFlushShards.contains(shardId)) {
throw new EngineException(shardId, "simulated IO");
}
super.flush(force, waitIfOngoing);
return super.flush(force, waitIfOngoing);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1015,8 +1015,10 @@ public boolean refreshNeeded() {
* @param force if <code>true</code> a lucene commit is executed even if no changes need to be committed.
* @param waitIfOngoing if <code>true</code> this call will block until all currently running flushes have finished.
* Otherwise this call will return without blocking.
* @return <code>false</code> if <code>waitIfOngoing==false</code> and an ongoing request is detected, else <code>true</code>.
* If <code>false</code> is returned, no flush happened.
*/
public abstract void flush(boolean force, boolean waitIfOngoing) throws EngineException;
public abstract boolean flush(boolean force, boolean waitIfOngoing) throws EngineException;

/**
* Flushes the state of the engine including the transaction log, clearing memory and persisting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1895,7 +1895,7 @@ public boolean shouldPeriodicallyFlush() {
}

@Override
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
public boolean flush(boolean force, boolean waitIfOngoing) throws EngineException {
ensureOpen();
if (force && waitIfOngoing == false) {
assert false : "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing;
Expand All @@ -1908,7 +1908,8 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
if (flushLock.tryLock() == false) {
// if we can't get the lock right away we block if needed otherwise barf
if (waitIfOngoing == false) {
return;
logger.trace("detected an in-flight flush, not blocking to wait for it's completion");
return false;
}
logger.trace("waiting for in-flight flush to finish");
flushLock.lock();
Expand Down Expand Up @@ -1960,13 +1961,15 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
throw ex;
} finally {
flushLock.unlock();
logger.trace("released flush lock");
}
}
// We don't have to do this here; we do it defensively to make sure that even if wall clock time is misbehaving
// (e.g., moves backwards) we will at least still sometimes prune deleted tombstones:
if (engineConfig.isEnableGcDeletes()) {
pruneDeletedTombstones();
}
return true;
}

private void refreshLastCommittedSegmentInfos() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,8 @@ public boolean shouldPeriodicallyFlush() {
}

@Override
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
// noop
public boolean flush(boolean force, boolean waitIfOngoing) throws EngineException {
return true; // noop
kingherc marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down
13 changes: 10 additions & 3 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1343,8 +1343,10 @@ public BulkStats bulkStats() {
* Executes the given flush request against the engine.
*
* @param request the flush request
* @return <code>false</code> if <code>waitIfOngoing==false</code> and an ongoing request is detected, else <code>true</code>.
* If <code>false</code> is returned, no flush happened.
*/
public void flush(FlushRequest request) {
public boolean flush(FlushRequest request) {
final boolean waitIfOngoing = request.waitIfOngoing();
final boolean force = request.force();
logger.trace("flush with {}", request);
Expand All @@ -1355,8 +1357,9 @@ public void flush(FlushRequest request) {
*/
verifyNotClosed();
final long time = System.nanoTime();
getEngine().flush(force, waitIfOngoing);
boolean flushHappened = getEngine().flush(force, waitIfOngoing);
flushMetric.inc(System.nanoTime() - time);
return flushHappened;
}

/**
Expand Down Expand Up @@ -2188,7 +2191,11 @@ public void onFailure(Exception e) {

@Override
protected void doRun() {
flush(new FlushRequest().waitIfOngoing(false).force(false));
if (flush(new FlushRequest().waitIfOngoing(false).force(false)) == false) {
// In case an ongoing flush was detected, revert active flag so that a next flushOnIdle request
// will retry (#87888)
active.set(true);
}
periodicFlushMetric.inc();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -170,7 +170,7 @@ private void executeOnPrimaryOrReplica(boolean phase1) throws Throwable {

public void testShardIsFlushed() throws Throwable {
final ArgumentCaptor<FlushRequest> flushRequest = ArgumentCaptor.forClass(FlushRequest.class);
doNothing().when(indexShard).flush(flushRequest.capture());
doReturn(true).when(indexShard).flush(flushRequest.capture());
executeOnPrimaryOrReplica();
verify(indexShard, times(1)).flush(any(FlushRequest.class));
assertThat(flushRequest.getValue().force(), is(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermQuery;
Expand Down Expand Up @@ -116,6 +117,7 @@
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.FieldMaskingReader;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.store.MockFSDirectoryFactory;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -3897,6 +3899,90 @@ public void testFlushOnIdle() throws Exception {
closeShards(shard);
}

@TestLogging(reason = "testing traces of concurrent flushes", value = "org.elasticsearch.index.engine.Engine:TRACE")
public void testFlushOnIdleConcurrentFlushDoesNotWait() throws Exception {
final MockLogAppender mockLogAppender = new MockLogAppender();
try {
CountDownLatch readyToCompleteFlushLatch = new CountDownLatch(1);
IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) {
@Override
protected void commitIndexWriter(final IndexWriter writer, final Translog translog) throws IOException {
try {
readyToCompleteFlushLatch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
super.commitIndexWriter(writer, translog);
}
});

for (int i = 0; i < 3; i++) {
indexDoc(shard, "_doc", Integer.toString(i));
}

mockLogAppender.start();
Loggers.addAppender(LogManager.getLogger(Engine.class), mockLogAppender);

// Issue the first flushOnIdle request. The flush happens in the background using the flush threadpool.
shard.flushOnIdle(0);
assertFalse(shard.isActive());

// Wait for log message that flush acquired lock immediately
mockLogAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"should see first flush getting lock immediately",
Engine.class.getCanonicalName(),
Level.TRACE,
"acquired flush lock immediately"
)
);
assertBusy(mockLogAppender::assertAllExpectationsMatched);

// While the first flush is happening, index one more doc (to turn the shard's active flag to true),
// and issue a second flushOnIdle request which should not wait for the ongoing flush
indexDoc(shard, "_doc", Integer.toString(3));
assertTrue(shard.isActive());
mockLogAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"should see second flush returning since it will not wait for the ongoing flush",
Engine.class.getCanonicalName(),
Level.TRACE,
"detected an in-flight flush, not blocking to wait for it's completion"
)
);
shard.flushOnIdle(0);
assertBusy(mockLogAppender::assertAllExpectationsMatched);

// A direct call to flush (with waitIfOngoing=false) should not wait and return false immediately
assertFalse(shard.flush(new FlushRequest().waitIfOngoing(false).force(false)));

// Allow first flushOnIdle to complete
readyToCompleteFlushLatch.countDown();

// Wait for first flushOnIdle to log a message that it released the flush lock
mockLogAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"should see first flush releasing lock",
Engine.class.getCanonicalName(),
Level.TRACE,
"released flush lock"
)
);
assertBusy(mockLogAppender::assertAllExpectationsMatched);

kingherc marked this conversation as resolved.
Show resolved Hide resolved
// The second flushOnIdle (that did not happen) should have turned the active flag to true
assertTrue(shard.isActive());

// After all the previous flushes are done, issue a final flush (for any remaining documents) that should return true
assertTrue(shard.flush(new FlushRequest()));

closeShards(shard);
} finally {
Loggers.removeAppender(LogManager.getLogger(Engine.class), mockLogAppender);
mockLogAppender.stop();
}
}

public void testOnCloseStats() throws IOException {
final IndexShard indexShard = newStartedShard(true);

Expand Down