Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update go-concert to 0.1.0 #23770

Merged
merged 4 commits into from
Feb 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6336,11 +6336,11 @@ SOFTWARE

--------------------------------------------------------------------------------
Dependency : github.com/elastic/go-concert
Version: v0.0.4
Version: v0.1.0
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/go-concert@v0.0.4/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/elastic/go-concert@v0.1.0/LICENSE:

Apache License
Version 2.0, January 2004
Expand Down
68 changes: 25 additions & 43 deletions filebeat/input/filestream/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@
package filestream

import (
"context"
"errors"
"io"
"os"
"time"

"github.com/elastic/go-concert/ctxtool"
"github.com/elastic/go-concert/timed"
"github.com/elastic/go-concert/unison"

input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/common/backoff"
"github.com/elastic/beats/v7/libbeat/common/file"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/go-concert/ctxtool"
"github.com/elastic/go-concert/unison"
)

var (
Expand All @@ -39,10 +40,9 @@ var (

// logFile contains all log related data
type logFile struct {
file *os.File
log *logp.Logger
ctx context.Context
cancelReading context.CancelFunc
file *os.File
log *logp.Logger
readerCtx ctxtool.CancelContext

closeAfterInterval time.Duration
closeOnEOF bool
Expand All @@ -55,7 +55,7 @@ type logFile struct {
offset int64
lastTimeRead time.Time
backoff backoff.Backoff
tg unison.TaskGroup
tg *unison.TaskGroup
}

// newFileReader creates a new log instance to read log sources
Expand All @@ -71,6 +71,9 @@ func newFileReader(
return nil, err
}

readerCtx := ctxtool.WithCancelContext(ctxtool.FromCanceller(canceler))
tg := unison.TaskGroupWithCancel(readerCtx)

l := &logFile{
file: f,
log: log,
Expand All @@ -83,16 +86,10 @@ func newFileReader(
offset: offset,
lastTimeRead: time.Now(),
backoff: backoff.NewExpBackoff(canceler.Done(), config.Backoff.Init, config.Backoff.Max),
tg: unison.TaskGroup{},
readerCtx: readerCtx,
tg: tg,
}

l.ctx, l.cancelReading = ctxtool.WithFunc(ctxtool.FromCanceller(canceler), func() {
err := l.tg.Stop()
if err != nil {
l.log.Errorf("Error while stopping filestream logFile reader: %v", err)
}
})
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The group now shuts down when the readerCtx gets cancelled. This is a slight change in semantics, as the reader context was closed in the past only after the taskgroup go-routines have finished (Stop used to wait).

In order to make behavior and signal propagation a little more predictable, the change ensures that we have a clear chain of cancellation contexts:
canceler -> readerCtx -> (taskgroup ctx). Before we used to have something like:

canceler -> ctx
canceler -> (taskgroup ctx)


l.startFileMonitoringIfNeeded()

return l, nil
Expand All @@ -103,7 +100,7 @@ func newFileReader(
func (f *logFile) Read(buf []byte) (int, error) {
totalN := 0

for f.ctx.Err() == nil {
for f.readerCtx.Err() == nil {
n, err := f.file.Read(buf)
if n > 0 {
f.offset += int64(n)
Expand Down Expand Up @@ -154,35 +151,18 @@ func (f *logFile) startFileMonitoringIfNeeded() {
}

func (f *logFile) closeIfTimeout(ctx unison.Canceler) {
timer := time.NewTimer(f.closeAfterInterval)
defer timer.Stop()

for {
select {
case <-ctx.Done():
return
case <-timer.C:
f.cancelReading()
return
}
if err := timed.Wait(ctx, f.closeAfterInterval); err == nil {
f.readerCtx.Cancel()
}
}

func (f *logFile) periodicStateCheck(ctx unison.Canceler) {
ticker := time.NewTicker(f.checkInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if f.shouldBeClosed() {
f.cancelReading()
return
}
timed.Periodic(ctx, f.checkInterval, func() error {
if f.shouldBeClosed() {
f.readerCtx.Cancel()
}
}
return nil
})
Copy link
Author

@urso urso Jan 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaced the for loop with timed.Periodic. The for loop did not check the state of the context before executing the select statement. Due to select checking channels in random order the ctx.Done() can be ignored multiple times in the worst case. Switching to Periodic resolves this issue.

}

func (f *logFile) shouldBeClosed() bool {
Expand Down Expand Up @@ -267,6 +247,8 @@ func (f *logFile) handleEOF() error {

// Close
func (f *logFile) Close() error {
f.cancelReading()
return f.file.Close()
f.readerCtx.Cancel()
err := f.file.Close()
f.tg.Stop() // Wait until all resources are released for sure.
return err
}
2 changes: 1 addition & 1 deletion filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (inp *filestream) Run(
return err
}

_, streamCancel := ctxtool.WithFunc(ctxtool.FromCanceller(ctx.Cancelation), func() {
_, streamCancel := ctxtool.WithFunc(ctx.Cancelation, func() {
log.Debug("Closing reader of filestream")
err := r.Close()
if err != nil {
Expand Down
9 changes: 5 additions & 4 deletions filebeat/input/v2/compat/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package compat

import (
"context"
"fmt"
"sync"

Expand All @@ -31,7 +32,7 @@ import (
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/go-concert"
"github.com/elastic/go-concert/ctxtool"
)

// factory implements the cfgfile.RunnerFactory interface and wraps the
Expand All @@ -52,7 +53,7 @@ type runner struct {
log *logp.Logger
agent *beat.Info
wg sync.WaitGroup
sig *concert.OnceSignaler
sig ctxtool.CancelContext
input v2.Input
connector beat.PipelineConnector
}
Expand Down Expand Up @@ -94,7 +95,7 @@ func (f *factory) Create(
id: id,
log: f.log.Named(input.Name()),
agent: &f.info,
sig: concert.NewOnceSignaler(),
sig: ctxtool.WithCancelContext(context.Background()),
input: input,
connector: p,
}, nil
Expand Down Expand Up @@ -126,7 +127,7 @@ func (r *runner) Start() {
}

func (r *runner) Stop() {
r.sig.Trigger()
r.sig.Cancel()
r.wg.Wait()
r.log.Infof("Input '%v' stopped", r.input.Name())
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/winlog/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (eventlogRunner) Run(

// setup closing the API if either the run function is signaled asynchronously
// to shut down or when returning after io.EOF
cancelCtx, cancelFn := ctxtool.WithFunc(ctxtool.FromCanceller(ctx.Cancelation), func() {
cancelCtx, cancelFn := ctxtool.WithFunc(ctx.Cancelation, func() {
if err := api.Close(); err != nil {
log.Errorf("Error while closing Windows Eventlog Access: %v", err)
}
Expand Down
41 changes: 23 additions & 18 deletions filebeat/inputsource/common/dgram/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,26 +72,29 @@ func (l *Listener) Run(ctx context.Context) error {
l.log.Info("Started listening for " + l.family.String() + " connection")

for ctx.Err() == nil {
conn, err := l.listener()
if err != nil {
l.log.Debugw("Cannot connect", "error", err)
continue
}
connCtx, connCancel := ctxtool.WithFunc(ctx, func() {
conn.Close()
})

err = l.run(connCtx, conn)
if err != nil {
l.log.Debugw("Error while processing input", "error", err)
connCancel()
continue
}
connCancel()
l.doRun(ctx)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved the body into a separate method, so we can do defer cancel() to cleanup resources. The handle is created independent of the module and we must cleanup in case the handler panics.

TODO: shall we consider to recover from a panic here or only recover if the handler failed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the package net, we probably won't see many panics from listener function. Also, how could we recover from such issues? If anything fails during that function call it requires action from the user, e.g. fixing addresses or changing the protocol. These are problems from those we cannot recover without user interference.

I would only call recover to take care of panics from handler.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would only call recover to take care of panics from handler.

yeah, I did mean the handler. Added Recover to connectAndRun

}
return nil
}

func (l *Listener) doRun(ctx context.Context) {
conn, err := l.listener()
if err != nil {
l.log.Debugw("Cannot connect", "error", err)
return
}

connCtx, connCancel := ctxtool.WithFunc(ctx, func() {
conn.Close()
})
defer connCancel()

err = l.connectAndRun(connCtx, conn)
if err != nil {
l.log.Debugw("Error while processing input", "error", err)
}
}

func (l *Listener) Start() error {
l.log.Info("Started listening for " + l.family.String() + " connection")

Expand All @@ -106,12 +109,14 @@ func (l *Listener) Start() error {
})
defer connCancel()

return l.run(ctxtool.FromCanceller(connCtx), conn)
return l.connectAndRun(ctxtool.FromCanceller(connCtx), conn)
})
return nil
}

func (l *Listener) run(ctx context.Context, conn net.PacketConn) error {
func (l *Listener) connectAndRun(ctx context.Context, conn net.PacketConn) error {
defer l.log.Recover("Panic handling datagram")

handler := l.connect(*l.config)
for ctx.Err() == nil {
err := handler(ctx, conn)
Expand Down
9 changes: 4 additions & 5 deletions filebeat/inputsource/common/streaming/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ type Listener struct {
family inputsource.Family
wg sync.WaitGroup
log *logp.Logger
ctx context.Context
cancel context.CancelFunc
ctx ctxtool.CancelContext
clientsCount atomic.Int
handlerFactory HandlerFactory
listenerFactory ListenerFactory
Expand Down Expand Up @@ -111,9 +110,9 @@ func (l *Listener) initListen(ctx context.Context) error {
return err
}

l.ctx, l.cancel = ctxtool.WithFunc(ctx, func() {
l.ctx = ctxtool.WrapCancel(ctxtool.WithFunc(ctx, func() {
l.Listener.Close()
})
}))
return nil
}

Expand Down Expand Up @@ -171,7 +170,7 @@ func (l *Listener) run() {
// Stop stops accepting new incoming connections and Close any active clients
func (l *Listener) Stop() {
l.log.Info("Stopping" + l.family.String() + "server")
l.cancel()
l.ctx.Cancel()
l.wg.Wait()
l.log.Info(l.family.String() + " server stopped")
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ require (
github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2
github.com/elastic/ecs v1.6.0
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a
github.com/elastic/go-concert v0.0.4
github.com/elastic/go-concert v0.1.0
github.com/elastic/go-libaudit/v2 v2.1.0
github.com/elastic/go-licenser v0.3.1
github.com/elastic/go-lookslike v0.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a h1
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a/go.mod h1:uh/Gj9a0XEbYoM4NYz4LvaBVARz3QXLmlNjsrKY9fTc=
github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270 h1:cWPqxlPtir4RoQVCpGSRXmLqjEHpJKbR60rxh1nQZY4=
github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270/go.mod h1:Msl1pdboCbArMF/nSCDUXgQuWTeoMmE/z8607X+k7ng=
github.com/elastic/go-concert v0.0.4 h1:pzgYCmJ/xMJsW8PSk33inAWZ065hrwSeP79TpwAbsLE=
github.com/elastic/go-concert v0.0.4/go.mod h1:9MtFarjXroUgmm0m6HY3NSe1XiKhdktiNRRj9hWvIaM=
github.com/elastic/go-concert v0.1.0 h1:gz/yvA3bseuHzoF/lNMltkL30XdPqMo+bg5o2mBx2EE=
github.com/elastic/go-concert v0.1.0/go.mod h1:9MtFarjXroUgmm0m6HY3NSe1XiKhdktiNRRj9hWvIaM=
github.com/elastic/go-libaudit/v2 v2.1.0 h1:yWSKoGaoWLGFPjqWrQ4gwtuM77pTk7K4CsPxXss8he4=
github.com/elastic/go-libaudit/v2 v2.1.0/go.mod h1:MM/l/4xV7ilcl+cIblL8Zn448J7RZaDwgNLE4gNKYPg=
github.com/elastic/go-licenser v0.3.1 h1:RmRukU/JUmts+rpexAw0Fvt2ly7VVu6mw8z4HrEzObU=
Expand Down
13 changes: 7 additions & 6 deletions journalbeat/reader/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,23 @@ type Reader struct {
r *journalread.Reader
journal *sdjournal.Journal
config Config
done chan struct{}
ctx ctxtool.CancelContext
logger *logp.Logger
backoff backoff.Backoff
}

// New creates a new journal reader and moves the FP to the configured position.
func New(c Config, done chan struct{}, state checkpoint.JournalState, logger *logp.Logger) (*Reader, error) {
func New(c Config, done <-chan struct{}, state checkpoint.JournalState, logger *logp.Logger) (*Reader, error) {
return newReader(c.Path, c, done, state, logger)
}

// NewLocal creates a reader to read form the local journal and moves the FP
// to the configured position.
func NewLocal(c Config, done chan struct{}, state checkpoint.JournalState, logger *logp.Logger) (*Reader, error) {
func NewLocal(c Config, done <-chan struct{}, state checkpoint.JournalState, logger *logp.Logger) (*Reader, error) {
return newReader(LocalSystemJournalID, c, done, state, logger)
}

func newReader(path string, c Config, done chan struct{}, state checkpoint.JournalState, logger *logp.Logger) (*Reader, error) {
func newReader(path string, c Config, done <-chan struct{}, state checkpoint.JournalState, logger *logp.Logger) (*Reader, error) {
logger = logger.With("path", path)
backoff := backoff.NewExpBackoff(done, c.Backoff, c.MaxBackoff)

Expand All @@ -79,7 +79,7 @@ func newReader(path string, c Config, done chan struct{}, state checkpoint.Journ
r: r,
journal: journal,
config: c,
done: done,
ctx: ctxtool.WithCancelContext(ctxtool.FromChannel(done)),
logger: logger,
backoff: backoff,
}, nil
Expand All @@ -100,13 +100,14 @@ func seekBy(log *logp.Logger, c Config, state checkpoint.JournalState) (journalr
// Close closes the underlying journal reader.
func (r *Reader) Close() {
instance.StopMonitoringJournal(r.config.Path)
r.ctx.Cancel()
r.r.Close()
}

// Next waits until a new event shows up and returns it.
// It blocks until an event is returned or an error occurs.
func (r *Reader) Next() (*beat.Event, error) {
entry, err := r.r.Next(ctxtool.FromChannel(r.done))
entry, err := r.r.Next(r.ctx)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/cloudfoundry/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (i *inputV1) Run(ctx v2.Context, publisher stateless.Publisher) error {
return errors.Wrapf(err, "initializing doppler consumer")
}

stopCtx, cancel := ctxtool.WithFunc(ctxtool.FromCanceller(ctx.Cancelation), func() {
stopCtx, cancel := ctxtool.WithFunc(ctx.Cancelation, func() {
// wait stops the consumer and waits for all internal go-routines to be stopped.
consumer.Wait()
})
Expand Down
Loading