-
Notifications
You must be signed in to change notification settings - Fork 5
/
tail.go
114 lines (94 loc) · 2.44 KB
/
tail.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// Package tail provides support for tailing an io.Reader that isn't yet done
// filling up.
package tail
import (
"errors"
"io"
"io/fs"
"time"
"github.com/benbjohnson/clock"
)
const (
_defaultDelay = 100 * time.Millisecond
_defaultBufferSize = 32 * 1024 // 32kB
)
// Tee copies text from source to destination until the user closes the source
// or calls Tee.Stop.
type Tee struct {
W io.Writer // destination (required)
R io.Reader // source (required)
// Maximum delay between retries. If the end of the source is reached,
// we'll wait up to this much time before trying again. Defaults to 100
// milliseconds.
Delay time.Duration
// Size of the copy buffer. Defaults to 32kB.
BufferSize int
Clock clock.Clock
err error
buffer []byte
quit, done chan struct{}
}
// Start begins tailing the source and copying blobs into destination until an
// error is encountered, source runs out, or Stop is called. If source reaches
// EOF but is not yet closed, Tee will try again after some delay (configurable
// via the Delay parameter).
//
// Start returns immediately.
func (t *Tee) Start() {
if t.Delay == 0 {
t.Delay = _defaultDelay
}
if t.BufferSize == 0 {
t.BufferSize = _defaultBufferSize
}
if t.Clock == nil {
t.Clock = clock.New()
}
t.buffer = make([]byte, t.BufferSize)
t.quit = make(chan struct{})
t.done = make(chan struct{})
go t.run()
}
// Stop tells Tee to stop copying text. It blocks until it has cleaned up the
// background job. Returns errors encountered during run, if any.
//
// If this freezes, make sure you closed the underlying file.
func (t *Tee) Stop() error {
close(t.quit)
return t.Wait()
}
// Wait waits until the tee stops from an error or from Stop being called.
// Returns the error, if any.
func (t *Tee) Wait() error {
<-t.done
return t.err
}
func (t *Tee) run() {
defer close(t.done)
ticker := t.Clock.Ticker(t.Delay)
defer ticker.Stop()
for {
n, err := io.CopyBuffer(t.W, t.R, t.buffer)
if err == nil && n > 0 {
// There are more bytes still to read.
continue
}
switch {
case errors.Is(err, fs.ErrClosed):
// File is closed. No new logs are expected.
return
case err == nil || errors.Is(err, io.EOF):
// There were no more bytes left to copy. Wait for quit
// or up to the specified delay and try again.
select {
case <-t.quit:
return
case <-ticker.C:
}
default:
// Something went wrong. Record and die.
t.err = err
return
}
}
}