Skip to content

Commit

Permalink
chore(queue): don't prepare request data before worker done. (#63)
Browse files Browse the repository at this point in the history
* chore(queue): don't prepare request data before the worker has done.

Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
  • Loading branch information
appleboy committed Apr 20, 2022
1 parent ed24fa1 commit 03d90b7
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 7 deletions.
15 changes: 10 additions & 5 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,16 @@ func (q *Queue) start() {
tasks := make(chan core.QueuedMessage, 1)

for {
// check worker number
q.schedule()

select {
// wait worker ready
case <-q.ready:
case <-q.quit:
return
}

// request task from queue in background
q.routineGroup.Run(func() {
for {
Expand Down Expand Up @@ -289,11 +299,6 @@ func (q *Queue) start() {
return
}

// check worker number
q.schedule()
// wait worker ready
<-q.ready

// start new task
q.metric.IncBusyWorker()
q.routineGroup.Run(func() {
Expand Down
30 changes: 28 additions & 2 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (m mockMessage) Bytes() []byte {
return []byte(m.message)
}

func TestNewQueue(t *testing.T) {
func TestNewQueueWithZeroWorker(t *testing.T) {
controller := gomock.NewController(t)
defer controller.Finish()

Expand All @@ -34,18 +34,44 @@ func TestNewQueue(t *testing.T) {

w := mocks.NewMockWorker(controller)
w.EXPECT().Shutdown().Return(nil)
w.EXPECT().Request().Return(nil, nil)
q, err = NewQueue(
WithWorker(w),
WithWorkerCount(0),
)
assert.NoError(t, err)
assert.NotNil(t, q)

q.Start()
time.Sleep(50 * time.Millisecond)
assert.Equal(t, 0, q.BusyWorkers())
q.Release()
}

func TestNewQueueWithDefaultWorker(t *testing.T) {
controller := gomock.NewController(t)
defer controller.Finish()

q, err := NewQueue()
assert.Error(t, err)
assert.Nil(t, q)

w := mocks.NewMockWorker(controller)
m := mocks.NewMockQueuedMessage(controller)
m.EXPECT().Bytes().Return([]byte("test")).AnyTimes()
w.EXPECT().Shutdown().Return(nil)
w.EXPECT().Request().Return(m, nil).AnyTimes()
w.EXPECT().Run(m).Return(nil).AnyTimes()
q, err = NewQueue(
WithWorker(w),
)
assert.NoError(t, err)
assert.NotNil(t, q)

q.Start()
q.Release()
assert.Equal(t, 0, q.BusyWorkers())
}

func TestShtdonwOnce(t *testing.T) {
w := &messageWorker{
messages: make(chan core.QueuedMessage, 100),
Expand Down
2 changes: 2 additions & 0 deletions worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package queue
import (
"errors"
"testing"
"time"

"github.com/golang-queue/queue/core"
"github.com/golang-queue/queue/mocks"
Expand Down Expand Up @@ -30,5 +31,6 @@ func TestMockWorkerAndMessage(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, q)
q.Start()
time.Sleep(50 * time.Millisecond)
q.Release()
}

0 comments on commit 03d90b7

Please sign in to comment.