-
Notifications
You must be signed in to change notification settings - Fork 0
/
module.go
132 lines (108 loc) · 3.07 KB
/
module.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package pipeline
import (
"errors"
"flag"
"fmt"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/publisher/queue"
)
// Global pipeline module for loading the main pipeline from a configuration object
// Command line flags.

// publishDisabled is bound to the -N command line flag. When true, actual
// publishing is disabled (intended for testing).
var publishDisabled bool

// defaultQueueType is the queue type used when the queue configuration does
// not name one.
const defaultQueueType = "mem"
// init registers the -N command line flag, which sets publishDisabled
// (default false).
func init() {
	const (
		name  = "N"
		usage = "Disable actual publishing for testing"
	)
	flag.BoolVar(&publishDisabled, name, false, usage)
}
// Load builds a complete Pipeline from the given configuration. It creates
// the configured processors, the queue builder and the output group, and
// passes them to New together with the "libbeat" monitoring registry.
//
// When publishDisabled is set (the -N flag), a dry-run notice is logged and
// the pipeline settings are marked as disabled. An error is returned if the
// processors, the queue builder, the output or the pipeline itself cannot be
// created.
func Load(
	beatInfo beat.Info,
	config Config,
	outcfg common.ConfigNamespace,
) (*Pipeline, error) {
	if publishDisabled {
		logp.Info("Dry run mode. All output types except the file based one are disabled.")
	}

	procs, err := processors.New(config.Processors)
	if err != nil {
		return nil, fmt.Errorf("error initializing processors: %v", err)
	}

	// Reuse the "libbeat" registry if it already exists, otherwise create it.
	registry := monitoring.Default.GetRegistry("libbeat")
	if registry == nil {
		registry = monitoring.Default.NewRegistry("libbeat")
	}

	beatName := beatInfo.Name

	// Metadata attached by the pipeline: user-configured event metadata plus
	// the identifying fields of this beat.
	annotations := Annotations{
		Event: config.EventMetadata,
		Beat: common.MapStr{
			"name":     beatName,
			"hostname": beatInfo.Hostname,
			"version":  beatInfo.Version,
		},
	}

	pipelineSettings := Settings{
		WaitClose:     0,
		WaitCloseMode: NoWaitOnClose,
		Disabled:      publishDisabled,
		Processors:    procs,
		Annotations:   annotations,
	}

	buildQueue, err := createQueueBuilder(config.Queue)
	if err != nil {
		return nil, err
	}

	outGroup, err := loadOutput(beatInfo, registry, outcfg)
	if err != nil {
		return nil, err
	}

	pipe, err := New(registry, buildQueue, outGroup, pipelineSettings)
	if err != nil {
		return nil, err
	}

	logp.Info("Publisher name: %s", beatName)
	return pipe, nil
}
// loadOutput creates the output group described by outcfg.
//
// When publishDisabled is set, an empty group is returned and reg is left
// untouched. When outcfg is not set, the problem is logged and returned as an
// error through outputs.Fail. Otherwise an "output" sub-registry of reg is
// created to back the output statistics, the output is loaded, and on success
// the output name is stored in that registry under "type".
func loadOutput(
	beatInfo beat.Info,
	reg *monitoring.Registry,
	outcfg common.ConfigNamespace,
) (outputs.Group, error) {
	switch {
	case publishDisabled:
		return outputs.Group{}, nil
	case !outcfg.IsSet():
		const noOutputMsg = "No outputs are defined. Please define one under the output section."
		logp.Info(noOutputMsg)
		return outputs.Fail(errors.New(noOutputMsg))
	}

	// TODO: add support to unload/reassign outStats on output reloading
	statsReg := reg.NewRegistry("output")
	stats := outputs.MakeStats(statsReg)

	group, err := outputs.Load(beatInfo, &stats, outcfg.Name(), outcfg.Config())
	if err != nil {
		return outputs.Fail(err)
	}

	// Record the output type only once the output has loaded successfully.
	monitoring.NewString(statsReg, "type").Set(outcfg.Name())
	return group, nil
}
// createQueueBuilder looks up the queue factory selected by config and
// returns a constructor that creates the queue for a given Eventer.
//
// The queue type falls back to defaultQueueType when config has no name, and
// a fresh empty configuration is used when config carries none. An error is
// returned if queue.FindFactory yields no factory for the queue type.
func createQueueBuilder(config common.ConfigNamespace) (func(queue.Eventer) (queue.Queue, error), error) {
	kind := config.Name()
	if kind == "" {
		kind = defaultQueueType
	}

	factory := queue.FindFactory(kind)
	if factory == nil {
		return nil, fmt.Errorf("'%v' is no valid queue type", kind)
	}

	cfg := config.Config()
	if cfg == nil {
		cfg = common.NewConfig()
	}

	// The returned builder captures the resolved factory and configuration.
	build := func(eventer queue.Eventer) (queue.Queue, error) {
		return factory(eventer, cfg)
	}
	return build, nil
}