forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
file.go
139 lines (109 loc) · 2.93 KB
/
file.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package fileout
import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/codec"
"github.com/elastic/beats/libbeat/publisher"
)
func init() {
outputs.RegisterType("file", makeFileout)
}
type fileOutput struct {
beat beat.Info
observer outputs.Observer
rotator logp.FileRotator
codec codec.Codec
}
// New instantiates a new file output instance.
func makeFileout(
beat beat.Info,
observer outputs.Observer,
cfg *common.Config,
) (outputs.Group, error) {
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
return outputs.Fail(err)
}
// disable bulk support in publisher pipeline
cfg.SetInt("bulk_max_size", -1, -1)
fo := &fileOutput{beat: beat, observer: observer}
if err := fo.init(beat, config); err != nil {
return outputs.Fail(err)
}
return outputs.Success(-1, 0, fo)
}
func (out *fileOutput) init(beat beat.Info, config config) error {
var err error
out.rotator.Path = config.Path
out.rotator.Name = config.Filename
if out.rotator.Name == "" {
out.rotator.Name = out.beat.Beat
}
enc, err := codec.CreateEncoder(beat, config.Codec)
if err != nil {
return err
}
out.codec = enc
logp.Info("File output path set to: %v", out.rotator.Path)
logp.Info("File output base filename set to: %v", out.rotator.Name)
logp.Info("File output permissions set to: %#o", config.Permissions)
out.rotator.Permissions = &config.Permissions
rotateeverybytes := uint64(config.RotateEveryKb) * 1024
logp.Info("Rotate every bytes set to: %v", rotateeverybytes)
out.rotator.RotateEveryBytes = &rotateeverybytes
keepfiles := config.NumberOfFiles
logp.Info("Number of files set to: %v", keepfiles)
out.rotator.KeepFiles = &keepfiles
err = out.rotator.CreateDirectory()
if err != nil {
return err
}
err = out.rotator.CheckIfConfigSane()
if err != nil {
return err
}
return nil
}
// Implement Outputer
func (out *fileOutput) Close() error {
return nil
}
func (out *fileOutput) Publish(
batch publisher.Batch,
) error {
defer batch.ACK()
st := out.observer
events := batch.Events()
st.NewBatch(len(events))
dropped := 0
for i := range events {
event := &events[i]
serializedEvent, err := out.codec.Encode(out.beat.Beat, &event.Content)
if err != nil {
if event.Guaranteed() {
logp.Critical("Failed to serialize the event: %v", err)
} else {
logp.Warn("Failed to serialize the event: %v", err)
}
dropped++
continue
}
err = out.rotator.WriteLine(serializedEvent)
if err != nil {
st.WriteError(err)
if event.Guaranteed() {
logp.Critical("Writing event to file failed with: %v", err)
} else {
logp.Warn("Writing event to file failed with: %v", err)
}
dropped++
continue
}
st.WriteBytes(len(serializedEvent) + 1)
}
st.Dropped(dropped)
st.Acked(len(events) - dropped)
return nil
}