// deleteStream.go (forked from ironsweet/golucene)

package index

import (
	"errors"
	"fmt"
	. "github.com/gzg1984/golucene/core/codec/spi"
	"github.com/gzg1984/golucene/core/store"
	"github.com/gzg1984/golucene/core/util"
	"log"
	"math"
	"sort"
	"sync"
	"sync/atomic"
	"time"
)

// index/BufferedUpdatesStream.java
type ApplyDeletesResult struct {
	// True if any actual deletes took place:
	anyDeletes bool

	// Current gen, for the merged segment:
	gen int64

	// If non-nil, contains segments that are 100% deleted
	allDeleted []*SegmentCommitInfo
}

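// SegInfoByDelGen orders segments by their buffered-updates (delete)
// generation, so applyDeletesAndUpdates below can walk segments and update
// packets in generation order (via sort.Sort).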
type SegInfoByDelGen []*SegmentCommitInfo

func (a SegInfoByDelGen) Len() int      { return len(a) }
func (a SegInfoByDelGen) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a SegInfoByDelGen) Less(i, j int) bool {
	return a[i].BufferedUpdatesGen < a[j].BufferedUpdatesGen
}

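// Query and QueryAndLimit are minimal placeholders in this port; in upstream
// Lucene the corresponding types carry the actual query and an upper-bound
// docID up to which the delete applies.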
type Query interface{}

type QueryAndLimit struct {
}

// index/CoalescedUpdates.java
type CoalescedUpdates struct {
	_queries         map[Query]int
	numericDVUpdates []*DocValuesUpdate
	binaryDVUpdates  []*DocValuesUpdate
}

func newCoalescedUpdates() *CoalescedUpdates {
	return &CoalescedUpdates{
		_queries: make(map[Query]int),
	}
}

func (cd *CoalescedUpdates) String() string {
	panic("not implemented yet")
}

func (cd *CoalescedUpdates) update(in *FrozenBufferedUpdates) {
	panic("not implemented yet")
}

func (cd *CoalescedUpdates) terms() []*Term {
	panic("not implemented yet")
}

func (cd *CoalescedUpdates) queries() []*QueryAndLimit {
	panic("not implemented yet")
}

/*
Tracks the stream of BufferedUpdates. When DocumentsWriterPerThread
flushes, its buffered deletes and updates are appended to this stream.
We later apply them (resolve them to the actual docIDs, per segment)
when a merge is started (only to the to-be-merged segments). We also
apply them to all segments when an NRT reader is pulled, commit/close
is called, or when too many deletes or updates are buffered and must
be flushed (by RAM usage or by count).

Each packet is assigned a generation, and each flushed or merged
segment is also assigned a generation, so we can track which
BufferedUpdates packets to apply to any given segment.
*/
type BufferedUpdatesStream struct {
	sync.Locker

	// TODO: maybe linked list?
	updates []*FrozenBufferedUpdates

	// Starts at 1 so that SegmentInfos that have never had deletes
	// applied (whose bufferedDelGen defaults to 0) will be correct:
	nextGen int64

	// used only by assert
	lastDeleteTerm *Term

	infoStream util.InfoStream
	bytesUsed  int64 // atomic
	numTerms   int32 // atomic
}

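// Illustrative lifecycle (a sketch of how other index components use this
// stream, not calls made from this file): a flushing DWPT hands its frozen
// deletes/updates to push(), which assigns the packet the next generation;
// flushed and merged segments record their own generation in
// SegmentCommitInfo.BufferedUpdatesGen; applyDeletesAndUpdates() then pairs
// each segment with the packets of newer generation, and prune() discards
// packets once every segment's generation has moved past them.
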
func newBufferedUpdatesStream(infoStream util.InfoStream) *BufferedUpdatesStream {
	return &BufferedUpdatesStream{
		Locker:     &sync.Mutex{},
		updates:    make([]*FrozenBufferedUpdates, 0),
		nextGen:    1,
		infoStream: infoStream,
	}
}

/* Appends a new packet of buffered deletes to the stream, setting its generation: */
func (s *BufferedUpdatesStream) push(packet *FrozenBufferedUpdates) int64 {
	panic("not implemented yet")
}

func (ds *BufferedUpdatesStream) clear() {
	ds.Lock()
	defer ds.Unlock()
	ds.updates = nil
	ds.nextGen = 1
	atomic.StoreInt32(&ds.numTerms, 0)
	atomic.StoreInt64(&ds.bytesUsed, 0)
}

func (ds *BufferedUpdatesStream) any() bool {
	return atomic.LoadInt64(&ds.bytesUsed) != 0
}

func (ds *BufferedUpdatesStream) RamBytesUsed() int64 {
	return atomic.LoadInt64(&ds.bytesUsed)
}

/*
Resolves the buffered deleted Term/Query/docIDs into actual deleted
docIDs in the liveDocs MutableBits for each SegmentReader.
*/
func (ds *BufferedUpdatesStream) applyDeletesAndUpdates(readerPool *ReaderPool, infos []*SegmentCommitInfo) (*ApplyDeletesResult, error) {
	ds.Lock()
	defer ds.Unlock()
	if len(infos) == 0 {
		ds.nextGen++
		return &ApplyDeletesResult{false, ds.nextGen - 1, nil}, nil
	}

	t0 := time.Now()
	ds.assertDeleteStats()

	if !ds.any() {
		if ds.infoStream.IsEnabled("BD") {
			ds.infoStream.Message("BD", "applyDeletes: no deletes; skipping")
		}
		ds.nextGen++
		return &ApplyDeletesResult{false, ds.nextGen - 1, nil}, nil
	}

	if ds.infoStream.IsEnabled("BD") {
		ds.infoStream.Message("BD", "applyDeletes: infos=%v packetCount=%v", infos, len(ds.updates))
	}

	gen := ds.nextGen
	ds.nextGen++

	infos2 := make([]*SegmentCommitInfo, len(infos))
	copy(infos2, infos)
	sort.Sort(SegInfoByDelGen(infos2))

	var coalescedUpdates *CoalescedUpdates
	var anyNewDeletes bool
	infosIDX := len(infos2) - 1
	delIDX := len(ds.updates) - 1
	var allDeleted []*SegmentCommitInfo
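	// Walk segments (infosIDX) and packets (delIDX) from newest to oldest
	// generation: packets newer than a segment's generation are coalesced
	// and applied to it; a packet whose generation equals the segment's is
	// a segment-private packet and is applied to that segment only.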
	for infosIDX >= 0 {
		log.Printf("BD: cycle delIDX=%v infoIDX=%v", delIDX, infosIDX)

		var packet *FrozenBufferedUpdates
		if delIDX >= 0 {
			packet = ds.updates[delIDX]
		}
		info := infos2[infosIDX]
		segGen := info.BufferedUpdatesGen

		if packet != nil && segGen < packet.gen {
			log.Println(" coalesce")
			if coalescedUpdates == nil {
				coalescedUpdates = newCoalescedUpdates()
			}
			if !packet.isSegmentPrivate {
				// Only coalesce if we are NOT on a segment private del
				// packet: the segment private del packet must only be
				// applied to segments with the same delGen. Yet, if a
				// segment is already deleted from the SI since it had no
				// more documents remaining after some del packets younger
				// than its segPrivate packet (higher delGen) have been
				// applied, the segPrivate packet has not been removed.
				coalescedUpdates.update(packet)
			}
			delIDX--
		} else if packet != nil && segGen == packet.gen {
			assertn(packet.isSegmentPrivate,
				"Packet and Segments deletegen can only match on a segment private del packet gen=%v",
				segGen)
			log.Println(" eq")

			// Lock order: IW -> BD -> RP
			assert(readerPool.infoIsLive(info))
			rld := readerPool.get(info, true)
			reader, err := rld.reader(store.IO_CONTEXT_READ)
			if err != nil {
				return nil, err
			}
			delCount, segAllDeletes, err := func() (delCount int64, segAllDeletes bool, err error) {
				defer func() {
					err = mergeError(err, rld.release(reader))
					err = mergeError(err, readerPool.release(rld))
				}()

				dvUpdates := newDocValuesFieldUpdatesContainer()
				if coalescedUpdates != nil {
					fmt.Println(" del coalesced")
					var delta int64
					delta, err = ds._applyTermDeletes(coalescedUpdates.terms(), rld, reader)
					if err == nil {
						delCount += delta
						delta, err = applyQueryDeletes(coalescedUpdates.queries(), rld, reader)
						if err == nil {
							delCount += delta
							err = ds.applyDocValuesUpdates(coalescedUpdates.numericDVUpdates, rld, reader, dvUpdates)
							if err == nil {
								err = ds.applyDocValuesUpdates(coalescedUpdates.binaryDVUpdates, rld, reader, dvUpdates)
							}
						}
					}
					if err != nil {
						return
					}
				}

				fmt.Println(" del exact")
				// Don't delete by Term here; DWPT already did that on flush:
				var delta int64
				delta, err = applyQueryDeletes(packet.queries(), rld, reader)
				if err == nil {
					delCount += delta
					err = ds.applyDocValuesUpdates(packet.numericDVUpdates, rld, reader, dvUpdates)
					if err == nil {
						err = ds.applyDocValuesUpdates(packet.binaryDVUpdates, rld, reader, dvUpdates)
						if err == nil && dvUpdates.any() {
							err = rld.writeFieldUpdates(info.Info.Dir, dvUpdates)
						}
					}
				}
				if err != nil {
					return
				}

				fullDelCount := rld.info.DelCount() + rld.pendingDeleteCount()
				infoDocCount := rld.info.Info.DocCount()
				assert(fullDelCount <= infoDocCount)
				return delCount, fullDelCount == infoDocCount, nil
			}()
			if err != nil {
				return nil, err
			}
			anyNewDeletes = anyNewDeletes || (delCount > 0)
			if segAllDeletes {
				allDeleted = append(allDeleted, info)
			}

			if ds.infoStream.IsEnabled("BD") {
				var suffix string
				if segAllDeletes {
					suffix = " 100% deleted"
				}
				ds.infoStream.Message("BD", "Seg=%v segGen=%v segDeletes=[%v]; coalesced deletes=[%v] newDelCount=%v%v",
					info, segGen, packet, coalescedUpdates, delCount, suffix)
			}

			if coalescedUpdates == nil {
				coalescedUpdates = newCoalescedUpdates()
			}

			// Since we are on a segment private del packet we must not
			// update the CoalescedUpdates here! We can simply advance to
			// the next packet and seginfo.
			delIDX--
			infosIDX--
			info.SetBufferedUpdatesGen(gen)
		} else {
			log.Println(" gt")
			if coalescedUpdates != nil {
				// Lock order: IW -> BD -> RP
				assert(readerPool.infoIsLive(info))
				rld := readerPool.get(info, true)
				reader, err := rld.reader(store.IO_CONTEXT_READ)
				if err != nil {
					return nil, err
				}
				delCount, segAllDeletes, err := func() (delCount int64, segAllDeletes bool, err error) {
					defer func() {
						err = mergeError(err, rld.release(reader))
						err = mergeError(err, readerPool.release(rld))
					}()

					var delta int64
					delta, err = ds._applyTermDeletes(coalescedUpdates.terms(), rld, reader)
					if err == nil {
						delCount += delta
						delta, err = applyQueryDeletes(coalescedUpdates.queries(), rld, reader)
						if err == nil {
							delCount += delta
							dvUpdates := newDocValuesFieldUpdatesContainer()
							err = ds.applyDocValuesUpdates(coalescedUpdates.numericDVUpdates, rld, reader, dvUpdates)
							if err == nil {
								err = ds.applyDocValuesUpdates(coalescedUpdates.binaryDVUpdates, rld, reader, dvUpdates)
								if err == nil && dvUpdates.any() {
									err = rld.writeFieldUpdates(info.Info.Dir, dvUpdates)
								}
							}
						}
					}
					if err != nil {
						return
					}

					fullDelCount := rld.info.DelCount() + rld.pendingDeleteCount()
					infoDocCount := rld.info.Info.DocCount()
					assert(fullDelCount <= infoDocCount)
					return delCount, fullDelCount == infoDocCount, nil
				}()
				if err != nil {
					return nil, err
				}
				anyNewDeletes = anyNewDeletes || (delCount > 0)
				if segAllDeletes {
					allDeleted = append(allDeleted, info)
				}

				if ds.infoStream.IsEnabled("BD") {
					var suffix string
					if segAllDeletes {
						suffix = " 100% deleted"
					}
					ds.infoStream.Message("BD", "Seg=%v segGen=%v coalesced deletes=[%v] newDelCount=%v%v",
						info, segGen, coalescedUpdates, delCount, suffix)
				}
			}
			info.SetBufferedUpdatesGen(gen)
			infosIDX--
		}
	}

	ds.assertDeleteStats()
	if ds.infoStream.IsEnabled("BD") {
		ds.infoStream.Message("BD", "applyDeletes took %v", time.Now().Sub(t0))
	}
	return &ApplyDeletesResult{anyNewDeletes, gen, allDeleted}, nil
}
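
// mergeError combines a primary error with one raised during deferred
// cleanup (releasing the reader and the ReadersAndUpdates), keeping both
// messages so neither failure is silently dropped.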
func mergeError(err, err2 error) error {
	if err == nil {
		return err2
	}
	return errors.New(fmt.Sprintf("%v\n %v", err, err2))
}

// Lock order IW -> BD
/*
Removes any BufferedUpdates that we no longer need to store because
all segments in the index have had the deletes applied.
*/
func (ds *BufferedUpdatesStream) prune(infos *SegmentInfos) {
	ds.assertDeleteStats()
	var minGen int64 = math.MaxInt64
	for _, info := range infos.Segments {
		if info.BufferedUpdatesGen < minGen {
			minGen = info.BufferedUpdatesGen
		}
	}

	if ds.infoStream.IsEnabled("BD") {
		var dir store.Directory
		if len(infos.Segments) > 0 {
			dir = infos.Segments[0].Info.Dir
		}
		ds.infoStream.Message("BD", "prune sis=%v minGen=%v packetCount=%v",
			infos.toString(dir), minGen, len(ds.updates))
	}
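	// Keep every packet whose gen is >= minGen (some segment may still need
	// it) and prune everything older; if no such packet exists, prune all.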
	for delIDX, update := range ds.updates {
		if update.gen >= minGen {
			ds.pruneUpdates(delIDX)
			ds.assertDeleteStats()
			return
		}
	}

	// All deletes pruned
	ds.pruneUpdates(len(ds.updates))
	assert(!ds.any())
	ds.assertDeleteStats()
}

func (ds *BufferedUpdatesStream) pruneUpdates(count int) {
	if count > 0 {
		if ds.infoStream.IsEnabled("BD") {
			ds.infoStream.Message("BD", "pruneDeletes: prune %v packets; %v packets remain",
				count, len(ds.updates)-count)
		}
		for delIDX := 0; delIDX < count; delIDX++ {
			packet := ds.updates[delIDX]
			n := atomic.AddInt32(&ds.numTerms, -int32(packet.numTermDeletes))
			assert(n >= 0)
			n2 := atomic.AddInt64(&ds.bytesUsed, -int64(packet.bytesUsed))
			assert(n2 >= 0)
			ds.updates[delIDX] = nil
		}
		ds.updates = ds.updates[count:]
	}
}

/* Delete by term */
func (ds *BufferedUpdatesStream) _applyTermDeletes(terms []*Term,
	rld *ReadersAndUpdates, reader *SegmentReader) (int64, error) {
	panic("not implemented yet")
}

/* DocValues updates */
func (ds *BufferedUpdatesStream) applyDocValuesUpdates(updates []*DocValuesUpdate,
	rld *ReadersAndUpdates, reader *SegmentReader,
	dvUpdatesContainer *DocValuesFieldUpdatesContainer) error {
	panic("not implemented yet")
}

/* Delete by query */
func applyQueryDeletes(queries []*QueryAndLimit,
	rld *ReadersAndUpdates, reader *SegmentReader) (int64, error) {
	panic("not implemented yet")
}

func (ds *BufferedUpdatesStream) assertDeleteStats() {
	var numTerms2 int
	var bytesUsed2 int64
	for _, packet := range ds.updates {
		numTerms2 += packet.numTermDeletes
		bytesUsed2 += int64(packet.bytesUsed)
	}
	n1 := int(atomic.LoadInt32(&ds.numTerms))
	assertn(numTerms2 == n1, "numTerms2=%v vs %v", numTerms2, n1)
	n2 := atomic.LoadInt64(&ds.bytesUsed)
	assertn(bytesUsed2 == n2, "bytesUsed2=%v vs %v", bytesUsed2, n2)
}