-
Notifications
You must be signed in to change notification settings - Fork 17
/
entry.go
601 lines (550 loc) · 17.2 KB
/
entry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
/*************************************************************************
* Copyright 2017 Gravwell, Inc. All rights reserved.
* Contact: <legal@gravwell.io>
*
* This software may be modified and distributed under the terms of the
* BSD 2-clause license. See the LICENSE file for details.
**************************************************************************/
package entry
import (
"bytes"
"encoding/binary"
"encoding/gob"
"errors"
"io"
"net"
)
const (
// ENTRY_HEADER_SIZE is the encoded size in bytes of an entry header:
// 34 = 4 (data size and flags) + 12 (timestamp) + 2 (tag) + 16 (SRC)
ENTRY_HEADER_SIZE int = 34
// SRC_SIZE matches the 16 byte SRC field at the end of the encoded header.
SRC_SIZE int = 16
// IPV4_SRC_SIZE is the SRC length in bytes that causes the IPv4 flag to be set on encode.
IPV4_SRC_SIZE int = 4
maxSliceAllocSize int = 0x4000000 //if a slice is less than 64MB, do it all at once
maxSliceTransferSize uint64 = 0xffffffff //slices can't be larger than 4GB in one transfer
)
// Sentinel errors returned by the entry encoding and decoding routines in this file.
var (
ErrInvalidHeader = errors.New("Invalid Entry header in decode")
ErrInvalidBufferSize = errors.New("Invalid buffer size, too small")
ErrFailedHeaderWrite = errors.New("Failed to write header while encoding")
ErrFailedBodyWrite = errors.New("Failed to write body while encoding")
ErrFailedBodyRead = errors.New("Failed to read body while decoding")
ErrSliceLenTooLarge = errors.New("Slice length is too large for encoding")
ErrSliceSizeTooLarge = errors.New("Slice size is too large for encoding")
ErrDataSizeTooLarge = errors.New("Entry data size is too large, must be < 1GB")
)
// EntryTag is the 16-bit tag value carried in an entry header.
type EntryTag uint16
// EntryKey is the key type returned by Entry.Key, which fills it with the
// seconds component of the entry timestamp.
type EntryKey int64
// Entry is a single record: a timestamp, a source address, a tag, a data
// payload, and a block of enumerated values.
type Entry struct {
TS Timestamp // timestamp; Key() uses its Sec field
SRC net.IP // source address; a 4 byte value is encoded with the IPv4 flag set
Tag EntryTag // tag, written as a little endian uint16 at header offset 16
Data []byte // payload, encoded immediately after the header
EVB EVBlock // enumerated values, encoded after Data when populated
}
// init registers *EVBlock, *Entry, []Entry, and []*Entry with encoding/gob.
func init() {
	for _, v := range []interface{}{
		&EVBlock{},
		&Entry{},
		[]Entry{},
		[]*Entry{},
	} {
		gob.Register(v)
	}
}
// Key returns the seconds component of the entry timestamp as an EntryKey.
func (ent *Entry) Key() EntryKey {
return EntryKey(ent.TS.Sec)
}
// EnumeratedValues returns the slice of enumerated values held by the entry's
// EVBlock. It is an accessor so that callers do not assign to the block directly.
// NOTE(review): unlike the other methods in this file it uses a value receiver,
// so the Entry struct is copied on every call.
func (ent Entry) EnumeratedValues() []EnumeratedValue {
return ent.EVB.Values()
}
// ClearEnumeratedValues is a convenience function to remove all enumerated
// values from the entry by resetting its EVBlock.
func (ent *Entry) ClearEnumeratedValues() {
ent.EVB.Reset()
}
// AddEnumeratedValue attaches a natively typed enumerated value to the entry.
// ErrInvalid is returned, and nothing is added, when ev.Valid() reports false.
// NOTE(review): no entry size limit is checked in this method; whether
// EVBlock.Add enforces one is not visible from here.
func (ent *Entry) AddEnumeratedValue(ev EnumeratedValue) (err error) {
	if !ev.Valid() {
		return ErrInvalid
	}
	ent.EVB.Add(ev)
	return nil
}
// AddEnumeratedValues attaches a slice of natively typed enumerated values to
// the entry. Every value is validated first; if any is invalid ErrInvalid is
// returned and none of the values are added. An empty slice is a no-op.
// NOTE(review): no entry size limit is checked in this method; whether
// EVBlock.AddSet enforces one is not visible from here.
func (ent *Entry) AddEnumeratedValues(evs []EnumeratedValue) (err error) {
	if len(evs) == 0 {
		return nil
	}
	for i := 0; i < len(evs); i++ {
		if !evs[i].Valid() {
			return ErrInvalid
		}
	}
	ent.EVB.AddSet(evs)
	return nil
}
// AddEnumeratedValueEx builds an enumerated value from name and val using
// NewEnumeratedValue and attaches it to the entry. Any error reported by
// NewEnumeratedValue is returned unchanged and nothing is added.
func (ent *Entry) AddEnumeratedValueEx(name string, val interface{}) error {
	ev, err := NewEnumeratedValue(name, val)
	if err == nil {
		ent.EVB.Add(ev)
	}
	return err
}
// GetEnumeratedValue looks up an enumerated value by name and returns it as a
// native type. It returns nil and false when the receiver is nil, when the
// name is not present, or when the stored value converts to a nil interface.
func (ent *Entry) GetEnumeratedValue(name string) (val interface{}, ok bool) {
	if ent == nil {
		return nil, false
	}
	ev, found := ent.EVB.Get(name)
	if !found {
		return nil, false
	}
	val = ev.Value.Interface()
	return val, val != nil
}
// CopyEnumeratedBlock appends the enumerated value block of sent onto ent.
// It does nothing when either entry is nil or when sent has no populated block.
func (ent *Entry) CopyEnumeratedBlock(sent *Entry) {
	if ent != nil && sent != nil && sent.EVB.Populated() {
		ent.EVB.Append(sent.EVB)
	}
}
// Size returns the size of an entry as if it were encoded: the fixed header,
// the data payload, and whatever the EVBlock reports as its size.
func (ent *Entry) Size() uint64 {
	sz := uint64(ENTRY_HEADER_SIZE)
	sz += uint64(len(ent.Data))
	sz += ent.EVB.Size()
	return sz
}
// DecodeHeader hands back a completely decoded header. The returned src is a
// sub-slice of buff, not a copy. No length check is performed: the caller must
// supply at least ENTRY_HEADER_SIZE bytes.
//
// The header is little endian and laid out as:
//
//	bytes  0-3   data size (uint32), top 2 bits hold the flags
//	bytes  4-15  timestamp (8 bytes of seconds, then 4 bytes of nanoseconds)
//	bytes 16-17  tag (uint16)
//	bytes 18-33  SRC (only the first 4 bytes are used when the IPv4 flag is set)
func DecodeHeader(buff []byte) (ts Timestamp, src net.IP, tag EntryTag, hasEvs bool, datasize uint32) {
	word := binary.LittleEndian.Uint32(buff)
	flags := uint8(word >> 30)
	datasize = word & flagMask // strip the flag bits from the size
	hasEvs = flags&flagEVs != 0

	ts.Decode(buff[4:])
	tag = EntryTag(binary.LittleEndian.Uint16(buff[16:]))

	// an IPv4 source only occupies the first 4 bytes of the SRC field
	srcEnd := ENTRY_HEADER_SIZE
	if flags&flagIPv4 != 0 {
		srcEnd = 22
	}
	src = buff[18:srcEnd]
	return
}
// DecodeHeaderTagSec extracts ONLY the tag and the seconds component of the
// timestamp from an encoded header. It is meant for rapidly scanning an entry
// header to decide whether the entry is worth decoding. No length check is
// performed here; the caller must already have ensured that the buffer is
// large enough to contain a header.
func DecodeHeaderTagSec(buff []byte) (tag EntryTag, sec int64) {
	return EntryTag(binary.LittleEndian.Uint16(buff[16:])),
		int64(binary.LittleEndian.Uint64(buff[4:]))
}
// EntrySize decodes just enough of the header to determine the encoded size
// of an entry; it is typically used for rapidly skipping an entry.
//
// ErrInvalidHeader is returned when buff cannot hold a header, and
// ErrInvalidBufferSize when it cannot hold the header plus the data. When the
// EV flag is set the EV block header that follows the data is decoded and its
// Size is added; if that decode fails, its error is returned together with
// the size of the header and data alone.
func EntrySize(buff []byte) (n int, err error) {
	if len(buff) < ENTRY_HEADER_SIZE {
		return 0, ErrInvalidHeader
	}
	word := binary.LittleEndian.Uint32(buff)
	n = int(word&flagMask) + ENTRY_HEADER_SIZE // flag bits are not part of the size
	if len(buff) < n {
		return n, ErrInvalidBufferSize
	}
	if uint8(word>>30)&flagEVs == 0 {
		return n, nil
	}
	// EVs are present, their block header starts right after the data
	var hdr EVBlockHeader
	hdr, err = DecodeEVBlockHeader(buff[n:])
	if err != nil {
		return n, err
	}
	n += int(hdr.Size)
	return n, nil
}
// DecodePartialHeader decodes the timestamp, tag, IPv4 and EV flags, and the
// data size from an encoded header, skipping the SRC field. It is used for
// quickly scanning through entries in their encoded form. No length check is
// performed on buff.
func DecodePartialHeader(buff []byte) (ts Timestamp, tag EntryTag, ipv4, hasEvs bool, datasize uint32) {
	word := binary.LittleEndian.Uint32(buff)
	flags := uint8(word >> 30) // the top 2 bits of the size word are flags
	datasize = word & flagMask
	ipv4 = flags&flagIPv4 != 0
	hasEvs = flags&flagEVs != 0
	tag = EntryTag(binary.LittleEndian.Uint16(buff[16:]))
	ts.Decode(buff[4:])
	return
}
// decodeHeader populates the timestamp, tag, and SRC of the entry from an
// encoded header, storing a private copy of the SRC bytes.
// It returns the data size and whether the entry has EVs.
func (ent *Entry) decodeHeader(buff []byte) (int, bool) {
	ts, src, tag, hasEvs, datasize := DecodeHeader(buff)
	ent.TS = ts
	ent.Tag = tag
	ent.SRC = append(net.IP(nil), src...) // detach SRC from buff
	return int(datasize), hasEvs
}
// decodeHeaderAlt populates the timestamp, tag, and SRC of the entry from an
// encoded header; the SRC is a direct reference into buff rather than a copy.
// It returns the data size and whether the entry has EVs.
func (ent *Entry) decodeHeaderAlt(buff []byte) (int, bool) {
	ts, src, tag, hasEvs, datasize := DecodeHeader(buff)
	ent.TS, ent.SRC, ent.Tag = ts, src, tag
	return int(datasize), hasEvs
}
// DecodeHeader decodes an entry header from the provided buffer into the
// entry, copying the SRC. It returns the data size and whether there are EVs.
// ErrInvalidBufferSize is returned when buff is shorter than ENTRY_HEADER_SIZE.
func (ent *Entry) DecodeHeader(buff []byte) (int, bool, error) {
	if len(buff) >= ENTRY_HEADER_SIZE {
		dataLen, hasEvs := ent.decodeHeader(buff)
		return dataLen, hasEvs, nil
	}
	return 0, false, ErrInvalidBufferSize
}
// DecodeEntry will copy values out of the buffer to generate an entry with its
// own copies of the SRC and data, so the entry does not maintain ties to the
// block it was decoded from.
// DecodeEntry assumes that a size check has already happened; it will panic on
// a buffer that is too short for the header and data.
// You probably want Decode.
//
// When the header carries the EV flag, the EV block is decoded from the bytes
// that FOLLOW the data (buff[ENTRY_HEADER_SIZE+dataSize:]), which is where the
// encoder places it. The tail is handed to EVB.Decode directly, exactly as
// Decode does, relying on EVB.Decode to take its own copies.
func (ent *Entry) DecodeEntry(buff []byte) (err error) {
	dataSize, hasEvs := ent.decodeHeader(buff)
	end := ENTRY_HEADER_SIZE + dataSize
	ent.Data = append([]byte(nil), buff[ENTRY_HEADER_SIZE:end]...)
	if hasEvs {
		_, err = ent.EVB.Decode(buff[end:])
	}
	return
}
// DecodeEntryAlt doesn't copy the SRC or data out, it just references the
// slice handed in. It assumes a size check for the entry has already been
// performed by the caller and will panic on a buffer that is too short for
// the header and data.
// You probably want DecodeAlt.
//
// When the header carries the EV flag, the EV block is decoded from the bytes
// that FOLLOW the data (buff[ENTRY_HEADER_SIZE+dataSize:]), which is where the
// encoder places it. As before, the EVs are decoded with EVB.Decode rather
// than EVB.DecodeAlt.
func (ent *Entry) DecodeEntryAlt(buff []byte) (err error) {
	dataSize, hasEvs := ent.decodeHeaderAlt(buff)
	end := ENTRY_HEADER_SIZE + dataSize
	ent.Data = buff[ENTRY_HEADER_SIZE:end]
	if hasEvs {
		_, err = ent.EVB.Decode(buff[end:])
	}
	return
}
// Decode completely decodes an entry and returns the number of bytes consumed
// from the buffer, which is useful for iterating over entries in a raw buffer.
// The SRC and data are copied out of buff, and the EVs are decoded with
// EVB.Decode, so that the caller can re-use the underlying buffer.
//
// On a header or data size failure the count is -1. If the EV block fails to
// decode, the error is returned along with the size of the header and data.
func (ent *Entry) Decode(buff []byte) (int, error) {
	dataSize, hasEvs, err := ent.DecodeHeader(buff)
	if err != nil {
		return -1, err
	}
	body := buff[ENTRY_HEADER_SIZE:]
	if len(body) < dataSize {
		return -1, ErrInvalidBufferSize
	}
	ent.Data = append([]byte(nil), body[:dataSize]...)
	consumed := ENTRY_HEADER_SIZE + dataSize
	if !hasEvs {
		return consumed, nil
	}
	// the EV block starts immediately after the data
	n, err := ent.EVB.Decode(body[dataSize:])
	if err != nil {
		return consumed, err
	}
	return consumed + n, nil
}
// DecodeAlt completely decodes an entry and returns the number of bytes
// consumed from the buffer, which is useful for iterating over entries in a
// raw buffer. The SRC and data reference buff directly and the EVs are decoded
// with EVB.DecodeAlt, so callers cannot re-use the buffer while the entry
// and/or its EVs are in use.
// On any failure the returned count is 0.
func (ent *Entry) DecodeAlt(buff []byte) (int, error) {
	if len(buff) < ENTRY_HEADER_SIZE {
		return 0, ErrInvalidBufferSize
	}
	dataSize, hasEvs := ent.decodeHeaderAlt(buff)
	rest := buff[ENTRY_HEADER_SIZE:]
	if dataSize > len(rest) {
		return 0, ErrInvalidBufferSize
	}
	ent.Data = rest[:dataSize]
	consumed := ENTRY_HEADER_SIZE + dataSize
	if !hasEvs {
		return consumed, nil
	}
	// the EV block starts immediately after the data
	n, err := ent.EVB.DecodeAlt(rest[dataSize:])
	if err != nil {
		return 0, err
	}
	return consumed + n, nil
}
// EncodeHeader encodes the header into the buffer for the transport; only the
// first ENTRY_HEADER_SIZE bytes of buff are written.
// The function returns a boolean indicating if EVs are marked and a potential
// error: ErrInvalidBufferSize when buff cannot hold a header and
// ErrDataSizeTooLarge when the data is longer than MaxDataSize.
//
// The header is little endian and laid out as:
//
//	bytes  0-3   data size (uint32), top 2 bits hold the flags
//	bytes  4-15  timestamp (8 bytes of seconds, then 4 bytes of nanoseconds)
//	bytes 16-17  tag (uint16)
//	bytes 18-33  SRC
//
// Any part of the 16 byte SRC field not covered by ent.SRC is zeroed, so the
// encoded header never carries stale bytes from a re-used buffer.
func (ent *Entry) EncodeHeader(buff []byte) (bool, error) {
	if len(buff) < ENTRY_HEADER_SIZE {
		return false, ErrInvalidBufferSize
	} else if len(ent.Data) > int(MaxDataSize) {
		return false, ErrDataSizeTooLarge
	}

	var flags uint8
	if len(ent.SRC) == IPV4_SRC_SIZE {
		flags |= flagIPv4
	}
	hasEvs := ent.EVB.Populated()
	if hasEvs {
		flags |= flagEVs
	}

	binary.LittleEndian.PutUint32(buff, uint32(len(ent.Data)))
	buff[3] |= (flags << 6) //mask the flags into the top 2 bits of the size
	ent.TS.Encode(buff[4:16])
	binary.LittleEndian.PutUint16(buff[16:], uint16(ent.Tag))

	srcField := buff[18:ENTRY_HEADER_SIZE]
	n := copy(srcField, ent.SRC)
	// clear whatever the SRC did not overwrite (everything for an empty SRC,
	// the trailing 12 bytes for an IPv4 SRC)
	pad := srcField[n:]
	for i := range pad {
		pad[i] = 0
	}
	return hasEvs, nil
}
// Encode encodes an entry (header, data, and any EVs) into the provided
// buffer. It returns the number of bytes consumed in the buffer, or -1 and an
// error. Note that the header is written into buff before the check that the
// data also fits, so buff may be partially written when an error is returned.
func (ent *Entry) Encode(buff []byte) (int, error) {
	hasEvs, err := ent.EncodeHeader(buff)
	if err != nil {
		return -1, err
	}
	total := ENTRY_HEADER_SIZE + len(ent.Data)
	if len(buff) < total {
		return -1, ErrInvalidBufferSize
	}
	copy(buff[ENTRY_HEADER_SIZE:], ent.Data)
	if !hasEvs {
		return total, nil
	}
	// the EV block is placed directly behind the data
	n, err := ent.EVB.EncodeBuffer(buff[total:])
	if err != nil {
		return -1, err
	}
	return total + n, nil
}
// writeAll pushes all of buff to wtr, calling Write repeatedly until every
// byte has been accepted. A Write error is returned as-is; a Write that
// reports no progress without an error yields ErrFailedBodyWrite so that the
// loop cannot spin forever. An empty buff performs no writes.
func writeAll(wtr io.Writer, buff []byte) error {
	for off := 0; off < len(buff); {
		n, err := wtr.Write(buff[off:])
		switch {
		case err != nil:
			return err
		case n <= 0:
			return ErrFailedBodyWrite
		}
		off += n
	}
	return nil
}
// readAll fills buff completely from rdr, issuing as many Read calls as needed.
//
// The bytes returned by a Read are always counted before its error is
// examined, so a reader that delivers the final bytes together with an error
// (for example io.EOF) still satisfies the request once buff is full. An error
// from the reader is returned only while buff is still incomplete. A Read that
// makes no progress and reports no error yields ErrFailedBodyRead so that the
// loop cannot spin forever. An empty buff returns nil without touching rdr.
func readAll(rdr io.Reader, buff []byte) error {
	var r int
	for r < len(buff) {
		n, err := rdr.Read(buff[r:])
		if n > 0 {
			r += n
			if r >= len(buff) {
				// buffer satisfied; an error that rode along with the
				// last bytes does not invalidate them
				return nil
			}
		}
		if err != nil {
			return err
		}
		if n <= 0 {
			return ErrFailedBodyRead
		}
	}
	return nil
}
// EncodeWriter fully encodes an entry (header, data, and any EVs) to an
// io.Writer. It returns the number of bytes written, or -1 and an error.
// The header must be accepted by a single Write call, otherwise
// ErrFailedHeaderWrite is returned; the data is written with writeAll.
func (ent *Entry) EncodeWriter(wtr io.Writer) (int, error) {
	hdr := make([]byte, ENTRY_HEADER_SIZE)
	hasEvs, err := ent.EncodeHeader(hdr)
	if err != nil {
		return -1, err
	}
	written, err := wtr.Write(hdr)
	if err != nil {
		return -1, err
	}
	if written != ENTRY_HEADER_SIZE {
		return -1, ErrFailedHeaderWrite
	}
	if err = writeAll(wtr, ent.Data); err != nil {
		return -1, err
	}
	written += len(ent.Data)
	if !hasEvs {
		return written, nil
	}
	evBytes, err := ent.EVB.EncodeWriter(wtr)
	if err != nil {
		return -1, err
	}
	return written + evBytes, nil
}
// EVCount is a helper function that returns the number of EVs attached to the
// entry, as reported by its EVBlock.
func (ent *Entry) EVCount() int {
return ent.EVB.Count()
}
// EVSize is a helper function that returns the size reported by the entry's
// EVBlock, converted to an int. This is the same value that Size adds to the
// header and data sizes.
func (ent *Entry) EVSize() int {
return int(ent.EVB.Size())
}
// EVEncodeWriter is a helper function for writing just the EVs to a provided
// io.Writer; it hands back the byte count and error from EVBlock.EncodeWriter.
func (ent *Entry) EVEncodeWriter(wtr io.Writer) (int, error) {
return ent.EVB.EncodeWriter(wtr)
}
// EntrySlice is a slice of entries that can be encoded to and decoded from a
// stream as a unit.
type EntrySlice []Entry
// EncodeWriter encodes a slice of entries to an io.Writer. The stream begins
// with two little endian uint32 values, the entry count and then the total
// encoded size as reported by Size, followed by each entry in order.
// ErrSliceLenTooLarge is returned when the slice has more than MaxSliceCount
// entries and ErrSliceSizeTooLarge when the size does not fit in a uint32.
func (es EntrySlice) EncodeWriter(wtr io.Writer) error {
	if len(es) > int(MaxSliceCount) {
		return ErrSliceLenTooLarge
	}
	total := es.Size()
	if total > maxSliceTransferSize {
		return ErrSliceSizeTooLarge
	}
	// count first, then size
	for _, v := range [2]uint32{uint32(len(es)), uint32(total)} {
		if err := binary.Write(wtr, binary.LittleEndian, v); err != nil {
			return err
		}
	}
	for i := range es {
		if _, err := es[i].EncodeWriter(wtr); err != nil {
			return err
		}
	}
	return nil
}
// DecodeReader decodes a slice of entries from an io.Reader. It reads the
// little endian uint32 entry count, rejects counts above MaxSliceCount with
// ErrSliceLenTooLarge, reads the little endian uint32 encoded size, and then
// decodes that many entries. The slice is allocated before the entries are
// decoded, so on a decode error it is left partially populated.
func (es *EntrySlice) DecodeReader(rdr io.Reader) error {
	var count, size uint32
	if err := binary.Read(rdr, binary.LittleEndian, &count); err != nil {
		return err
	}
	if count > MaxSliceCount {
		return ErrSliceLenTooLarge
	}
	// the size is consumed to keep our position in the stream, it is not
	// otherwise used here
	if err := binary.Read(rdr, binary.LittleEndian, &size); err != nil {
		return err
	}
	entries := make(EntrySlice, int(count))
	*es = entries
	for i := range entries {
		if err := entries[i].DecodeReader(rdr); err != nil {
			return err
		}
	}
	return nil
}
// Size returns the encoded size of a slice of entries: 8 bytes for the uint32
// count and uint32 size that lead the stream, plus the Size of every entry.
func (es *EntrySlice) Size() uint64 {
	entries := *es
	total := uint64(8) // uint32 len and uint32 size header
	for i := range entries {
		total += entries[i].Size()
	}
	return total
}
// DecodeReader decodes a single entry from an io.Reader: the fixed size
// header, then the data, then the EV block when the header flags one.
// ErrInvalidHeader is returned when the header reports a data size that is
// zero or larger than MaxDataSize less the header size. When the header flags
// no EVs the entry's EVBlock is reset.
func (ent *Entry) DecodeReader(rdr io.Reader) error {
	hdr := make([]byte, ENTRY_HEADER_SIZE)
	if err := readAll(rdr, hdr); err != nil {
		return err
	}
	dataLen, hasEvs := ent.decodeHeader(hdr)
	if dataLen <= 0 || dataLen > (int(MaxDataSize)-ENTRY_HEADER_SIZE) {
		return ErrInvalidHeader
	}
	ent.Data = make([]byte, dataLen)
	if err := readAll(rdr, ent.Data); err != nil {
		return err
	}
	if !hasEvs {
		ent.EVB.Reset()
		return nil
	}
	_, err := ent.EVB.DecodeReader(rdr)
	return err
}
// ReadEVs is a deprecated function, use DecodeReader instead.
// It decodes only an EV block from rdr into the entry's EVBlock and discards
// the byte count.
func (ent *Entry) ReadEVs(rdr io.Reader) error {
_, err := ent.EVB.DecodeReader(rdr)
return err
}
// MarshallBytes encodes the entry with Encode into a freshly allocated buffer
// of exactly Size() bytes and returns that buffer. The function is deprecated.
func (ent *Entry) MarshallBytes() ([]byte, error) {
	out := make([]byte, ent.Size())
	_, err := ent.Encode(out)
	if err != nil {
		return nil, err
	}
	return out, nil
}
// DeepCopy provides a complete copy of an entry with its own SRC and data
// buffers and a deep copy of the EVBlock. This is REALLY expensive, so make
// sure it is worth it. An empty SRC or Data is left nil in the copy.
func (ent *Entry) DeepCopy() (c Entry) {
	c = Entry{
		TS:  ent.TS,
		Tag: ent.Tag,
	}
	if len(ent.SRC) != 0 {
		c.SRC = append(net.IP(nil), ent.SRC...)
	}
	if len(ent.Data) != 0 {
		c.Data = append([]byte(nil), ent.Data...)
	}
	c.EVB = ent.EVB.DeepCopy()
	return c
}
// Compare performs a deep compare between two entries, used for testing.
// It returns nil when the entries match (two nil entries match each other),
// otherwise an error describing the first difference found. Fields are checked
// in the order timestamp, tag, SRC, data, and finally the EVBlocks.
func (ent *Entry) Compare(v *Entry) error {
	if ent == nil || v == nil {
		if ent == nil && v == nil {
			return nil
		}
		return errors.New("mismatched nil")
	}
	switch {
	case ent.TS != v.TS:
		return errors.New("differing timestamp")
	case ent.Tag != v.Tag:
		return errors.New("differing tags")
	case !bytes.Equal(ent.SRC, v.SRC):
		return errors.New("differeing source values")
	case !bytes.Equal(ent.Data, v.Data):
		return errors.New("differing data")
	}
	return ent.EVB.Compare(v.EVB)
}