forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 7
/
file.go
136 lines (107 loc) · 2.79 KB
/
file.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package fileout
import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/codec"
"github.com/elastic/beats/libbeat/publisher"
)
func init() {
outputs.RegisterType("file", makeFileout)
}
type fileOutput struct {
beat beat.Info
stats *outputs.Stats
rotator logp.FileRotator
codec codec.Codec
}
// New instantiates a new file output instance.
func makeFileout(
beat beat.Info,
stats *outputs.Stats,
cfg *common.Config,
) (outputs.Group, error) {
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
return outputs.Fail(err)
}
// disable bulk support in publisher pipeline
cfg.SetInt("bulk_max_size", -1, -1)
fo := &fileOutput{beat: beat, stats: stats}
if err := fo.init(beat, config); err != nil {
return outputs.Fail(err)
}
return outputs.Success(-1, 0, fo)
}
func (out *fileOutput) init(beat beat.Info, config config) error {
var err error
out.rotator.Path = config.Path
out.rotator.Name = config.Filename
if out.rotator.Name == "" {
out.rotator.Name = out.beat.Beat
}
enc, err := codec.CreateEncoder(beat, config.Codec)
if err != nil {
return err
}
out.codec = enc
logp.Info("File output path set to: %v", out.rotator.Path)
logp.Info("File output base filename set to: %v", out.rotator.Name)
rotateeverybytes := uint64(config.RotateEveryKb) * 1024
logp.Info("Rotate every bytes set to: %v", rotateeverybytes)
out.rotator.RotateEveryBytes = &rotateeverybytes
keepfiles := config.NumberOfFiles
logp.Info("Number of files set to: %v", keepfiles)
out.rotator.KeepFiles = &keepfiles
err = out.rotator.CreateDirectory()
if err != nil {
return err
}
err = out.rotator.CheckIfConfigSane()
if err != nil {
return err
}
return nil
}
// Implement Outputer
func (out *fileOutput) Close() error {
return nil
}
func (out *fileOutput) Publish(
batch publisher.Batch,
) error {
defer batch.ACK()
st := out.stats
events := batch.Events()
st.NewBatch(len(events))
dropped := 0
for i := range events {
event := &events[i]
serializedEvent, err := out.codec.Encode(out.beat.Beat, &event.Content)
if err != nil {
if event.Guaranteed() {
logp.Critical("Failed to serialize the event: %v", err)
} else {
logp.Warn("Failed to serialize the event: %v", err)
}
dropped++
continue
}
err = out.rotator.WriteLine(serializedEvent)
if err != nil {
st.WriteError()
if event.Guaranteed() {
logp.Critical("Writing event to file failed with: %v", err)
} else {
logp.Warn("Writing event to file failed with: %v", err)
}
dropped++
continue
}
st.WriteBytes(len(serializedEvent) + 1)
}
st.Dropped(dropped)
st.Acked(len(events) - dropped)
return nil
}