Skip to content

Commit 043c5df

Browse files
authored
LUCENE-8962: Add ability to selectively merge on commit (#1155)
* LUCENE-8962: Add ability to selectively merge on commit This adds a new "findCommitMerges" method to MergePolicy, which can specify merges to be executed before the IndexWriter.prepareCommitInternal method returns. If we have many index writer threads, they will flush their DWPT buffers on commit, resulting in many small segments, which can be merged before the commit returns. * Add missing Javadoc * Fix incorrect comment * Refactoring and fix intermittent test failure 1. Made some changes to the callback to update toCommit, leveraging SegmentInfos.applyMergeChanges. 2. I realized that we'll never end up with 0 registered merges, because we throw an exception if we fail to register a merge. 3. Moved the IndexWriterEvents.beginMergeOnCommit notification to before we call MergeScheduler.merge, since we may not be merging on another thread. 4. There was an intermittent test failure due to randomness in the time it takes for merges to complete. Before doing the final commit, we wait for pending merges to finish. We may still end up abandoning the final merge, but we can detect that and assert that either the merge was abandoned (and we have > 1 segment) or we did merge down to 1 segment. * Fix typo * Fix/improve comments based on PR feedback * More comment improvements from PR feedback * Rename method and add new MergeTrigger 1. Renamed findCommitMerges -> findFullFlushMerges. 2. Added MergeTrigger.COMMIT, passed to findFullFlushMerges and to MergeScheduler when merging on commit. * Update renamed method name in strings and comments
1 parent b2dbd18 commit 043c5df

File tree

12 files changed

+426
-5
lines changed

12 files changed

+426
-5
lines changed

lucene/core/src/java/org/apache/lucene/index/FilterMergePolicy.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, Mer
5757
return in.findForcedDeletesMerges(segmentInfos, mergeContext);
5858
}
5959

60+
// Delegates to the policy held in {@code in}, passing all three arguments through
// and returning its result unchanged.
@Override
61+
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
62+
return in.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext);
63+
}
64+
6065
@Override
6166
public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext)
6267
throws IOException {

lucene/core/src/java/org/apache/lucene/index/IndexWriter.java

Lines changed: 112 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import java.util.Queue;
3333
import java.util.Set;
3434
import java.util.concurrent.ConcurrentLinkedQueue;
35+
import java.util.concurrent.CountDownLatch;
36+
import java.util.concurrent.TimeUnit;
3537
import java.util.concurrent.atomic.AtomicBoolean;
3638
import java.util.concurrent.atomic.AtomicInteger;
3739
import java.util.concurrent.atomic.AtomicLong;
@@ -3147,6 +3149,42 @@ public final boolean flushNextBuffer() throws IOException {
31473149
}
31483150
}
31493151

3152+
/**
 * Builds a new {@link MergePolicy.OneMerge} over {@code merge.segments} whose
 * {@code mergeFinished()} callback folds the completed merge into {@code toCommit}
 * and then counts down the latch currently held by {@code mergeLatchRef}.
 *
 * Only {@code merge.segments} is carried over into the returned instance; no other
 * state of {@code merge} is copied here.
 *
 * @param merge the merge whose segment list the returned merge is built from
 * @param toCommit the SegmentInfos being prepared for commit; mutated by the callback
 *        (its {@code counter} and its segment list) when the merge was not aborted
 * @param mergeLatchRef holder of the latch to count down; when it holds {@code null}
 *        the callback returns right after {@code super.mergeFinished()} without touching
 *        {@code toCommit}, the file reference counts, or any latch
 * @return the new OneMerge carrying the callback described above
 */
