forked from mathpl/tail
/
tail.go
239 lines (209 loc) · 5.03 KB
/
tail.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
// Copyright (c) 2013 ActiveState Software Inc. All rights reserved.
package tail
import (
"bufio"
"fmt"
"io"
"launchpad.net/tomb"
"log"
"os"
"time"
)
// Line is one unit of output delivered on Tail.Lines.
type Line struct {
// Text is the line content as returned by the reader, or one chunk of it
// when the line was split because it exceeded Config.MaxLineSize.
Text string
// Time is the time.Now() value captured when the line was read; it is not
// parsed from the file. All chunks of a split line share the same Time.
Time time.Time
}
// Config selects how TailFile tails a file. It is embedded in Tail, so its
// fields are readable directly on the returned *Tail.
type Config struct {
// Location must be 0 or -1 (TailFile panics otherwise). 0 seeks to the end
// of the file before the first read; -1 performs no seek.
Location int // -n
// Follow must be true; TailFile panics when it is false.
Follow bool // -f
// ReOpen makes the tail re-open Filename by name after the watcher reports
// the file as deleted/renamed, instead of finishing. Requires Follow.
ReOpen bool // -F
// MustExist makes TailFile open the file up front and return the open error;
// when false the tailing goroutine waits for the file to appear.
MustExist bool // if false, wait for the file to exist before beginning to tail.
// Poll selects the polling file watcher instead of the inotify one.
Poll bool // if true, do not use inotify but use polling
// MaxLineSize, when > 0, causes longer lines to be split into chunks of at
// most this many bytes, each delivered as its own Line.
MaxLineSize int // if > 0, limit the line size (rest of the line would be returned as next lines)
}
// Tail is the handle returned by TailFile for one file being tailed. Consumers
// receive from Lines and call Stop to end the tail.
type Tail struct {
// Filename is the path given to TailFile; re-opens are done by this name.
Filename string
// Lines carries every line read. It is unbuffered and is closed by the
// tailing goroutine when that goroutine exits.
Lines chan *Line
// Config is the configuration passed to TailFile (embedded).
Config
// file is the currently open handle; it stays nil until the file has been
// opened (immediately in TailFile when MustExist is set, otherwise later
// by the tailing goroutine).
file *os.File
// reader wraps file; it is replaced each time the file is re-opened.
reader *bufio.Reader
// watcher is the polling or inotify watcher chosen from Config.Poll; it is
// used to wait for the file to exist and to wait for changes at EOF.
watcher FileWatcher
tomb.Tomb // provides: Done, Kill, Dying
}
// TailFile starts tailing filename as described by config and returns the
// Tail whose Lines channel delivers each line together with the time it was
// read. The reading itself happens in a goroutine started here.
//
// Unsupported configurations are treated as programmer errors and panic:
// a Location other than 0 or -1, ReOpen without Follow, and Follow=false.
//
// When config.MustExist is set the file is opened before returning and any
// open error is returned with a nil *Tail. Otherwise the goroutine waits for
// the file to appear. config.Poll selects the polling watcher; the inotify
// watcher is used when it is false.
func TailFile(filename string, config Config) (*Tail, error) {
	switch {
	case config.Location != 0 && config.Location != -1:
		panic("only 0/-1 values are supported for Location.")
	case config.ReOpen && !config.Follow:
		panic("cannot set ReOpen without Follow.")
	case !config.Follow:
		panic("Follow=false is not supported.")
	}

	t := new(Tail)
	t.Filename = filename
	t.Lines = make(chan *Line)
	t.Config = config

	if !t.Poll {
		t.watcher = NewInotifyFileWatcher(filename)
	} else {
		t.watcher = NewPollingFileWatcher(filename)
	}

	if t.MustExist {
		f, err := os.Open(t.Filename)
		if err != nil {
			return nil, err
		}
		t.file = f
	}

	go t.tailFileSync()

	return t, nil
}
// Stop requests termination of the tailing goroutine by killing the embedded
// tomb with a nil reason, then blocks in Wait and returns its result.
func (tail *Tail) Stop() error {
	tail.Kill(nil)
	err := tail.Wait()
	return err
}
// close closes the Lines channel and, if a file is currently open, closes it
// too. The Lines channel must not already be closed when this is called.
func (tail *Tail) close() {
	close(tail.Lines)
	if f := tail.file; f != nil {
		f.Close()
	}
}
// reopen closes the current file (if any) and opens tail.Filename again,
// storing the new handle in tail.file. While the file does not exist it logs
// a message and blocks on the watcher until the file appears, then retries.
// Any other open error, or a watcher failure, is returned wrapped in a
// descriptive error; tail.file is nil in that case.
func (tail *Tail) reopen() error {
	if tail.file != nil {
		tail.file.Close()
	}
	for {
		f, err := os.Open(tail.Filename)
		// Assigned unconditionally: on failure os.Open yields a nil file,
		// which clears the stale (already closed) handle.
		tail.file = f
		if err == nil {
			return nil
		}
		if !os.IsNotExist(err) {
			return fmt.Errorf("Unable to open file %s: %s", tail.Filename, err)
		}
		log.Printf("Waiting for %s to appear...", tail.Filename)
		if waitErr := tail.watcher.BlockUntilExists(); waitErr != nil {
			return fmt.Errorf("Failed to detect creation of %s: %s", tail.Filename, waitErr)
		}
	}
}
// readLine returns the next line from tail.reader without its line ending.
// The isPrefix result of ReadLine is deliberately dropped, so a line that
// does not fit in the reader's buffer is returned as consecutive fragments.
func (tail *Tail) readLine() ([]byte, error) {
	data, _, err := tail.reader.ReadLine()
	return data, err
}
// tailFileSync is the body of the goroutine started by TailFile. It opens the
// file if TailFile has not already done so, optionally seeks to its end, and
// then loops: read a line, deliver it on tail.Lines, and at EOF wait for the
// watcher to report a change. It returns when the tomb is dying, on a read,
// seek or re-open error (recorded via Kill/Killf), or when the file is
// deleted/renamed and ReOpen is not set.
//
// On every exit path the Lines channel and the file are closed and the tomb
// is marked Done.
func (tail *Tail) tailFileSync() {
	defer tail.Done()
	// Deferred after Done, hence run before it: the channel is closed only
	// after any Kill reason has been recorded, and before Wait unblocks.
	defer tail.close()

	if !tail.MustExist {
		if err := tail.reopen(); err != nil {
			tail.Kill(err)
			return
		}
	}

	// Note: seeking to end happens only at the beginning of tail;
	// never during subsequent re-opens.
	if tail.Location == 0 {
		// whence 2: offset is relative to the end of the file.
		if _, err := tail.file.Seek(0, 2); err != nil {
			tail.Killf("Seek error on %s: %s", tail.Filename, err)
			return
		}
	}

	// emit delivers one line to the consumer. Lines is unbuffered, so a plain
	// send would block forever once the consumer stops receiving, and Stop
	// would then hang in Wait. Selecting on Dying lets the goroutine give up.
	// It reports false when the tail is being stopped.
	emit := func(text string, stamp time.Time) bool {
		select {
		case tail.Lines <- &Line{text, stamp}:
			return true
		case <-tail.Dying():
			return false
		}
	}

	var changes chan bool
	tail.reader = bufio.NewReader(tail.file)

	for {
		line, err := tail.readLine()
		switch {
		case err == nil:
			if line != nil {
				now := time.Now()
				if tail.MaxLineSize > 0 && len(line) > tail.MaxLineSize {
					// All chunks of an over-long line share one timestamp.
					for _, chunk := range partitionString(string(line), tail.MaxLineSize) {
						if !emit(chunk, now) {
							return
						}
					}
				} else if !emit(string(line), now) {
					return
				}
			}

		case err != io.EOF:
			tail.Killf("Error reading %s: %s", tail.Filename, err)
			return

		default:
			// When end of file is reached, wait for more data to
			// become available. Wait strategy is based on the
			// `tail.watcher` implementation (inotify or polling).
			if changes == nil {
				changes = tail.watcher.ChangeEvents()
			}
			select {
			case _, ok := <-changes:
				if !ok {
					// Channel closed: file got deleted/renamed.
					if !tail.ReOpen {
						log.Printf("Finishing because file has been moved/deleted: %s", tail.Filename)
						return
					}
					// TODO: no logging in a library?
					log.Printf("Re-opening moved/deleted file %s ...", tail.Filename)
					if err := tail.reopen(); err != nil {
						tail.Kill(err)
						return
					}
					log.Printf("Successfully reopened %s", tail.Filename)
					tail.reader = bufio.NewReader(tail.file)
					// Force a fresh ChangeEvents subscription for the new file.
					changes = nil // XXX: how to kill changes' goroutine?
					continue
				}
			case <-tail.Dying():
				return
			}
		}

		// Non-blocking check so a busy file cannot starve a Stop request.
		select {
		case <-tail.Dying():
			return
		default:
		}
	}
}
// partitionString cuts s into consecutive pieces of chunkSize bytes each; the
// final piece holds whatever remains and may be shorter. An empty s yields a
// single empty piece. It panics if chunkSize is not positive.
func partitionString(s string, chunkSize int) []string {
	if chunkSize <= 0 {
		panic("invalid chunkSize")
	}
	total := len(s)
	pieces := make([]string, 0, 1+total/chunkSize)
	for lo := 0; ; lo += chunkSize {
		hi := lo + chunkSize
		if hi >= total {
			// Last piece: clamp to the end of the string and finish.
			return append(pieces, s[lo:total])
		}
		pieces = append(pieces, s[lo:hi])
	}
}