forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathregistry.go
300 lines (245 loc) · 8.96 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
package mb
import (
"fmt"
"sort"
"strings"
"sync"
"github.com/elastic/beats/libbeat/logp"
)
// initialSize specifies the initial size of the Register. NewRegister passes
// it as the capacity hint for both of the Register's top-level maps.
const initialSize = 20
// Registry is the singleton Register instance where all ModuleFactory's and
// MetricSetFactory's should be registered. It is created with NewRegister
// when the package is initialized.
var Registry = NewRegister()
// DefaultModuleFactory is the fallback factory for modules. It hands back a
// pointer to the BaseModule it was given and its error result is always nil.
// If MetricSets are registered without an associated ModuleFactory, then the
// DefaultModuleFactory will be used to instantiate a Module.
var DefaultModuleFactory = func(base BaseModule) (Module, error) {
	// base is this call's own copy of the argument, so the returned pointer
	// does not alias the caller's value.
	module := &base
	return module, nil
}
// ModuleFactory accepts a BaseModule and returns a Module. If there was an
// error creating the Module then an error will be returned.
//
// Factories are registered per module name with Register.AddModule.
type ModuleFactory func(base BaseModule) (Module, error)
// MetricSetFactory accepts a BaseMetricSet and returns a MetricSet. If there
// was an error creating the MetricSet then an error will be returned. The
// returned MetricSet must also implement either EventFetcher or EventsFetcher
// (but not both).
//
// Factories are registered per module/metricset name pair with
// Register.AddMetricSet or Register.MustAddMetricSet.
type MetricSetFactory func(base BaseMetricSet) (MetricSet, error)
// HostParser is a function that parses a host value from the configuration
// and returns a HostData object. The module is provided in case additional
// configuration values are required to parse and build the HostData object.
// An error should be returned if the host or configuration is invalid.
//
// A HostParser is attached to a MetricSet registration with the
// WithHostParser option.
type HostParser func(module Module, host string) (HostData, error)
// MetricSetRegistration contains the parameters that were used to register
// a MetricSet. One value is stored in the Register for every registered
// module/metricset pair.
type MetricSetRegistration struct {
	// Name is the metricset name. The Register lower-cases the name before
	// storing the registration.
	Name string

	// Factory is the MetricSetFactory that was supplied at registration.
	Factory MetricSetFactory

	// Options

	// IsDefault is set by the DefaultMetricSet option. Registrations with
	// this flag are the ones reported as a module's default metricsets.
	IsDefault bool

	// HostParser is set by the WithHostParser option. It is left nil when
	// the option is not supplied.
	HostParser HostParser

	// Namespace is set by the WithNamespace option. It is left empty when
	// the option is not supplied.
	Namespace string
}
// MetricSetOption sets an option for a MetricSetFactory that is being
// registered. Each option receives a pointer to the MetricSetRegistration
// under construction and mutates it in place; options are applied in the
// order in which they were passed to the registration call.
type MetricSetOption func(info *MetricSetRegistration)
// WithHostParser returns a MetricSetOption that stores p as the HostParser
// of the MetricSet registration it is applied to.
func WithHostParser(p HostParser) MetricSetOption {
	var opt MetricSetOption = func(reg *MetricSetRegistration) {
		reg.HostParser = p
	}
	return opt
}
// DefaultMetricSet returns a MetricSetOption that flags the MetricSetFactory
// being registered as a default, i.e. one that is used when no MetricSet
// names are specified in the configuration.
func DefaultMetricSet() MetricSetOption {
	markDefault := func(reg *MetricSetRegistration) {
		reg.IsDefault = true
	}
	return markDefault
}
// WithNamespace returns a MetricSetOption that records the fully qualified
// namespace under which the MetricSet's data will be added. If no namespace
// is specified then [module].[metricset] will be used.
func WithNamespace(namespace string) MetricSetOption {
	return MetricSetOption(func(reg *MetricSetRegistration) {
		reg.Namespace = namespace
	})
}
// Register contains the factory functions for creating new Modules and new
// MetricSets. Registers are thread safe for concurrent usage: every method
// takes the lock before touching the maps.
//
// All names are lower-cased before they are stored or looked up, so
// registration and lookup are case-insensitive.
type Register struct {
	// Lock to control concurrent read/writes. Registration methods take the
	// write lock; lookup and listing methods take the read lock.
	lock sync.RWMutex
	// A map of (lower-cased) module name to ModuleFactory.
	modules map[string]ModuleFactory
	// A map of (lower-cased) module name to nested map of (lower-cased)
	// MetricSet name to MetricSetRegistration.
	metricSets map[string]map[string]MetricSetRegistration
}
// NewRegister allocates an empty Register whose module and metricset maps
// are both pre-sized to initialSize entries.
func NewRegister() *Register {
	reg := new(Register)
	reg.modules = make(map[string]ModuleFactory, initialSize)
	reg.metricSets = make(map[string]map[string]MetricSetRegistration, initialSize)
	return reg
}
// AddModule stores factory under the lower-cased module name. It fails,
// leaving the Register untouched, when (checked in this order) the name is
// empty, a factory is already registered under the name, or factory is nil.
func (r *Register) AddModule(name string, factory ModuleFactory) error {
	r.lock.Lock()
	defer r.lock.Unlock()

	if len(name) == 0 {
		return fmt.Errorf("module name is required")
	}

	// Module names are case-insensitive; the lower-cased form is the key.
	key := strings.ToLower(name)

	_, registered := r.modules[key]
	switch {
	case registered:
		return fmt.Errorf("module '%s' is already registered", key)
	case factory == nil:
		return fmt.Errorf("module '%s' cannot be registered with a nil factory", key)
	}

	r.modules[key] = factory
	logp.Info("Module registered: %s", key)
	return nil
}
// AddMetricSet registers a new MetricSetFactory under module/name. A
// HostParser for the 'host' configuration data may optionally be supplied;
// only the first hostParser argument is used and any further ones are
// ignored. An error is returned if any parameter is empty or nil or if a
// factory has already been registered under the name.
//
// Use MustAddMetricSet for new code.
func (r *Register) AddMetricSet(module string, name string, factory MetricSetFactory, hostParser ...HostParser) error {
	if len(hostParser) == 0 {
		return r.addMetricSet(module, name, factory)
	}
	return r.addMetricSet(module, name, factory, WithHostParser(hostParser[0]))
}
// MustAddMetricSet registers a new MetricSetFactory together with any
// MetricSetOptions. It panics, with the registration error as the panic
// value, if any parameter is empty or nil OR if a factory has already been
// registered under this name.
func (r *Register) MustAddMetricSet(module, name string, factory MetricSetFactory, options ...MetricSetOption) {
	err := r.addMetricSet(module, name, factory, options...)
	if err == nil {
		return
	}
	panic(err)
}
// addMetricSet registers a new MetricSetFactory under the lower-cased
// module/name pair and applies the given options to its registration, in
// order.
//
// An error is returned, and the Register is left completely unmodified, when
// (checked in this order) the module name is empty, the metricset name is
// empty, a metricset is already registered under module/name, or factory is
// nil.
func (r *Register) addMetricSet(module, name string, factory MetricSetFactory, options ...MetricSetOption) error {
	r.lock.Lock()
	defer r.lock.Unlock()

	if module == "" {
		return fmt.Errorf("module name is required")
	}
	if name == "" {
		return fmt.Errorf("metricset name is required")
	}

	module = strings.ToLower(module)
	name = strings.ToLower(name)

	// For a module that has no metricsets yet this yields a nil map, which
	// is safe to read from. The per-module map is deliberately NOT created
	// here: doing so before validation finished would leave an empty module
	// entry behind when the registration is rejected, and lookups would then
	// treat the module as existing.
	metricsets := r.metricSets[module]
	if _, exists := metricsets[name]; exists {
		return fmt.Errorf("metricset '%s/%s' is already registered", module, name)
	}

	if factory == nil {
		return fmt.Errorf("metricset '%s/%s' cannot be registered with a nil factory", module, name)
	}

	// Set the options.
	msInfo := MetricSetRegistration{Name: name, Factory: factory}
	for _, opt := range options {
		opt(&msInfo)
	}

	// All validation passed; only now create the module's map if needed.
	if metricsets == nil {
		metricsets = map[string]MetricSetRegistration{}
		r.metricSets[module] = metricsets
	}
	metricsets[name] = msInfo

	logp.Info("MetricSet registered: %s/%s", module, name)
	return nil
}
// moduleFactory looks up the ModuleFactory registered under the given name
// (case-insensitively). The result is nil when nothing is registered.
func (r *Register) moduleFactory(name string) ModuleFactory {
	key := strings.ToLower(name)

	r.lock.RLock()
	factory := r.modules[key]
	r.lock.RUnlock()

	return factory
}
// metricSetRegistration looks up the registration stored for module/name
// (both matched case-insensitively). An error is returned when either the
// module has no metricsets registered or the module has no metricset with
// that name; the error text says which of the two was missing.
func (r *Register) metricSetRegistration(module, name string) (MetricSetRegistration, error) {
	r.lock.RLock()
	defer r.lock.RUnlock()

	module, name = strings.ToLower(module), strings.ToLower(name)

	sets, found := r.metricSets[module]
	if !found {
		return MetricSetRegistration{}, fmt.Errorf("metricset '%s/%s' is not registered, module not found", module, name)
	}

	if reg, found := sets[name]; found {
		return reg, nil
	}
	return MetricSetRegistration{}, fmt.Errorf("metricset '%s/%s' is not registered, metricset not found", module, name)
}
// defaultMetricSets returns the names of the default MetricSets for a module
// (matched case-insensitively), sorted in increasing order so that the result
// is the same on every call.
//
// An error is returned if the module has no metricsets registered or if none
// of its metricsets was registered as a default.
func (r *Register) defaultMetricSets(module string) ([]string, error) {
	r.lock.RLock()
	defer r.lock.RUnlock()

	module = strings.ToLower(module)
	metricSets, exists := r.metricSets[module]
	if !exists {
		return nil, fmt.Errorf("module '%s' not found", module)
	}

	var defaults []string
	for _, reg := range metricSets {
		if reg.IsDefault {
			defaults = append(defaults, reg.Name)
		}
	}

	if len(defaults) == 0 {
		return nil, fmt.Errorf("no default metricset exists for module '%s'", module)
	}

	// Map iteration order is unspecified; sort so callers see a stable order.
	sort.Strings(defaults)
	return defaults, nil
}
// Modules returns the list of module names that have a registered
// ModuleFactory. The names are the lower-cased keys used by the Register and
// are sorted in increasing order so that the result is the same on every
// call. The returned slice is never nil and is owned by the caller.
func (r *Register) Modules() []string {
	r.lock.RLock()
	defer r.lock.RUnlock()

	modules := make([]string, 0, len(r.modules))
	for module := range r.modules {
		modules = append(modules, module)
	}

	// Map iteration order is unspecified; sort so callers see a stable order.
	sort.Strings(modules)
	return modules
}
// MetricSets returns the list of MetricSet names registered for a given
// module (matched case-insensitively), sorted in increasing order so that the
// result is the same on every call. It returns nil when no metricset has been
// registered for the module.
func (r *Register) MetricSets(module string) []string {
	r.lock.RLock()
	defer r.lock.RUnlock()

	sets, ok := r.metricSets[strings.ToLower(module)]
	if !ok {
		return nil
	}

	metricsets := make([]string, 0, len(sets))
	for name := range sets {
		metricsets = append(metricsets, name)
	}

	// Map iteration order is unspecified; sort so callers see a stable order.
	sort.Strings(metricsets)
	return metricsets
}
// String returns a human-readable summary of the Register: the sorted module
// names that have a ModuleFactory, followed by the sorted "module/metricset"
// pairs that have a MetricSetFactory.
func (r *Register) String() string {
	r.lock.RLock()
	defer r.lock.RUnlock()

	moduleNames := make([]string, 0, len(r.modules))
	for moduleName := range r.modules {
		moduleNames = append(moduleNames, moduleName)
	}
	sort.Strings(moduleNames)

	var qualifiedSets []string
	for moduleName, sets := range r.metricSets {
		for setName := range sets {
			qualified := fmt.Sprintf("%s/%s", moduleName, setName)
			qualifiedSets = append(qualifiedSets, qualified)
		}
	}
	sort.Strings(qualifiedSets)

	moduleList := strings.Join(moduleNames, ", ")
	metricSetList := strings.Join(qualifiedSets, ", ")
	return fmt.Sprintf("Register [ModuleFactory:[%s], MetricSetFactory:[%s]]",
		moduleList, metricSetList)
}