-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
partition.go
223 lines (188 loc) · 5.89 KB
/
partition.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
package logstorage
import (
"bytes"
"path/filepath"
"sort"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// PartitionStats holds the statistics gathered for a single partition.
//
// It embeds DatadbStats and IndexdbStats; partition.updateStats passes each
// embedded part to the corresponding database of the partition.
type PartitionStats struct {
	DatadbStats
	IndexdbStats
}
// partition is a single on-disk partition of a Storage. It owns one indexdb
// and one datadb, both living in subdirectories of path.
type partition struct {
	// s is the Storage this partition belongs to.
	s *Storage

	// path is the filesystem path of the partition directory.
	path string

	// name is the base name of path. It is mixed into keys of caches owned by
	// the parent Storage so entries of different partitions stay distinct.
	name string

	// idb is the indexdb of this partition.
	idb *indexdb

	// ddb is the datadb of this partition.
	ddb *datadb
}
// mustCreatePartition creates a new partition directory at path and the
// indexdb and datadb subdirectories inside it.
//
// The directory itself is created via fs.MustMkdirFailIfExist, so path is
// expected not to exist beforehand. Once created, the partition can be opened
// with mustOpenPartition() and later removed with mustDeletePartition().
func mustCreatePartition(path string) {
	fs.MustMkdirFailIfExist(path)
	mustCreateIndexdb(filepath.Join(path, indexdbDirname))
	mustCreateDatadb(filepath.Join(path, datadbDirname))
}
// mustDeletePartition removes the partition directory at path via
// fs.MustRemoveAll.
//
// Close the partition with mustClosePartition() before calling this function.
func mustDeletePartition(path string) {
	fs.MustRemoveAll(path)
}
// mustOpenPartition opens the partition stored at path and binds it to s.
//
// The indexdb is opened while the partition struct is being built. The datadb
// is opened afterwards because mustOpenDatadb receives the partition itself.
// Release the returned partition with mustClosePartition() when done.
func mustOpenPartition(s *Storage, path string) *partition {
	name := filepath.Base(path)

	pt := &partition{
		s:    s,
		path: path,
		name: name,
		idb:  mustOpenIndexdb(filepath.Join(path, indexdbDirname), name, s),
	}
	pt.ddb = mustOpenDatadb(pt, filepath.Join(path, datadbDirname), s.flushInterval)

	return pt
}
// mustClosePartition closes the indexdb and the datadb of pt and then clears
// all of pt's fields.
//
// Nothing may use pt while or after this runs. Once closed, the on-disk
// partition may be removed with mustDeletePartition().
func mustClosePartition(pt *partition) {
	// The indexdb is closed before the datadb.
	mustCloseIndexdb(pt.idb)
	pt.idb = nil

	mustCloseDatadb(pt.ddb)
	pt.ddb = nil

	// Clear the remaining fields.
	pt.s = nil
	pt.path = ""
	pt.name = ""
}
// mustAddRows makes sure every stream referenced by lr is registered in the
// partition indexdb, then appends lr to the partition datadb.
//
// Registration happens in two passes:
//   - pass one collects indexes of rows whose streamID is not in the
//     streamIDCache, skipping a row whose streamID equals that of the last
//     collected row;
//   - pass two sorts the collected rows by streamID, skips duplicates,
//     re-checks the cache, registers each streamID the indexdb does not have
//     yet, and then caches it.
//
// New streams are logged when pt.s.logNewStreams is set; every ingested row
// is logged when pt.s.logIngestedRows is set.
func (pt *partition) mustAddRows(lr *LogRows) {
	sids := lr.streamIDs

	var candidates []int
	for rowIdx := range lr.timestamps {
		sid := &sids[rowIdx]
		if pt.hasStreamIDInCache(sid) {
			continue
		}
		if n := len(candidates); n > 0 && sids[candidates[n-1]].equal(sid) {
			// Same stream as the most recently collected row.
			continue
		}
		candidates = append(candidates, rowIdx)
	}

	if len(candidates) > 0 {
		shouldLogNewStreams := pt.s.logNewStreams
		tagsCanonicals := lr.streamTagsCanonicals

		// After sorting, equal streamIDs are adjacent, so a duplicate is
		// detected by comparing with the preceding element.
		sort.Slice(candidates, func(a, b int) bool {
			return sids[candidates[a]].less(&sids[candidates[b]])
		})
		for pos, rowIdx := range candidates {
			sid := &sids[rowIdx]
			if pos > 0 && sids[candidates[pos-1]].equal(sid) {
				continue
			}
			// Check the cache again: the entry may have appeared since pass one
			// (presumably via a concurrent call - confirm against callers).
			if pt.hasStreamIDInCache(sid) {
				continue
			}
			if !pt.idb.hasStreamID(sid) {
				tags := tagsCanonicals[rowIdx]
				pt.idb.mustRegisterStream(sid, tags)
				if shouldLogNewStreams {
					pt.logNewStream(tags, lr.rows[rowIdx])
				}
			}
			pt.putStreamIDToCache(sid)
		}
	}

	pt.ddb.mustAddRows(lr)
	if pt.s.logIngestedRows {
		pt.logIngestedRows(lr)
	}
}
// logNewStream writes an info log line announcing a stream newly registered
// in this partition, together with the log entry (fields) that introduced it.
func (pt *partition) logNewStream(streamTagsCanonical []byte, fields []Field) {
	entry := RowFormatter(fields)
	logger.Infof("partition %s: new stream %s for log entry %s", pt.path, getStreamTagsString(streamTagsCanonical), &entry)
}
// logIngestedRows writes one info log line per row of lr.
func (pt *partition) logIngestedRows(lr *LogRows) {
	for rowIdx := range lr.rows {
		logger.Infof("partition %s: new log entry %s", pt.path, lr.GetRowString(rowIdx))
	}
}
// appendStreamTagsByStreamID appends the canonical stream tags of sid to dst
// and returns the extended slice.
//
// It looks in the Storage-wide streamTagsCache first. On a miss it asks the
// partition indexdb, and anything the indexdb appends is stored in the cache.
func (pt *partition) appendStreamTagsByStreamID(dst []byte, sid *streamID) []byte {
	cacheKey := bbPool.Get()
	defer bbPool.Put(cacheKey)

	// The partition name is deliberately not part of this key: the streamID
	// alone identifies its stream tags.
	cacheKey.B = sid.marshal(cacheKey.B)

	start := len(dst)
	if dst = pt.s.streamTagsCache.GetBig(dst, cacheKey.B); len(dst) > start {
		// Cache hit.
		return dst
	}

	// Cache miss - fall back to the indexdb.
	dst = pt.idb.appendStreamTagsByStreamID(dst, sid)
	if len(dst) <= start {
		return dst
	}
	pt.s.streamTagsCache.SetBig(cacheKey.B, dst[start:])
	return dst
}
// hasStreamIDInCache reports whether the Storage-wide streamIDCache holds the
// okValue marker for sid under this partition's key.
func (pt *partition) hasStreamIDInCache(sid *streamID) bool {
	key := bbPool.Get()
	key.B = pt.marshalStreamIDCacheKey(key.B, sid)

	// okValue is one byte long, so a one-byte buffer fits a matching hit.
	var buf [1]byte
	got := pt.s.streamIDCache.Get(buf[:0], key.B)
	bbPool.Put(key)

	return bytes.Equal(got, okValue)
}
// putStreamIDToCache stores the okValue marker for sid under this partition's
// key in the Storage-wide streamIDCache.
func (pt *partition) putStreamIDToCache(sid *streamID) {
	key := bbPool.Get()
	key.B = pt.marshalStreamIDCacheKey(key.B, sid)

	pt.s.streamIDCache.Set(key.B, okValue)

	bbPool.Put(key)
}
// marshalStreamIDCacheKey appends the streamIDCache key for sid to dst and
// returns the result.
//
// The key is the partition name (encoded with encoding.MarshalBytes,
// presumably length-prefixed) followed by the marshaled sid, so keys of
// different partitions do not coincide.
func (pt *partition) marshalStreamIDCacheKey(dst []byte, sid *streamID) []byte {
	nameBytes := bytesutil.ToUnsafeBytes(pt.name)
	return sid.marshal(encoding.MarshalBytes(dst, nameBytes))
}
// okValue is the marker stored in streamIDCache for a registered streamID.
// hasStreamIDInCache compares cache lookups against it.
var okValue = []byte{'1'}
// debugFlush makes all the recently ingested data searchable. It flushes the
// datadb first and the indexdb second.
func (pt *partition) debugFlush() {
	ddb, idb := pt.ddb, pt.idb
	ddb.debugFlush()
	idb.debugFlush()
}
// updateStats hands the embedded DatadbStats and IndexdbStats of ps to the
// updateStats methods of the partition datadb and indexdb, respectively.
func (pt *partition) updateStats(ps *PartitionStats) {
	ddbStats, idbStats := &ps.DatadbStats, &ps.IndexdbStats
	pt.ddb.updateStats(ddbStats)
	pt.idb.updateStats(idbStats)
}