Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Client interceptors for logging: logrus and zap #33

Merged
merged 2 commits into from
May 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions logging/logrus/DOC.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,27 @@ Please see examples and tests for examples of use.
var (
// SystemField is used in every log statement made through grpc_logrus. Can be overwritten before any initialization code.
SystemField = "system"

// KindField describes the log gield used to incicate whether this is a server or a client log statment.
KindField = "span.kind"
)
```

#### func DefaultClientCodeToLevel

```go
func DefaultClientCodeToLevel(code codes.Code) logrus.Level
```
DefaultClientCodeToLevel is the default implementation of gRPC return codes to
log levels for client side.

#### func DefaultCodeToLevel

```go
func DefaultCodeToLevel(code codes.Code) logrus.Level
```
DefaultCodeToLevel is the default implementation of gRPC return codes and
interceptor log level.
DefaultCodeToLevel is the default implementation of gRPC return codes to log
levels for server side.

#### func Extract

Expand All @@ -54,6 +65,14 @@ ReplaceGrpcLogger sets the given logrus.Logger as a gRPC-level logger. This
should be called *before* any other initialization, preferably from init()
functions.

#### func StreamClientInterceptor

```go
func StreamClientInterceptor(entry *logrus.Entry, opts ...Option) grpc.StreamClientInterceptor
```
StreamServerInterceptor returns a new streaming client interceptor that
optionally logs the execution of external gRPC calls.

#### func StreamServerInterceptor

```go
Expand All @@ -62,6 +81,14 @@ func StreamServerInterceptor(entry *logrus.Entry, opts ...Option) grpc.StreamSer
StreamServerInterceptor returns a new streaming server interceptor that adds
logrus.Entry to the context.

#### func UnaryClientInterceptor

```go
func UnaryClientInterceptor(entry *logrus.Entry, opts ...Option) grpc.UnaryClientInterceptor
```
UnaryClientInterceptor returns a new unary client interceptor that optionally
logs the execution of external gRPC calls.

#### func UnaryServerInterceptor

