Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove FlushType and make resources final in InternalEngine #9565

Merged
merged 1 commit on Feb 4, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 13 additions & 13 deletions src/main/java/org/elasticsearch/index/engine/Engine.java
Expand Up @@ -92,9 +92,20 @@ public interface Engine extends Closeable {
void refresh(String source) throws EngineException;

/**
* Flushes the state of the engine, clearing memory.
* Flushes the state of the engine including the transaction log, clearing memory.
* @param force if <code>true</code> a lucene commit is executed even if no changes need to be committed.
* @param waitIfOngoing if <code>true</code> this call will block until all currently running flushes have finished.
* Otherwise this call will return without blocking.
*/
void flush(FlushType type, boolean force, boolean waitIfOngoing) throws EngineException;
void flush(boolean force, boolean waitIfOngoing) throws EngineException;

/**
* Flushes the state of the engine including the transaction log, clearing memory and persisting
* documents in the lucene index to disk including a potentially heavy and durable fsync operation.
* This operation is not going to block if another flush operation is currently running and won't write
* a lucene commit if nothing needs to be committed.
*/
void flush() throws EngineException;

/**
* Optimizes to 1 segment
Expand Down Expand Up @@ -187,17 +198,6 @@ public void close() throws ElasticsearchException {
}
}

public static enum FlushType {
/**
* A flush that just commits the writer, without cleaning the translog.
*/
COMMIT,
/**
* A flush that does a commit, as well as clears the translog.
*/
COMMIT_TRANSLOG
}

