diff --git a/NOTICE.txt b/NOTICE.txt index d5811381395..4a49e2f9af4 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -6336,11 +6336,11 @@ SOFTWARE -------------------------------------------------------------------------------- Dependency : github.com/elastic/go-concert -Version: v0.0.4 +Version: v0.1.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/go-concert@v0.0.4/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/elastic/go-concert@v0.1.0/LICENSE: Apache License Version 2.0, January 2004 diff --git a/filebeat/input/filestream/filestream.go b/filebeat/input/filestream/filestream.go index 1a559c67e06..908d8558145 100644 --- a/filebeat/input/filestream/filestream.go +++ b/filebeat/input/filestream/filestream.go @@ -18,18 +18,19 @@ package filestream import ( - "context" "errors" "io" "os" "time" + "github.com/elastic/go-concert/ctxtool" + "github.com/elastic/go-concert/timed" + "github.com/elastic/go-concert/unison" + input "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/common/backoff" "github.com/elastic/beats/v7/libbeat/common/file" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/go-concert/ctxtool" - "github.com/elastic/go-concert/unison" ) var ( @@ -39,10 +40,9 @@ var ( // logFile contains all log related data type logFile struct { - file *os.File - log *logp.Logger - ctx context.Context - cancelReading context.CancelFunc + file *os.File + log *logp.Logger + readerCtx ctxtool.CancelContext closeAfterInterval time.Duration closeOnEOF bool @@ -55,7 +55,7 @@ type logFile struct { offset int64 lastTimeRead time.Time backoff backoff.Backoff - tg unison.TaskGroup + tg *unison.TaskGroup } // newFileReader creates a new log instance to read log sources @@ -71,6 +71,9 @@ func newFileReader( return nil, err } + readerCtx := ctxtool.WithCancelContext(ctxtool.FromCanceller(canceler)) + tg := unison.TaskGroupWithCancel(readerCtx) + l := &logFile{ file: f, log: log, @@ -83,16 +86,10 @@ func newFileReader( offset: offset, lastTimeRead: time.Now(), backoff: backoff.NewExpBackoff(canceler.Done(), config.Backoff.Init, config.Backoff.Max), - tg: unison.TaskGroup{}, + readerCtx: readerCtx, + tg: tg, } - l.ctx, l.cancelReading = ctxtool.WithFunc(ctxtool.FromCanceller(canceler), func() { - err := l.tg.Stop() - if err != nil { - l.log.Errorf("Error while stopping filestream logFile reader: %v", err) - } - }) - l.startFileMonitoringIfNeeded() return l, nil @@ -103,7 +100,7 @@ func newFileReader( func (f *logFile) Read(buf []byte) (int, error) { totalN := 0 - for f.ctx.Err() == nil { + for f.readerCtx.Err() == nil { n, err := f.file.Read(buf) if n > 0 { f.offset += int64(n) @@ -154,35 +151,18 @@ func (f *logFile) startFileMonitoringIfNeeded() { } func (f *logFile) closeIfTimeout(ctx unison.Canceler) { - timer := time.NewTimer(f.closeAfterInterval) - defer timer.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-timer.C: - f.cancelReading() - return - } + if err := timed.Wait(ctx, f.closeAfterInterval); err == nil { + f.readerCtx.Cancel() } } func (f *logFile) periodicStateCheck(ctx unison.Canceler) { - ticker := time.NewTicker(f.checkInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - if f.shouldBeClosed() { - f.cancelReading() - return - } + timed.Periodic(ctx, f.checkInterval, func() error { + if f.shouldBeClosed() { + f.readerCtx.Cancel() } - } + return nil + }) } func (f *logFile) shouldBeClosed() bool { @@ -267,6 +247,8 @@ func (f *logFile) handleEOF() error { // Close func (f *logFile) Close() error { - f.cancelReading() - return f.file.Close() + f.readerCtx.Cancel() + err := f.file.Close() + f.tg.Stop() // Wait until all resources are released for sure. + return err } diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 8d40284c366..8f21cb505ce 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -159,7 +159,7 @@ func (inp *filestream) Run( return err } - _, streamCancel := ctxtool.WithFunc(ctxtool.FromCanceller(ctx.Cancelation), func() { + _, streamCancel := ctxtool.WithFunc(ctx.Cancelation, func() { log.Debug("Closing reader of filestream") err := r.Close() if err != nil { diff --git a/filebeat/input/v2/compat/compat.go b/filebeat/input/v2/compat/compat.go index 67bc9c7ac13..ad2d2a16e8f 100644 --- a/filebeat/input/v2/compat/compat.go +++ b/filebeat/input/v2/compat/compat.go @@ -21,6 +21,7 @@ package compat import ( + "context" "fmt" "sync" @@ -31,7 +32,7 @@ import ( "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/go-concert" + "github.com/elastic/go-concert/ctxtool" ) // factory implements the cfgfile.RunnerFactory interface and wraps the @@ -52,7 +53,7 @@ type runner struct { log *logp.Logger agent *beat.Info wg sync.WaitGroup - sig *concert.OnceSignaler + sig ctxtool.CancelContext input v2.Input connector beat.PipelineConnector } @@ -94,7 +95,7 @@ func (f *factory) Create( id: id, log: f.log.Named(input.Name()), agent: &f.info, - sig: concert.NewOnceSignaler(), + sig: ctxtool.WithCancelContext(context.Background()), input: input, connector: p, }, nil @@ -126,7 +127,7 @@ func (r *runner) Start() { } func (r *runner) Stop() { - r.sig.Trigger() + r.sig.Cancel() r.wg.Wait() r.log.Infof("Input '%v' stopped", r.input.Name()) } diff --git a/filebeat/input/winlog/input.go b/filebeat/input/winlog/input.go index ced7a62b228..08b15a84b4b 100644 --- a/filebeat/input/winlog/input.go +++ b/filebeat/input/winlog/input.go @@ -98,7 +98,7 @@ func (eventlogRunner) Run( // setup closing the API if either the run function is signaled asynchronously // to shut down or when returning after io.EOF - cancelCtx, cancelFn := ctxtool.WithFunc(ctxtool.FromCanceller(ctx.Cancelation), func() { + cancelCtx, cancelFn := ctxtool.WithFunc(ctx.Cancelation, func() { if err := api.Close(); err != nil { log.Errorf("Error while closing Windows Eventlog Access: %v", err) } diff --git a/filebeat/inputsource/common/dgram/server.go b/filebeat/inputsource/common/dgram/server.go index ecb4844ed67..41a028f7d33 100644 --- a/filebeat/inputsource/common/dgram/server.go +++ b/filebeat/inputsource/common/dgram/server.go @@ -72,26 +72,29 @@ func (l *Listener) Run(ctx context.Context) error { l.log.Info("Started listening for " + l.family.String() + " connection") for ctx.Err() == nil { - conn, err := l.listener() - if err != nil { - l.log.Debugw("Cannot connect", "error", err) - continue - } - connCtx, connCancel := ctxtool.WithFunc(ctx, func() { - conn.Close() - }) - - err = l.run(connCtx, conn) - if err != nil { - l.log.Debugw("Error while processing input", "error", err) - connCancel() - continue - } - connCancel() + l.doRun(ctx) } return nil } +func (l *Listener) doRun(ctx context.Context) { + conn, err := l.listener() + if err != nil { + l.log.Debugw("Cannot connect", "error", err) + return + } + + connCtx, connCancel := ctxtool.WithFunc(ctx, func() { + conn.Close() + }) + defer connCancel() + + err = l.connectAndRun(connCtx, conn) + if err != nil { + l.log.Debugw("Error while processing input", "error", err) + } +} + func (l *Listener) Start() error { l.log.Info("Started listening for " + l.family.String() + " connection") @@ -106,12 +109,14 @@ func (l *Listener) Start() error { }) defer connCancel() - return l.run(ctxtool.FromCanceller(connCtx), conn) + return l.connectAndRun(ctxtool.FromCanceller(connCtx), conn) }) return nil } -func (l *Listener) run(ctx context.Context, conn net.PacketConn) error { +func (l *Listener) connectAndRun(ctx context.Context, conn net.PacketConn) error { + defer l.log.Recover("Panic handling datagram") + handler := l.connect(*l.config) for ctx.Err() == nil { err := handler(ctx, conn) diff --git a/filebeat/inputsource/common/streaming/listener.go b/filebeat/inputsource/common/streaming/listener.go index 16e708f554b..11e60a35b7b 100644 --- a/filebeat/inputsource/common/streaming/listener.go +++ b/filebeat/inputsource/common/streaming/listener.go @@ -41,8 +41,7 @@ type Listener struct { family inputsource.Family wg sync.WaitGroup log *logp.Logger - ctx context.Context - cancel context.CancelFunc + ctx ctxtool.CancelContext clientsCount atomic.Int handlerFactory HandlerFactory listenerFactory ListenerFactory @@ -111,9 +110,9 @@ func (l *Listener) initListen(ctx context.Context) error { return err } - l.ctx, l.cancel = ctxtool.WithFunc(ctx, func() { + l.ctx = ctxtool.WrapCancel(ctxtool.WithFunc(ctx, func() { l.Listener.Close() - }) + })) return nil } @@ -171,7 +170,7 @@ func (l *Listener) run() { // Stop stops accepting new incoming connections and Close any active clients func (l *Listener) Stop() { l.log.Info("Stopping" + l.family.String() + "server") - l.cancel() + l.ctx.Cancel() l.wg.Wait() l.log.Info(l.family.String() + " server stopped") } diff --git a/go.mod b/go.mod index fb47064f7ca..7d5c452cc38 100644 --- a/go.mod +++ b/go.mod @@ -61,7 +61,7 @@ require ( github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2 github.com/elastic/ecs v1.6.0 github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a - github.com/elastic/go-concert v0.0.4 + github.com/elastic/go-concert v0.1.0 github.com/elastic/go-libaudit/v2 v2.1.0 github.com/elastic/go-licenser v0.3.1 github.com/elastic/go-lookslike v0.3.0 diff --git a/go.sum b/go.sum index 28477be0c46..118df228463 100644 --- a/go.sum +++ b/go.sum @@ -253,8 +253,8 @@ github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a h1 github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a/go.mod h1:uh/Gj9a0XEbYoM4NYz4LvaBVARz3QXLmlNjsrKY9fTc= github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270 h1:cWPqxlPtir4RoQVCpGSRXmLqjEHpJKbR60rxh1nQZY4= github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270/go.mod h1:Msl1pdboCbArMF/nSCDUXgQuWTeoMmE/z8607X+k7ng= -github.com/elastic/go-concert v0.0.4 h1:pzgYCmJ/xMJsW8PSk33inAWZ065hrwSeP79TpwAbsLE= -github.com/elastic/go-concert v0.0.4/go.mod h1:9MtFarjXroUgmm0m6HY3NSe1XiKhdktiNRRj9hWvIaM= +github.com/elastic/go-concert v0.1.0 h1:gz/yvA3bseuHzoF/lNMltkL30XdPqMo+bg5o2mBx2EE= +github.com/elastic/go-concert v0.1.0/go.mod h1:9MtFarjXroUgmm0m6HY3NSe1XiKhdktiNRRj9hWvIaM= github.com/elastic/go-libaudit/v2 v2.1.0 h1:yWSKoGaoWLGFPjqWrQ4gwtuM77pTk7K4CsPxXss8he4= github.com/elastic/go-libaudit/v2 v2.1.0/go.mod h1:MM/l/4xV7ilcl+cIblL8Zn448J7RZaDwgNLE4gNKYPg= github.com/elastic/go-licenser v0.3.1 h1:RmRukU/JUmts+rpexAw0Fvt2ly7VVu6mw8z4HrEzObU= diff --git a/journalbeat/reader/journal.go b/journalbeat/reader/journal.go index 6b3136d65c6..10aa382a142 100644 --- a/journalbeat/reader/journal.go +++ b/journalbeat/reader/journal.go @@ -39,23 +39,23 @@ type Reader struct { r *journalread.Reader journal *sdjournal.Journal config Config - done chan struct{} + ctx ctxtool.CancelContext logger *logp.Logger backoff backoff.Backoff } // New creates a new journal reader and moves the FP to the configured position. -func New(c Config, done chan struct{}, state checkpoint.JournalState, logger *logp.Logger) (*Reader, error) { +func New(c Config, done <-chan struct{}, state checkpoint.JournalState, logger *logp.Logger) (*Reader, error) { return newReader(c.Path, c, done, state, logger) } // NewLocal creates a reader to read form the local journal and moves the FP // to the configured position. -func NewLocal(c Config, done chan struct{}, state checkpoint.JournalState, logger *logp.Logger) (*Reader, error) { +func NewLocal(c Config, done <-chan struct{}, state checkpoint.JournalState, logger *logp.Logger) (*Reader, error) { return newReader(LocalSystemJournalID, c, done, state, logger) } -func newReader(path string, c Config, done chan struct{}, state checkpoint.JournalState, logger *logp.Logger) (*Reader, error) { +func newReader(path string, c Config, done <-chan struct{}, state checkpoint.JournalState, logger *logp.Logger) (*Reader, error) { logger = logger.With("path", path) backoff := backoff.NewExpBackoff(done, c.Backoff, c.MaxBackoff) @@ -79,7 +79,7 @@ func newReader(path string, c Config, done chan struct{}, state checkpoint.Journ r: r, journal: journal, config: c, - done: done, + ctx: ctxtool.WithCancelContext(ctxtool.FromChannel(done)), logger: logger, backoff: backoff, }, nil @@ -100,13 +100,14 @@ func seekBy(log *logp.Logger, c Config, state checkpoint.JournalState) (journalr // Close closes the underlying journal reader. func (r *Reader) Close() { instance.StopMonitoringJournal(r.config.Path) + r.ctx.Cancel() r.r.Close() } // Next waits until a new event shows up and returns it. // It blocks until an event is returned or an error occurs. func (r *Reader) Next() (*beat.Event, error) { - entry, err := r.r.Next(ctxtool.FromChannel(r.done)) + entry, err := r.r.Next(r.ctx) if err != nil { return nil, err } diff --git a/x-pack/filebeat/input/cloudfoundry/v1.go b/x-pack/filebeat/input/cloudfoundry/v1.go index ffc5ccd4a75..100fb944319 100644 --- a/x-pack/filebeat/input/cloudfoundry/v1.go +++ b/x-pack/filebeat/input/cloudfoundry/v1.go @@ -51,7 +51,7 @@ func (i *inputV1) Run(ctx v2.Context, publisher stateless.Publisher) error { return errors.Wrapf(err, "initializing doppler consumer") } - stopCtx, cancel := ctxtool.WithFunc(ctxtool.FromCanceller(ctx.Cancelation), func() { + stopCtx, cancel := ctxtool.WithFunc(ctx.Cancelation, func() { // wait stops the consumer and waits for all internal go-routines to be stopped. consumer.Wait() }) diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go index 2c799c1f14f..3e01616ed48 100644 --- a/x-pack/filebeat/input/http_endpoint/input.go +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -103,9 +103,7 @@ func (e *httpEndpoint) Run(ctx v2.Context, publisher stateless.Publisher) error mux := http.NewServeMux() mux.HandleFunc(e.config.URL, withValidator(validator, handler.apiResponse)) server := &http.Server{Addr: e.addr, TLSConfig: e.tlsConfig, Handler: mux} - _, cancel := ctxtool.WithFunc(ctxtool.FromCanceller(ctx.Cancelation), func() { - server.Close() - }) + _, cancel := ctxtool.WithFunc(ctx.Cancelation, func() { server.Close() }) defer cancel() var err error