```go
Expand Down
64 changes: 64 additions & 0 deletions logging/logrus/client_interceptors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2017 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.

package grpc_logrus

import (
"path"
"time"

"github.com/Sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
)

// UnaryClientInterceptor returns a new unary client interceptor that optionally logs the execution of external gRPC calls.
func UnaryClientInterceptor(entry *logrus.Entry, opts ...Option) grpc.UnaryClientInterceptor {
o := evaluateClientOpt(opts)
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
fields := newClientLoggerFields(ctx, method)
startTime := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
logFinalClientLine(o, entry.WithFields(fields), startTime, err, "finished client unary call")
return err
}
}

// StreamServerInterceptor returns a new streaming client interceptor that optionally logs the execution of external gRPC calls.
func StreamClientInterceptor(entry *logrus.Entry, opts ...Option) grpc.StreamClientInterceptor {
o := evaluateClientOpt(opts)
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
fields := newClientLoggerFields(ctx, method)
startTime := time.Now()
clientStream, err := streamer(ctx, desc, cc, method, opts...)
logFinalClientLine(o, entry.WithFields(fields), startTime, err, "finished client streaming call")
return clientStream, err
}
}

func logFinalClientLine(o *options, entry *logrus.Entry, startTime time.Time, err error, msg string) {
code := o.codeFunc(err)
level := o.levelFunc(code)
fields := logrus.Fields{
"grpc.code": code.String(),
"grpc.time_ms": timeDiffToMilliseconds(startTime),
}
if err != nil {
fields[logrus.ErrorKey] = err
}
levelLogf(
entry.WithFields(fields),
level,
msg)
}

func newClientLoggerFields(ctx context.Context, fullMethodString string) logrus.Fields {
service := path.Dir(fullMethodString)[1:]
method := path.Base(fullMethodString)
return logrus.Fields{
SystemField: "grpc",
KindField: "client",
"grpc.service": service,
"grpc.method": method,
}
}
129 changes: 129 additions & 0 deletions logging/logrus/client_interceptors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright 2017 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.

package grpc_logrus_test

import (
"fmt"
"io"
"runtime"
"strings"
"testing"

"github.com/Sirupsen/logrus"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"

pb_testproto "github.com/grpc-ecosystem/go-grpc-middleware/testing/testproto"
)

func customClientCodeToLevel(c codes.Code) logrus.Level {
if c == codes.Unauthenticated {
// Make this a special case for tests, and an error.
return logrus.ErrorLevel
}
level := grpc_logrus.DefaultClientCodeToLevel(c)
return level
}

func TestLogrusClientSuite(t *testing.T) {
if strings.HasPrefix(runtime.Version(), "go1.7") {
t.Skipf("Skipping due to json.RawMessage incompatibility with go1.7")
return
}
opts := []grpc_logrus.Option{
grpc_logrus.WithLevels(customClientCodeToLevel),
}
b := newLogrusBaseSuite(t)
b.logger.Level = logrus.DebugLevel // a lot of our stuff is on debug level by default
b.InterceptorTestSuite.ClientOpts = []grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_logrus.UnaryClientInterceptor(logrus.NewEntry(b.logger), opts...)),
grpc.WithStreamInterceptor(grpc_logrus.StreamClientInterceptor(logrus.NewEntry(b.logger), opts...)),
}
suite.Run(t, &logrusClientSuite{b})
}

type logrusClientSuite struct {
*logrusBaseSuite
}

func (s *logrusClientSuite) TestPing() {
_, err := s.Client.Ping(s.SimpleCtx(), goodPing)
assert.NoError(s.T(), err, "there must be not be an on a successful call")
msgs := s.getOutputJSONs()
require.Len(s.T(), msgs, 1, "one log statement should be logged")
m := msgs[0]
assert.Contains(s.T(), m, `"grpc.service": "mwitkow.testproto.TestService"`, "all lines must contain service name")
assert.Contains(s.T(), m, `"grpc.method": "Ping"`, "all lines must contain method name")
assert.Contains(s.T(), m, `"span.kind": "client"`, "all lines must contain the kind of call (client)")
assert.Contains(s.T(), m, `"msg": "finished client unary call"`, "interceptor message must contain string")
assert.Contains(s.T(), m, `"level": "debug"`, "OK error codes must be logged on debug level.")
assert.Contains(s.T(), m, `"grpc.time_ms":`, "interceptor log statement should contain execution time")
}

func (s *logrusClientSuite) TestPingList() {
stream, err := s.Client.PingList(s.SimpleCtx(), goodPing)
require.NoError(s.T(), err, "should not fail on establishing the stream")
for {
_, err := stream.Recv()
if err == io.EOF {
break
}
require.NoError(s.T(), err, "reading stream should not fail")
}
msgs := s.getOutputJSONs()
require.Len(s.T(), msgs, 1, "one log statement should be logged")
m := msgs[0]
assert.Contains(s.T(), m, `"grpc.service": "mwitkow.testproto.TestService"`, "all lines must contain service name")
assert.Contains(s.T(), m, `"grpc.method": "PingList"`, "all lines must contain method name")
assert.Contains(s.T(), m, `"span.kind": "client"`, "all lines must contain the kind of call (client)")
assert.Contains(s.T(), m, `"msg": "finished client streaming call"`, "interceptor message must contain string")
assert.Contains(s.T(), m, `"level": "debug"`, "OK error codes must be logged on debug level.")
assert.Contains(s.T(), m, `"grpc.time_ms":`, "interceptor log statement should contain execution time")
}

func (s *logrusClientSuite) TestPingError_WithCustomLevels() {
for _, tcase := range []struct {
code codes.Code
level logrus.Level
msg string
}{
{
code: codes.Internal,
level: logrus.WarnLevel,
msg: "Internal must remap to ErrorLevel in DefaultClientCodeToLevel",
},
{
code: codes.NotFound,
level: logrus.DebugLevel,
msg: "NotFound must remap to InfoLevel in DefaultClientCodeToLevel",
},
{
code: codes.FailedPrecondition,
level: logrus.DebugLevel,
msg: "FailedPrecondition must remap to WarnLevel in DefaultClientCodeToLevel",
},
{
code: codes.Unauthenticated,
level: logrus.ErrorLevel,
msg: "Unauthenticated is overwritten to ErrorLevel with customClientCodeToLevel override, which probably didn't work",
},
} {
s.SetupTest()
_, err := s.Client.PingError(
s.SimpleCtx(),
&pb_testproto.PingRequest{Value: "something", ErrorCodeReturned: uint32(tcase.code)})
assert.Error(s.T(), err, "each call here must return an error")
msgs := s.getOutputJSONs()
require.Len(s.T(), msgs, 1, "only the interceptor log message is printed in PingErr")
m := msgs[0]
assert.Contains(s.T(), m, `"grpc.service": "mwitkow.testproto.TestService"`, "all lines must contain service name")
assert.Contains(s.T(), m, `"grpc.method": "PingError"`, "all lines must contain method name")
assert.Contains(s.T(), m, fmt.Sprintf(`"grpc.code": "%s"`, tcase.code.String()), "all lines must contain method name")
assert.Contains(s.T(), m, fmt.Sprintf(`"level": "%s"`, tcase.level.String()), tcase.msg)
}
}
62 changes: 60 additions & 2 deletions logging/logrus/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

var (
defaultOptions = &options{
levelFunc: DefaultCodeToLevel,
levelFunc: nil,
codeFunc: grpc_logging.DefaultErrorToCode,
}
)
Expand All @@ -30,6 +30,22 @@ func evaluateOptions(opts []Option) *options {
return optCopy
}

func evaluateServerOpt(opts []Option) *options {
o := evaluateOptions(opts)
if o.codeFunc == nil {
o.levelFunc = DefaultCodeToLevel
}
return o
}

func evaluateClientOpt(opts []Option) *options {
o := evaluateOptions(opts)
if o.codeFunc == nil {
o.levelFunc = DefaultCodeToLevel
}
return o
}

type Option func(*options)

// CodeToLevel function defines the mapping between gRPC return codes and interceptor log level.
Expand All @@ -49,7 +65,7 @@ func WithCodes(f grpc_logging.ErrorToCode) Option {
}
}

// DefaultCodeToLevel is the default implementation of gRPC return codes and interceptor log level.
// DefaultCodeToLevel is the default implementation of gRPC return codes to log levels for server side.
func DefaultCodeToLevel(code codes.Code) logrus.Level {
switch code {
case codes.OK:
Expand Down Expand Up @@ -90,3 +106,45 @@ func DefaultCodeToLevel(code codes.Code) logrus.Level {
return logrus.ErrorLevel
}
}

// DefaultClientCodeToLevel is the default implementation of gRPC return codes to log levels for client side.
func DefaultClientCodeToLevel(code codes.Code) logrus.Level {
switch code {
case codes.OK:
return logrus.DebugLevel
case codes.Canceled:
return logrus.DebugLevel
case codes.Unknown:
return logrus.InfoLevel
case codes.InvalidArgument:
return logrus.DebugLevel
case codes.DeadlineExceeded:
return logrus.InfoLevel
case codes.NotFound:
return logrus.DebugLevel
case codes.AlreadyExists:
return logrus.DebugLevel
case codes.PermissionDenied:
return logrus.InfoLevel
case codes.Unauthenticated:
return logrus.InfoLevel // unauthenticated requests can happen
case codes.ResourceExhausted:
return logrus.DebugLevel
case codes.FailedPrecondition:
return logrus.DebugLevel
case codes.Aborted:
return logrus.DebugLevel
case codes.OutOfRange:
return logrus.DebugLevel
case codes.Unimplemented:
return logrus.WarnLevel
case codes.Internal:
return logrus.WarnLevel
case codes.Unavailable:
return logrus.WarnLevel
case codes.DataLoss:
return logrus.WarnLevel
default:
return logrus.InfoLevel
}
}
10 changes: 7 additions & 3 deletions logging/logrus/server_interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ import (
var (
// SystemField is used in every log statement made through grpc_logrus. Can be overwritten before any initialization code.
SystemField = "system"

// KindField describes the log gield used to incicate whether this is a server or a client log statment.
KindField = "span.kind"
)

// UnaryServerInterceptor returns a new unary server interceptors that adds logrus.Entry to the context.
func UnaryServerInterceptor(entry *logrus.Entry, opts ...Option) grpc.UnaryServerInterceptor {
o := evaluateOptions(opts)
o := evaluateServerOpt(opts)
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
newCtx := newLoggerForCall(ctx, entry, info.FullMethod)

Expand All @@ -45,7 +48,7 @@ func UnaryServerInterceptor(entry *logrus.Entry, opts ...Option) grpc.UnaryServe

// StreamServerInterceptor returns a new streaming server interceptor that adds logrus.Entry to the context.
func StreamServerInterceptor(entry *logrus.Entry, opts ...Option) grpc.StreamServerInterceptor {
o := evaluateOptions(opts)
o := evaluateServerOpt(opts)
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
newCtx := newLoggerForCall(stream.Context(), entry, info.FullMethod)
wrapped := grpc_middleware.WrapServerStream(stream)
Expand Down Expand Up @@ -93,6 +96,7 @@ func newLoggerForCall(ctx context.Context, entry *logrus.Entry, fullMethodString
callLog := entry.WithFields(
logrus.Fields{
SystemField: "grpc",
KindField: "server",
"grpc.service": service,
"grpc.method": method,
})
Expand All @@ -101,4 +105,4 @@ func newLoggerForCall(ctx context.Context, entry *logrus.Entry, fullMethodString

func timeDiffToMilliseconds(then time.Time) float32 {
return float32(time.Now().Sub(then).Nanoseconds() / 1000 / 1000)
}
}
2 changes: 2 additions & 0 deletions logging/logrus/server_interceptors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (s *logrusServerSuite) TestPing_WithCustomTags() {
assert.Contains(s.T(), m, `"grpc.method": "Ping"`, "all lines must contain method name")
assert.Contains(s.T(), m, `"custom_tags.string": "something"`, "all lines must contain `custom_tags.string` set by AddFields")
assert.Contains(s.T(), m, `"custom_tags.int": 1337`, "all lines must contain `custom_tags.int` set by AddFields")
assert.Contains(s.T(), m, `"span.kind": "server"`, "all lines must contain the kind of call (server)")
// request field extraction
assert.Contains(s.T(), m, `"grpc.request.value": "something"`, "all lines must contain fields extracted from goodPing because of test.manual_extractfields.pb")
}
Expand Down Expand Up @@ -128,6 +129,7 @@ func (s *logrusServerSuite) TestPingList_WithCustomTags() {
s.T()
assert.Contains(s.T(), m, `"grpc.service": "mwitkow.testproto.TestService"`, "all lines must contain service name")
assert.Contains(s.T(), m, `"grpc.method": "PingList"`, "all lines must contain method name")
assert.Contains(s.T(), m, `"span.kind": "server"`, "all lines must contain the kind of call (server)")
assert.Contains(s.T(), m, `"custom_tags.string": "something"`, "all lines must contain `custom_tags.string` set by AddFields")
assert.Contains(s.T(), m, `"custom_tags.int": 1337`, "all lines must contain `custom_tags.int` set by AddFields")
// request field extraction
Expand Down
Loading