diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java
index a0151264e8293..553c17ca87056 100644
--- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java
+++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java
@@ -51,6 +51,7 @@
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.index.analysis.AnalysisService;
@@ -1009,12 +1010,20 @@ public void maybeMerge() throws EngineException {
             throw new OptimizeFailedEngineException(shardId, t);
         }
     }
+
+    private void waitForMerges(boolean flushAfter) {
+        try {
+            currentIndexWriter().waitForMerges();
+        } catch (IOException e) {
+            throw new OptimizeFailedEngineException(shardId, e);
+        }
+        if (flushAfter) {
+            flush(new Flush().force(true).waitIfOngoing(true));
+        }
+    }
 
     @Override
     public void optimize(Optimize optimize) throws EngineException {
-        if (optimize.flush()) {
-            flush(new Flush().force(true).waitIfOngoing(true));
-        }
         if (optimizeMutex.compareAndSet(false, true)) {
             ElasticsearchMergePolicy elasticsearchMergePolicy = null;
             try (InternalLock _ = readLock.acquire()) {
@@ -1054,18 +1063,23 @@ public void optimize(Optimize optimize) throws EngineException {
                 }
                 optimizeMutex.set(false);
             }
-
         }
+        // wait for the merges outside of the read lock
         if (optimize.waitForMerge()) {
-            try {
-                currentIndexWriter().waitForMerges();
-            } catch (IOException e) {
-                throw new OptimizeFailedEngineException(shardId, e);
-            }
-        }
-        if (optimize.flush()) {
-            flush(new Flush().force(true).waitIfOngoing(true));
+            waitForMerges(optimize.flush());
+        } else if (optimize.flush()) {
+            // we only need to monitor merges for async calls if we are going to flush
+            threadPool.executor(ThreadPool.Names.OPTIMIZE).execute(new AbstractRunnable() {
+                @Override
+                public void onFailure(Throwable t) {
+                    logger.error("Exception while waiting for merges asynchronously after optimize", t);
+                }
+
+                @Override
+                protected void doRun() throws Exception {
+                    waitForMerges(true);
+                }
+            });
         }
     }
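Note on the change above: the old code flushed up front and then again right after triggering the merge, so with flush=true and waitForMerge=false the trailing flush could run while the forced merge was still in flight and miss the merged segments. The new flow has three cases: waitForMerge=true blocks the caller in waitForMerges() and flushes afterwards if requested; waitForMerge=false with flush=true hands the same wait-then-flush pair to the OPTIMIZE thread pool; with neither flag the call returns immediately. A minimal sketch of that control flow, assuming a plain ExecutorService in place of the engine's thread pool and a hypothetical waitForMergesThenFlush() helper standing in for IndexWriter.waitForMerges() plus Engine.flush():

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class OptimizeFlowSketch {
        private final ExecutorService optimizePool = Executors.newSingleThreadExecutor();

        // Hypothetical stand-in for currentIndexWriter().waitForMerges()
        // followed by flush(new Flush().force(true).waitIfOngoing(true)).
        void waitForMergesThenFlush(boolean flushAfter) {
            // block until running merges finish, then flush if requested
        }

        void optimize(boolean waitForMerge, boolean flush) {
            // (the forced merge itself has already been requested under the read lock)
            if (waitForMerge) {
                // synchronous path: caller blocks until merges (and the flush) are done
                waitForMergesThenFlush(flush);
            } else if (flush) {
                // async path: we only observe merge completion in order to flush afterwards
                optimizePool.execute(new Runnable() {
                    @Override
                    public void run() {
                        waitForMergesThenFlush(true);
                    }
                });
            }
            // with neither flag set, merges simply complete in the background
        }
    }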
diff --git a/src/main/java/org/elasticsearch/index/merge/policy/ElasticsearchMergePolicy.java b/src/main/java/org/elasticsearch/index/merge/policy/ElasticsearchMergePolicy.java
index 1b5f38be8a5dd..9e6a53706dad9 100644
--- a/src/main/java/org/elasticsearch/index/merge/policy/ElasticsearchMergePolicy.java
+++ b/src/main/java/org/elasticsearch/index/merge/policy/ElasticsearchMergePolicy.java
@@ -196,20 +196,22 @@ public MergeSpecification findMerges(MergeTrigger mergeTrigger,
     public MergeSpecification findForcedMerges(SegmentInfos segmentInfos,
             int maxSegmentCount, Map<SegmentCommitInfo, Boolean> segmentsToMerge, IndexWriter writer) throws IOException {
-        if (force) {
-            List<SegmentCommitInfo> segments = Lists.newArrayList();
-            for (SegmentCommitInfo info : segmentInfos) {
-                if (segmentsToMerge.containsKey(info)) {
-                    segments.add(info);
-                }
-            }
-            if (!segments.isEmpty()) {
-                MergeSpecification spec = new IndexUpgraderMergeSpecification();
-                spec.add(new OneMerge(segments));
-                return spec;
-            }
-        }
-        return upgradedMergeSpecification(delegate.findForcedMerges(segmentInfos,
-                maxSegmentCount, segmentsToMerge, writer));
+        MergeSpecification spec = delegate.findForcedMerges(segmentInfos,
+                maxSegmentCount, segmentsToMerge, writer);
+
+        if (spec == null && force) {
+            List<SegmentCommitInfo> segments = Lists.newArrayList();
+            for (SegmentCommitInfo info : segmentInfos) {
+                if (segmentsToMerge.containsKey(info)) {
+                    segments.add(info);
+                }
+            }
+            if (!segments.isEmpty()) {
+                spec = new IndexUpgraderMergeSpecification();
+                spec.add(new OneMerge(segments));
+                return spec;
+            }
+        }
+        return upgradedMergeSpecification(spec);
     }
 
     @Override
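The reordering in findForcedMerges() also changes precedence: previously, whenever force was set, the explicit single-OneMerge specification was built without consulting the wrapped policy at all; now the delegate gets the first chance, and the explicit merge over the requested segments is only assembled as a fallback when the delegate returns null. A condensed sketch of that delegate-first shape, using hypothetical Spec and Delegate types in place of Lucene's MergePolicy.MergeSpecification and the wrapped merge policy:

    // Hypothetical stand-ins for MergePolicy.MergeSpecification and the wrapped policy.
    class Spec {
    }

    interface Delegate {
        Spec findForcedMerges();
    }

    class DelegateFirstPolicy {
        private final Delegate delegate;
        private volatile boolean force;

        DelegateFirstPolicy(Delegate delegate) {
            this.delegate = delegate;
        }

        void setForce(boolean force) {
            this.force = force;
        }

        Spec findForcedMerges() {
            Spec spec = delegate.findForcedMerges(); // the wrapped policy decides first
            if (spec == null && force) {
                spec = new Spec(); // fallback: merge exactly the requested segments
            }
            return spec; // the real code still wraps the result for upgrade bookkeeping
        }
    }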
diff --git a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java
index 089de57f239cb..bbb86a3995b82 100644
--- a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java
+++ b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java
@@ -19,6 +19,7 @@
 package org.elasticsearch.index.engine.internal;
 
+import com.google.common.base.Predicate;
 import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -29,6 +30,7 @@
 import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.store.AlreadyClosedException;
@@ -374,7 +376,7 @@ public void afterMerge(OnGoingMerge merge) {
             }
         });
 
-        Engine engine = createEngine(engineSettingsService, store, createTranslog(), mergeSchedulerProvider);
+        final Engine engine = createEngine(engineSettingsService, store, createTranslog(), mergeSchedulerProvider);
         engine.start();
         ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false);
         Engine.Index index = new Engine.Index(null, newUid("1"), doc);
@@ -410,13 +412,15 @@ public void afterMerge(OnGoingMerge merge) {
         index = new Engine.Index(null, newUid("4"), doc);
         engine.index(index);
         engine.flush(new Engine.Flush());
-
+        final long gen1 = store.readLastCommittedSegmentsInfo().getGeneration();
         // now, optimize and wait for merges, see that we have no merge flag
         engine.optimize(new Engine.Optimize().flush(true).maxNumSegments(1).waitForMerge(true));
 
         for (Segment segment : engine.segments()) {
             assertThat(segment.getMergeId(), nullValue());
         }
+        // we could have multiple underlying merges, so the generation may increase more than once
+        assertTrue(store.readLastCommittedSegmentsInfo().getGeneration() > gen1);
 
         // forcing an optimize will merge this single segment shard
        final boolean force = randomBoolean();
@@ -424,12 +428,28 @@ public void afterMerge(OnGoingMerge merge) {
             waitTillMerge.set(new CountDownLatch(1));
             waitForMerge.set(new CountDownLatch(1));
         }
-        engine.optimize(new Engine.Optimize().flush(true).maxNumSegments(1).force(force).waitForMerge(false));
+        final boolean flush = randomBoolean();
+        final long gen2 = store.readLastCommittedSegmentsInfo().getGeneration();
+        engine.optimize(new Engine.Optimize().flush(flush).maxNumSegments(1).force(force).waitForMerge(false));
         waitTillMerge.get().await();
 
         for (Segment segment : engine.segments()) {
             assertThat(segment.getMergeId(), force ? notNullValue() : nullValue());
         }
         waitForMerge.get().countDown();
+
+        if (flush) {
+            awaitBusy(new Predicate<Object>() {
+                @Override
+                public boolean apply(Object o) {
+                    try {
+                        // we should have had just 1 merge, so last generation should be exact
+                        return store.readLastCommittedSegmentsInfo().getLastGeneration() == gen2;
+                    } catch (IOException e) {
+                        throw ExceptionsHelper.convertToRuntime(e);
+                    }
+                }
+            });
+        }
         engine.close();
     }
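A note on the test helper: awaitBusy polls its predicate until it returns true or a timeout elapses, which is what lets the randomized flush branch observe the asynchronous post-merge flush without racing it. A rough, self-contained equivalent under stated assumptions (a fixed 10ms poll interval and an explicit timeout; the real ElasticsearchTestCase helper may behave differently):

    // Hypothetical polling helper in the spirit of the test's awaitBusy(...).
    final class AwaitBusySketch {
        interface Condition {
            boolean isMet() throws Exception;
        }

        // Polls `condition` every 10ms until it holds or `timeoutMillis` elapses;
        // returns whether the condition was observed to hold.
        static boolean awaitBusy(Condition condition, long timeoutMillis) throws Exception {
            final long deadline = System.currentTimeMillis() + timeoutMillis;
            while (System.currentTimeMillis() < deadline) {
                if (condition.isMet()) {
                    return true;
                }
                Thread.sleep(10);
            }
            return condition.isMet();
        }
    }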