Skip to content

Commit

Permalink
chore(benchmark): improve add task in queue performance (#72)
Browse files Browse the repository at this point in the history
* test(benchmark): add queue performance check

* fix: defer

Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>

* chore: add pointer

Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>

* chore: update json library

Signed-off-by: Bo-Yi Wu <appleboy.tw@gmail.com>
  • Loading branch information
appleboy committed May 29, 2022
1 parent b3dd500 commit c420418
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 37 deletions.
1 change: 1 addition & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ jobs:
- name: Run Tests
run: |
go test -v -covermode=atomic -coverprofile=coverage.out
go test -v -run=^$ -benchmem -bench .
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
Expand Down
14 changes: 6 additions & 8 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package queue

import (
"context"
"encoding/json"
"errors"
"sync"
"sync/atomic"
"time"

"github.com/goccy/go-json"
"github.com/golang-queue/queue/core"
)

Expand All @@ -26,7 +26,7 @@ type Consumer struct {
stopFlag int32
}

func (s *Consumer) handle(job Job) error {
func (s *Consumer) handle(job *Job) error {
// create channel with buffer size 1 to avoid goroutine leak
done := make(chan error, 1)
panicChan := make(chan interface{}, 1)
Expand Down Expand Up @@ -79,13 +79,11 @@ func (s *Consumer) handle(job Job) error {

// Run to execute new task
func (s *Consumer) Run(task core.QueuedMessage) error {
var data Job
_ = json.Unmarshal(task.Bytes(), &data)
if v, ok := task.(Job); ok {
if v.Task != nil {
data.Task = v.Task
}
data := task.(*Job)
if data.Task == nil {
_ = json.Unmarshal(task.Bytes(), data)
}

if err := s.handle(data); err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func TestGoroutinePanic(t *testing.T) {
}

func TestHandleTimeout(t *testing.T) {
job := Job{
job := &Job{
Timeout: 100 * time.Millisecond,
Payload: []byte("foo"),
}
Expand All @@ -222,7 +222,7 @@ func TestHandleTimeout(t *testing.T) {
assert.Error(t, err)
assert.Equal(t, context.DeadlineExceeded, err)

job = Job{
job = &Job{
Timeout: 150 * time.Millisecond,
Payload: []byte("foo"),
}
Expand All @@ -245,7 +245,7 @@ func TestHandleTimeout(t *testing.T) {
}

func TestJobComplete(t *testing.T) {
job := Job{
job := &Job{
Timeout: 100 * time.Millisecond,
Payload: []byte("foo"),
}
Expand All @@ -259,7 +259,7 @@ func TestJobComplete(t *testing.T) {
assert.Error(t, err)
assert.Equal(t, errors.New("job completed"), err)

job = Job{
job = &Job{
Timeout: 250 * time.Millisecond,
Payload: []byte("foo"),
}
Expand All @@ -282,7 +282,7 @@ func TestJobComplete(t *testing.T) {
}

func TestTaskJobComplete(t *testing.T) {
job := Job{
job := &Job{
Timeout: 100 * time.Millisecond,
Task: func(ctx context.Context) error {
return errors.New("job completed")
Expand All @@ -294,7 +294,7 @@ func TestTaskJobComplete(t *testing.T) {
assert.Error(t, err)
assert.Equal(t, errors.New("job completed"), err)

job = Job{
job = &Job{
Timeout: 250 * time.Millisecond,
Task: func(ctx context.Context) error {
return nil
Expand All @@ -311,7 +311,7 @@ func TestTaskJobComplete(t *testing.T) {
assert.NoError(t, err)

// job timeout
job = Job{
job = &Job{
Timeout: 50 * time.Millisecond,
Task: func(ctx context.Context) error {
time.Sleep(60 * time.Millisecond)
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ module github.com/golang-queue/queue
go 1.18

require (
github.com/goccy/go-json v0.9.7
github.com/golang/mock v1.6.0
github.com/stretchr/testify v1.7.1
go.uber.org/goleak v1.1.12
)

require (
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
5 changes: 4 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/goccy/go-json v0.9.7 h1:IcB+Aqpx/iMHu5Yooh7jEzJk1JZ7Pjtmys2ukPr7EeM=
github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
Expand Down
36 changes: 17 additions & 19 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package queue

import (
"context"
"encoding/json"
"errors"
"sync"
"sync/atomic"
"time"

"github.com/goccy/go-json"
"github.com/golang-queue/queue/core"
)

Expand Down Expand Up @@ -47,13 +47,17 @@ type (
)

// Bytes get string body
func (j Job) Bytes() []byte {
func (j *Job) Bytes() []byte {
if j.Task != nil {
return nil
}
return j.Payload
}

// Encode for encoding the structure
func (j Job) Encode() []byte {
func (j *Job) Encode() []byte {
b, _ := json.Marshal(j)

return b
}

Expand Down Expand Up @@ -97,11 +101,11 @@ func (q *Queue) Shutdown() {
return
}

if q.metric.BusyWorkers() > 0 {
q.logger.Infof("shutdown all tasks: %d workers", q.metric.BusyWorkers())
}

q.stopOnce.Do(func() {
if q.metric.BusyWorkers() > 0 {
q.logger.Infof("shutdown all tasks: %d workers", q.metric.BusyWorkers())
}

if err := q.worker.Shutdown(); err != nil {
q.logger.Error(err)
}
Expand Down Expand Up @@ -155,13 +159,11 @@ func (q *Queue) handleQueue(timeout time.Duration, job core.QueuedMessage) error
return ErrQueueShutdown
}

data := Job{
Timeout: timeout,
Payload: job.Bytes(),
}

if err := q.worker.Queue(Job{
Payload: data.Encode(),
if err := q.worker.Queue(&Job{
Payload: (&Job{
Timeout: timeout,
Payload: job.Bytes(),
}).Encode(),
}); err != nil {
return err
}
Expand All @@ -186,13 +188,9 @@ func (q *Queue) handleQueueTask(timeout time.Duration, task TaskFunc) error {
return ErrQueueShutdown
}

data := Job{
if err := q.worker.Queue(&Job{
Timeout: timeout,
}

if err := q.worker.Queue(Job{
Task: task,
Payload: data.Encode(),
}); err != nil {
return err
}
Expand Down
24 changes: 24 additions & 0 deletions queue_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package queue

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -140,3 +141,26 @@ func TestCloseQueueAfterShutdown(t *testing.T) {
assert.Error(t, err)
assert.Equal(t, ErrQueueShutdown, err)
}

func BenchmarkQueueTask(b *testing.B) {
b.ReportAllocs()
q := NewPool(5)
defer q.Release()
for n := 0; n < b.N; n++ {
_ = q.QueueTask(func(context.Context) error {
return nil
})
}
}

func BenchmarkQueue(b *testing.B) {
b.ReportAllocs()
m := &mockMessage{
message: "foo",
}
q := NewPool(5)
defer q.Release()
for n := 0; n < b.N; n++ {
_ = q.Queue(m)
}
}
2 changes: 1 addition & 1 deletion worker_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type taskWorker struct {
}

func (w *taskWorker) Run(task core.QueuedMessage) error {
if v, ok := task.(Job); ok {
if v, ok := task.(*Job); ok {
if v.Task != nil {
_ = v.Task(context.Background())
}
Expand Down

0 comments on commit c420418

Please sign in to comment.