Skip to content

Commit

Permalink
Implement transport level error handlers (#863)
Browse files Browse the repository at this point in the history
* Add error handler implementation to the log package

* Add error handler to http transport

* Add error handler to amqp transport

* Add error handler to awslambda transport

* Add error handler to grpc transport

* Add error handler to nats transport

* Move error handler interfaces to transport package

* Move log error handler to transport package

* Remove error logger precedence

* Improve documentation wording

* Adjust transport package documentation

* Remove ignore error

* Update examples

* Add context to the error handler signature
  • Loading branch information
sagikazarmark authored and peterbourgon committed Apr 24, 2019
1 parent 22a2d43 commit 3c77e8c
Show file tree
Hide file tree
Showing 15 changed files with 161 additions and 50 deletions.
3 changes: 2 additions & 1 deletion examples/addsvc/pkg/addtransport/grpc.go
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/go-kit/kit/ratelimit"
"github.com/go-kit/kit/tracing/opentracing"
"github.com/go-kit/kit/tracing/zipkin"
"github.com/go-kit/kit/transport"
grpctransport "github.com/go-kit/kit/transport/grpc"

"github.com/go-kit/kit/examples/addsvc/pb"
Expand All @@ -44,7 +45,7 @@ func NewGRPCServer(endpoints addendpoint.Set, otTracer stdopentracing.Tracer, zi
zipkinServer := zipkin.GRPCServerTrace(zipkinTracer)

options := []grpctransport.ServerOption{
grpctransport.ServerErrorLogger(logger),
grpctransport.ServerErrorHandler(transport.NewLogErrorHandler(logger)),
zipkinServer,
}

Expand Down
3 changes: 2 additions & 1 deletion examples/addsvc/pkg/addtransport/http.go
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/go-kit/kit/ratelimit"
"github.com/go-kit/kit/tracing/opentracing"
"github.com/go-kit/kit/tracing/zipkin"
"github.com/go-kit/kit/transport"
httptransport "github.com/go-kit/kit/transport/http"

"github.com/go-kit/kit/examples/addsvc/pkg/addendpoint"
Expand All @@ -41,7 +42,7 @@ func NewHTTPHandler(endpoints addendpoint.Set, otTracer stdopentracing.Tracer, z

options := []httptransport.ServerOption{
httptransport.ServerErrorEncoder(errorEncoder),
httptransport.ServerErrorLogger(logger),
httptransport.ServerErrorHandler(transport.NewLogErrorHandler(logger)),
zipkinServer,
}

Expand Down
3 changes: 2 additions & 1 deletion examples/profilesvc/transport.go
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/gorilla/mux"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/transport"
httptransport "github.com/go-kit/kit/transport/http"
)

Expand All @@ -29,7 +30,7 @@ func MakeHTTPHandler(s Service, logger log.Logger) http.Handler {
r := mux.NewRouter()
e := MakeServerEndpoints(s)
options := []httptransport.ServerOption{
httptransport.ServerErrorLogger(logger),
httptransport.ServerErrorHandler(transport.NewLogErrorHandler(logger)),
httptransport.ServerErrorEncoder(encodeError),
}

Expand Down
3 changes: 2 additions & 1 deletion examples/shipping/booking/transport.go
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/gorilla/mux"

kitlog "github.com/go-kit/kit/log"
"github.com/go-kit/kit/transport"
kithttp "github.com/go-kit/kit/transport/http"

"github.com/go-kit/kit/examples/shipping/cargo"
Expand All @@ -19,7 +20,7 @@ import (
// MakeHandler returns a handler for the booking service.
func MakeHandler(bs Service, logger kitlog.Logger) http.Handler {
opts := []kithttp.ServerOption{
kithttp.ServerErrorLogger(logger),
kithttp.ServerErrorHandler(transport.NewLogErrorHandler(logger)),
kithttp.ServerErrorEncoder(encodeError),
}

Expand Down
3 changes: 2 additions & 1 deletion examples/shipping/handling/transport.go
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/gorilla/mux"

kitlog "github.com/go-kit/kit/log"
"github.com/go-kit/kit/transport"
kithttp "github.com/go-kit/kit/transport/http"

"github.com/go-kit/kit/examples/shipping/cargo"
Expand All @@ -21,7 +22,7 @@ func MakeHandler(hs Service, logger kitlog.Logger) http.Handler {
r := mux.NewRouter()

opts := []kithttp.ServerOption{
kithttp.ServerErrorLogger(logger),
kithttp.ServerErrorHandler(transport.NewLogErrorHandler(logger)),
kithttp.ServerErrorEncoder(encodeError),
}

Expand Down
3 changes: 2 additions & 1 deletion examples/shipping/tracking/transport.go
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/gorilla/mux"

kitlog "github.com/go-kit/kit/log"
kittransport "github.com/go-kit/kit/transport"
kithttp "github.com/go-kit/kit/transport/http"

"github.com/go-kit/kit/examples/shipping/cargo"
Expand All @@ -19,7 +20,7 @@ func MakeHandler(ts Service, logger kitlog.Logger) http.Handler {
r := mux.NewRouter()

opts := []kithttp.ServerOption{
kithttp.ServerErrorLogger(logger),
kithttp.ServerErrorHandler(kittransport.NewLogErrorHandler(logger)),
kithttp.ServerErrorEncoder(encodeError),
}

Expand Down
24 changes: 17 additions & 7 deletions transport/amqp/subscriber.go
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/transport"
"github.com/streadway/amqp"
)

Expand All @@ -19,7 +20,7 @@ type Subscriber struct {
after []SubscriberResponseFunc
responsePublisher ResponsePublisher
errorEncoder ErrorEncoder
logger log.Logger
errorHandler transport.ErrorHandler
}

// NewSubscriber constructs a new subscriber, which provides a handler
Expand All @@ -36,7 +37,7 @@ func NewSubscriber(
enc: enc,
responsePublisher: DefaultResponsePublisher,
errorEncoder: DefaultErrorEncoder,
logger: log.NewNopLogger(),
errorHandler: transport.NewLogErrorHandler(log.NewNopLogger()),
}
for _, option := range options {
option(s)
Expand Down Expand Up @@ -78,8 +79,17 @@ func SubscriberErrorEncoder(ee ErrorEncoder) SubscriberOption {
// are logged. This is intended as a diagnostic measure. Finer-grained control
// of error handling, including logging in more detail, should be performed in a
// custom SubscriberErrorEncoder which has access to the context.
// Deprecated: Use SubscriberErrorHandler instead.
func SubscriberErrorLogger(logger log.Logger) SubscriberOption {
return func(s *Subscriber) { s.logger = logger }
return func(s *Subscriber) { s.errorHandler = transport.NewLogErrorHandler(logger) }
}

// SubscriberErrorHandler is used to handle non-terminal errors. By default, non-terminal errors
// are ignored. This is intended as a diagnostic measure. Finer-grained control
// of error handling, including logging in more detail, should be performed in a
// custom SubscriberErrorEncoder which has access to the context.
func SubscriberErrorHandler(errorHandler transport.ErrorHandler) SubscriberOption {
return func(s *Subscriber) { s.errorHandler = errorHandler }
}

// ServeDelivery handles AMQP Delivery messages
Expand All @@ -98,14 +108,14 @@ func (s Subscriber) ServeDelivery(ch Channel) func(deliv *amqp.Delivery) {

request, err := s.dec(ctx, deliv)
if err != nil {
s.logger.Log("err", err)
s.errorHandler.Handle(ctx, err)
s.errorEncoder(ctx, err, deliv, ch, &pub)
return
}

response, err := s.e(ctx, request)
if err != nil {
s.logger.Log("err", err)
s.errorHandler.Handle(ctx, err)
s.errorEncoder(ctx, err, deliv, ch, &pub)
return
}
Expand All @@ -115,13 +125,13 @@ func (s Subscriber) ServeDelivery(ch Channel) func(deliv *amqp.Delivery) {
}

if err := s.enc(ctx, &pub, response); err != nil {
s.logger.Log("err", err)
s.errorHandler.Handle(ctx, err)
s.errorEncoder(ctx, err, deliv, ch, &pub)
return
}

if err := s.responsePublisher(ctx, deliv, ch, &pub); err != nil {
s.logger.Log("err", err)
s.errorHandler.Handle(ctx, err)
s.errorEncoder(ctx, err, deliv, ch, &pub)
return
}
Expand Down
20 changes: 14 additions & 6 deletions transport/awslambda/handler.go
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/transport"
)

// Handler wraps an endpoint.
Expand All @@ -16,7 +17,7 @@ type Handler struct {
after []HandlerResponseFunc
errorEncoder ErrorEncoder
finalizer []HandlerFinalizerFunc
logger log.Logger
errorHandler transport.ErrorHandler
}

// NewHandler constructs a new handler, which implements
Expand All @@ -31,8 +32,8 @@ func NewHandler(
e: e,
dec: dec,
enc: enc,
logger: log.NewNopLogger(),
errorEncoder: DefaultErrorEncoder,
errorHandler: transport.NewLogErrorHandler(log.NewNopLogger()),
}
for _, option := range options {
option(h)
Expand All @@ -57,8 +58,15 @@ func HandlerAfter(after ...HandlerResponseFunc) HandlerOption {

// HandlerErrorLogger is used to log non-terminal errors.
// By default, no errors are logged.
// Deprecated: Use HandlerErrorHandler instead.
func HandlerErrorLogger(logger log.Logger) HandlerOption {
return func(h *Handler) { h.logger = logger }
return func(h *Handler) { h.errorHandler = transport.NewLogErrorHandler(logger) }
}

// HandlerErrorHandler is used to handle non-terminal errors.
// By default, non-terminal errors are ignored.
func HandlerErrorHandler(errorHandler transport.ErrorHandler) HandlerOption {
return func(h *Handler) { h.errorHandler = errorHandler }
}

// HandlerErrorEncoder is used to encode errors.
Expand Down Expand Up @@ -97,13 +105,13 @@ func (h *Handler) Invoke(

request, err := h.dec(ctx, payload)
if err != nil {
h.logger.Log("err", err)
h.errorHandler.Handle(ctx, err)
return h.errorEncoder(ctx, err)
}

response, err := h.e(ctx, request)
if err != nil {
h.logger.Log("err", err)
h.errorHandler.Handle(ctx, err)
return h.errorEncoder(ctx, err)
}

Expand All @@ -112,7 +120,7 @@ func (h *Handler) Invoke(
}

if resp, err = h.enc(ctx, response); err != nil {
h.logger.Log("err", err)
h.errorHandler.Handle(ctx, err)
return h.errorEncoder(ctx, err)
}

Expand Down
3 changes: 2 additions & 1 deletion transport/awslambda/handler_test.go
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/aws/aws-lambda-go/events"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/transport"
)

type key int
Expand Down Expand Up @@ -39,7 +40,7 @@ func TestInvokeHappyPath(t *testing.T) {
makeTest01HelloEndpoint(svc),
decodeHelloRequestWithTwoBefores,
encodeResponse,
HandlerErrorLogger(log.NewNopLogger()),
HandlerErrorHandler(transport.NewLogErrorHandler(log.NewNopLogger())),
HandlerBefore(func(
ctx context.Context,
payload []byte,
Expand Down
2 changes: 1 addition & 1 deletion transport/doc.go
@@ -1,2 +1,2 @@
// Package transport contains bindings to concrete transports.
// Package transport contains helpers applicable to all supported transports.
package transport
28 changes: 28 additions & 0 deletions transport/error_handler.go
@@ -0,0 +1,28 @@
package transport

import (
"context"

"github.com/go-kit/kit/log"
)

// ErrorHandler receives a transport error to be processed for diagnostic purposes.
// Usually this means logging the error.
type ErrorHandler interface {
Handle(ctx context.Context, err error)
}

// LogErrorHandler is a transport error handler implementation which logs an error.
type LogErrorHandler struct {
logger log.Logger
}

func NewLogErrorHandler(logger log.Logger) *LogErrorHandler {
return &LogErrorHandler{
logger: logger,
}
}

func (h *LogErrorHandler) Handle(ctx context.Context, err error) {
h.logger.Log("err", err)
}
29 changes: 29 additions & 0 deletions transport/error_handler_test.go
@@ -0,0 +1,29 @@
package transport_test

import (
"context"
"errors"
"testing"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/transport"
)

func TestLogErrorHandler(t *testing.T) {
var output []interface{}

logger := log.Logger(log.LoggerFunc(func(keyvals ...interface{}) error {
output = append(output, keyvals...)
return nil
}))

errorHandler := transport.NewLogErrorHandler(logger)

err := errors.New("error")

errorHandler.Handle(context.Background(), err)

if output[1] != err {
t.Errorf("expected an error log event: have %v, want %v", output[1], err)
}
}

0 comments on commit 3c77e8c

Please sign in to comment.