From 61cf1b2934c4024c3ac381ef513b8e421a4fbc87 Mon Sep 17 00:00:00 2001 From: Michael Estrin Date: Wed, 24 Sep 2025 10:14:32 -0500 Subject: [PATCH] fix: synchronization issues in plugin goroutines on shutdown https://github.com/blinklabs-io/adder/issues/45 Signed-off-by: Michael Estrin --- pipeline/pipeline.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 0b3dfbd..a4605d2 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -16,6 +16,7 @@ package pipeline import ( "fmt" + "sync" "github.com/blinklabs-io/adder/event" "github.com/blinklabs-io/adder/plugin" @@ -29,6 +30,7 @@ type Pipeline struct { outputChan chan event.Event errorChan chan error doneChan chan bool + wg sync.WaitGroup } func New() *Pipeline { @@ -108,6 +110,7 @@ func (p *Pipeline) Start() error { // Stop shuts down the pipeline and all plugins func (p *Pipeline) Stop() error { close(p.doneChan) + p.wg.Wait() close(p.errorChan) close(p.filterChan) close(p.outputChan) @@ -131,9 +134,11 @@ func (p *Pipeline) chanCopyLoop( input <-chan event.Event, output chan<- event.Event, ) { + p.wg.Add(1) for { select { case <-p.doneChan: + p.wg.Done() return case evt, ok := <-input: if ok { @@ -146,9 +151,11 @@ func (p *Pipeline) chanCopyLoop( // outputChanLoop reads events from the output channel and writes them to each output plugin's input channel func (p *Pipeline) outputChanLoop() { + p.wg.Add(1) for { select { case <-p.doneChan: + p.wg.Done() return case evt, ok := <-p.outputChan: if ok {