static interface Operation {
static enum Type {
CREATE,
Expand Down

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/main/java/org/elasticsearch/index/shard/IndexShard.java
Expand Up @@ -615,7 +615,7 @@ public void flush(FlushRequest request) throws ElasticsearchException {
logger.trace("flush with {}", request);
}
long time = System.nanoTime();
engine().flush(Engine.FlushType.COMMIT_TRANSLOG, request.force(), request.waitIfOngoing());
engine().flush(request.force(), request.waitIfOngoing());
flushMetric.inc(System.nanoTime() - time);
}

Expand Down Expand Up @@ -744,7 +744,7 @@ public void performRecoveryFinalization(boolean withFlush, RecoveryState recover

public void performRecoveryFinalization(boolean withFlush) throws ElasticsearchException {
if (withFlush) {
engine().flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine().flush();
}
// clear unreferenced files
translog.clearUnreferenced();
Expand Down
Expand Up @@ -734,7 +734,7 @@ private void applyInitializingShard(final RoutingTable routingTable, final Disco
RecoveryState.Type type = shardRouting.primary() ? RecoveryState.Type.RELOCATION : RecoveryState.Type.REPLICA;
recoveryTarget.startRecovery(indexShard, type, sourceNode, new PeerRecoveryListener(shardRouting, indexService, indexMetaData));
} catch (Throwable e) {
indexShard.engine().failEngine("corrupted preexisting index", e);
indexShard.failShard("corrupted preexisting index", e);
handleRecoveryFailure(indexService, indexMetaData, shardRouting, true, e);
}
} else {
Expand Down
Expand Up @@ -31,11 +31,11 @@ public void testLuceneSettings() {
final IndexService service = createIndex("foo");
// INDEX_COMPOUND_ON_FLUSH
InternalEngine engine = ((InternalEngine)engine(service));
assertThat(engine.currentIndexWriterConfig().getUseCompoundFile(), is(true));
assertThat(engine.getCurrentIndexWriterConfig().getUseCompoundFile(), is(true));
client().admin().indices().prepareUpdateSettings("foo").setSettings(ImmutableSettings.builder().put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, false).build()).get();
assertThat(engine.currentIndexWriterConfig().getUseCompoundFile(), is(false));
assertThat(engine.getCurrentIndexWriterConfig().getUseCompoundFile(), is(false));
client().admin().indices().prepareUpdateSettings("foo").setSettings(ImmutableSettings.builder().put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, true).build()).get();
assertThat(engine.currentIndexWriterConfig().getUseCompoundFile(), is(true));
assertThat(engine.getCurrentIndexWriterConfig().getUseCompoundFile(), is(true));
}


Expand Down
Expand Up @@ -308,7 +308,7 @@ public void testSegments() throws Exception {
assertThat(segments.get(0).isCompound(), equalTo(defaultCompound));
assertThat(segments.get(0).ramTree, nullValue());

engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();

segments = engine.segments(false);
assertThat(segments.size(), equalTo(1));
Expand Down Expand Up @@ -453,19 +453,19 @@ public void afterMerge(OnGoingMerge merge) {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();
assertThat(engine.segments(false).size(), equalTo(1));
index = new Engine.Index(null, newUid("2"), doc);
engine.index(index);
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();
List<Segment> segments = engine.segments(false);
assertThat(segments.size(), equalTo(2));
for (Segment segment : segments) {
assertThat(segment.getMergeId(), nullValue());
}
index = new Engine.Index(null, newUid("3"), doc);
engine.index(index);
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();
segments = engine.segments(false);
assertThat(segments.size(), equalTo(3));
for (Segment segment : segments) {
Expand All @@ -485,7 +485,7 @@ public void afterMerge(OnGoingMerge merge) {

index = new Engine.Index(null, newUid("4"), doc);
engine.index(index);
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();
final long gen1 = store.readLastCommittedSegmentsInfo().getGeneration();
// now, optimize and wait for merges, see that we have no merge flag
engine.forceMerge(true, true);
Expand Down Expand Up @@ -645,7 +645,7 @@ public void testSimpleOperations() throws Exception {
searchResult.close();

// now flush
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();

// and, verify get (in real time)
getResult = engine.get(new Engine.Get(true, newUid("1")));
Expand Down Expand Up @@ -722,7 +722,7 @@ public void testSearchResultRelease() throws Exception {
public void testFailEngineOnCorruption() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc));
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();
final boolean failEngine = defaultSettings.getAsBoolean(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, false);
final int failInPhase = randomIntBetween(1, 3);
try {
Expand Down Expand Up @@ -777,13 +777,13 @@ public void phase3(Translog.Snapshot snapshot) throws EngineException {
public void testSimpleRecover() throws Exception {
final ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc));
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();

engine.recover(new Engine.RecoveryHandler() {
@Override
public void phase1(SnapshotIndexCommit snapshot) throws EngineException {
try {
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();
assertThat("flush is not allowed in phase 1", false, equalTo(true));
} catch (FlushNotAllowedEngineException e) {
// all is well
Expand All @@ -794,7 +794,7 @@ public void phase1(SnapshotIndexCommit snapshot) throws EngineException {
public void phase2(Translog.Snapshot snapshot) throws EngineException {
MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(0));
try {
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();
assertThat("flush is not allowed in phase 2", false, equalTo(true));
} catch (FlushNotAllowedEngineException e) {
// all is well
Expand All @@ -809,7 +809,7 @@ public void phase3(Translog.Snapshot snapshot) throws EngineException {
MatcherAssert.assertThat(snapshot, TranslogSizeMatcher.translogSize(1));
try {
// we can do this here since we are on the same thread
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();
assertThat("flush is not allowed in phase 3", false, equalTo(true));
} catch (FlushNotAllowedEngineException e) {
// all is well
Expand All @@ -819,15 +819,15 @@ public void phase3(Translog.Snapshot snapshot) throws EngineException {
// post recovery should flush the translog
MatcherAssert.assertThat(translog.snapshot(), TranslogSizeMatcher.translogSize(0));

engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();
engine.close();
}

@Test
public void testRecoverWithOperationsBetweenPhase1AndPhase2() throws Exception {
ParsedDocument doc1 = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc1));
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();
ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_2, false);
engine.create(new Engine.Create(null, newUid("2"), doc2));

Expand All @@ -850,15 +850,15 @@ public void phase3(Translog.Snapshot snapshot) throws EngineException {
}
});

engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();
engine.close();
}

@Test
public void testRecoverWithOperationsBetweenPhase1AndPhase2AndPhase3() throws Exception {
ParsedDocument doc1 = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc1));
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();
ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_2, false);
engine.create(new Engine.Create(null, newUid("2"), doc2));

Expand Down Expand Up @@ -888,7 +888,7 @@ public void phase3(Translog.Snapshot snapshot) throws EngineException {
}
});

engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();
engine.close();
}

Expand Down Expand Up @@ -1000,7 +1000,7 @@ public void testVersioningIndexConflictWithFlush() {
engine.index(index);
assertThat(index.version(), equalTo(2l));

engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();

index = new Engine.Index(null, newUid("1"), doc, 1l, VersionType.INTERNAL, PRIMARY, 0);
try {
Expand Down Expand Up @@ -1031,7 +1031,7 @@ public void testExternalVersioningIndexConflictWithFlush() {
engine.index(index);
assertThat(index.version(), equalTo(14l));

engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();

index = new Engine.Index(null, newUid("1"), doc, 13, VersionType.EXTERNAL, PRIMARY, 0);
try {
Expand Down Expand Up @@ -1104,7 +1104,7 @@ public void testVersioningDeleteConflictWithFlush() {
engine.index(index);
assertThat(index.version(), equalTo(2l));

engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();

Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"), 1l, VersionType.INTERNAL, PRIMARY, 0, false);
try {
Expand All @@ -1123,14 +1123,14 @@ public void testVersioningDeleteConflictWithFlush() {
// all is well
}

engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();

// now actually delete
delete = new Engine.Delete("test", "1", newUid("1"), 2l, VersionType.INTERNAL, PRIMARY, 0, false);
engine.delete(delete);
assertThat(delete.version(), equalTo(3l));

engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();

// now check if we can index to a delete doc with version
index = new Engine.Index(null, newUid("1"), doc, 2l, VersionType.INTERNAL, PRIMARY, 0);
Expand Down Expand Up @@ -1173,7 +1173,7 @@ public void testVersioningCreateExistsExceptionWithFlush() {
engine.create(create);
assertThat(create.version(), equalTo(1l));

engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();

create = new Engine.Create(null, newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0);
try {
Expand Down Expand Up @@ -1297,7 +1297,7 @@ public void testCreatedFlagAfterFlush() {

engine.delete(new Engine.Delete(null, "1", newUid("1")));

engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();

index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
Expand Down Expand Up @@ -1348,13 +1348,13 @@ public void testIndexWriterInfoStream() {
// First, with DEBUG, which should NOT log IndexWriter output:
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc));
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();
assertFalse(mockAppender.sawIndexWriterMessage);

// Again, with TRACE, which should log IndexWriter output:
rootLogger.setLevel(Level.TRACE);
engine.create(new Engine.Create(null, newUid("2"), doc));
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();
assertTrue(mockAppender.sawIndexWriterMessage);

} finally {
Expand Down Expand Up @@ -1383,14 +1383,14 @@ public void testIndexWriterIFDInfoStream() {
// First, with DEBUG, which should NOT log IndexWriter output:
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc));
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();
assertFalse(mockAppender.sawIndexWriterMessage);
assertFalse(mockAppender.sawIndexWriterIFDMessage);

// Again, with TRACE, which should only log IndexWriter IFD output:
iwIFDLogger.setLevel(Level.TRACE);
engine.create(new Engine.Create(null, newUid("2"), doc));
engine.flush(Engine.FlushType.COMMIT_TRANSLOG, false, false);
engine.flush();
assertFalse(mockAppender.sawIndexWriterMessage);
assertTrue(mockAppender.sawIndexWriterIFDMessage);

Expand Down
Expand Up @@ -86,7 +86,7 @@ public void beforeIndexShardClosed(ShardId sid, @Nullable IndexShard indexShard,
// When the internal engine closes we do a rollback, which removes uncommitted segments
// By taking a snapshot of the index we perform a Lucene commit, but don't clear the translog,
// so that even in tests where we don't flush we can check the integrity of the Lucene index
indexShard.engine().flush(Engine.FlushType.COMMIT, false, true); // Keep translog for tests that rely on replaying it
indexShard.engine().snapshotIndex(); // Keep translog for tests that rely on replaying it
logger.info("flush finished in beforeIndexShardClosed");
}
}
Expand Down