3232import java .util .Queue ;
3333import java .util .Set ;
3434import java .util .concurrent .ConcurrentLinkedQueue ;
35- import java .util .concurrent .CountDownLatch ;
36- import java .util .concurrent .TimeUnit ;
3735import java .util .concurrent .atomic .AtomicBoolean ;
3836import java .util .concurrent .atomic .AtomicInteger ;
3937import java .util .concurrent .atomic .AtomicLong ;
@@ -3166,42 +3164,6 @@ public final boolean flushNextBuffer() throws IOException {
31663164 }
31673165 }
31683166
3169- private MergePolicy .OneMerge updateSegmentInfosOnMergeFinish (MergePolicy .OneMerge merge , final SegmentInfos toCommit ,
3170- AtomicReference <CountDownLatch > mergeLatchRef ) {
3171- return new MergePolicy .OneMerge (merge .segments ) {
3172- public void mergeFinished () throws IOException {
3173- super .mergeFinished ();
3174- CountDownLatch mergeAwaitLatch = mergeLatchRef .get ();
3175- if (mergeAwaitLatch == null ) {
3176- // Commit thread timed out waiting for this merge and moved on. No need to manipulate toCommit.
3177- return ;
3178- }
3179- if (isAborted () == false ) {
3180- deleter .incRef (this .info .files ());
3181- // Resolve "live" SegmentInfos segments to their toCommit cloned equivalents, based on segment name.
3182- Set <String > mergedSegmentNames = new HashSet <>();
3183- for (SegmentCommitInfo sci : this .segments ) {
3184- deleter .decRef (sci .files ());
3185- mergedSegmentNames .add (sci .info .name );
3186- }
3187- List <SegmentCommitInfo > toCommitMergedAwaySegments = new ArrayList <>();
3188- for (SegmentCommitInfo sci : toCommit ) {
3189- if (mergedSegmentNames .contains (sci .info .name )) {
3190- toCommitMergedAwaySegments .add (sci );
3191- }
3192- }
3193- // Construct a OneMerge that applies to toCommit
3194- MergePolicy .OneMerge applicableMerge = new MergePolicy .OneMerge (toCommitMergedAwaySegments );
3195- applicableMerge .info = this .info .clone ();
3196- long segmentCounter = Long .parseLong (this .info .info .name .substring (1 ), Character .MAX_RADIX );
3197- toCommit .counter = Math .max (toCommit .counter , segmentCounter + 1 );
3198- toCommit .applyMergeChanges (applicableMerge , false );
3199- }
3200- mergeAwaitLatch .countDown ();
3201- }
3202- };
3203- }
3204-
32053167 private long prepareCommitInternal () throws IOException {
32063168 startCommitTime = System .nanoTime ();
32073169 synchronized (commitLock ) {
@@ -3224,8 +3186,6 @@ private long prepareCommitInternal() throws IOException {
32243186 SegmentInfos toCommit = null ;
32253187 boolean anyChanges = false ;
32263188 long seqNo ;
3227- List <MergePolicy .OneMerge > commitMerges = null ;
3228- AtomicReference <CountDownLatch > mergeAwaitLatchRef = null ;
32293189
32303190 // This is copied from doFlush, except it's modified to
32313191 // clone & incRef the flushed SegmentInfos inside the
@@ -3280,38 +3240,15 @@ private long prepareCommitInternal() throws IOException {
32803240 // sneak into the commit point:
32813241 toCommit = segmentInfos .clone ();
32823242
3283- if (anyChanges ) {
3284- // Find any merges that can execute on commit (per MergePolicy).
3285- MergePolicy .MergeSpecification mergeSpec =
3286- config .getMergePolicy ().findFullFlushMerges (MergeTrigger .COMMIT , segmentInfos , this );
3287- if (mergeSpec != null && mergeSpec .merges .size () > 0 ) {
3288- int mergeCount = mergeSpec .merges .size ();
3289- commitMerges = new ArrayList <>(mergeCount );
3290- mergeAwaitLatchRef = new AtomicReference <>(new CountDownLatch (mergeCount ));
3291- for (MergePolicy .OneMerge oneMerge : mergeSpec .merges ) {
3292- MergePolicy .OneMerge trackedMerge =
3293- updateSegmentInfosOnMergeFinish (oneMerge , toCommit , mergeAwaitLatchRef );
3294- if (registerMerge (trackedMerge ) == false ) {
3295- throw new IllegalStateException ("MergePolicy " + config .getMergePolicy ().getClass () +
3296- " returned merging segments from findFullFlushMerges" );
3297- }
3298- commitMerges .add (trackedMerge );
3299- }
3300- if (infoStream .isEnabled ("IW" )) {
3301- infoStream .message ("IW" , "Registered " + mergeCount + " commit merges" );
3302- infoStream .message ("IW" , "Before executing commit merges, had " + toCommit .size () + " segments" );
3303- }
3304- }
3305- }
3306-
33073243 pendingCommitChangeCount = changeCount .get ();
33083244
33093245 // This protects the segmentInfos we are now going
33103246 // to commit. This is important in case, eg, while
33113247 // we are trying to sync all referenced files, a
33123248 // merge completes which would otherwise have
33133249 // removed the files we are now syncing.
3314- deleter .incRef (toCommit .files (false ));
3250+ filesToCommit = toCommit .files (false );
3251+ deleter .incRef (filesToCommit );
33153252 }
33163253 success = true ;
33173254 } finally {
@@ -3332,52 +3269,6 @@ private long prepareCommitInternal() throws IOException {
33323269 } finally {
33333270 maybeCloseOnTragicEvent ();
33343271 }
3335-
3336- if (mergeAwaitLatchRef != null ) {
3337- CountDownLatch mergeAwaitLatch = mergeAwaitLatchRef .get ();
3338- // If we found and registered any merges above, within the flushLock, then we want to ensure that they
3339- // complete execution. Note that since we released the lock, other merges may have been scheduled. We will
3340- // block until the merges that we registered complete. As they complete, they will update toCommit to
3341- // replace merged segments with the result of each merge.
3342- config .getIndexWriterEvents ().beginMergeOnCommit ();
3343- mergeScheduler .merge (this , MergeTrigger .COMMIT , true );
3344- long mergeWaitStart = System .nanoTime ();
3345- int abandonedCount = 0 ;
3346- long waitTimeMillis = (long ) (config .getMaxCommitMergeWaitSeconds () * 1000.0 );
3347- try {
3348- if (mergeAwaitLatch .await (waitTimeMillis , TimeUnit .MILLISECONDS ) == false ) {
3349- synchronized (this ) {
3350- // Need to do this in a synchronized block, to make sure none of our commit merges are currently
3351- // executing mergeFinished (since mergeFinished itself is called from within the IndexWriter lock).
3352- // After we clear the value from mergeAwaitLatchRef, the merges we schedule will still execute as
3353- // usual, but when they finish, they won't attempt to update toCommit or modify segment reference
3354- // counts.
3355- mergeAwaitLatchRef .set (null );
3356- for (MergePolicy .OneMerge commitMerge : commitMerges ) {
3357- if (runningMerges .contains (commitMerge ) || pendingMerges .contains (commitMerge )) {
3358- abandonedCount ++;
3359- }
3360- }
3361- }
3362- }
3363- } catch (InterruptedException ie ) {
3364- throw new ThreadInterruptedException (ie );
3365- } finally {
3366- if (infoStream .isEnabled ("IW" )) {
3367- infoStream .message ("IW" , String .format (Locale .ROOT , "Waited %.1f ms for commit merges" ,
3368- (System .nanoTime () - mergeWaitStart )/1_000_000.0 ));
3369- infoStream .message ("IW" , "After executing commit merges, had " + toCommit .size () + " segments" );
3370- if (abandonedCount > 0 ) {
3371- infoStream .message ("IW" , "Abandoned " + abandonedCount + " commit merges after " + waitTimeMillis + " ms" );
3372- }
3373- }
3374- if (abandonedCount > 0 ) {
3375- config .getIndexWriterEvents ().abandonedMergesOnCommit (abandonedCount );
3376- }
3377- config .getIndexWriterEvents ().finishMergeOnCommit ();
3378- }
3379- }
3380- filesToCommit = toCommit .files (false );
33813272
33823273 try {
33833274 if (anyChanges ) {
0 commit comments