-
Notifications
You must be signed in to change notification settings - Fork 18.7k
/
follow.go
141 lines (125 loc) · 3.75 KB
/
follow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"
import (
"context"
"fmt"
"io"
"os"
"github.com/containerd/log"
"github.com/docker/docker/daemon/logger"
"github.com/pkg/errors"
)
type follow struct {
LogFile *LogFile
Watcher *logger.LogWatcher
Decoder Decoder
Forwarder *forwarder
log *log.Entry
c chan logPos
}
// Do follows the log file as it is written, starting from f at read.
func (fl *follow) Do(f *os.File, read logPos) {
fl.log = log.G(context.TODO()).WithFields(log.Fields{
"module": "logger",
"file": f.Name(),
})
// Optimization: allocate the write-notifications channel only once and
// reuse it for multiple invocations of nextPos().
fl.c = make(chan logPos, 1)
defer func() {
if err := f.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
fl.log.WithError(err).Warn("error closing current log file")
}
}()
for {
wrote, ok := fl.nextPos(read)
if !ok {
return
}
if wrote.rotation != read.rotation {
// Flush the current file before moving on to the next.
if _, err := f.Seek(read.size, io.SeekStart); err != nil {
fl.Watcher.Err <- err
return
}
if !fl.forward(f) {
return
}
// Open the new file, which has the same name as the old
// file thanks to file rotation. Make no mistake: they
// are different files, with distinct identities.
// Atomically capture the wrote position to make
// absolutely sure that the position corresponds to the
// file we have opened; more rotations could have
// occurred since we previously received it.
if err := f.Close(); err != nil {
fl.log.WithError(err).Warn("error closing rotated log file")
}
var err error
func() {
fl.LogFile.fsopMu.RLock()
st := <-fl.LogFile.read
defer func() {
fl.LogFile.read <- st
fl.LogFile.fsopMu.RUnlock()
}()
f, err = open(f.Name())
wrote = st.pos
}()
// We tried to open the file inside a critical section
// so we shouldn't have been racing the rotation of the
// file. Any error, even fs.ErrNotFound, is exceptional.
if err != nil {
fl.Watcher.Err <- fmt.Errorf("logger: error opening log file for follow after rotation: %w", err)
return
}
if nrot := wrote.rotation - read.rotation; nrot > 1 {
fl.log.WithField("missed-rotations", nrot).
Warn("file rotations were missed while following logs; some log messages have been skipped over")
}
// Set up our read position to start from the top of the file.
read.size = 0
}
if !fl.forward(io.NewSectionReader(f, read.size, wrote.size-read.size)) {
return
}
read = wrote
}
}
// nextPos waits until the write position of the LogFile being followed has
// advanced from current and returns the new position.
func (fl *follow) nextPos(current logPos) (next logPos, ok bool) {
var st logReadState
select {
case <-fl.Watcher.WatchConsumerGone():
return current, false
case st = <-fl.LogFile.read:
}
// Have any logs been written since we last checked?
if st.pos == current { // Nope.
// Add ourself to the notify list.
st.wait = append(st.wait, fl.c)
} else { // Yes.
// "Notify" ourself immediately.
fl.c <- st.pos
}
fl.LogFile.read <- st
select {
case <-fl.LogFile.closed: // No more logs will be written.
select { // Have we followed to the end?
case next = <-fl.c: // No: received a new position.
default: // Yes.
return current, false
}
case <-fl.Watcher.WatchConsumerGone():
return current, false
case next = <-fl.c:
}
return next, true
}
// forward decodes log messages from r and forwards them to the log watcher.
//
// The return value, cont, signals whether following should continue.
func (fl *follow) forward(r io.Reader) (cont bool) {
fl.Decoder.Reset(r)
return fl.Forwarder.Do(fl.Watcher, fl.Decoder)
}