diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index b24f773..66bbbe2 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -27,7 +27,7 @@ jobs: strategy: matrix: os: [ubuntu-latest] - go: [1.13, 1.14, 1.15, 1.16, 1.17, 1.18, 1.19] + go: [1.18, 1.19] include: - os: ubuntu-latest go-build: ~/.cache/go-build diff --git a/consumer.go b/consumer.go index 29a0f79..b8c3b26 100644 --- a/consumer.go +++ b/consumer.go @@ -9,6 +9,7 @@ import ( "github.com/goccy/go-json" "github.com/golang-queue/queue/core" + "github.com/golang-queue/queue/job" ) var _ core.Worker = (*Consumer)(nil) @@ -26,12 +27,12 @@ type Consumer struct { stopFlag int32 } -func (s *Consumer) handle(job *Job) error { +func (s *Consumer) handle(m *job.Message) error { // create channel with buffer size 1 to avoid goroutine leak done := make(chan error, 1) panicChan := make(chan interface{}, 1) startTime := time.Now() - ctx, cancel := context.WithTimeout(context.Background(), job.Timeout) + ctx, cancel := context.WithTimeout(context.Background(), m.Timeout) defer func() { cancel() }() @@ -46,10 +47,10 @@ func (s *Consumer) handle(job *Job) error { }() // run custom process function - if job.Task != nil { - done <- job.Task(ctx) + if m.Task != nil { + done <- m.Task(ctx) } else { - done <- s.runFunc(ctx, job) + done <- s.runFunc(ctx, m) } }() @@ -62,7 +63,7 @@ func (s *Consumer) handle(job *Job) error { // cancel job cancel() - leftTime := job.Timeout - time.Since(startTime) + leftTime := m.Timeout - time.Since(startTime) // wait job select { case <-time.After(leftTime): @@ -79,7 +80,7 @@ func (s *Consumer) handle(job *Job) error { // Run to execute new task func (s *Consumer) Run(task core.QueuedMessage) error { - data := task.(*Job) + data := task.(*job.Message) if data.Task == nil { _ = json.Unmarshal(task.Bytes(), data) } diff --git a/consumer_test.go b/consumer_test.go index b00371e..a4e5626 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/golang-queue/queue/core" + "github.com/golang-queue/queue/job" "github.com/golang-queue/queue/mocks" "github.com/golang/mock/gomock" @@ -103,7 +104,7 @@ func TestJobReachTimeout(t *testing.T) { WithWorkerCount(2), ) assert.NoError(t, err) - assert.NoError(t, q.QueueWithTimeout(30*time.Millisecond, m)) + assert.NoError(t, q.Queue(m, job.WithTimeout(30*time.Millisecond))) q.Start() time.Sleep(50 * time.Millisecond) q.Release() @@ -137,8 +138,8 @@ func TestCancelJobAfterShutdown(t *testing.T) { WithWorkerCount(2), ) assert.NoError(t, err) - assert.NoError(t, q.QueueWithTimeout(100*time.Millisecond, m)) - assert.NoError(t, q.QueueWithTimeout(100*time.Millisecond, m)) + assert.NoError(t, q.Queue(m, job.WithTimeout(100*time.Millisecond))) + assert.NoError(t, q.Queue(m, job.WithTimeout(100*time.Millisecond))) q.Start() time.Sleep(10 * time.Millisecond) assert.Equal(t, 2, int(q.metric.busyWorkers)) @@ -207,7 +208,7 @@ func TestGoroutinePanic(t *testing.T) { } func TestHandleTimeout(t *testing.T) { - job := &Job{ + m := &job.Message{ Timeout: 100 * time.Millisecond, Payload: []byte("foo"), } @@ -218,11 +219,11 @@ func TestHandleTimeout(t *testing.T) { }), ) - err := w.handle(job) + err := w.handle(m) assert.Error(t, err) assert.Equal(t, context.DeadlineExceeded, err) - job = &Job{ + m = &job.Message{ Timeout: 150 * time.Millisecond, Payload: []byte("foo"), } @@ -236,7 +237,7 @@ func TestHandleTimeout(t *testing.T) { done := make(chan error) go func() { - done <- w.handle(job) + done <- w.handle(m) }() err = <-done @@ -245,7 +246,7 @@ func TestHandleTimeout(t *testing.T) { } func TestJobComplete(t *testing.T) { - job := &Job{ + m := &job.Message{ Timeout: 100 * time.Millisecond, Payload: []byte("foo"), } @@ -255,11 +256,11 @@ func TestJobComplete(t *testing.T) { }), ) - err := w.handle(job) + err := w.handle(m) assert.Error(t, err) assert.Equal(t, errors.New("job completed"), err) - job = &Job{ + m = &job.Message{ Timeout: 250 * time.Millisecond, Payload: []byte("foo"), } @@ -273,7 +274,7 @@ func TestJobComplete(t *testing.T) { done := make(chan error) go func() { - done <- w.handle(job) + done <- w.handle(m) }() err = <-done @@ -282,7 +283,7 @@ func TestJobComplete(t *testing.T) { } func TestTaskJobComplete(t *testing.T) { - job := &Job{ + m := &job.Message{ Timeout: 100 * time.Millisecond, Task: func(ctx context.Context) error { return errors.New("job completed") @@ -290,11 +291,11 @@ func TestTaskJobComplete(t *testing.T) { } w := NewConsumer() - err := w.handle(job) + err := w.handle(m) assert.Error(t, err) assert.Equal(t, errors.New("job completed"), err) - job = &Job{ + m = &job.Message{ Timeout: 250 * time.Millisecond, Task: func(ctx context.Context) error { return nil @@ -304,21 +305,21 @@ func TestTaskJobComplete(t *testing.T) { w = NewConsumer() done := make(chan error) go func() { - done <- w.handle(job) + done <- w.handle(m) }() err = <-done assert.NoError(t, err) // job timeout - job = &Job{ + m = &job.Message{ Timeout: 50 * time.Millisecond, Task: func(ctx context.Context) error { time.Sleep(60 * time.Millisecond) return nil }, } - assert.Equal(t, context.DeadlineExceeded, w.handle(job)) + assert.Equal(t, context.DeadlineExceeded, w.handle(m)) } func TestIncreaseWorkerCount(t *testing.T) { diff --git a/job/job.go b/job/job.go new file mode 100644 index 0000000..c0b3dee --- /dev/null +++ b/job/job.go @@ -0,0 +1,67 @@ +package job + +import ( + "context" + "time" + + "github.com/goccy/go-json" + "github.com/golang-queue/queue/core" +) + +// TaskFunc is the task function +type TaskFunc func(context.Context) error + +// Message describes a task and its metadata. +type Message struct { + Task TaskFunc `json:"-"` + + // Timeout is the duration the task can be processed by Handler. + // zero if not specified + Timeout time.Duration `json:"timeout"` + + // Payload is the payload data of the task. + Payload []byte `json:"body"` + + // RetryCount retry count if failure + RetryCount int64 `json:"retry_count"` + + // RetryCount retry count if failure + RetryDelay time.Duration `json:"retry_delay"` +} + +// Bytes get string body +func (m *Message) Bytes() []byte { + if m.Task != nil { + return nil + } + return m.Payload +} + +// Encode for encoding the structure +func (m *Message) Encode() []byte { + b, _ := json.Marshal(m) + + return b +} + +func NewMessage(m core.QueuedMessage, opts ...Option) *Message { + o := NewOptions(opts...) + + return &Message{ + RetryCount: o.retryCount, + RetryDelay: o.retryDelay, + Timeout: o.timeout, + Payload: m.Bytes(), + } +} + +func NewTask(task TaskFunc, opts ...Option) *Message { + o := NewOptions(opts...) + + return &Message{ + Timeout: o.timeout, + RetryCount: o.retryCount, + RetryDelay: o.retryDelay, + Task: task, + } +} diff --git a/job/option.go b/job/option.go new file mode 100644 index 0000000..175c660 --- /dev/null +++ b/job/option.go @@ -0,0 +1,56 @@ +package job + +import "time" + +type Options struct { + retryCount int64 + retryDelay time.Duration + timeout time.Duration +} + +// An Option configures a mutex. +type Option interface { + apply(*Options) +} + +// OptionFunc is a function that configures a job. +type OptionFunc func(*Options) + +// apply calls f(option) +func (f OptionFunc) apply(option *Options) { + f(option) +} + +func NewOptions(opts ...Option) *Options { + o := &Options{ + retryCount: 0, + retryDelay: 100 * time.Millisecond, + timeout: 60 * time.Minute, + } + + // Loop through each option + for _, opt := range opts { + // Call the option giving the instantiated + opt.apply(o) + } + + return o +} + +func WithRetryCount(count int64) Option { + return OptionFunc(func(o *Options) { + o.retryCount = count + }) +} + +func WithRetryDelay(t time.Duration) Option { + return OptionFunc(func(o *Options) { + o.retryDelay = t + }) +} + +func WithTimeout(t time.Duration) Option { + return OptionFunc(func(o *Options) { + o.timeout = t + }) +} diff --git a/options.go b/options.go index 873bdcc..ba0b51a 100644 --- a/options.go +++ b/options.go @@ -3,7 +3,6 @@ package queue import ( "context" "runtime" - "time" "github.com/golang-queue/queue/core" ) @@ -11,7 +10,6 @@ import ( var ( defaultQueueSize = 4096 defaultWorkerCount = runtime.NumCPU() - defaultTimeout = 60 * time.Minute defaultNewLogger = NewLogger() defaultFn = func(context.Context, core.QueuedMessage) error { return nil } defaultMetric = NewMetric() @@ -72,17 +70,9 @@ func WithFn(fn func(context.Context, core.QueuedMessage) error) Option { }) } -// WithTimeOut set custom timeout -func WithTimeOut(t time.Duration) Option { - return OptionFunc(func(q *Options) { - q.timeout = t - }) -} - // Options for custom args in Queue type Options struct { workerCount int - timeout time.Duration logger Logger queueSize int worker core.Worker @@ -95,7 +85,6 @@ func NewOptions(opts ...Option) *Options { o := &Options{ workerCount: defaultWorkerCount, queueSize: defaultQueueSize, - timeout: defaultTimeout, logger: defaultNewLogger, worker: nil, fn: defaultFn, diff --git a/queue.go b/queue.go index 1097637..46a5d12 100644 --- a/queue.go +++ b/queue.go @@ -1,22 +1,18 @@ package queue import ( - "context" "errors" "sync" "sync/atomic" "time" - "github.com/goccy/go-json" "github.com/golang-queue/queue/core" + "github.com/golang-queue/queue/job" ) // ErrQueueShutdown the queue is released and closed. var ErrQueueShutdown = errors.New("queue has been closed and released") -// TaskFunc is the task function -type TaskFunc func(context.Context) error - type ( // A Queue is a message queue. Queue struct { @@ -29,38 +25,10 @@ type ( ready chan struct{} worker core.Worker stopOnce sync.Once - timeout time.Duration stopFlag int32 } - - // Job describes a task and its metadata. - Job struct { - Task TaskFunc `json:"-"` - - // Timeout is the duration the task can be processed by Handler. - // zero if not specified - Timeout time.Duration `json:"timeout"` - - // Payload is the payload data of the task. - Payload []byte `json:"body"` - } ) -// Bytes get string body -func (j *Job) Bytes() []byte { - if j.Task != nil { - return nil - } - return j.Payload -} - -// Encode for encoding the structure -func (j *Job) Encode() []byte { - b, _ := json.Marshal(j) - - return b -} - // ErrMissingWorker missing define worker var ErrMissingWorker = errors.New("missing worker module") @@ -73,7 +41,6 @@ func NewQueue(opts ...Option) (*Queue, error) { ready: make(chan struct{}, 1), workerCount: o.workerCount, logger: o.logger, - timeout: o.timeout, worker: o.worker, metric: &metric{}, } @@ -145,25 +112,15 @@ func (q *Queue) Wait() { } // Queue to queue all job -func (q *Queue) Queue(job core.QueuedMessage) error { - return q.handleQueue(q.timeout, job) -} - -// QueueWithTimeout to queue all job with specified timeout. -func (q *Queue) QueueWithTimeout(timeout time.Duration, job core.QueuedMessage) error { - return q.handleQueue(timeout, job) -} - -func (q *Queue) handleQueue(timeout time.Duration, job core.QueuedMessage) error { +func (q *Queue) Queue(m core.QueuedMessage, opts ...job.Option) error { if atomic.LoadInt32(&q.stopFlag) == 1 { return ErrQueueShutdown } - if err := q.worker.Queue(&Job{ - Payload: (&Job{ - Timeout: timeout, - Payload: job.Bytes(), - }).Encode(), + message := job.NewMessage(m, opts...) + + if err := q.worker.Queue(&job.Message{ + Payload: message.Encode(), }); err != nil { return err } @@ -174,24 +131,14 @@ func (q *Queue) handleQueue(timeout time.Duration, job core.QueuedMessage) error } // QueueTask to queue job task -func (q *Queue) QueueTask(task TaskFunc) error { - return q.handleQueueTask(q.timeout, task) -} - -// QueueTaskWithTimeout to queue job task with timeout -func (q *Queue) QueueTaskWithTimeout(timeout time.Duration, task TaskFunc) error { - return q.handleQueueTask(timeout, task) -} - -func (q *Queue) handleQueueTask(timeout time.Duration, task TaskFunc) error { +func (q *Queue) QueueTask(task job.TaskFunc, opts ...job.Option) error { if atomic.LoadInt32(&q.stopFlag) == 1 { return ErrQueueShutdown } - if err := q.worker.Queue(&Job{ - Timeout: timeout, - Task: task, - }); err != nil { + message := job.NewTask(task, opts...) + + if err := q.worker.Queue(message); err != nil { return err } diff --git a/queue_example_test.go b/queue_example_test.go index 850af5c..1383c1b 100644 --- a/queue_example_test.go +++ b/queue_example_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/golang-queue/queue" + "github.com/golang-queue/queue/job" ) func ExampleNewPool_queueTask() { @@ -57,7 +58,7 @@ func ExampleNewPool_queueTaskTimeout() { // assign tasks to asynchronous goroutine pool for i := 0; i < taskN; i++ { idx := i - if err := q.QueueTaskWithTimeout(100*time.Millisecond, func(ctx context.Context) error { + if err := q.QueueTask(func(ctx context.Context) error { // panic job if idx == 5 { panic("system error") @@ -74,7 +75,7 @@ func ExampleNewPool_queueTaskTimeout() { rets <- idx return nil - }); err != nil { + }, job.WithTimeout(100*time.Millisecond)); err != nil { log.Println(err) } } diff --git a/queue_test.go b/queue_test.go index 0bd9232..be541bc 100644 --- a/queue_test.go +++ b/queue_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/golang-queue/queue/core" + "github.com/golang-queue/queue/job" "github.com/golang-queue/queue/mocks" "github.com/golang/mock/gomock" @@ -134,9 +135,9 @@ func TestCloseQueueAfterShutdown(t *testing.T) { }) assert.Error(t, err) assert.Equal(t, ErrQueueShutdown, err) - err = q.QueueWithTimeout(10*time.Millisecond, mockMessage{ + err = q.Queue(mockMessage{ message: "foobar", - }) + }, job.WithTimeout(10*time.Millisecond)) assert.Error(t, err) assert.Equal(t, ErrQueueShutdown, err) } diff --git a/worker_task.go b/worker_task.go index 88ce88e..1fea86b 100644 --- a/worker_task.go +++ b/worker_task.go @@ -5,6 +5,7 @@ import ( "errors" "github.com/golang-queue/queue/core" + "github.com/golang-queue/queue/job" ) var _ core.Worker = (*taskWorker)(nil) @@ -15,7 +16,7 @@ type taskWorker struct { } func (w *taskWorker) Run(task core.QueuedMessage) error { - if v, ok := task.(*Job); ok { + if v, ok := task.(*job.Message); ok { if v.Task != nil { _ = v.Task(context.Background()) } diff --git a/worker_task_test.go b/worker_task_test.go index 62280e5..1d8c2f5 100644 --- a/worker_task_test.go +++ b/worker_task_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/golang-queue/queue/core" + "github.com/golang-queue/queue/job" "github.com/stretchr/testify/assert" ) @@ -32,10 +33,10 @@ func TestQueueTaskJob(t *testing.T) { q.logger.Info("Add new task 2") return nil })) - assert.NoError(t, q.QueueTaskWithTimeout(50*time.Millisecond, func(ctx context.Context) error { + assert.NoError(t, q.QueueTask(func(ctx context.Context) error { time.Sleep(80 * time.Millisecond) return nil - })) + }, job.WithTimeout(50*time.Millisecond))) time.Sleep(50 * time.Millisecond) q.Shutdown() assert.Equal(t, ErrQueueShutdown, q.QueueTask(func(ctx context.Context) error {