forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 9
/
matchers.go
155 lines (124 loc) · 3.4 KB
/
matchers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package add_kubernetes_metadata
import (
"fmt"
"sync"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/codec"
"github.com/elastic/beats/libbeat/outputs/codec/format"
)
const (
FieldMatcherName = "fields"
FieldFormatMatcherName = "field_format"
)
// Matcher takes a new event and returns the index
type Matcher interface {
// MetadataIndex returns the index string to use in annotation lookups for the given
// event. A previous indexer should have generated that index for this to work
// This function can return "" if the event doesn't match
MetadataIndex(event common.MapStr) string
}
type Matchers struct {
sync.RWMutex
matchers []Matcher
}
type MatcherConstructor func(config common.Config) (Matcher, error)
func NewMatchers(configs PluginConfig) *Matchers {
matchers := []Matcher{}
for _, pluginConfigs := range configs {
for name, pluginConfig := range pluginConfigs {
matchFunc := Indexing.GetMatcher(name)
if matchFunc == nil {
logp.Warn("Unable to find matcher plugin %s", name)
continue
}
matcher, err := matchFunc(pluginConfig)
if err != nil {
logp.Warn("Unable to initialize matcher plugin %s due to error %v", name, err)
}
matchers = append(matchers, matcher)
}
}
return &Matchers{
matchers: matchers,
}
}
// MetadataIndex returns the index string for the first matcher from the Registry returning one
func (m *Matchers) MetadataIndex(event common.MapStr) string {
m.RLock()
defer m.RUnlock()
for _, matcher := range m.matchers {
index := matcher.MetadataIndex(event)
if index != "" {
return index
}
}
// No index returned
return ""
}
func (m *Matchers) Empty() bool {
if len(m.matchers) == 0 {
return true
}
return false
}
type FieldMatcher struct {
MatchFields []string
}
func NewFieldMatcher(cfg common.Config) (Matcher, error) {
config := struct {
LookupFields []string `config:"lookup_fields"`
}{}
err := cfg.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("fail to unpack the `lookup_fields` configuration: %s", err)
}
if len(config.LookupFields) == 0 {
return nil, fmt.Errorf("lookup_fields can not be empty")
}
return &FieldMatcher{MatchFields: config.LookupFields}, nil
}
func (f *FieldMatcher) MetadataIndex(event common.MapStr) string {
for _, field := range f.MatchFields {
keyIface, err := event.GetValue(field)
if err == nil {
key, ok := keyIface.(string)
if ok {
return key
}
}
}
return ""
}
type FieldFormatMatcher struct {
Codec codec.Codec
}
func NewFieldFormatMatcher(cfg common.Config) (Matcher, error) {
config := struct {
Format string `config:"format"`
}{}
err := cfg.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("fail to unpack the `format` configuration of `field_format` matcher: %s", err)
}
if config.Format == "" {
return nil, fmt.Errorf("`format` of `field_format` matcher can't be empty")
}
return &FieldFormatMatcher{
Codec: format.New(fmtstr.MustCompileEvent(config.Format)),
}, nil
}
func (f *FieldFormatMatcher) MetadataIndex(event common.MapStr) string {
bytes, err := f.Codec.Encode("", &beat.Event{
Fields: event,
})
if err != nil {
logp.Debug("kubernetes", "Unable to apply field format pattern on event")
}
if len(bytes) == 0 {
return ""
}
return string(bytes)
}