Alternative approach to LUCENE-8962 #1576

Closed · wants to merge 1 commit
75 changes: 61 additions & 14 deletions lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -33,6 +33,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -2127,14 +2128,14 @@ public final void maybeMerge() throws IOException {
maybeMerge(config.getMergePolicy(), MergeTrigger.EXPLICIT, UNBOUNDED_MAX_MERGE_SEGMENTS);
}

private final void maybeMerge(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments) throws IOException {
private void maybeMerge(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments) throws IOException {
ensureOpen(false);
if (updatePendingMerges(mergePolicy, trigger, maxNumSegments)) {
if (updatePendingMerges(mergePolicy, trigger, maxNumSegments) != null) {
mergeScheduler.merge(mergeSource, trigger);
}
}

private synchronized boolean updatePendingMerges(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments)
private synchronized MergePolicy.MergeSpecification updatePendingMerges(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments)
throws IOException {

// In case infoStream was disabled on init, but then enabled at some
@@ -2144,39 +2145,43 @@ private synchronized boolean updatePendingMerges(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments)
assert maxNumSegments == UNBOUNDED_MAX_MERGE_SEGMENTS || maxNumSegments > 0;
assert trigger != null;
if (stopMerges) {
return false;
return null;
}

// Do not start new merges if disaster struck
if (tragedy.get() != null) {
return false;
return null;
}
boolean newMergesFound = false;
final MergePolicy.MergeSpecification spec;
if (maxNumSegments != UNBOUNDED_MAX_MERGE_SEGMENTS) {
assert trigger == MergeTrigger.EXPLICIT || trigger == MergeTrigger.MERGE_FINISHED :
"Expected EXPLICT or MERGE_FINISHED as trigger even with maxNumSegments set but was: " + trigger.name();

spec = mergePolicy.findForcedMerges(segmentInfos, maxNumSegments, Collections.unmodifiableMap(segmentsToMerge), this);
newMergesFound = spec != null;
if (newMergesFound) {
if (spec != null) {
final int numMerges = spec.merges.size();
for(int i=0;i<numMerges;i++) {
final MergePolicy.OneMerge merge = spec.merges.get(i);
merge.maxNumSegments = maxNumSegments;
}
}
} else {
spec = mergePolicy.findMerges(trigger, segmentInfos, this);
switch (trigger) {
case COMMIT:
Contributor:
There is an inconsistency here that suggests something is wrong, or at least is confusing enough to deserve a comment. For case COMMIT we call findFullFlushMerges. Shouldn't it be case FULL_FLUSH, to be consistent with the method we are calling? Or should findFullFlushMerges be called findCommitMerges?

Member:
Full flush happens for refresh and commit.

But we have only implemented "merge on commit" so far.

I would love to also add "merge on refresh", but until we do so, I think it's correct for IndexWriter to separate out the COMMIT case here like this.

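// Note: only the COMMIT trigger routes to findFullFlushMerges for now; "merge on
// refresh" (the FULL_FLUSH trigger) has not been implemented yet.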
spec = mergePolicy.findFullFlushMerges(trigger, segmentInfos, this);
break;
default:
spec = mergePolicy.findMerges(trigger, segmentInfos, this);
}

}
newMergesFound = spec != null;
if (newMergesFound) {
if (spec != null) {
final int numMerges = spec.merges.size();
for(int i=0;i<numMerges;i++) {
registerMerge(spec.merges.get(i));
}
}
return newMergesFound;
return spec;
}

/** Expert: to be used by a {@link MergePolicy} to avoid
@@ -3174,7 +3179,7 @@ private long prepareCommitInternal() throws IOException {
SegmentInfos toCommit = null;
boolean anyChanges = false;
long seqNo;

MergePolicy.MergeSpecification onCommitMerges = null;
// This is copied from doFlush, except it's modified to
// clone & incRef the flushed SegmentInfos inside the
// sync block:
@@ -3227,6 +3232,45 @@ private long prepareCommitInternal() throws IOException {
// corresponding add from an updateDocument) can
// sneak into the commit point:
toCommit = segmentInfos.clone();
if (anyChanges) {
SegmentInfos committingSegmentInfos = toCommit;
onCommitMerges = updatePendingMerges(new OneMergeWrappingMergePolicy(config.getMergePolicy(), toWrap ->
new MergePolicy.OneMerge(toWrap.segments) {
@Override
public void mergeFinished(boolean committed) throws IOException {
assert Thread.holdsLock(IndexWriter.this);
if (committed) {
deleter.incRef(info.files());
Set<String> mergedSegmentNames = new HashSet<>();
for (SegmentCommitInfo sci : segments) {
mergedSegmentNames.add(sci.info.name);
}
List<SegmentCommitInfo> toCommitMergedAwaySegments = new ArrayList<>();
for (SegmentCommitInfo sci : committingSegmentInfos) {
if (mergedSegmentNames.contains(sci.info.name)) {
toCommitMergedAwaySegments.add(sci);
deleter.decRef(sci.files());
}
}
// Construct a OneMerge that applies to toCommit
MergePolicy.OneMerge applicableMerge = new MergePolicy.OneMerge(toCommitMergedAwaySegments);
applicableMerge.info = info.clone();
long segmentCounter = Long.parseLong(info.info.name.substring(1), Character.MAX_RADIX);
committingSegmentInfos.counter = Math.max(committingSegmentInfos.counter, segmentCounter + 1);
committingSegmentInfos.applyMergeChanges(applicableMerge, false);
}
toWrap.mergeFinished(committed);
super.mergeFinished(committed);

}

@Override
public CodecReader wrapForMerge(CodecReader reader) throws IOException {
return toWrap.wrapForMerge(reader);
}
}
), MergeTrigger.COMMIT, UNBOUNDED_MAX_MERGE_SEGMENTS);
}

pendingCommitChangeCount = changeCount.get();

@@ -3259,6 +3303,9 @@ private long prepareCommitInternal() throws IOException {
}

try {
if (onCommitMerges != null) {
onCommitMerges.await(10, TimeUnit.SECONDS); // pass some config value here
}
if (anyChanges) {
maybeMerge.set(true);
}
@@ -4289,7 +4336,7 @@ private synchronized void mergeFinish(MergePolicy.OneMerge merge) {
@SuppressWarnings("try")
private synchronized void closeMergeReaders(MergePolicy.OneMerge merge, boolean suppressExceptions) throws IOException {
final boolean drop = suppressExceptions == false;
try (Closeable finalizer = merge::mergeFinished) {
try (Closeable finalizer = () -> merge.mergeFinished(suppressExceptions==false);) {
IOUtils.applyToAll(merge.readers, sr -> {
final ReadersAndUpdates rld = getPooledInstance(sr.getOriginalSegmentInfo(), false);
// We still hold a ref so it should not have been removed:
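To make the intent of the IndexWriter changes above concrete, here is a minimal usage sketch. It is not part of this PR: the merge policy name MergeSmallSegmentsOnCommitPolicy, its segment cap of 10, and the index path are hypothetical; it simply assumes some MergePolicy that returns merges from the new findFullFlushMerges hook (one possible sketch follows the MergePolicy.java diff below).

import java.nio.file.Paths;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class MergeOnCommitExample {
  public static void main(String[] args) throws Exception {
    IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer());
    // Hypothetical policy that returns merges from findFullFlushMerges
    // (a sketch is given after the MergePolicy.java diff below).
    config.setMergePolicy(new MergeSmallSegmentsOnCommitPolicy(new TieredMergePolicy(), 10));
    try (Directory dir = FSDirectory.open(Paths.get("/tmp/merge-on-commit-demo"));
         IndexWriter writer = new IndexWriter(dir, config)) {
      for (int i = 0; i < 100; i++) {
        Document doc = new Document();
        doc.add(new StringField("id", Integer.toString(i), Field.Store.NO));
        writer.addDocument(doc);
      }
      // With the patched prepareCommitInternal(), commit() waits a bounded time
      // for the merges selected by findFullFlushMerges before publishing the
      // commit point, so the commit contains fewer, larger segments.
      writer.commit();
    }
  }
}

With a stock policy, findFullFlushMerges returns null and commit() behaves exactly as before.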
70 changes: 64 additions & 6 deletions lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
@@ -24,10 +24,16 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

@@ -196,6 +202,7 @@ final void setMergeThread(Thread owner) {
*
* @lucene.experimental */
public static class OneMerge {
private final CompletableFuture<Void> completable = new CompletableFuture<>();
SegmentCommitInfo info; // used by IndexWriter
boolean registerDone; // used by IndexWriter
long mergeGen; // used by IndexWriter
@@ -222,13 +229,13 @@ public static class OneMerge {
volatile long mergeStartNS = -1;

/** Total number of documents in segments to be merged, not accounting for deletions. */
public final int totalMaxDoc;
final int totalMaxDoc;
Throwable error;

/** Sole constructor.
* @param segments List of {@link SegmentCommitInfo}s
* to be merged. */
public OneMerge(List<SegmentCommitInfo> segments) {
/** Sole constructor.
* @param segments List of {@link SegmentCommitInfo}s
* to be merged. */
public OneMerge(List<SegmentCommitInfo> segments) {
if (0 == segments.size()) {
throw new RuntimeException("segments must include at least one segment");
}
@@ -252,7 +259,24 @@ public void mergeInit() throws IOException {
}

/** Called by {@link IndexWriter} after the merge is done and all readers have been closed. */
public void mergeFinished() throws IOException {
public void mergeFinished(boolean committed) throws IOException {
completable.complete(null);
}

boolean await(long timeout, TimeUnit timeUnit) {
try {
completable.get(timeout, timeUnit);
return true;
} catch (InterruptedException e) {
Thread.interrupted();
return false;
} catch (ExecutionException | TimeoutException e) {
return false;
}
}

boolean isDone() {
return completable.isDone();
}

/** Wrap the reader in order to add/remove information to the merged segment. */
@@ -399,8 +423,19 @@ public String segString(Directory dir) {
}
return b.toString();
}

boolean await(long timeout, TimeUnit unit) {
for (OneMerge merge : merges) {
if (merge.await(timeout, unit) == false) {
Contributor:
This looks suspicious when there is more than one merge. Shouldn't the timeout decrease as time is used up by earlier merges?

In practice, when is there more than one? I've been confused about this myself when developing a custom MergePolicy/MergeScheduler.

Member (author):
In a real change that's correct. In a prototype like this, it's really just there to visualize the idea; I left this out on purpose to avoid discussing implementation details. That's not the point here; it's really just a PR to make commenting simpler.

Member (author):
I fixed this in the follow-up PR #1585.

return false;
}
}
return true;
}
}
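On the review thread above about the timeout not shrinking as earlier merges consume time: the real fix is in the follow-up PR #1585 and may differ from anything shown here. Purely as a standalone illustration of the idea (DeadlineAwait and awaitAll are made-up names), converting the caller's timeout into a single shared deadline looks like this:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Waits for several futures while sharing one overall time budget, so that
// later futures only get whatever time earlier ones have not already consumed.
final class DeadlineAwait {
  static boolean awaitAll(List<CompletableFuture<Void>> futures, long timeout, TimeUnit unit) {
    final long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
    for (CompletableFuture<Void> future : futures) {
      final long remainingNanos = deadlineNanos - System.nanoTime();
      if (remainingNanos <= 0) {
        return false; // budget exhausted before reaching this future
      }
      try {
        future.get(remainingNanos, TimeUnit.NANOSECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
        return false;
      } catch (ExecutionException | TimeoutException e) {
        return false;
      }
    }
    return true;
  }
}

Applied to MergeSpecification.await, the same pattern would compute the deadline once and hand each OneMerge.await call only the remaining time.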



/** Exception thrown if there are any problems while executing a merge. */
public static class MergeException extends RuntimeException {
/** Create a {@code MergeException}. */
@@ -692,4 +727,27 @@ public interface MergeContext {
*/
Set<SegmentCommitInfo> getMergingSegments();
}

/**
* Identifies merges that we want to execute (synchronously) on commit. By default, do not synchronously merge on commit.
*
* Any merges returned here will make {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} block until
* the merges complete or until {@link IndexWriterConfig#getMaxCommitMergeWaitSeconds()} have elapsed. This may be
* used to merge small segments that have just been flushed as part of the commit, reducing the number of segments in
* the commit. If a merge does not complete in the allotted time, it will continue to execute, but will not be reflected
* in the commit.
*
* If a {@link OneMerge} in the returned {@link MergeSpecification} includes a segment already included in a registered
* merge, then {@link IndexWriter#commit()} or {@link IndexWriter#prepareCommit()} will throw a {@link IllegalStateException}.
* Use {@link MergeContext#getMergingSegments()} to determine which segments are currently registered to merge.
*
* @param mergeTrigger the event that triggered the merge (COMMIT or FULL_FLUSH).
* @param segmentInfos the total set of segments in the index (while preparing the commit)
* @param mergeContext the MergeContext to find the merges on, which should be used to determine which segments are
* already in a registered merge (see {@link MergeContext#getMergingSegments()}).
*/
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
return null;
}

}
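To show how the new findFullFlushMerges hook is intended to be used, here is a hedged sketch of a policy that asks for freshly flushed segments to be merged on commit. The class name MergeSmallSegmentsOnCommitPolicy, the segment-count cap, and the choice of FilterMergePolicy as the base class are illustrative assumptions, not part of this PR.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.index.FilterMergePolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeTrigger;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;

// Sketch: on commit, gather segments that are not already part of a registered
// merge and submit them as one OneMerge, capped at maxSegmentCount segments.
public class MergeSmallSegmentsOnCommitPolicy extends FilterMergePolicy {

  private final int maxSegmentCount;

  public MergeSmallSegmentsOnCommitPolicy(MergePolicy in, int maxSegmentCount) {
    super(in);
    this.maxSegmentCount = maxSegmentCount;
  }

  @Override
  public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos,
                                                MergeContext mergeContext) throws IOException {
    List<SegmentCommitInfo> candidates = new ArrayList<>();
    for (SegmentCommitInfo sci : segmentInfos) {
      // Segments already registered for a merge must be skipped; including one
      // would make prepareCommit()/commit() throw an IllegalStateException.
      if (mergeContext.getMergingSegments().contains(sci) == false) {
        candidates.add(sci);
      }
    }
    if (candidates.size() < 2) {
      return null; // nothing worth merging as part of this commit
    }
    MergeSpecification spec = new MergeSpecification();
    spec.add(new OneMerge(candidates.subList(0, Math.min(candidates.size(), maxSegmentCount))));
    return spec;
  }
}

A production policy would presumably also look at segment sizes (e.g. SegmentCommitInfo.sizeInBytes()) and pending deletes; this sketch only exercises the contract described in the javadoc above.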
lucene/core/src/java/org/apache/lucene/index/MergeTrigger.java
@@ -47,5 +47,10 @@ public enum MergeTrigger {
/**
* Merge was triggered by a closing IndexWriter.
*/
CLOSING
CLOSING,

/**
* Merge was triggered on commit.
*/
COMMIT
}
@@ -538,7 +538,7 @@ public CodecReader wrapForMerge(CodecReader reader) throws IOException {
}

@Override
public void mergeFinished() throws IOException {
public void mergeFinished(boolean committed) throws IOException {
Throwable th = null;
for (ParallelLeafReader r : parallelReaders) {
try {