-
Notifications
You must be signed in to change notification settings - Fork 4.9k
/
log.go
170 lines (145 loc) · 3.83 KB
/
log.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package log
import (
"io"
"os"
"time"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/logp"
)
// Log contains all log related data
type Log struct {
fs harvester.Source
offset int64
config LogConfig
lastTimeRead time.Time
backoff time.Duration
done chan struct{}
}
// NewLog creates a new log instance to read log sources
func NewLog(
fs harvester.Source,
config LogConfig,
) (*Log, error) {
var offset int64
if seeker, ok := fs.(io.Seeker); ok {
var err error
offset, err = seeker.Seek(0, os.SEEK_CUR)
if err != nil {
return nil, err
}
}
return &Log{
fs: fs,
offset: offset,
config: config,
lastTimeRead: time.Now(),
backoff: config.Backoff,
done: make(chan struct{}),
}, nil
}
// Read reads from the reader and updates the offset
// The total number of bytes read is returned.
func (f *Log) Read(buf []byte) (int, error) {
totalN := 0
for {
select {
case <-f.done:
return 0, ErrClosed
default:
}
n, err := f.fs.Read(buf)
if n > 0 {
f.offset += int64(n)
f.lastTimeRead = time.Now()
}
totalN += n
// Read from source completed without error
// Either end reached or buffer full
if err == nil {
// reset backoff for next read
f.backoff = f.config.Backoff
return totalN, nil
}
// Move buffer forward for next read
buf = buf[n:]
// Checks if an error happened or buffer is full
// If buffer is full, cannot continue reading.
// Can happen if n == bufferSize + io.EOF error
err = f.errorChecks(err)
if err != nil || len(buf) == 0 {
return totalN, err
}
logp.Debug("harvester", "End of file reached: %s; Backoff now.", f.fs.Name())
f.wait()
}
}
// errorChecks checks how the given error should be handled based on the config options
func (f *Log) errorChecks(err error) error {
if err != io.EOF {
logp.Err("Unexpected state reading from %s; error: %s", f.fs.Name(), err)
return err
}
// Stdin is not continuable
if !f.fs.Continuable() {
logp.Debug("harvester", "Source is not continuable: %s", f.fs.Name())
return err
}
if err == io.EOF && f.config.CloseEOF {
return err
}
// Refetch fileinfo to check if the file was truncated or disappeared.
// Errors if the file was removed/rotated after reading and before
// calling the stat function
info, statErr := f.fs.Stat()
if statErr != nil {
logp.Err("Unexpected error reading from %s; error: %s", f.fs.Name(), statErr)
return statErr
}
// check if file was truncated
if info.Size() < f.offset {
logp.Debug("harvester",
"File was truncated as offset (%d) > size (%d): %s", f.offset, info.Size(), f.fs.Name())
return ErrFileTruncate
}
// Check file wasn't read for longer then CloseInactive
age := time.Since(f.lastTimeRead)
if age > f.config.CloseInactive {
return ErrInactive
}
if f.config.CloseRenamed {
// Check if the file can still be found under the same path
if !file.IsSameFile(f.fs.Name(), info) {
return ErrRenamed
}
}
if f.config.CloseRemoved {
// Check if the file name exists. See https://github.com/elastic/filebeat/issues/93
_, statErr := os.Stat(f.fs.Name())
// Error means file does not exist.
if statErr != nil {
return ErrRemoved
}
}
return nil
}
func (f *Log) wait() {
// Wait before trying to read file again. File reached EOF.
select {
case <-f.done:
return
case <-time.After(f.backoff):
}
// Increment backoff up to maxBackoff
if f.backoff < f.config.MaxBackoff {
f.backoff = f.backoff * time.Duration(f.config.BackoffFactor)
if f.backoff > f.config.MaxBackoff {
f.backoff = f.config.MaxBackoff
}
}
}
// Close closes the done channel but no th the file handler
func (f *Log) Close() {
close(f.done)
// Note: File reader is not closed here because that leads to race conditions
}