/
api.go
310 lines (274 loc) · 7.78 KB
/
api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
/*************************************************************************
* Copyright 2017 Gravwell, Inc. All rights reserved.
* Contact: <legal@gravwell.io>
*
* This software may be modified and distributed under the terms of the
* BSD 2-clause license. See the LICENSE file for details.
**************************************************************************/
package ingest
import (
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"strings"
"time"
"github.com/crewjam/rfc5424"
)
const (
//MAJOR API VERSIONS should always be compatible, there just may be additional features
API_VERSION_MAJOR uint32 = 0
API_VERSION_MINOR uint32 = uint32(VERSION)
)
const (
configurationBlockSize uint32 = 1
maxStreamConfigurationBlockSize uint32 = 1024 * 1024 //just a sanity check
maxIngestStateSize uint32 = 1024 * 1024
CompressNone CompressionType = 0
CompressSnappy CompressionType = 0x10
)
var (
ErrInvalidBuffer = errors.New("invalid buffer")
ErrInvalidIngestStateHeader = errors.New("Invalid ingest state header")
ErrOversizedConfigBlock = errors.New("configuration block too large")
ErrEmptyConfigBlock = errors.New("configuration block empty")
)
type CompressionType uint8
func PrintVersion(wtr io.Writer) {
fmt.Fprintf(wtr, "API Version:\t%d.%d\n", API_VERSION_MAJOR, API_VERSION_MINOR)
}
type Logger interface {
Infof(string, ...interface{}) error
Warnf(string, ...interface{}) error
Errorf(string, ...interface{}) error
Info(string, ...rfc5424.SDParam) error
Warn(string, ...rfc5424.SDParam) error
Error(string, ...rfc5424.SDParam) error
InfofWithDepth(int, string, ...interface{}) error
WarnfWithDepth(int, string, ...interface{}) error
ErrorfWithDepth(int, string, ...interface{}) error
InfoWithDepth(int, string, ...rfc5424.SDParam) error
WarnWithDepth(int, string, ...rfc5424.SDParam) error
ErrorWithDepth(int, string, ...rfc5424.SDParam) error
Hostname() string
Appname() string
}
// StreamConfiguration is a structure that can be sent back and
type StreamConfiguration struct {
Compression CompressionType
}
func (c StreamConfiguration) Write(wtr io.Writer) (err error) {
var n int
buff := make([]byte, configurationBlockSize+4)
binary.LittleEndian.PutUint32(buff, configurationBlockSize)
if err = c.encode(buff[4:]); err != nil {
return
}
if n, err = wtr.Write(buff); err != nil {
return
} else if n != len(buff) {
err = errors.New("Failed to write configuration block")
}
return
}
func (c *StreamConfiguration) Read(rdr io.Reader) (err error) {
//read the block size
var bsz uint32
var n int
if err = binary.Read(rdr, binary.LittleEndian, &bsz); err != nil {
return
}
if bsz > maxStreamConfigurationBlockSize {
err = ErrOversizedConfigBlock
return
} else if bsz == 0 {
err = ErrEmptyConfigBlock
return
}
buff := make([]byte, bsz)
if n, err = rdr.Read(buff); err != nil {
return
} else if n != len(buff) {
err = errors.New("Failed to read configuration block")
return
}
err = c.decode(buff)
return
}
func (c StreamConfiguration) encode(buff []byte) (err error) {
if len(buff) == 0 {
err = ErrInvalidBuffer
return
}
buff[0] = byte(c.Compression)
return
}
func (c *StreamConfiguration) decode(buff []byte) (err error) {
if len(buff) < 1 {
err = ErrInvalidBuffer
return
}
c.Compression = CompressionType(buff[0])
err = c.validate()
return
}
func (c *StreamConfiguration) validate() (err error) {
if err = c.Compression.validate(); err != nil {
return
}
return
}
func (ct CompressionType) validate() (err error) {
switch ct {
case CompressNone:
case CompressSnappy:
default:
err = fmt.Errorf("Unknown compression id %x", ct)
}
return
}
func ParseCompression(v string) (ct CompressionType, err error) {
switch strings.ToLower(strings.TrimSpace(v)) {
case ``:
case `none`:
case `snappy`:
ct = CompressSnappy
default:
err = fmt.Errorf("Unknown compression type %q", v)
}
return
}
type IngesterState struct {
UUID string
Name string
Version string
Label string
IP net.IP //child IP, won't be populated unless in child
Hostname string // whatever the ingester thinks its hostname is
Entries uint64 // How many entries the ingester has written
Size uint64 // How many bytes the ingester has written
Uptime time.Duration // Nanoseconds since the ingest muxer was initialized
Tags []string // The tags registered with the ingester
CacheState string
CacheSize uint64
LastSeen time.Time
Children map[string]IngesterState
Configuration json.RawMessage `json:",omitempty"`
Metadata json.RawMessage `json:",omitempty"`
}
func (s *IngesterState) Write(wtr io.Writer) (err error) {
// First, encode to JSON
var data []byte
if data, err = json.Marshal(s); err != nil {
return err
} else if len(data) > int(maxIngestStateSize) || len(data) == 0 {
return ErrInvalidIngestStateHeader
}
// Now send the size
var n int
buff := make([]byte, 4)
binary.LittleEndian.PutUint32(buff, uint32(len(data)))
if n, err = wtr.Write(buff); err != nil {
return
} else if n != len(buff) {
err = errors.New("Failed to write ingest state size block")
}
// and write the JSON
if n, err = wtr.Write(data); err != nil {
return
} else if n != len(data) {
err = errors.New("Failed to write encoded ingest state")
}
return
}
func (s *IngesterState) Read(rdr io.Reader) (err error) {
// First read out the size (32-bit integer)
var bsz uint32
var n int
if err = binary.Read(rdr, binary.LittleEndian, &bsz); err != nil {
return
}
if bsz > maxIngestStateSize || bsz == 0 {
err = ErrInvalidIngestStateHeader
return
}
// Now read that much data off the reader
buff := make([]byte, bsz)
if n, err = rdr.Read(buff); err != nil {
return
} else if n != len(buff) {
err = errors.New("Failed to read ingest state")
return
}
// Finally, decode the JSON
err = json.Unmarshal(buff, s)
return
}
// Copy creates a deep copy of the ingester state, this is important when handing the data type off to a gob encoder
// if the server updates the ingester state when it is attempting to encode a state blob we could get a race
// where the internal map is updated while we are attempting to encode it, this would cause fault
func (s IngesterState) Copy() (r IngesterState) {
r = s
//copy the map
r.Children = make(map[string]IngesterState, len(s.Children))
for k, v := range s.Children {
r.Children[k] = v.Copy()
}
return
}
type es []string
func (e es) MarshalJSON() ([]byte, error) {
if len(e) == 0 {
return []byte("[]"), nil
}
return json.Marshal([]string(e))
}
type mis struct {
mp map[string]IngesterState
}
func (m mis) MarshalJSON() ([]byte, error) {
if len(m.mp) == 0 {
return []byte("{}"), nil
}
return json.Marshal(m.mp)
}
func (s IngesterState) MarshalJSON() ([]byte, error) {
x := struct {
UUID string
Name string
Version string
Label string
IP net.IP
Hostname string
Entries uint64
Size uint64
Uptime time.Duration
Tags es
CacheState string
CacheSize uint64
LastSeen time.Time
Children mis
Configuration json.RawMessage `json:",omitempty"`
Metadata json.RawMessage `json:",omitempty"`
}{
UUID: s.UUID,
Name: s.Name,
Version: s.Version,
Label: s.Label,
IP: s.IP,
Hostname: s.Hostname,
Entries: s.Entries,
Size: s.Size,
Uptime: s.Uptime,
Tags: es(s.Tags),
CacheState: s.CacheState,
CacheSize: s.CacheSize,
LastSeen: s.LastSeen,
Children: mis{mp: s.Children},
Configuration: s.Configuration,
Metadata: s.Metadata,
}
return json.Marshal(x)
}