-
Notifications
You must be signed in to change notification settings - Fork 4.9k
/
mb.go
297 lines (248 loc) · 10 KB
/
mb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
/*
Package mb (short for Metricbeat) contains the public interfaces that are used
to implement Modules and their associated MetricSets.
*/
package mb
import (
"fmt"
"time"
"github.com/elastic/beats/libbeat/common"
)
const (
// TimestampKey is the key used in events created by MetricSets to add their
// own timestamp to an event. If a timestamp is not specified then the that
// the fetch started will be used.
TimestampKey string = "@timestamp"
// ModuleDataKey is the key used in events created by MetricSets to add data
// to an event that is common to the module. The data must be a
// common.MapStr and when the final event is built the object will be stored
// in the event under a key that is the module name.
ModuleDataKey string = "_module"
// NamespaceKey is used to define a different namespace for the metricset
// This is useful for dynamic metricsets or metricsets which do not
// put the name under the same name as the package. This is for example
// the case in elasticsearch `node_stats` which puts the data under `node.stats`.
NamespaceKey string = "_namespace"
// RTTKey is used by a MetricSet to specify the round trip time (RTT), or
// total amount of time, taken to collect the information in the event. The
// data must be of type time.Duration otherwise the value is ignored.
RTTKey string = "_rtt"
)
// Module interfaces
// Module is the common interface for all Module implementations.
type Module interface {
Name() string // Name returns the name of the Module.
Config() ModuleConfig // Config returns the ModuleConfig used to create the Module.
UnpackConfig(to interface{}) error // UnpackConfig unpacks the raw module config to the given object.
}
// BaseModule implements the Module interface.
//
// When a Module needs to store additional data or provide methods to its
// MetricSets, it can embed this type into another struct to satisfy the
// Module interface requirements.
type BaseModule struct {
name string
config ModuleConfig
rawConfig *common.Config
}
func (m *BaseModule) String() string {
return fmt.Sprintf(`{name:"%v", config:%v}`, m.name, m.config.String())
}
func (m *BaseModule) GoString() string { return m.String() }
// Name returns the name of the Module.
func (m *BaseModule) Name() string { return m.name }
// Config returns the ModuleConfig used to create the Module.
func (m *BaseModule) Config() ModuleConfig { return m.config }
// UnpackConfig unpacks the raw module config to the given object.
func (m *BaseModule) UnpackConfig(to interface{}) error {
return m.rawConfig.Unpack(to)
}
// MetricSet interfaces
// MetricSet is the common interface for all MetricSet implementations. In
// addition to this interface, all MetricSets must implement either
// EventFetcher or EventsFetcher (but not both).
type MetricSet interface {
Name() string // Name returns the name of the MetricSet.
Module() Module // Module returns the parent Module for the MetricSet.
Host() string // Host returns a hostname or other module specific value
// that identifies a specific host or service instance from which to collect
// metrics.
HostData() HostData // HostData returns the parsed host data.
Registration() MetricSetRegistration // Params used in registration.
}
// Closer is an optional interface that a MetricSet can implement in order to
// cleanup any resources it has open at shutdown.
type Closer interface {
Close() error
}
// EventFetcher is a MetricSet that returns a single event when collecting data.
// Use ReportingMetricSet for new MetricSet implementations.
type EventFetcher interface {
MetricSet
Fetch() (common.MapStr, error)
}
// EventsFetcher is a MetricSet that returns a multiple events when collecting
// data. Use ReportingMetricSet for new MetricSet implementations.
type EventsFetcher interface {
MetricSet
Fetch() ([]common.MapStr, error)
}
// Reporter is used by a MetricSet to report events, errors, or errors with
// metadata. The methods return false if and only if publishing failed because
// the MetricSet is being closed.
//
// Deprecated: Use ReporterV2.
type Reporter interface {
Event(event common.MapStr) bool // Event reports a single successful event.
ErrorWith(err error, meta common.MapStr) bool // ErrorWith reports a single error event with the additional metadata.
Error(err error) bool // Error reports a single error event.
}
// ReportingMetricSet is a MetricSet that reports events or errors through the
// Reporter interface. Fetch is called periodically to collect events.
//
// Deprecated: Use ReportingMetricSetV2.
type ReportingMetricSet interface {
MetricSet
Fetch(r Reporter)
}
// PushReporter is used by a MetricSet to report events, errors, or errors with
// metadata. It provides a done channel used to signal that reporter should
// stop.
//
// Deprecated: Use PushReporterV2.
type PushReporter interface {
Reporter
// Done returns a channel that's closed when work done on behalf of this
// reporter should be canceled.
Done() <-chan struct{}
}
// PushMetricSet is a MetricSet that pushes events (rather than pulling them
// periodically via a Fetch callback). Run is invoked to start the event
// subscription and it should block until the MetricSet is ready to stop or
// the PushReporter's done channel is closed.
//
// Deprecated: Use PushMetricSetV2.
type PushMetricSet interface {
MetricSet
Run(r PushReporter)
}
// V2 Interfaces
// ReporterV2 is used by a MetricSet to report Events. The methods return false
// if and only if publishing failed because the MetricSet is being closed.
type ReporterV2 interface {
Event(event Event) bool // Event reports a single successful event.
Error(err error) bool
}
// PushReporterV2 is used by a MetricSet to report events, errors, or errors with
// metadata. It provides a done channel used to signal that reporter should
// stop.
type PushReporterV2 interface {
ReporterV2
// Done returns a channel that's closed when work done on behalf of this
// reporter should be canceled.
Done() <-chan struct{}
}
// ReportingMetricSetV2 is a MetricSet that reports events or errors through the
// ReporterV2 interface. Fetch is called periodically to collect events.
type ReportingMetricSetV2 interface {
MetricSet
Fetch(r ReporterV2)
}
// PushMetricSetV2 is a MetricSet that pushes events (rather than pulling them
// periodically via a Fetch callback). Run is invoked to start the event
// subscription and it should block until the MetricSet is ready to stop or
// the PushReporterV2's done channel is closed.
type PushMetricSetV2 interface {
MetricSet
Run(r PushReporterV2)
}
// HostData contains values parsed from the 'host' configuration. Other
// configuration data like protocols, usernames, and passwords may also be
// used to construct this HostData data.
type HostData struct {
URI string // The full URI that should be used in connections.
SanitizedURI string // A sanitized version of the URI without credentials.
// Parts of the URI.
Host string // The host and possibly port.
User string // Username
Password string // Password
}
func (h HostData) String() string {
return fmt.Sprintf(`{SanitizedURI:"%v", Host:"%v"}`, h.SanitizedURI, h.Host)
}
func (h HostData) GoString() string { return h.String() }
// BaseMetricSet implements the MetricSet interface.
//
// The BaseMetricSet type can be embedded into another struct to satisfy the
// MetricSet interface requirements, leaving only the Fetch() method to be
// implemented to have a complete MetricSet implementation.
type BaseMetricSet struct {
name string
module Module
host string
hostData HostData
registration MetricSetRegistration
}
func (b *BaseMetricSet) String() string {
moduleName := "nil"
if b.module != nil {
moduleName = b.module.Name()
}
return fmt.Sprintf(`{name:"%v", module:"%v", hostData:%v, registration:%v}`,
b.name, moduleName, b.hostData.String(), b.registration)
}
func (b *BaseMetricSet) GoString() string { return b.String() }
// Name returns the name of the MetricSet. It should not include the name of
// the module.
func (b *BaseMetricSet) Name() string {
return b.name
}
// Module returns the parent Module for the MetricSet.
func (b *BaseMetricSet) Module() Module {
return b.module
}
// Host returns the hostname or other module specific value that identifies a
// specific host or service instance from which to collect metrics.
func (b *BaseMetricSet) Host() string {
return b.host
}
// HostData returns the parsed host data.
func (b *BaseMetricSet) HostData() HostData {
return b.hostData
}
// Registration returns the parameters that were used when the MetricSet was
// registered with the registry.
func (b *BaseMetricSet) Registration() MetricSetRegistration {
return b.registration
}
// Configuration types
// ModuleConfig is the base configuration data for all Modules.
//
// The Raw config option is used to enable raw fields in a metricset. This means
// the metricset fetches not only the predefined fields but add alls raw data under
// the raw namespace to the event.
type ModuleConfig struct {
Hosts []string `config:"hosts"`
Period time.Duration `config:"period" validate:"positive"`
Timeout time.Duration `config:"timeout" validate:"positive"`
Module string `config:"module" validate:"required"`
MetricSets []string `config:"metricsets"`
Enabled bool `config:"enabled"`
Raw bool `config:"raw"`
}
func (c ModuleConfig) String() string {
return fmt.Sprintf(`{Module:"%v", MetricSets:%v, Enabled:%v, `+
`Hosts:[%v hosts], Period:"%v", Timeout:"%v", Raw:%v}`,
c.Module, c.MetricSets, c.Enabled, len(c.Hosts), c.Period, c.Timeout,
c.Raw)
}
func (c ModuleConfig) GoString() string { return c.String() }
// defaultModuleConfig contains the default values for ModuleConfig instances.
var defaultModuleConfig = ModuleConfig{
Enabled: true,
Period: time.Second * 10,
}
// DefaultModuleConfig returns a ModuleConfig with the default values populated.
func DefaultModuleConfig() ModuleConfig {
return defaultModuleConfig
}