
Do not create engine under IndexShard#mutex #45263

Merged: 15 commits, Aug 26, 2019
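In short, the change stops creating and closing engines while holding IndexShard#mutex: the expensive engine work now happens under a dedicated engineMutex, and mutex is only taken briefly to verify the shard is still open and to publish the new engine reference. A minimal, self-contained sketch of that pattern, using hypothetical SketchShard/SketchEngine names rather than the actual IndexShard code:

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-ins used only to illustrate the locking pattern of this PR.
class SketchEngine implements Closeable {
    @Override
    public void close() throws IOException { /* release files, translog, etc. */ }
}

class SketchShard {
    private final Object mutex = new Object();        // guards shard state (cheap operations only)
    private final Object engineMutex = new Object();  // guards engine creation and closing
    private final AtomicReference<SketchEngine> currentEngine = new AtomicReference<>();
    private volatile boolean closed;

    void openEngine() throws IOException {
        assert Thread.holdsLock(mutex) == false : "opening engine under mutex";
        synchronized (engineMutex) {
            // Expensive work (building the engine, warming, etc.) happens here, outside mutex.
            final SketchEngine newEngine = new SketchEngine();
            boolean success = false;
            try {
                synchronized (mutex) {
                    if (closed) {
                        throw new IllegalStateException("shard is closed");
                    }
                    currentEngine.set(newEngine); // only the cheap publication happens under mutex
                    success = true;
                }
            } finally {
                if (success == false) {
                    newEngine.close(); // do not leak the engine if installation failed
                }
            }
        }
    }
}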
83 changes: 58 additions & 25 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -214,7 +214,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
protected volatile ShardRouting shardRouting;
protected volatile IndexShardState state;
private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
private final Object engineMutex = new Object();
private final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
final EngineFactory engineFactory;

private final IndexingOperationListener indexingOperationListeners;
@@ -1192,20 +1193,23 @@ public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException {
* @throws java.nio.file.NoSuchFileException if one or more files referenced by a commit are not present.
*/
public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
assert Thread.holdsLock(mutex) == false : "snapshotting store metadata under mutex";
Engine.IndexCommitRef indexCommit = null;
store.incRef();
try {
Engine engine;
synchronized (mutex) {
synchronized (engineMutex) {
// if the engine is not running, we can access the store directly, but we need to make sure no one starts
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy which is initialized.
// That can be done out of mutex, since the engine can be closed half way.
engine = getEngineOrNull();
if (engine == null) {
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
synchronized (mutex) {
final Engine engine = getEngineOrNull();
if (engine != null) {
indexCommit = engine.acquireLastIndexCommit(false);
}
}
if (indexCommit == null) {
return store.getMetadata(null, true);
}
}
indexCommit = engine.acquireLastIndexCommit(false);
return store.getMetadata(indexCommit.getIndexCommit());
} finally {
store.decRef();
@@ -1343,13 +1347,13 @@ public IndexShard postRecovery(String reason)
if (state == IndexShardState.STARTED) {
throw new IndexShardStartedException(shardId);
}
// we need to refresh again to expose all operations that were index until now. Otherwise
// we may not expose operations that were indexed with a refresh listener that was immediately
// responded to in addRefreshListener.
getEngine().refresh("post_recovery");
recoveryState.setStage(RecoveryState.Stage.DONE);
changeState(IndexShardState.POST_RECOVERY, reason);
}
// we need to refresh again to expose all operations that were index until now. Otherwise
// we may not expose operations that were indexed with a refresh listener that was immediately
// responded to in addRefreshListener.
getEngine().refresh("post_recovery");
Contributor: I wonder if moving this outside the mutex requires extra care in addRefreshListener too, since otherwise we risk someone querying in between marking the state POST_RECOVERY and the refresh completing, expecting to see data because of a previous call to addRefreshListener. I am not absolutely sure this is necessary, and the timing would obviously have to be terrible for anything bad to happen, but I wanted your thoughts on it anyway.

Member (author): Well spotted. Yes, we need to maintain this happens-before relation. We could reuse engineMutex, but I prefer a separate mutex for this. See f802515.
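A minimal sketch of that idea, assuming a dedicated lock (here named postRecoveryMutex; the name and the exact shape of f802515 are assumptions, not the actual commit): the post-recovery refresh and the switch to POST_RECOVERY happen under one lock, and the immediate-response path in addRefreshListener re-checks the state under the same lock, so it can only fire before that refresh runs.

import java.util.function.Consumer;

// Illustrative toy, not IndexShard: shows why responding immediately is safe only
// while the post-recovery refresh is still ahead of us.
class PostRecoverySketch {
    enum State { RECOVERING, POST_RECOVERY }

    private final Object postRecoveryMutex = new Object();
    private volatile State state = State.RECOVERING;

    private void refresh(String reason) {
        // stands in for getEngine().refresh(reason)
    }

    void postRecovery() {
        synchronized (postRecoveryMutex) {
            refresh("post_recovery");     // expose everything indexed so far ...
            state = State.POST_RECOVERY;  // ... before readers are let in
        }
    }

    void addRefreshListener(Consumer<Boolean> listener) {
        if (state != State.POST_RECOVERY) {
            synchronized (postRecoveryMutex) {
                if (state != State.POST_RECOVERY) {
                    // We hold the lock, so the post-recovery refresh has not run yet and will
                    // cover this operation: responding immediately (without a refresh) is safe.
                    listener.accept(false);
                    return;
                }
            }
        }
        // The shard is searchable; hand off to the normal refresh-listener machinery
        // (omitted in this sketch).
        listener.accept(true);
    }
}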

return this;
}

@@ -1583,6 +1587,7 @@ public void openEngineAndSkipTranslogRecovery() throws IOException {
}

private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException {
assert Thread.holdsLock(mutex) == false : "opening engine under mutex";
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
@@ -1595,16 +1600,26 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty()
: "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
+ "] but got " + getRetentionLeases();
synchronized (mutex) {
verifyNotClosed();
assert currentEngineReference.get() == null : "engine is running";
synchronized (engineMutex) {
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
final Engine newEngine = engineFactory.newReadWriteEngine(config);
onNewEngine(newEngine);
currentEngineReference.set(newEngine);
// We set active because we are now writing operations to the engine; this way,
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
active.set(true);
boolean success = false;
try {
synchronized (mutex) {
verifyNotClosed();
assert currentEngineReference.get() == null : "engine is running";
onNewEngine(newEngine);
currentEngineReference.set(newEngine);
// We set active because we are now writing operations to the engine; this way,
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
active.set(true);
success = true;
}
} finally {
if (success == false) {
newEngine.close();
}
}
}
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
@@ -1627,6 +1642,7 @@ private boolean assertSequenceNumbersInCommit() throws IOException {
}

private void onNewEngine(Engine newEngine) {
assert Thread.holdsLock(engineMutex);
refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation);
}

@@ -2673,7 +2689,13 @@ private DocumentMapperForType docMapper(String type) {
}

private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
Sort indexSort = indexSortSupplier.get();
final Sort indexSort = indexSortSupplier.get();
final Engine.Warmer warmer = reader -> {
assert Thread.holdsLock(mutex) == false : "warming engine under mutex";
if (this.warmer != null) {
this.warmer.warm(reader);
}
};
return new EngineConfig(shardId, shardRouting.allocationId().getId(),
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
mapperService != null ? mapperService.indexAnalyzer() : null,
@@ -3303,6 +3325,7 @@ public ParsedDocument newNoopTombstoneDoc(String reason) {
* Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
*/
void resetEngineToGlobalCheckpoint() throws IOException {
assert Thread.holdsLock(engineMutex) == false : "resetting engine under mutex";
assert getActiveOperationsCount() == OPERATIONS_BLOCKED
: "resetting engine without blocking operations; active operations are [" + getActiveOperations() + ']';
sync(); // persist the global checkpoint to disk
@@ -3314,9 +3337,8 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
SetOnce<Engine> newEngineReference = new SetOnce<>();
final long globalCheckpoint = getLastKnownGlobalCheckpoint();
assert globalCheckpoint == getLastSyncedGlobalCheckpoint();
synchronized (mutex) {
verifyNotClosed();
// we must create both new read-only engine and new read-write engine under mutex to ensure snapshotStoreMetadata,
synchronized (engineMutex) {
// we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata,
// acquireXXXCommit and close works.
final Engine readOnlyEngine =
new ReadOnlyEngine(newEngineConfig(replicationTracker), seqNoStats, translogStats, false, Function.identity()) {
@@ -3347,7 +3369,18 @@ public void close() throws IOException {
IOUtils.close(super::close, newEngine);
}
};
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
boolean success = false;
try {
synchronized (mutex) {
verifyNotClosed();
IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
success = true;
}
} finally {
if (success == false) {
readOnlyEngine.close();
Contributor: This slightly changes behaviour to close the current engine if closing the old engine fails. While this might not really make a difference, I would prefer not to do that, i.e. set success = true right after verifyNotClosed.

Contributor: I think we should close it: if the old engine doesn't close and we don't close the new one, we are left with a dangling reference to an engine and a locked shard.

Member (author): I pushed 67badc6 to close the new engine if we fail to install it.

}
}
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
onNewEngine(newEngineReference.get());
}
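The failure handling the thread above settles on can be distilled into the following self-contained sketch (hypothetical names, not IndexShard): the old engine is closed under mutex while swapping in the new one, and if anything in that step fails, including closing the old engine, the not-yet-installed new engine is closed as well so that no engine holding the shard lock is leaked.

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

class EngineSwapSketch {
    static class DummyEngine implements Closeable {
        @Override
        public void close() throws IOException { /* release resources */ }
    }

    private final Object mutex = new Object();
    private final Object engineMutex = new Object();
    private final AtomicReference<DummyEngine> current = new AtomicReference<>(new DummyEngine());
    private volatile boolean closed;

    void swapEngine(DummyEngine replacement) throws IOException {
        synchronized (engineMutex) {
            boolean success = false;
            try {
                synchronized (mutex) {
                    if (closed) {
                        throw new IllegalStateException("shard is closed");
                    }
                    DummyEngine previous = current.getAndSet(replacement);
                    previous.close();   // may throw
                    success = true;     // only reached once the old engine closed cleanly
                }
            } finally {
                if (success == false) {
                    replacement.close(); // failed to install cleanly: close the new engine too
                }
            }
        }
    }
}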
The remaining hunks are from the accompanying unit-test file.
@@ -79,10 +79,12 @@
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.Engine.DeleteResult;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineFactory;
@@ -4123,4 +4125,39 @@ protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) {
assertThat(readonlyShard.docStats().getCount(), equalTo(numDocs));
closeShards(readonlyShard);
}

public void testCloseShardWhileEngineIsWarming() throws Exception {
CountDownLatch warmerStarted = new CountDownLatch(1);
CountDownLatch warmerBlocking = new CountDownLatch(1);
IndexShard shard = newShard(true, Settings.EMPTY, config -> {
Engine.Warmer warmer = reader -> {
try {
warmerStarted.countDown();
warmerBlocking.await();
config.getWarmer().warm(reader);
} catch (InterruptedException e) {
throw new AssertionError(e);
}
};
EngineConfig configWithWarmer = new EngineConfig(config.getShardId(), config.getAllocationId(), config.getThreadPool(),
config.getIndexSettings(), warmer, config.getStore(), config.getMergePolicy(), config.getAnalyzer(),
config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(),
config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(),
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier());
return new InternalEngine(configWithWarmer);
});
Thread recoveryThread = new Thread(() -> expectThrows(AlreadyClosedException.class, () -> recoverShardFromStore(shard)));
recoveryThread.start();
try {
warmerStarted.await();
shard.close("testing", false);
assertThat(shard.state, equalTo(IndexShardState.CLOSED));
} finally {
warmerBlocking.countDown();
}
recoveryThread.join();
shard.store().close();
}
}