Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
LUCENE-8962: Merge segments on getReader (#1623)
Add IndexWriter merge-on-refresh feature to selectively merge 
small segments on getReader, subject to a configurable timeout, 
to improve search performance by reducing the number of small 
segments for searching.

Co-authored-by: Mike McCandless <mikemccand@apache.org>
  • Loading branch information
s1monw and mikemccand committed Aug 24, 2020
1 parent 96a853b commit 8294e1a
Show file tree
Hide file tree
Showing 15 changed files with 478 additions and 138 deletions.
4 changes: 4 additions & 0 deletions lucene/CHANGES.txt
Expand Up @@ -172,6 +172,10 @@ New Features

* LUCENE-9386: RegExpQuery added case insensitive matching option. (Mark Harwood)

* LUCENE-8962: Add IndexWriter merge-on-refresh feature to selectively merge
small segments on getReader, subject to a configurable timeout, to improve
search performance by reducing the number of small segments for searching. (Simon Willnauer)

Improvements
---------------------

Expand Down
261 changes: 212 additions & 49 deletions lucene/core/src/java/org/apache/lucene/index/IndexWriter.java

Large diffs are not rendered by default.

Expand Up @@ -110,8 +110,8 @@ public static enum OpenMode {
/** Default value for whether calls to {@link IndexWriter#close()} include a commit. */
public final static boolean DEFAULT_COMMIT_ON_CLOSE = true;

/** Default value for time to wait for merges on commit (when using a {@link MergePolicy} that implements {@link MergePolicy#findFullFlushMerges}). */
public static final long DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS = 0;
/** Default value for time to wait for merges on commit or getReader (when using a {@link MergePolicy} that implements {@link MergePolicy#findFullFlushMerges}). */
public static final long DEFAULT_MAX_FULL_FLUSH_MERGE_WAIT_MILLIS = 0;

// indicates whether this config instance is already attached to a writer.
// not final so that it can be cloned properly.
Expand Down Expand Up @@ -463,17 +463,18 @@ public IndexWriterConfig setCommitOnClose(boolean commitOnClose) {
}

/**
* Expert: sets the amount of time to wait for merges (during {@link IndexWriter#commit}) returned by
* Expert: sets the amount of time to wait for merges (during {@link IndexWriter#commit}
* or {@link IndexWriter#getReader(boolean, boolean)}) returned by
* MergePolicy.findFullFlushMerges(...).
* If this time is reached, we proceed with the commit or getReader call based on segments merged up to that point.
* The merges are not cancelled, and will still run to completion independent of the commit,
* like natural segment merges. The default is <code>{@value IndexWriterConfig#DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS}</code>.
* The merges are not aborted, and will still run to completion independent of the commit or getReader call,
* like natural segment merges. The default is <code>{@value IndexWriterConfig#DEFAULT_MAX_FULL_FLUSH_MERGE_WAIT_MILLIS}</code>.
*
* Note: This setting has no effect unless {@link MergePolicy#findFullFlushMerges(MergeTrigger, SegmentInfos, MergePolicy.MergeContext)}
* is overridden to actually return merges; the default implementation doesn't return any merges.
*/
public IndexWriterConfig setMaxCommitMergeWaitMillis(long maxCommitMergeWaitMillis) {
this.maxCommitMergeWaitMillis = maxCommitMergeWaitMillis;
public IndexWriterConfig setMaxFullFlushMergeWaitMillis(long maxFullFlushMergeWaitMillis) {
this.maxFullFlushMergeWaitMillis = maxFullFlushMergeWaitMillis;
return this;
}

Expand Down
Expand Up @@ -110,7 +110,7 @@ public class LiveIndexWriterConfig {
protected String softDeletesField = null;

/** Amount of time to wait for merges returned by MergePolicy.findFullFlushMerges(...) */
protected volatile long maxCommitMergeWaitMillis;
protected volatile long maxFullFlushMergeWaitMillis;

// used by IndexWriterConfig
LiveIndexWriterConfig(Analyzer analyzer) {
Expand All @@ -134,7 +134,7 @@ public class LiveIndexWriterConfig {
flushPolicy = new FlushByRamOrCountsPolicy();
readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
maxCommitMergeWaitMillis = IndexWriterConfig.DEFAULT_MAX_COMMIT_MERGE_WAIT_MILLIS;
maxFullFlushMergeWaitMillis = IndexWriterConfig.DEFAULT_MAX_FULL_FLUSH_MERGE_WAIT_MILLIS;
}

/** Returns the default analyzer to use for indexing documents. */
Expand Down Expand Up @@ -469,8 +469,8 @@ public String getSoftDeletesField() {
* If this time is reached, we proceed with the commit or getReader call based on segments merged up to that point.
* The merges are not cancelled, and may still run to completion independent of the commit or getReader call.
*/
public long getMaxCommitMergeWaitMillis() {
return maxCommitMergeWaitMillis;
public long getMaxFullFlushMergeWaitMillis() {
return maxFullFlushMergeWaitMillis;
}

@Override
Expand All @@ -496,7 +496,7 @@ public String toString() {
sb.append("indexSort=").append(getIndexSort()).append("\n");
sb.append("checkPendingFlushOnUpdate=").append(isCheckPendingFlushOnUpdate()).append("\n");
sb.append("softDeletesField=").append(getSoftDeletesField()).append("\n");
sb.append("maxCommitMergeWaitMillis=").append(getMaxCommitMergeWaitMillis()).append("\n");
sb.append("maxFullFlushMergeWaitMillis=").append(getMaxFullFlushMergeWaitMillis()).append("\n");
return sb.toString();
}
}
17 changes: 9 additions & 8 deletions lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
Expand Up @@ -421,7 +421,7 @@ Optional<Boolean> hasCompletedSuccessfully() {
/**
* Called just before the merge is applied to IndexWriter's SegmentInfos
*/
void onMergeComplete() {
void onMergeComplete() throws IOException {
}

/**
Expand Down Expand Up @@ -623,19 +623,20 @@ public abstract MergeSpecification findForcedDeletesMerges(
/**
* Identifies merges that we want to execute (synchronously) on commit or when opening an NRT reader via getReader. By default, this will do no merging in either case.
* If you implement this method in your {@code MergePolicy} you must also set a non-zero timeout using
* {@link IndexWriterConfig#setMaxCommitMergeWaitMillis}.
* {@link IndexWriterConfig#setMaxFullFlushMergeWaitMillis}.
*
* Any merges returned here will make {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} block until
* the merges complete or until {@link IndexWriterConfig#getMaxCommitMergeWaitMillis()} has elapsed. This may be
* used to merge small segments that have just been flushed as part of the commit, reducing the number of segments in
* the commit. If a merge does not complete in the allotted time, it will continue to execute, and eventually finish and
* apply to future commits, but will not be reflected in the current commit.
* Any merges returned here will make {@link IndexWriter#commit()}, {@link IndexWriter#prepareCommit()}
* or {@link IndexWriter#getReader(boolean, boolean)} block until
* the merges complete or until {@link IndexWriterConfig#getMaxFullFlushMergeWaitMillis()} has elapsed. This may be
* used to merge small segments that have just been flushed, reducing the number of segments in
* the point-in-time snapshot. If a merge does not complete in the allotted time, it will continue to execute, and eventually finish and
* apply to future point-in-time snapshots, but will not be reflected in the current one.
*
* If a {@link OneMerge} in the returned {@link MergeSpecification} includes a segment already included in a registered
* merge, then {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} will throw a {@link IllegalStateException}.
* Use {@link MergeContext#getMergingSegments()} to determine which segments are currently registered to merge.
*
* @param mergeTrigger the event that triggered the merge (COMMIT or FULL_FLUSH).
* @param mergeTrigger the event that triggered the merge (COMMIT or GET_READER).
* @param segmentInfos the total set of segments in the index (while preparing the commit or opening the NRT reader)
* @param mergeContext the MergeContext to find the merges on, which should be used to determine which segments are
* already in a registered merge (see {@link MergeContext#getMergingSegments()}).
Expand Down
Expand Up @@ -53,4 +53,8 @@ public enum MergeTrigger {
* Merge was triggered on commit.
*/
COMMIT,
/**
* Merge was triggered on opening NRT readers.
*/
GET_READER,
}
Expand Up @@ -404,7 +404,7 @@ private PendingDeletes newPendingDeletes(SegmentReader reader, SegmentCommitInfo
private boolean noDups() {
Set<String> seen = new HashSet<>();
for(SegmentCommitInfo info : readerMap.keySet()) {
assert !seen.contains(info.info.name);
assert !seen.contains(info.info.name) : "seen twice: " + info.info.name ;
seen.add(info.info.name);
}
return true;
Expand Down
Expand Up @@ -82,7 +82,8 @@ protected DirectoryReader doBody(String segmentFileName) throws IOException {
}

/** Used by near real-time search */
static DirectoryReader open(IndexWriter writer, SegmentInfos infos, boolean applyAllDeletes, boolean writeAllDeletes) throws IOException {
static StandardDirectoryReader open(IndexWriter writer, IOUtils.IOFunction<SegmentCommitInfo, SegmentReader> readerFunction,
SegmentInfos infos, boolean applyAllDeletes, boolean writeAllDeletes) throws IOException {
// IndexWriter synchronizes externally before calling
// us, which ensures infos will not change; so there's
// no need to process segments in reverse order
Expand All @@ -101,19 +102,14 @@ static DirectoryReader open(IndexWriter writer, SegmentInfos infos, boolean appl
// IndexWriter's segmentInfos:
final SegmentCommitInfo info = infos.info(i);
assert info.info.dir == dir;
final ReadersAndUpdates rld = writer.getPooledInstance(info, true);
try {
final SegmentReader reader = rld.getReadOnlyClone(IOContext.READ);
if (reader.numDocs() > 0 || writer.getConfig().mergePolicy.keepFullyDeletedSegment(() -> reader)) {
// Steal the ref:
readers.add(reader);
infosUpto++;
} else {
reader.decRef();
segmentInfos.remove(infosUpto);
}
} finally {
writer.release(rld);
final SegmentReader reader = readerFunction.apply(info);
if (reader.numDocs() > 0 || writer.getConfig().mergePolicy.keepFullyDeletedSegment(() -> reader)) {
// Steal the ref:
readers.add(reader);
infosUpto++;
} else {
reader.decRef();
segmentInfos.remove(infosUpto);
}
}

Expand Down
Expand Up @@ -4206,7 +4206,7 @@ public void mergeFinished(boolean success, boolean segmentDropped) throws IOExce
public void testMergeOnCommitKeepFullyDeletedSegments() throws Exception {
Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setMaxCommitMergeWaitMillis(30 * 1000);
iwc.setMaxFullFlushMergeWaitMillis(30 * 1000);
iwc.mergePolicy = new FilterMergePolicy(newMergePolicy()) {
@Override
public boolean keepFullyDeletedSegment(IOSupplier<CodecReader> readerIOSupplier) {
Expand Down

0 comments on commit 8294e1a

Please sign in to comment.