-
Notifications
You must be signed in to change notification settings - Fork 457
/
stream_binary_reader.go
390 lines (318 loc) · 14 KB
/
stream_binary_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/thanos-io/thanos/blob/main/pkg/block/indexheader/binary_reader.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Thanos Authors.
package indexheader
import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/runutil"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/thanos-io/objstore"
"github.com/grafana/mimir/pkg/storage/tsdb/block"
streamencoding "github.com/grafana/mimir/pkg/storegateway/indexheader/encoding"
streamindex "github.com/grafana/mimir/pkg/storegateway/indexheader/index"
"github.com/grafana/mimir/pkg/storegateway/indexheader/indexheaderpb"
"github.com/grafana/mimir/pkg/util/spanlogger"
)
// StreamBinaryReaderMetrics holds the metrics tracked by a StreamBinaryReader.
type StreamBinaryReaderMetrics struct {
	// decbufFactory carries the metrics for the DecbufFactory used to read the index-header file.
	decbufFactory *streamencoding.DecbufFactoryMetrics
}
// NewStreamBinaryReaderMetrics creates the metrics for a StreamBinaryReader,
// registering them against reg.
func NewStreamBinaryReaderMetrics(reg prometheus.Registerer) *StreamBinaryReaderMetrics {
	m := &StreamBinaryReaderMetrics{}
	m.decbufFactory = streamencoding.NewDecbufFactoryMetrics(reg)
	return m
}
// StreamBinaryReader reads index-header files from disk, keeping only a sampled
// subset of symbol and posting offsets in memory and seeking into the file for the rest.
type StreamBinaryReader struct {
	// factory creates decoding buffers backed by the index-header file on disk.
	factory *streamencoding.DecbufFactory

	// toc holds the offsets of the symbols table and postings offset table within the index-header file.
	toc *BinaryTOC

	// Symbols struct that keeps only 1/postingOffsetsInMemSampling in the memory, then looks up the
	// rest via seeking to offsets in the index-header.
	symbols *streamindex.Symbols
	// Cache of the label name symbol lookups,
	// as there are not many and they are half of all lookups.
	// For index v1 the symbol reference is the index header symbol reference, not the prometheus TSDB index symbol reference.
	nameSymbols map[uint32]string
	// Direct cache of values. This is much faster than an LRU cache and still provides
	// a reasonable cache hit ratio.
	valueSymbolsMx sync.Mutex
	valueSymbols   [valueSymbolsCacheSize]struct {
		// index in TSDB v1 is the offset of the symbol in the index-header file.
		// In TSDB v2 it is the sequence number of the symbol in the TSDB index (starting at 0).
		index  uint32
		symbol string
	}

	// postingsOffsetTable maps label name/value pairs to byte ranges of posting lists in the TSDB index.
	postingsOffsetTable streamindex.PostingOffsetTable

	// version is the index-header file format version (checked against BinaryFormatV1 on load).
	version int
	// indexVersion is the TSDB index format version (index.FormatV1 or index.FormatV2).
	indexVersion int
}
// NewStreamBinaryReader loads or builds new index-header if not present on disk.
//
// It first attempts to open an existing index-header file for block id under dir.
// When that fails, the index-header is rebuilt from the bucket and the load is
// retried against the freshly written file.
func NewStreamBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int, metrics *StreamBinaryReaderMetrics, cfg Config) (*StreamBinaryReader, error) {
	spanLog, ctx := spanlogger.NewWithLogger(ctx, logger, "indexheader.NewStreamBinaryReader")
	defer spanLog.Finish()

	blockDir := filepath.Join(dir, id.String())
	binPath := filepath.Join(blockDir, block.IndexHeaderFilename)
	sparseHeadersPath := filepath.Join(blockDir, block.SparseIndexHeaderFilename)

	// Fast path: the index-header is already on disk and readable.
	if reader, err := newFileStreamBinaryReader(binPath, id, sparseHeadersPath, postingOffsetsInMemSampling, spanLog, metrics, cfg); err == nil {
		return reader, nil
	} else {
		level.Debug(spanLog).Log("msg", "failed to read index-header from disk; recreating", "path", binPath, "err", err)
	}

	// Slow path: rebuild the index-header from the object store, then load it again.
	buildStart := time.Now()
	if err := WriteBinary(ctx, bkt, id, binPath); err != nil {
		return nil, fmt.Errorf("cannot write index header: %w", err)
	}
	level.Debug(spanLog).Log("msg", "built index-header file", "path", binPath, "elapsed", time.Since(buildStart))

	return newFileStreamBinaryReader(binPath, id, sparseHeadersPath, postingOffsetsInMemSampling, spanLog, metrics, cfg)
}
// newFileStreamBinaryReader loads sparse index-headers from disk or constructs it from the index-header if not available.
//
// It opens the index-header at binPath, validates the magic number, file version and table
// of contents, then loads symbols and the postings offset table either from the sparse
// index-header file at sparseHeadersPath (index v2 only, when present) or by scanning the
// full index-header (writing a fresh sparse file as a side effect). Finally it pre-caches
// the symbols of all label names to speed up LookupSymbol.
func newFileStreamBinaryReader(binPath string, id ulid.ULID, sparseHeadersPath string, postingOffsetsInMemSampling int, logger *spanlogger.SpanLogger, metrics *StreamBinaryReaderMetrics, cfg Config) (bw *StreamBinaryReader, err error) {
	r := &StreamBinaryReader{
		factory: streamencoding.NewDecbufFactory(binPath, cfg.MaxIdleFileHandles, metrics.decbufFactory),
	}

	// Create a new raw decoding buffer with access to the entire index-header file to
	// read initial version information and the table of contents.
	// The deferred close captures any close error into the named return err.
	d := r.factory.NewRawDecbuf()
	defer runutil.CloseWithErrCapture(&err, &d, "new file stream binary reader")
	if err = d.Err(); err != nil {
		return nil, fmt.Errorf("cannot create decoding buffer: %w", err)
	}

	// Grab the full length of the index header before we read any of it. This is needed
	// so that we can skip directly to the table of contents at the end of file.
	indexHeaderSize := d.Len()
	if magic := d.Be32(); magic != MagicIndex {
		return nil, fmt.Errorf("invalid magic number %x", magic)
	}

	level.Debug(logger).Log("msg", "index header file size", "bytes", indexHeaderSize)

	// These two bytes follow the magic number in the file header.
	r.version = int(d.Byte())
	r.indexVersion = int(d.Byte())

	// As of now this value is also the actual end of the last posting list. In the future
	// it may be some bytes after the actual end (e.g. in case Prometheus starts adding padding
	// after the last posting list).
	// This value used to be the offset of the postings offset table up to and including Mimir 2.7.
	// After that this is the offset of the label indices table.
	// So what we read here will depend on what version of Mimir created the index header file.
	indexLastPostingListEndBound := d.Be64()

	// Check accumulated decoding errors only after all header reads above.
	if err = d.Err(); err != nil {
		return nil, fmt.Errorf("cannot read version and index version: %w", err)
	}

	if r.version != BinaryFormatV1 {
		return nil, fmt.Errorf("unknown index-header file version %d", r.version)
	}

	r.toc, err = newBinaryTOCFromFile(d, indexHeaderSize)
	if err != nil {
		return nil, fmt.Errorf("cannot read table-of-contents: %w", err)
	}

	// Load in sparse symbols and postings offset table; from disk if this is a v2 index.
	if r.indexVersion == index.FormatV2 {
		sparseData, err := os.ReadFile(sparseHeadersPath)
		if err != nil && !os.IsNotExist(err) {
			// Unexpected read failure (not merely "file missing"): warn, then fall through to rebuild.
			level.Warn(logger).Log("msg", "failed to read sparse index-headers from disk; recreating", "id", id, "err", err)
		}

		// If sparseHeaders are not on disk, construct sparseHeaders and write to disk.
		// Note this branch covers both "file missing" and "read failed" cases.
		if err != nil {
			if err = r.loadFromIndexHeader(logger, id, cfg, indexLastPostingListEndBound, postingOffsetsInMemSampling); err != nil {
				return nil, fmt.Errorf("cannot load sparse index-header: %w", err)
			}
			if err := writeSparseHeadersToFile(logger, id, sparseHeadersPath, r); err != nil {
				return nil, fmt.Errorf("cannot write sparse index-header to disk: %w", err)
			}
			level.Debug(logger).Log("msg", "built sparse index-header file", "id", id, "path", sparseHeadersPath)
		} else {
			// Otherwise, read persisted sparseHeaders from disk to memory.
			if err = r.loadFromSparseIndexHeader(logger, id, sparseHeadersPath, sparseData, postingOffsetsInMemSampling); err != nil {
				return nil, fmt.Errorf("cannot load sparse index-header from disk: %w", err)
			}
		}
	} else {
		// v1 indexes always load from the full index-header.
		if err = r.loadFromIndexHeader(logger, id, cfg, indexLastPostingListEndBound, postingOffsetsInMemSampling); err != nil {
			return nil, fmt.Errorf("cannot load sparse index-header: %w", err)
		}
	}

	// Pre-cache the symbol lookup for every label name: these are few and account for
	// a large share of all lookups (see the nameSymbols field comment).
	labelNames, err := r.postingsOffsetTable.LabelNames()
	if err != nil {
		return nil, fmt.Errorf("cannot load label names from postings offset table: %w", err)
	}

	r.nameSymbols = make(map[uint32]string, len(labelNames))
	if err = r.symbols.ForEachSymbol(labelNames, func(sym string, offset uint32) error {
		r.nameSymbols[offset] = sym
		return nil
	}); err != nil {
		return nil, err
	}

	return r, err
}
// loadFromSparseIndexHeader loads the symbols and postings offset table from a
// gzipped, protobuf-encoded sparse index-header previously persisted on disk.
//
// sparseData holds the raw (compressed) contents of the file at sparseHeadersPath; it is
// decompressed, unmarshalled, and used to rebuild the in-memory lookup structures without
// scanning the full index-header.
func (r *StreamBinaryReader) loadFromSparseIndexHeader(logger *spanlogger.SpanLogger, id ulid.ULID, sparseHeadersPath string, sparseData []byte, postingOffsetsInMemSampling int) (err error) {
	start := time.Now()
	defer func() {
		// NOTE(review): this fires on the error path too, so the "loaded" message can be
		// logged even when loading fails; the elapsed time is still useful for debugging.
		level.Info(logger).Log("msg", "loaded sparse index-header from disk", "id", id, "path", sparseHeadersPath, "elapsed", time.Since(start))
	}()

	level.Info(logger).Log("msg", "loading sparse index-header from disk", "id", id, "path", sparseHeadersPath)

	sparseHeaders := &indexheaderpb.Sparse{}

	gzipped := bytes.NewReader(sparseData)
	gzipReader, err := gzip.NewReader(gzipped)
	if err != nil {
		return fmt.Errorf("failed to create sparse index-header reader: %w", err)
	}
	// Fix: the gzip reader was previously never closed. Close it to release its resources,
	// capturing any close error into the named return only if no other error occurred.
	defer runutil.CloseWithErrCapture(&err, gzipReader, "close sparse index-header reader")

	sparseData, err = io.ReadAll(gzipReader)
	if err != nil {
		return fmt.Errorf("failed to read sparse index-header: %w", err)
	}

	if err := sparseHeaders.Unmarshal(sparseData); err != nil {
		return fmt.Errorf("failed to decode sparse index-header file: %w", err)
	}

	r.symbols, err = streamindex.NewSymbolsFromSparseHeader(r.factory, sparseHeaders.Symbols, r.indexVersion, int(r.toc.Symbols))
	if err != nil {
		return fmt.Errorf("cannot load symbols from sparse index-header: %w", err)
	}

	r.postingsOffsetTable, err = streamindex.NewPostingOffsetTableFromSparseHeader(r.factory, sparseHeaders.PostingsOffsetTable, int(r.toc.PostingsOffsetTable), postingOffsetsInMemSampling)
	if err != nil {
		return fmt.Errorf("cannot load postings offset table from sparse index-header: %w", err)
	}

	return nil
}
// loadFromIndexHeader loads in symbols and postings offset table from the index-header.
//
// This is the slow path used when no persisted sparse index-header is available: the full
// index-header file is scanned to build the in-memory sampled structures.
func (r *StreamBinaryReader) loadFromIndexHeader(logger *spanlogger.SpanLogger, id ulid.ULID, cfg Config, indexLastPostingListEndBound uint64, postingOffsetsInMemSampling int) (err error) {
	begin := time.Now()
	defer func() {
		level.Info(logger).Log("msg", "loaded sparse index-header from full index-header", "id", id, "elapsed", time.Since(begin))
	}()

	level.Info(logger).Log("msg", "loading sparse index-header from full index-header", "id", id)

	symbols, symErr := streamindex.NewSymbols(r.factory, r.indexVersion, int(r.toc.Symbols), cfg.VerifyOnLoad)
	if symErr != nil {
		return fmt.Errorf("cannot load symbols from full index-header: %w", symErr)
	}
	r.symbols = symbols

	table, tableErr := streamindex.NewPostingOffsetTable(r.factory, int(r.toc.PostingsOffsetTable), r.indexVersion, indexLastPostingListEndBound, postingOffsetsInMemSampling, cfg.VerifyOnLoad)
	if tableErr != nil {
		return fmt.Errorf("cannot load postings offset table from full index-header: %w", tableErr)
	}
	r.postingsOffsetTable = table

	return nil
}
// writeSparseHeadersToFile uses protocol buffer to write StreamBinaryReader to disk at sparseHeadersPath.
//
// The reader's symbols and postings offset table are converted to their sparse protobuf
// representation, marshalled, gzipped and written to sparseHeadersPath so that subsequent
// loads can skip scanning the full index-header.
func writeSparseHeadersToFile(logger *spanlogger.SpanLogger, id ulid.ULID, sparseHeadersPath string, reader *StreamBinaryReader) error {
	begin := time.Now()
	defer func() {
		level.Info(logger).Log("msg", "wrote sparse index-header to disk", "id", id, "path", sparseHeadersPath, "elapsed", time.Since(begin))
	}()

	level.Info(logger).Log("msg", "writing sparse index-header to disk", "id", id, "path", sparseHeadersPath)

	sparseHeaders := &indexheaderpb.Sparse{
		Symbols:             reader.symbols.NewSparseSymbol(),
		PostingsOffsetTable: reader.postingsOffsetTable.NewSparsePostingOffsetTable(),
	}

	marshalled, err := sparseHeaders.Marshal()
	if err != nil {
		return fmt.Errorf("failed to encode sparse index-header: %w", err)
	}

	var compressed bytes.Buffer
	gz := gzip.NewWriter(&compressed)
	if _, err := gz.Write(marshalled); err != nil {
		return fmt.Errorf("failed to gzip sparse index-header: %w", err)
	}
	// Close flushes any buffered compressed data and writes the gzip footer.
	if err := gz.Close(); err != nil {
		return fmt.Errorf("failed to close gzip sparse index-header: %w", err)
	}

	if err := os.WriteFile(sparseHeadersPath, compressed.Bytes(), 0600); err != nil {
		return fmt.Errorf("failed to write sparse index-header file: %w", err)
	}

	return nil
}
// newBinaryTOCFromFile return parsed TOC from given Decbuf. The Decbuf is expected to be
// configured to access the entirety of the index-header file.
func newBinaryTOCFromFile(d streamencoding.Decbuf, indexHeaderSize int) (*BinaryTOC, error) {
	// The TOC is a fixed-size trailer at the very end of the index-header file.
	tocOffset := indexHeaderSize - binaryTOCLen
	if d.ResetAt(tocOffset); d.Err() != nil {
		return nil, d.Err()
	}

	// First pass over the TOC bytes: verify the checksum only.
	if d.CheckCrc32(castagnoliTable); d.Err() != nil {
		return nil, d.Err()
	}

	// Second pass: rewind to the start of the TOC and decode the two offsets.
	d.ResetAt(tocOffset)
	symbols := d.Be64()
	postingsOffsetTable := d.Be64()

	// Check accumulated decoding errors after both reads.
	if err := d.Err(); err != nil {
		return nil, err
	}

	return &BinaryTOC{
		Symbols:             symbols,
		PostingsOffsetTable: postingsOffsetTable,
	}, nil
}
// IndexVersion returns the TSDB index format version read from the index-header.
// The error is always nil; the signature matches the Reader interface.
func (r *StreamBinaryReader) IndexVersion() (int, error) {
	return r.indexVersion, nil
}
// PostingsOffset returns the byte range of the posting list for the given label
// name/value pair, or NotFoundRangeErr when the pair is not in the offset table.
func (r *StreamBinaryReader) PostingsOffset(name, value string) (index.Range, error) {
	offsetRange, ok, err := r.postingsOffsetTable.PostingsOffset(name, value)
	switch {
	case err != nil:
		return index.Range{}, err
	case !ok:
		return index.Range{}, NotFoundRangeErr
	default:
		return offsetRange, nil
	}
}
// LookupSymbol returns the symbol string for reference o, consulting the label-name
// cache first, then the direct value cache, and finally seeking into the index-header.
func (r *StreamBinaryReader) LookupSymbol(o uint32) (string, error) {
	if r.indexVersion == index.FormatV1 {
		// For v1 little trick is needed. Refs are actual offset inside index, not index-header. This is different
		// of the header length difference between two files.
		o += headerLen - index.HeaderLen
	}

	// Label-name symbols are pre-cached at load time and need no locking (read-only map).
	if s, ok := r.nameSymbols[o]; ok {
		return s, nil
	}

	// Direct-mapped value cache: the slot is chosen by reference modulo the cache size.
	// An empty symbol string marks an unused slot, so empty symbols are never served from cache.
	cacheIndex := o % valueSymbolsCacheSize
	r.valueSymbolsMx.Lock()
	if cached := r.valueSymbols[cacheIndex]; cached.index == o && cached.symbol != "" {
		v := cached.symbol
		r.valueSymbolsMx.Unlock()
		return v, nil
	}
	r.valueSymbolsMx.Unlock()

	// Cache miss: look the symbol up in the (sampled) symbols table, which may seek on disk.
	// The lock is not held across this call to avoid serializing lookups.
	s, err := r.symbols.Lookup(o)
	if err != nil {
		return s, err
	}

	// Store the result in the direct cache, possibly evicting whatever shared the slot.
	r.valueSymbolsMx.Lock()
	r.valueSymbols[cacheIndex].index = o
	r.valueSymbols[cacheIndex].symbol = s
	r.valueSymbolsMx.Unlock()

	return s, nil
}
// cachedLabelNamesSymbolsReader is a SymbolsReader that serves label-name symbols
// from an in-memory map and delegates all other reads to a wrapped reader.
type cachedLabelNamesSymbolsReader struct {
	// labelNames maps symbol references to label-name strings (shared, read-only).
	labelNames map[uint32]string
	// r is the underlying reader used for symbols not present in labelNames.
	r streamindex.SymbolsReader
}
// Close closes the underlying symbols reader.
func (c cachedLabelNamesSymbolsReader) Close() error {
	return c.r.Close()
}
// Read returns the symbol for reference u, serving it from the label-name map when
// present and falling back to the underlying reader otherwise.
func (c cachedLabelNamesSymbolsReader) Read(u uint32) (string, error) {
	sym, cached := c.labelNames[u]
	if cached {
		return sym, nil
	}
	return c.r.Read(u)
}
// SymbolsReader returns a SymbolsReader that serves label-name symbols from the
// pre-built nameSymbols cache and delegates other lookups to the symbols table.
// The error is always nil; the signature matches the Reader interface.
func (r *StreamBinaryReader) SymbolsReader() (streamindex.SymbolsReader, error) {
	return cachedLabelNamesSymbolsReader{
		labelNames: r.nameSymbols,
		r:          r.symbols.Reader(),
	}, nil
}
// LabelValuesOffsets returns the posting list offsets for values of the given label name,
// optionally restricted by prefix and filter. It delegates to the postings offset table.
func (r *StreamBinaryReader) LabelValuesOffsets(ctx context.Context, name string, prefix string, filter func(string) bool) ([]streamindex.PostingListOffset, error) {
	return r.postingsOffsetTable.LabelValuesOffsets(ctx, name, prefix, filter)
}
// LabelNames returns all label names in the index, delegating to the postings offset table.
func (r *StreamBinaryReader) LabelNames() ([]string, error) {
	return r.postingsOffsetTable.LabelNames()
}
// Close stops the decoding buffer factory, releasing any idle file handles it holds.
// The error is always nil; the signature matches the Reader interface.
func (r *StreamBinaryReader) Close() error {
	r.factory.Stop()
	return nil
}