LUCENE-8962: Allow waiting for all merges in a merge spec (#1585)
This change adds infrastructure to allow straightforward waiting
on one or more merges or an entire merge specification. This is
a basis for LUCENE-8962.
s1monw committed Jun 17, 2020
1 parent 207efbc commit 59efe22
Showing 5 changed files with 233 additions and 24 deletions.
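
To make the new API concrete, here is a minimal usage sketch of the waiting infrastructure this commit adds. The scheduler hand-off and the timeout value are hypothetical, and await/hasCompletedSuccessfully are package-private, so this is an illustration for code living inside org.apache.lucene.index rather than public API:

    MergePolicy.MergeSpecification spec = mergePolicy.findMerges(MergeTrigger.EXPLICIT, segmentInfos, mergeContext);
    if (spec != null) {
      // hand the merges to a MergeScheduler so they actually run (omitted here)
      if (spec.await(5, TimeUnit.MINUTES)) {
        for (MergePolicy.OneMerge merge : spec.merges) {
          // present once a merge has finished; true iff it was committed successfully
          boolean committed = merge.hasCompletedSuccessfully().orElse(false);
        }
      } else {
        // timed out: some merges may still be running
      }
    }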
20 changes: 9 additions & 11 deletions lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
@@ -2129,12 +2129,12 @@ public final void maybeMerge() throws IOException {

private final void maybeMerge(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments) throws IOException {
ensureOpen(false);
- if (updatePendingMerges(mergePolicy, trigger, maxNumSegments)) {
+ if (updatePendingMerges(mergePolicy, trigger, maxNumSegments) != null) {
mergeScheduler.merge(mergeSource, trigger);
}
}

- private synchronized boolean updatePendingMerges(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments)
+ private synchronized MergePolicy.MergeSpecification updatePendingMerges(MergePolicy mergePolicy, MergeTrigger trigger, int maxNumSegments)
throws IOException {

// In case infoStream was disabled on init, but then enabled at some
@@ -2144,22 +2144,21 @@ private synchronized boolean updatePendingMerges(MergePolicy mergePolicy, MergeT
assert maxNumSegments == UNBOUNDED_MAX_MERGE_SEGMENTS || maxNumSegments > 0;
assert trigger != null;
if (stopMerges) {
- return false;
+ return null;
}

// Do not start new merges if disaster struck
if (tragedy.get() != null) {
- return false;
+ return null;
}
- boolean newMergesFound = false;

final MergePolicy.MergeSpecification spec;
if (maxNumSegments != UNBOUNDED_MAX_MERGE_SEGMENTS) {
assert trigger == MergeTrigger.EXPLICIT || trigger == MergeTrigger.MERGE_FINISHED :
"Expected EXPLICT or MERGE_FINISHED as trigger even with maxNumSegments set but was: " + trigger.name();

spec = mergePolicy.findForcedMerges(segmentInfos, maxNumSegments, Collections.unmodifiableMap(segmentsToMerge), this);
- newMergesFound = spec != null;
- if (newMergesFound) {
+ if (spec != null) {
final int numMerges = spec.merges.size();
for(int i=0;i<numMerges;i++) {
final MergePolicy.OneMerge merge = spec.merges.get(i);
@@ -2169,14 +2168,13 @@ private synchronized boolean updatePendingMerges(MergePolicy mergePolicy, MergeT
} else {
spec = mergePolicy.findMerges(trigger, segmentInfos, this);
}
- newMergesFound = spec != null;
- if (newMergesFound) {
+ if (spec != null) {
final int numMerges = spec.merges.size();
for(int i=0;i<numMerges;i++) {
registerMerge(spec.merges.get(i));
}
}
- return newMergesFound;
+ return spec;
}

/** Expert: to be used by a {@link MergePolicy} to avoid
@@ -4289,7 +4287,7 @@ private synchronized void mergeFinish(MergePolicy.OneMerge merge) {
@SuppressWarnings("try")
private synchronized void closeMergeReaders(MergePolicy.OneMerge merge, boolean suppressExceptions) throws IOException {
final boolean drop = suppressExceptions == false;
- try (Closeable finalizer = merge::mergeFinished) {
+ try (Closeable finalizer = () -> merge.mergeFinished(suppressExceptions == false)) {
IOUtils.applyToAll(merge.readers, sr -> {
final ReadersAndUpdates rld = getPooledInstance(sr.getOriginalSegmentInfo(), false);
// We still hold a ref so it should not have been removed:
75 changes: 64 additions & 11 deletions lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
@@ -23,7 +23,12 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+ import java.util.Optional;
import java.util.Set;
+ import java.util.concurrent.CompletableFuture;
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -37,6 +42,7 @@
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.IOSupplier;
import org.apache.lucene.util.InfoStream;
+ import org.apache.lucene.util.ThreadInterruptedException;

/**
* <p>Expert: a MergePolicy determines the sequence of
@@ -76,7 +82,7 @@ public abstract class MergePolicy {
* @lucene.experimental */
public static class OneMergeProgress {
/** Reason for pausing the merge thread. */
- public static enum PauseReason {
+ public enum PauseReason {
/** Stopped (because of throughput rate set to 0, typically). */
STOPPED,
/** Temporarily paused because of exceeded throughput rate. */
@@ -196,6 +202,7 @@ final void setMergeThread(Thread owner) {
*
* @lucene.experimental */
public static class OneMerge {
+ private final CompletableFuture<Boolean> mergeCompleted = new CompletableFuture<>();
SegmentCommitInfo info; // used by IndexWriter
boolean registerDone; // used by IndexWriter
long mergeGen; // used by IndexWriter
@@ -222,7 +229,7 @@ public static class OneMerge {
volatile long mergeStartNS = -1;

/** Total number of documents in segments to be merged, not accounting for deletions. */
- public final int totalMaxDoc;
+ final int totalMaxDoc;
Throwable error;

/** Sole constructor.
@@ -233,13 +240,8 @@ public OneMerge(List<SegmentCommitInfo> segments) {
throw new RuntimeException("segments must include at least one segment");
}
// clone the list, as the in list may be based off original SegmentInfos and may be modified
- this.segments = new ArrayList<>(segments);
- int count = 0;
- for(int i = 0; i < segments.size(); i++) {
-   count += segments.get(i).info.maxDoc();
- }
- totalMaxDoc = count;
+ this.segments = List.copyOf(segments);
+ totalMaxDoc = segments.stream().mapToInt(i -> i.info.maxDoc()).sum();
mergeProgress = new OneMergeProgress();
}

@@ -251,8 +253,12 @@ public void mergeInit() throws IOException {
mergeProgress.setMergeThread(Thread.currentThread());
}

- /** Called by {@link IndexWriter} after the merge is done and all readers have been closed. */
- public void mergeFinished() throws IOException {
+ /** Called by {@link IndexWriter} after the merge is done and all readers have been closed.
+  * @param success true iff the merge finished successfully, i.e. it was committed */
+ public void mergeFinished(boolean success) throws IOException {
+   if (mergeCompleted.complete(success) == false) {
+     throw new IllegalStateException("merge has already finished");
+   }
}

/** Wrap the reader in order to add/remove information to the merged segment. */
@@ -362,6 +368,37 @@ public void checkAborted() throws MergeAbortedException {
public OneMergeProgress getMergeProgress() {
return mergeProgress;
}

+ /**
+  * Waits for this merge to be completed.
+  * @return true if the merge finished within the specified timeout
+  */
+ boolean await(long timeout, TimeUnit timeUnit) {
+   try {
+     mergeCompleted.get(timeout, timeUnit);
+     return true;
+   } catch (InterruptedException e) {
+     throw new ThreadInterruptedException(e);
+   } catch (ExecutionException | TimeoutException e) {
+     return false;
+   }
+ }
+
+ /**
+  * Returns true if the merge has finished, or false if it's still running or
+  * has not been started. This method will not block.
+  */
+ boolean isDone() {
+   return mergeCompleted.isDone();
+ }
+
+ /**
+  * Returns true iff the merge completed successfully, or false if it finished with a failure.
+  * This method will not block; it returns an empty Optional if the merge has not finished yet.
+  */
+ Optional<Boolean> hasCompletedSuccessfully() {
+   return Optional.ofNullable(mergeCompleted.getNow(null));
+ }
}

/**
@@ -399,6 +436,22 @@ public String segString(Directory dir) {
}
return b.toString();
}

+ /**
+  * Waits, if necessary, for at most the given time for all merges to complete.
+  */
+ boolean await(long timeout, TimeUnit unit) {
+   try {
+     CompletableFuture<Void> future = CompletableFuture.allOf(merges.stream()
+         .map(m -> m.mergeCompleted).collect(Collectors.toList()).toArray(CompletableFuture<?>[]::new));
+     future.get(timeout, unit);
+     return true;
+   } catch (InterruptedException e) {
+     throw new ThreadInterruptedException(e);
+   } catch (ExecutionException | TimeoutException e) {
+     return false;
+   }
+ }
}

/** Exception thrown if there are any problems while executing a merge. */
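Aside: MergeSpecification.await above is the standard CompletableFuture.allOf wait-with-timeout idiom. A self-contained sketch of the same pattern outside Lucene (the class and method names below are illustrative, and interruption is handled by restoring the interrupt flag instead of wrapping it in ThreadInterruptedException):

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    class AwaitAllDemo {
      /** Returns true if every future completes within the timeout, false on timeout or failure. */
      static boolean awaitAll(List<CompletableFuture<Boolean>> futures, long timeout, TimeUnit unit) {
        try {
          CompletableFuture.allOf(futures.toArray(CompletableFuture<?>[]::new)).get(timeout, unit);
          return true;
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // restore the flag for callers further up
          return false;
        } catch (ExecutionException | TimeoutException e) {
          return false;
        }
      }

      public static void main(String[] args) {
        CompletableFuture<Boolean> a = new CompletableFuture<>();
        CompletableFuture<Boolean> b = new CompletableFuture<>();
        a.complete(true); // like OneMerge.mergeFinished(true)
        System.out.println(awaitAll(List.of(a, b), 10, TimeUnit.MILLISECONDS)); // false: b is pending
        b.complete(true);
        System.out.println(awaitAll(List.of(a, b), 10, TimeUnit.MILLISECONDS)); // true
      }
    }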
@@ -538,7 +538,7 @@ public CodecReader wrapForMerge(CodecReader reader) throws IOException {
}

@Override
- public void mergeFinished() throws IOException {
+ public void mergeFinished(boolean success) throws IOException {
Throwable th = null;
for (ParallelLeafReader r : parallelReaders) {
try {
@@ -4181,7 +4181,7 @@ public boolean keepFullyDeletedSegment(IOSupplier<CodecReader> readerIOSupplier)
SetOnce<Boolean> onlyFinishOnce = new SetOnce<>();
return new MergePolicy.OneMerge(merge.segments) {
@Override
- public void mergeFinished() {
+ public void mergeFinished(boolean success) {
onlyFinishOnce.set(true);
}
};
158 changes: 158 additions & 0 deletions lucene/core/src/test/org/apache/lucene/index/TestMergePolicy.java
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.lucene.index;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.lucene.store.Directory;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.TestUtil;
import org.apache.lucene.util.Version;

public class TestMergePolicy extends LuceneTestCase {

public void testWaitForOneMerge() throws IOException, InterruptedException {
try (Directory dir = newDirectory()) {
MergePolicy.MergeSpecification ms = createRandomMergeSpecification(dir, 1 + random().nextInt(10));
for (MergePolicy.OneMerge m : ms.merges) {
assertFalse(m.hasCompletedSuccessfully().isPresent());
}
Thread t = new Thread(() -> {
try {
for (MergePolicy.OneMerge m : ms.merges) {
m.mergeFinished(true);
}
} catch (IOException e) {
throw new AssertionError(e);
}
});
t.start();
assertTrue(ms.await(100, TimeUnit.HOURS));
for (MergePolicy.OneMerge m : ms.merges) {
assertTrue(m.hasCompletedSuccessfully().get());
}
t.join();
}
}

public void testTimeout() throws IOException, InterruptedException {
try (Directory dir = newDirectory()) {
MergePolicy.MergeSpecification ms = createRandomMergeSpecification(dir, 3);
for (MergePolicy.OneMerge m : ms.merges) {
assertFalse(m.hasCompletedSuccessfully().isPresent());
}
Thread t = new Thread(() -> {
try {
ms.merges.get(0).mergeFinished(true);
} catch (IOException e) {
throw new AssertionError(e);
}
});
t.start();
assertFalse(ms.await(10, TimeUnit.MILLISECONDS));
assertFalse(ms.merges.get(1).hasCompletedSuccessfully().isPresent());
t.join();
}
}

public void testTimeoutLargeNumberOfMerges() throws IOException, InterruptedException {
try (Directory dir = newDirectory()) {
MergePolicy.MergeSpecification ms = createRandomMergeSpecification(dir, 10000);
for (MergePolicy.OneMerge m : ms.merges) {
assertFalse(m.hasCompletedSuccessfully().isPresent());
}
AtomicInteger i = new AtomicInteger(0);
AtomicBoolean stop = new AtomicBoolean(false);
Thread t = new Thread(() -> {
while (stop.get() == false) {
try {
ms.merges.get(i.getAndIncrement()).mergeFinished(true);
Thread.sleep(1);
} catch (IOException | InterruptedException e) {
throw new AssertionError(e);
}
}
});
t.start();
assertFalse(ms.await(10, TimeUnit.MILLISECONDS));
stop.set(true);
t.join();
for (int j = 0; j < ms.merges.size(); j++) {
if (j < i.get()) {
assertTrue(ms.merges.get(j).hasCompletedSuccessfully().get());
} else {
assertFalse(ms.merges.get(j).hasCompletedSuccessfully().isPresent());
}
}
}
}

public void testFinishTwice() throws IOException {
try (Directory dir = newDirectory()) {
MergePolicy.MergeSpecification spec = createRandomMergeSpecification(dir, 1);
MergePolicy.OneMerge oneMerge = spec.merges.get(0);
oneMerge.mergeFinished(true);
expectThrows(IllegalStateException.class, () -> oneMerge.mergeFinished(false));
}
}

public void testTotalMaxDoc() throws IOException {
try (Directory dir = newDirectory()) {
MergePolicy.MergeSpecification spec = createRandomMergeSpecification(dir, 1);
int docs = 0;
MergePolicy.OneMerge oneMerge = spec.merges.get(0);
for (SegmentCommitInfo info : oneMerge.segments) {
docs += info.info.maxDoc();
}
assertEquals(docs, oneMerge.totalMaxDoc);
}
}

private static MergePolicy.MergeSpecification createRandomMergeSpecification(Directory dir, int numMerges) {
MergePolicy.MergeSpecification ms = new MergePolicy.MergeSpecification();
for (int ii = 0; ii < numMerges; ++ii) {
final SegmentInfo si = new SegmentInfo(
dir, // dir
Version.LATEST, // version
Version.LATEST, // min version
TestUtil.randomSimpleString(random()), // name
random().nextInt(1000), // maxDoc
random().nextBoolean(), // isCompoundFile
null, // codec
Collections.emptyMap(), // diagnostics
TestUtil.randomSimpleString(// id
random(),
StringHelper.ID_LENGTH,
StringHelper.ID_LENGTH).getBytes(StandardCharsets.US_ASCII),
Collections.emptyMap(), // attributes
null /* indexSort */);
final List<SegmentCommitInfo> segments = new LinkedList<SegmentCommitInfo>();
segments.add(new SegmentCommitInfo(si, 0, 0, 0, 0, 0, StringHelper.randomId()));
ms.add(new MergePolicy.OneMerge(segments));
}
return ms;
}
}
