-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
block_stream_merger.go
288 lines (247 loc) · 7.78 KB
/
block_stream_merger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
package logstorage
import (
"container/heap"
"fmt"
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// mustMergeBlockStreams merges the blocks read from bsrs into bsw and records the
// resulting part metadata in ph.
//
// Blocks are consumed in the order defined by blockStreamReadersHeap: by streamID,
// then by minTimestamp. The merge loop exits early when needStop(stopCh) reports a stop;
// the entries accumulated up to that point are still flushed.
//
// Before returning, bsw.Finalize(ph) is always called and bsrs are always closed.
func mustMergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}) {
	bsm := getBlockStreamMerger()
	bsm.mustInit(bsw, bsrs)

	rh := &bsm.readersHeap
	for rh.Len() > 0 && !needStop(stopCh) {
		// The heap root holds the reader whose current block must be written next.
		top := (*rh)[0]
		bsm.mustWriteBlock(&top.blockData, bsw)
		if !top.NextBlock() {
			// The reader has no more blocks - drop it from the heap.
			heap.Pop(rh)
			continue
		}
		// The reader advanced to its next block - restore the heap order.
		heap.Fix(rh, 0)
	}

	// Write out whatever is still pending, then hand the merger back to the pool.
	bsm.mustFlushRows()
	putBlockStreamMerger(bsm)

	bsw.Finalize(ph)
	mustCloseBlockStreamReaders(bsrs)
}
// blockStreamMerger merges block streams.
//
// Blocks are taken from readersHeap in (streamID, minTimestamp) order and written to bsw.
// Blocks with the same streamID that are smaller than maxUncompressedBlockSize are merged
// together before being written; blocks reaching that size are written as-is.
//
// Pending data for streamID is held either packed in bd or unpacked in rows,
// never in both at the same time.
type blockStreamMerger struct {
	// bsw is the block stream writer to write the merged blocks.
	bsw *blockStreamWriter
	// bsrs contains the original readers passed to mustInit().
	// They are used by ReadersPaths() when building error messages.
	// NOTE(review): the slice is owned by the caller of mustInit().
	bsrs []*blockStreamReader
	// readersHeap contains a heap of readers to read blocks to merge.
	// Readers without remaining blocks are removed from it.
	readersHeap blockStreamReadersHeap
	// streamID is the stream ID for the pending data in bd or rows.
	streamID streamID
	// sbu is the unmarshaler for strings in rows and rowsTmp.
	// It is acquired lazily and released by resetRows().
	sbu *stringsBlockUnmarshaler
	// vd is the decoder for unmarshaled strings.
	// It is acquired lazily and released by resetRows().
	vd *valuesDecoder
	// bd is the pending blockData.
	// bd is unpacked into rows when it must be merged with another block for the same streamID.
	bd blockData
	// rows is pending log entries.
	rows rows
	// rowsTmp is temporary storage for log entries during merge.
	// It is swapped with rows after every merge.
	rowsTmp rows
	// uncompressedRowsSizeBytes is the current size of uncompressed rows.
	//
	// It is used for flushing rows to blocks when their size reaches maxUncompressedBlockSize
	uncompressedRowsSizeBytes uint64
}
// reset returns bsm to an empty state, so it can be reused.
//
// It drops every reference to caller-supplied objects (the writer and the readers).
// A pooled blockStreamMerger therefore doesn't keep them, or the buffers they own, reachable.
// A reset merger also no longer reports stale paths from ReadersPaths().
func (bsm *blockStreamMerger) reset() {
	bsm.bsw = nil

	// bsrs is owned by the caller of mustInit(); drop the reference without touching its items.
	bsm.bsrs = nil

	// Clear the heap items before truncating, so the backing array retained for reuse
	// doesn't pin the readers left over after an early stop.
	rhs := bsm.readersHeap
	for i := range rhs {
		rhs[i] = nil
	}
	bsm.readersHeap = rhs[:0]

	bsm.streamID.reset()
	bsm.resetRows()
}
// resetRows discards all pending data for the current stream: the packed bd,
// the unpacked rows, the rowsTmp scratch space and the size counter.
//
// If the strings unmarshaler or the values decoder have been acquired, they are
// handed back via putStringsBlockUnmarshaler / putValuesDecoder.
func (bsm *blockStreamMerger) resetRows() {
	bsm.uncompressedRowsSizeBytes = 0
	bsm.bd.reset()
	bsm.rows.reset()
	bsm.rowsTmp.reset()

	if sbu := bsm.sbu; sbu != nil {
		putStringsBlockUnmarshaler(sbu)
		bsm.sbu = nil
	}
	if vd := bsm.vd; vd != nil {
		putValuesDecoder(vd)
		bsm.vd = nil
	}
}
// mustInit prepares bsm for merging the blocks from bsrs into bsw.
//
// Each reader is advanced to its first block. Readers that have no blocks at all
// are left out of the heap. bsrs itself is remembered for ReadersPaths().
func (bsm *blockStreamMerger) mustInit(bsw *blockStreamWriter, bsrs []*blockStreamReader) {
	bsm.reset()
	bsm.bsw, bsm.bsrs = bsw, bsrs

	// Reuse the heap's backing array from previous merges.
	readers := bsm.readersHeap[:0]
	for i := range bsrs {
		if bsr := bsrs[i]; bsr.NextBlock() {
			readers = append(readers, bsr)
		}
	}
	bsm.readersHeap = readers
	heap.Init(&bsm.readersHeap)
}
// mustWriteBlock feeds the next block bd into bsm. bsw is the writer that full blocks are written to directly.
//
// Blocks reaching maxUncompressedBlockSize are written to bsw directly, after the pending
// data has been flushed. Smaller blocks are either kept as the pending bd (new streamID)
// or merged with the pending log entries (same streamID).
func (bsm *blockStreamMerger) mustWriteBlock(bd *blockData, bsw *blockStreamWriter) {
	bsm.checkNextBlock(bd)

	isFull := bd.uncompressedSizeBytes >= maxUncompressedBlockSize

	if bd.streamID.equal(&bsm.streamID) {
		if !isFull {
			// Same stream, small block - it must be merged with the pending log entries.
			bsm.mustMergeRows(bd)
			return
		}
		// Same stream, full block - it can follow the pending entries without merging.
		bsm.mustFlushRows()
		bsw.MustWriteBlockData(bd)
		return
	}

	// A new stream starts: the pending entries belong to the previous streamID.
	bsm.mustFlushRows()
	bsm.streamID = bd.streamID
	if isFull {
		// Fast path - write the whole block without unpacking its log entries.
		bsw.MustWriteBlockData(bd)
		return
	}
	// Slow path - keep a copy of the block as pending data.
	bsm.bd.copyFrom(bd)
}
// checkNextBlock verifies that bd may be written right after the data pending in bsm.
//
// It panics on an internal invariant violation (pending bd and rows both non-empty). It also panics
// if bd is out of order: its streamID must not be smaller than the current one, and for an
// equal streamID its minTimestamp must not be smaller than the minTimestamp of the
// pending data.
func (bsm *blockStreamMerger) checkNextBlock(bd *blockData) {
	pendingRows := len(bsm.rows.timestamps)
	if pendingRows > 0 && bsm.bd.rowsCount > 0 {
		logger.Panicf("BUG: bsm.bd must be empty when bsm.rows isn't empty! got %d log entries in bsm.bd", bsm.bd.rowsCount)
	}

	next, cur := &bd.streamID, &bsm.streamID
	if next.less(cur) {
		logger.Panicf("FATAL: cannot merge %s: the streamID=%s for the next block is smaller than the streamID=%s for the current block",
			bsm.ReadersPaths(), next, cur)
	}

	// Timestamps only need to be compared for a non-empty block of the same stream.
	if !next.equal(cur) || bd.rowsCount == 0 {
		return
	}
	nextMinTimestamp := bd.timestampsData.minTimestamp

	if pendingRows > 0 {
		// Pending data is unpacked; its entries are sorted, so the first one is the minimum.
		if minTimestamp := bsm.rows.timestamps[0]; nextMinTimestamp < minTimestamp {
			logger.Panicf("FATAL: cannot merge %s: the next block's minTimestamp=%d is smaller than the minTimestamp=%d for log entries for the current block",
				bsm.ReadersPaths(), nextMinTimestamp, minTimestamp)
		}
		return
	}

	if bsm.bd.rowsCount == 0 {
		// Nothing is pending - any minTimestamp is acceptable.
		return
	}
	if minTimestamp := bsm.bd.timestampsData.minTimestamp; nextMinTimestamp < minTimestamp {
		logger.Panicf("FATAL: cannot merge %s: the next block's minTimestamp=%d is smaller than the minTimestamp=%d for the current block",
			bsm.ReadersPaths(), nextMinTimestamp, minTimestamp)
	}
}
// ReadersPaths returns the paths of the input blockStreamReaders formatted as "[p1,p2,...]".
func (bsm *blockStreamMerger) ReadersPaths() string {
	paths := make([]string, 0, len(bsm.bsrs))
	for _, bsr := range bsm.bsrs {
		paths = append(paths, bsr.Path())
	}
	return fmt.Sprintf("[%s]", strings.Join(paths, ","))
}
// mustMergeRows merges the log entries pending in bsm with the log entries from bd.
//
// A pending packed bd is unpacked into bsm.rows first. The entries from bd are then
// appended, and the two sorted runs are merged through rowsTmp. The result is flushed
// as soon as its uncompressed size reaches maxUncompressedBlockSize.
func (bsm *blockStreamMerger) mustMergeRows(bd *blockData) {
	if bsm.bd.rowsCount > 0 {
		// Move the pending packed block into bsm.rows, so both inputs live in one place.
		bsm.mustUnmarshalRows(&bsm.bd)
		bsm.bd.reset()
	}

	// Entries before split come from the pending data, entries after it come from bd.
	split := len(bsm.rows.timestamps)
	bsm.mustUnmarshalRows(bd)

	ts, rs := bsm.rows.timestamps, bsm.rows.rows
	bsm.rowsTmp.mergeRows(ts[:split], ts[split:], rs[:split], rs[split:])

	// The merged result becomes the pending rows; the old storage is recycled as scratch space.
	bsm.rows, bsm.rowsTmp = bsm.rowsTmp, bsm.rows
	bsm.rowsTmp.reset()

	if bsm.uncompressedRowsSizeBytes < maxUncompressedBlockSize {
		return
	}
	bsm.mustFlushRows()
}
// mustUnmarshalRows unpacks the log entries from bd into bsm.rows. It adds the size of
// the newly added entries to bsm.uncompressedRowsSizeBytes.
//
// The strings unmarshaler and the values decoder are acquired lazily here and released
// by resetRows(). An unmarshaling error aborts the merge via logger.Panicf.
func (bsm *blockStreamMerger) mustUnmarshalRows(bd *blockData) {
	if bsm.sbu == nil {
		bsm.sbu = getStringsBlockUnmarshaler()
	}
	if bsm.vd == nil {
		bsm.vd = getValuesDecoder()
	}

	// Remember where the new entries start, so only they are accounted below.
	start := len(bsm.rows.timestamps)
	if err := bd.unmarshalRows(&bsm.rows, bsm.sbu, bsm.vd); err != nil {
		logger.Panicf("FATAL: cannot merge %s: cannot unmarshal log entries from blockData: %s", bsm.ReadersPaths(), err)
	}
	bsm.uncompressedRowsSizeBytes += uncompressedRowsSizeBytes(bsm.rows.rows[start:])
}
// mustFlushRows writes the data pending for bsm.streamID to bsm.bsw and then clears it.
//
// Unpacked log entries in bsm.rows take priority. When there are none, the packed
// bsm.bd is handed to the writer as-is (it may be empty).
func (bsm *blockStreamMerger) mustFlushRows() {
	if len(bsm.rows.timestamps) > 0 {
		bsm.bsw.MustWriteRows(&bsm.streamID, bsm.rows.timestamps, bsm.rows.rows)
	} else {
		bsm.bsw.MustWriteBlockData(&bsm.bd)
	}
	bsm.resetRows()
}
// getBlockStreamMerger returns a blockStreamMerger from blockStreamMergerPool,
// or a freshly allocated one when the pool is empty.
//
// Return it via putBlockStreamMerger when done.
func getBlockStreamMerger() *blockStreamMerger {
	if v := blockStreamMergerPool.Get(); v != nil {
		return v.(*blockStreamMerger)
	}
	return &blockStreamMerger{}
}
// blockStreamMergerPool holds blockStreamMerger objects for reuse across merges.
var blockStreamMergerPool sync.Pool

