Skip to content
Ken Hibino edited this page Feb 24, 2020 · 48 revisions

In this quick tour of asynq, we are going to create two programs.

  • client.go will create and schedule tasks to be processed asynchronously by workers.
  • workers.go will process the tasks created by the client.

This guide assumes that you are running a Redis server at localhost:6379. Before we start, make sure you have Redis installed and running.

The first thing we need to do is create two main files.

mkdir client workers
touch client/client.go workers/workers.go

Import asynq in both files.

import "github.com/hibiken/asynq"

Asynq uses Redis as a message broker. Use one of RedisConnOpt types to specify how to connect to Redis. We are going to use RedisClientOpt here.

// both in client.go and workers.go
var redis = &asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // Omit if no password is required
    Password: "mypassword",
    // Use a dedicated db number for asynq.
    // By default, Redis offers 16 databases (0..15)
    DB: 0,
}

In client.go, we are going to create a asynq.Client instance to create and schedule tasks.

In asynq, a unit of work is encapsluated in a struct called Task. Which has two fields: Type and Payload.

// Task represents a task to be performed.
type Task struct {
    // Type indicates the type of task to be performed.
    Type string

    // Payload holds data needed to perform the task.
    Payload Payload
}

To create a task, use NewTask function and pass type and payload for the task.

Use client.Enqueue to enqueue tasks to be processed immediately.
Use client.EnqueueIn or client.EnqueueAt to schedule tasks to be processed in the future.

// client.go
func main() {
    client := asynq.NewClient(redis)

    // Create a task with typename and payload.
    t1 := asynq.NewTask(
        "send_welcome_email",
        map[string]interface{}{"user_id": 42})

    t2 := asynq.NewTask(
        "send_reminder_email",
        map[string]interface{}{"user_id": 42})

    // Process the task immediately.
    err := client.Enqueue(t1)
    if err != nil {
        log.Fatal(err)
    }

    // Process the task 24 hours later.
    err = client.EnqueueIn(24*time.Hour, t2)
    if err != nil {
        log.Fatal(err)
    }
}

In workers.go, create a Background instance to start the workers.

NewBackground function takes RedisConnOpt and Config.

You can take a look at documentation on Config to see all the available config options.

We are only going to specify the concurrency in this example.

// workers.go
func main() {
    bg := asynq.NewBackground(redis, &asynq.Config{
        Concurrency: 10,
    })

    bg.Run(handler)
}

The argument to (*asynq.Background).Run is an interface asynq.Handler which has one method ProcessTask.

// ProcessTask should return nil if the processing of a task
// is successful.
//
// If ProcessTask return a non-nil error or panics, the task
// will be retried.
type Handler interface {
    ProcessTask(context.Context, *Task) error
}

The simplest way to implement a handler is to define a function with the same signature and use asynq.HandlerFunc adapter type when passing it to Run.

func handler(ctx context.Context, t *asynq.Task) error {
    switch t.Type {
    case "send_welcome_email":
        id, err := t.Payload.GetInt("user_id")
        if err != nil {
            return err
        }
        fmt.Printf("Send Welcome Email to User %d\n", id)

    case "send_reminder_email":
        id, err := t.Payload.GetInt("user_id")
        if err != nil {
            return err
        }
        fmt.Printf("Send Reminder Email to User %d\n", id)

    default:
        return fmt.Errorf("unexpected task type: %s", t.Type)
    }
    return nil
}

func main() {
    bg := asynq.NewBackground(redis, &asynq.Config{
        Concurrency: 10,
    })

    // Use asynq.HandlerFunc adapter for a handler function
    bg.Run(asynq.HandlerFunc(handler))
}

We could keep adding cases to this handler function, but in a realistic application, it's convenient to define the logic for each case in a separate function.

To refactor our code, let's create a simple dispatcher which maps task type to its handler.

// workers.go

// Dispatcher is used to dispatch tasks to registered handlers.
type Dispatcher struct {
    mapping map[string]asynq.HandlerFunc
}

// HandleFunc registers a task handler
func (d *Dispatcher) HandleFunc(taskType string, fn asynq.HandlerFunc) {
    d.mapping[taskType] = fn
}

// ProcessTask processes a task.
//
// NOTE: Dispatcher satisfies asynq.Handler interface.
func (d *Dispatcher) ProcessTask(ctx context.Context, task *asynq.Task) error {
    fn, ok := d.mapping[task.Type]
    if !ok {
        return fmt.Errorf("no handler registered for %q", task.Type)
    }
    return fn(ctx, task)
}

func main() {
    d := &Dispatcher{mapping: make(map[string]asynq.HandlerFunc)}
    d.HandleFunc("send_welcome_email", sendWelcomeEmail)
    d.HandleFunc("send_reminder_email", sendReminderEmail)

    bg := asynq.NewBackground(redis, &asynq.Config{
        Concurrency: 10,
    })
    bg.Run(d)
}

func sendWelcomeEmail(context.Context, t *asynq.Task) error {
    id, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }
    fmt.Printf("Send Welcome Email to User %d\n", id)
    return nil
}

func sendReminderEmail(context.Context, t *asynq.Task) error {
    id, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }
    fmt.Printf("Send Welcome Email to User %d\n", id)
    return nil
}

Now that we have both client and workers, we can run both programs.

go run client.go

This will create two tasks: One that should processed immediately and another to be processed 24 hours later.

Let's use asynqmon tool to inspect the tasks.

asynqmon stats

You should be able to see that there's one task in Enqueued state and another in Scheduled state.

Note: To understand the meaning of each state, see Life of a Task.

Let's run asynqmon with watch command so that we can continuously run the command to see the changes.

watch -n 3 asynqmon stats # Runs `asynqmon stats` every 3 seconds

And finally, let's start the worker program to process scheduled tasks.

go run workers.go

Note: This will not exit until you send a signal to terminate the program. See Signal Wiki page for best practice on how to safely terminate background workers.

You should be able to see some text printed in your terminal indicating that the task was processed successfully.

This was a whirlwind tour of asynq basics. To learn more about all of its features such as priority queues and custom retry, see our Wiki page.

Clone this wiki locally