Skip to content

Commit

Permalink
feat(queue): auto scale the task worker (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
appleboy committed Mar 28, 2022
1 parent 4c2d646 commit f2b7091
Show file tree
Hide file tree
Showing 9 changed files with 216 additions and 184 deletions.
34 changes: 21 additions & 13 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,26 +101,25 @@ func (s *Consumer) handle(job Job) error {
}

// Run start the worker
func (s *Consumer) Run() error {
func (s *Consumer) Run(task QueuedMessage) error {
// check queue status
select {
case <-s.stop:
return ErrQueueShutdown
default:
}

for task := range s.taskQueue {
var data Job
_ = json.Unmarshal(task.Bytes(), &data)
if v, ok := task.(Job); ok {
if v.Task != nil {
data.Task = v.Task
}
}
if err := s.handle(data); err != nil {
s.logger.Error(err.Error())
var data Job
_ = json.Unmarshal(task.Bytes(), &data)
if v, ok := task.(Job); ok {
if v.Task != nil {
data.Task = v.Task
}
}
if err := s.handle(data); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -148,19 +147,28 @@ func (s *Consumer) Usage() int {
}

// Queue send notification to queue
func (s *Consumer) Queue(job QueuedMessage) error {
func (s *Consumer) Queue(task QueuedMessage) error {
if atomic.LoadInt32(&s.stopFlag) == 1 {
return ErrQueueShutdown
}

select {
case s.taskQueue <- job:
case s.taskQueue <- task:
return nil
default:
return errMaxCapacity
}
}

func (s *Consumer) Request() (QueuedMessage, error) {
select {
case task := <-s.taskQueue:
return task, nil
default:
return nil, errors.New("no message in queue")
}
}

// NewConsumer for struc
func NewConsumer(opts ...Option) *Consumer {
o := NewOptions(opts...)
Expand Down
68 changes: 23 additions & 45 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,16 @@ func TestCustomFuncAndWait(t *testing.T) {
q, err := NewQueue(
WithWorker(w),
WithWorkerCount(2),
WithLogger(NewLogger()),
)
assert.NoError(t, err)
q.Start()
time.Sleep(100 * time.Millisecond)
assert.NoError(t, q.Queue(m))
assert.NoError(t, q.Queue(m))
assert.NoError(t, q.Queue(m))
assert.NoError(t, q.Queue(m))
q.Start()
time.Sleep(100 * time.Millisecond)
assert.Equal(t, 2, int(q.metric.BusyWorkers()))
time.Sleep(600 * time.Millisecond)
q.Shutdown()
q.Wait()
Expand All @@ -84,26 +86,6 @@ func TestEnqueueJobAfterShutdown(t *testing.T) {
q.Wait()
}

func TestConsumerNumAfterShutdown(t *testing.T) {
w := NewConsumer()
q, err := NewQueue(
WithWorker(w),
WithWorkerCount(2),
)
assert.NoError(t, err)
q.Start()
q.Start()
time.Sleep(50 * time.Millisecond)
assert.Equal(t, 4, q.Workers())
q.Shutdown()
q.Wait()
assert.Equal(t, 0, q.Workers())
// show queue has been shutdown meesgae
q.Start()
q.Start()
assert.Equal(t, 0, q.Workers())
}

func TestJobReachTimeout(t *testing.T) {
m := mockMessage{
message: "foo",
Expand Down Expand Up @@ -131,12 +113,10 @@ func TestJobReachTimeout(t *testing.T) {
WithWorkerCount(2),
)
assert.NoError(t, err)
q.Start()
time.Sleep(50 * time.Millisecond)
assert.NoError(t, q.QueueWithTimeout(30*time.Millisecond, m))
q.Start()
time.Sleep(50 * time.Millisecond)
q.Shutdown()
q.Wait()
q.Release()
}

func TestCancelJobAfterShutdown(t *testing.T) {
Expand Down Expand Up @@ -167,17 +147,15 @@ func TestCancelJobAfterShutdown(t *testing.T) {
WithWorkerCount(2),
)
assert.NoError(t, err)
q.Start()
time.Sleep(50 * time.Millisecond)
assert.NoError(t, q.QueueWithTimeout(100*time.Millisecond, m))
q.Shutdown()
q.Wait()
assert.NoError(t, q.QueueWithTimeout(100*time.Millisecond, m))
q.Start()
time.Sleep(10 * time.Millisecond)
assert.Equal(t, 2, int(q.metric.busyWorkers))
q.Release()
}

func TestGoroutineLeak(t *testing.T) {
m := mockMessage{
message: "foo",
}
w := NewConsumer(
WithLogger(NewEmptyLogger()),
WithFn(func(ctx context.Context, m QueuedMessage) error {
Expand All @@ -200,20 +178,22 @@ func TestGoroutineLeak(t *testing.T) {
}),
)
q, err := NewQueue(
WithLogger(NewEmptyLogger()),
WithLogger(NewLogger()),
WithWorker(w),
WithWorkerCount(10),
)
assert.NoError(t, err)
q.Start()
time.Sleep(50 * time.Millisecond)
for i := 0; i < 500; i++ {
m.message = fmt.Sprintf("foobar: %d", i+1)
for i := 0; i < 400; i++ {
m := mockMessage{
message: fmt.Sprintf("new message: %d", i+1),
}

assert.NoError(t, q.Queue(m))
}

q.Start()
time.Sleep(2 * time.Second)
q.Shutdown()
q.Wait()
q.Release()
fmt.Println("number of goroutines:", runtime.NumGoroutine())
}

Expand All @@ -231,12 +211,10 @@ func TestGoroutinePanic(t *testing.T) {
WithWorkerCount(2),
)
assert.NoError(t, err)
q.Start()
time.Sleep(50 * time.Millisecond)
assert.NoError(t, q.Queue(m))
time.Sleep(50 * time.Millisecond)
q.Shutdown()
q.Wait()
q.Start()
time.Sleep(10 * time.Millisecond)
q.Release()
}

func TestHandleTimeout(t *testing.T) {
Expand Down
6 changes: 1 addition & 5 deletions pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package queue
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand All @@ -14,9 +13,6 @@ func TestNewPoolWithQueueTask(t *testing.T) {
rets := make(chan struct{}, taskN)

p := NewPool(totalN)
time.Sleep(time.Millisecond * 50)
assert.Equal(t, totalN, p.Workers())

for i := 0; i < taskN; i++ {
assert.NoError(t, p.QueueTask(func(context.Context) error {
rets <- struct{}{}
Expand All @@ -30,5 +26,5 @@ func TestNewPoolWithQueueTask(t *testing.T) {

// shutdown all, and now running worker is 0
p.Release()
assert.Equal(t, 0, p.Workers())
assert.Equal(t, 0, p.BusyWorkers())
}

0 comments on commit f2b7091

Please sign in to comment.