diff --git a/helper.go b/helper.go index 9917a20..ba2c1e4 100644 --- a/helper.go +++ b/helper.go @@ -118,31 +118,31 @@ LOOP: _ = atomic.AddUint32(&v.workerNum, 1) go v.recruit(wg) } - // consume the waiting queue first - if v.WaitingTaskNum() > 0 { + if v.waitingQ.Size() > 0 { task := v.waitingQ.DelFirst() select { case v.workerC <- task: + continue default: v.waitingQ.AddLast(task) } - } else { + } + select { + case task, ok := <-v.taskC: + if !ok { + break LOOP + } select { - case task, ok := <-v.taskC: - if !ok { - break LOOP - } - select { - case v.workerC <- task: - default: - v.waitingQ.AddLast(task) - } - case <-timer.C: - if v.WorkerNum() > 0 { - v.tryDismiss() - } - timer.Reset(v.options.workerIdleTimeout) + case v.workerC <- task: + default: + v.waitingQ.AddLast(task) + } + case <-timer.C: + if v.WorkerNum() > 0 { + v.tryDismiss() } + timer.Reset(v.options.workerIdleTimeout) + default: } } v.dismissAll() @@ -150,7 +150,7 @@ LOOP: } func (v *Violin) clean() { - for v.WaitingTaskNum() > 0 { + for v.waitingQ.Size() > 0 { task := v.waitingQ.DelFirst() v.workerC <- task } diff --git a/violin_test.go b/violin_test.go index 10ae873..b4872dc 100644 --- a/violin_test.go +++ b/violin_test.go @@ -57,17 +57,17 @@ func TestMaxWorkers(t *testing.T) { t.Fatal("should have created one worker") } - max := 13 - v = New(WithMaxWorkers(max)) + maximum := 13 + v = New(WithMaxWorkers(maximum)) defer v.Shutdown() - assert.Equal(t, max, v.MaxWorkerNum()) + assert.Equal(t, maximum, v.MaxWorkerNum()) - started := make(chan struct{}, max) + started := make(chan struct{}, maximum) release := make(chan struct{}) // Start workers, and have them all wait on a channel before completing. - for i := 0; i < max; i++ { + for i := 0; i < maximum; i++ { v.Submit(func() { started <- struct{}{} <-release @@ -75,10 +75,10 @@ func TestMaxWorkers(t *testing.T) { } // Wait for all queued tasks to be dispatched to workers. - assert.Equal(t, v.waitingQ.Size(), int(v.WaitingTaskNum())) + assert.Equal(t, v.waitingQ.Size(), v.WaitingTaskNum()) timeout := time.After(5 * time.Second) - for startCount := 0; startCount < max; { + for startCount := 0; startCount < maximum; { select { case <-started: startCount++ @@ -127,18 +127,18 @@ func TestSubmitWait(t *testing.T) { func TestStopRace(t *testing.T) { defer goleak.VerifyNone(t) - max := 13 + maximum := 13 - v := New(WithMaxWorkers(max)) + v := New(WithMaxWorkers(maximum)) defer v.Shutdown() workRelChan := make(chan struct{}) var started sync.WaitGroup - started.Add(max) + started.Add(maximum) // Start workers, and have them all wait on a channel before completing. - for i := 0; i < max; i++ { + for i := 0; i < maximum; i++ { v.Submit(func() { started.Done() <-workRelChan