Closed shard should never open new engine (elastic#47186)
We should not open new engines if a shard is closed. This assumption was
broken by elastic#45263, which stopped verifying the shard state before
creating an engine and only verified it before swapping the engine reference.
As a result, we can fail to snapshot the store metadata or to run checkIndex
on a closed shard if an IndexWriter is still holding the index lock.

Closes elastic#47060
dnhatn authored and debadair committed Nov 13, 2019
1 parent 8a63108 commit 71ebec6
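
The heart of the fix is that a new engine may only be created and published while holding engineMutex, and only after verifying that the shard has not been closed. The following standalone sketch illustrates that pattern; it is not the actual IndexShard code, and the SimpleShard and EngineFactory types are hypothetical stand-ins used only for illustration.

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicReference;

    // Hypothetical illustration only: check "closed", then create and publish
    // the engine, all under the same mutex that close() takes.
    class SimpleShard {
        interface Engine extends Closeable {}
        interface EngineFactory { Engine create() throws IOException; }

        private final Object engineMutex = new Object();
        private final AtomicReference<Engine> currentEngine = new AtomicReference<>();
        private volatile boolean closed;

        void openEngine(EngineFactory factory) throws IOException {
            synchronized (engineMutex) {
                if (closed) {
                    // never open a new engine on a closed shard
                    throw new IllegalStateException("shard is closed");
                }
                assert currentEngine.get() == null : "engine is running";
                Engine newEngine = factory.create();  // created under the mutex ...
                currentEngine.set(newEngine);         // ... and published under the same mutex
            }
        }

        void close() throws IOException {
            synchronized (engineMutex) { // same lock, so it cannot interleave with openEngine
                closed = true;
                Engine engine = currentEngine.getAndSet(null);
                if (engine != null) {
                    engine.close(); // releases the underlying index lock
                }
            }
        }
    }

Because both paths take the same monitor, a close() that wins the race marks the shard closed, and any later attempt to open an engine fails fast instead of leaving an orphaned IndexWriter holding write.lock.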
Showing 3 changed files with 42 additions and 57 deletions.
74 changes: 25 additions & 49 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1158,11 +1158,9 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
             synchronized (engineMutex) {
                 // if the engine is not running, we can access the store directly, but we need to make sure no one starts
                 // the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
-                synchronized (mutex) {
-                    final Engine engine = getEngineOrNull();
-                    if (engine != null) {
-                        indexCommit = engine.acquireLastIndexCommit(false);
-                    }
+                final Engine engine = getEngineOrNull();
+                if (engine != null) {
+                    indexCommit = engine.acquireLastIndexCommit(false);
                 }
                 if (indexCommit == null) {
                     return store.getMetadata(null, true);
@@ -1286,9 +1284,11 @@ public CacheHelper getReaderCacheHelper() {
     }

     public void close(String reason, boolean flushEngine) throws IOException {
-        synchronized (mutex) {
+        synchronized (engineMutex) {
             try {
-                changeState(IndexShardState.CLOSED, reason);
+                synchronized (mutex) {
+                    changeState(IndexShardState.CLOSED, reason);
+                }
             } finally {
                 final Engine engine = this.currentEngineReference.getAndSet(null);
                 try {
@@ -1343,6 +1343,7 @@ public void prepareForIndexRecovery() {
      * This is the first operation after the local checkpoint of the safe commit if exists.
      */
     public long recoverLocallyUpToGlobalCheckpoint() {
+        assert Thread.holdsLock(mutex) == false : "recover locally under mutex";
         if (state != IndexShardState.RECOVERING) {
             throw new IndexShardNotRecoveringException(shardId, state);
         }
@@ -1394,7 +1395,7 @@ public long recoverLocallyUpToGlobalCheckpoint() {
                 getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint);
                 logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint));
             } finally {
-                synchronized (mutex) {
+                synchronized (engineMutex) {
                     IOUtils.close(currentEngineReference.getAndSet(null));
                 }
             }
@@ -1569,23 +1570,15 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
             : "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
             + "] but got " + getRetentionLeases();
         synchronized (engineMutex) {
+            assert currentEngineReference.get() == null : "engine is running";
+            verifyNotClosed();
             // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
             final Engine newEngine = engineFactory.newReadWriteEngine(config);
-            synchronized (mutex) {
-                try {
-                    verifyNotClosed();
-                    assert currentEngineReference.get() == null : "engine is running";
-                    onNewEngine(newEngine);
-                    currentEngineReference.set(newEngine);
-                    // We set active because we are now writing operations to the engine; this way,
-                    // if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
-                    active.set(true);
-                } finally {
-                    if (currentEngineReference.get() != newEngine) {
-                        newEngine.close();
-                    }
-                }
-            }
+            onNewEngine(newEngine);
+            currentEngineReference.set(newEngine);
+            // We set active because we are now writing operations to the engine; this way,
+            // if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
+            active.set(true);
         }
         // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
         // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
@@ -1616,7 +1609,8 @@ private void onNewEngine(Engine newEngine) {
      * called if recovery has to be restarted after network error / delay **
      */
     public void performRecoveryRestart() throws IOException {
-        synchronized (mutex) {
+        assert Thread.holdsLock(mutex) == false : "restart recovery under mutex";
+        synchronized (engineMutex) {
             assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners";
             IOUtils.close(currentEngineReference.getAndSet(null));
             resetRecoveryStage();
@@ -3288,7 +3282,7 @@ public ParsedDocument newNoopTombstoneDoc(String reason) {
      * Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
      */
     void resetEngineToGlobalCheckpoint() throws IOException {
-        assert Thread.holdsLock(engineMutex) == false : "resetting engine under mutex";
+        assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
         assert getActiveOperationsCount() == OPERATIONS_BLOCKED
             : "resetting engine without blocking operations; active operations are [" + getActiveOperations() + ']';
         sync(); // persist the global checkpoint to disk
@@ -3301,6 +3295,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
         final long globalCheckpoint = getLastKnownGlobalCheckpoint();
         assert globalCheckpoint == getLastSyncedGlobalCheckpoint();
         synchronized (engineMutex) {
+            verifyNotClosed();
             // we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata,
             // acquireXXXCommit and close works.
             final Engine readOnlyEngine =
@@ -3328,7 +3323,7 @@ public IndexCommitRef acquireSafeIndexCommit() {

                 @Override
                 public void close() throws IOException {
-                    assert Thread.holdsLock(mutex);
+                    assert Thread.holdsLock(engineMutex);

                     Engine newEngine = newEngineReference.get();
                     if (newEngine == currentEngineReference.get()) {
@@ -3338,36 +3333,17 @@ public void close() throws IOException {
                     IOUtils.close(super::close, newEngine);
                 }
             };
-            synchronized (mutex) {
-                try {
-                    verifyNotClosed();
-                    IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
-                } finally {
-                    if (currentEngineReference.get() != readOnlyEngine) {
-                        readOnlyEngine.close();
-                    }
-                }
-            }
-            final Engine newReadWriteEngine = engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker));
-            synchronized (mutex) {
-                try {
-                    verifyNotClosed();
-                    newEngineReference.set(newReadWriteEngine);
-                    onNewEngine(newReadWriteEngine);
-                } finally {
-                    if (newEngineReference.get() != newReadWriteEngine) {
-                        newReadWriteEngine.close(); // shard was closed
-                    }
-                }
-            }
+            IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine));
+            newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
+            onNewEngine(newEngineReference.get());
         }
         final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery(
             engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
                 // TODO: add a dedicate recovery stats for the reset translog
             });
         newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint);
         newEngineReference.get().refresh("reset_engine");
-        synchronized (mutex) {
+        synchronized (engineMutex) {
             verifyNotClosed();
             IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get()));
             // We set active because we are now writing operations to the engine; this way,
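
A recurring idiom in the diff above is IOUtils.close(currentEngineReference.getAndSet(newEngine)): swap the engine reference and close whatever was installed before, in a single statement executed under engineMutex. A stripped-down sketch of that swap-and-close idiom, using only the JDK and a hypothetical Engine type, looks like this:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicReference;

    final class EngineSwapExample {
        interface Engine extends Closeable {}

        private final Object engineMutex = new Object();
        private final AtomicReference<Engine> currentEngine = new AtomicReference<>();

        // Install a replacement engine and close the one it displaces.
        void swapEngine(Engine replacement) throws IOException {
            synchronized (engineMutex) {
                Engine previous = currentEngine.getAndSet(replacement); // atomic swap
                if (previous != null) {
                    previous.close(); // a displaced engine must be closed or its files stay locked
                }
            }
        }
    }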
@@ -133,6 +133,7 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.TimeZone;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -511,15 +512,21 @@ public final void ensureAllSearchContextsReleased() throws Exception {
     // TODO: can we do this cleaner???

     /** MockFSDirectoryService sets this: */
-    public static boolean checkIndexFailed;
+    public static final List<Exception> checkIndexFailures = new CopyOnWriteArrayList<>();

     @Before
     public final void resetCheckIndexStatus() throws Exception {
-        checkIndexFailed = false;
+        checkIndexFailures.clear();
     }

     public final void ensureCheckIndexPassed() {
-        assertFalse("at least one shard failed CheckIndex", checkIndexFailed);
+        if (checkIndexFailures.isEmpty() == false) {
+            final AssertionError e = new AssertionError("at least one shard failed CheckIndex");
+            for (Exception failure : checkIndexFailures) {
+                e.addSuppressed(failure);
+            }
+            throw e;
+        }
     }

     // -----------------------------------------------------------------
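
The test-framework change above (in ESTestCase) swaps a single boolean for a concurrent list of failures, so the final assertion can surface every underlying exception as a suppressed cause instead of a bare flag. A minimal, self-contained version of that reporting pattern, with a hypothetical FailureCollector class standing in for the test base class, is sketched below:

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    final class FailureCollector {
        // CopyOnWriteArrayList: safe to append from concurrent test threads without extra locking.
        private final List<Exception> failures = new CopyOnWriteArrayList<>();

        void record(Exception failure) {
            failures.add(failure);
        }

        void reset() {
            failures.clear();
        }

        // Fail once, but keep every recorded cause visible in the resulting stack trace.
        void assertNoFailures() {
            if (failures.isEmpty() == false) {
                AssertionError error = new AssertionError("at least one shard failed CheckIndex");
                for (Exception failure : failures) {
                    error.addSuppressed(failure);
                }
                throw error;
            }
        }
    }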
@@ -83,17 +83,19 @@ public static void checkIndex(Logger logger, Store store, ShardId shardId) {
                     CheckIndex.Status status = store.checkIndex(out);
                     out.flush();
                     if (!status.clean) {
-                        ESTestCase.checkIndexFailed = true;
-                        logger.warn("check index [failure] index files={}\n{}", Arrays.toString(dir.listAll()), os.bytes().utf8ToString());
-                        throw new IOException("index check failure");
+                        IOException failure = new IOException("failed to check index for shard " + shardId +
+                            ";index files [" + Arrays.toString(dir.listAll()) + "] os [" + os.bytes().utf8ToString() + "]");
+                        ESTestCase.checkIndexFailures.add(failure);
+                        throw failure;
                     } else {
                         if (logger.isDebugEnabled()) {
                             logger.debug("check index [success]\n{}", os.bytes().utf8ToString());
                         }
                     }
                 } catch (LockObtainFailedException e) {
-                    ESTestCase.checkIndexFailed = true;
-                    throw new IllegalStateException("IndexWriter is still open on shard " + shardId, e);
+                    IllegalStateException failure = new IllegalStateException("IndexWriter is still open on shard " + shardId, e);
+                    ESTestCase.checkIndexFailures.add(failure);
+                    throw failure;
                 }
             } catch (Exception e) {
                 logger.warn("failed to check index", e);
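
The LockObtainFailedException branch above is where the original bug showed up: if an engine opened on an already-closed shard leaves its IndexWriter running, the post-test CheckIndex cannot obtain write.lock. The sketch below reproduces that behaviour with plain Lucene, outside Elasticsearch; it assumes a Lucene version that provides ByteBuffersDirectory and the no-argument IndexWriterConfig constructor.

    import org.apache.lucene.document.Document;
    import org.apache.lucene.index.CheckIndex;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.store.ByteBuffersDirectory;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.LockObtainFailedException;

    public class CheckIndexLockDemo {
        public static void main(String[] args) throws Exception {
            try (Directory dir = new ByteBuffersDirectory()) {
                IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig());
                writer.addDocument(new Document());
                writer.commit();

                // While the writer is open it holds write.lock, so CheckIndex cannot acquire it.
                try {
                    new CheckIndex(dir).close();
                } catch (LockObtainFailedException e) {
                    System.out.println("expected while writer is open: " + e);
                }

                writer.close(); // closing the writer releases the lock ...
                try (CheckIndex checkIndex = new CheckIndex(dir)) {
                    CheckIndex.Status status = checkIndex.checkIndex();
                    System.out.println("clean = " + status.clean); // ... and the check can run
                }
            }
        }
    }

With this commit, the Elasticsearch test suite records such a failure in ESTestCase.checkIndexFailures, so the offending exception shows up in the final assertion instead of only flipping a boolean.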
