Skip to content

Commit

Permalink
Fix optimize behavior with 'force' and 'flush' flags.
Browse files Browse the repository at this point in the history
This does the following:
* Make 'force' flag only build a merge if the delegate MP returned no merges
* Add async handling for 'flush' when 'waitForMerges' is false
* Remove flush at the beginning of optimize.  This is something the user can
  do if they wish, before calling optimize.

closes elastic#7886
closes elastic#7904
  • Loading branch information
rjernst committed Sep 29, 2014
1 parent 20a0c68 commit 4ce798d
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 29 deletions.
Expand Up @@ -51,6 +51,7 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.analysis.AnalysisService;
Expand Down Expand Up @@ -1009,12 +1010,20 @@ public void maybeMerge() throws EngineException {
throw new OptimizeFailedEngineException(shardId, t);
}
}

private void waitForMerges(boolean flushAfter) {
try {
currentIndexWriter().waitForMerges();
} catch (IOException e) {
throw new OptimizeFailedEngineException(shardId, e);
}
if (flushAfter) {
flush(new Flush().force(true).waitIfOngoing(true));
}
}

@Override
public void optimize(Optimize optimize) throws EngineException {
if (optimize.flush()) {
flush(new Flush().force(true).waitIfOngoing(true));
}
if (optimizeMutex.compareAndSet(false, true)) {
ElasticsearchMergePolicy elasticsearchMergePolicy = null;
try (InternalLock _ = readLock.acquire()) {
Expand Down Expand Up @@ -1054,18 +1063,23 @@ public void optimize(Optimize optimize) throws EngineException {
}
optimizeMutex.set(false);
}

}

// wait for the merges outside of the read lock
if (optimize.waitForMerge()) {
try {
currentIndexWriter().waitForMerges();
} catch (IOException e) {
throw new OptimizeFailedEngineException(shardId, e);
}
}
if (optimize.flush()) {
flush(new Flush().force(true).waitIfOngoing(true));
waitForMerges(optimize.flush());
} else if (optimize.flush()) {
// we only need to monitor merges for async calls if we are going to flush
threadPool.executor(ThreadPool.Names.OPTIMIZE).execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.error("Exception while waiting for merges asynchronously after optimize", t);
}
@Override
protected void doRun() throws Exception {
waitForMerges(true);
}
});
}
}

Expand Down
Expand Up @@ -196,20 +196,22 @@ public MergeSpecification findMerges(MergeTrigger mergeTrigger,
public MergeSpecification findForcedMerges(SegmentInfos segmentInfos,
int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer)
throws IOException {
if (force) {
List<SegmentCommitInfo> segments = Lists.newArrayList();
for (SegmentCommitInfo info : segmentInfos) {
if (segmentsToMerge.containsKey(info)) {
segments.add(info);
}
}
if (!segments.isEmpty()) {
MergeSpecification spec = new IndexUpgraderMergeSpecification();
spec.add(new OneMerge(segments));
return spec;
}
}
return upgradedMergeSpecification(delegate.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer));
MergeSpecification spec = delegate.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer);

if (spec == null && force) {
List<SegmentCommitInfo> segments = Lists.newArrayList();
for (SegmentCommitInfo info : segmentInfos) {
if (segmentsToMerge.containsKey(info)) {
segments.add(info);
}
}
if (!segments.isEmpty()) {
spec = new IndexUpgraderMergeSpecification();
spec.add(new OneMerge(segments));
return spec;
}
}
return upgradedMergeSpecification(spec);
}

@Override
Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.index.engine.internal;

import com.google.common.base.Predicate;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
Expand All @@ -29,6 +30,7 @@
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.AlreadyClosedException;
Expand Down Expand Up @@ -374,7 +376,7 @@ public void afterMerge(OnGoingMerge merge) {
}
});

Engine engine = createEngine(engineSettingsService, store, createTranslog(), mergeSchedulerProvider);
final Engine engine = createEngine(engineSettingsService, store, createTranslog(), mergeSchedulerProvider);
engine.start();
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
Expand Down Expand Up @@ -410,26 +412,44 @@ public void afterMerge(OnGoingMerge merge) {
index = new Engine.Index(null, newUid("4"), doc);
engine.index(index);
engine.flush(new Engine.Flush());

final long gen1 = store.readLastCommittedSegmentsInfo().getGeneration();
// now, optimize and wait for merges, see that we have no merge flag
engine.optimize(new Engine.Optimize().flush(true).maxNumSegments(1).waitForMerge(true));

for (Segment segment : engine.segments()) {
assertThat(segment.getMergeId(), nullValue());
}
// we could have multiple underlying merges, so the generation may increase more than once
assertTrue(store.readLastCommittedSegmentsInfo().getGeneration() > gen1);

// forcing an optimize will merge this single segment shard
final boolean force = randomBoolean();
if (force) {
waitTillMerge.set(new CountDownLatch(1));
waitForMerge.set(new CountDownLatch(1));
}
engine.optimize(new Engine.Optimize().flush(true).maxNumSegments(1).force(force).waitForMerge(false));
final boolean flush = randomBoolean();
final long gen2 = store.readLastCommittedSegmentsInfo().getGeneration();
engine.optimize(new Engine.Optimize().flush(flush).maxNumSegments(1).force(force).waitForMerge(false));
waitTillMerge.get().await();
for (Segment segment : engine.segments()) {
assertThat(segment.getMergeId(), force ? notNullValue() : nullValue());
}
waitForMerge.get().countDown();

if (flush) {
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
try {
// we should have had just 1 merge, so last generation should be exact
return store.readLastCommittedSegmentsInfo().getLastGeneration() == gen2;
} catch (IOException e) {
throw ExceptionsHelper.convertToRuntime(e);
}
}
});
}

engine.close();
}
Expand Down

0 comments on commit 4ce798d

Please sign in to comment.