// putBlockStreamMerger resets bsm and puts it back into blockStreamMergerPool.
//
// bsm must not be used by the caller after this call.
func putBlockStreamMerger(bsm *blockStreamMerger) {
	bsm.reset()
	blockStreamMergerPool.Put(bsm)
}
// blockStreamReadersHeap is a heap.Interface over blockStreamReaders.
//
// Readers are ordered by the streamID of their current block and then by that block's
// minTimestamp. The heap root therefore always holds the block to merge next.
type blockStreamReadersHeap []*blockStreamReader

func (h *blockStreamReadersHeap) Len() int {
	return len(*h)
}

func (h *blockStreamReadersHeap) Less(i, j int) bool {
	a, b := &(*h)[i].blockData, &(*h)[j].blockData
	if a.streamID.equal(&b.streamID) {
		return a.timestampsData.minTimestamp < b.timestampsData.minTimestamp
	}
	return a.streamID.less(&b.streamID)
}

func (h *blockStreamReadersHeap) Swap(i, j int) {
	(*h)[i], (*h)[j] = (*h)[j], (*h)[i]
}

func (h *blockStreamReadersHeap) Push(v interface{}) {
	*h = append(*h, v.(*blockStreamReader))
}

// Pop removes and returns the last reader. The vacated slot is cleared, so the
// retained backing array doesn't keep that reader reachable.
func (h *blockStreamReadersHeap) Pop() interface{} {
	old := *h
	last := len(old) - 1
	bsr := old[last]
	old[last] = nil
	*h = old[:last]
	return bsr
}