forked from ironsweet/golucene
-
Notifications
You must be signed in to change notification settings - Fork 0
/
flush.go
194 lines (170 loc) · 6.97 KB
/
flush.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package index
import (
"github.com/gzg1984/golucene/core/util"
"sync"
)
/*
FlushPolicy controls when segments are flushed from a RAM resident
internal data-structure to the IndexWriter's Directory.
Segments are traditionally flushed by:
1. RAM consumption - configured via IndexWriterConfig.SetRAMBufferSizeMB()
2. Number of RAM resident documents - configured via IndexWriterConfig.SetMaxBufferedDocs()
The policy also applies pending delete operations (by term and/or
query), given the threshold set in IndexWriterConfig.SetMaxBufferedDeleteTerms().
IndexWriter consults the provided FlushPolicy to control the flushing
process. The policy is informed for each added or updated document as
well as for each delete term. Based on the information provided via
ThreadState and DocumentsWriterFlushControl, the FlushPolicy decides
whether a DocumentsWriterPerThread needs flushing and marks it as
flush-pending via DocumentsWriterFlushControl.setFlushPending(), or
whether deletes need to be applied.
*/
type FlushPolicy interface {
// Called for each delete term. If this is a delete triggered due to
// an update the given ThreadState is non-nil.
//
// Note: this method is called synchronized on the given
// DocumentsWriterFlushControl and it is guaranteed that the
// calling goroutine holds the lock on the given ThreadState
onDelete(*DocumentsWriterFlushControl, *ThreadState)
// Called for each document update on the given ThreadState's
// DocumentsWriterPerThread (DWPT).
//
// Note: this method is called synchronized on the given
// DocumentsWriterFlushControl (DWFC) and it is guaranteed that the
// calling goroutine holds the lock on the given ThreadState
onUpdate(*DocumentsWriterFlushControl, *ThreadState)
// Called for each document addition on the given ThreadState's DWPT.
//
// Note: this method is synchronized by the given DWFC and it is
// guaranteed that the calling goroutine holds the lock on the given
// ThreadState
onInsert(*DocumentsWriterFlushControl, *ThreadState)
// Called by DocumentsWriter to initialize the FlushPolicy with the
// live IndexWriter configuration.
init(indexWriterConfig LiveIndexWriterConfig)
}
// FlushPolicyImplSPI is the pair of hooks a concrete policy hands to
// FlushPolicyImpl. FlushPolicyImpl.onUpdate dispatches to them, calling
// onInsert first and onDelete second.
type FlushPolicyImplSPI interface {
// Hook invoked for a document addition (and first on every update).
onInsert(*DocumentsWriterFlushControl, *ThreadState)
// Hook invoked for a delete (and second on every update).
onDelete(*DocumentsWriterFlushControl, *ThreadState)
}
// FlushPolicyImpl is the shared base of flush policies. It implements
// onUpdate and init itself and delegates onInsert/onDelete to the
// concrete policy through spi.
type FlushPolicyImpl struct {
// Lock held by init while it stores the configuration;
// newFlushPolicyImpl installs a *sync.Mutex here.
sync.Locker
// Hooks of the concrete policy, called by onUpdate.
spi FlushPolicyImplSPI
// Live IndexWriter configuration; assigned by init.
indexWriterConfig LiveIndexWriterConfig
// Info stream taken from indexWriterConfig.InfoStream() in init;
// this file writes to it under the "FP" component.
infoStream util.InfoStream
}
// newFlushPolicyImpl builds the shared policy base around the given
// hooks. The embedded Locker is a fresh sync.Mutex; indexWriterConfig
// and infoStream are left at their zero values until init is called.
func newFlushPolicyImpl(spi FlushPolicyImplSPI) *FlushPolicyImpl {
	base := new(FlushPolicyImpl)
	base.Locker = new(sync.Mutex)
	base.spi = spi
	return base
}
// onUpdate handles a document update as an insert followed by a
// delete: it forwards the same arguments to the policy's onInsert hook
// and then to its onDelete hook.
func (fp *FlushPolicyImpl) onUpdate(control *DocumentsWriterFlushControl, state *ThreadState) {
	hooks := fp.spi
	hooks.onInsert(control, state)
	hooks.onDelete(control, state)
}
// init stores the live IndexWriter configuration and caches the
// InfoStream it provides. Both assignments happen while the policy's
// embedded lock is held; the lock is released when init returns.
func (fp *FlushPolicyImpl) init(indexWriterConfig LiveIndexWriterConfig) {
	fp.Locker.Lock() // synchronized
	defer fp.Locker.Unlock()

	fp.indexWriterConfig = indexWriterConfig
	fp.infoStream = indexWriterConfig.InfoStream()
}
/*
findLargestNonPendingWriter returns the ThreadState that currently uses
the most RAM among those visited by control.perThreadPool.foreach that
are not flush-pending, report bytesUsed > 0 and hold at least one
document in RAM. perThreadState is the initial candidate and is
returned unless another state uses strictly more bytes.

perThreadState must hold at least one document and must not be
flush-pending; both are asserted. The result is never nil, because
perThreadState is dereferenced up front and serves as the fallback.
*/
func (fp *FlushPolicyImpl) findLargestNonPendingWriter(control *DocumentsWriterFlushControl,
	perThreadState *ThreadState) *ThreadState {
	assert(perThreadState.dwpt.numDocsInRAM > 0)
	// the dwpt which needs to be flushed eventually, and its RAM usage
	largest, largestBytes := perThreadState, perThreadState.bytesUsed
	assert2(!perThreadState.flushPending, "DWPT should have flushed")

	inUse := 0
	control.perThreadPool.foreach(func(candidate *ThreadState) {
		// states already scheduled for flushing are not candidates
		if candidate.flushPending {
			return
		}
		used := candidate.bytesUsed
		// only states that hold RAM and at least one document count
		if !(used > 0 && candidate.dwpt.numDocsInRAM > 0) {
			return
		}
		if fp.infoStream.IsEnabled("FP") {
			fp.infoStream.Message("FP", "thread state has %v bytes; docInRAM=%v",
				used, candidate.dwpt.numDocsInRAM)
		}
		inUse++
		if used > largestBytes {
			largest, largestBytes = candidate, used
		}
	})

	if fp.infoStream.IsEnabled("FP") {
		fp.infoStream.Message("FP", "%v in-use non-flusing threads states", inUse)
	}
	fp.message("set largest ram consuming thread pending on lower watermark")
	return largest
}
// message writes s to the info stream under the "FP" component when
// that component is enabled; otherwise it does nothing.
//
// Message is used printf-style elsewhere in this file (format string
// followed by arguments), so s is passed as the argument of a constant
// "%s" format rather than as the format itself. A '%' inside s is then
// printed literally instead of being interpreted as a formatting verb.
func (fp *FlushPolicyImpl) message(s string) {
	if fp.infoStream.IsEnabled("FP") {
		fp.infoStream.Message("FP", "%s", s)
	}
}
// index/FlushByRamOrCountsPolicy.java
/*
Default FlushPolicy implementation that flushes new segments based on
RAM used and document count depending on the IndexWriter's
IndexWriterConfig. It also applies pending deletes based on the
number of buffered delete terms.
1. onDelete() - applies pending delete operations based on the global
number of buffered delete terms iff MaxBufferedDeleteTerms() is
enabled
2. onInsert() - flushes either on the number of documents per
DocumentsWriterPerThread (NumDocsInRAM()) or on the global active
memory consumption in the current indexing session iff
MaxBufferedDocs() or RAMBufferSizeMB() is enabled respectively
3. onUpdate() - calls onInsert() and onDelete() in order
All IndexWriterConfig settings are used to mark DocumentsWriterPerThread
as flush pending during indexing with respect to their live updates.
If SetRAMBufferSizeMB() is enabled, the largest ram consuming
DocumentsWriterPerThread will be marked as pending iff the global
active RAM consumption is >= the configured max RAM buffer.
*/
type FlushByRamOrCountsPolicy struct {
// Embedded base; newFlushByRamOrCountsPolicy wires this policy in as
// the base's spi, so the base's onUpdate calls back into the
// onInsert/onDelete methods defined on this type.
*FlushPolicyImpl
}
// newFlushByRamOrCountsPolicy creates the policy and its embedded
// FlushPolicyImpl. The policy passes itself to the base as the hook
// provider, so the base has to be created after the policy value exists.
func newFlushByRamOrCountsPolicy() *FlushByRamOrCountsPolicy {
	policy := &FlushByRamOrCountsPolicy{}
	policy.FlushPolicyImpl = newFlushPolicyImpl(policy)
	return policy
}
// onDelete is the delete hook of this policy. It is still a stub: every
// call panics with "not implemented yet" and neither argument is used.
//
// NOTE: FlushPolicyImpl.onUpdate invokes this hook right after onInsert,
// so a document update on this policy reaches the panic as well.
func (p *FlushByRamOrCountsPolicy) onDelete(_ *DocumentsWriterFlushControl, _ *ThreadState) {
	panic("not implemented yet")
}
// onInsert decides, after a document was added to state's DWPT, whether
// something has to be marked flush-pending.
//
//   - If flushing by document count is enabled and this DWPT holds at
//     least MaxBufferedDocs() documents, state itself is marked pending.
//   - Otherwise, if flushing by RAM is enabled and active bytes plus
//     delete bytes reach RAMBufferSizeMB() (converted to bytes), the
//     largest non-pending writer is marked pending.
//
// The two triggers are mutually exclusive per call: the RAM check only
// runs when the document-count trigger did not fire.
func (p *FlushByRamOrCountsPolicy) onInsert(control *DocumentsWriterFlushControl, state *ThreadState) {
	switch {
	case p.flushOnDocCount() && state.dwpt.numDocsInRAM >= p.indexWriterConfig.MaxBufferedDocs():
		// flush this state by num docs
		control.setFlushPending(state)

	case p.flushOnRAM():
		// flush by RAM
		ramLimit := int64(p.indexWriterConfig.RAMBufferSizeMB() * 1024 * 1024)
		inUse := control._activeBytes + control.deleteBytesUsed() // safe w/o sync
		if inUse < ramLimit {
			return
		}
		if p.infoStream.IsEnabled("FP") {
			p.infoStream.Message("FP",
				"trigger flush: activeBytes=%v deleteBytes=%v vs limit=%v",
				control._activeBytes, control.deleteBytesUsed(), ramLimit)
		}
		p.markLargestWriterPending(control, state, inUse)
	}
}
/*
Marks the most RAM consuming active DWPT flush pending. The writer is
picked by findLargestNonPendingWriter, starting from perThreadState.
currentBytesPerThread is accepted but not used by this implementation.
*/
func (p *FlushByRamOrCountsPolicy) markLargestWriterPending(control *DocumentsWriterFlushControl,
	perThreadState *ThreadState, currentBytesPerThread int64) {
	largest := p.findLargestNonPendingWriter(control, perThreadState)
	control.setFlushPending(largest)
}
/*
Returns true if this FlushPolicy flushes on
IndexWriterConfig.MaxBufferedDocs(), i.e. when that setting is anything
other than DISABLE_AUTO_FLUSH; otherwise false.
*/
func (p *FlushByRamOrCountsPolicy) flushOnDocCount() bool {
	maxDocs := p.indexWriterConfig.MaxBufferedDocs()
	return maxDocs != DISABLE_AUTO_FLUSH
}
/*
Returns true if this FlushPolicy flushes on
IndexWriterConfig.RAMBufferSizeMB(), i.e. when that setting is anything
other than DISABLE_AUTO_FLUSH; otherwise false.
*/
func (p *FlushByRamOrCountsPolicy) flushOnRAM() bool {
	ramBufferMB := p.indexWriterConfig.RAMBufferSizeMB()
	return ramBufferMB != DISABLE_AUTO_FLUSH
}