/
config.go
179 lines (160 loc) · 4.48 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
/*************************************************************************
* Copyright 2018 Gravwell, Inc. All rights reserved.
* Contact: <legal@gravwell.io>
*
* This software may be modified and distributed under the terms of the
* BSD 2-clause license. See the LICENSE file for details.
**************************************************************************/
package main
import (
"errors"
"fmt"
"strings"
"time"
"github.com/gravwell/gravwell/v4/ingest/attach"
"github.com/gravwell/gravwell/v4/ingest/config"
"github.com/gravwell/gravwell/v4/ingest/processors"
)
const (
	// defaultStateStore is the state store path that GetConfig applies when
	// Global.State_Store_Location is left empty.
	defaultStateStore = `/opt/gravwell/etc/kinesis_ingest.state`
	// defaultLogFile is the log file path that GetConfig applies when
	// Global.Log_File is left empty.
	defaultLogFile = `/opt/gravwell/log/kinesis.log`
)
// bindType is an int-backed type.
// NOTE(review): nothing in the visible part of this file references it;
// confirm it is still used elsewhere in the package.
type bindType int
// readerType is an int-backed type.
// NOTE(review): nothing in the visible part of this file references it;
// confirm it is still used elsewhere in the package.
type readerType int
// global holds the settings stored in cfgType.Global: the common ingester
// options embedded from config.IngestConfig plus the Kinesis-specific
// options declared here.
type global struct {
	// IngestConfig supplies the shared ingester fields and methods that the
	// rest of this file reads through promotion (for example Ingest_Secret,
	// Log_File, Log_Level, Connection_Timeout and Targets).
	config.IngestConfig
	// State_Store_Location is the state store path; GetConfig replaces an
	// empty value with defaultStateStore.
	State_Store_Location string
	// Credentials_Type selects how AWS credentials are obtained.
	// NOTE(review): the accepted values are not visible in this file.
	Credentials_Type string
	// AWS_Access_Key_ID is excluded from JSON output by its `json:"-"` tag.
	AWS_Access_Key_ID string `json:"-"` // DO NOT send this when marshalling
	// AWS_Secret_Access_Key is excluded from JSON output by its `json:"-"` tag.
	AWS_Secret_Access_Key string `json:"-"` // DO NOT send this when marshalling
}
// streamDef describes one entry of cfgType.KinesisStream.
type streamDef struct {
	// Stream_Name presumably names the Kinesis stream to read; it is not
	// validated in this file.
	Stream_Name string
	// Tag_Name is the tag collected by cfgType.Tags; entries with an empty
	// Tag_Name are skipped there.
	Tag_Name string
	// Iterator_Type is defaulted to "LATEST" by cfgType.Verify when empty.
	Iterator_Type string
	// Region is presumably the AWS region of the stream; it is not validated
	// in this file.
	Region string
	// Assume_Local_Timezone, Timezone_Override and Parse_Time are timestamp
	// handling options. NOTE(review): their exact semantics are defined by
	// the consumer of this struct, not by this file.
	Assume_Local_Timezone bool
	Timezone_Override string
	Parse_Time bool
	// Metrics_Interval is the number of seconds between metrics updates;
	// 0 disables them.
	Metrics_Interval int // Seconds between metrics update, 0 disables
	// JSON_Metrics switches metrics output from plaintext to JSON.
	JSON_Metrics bool // set to true for json-output metrics instead of plaintext
	// Preprocessor lists the preprocessors for this stream; cfgType.Verify
	// checks the list with cfgType.Preprocessor.CheckProcessors.
	Preprocessor []string
}
// cfgType is the complete ingester configuration populated by GetConfig.
type cfgType struct {
	// Global carries the shared ingester settings and Kinesis-wide options.
	Global global
	// Attach is returned by AttachConfig and checked by Verify.
	Attach attach.AttachConfig
	// KinesisStream maps a stream key to its definition; Verify requires at
	// least one entry and rejects nil entries.
	KinesisStream map[string]*streamDef
	// Preprocessor holds the preprocessor definitions that stream
	// Preprocessor lists are checked against.
	Preprocessor processors.ProcessorConfig
	// TimeFormat holds custom time format definitions; Verify validates it.
	TimeFormat config.CustomTimeFormat
}
// GetConfig builds a verified configuration. It loads the file at path with
// config.LoadConfigFile, applies overlays with config.LoadConfigOverlays
// using overlayPath, substitutes the default state store and log file
// locations for empty values, and finally runs Verify. Any failure along
// the way is returned with a nil configuration.
func GetConfig(path, overlayPath string) (*cfgType, error) {
	cfg := new(cfgType)
	if err := config.LoadConfigFile(cfg, path); err != nil {
		return nil, err
	}
	if err := config.LoadConfigOverlays(cfg, overlayPath); err != nil {
		return nil, err
	}

	// Fall back to the built-in locations for anything left unset.
	if cfg.Global.State_Store_Location == "" {
		cfg.Global.State_Store_Location = defaultStateStore
	}
	if cfg.Global.Log_File == "" {
		cfg.Global.Log_File = defaultLogFile
	}

	if err := cfg.Verify(); err != nil {
		return nil, err
	}
	return cfg, nil
}
// Verify validates the configuration and returns the first problem found,
// or nil when the configuration is usable. In order it checks:
//   - the embedded IngestConfig and the Attach section via their own Verify,
//   - that the connection timeout parses and is not negative,
//   - that an ingest secret and at least one backend target are present,
//   - that at least one Kinesis stream is defined,
//   - the Preprocessor and TimeFormat sections via their Validate methods,
//   - that every stream entry is non-nil and only names known preprocessors.
//
// As a side effect, any stream with an empty Iterator_Type is set to
// "LATEST".
//
// Verify takes a pointer receiver, like every other cfgType method, so the
// nested Verify/Validate calls operate on the real configuration instead of
// a per-call copy of the whole struct.
func (c *cfgType) Verify() error {
	if err := c.Global.IngestConfig.Verify(); err != nil {
		return err
	}
	if err := c.Attach.Verify(); err != nil {
		return err
	}

	to, err := c.parseTimeout()
	if err != nil {
		return err
	}
	if to < 0 {
		return errors.New("Invalid connection timeout")
	}

	if c.Global.Ingest_Secret == "" {
		return errors.New("Ingest-Secret not specified")
	}

	// ensure there is at least one target
	connCount := len(c.Global.Cleartext_Backend_Target) +
		len(c.Global.Encrypted_Backend_Target) +
		len(c.Global.Pipe_Backend_Target)
	if connCount == 0 {
		return errors.New("No backend targets specified")
	}
	if len(c.KinesisStream) == 0 {
		return errors.New("At least one Kinesis stream required.")
	}

	if err := c.Preprocessor.Validate(); err != nil {
		return err
	}
	if err := c.TimeFormat.Validate(); err != nil {
		return err
	}

	for k, v := range c.KinesisStream {
		if v == nil {
			return fmt.Errorf("Kinesis stream %v config is nil", k)
		}
		if err := c.Preprocessor.CheckProcessors(v.Preprocessor); err != nil {
			// %w keeps the message identical while letting callers inspect
			// the underlying error with errors.Is / errors.As.
			return fmt.Errorf("Kinesis stream %s preprocessor invalid: %w", k, err)
		}
		if v.Iterator_Type == `` {
			// default to LATEST
			v.Iterator_Type = "LATEST"
		}
	}
	return nil
}
// Targets returns whatever the Global section's Targets method reports,
// passing its result and error through unchanged.
func (c *cfgType) Targets() ([]string, error) {
	return c.Global.Targets()
}
// Tags returns the distinct, non-empty Tag_Name values across all configured
// Kinesis streams. The order of the result follows map iteration and is
// therefore not stable between calls. An error is returned when no stream
// supplies a tag.
func (c *cfgType) Tags() ([]string, error) {
	var tags []string
	seen := make(map[string]struct{}, len(c.KinesisStream))
	for _, v := range c.KinesisStream {
		// Verify rejects nil stream entries, but guard here as well so that
		// calling Tags on an unverified configuration cannot panic.
		if v == nil || len(v.Tag_Name) == 0 {
			continue
		}
		if _, dup := seen[v.Tag_Name]; dup {
			continue
		}
		seen[v.Tag_Name] = struct{}{}
		tags = append(tags, v.Tag_Name)
	}
	if len(tags) == 0 {
		return nil, errors.New("No tags specified")
	}
	return tags, nil
}
// IngestBaseConfig returns a copy of the config.IngestConfig embedded in the
// Global section.
func (c *cfgType) IngestBaseConfig() config.IngestConfig {
	return c.Global.IngestConfig
}
// AttachConfig returns a copy of the Attach section.
func (c *cfgType) AttachConfig() attach.AttachConfig {
	return c.Attach
}
// VerifyRemote reports the Global section's Verify_Remote_Certificates
// setting.
func (c *cfgType) VerifyRemote() bool {
	return c.Global.Verify_Remote_Certificates
}
// Timeout returns the configured connection timeout. It yields 0 when the
// setting is empty, cannot be parsed, or is not a positive duration.
func (c *cfgType) Timeout() time.Duration {
	// The parse error is deliberately dropped: an unparsable value simply
	// produces no positive duration and falls through to 0.
	d, _ := c.parseTimeout()
	if d <= 0 {
		return 0
	}
	return d
}
// Secret returns the Global section's Ingest_Secret value.
func (c *cfgType) Secret() string {
	return c.Global.Ingest_Secret
}
// LogLevel returns the Global section's Log_Level value.
func (c *cfgType) LogLevel() string {
	return c.Global.Log_Level
}
// parseTimeout converts the Global Connection_Timeout setting into a
// duration. Surrounding whitespace is ignored; an empty setting yields
// (0, nil), and anything else is handed to time.ParseDuration, whose result
// and error are returned as-is.
func (c *cfgType) parseTimeout() (time.Duration, error) {
	if raw := strings.TrimSpace(c.Global.Connection_Timeout); raw != "" {
		return time.ParseDuration(raw)
	}
	return 0, nil
}