-
Notifications
You must be signed in to change notification settings - Fork 968
/
FrozenBufferedUpdates.java
862 lines (748 loc) · 33.3 KB
/
FrozenBufferedUpdates.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntConsumer;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.RamUsageEstimator;
/**
* Holds buffered deletes and updates by term or query, once pushed. Pushed
* deletes/updates are write-once, so we shift to more memory efficient data
* structure to hold them. We don't hold docIDs because these are applied on
* flush.
*/
/**
 * Holds buffered deletes and updates by term or query, once pushed. Pushed
 * deletes/updates are write-once, so we shift to more memory efficient data
 * structure to hold them. We don't hold docIDs because these are applied on
 * flush.
 */
final class FrozenBufferedUpdates {

  /* NOTE: we now apply this frozen packet immediately on creation, yet this process is heavy, and runs
   * in multiple threads, and this compression is sizable (~8.3% of the original size), so it's important
   * we run this before applying the deletes/updates. */

  /* Rough per-entry RAM estimate for a buffered delete Query: we often undercount (say 24 bytes), plus int. */
  final static int BYTES_PER_DEL_QUERY = RamUsageEstimator.NUM_BYTES_OBJECT_REF + Integer.BYTES + 24;

  // Terms to delete by, in sorted order (prefix-coded to save RAM):
  final PrefixCodedTerms deleteTerms;

  // Parallel array of deleted query, and the docIDUpto for each
  final Query[] deleteQueries;
  final int[] deleteQueryLimits;

  /** Counts down once all deletes/updates have been applied */
  public final CountDownLatch applied = new CountDownLatch(1);

  // Guards forceApply; tryApply bails out without blocking if another thread holds it
  private final ReentrantLock applyLock = new ReentrantLock();

  // Immutable snapshot of the buffered doc-values updates, keyed by field name
  private final Map<String, FieldUpdatesBuffer> fieldUpdates;

  /** How many total documents were deleted/updated. */
  public long totalDelCount;

  // Total number of doc-values updates in this packet (used for logging and any())
  private final int fieldUpdatesCount;

  // RAM used by this frozen packet, computed once in the constructor
  final int bytesUsed;

  // Number of term deletes buffered (may exceed deleteTerms.size() when terms repeat)
  final int numTermDeletes;

  private long delGen = -1; // assigned by BufferedUpdatesStream once pushed

