-
Notifications
You must be signed in to change notification settings - Fork 94
/
tailer.go
349 lines (304 loc) · 9.71 KB
/
tailer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
package file
// This code is copied from loki/promtail@a8d5815510bd959a6dd8c176a5d9fd9bbfc8f8b5.
// tailer implements the reader interface by using the github.com/grafana/tail package to tail files.
import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"os"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/alloy/internal/alloy/logging/level"
	"github.com/grafana/alloy/internal/component/common/loki"
	"github.com/grafana/alloy/internal/component/common/loki/positions"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/util"
	"github.com/grafana/tail"
	"github.com/grafana/tail/watch"
	"github.com/prometheus/common/model"
	"go.uber.org/atomic"
	"golang.org/x/text/encoding"
	"golang.org/x/text/encoding/ianaindex"
	"golang.org/x/text/transform"
)
// tailer follows a single file through the grafana/tail package, forwards each
// line it reads to an entry handler, and periodically records the read offset
// in a positions store.
type tailer struct {
	// metrics holds the collectors this tailer updates (read lines, read/total
	// bytes, encoding failures, active files).
	metrics *metrics
	// logger is the logger used by the tailer's goroutines; newTailer tags it
	// with component=tailer.
	logger log.Logger
	// handler receives one entry per line; newTailer wraps it so that every
	// entry gets the filename label.
	handler loki.EntryHandler
	// positions is where the read offset for (path, labels) is loaded from and
	// saved to.
	positions positions.Positions

	// path is the file being tailed.
	path string
	// labels is passed, together with path, as the key for the positions store.
	labels string
	// tail is the underlying grafana/tail instance that produces the lines.
	tail *tail.Tail

	// posAndSizeMtx serializes calls to MarkPositionAndSize.
	posAndSizeMtx sync.Mutex
	// stopOnce makes the body of Stop run at most once.
	stopOnce sync.Once
	// running is true while the readLines goroutine is active.
	running *atomic.Bool
	// posquit is closed by readLines on exit to tell updatePosition to return.
	posquit chan struct{}
	// posdone is closed by updatePosition once it has returned.
	posdone chan struct{}
	// done is closed by readLines once it has returned.
	done chan struct{}

	// decoder, when non-nil, converts every line to UTF-8 before it is
	// forwarded; it is nil when no encoding was requested.
	decoder *encoding.Decoder
}
// newTailer starts tailing the file at path and returns the running tailer.
//
// The starting offset is the one stored in positions for (path, labels). When
// that offset is zero and tailFromEnd is set, tailing instead starts right
// after the last newline currently in the file, and that offset is stored
// immediately. When encoding is non-empty it is looked up as an IANA character
// set name and every line is decoded from it to UTF-8 before being forwarded
// to handler.
//
// On success the readLines and updatePosition goroutines are running and the
// filesActive metric has been incremented; call Stop to shut the tailer down.
// An error is returned when the file cannot be stat'ed, the stored position
// cannot be read, the encoding cannot be resolved, or the tail cannot be
// opened. Everything that can fail is done before the handler is wrapped and
// the goroutines are started, so an error return leaves nothing running.
func newTailer(metrics *metrics, logger log.Logger, handler loki.EntryHandler, positions positions.Positions, path string,
	labels string, encoding string, pollOptions watch.PollingFileWatcherOptions, tailFromEnd bool) (*tailer, error) {

	// Simple check to make sure the file we are tailing doesn't
	// have a position already saved which is past the end of the file.
	fi, err := os.Stat(path)
	if err != nil {
		return nil, err
	}
	pos, err := positions.Get(path, labels)
	if err != nil {
		return nil, err
	}
	if fi.Size() < pos {
		// Only the stored entry is dropped here; pos itself is still handed to
		// the tail library below, exactly as before.
		positions.Remove(path, labels)
	}

	// If no cached position is found and the tailFromEnd option is enabled.
	if pos == 0 && tailFromEnd {
		// On failure getLastLinePosition returns 0, so pos falls back to the
		// start of the file.
		pos, err = getLastLinePosition(path)
		if err != nil {
			level.Error(logger).Log("msg", "failed to get a position from the end of the file, default to start of file", "error", err)
		} else {
			positions.Put(path, labels, pos)
			level.Info(logger).Log("msg", "retrieved and stored the position of the last line")
		}
	}

	tailerLogger := log.With(logger, "component", "tailer")

	// Resolve the decoder before opening the tail: a bad encoding name must not
	// leave a started tail (open file plus goroutine) behind.
	decoder, err := newLineDecoder(tailerLogger, encoding)
	if err != nil {
		return nil, err
	}

	tf, err := tail.TailFile(path, tail.Config{
		Follow:    true,
		Poll:      true,
		ReOpen:    true,
		MustExist: true,
		Location: &tail.SeekInfo{
			Offset: pos,
			Whence: 0, // relative to the start of the file
		},
		Logger:      util.NewLogAdapter(logger),
		PollOptions: pollOptions,
	})
	if err != nil {
		return nil, err
	}

	t := &tailer{
		metrics:   metrics,
		logger:    tailerLogger,
		handler:   loki.AddLabelsMiddleware(model.LabelSet{filenameLabel: model.LabelValue(path)}).Wrap(handler),
		positions: positions,
		path:      path,
		labels:    labels,
		tail:      tf,
		running:   atomic.NewBool(false),
		posquit:   make(chan struct{}),
		posdone:   make(chan struct{}),
		done:      make(chan struct{}),
		decoder:   decoder,
	}

	go t.readLines()
	go t.updatePosition()
	metrics.filesActive.Add(1.)
	return t, nil
}

