Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,6 @@ func (s *Consumer) handle(job Job) error {

// Run start the worker
func (s *Consumer) Run(task QueuedMessage) error {
// check queue status
select {
case <-s.stop:
return ErrQueueShutdown
default:
}

var data Job
_ = json.Unmarshal(task.Bytes(), &data)
if v, ok := task.(Job); ok {
Expand Down
4 changes: 2 additions & 2 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestCancelJobAfterShutdown(t *testing.T) {

func TestGoroutineLeak(t *testing.T) {
w := NewConsumer(
WithLogger(NewEmptyLogger()),
WithLogger(NewLogger()),
WithFn(func(ctx context.Context, m QueuedMessage) error {
for {
select {
Expand Down Expand Up @@ -178,7 +178,7 @@ func TestGoroutineLeak(t *testing.T) {
}

q.Start()
time.Sleep(2 * time.Second)
time.Sleep(1 * time.Second)
q.Release()
fmt.Println("number of goroutines:", runtime.NumGoroutine())
}
Expand Down
33 changes: 8 additions & 25 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ func NewQueue(opts ...Option) (*Queue, error) {

// Start to enable all worker
func (q *Queue) Start() {
go q.start()
q.routineGroup.Run(func() {
q.start()
})
}

// Shutdown stops all queues.
Expand Down Expand Up @@ -220,6 +222,7 @@ func (q *Queue) work(task QueuedMessage) {
}
}

// UpdateWorkerCount to update worker number dynamically.
func (q *Queue) UpdateWorkerCount(num int) {
q.workerCount = num
q.schedule()
Expand All @@ -244,13 +247,9 @@ func (q *Queue) start() {

for {
var task QueuedMessage
if atomic.LoadInt32(&q.stopFlag) == 1 {
return
}

// request task from queue in background
q.routineGroup.Run(func() {
loop:
for {
select {
case <-q.quit:
Expand All @@ -261,15 +260,15 @@ func (q *Queue) start() {
if err != nil {
select {
case <-q.quit:
break loop
return
case <-time.After(time.Second):
// sleep 1 second to fetch new task
}
}
}
if t != nil {
tasks <- t
break loop
return
}
}
}
Expand All @@ -292,24 +291,8 @@ func (q *Queue) start() {

// check worker number
q.schedule()

// get worker to execute new task
select {
case <-q.quit:
if err := q.worker.Queue(task); err != nil {
q.logger.Errorf("can't re-queue task: %v", err)
}
return
case <-q.ready:
select {
case <-q.quit:
if err := q.worker.Queue(task); err != nil {
q.logger.Errorf("can't re-queue task: %v", err)
}
return
default:
}
}
// wait worker ready
<-q.ready

// start new task
q.metric.IncBusyWorker()
Expand Down