Skip to content

Commit

Permalink
Use shared engine code for MockSharedFSEngine
Browse files Browse the repository at this point in the history
  • Loading branch information
dakrone committed Apr 22, 2015
1 parent 4308cbb commit 24bf3de
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 41 deletions.
Expand Up @@ -39,15 +39,13 @@
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.translog.*;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.file.*;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.locks.ReadWriteLock;
Expand Down
Expand Up @@ -328,7 +328,7 @@ public void testPrimaryRelocationWithConcurrentIndexing() throws Exception {
.build();

String node1 = internalCluster().startNode(nodeSettings);
Path dataPath = newTempDirPath();
Path dataPath = createTempDir();
final String IDX = "test";

Settings idxSettings = ImmutableSettings.builder()
Expand Down Expand Up @@ -400,7 +400,7 @@ public void testPrimaryRelocationWhereRecoveryFails() throws Exception {
.build();

String node1 = internalCluster().startNode(nodeSettings);
Path dataPath = newTempDirPath();
Path dataPath = createTempDir();
final String IDX = "test";

Settings idxSettings = ImmutableSettings.builder()
Expand Down
Expand Up @@ -32,7 +32,6 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.test.ElasticsearchIntegrationTest;

import java.io.IOException;
Expand Down
88 changes: 52 additions & 36 deletions src/test/java/org/elasticsearch/test/engine/MockSharedFSEngine.java
Expand Up @@ -19,52 +19,68 @@

package org.elasticsearch.test.engine;

import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.apache.lucene.search.AssertingIndexSearcher;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherManager;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.engine.SharedFSEngine;

import java.io.IOException;

/**
* TODO: document me!
* Mock engine for the SharedFSEngine
*/
public class MockSharedFSEngine extends MockInternalEngine {
public class MockSharedFSEngine extends SharedFSEngine {
private MockEngineSupport support;

public MockSharedFSEngine(EngineConfig config, boolean skipInitialTranslogRecovery) throws EngineException {
super(config, skipInitialTranslogRecovery);
}

private synchronized MockEngineSupport support() {
// lazy initialized since we need it already on super() ctor execution :(
if (support == null) {
support = new MockEngineSupport(config());
}
return support;
}

@Override
public void recover(RecoveryHandler recoveryHandler) throws EngineException {
store.incRef();
try {
logger.trace("[pre-recovery] acquiring write lock");
try (ReleasableLock lock = writeLock.acquire()) {
// phase1 under lock
ensureOpen();
try {
logger.trace("[phase1] performing phase 1 recovery (file recovery)");
recoveryHandler.phase1(null);
} catch (Throwable e) {
maybeFailEngine("recovery phase 1 (file transfer)", e);
throw new RecoveryEngineException(shardId, 1, "Execution failed", wrapIfClosed(e));
}
}
try {
logger.trace("[phase2] performing phase 2 recovery (translog replay)");
recoveryHandler.phase2(null);
} catch (Throwable e) {
maybeFailEngine("recovery phase 2 (snapshot transfer)", e);
throw new RecoveryEngineException(shardId, 2, "Execution failed", wrapIfClosed(e));
}
try {
logger.trace("[phase3] performing phase 3 recovery (finalization)");
recoveryHandler.phase3(null);
} catch (Throwable e) {
maybeFailEngine("recovery phase 3 (finalization)", e);
throw new RecoveryEngineException(shardId, 3, "Execution failed", wrapIfClosed(e));
}
} finally {
store.decRef();
public void close() throws IOException {
switch(support().flushOrClose(this, MockEngineSupport.CloseAction.CLOSE)) {
case FLUSH_AND_CLOSE:
super.flushAndClose();
break;
case CLOSE:
super.close();
break;
}
logger.trace("[post-recovery] recovery complete");
logger.debug("Ongoing recoveries after engine close: " + onGoingRecoveries.get());

}

@Override
public void flushAndClose() throws IOException {
switch(support().flushOrClose(this, MockEngineSupport.CloseAction.FLUSH_AND_CLOSE)) {
case FLUSH_AND_CLOSE:
super.flushAndClose();
break;
case CLOSE:
super.close();
break;
}
logger.debug("Ongoing recoveries after engine close: " + onGoingRecoveries.get());
}

@Override
protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException {
final AssertingIndexSearcher assertingIndexSearcher = support().newSearcher(this, source, searcher, manager);
assertingIndexSearcher.setSimilarity(searcher.getSimilarity());
// pass the original searcher to the super.newSearcher() method to make sure this is the searcher that will
// be released later on. If we wrap an index reader here must not pass the wrapped version to the manager
// on release otherwise the reader will be closed too early. - good news, stuff will fail all over the place if we don't get this right here
return new AssertingSearcher(assertingIndexSearcher,
super.newSearcher(source, searcher, manager), shardId, MockEngineSupport.INFLIGHT_ENGINE_SEARCHERS, logger);
}
}

0 comments on commit 24bf3de

Please sign in to comment.