Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move more methods into abstract Engine #9717

Merged
merged 1 commit into from Feb 17, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
58 changes: 57 additions & 1 deletion src/main/java/org/elasticsearch/index/engine/Engine.java
Expand Up @@ -59,6 +59,7 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
*
Expand All @@ -72,6 +73,10 @@ public abstract class Engine implements Closeable {
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
protected final FailedEngineListener failedEngineListener;
protected final SnapshotDeletionPolicy deletionPolicy;
protected final ReentrantLock failEngineLock = new ReentrantLock();
protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
protected final ReleasableLock readLock = new ReleasableLock(rwl.readLock());
protected final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock());

protected volatile Throwable failedEngine = null;

Expand Down Expand Up @@ -416,7 +421,45 @@ public final boolean refreshNeeded() {
public abstract void recover(RecoveryHandler recoveryHandler) throws EngineException;

/** fail engine due to some error. the engine will also be closed. */
public abstract void failEngine(String reason, Throwable failure);
public void failEngine(String reason, Throwable failure) {
assert failure != null;
if (failEngineLock.tryLock()) {
store.incRef();
try {
try {
// we just go and close this engine - no way to recover
closeNoLock("engine failed on: [" + reason + "]");
// we first mark the store as corrupted before we notify any listeners
// this must happen first otherwise we might try to reallocate so quickly
// on the same node that we don't see the corrupted marker file when
// the shard is initializing
if (Lucene.isCorruptionException(failure)) {
try {
store.markStoreCorrupted(ExceptionsHelper.unwrapCorruption(failure));
} catch (IOException e) {
logger.warn("Couldn't marks store corrupted", e);
}
}
} finally {
if (failedEngine != null) {
logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure);
return;
}
logger.warn("failed engine [{}]", failure, reason);
// we must set a failure exception, generate one if not supplied
failedEngine = failure;
failedEngineListener.onFailedEngine(shardId, reason, failure);
}
} catch (Throwable t) {
// don't bubble up these exceptions up
logger.warn("failEngine threw exception", t);
} finally {
store.decRef();
}
} else {
logger.debug("tried to fail engine but could not acquire lock - engine should be failed by now [{}]", reason, failure);
}
}

/** Check whether the engine should be failed */
protected boolean maybeFailEngine(String source, Throwable t) {
Expand Down Expand Up @@ -963,4 +1006,17 @@ public void release() {
}

protected abstract SearcherManager getSearcherManager();

protected abstract void closeNoLock(String reason) throws ElasticsearchException;

@Override
public void close() throws IOException {
if (isClosed.get() == false) { // don't acquire the write lock if we are already closed
logger.debug("close now acquiring writeLock");
try (ReleasableLock _ = writeLock.acquire()) {
logger.debug("close acquired writeLock");
closeNoLock("api");
}
}
}
}
59 changes: 1 addition & 58 deletions src/main/java/org/elasticsearch/index/engine/InternalEngine.java
Expand Up @@ -80,10 +80,6 @@ public class InternalEngine extends Engine {
private final MergePolicyProvider mergePolicyProvider;
private final MergeSchedulerProvider mergeScheduler;

private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final ReleasableLock readLock = new ReleasableLock(rwl.readLock());
private final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock());

private final IndexWriter indexWriter;

private final SearcherFactory searcherFactory;
Expand All @@ -101,7 +97,6 @@ public class InternalEngine extends Engine {
private final LiveVersionMap versionMap;

private final Object[] dirtyLocks;
private final ReentrantLock failEngineLock = new ReentrantLock();

private final AtomicLong translogIdGenerator = new AtomicLong();
private final AtomicBoolean versionMapRefreshPending = new AtomicBoolean();
Expand Down Expand Up @@ -892,23 +887,12 @@ public List<Segment> segments(boolean verbose) {
}
}

@Override
public void close() throws ElasticsearchException {
if (isClosed.get() == false) { // don't acquire the write lock if we are already closed
logger.trace("close now acquire writeLock");
try (ReleasableLock _ = writeLock.acquire()) {
logger.trace("close now acquired writeLock");
closeNoLock("api");
}
}
}

/**
* Closes the engine without acquiring the write lock. This should only be
* called while the write lock is hold or in a disaster condition ie. if the engine
* is failed.
*/
private void closeNoLock(String reason) throws ElasticsearchException {
protected final void closeNoLock(String reason) throws ElasticsearchException {
if (isClosed.compareAndSet(false, true)) {
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself";
try {
Expand Down Expand Up @@ -938,47 +922,6 @@ private void closeNoLock(String reason) throws ElasticsearchException {
}
}

@Override
public void failEngine(String reason, Throwable failure) {
assert failure != null;
if (failEngineLock.tryLock()) {
store.incRef();
try {
try {
// we just go and close this engine - no way to recover
closeNoLock("engine failed on: [" + reason + "]");
// we first mark the store as corrupted before we notify any listeners
// this must happen first otherwise we might try to reallocate so quickly
// on the same node that we don't see the corrupted marker file when
// the shard is initializing
if (Lucene.isCorruptionException(failure)) {
try {
store.markStoreCorrupted(ExceptionsHelper.unwrapCorruption(failure));
} catch (IOException e) {
logger.warn("Couldn't marks store corrupted", e);
}
}
} finally {
if (failedEngine != null) {
logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure);
return;
}
logger.warn("failed engine [{}]", failure, reason);
// we must set a failure exception, generate one if not supplied
failedEngine = failure;
failedEngineListener.onFailedEngine(shardId, reason, failure);
}
} catch (Throwable t) {
// don't bubble up these exceptions up
logger.warn("failEngine threw exception", t);
} finally {
store.decRef();
}
} else {
logger.debug("tried to fail engine but could not acquire lock - engine should be failed by now [{}]", reason, failure);
}
}

@Override
protected SearcherManager getSearcherManager() {
return searcherManager;
Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchIntegrationTest;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.Random;
Expand Down Expand Up @@ -76,7 +77,7 @@ public MockInternalEngine(EngineConfig config) throws EngineException {
}

@Override
public void close() {
public void close() throws IOException {
try {
super.close();
} finally {
Expand Down