Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix optimize behavior with 'force' and 'flush' flags. #7920

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -51,6 +51,7 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.analysis.AnalysisService;
Expand Down Expand Up @@ -1009,12 +1010,20 @@ public void maybeMerge() throws EngineException {
throw new OptimizeFailedEngineException(shardId, t);
}
}

private void waitForMerges(boolean flushAfter) {
try {
currentIndexWriter().waitForMerges();
} catch (IOException e) {
throw new OptimizeFailedEngineException(shardId, e);
}
if (flushAfter) {
flush(new Flush().force(true).waitIfOngoing(true));
}
}

@Override
public void optimize(Optimize optimize) throws EngineException {
if (optimize.flush()) {
flush(new Flush().force(true).waitIfOngoing(true));
}
if (optimizeMutex.compareAndSet(false, true)) {
ElasticsearchMergePolicy elasticsearchMergePolicy = null;
try (InternalLock _ = readLock.acquire()) {
Expand Down Expand Up @@ -1054,18 +1063,23 @@ public void optimize(Optimize optimize) throws EngineException {
}
optimizeMutex.set(false);
}

}

// wait for the merges outside of the read lock
if (optimize.waitForMerge()) {
try {
currentIndexWriter().waitForMerges();
} catch (IOException e) {
throw new OptimizeFailedEngineException(shardId, e);
}
}
if (optimize.flush()) {
flush(new Flush().force(true).waitIfOngoing(true));
waitForMerges(optimize.flush());
} else if (optimize.flush()) {
// we only need to monitor merges for async calls if we are going to flush
threadPool.executor(ThreadPool.Names.OPTIMIZE).execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.error("Exception while waiting for merges asynchronously after optimize", t);
}
@Override
protected void doRun() throws Exception {
waitForMerges(true);
}
});
}
}

Expand Down
Expand Up @@ -196,20 +196,22 @@ public MergeSpecification findMerges(MergeTrigger mergeTrigger,
public MergeSpecification findForcedMerges(SegmentInfos segmentInfos,
int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer)
throws IOException {
if (force) {
List<SegmentCommitInfo> segments = Lists.newArrayList();
for (SegmentCommitInfo info : segmentInfos) {
if (segmentsToMerge.containsKey(info)) {
segments.add(info);
}
}
if (!segments.isEmpty()) {
MergeSpecification spec = new IndexUpgraderMergeSpecification();
spec.add(new OneMerge(segments));
return spec;
}
}
return upgradedMergeSpecification(delegate.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer));
MergeSpecification spec = delegate.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer);

if (spec == null && force) {
List<SegmentCommitInfo> segments = Lists.newArrayList();
for (SegmentCommitInfo info : segmentInfos) {
if (segmentsToMerge.containsKey(info)) {
segments.add(info);
}
}
if (!segments.isEmpty()) {
spec = new IndexUpgraderMergeSpecification();
spec.add(new OneMerge(segments));
return spec;
}
}
return upgradedMergeSpecification(spec);
}

@Override
Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.index.engine.internal;

import com.google.common.base.Predicate;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
Expand All @@ -29,6 +30,7 @@
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.AlreadyClosedException;
Expand Down Expand Up @@ -374,7 +376,7 @@ public void afterMerge(OnGoingMerge merge) {
}
});

Engine engine = createEngine(engineSettingsService, store, createTranslog(), mergeSchedulerProvider);
final Engine engine = createEngine(engineSettingsService, store, createTranslog(), mergeSchedulerProvider);
engine.start();
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
Expand Down Expand Up @@ -410,26 +412,44 @@ public void afterMerge(OnGoingMerge merge) {
index = new Engine.Index(null, newUid("4"), doc);
engine.index(index);
engine.flush(new Engine.Flush());

final long gen1 = store.readLastCommittedSegmentsInfo().getGeneration();
// now, optimize and wait for merges, see that we have no merge flag
engine.optimize(new Engine.Optimize().flush(true).maxNumSegments(1).waitForMerge(true));

for (Segment segment : engine.segments()) {
assertThat(segment.getMergeId(), nullValue());
}
// we could have multiple underlying merges, so the generation may increase more than once
assertTrue(store.readLastCommittedSegmentsInfo().getGeneration() > gen1);

// forcing an optimize will merge this single segment shard
final boolean force = randomBoolean();
if (force) {
waitTillMerge.set(new CountDownLatch(1));
waitForMerge.set(new CountDownLatch(1));
}
engine.optimize(new Engine.Optimize().flush(true).maxNumSegments(1).force(force).waitForMerge(false));
final boolean flush = randomBoolean();
final long gen2 = store.readLastCommittedSegmentsInfo().getGeneration();
engine.optimize(new Engine.Optimize().flush(flush).maxNumSegments(1).force(force).waitForMerge(false));
waitTillMerge.get().await();
for (Segment segment : engine.segments()) {
assertThat(segment.getMergeId(), force ? notNullValue() : nullValue());
}
waitForMerge.get().countDown();

if (flush) {
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
try {
// we should have had just 1 merge, so last generation should be exact
return store.readLastCommittedSegmentsInfo().getLastGeneration() == gen2;
} catch (IOException e) {
throw ExceptionsHelper.convertToRuntime(e);
}
}
});
}

engine.close();
}
Expand Down