Promtail: Fix deadlock on tailer shutdown. (#2717)
* to avoid a deadlock on shutdown, leave the goroutine that reads the tailer's Lines channel running; it exits when the tailer closes the channel.

* remove the flag I added a few weeks ago; instead, when updating a file's position information, fail gracefully if the file no longer exists.

* update the vendored tail library, with more rework of how tailing shutdown and errors are handled.

* fix mistake

* updating tail lib to merged commit hash

* go mod tidy

* update tail library with additional mutex lock

* break up the tailer goroutine into two separate goroutines to make it a little more clear and remove some special case logic.

* don't forget to close your channels!

* make the stop method in the tailer only execute at most one time.

* update hp-tail again with improvements to the mutex locking in closeFile

* replace the more complicated channel based single execution code with a sync.Once
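
The deadlock described in the first bullet can be sketched in isolation. This is a minimal model, not the vendored tail library: `producer`, `newProducer`, and `consume` are hypothetical names, and the producer is assumed to send on an unbuffered channel and to close it only after every pending line has been delivered.

```go
package main

import "fmt"

// producer is a hypothetical stand-in for a tailing library. It sends on an
// unbuffered channel and closes that channel only after every pending line
// has been delivered.
type producer struct {
	lines   chan string
	stopped chan struct{}
}

func newProducer(pending int) *producer {
	p := &producer{lines: make(chan string), stopped: make(chan struct{})}
	go func() {
		defer close(p.stopped) // runs second: signals that the goroutine has exited
		defer close(p.lines)   // runs first: tells the reader no more lines will come
		for i := 0; i < pending; i++ {
			p.lines <- fmt.Sprintf("line %d", i) // blocks until someone receives
		}
	}()
	return p
}

// Stop waits for the producing goroutine to exit. With no reader on lines it
// would block forever, which is the deadlock being modelled.
func (p *producer) Stop() { <-p.stopped }

// consume keeps a reader goroutine alive until lines is closed, so Stop can
// always complete. It returns how many lines were read.
func consume(p *producer) int {
	count := 0
	done := make(chan struct{})
	go func() {
		defer close(done)
		for range p.lines {
			count++
		}
	}()
	p.Stop() // safe: the reader above keeps draining lines
	<-done   // wait for the reader to observe the closed channel
	return count
}

func main() {
	fmt.Println(consume(newProducer(5))) // prints 5
}
```

If the reader returned early on a quit signal instead of ranging until the channel closed, `Stop` in this model would never return once a line was left unread.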
slim-bean committed Oct 4, 2020
1 parent d3bf21e commit 8ea6c38
Showing 9 changed files with 128 additions and 52 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -66,7 +66,7 @@ require (
k8s.io/klog v1.0.0
)

replace github.com/hpcloud/tail => github.com/grafana/tail v0.0.0-20191024143944-0b54ddf21fe7
replace github.com/hpcloud/tail => github.com/grafana/tail v0.0.0-20201004203643-7aa4e4a91f03

replace github.com/Azure/azure-sdk-for-go => github.com/Azure/azure-sdk-for-go v36.2.0+incompatible

7 changes: 5 additions & 2 deletions go.sum
@@ -591,8 +591,8 @@ github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85/go.mod h1:crI9WX6p0IhrqB+DqIUHulRW853PaNFf7o4UprV//3I=
github.com/grafana/tail v0.0.0-20191024143944-0b54ddf21fe7 h1:eeBhshivxpgHEX78QxJkoL251Pjr0B2GL59ZsivnplU=
github.com/grafana/tail v0.0.0-20191024143944-0b54ddf21fe7/go.mod h1:aS6CMYGLEIABOzX3OL8SqZ3zAZCGN7nmBnqgnyJGxyA=
github.com/grafana/tail v0.0.0-20201004203643-7aa4e4a91f03 h1:fGgFrAraMB0BaPfYumu+iulfDXwHm+GFyHA4xEtBqI8=
github.com/grafana/tail v0.0.0-20201004203643-7aa4e4a91f03/go.mod h1:GIMXMPB/lRAllP5rVDvcGif87ryO2hgD7tCtHMdHrho=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 h1:THDBEeQ9xZ8JEaCLyLQqXMMdRqNr0QAUJTIkQAUtFjg=
@@ -1413,6 +1413,7 @@ golang.org/x/sys v0.0.0-20191025021431-6c3a3bfe00ae/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191112214154-59a1497f0cea/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191113165036-4c7a9d0fe056/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191119060738-e882bf8e40c2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191128015809-6d18c012aee9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -1601,6 +1602,7 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogR
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.2.1/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/fsnotify/fsnotify.v1 v1.4.7 h1:XNNYLJHt73EyYiCZi6+xjupS9CpvmiDgjPTAjrBlQbo=
@@ -1617,6 +1619,7 @@ gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.57.0 h1:9unxIsFcTt4I55uWluz+UmL95q4kdJ0buvQ1ZIqVQww=
gopkg.in/ini.v1 v1.57.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/tomb.v1 v1.0.0-20140529071818-c131134a1947/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI=
6 changes: 3 additions & 3 deletions pkg/promtail/targets/file/filetarget.go
@@ -163,9 +163,9 @@ func (t *FileTarget) run() {
defer func() {
helpers.LogError("closing watcher", t.watcher.Close)
for _, v := range t.tails {
v.stop(false)
v.stop()
}
level.Debug(t.logger).Log("msg", "watcher closed, tailer stopped, positions saved")
level.Info(t.logger).Log("msg", "filetarget: watcher closed, tailer stopped, positions saved", "path", t.path)
close(t.done)
}()

@@ -313,7 +313,7 @@ func (t *FileTarget) startTailing(ps []string) {
func (t *FileTarget) stopTailingAndRemovePosition(ps []string) {
for _, p := range ps {
if tailer, ok := t.tails[p]; ok {
tailer.stop(true)
tailer.stop()
t.positions.Remove(tailer.path)
delete(t.tails, p)
}
111 changes: 76 additions & 35 deletions pkg/promtail/targets/file/tailer.go
@@ -25,9 +25,11 @@ type tailer struct {
tail *tail.Tail

posAndSizeMtx sync.Mutex
stopOnce sync.Once

running *atomic.Bool
quit chan struct{}
posquit chan struct{}
posdone chan struct{}
done chan struct{}
}

@@ -69,64 +71,86 @@ func newTailer(logger log.Logger, handler api.EntryHandler, positions positions.
path: path,
tail: tail,
running: atomic.NewBool(false),
quit: make(chan struct{}),
posquit: make(chan struct{}),
posdone: make(chan struct{}),
done: make(chan struct{}),
}
tail.Logger = util.NewLogAdapter(logger)

go tailer.run()
go tailer.readLines()
go tailer.updatePosition()
filesActive.Add(1.)
return tailer, nil
}

func (t *tailer) run() {
level.Info(t.logger).Log("msg", "start tailing file", "path", t.path)
// updatePosition is run in a goroutine and checks the current size of the file and saves it to the positions file
// at a regular interval. If there is ever an error it stops the tailer and exits, the tailer will be re-opened
// by the filetarget sync method if it still exists and will start reading from the last successful entry in the
// positions file.
func (t *tailer) updatePosition() {
positionSyncPeriod := t.positions.SyncPeriod()
positionWait := time.NewTicker(positionSyncPeriod)
t.running.Store(true)

// This function runs in a goroutine, if it exits this tailer will never do any more tailing.
// Clean everything up.
defer func() {
err := t.tail.Stop()
if err != nil {
level.Error(t.logger).Log("msg", "error stopping tailer when exiting tail goroutine", "path", t.path, "error", err)
}

positionWait.Stop()
t.cleanupMetrics()
t.running.Store(false)

close(t.done)
level.Info(t.logger).Log("msg", "position timer: exited", "path", t.path)
close(t.posdone)
}()

for {
select {
case <-positionWait.C:
err := t.markPositionAndSize()
if err != nil {
level.Error(t.logger).Log("msg", "error getting tail position and/or size, stopping tailer", "path", t.path, "error", err)
level.Error(t.logger).Log("msg", "position timer: error getting tail position and/or size, stopping tailer", "path", t.path, "error", err)
err := t.tail.Stop()
if err != nil {
level.Error(t.logger).Log("msg", "position timer: error stopping tailer", "path", t.path, "error", err)
}
return
}
case <-t.posquit:
return
}
}
}

// readLines runs in a goroutine and consumes the t.tail.Lines channel from the underlying tailer.
// it will only exit when that channel is closed. This is important to avoid a deadlock in the underlying
// tailer which can happen if there are unread lines in this channel and the Stop method on the tailer
// is called, the underlying tailer will never exit if there are unread lines in the t.tail.Lines channel
func (t *tailer) readLines() {
level.Info(t.logger).Log("msg", "tail routine: started", "path", t.path)

t.running.Store(true)

// This function runs in a goroutine, if it exits this tailer will never do any more tailing.
// Clean everything up.
defer func() {
t.cleanupMetrics()
t.running.Store(false)
level.Info(t.logger).Log("msg", "tail routine: exited", "path", t.path)
close(t.done)
}()

for {
select {
case line, ok := <-t.tail.Lines:
if !ok {
level.Info(t.logger).Log("msg", "tail routine: tail channel closed, stopping tailer", "path", t.path)
return
}

// Note currently the tail implementation hardcodes Err to nil, this should never hit.
if line.Err != nil {
level.Error(t.logger).Log("msg", "error reading line", "path", t.path, "error", line.Err)
level.Error(t.logger).Log("msg", "tail routine: error reading line", "path", t.path, "error", line.Err)
continue
}

readLines.WithLabelValues(t.path).Inc()
logLengthHistogram.WithLabelValues(t.path).Observe(float64(len(line.Text)))
if err := t.handler.Handle(model.LabelSet{}, line.Time, line.Text); err != nil {
level.Error(t.logger).Log("msg", "error handling line", "path", t.path, "error", err)
level.Error(t.logger).Log("msg", "tail routine: error handling line", "path", t.path, "error", err)
}
case <-t.quit:
return
}
}
}
@@ -136,33 +160,50 @@ func (t *tailer) markPositionAndSize() error {
t.posAndSizeMtx.Lock()
defer t.posAndSizeMtx.Unlock()

pos, err := t.tail.Tell()
size, err := t.tail.Size()
if err != nil {
// If the file no longer exists, no need to save position information
if err == os.ErrNotExist {
level.Info(t.logger).Log("msg", "skipping update of position for a file which does not currently exist")
return nil
}
return err
}
readBytes.WithLabelValues(t.path).Set(float64(pos))
t.positions.Put(t.path, pos)
totalBytes.WithLabelValues(t.path).Set(float64(size))

size, err := t.tail.Size()
pos, err := t.tail.Tell()
if err != nil {
return err
}
totalBytes.WithLabelValues(t.path).Set(float64(size))
readBytes.WithLabelValues(t.path).Set(float64(pos))
t.positions.Put(t.path, pos)

return nil
}
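
The `err == os.ErrNotExist` comparison in `markPositionAndSize` matches only when the sentinel value is returned unwrapped; whether the vendored tail library does so is not visible in this diff. As a hedged aside, the sketch below (with a hypothetical helper, `missingFileChecks`) shows how the two styles of check differ for an error produced by the standard library's `os.Stat`, which wraps the sentinel in a path error.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// missingFileChecks stats a file that does not exist and reports whether the
// resulting error matches os.ErrNotExist by direct comparison and by errors.Is.
// The helper and the file name are illustrative only.
func missingFileChecks() (direct bool, viaIs bool, err error) {
	dir, err := os.MkdirTemp("", "tailer-sketch-*")
	if err != nil {
		return false, false, err
	}
	defer os.RemoveAll(dir)

	// The directory is freshly created, so this file cannot exist.
	_, statErr := os.Stat(filepath.Join(dir, "missing.log"))

	// statErr wraps the sentinel, so == does not match but errors.Is does.
	return statErr == os.ErrNotExist, errors.Is(statErr, os.ErrNotExist), nil
}

func main() {
	direct, viaIs, err := missingFileChecks()
	if err != nil {
		fmt.Println("setup failed:", err)
		return
	}
	fmt.Println(direct, viaIs) // prints false true
}
```

A direct comparison is sufficient when the callee returns the sentinel itself; `errors.Is` also covers the wrapped case.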

func (t *tailer) stop(removed bool) {
// Save the current position before shutting down tailer
if !removed {
func (t *tailer) stop() {
// stop can be called by two separate threads in filetarget, to avoid a panic closing channels more than once
// we wrap the stop in a sync.Once.
t.stopOnce.Do(func() {
// Shut down the position marker thread
close(t.posquit)
<-t.posdone

// Save the current position before shutting down tailer
err := t.markPositionAndSize()
if err != nil {
level.Error(t.logger).Log("msg", "error marking file position when stopping tailer", "path", t.path, "error", err)
}
}
close(t.quit)
<-t.done
level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path)

// Stop the underlying tailer
err = t.tail.Stop()
if err != nil {
level.Error(t.logger).Log("msg", "error stopping tailer", "path", t.path, "error", err)
}
// Wait for readLines() to consume all the remaining messages and exit when the channel is closed
<-t.done
level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path)
})
return
}
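
The `sync.Once` guard in `stop()` can be illustrated on its own. This is a minimal sketch with hypothetical names (`worker`, `newWorker`, `stopConcurrently`), not the Promtail tailer: several goroutines call `stop` at the same time, yet the channel is closed exactly once, so there is no "close of closed channel" panic.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical type with a quit channel that must be closed only once.
type worker struct {
	stopOnce sync.Once
	quit     chan struct{}
	stops    int // how many times the shutdown body actually ran
}

func newWorker() *worker {
	return &worker{quit: make(chan struct{})}
}

// stop is safe to call from multiple goroutines; the body runs at most once.
func (w *worker) stop() {
	w.stopOnce.Do(func() {
		close(w.quit) // a second close would panic without the Once
		w.stops++
	})
}

// stopConcurrently calls stop from several goroutines and returns how many
// times the shutdown body executed.
func stopConcurrently(w *worker, callers int) int {
	var wg sync.WaitGroup
	for i := 0; i < callers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			w.stop()
		}()
	}
	wg.Wait()
	return w.stops
}

func main() {
	fmt.Println(stopConcurrently(newWorker(), 8)) // prints 1
}
```

Callers that lose the race block inside `Do` until the first call finishes, so every caller returns only after shutdown has completed.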

1 change: 1 addition & 0 deletions vendor/github.com/hpcloud/tail/.gitignore
10 changes: 10 additions & 0 deletions vendor/github.com/hpcloud/tail/go.mod
7 changes: 7 additions & 0 deletions vendor/github.com/hpcloud/tail/go.sum
32 changes: 23 additions & 9 deletions vendor/github.com/hpcloud/tail/tail.go

The four vendored files above are generated files and are not rendered by default.


4 changes: 2 additions & 2 deletions vendor/modules.txt
@@ -516,7 +516,7 @@ github.com/hashicorp/golang-lru/simplelru
github.com/hashicorp/memberlist
# github.com/hashicorp/serf v0.9.0
github.com/hashicorp/serf/coordinate
# github.com/hpcloud/tail v1.0.0 => github.com/grafana/tail v0.0.0-20191024143944-0b54ddf21fe7
# github.com/hpcloud/tail v1.0.0 => github.com/grafana/tail v0.0.0-20201004203643-7aa4e4a91f03
## explicit
github.com/hpcloud/tail
github.com/hpcloud/tail/ratelimiter
@@ -1474,7 +1474,7 @@ rsc.io/binaryregexp/syntax
sigs.k8s.io/structured-merge-diff/v3/value
# sigs.k8s.io/yaml v1.2.0
sigs.k8s.io/yaml
# github.com/hpcloud/tail => github.com/grafana/tail v0.0.0-20191024143944-0b54ddf21fe7
# github.com/hpcloud/tail => github.com/grafana/tail v0.0.0-20201004203643-7aa4e4a91f03
# github.com/Azure/azure-sdk-for-go => github.com/Azure/azure-sdk-for-go v36.2.0+incompatible
# k8s.io/client-go => k8s.io/client-go v0.18.3
# github.com/satori/go.uuid => github.com/satori/go.uuid v1.2.0