// newLineDecoder returns a decoder converting text in the IANA character set
// called name to UTF-8. It returns (nil, nil) when name is empty, meaning lines
// are forwarded unchanged.
func newLineDecoder(logger log.Logger, name string) (*encoding.Decoder, error) {
	if name == "" {
		return nil, nil
	}
	level.Info(logger).Log("msg", "Will decode messages", "from", name, "to", "UTF8")

	enc, err := ianaindex.IANA.Encoding(name)
	if err != nil {
		return nil, fmt.Errorf("failed to get IANA encoding %s: %w", name, err)
	}
	// NOTE(review): the ianaindex documentation appears to allow a nil Encoding
	// with a nil error for names the index knows but does not implement — confirm.
	// Calling NewDecoder on that nil interface would panic, so report it instead.
	if enc == nil {
		return nil, fmt.Errorf("failed to get IANA encoding %s: encoding is not supported", name)
	}
	return enc.NewDecoder(), nil
}
// getLastLinePosition returns the offset just past the last '\n' in the file at
// path, i.e. the offset at which the last (possibly empty or unterminated) line
// starts. It returns 0 when the file is empty or contains no '\n'.
//
// The file is scanned backwards in chunks of at most chunkSize bytes, starting
// from the size reported by Stat; data appended after that point is not
// considered. Any error from opening, stat'ing, seeking or reading the file is
// returned with a position of 0.
func getLastLinePosition(path string) (int64, error) {
	file, err := os.Open(path)
	if err != nil {
		return 0, err
	}
	defer file.Close()

	fi, err := file.Stat()
	if err != nil {
		return 0, err
	}

	const chunkSize = 1024
	buf := make([]byte, chunkSize)

	// end is the exclusive upper bound of the part of the file that has not
	// been scanned yet; each iteration scans [start, end) and then moves end
	// down to start, so chunks never overlap and never leave a gap.
	end := fi.Size()
	for end > 0 {
		start := end - chunkSize
		if start < 0 {
			start = 0
		}
		chunk := buf[:end-start]

		if _, err := file.Seek(start, io.SeekStart); err != nil {
			return 0, err
		}
		// A plain Read may legally return fewer bytes than requested, which
		// would leave part of the chunk unscanned; ReadFull reads all of it or
		// fails.
		if _, err := io.ReadFull(file, chunk); err != nil {
			return 0, err
		}

		if idx := bytes.LastIndexByte(chunk, '\n'); idx != -1 {
			return start + int64(idx) + 1, nil
		}
		end = start
	}

	// No newline anywhere in the file (or the file is empty).
	return 0, nil
}
// updatePosition runs in its own goroutine. On every tick of the positions
// sync period it calls MarkPositionAndSize; if that fails it stops the
// underlying tail and returns. It also returns as soon as posquit is closed.
// According to the original note, a tailer stopped this way is re-opened by the
// filetarget sync if the file still exists, resuming from the last saved
// position.
//
// On exit it stops the ticker, removes this tailer's metrics and closes
// posdone, in that order.
func (t *tailer) updatePosition() {
	ticker := time.NewTicker(t.positions.SyncPeriod())

	defer func() {
		ticker.Stop()
		level.Info(t.logger).Log("msg", "position timer: exited", "path", t.path)
		// NOTE: metrics must be cleaned up after the position timer exits, as MarkPositionAndSize() updates metrics.
		t.cleanupMetrics()
		close(t.posdone)
	}()

	for {
		// Block until either a shutdown request or the next tick.
		select {
		case <-t.posquit:
			return
		case <-ticker.C:
		}

		markErr := t.MarkPositionAndSize()
		if markErr == nil {
			continue
		}

		level.Error(t.logger).Log("msg", "position timer: error getting tail position and/or size, stopping tailer", "path", t.path, "error", markErr)
		if stopErr := t.tail.Stop(); stopErr != nil {
			level.Error(t.logger).Log("msg", "position timer: error stopping tailer", "path", t.path, "error", stopErr)
		}
		return
	}
}
// readLines runs in its own goroutine and drains the t.tail.Lines channel of
// the underlying tailer, forwarding every line to the handler. It only returns
// once that channel has been closed. Per the original note this matters: if
// Stop is called on the underlying tailer while lines are still unread, the
// underlying tailer never exits, so the channel must always be consumed to the
// end.
//
// On exit it clears the running flag, closes done and then closes posquit so
// that the position goroutine shuts down too.
func (t *tailer) readLines() {
	level.Info(t.logger).Log("msg", "tail routine: started", "path", t.path)
	t.running.Store(true)

	// Once this goroutine returns the tailer will never tail again, so tear
	// everything down here.
	defer func() {
		t.running.Store(false)
		level.Info(t.logger).Log("msg", "tail routine: exited", "path", t.path)
		close(t.done)
		// Shut down the position marker thread
		close(t.posquit)
	}()

	out := t.handler.Chan()

	for line := range t.tail.Lines {
		// Note currently the tail implementation hardcodes Err to nil, this should never hit.
		if line.Err != nil {
			level.Error(t.logger).Log("msg", "tail routine: error reading line", "path", t.path, "error", line.Err)
			continue
		}

		text := line.Text
		if t.decoder != nil {
			decoded, err := t.convertToUTF8(line.Text)
			if err != nil {
				// The line is still forwarded, with its content replaced by a
				// description of the failure.
				level.Debug(t.logger).Log("msg", "failed to convert encoding", "error", err)
				t.metrics.encodingFailures.WithLabelValues(t.path).Inc()
				decoded = fmt.Sprintf("the requested encoding conversion for this line failed in Alloy: %s", err.Error())
			}
			text = decoded
		}

		t.metrics.readLines.WithLabelValues(t.path).Inc()
		out <- loki.Entry{
			Labels: model.LabelSet{},
			Entry: logproto.Entry{
				Timestamp: line.Time,
				Line:      text,
			},
		}
	}

	// The range loop only ends when the Lines channel has been closed.
	level.Info(t.logger).Log("msg", "tail routine: tail channel closed, stopping tailer", "path", t.path, "reason", t.tail.Tomb.Err())
}
// MarkPositionAndSize reads the current size of the tailed file and the current
// read offset from the underlying tail, publishes them as the totalBytes and
// readBytes metrics and stores the offset in the positions store.
//
// If the size lookup reports that the file does not exist, nothing is updated
// and nil is returned. Any other error from the size or offset lookup is
// returned unchanged. Calls are serialized by posAndSizeMtx.
func (t *tailer) MarkPositionAndSize() error {
	// Lock this update as there are 2 timers calling this routine, the sync in filetarget and the positions sync in this file.
	t.posAndSizeMtx.Lock()
	defer t.posAndSizeMtx.Unlock()

	size, err := t.tail.Size()
	if err != nil {
		// If the file no longer exists, no need to save position information.
		// errors.Is also matches a not-exist error that arrives wrapped (for
		// example inside an *os.PathError), which a plain == comparison misses.
		if errors.Is(err, os.ErrNotExist) {
			level.Info(t.logger).Log("msg", "skipping update of position for a file which does not currently exist", "path", t.path)
			return nil
		}
		return err
	}

	pos, err := t.tail.Tell()
	if err != nil {
		return err
	}

	// Update metrics and positions file all together to avoid race conditions when `t.tail` is stopped.
	t.metrics.totalBytes.WithLabelValues(t.path).Set(float64(size))
	t.metrics.readBytes.WithLabelValues(t.path).Set(float64(pos))
	t.positions.Put(t.path, t.labels, pos)

	return nil
}
// Stop shuts the tailer down: it saves the current position, stops the
// underlying tail, waits for the readLines and updatePosition goroutines to
// finish and finally stops the handler. Errors from the first two steps are
// logged and do not abort the shutdown. Only the first call does any work.
func (t *tailer) Stop() {
	// Stop may be invoked from two separate threads in filetarget; sync.Once
	// keeps the shutdown sequence from running more than once.
	t.stopOnce.Do(func() {
		// Persist where we are before tearing anything down.
		if err := t.MarkPositionAndSize(); err != nil {
			level.Error(t.logger).Log("msg", "error marking file position when stopping tailer", "path", t.path, "error", err)
		}

		// Ask the underlying tail to stop.
		if err := t.tail.Stop(); err != nil {
			level.Error(t.logger).Log("msg", "error stopping tailer", "path", t.path, "error", err)
		}

		// readLines closes done after draining whatever is left in the lines
		// channel; updatePosition closes posdone when it has returned.
		<-t.done
		<-t.posdone

		level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path)
		t.handler.Stop()
	})
}
// IsRunning reports whether the readLines goroutine is currently active: the
// flag is set when that goroutine starts and cleared when it returns.
func (t *tailer) IsRunning() bool {
	active := t.running.Load()
	return active
}
// convertToUTF8 runs text through the tailer's decoder and returns the result.
// If the transformation fails it returns an empty string and a wrapped error.
// It must only be called when t.decoder is non-nil.
func (t *tailer) convertToUTF8(text string) (string, error) {
	decoded, _, err := transform.String(t.decoder, text)
	if err == nil {
		return decoded, nil
	}
	return "", fmt.Errorf("failed to decode text to UTF8: %w", err)
}
// cleanupMetrics removes all metrics exported by this tailer: it decrements
// the active-files gauge and deletes every series labelled with this tailer's
// path.
func (t *tailer) cleanupMetrics() {
	// When we stop tailing the file, also un-export metrics related to the file
	t.metrics.filesActive.Add(-1.)
	t.metrics.readLines.DeleteLabelValues(t.path)
	t.metrics.readBytes.DeleteLabelValues(t.path)
	t.metrics.totalBytes.DeleteLabelValues(t.path)
	// readLines() increments encodingFailures per path as well; delete that
	// series too so it does not outlive the tailer.
	t.metrics.encodingFailures.DeleteLabelValues(t.path)
}
// Path returns the path of the file this tailer was created for.
func (t *tailer) Path() string {
	tailedPath := t.path
	return tailedPath
}