From 5d814a44027f78f19f32501122052590fdf14837 Mon Sep 17 00:00:00 2001 From: shivanshgaur Date: Mon, 11 May 2020 15:51:03 +0530 Subject: [PATCH] Worker graceful shutdown (#543) * wait in broker consuming goroutine until worker.Quit() completes * wait in broker consuming goroutine until worker.Quit() completes for v2 --- v1/worker.go | 5 +++++ v2/worker.go | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/v1/worker.go b/v1/worker.go index 5105dce93..d15ac445f 100644 --- a/v1/worker.go +++ b/v1/worker.go @@ -6,6 +6,7 @@ import ( "net/url" "os" "os/signal" + "sync" "syscall" "time" @@ -69,6 +70,7 @@ func (worker *Worker) LaunchAsync(errorsChan chan<- error) { log.INFO.Printf(" - PrefetchCount: %d", cnf.AMQP.PrefetchCount) } + var signalWG sync.WaitGroup // Goroutine to start broker consumption and handle retries when broker connection dies go func() { for { @@ -81,6 +83,7 @@ func (worker *Worker) LaunchAsync(errorsChan chan<- error) { log.WARNING.Printf("Broker failed with error: %s", err) } } else { + signalWG.Wait() errorsChan <- err // stop the goroutine return } @@ -103,8 +106,10 @@ func (worker *Worker) LaunchAsync(errorsChan chan<- error) { // After first Ctrl+C start quitting the worker gracefully log.WARNING.Print("Waiting for running tasks to finish before shutting down") go func() { + signalWG.Add(1) worker.Quit() errorsChan <- ErrWorkerQuitGracefully + signalWG.Done() }() } else { // Abort the program when user hits Ctrl+C second time in a row diff --git a/v2/worker.go b/v2/worker.go index 276b5bb80..790e7aae5 100644 --- a/v2/worker.go +++ b/v2/worker.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "os/signal" + "sync" "syscall" "time" @@ -63,6 +64,7 @@ func (worker *Worker) LaunchAsync(errorsChan chan<- error) { log.INFO.Printf(" - PrefetchCount: %d", cnf.AMQP.PrefetchCount) } + var signalWG sync.WaitGroup // Goroutine to start broker consumption and handle retries when broker connection dies go func() { for { @@ -75,6 +77,7 @@ func (worker *Worker) LaunchAsync(errorsChan chan<- error) { log.WARNING.Printf("Broker failed with error: %s", err) } } else { + signalWG.Wait() errorsChan <- err // stop the goroutine return } @@ -97,8 +100,10 @@ func (worker *Worker) LaunchAsync(errorsChan chan<- error) { // After first Ctrl+C start quitting the worker gracefully log.WARNING.Print("Waiting for running tasks to finish before shutting down") go func() { + signalWG.Add(1) worker.Quit() errorsChan <- errors.New("Worker quit gracefully") + signalWG.Done() }() } else { // Abort the program when user hits Ctrl+C second time in a row