Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Promtail: Fix deadlock on tailer shutdown. #2717

Merged
merged 12 commits into from
Oct 4, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ require (
k8s.io/klog v1.0.0
)

replace github.com/hpcloud/tail => github.com/grafana/tail v0.0.0-20191024143944-0b54ddf21fe7
replace github.com/hpcloud/tail => github.com/grafana/tail v0.0.0-20201003194716-9e00e999625e

replace github.com/Azure/azure-sdk-for-go => github.com/Azure/azure-sdk-for-go v36.2.0+incompatible

Expand Down
7 changes: 5 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -591,8 +591,8 @@ github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85/go.mod h1:crI9WX6p0IhrqB+DqIUHulRW853PaNFf7o4UprV//3I=
github.com/grafana/tail v0.0.0-20191024143944-0b54ddf21fe7 h1:eeBhshivxpgHEX78QxJkoL251Pjr0B2GL59ZsivnplU=
github.com/grafana/tail v0.0.0-20191024143944-0b54ddf21fe7/go.mod h1:aS6CMYGLEIABOzX3OL8SqZ3zAZCGN7nmBnqgnyJGxyA=
github.com/grafana/tail v0.0.0-20201003194716-9e00e999625e h1:kiGDJCbVcJPdkyF6GosycSFpDbnxgS45FhQwpY1FX9E=
github.com/grafana/tail v0.0.0-20201003194716-9e00e999625e/go.mod h1:GIMXMPB/lRAllP5rVDvcGif87ryO2hgD7tCtHMdHrho=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 h1:THDBEeQ9xZ8JEaCLyLQqXMMdRqNr0QAUJTIkQAUtFjg=
Expand Down Expand Up @@ -1413,6 +1413,7 @@ golang.org/x/sys v0.0.0-20191025021431-6c3a3bfe00ae/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191112214154-59a1497f0cea/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191113165036-4c7a9d0fe056/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191119060738-e882bf8e40c2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191128015809-6d18c012aee9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down Expand Up @@ -1601,6 +1602,7 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogR
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.2.1/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/fsnotify/fsnotify.v1 v1.4.7 h1:XNNYLJHt73EyYiCZi6+xjupS9CpvmiDgjPTAjrBlQbo=
Expand All @@ -1617,6 +1619,7 @@ gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.57.0 h1:9unxIsFcTt4I55uWluz+UmL95q4kdJ0buvQ1ZIqVQww=
gopkg.in/ini.v1 v1.57.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/tomb.v1 v1.0.0-20140529071818-c131134a1947/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI=
Expand Down
4 changes: 2 additions & 2 deletions pkg/promtail/targets/file/filetarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (t *FileTarget) run() {
defer func() {
helpers.LogError("closing watcher", t.watcher.Close)
for _, v := range t.tails {
v.stop(false)
v.stop()
}
level.Debug(t.logger).Log("msg", "watcher closed, tailer stopped, positions saved")
close(t.done)
Expand Down Expand Up @@ -313,7 +313,7 @@ func (t *FileTarget) startTailing(ps []string) {
func (t *FileTarget) stopTailingAndRemovePosition(ps []string) {
for _, p := range ps {
if tailer, ok := t.tails[p]; ok {
tailer.stop(true)
tailer.stop()
t.positions.Remove(tailer.path)
delete(t.tails, p)
}
Expand Down
51 changes: 29 additions & 22 deletions pkg/promtail/targets/file/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type tailer struct {
posAndSizeMtx sync.Mutex

running *atomic.Bool
quit chan struct{}
done chan struct{}
}

Expand Down Expand Up @@ -69,7 +68,6 @@ func newTailer(logger log.Logger, handler api.EntryHandler, positions positions.
path: path,
tail: tail,
running: atomic.NewBool(false),
quit: make(chan struct{}),
done: make(chan struct{}),
}
tail.Logger = util.NewLogAdapter(logger)
Expand All @@ -88,11 +86,6 @@ func (t *tailer) run() {
// This function runs in a goroutine, if it exits this tailer will never do any more tailing.
// Clean everything up.
defer func() {
err := t.tail.Stop()
if err != nil {
level.Error(t.logger).Log("msg", "error stopping tailer when exiting tail goroutine", "path", t.path, "error", err)
}

positionWait.Stop()
t.cleanupMetrics()
t.running.Store(false)
Expand All @@ -105,12 +98,21 @@ func (t *tailer) run() {
case <-positionWait.C:
err := t.markPositionAndSize()
if err != nil {
level.Error(t.logger).Log("msg", "error getting tail position and/or size, stopping tailer", "path", t.path, "error", err)
level.Error(t.logger).Log("msg", "position timer: error getting tail position and/or size, stopping tailer", "path", t.path, "error", err)
// To prevent a deadlock on stopping the tailer we need to launch a thread to consume any unread lines
go func() {
for range t.tail.Lines {}
}()
rfratto marked this conversation as resolved.
Show resolved Hide resolved
t.tail.Stop()
if err != nil {
level.Error(t.logger).Log("msg", "position timer: error stopping tailer", "path", t.path, "error", err)
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to help recover from weird errors we have seen in some environments, if we fail to update the position information on the timer we want to close the tailer so it can be re-opened by the upper level sync function.

Adding the goroutine here to consume t.tail.Lines makes sure we dont' deadlock on lines being in the channel when we tel the tail to stop

return
}

case line, ok := <-t.tail.Lines:
if !ok {
level.Info(t.logger).Log("msg", "tail channel closed, stopping tailer", "path", t.path)
return
}

Expand All @@ -125,8 +127,6 @@ func (t *tailer) run() {
if err := t.handler.Handle(model.LabelSet{}, line.Time, line.Text); err != nil {
level.Error(t.logger).Log("msg", "error handling line", "path", t.path, "error", err)
}
case <-t.quit:
return
Comment on lines -128 to -129
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the quit channel and instead we will quit when the underlying tailer closes the t.tail.Lines channel above.

}
}
}
Expand All @@ -136,31 +136,38 @@ func (t *tailer) markPositionAndSize() error {
t.posAndSizeMtx.Lock()
defer t.posAndSizeMtx.Unlock()

pos, err := t.tail.Tell()
size, err := t.tail.Size()
if err != nil {
// If the file no longer exists, no need to save position information
if err == os.ErrNotExist {
level.Info(t.logger).Log("msg", "skipping update of position for a file which does not currently exist")
return nil
}
Comment on lines +165 to +169
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really the only important change in this function, the diff is screwy because I swapped the order of Tell() and Size() calls.
This is a more graceful way to handle ignoring the size/position for a file which doesn't exist, it leverages the change in the HP tail lib which returns the os.ErrNotExist error

Comment on lines +165 to +169
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small nit: can you make this an if/else if?

size, err := t.tail.Size()
if err == os.ErrNotExist {
  // ...
  return nil
} else if err != nil {
  return err
} 

return err
}
readBytes.WithLabelValues(t.path).Set(float64(pos))
t.positions.Put(t.path, pos)
totalBytes.WithLabelValues(t.path).Set(float64(size))

size, err := t.tail.Size()
pos, err := t.tail.Tell()
if err != nil {
return err
}
totalBytes.WithLabelValues(t.path).Set(float64(size))
readBytes.WithLabelValues(t.path).Set(float64(pos))
t.positions.Put(t.path, pos)

return nil
}

func (t *tailer) stop(removed bool) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a change I made recently, I added this bool to do the conditional position update below but that isn't necessary (and was also not implemented correctly), it can be removed, calling the markPositionAndSize is basically a NOOP now if the file does not exist

func (t *tailer) stop() {
// Save the current position before shutting down tailer
if !removed {
err := t.markPositionAndSize()
if err != nil {
level.Error(t.logger).Log("msg", "error marking file position when stopping tailer", "path", t.path, "error", err)
}
err := t.markPositionAndSize()
if err != nil {
level.Error(t.logger).Log("msg", "error marking file position when stopping tailer", "path", t.path, "error", err)
}

err = t.tail.Stop()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stop the tail here, these leaves the main goroutine running still which will consume the tail.Lines channel until it's closed

if err != nil {
level.Error(t.logger).Log("msg", "error stopping tailer", "path", t.path, "error", err)
}
close(t.quit)
<-t.done
level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path)
return
Expand Down
1 change: 1 addition & 0 deletions vendor/github.com/hpcloud/tail/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions vendor/github.com/hpcloud/tail/go.mod

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions vendor/github.com/hpcloud/tail/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 21 additions & 9 deletions vendor/github.com/hpcloud/tail/tail.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ github.com/hashicorp/golang-lru/simplelru
github.com/hashicorp/memberlist
# github.com/hashicorp/serf v0.9.0
github.com/hashicorp/serf/coordinate
# github.com/hpcloud/tail v1.0.0 => github.com/grafana/tail v0.0.0-20191024143944-0b54ddf21fe7
# github.com/hpcloud/tail v1.0.0 => github.com/grafana/tail v0.0.0-20201003194716-9e00e999625e
## explicit
github.com/hpcloud/tail
github.com/hpcloud/tail/ratelimiter
Expand Down Expand Up @@ -1474,7 +1474,7 @@ rsc.io/binaryregexp/syntax
sigs.k8s.io/structured-merge-diff/v3/value
# sigs.k8s.io/yaml v1.2.0
sigs.k8s.io/yaml
# github.com/hpcloud/tail => github.com/grafana/tail v0.0.0-20191024143944-0b54ddf21fe7
# github.com/hpcloud/tail => github.com/grafana/tail v0.0.0-20201003194716-9e00e999625e
# github.com/Azure/azure-sdk-for-go => github.com/Azure/azure-sdk-for-go v36.2.0+incompatible
# k8s.io/client-go => k8s.io/client-go v0.18.3
# github.com/satori/go.uuid => github.com/satori/go.uuid v1.2.0
Expand Down