private MergePolicy.OneMerge updateSegmentInfosOnMergeFinish(MergePolicy.OneMerge merge, final SegmentInfos toCommit,
3153+
AtomicReference<CountDownLatch> mergeLatchRef) {
3154+
return new MergePolicy.OneMerge(merge.segments) {
3155+
// NOTE(review): this overrides OneMerge.mergeFinished() (it calls super) but carries no @Override annotation.
public void mergeFinished() throws IOException {
3156+
super.mergeFinished();
3157+
// Read the latch once; the same instance is used for the null check and the countDown below.
CountDownLatch mergeAwaitLatch = mergeLatchRef.get();
3158+
if (mergeAwaitLatch == null) {
3159+
// Commit thread timed out waiting for this merge and moved on. No need to manipulate toCommit.
3160+
return;
3161+
}
3162+
if (isAborted() == false) {
3163+
// Take a reference on the merged segment's files; the loop below releases one reference
// on each source segment's files. Presumably this mirrors toCommit now pointing at the
// merged segment instead of its sources -- TODO confirm against the incRef done at commit time.
deleter.incRef(this.info.files());
3164+
// Resolve "live" SegmentInfos segments to their toCommit cloned equivalents, based on segment name.
3165+
Set<String> mergedSegmentNames = new HashSet<>();
3166+
for (SegmentCommitInfo sci : this.segments) {
3167+
deleter.decRef(sci.files());
3168+
mergedSegmentNames.add(sci.info.name);
3169+
}
3170+
List<SegmentCommitInfo> toCommitMergedAwaySegments = new ArrayList<>();
3171+
for (SegmentCommitInfo sci : toCommit) {
3172+
if (mergedSegmentNames.contains(sci.info.name)) {
3173+
toCommitMergedAwaySegments.add(sci);
3174+
}
3175+
}
3176+
// Construct a OneMerge that applies to toCommit
3177+
MergePolicy.OneMerge applicableMerge = new MergePolicy.OneMerge(toCommitMergedAwaySegments);
3178+
applicableMerge.info = this.info.clone();
3179+
// The merged segment's name, minus its first character, is parsed as a base-36
// (Character.MAX_RADIX) number, and toCommit.counter is raised to at least one past it.
// Assumes segment names are a one-character prefix followed by the counter -- TODO confirm.
long segmentCounter = Long.parseLong(this.info.info.name.substring(1), Character.MAX_RADIX);
3180+
toCommit.counter = Math.max(toCommit.counter, segmentCounter + 1);
3181+
toCommit.applyMergeChanges(applicableMerge, false);
3182+
}
3183+
// Outside the isAborted() check: the latch is counted down for aborted merges as well.
mergeAwaitLatch.countDown();
3184+
}
3185+
};
3186+
}
3187+
31503188
private long prepareCommitInternal() throws IOException {
31513189
startCommitTime = System.nanoTime();
31523190
synchronized(commitLock) {
@@ -3169,6 +3207,8 @@ private long prepareCommitInternal() throws IOException {
31693207
SegmentInfos toCommit = null;
31703208
boolean anyChanges = false;
31713209
long seqNo;
3210+
List<MergePolicy.OneMerge> commitMerges = null;
3211+
AtomicReference<CountDownLatch> mergeAwaitLatchRef = null;
31723212

31733213
// This is copied from doFlush, except it's modified to
31743214
// clone & incRef the flushed SegmentInfos inside the
@@ -3223,15 +3263,38 @@ private long prepareCommitInternal() throws IOException {
32233263
// sneak into the commit point:
32243264
toCommit = segmentInfos.clone();
32253265

3266+
if (anyChanges) {
3267+
// Find any merges that can execute on commit (per MergePolicy).
3268+
MergePolicy.MergeSpecification mergeSpec =
3269+
config.getMergePolicy().findFullFlushMerges(MergeTrigger.COMMIT, segmentInfos, this);
3270+
if (mergeSpec != null && mergeSpec.merges.size() > 0) {
3271+
int mergeCount = mergeSpec.merges.size();
3272+
commitMerges = new ArrayList<>(mergeCount);
3273+
mergeAwaitLatchRef = new AtomicReference<>(new CountDownLatch(mergeCount));
3274+
for (MergePolicy.OneMerge oneMerge : mergeSpec.merges) {
3275+
MergePolicy.OneMerge trackedMerge =
3276+
updateSegmentInfosOnMergeFinish(oneMerge, toCommit, mergeAwaitLatchRef);
3277+
if (registerMerge(trackedMerge) == false) {
3278+
throw new IllegalStateException("MergePolicy " + config.getMergePolicy().getClass() +
3279+
" returned merging segments from findFullFlushMerges");
3280+
}
3281+
commitMerges.add(trackedMerge);
3282+
}
3283+
if (infoStream.isEnabled("IW")) {
3284+
infoStream.message("IW", "Registered " + mergeCount + " commit merges");
3285+
infoStream.message("IW", "Before executing commit merges, had " + toCommit.size() + " segments");
3286+
}
3287+
}
3288+
}
3289+
32263290
pendingCommitChangeCount = changeCount.get();
32273291

32283292
// This protects the segmentInfos we are now going
32293293
// to commit. This is important in case, eg, while
32303294
// we are trying to sync all referenced files, a
32313295
// merge completes which would otherwise have
32323296
// removed the files we are now syncing.
3233-
filesToCommit = toCommit.files(false);
3234-
deleter.incRef(filesToCommit);
3297+
deleter.incRef(toCommit.files(false));
32353298
}
32363299
success = true;
32373300
} finally {
@@ -3252,6 +3315,53 @@ private long prepareCommitInternal() throws IOException {
32523315
} finally {
32533316
maybeCloseOnTragicEvent();
32543317
}
3318+
3319+
if (mergeAwaitLatchRef != null) {
3320+
CountDownLatch mergeAwaitLatch = mergeAwaitLatchRef.get();
3321+
// If we found and registered any merges above, within the flushLock, then we want to ensure that they
3322+
// complete execution. Note that since we released the lock, other merges may have been scheduled. We will
3323+
// block until the merges that we registered complete. As they complete, they will update toCommit to
3324+
// replace merged segments with the result of each merge.
3325+
config.getIndexWriterEvents().beginMergeOnCommit();
3326+
mergeScheduler.merge(this, MergeTrigger.COMMIT, true);
3327+
long mergeWaitStart = System.nanoTime();
3328+
int abandonedCount = 0;
3329+
long waitTimeMillis = (long) (config.getMaxCommitMergeWaitSeconds() * 1000.0);
3330+
try {
3331+
if (mergeAwaitLatch.await(waitTimeMillis, TimeUnit.MILLISECONDS) == false) {
3332+
synchronized (this) {
3333+
// Need to do this in a synchronized block, to make sure none of our commit merges are currently
3334+
// executing mergeFinished (since mergeFinished itself is called from within the IndexWriter lock).
3335+
// After we clear the value from mergeAwaitLatchRef, the merges we schedule will still execute as
3336+
// usual, but when they finish, they won't attempt to update toCommit or modify segment reference
3337+
// counts.
3338+
mergeAwaitLatchRef.set(null);
3339+
for (MergePolicy.OneMerge commitMerge : commitMerges) {
3340+
if (runningMerges.contains(commitMerge) || pendingMerges.contains(commitMerge)) {
3341+
abandonedCount++;
3342+
}
3343+
}
3344+
}
3345+
}
3346+
} catch (InterruptedException e) {
3347+
Thread.interrupted();
3348+
throw new IOException("Interrupted waiting for merges");
3349+
} finally {
3350+
if (infoStream.isEnabled("IW")) {
3351+
infoStream.message("IW", String.format(Locale.ROOT, "Waited %.1f ms for commit merges",
3352+
(System.nanoTime() - mergeWaitStart)/1_000_000.0));
3353+
infoStream.message("IW", "After executing commit merges, had " + toCommit.size() + " segments");
3354+
if (abandonedCount > 0) {
3355+
infoStream.message("IW", "Abandoned " + abandonedCount + " commit merges after " + waitTimeMillis + " ms");
3356+
}
3357+
}
3358+
if (abandonedCount > 0) {
3359+
config.getIndexWriterEvents().abandonedMergesOnCommit(abandonedCount);
3360+
}
3361+
config.getIndexWriterEvents().finishMergeOnCommit();
3362+
}
3363+
}
3364+
filesToCommit = toCommit.files(false);
32553365

32563366
try {
32573367
if (anyChanges) {

lucene/core/src/java/org/apache/lucene/index/IndexWriterConfig.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ public static enum OpenMode {
112112

113113
/** Default value for whether calls to {@link IndexWriter#close()} include a commit. */
114114
public final static boolean DEFAULT_COMMIT_ON_CLOSE = true;
115+
116+
/** Default value for time to wait for merges on commit (when using a {@link MergePolicy} that implements findFullFlushMerges). */
117+
public static final double DEFAULT_MAX_COMMIT_MERGE_WAIT_SECONDS = 30.0;
115118

116119
// indicates whether this config instance is already attached to a writer.
117120
// not final so that it can be cloned properly.
@@ -484,6 +487,24 @@ public IndexWriterConfig setCommitOnClose(boolean commitOnClose) {
484487
return this;
485488
}
486489

490+
/**
491+
* Expert: sets the amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...).
492+
* If this time is reached, we proceed with the commit based on segments merged up to that point.
493+
* The merges are not cancelled, and may still run to completion independent of the commit.
494+
*
* The value is stored as given; no range check is performed in this setter.
*
* @param maxCommitMergeWaitSeconds the wait limit, in seconds (fractional values are accepted)
* @return this config instance, to allow call chaining
*/
495+
public IndexWriterConfig setMaxCommitMergeWaitSeconds(double maxCommitMergeWaitSeconds) {
496+
this.maxCommitMergeWaitSeconds = maxCommitMergeWaitSeconds;
497+
return this;
498+
}
499+
500+
/**
501+
* Set the callback that gets invoked when IndexWriter performs various actions.
502+
*
* NOTE(review): {@code null} is not rejected here, although the stored value is
* dereferenced without a null check elsewhere (for example when building the config's
* string form). Confirm whether callers are expected to pass
* {@code IndexWriterEvents.NULL_EVENTS} instead of {@code null}.
*
* @param indexWriterEvents the callback to store on this config
* @return this config instance, to allow call chaining
*/
503+
public IndexWriterConfig setIndexWriterEvents(IndexWriterEvents indexWriterEvents) {
504+
this.indexWriterEvents = indexWriterEvents;
505+
return this;
506+
}
507+
487508
/** We only allow sorting on these types */
488509
private static final EnumSet<SortField.Type> ALLOWED_INDEX_SORT_TYPES = EnumSet.of(SortField.Type.STRING,
489510
SortField.Type.LONG,
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
19+
20+
package org.apache.lucene.index;
21+
22+
/**
23+
* Callback interface to signal various actions taken by IndexWriter.
24+
* All three callbacks currently relate to merges executed while preparing a commit.
*
25+
* @lucene.experimental
26+
*/
27+
public interface IndexWriterEvents {
28+
/**
29+
* A default implementation that ignores all events.
30+
* Every method body is empty.
*/
31+
IndexWriterEvents NULL_EVENTS = new IndexWriterEvents() {
32+
@Override
33+
public void beginMergeOnCommit() { }
34+
35+
@Override
36+
public void finishMergeOnCommit() { }
37+
38+
@Override
39+
public void abandonedMergesOnCommit(int abandonedCount) { }
40+
};
41+
42+
/**
43+
* Signals the start of waiting for a merge on commit, returned from
44+
* {@link MergePolicy#findFullFlushMerges(MergeTrigger, SegmentInfos, MergePolicy.MergeContext)}.
45+
* IndexWriter invokes this before handing the registered merges to the merge scheduler.
*/
46+
void beginMergeOnCommit();
47+
48+
/**
49+
* Signals the end of waiting for merges on commit. This may be either because the merges completed, or because we timed out according
50+
* to the limit set in {@link IndexWriterConfig#setMaxCommitMergeWaitSeconds(double)}.
51+
* IndexWriter invokes this from a {@code finally} block, so it is also called when the wait ends with an exception.
*/
52+
void finishMergeOnCommit();
53+
54+
/**
55+
* Called to signal that we abandoned some merges on commit upon reaching the timeout specified in
56+
* {@link IndexWriterConfig#setMaxCommitMergeWaitSeconds(double)}.
57+
* Only called when at least one merge was abandoned, and before {@link #finishMergeOnCommit()}.
*
* @param abandonedCount number of commit merges that were still running or pending when the wait timed out
*/
58+
void abandonedMergesOnCommit(int abandonedCount);
59+
}

lucene/core/src/java/org/apache/lucene/index/LiveIndexWriterConfig.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,12 @@ public class LiveIndexWriterConfig {
117117
/** the attributes for the NRT readers */
118118
protected Map<String, String> readerAttributes = Collections.emptyMap();
119119

120+
/** Amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...) */
121+
protected volatile double maxCommitMergeWaitSeconds;
122+
123+
/** Callback interface called on index writer actions. */
124+
protected IndexWriterEvents indexWriterEvents;
125+
120126

121127
// used by IndexWriterConfig
122128
LiveIndexWriterConfig(Analyzer analyzer) {
@@ -141,6 +147,8 @@ public class LiveIndexWriterConfig {
141147
readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
142148
indexerThreadPool = new DocumentsWriterPerThreadPool();
143149
perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
150+
maxCommitMergeWaitSeconds = IndexWriterConfig.DEFAULT_MAX_COMMIT_MERGE_WAIT_SECONDS;
151+
indexWriterEvents = IndexWriterEvents.NULL_EVENTS;
144152
}
145153

146154
/** Returns the default analyzer to use for indexing documents. */
@@ -480,6 +488,22 @@ public String getSoftDeletesField() {
480488
return softDeletesField;
481489
}
482490

491+
/**
492+
* Expert: return the amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...).
493+
* If this time is reached, we proceed with the commit based on segments merged up to that point.
494+
* The merges are not cancelled, and may still run to completion independent of the commit.
495+
*
* @return the configured wait limit, in seconds
*/
496+
public double getMaxCommitMergeWaitSeconds() {
497+
return maxCommitMergeWaitSeconds;
498+
}
499+
500+
/**
501+
* Returns a callback used to signal actions taken by the {@link IndexWriter}.
502+
*
* @return the stored callback; the constructor initializes it to {@code IndexWriterEvents.NULL_EVENTS}
*/
503+
public IndexWriterEvents getIndexWriterEvents() {
504+
return indexWriterEvents;
505+
}
506+
483507
@Override
484508
public String toString() {
485509
StringBuilder sb = new StringBuilder();
@@ -505,6 +529,8 @@ public String toString() {
505529
sb.append("checkPendingFlushOnUpdate=").append(isCheckPendingFlushOnUpdate()).append("\n");
506530
sb.append("softDeletesField=").append(getSoftDeletesField()).append("\n");
507531
sb.append("readerAttributes=").append(getReaderAttributes()).append("\n");
532+
sb.append("maxCommitMergeWaitSeconds=").append(getMaxCommitMergeWaitSeconds()).append("\n");
533+
sb.append("indexWriterEvents=").append(getIndexWriterEvents().getClass().getName()).append("\n");
508534
return sb.toString();
509535
}
510536

lucene/core/src/java/org/apache/lucene/index/MergePolicy.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ public abstract MergeSpecification findMerges(MergeTrigger mergeTrigger, Segment
510510
* an original segment present in the
511511
* to-be-merged index; else, it was a segment
512512
* produced by a cascaded merge.
513-
* @param mergeContext the IndexWriter to find the merges on
513+
* @param mergeContext the MergeContext to find the merges on
514514
*/
515515
public abstract MergeSpecification findForcedMerges(
516516
SegmentInfos segmentInfos, int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext)
@@ -521,11 +521,33 @@ public abstract MergeSpecification findForcedMerges(
521521
* deletes from the index.
522522
* @param segmentInfos
523523
* the total set of segments in the index
524-
* @param mergeContext the IndexWriter to find the merges on
524+
* @param mergeContext the MergeContext to find the merges on
525525
*/
526526
public abstract MergeSpecification findForcedDeletesMerges(
527527
SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException;
528528

529+
/**
530+
* Identifies merges that we want to execute (synchronously) on commit. By default, do not synchronously merge on commit.
531+
*
532+
* <p>Any merges returned here will make {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} block until
533+
* the merges complete or until {@link IndexWriterConfig#getMaxCommitMergeWaitSeconds()} have elapsed. This may be
534+
* used to merge small segments that have just been flushed as part of the commit, reducing the number of segments in
535+
* the commit. If a merge does not complete in the allotted time, it will continue to execute, but will not be reflected
536+
* in the commit.
537+
*
538+
* <p>If a {@link OneMerge} in the returned {@link MergeSpecification} includes a segment already included in a registered
539+
* merge, then {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} will throw an {@link IllegalStateException}.
540+
* Use {@link MergeContext#getMergingSegments()} to determine which segments are currently registered to merge.
541+
*
542+
* @param mergeTrigger the event that triggered the merge (COMMIT or FULL_FLUSH).
543+
*        NOTE(review): the IndexWriter call site visible in this change only passes COMMIT -- confirm whether FULL_FLUSH is ever used.
* @param segmentInfos the total set of segments in the index (while preparing the commit)
544+
* @param mergeContext the MergeContext to find the merges on, which should be used to determine which segments are
545+
* already in a registered merge (see {@link MergeContext#getMergingSegments()}).
546+
* @return the merges to run before the commit proceeds; {@code null} or a specification with no merges means
*         no merge-on-commit is performed. This base implementation always returns {@code null}.
*/
547+
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
548+
return null;
549+
}
550+
529551
/**
530552
* Returns true if a new segment (regardless of its origin) should use the
531553
* compound file format. The default implementation returns <code>true</code>

lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,5 +47,10 @@ public enum MergeTrigger {
4747
/**
4848
* Merge was triggered by a closing IndexWriter.
4949
*/
50-
CLOSING
50+
CLOSING,
51+
52+
/**
53+
* Merge was triggered on commit.
54+
*/
55+
COMMIT,
5156
}

lucene/core/src/java/org/apache/lucene/index/NoMergePolicy.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ public MergeSpecification findForcedMerges(SegmentInfos segmentInfos,
4545
@Override
4646
public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext) { return null; }
4747

48+
// Always returns null: this policy never asks for merges on commit, whatever the arguments.
@Override
49+
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) { return null; }
50+
4851
@Override
4952
public boolean useCompoundFile(SegmentInfos segments, SegmentCommitInfo newSegment, MergeContext mergeContext) {
5053
return newSegment.info.getUseCompoundFile();

lucene/core/src/java/org/apache/lucene/index/OneMergeWrappingMergePolicy.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, Mer
5959
return wrapSpec(in.findForcedDeletesMerges(segmentInfos, mergeContext));
6060
}
6161

62+
// Asks the policy held in {@code in} for its merge-on-commit specification and passes
// the result through wrapSpec; wrapSpec maps a null specification to null.
@Override
63+
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
64+
return wrapSpec(in.findFullFlushMerges(mergeTrigger, segmentInfos, mergeContext));
65+
}
66+
6267
private MergeSpecification wrapSpec(MergeSpecification spec) {
6368
MergeSpecification wrapped = spec == null ? null : new MergeSpecification();
6469
if (wrapped != null) {

0 commit comments

Comments
 (0)