forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 1
/
outputs.go
147 lines (126 loc) · 3.61 KB
/
outputs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package outputs
import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)
// MothershipConfig carries the configuration of a single output plugin.
// InitOutputs looks one up per registered plugin name and hands a pointer to
// it to that plugin's OutputBuilder.NewOutput.
//
// Within this file only Index is interpreted: InitOutputs replaces an empty
// Index with the beat name before the output is built. All other fields are
// passed through untouched; which of them are honored is up to the individual
// output implementation.
//
// Fields carrying a `yaml` tag are bound to that key explicitly. The other
// fields have no tag, so their key is whatever the decoder in use derives from
// the field name (NOTE(review): confirm against the YAML library's rules).
//
// NOTE(review): the pointer-typed fields (LoadBalance, FlushInterval,
// BulkMaxSize, MaxRetries, Pretty, TLS, CompressionLevel) presumably use nil to
// mean "not configured" — confirm with the outputs that read them.
type MothershipConfig struct {
	Save_topology     bool
	Host              string
	Port              int
	Hosts             []string
	LoadBalance       *bool
	Protocol          string
	Username          string
	Password          string
	ProxyURL          string `yaml:"proxy_url"`
	Index             string
	Path              string
	Db                int
	Db_topology       int
	Timeout           int
	ReconnectInterval int    `yaml:"reconnect_interval"`
	Filename          string `yaml:"filename"`
	RotateEveryKb     int    `yaml:"rotate_every_kb"`
	NumberOfFiles     int    `yaml:"number_of_files"`
	DataType          string
	FlushInterval     *int  `yaml:"flush_interval"`
	BulkMaxSize       *int  `yaml:"bulk_max_size"`
	MaxRetries        *int  `yaml:"max_retries"`
	Pretty            *bool `yaml:"pretty"`
	TLS               *TLSConfig
	Worker            int
	CompressionLevel  *int `yaml:"compression_level"`
}
// Options carries per-call publishing options. A value is passed alongside the
// events to Outputer.PublishEvent and BulkOutputer.BulkPublish.
type Options struct {
	// Guaranteed is not interpreted anywhere in this file.
	// NOTE(review): presumably asks the output for guaranteed delivery —
	// confirm with the output implementations.
	Guaranteed bool
}
// Outputer is the minimal interface every output implements: publishing one
// event at a time.
type Outputer interface {
	// PublishEvent publishes a single event with the given options and
	// reports failure through the returned error.
	// NOTE(review): trans is a Signaler declared outside this file;
	// presumably the output uses it to report completion or failure back to
	// the caller — confirm.
	PublishEvent(trans Signaler, opts Options, event common.MapStr) error
}
// TopologyOutputer is implemented by outputs that maintain a topology map
// relating agent names to IP addresses.
type TopologyOutputer interface {
	// PublishIPs registers the agent name and its IPs in the topology map.
	PublishIPs(name string, localAddrs []string) error
	// GetNameByIP returns the agent name recorded for the given IP in the
	// topology map. What is returned for an unknown IP is up to the
	// implementation; it is not specified here.
	GetNameByIP(ip string) string
}
// BulkOutputer extends Outputer with BulkPublish, which lets a caller hand over
// a whole batch of events in one call instead of looping over PublishEvent
// itself. An implementation may still loop over the events internally, or use
// a more efficient bulk API if its backend offers one.
type BulkOutputer interface {
	Outputer
	// BulkPublish publishes the given batch of events with the given options
	// and reports failure through the returned error.
	BulkPublish(trans Signaler, opts Options, event []common.MapStr) error
}
// OutputBuilder is the factory an output plugin registers via
// RegisterOutputPlugin. InitOutputs calls it once for every registered plugin
// that has a configuration entry.
type OutputBuilder interface {
	// NewOutput creates and initializes the output plugin from config.
	// InitOutputs passes a pointer to its own copy of the configuration, with
	// an empty Index already replaced by the beat name.
	// NOTE(review): the unit of topologyExpire is not visible in this file —
	// confirm with the callers.
	NewOutput(
		config *MothershipConfig,
		topologyExpire int) (Outputer, error)
}
// OutputInterface lists the functions to be exported by an output plugin that
// supports both event publishing (Outputer) and the topology map
// (TopologyOutputer).
type OutputInterface interface {
	Outputer
	TopologyOutputer
}
// OutputPlugin describes one output activated by InitOutputs.
type OutputPlugin struct {
	// Name is the name the plugin was registered and configured under.
	Name string
	// Config is the configuration the output was built from, including the
	// Index default applied by InitOutputs.
	Config MothershipConfig
	// Output is the instance returned by the plugin's OutputBuilder.NewOutput.
	Output Outputer
}
// bulkOutputAdapter wraps an Outputer that has no BulkPublish of its own.
// PublishEvent is promoted from the embedded Outputer; the adapter's
// BulkPublish method publishes a batch by calling it once per event.
// CastBulkOutputer returns this type for outputs that are not BulkOutputers.
type bulkOutputAdapter struct {
	Outputer
}
// enabledOutputPlugins is the package-level registry of output builders, keyed
// by plugin name. RegisterOutputPlugin writes to it; FindOutputPlugin and
// InitOutputs read from it. No locking is done around these accesses in this
// file.
var enabledOutputPlugins = map[string]OutputBuilder{}
// RegisterOutputPlugin stores builder in the package-level plugin registry
// under name, which makes it visible to FindOutputPlugin and InitOutputs.
// Registering a name a second time replaces the builder stored earlier.
func RegisterOutputPlugin(name string, builder OutputBuilder) {
	enabledOutputPlugins[name] = builder
}
// FindOutputPlugin looks up the builder registered under name. It returns nil
// when no builder has been registered for that name.
func FindOutputPlugin(name string) OutputBuilder {
	if builder, registered := enabledOutputPlugins[name]; registered {
		return builder
	}
	return nil
}
// InitOutputs builds an output for every registered plugin that has an entry
// in configs and returns the activated plugins.
//
// Parameters:
//   - beatName: used as the Index of any configuration whose Index is empty.
//   - configs: plugin configurations keyed by plugin name; registered plugins
//     without an entry are skipped.
//   - topologyExpire: passed through unchanged to each OutputBuilder.NewOutput.
//
// On the first NewOutput failure the error is logged and returned together
// with a nil slice; outputs created before the failure are not returned.
// When no registered plugin is configured the result is a nil slice and a nil
// error.
//
// The registry is a map, so the order in which plugins are initialized (and
// therefore the order of the returned slice) is unspecified.
func InitOutputs(
	beatName string,
	configs map[string]MothershipConfig,
	topologyExpire int,
) ([]OutputPlugin, error) {
	var plugins []OutputPlugin
	for name, builder := range enabledOutputPlugins {
		// config is a copy of the map entry, so the Index default applied
		// below does not modify the caller's configs map.
		config, exists := configs[name]
		if !exists {
			continue
		}

		if config.Index == "" {
			config.Index = beatName
		}

		output, err := builder.NewOutput(&config, topologyExpire)
		if err != nil {
			logp.Err("failed to initialize %s plugin as output: %s", name, err)
			return nil, err
		}

		// The config is copied into the result only after NewOutput returned,
		// so any adjustment the builder made through the pointer is kept.
		plugins = append(plugins, OutputPlugin{Name: name, Config: config, Output: output})
		logp.Info("Activated %s as output plugin.", name)
	}
	return plugins, nil
}
// CastBulkOutputer returns out itself when it already implements the
// BulkOutputer interface. Otherwise it returns a wrapper around out that
// implements BulkOutputer, so the caller can always call BulkPublish on the
// result.
func CastBulkOutputer(out Outputer) BulkOutputer {
	switch bulk := out.(type) {
	case BulkOutputer:
		return bulk
	default:
		return &bulkOutputAdapter{Outputer: out}
	}
}
// BulkPublish implements BulkOutputer for outputs without bulk support of
// their own by publishing the events one at a time through the wrapped
// Outputer's PublishEvent.
//
// The caller's signal is wrapped with NewSplitSignaler, sized to len(events),
// and that wrapper is what every PublishEvent call receives. Publishing stops
// at the first error, which is returned; events after the failing one are not
// attempted. It returns nil once every event was handed over without error.
//
// NOTE(review): after an early return, and for an empty batch, fewer than
// len(events) publishes reach the split signaler — confirm how
// NewSplitSignaler and the callers deal with that.
func (b *bulkOutputAdapter) BulkPublish(
	signal Signaler,
	opts Options,
	events []common.MapStr,
) error {
	split := NewSplitSignaler(signal, len(events))
	for i := range events {
		if err := b.PublishEvent(split, opts, events[i]); err != nil {
			return err
		}
	}
	return nil
}