forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
eventlogger.go
128 lines (108 loc) · 2.87 KB
/
eventlogger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package beater
import (
"time"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/winlogbeat/checkpoint"
"github.com/elastic/beats/winlogbeat/eventlog"
)
type eventLogger struct {
source eventlog.EventLog
eventMeta common.EventMetadata
processors beat.ProcessorList
}
type eventLoggerConfig struct {
common.EventMetadata `config:",inline"` // Fields and tags to add to events.
Processors processors.PluginConfig `config:"processors"`
}
func newEventLogger(
source eventlog.EventLog,
options *common.Config,
) (*eventLogger, error) {
config := eventLoggerConfig{}
if err := options.Unpack(&config); err != nil {
return nil, err
}
processors, err := processors.New(config.Processors)
if err != nil {
return nil, err
}
return &eventLogger{
source: source,
eventMeta: config.EventMetadata,
processors: processors,
}, nil
}
func (e *eventLogger) connect(pipeline beat.Pipeline) (beat.Client, error) {
api := e.source.Name()
return pipeline.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
EventMetadata: e.eventMeta,
Meta: nil, // TODO: configure modules/ES ingest pipeline?
Processor: e.processors,
ACKCount: func(n int) {
addPublished(api, n)
logp.Info("EventLog[%s] successfully published %d events", api, n)
},
})
}
func (e *eventLogger) run(
done <-chan struct{},
pipeline beat.Pipeline,
state checkpoint.EventLogState,
) {
api := e.source
// Initialize per event log metrics.
initMetrics(api.Name())
client, err := e.connect(pipeline)
if err != nil {
logp.Warn("EventLog[%s] Pipeline error. Failed to connect to publisher pipeline",
api.Name())
return
}
// close client on function return or when `done` is triggered (unblock client)
defer client.Close()
go func() {
<-done
client.Close()
}()
err = api.Open(state.RecordNumber)
if err != nil {
logp.Warn("EventLog[%s] Open() error. No events will be read from "+
"this source. %v", api.Name(), err)
return
}
defer func() {
logp.Info("EventLog[%s] Stop processing.", api.Name())
if err := api.Close(); err != nil {
logp.Warn("EventLog[%s] Close() error. %v", api.Name(), err)
return
}
}()
debugf("EventLog[%s] opened successfully", api.Name())
for {
select {
case <-done:
return
default:
}
// Read from the event.
records, err := api.Read()
if err != nil {
logp.Warn("EventLog[%s] Read() error: %v", api.Name(), err)
break
}
debugf("EventLog[%s] Read() returned %d records", api.Name(), len(records))
if len(records) == 0 {
// TODO: Consider implementing notifications using
// NotifyChangeEventLog instead of polling.
time.Sleep(time.Second)
continue
}
for _, lr := range records {
client.Publish(lr.ToEvent())
}
}
}