forked from driskell/log-courier
-
Notifications
You must be signed in to change notification settings - Fork 0
/
config.go
394 lines (341 loc) · 12.8 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
/*
* Copyright 2014-2015 Jason Woods.
*
* This file is a modification of code from Logstash Forwarder.
* Copyright 2012-2013 Jordan Sissel and contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
import (
"fmt"
"os"
"path"
"path/filepath"
"reflect"
"time"
"gopkg.in/op/go-logging.v1"
)
var (
	// DefaultConfigurationFile is a path to the default configuration file to
	// load, this can be changed during init()
	DefaultConfigurationFile string

	// DefaultGeneralPersistDir is the value General.InitDefaults gives to the
	// general "persist directory" setting. While it is left empty, Load fails
	// unless the configuration supplies a persist directory itself
	DefaultGeneralPersistDir string
)
// Built-in defaults used by the InitDefaults methods, Load and
// initStreamConfig, grouped by the configuration section they belong to.
const (
	// General section
	defaultGeneralHost             string        = "localhost.localdomain"
	defaultGeneralLogLevel         logging.Level = logging.INFO
	defaultGeneralLogStdout        bool          = true
	defaultGeneralLogSyslog        bool          = false
	defaultGeneralLineBufferBytes  int64         = 16 * 1024 // 16 KiB
	defaultGeneralMaxLineBytes     int64         = 1024 * 1024 // 1 MiB
	defaultGeneralProspectInterval time.Duration = 10 * time.Second
	defaultGeneralSpoolMaxBytes    int64         = 10 * 1024 * 1024 // 10 MiB
	defaultGeneralSpoolSize        int64         = 1024
	defaultGeneralSpoolTimeout     time.Duration = 5 * time.Second

	// Network section
	defaultNetworkBackoff            time.Duration = 5 * time.Second
	defaultNetworkBackoffMax         time.Duration = 5 * time.Minute
	defaultNetworkMaxPendingPayloads int64         = 10
	defaultNetworkMethod             string        = "random"
	defaultNetworkRfc2782Service     string        = "courier"
	defaultNetworkRfc2782Srv         bool          = true
	defaultNetworkTimeout            time.Duration = 15 * time.Second
	defaultNetworkTransport          string        = "tls"

	// Stream settings (shared by files and stdin)
	defaultStreamAddHostField     bool          = true
	defaultStreamAddOffsetField   bool          = true
	defaultStreamAddPathField     bool          = true
	defaultStreamAddTimezoneField bool          = false
	defaultStreamCodec            string        = "plain"
	defaultStreamDeadTime         time.Duration = time.Hour
)
// Section is implemented by external config structures that will be
// registered with the config package
type Section interface {
// Validate is called by Config.Load for every section held in the Config,
// as the final step of loading; a non-nil error aborts the load and is
// returned to the caller of Load
Validate() error
}
// SectionCreator creates new Section structures. NewConfig calls every
// registered creator once and stores the result in the new Config
type SectionCreator func() Section
// registeredSectionCreators maps a section name to the creator registered for
// it through RegisterConfigSection. NewConfig walks this map to give every new
// Config structure one Section per registered name
var registeredSectionCreators = map[string]SectionCreator{}
// General holds the general configuration
//
// Built-in defaults are assigned by InitDefaults and the values are checked by
// Config.Load. The config tag on each field presumably names the key that the
// populate code maps onto it - confirm against PopulateConfig
type General struct {
GlobalFields map[string]interface{} `config:"global fields"`
// Host is not given a default by InitDefaults; when it is still empty Load
// fills it from os.Hostname, falling back to defaultGeneralHost
Host string `config:"host"`
// LineBufferBytes is rejected by Load when it is below 1
LineBufferBytes int64 `config:"line buffer bytes"`
LogFile string `config:"log file"`
LogLevel logging.Level `config:"log level"`
LogStdout bool `config:"log stdout"`
LogSyslog bool `config:"log syslog"`
// MaxLineBytes is rejected by Load when it exceeds SpoolMaxBytes
MaxLineBytes int64 `config:"max line bytes"`
// PersistDir defaults to DefaultGeneralPersistDir; Load fails if it ends up
// empty
PersistDir string `config:"persist directory"`
ProspectInterval time.Duration `config:"prospect interval"`
SpoolSize int64 `config:"spool size"`
// SpoolMaxBytes is rejected by Load when it exceeds 2 GiB
SpoolMaxBytes int64 `config:"spool max bytes"`
SpoolTimeout time.Duration `config:"spool timeout"`
}
// InitDefaults assigns the built-in default to every general setting that has
// one. Host, LogFile and GlobalFields are not touched
func (gc *General) InitDefaults() {
	// Logging
	gc.LogLevel = defaultGeneralLogLevel
	gc.LogStdout = defaultGeneralLogStdout
	gc.LogSyslog = defaultGeneralLogSyslog

	// Line handling
	gc.LineBufferBytes = defaultGeneralLineBufferBytes
	gc.MaxLineBytes = defaultGeneralMaxLineBytes

	// Spooling
	gc.SpoolSize = defaultGeneralSpoolSize
	gc.SpoolMaxBytes = defaultGeneralSpoolMaxBytes
	gc.SpoolTimeout = defaultGeneralSpoolTimeout

	// Prospecting and persistence
	gc.ProspectInterval = defaultGeneralProspectInterval
	gc.PersistDir = DefaultGeneralPersistDir

	// NOTE: Host is deliberately left alone - an empty string means it is
	// calculated automatically
}
// Network holds network related configuration
//
// Built-in defaults are assigned by InitDefaults and the values are checked by
// Config.Load
type Network struct {
// Factory is assigned by Load, when it is asked to initialise factories,
// from the registrar function registered for Transport
Factory interface{}
Backoff time.Duration `config:"failure backoff"`
BackoffMax time.Duration `config:"failure backoff max"`
MaxPendingPayloads int64 `config:"max pending payloads"`
// Method must be one of "random", "failover" or "loadbalance"; Load
// replaces an empty value with defaultNetworkMethod
Method string `config:"method"`
Rfc2782Service string `config:"rfc 2782 service"`
Rfc2782Srv bool `config:"rfc 2782 srv"`
// Servers must contain at least one entry and no duplicates, otherwise
// Load fails
Servers []string `config:"servers"`
Timeout time.Duration `config:"timeout"`
// Transport selects the registrar function used to build Factory; Load
// fails on an unregistered name when initialising factories
Transport string `config:"transport"`
// Unused is handed to the transport registrar function by Load.
// NOTE(review): presumably this collects the network keys that matched none
// of the fields above - confirm against the populate code
Unused map[string]interface{}
}
// InitDefaults assigns the built-in default to every network setting that has
// one. Servers, Factory and Unused are not touched
func (nc *Network) InitDefaults() {
	// Server selection
	nc.Method = defaultNetworkMethod
	nc.Transport = defaultNetworkTransport
	nc.Rfc2782Srv = defaultNetworkRfc2782Srv
	nc.Rfc2782Service = defaultNetworkRfc2782Service

	// Timing and flow control
	nc.Timeout = defaultNetworkTimeout
	nc.Backoff = defaultNetworkBackoff
	nc.BackoffMax = defaultNetworkBackoffMax
	nc.MaxPendingPayloads = defaultNetworkMaxPendingPayloads
}
// CodecStub holds an unknown codec configuration
// After initial parsing of configuration, these CodecStubs are turned into
// real configuration blocks for the codec given by their Name field
type CodecStub struct {
// Name selects the registered codec; initStreamConfig fails on a name that
// has no registrar function
Name string `config:"name"`
// Unused is handed to the codec registrar function by initStreamConfig.
// NOTE(review): presumably this collects the codec-specific keys that the
// registrar interprets - confirm against the populate code
Unused map[string]interface{}
// Factory is assigned by initStreamConfig from the registrar function's
// return value
Factory interface{}
}
// Stream holds the configuration for a log stream
//
// It is used directly for the stdin stream and embedded in File for streams
// read from paths. Built-in defaults are assigned by InitDefaults
type Stream struct {
AddHostField bool `config:"add host field"`
AddOffsetField bool `config:"add offset field"`
AddPathField bool `config:"add path field"`
AddTimezoneField bool `config:"add timezone field"`
// Codecs is replaced by a single stub named defaultStreamCodec when it is
// empty and initStreamConfig is initialising factories
Codecs []CodecStub `config:"codecs"`
DeadTime time.Duration `config:"dead time"`
Fields map[string]interface{} `config:"fields"`
}
// InitDefaults assigns the built-in default to every stream setting that has
// one. Codecs and Fields are not touched here; an empty codec list is given
// its default later by initStreamConfig
func (sc *Stream) InitDefaults() {
	sc.DeadTime = defaultStreamDeadTime

	// Fields added to each event
	sc.AddTimezoneField = defaultStreamAddTimezoneField
	sc.AddPathField = defaultStreamAddPathField
	sc.AddOffsetField = defaultStreamAddOffsetField
	sc.AddHostField = defaultStreamAddHostField
}
// File holds the configuration for a set of paths that share the same stream
// configuration
type File struct {
// Paths must contain at least one entry, otherwise Load fails
Paths []string `config:"paths"`
// Stream is embedded so its settings sit alongside Paths.
// NOTE(review): the ",embed" tag presumably tells the populate code to read
// the stream keys from the same level as "paths" - confirm
Stream `config:",embed"`
}
// Config holds all the configuration for Log Courier
//
// Create one with NewConfig so that Sections is populated, then fill it from
// a file with Load
type Config struct {
// Files is filled from the main configuration file and extended by Load
// with the contents of every file matched by Includes
Files []File `config:"files"`
General General `config:"general"`
// Includes holds glob patterns; Load expands each with filepath.Glob and
// loads every match as a list of additional Files entries
Includes []string `config:"includes"`
Network Network `config:"network"`
Stdin Stream `config:"stdin"`
// Dynamic sections
// TODO: All top level sections to use this
// Keyed by the name passed to RegisterConfigSection; read back via Get
Sections map[string]Section `config:",dynamic"`
}
// NewConfig creates a new configuration structure that is empty apart from
// its Sections map, which receives one freshly created Section for every
// creator registered through RegisterConfigSection
func NewConfig() *Config {
	sections := make(map[string]Section)
	for name, create := range registeredSectionCreators {
		sections[name] = create()
	}

	return &Config{Sections: sections}
}
// loadFile chooses a loader for filePath from its extension: ".json" and
// ".conf" files are handed to loadJSONFile and ".yaml" files to loadYAMLFile,
// each of which is given rawConfig as the destination. Any other extension is
// reported as an error without a loader being called
func (c *Config) loadFile(filePath string, rawConfig interface{}) error {
	extension := path.Ext(filePath)

	if extension == ".yaml" {
		return c.loadYAMLFile(filePath, rawConfig)
	}

	if extension == ".json" || extension == ".conf" {
		return c.loadJSONFile(filePath, rawConfig)
	}

	return fmt.Errorf("File extension '%s' is not within the known extensions: conf, json, yaml", extension)
}
// Load the configuration from the given file
//
// The main file is read into a generic map and applied to c through
// PopulateConfig. Every file matched by an entry of Includes is then loaded as
// a list and appended to Files. Afterwards the general and network settings
// are checked, an empty Host is replaced by the system hostname (or
// defaultGeneralHost if that cannot be determined), the stream configurations
// are initialised and every Section is validated.
//
// If initFactories is false, factories (such as codec names or transport
// names) are not initialised so they do not need to be built in
//
// The first problem encountered is returned; c may already have been partly
// populated at that point
func (c *Config) Load(path string, initFactories bool) (err error) {
	// Read the main config file
	rawConfig := make(map[string]interface{})
	if err = c.loadFile(path, &rawConfig); err != nil {
		return err
	}

	// Populate configuration - reporting errors on spelling mistakes etc.
	if err = c.PopulateConfig(c, rawConfig, "/"); err != nil {
		return err
	}

	// Included files contribute additional entries to Files. populateSlice
	// works on reflect.Values, so resolve the destination field once up front
	vFiles := reflect.ValueOf(c).Elem().FieldByName("Files")
	for _, glob := range c.Includes {
		// Glob the path
		var matches []string
		if matches, err = filepath.Glob(glob); err != nil {
			return err
		}

		for _, include := range matches {
			// Read the include
			var rawInclude []interface{}
			if err = c.loadFile(include, &rawInclude); err != nil {
				return err
			}

			// Append to configuration, using the include's own path as the
			// prefix handed to populateSlice
			if err = c.populateSlice(vFiles, reflect.ValueOf(rawInclude), fmt.Sprintf("%s/", include)); err != nil {
				return err
			}
		}
	}

	if c.General.PersistDir == "" {
		return fmt.Errorf("/general/persist directory must be specified")
	}

	// Enforce maximum of 2 GB since event transmit length is uint32
	if c.General.SpoolMaxBytes > 2*1024*1024*1024 {
		return fmt.Errorf("/general/spool max bytes can not be greater than 2 GiB")
	}

	// A buffer of exactly 1 byte is accepted, so the message must say so
	if c.General.LineBufferBytes < 1 {
		return fmt.Errorf("/general/line buffer bytes must be at least 1")
	}

	// Max line bytes can not be larger than spool max bytes
	if c.General.MaxLineBytes > c.General.SpoolMaxBytes {
		return fmt.Errorf("/general/max line bytes can not be greater than /general/spool max bytes")
	}

	if c.General.Host == "" {
		// Separate variables keep the lookup failure from shadowing the named
		// result; a lookup failure is not fatal, it only triggers the fallback
		hostname, hostErr := os.Hostname()
		if hostErr == nil && hostname != "" {
			c.General.Host = hostname
		} else {
			c.General.Host = defaultGeneralHost
			log.Warning("Failed to determine the FQDN; using '%s'.", c.General.Host)
		}
	}

	// TODO: Network method factory in publisher
	if c.Network.Method == "" {
		c.Network.Method = defaultNetworkMethod
	}

	switch c.Network.Method {
	case "random", "failover", "loadbalance":
		// Recognised
	default:
		return fmt.Errorf("The network method (/network/method) is not recognised: %s", c.Network.Method)
	}

	if len(c.Network.Servers) == 0 {
		return fmt.Errorf("No network servers were specified (/network/servers)")
	}

	// Every server may be listed only once
	seen := make(map[string]struct{}, len(c.Network.Servers))
	for _, server := range c.Network.Servers {
		if _, duplicate := seen[server]; duplicate {
			return fmt.Errorf("The list of network servers (/network/servers) must be unique: %s appears multiple times", server)
		}
		seen[server] = struct{}{}
	}

	if initFactories {
		registrarFunc, ok := registeredTransports[c.Network.Transport]
		if !ok {
			return fmt.Errorf("Unrecognised transport '%s'", c.Network.Transport)
		}

		if c.Network.Factory, err = registrarFunc(c, "/network/", c.Network.Unused, c.Network.Transport); err != nil {
			return err
		}
	}

	for k := range c.Files {
		if len(c.Files[k].Paths) == 0 {
			return fmt.Errorf("No paths specified for /files[%d]/", k)
		}

		if err = c.initStreamConfig(fmt.Sprintf("/files[%d]", k), &c.Files[k].Stream, initFactories); err != nil {
			return err
		}
	}

	if err = c.initStreamConfig("/stdin", &c.Stdin, initFactories); err != nil {
		return err
	}

	// Validate the registered configurables
	for _, section := range c.Sections {
		if err = section.Validate(); err != nil {
			return err
		}
	}

	return nil
}
// initStreamConfig prepares a stream configuration by building the factory for
// each of its codecs through the registrar function registered under the
// codec's name. A stream without codecs is first given a single codec named
// defaultStreamCodec. path is passed to the registrar functions and used in
// the error reported for an unregistered codec name.
//
// When initFactories is false nothing is done at all, not even the defaulting
// of the codec list
func (c *Config) initStreamConfig(path string, streamConfig *Stream, initFactories bool) (err error) {
	// Currently only codec factory is initialised, so skip if we're not doing that
	if !initFactories {
		return nil
	}

	if len(streamConfig.Codecs) == 0 {
		streamConfig.Codecs = []CodecStub{{Name: defaultStreamCodec}}
	}

	for i := range streamConfig.Codecs {
		// Work on the element itself so the factory is stored in the stream
		stub := &streamConfig.Codecs[i]

		register, known := registeredCodecs[stub.Name]
		if !known {
			return fmt.Errorf("Unrecognised codec '%s' for %s", stub.Name, path)
		}

		if stub.Factory, err = register(c, path, stub.Unused, stub.Name); err != nil {
			return err
		}
	}

	// TODO: EDGE CASE: Event transmit length is uint32, if fields length is ridiculous we will fail
	return nil
}
// Get looks up the dynamic configuration section stored under name and
// returns it, or nil when no section of that name exists
func (c *Config) Get(name string) interface{} {
	if section, found := c.Sections[name]; found {
		return section
	}

	return nil
}
// RegisterConfigSection registers a new Section creator which will be used to
// create new sections that will be available via Get() in all created Config
// structures
//
// Registering a second creator under the same name replaces the first. Only
// Config structures created by NewConfig after the call contain the section.
// NOTE(review): the registry map is written without any locking here -
// presumably this is only meant to be called during package initialisation;
// confirm against the callers
func RegisterConfigSection(name string, creator SectionCreator) {
registeredSectionCreators[name] = creator
}