-
Notifications
You must be signed in to change notification settings - Fork 3.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Promtail: Fix deadlock on tailer shutdown. #2717
Changes from all commits
cb3c49a
a598328
e6d727d
f0d381b
44fee14
b9e6926
58ef3ef
ee2a592
6047b71
10fe290
8c3fccf
f12cb11
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,9 +25,11 @@ type tailer struct { | |
tail *tail.Tail | ||
|
||
posAndSizeMtx sync.Mutex | ||
stopOnce sync.Once | ||
|
||
running *atomic.Bool | ||
quit chan struct{} | ||
posquit chan struct{} | ||
posdone chan struct{} | ||
done chan struct{} | ||
} | ||
|
||
|
@@ -69,64 +71,86 @@ func newTailer(logger log.Logger, handler api.EntryHandler, positions positions. | |
path: path, | ||
tail: tail, | ||
running: atomic.NewBool(false), | ||
quit: make(chan struct{}), | ||
posquit: make(chan struct{}), | ||
posdone: make(chan struct{}), | ||
done: make(chan struct{}), | ||
} | ||
tail.Logger = util.NewLogAdapter(logger) | ||
|
||
go tailer.run() | ||
go tailer.readLines() | ||
go tailer.updatePosition() | ||
filesActive.Add(1.) | ||
return tailer, nil | ||
} | ||
|
||
func (t *tailer) run() { | ||
level.Info(t.logger).Log("msg", "start tailing file", "path", t.path) | ||
// updatePosition is run in a goroutine and checks the current size of the file and saves it to the positions file | ||
// at a regular interval. If there is ever an error it stops the tailer and exits, the tailer will be re-opened | ||
// by the filetarget sync method if it still exists and will start reading from the last successful entry in the | ||
// positions file. | ||
func (t *tailer) updatePosition() { | ||
positionSyncPeriod := t.positions.SyncPeriod() | ||
positionWait := time.NewTicker(positionSyncPeriod) | ||
t.running.Store(true) | ||
|
||
// This function runs in a goroutine, if it exits this tailer will never do any more tailing. | ||
// Clean everything up. | ||
defer func() { | ||
err := t.tail.Stop() | ||
if err != nil { | ||
level.Error(t.logger).Log("msg", "error stopping tailer when exiting tail goroutine", "path", t.path, "error", err) | ||
} | ||
|
||
positionWait.Stop() | ||
t.cleanupMetrics() | ||
t.running.Store(false) | ||
|
||
close(t.done) | ||
level.Info(t.logger).Log("msg", "position timer: exited", "path", t.path) | ||
close(t.posdone) | ||
}() | ||
|
||
for { | ||
select { | ||
case <-positionWait.C: | ||
err := t.markPositionAndSize() | ||
if err != nil { | ||
level.Error(t.logger).Log("msg", "error getting tail position and/or size, stopping tailer", "path", t.path, "error", err) | ||
level.Error(t.logger).Log("msg", "position timer: error getting tail position and/or size, stopping tailer", "path", t.path, "error", err) | ||
err := t.tail.Stop() | ||
if err != nil { | ||
level.Error(t.logger).Log("msg", "position timer: error stopping tailer", "path", t.path, "error", err) | ||
} | ||
return | ||
} | ||
case <-t.posquit: | ||
return | ||
} | ||
} | ||
} | ||
|
||
// readLines runs in a goroutine and consumes the t.tail.Lines channel from the underlying tailer. | ||
// it will only exit when that channel is closed. This is important to avoid a deadlock in the underlying | ||
// tailer which can happen if there are unread lines in this channel and the Stop method on the tailer | ||
// is called, the underlying tailer will never exit if there are unread lines in the t.tail.Lines channel | ||
func (t *tailer) readLines() { | ||
level.Info(t.logger).Log("msg", "tail routine: started", "path", t.path) | ||
|
||
t.running.Store(true) | ||
|
||
// This function runs in a goroutine, if it exits this tailer will never do any more tailing. | ||
// Clean everything up. | ||
defer func() { | ||
t.cleanupMetrics() | ||
t.running.Store(false) | ||
level.Info(t.logger).Log("msg", "tail routine: exited", "path", t.path) | ||
close(t.done) | ||
}() | ||
|
||
for { | ||
select { | ||
case line, ok := <-t.tail.Lines: | ||
if !ok { | ||
level.Info(t.logger).Log("msg", "tail routine: tail channel closed, stopping tailer", "path", t.path) | ||
return | ||
} | ||
|
||
// Note currently the tail implementation hardcodes Err to nil, this should never hit. | ||
if line.Err != nil { | ||
level.Error(t.logger).Log("msg", "error reading line", "path", t.path, "error", line.Err) | ||
level.Error(t.logger).Log("msg", "tail routine: error reading line", "path", t.path, "error", line.Err) | ||
continue | ||
} | ||
|
||
readLines.WithLabelValues(t.path).Inc() | ||
logLengthHistogram.WithLabelValues(t.path).Observe(float64(len(line.Text))) | ||
if err := t.handler.Handle(model.LabelSet{}, line.Time, line.Text); err != nil { | ||
level.Error(t.logger).Log("msg", "error handling line", "path", t.path, "error", err) | ||
level.Error(t.logger).Log("msg", "tail routine: error handling line", "path", t.path, "error", err) | ||
} | ||
case <-t.quit: | ||
return | ||
} | ||
} | ||
} | ||
|
@@ -136,33 +160,50 @@ func (t *tailer) markPositionAndSize() error { | |
t.posAndSizeMtx.Lock() | ||
defer t.posAndSizeMtx.Unlock() | ||
|
||
pos, err := t.tail.Tell() | ||
size, err := t.tail.Size() | ||
if err != nil { | ||
// If the file no longer exists, no need to save position information | ||
if err == os.ErrNotExist { | ||
level.Info(t.logger).Log("msg", "skipping update of position for a file which does not currently exist") | ||
return nil | ||
} | ||
Comment on lines
+165
to
+169
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is really the only important change in this function, the diff is screwy because I swapped the order of Tell() and Size() calls.
Comment on lines
+165
to
+169
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Small nit: can you make this an if/else if?
|
||
return err | ||
} | ||
readBytes.WithLabelValues(t.path).Set(float64(pos)) | ||
t.positions.Put(t.path, pos) | ||
totalBytes.WithLabelValues(t.path).Set(float64(size)) | ||
|
||
size, err := t.tail.Size() | ||
pos, err := t.tail.Tell() | ||
if err != nil { | ||
return err | ||
} | ||
totalBytes.WithLabelValues(t.path).Set(float64(size)) | ||
readBytes.WithLabelValues(t.path).Set(float64(pos)) | ||
t.positions.Put(t.path, pos) | ||
|
||
return nil | ||
} | ||
|
||
func (t *tailer) stop(removed bool) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In a change I made recently, I added this bool to do the conditional position update below but that isn't necessary (and was also not implemented correctly), it can be removed, calling the markPositionAndSize is basically a NOOP now if the file does not exist |
||
// Save the current position before shutting down tailer | ||
if !removed { | ||
func (t *tailer) stop() { | ||
// stop can be called by two separate threads in filetarget, to avoid a panic closing channels more than once | ||
// we wrap the stop in a sync.Once. | ||
t.stopOnce.Do(func() { | ||
// Shut down the position marker thread | ||
close(t.posquit) | ||
<-t.posdone | ||
|
||
// Save the current position before shutting down tailer | ||
err := t.markPositionAndSize() | ||
if err != nil { | ||
level.Error(t.logger).Log("msg", "error marking file position when stopping tailer", "path", t.path, "error", err) | ||
} | ||
} | ||
close(t.quit) | ||
<-t.done | ||
level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path) | ||
|
||
// Stop the underlying tailer | ||
err = t.tail.Stop() | ||
if err != nil { | ||
level.Error(t.logger).Log("msg", "error stopping tailer", "path", t.path, "error", err) | ||
} | ||
// Wait for readLines() to consume all the remaining messages and exit when the channel is closed | ||
<-t.done | ||
level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path) | ||
}) | ||
return | ||
} | ||
|
||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removing the quit channel and instead we will quit when the underlying tailer closes the t.tail.Lines channel above.