-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
log_rows.go
305 lines (250 loc) · 6.85 KB
/
log_rows.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
package logstorage
import (
"sort"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
// LogRows holds a set of rows needed for Storage.MustAddRows
//
// LogRows must be obtained via GetLogRows()
type LogRows struct {
// buf holds all the bytes referred by items in LogRows
buf []byte
// fieldsBuf holds all the fields referred by items in LogRows
fieldsBuf []Field
// streamIDs holds streamIDs for rows added to LogRows
streamIDs []streamID
// streamTagsCanonicals holds streamTagsCanonical entries for rows added to LogRows
streamTagsCanonicals [][]byte
// timestamps holds stimestamps for rows added to LogRows
timestamps []int64
// rows holds fields for rows atted to LogRows.
rows [][]Field
// sf is a helper for sorting fields in every added row
sf sortedFields
// streamFields contains names for stream fields
streamFields map[string]struct{}
// ignoreFields contains names for log fields, which must be skipped during data ingestion
ignoreFields map[string]struct{}
}
type sortedFields []Field
func (sf *sortedFields) Len() int {
return len(*sf)
}
func (sf *sortedFields) Less(i, j int) bool {
a := *sf
return a[i].Name < a[j].Name
}
func (sf *sortedFields) Swap(i, j int) {
a := *sf
a[i], a[j] = a[j], a[i]
}
// RowFormatter implementes fmt.Stringer for []Field aka a single log row
type RowFormatter []Field
// String returns user-readable representation for rf
func (rf *RowFormatter) String() string {
b := append([]byte{}, '{')
fields := *rf
if len(fields) > 0 {
b = append(b, fields[0].String()...)
fields = fields[1:]
for _, field := range fields {
b = append(b, ',')
b = append(b, field.String()...)
}
}
b = append(b, '}')
return string(b)
}
// Reset resets lr with all its settings.
//
// Call ResetKeepSettings() for resetting lr without resetting its settings.
func (lr *LogRows) Reset() {
lr.ResetKeepSettings()
sfs := lr.streamFields
for k := range sfs {
delete(sfs, k)
}
ifs := lr.ignoreFields
for k := range ifs {
delete(ifs, k)
}
}
// ResetKeepSettings resets rows stored in lr, while keeping its settings passed to GetLogRows().
func (lr *LogRows) ResetKeepSettings() {
lr.buf = lr.buf[:0]
fb := lr.fieldsBuf
for i := range fb {
fb[i].Reset()
}
lr.fieldsBuf = fb[:0]
sids := lr.streamIDs
for i := range sids {
sids[i].reset()
}
lr.streamIDs = sids[:0]
sns := lr.streamTagsCanonicals
for i := range sns {
sns[i] = nil
}
lr.streamTagsCanonicals = sns[:0]
lr.timestamps = lr.timestamps[:0]
rows := lr.rows
for i := range rows {
rows[i] = nil
}
lr.rows = rows[:0]
lr.sf = nil
}
// NeedFlush returns true if lr contains too much data, so it must be flushed to the storage.
func (lr *LogRows) NeedFlush() bool {
return len(lr.buf) > (maxUncompressedBlockSize/8)*7
}
// MustAdd adds a log entry with the given args to lr.
//
// It is OK to modify the args after returning from the function,
// since lr copies all the args to internal data.
func (lr *LogRows) MustAdd(tenantID TenantID, timestamp int64, fields []Field) {
// Compose StreamTags from fields according to lr.streamFields
sfs := lr.streamFields
st := GetStreamTags()
for i := range fields {
f := &fields[i]
if _, ok := sfs[f.Name]; ok {
st.Add(f.Name, f.Value)
}
}
// Marshal StreamTags
bb := bbPool.Get()
bb.B = st.MarshalCanonical(bb.B)
PutStreamTags(st)
// Calculate the id for the StreamTags
var sid streamID
sid.tenantID = tenantID
sid.id = hash128(bb.B)
// Store the row
lr.mustAddInternal(sid, timestamp, fields, bb.B)
bbPool.Put(bb)
}
func (lr *LogRows) mustAddInternal(sid streamID, timestamp int64, fields []Field, streamTagsCanonical []byte) {
buf := lr.buf
bufLen := len(buf)
buf = append(buf, streamTagsCanonical...)
lr.streamTagsCanonicals = append(lr.streamTagsCanonicals, buf[bufLen:])
lr.streamIDs = append(lr.streamIDs, sid)
lr.timestamps = append(lr.timestamps, timestamp)
// Store all the fields
ifs := lr.ignoreFields
fb := lr.fieldsBuf
fieldsLen := len(fb)
for i := range fields {
f := &fields[i]
if _, ok := ifs[f.Name]; ok {
// Skip fields from the ifs map
continue
}
if f.Value == "" {
// Skip fields without values
continue
}
fb = append(fb, Field{})
dstField := &fb[len(fb)-1]
bufLen = len(buf)
if f.Name != "_msg" {
buf = append(buf, f.Name...)
}
dstField.Name = bytesutil.ToUnsafeString(buf[bufLen:])
bufLen = len(buf)
buf = append(buf, f.Value...)
dstField.Value = bytesutil.ToUnsafeString(buf[bufLen:])
}
lr.sf = fb[fieldsLen:]
sort.Sort(&lr.sf)
lr.rows = append(lr.rows, lr.sf)
lr.fieldsBuf = fb
lr.buf = buf
}
// GetRowString returns string representation of the row with the given idx.
func (lr *LogRows) GetRowString(idx int) string {
tf := TimeFormatter(lr.timestamps[idx])
streamTags := getStreamTagsString(lr.streamTagsCanonicals[idx])
var rf RowFormatter
rf = append(rf[:0], lr.rows[idx]...)
rf = append(rf, Field{
Name: "_time",
Value: tf.String(),
})
rf = append(rf, Field{
Name: "_stream",
Value: streamTags,
})
sort.Slice(rf, func(i, j int) bool {
return rf[i].Name < rf[j].Name
})
return rf.String()
}
// GetLogRows returns LogRows from the pool for the given streamFields.
//
// streamFields is a set of field names, which must be associated with the stream.
// ignoreFields is a set of field names, which must be ignored during data ingestion.
//
// Return back it to the pool with PutLogRows() when it is no longer needed.
func GetLogRows(streamFields, ignoreFields []string) *LogRows {
v := logRowsPool.Get()
if v == nil {
v = &LogRows{}
}
lr := v.(*LogRows)
// Initialize streamFields
sfs := lr.streamFields
if sfs == nil {
sfs = make(map[string]struct{}, len(streamFields))
lr.streamFields = sfs
}
for _, f := range streamFields {
sfs[f] = struct{}{}
}
// Initialize ignoreFields
ifs := lr.ignoreFields
if ifs == nil {
ifs = make(map[string]struct{}, len(ignoreFields))
lr.ignoreFields = ifs
}
for _, f := range ignoreFields {
if f != "" {
ifs[f] = struct{}{}
}
}
return lr
}
// PutLogRows returns lr to the pool.
func PutLogRows(lr *LogRows) {
lr.Reset()
logRowsPool.Put(lr)
}
var logRowsPool sync.Pool
// Len returns the number of items in lr.
func (lr *LogRows) Len() int {
return len(lr.streamIDs)
}
// Less returns true if (streamID, timestamp) for row i is smaller than the (streamID, timestamp) for row j
func (lr *LogRows) Less(i, j int) bool {
a := &lr.streamIDs[i]
b := &lr.streamIDs[j]
if !a.equal(b) {
return a.less(b)
}
return lr.timestamps[i] < lr.timestamps[j]
}
// Swap swaps rows i and j in lr.
func (lr *LogRows) Swap(i, j int) {
a := &lr.streamIDs[i]
b := &lr.streamIDs[j]
*a, *b = *b, *a
tsA, tsB := &lr.timestamps[i], &lr.timestamps[j]
*tsA, *tsB = *tsB, *tsA
snA, snB := &lr.streamTagsCanonicals[i], &lr.streamTagsCanonicals[j]
*snA, *snB = *snB, *snA
fieldsA, fieldsB := &lr.rows[i], &lr.rows[j]
*fieldsA, *fieldsB = *fieldsB, *fieldsA
}