  final SegmentCommitInfo privateSegment; // non-null iff this frozen packet represents
  // a segment private deletes. in that case is should
  // only have Queries and doc values updates
  private final InfoStream infoStream;
/**
 * Freezes the given (still mutable) {@link BufferedUpdates} into compact, write-once
 * structures: sorted prefix-coded delete terms, parallel delete-query arrays, and an
 * immutable snapshot of the per-field doc-values updates.
 *
 * @param infoStream     for "BD" diagnostics; may be null
 * @param updates        the buffered deletes/updates to freeze
 * @param privateSegment non-null iff this packet applies only to one flushed segment
 */
public FrozenBufferedUpdates(InfoStream infoStream, BufferedUpdates updates, SegmentCommitInfo privateSegment) {
  this.infoStream = infoStream;
  this.privateSegment = privateSegment;
  assert updates.deleteDocIDs.isEmpty();
  assert privateSegment == null || updates.deleteTerms.isEmpty() : "segment private packet should only have del queries";

  // Sort the delete terms and prefix-code them for compact storage:
  Term[] sortedTerms = updates.deleteTerms.keySet().toArray(new Term[updates.deleteTerms.size()]);
  ArrayUtil.timSort(sortedTerms);
  PrefixCodedTerms.Builder termsBuilder = new PrefixCodedTerms.Builder();
  for (Term term : sortedTerms) {
    termsBuilder.add(term);
  }
  deleteTerms = termsBuilder.finish();

  // Flatten the query -> docIDUpto map into two parallel arrays:
  int queryCount = updates.deleteQueries.size();
  deleteQueries = new Query[queryCount];
  deleteQueryLimits = new int[queryCount];
  int slot = 0;
  for (Map.Entry<Query, Integer> entry : updates.deleteQueries.entrySet()) {
    deleteQueries[slot] = entry.getKey();
    deleteQueryLimits[slot] = entry.getValue();
    slot++;
  }

  // TODO if a Term affects multiple fields, we could keep the updates key'd by Term
  // so that it maps to all fields it affects, sorted by their docUpto, and traverse
  // that Term only once, applying the update to all fields that still need to be
  // updated.
  this.fieldUpdates = Collections.unmodifiableMap(new HashMap<>(updates.fieldUpdates));
  this.fieldUpdatesCount = updates.numFieldUpdates.get();

  bytesUsed = (int) ((deleteTerms.ramBytesUsed() + deleteQueries.length * BYTES_PER_DEL_QUERY)
      + updates.fieldUpdatesBytesUsed.get());
  numTermDeletes = updates.numTermDeletes.get();

  if (infoStream != null && infoStream.isEnabled("BD")) {
    infoStream.message("BD", String.format(Locale.ROOT,
        "compressed %d to %d bytes (%.2f%%) for deletes/updates; private segment %s",
        updates.ramBytesUsed(), bytesUsed, 100.*bytesUsed/updates.ramBytesUsed(),
        privateSegment));
  }
}
/** Returns the {@link SegmentCommitInfo}s that this packet should apply its deletes to, or null
 *  if the private segment was already merged away. */
private List<SegmentCommitInfo> getInfosToApply(IndexWriter writer) {
  assert Thread.holdsLock(writer);
  if (privateSegment == null) {
    // A global packet targets every segment currently in the index:
    return writer.listOfSegmentCommitInfos();
  }
  if (writer.segmentCommitInfoExist(privateSegment)) {
    return Collections.singletonList(privateSegment);
  }
  if (infoStream.isEnabled("BD")) {
    infoStream.message("BD", "private segment already gone; skip processing updates");
  }
  return null;
}
/** Translates a frozen packet of delete term/query, or doc values
 *  updates, into their actual docIDs in the index, and applies the change. This is a heavy
 *  operation and is done concurrently by incoming indexing threads.
 *  This method will return immediately without blocking if another thread is currently
 *  applying the package. In order to ensure the packet has been applied, {@link #forceApply(IndexWriter)}
 *  must be called.
 *
 *  @return true if this thread applied (or observed an already-applied) packet; false if
 *          another thread currently holds the apply lock
 */
@SuppressWarnings("try")
boolean tryApply(IndexWriter writer) throws IOException {
  if (applyLock.tryLock() == false) {
    // Another thread is applying this packet right now; don't block on it.
    return false;
  }
  try {
    forceApply(writer);
    return true;
  } finally {
    applyLock.unlock();
  }
}
/** Translates a frozen packet of delete term/query, or doc values
 *  updates, into their actual docIDs in the index, and applies the change. This is a heavy
 *  operation and is done concurrently by incoming indexing threads.
 *  Blocks until the packet is applied; idempotent (returns immediately if already applied).
 */
void forceApply(IndexWriter writer) throws IOException {
  applyLock.lock();
  try {
    if (applied.getCount() == 0) {
      // already done
      return;
    }
    long startNS = System.nanoTime();
    assert any();
    Set<SegmentCommitInfo> seenSegments = new HashSet<>();
    int iter = 0;
    int totalSegmentCount = 0;
    long totalDelCount = 0;
    boolean finished = false;

    // Optimistic concurrency: assume we are free to resolve the deletes against all current segments in the index, despite that
    // concurrent merges are running.  Once we are done, we check to see if a merge completed while we were running.  If so, we must retry
    // resolving against the newly merged segment(s).  Eventually no merge finishes while we were running and we are done.
    while (true) {
      String messagePrefix;
      if (iter == 0) {
        messagePrefix = "";
      } else {
        messagePrefix = "iter " + iter;
      }
      long iterStartNS = System.nanoTime();
      // Snapshot the merge generation before resolving; compared again below to detect races:
      long mergeGenStart = writer.mergeFinishedGen.get();
      Set<String> delFiles = new HashSet<>();
      BufferedUpdatesStream.SegmentState[] segStates;

      synchronized (writer) {
        List<SegmentCommitInfo> infos = getInfosToApply(writer);
        if (infos == null) {
          // private segment was already merged away; nothing to do
          break;
        }
        for (SegmentCommitInfo info : infos) {
          delFiles.addAll(info.files());
        }
        // Must open while holding IW lock so that e.g. segments are not merged
        // away, dropped from 100% deletions, etc., before we can open the readers
        segStates = openSegmentStates(writer, infos, seenSegments, delGen());
        if (segStates.length == 0) {
          if (infoStream.isEnabled("BD")) {
            infoStream.message("BD", "packet matches no segments");
          }
          break;
        }
        if (infoStream.isEnabled("BD")) {
          infoStream.message("BD", String.format(Locale.ROOT,
              messagePrefix + "now apply del packet (%s) to %d segments, mergeGen %d",
              this, segStates.length, mergeGenStart));
        }
        totalSegmentCount += segStates.length;
        // Important, else IFD may try to delete our files while we are still using them,
        // if e.g. a merge finishes on some of the segments we are resolving on:
        writer.deleter.incRef(delFiles);
      }

      AtomicBoolean success = new AtomicBoolean();
      long delCount;
      // finishApply (close states, decRef files, checkpoint) always runs, even if apply throws:
      try (Closeable finalizer = () -> finishApply(writer, segStates, success.get(), delFiles)) {
        // don't hold IW monitor lock here so threads are free concurrently resolve deletes/updates:
        delCount = apply(segStates);
        success.set(true);
      }

      // Since we just resolved some more deletes/updates, now is a good time to write them:
      writer.writeSomeDocValuesUpdates();

      // It's OK to add this here, even if the while loop retries, because delCount only includes newly
      // deleted documents, on the segments we didn't already do in previous iterations:
      totalDelCount += delCount;
      if (infoStream.isEnabled("BD")) {
        infoStream.message("BD", String.format(Locale.ROOT,
            messagePrefix + "done inner apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec",
            this, segStates.length, delCount, (System.nanoTime() - iterStartNS) / 1000000000.));
      }
      if (privateSegment != null) {
        // No need to retry for a segment-private packet: the merge that folds in our private segment already waits for all deletes to
        // be applied before it kicks off, so this private segment must already not be in the set of merging segments
        break;
      }

      // Must sync on writer here so that IW.mergeCommit is not running concurrently, so that if we exit, we know mergeCommit will succeed
      // in pulling all our delGens into a merge:
      synchronized (writer) {
        long mergeGenCur = writer.mergeFinishedGen.get();
        if (mergeGenCur == mergeGenStart) {
          // Must do this while still holding IW lock else a merge could finish and skip carrying over our updates:
          // Record that this packet is finished:
          writer.finished(this);
          finished = true;
          // No merge finished while we were applying, so we are done!
          break;
        }
      }
      if (infoStream.isEnabled("BD")) {
        infoStream.message("BD", messagePrefix + "concurrent merges finished; move to next iter");
      }

      // A merge completed while we were running.  In this case, that merge may have picked up some of the updates we did, but not
      // necessarily all of them, so we cycle again, re-applying all our updates to the newly merged segment.
      iter++;
    }

    if (finished == false) {
      // Record that this packet is finished:
      writer.finished(this);
    }

    if (infoStream.isEnabled("BD")) {
      String message = String.format(Locale.ROOT,
          "done apply del packet (%s) to %d segments; %d new deletes/updates; took %.3f sec",
          this, totalSegmentCount, totalDelCount, (System.nanoTime() - startNS) / 1000000000.);
      if (iter > 0) {
        message += "; " + (iter + 1) + " iters due to concurrent merges";
      }
      message += "; " + writer.getPendingUpdatesCount() + " packets remain";
      infoStream.message("BD", message);
    }
  } finally {
    applyLock.unlock();
  }
}
/** Opens SegmentReader and inits SegmentState for each segment.
 *  Segments newer than {@code delGen} or already visited (tracked via
 *  {@code alreadySeenSegments}) are skipped. On any failure, every state opened so far
 *  is closed before the original throwable is rethrown. */
private static BufferedUpdatesStream.SegmentState[] openSegmentStates(IndexWriter writer, List<SegmentCommitInfo> infos,
                                                                      Set<SegmentCommitInfo> alreadySeenSegments, long delGen) throws IOException {
  final List<BufferedUpdatesStream.SegmentState> states = new ArrayList<>();
  try {
    for (SegmentCommitInfo info : infos) {
      final boolean packetAppliesToSegment = info.getBufferedDeletesGen() <= delGen;
      if (packetAppliesToSegment && alreadySeenSegments.contains(info) == false) {
        states.add(new BufferedUpdatesStream.SegmentState(writer.getPooledInstance(info, true), writer::release, info));
        alreadySeenSegments.add(info);
      }
    }
  } catch (Throwable t) {
    try {
      // Don't leak pooled readers for the states we did manage to open:
      IOUtils.close(states);
    } catch (Throwable suppressed) {
      t.addSuppressed(suppressed);
    }
    throw t;
  }
  return states.toArray(new BufferedUpdatesStream.SegmentState[0]);
}
/** Close segment states previously opened with openSegmentStates.
 *  On success, tallies the newly deleted doc count per segment and collects
 *  any segments that became 100% deleted (and whose merge policy does not
 *  want them kept) so the caller can drop them. */
public static BufferedUpdatesStream.ApplyDeletesResult closeSegmentStates(IndexWriter writer, BufferedUpdatesStream.SegmentState[] segStates, boolean success) throws IOException {
  List<SegmentCommitInfo> allDeleted = null;
  long totDelCount = 0;
  final List<BufferedUpdatesStream.SegmentState> segmentStates = Arrays.asList(segStates);
  for (BufferedUpdatesStream.SegmentState segState : segmentStates) {
    if (success) {
      // Only the delta vs the starting del count is "new" for this apply:
      totDelCount += segState.rld.getDelCount() - segState.startDelCount;
      int fullDelCount = segState.rld.getDelCount();
      assert fullDelCount <= segState.rld.info.info.maxDoc() : fullDelCount + " > " + segState.rld.info.info.maxDoc();
      if (segState.rld.isFullyDeleted() && writer.getConfig().getMergePolicy().keepFullyDeletedSegment(() -> segState.reader) == false) {
        if (allDeleted == null) {
          allDeleted = new ArrayList<>();
        }
        allDeleted.add(segState.reader.getOriginalSegmentInfo());
      }
    }
  }
  // Close all states (releases pooled readers) regardless of success:
  IOUtils.close(segmentStates);
  if (writer.infoStream.isEnabled("BD")) {
    writer.infoStream.message("BD", "closeSegmentStates: " + totDelCount + " new deleted documents; pool " + writer.getPendingUpdatesCount()+ " packets; bytesUsed=" + writer.getReaderPoolRamBytesUsed());
  }
  return new BufferedUpdatesStream.ApplyDeletesResult(totDelCount > 0, allDeleted);
}
/** Cleanup after one apply iteration: closes segment states, releases the file refs
 *  taken in forceApply, and checkpoints/drops segments as needed. Must run under the
 *  apply lock; takes the IW monitor lock itself. */
private void finishApply(IndexWriter writer, BufferedUpdatesStream.SegmentState[] segStates,
                         boolean success, Set<String> delFiles) throws IOException {
  assert applyLock.isHeldByCurrentThread();
  synchronized (writer) {
    BufferedUpdatesStream.ApplyDeletesResult result;
    try {
      result = closeSegmentStates(writer, segStates, success);
    } finally {
      // Matches the incRef we did above, but we must do the decRef after closing segment states else
      // IFD can't delete still-open files
      writer.deleter.decRef(delFiles);
    }

    if (result.anyDeletes) {
      writer.maybeMerge.set(true);
      writer.checkpoint();
    }

    if (result.allDeleted != null) {
      if (infoStream.isEnabled("IW")) {
        infoStream.message("IW", "drop 100% deleted segments: " + writer.segString(result.allDeleted));
      }
      for (SegmentCommitInfo info : result.allDeleted) {
        writer.dropDeletedSegment(info);
      }
      writer.checkpoint();
    }
  }
}
/** Applies pending delete-by-term, delete-by-query and doc values updates to all segments in the index, returning
 *  the number of new deleted or updated documents.
 *  NOTE: accumulates into {@link #totalDelCount} call-by-call, so a failure partway
 *  through still records the counts from the phases that completed. */
private long apply(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {
  assert applyLock.isHeldByCurrentThread();
  if (delGen == -1) {
    // we were not yet pushed
    throw new IllegalArgumentException("gen is not yet set; call BufferedUpdatesStream.push first");
  }

  assert applied.getCount() != 0;

  if (privateSegment != null) {
    // a segment-private packet resolves against exactly its own segment:
    assert segStates.length == 1;
    assert privateSegment == segStates[0].reader.getOriginalSegmentInfo();
  }

  totalDelCount += applyTermDeletes(segStates);
  totalDelCount += applyQueryDeletes(segStates);
  totalDelCount += applyDocValuesUpdates(segStates);

  return totalDelCount;
}
/** Resolves this packet's doc-values updates against each eligible segment state,
 *  returning the number of documents updated. */
private long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {
  if (fieldUpdates.isEmpty()) {
    return 0;
  }
  final long t0 = System.nanoTime();
  // Invariant across the loop: only a segment-private packet limits by docUpTo.
  final boolean isSegmentPrivateDeletes = privateSegment != null;
  long updateCount = 0;
  for (BufferedUpdatesStream.SegmentState segState : segStates) {
    if (delGen < segState.delGen) {
      // segment is newer than this deletes packet
      continue;
    }
    if (segState.rld.refCount() == 1) {
      // This means we are the only remaining reference to this segment, meaning
      // it was merged away while we were running, so we can safely skip running
      // because we will run on the newly merged segment next:
      continue;
    }
    updateCount += applyDocValuesUpdates(segState, fieldUpdates, delGen, isSegmentPrivateDeletes);
  }
  if (infoStream.isEnabled("BD")) {
    infoStream.message("BD",
        String.format(Locale.ROOT, "applyDocValuesUpdates %.1f msec for %d segments, %d field updates; %d new updates",
            (System.nanoTime()-t0)/1000000.,
            segStates.length,
            fieldUpdatesCount,
            updateCount));
  }
  return updateCount;
}
/** Resolves the given field updates against one segment: walks each field's buffered
 *  updates in arrival order, maps each term to docIDs, accumulates updates into private
 *  {@link DocValuesFieldUpdates} instances, and only publishes them at the end.
 *  Returns the number of doc updates applied. */
private static long applyDocValuesUpdates(BufferedUpdatesStream.SegmentState segState,
                                          Map<String, FieldUpdatesBuffer> updates,
                                          long delGen,
                                          boolean segmentPrivateDeletes) throws IOException {

  // TODO: we can process the updates per DV field, from last to first so that
  // if multiple terms affect same document for the same field, we add an update
  // only once (that of the last term). To do that, we can keep a bitset which
  // marks which documents have already been updated. So e.g. if term T1
  // updates doc 7, and then we process term T2 and it updates doc 7 as well,
  // we don't apply the update since we know T1 came last and therefore wins
  // the update.
  // We can also use that bitset as 'liveDocs' to pass to TermEnum.docs(), so
  // that these documents aren't even returned.

  long updateCount = 0;

  // We first write all our updates private, and only in the end publish to the ReadersAndUpdates */
  final List<DocValuesFieldUpdates> resolvedUpdates = new ArrayList<>();
  for (Map.Entry<String, FieldUpdatesBuffer> fieldUpdate : updates.entrySet()) {
    String updateField = fieldUpdate.getKey();
    DocValuesFieldUpdates dvUpdates = null;
    FieldUpdatesBuffer value = fieldUpdate.getValue();
    boolean isNumeric = value.isNumeric();
    FieldUpdatesBuffer.BufferedUpdateIterator iterator = value.iterator();
    FieldUpdatesBuffer.BufferedUpdate bufferedUpdate;
    // sortedTerms=false: updates must be visited in arrival order, not term order
    TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, false);
    while ((bufferedUpdate = iterator.next()) != null) {
      // TODO: we traverse the terms in update order (not term order) so that we
      // apply the updates in the correct order, i.e. if two terms update the
      // same document, the last one that came in wins, irrespective of the
      // terms lexical order.
      // we can apply the updates in terms order if we keep an updatesGen (and
      // increment it with every update) and attach it to each NumericUpdate. Note
      // that we cannot rely only on docIDUpto because an app may send two updates
      // which will get same docIDUpto, yet will still need to respect the order
      // those updates arrived.
      // TODO: we could at least *collate* by field?
      final DocIdSetIterator docIdSetIterator = termDocsIterator.nextTerm(bufferedUpdate.termField, bufferedUpdate.termValue);
      if (docIdSetIterator != null) {
        final int limit;
        if (delGen == segState.delGen) {
          // Same gen means this is a segment-private packet: respect docUpTo so we
          // don't update documents indexed after the update call:
          assert segmentPrivateDeletes;
          limit = bufferedUpdate.docUpTo;
        } else {
          limit = Integer.MAX_VALUE;
        }
        final BytesRef binaryValue;
        final long longValue;
        if (bufferedUpdate.hasValue == false) {
          // "reset" update: the field's value is removed for matching docs
          longValue = -1;
          binaryValue = null;
        } else {
          longValue = bufferedUpdate.numericValue;
          binaryValue = bufferedUpdate.binaryValue;
        }
        // NOTE(review): return value discarded; this re-invokes termsEnum.postings(...)
        // on the already-positioned enum — looks redundant given docIdSetIterator above.
        // Confirm whether this call is needed before removing it.
        termDocsIterator.getDocs();
        if (dvUpdates == null) {
          // Lazily allocate the per-field accumulator on first matching term:
          if (isNumeric) {
            if (value.hasSingleValue()) {
              dvUpdates = new NumericDocValuesFieldUpdates
                  .SingleValueNumericDocValuesFieldUpdates(delGen, updateField, segState.reader.maxDoc(),
                  value.getNumericValue(0));
            } else {
              dvUpdates = new NumericDocValuesFieldUpdates(delGen, updateField, value.getMinNumeric(),
                  value.getMaxNumeric(), segState.reader.maxDoc());
            }
          } else {
            dvUpdates = new BinaryDocValuesFieldUpdates(delGen, updateField, segState.reader.maxDoc());
          }
          resolvedUpdates.add(dvUpdates);
        }
        final IntConsumer docIdConsumer;
        final DocValuesFieldUpdates update = dvUpdates;
        if (bufferedUpdate.hasValue == false) {
          docIdConsumer = doc -> update.reset(doc);
        } else if (isNumeric) {
          docIdConsumer = doc -> update.add(doc, longValue);
        } else {
          docIdConsumer = doc -> update.add(doc, binaryValue);
        }
        final Bits acceptDocs = segState.rld.getLiveDocs();
        if (segState.rld.sortMap != null && segmentPrivateDeletes) {
          // This segment was sorted on flush; we must apply seg-private deletes carefully in this case:
          int doc;
          while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
            if (acceptDocs == null || acceptDocs.get(doc)) {
              // The limit is in the pre-sorted doc space:
              if (segState.rld.sortMap.newToOld(doc) < limit) {
                docIdConsumer.accept(doc);
                updateCount++;
              }
            }
          }
        } else {
          int doc;
          while ((doc = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
            if (doc >= limit) {
              break; // no more docs that can be updated for this term
            }
            if (acceptDocs == null || acceptDocs.get(doc)) {
              docIdConsumer.accept(doc);
              updateCount++;
            }
          }
        }
      }
    }
  }

  // now freeze & publish:
  for (DocValuesFieldUpdates update : resolvedUpdates) {
    if (update.any()) {
      update.finish();
      segState.rld.addDVUpdate(update);
    }
  }

  return updateCount;
}
// Delete by query
/** Runs each buffered delete Query against every eligible segment and deletes all
 *  matching (still-live) documents, returning the number of new deletions. */
private long applyQueryDeletes(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {

  if (deleteQueries.length == 0) {
    return 0;
  }

  long startNS = System.nanoTime();

  long delCount = 0;
  for (BufferedUpdatesStream.SegmentState segState : segStates) {

    if (delGen < segState.delGen) {
      // segment is newer than this deletes packet
      continue;
    }

    if (segState.rld.refCount() == 1) {
      // This means we are the only remaining reference to this segment, meaning
      // it was merged away while we were running, so we can safely skip running
      // because we will run on the newly merged segment next:
      continue;
    }

    final LeafReaderContext readerContext = segState.reader.getContext();
    for (int i = 0; i < deleteQueries.length; i++) {
      Query query = deleteQueries[i];
      int limit;
      if (delGen == segState.delGen) {
        // Segment-private packet: only delete docs indexed before the delete call:
        assert privateSegment != null;
        limit = deleteQueryLimits[i];
      } else {
        limit = Integer.MAX_VALUE;
      }
      final IndexSearcher searcher = new IndexSearcher(readerContext.reader());
      searcher.setQueryCache(null);
      query = searcher.rewrite(query);
      final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1);
      final Scorer scorer = weight.scorer(readerContext);
      if (scorer != null) {
        final DocIdSetIterator it = scorer.iterator();
        if (segState.rld.sortMap != null && limit != Integer.MAX_VALUE) {
          assert privateSegment != null;
          // This segment was sorted on flush; we must apply seg-private deletes carefully in this case:
          int docID;
          while ((docID = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
            // The limit is in the pre-sorted doc space:
            if (segState.rld.sortMap.newToOld(docID) < limit) {
              if (segState.rld.delete(docID)) {
                delCount++;
              }
            }
          }
        } else {
          int docID;
          // NOTE: NO_MORE_DOCS is Integer.MAX_VALUE, so `docID < limit` also terminates
          // the loop on iterator exhaustion when limit == Integer.MAX_VALUE:
          while ((docID = it.nextDoc()) < limit) {
            if (segState.rld.delete(docID)) {
              delCount++;
            }
          }
        }
      }
    }
  }

  if (infoStream.isEnabled("BD")) {
    infoStream.message("BD",
        String.format(Locale.ROOT, "applyQueryDeletes took %.2f msec for %d segments and %d queries; %d new deletions",
            (System.nanoTime()-startNS)/1000000.,
            segStates.length,
            deleteQueries.length,
            delCount));
  }

  return delCount;
}
/** Resolves the sorted delete terms against each eligible segment and deletes all
 *  matching (still-live) documents, returning the number of new deletions.
 *  Never called for segment-private packets (those carry only queries and DV updates). */
private long applyTermDeletes(BufferedUpdatesStream.SegmentState[] segStates) throws IOException {

  if (deleteTerms.size() == 0) {
    return 0;
  }

  // We apply segment-private deletes on flush:
  assert privateSegment == null;

  long startNS = System.nanoTime();

  long delCount = 0;

  for (BufferedUpdatesStream.SegmentState segState : segStates) {
    assert segState.delGen != delGen: "segState.delGen=" + segState.delGen + " vs this.gen=" + delGen;
    if (segState.delGen > delGen) {
      // our deletes don't apply to this segment
      continue;
    }
    if (segState.rld.refCount() == 1) {
      // This means we are the only remaining reference to this segment, meaning
      // it was merged away while we were running, so we can safely skip running
      // because we will run on the newly merged segment next:
      continue;
    }

    FieldTermIterator iter = deleteTerms.iterator();
    BytesRef delTerm;
    // sortedTerms=true: deleteTerms iterates in sorted order, enabling forward-only seeks
    TermDocsIterator termDocsIterator = new TermDocsIterator(segState.reader, true);
    while ((delTerm = iter.next()) != null) {
      final DocIdSetIterator iterator = termDocsIterator.nextTerm(iter.field(), delTerm);
      if (iterator != null) {
        int docID;
        while ((docID = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
          // NOTE: there is no limit check on the docID
          // when deleting by Term (unlike by Query)
          // because on flush we apply all Term deletes to
          // each segment.  So all Term deleting here is
          // against prior segments:
          if (segState.rld.delete(docID)) {
            delCount++;
          }
        }
      }
    }
  }

  if (infoStream.isEnabled("BD")) {
    infoStream.message("BD",
        String.format(Locale.ROOT, "applyTermDeletes took %.2f msec for %d segments and %d del terms; %d new deletions",
            (System.nanoTime()-startNS)/1000000.,
            segStates.length,
            deleteTerms.size(),
            delCount));
  }

  return delCount;
}
/** Assigns this packet's delete generation; may be called exactly once,
 *  by {@link BufferedUpdatesStream} when the packet is pushed. */
public void setDelGen(long delGen) {
  assert this.delGen == -1 : "delGen was already previously set to " + this.delGen;
  this.delGen = delGen;
  // Propagate the gen to the frozen terms so merges can order packets:
  deleteTerms.setDelGen(delGen);
}
/** Returns the generation assigned when this packet was pushed; must not be
 *  called before {@link #setDelGen(long)}. */
public long delGen() {
  assert delGen != -1;
  return delGen;
}
/** Debug summary of this packet: delGen plus whichever counts are non-trivial. */
@Override
public String toString() {
  StringBuilder sb = new StringBuilder("delGen=").append(delGen);
  if (numTermDeletes != 0) {
    sb.append(" numDeleteTerms=").append(numTermDeletes);
    if (numTermDeletes != deleteTerms.size()) {
      // buffered count differs from unique count when the same term was deleted twice
      sb.append(" (").append(deleteTerms.size()).append(" unique)");
    }
  }
  if (deleteQueries.length != 0) {
    sb.append(" numDeleteQueries=").append(deleteQueries.length);
  }
  if (fieldUpdates.size() > 0) {
    sb.append(" fieldUpdates=").append(fieldUpdatesCount);
  }
  if (bytesUsed != 0) {
    sb.append(" bytesUsed=").append(bytesUsed);
  }
  if (privateSegment != null) {
    sb.append(" privateSegment=").append(privateSegment);
  }
  return sb.toString();
}
/** True if this packet carries at least one delete term, delete query, or doc-values update. */
boolean any() {
  if (deleteTerms.size() > 0) {
    return true;
  }
  return deleteQueries.length > 0 || fieldUpdatesCount > 0;
}
/**
 * This class helps iterating a term dictionary and consuming all the docs for each terms.
 * It accepts a field, value tuple and returns a {@link DocIdSetIterator} if the field has an entry
 * for the given value. It has an optimized way of iterating the term dictionary if the terms are
 * passed in sorted order and makes sure terms and postings are reused as much as possible.
 */
static final class TermDocsIterator {
  // Source of Terms per field (a LeafReader or a Fields instance)
  private final TermsProvider provider;
  // Field currently positioned on; termsEnum/readerTerm belong to this field
  private String field;
  private TermsEnum termsEnum;
  // Reused across getDocs() calls to avoid reallocating postings
  private PostingsEnum postingsEnum;
  // When true, callers promise to pass terms in sorted order per field,
  // enabling the forward-only seek optimization in nextTerm
  private final boolean sortedTerms;
  // The smallest term in the dictionary not yet passed (sorted mode only)
  private BytesRef readerTerm;
  private BytesRef lastTerm; // only set with asserts

  @FunctionalInterface
  interface TermsProvider {
    Terms terms(String field) throws IOException;
  }

  TermDocsIterator(Fields fields, boolean sortedTerms) {
    this(fields::terms, sortedTerms);
  }

  TermDocsIterator(LeafReader reader, boolean sortedTerms) {
    this(reader::terms, sortedTerms);
  }

  private TermDocsIterator(TermsProvider provider, boolean sortedTerms) {
    this.sortedTerms = sortedTerms;
    this.provider = provider;
  }

  /** Positions this iterator on the given field, (re)opening its TermsEnum if the field changed. */
  private void setField(String field) throws IOException {
    if (this.field == null || this.field.equals(field) == false) {
      this.field = field;

      Terms terms = provider.terms(field);
      if (terms != null) {
        termsEnum = terms.iterator();
        if (sortedTerms) {
          // need to reset otherwise we fail the assertSorted below since we sort per field
          assert (lastTerm = null) == null;
          readerTerm = termsEnum.next();
        }
      } else {
        // field has no terms in this segment; nextTerm will return null
        termsEnum = null;
      }
    }
  }

  /** Returns a postings iterator for (field, term), or null if the term is absent.
   *  In sorted mode, terms must arrive in non-decreasing order per field. */
  DocIdSetIterator nextTerm(String field, BytesRef term) throws IOException {
    setField(field);
    if (termsEnum != null) {
      if (sortedTerms) {
        assert assertSorted(term);
        // in the sorted case we can take advantage of the "seeking forward" property
        // this allows us depending on the term dict impl to reuse data-structures internally
        // which speed up iteration over terms and docs significantly.
        int cmp = term.compareTo(readerTerm);
        if (cmp < 0) {
          return null; // requested term does not exist in this segment
        } else if (cmp == 0) {
          return getDocs();
        } else if (cmp > 0) {
          TermsEnum.SeekStatus status = termsEnum.seekCeil(term);
          switch (status) {
            case FOUND:
              return getDocs();
            case NOT_FOUND:
              // remember where the enum landed so the next compare is cheap:
              readerTerm = termsEnum.term();
              return null;
            case END:
              // no more terms in this segment
              termsEnum = null;
              return null;
            default:
              throw new AssertionError("unknown status");
          }
        }
      } else if (termsEnum.seekExact(term)) {
        return getDocs();
      }
    }
    return null;
  }

  /** Assert-only check that terms arrive in sorted order; keeps a deep copy of the last term. */
  private boolean assertSorted(BytesRef term) {
    assert sortedTerms;
    assert lastTerm == null || term.compareTo(lastTerm) >= 0 : "boom: " + term.utf8ToString() + " last: " + lastTerm.utf8ToString();
    lastTerm = BytesRef.deepCopyOf(term);
    return true;
  }

  /** Returns postings (docs only, no freqs/positions) for the current term, reusing the previous enum. */
  private DocIdSetIterator getDocs() throws IOException {
    assert termsEnum != null;
    return postingsEnum = termsEnum.postings(postingsEnum, PostingsEnum.NONE);
  }
}
}