Getting Started
This guide explains how to get started with the asynq package and begin processing tasks in your applications.
In this quick tour of asynq, we are going to create two programs.
- producer.go will create and schedule tasks to be processed asynchronously by the consumer.
- consumer.go will process the tasks created by the producer.
This guide assumes that you are running a Redis server at localhost:6379.
Before we start, make sure you have Redis installed and running.
- Import asynq in both files.
import "github.com/hibiken/asynq"
- Asynq uses Redis as a message broker. Use one of the RedisConnOpt types to specify how to connect to Redis. We are going to use RedisClientOpt here.
// both in producer.go and consumer.go
var redis = &asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // Omit if no password is required
    Password: "mypassword",
    // Use a dedicated db number for asynq.
    // By default, Redis offers 16 databases (0..15)
    DB: 0,
}
- In producer.go, create a Client instance to create and schedule tasks.
// producer.go (in addition to asynq, import "log" and "time")
func main() {
    client := asynq.NewClient(redis)

    // Create tasks with a type name and payload.
    t1 := asynq.NewTask(
        "send_welcome_email",
        map[string]interface{}{"user_id": 42})

    t2 := asynq.NewTask(
        "send_reminder_email",
        map[string]interface{}{"user_id": 42})

    // Process the task immediately.
    err := client.Schedule(t1, time.Now())
    if err != nil {
        log.Fatal(err)
    }

    // Process the task 24 hours later.
    err = client.Schedule(t2, time.Now().Add(24 * time.Hour))
    if err != nil {
        log.Fatal(err)
    }
}
- In consumer.go, create a Background instance to process tasks.
// consumer.go
func main() {
    bg := asynq.NewBackground(redis, &asynq.Config{
        Concurrency: 10,
    })

    bg.Run(handler)
}
The argument to (*asynq.Background).Run is the asynq.Handler interface, which has one method, ProcessTask.
// ProcessTask should return nil if the processing of a task
// is successful.
//
// If ProcessTask returns a non-nil error or panics, the task
// will be retried.
type Handler interface {
    ProcessTask(*Task) error
}
The simplest way to implement a handler is to define a function with the same signature and wrap it in the asynq.HandlerFunc adapter type when passing it to Run.
func handler(t *asynq.Task) error {
    switch t.Type {
    case "send_welcome_email":
        id, err := t.Payload.GetInt("user_id")
        if err != nil {
            return err
        }
        fmt.Printf("Send Welcome Email to User %d\n", id)

    case "send_reminder_email":
        id, err := t.Payload.GetInt("user_id")
        if err != nil {
            return err
        }
        fmt.Printf("Send Reminder Email to User %d\n", id)

    default:
        return fmt.Errorf("unexpected task type: %s", t.Type)
    }
    return nil
}
func main() {
    bg := asynq.NewBackground(redis, &asynq.Config{
        Concurrency: 10,
    })

    // Use asynq.HandlerFunc adapter for a handler function
    bg.Run(asynq.HandlerFunc(handler))
}
We could keep adding cases to this handler function, but in a realistic application, it's convenient to define the logic for each case in a separate function.
To refactor our code, let's create a simple dispatcher which maps task type to its handler.
// consumer.go

// Dispatcher is used to dispatch tasks to registered handlers.
type Dispatcher struct {
    mapping map[string]asynq.HandlerFunc
}

// HandleFunc registers a task handler
func (d *Dispatcher) HandleFunc(taskType string, fn asynq.HandlerFunc) {
    d.mapping[taskType] = fn
}

// ProcessTask processes a task.
//
// NOTE: Dispatcher satisfies asynq.Handler interface.
func (d *Dispatcher) ProcessTask(task *asynq.Task) error {
    fn, ok := d.mapping[task.Type]
    if !ok {
        return fmt.Errorf("no handler registered for %q", task.Type)
    }
    return fn(task)
}

func main() {
    d := &Dispatcher{mapping: make(map[string]asynq.HandlerFunc)}
    d.HandleFunc("send_welcome_email", sendWelcomeEmail)
    d.HandleFunc("send_reminder_email", sendReminderEmail)

    bg := asynq.NewBackground(redis, &asynq.Config{
        Concurrency: 10,
    })
    bg.Run(d)
}

func sendWelcomeEmail(t *asynq.Task) error {
    id, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }
    fmt.Printf("Send Welcome Email to User %d\n", id)
    return nil
}

func sendReminderEmail(t *asynq.Task) error {
    id, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }
    fmt.Printf("Send Reminder Email to User %d\n", id)
    return nil
}
Now that we have both task producer and consumer, we can run both programs.
go run consumer.go
Note: This will not exit until you send a signal to terminate the program. See the Signal Wiki page for best practices on how to safely terminate background processing.
With our consumer running, also run
go run producer.go
This will create two tasks. The first task will be processed immediately by the consumer; the second will be processed 24 hours later.
Let's use the asynqmon tool to inspect the tasks.
asynqmon stats
This command shows the number of tasks in each state and stats for the current date, as well as Redis information.
To understand the meaning of each state, see the Life of a Task Wiki page.
For an in-depth guide to the asynqmon tool, see the README for the CLI.
This was a quick tour of asynq basics. To see all of its features, such as priority queues and custom retry, see the Wiki page.
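If you want to experiment with the dispatcher pattern from this guide before setting up Redis, the following is a minimal, self-contained sketch. `Task`, `Handler`, and `HandlerFunc` here are local stand-ins that only mimic the shapes used in this guide; they are assumptions for illustration, not asynq's actual definitions, and the payload is reduced to a plain map. `NewDispatcher` is likewise a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// Task is a local stand-in for asynq.Task (assumption: only the
// type name and a simplified payload are modeled).
type Task struct {
	Type    string
	Payload map[string]int
}

// Handler mirrors the one-method interface described in this guide.
type Handler interface {
	ProcessTask(*Task) error
}

// HandlerFunc is an adapter that lets an ordinary function satisfy Handler.
type HandlerFunc func(*Task) error

// ProcessTask calls fn(t).
func (fn HandlerFunc) ProcessTask(t *Task) error { return fn(t) }

// Dispatcher routes a task to the handler registered for its type.
type Dispatcher struct {
	mapping map[string]HandlerFunc
}

// NewDispatcher returns a Dispatcher with an initialized mapping.
func NewDispatcher() *Dispatcher {
	return &Dispatcher{mapping: make(map[string]HandlerFunc)}
}

// HandleFunc registers fn as the handler for taskType.
func (d *Dispatcher) HandleFunc(taskType string, fn HandlerFunc) {
	d.mapping[taskType] = fn
}

// ProcessTask looks up the handler for t.Type and runs it.
// An unregistered type is reported as an error.
func (d *Dispatcher) ProcessTask(t *Task) error {
	fn, ok := d.mapping[t.Type]
	if !ok {
		return fmt.Errorf("no handler registered for %q", t.Type)
	}
	return fn(t)
}

func sendWelcomeEmail(t *Task) error {
	id, ok := t.Payload["user_id"]
	if !ok {
		return errors.New("missing user_id")
	}
	fmt.Printf("Send Welcome Email to User %d\n", id)
	return nil
}

func main() {
	d := NewDispatcher()
	d.HandleFunc("send_welcome_email", sendWelcomeEmail)

	// Dispatcher satisfies Handler, so it can be used wherever a Handler is expected.
	var h Handler = d
	fmt.Println(h.ProcessTask(&Task{
		Type:    "send_welcome_email",
		Payload: map[string]int{"user_id": 42},
	}))
	fmt.Println(h.ProcessTask(&Task{Type: "unknown"}))
}
```

Returning an error for an unregistered task type, rather than silently ignoring it, keeps the failure visible; per the Handler contract quoted in this guide, a non-nil error means the task will be retried.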