forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
242 lines (207 loc) · 6.59 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
package publisher
import (
"errors"
"expvar"
"sync/atomic"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/op"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
)
// publishedEvents is a metric that can be retrieved through the expvar web
// interface under the name "libbeat.publisher.published_events".
var publishedEvents = expvar.NewInt("libbeat.publisher.published_events")
// ErrClientClosed is a sentinel error carrying the message "client closed".
// NOTE(review): it is not referenced in this file; presumably it is reported
// elsewhere in the package when a closed client is used - confirm.
var ErrClientClosed = errors.New("client closed")
// Client is used by beats to publish new events.
//
// The publish methods add fields that are common to all events. Both methods
// add the 'beat' field that contains name, hostname and version. They also
// add the globally configured 'tags' and 'fields'.
//
// Event publishers can override the default index for an event by adding a
// 'beat' field whose value is a common.MapStr that contains an 'index' field
// specifying the destination index.
//
//	event := common.MapStr{
//		// Setting a custom index for a single event.
//		"beat": common.MapStr{"index": "custom-index"},
//	}
//
// Event publishers can add fields and tags to an event. The fields will take
// precedence over the global fields defined in the shipper configuration.
//
//	event := common.MapStr{
//		// Add custom fields to the root of the event.
//		common.EventMetadataKey: common.EventMetadata{
//			FieldsUnderRoot: true,
//			Fields:          common.MapStr{"env": "production"},
//		},
//	}
type Client interface {
// Close disconnects the Client from the publisher pipeline.
Close() error
// PublishEvent publishes one event with the given options. If the Sync
// option is set, PublishEvent blocks until the output plugins report success
// or failure, and that state is returned by this method.
PublishEvent(event common.MapStr, opts ...ClientOption) bool
// PublishEvents publishes multiple events with the given options. If the
// Guaranteed option is set, PublishEvents blocks until the output plugins
// report success or failure, and that state is returned by this method.
//
// NOTE(review): the implementation in this file picks the sync pipeline based
// on the Sync option only; confirm whether Guaranteed implies Sync.
PublishEvents(events []common.MapStr, opts ...ClientOption) bool
}
// client is the Client implementation created by newClient. It annotates and
// filters events before handing them to one of the publisher's pipelines.
type client struct {
// canceler is cancelled when the client is closed (see Close).
canceler *op.Canceler
// publisher is the owning BeatPublisher; the client uses its Processors,
// its pipelines and its numClients counter.
publisher *BeatPublisher
beatMeta common.MapStr // Beat metadata that is added to all events.
globalEventMetadata common.EventMetadata // Fields and tags that are added to all events.
}
// newClient creates a client bound to pub. The client gets a fresh canceler,
// a 'beat' metadata map built from the publisher's name, hostname and version,
// and a copy of the publisher's global event metadata value.
func newClient(pub *BeatPublisher) *client {
	cl := new(client)
	cl.canceler = op.NewCanceler()
	cl.publisher = pub
	cl.beatMeta = common.MapStr{
		"name":     pub.name,
		"hostname": pub.hostname,
		"version":  pub.version,
	}
	cl.globalEventMetadata = pub.globalEventMetadata
	return cl
}
// Close cancels the client's canceler and atomically decrements the
// publisher's client counter. Calling Close on a nil *client is a no-op.
// It always returns nil.
func (c *client) Close() error {
	if c == nil {
		return nil
	}

	// Adding ^uint32(0) is the sync/atomic idiom for decrementing an unsigned
	// counter by one.
	const decrement = ^uint32(0)

	c.canceler.Cancel()
	atomic.AddUint32(&c.publisher.numClients, decrement)
	return nil
}
// PublishEvent annotates and filters a single event and, if it survives the
// processors, hands it to the pipeline selected by opts.
//
// It returns false when the event is dropped by filterEvent; otherwise it
// returns whatever the pipeline's publish call reports. If the options carry
// more than one metadata element, only the first one is attached to the event.
func (c *client) PublishEvent(event common.MapStr, opts ...ClientOption) bool {
	c.annotateEvent(event)

	filtered := c.filterEvent(event)
	if filtered == nil {
		return false
	}

	meta, ctx, pl := c.getPipeline(opts)

	// A single event can carry at most one metadata element.
	var values *outputs.Values
	if n := len(meta); n > 0 {
		if n > 1 {
			logp.Debug("publish", "too many metadata, pick first")
		}
		values = outputs.ValuesWithMetadata(nil, meta[0])
	}

	publishedEvents.Add(1)
	msg := message{
		client:  c,
		context: ctx,
		datum:   outputs.Data{Event: *filtered, Values: values},
	}
	return pl.publish(msg)
}
// PublishEvents annotates and filters a batch of events and hands the
// surviving ones to the pipeline selected by opts.
//
// Metadata collected from the options is attached as follows:
//   - one element per event: element i is attached to event i (nil elements
//     are skipped);
//   - exactly one element for a batch of a different size: that element is
//     attached to every event;
//   - any other count: the metadata is dropped and a debug message is logged.
//
// It returns true without publishing when every event was dropped by the
// processors; otherwise it returns whatever the pipeline's publish call
// reports.
func (c *client) PublishEvents(events []common.MapStr, opts ...ClientOption) bool {
	var valuesAll *outputs.Values
	meta, ctx, pipeline := c.getPipeline(opts)

	switch {
	case len(meta) == 0:
		// An option may hand back a non-nil but empty slice. Treat it as
		// "no metadata" so the per-event lookup below never indexes into it.
		meta = nil
	case len(meta) == len(events):
		// One metadata element per event; applied inside the loop.
	case len(meta) == 1:
		// A single element is shared by all events of the batch.
		valuesAll = outputs.ValuesWithMetadata(nil, meta[0])
		meta = nil
	default:
		logp.Debug("publish",
			"Number of metadata elements does not match number of events => dropping metadata")
		meta = nil
	}

	data := make([]outputs.Data, 0, len(events))
	for i, event := range events {
		c.annotateEvent(event)

		publishEvent := c.filterEvent(event)
		if publishEvent == nil {
			continue
		}

		evt := outputs.Data{Event: *publishEvent, Values: valuesAll}
		if meta != nil {
			// meta is only non-nil here when len(meta) == len(events), so
			// indexing with i is in range.
			if m := meta[i]; m != nil {
				evt.Values = outputs.ValuesWithMetadata(valuesAll, m)
			}
		}
		data = append(data, evt)
	}

	if len(data) == 0 {
		logp.Debug("filter", "No events to publish")
		return true
	}

	publishedEvents.Add(int64(len(data)))
	return pipeline.publish(message{client: c, context: ctx, data: data})
}
// annotateEvent decorates event in place with the fields shared by all events:
// the 'beat' metadata (name, hostname and version, merged with any 'beat'
// MapStr already present on the event), followed by the global tags and
// fields, and finally the event's own metadata stored under
// common.EventMetadataKey. That key is removed from the event afterwards. See
// the documentation for Client for more information.
func (c *client) annotateEvent(event common.MapStr) {
	// An event may override parts of the beat metadata (e.g. beat.index).
	// MapStrUnion returns a new map, so the client's defaults stay untouched.
	meta := c.beatMeta
	if custom, isMap := event["beat"].(common.MapStr); isMap {
		meta = common.MapStrUnion(meta, custom)
	}
	event["beat"] = meta

	// Global tags and fields defined under shipper come first ...
	global := c.globalEventMetadata
	common.AddTags(event, global.Tags)
	common.MergeFields(event, global.Fields, global.FieldsUnderRoot)

	// ... and the event specific ones last, so that they take precedence
	// over the globals.
	raw, present := event[common.EventMetadataKey]
	if !present {
		return
	}
	if own, isMeta := raw.(common.EventMetadata); isMeta {
		common.AddTags(event, own.Tags)
		common.MergeFields(event, own.Fields, own.FieldsUnderRoot)
	}
	delete(event, common.EventMetadataKey)
}
// filterEvent converts event to its generic form and runs it through the
// publisher's processors.
//
// It returns nil when the conversion fails (logged as an error) or when the
// processors drop the event; otherwise it returns a pointer to the processed
// event.
func (c *client) filterEvent(event common.MapStr) *common.MapStr {
	if event = common.ConvertToGenericEvent(event); event == nil {
		logp.Err("fail to convert to a generic event")
		return nil
	}

	// process the event by applying the configured actions
	publishEvent := c.publisher.Processors.Run(event)
	if publishEvent == nil {
		// The event is dropped. Only render it when the "publish" selector is
		// enabled, so dropped events do not pay for StringToPrint otherwise.
		if logp.IsDebug("publish") {
			logp.Debug("publish", "Drop event %s", event.StringToPrint())
		}
		return nil
	}

	if logp.IsDebug("publish") {
		logp.Debug("publish", "Publish: %s", publishEvent.StringToPrint())
	}
	return &publishEvent
}
// getPipeline evaluates opts via MakeContext and returns the collected
// metadata, the resulting Context and the pipeline to publish on: the
// publisher's sync pipeline when ctx.Sync is set, its async pipeline otherwise.
func (c *client) getPipeline(opts []ClientOption) ([]common.MapStr, Context, pipeline) {
	meta, ctx := MakeContext(opts)

	target := c.publisher.pipelines.async
	if ctx.Sync {
		target = c.publisher.pipelines.sync
	}
	return meta, ctx, target
}
// MakeContext applies opts, in order, to a zero-valued Context and returns
// the metadata the options produced together with the final Context.
//
// Metadata slices returned by the individual options are concatenated in
// option order; options returning nil metadata contribute nothing. The result
// is nil if no option returned a non-nil slice.
func MakeContext(opts []ClientOption) ([]common.MapStr, Context) {
	var ctx Context
	var meta []common.MapStr
	for _, opt := range opts {
		var m []common.MapStr
		m, ctx = opt(ctx)
		if m == nil {
			continue
		}
		if meta == nil {
			// Reuse the option's slice without copying, but cap it at its
			// length so a later append reallocates instead of writing into
			// spare capacity of the option's backing array.
			meta = m[:len(m):len(m)]
			continue
		}
		meta = append(meta, m...)
	}
	return meta, ctx
}