Ken Hibino edited this page May 3, 2020 · 5 revisions

On this page, I'll explain the design behind the Handler interface.

Handler Interface

The core of your asynchronous task processing logic lives in the Handler you provide when you run a server.
A Handler's responsibility is to take a task and process it while respecting the given context. If processing fails, it should return an error so that the task can be retried later.

Here's the interface definition:

type Handler interface {
    ProcessTask(context.Context, *Task) error
}

It's a simple interface and describes the Handler's responsibility succinctly.

This interface is powerful: you can pass anything that implements this single method to run a server.
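To make "taking context into account" concrete, here is a minimal sketch of a handler that checks the context between units of work. It uses a local stand-in `Task` type so it compiles without the library; `stepHandler` and `runOnce` are hypothetical names made up for this illustration, not part of asynq.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Task is a local stand-in for asynq.Task so the sketch compiles on its own.
type Task struct {
	Type string
}

// stepHandler is a hypothetical handler whose work splits into steps.
type stepHandler struct {
	steps int
}

// ProcessTask checks the context before each step and returns an error if
// the context has been cancelled or its deadline has passed.
func (h *stepHandler) ProcessTask(ctx context.Context, t *Task) error {
	for i := 0; i < h.steps; i++ {
		if err := ctx.Err(); err != nil {
			return fmt.Errorf("task %q stopped at step %d: %w", t.Type, i, err)
		}
		// ... one unit of work would go here
	}
	return nil
}

// runOnce is a hypothetical helper: it runs the handler once, optionally
// cancelling the context before the handler starts.
func runOnce(cancelFirst bool) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if cancelFirst {
		cancel()
	}
	h := &stepHandler{steps: 3}
	return h.ProcessTask(ctx, &Task{Type: "email:welcome"})
}

func main() {
	fmt.Println(runOnce(false)) // live context: no error
	err := runOnce(true)        // cancelled context: handler reports an error
	fmt.Println(errors.Is(err, context.Canceled))
}
```

Wrapping the context error with `%w` keeps the original cause available to callers through `errors.Is`.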

Implementing the Interface

Implementing this handler interface can be done in many ways.

Here's an example of defining your own struct type to process tasks.

type MyTaskHandler struct {
   // ... fields
}

func (h *MyTaskHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
   // ... task processing logic
   return nil // return a non-nil error instead to have the task retried
}

You can even define a function to satisfy the interface, thanks to the HandlerFunc adapter type.

func myHandler(ctx context.Context, t *asynq.Task) error {
    // ... task processing logic
    return nil // return a non-nil error instead to have the task retried
}

// h satisfies Handler
h := asynq.HandlerFunc(myHandler) 
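If the adapter looks like magic, the sketch below shows the general pattern: a function type that has its own `ProcessTask` method. The types here are local stand-ins declared for illustration (so the snippet runs without the library), and `describe` is a hypothetical helper; the pattern is the same one `http.HandlerFunc` uses in the standard library.

```go
package main

import (
	"context"
	"fmt"
)

// Task is a local stand-in for asynq.Task (only Type is needed here).
type Task struct {
	Type string
}

// Handler mirrors the interface shown earlier on this page.
type Handler interface {
	ProcessTask(context.Context, *Task) error
}

// HandlerFunc is a function type with a ProcessTask method, so any function
// with the right signature can be converted into a Handler.
type HandlerFunc func(context.Context, *Task) error

// ProcessTask simply calls the underlying function.
func (fn HandlerFunc) ProcessTask(ctx context.Context, t *Task) error {
	return fn(ctx, t)
}

// myHandler is a plain function; it rejects tasks with an empty type.
func myHandler(ctx context.Context, t *Task) error {
	if t.Type == "" {
		return fmt.Errorf("empty task type")
	}
	return nil
}

// describe is a hypothetical helper that accepts any Handler, runs it on a
// task of the given type, and summarizes the outcome.
func describe(h Handler, taskType string) string {
	if err := h.ProcessTask(context.Background(), &Task{Type: taskType}); err != nil {
		return "error: " + err.Error()
	}
	return "ok"
}

func main() {
	h := HandlerFunc(myHandler) // a type conversion, not a function call
	fmt.Println(describe(h, "email:welcome"))
	fmt.Println(describe(h, ""))
}
```

Note that `HandlerFunc(myHandler)` is a type conversion, so the adapter adds no state and no extra allocation of its own.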

In most cases, you'd probably want to examine the Type of the input task and process it accordingly.

func (h *MyTaskHandler) ProcessTask(ctx context.Context, t *asynq.Task) error {
   switch t.Type {
   case "type1":
      // process type1
   case "type2":
      // process type2
   case "typeN":
      // process typeN
   default:
      return fmt.Errorf("received unexpected type: %q", t.Type)
   }
   return nil
}

You can see that a Handler can be composed of many different handlers: each case in the example above could be handled by a dedicated handler. This is where the ServeMux type can be useful.

Using ServeMux

You don't have to use the ServeMux type to implement a Handler, but it can be useful in many cases.
With ServeMux, you can register multiple Handlers. It matches the type of each task against a list of registered patterns and calls the handler for the pattern that most closely matches the task's type name.

mux := asynq.NewServeMux()
mux.Handle("email:welcome", welcomeEmailHandler)
mux.Handle("email:reminder", reminderEmailHandler)
mux.Handle("email:", defaultEmailHandler) // catch-all for all other task types with the prefix "email:"
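One plausible reading of "most closely matches" is longest-prefix matching: among the registered patterns that are a prefix of the task type, the longest one wins. The sketch below illustrates that idea only; `match` is a hypothetical function written for this page, and the library's actual matching rules may differ in detail.

```go
package main

import (
	"fmt"
	"strings"
)

// match returns the registered pattern that is the longest prefix of
// taskType. The boolean is false when no pattern matches at all.
func match(patterns []string, taskType string) (string, bool) {
	best, found := "", false
	for _, p := range patterns {
		if strings.HasPrefix(taskType, p) && (!found || len(p) > len(best)) {
			best, found = p, true
		}
	}
	return best, found
}

func main() {
	patterns := []string{"email:", "email:welcome", "email:reminder"}
	for _, typ := range []string{"email:welcome", "email:digest", "image:resize"} {
		if p, ok := match(patterns, typ); ok {
			fmt.Printf("%q is handled by pattern %q\n", typ, p)
		} else {
			fmt.Printf("%q has no matching pattern\n", typ)
		}
	}
}
```

Under this rule, "email:welcome" goes to its dedicated pattern, "email:digest" falls through to the "email:" catch-all, and "image:resize" matches nothing.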

Using Middleware

If you need to execute some code before and/or after your handlers, you can do that with middleware. A middleware is a function that takes a Handler and returns a Handler.
Here's an example of a middleware that logs the start and end of task processing.

func loggingMiddleware(h asynq.Handler) asynq.Handler {
    return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
        start := time.Now()
        log.Printf("Start processing %q", t.Type)
        err := h.ProcessTask(ctx, t)
        if err != nil {
            return err
        }
        log.Printf("Finished processing %q: Elapsed Time = %v", t.Type, time.Since(start))
        return nil
    })
}

And now you can wrap your Handler with this middleware.

myHandler = loggingMiddleware(myHandler)

Alternatively, if you are using ServeMux, you can register middleware like this.

mux := asynq.NewServeMux()
mux.Use(loggingMiddleware)
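When more than one middleware is involved, the order of wrapping matters. The sketch below uses local stand-in types and two hypothetical helpers, `chain` and `tag`, to show one common convention: the first middleware listed becomes the outermost wrapper. It is an illustration of the wrapping pattern, not the library's implementation.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins for the asynq types so the sketch compiles on its own.
type Task struct {
	Type string
}

type Handler interface {
	ProcessTask(context.Context, *Task) error
}

type HandlerFunc func(context.Context, *Task) error

func (fn HandlerFunc) ProcessTask(ctx context.Context, t *Task) error {
	return fn(ctx, t)
}

// Middleware takes a Handler and returns a Handler.
type Middleware func(Handler) Handler

// chain (hypothetical) wraps h so that the first middleware listed runs
// outermost: it sees the task first and the result last.
func chain(h Handler, mws ...Middleware) Handler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// tag (hypothetical) returns a middleware that records when it is entered
// and when it is exited.
func tag(name string, trace *[]string) Middleware {
	return func(next Handler) Handler {
		return HandlerFunc(func(ctx context.Context, t *Task) error {
			*trace = append(*trace, name+":before")
			err := next.ProcessTask(ctx, t)
			*trace = append(*trace, name+":after")
			return err
		})
	}
}

// run processes one task through two middlewares and returns the call order.
func run() []string {
	var trace []string
	core := HandlerFunc(func(ctx context.Context, t *Task) error {
		trace = append(trace, "handler")
		return nil
	})
	h := chain(core, tag("a", &trace), tag("b", &trace))
	_ = h.ProcessTask(context.Background(), &Task{Type: "email:welcome"})
	return trace
}

func main() {
	fmt.Println(run()) // prints [a:before b:before handler b:after a:after]
}
```

Because each middleware only sees a Handler, it does not need to know whether it wraps the real handler or another middleware.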
