Skip to content

Commit

Permalink
fix: not used queue
Browse files Browse the repository at this point in the history
  • Loading branch information
justlorain committed Sep 3, 2023
1 parent 2aefd72 commit 29c4160
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 29 deletions.
36 changes: 18 additions & 18 deletions helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,39 +118,39 @@ LOOP:
_ = atomic.AddUint32(&v.workerNum, 1)
go v.recruit(wg)
}
// consume the waiting queue first
if v.WaitingTaskNum() > 0 {
if v.waitingQ.Size() > 0 {
task := v.waitingQ.DelFirst()
select {
case v.workerC <- task:
continue
default:
v.waitingQ.AddLast(task)
}
} else {
}
select {
case task, ok := <-v.taskC:
if !ok {
break LOOP
}
select {
case task, ok := <-v.taskC:
if !ok {
break LOOP
}
select {
case v.workerC <- task:
default:
v.waitingQ.AddLast(task)
}
case <-timer.C:
if v.WorkerNum() > 0 {
v.tryDismiss()
}
timer.Reset(v.options.workerIdleTimeout)
case v.workerC <- task:
default:
v.waitingQ.AddLast(task)
}
case <-timer.C:
if v.WorkerNum() > 0 {
v.tryDismiss()
}
timer.Reset(v.options.workerIdleTimeout)
default:
}
}
v.dismissAll()
wg.Wait()
}

func (v *Violin) clean() {
for v.WaitingTaskNum() > 0 {
for v.waitingQ.Size() > 0 {
task := v.waitingQ.DelFirst()
v.workerC <- task
}
Expand Down
22 changes: 11 additions & 11 deletions violin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,28 +57,28 @@ func TestMaxWorkers(t *testing.T) {
t.Fatal("should have created one worker")
}

max := 13
v = New(WithMaxWorkers(max))
maximum := 13
v = New(WithMaxWorkers(maximum))
defer v.Shutdown()

assert.Equal(t, max, v.MaxWorkerNum())
assert.Equal(t, maximum, v.MaxWorkerNum())

started := make(chan struct{}, max)
started := make(chan struct{}, maximum)
release := make(chan struct{})

// Start workers, and have them all wait on a channel before completing.
for i := 0; i < max; i++ {
for i := 0; i < maximum; i++ {
v.Submit(func() {
started <- struct{}{}
<-release
})
}

// Wait for all queued tasks to be dispatched to workers.
assert.Equal(t, v.waitingQ.Size(), int(v.WaitingTaskNum()))
assert.Equal(t, v.waitingQ.Size(), v.WaitingTaskNum())

timeout := time.After(5 * time.Second)
for startCount := 0; startCount < max; {
for startCount := 0; startCount < maximum; {
select {
case <-started:
startCount++
Expand Down Expand Up @@ -127,18 +127,18 @@ func TestSubmitWait(t *testing.T) {
func TestStopRace(t *testing.T) {
defer goleak.VerifyNone(t)

max := 13
maximum := 13

v := New(WithMaxWorkers(max))
v := New(WithMaxWorkers(maximum))
defer v.Shutdown()

workRelChan := make(chan struct{})

var started sync.WaitGroup
started.Add(max)
started.Add(maximum)

// Start workers, and have them all wait on a channel before completing.
for i := 0; i < max; i++ {
for i := 0; i < maximum; i++ {
v.Submit(func() {
started.Done()
<-workRelChan
Expand Down

0 comments on commit 29c4160

Please sign in to comment.