diff --git a/consumer.go b/consumer.go index 96af040..da3e79d 100644 --- a/consumer.go +++ b/consumer.go @@ -7,16 +7,18 @@ import ( "sync" "sync/atomic" "time" + + "github.com/golang-queue/queue/core" ) -var _ Worker = (*Consumer)(nil) +var _ core.Worker = (*Consumer)(nil) var errMaxCapacity = errors.New("max capacity reached") // Consumer for simple queue using buffer channel type Consumer struct { - taskQueue chan QueuedMessage - runFunc func(context.Context, QueuedMessage) error + taskQueue chan core.QueuedMessage + runFunc func(context.Context, core.QueuedMessage) error stop chan struct{} logger Logger stopOnce sync.Once @@ -75,7 +77,7 @@ func (s *Consumer) handle(job Job) error { } // Run to execute new task -func (s *Consumer) Run(task QueuedMessage) error { +func (s *Consumer) Run(task core.QueuedMessage) error { var data Job _ = json.Unmarshal(task.Bytes(), &data) if v, ok := task.(Job); ok { @@ -104,7 +106,7 @@ func (s *Consumer) Shutdown() error { } // Queue send task to the buffer channel -func (s *Consumer) Queue(task QueuedMessage) error { +func (s *Consumer) Queue(task core.QueuedMessage) error { if atomic.LoadInt32(&s.stopFlag) == 1 { return ErrQueueShutdown } @@ -118,7 +120,7 @@ func (s *Consumer) Queue(task QueuedMessage) error { } // Request a new task from channel -func (s *Consumer) Request() (QueuedMessage, error) { +func (s *Consumer) Request() (core.QueuedMessage, error) { clock := 0 loop: for { @@ -143,7 +145,7 @@ loop: func NewConsumer(opts ...Option) *Consumer { o := NewOptions(opts...) w := &Consumer{ - taskQueue: make(chan QueuedMessage, o.queueSize), + taskQueue: make(chan core.QueuedMessage, o.queueSize), stop: make(chan struct{}), logger: o.logger, runFunc: o.fn, diff --git a/consumer_test.go b/consumer_test.go index 02a0f16..ba013b4 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -9,6 +9,8 @@ import ( "testing" "time" + "github.com/golang-queue/queue/core" + "github.com/stretchr/testify/assert" ) @@ -28,7 +30,7 @@ func TestCustomFuncAndWait(t *testing.T) { message: "foo", } w := NewConsumer( - WithFn(func(ctx context.Context, m QueuedMessage) error { + WithFn(func(ctx context.Context, m core.QueuedMessage) error { time.Sleep(500 * time.Millisecond) return nil }), @@ -77,7 +79,7 @@ func TestJobReachTimeout(t *testing.T) { message: "foo", } w := NewConsumer( - WithFn(func(ctx context.Context, m QueuedMessage) error { + WithFn(func(ctx context.Context, m core.QueuedMessage) error { for { select { case <-ctx.Done(): @@ -111,7 +113,7 @@ func TestCancelJobAfterShutdown(t *testing.T) { } w := NewConsumer( WithLogger(NewEmptyLogger()), - WithFn(func(ctx context.Context, m QueuedMessage) error { + WithFn(func(ctx context.Context, m core.QueuedMessage) error { for { select { case <-ctx.Done(): @@ -144,7 +146,7 @@ func TestCancelJobAfterShutdown(t *testing.T) { func TestGoroutineLeak(t *testing.T) { w := NewConsumer( WithLogger(NewLogger()), - WithFn(func(ctx context.Context, m QueuedMessage) error { + WithFn(func(ctx context.Context, m core.QueuedMessage) error { for { select { case <-ctx.Done(): @@ -187,7 +189,7 @@ func TestGoroutinePanic(t *testing.T) { message: "foo", } w := NewConsumer( - WithFn(func(ctx context.Context, m QueuedMessage) error { + WithFn(func(ctx context.Context, m core.QueuedMessage) error { panic("missing something") }), ) @@ -208,7 +210,7 @@ func TestHandleTimeout(t *testing.T) { Payload: []byte("foo"), } w := NewConsumer( - WithFn(func(ctx context.Context, m QueuedMessage) error { + WithFn(func(ctx context.Context, m core.QueuedMessage) error { time.Sleep(200 * time.Millisecond) return nil }), @@ -224,7 +226,7 @@ func TestHandleTimeout(t *testing.T) { } w = NewConsumer( - WithFn(func(ctx context.Context, m QueuedMessage) error { + WithFn(func(ctx context.Context, m core.QueuedMessage) error { time.Sleep(200 * time.Millisecond) return nil }), @@ -248,7 +250,7 @@ func TestJobComplete(t *testing.T) { Payload: []byte("foo"), } w := NewConsumer( - WithFn(func(ctx context.Context, m QueuedMessage) error { + WithFn(func(ctx context.Context, m core.QueuedMessage) error { return errors.New("job completed") }), ) @@ -263,7 +265,7 @@ func TestJobComplete(t *testing.T) { } w = NewConsumer( - WithFn(func(ctx context.Context, m QueuedMessage) error { + WithFn(func(ctx context.Context, m core.QueuedMessage) error { time.Sleep(200 * time.Millisecond) return errors.New("job completed") }), @@ -324,7 +326,7 @@ func TestTaskJobComplete(t *testing.T) { func TestIncreaseWorkerCount(t *testing.T) { w := NewConsumer( WithLogger(NewEmptyLogger()), - WithFn(func(ctx context.Context, m QueuedMessage) error { + WithFn(func(ctx context.Context, m core.QueuedMessage) error { time.Sleep(500 * time.Millisecond) return nil }), @@ -354,7 +356,7 @@ func TestIncreaseWorkerCount(t *testing.T) { func TestDecreaseWorkerCount(t *testing.T) { w := NewConsumer( - WithFn(func(ctx context.Context, m QueuedMessage) error { + WithFn(func(ctx context.Context, m core.QueuedMessage) error { time.Sleep(100 * time.Millisecond) return nil }), diff --git a/worker.go b/core/worker.go similarity index 96% rename from worker.go rename to core/worker.go index e246613..61f5bc2 100644 --- a/worker.go +++ b/core/worker.go @@ -1,4 +1,4 @@ -package queue +package core // Worker interface type Worker interface { diff --git a/go.mod b/go.mod index 7507553..94dfa70 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/golang-queue/queue go 1.18 require ( + github.com/golang/mock v1.6.0 github.com/stretchr/testify v1.7.1 go.uber.org/goleak v1.1.12 ) diff --git a/go.sum b/go.sum index addf9a6..9320fa5 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -36,6 +38,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/metric_test.go b/metric_test.go index 5bea48c..8f6e9a6 100644 --- a/metric_test.go +++ b/metric_test.go @@ -6,12 +6,14 @@ import ( "testing" "time" + "github.com/golang-queue/queue/core" + "github.com/stretchr/testify/assert" ) func TestMetricData(t *testing.T) { w := NewConsumer( - WithFn(func(ctx context.Context, m QueuedMessage) error { + WithFn(func(ctx context.Context, m core.QueuedMessage) error { switch string(m.Bytes()) { case "foo1": panic("missing something") diff --git a/mocks/mock_message.go b/mocks/mock_message.go new file mode 100644 index 0000000..13d4e2c --- /dev/null +++ b/mocks/mock_message.go @@ -0,0 +1,48 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/golang-queue/queue/core (interfaces: QueuedMessage) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockQueuedMessage is a mock of QueuedMessage interface. +type MockQueuedMessage struct { + ctrl *gomock.Controller + recorder *MockQueuedMessageMockRecorder +} + +// MockQueuedMessageMockRecorder is the mock recorder for MockQueuedMessage. +type MockQueuedMessageMockRecorder struct { + mock *MockQueuedMessage +} + +// NewMockQueuedMessage creates a new mock instance. +func NewMockQueuedMessage(ctrl *gomock.Controller) *MockQueuedMessage { + mock := &MockQueuedMessage{ctrl: ctrl} + mock.recorder = &MockQueuedMessageMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockQueuedMessage) EXPECT() *MockQueuedMessageMockRecorder { + return m.recorder +} + +// Bytes mocks base method. +func (m *MockQueuedMessage) Bytes() []byte { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Bytes") + ret0, _ := ret[0].([]byte) + return ret0 +} + +// Bytes indicates an expected call of Bytes. +func (mr *MockQueuedMessageMockRecorder) Bytes() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bytes", reflect.TypeOf((*MockQueuedMessage)(nil).Bytes)) +} diff --git a/mocks/mock_worker.go b/mocks/mock_worker.go new file mode 100644 index 0000000..1d50bca --- /dev/null +++ b/mocks/mock_worker.go @@ -0,0 +1,92 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/golang-queue/queue/core (interfaces: Worker) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + + core "github.com/golang-queue/queue/core" + gomock "github.com/golang/mock/gomock" +) + +// MockWorker is a mock of Worker interface. +type MockWorker struct { + ctrl *gomock.Controller + recorder *MockWorkerMockRecorder +} + +// MockWorkerMockRecorder is the mock recorder for MockWorker. +type MockWorkerMockRecorder struct { + mock *MockWorker +} + +// NewMockWorker creates a new mock instance. +func NewMockWorker(ctrl *gomock.Controller) *MockWorker { + mock := &MockWorker{ctrl: ctrl} + mock.recorder = &MockWorkerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockWorker) EXPECT() *MockWorkerMockRecorder { + return m.recorder +} + +// Queue mocks base method. +func (m *MockWorker) Queue(arg0 core.QueuedMessage) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Queue", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Queue indicates an expected call of Queue. +func (mr *MockWorkerMockRecorder) Queue(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Queue", reflect.TypeOf((*MockWorker)(nil).Queue), arg0) +} + +// Request mocks base method. +func (m *MockWorker) Request() (core.QueuedMessage, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Request") + ret0, _ := ret[0].(core.QueuedMessage) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Request indicates an expected call of Request. +func (mr *MockWorkerMockRecorder) Request() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Request", reflect.TypeOf((*MockWorker)(nil).Request)) +} + +// Run mocks base method. +func (m *MockWorker) Run(arg0 core.QueuedMessage) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Run", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Run indicates an expected call of Run. +func (mr *MockWorkerMockRecorder) Run(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockWorker)(nil).Run), arg0) +} + +// Shutdown mocks base method. +func (m *MockWorker) Shutdown() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Shutdown") + ret0, _ := ret[0].(error) + return ret0 +} + +// Shutdown indicates an expected call of Shutdown. +func (mr *MockWorkerMockRecorder) Shutdown() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Shutdown", reflect.TypeOf((*MockWorker)(nil).Shutdown)) +} diff --git a/mocks/mocks.go b/mocks/mocks.go new file mode 100644 index 0000000..f87860c --- /dev/null +++ b/mocks/mocks.go @@ -0,0 +1,6 @@ +package mocks + +import _ "github.com/golang/mock/mockgen/model" + +//go:generate mockgen -package=mocks -destination=mock_worker.go github.com/golang-queue/queue/core Worker +//go:generate mockgen -package=mocks -destination=mock_message.go github.com/golang-queue/queue/core QueuedMessage diff --git a/options.go b/options.go index f6e47c9..26b03b9 100644 --- a/options.go +++ b/options.go @@ -4,6 +4,8 @@ import ( "context" "runtime" "time" + + "github.com/golang-queue/queue/core" ) var ( @@ -11,7 +13,7 @@ var ( defaultWorkerCount = runtime.NumCPU() defaultTimeout = 60 * time.Minute defaultNewLogger = NewLogger() - defaultFn = func(context.Context, QueuedMessage) error { return nil } + defaultFn = func(context.Context, core.QueuedMessage) error { return nil } defaultMetric = NewMetric() ) @@ -47,14 +49,14 @@ func WithMetric(m Metric) Option { } // WithWorker set custom worker -func WithWorker(w Worker) Option { +func WithWorker(w core.Worker) Option { return func(q *Options) { q.worker = w } } // WithFn set custom job function -func WithFn(fn func(context.Context, QueuedMessage) error) Option { +func WithFn(fn func(context.Context, core.QueuedMessage) error) Option { return func(q *Options) { q.fn = fn } @@ -73,8 +75,8 @@ type Options struct { timeout time.Duration logger Logger queueSize int - worker Worker - fn func(context.Context, QueuedMessage) error + worker core.Worker + fn func(context.Context, core.QueuedMessage) error metric Metric } diff --git a/queue.go b/queue.go index 6e96c8f..45ad1d1 100644 --- a/queue.go +++ b/queue.go @@ -7,6 +7,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/golang-queue/queue/core" ) // ErrQueueShutdown the queue is released and closed. @@ -25,7 +27,7 @@ type ( routineGroup *routineGroup quit chan struct{} ready chan struct{} - worker Worker + worker core.Worker stopOnce sync.Once timeout time.Duration stopFlag int32 @@ -139,16 +141,16 @@ func (q *Queue) Wait() { } // Queue to queue all job -func (q *Queue) Queue(job QueuedMessage) error { +func (q *Queue) Queue(job core.QueuedMessage) error { return q.handleQueue(q.timeout, job) } // QueueWithTimeout to queue all job with specified timeout. -func (q *Queue) QueueWithTimeout(timeout time.Duration, job QueuedMessage) error { +func (q *Queue) QueueWithTimeout(timeout time.Duration, job core.QueuedMessage) error { return q.handleQueue(timeout, job) } -func (q *Queue) handleQueue(timeout time.Duration, job QueuedMessage) error { +func (q *Queue) handleQueue(timeout time.Duration, job core.QueuedMessage) error { if atomic.LoadInt32(&q.stopFlag) == 1 { return ErrQueueShutdown } @@ -200,7 +202,7 @@ func (q *Queue) handleQueueTask(timeout time.Duration, task TaskFunc) error { return nil } -func (q *Queue) work(task QueuedMessage) { +func (q *Queue) work(task core.QueuedMessage) { var err error // to handle panic cases from inside the worker // in such case, we start a new goroutine @@ -246,7 +248,7 @@ func (q *Queue) schedule() { // start handle job func (q *Queue) start() { - tasks := make(chan QueuedMessage, 1) + tasks := make(chan core.QueuedMessage, 1) for { // request task from queue in background diff --git a/queue_test.go b/queue_test.go index 98d5a2e..e78c57c 100644 --- a/queue_test.go +++ b/queue_test.go @@ -4,6 +4,8 @@ import ( "testing" "time" + "github.com/golang-queue/queue/core" + "github.com/stretchr/testify/assert" "go.uber.org/goleak" ) @@ -40,7 +42,7 @@ func TestNewQueue(t *testing.T) { func TestShtdonwOnce(t *testing.T) { w := &messageWorker{ - messages: make(chan QueuedMessage, 100), + messages: make(chan core.QueuedMessage, 100), } q, err := NewQueue( WithWorker(w), @@ -60,7 +62,7 @@ func TestShtdonwOnce(t *testing.T) { func TestCapacityReached(t *testing.T) { w := &messageWorker{ - messages: make(chan QueuedMessage, 1), + messages: make(chan core.QueuedMessage, 1), } q, err := NewQueue( WithWorker(w), @@ -81,7 +83,7 @@ func TestCapacityReached(t *testing.T) { func TestCloseQueueAfterShutdown(t *testing.T) { w := &messageWorker{ - messages: make(chan QueuedMessage, 10), + messages: make(chan core.QueuedMessage, 10), } q, err := NewQueue( WithWorker(w), diff --git a/worker_empty.go b/worker_empty.go index 578a2ce..2486bb1 100644 --- a/worker_empty.go +++ b/worker_empty.go @@ -1,11 +1,13 @@ package queue -var _ Worker = (*emptyWorker)(nil) +import "github.com/golang-queue/queue/core" + +var _ core.Worker = (*emptyWorker)(nil) // just for unit testing, don't use it. type emptyWorker struct{} -func (w *emptyWorker) Run(task QueuedMessage) error { return nil } -func (w *emptyWorker) Shutdown() error { return nil } -func (w *emptyWorker) Queue(task QueuedMessage) error { return nil } -func (w *emptyWorker) Request() (QueuedMessage, error) { return nil, nil } +func (w *emptyWorker) Run(task core.QueuedMessage) error { return nil } +func (w *emptyWorker) Shutdown() error { return nil } +func (w *emptyWorker) Queue(task core.QueuedMessage) error { return nil } +func (w *emptyWorker) Request() (core.QueuedMessage, error) { return nil, nil } diff --git a/worker_message.go b/worker_message.go index de99730..277fdba 100644 --- a/worker_message.go +++ b/worker_message.go @@ -3,16 +3,18 @@ package queue import ( "errors" "time" + + "github.com/golang-queue/queue/core" ) -var _ Worker = (*messageWorker)(nil) +var _ core.Worker = (*messageWorker)(nil) // just for unit testing, don't use it. type messageWorker struct { - messages chan QueuedMessage + messages chan core.QueuedMessage } -func (w *messageWorker) Run(task QueuedMessage) error { +func (w *messageWorker) Run(task core.QueuedMessage) error { if string(task.Bytes()) == "panic" { panic("show panic") } @@ -25,7 +27,7 @@ func (w *messageWorker) Shutdown() error { return nil } -func (w *messageWorker) Queue(task QueuedMessage) error { +func (w *messageWorker) Queue(task core.QueuedMessage) error { select { case w.messages <- task: return nil @@ -34,7 +36,7 @@ func (w *messageWorker) Queue(task QueuedMessage) error { } } -func (w *messageWorker) Request() (QueuedMessage, error) { +func (w *messageWorker) Request() (core.QueuedMessage, error) { select { case task := <-w.messages: return task, nil diff --git a/worker_task.go b/worker_task.go index a3f1d0a..0edf79e 100644 --- a/worker_task.go +++ b/worker_task.go @@ -3,16 +3,18 @@ package queue import ( "context" "errors" + + "github.com/golang-queue/queue/core" ) -var _ Worker = (*taskWorker)(nil) +var _ core.Worker = (*taskWorker)(nil) // just for unit testing, don't use it. type taskWorker struct { - messages chan QueuedMessage + messages chan core.QueuedMessage } -func (w *taskWorker) Run(task QueuedMessage) error { +func (w *taskWorker) Run(task core.QueuedMessage) error { if v, ok := task.(Job); ok { if v.Task != nil { _ = v.Task(context.Background()) @@ -26,7 +28,7 @@ func (w *taskWorker) Shutdown() error { return nil } -func (w *taskWorker) Queue(job QueuedMessage) error { +func (w *taskWorker) Queue(job core.QueuedMessage) error { select { case w.messages <- job: return nil @@ -35,7 +37,7 @@ func (w *taskWorker) Queue(job QueuedMessage) error { } } -func (w *taskWorker) Request() (QueuedMessage, error) { +func (w *taskWorker) Request() (core.QueuedMessage, error) { select { case task := <-w.messages: return task, nil diff --git a/worker_task_test.go b/worker_task_test.go index 547a9ff..62280e5 100644 --- a/worker_task_test.go +++ b/worker_task_test.go @@ -5,12 +5,14 @@ import ( "testing" "time" + "github.com/golang-queue/queue/core" + "github.com/stretchr/testify/assert" ) func TestQueueTaskJob(t *testing.T) { w := &taskWorker{ - messages: make(chan QueuedMessage, 10), + messages: make(chan core.QueuedMessage, 10), } q, err := NewQueue( WithWorker(w), diff --git a/worker_test.go b/worker_test.go new file mode 100644 index 0000000..47095bc --- /dev/null +++ b/worker_test.go @@ -0,0 +1,34 @@ +package queue + +import ( + "errors" + "testing" + + "github.com/golang-queue/queue/core" + "github.com/golang-queue/queue/mocks" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +func TestMockWorkerAndMessage(t *testing.T) { + controller := gomock.NewController(t) + defer controller.Finish() + + m := mocks.NewMockQueuedMessage(controller) + + w := mocks.NewMockWorker(controller) + w.EXPECT().Shutdown().Return(nil) + w.EXPECT().Request().DoAndReturn(func() (core.QueuedMessage, error) { + return m, errors.New("nil") + }) + + q, err := NewQueue( + WithWorker(w), + WithWorkerCount(1), + ) + assert.NoError(t, err) + assert.NotNil(t, q) + q.Start() + q.Release() +}