-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
metaindex_row.go
165 lines (141 loc) · 4.6 KB
/
metaindex_row.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package storage
import (
"fmt"
"io"
"sort"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
)
// metaindexRow is a single metaindex row.
//
// The row points to a single index block containing block headers.
type metaindexRow struct {
// TSID is the first TSID in the corresponding index block.
TSID TSID
// MinTimestamp is the minimum timestamp in the given index block.
MinTimestamp int64
// MaxTimestamp is the maximum timestamp in the given index block.
MaxTimestamp int64
// IndexBlockOffset is the offset of index block.
IndexBlockOffset uint64
// BlockHeadersCount is the number of block headers
// in the given index block.
BlockHeadersCount uint32
// IndexBlockSize is the size of compressed index block.
IndexBlockSize uint32
}
// Reset resets the mr using the given tsid.
func (mr *metaindexRow) Reset() {
mr.TSID = TSID{}
mr.BlockHeadersCount = 0
mr.MinTimestamp = (1 << 63) - 1
mr.MaxTimestamp = -1 << 63
mr.IndexBlockOffset = 0
mr.IndexBlockSize = 0
}
// RegisterBlockHeader registers the given bh in the mr.
func (mr *metaindexRow) RegisterBlockHeader(bh *blockHeader) {
mr.BlockHeadersCount++
if mr.BlockHeadersCount == 1 {
mr.TSID = bh.TSID
mr.MinTimestamp = bh.MinTimestamp
mr.MaxTimestamp = bh.MaxTimestamp
return
}
if bh.MinTimestamp < mr.MinTimestamp {
mr.MinTimestamp = bh.MinTimestamp
}
if bh.MaxTimestamp > mr.MaxTimestamp {
mr.MaxTimestamp = bh.MaxTimestamp
}
}
// Marshal appends marshaled mr to dst and returns the result.
func (mr *metaindexRow) Marshal(dst []byte) []byte {
dst = mr.TSID.Marshal(dst)
dst = encoding.MarshalUint32(dst, mr.BlockHeadersCount)
dst = encoding.MarshalInt64(dst, mr.MinTimestamp)
dst = encoding.MarshalInt64(dst, mr.MaxTimestamp)
dst = encoding.MarshalUint64(dst, mr.IndexBlockOffset)
dst = encoding.MarshalUint32(dst, mr.IndexBlockSize)
return dst
}
// Unmarshal unmarshals mr from src and returns the tail of src.
func (mr *metaindexRow) Unmarshal(src []byte) ([]byte, error) {
// Unmarshal TSID
tail, err := mr.TSID.Unmarshal(src)
if err != nil {
return src, fmt.Errorf("cannot unmarshal TSID: %w", err)
}
src = tail
// Unmarshal BlockHeadersCount
if len(src) < 4 {
return src, fmt.Errorf("cannot unmarshal BlockHeadersCount from %d bytes; want at least %d bytes", len(src), 4)
}
mr.BlockHeadersCount = encoding.UnmarshalUint32(src)
src = src[4:]
// Unmarshal MinTimestamp
if len(src) < 8 {
return src, fmt.Errorf("cannot unmarshal MinTimestamp from %d bytes; want at least %d bytes", len(src), 8)
}
mr.MinTimestamp = encoding.UnmarshalInt64(src)
src = src[8:]
// Unmarshal MaxTimestamp
if len(src) < 8 {
return src, fmt.Errorf("cannot unmarshal MaxTimestamp from %d bytes; want at least %d bytes", len(src), 8)
}
mr.MaxTimestamp = encoding.UnmarshalInt64(src)
src = src[8:]
// Unmarshal IndexBlockOffset
if len(src) < 8 {
return src, fmt.Errorf("cannot unmarshal IndexBlockOffset from %d bytes; want at least %d bytes", len(src), 8)
}
mr.IndexBlockOffset = encoding.UnmarshalUint64(src)
src = src[8:]
// Unmarshal IndexBlockSize
if len(src) < 4 {
return src, fmt.Errorf("cannot unmarshal IndexBlockSize from %d bytes; want at least %d bytes", len(src), 4)
}
mr.IndexBlockSize = encoding.UnmarshalUint32(src)
src = src[4:]
// Validate unmarshaled data.
if mr.BlockHeadersCount <= 0 {
return src, fmt.Errorf("BlockHeadersCount must be greater than 0")
}
if mr.IndexBlockSize > 2*maxBlockSize {
return src, fmt.Errorf("too big IndexBlockSize; got %d; cannot exceed %d", mr.IndexBlockSize, 2*maxBlockSize)
}
return src, nil
}
func unmarshalMetaindexRows(dst []metaindexRow, r io.Reader) ([]metaindexRow, error) {
compressedData, err := io.ReadAll(r)
if err != nil {
return dst, fmt.Errorf("cannot read metaindex rows: %w", err)
}
data, err := encoding.DecompressZSTD(nil, compressedData)
if err != nil {
return dst, fmt.Errorf("cannot decompress metaindex rows: %w", err)
}
dstLen := len(dst)
for len(data) > 0 {
if len(dst) < cap(dst) {
dst = dst[:len(dst)+1]
} else {
dst = append(dst, metaindexRow{})
}
mr := &dst[len(dst)-1]
tail, err := mr.Unmarshal(data)
if err != nil {
return dst, fmt.Errorf("cannot unmarshal metaindexRow #%d from metaindex data: %w", len(dst)-dstLen, err)
}
data = tail
}
if dstLen == len(dst) {
return dst, fmt.Errorf("expecting non-zero metaindex rows; got zero")
}
// Make sure metaindex rows are sorted by tsid
tmp := dst[dstLen:]
ok := sort.SliceIsSorted(tmp, func(i, j int) bool { return tmp[i].TSID.Less(&tmp[j].TSID) })
if !ok {
return dst, fmt.Errorf("metaindexRow values must be sorted by TSID; got %+v", tmp)
}
return dst, nil
}