diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 0b3dfbd..a4605d2 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -16,6 +16,7 @@ package pipeline import ( "fmt" + "sync" "github.com/blinklabs-io/adder/event" "github.com/blinklabs-io/adder/plugin" @@ -29,6 +30,7 @@ type Pipeline struct { outputChan chan event.Event errorChan chan error doneChan chan bool + wg sync.WaitGroup } func New() *Pipeline { @@ -108,6 +110,7 @@ func (p *Pipeline) Start() error { // Stop shuts down the pipeline and all plugins func (p *Pipeline) Stop() error { close(p.doneChan) + p.wg.Wait() close(p.errorChan) close(p.filterChan) close(p.outputChan) @@ -131,9 +134,11 @@ func (p *Pipeline) chanCopyLoop( input <-chan event.Event, output chan<- event.Event, ) { + p.wg.Add(1) for { select { case <-p.doneChan: + p.wg.Done() return case evt, ok := <-input: if ok { @@ -146,9 +151,11 @@ func (p *Pipeline) chanCopyLoop( // outputChanLoop reads events from the output channel and writes them to each output plugin's input channel func (p *Pipeline) outputChanLoop() { + p.wg.Add(1) for { select { case <-p.doneChan: + p.wg.Done() return case evt, ok := <-p.outputChan: if ok {