-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
block_stream_reader.go
361 lines (295 loc) · 10.7 KB
/
block_stream_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
package storage
import (
"fmt"
"io"
"path/filepath"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// blockStreamReader represents block stream reader.
//
// It is initialized with InitFromInmemoryPart or InitFromFilePart,
// then NextBlock is called repeatedly; every successful call leaves
// the freshly read block in the Block field.
type blockStreamReader struct {
// Currently active block.
Block Block
// Filesystem path to the stream reader.
//
// Is empty for inmemory stream readers.
path string
// Header of the part being read.
//
// Metaindex rows and block headers are validated against it while reading.
ph partHeader
// Use io.Reader type for timestampsReader and valuesReader
// in order to remove I2I conversion in readBlock
// when passing them to fs.ReadFullData
//
// Both readers must also implement filestream.ReadCloser,
// since MustClose type-asserts them in order to close them.
timestampsReader io.Reader
valuesReader io.Reader
// Source of compressed index blocks.
indexReader filestream.ReadCloser
// Metaindex rows which have not been consumed yet.
//
// readIndexBlock pops rows from the front of this slice.
mrs []metaindexRow
// Points the current mr from mrs.
mr *metaindexRow
// The total number of rows read so far.
rowsCount uint64
// The total number of blocks read so far.
blocksCount uint64
// The number of block headers in the current index block.
indexBlockHeadersCount uint32
// Expected offsets of the next block in timestamps, values and index data.
//
// Each offset is advanced by the size of every block read from the corresponding reader.
timestampsBlockOffset uint64
valuesBlockOffset uint64
indexBlockOffset uint64
// Decompressed contents of the current index block.
indexData []byte
// Buffer for the compressed index block read from indexReader.
compressedIndexData []byte
// Cursor to indexData.
//
// It holds the block headers of the current index block which have not been parsed yet.
indexCursor []byte
// Sticky error set by NextBlock. io.EOF marks the normal end of the stream.
err error
}
// assertWriteClosers verifies that timestampsReader and valuesReader
// implement filestream.ReadCloser.
//
// The single-value form of the type assertion is used, so a reader which does
// not implement the interface (or is nil) results in a panic. This guards the
// identical assertions performed later by MustClose.
//
// NOTE(review): despite the name, the asserted interface is ReadCloser.
func (bsr *blockStreamReader) assertWriteClosers() {
	tsReader, valReader := bsr.timestampsReader, bsr.valuesReader
	_ = tsReader.(filestream.ReadCloser)
	_ = valReader.(filestream.ReadCloser)
}
// reset returns bsr to its initial state, so it can be initialized again.
//
// Readers are dropped without being closed; closing them is the job of MustClose.
// The mrs, indexData and compressedIndexData slices are truncated to zero length
// instead of being released, so their backing arrays are kept for reuse.
func (bsr *blockStreamReader) reset() {
	// Nested values which know how to reset themselves.
	bsr.Block.Reset()
	bsr.ph.Reset()

	// Identity and data sources.
	bsr.path = ""
	bsr.timestampsReader, bsr.valuesReader, bsr.indexReader = nil, nil, nil

	// Position in the metaindex.
	bsr.mr = nil
	bsr.mrs = bsr.mrs[:0]

	// Progress counters and expected offsets.
	bsr.rowsCount, bsr.blocksCount = 0, 0
	bsr.indexBlockHeadersCount = 0
	bsr.timestampsBlockOffset, bsr.valuesBlockOffset, bsr.indexBlockOffset = 0, 0, 0

	// Index block buffers.
	bsr.indexCursor = nil
	bsr.indexData = bsr.indexData[:0]
	bsr.compressedIndexData = bsr.compressedIndexData[:0]

	bsr.err = nil
}
// String returns human-readable representation of bsr.
//
// File-based readers are identified by their path; readers without a path
// fall back to the string form of the part header.
func (bsr *blockStreamReader) String() string {
	if bsr.path == "" {
		return bsr.ph.String()
	}
	return bsr.path
}
// InitFromInmemoryPart initializes bsr from the given mp.
//
// Any previous state of bsr is discarded first. The part header is copied
// from mp and the readers are created over the data held by mp.
// A failure to unmarshal the metaindex rows is treated as a bug and is
// reported via logger.Panicf.
func (bsr *blockStreamReader) InitFromInmemoryPart(mp *inmemoryPart) {
	bsr.reset()

	bsr.ph = mp.ph
	bsr.timestampsReader = mp.timestampsData.NewReader()
	bsr.valuesReader = mp.valuesData.NewReader()
	bsr.indexReader = mp.indexData.NewReader()

	// Reuse the backing array of bsr.mrs for the unmarshaled rows.
	metaindexReader := mp.metaindexData.NewReader()
	rows, err := unmarshalMetaindexRows(bsr.mrs[:0], metaindexReader)
	bsr.mrs = rows
	if err != nil {
		logger.Panicf("BUG: cannot unmarshal metaindex rows from inmemoryPart: %s", err)
	}

	bsr.assertWriteClosers()
}
// InitFromFilePart initializes bsr from a file-based part on the given path.
//
// Files in the part are always read without OS cache pollution,
// since they are usually deleted after the merge.
//
// The part header is parsed from the cleaned path. Then timestamps.bin,
// values.bin, index.bin and metaindex.bin are opened inside the part directory.
// The metaindex file is read completely and closed before returning; the other
// three files stay open until MustClose is called.
//
// A non-nil error is returned if the path cannot be parsed, any file cannot be
// opened or the metaindex rows cannot be unmarshaled. In that case every file
// opened so far is closed and no readers are attached to bsr.
func (bsr *blockStreamReader) InitFromFilePart(path string) error {
	bsr.reset()

	path = filepath.Clean(path)

	if err := bsr.ph.ParseFromPath(path); err != nil {
		return fmt.Errorf("cannot parse path to part: %s", err)
	}

	// NOTE(review): the second argument to filestream.Open is true for every file;
	// presumably it selects the cache-bypassing mode mentioned above - confirm
	// against filestream.Open.
	timestampsPath := filepath.Join(path, "timestamps.bin")
	timestampsFile, err := filestream.Open(timestampsPath, true)
	if err != nil {
		return fmt.Errorf("cannot open timestamps file in stream mode: %s", err)
	}

	valuesPath := filepath.Join(path, "values.bin")
	valuesFile, err := filestream.Open(valuesPath, true)
	if err != nil {
		timestampsFile.MustClose()
		return fmt.Errorf("cannot open values file in stream mode: %s", err)
	}

	indexPath := filepath.Join(path, "index.bin")
	indexFile, err := filestream.Open(indexPath, true)
	if err != nil {
		timestampsFile.MustClose()
		valuesFile.MustClose()
		return fmt.Errorf("cannot open index file in stream mode: %s", err)
	}

	metaindexPath := filepath.Join(path, "metaindex.bin")
	metaindexFile, err := filestream.Open(metaindexPath, true)
	if err != nil {
		timestampsFile.MustClose()
		valuesFile.MustClose()
		indexFile.MustClose()
		return fmt.Errorf("cannot open metaindex file in stream mode: %s", err)
	}

	// The metaindex is consumed in full here, so its file is closed
	// regardless of whether unmarshaling succeeded.
	mrs, err := unmarshalMetaindexRows(bsr.mrs[:0], metaindexFile)
	metaindexFile.MustClose()
	if err != nil {
		timestampsFile.MustClose()
		valuesFile.MustClose()
		indexFile.MustClose()
		return fmt.Errorf("cannot unmarshal metaindex rows from file part %q: %s", metaindexPath, err)
	}

	// Attach the readers only after everything succeeded.
	bsr.path = path
	bsr.timestampsReader = timestampsFile
	bsr.valuesReader = valuesFile
	bsr.indexReader = indexFile
	bsr.mrs = mrs

	bsr.assertWriteClosers()

	return nil
}
// MustClose closes the bsr.
//
// It closes *Reader files passed to Init and then resets bsr.
//
// The timestamps and values readers are stored as io.Reader, so they are
// type-asserted to filestream.ReadCloser before being closed; the assertion
// panics if bsr has not been initialized.
func (bsr *blockStreamReader) MustClose() {
	tsCloser := bsr.timestampsReader.(filestream.ReadCloser)
	tsCloser.MustClose()

	valCloser := bsr.valuesReader.(filestream.ReadCloser)
	valCloser.MustClose()

	bsr.indexReader.MustClose()

	bsr.reset()
}
// Error returns the last error.
//
// nil is returned when no error occurred or when the stored error is io.EOF,
// since NextBlock stores io.EOF to mark the normal end of the stream.
// Any other error is returned with the reader's string form prepended.
func (bsr *blockStreamReader) Error() error {
	switch bsr.err {
	case nil, io.EOF:
		return nil
	}
	return fmt.Errorf("error when reading part %q: %s", bsr, bsr.err)
}
// NextBlock advances bsr to the next block.
//
// It returns true when a block with at least one row has been read into
// bsr.Block. It returns false at the end of the stream or on error; the reason
// is stored in bsr.err, and every subsequent call returns false immediately.
// Use Error to tell a clean end of stream from a failure.
func (bsr *blockStreamReader) NextBlock() bool {
	if bsr.err != nil {
		// Errors are sticky, io.EOF included.
		return false
	}

	bsr.Block.Reset()
	switch err := bsr.readBlock(); {
	case err == io.EOF:
		bsr.err = io.EOF
	case err != nil:
		bsr.err = fmt.Errorf("cannot read next block: %s", err)
	case bsr.Block.bh.RowsCount > 0:
		return true
	default:
		// The block has been read successfully, but it contains no rows.
		bsr.err = fmt.Errorf("invalid block read with zero rows; block=%+v", &bsr.Block)
	}
	return false
}
// readBlock reads the next block into bsr.Block.
//
// The next index block is loaded when the current one is exhausted. Then one
// block header is parsed from the index cursor and validated against the part
// header and the expected data offsets. Finally the timestamps and values data
// described by the header are read.
//
// io.EOF is returned unwrapped when there are no more index blocks.
// Any other error describes a malformed or unreadable part.
func (bsr *blockStreamReader) readBlock() error {
	if len(bsr.indexCursor) == 0 {
		// The current index block is exhausted. Before moving on, verify that it
		// contained as many block headers as its metaindex row has promised.
		if mr := bsr.mr; mr != nil && mr.BlockHeadersCount != bsr.indexBlockHeadersCount {
			return fmt.Errorf("invalid number of block headers in the previous index block at offset %d; got %d; want %d",
				bsr.prevIndexBlockOffset(), bsr.indexBlockHeadersCount, mr.BlockHeadersCount)
		}
		bsr.indexBlockHeadersCount = 0

		switch err := bsr.readIndexBlock(); {
		case err == nil:
			// A fresh index block is available in bsr.indexCursor.
		case err == io.EOF:
			return io.EOF
		default:
			return fmt.Errorf("cannot read index block from index data: %s", err)
		}
	}

	blk := &bsr.Block

	// Read block header.
	if n := len(bsr.indexCursor); n < marshaledBlockHeaderSize {
		return fmt.Errorf("too short index data for reading block header at offset %d; got %d bytes; want %d bytes",
			bsr.prevIndexBlockOffset(), n, marshaledBlockHeaderSize)
	}
	blk.headerData = append(blk.headerData[:0], bsr.indexCursor[:marshaledBlockHeaderSize]...)
	bsr.indexCursor = bsr.indexCursor[marshaledBlockHeaderSize:]
	tail, err := blk.bh.Unmarshal(blk.headerData)
	if err != nil {
		return fmt.Errorf("cannot parse block header read from index data at offset %d: %s", bsr.prevIndexBlockOffset(), err)
	}
	if len(tail) > 0 {
		return fmt.Errorf("non-empty tail left after parsing block header at offset %d: %x", bsr.prevIndexBlockOffset(), tail)
	}

	// The counters are updated before being checked, so the reported values
	// include the block which has just been parsed.
	bsr.blocksCount++
	if bsr.blocksCount > bsr.ph.BlocksCount {
		return fmt.Errorf("too many blocks found in the block stream; got %d; cannot be bigger than %d", bsr.blocksCount, bsr.ph.BlocksCount)
	}

	// Validate block header.
	bsr.rowsCount += uint64(blk.bh.RowsCount)
	if bsr.rowsCount > bsr.ph.RowsCount {
		return fmt.Errorf("too many rows found in the block stream; got %d; cannot be bigger than %d", bsr.rowsCount, bsr.ph.RowsCount)
	}
	switch {
	case blk.bh.MinTimestamp < bsr.ph.MinTimestamp:
		return fmt.Errorf("invalid MinTimestamp at block header at offset %d; got %d; cannot be smaller than %d",
			bsr.prevIndexBlockOffset(), blk.bh.MinTimestamp, bsr.ph.MinTimestamp)
	case blk.bh.MaxTimestamp > bsr.ph.MaxTimestamp:
		return fmt.Errorf("invalid MaxTimestamp at block header at offset %d; got %d; cannot be bigger than %d",
			bsr.prevIndexBlockOffset(), blk.bh.MaxTimestamp, bsr.ph.MaxTimestamp)
	case blk.bh.TimestampsBlockOffset != bsr.timestampsBlockOffset:
		return fmt.Errorf("invalid TimestampsBlockOffset at block header at offset %d; got %d; want %d",
			bsr.prevIndexBlockOffset(), blk.bh.TimestampsBlockOffset, bsr.timestampsBlockOffset)
	case blk.bh.ValuesBlockOffset != bsr.valuesBlockOffset:
		return fmt.Errorf("invalid ValuesBlockOffset at block header at offset %d; got %d; want %d",
			bsr.prevIndexBlockOffset(), blk.bh.ValuesBlockOffset, bsr.valuesBlockOffset)
	}

	// Read timestamps data.
	tsSize := blk.bh.TimestampsBlockSize
	blk.timestampsData = bytesutil.Resize(blk.timestampsData, int(tsSize))
	if err := fs.ReadFullData(bsr.timestampsReader, blk.timestampsData); err != nil {
		return fmt.Errorf("cannot read timestamps block at offset %d: %s", bsr.timestampsBlockOffset, err)
	}

	// Read values data.
	valSize := blk.bh.ValuesBlockSize
	blk.valuesData = bytesutil.Resize(blk.valuesData, int(valSize))
	if err := fs.ReadFullData(bsr.valuesReader, blk.valuesData); err != nil {
		return fmt.Errorf("cannot read values block at offset %d: %s", bsr.valuesBlockOffset, err)
	}

	// Update offsets only after both reads succeeded.
	bsr.timestampsBlockOffset += uint64(tsSize)
	bsr.valuesBlockOffset += uint64(valSize)
	bsr.indexBlockHeadersCount++
	return nil
}
// readIndexBlock loads the next index block into bsr.indexData
// and points bsr.indexCursor to its beginning.
//
// The next metaindex row is popped from bsr.mrs into bsr.mr and validated
// against the expected index offset and the time range of the part header.
// Then the compressed index block described by the row is read from
// bsr.indexReader and decompressed.
//
// io.EOF is returned unwrapped when there are no metaindex rows left,
// so the caller can distinguish the end of the stream from a failure.
func (bsr *blockStreamReader) readIndexBlock() error {
	// Go to the next metaindex row.
	if len(bsr.mrs) == 0 {
		return io.EOF
	}
	bsr.mr = &bsr.mrs[0]
	bsr.mrs = bsr.mrs[1:]

	// Validate metaindex row.
	if bsr.indexBlockOffset != bsr.mr.IndexBlockOffset {
		return fmt.Errorf("invalid IndexBlockOffset in metaindex row; got %d; want %d", bsr.mr.IndexBlockOffset, bsr.indexBlockOffset)
	}
	if bsr.mr.MinTimestamp < bsr.ph.MinTimestamp {
		return fmt.Errorf("invalid MinTimestamp in metaindex row; got %d; cannot be smaller than %d", bsr.mr.MinTimestamp, bsr.ph.MinTimestamp)
	}
	if bsr.mr.MaxTimestamp > bsr.ph.MaxTimestamp {
		return fmt.Errorf("invalid MaxTimestamp in metaindex row; got %d; cannot be bigger than %d", bsr.mr.MaxTimestamp, bsr.ph.MaxTimestamp)
	}

	// Read index block.
	bsr.compressedIndexData = bytesutil.Resize(bsr.compressedIndexData, int(bsr.mr.IndexBlockSize))
	if err := fs.ReadFullData(bsr.indexReader, bsr.compressedIndexData); err != nil {
		return fmt.Errorf("cannot read index block from index data at offset %d: %s", bsr.indexBlockOffset, err)
	}
	// Decompress into the reused bsr.indexData buffer. The result is stored
	// only on success, so a failed decompression leaves bsr.indexData intact.
	tmpData, err := encoding.DecompressZSTD(bsr.indexData[:0], bsr.compressedIndexData)
	if err != nil {
		return fmt.Errorf("cannot decompress index block read at offset %d: %s", bsr.indexBlockOffset, err)
	}
	bsr.indexData = tmpData
	bsr.indexCursor = bsr.indexData

	// Update offsets.
	bsr.indexBlockOffset += uint64(bsr.mr.IndexBlockSize)
	return nil
}
// prevIndexBlockOffset returns the start offset of the most recently read index block.
//
// readIndexBlock advances bsr.indexBlockOffset past the block it has read,
// so the start offset is obtained by subtracting the size of that block.
//
// bsr.mr must be non-nil, i.e. at least one metaindex row must have been consumed.
func (bsr *blockStreamReader) prevIndexBlockOffset() uint64 {
	lastBlockSize := uint64(bsr.mr.IndexBlockSize)
	return bsr.indexBlockOffset - lastBlockSize
}
// getBlockStreamReader returns a blockStreamReader obtained from bsrPool,
// or a newly allocated one when the pool has nothing to offer.
//
// Return the reader to the pool with putBlockStreamReader when it is no longer needed.
func getBlockStreamReader() *blockStreamReader {
	if pooled := bsrPool.Get(); pooled != nil {
		return pooled.(*blockStreamReader)
	}
	return &blockStreamReader{}
}
// putBlockStreamReader closes bsr and returns it to bsrPool for later reuse.
//
// bsr should not be used after the call, since it may be handed out again
// by getBlockStreamReader.
func putBlockStreamReader(bsr *blockStreamReader) {
// Close first: MustClose closes the readers and resets bsr,
// so neither open files nor stale state end up in the pool.
bsr.MustClose()
bsrPool.Put(bsr)
}
// bsrPool holds blockStreamReader objects for reuse.
//
// Objects are added by putBlockStreamReader and taken by getBlockStreamReader.
var bsrPool sync.Pool