Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inactive shard flush should wait for ongoing one #89430

Merged
merged 8 commits into from
Aug 22, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ public static class EngineTestPlugin extends Plugin implements EnginePlugin {
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
return Optional.of(config -> new InternalEngine(config) {
@Override
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
public boolean flush(boolean force, boolean waitIfOngoing) throws EngineException {
final ShardId shardId = config.getShardId();
if (failOnFlushShards.contains(shardId)) {
throw new EngineException(shardId, "simulated IO");
}
super.flush(force, waitIfOngoing);
return super.flush(force, waitIfOngoing);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1015,8 +1015,9 @@ public boolean refreshNeeded() {
* @param force if <code>true</code> a lucene commit is executed even if no changes need to be committed.
* @param waitIfOngoing if <code>true</code> this call will block until all currently running flushes have finished.
* Otherwise this call will return without blocking.
* @return <code>true</code> if the flush happened, else <code>false</code> (e.g., if it did not wait for an ongoing request)
kingherc marked this conversation as resolved.
Show resolved Hide resolved
*/
public abstract void flush(boolean force, boolean waitIfOngoing) throws EngineException;
public abstract boolean flush(boolean force, boolean waitIfOngoing) throws EngineException;

/**
* Flushes the state of the engine including the transaction log, clearing memory and persisting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1895,7 +1895,7 @@ public boolean shouldPeriodicallyFlush() {
}

@Override
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
public boolean flush(boolean force, boolean waitIfOngoing) throws EngineException {
ensureOpen();
if (force && waitIfOngoing == false) {
assert false : "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing;
Expand All @@ -1908,7 +1908,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
if (flushLock.tryLock() == false) {
// if we can't get the lock right away we block if needed otherwise barf
if (waitIfOngoing == false) {
return;
return false;
}
logger.trace("waiting for in-flight flush to finish");
flushLock.lock();
Expand Down Expand Up @@ -1967,6 +1967,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
if (engineConfig.isEnableGcDeletes()) {
pruneDeletedTombstones();
}
return true;
}

private void refreshLastCommittedSegmentInfos() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,8 @@ public boolean shouldPeriodicallyFlush() {
}

@Override
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
// noop
public boolean flush(boolean force, boolean waitIfOngoing) throws EngineException {
return true; // noop
kingherc marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down
18 changes: 13 additions & 5 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1343,8 +1343,9 @@ public BulkStats bulkStats() {
* Executes the given flush request against the engine.
*
* @param request the flush request
* @return <code>true</code> if the flush happened, else <code>false</code> (e.g., if it did not wait for an ongoing request)
*/
public void flush(FlushRequest request) {
public boolean flush(FlushRequest request) {
final boolean waitIfOngoing = request.waitIfOngoing();
final boolean force = request.force();
logger.trace("flush with {}", request);
Expand All @@ -1355,8 +1356,11 @@ public void flush(FlushRequest request) {
*/
verifyNotClosed();
final long time = System.nanoTime();
getEngine().flush(force, waitIfOngoing);
flushMetric.inc(System.nanoTime() - time);
boolean flushHappened = getEngine().flush(force, waitIfOngoing);
if (flushHappened) {
kingherc marked this conversation as resolved.
Show resolved Hide resolved
flushMetric.inc(System.nanoTime() - time);
}
return flushHappened;
}

/**
Expand Down Expand Up @@ -2188,8 +2192,12 @@ public void onFailure(Exception e) {

@Override
protected void doRun() {
flush(new FlushRequest().waitIfOngoing(false).force(false));
periodicFlushMetric.inc();
if (flush(new FlushRequest().waitIfOngoing(false).force(false))) {
periodicFlushMetric.inc();
kingherc marked this conversation as resolved.
Show resolved Hide resolved
} else {
// In case the flush did not happen, revert active flag so that a next flushOnIdle request can happen (#87888)
kingherc marked this conversation as resolved.
Show resolved Hide resolved
active.set(true);
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -170,7 +170,7 @@ private void executeOnPrimaryOrReplica(boolean phase1) throws Throwable {

public void testShardIsFlushed() throws Throwable {
final ArgumentCaptor<FlushRequest> flushRequest = ArgumentCaptor.forClass(FlushRequest.class);
doNothing().when(indexShard).flush(flushRequest.capture());
doReturn(true).when(indexShard).flush(flushRequest.capture());
executeOnPrimaryOrReplica();
verify(indexShard, times(1)).flush(any(FlushRequest.class));
assertThat(flushRequest.getValue().force(), is(true));
Expand Down