From 4d40d365038b6f267f8523e4cfb8a5b3469e0b33 Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Mon, 28 Mar 2022 22:48:24 +0800 Subject: [PATCH] fix(worker): handle all jobs with graceful shutdown fix https://github.com/golang-queue/queue/issues/53 --- consumer.go | 7 ------- consumer_test.go | 4 ++-- queue.go | 33 ++++++++------------------------- 3 files changed, 10 insertions(+), 34 deletions(-) diff --git a/consumer.go b/consumer.go index b72ccfc..43b608c 100644 --- a/consumer.go +++ b/consumer.go @@ -76,13 +76,6 @@ func (s *Consumer) handle(job Job) error { // Run start the worker func (s *Consumer) Run(task QueuedMessage) error { - // check queue status - select { - case <-s.stop: - return ErrQueueShutdown - default: - } - var data Job _ = json.Unmarshal(task.Bytes(), &data) if v, ok := task.(Job); ok { diff --git a/consumer_test.go b/consumer_test.go index 9d1649d..1dcfdbf 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -143,7 +143,7 @@ func TestCancelJobAfterShutdown(t *testing.T) { func TestGoroutineLeak(t *testing.T) { w := NewConsumer( - WithLogger(NewEmptyLogger()), + WithLogger(NewLogger()), WithFn(func(ctx context.Context, m QueuedMessage) error { for { select { @@ -178,7 +178,7 @@ func TestGoroutineLeak(t *testing.T) { } q.Start() - time.Sleep(2 * time.Second) + time.Sleep(1 * time.Second) q.Release() fmt.Println("number of goroutines:", runtime.NumGoroutine()) } diff --git a/queue.go b/queue.go index 9708eca..1cb34e7 100644 --- a/queue.go +++ b/queue.go @@ -81,7 +81,9 @@ func NewQueue(opts ...Option) (*Queue, error) { // Start to enable all worker func (q *Queue) Start() { - go q.start() + q.routineGroup.Run(func() { + q.start() + }) } // Shutdown stops all queues. @@ -220,6 +222,7 @@ func (q *Queue) work(task QueuedMessage) { } } +// UpdateWorkerCount to update worker number dynamically. func (q *Queue) UpdateWorkerCount(num int) { q.workerCount = num q.schedule() @@ -244,13 +247,9 @@ func (q *Queue) start() { for { var task QueuedMessage - if atomic.LoadInt32(&q.stopFlag) == 1 { - return - } // request task from queue in background q.routineGroup.Run(func() { - loop: for { select { case <-q.quit: @@ -261,7 +260,7 @@ func (q *Queue) start() { if err != nil { select { case <-q.quit: - break loop + return case <-time.After(time.Second): // sleep 1 second to fetch new task } @@ -269,7 +268,7 @@ func (q *Queue) start() { } if t != nil { tasks <- t - break loop + return } } } @@ -292,24 +291,8 @@ func (q *Queue) start() { // check worker number q.schedule() - - // get worker to execute new task - select { - case <-q.quit: - if err := q.worker.Queue(task); err != nil { - q.logger.Errorf("can't re-queue task: %v", err) - } - return - case <-q.ready: - select { - case <-q.quit: - if err := q.worker.Queue(task); err != nil { - q.logger.Errorf("can't re-queue task: %v", err) - } - return - default: - } - } + // wait worker ready + <-q.ready // start new task q.metric.IncBusyWorker()