diff --git a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java index 91e590c3171c..9a669e0d5b94 100644 --- a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java +++ b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java @@ -19,6 +19,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Locale; @@ -222,14 +223,22 @@ private void waitApply(Set waitFor, IndexWriter writer) t infoStream.message("BD", "waitApply: " + waitFor.size() + " packets: " + waitFor); } + ArrayList pendingPackets = new ArrayList<>(); long totalDelCount = 0; for (FrozenBufferedUpdates packet : waitFor) { // Frozen packets are now resolved, concurrently, by the indexing threads that // create them, by adding a DocumentsWriter.ResolveUpdatesEvent to the events queue, // but if we get here and the packet is not yet resolved, we resolve it now ourselves: - packet.apply(writer); + if (packet.tryApply(writer) == false) { + // if somebody else is currently applying it - move on to the next one and force apply below + pendingPackets.add(packet); + } totalDelCount += packet.totalDelCount; } + for (FrozenBufferedUpdates packet : pendingPackets) { + // now block on all the packets that were concurrently applied to ensure they are done before we continue. 
+ packet.forceApply(writer); + } if (infoStream.isEnabled("BD")) { infoStream.message("BD", diff --git a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java index 36834a39ff6e..bb84a79f15b2 100644 --- a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java +++ b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.IntConsumer; import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate; @@ -83,6 +84,7 @@ final class FrozenBufferedUpdates { /** Counts down once all deletes/updates have been applied */ public final CountDownLatch applied = new CountDownLatch(1); + private final ReentrantLock applyLock = new ReentrantLock(); /** How many total documents were deleted/updated. */ public long totalDelCount; @@ -214,149 +216,173 @@ private List getInfosToApply(IndexWriter writer) { /** Translates a frozen packet of delete term/query, or doc values * updates, into their actual docIDs in the index, and applies the change. This is a heavy - * operation and is done concurrently by incoming indexing threads. */ + * operation and is done concurrently by incoming indexing threads. + * This method will return immediately without blocking if another thread is currently + * applying the packet. In order to ensure the packet has been applied, {@link #forceApply(IndexWriter)} + * must be called. 
+ * */ @SuppressWarnings("try") - public synchronized void apply(IndexWriter writer) throws IOException { - if (applied.getCount() == 0) { - // already done - return; + boolean tryApply(IndexWriter writer) throws IOException { + if (applyLock.tryLock()) { + try { + forceApply(writer); + return true; + } finally { + applyLock.unlock(); + } } + return false; + } - long startNS = System.nanoTime(); + /** Translates a frozen packet of delete term/query, or doc values + * updates, into their actual docIDs in the index, and applies the change. This is a heavy + * operation and is done concurrently by incoming indexing threads. + * */ + void forceApply(IndexWriter writer) throws IOException { + applyLock.lock(); + try { + if (applied.getCount() == 0) { + // already done + return; + } + long startNS = System.nanoTime(); - assert any(); + assert any(); - Set seenSegments = new HashSet<>(); + Set seenSegments = new HashSet<>(); - int iter = 0; - int totalSegmentCount = 0; - long totalDelCount = 0; + int iter = 0; + int totalSegmentCount = 0; + long totalDelCount = 0; - boolean finished = false; + boolean finished = false; - // Optimistic concurrency: assume we are free to resolve the deletes against all current segments in the index, despite that - // concurrent merges are running. Once we are done, we check to see if a merge completed while we were running. If so, we must retry - // resolving against the newly merged segment(s). Eventually no merge finishes while we were running and we are done. - while (true) { - String messagePrefix; - if (iter == 0) { - messagePrefix = ""; - } else { - messagePrefix = "iter " + iter; - } + // Optimistic concurrency: assume we are free to resolve the deletes against all current segments in the index, despite that + // concurrent merges are running. Once we are done, we check to see if a merge completed while we were running. If so, we must retry + // resolving against the newly merged segment(s). 
Eventually no merge finishes while we were running and we are done. + while (true) { + String messagePrefix; + if (iter == 0) { + messagePrefix = ""; + } else { + messagePrefix = "iter " + iter; + } - long iterStartNS = System.nanoTime(); + long iterStartNS = System.nanoTime(); - long mergeGenStart = writer.mergeFinishedGen.get(); + long mergeGenStart = writer.mergeFinishedGen.get(); - Set delFiles = new HashSet<>(); - BufferedUpdatesStream.SegmentState[] segStates; + Set delFiles = new HashSet<>(); + BufferedUpdatesStream.SegmentState[] segStates; - synchronized (writer) { - List infos = getInfosToApply(writer); - if (infos == null) { - break; - } + synchronized (writer) { + List infos = getInfosToApply(writer); + if (infos == null) { + break; + } - for (SegmentCommitInfo info : infos) { - delFiles.addAll(info.files()); - } + for (SegmentCommitInfo info : infos) { + delFiles.addAll(info.files()); + } - // Must open while holding IW lock so that e.g. segments are not merged - // away, dropped from 100% deletions, etc., before we can open the readers - segStates = openSegmentStates(writer, infos, seenSegments, delGen()); + // Must open while holding IW lock so that e.g. segments are not merged + // away, dropped from 100% deletions, etc., before we can open the readers + segStates = openSegmentStates(writer, infos, seenSegments, delGen()); - if (segStates.length == 0) { + if (segStates.length == 0) { + + if (infoStream.isEnabled("BD")) { + infoStream.message("BD", "packet matches no segments"); + } + break; + } if (infoStream.isEnabled("BD")) { - infoStream.message("BD", "packet matches no segments"); + infoStream.message("BD", String.format(Locale.ROOT, + messagePrefix + "now apply del packet (%s) to %d segments, mergeGen %d", + this, segStates.length, mergeGenStart)); } - break; + + totalSegmentCount += segStates.length; + + // Important, else IFD may try to delete our files while we are still using them, + // if e.g. 
a merge finishes on some of the segments we are resolving on: + writer.deleter.incRef(delFiles); + } + + AtomicBoolean success = new AtomicBoolean(); + long delCount; + try (Closeable finalizer = () -> finishApply(writer, segStates, success.get(), delFiles)) { + // don't hold IW monitor lock here so threads are free concurrently resolve deletes/updates: + delCount = apply(segStates); + success.set(true); } + // Since we just resolved some more deletes/updates, now is a good time to write them: + writer.writeSomeDocValuesUpdates(); + + // It's OK to add this here, even if the while loop retries, because delCount only includes newly + // deleted documents, on the segments we didn't already do in previous iterations: + totalDelCount += delCount; + if (infoStream.isEnabled("BD")) { infoStream.message("BD", String.format(Locale.ROOT, - messagePrefix + "now apply del packet (%s) to %d segments, mergeGen %d", - this, segStates.length, mergeGenStart)); + messagePrefix + "done inner apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec", + this, segStates.length, delCount, (System.nanoTime() - iterStartNS) / 1000000000.)); } + if (privateSegment != null) { + // No need to retry for a segment-private packet: the merge that folds in our private segment already waits for all deletes to + // be applied before it kicks off, so this private segment must already not be in the set of merging segments - totalSegmentCount += segStates.length; + break; + } - // Important, else IFD may try to delete our files while we are still using them, - // if e.g. 
a merge finishes on some of the segments we are resolving on: - writer.deleter.incRef(delFiles); - } + // Must sync on writer here so that IW.mergeCommit is not running concurrently, so that if we exit, we know mergeCommit will succeed + // in pulling all our delGens into a merge: + synchronized (writer) { + long mergeGenCur = writer.mergeFinishedGen.get(); - AtomicBoolean success = new AtomicBoolean(); - long delCount; - try (Closeable finalizer = () -> finishApply(writer, segStates, success.get(), delFiles)) { - // don't hold IW monitor lock here so threads are free concurrently resolve deletes/updates: - delCount = apply(segStates); - success.set(true); - } + if (mergeGenCur == mergeGenStart) { - // Since we just resolved some more deletes/updates, now is a good time to write them: - writer.writeSomeDocValuesUpdates(); + // Must do this while still holding IW lock else a merge could finish and skip carrying over our updates: - // It's OK to add this here, even if the while loop retries, because delCount only includes newly - // deleted documents, on the segments we didn't already do in previous iterations: - totalDelCount += delCount; + // Record that this packet is finished: + writer.finished(this); - if (infoStream.isEnabled("BD")) { - infoStream.message("BD", String.format(Locale.ROOT, - messagePrefix + "done inner apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec", - this, segStates.length, delCount, (System.nanoTime() - iterStartNS) / 1000000000.)); - } - if (privateSegment != null) { - // No need to retry for a segment-private packet: the merge that folds in our private segment already waits for all deletes to - // be applied before it kicks off, so this private segment must already not be in the set of merging segments - - break; - } + finished = true; - // Must sync on writer here so that IW.mergeCommit is not running concurrently, so that if we exit, we know mergeCommit will succeed - // in pulling all our delGens into a merge: 
- synchronized (writer) { - long mergeGenCur = writer.mergeFinishedGen.get(); + // No merge finished while we were applying, so we are done! + break; + } + } - if (mergeGenCur == mergeGenStart) { + if (infoStream.isEnabled("BD")) { + infoStream.message("BD", messagePrefix + "concurrent merges finished; move to next iter"); + } - // Must do this while still holding IW lock else a merge could finish and skip carrying over our updates: - - // Record that this packet is finished: - writer.finished(this); + // A merge completed while we were running. In this case, that merge may have picked up some of the updates we did, but not + // necessarily all of them, so we cycle again, re-applying all our updates to the newly merged segment. - finished = true; + iter++; + } - // No merge finished while we were applying, so we are done! - break; - } + if (finished == false) { + // Record that this packet is finished: + writer.finished(this); } if (infoStream.isEnabled("BD")) { - infoStream.message("BD", messagePrefix + "concurrent merges finished; move to next iter"); + String message = String.format(Locale.ROOT, + "done apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec", + this, totalSegmentCount, totalDelCount, (System.nanoTime() - startNS) / 1000000000.); + if (iter > 0) { + message += "; " + (iter + 1) + " iters due to concurrent merges"; + } + message += "; " + writer.getPendingUpdatesCount() + " packets remain"; + infoStream.message("BD", message); } - - // A merge completed while we were running. In this case, that merge may have picked up some of the updates we did, but not - // necessarily all of them, so we cycle again, re-applying all our updates to the newly merged segment. 
- - iter++; - } - - if (finished == false) { - // Record that this packet is finished: - writer.finished(this); - } - - if (infoStream.isEnabled("BD")) { - String message = String.format(Locale.ROOT, - "done apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec", - this, totalSegmentCount, totalDelCount, (System.nanoTime() - startNS) / 1000000000.); - if (iter > 0) { - message += "; " + (iter+1) + " iters due to concurrent merges"; - } - message += "; " + writer.getPendingUpdatesCount() + " packets remain"; - infoStream.message("BD", message); + } finally { + applyLock.unlock(); } } @@ -411,6 +437,7 @@ public static BufferedUpdatesStream.ApplyDeletesResult closeSegmentStates(IndexW private void finishApply(IndexWriter writer, BufferedUpdatesStream.SegmentState[] segStates, boolean success, Set delFiles) throws IOException { + assert applyLock.isHeldByCurrentThread(); synchronized (writer) { BufferedUpdatesStream.ApplyDeletesResult result; @@ -441,8 +468,8 @@ private void finishApply(IndexWriter writer, BufferedUpdatesStream.SegmentState[ /** Applies pending delete-by-term, delete-by-query and doc values updates to all segments in the index, returning * the number of new deleted or updated documents. 
*/ - private synchronized long apply(BufferedUpdatesStream.SegmentState[] segStates) throws IOException { - + private long apply(BufferedUpdatesStream.SegmentState[] segStates) throws IOException { + assert applyLock.isHeldByCurrentThread(); if (delGen == -1) { // we were not yet pushed throw new IllegalArgumentException("gen is not yet set; call BufferedUpdatesStream.push first"); diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index 80d11c1b61db..028554b5eda7 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -2607,7 +2607,9 @@ synchronized long publishFrozenUpdates(FrozenBufferedUpdates packet) { // Do this as an event so it applies higher in the stack when we are not holding DocumentsWriterFlushQueue.purgeLock: eventQueue.add(w -> { try { - packet.apply(w); + // we call tryApply here since we don't want to block if a refresh or a flush is already applying the + // packet. The flush will retry this packet anyway to ensure all of them are applied + packet.tryApply(w); } catch (Throwable t) { try { w.onTragicEvent(t, "applyUpdatesPacket");