forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
outputs.go
116 lines (96 loc) · 2.75 KB
/
outputs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package outputs
import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/op"
"github.com/elastic/beats/libbeat/logp"
)
// Options holds the per-call settings handed to an output's publish methods
// (PublishEvent and BulkPublish).
type Options struct {
// NOTE(review): presumably requests guaranteed delivery of the published
// event(s); the exact semantics are not visible in this file — confirm
// against the outputs that read this flag.
Guaranteed bool
}
// Outputer is the basic interface returned by an OutputBuilder: it publishes
// single events and can be closed.
type Outputer interface {
// PublishEvent publishes one event using the given options.
// NOTE(review): sig is presumably used to report completion or failure of
// the publish — confirm against op.Signaler.
PublishEvent(sig op.Signaler, opts Options, event common.MapStr) error
// NOTE(review): Close presumably releases the output's resources — confirm
// with the concrete implementations.
Close() error
}
// TopologyOutputer is implemented by outputs that maintain a topology map
// relating agent names to IP addresses.
type TopologyOutputer interface {
// PublishIPs registers the agent name and its IPs in the topology map.
PublishIPs(name string, localAddrs []string) error
// GetNameByIP returns the agent name registered for the given IP in the
// topology map.
// NOTE(review): the result for an unknown IP is not visible here
// (presumably the empty string) — confirm with implementations.
GetNameByIP(ip string) string
}
// BulkOutputer extends Outputer with BulkPublish, so callers can hand over a
// batch of events without looping themselves. Implementations may still loop
// over the events internally or use a more efficient bulk API if one is
// available.
type BulkOutputer interface {
Outputer
// BulkPublish publishes a batch of events with the given options. Despite
// its singular name, the event parameter holds the whole batch.
BulkPublish(sig op.Signaler, opts Options, event []common.MapStr) error
}
// OutputBuilder creates and initializes an output plugin from its
// configuration, returning the ready-to-use Outputer or an error.
// NOTE(review): topologyExpire is presumably the expiry applied to topology
// map entries; its unit is not visible in this file — confirm with the
// plugin implementations.
type OutputBuilder func(config *common.Config, topologyExpire int) (Outputer, error)
// OutputInterface lists the functions to be exported by an output plugin:
// the combination of Outputer and TopologyOutputer.
type OutputInterface interface {
Outputer
TopologyOutputer
}
// OutputPlugin describes one initialized output as produced by InitOutputs.
type OutputPlugin struct {
// Name is the name the plugin's builder was registered under.
Name string
// Config is the configuration the output was built from.
Config *common.Config
// Output is the Outputer returned by the plugin's builder.
Output Outputer
}
// bulkOutputAdapter wraps an Outputer that does not implement BulkOutputer.
// The embedded Outputer supplies PublishEvent and Close; the adapter's
// BulkPublish method publishes a batch by calling PublishEvent per event.
type bulkOutputAdapter struct {
Outputer
}
// enabledOutputPlugins maps an output plugin name to the builder registered
// for it via RegisterOutputPlugin. Access to the map is not synchronized in
// this file.
var enabledOutputPlugins = make(map[string]OutputBuilder)
// RegisterOutputPlugin stores builder in the package-level plugin registry
// under name. Registering a name a second time replaces the builder that was
// stored before.
func RegisterOutputPlugin(name string, builder OutputBuilder) {
	enabledOutputPlugins[name] = builder
}
// FindOutputPlugin looks up the builder registered under name.
// It returns nil when no builder has been registered for that name.
func FindOutputPlugin(name string) OutputBuilder {
	builder := enabledOutputPlugins[name]
	return builder
}
// InitOutputs builds every registered output plugin that has an entry in
// configs and returns the initialized plugins.
//
// Registered plugins without a configuration entry are skipped. If a plugin's
// configuration has no "index" field, beatName is stored as its "index"
// before the builder is invoked. topologyExpire is handed to each builder
// unchanged.
//
// The first failure, either storing the default index or building the
// output, is logged and returned together with a nil slice. The order of the
// returned plugins follows map iteration order and is therefore not stable
// between calls.
func InitOutputs(
	beatName string,
	configs map[string]*common.Config,
	topologyExpire int,
) ([]OutputPlugin, error) {
	var plugins []OutputPlugin
	for name, builder := range enabledOutputPlugins {
		config, exists := configs[name]
		if !exists {
			continue
		}

		if !config.HasField("index") {
			// Surface a failed default instead of silently continuing with a
			// configuration that still lacks "index".
			if err := config.SetString("index", -1, beatName); err != nil {
				logp.Err("failed to set default index for %s plugin: %s", name, err)
				return nil, err
			}
		}

		output, err := builder(config, topologyExpire)
		if err != nil {
			logp.Err("failed to initialize %s plugin as output: %s", name, err)
			return nil, err
		}

		plugins = append(plugins, OutputPlugin{Name: name, Config: config, Output: output})
		logp.Info("Activated %s as output plugin.", name)
	}
	return plugins, nil
}
// CastBulkOutputer returns out as a BulkOutputer. When out already implements
// BulkOutputer it is returned unchanged; otherwise it is wrapped in an adapter
// that provides BulkPublish by publishing the events one by one.
func CastBulkOutputer(out Outputer) BulkOutputer {
	switch bulk := out.(type) {
	case BulkOutputer:
		return bulk
	default:
		return &bulkOutputAdapter{Outputer: out}
	}
}
// BulkPublish implements BulkOutputer on top of a plain Outputer by handing
// the events to PublishEvent one at a time, in slice order.
//
// The incoming signaler is replaced by op.SplitSignaler(signal, len(events))
// and that split signaler is passed to every PublishEvent call. Publishing
// stops at the first error, which is returned as is; the remaining events are
// not attempted. It returns nil when no PublishEvent call failed.
func (b *bulkOutputAdapter) BulkPublish(
	signal op.Signaler,
	opts Options,
	events []common.MapStr,
) error {
	split := op.SplitSignaler(signal, len(events))
	for i := range events {
		if err := b.PublishEvent(split, opts, events[i]); err != nil {
			return err
		}
	}
	return nil
}