Skip to content

Commit

Permalink
Add SetDefaultOptions method to Client
Browse files Browse the repository at this point in the history
  • Loading branch information
hibiken committed Apr 27, 2020
1 parent eb8ced6 commit e33d297
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- `ParseRedisURI` helper function is added to create a `RedisConnOpt` from a URI string.
- `SetDefaultOptions` method is added to `Client`.

## [0.8.0] - 2020-04-19

Expand Down
19 changes: 17 additions & 2 deletions README.md
Expand Up @@ -173,12 +173,25 @@ func main() {


// --------------------------------------------------------------------------
// Example 3: Pass options to tune task processing behavior.
// Example 3: Set options to tune task processing behavior.
// Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
// --------------------------------------------------------------------------

c.SetDefaultOptions(tasks.ImageProcessing, asynq.MaxRetry(10), asynq.Timeout(time.Minute))

t = tasks.NewImageProcessingTask("some/blobstore/url", "other/blobstore/url")
err = c.Enqueue(t)
if err != nil {
log.Fatal("could not enqueue task: %v", err)
}

// --------------------------------------------------------------------------
// Example 4: Pass options to tune task processing behavior at enqueue time.
// Options passed at enqueue time override default ones, if any.
// --------------------------------------------------------------------------

t = tasks.NewImageProcessingTask("some/blobstore/url", "other/blobstore/url")
err = c.Enqueue(t, asynq.MaxRetry(10), asynq.Queue("critical"), asynq.Timeout(time.Minute))
err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second))
if err != nil {
log.Fatal("could not enqueue task: %v", err)
}
Expand All @@ -194,6 +207,8 @@ You can optionally use [`ServeMux`](https://pkg.go.dev/github.com/hibiken/asynq?
package main

import (
"log"

"github.com/hibiken/asynq"
"your/app/package/tasks"
)
Expand Down
76 changes: 50 additions & 26 deletions client.go
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"sort"
"strings"
"sync"
"time"

"github.com/hibiken/asynq/internal/base"
Expand All @@ -23,13 +24,18 @@ import (
//
// Clients are safe for concurrent use by multiple goroutines.
type Client struct {
rdb *rdb.RDB
mu sync.Mutex
opts map[string][]Option
rdb *rdb.RDB
}

// NewClient and returns a new Client given a redis connection option.
func NewClient(r RedisConnOpt) *Client {
rdb := rdb.NewRDB(createRedisClient(r))
return &Client{rdb}
return &Client{
opts: make(map[string][]Option),
rdb: rdb,
}
}

// Option specifies the task processing behavior.
Expand Down Expand Up @@ -159,10 +165,19 @@ func serializePayload(payload map[string]interface{}) string {
return b.String()
}

const (
// Max retry count by default
defaultMaxRetry = 25
)
// Default max retry count used if nothing is specified.
const defaultMaxRetry = 25

// SetDefaultOptions sets options to be used for a given task type.
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
//
// Default options can be overridden by options passed at enqueue time.
func (c *Client) SetDefaultOptions(taskType string, opts ...Option) {
c.mu.Lock()
defer c.mu.Unlock()
c.opts[taskType] = opts
}

// EnqueueAt schedules task to be enqueued at the specified time.
//
Expand All @@ -171,6 +186,35 @@ const (
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error {
return c.enqueueAt(t, task, opts...)
}

// Enqueue enqueues task to be processed immediately.
//
// Enqueue returns nil if the task is enqueued successfully, otherwise returns a non-nil error.
//
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
func (c *Client) Enqueue(task *Task, opts ...Option) error {
return c.enqueueAt(time.Now(), task, opts...)
}

// EnqueueIn schedules task to be enqueued after the specified delay.
//
// EnqueueIn returns nil if the task is scheduled successfully, otherwise returns a non-nil error.
//
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
func (c *Client) EnqueueIn(d time.Duration, task *Task, opts ...Option) error {
return c.enqueueAt(time.Now().Add(d), task, opts...)
}

func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) error {
c.mu.Lock()
defer c.mu.Unlock()
if defaults, ok := c.opts[task.Type]; ok {
opts = append(defaults, opts...)
}
opt := composeOptions(opts...)
msg := &base.TaskMessage{
ID: xid.New(),
Expand All @@ -194,26 +238,6 @@ func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error {
return err
}

// Enqueue enqueues task to be processed immediately.
//
// Enqueue returns nil if the task is enqueued successfully, otherwise returns a non-nil error.
//
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
func (c *Client) Enqueue(task *Task, opts ...Option) error {
return c.EnqueueAt(time.Now(), task, opts...)
}

// EnqueueIn schedules task to be enqueued after the specified delay.
//
// EnqueueIn returns nil if the task is scheduled successfully, otherwise returns a non-nil error.
//
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
func (c *Client) EnqueueIn(d time.Duration, task *Task, opts ...Option) error {
return c.EnqueueAt(time.Now().Add(d), task, opts...)
}

func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error {
if uniqueTTL > 0 {
return c.rdb.EnqueueUnique(msg, uniqueTTL)
Expand Down
98 changes: 85 additions & 13 deletions client_test.go
Expand Up @@ -15,6 +15,11 @@ import (
"github.com/hibiken/asynq/internal/base"
)

var (
noTimeout = time.Duration(0).String()
noDeadline = time.Time{}.Format(time.RFC3339)
)

func TestClientEnqueueAt(t *testing.T) {
r := setup(t)
client := NewClient(RedisClientOpt{
Expand All @@ -27,9 +32,6 @@ func TestClientEnqueueAt(t *testing.T) {
var (
now = time.Now()
oneHourLater = now.Add(time.Hour)

noTimeout = time.Duration(0).String()
noDeadline = time.Time{}.Format(time.RFC3339)
)

tests := []struct {
Expand Down Expand Up @@ -113,11 +115,6 @@ func TestClientEnqueue(t *testing.T) {

task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})

var (
noTimeout = time.Duration(0).String()
noDeadline = time.Time{}.Format(time.RFC3339)
)

tests := []struct {
desc string
task *Task
Expand Down Expand Up @@ -287,11 +284,6 @@ func TestClientEnqueueIn(t *testing.T) {

task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})

var (
noTimeout = time.Duration(0).String()
noDeadline = time.Time{}.Format(time.RFC3339)
)

tests := []struct {
desc string
task *Task
Expand Down Expand Up @@ -364,6 +356,86 @@ func TestClientEnqueueIn(t *testing.T) {
}
}

func TestClientDefaultOptions(t *testing.T) {
r := setup(t)

tests := []struct {
desc string
defaultOpts []Option // options set at the client level.
opts []Option // options used at enqueue time.
task *Task
queue string // queue that the message should go into.
want *base.TaskMessage
}{
{
desc: "With queue routing option",
defaultOpts: []Option{Queue("feed")},
opts: []Option{},
task: NewTask("feed:import", nil),
queue: "feed",
want: &base.TaskMessage{
Type: "feed:import",
Payload: nil,
Retry: defaultMaxRetry,
Queue: "feed",
Timeout: noTimeout,
Deadline: noDeadline,
},
},
{
desc: "With multiple options",
defaultOpts: []Option{Queue("feed"), MaxRetry(5)},
opts: []Option{},
task: NewTask("feed:import", nil),
queue: "feed",
want: &base.TaskMessage{
Type: "feed:import",
Payload: nil,
Retry: 5,
Queue: "feed",
Timeout: noTimeout,
Deadline: noDeadline,
},
},
{
desc: "With overriding options at enqueue time",
defaultOpts: []Option{Queue("feed"), MaxRetry(5)},
opts: []Option{Queue("critical")},
task: NewTask("feed:import", nil),
queue: "critical",
want: &base.TaskMessage{
Type: "feed:import",
Payload: nil,
Retry: 5,
Queue: "critical",
Timeout: noTimeout,
Deadline: noDeadline,
},
},
}

for _, tc := range tests {
h.FlushDB(t, r)
c := NewClient(RedisClientOpt{Addr: redisAddr, DB: redisDB})
c.SetDefaultOptions(tc.task.Type, tc.defaultOpts...)
err := c.Enqueue(tc.task, tc.opts...)
if err != nil {
t.Fatal(err)
}
enqueued := h.GetEnqueuedMessages(t, r, tc.queue)
if len(enqueued) != 1 {
t.Errorf("%s;\nexpected queue %q to have one message; got %d messages in the queue.",
tc.desc, tc.queue, len(enqueued))
continue
}
got := enqueued[0]
if diff := cmp.Diff(tc.want, got, h.IgnoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in enqueued task message; (-want,+got)\n%s",
tc.desc, diff)
}
}
}

func TestUniqueKey(t *testing.T) {
tests := []struct {
desc string
Expand Down

0 comments on commit e33d297

Please sign in to comment.