From d2efd623d78b61ac0bb1dace398542a6329ccef3 Mon Sep 17 00:00:00 2001 From: Adrien Fillon Date: Thu, 8 Aug 2019 09:45:20 +0200 Subject: [PATCH] Add go-kit logging middleware --- CHANGELOG.md | 4 + README.md | 2 +- go.mod | 3 + go.sum | 8 + logging/doc.go | 2 +- logging/kit/client_interceptors.go | 54 ++++ logging/kit/client_interceptors_test.go | 184 +++++++++++++ logging/kit/ctxkit/context.go | 60 +++++ logging/kit/ctxkit/doc.go | 14 + logging/kit/ctxkit/examples_test.go | 23 ++ logging/kit/doc.go | 68 +++++ logging/kit/examples_test.go | 81 ++++++ logging/kit/options.go | 127 +++++++++ logging/kit/payload_interceptors.go | 150 +++++++++++ logging/kit/payload_interceptors_test.go | 131 ++++++++++ logging/kit/server_interceptors.go | 92 +++++++ logging/kit/server_interceptors_test.go | 319 +++++++++++++++++++++++ logging/kit/shared_test.go | 95 +++++++ 18 files changed, 1415 insertions(+), 2 deletions(-) create mode 100644 logging/kit/client_interceptors.go create mode 100644 logging/kit/client_interceptors_test.go create mode 100644 logging/kit/ctxkit/context.go create mode 100644 logging/kit/ctxkit/doc.go create mode 100644 logging/kit/ctxkit/examples_test.go create mode 100644 logging/kit/doc.go create mode 100644 logging/kit/examples_test.go create mode 100644 logging/kit/options.go create mode 100644 logging/kit/payload_interceptors.go create mode 100644 logging/kit/payload_interceptors_test.go create mode 100644 logging/kit/server_interceptors.go create mode 100644 logging/kit/server_interceptors_test.go create mode 100644 logging/kit/shared_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 11e1973f6..6eeb7e2dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,10 @@ Types of changes: ## [Unreleased] +### Added + +- [#223](https://github.com/grpc-ecosystem/go-grpc-middleware/pull/223) Add go-kit logging middleware - [adrien-f](https://github.com/adrien-f) + ## [v1.1.0] - 2019-09-12 ### Added - [#226](https://github.com/grpc-ecosystem/go-grpc-middleware/pull/226) Support for go modules. diff --git a/README.md b/README.md index 1fd8643d0..3a4cc2175 100644 --- a/README.md +++ b/README.md @@ -58,7 +58,7 @@ myServer := grpc.NewServer( * [`grpc_ctxtags`](tags/) - a library that adds a `Tag` map to context, with data populated from request body * [`grpc_zap`](logging/zap/) - integration of [zap](https://github.com/uber-go/zap) logging library into gRPC handlers. * [`grpc_logrus`](logging/logrus/) - integration of [logrus](https://github.com/sirupsen/logrus) logging library into gRPC handlers. - + * [`grpc_kit`](logging/kit/) - integration of [go-kit](https://github.com/go-kit/kit/tree/master/log) logging library into gRPC handlers. #### Monitoring * [`grpc_prometheus`⚡](https://github.com/grpc-ecosystem/go-grpc-prometheus) - Prometheus client-side and server-side monitoring middleware diff --git a/go.mod b/go.mod index 7d0e3cb74..6f8eeac43 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,9 @@ module github.com/grpc-ecosystem/go-grpc-middleware require ( + github.com/go-kit/kit v0.9.0 + github.com/go-logfmt/logfmt v0.4.0 // indirect + github.com/go-stack/stack v1.8.0 // indirect github.com/gogo/protobuf v1.2.1 github.com/golang/protobuf v1.3.2 github.com/opentracing/opentracing-go v1.1.0 diff --git a/go.sum b/go.sum index f142425f6..ff8a579c5 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,12 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-kit/kit v0.9.0 h1:wDJmvq38kDhkVxi50ni9ykkdUr1PKgqKOoi01fa0Mdk= +github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-logfmt/logfmt v0.4.0 h1:MP4Eh7ZCb31lleYCFuwm0oe4/YGak+5l1vA2NOE80nA= +github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= @@ -19,6 +25,8 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY= +github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= diff --git a/logging/doc.go b/logging/doc.go index ce8bb2987..f492d38e6 100644 --- a/logging/doc.go +++ b/logging/doc.go @@ -28,7 +28,7 @@ https://github.com/opentracing/specification/blob/master/semantic_conventions.md Implementations -There are two implementations at the moment: logrus and zap +There are three implementations at the moment: logrus, zap and kit See relevant packages below. */ diff --git a/logging/kit/client_interceptors.go b/logging/kit/client_interceptors.go new file mode 100644 index 000000000..0a025137c --- /dev/null +++ b/logging/kit/client_interceptors.go @@ -0,0 +1,54 @@ +package kit + +import ( + "path" + "time" + + "context" + + "github.com/go-kit/kit/log" + "google.golang.org/grpc" +) + +// UnaryClientInterceptor returns a new unary client interceptor that optionally logs the execution of external gRPC calls. +func UnaryClientInterceptor(logger log.Logger, opts ...Option) grpc.UnaryClientInterceptor { + o := evaluateClientOpt(opts) + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + fields := newClientLoggerFields(ctx, method) + startTime := time.Now() + err := invoker(ctx, method, req, reply, cc, opts...) + logFinalClientLine(o, log.With(logger, fields...), startTime, err, "finished client unary call") + return err + } +} + +// StreamClientInterceptor returns a new streaming client interceptor that optionally logs the execution of external gRPC calls. +func StreamClientInterceptor(logger log.Logger, opts ...Option) grpc.StreamClientInterceptor { + o := evaluateClientOpt(opts) + return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + fields := newClientLoggerFields(ctx, method) + startTime := time.Now() + clientStream, err := streamer(ctx, desc, cc, method, opts...) + logFinalClientLine(o, log.With(logger, fields...), startTime, err, "finished client streaming call") + return clientStream, err + } +} + +func logFinalClientLine(o *options, logger log.Logger, startTime time.Time, err error, msg string) { + code := o.codeFunc(err) + logger = o.levelFunc(code, logger) + args := []interface{}{"msg", msg, "error", err, "grpc.code", code.String()} + args = append(args, o.durationFunc(time.Since(startTime))...) + logger.Log(args...) +} + +func newClientLoggerFields(ctx context.Context, fullMethodString string) []interface{} { + service := path.Dir(fullMethodString)[1:] + method := path.Base(fullMethodString) + return []interface{}{ + "system", "grpc", + "span.kind", "client", + "grpc.service", service, + "grpc.method", method, + } +} diff --git a/logging/kit/client_interceptors_test.go b/logging/kit/client_interceptors_test.go new file mode 100644 index 000000000..dfb04b63f --- /dev/null +++ b/logging/kit/client_interceptors_test.go @@ -0,0 +1,184 @@ +package kit_test + +import ( + "io" + "runtime" + "strings" + "testing" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + grpc_kit "github.com/grpc-ecosystem/go-grpc-middleware/logging/kit" + pb_testproto "github.com/grpc-ecosystem/go-grpc-middleware/testing/testproto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" +) + +func customClientCodeToLevel(c codes.Code, logger log.Logger) log.Logger { + if c == codes.Unauthenticated { + // Make this a special case for tests, and an error. + return level.Error(logger) + } + return grpc_kit.DefaultClientCodeToLevel(c, logger) +} + +func TestKitClientSuite(t *testing.T) { + opts := []grpc_kit.Option{ + grpc_kit.WithLevels(customClientCodeToLevel), + } + b := newKitBaseSuite(t) + b.logger = level.NewFilter(b.logger, level.AllowDebug()) // a lot of our stuff is on debug level by default + b.InterceptorTestSuite.ClientOpts = []grpc.DialOption{ + grpc.WithUnaryInterceptor(grpc_kit.UnaryClientInterceptor(b.logger, opts...)), + grpc.WithStreamInterceptor(grpc_kit.StreamClientInterceptor(b.logger, opts...)), + } + suite.Run(t, &kitClientSuite{b}) +} + +type kitClientSuite struct { + *kitBaseSuite +} + +func (s *kitClientSuite) TestPing() { + _, err := s.Client.Ping(s.SimpleCtx(), goodPing) + assert.NoError(s.T(), err, "there must be not be an on a successful call") + + msgs := s.getOutputJSONs() + require.Len(s.T(), msgs, 1, "one log statement should be logged") + + assert.Equal(s.T(), msgs[0]["grpc.service"], "mwitkow.testproto.TestService", "all lines must contain the correct service name") + assert.Equal(s.T(), msgs[0]["grpc.method"], "Ping", "all lines must contain the correct method name") + assert.Equal(s.T(), msgs[0]["msg"], "finished client unary call", "handler's message must contain the correct message") + assert.Equal(s.T(), msgs[0]["span.kind"], "client", "all lines must contain the kind of call (client)") + assert.Equal(s.T(), msgs[0]["level"], "debug", "OK codes must be logged on debug level.") + + assert.Contains(s.T(), msgs[0], "grpc.time_ms", "interceptor log statement should contain execution time (duration in ms)") +} + +func (s *kitClientSuite) TestPingList() { + stream, err := s.Client.PingList(s.SimpleCtx(), goodPing) + require.NoError(s.T(), err, "should not fail on establishing the stream") + for { + _, err := stream.Recv() + if err == io.EOF { + break + } + require.NoError(s.T(), err, "reading stream should not fail") + } + + msgs := s.getOutputJSONs() + require.Len(s.T(), msgs, 1, "one log statement should be logged") + + assert.Equal(s.T(), msgs[0]["grpc.service"], "mwitkow.testproto.TestService", "all lines must contain the correct service name") + assert.Equal(s.T(), msgs[0]["grpc.method"], "PingList", "all lines must contain the correct method name") + assert.Equal(s.T(), msgs[0]["msg"], "finished client streaming call", "handler's message must contain the correct message") + assert.Equal(s.T(), msgs[0]["span.kind"], "client", "all lines must contain the kind of call (client)") + assert.Equal(s.T(), msgs[0]["level"], "debug", "OK codes must be logged on debug level.") + assert.Contains(s.T(), msgs[0], "grpc.time_ms", "interceptor log statement should contain execution time (duration in ms)") +} + +func (s *kitClientSuite) TestPingError_WithCustomLevels() { + for _, tcase := range []struct { + code codes.Code + level level.Value + msg string + }{ + { + code: codes.Internal, + level: level.WarnValue(), + msg: "Internal must remap to WarnLevel in DefaultClientCodeToLevel", + }, + { + code: codes.NotFound, + level: level.DebugValue(), + msg: "NotFound must remap to DebugLevel in DefaultClientCodeToLevel", + }, + { + code: codes.FailedPrecondition, + level: level.DebugValue(), + msg: "FailedPrecondition must remap to DebugLevel in DefaultClientCodeToLevel", + }, + { + code: codes.Unauthenticated, + level: level.ErrorValue(), + msg: "Unauthenticated is overwritten to ErrorLevel with customClientCodeToLevel override, which probably didn't work", + }, + } { + s.SetupTest() + _, err := s.Client.PingError( + s.SimpleCtx(), + &pb_testproto.PingRequest{Value: "something", ErrorCodeReturned: uint32(tcase.code)}) + + assert.Error(s.T(), err, "each call here must return an error") + + msgs := s.getOutputJSONs() + require.Len(s.T(), msgs, 1, "only a single log message is printed") + + assert.Equal(s.T(), msgs[0]["grpc.service"], "mwitkow.testproto.TestService", "all lines must contain the correct service name") + assert.Equal(s.T(), msgs[0]["grpc.method"], "PingError", "all lines must contain the correct method name") + assert.Equal(s.T(), msgs[0]["grpc.code"], tcase.code.String(), "all lines must contain a grpc code") + assert.Equal(s.T(), msgs[0]["level"], tcase.level.String(), tcase.msg) + } +} + +func TestKitClientOverrideSuite(t *testing.T) { + if strings.HasPrefix(runtime.Version(), "go1.7") { + t.Skip("Skipping due to json.RawMessage incompatibility with go1.7") + return + } + opts := []grpc_kit.Option{ + grpc_kit.WithDurationField(grpc_kit.DurationToDurationField), + } + b := newKitBaseSuite(t) + b.logger = level.NewFilter(b.logger, level.AllowDebug()) // a lot of our stuff is on debug level by default + b.InterceptorTestSuite.ClientOpts = []grpc.DialOption{ + grpc.WithUnaryInterceptor(grpc_kit.UnaryClientInterceptor(b.logger, opts...)), + grpc.WithStreamInterceptor(grpc_kit.StreamClientInterceptor(b.logger, opts...)), + } + suite.Run(t, &kitClientOverrideSuite{b}) +} + +type kitClientOverrideSuite struct { + *kitBaseSuite +} + +func (s *kitClientOverrideSuite) TestPing_HasOverrides() { + _, err := s.Client.Ping(s.SimpleCtx(), goodPing) + assert.NoError(s.T(), err, "there must be not be an on a successful call") + + msgs := s.getOutputJSONs() + require.Len(s.T(), msgs, 1, "one log statement should be logged") + assert.Equal(s.T(), msgs[0]["grpc.service"], "mwitkow.testproto.TestService", "all lines must contain the correct service name") + assert.Equal(s.T(), msgs[0]["grpc.method"], "Ping", "all lines must contain the correct method name") + assert.Equal(s.T(), msgs[0]["msg"], "finished client unary call", "handler's message must contain the correct message") + + assert.NotContains(s.T(), msgs[0], "grpc.time_ms", "message must not contain default duration") + assert.Contains(s.T(), msgs[0], "grpc.duration", "message must contain overridden duration") +} + +func (s *kitClientOverrideSuite) TestPingList_HasOverrides() { + stream, err := s.Client.PingList(s.SimpleCtx(), goodPing) + require.NoError(s.T(), err, "should not fail on establishing the stream") + for { + _, err := stream.Recv() + if err == io.EOF { + break + } + require.NoError(s.T(), err, "reading stream should not fail") + } + + msgs := s.getOutputJSONs() + require.Len(s.T(), msgs, 1, "one log statement should be logged") + + assert.Equal(s.T(), msgs[0]["grpc.service"], "mwitkow.testproto.TestService", "all lines must contain the correct service name") + assert.Equal(s.T(), msgs[0]["grpc.method"], "PingList", "all lines must contain the correct method name") + assert.Equal(s.T(), msgs[0]["msg"], "finished client streaming call", "log message must be correct") + assert.Equal(s.T(), msgs[0]["span.kind"], "client", "all lines must contain the kind of call (client)") + assert.Equal(s.T(), msgs[0]["level"], "debug", "OK codes must be logged on debug level.") + + assert.NotContains(s.T(), msgs[0], "grpc.time_ms", "message must not contain default duration") + assert.Contains(s.T(), msgs[0], "grpc.duration", "message must contain overridden duration") +} diff --git a/logging/kit/ctxkit/context.go b/logging/kit/ctxkit/context.go new file mode 100644 index 000000000..fa1ca9f8f --- /dev/null +++ b/logging/kit/ctxkit/context.go @@ -0,0 +1,60 @@ +package ctxkit + +import ( + "context" + + "github.com/go-kit/kit/log" + grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" +) + +type ctxMarker struct{} + +type ctxLogger struct { + logger log.Logger + fields []interface{} +} + +var ( + ctxMarkerKey = &ctxMarker{} +) + +// AddFields adds fields to the logger. +func AddFields(ctx context.Context, fields ...interface{}) { + l, ok := ctx.Value(ctxMarkerKey).(*ctxLogger) + if !ok || l == nil { + return + } + l.fields = append(l.fields, fields...) +} + +// Extract takes the call-scoped Logger from grpc_kit middleware. +// +// It always returns a Logger that has all the grpc_ctxtags updated. +func Extract(ctx context.Context) log.Logger { + l, ok := ctx.Value(ctxMarkerKey).(*ctxLogger) + if !ok || l == nil { + return log.NewNopLogger() + } + // Add grpc_ctxtags tags metadata until now. + fields := TagsToFields(ctx) + return log.With(l.logger, append(fields, l.fields...)...) +} + +// TagsToFields transforms the Tags on the supplied context into kit fields. +func TagsToFields(ctx context.Context) []interface{} { + var fields []interface{} + tags := grpc_ctxtags.Extract(ctx) + for k, v := range tags.Values() { + fields = append(fields, k, v) + } + return fields +} + +// ToContext adds the kit.Logger to the context for extraction later. +// Returning the new context that has been created. +func ToContext(ctx context.Context, logger log.Logger) context.Context { + l := &ctxLogger{ + logger: logger, + } + return context.WithValue(ctx, ctxMarkerKey, l) +} diff --git a/logging/kit/ctxkit/doc.go b/logging/kit/ctxkit/doc.go new file mode 100644 index 000000000..582f7b012 --- /dev/null +++ b/logging/kit/ctxkit/doc.go @@ -0,0 +1,14 @@ +/* +`ctxkit` is a ctxlogger that is backed by go-kit + +It accepts a user-configured `log.Logger` that will be used for logging. The same `log.Logger` will +be populated into the `context.Context` passed into gRPC handler code. + +You can use `ctxkit.Extract` to log into a request-scoped `log.Logger` instance in your handler code. + +As `ctxkit.Extract` will iterate all tags on from `grpc_ctxtags` it is therefore expensive so it is advised that you +extract once at the start of the function from the context and reuse it for the remainder of the function (see examples). + +Please see examples and tests for examples of use. +*/ +package ctxkit diff --git a/logging/kit/ctxkit/examples_test.go b/logging/kit/ctxkit/examples_test.go new file mode 100644 index 000000000..7fe7e582e --- /dev/null +++ b/logging/kit/ctxkit/examples_test.go @@ -0,0 +1,23 @@ +package ctxkit_test + +import ( + "context" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/kit/ctxkit" + grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" + pb_testproto "github.com/grpc-ecosystem/go-grpc-middleware/testing/testproto" +) + +// Simple unary handler that adds custom fields to the requests's context. These will be used for all log statements. +func ExampleExtract_unary() { + _ = func(ctx context.Context, ping *pb_testproto.PingRequest) (*pb_testproto.PingResponse, error) { + // Add fields the ctxtags of the request which will be added to all extracted loggers. + grpc_ctxtags.Extract(ctx).Set("custom_tags.string", "something").Set("custom_tags.int", 1337) + + // Extract a single request-scoped log.Logger and log messages. + l := ctxkit.Extract(ctx) + l.Log("msg", "some ping") + l.Log("msg", "another ping") + return &pb_testproto.PingResponse{Value: ping.Value}, nil + } +} diff --git a/logging/kit/doc.go b/logging/kit/doc.go new file mode 100644 index 000000000..c9853231b --- /dev/null +++ b/logging/kit/doc.go @@ -0,0 +1,68 @@ +/* +`grpc_kit` is a gRPC logging middleware backed by go-kit loggers + +It accepts a user-configured `log.Logger` that will be used for logging completed gRPC calls, +and be populated into the `context.Context` passed into gRPC handler code. + +On calling `StreamServerInterceptor` or `UnaryServerInterceptor` this logging middleware will add gRPC call information +to the ctx so that it will be present on subsequent use of the `ctxkit` logger. + +If a deadline is present on the gRPC request the grpc.request.deadline tag is populated when the request begins. grpc.request.deadline +is a string representing the time (RFC3339) when the current call will expire. + +This package also implements request and response *payload* logging, both for server-side and client-side. These will be +logged as structured `jsonpb` fields for every message received/sent (both unary and streaming). For that please use +`Payload*Interceptor` functions for that. Please note that the user-provided function that determines whetether to log +the full request/response payload needs to be written with care, this can significantly slow down gRPC. + +*Server Interceptor* +Below is a JSON formatted example of a log that would be logged by the server interceptor: + + { + "level": "info", // string log level + "msg": "finished unary call", // string log message + + "grpc.code": "OK", // string grpc status code + "grpc.method": "Ping", // string method name + "grpc.service": "mwitkow.testproto.TestService", // string full name of the called service + "grpc.start_time": "2006-01-02T15:04:05Z07:00", // string RFC3339 representation of the start time + "grpc.request.deadline": "2006-01-02T15:04:05Z07:00", // string RFC3339 deadline of the current request if supplied + "grpc.request.value": "something", // string value on the request + "grpc.time_ms": 1.345, // float32 run time of the call in ms + + "peer.address": { + "IP": "127.0.0.1", // string IP address of calling party + "Port": 60216, // int port call is coming in on + "Zone": "" // string peer zone for caller + }, + "span.kind": "server", // string client | server + "system": "grpc" // string + + "custom_field": "custom_value", // string user defined field + "custom_tags.int": 1337, // int user defined tag on the ctx + "custom_tags.string": "something", // string user defined tag on the ctx + } + +*Payload Interceptor* +Below is a JSON formatted example of a log that would be logged by the payload interceptor: + + { + "level": "info", // string kit log levels + "msg": "client request payload logged as grpc.request.content", // string log message + + "grpc.request.content": { // object content of RPC request + "msg" : { // object kit specific inner object + "value": "something", // string defined by caller + "sleepTimeMs": 9999 // int defined by caller + } + }, + "grpc.method": "Ping", // string method being called + "grpc.service": "mwitkow.testproto.TestService", // string service being called + + "span.kind": "client", // string client | server + "system": "grpc" // string + } + +Please see examples and tests for examples of use. +*/ +package kit diff --git a/logging/kit/examples_test.go b/logging/kit/examples_test.go new file mode 100644 index 000000000..6e8c0bbb8 --- /dev/null +++ b/logging/kit/examples_test.go @@ -0,0 +1,81 @@ +package kit_test + +import ( + "time" + + "github.com/go-kit/kit/log" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/kit" + grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" + + "google.golang.org/grpc" +) + +var ( + customFunc kit.CodeToLevel +) + +// Initialization shows a relatively complex initialization sequence. +func Example_initialization() { + // Logger is used, allowing pre-definition of certain fields by the user. + logger := log.NewNopLogger() + // Shared options for the logger, with a custom gRPC code to log level function. + opts := []kit.Option{ + kit.WithLevels(customFunc), + } + // Create a server, make sure we put the grpc_ctxtags context before everything else. + _ = grpc.NewServer( + grpc_middleware.WithUnaryServerChain( + grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)), + kit.UnaryServerInterceptor(logger, opts...), + ), + grpc_middleware.WithStreamServerChain( + grpc_ctxtags.StreamServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)), + kit.StreamServerInterceptor(logger, opts...), + ), + ) +} + +func Example_initializationWithDurationFieldOverride() { + // Logger is used, allowing pre-definition of certain fields by the user. + logger := log.NewNopLogger() + // Shared options for the logger, with a custom duration to log field function. + opts := []kit.Option{ + kit.WithDurationField(func(duration time.Duration) []interface{} { + return kit.DurationToTimeMillisField(duration) + }), + } + _ = grpc.NewServer( + grpc_middleware.WithUnaryServerChain( + grpc_ctxtags.UnaryServerInterceptor(), + kit.UnaryServerInterceptor(logger, opts...), + ), + grpc_middleware.WithStreamServerChain( + grpc_ctxtags.StreamServerInterceptor(), + kit.StreamServerInterceptor(logger, opts...), + ), + ) +} + +func ExampleWithDecider() { + opts := []kit.Option{ + kit.WithDecider(func(methodFullName string, err error) bool { + // will not log gRPC calls if it was a call to healthcheck and no error was raised + if err == nil && methodFullName == "blah.foo.healthcheck" { + return false + } + + // by default you will log all calls + return true + }), + } + + _ = []grpc.ServerOption{ + grpc_middleware.WithStreamServerChain( + grpc_ctxtags.StreamServerInterceptor(), + kit.StreamServerInterceptor(log.NewNopLogger(), opts...)), + grpc_middleware.WithUnaryServerChain( + grpc_ctxtags.UnaryServerInterceptor(), + kit.UnaryServerInterceptor(log.NewNopLogger(), opts...)), + } +} diff --git a/logging/kit/options.go b/logging/kit/options.go new file mode 100644 index 000000000..c0dd542c2 --- /dev/null +++ b/logging/kit/options.go @@ -0,0 +1,127 @@ +package kit + +import ( + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/logging" + "google.golang.org/grpc/codes" +) + +var ( + defaultOptions = &options{ + shouldLog: grpc_logging.DefaultDeciderMethod, + codeFunc: grpc_logging.DefaultErrorToCode, + durationFunc: DefaultDurationToField, + } +) + +type options struct { + levelFunc CodeToLevel + shouldLog grpc_logging.Decider + codeFunc grpc_logging.ErrorToCode + durationFunc DurationToField +} + +type Option func(*options) + +// CodeToLevel function defines the mapping between gRPC return codes and interceptor log level. +type CodeToLevel func(code codes.Code, logger log.Logger) log.Logger + +// DurationToField function defines how to produce duration fields for logging +type DurationToField func(duration time.Duration) []interface{} + +func evaluateServerOpt(opts []Option) *options { + optCopy := &options{} + *optCopy = *defaultOptions + optCopy.levelFunc = DefaultCodeToLevel + for _, o := range opts { + o(optCopy) + } + return optCopy +} + +func evaluateClientOpt(opts []Option) *options { + optCopy := &options{} + *optCopy = *defaultOptions + optCopy.levelFunc = DefaultClientCodeToLevel + for _, o := range opts { + o(optCopy) + } + return optCopy +} + +// WithDecider customizes the function for deciding if the gRPC interceptor logs should log. +func WithDecider(f grpc_logging.Decider) Option { + return func(o *options) { + o.shouldLog = f + } +} + +// WithLevels customizes the function for mapping gRPC return codes and interceptor log level statements. +func WithLevels(f CodeToLevel) Option { + return func(o *options) { + o.levelFunc = f + } +} + +// WithCodes customizes the function for mapping errors to error codes. +func WithCodes(f grpc_logging.ErrorToCode) Option { + return func(o *options) { + o.codeFunc = f + } +} + +// WithDurationField customizes the function for mapping request durations to log fields. +func WithDurationField(f DurationToField) Option { + return func(o *options) { + o.durationFunc = f + } +} + +// DefaultCodeToLevel is the default implementation of gRPC return codes and interceptor log level for server side. +func DefaultCodeToLevel(code codes.Code, logger log.Logger) log.Logger { + switch code { + case codes.OK, codes.Canceled, codes.InvalidArgument, codes.NotFound, codes.AlreadyExists, codes.Unauthenticated: + return level.Info(logger) + case codes.DeadlineExceeded, codes.PermissionDenied, codes.ResourceExhausted, codes.FailedPrecondition, codes.Aborted, codes.OutOfRange, codes.Unavailable: + return level.Warn(logger) + case codes.Unknown, codes.Unimplemented, codes.Internal, codes.DataLoss: + return level.Error(logger) + default: + return level.Error(logger) + } +} + +// DefaultClientCodeToLevel is the default implementation of gRPC return codes to log levels for client side. +func DefaultClientCodeToLevel(code codes.Code, logger log.Logger) log.Logger { + switch code { + case codes.OK, codes.Canceled, codes.InvalidArgument, codes.NotFound, codes.AlreadyExists, codes.ResourceExhausted, codes.FailedPrecondition, codes.Aborted, codes.OutOfRange: + return level.Debug(logger) + case codes.Unknown, codes.DeadlineExceeded, codes.PermissionDenied, codes.Unauthenticated: + return level.Info(logger) + case codes.Unimplemented, codes.Internal, codes.Unavailable, codes.DataLoss: + return level.Warn(logger) + default: + return level.Info(logger) + } +} + +// DefaultDurationToField is the default implementation of converting request duration to a kit field. +var DefaultDurationToField = DurationToTimeMillisField + +// DurationToTimeMillisField converts the duration to milliseconds and uses the key `grpc.time_ms`. +func DurationToTimeMillisField(duration time.Duration) []interface{} { + return []interface{}{"grpc.time_ms", durationToMilliseconds(duration)} +} + +// DurationToDurationField uses a Duration field to log the request duration +// and leaves it up to Log's encoder settings to determine how that is output. +func DurationToDurationField(duration time.Duration) []interface{} { + return []interface{}{"grpc.duration", duration} +} + +func durationToMilliseconds(duration time.Duration) float32 { + return float32(duration.Nanoseconds()/1000) / 1000 +} diff --git a/logging/kit/payload_interceptors.go b/logging/kit/payload_interceptors.go new file mode 100644 index 000000000..d75bae5d1 --- /dev/null +++ b/logging/kit/payload_interceptors.go @@ -0,0 +1,150 @@ +package kit + +import ( + "bytes" + "fmt" + + "context" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" + grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/logging" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/kit/ctxkit" + "google.golang.org/grpc" +) + +var ( + // JsonPbMarshaller is the marshaller used for serializing protobuf messages. + // If needed, this variable can be reassigned with a different marshaller with the same Marshal() signature. + JsonPbMarshaller grpc_logging.JsonPbMarshaler = &jsonpb.Marshaler{} +) + +// PayloadUnaryServerInterceptor returns a new unary server interceptors that logs the payloads of requests. +// +// This *only* works when placed *after* the `kit.UnaryServerInterceptor`. However, the logging can be done to a +// separate instance of the logger. +func PayloadUnaryServerInterceptor(logger log.Logger, decider grpc_logging.ServerPayloadLoggingDecider) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + if !decider(ctx, info.FullMethod, info.Server) { + return handler(ctx, req) + } + // Use the provided log.Logger for logging but use the fields from context. + logger = log.With(logger, append(serverCallFields(info.FullMethod), ctxkit.TagsToFields(ctx)...)...) + logProtoMessageAsJson(logger, req, "grpc.request.content", "server request payload logged as grpc.request.content field") + resp, err := handler(ctx, req) + if err == nil { + logProtoMessageAsJson(logger, resp, "grpc.response.content", "server response payload logged as grpc.request.content field") + } + return resp, err + } +} + +// PayloadStreamServerInterceptor returns a new server server interceptors that logs the payloads of requests. +// +// This *only* works when placed *after* the `kit.StreamServerInterceptor`. However, the logging can be done to a +// separate instance of the logger. +func PayloadStreamServerInterceptor(logger log.Logger, decider grpc_logging.ServerPayloadLoggingDecider) grpc.StreamServerInterceptor { + return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if !decider(stream.Context(), info.FullMethod, srv) { + return handler(srv, stream) + } + logEntry := log.With(logger, append(serverCallFields(info.FullMethod), ctxkit.TagsToFields(stream.Context())...)...) + newStream := &loggingServerStream{ServerStream: stream, logger: logEntry} + return handler(srv, newStream) + } +} + +// PayloadUnaryClientInterceptor returns a new unary client interceptor that logs the paylods of requests and responses. +func PayloadUnaryClientInterceptor(logger log.Logger, decider grpc_logging.ClientPayloadLoggingDecider) grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + if !decider(ctx, method) { + return invoker(ctx, method, req, reply, cc, opts...) + } + logEntry := log.With(logger, newClientLoggerFields(ctx, method)...) + logProtoMessageAsJson(logEntry, req, "grpc.request.content", "client request payload logged as grpc.request.content") + err := invoker(ctx, method, req, reply, cc, opts...) + if err == nil { + logProtoMessageAsJson(logEntry, reply, "grpc.response.content", "client response payload logged as grpc.response.content") + } + return err + } +} + +// PayloadStreamClientInterceptor returns a new streaming client interceptor that logs the paylods of requests and responses. +func PayloadStreamClientInterceptor(logger log.Logger, decider grpc_logging.ClientPayloadLoggingDecider) grpc.StreamClientInterceptor { + return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + if !decider(ctx, method) { + return streamer(ctx, desc, cc, method, opts...) + } + logEntry := log.With(logger, newClientLoggerFields(ctx, method)...) + clientStream, err := streamer(ctx, desc, cc, method, opts...) + newStream := &loggingClientStream{ClientStream: clientStream, logger: logEntry} + return newStream, err + } +} + +type loggingClientStream struct { + grpc.ClientStream + logger log.Logger +} + +func (l *loggingClientStream) SendMsg(m interface{}) error { + err := l.ClientStream.SendMsg(m) + if err == nil { + logProtoMessageAsJson(l.logger, m, "grpc.request.content", "server request payload logged as grpc.request.content field") + } + return err +} + +func (l *loggingClientStream) RecvMsg(m interface{}) error { + err := l.ClientStream.RecvMsg(m) + if err == nil { + logProtoMessageAsJson(l.logger, m, "grpc.response.content", "server response payload logged as grpc.response.content field") + } + return err +} + +type loggingServerStream struct { + grpc.ServerStream + logger log.Logger +} + +func (l *loggingServerStream) SendMsg(m interface{}) error { + err := l.ServerStream.SendMsg(m) + if err == nil { + logProtoMessageAsJson(l.logger, m, "grpc.response.content", "server response payload logged as grpc.response.content field") + } + return err +} + +func (l *loggingServerStream) RecvMsg(m interface{}) error { + err := l.ServerStream.RecvMsg(m) + if err == nil { + logProtoMessageAsJson(l.logger, m, "grpc.request.content", "server request payload logged as grpc.request.content field") + } + return err +} + +func logProtoMessageAsJson(logger log.Logger, pbMsg interface{}, key string, msg string) { + if p, ok := pbMsg.(proto.Message); ok { + payload, err := (&jsonpbObjectMarshaler{pb: p}).marshalJSON() + if err != nil { + level.Info(logger).Log(key, err) + } + level.Info(logger).Log(key, string(payload)) + } +} + +type jsonpbObjectMarshaler struct { + pb proto.Message +} + +func (j *jsonpbObjectMarshaler) marshalJSON() ([]byte, error) { + b := &bytes.Buffer{} + if err := JsonPbMarshaller.Marshal(b, j.pb); err != nil { + return nil, fmt.Errorf("jsonpb serializer failed: %v", err) + } + return b.Bytes(), nil +} diff --git a/logging/kit/payload_interceptors_test.go b/logging/kit/payload_interceptors_test.go new file mode 100644 index 000000000..408d861ff --- /dev/null +++ b/logging/kit/payload_interceptors_test.go @@ -0,0 +1,131 @@ +package kit_test + +import ( + "io" + "runtime" + "strings" + "testing" + + "context" + + "github.com/go-kit/kit/log" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_kit "github.com/grpc-ecosystem/go-grpc-middleware/logging/kit" + grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" + pb_testproto "github.com/grpc-ecosystem/go-grpc-middleware/testing/testproto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "google.golang.org/grpc" +) + +func TestKitPayloadSuite(t *testing.T) { + if strings.HasPrefix(runtime.Version(), "go1.7") { + t.Skipf("Skipping due to json.RawMessage incompatibility with go1.7") + return + } + + alwaysLoggingDeciderServer := func(ctx context.Context, fullMethodName string, servingObject interface{}) bool { return true } + alwaysLoggingDeciderClient := func(ctx context.Context, fullMethodName string) bool { return true } + + b := newKitBaseSuite(t) + b.InterceptorTestSuite.ClientOpts = []grpc.DialOption{ + grpc.WithUnaryInterceptor(grpc_kit.PayloadUnaryClientInterceptor(b.logger, alwaysLoggingDeciderClient)), + grpc.WithStreamInterceptor(grpc_kit.PayloadStreamClientInterceptor(b.logger, alwaysLoggingDeciderClient)), + } + noOpLogger := log.NewNopLogger() + b.InterceptorTestSuite.ServerOpts = []grpc.ServerOption{ + grpc_middleware.WithStreamServerChain( + grpc_ctxtags.StreamServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)), + grpc_kit.StreamServerInterceptor(noOpLogger), + grpc_kit.PayloadStreamServerInterceptor(b.logger, alwaysLoggingDeciderServer)), + grpc_middleware.WithUnaryServerChain( + grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)), + grpc_kit.UnaryServerInterceptor(noOpLogger), + grpc_kit.PayloadUnaryServerInterceptor(b.logger, alwaysLoggingDeciderServer)), + } + suite.Run(t, &kitPayloadSuite{b}) +} + +type kitPayloadSuite struct { + *kitBaseSuite +} + +func (s *kitPayloadSuite) getServerAndClientMessages(expectedServer int, expectedClient int) (serverMsgs []map[string]interface{}, clientMsgs []map[string]interface{}) { + msgs := s.getOutputJSONs() + for _, m := range msgs { + if m["span.kind"] == "server" { + serverMsgs = append(serverMsgs, m) + } else if m["span.kind"] == "client" { + clientMsgs = append(clientMsgs, m) + } + } + require.Len(s.T(), serverMsgs, expectedServer, "must match expected number of server log messages") + require.Len(s.T(), clientMsgs, expectedClient, "must match expected number of client log messages") + return serverMsgs, clientMsgs +} + +func (s *kitPayloadSuite) TestPing_LogsBothRequestAndResponse() { + _, err := s.Client.Ping(s.SimpleCtx(), goodPing) + + require.NoError(s.T(), err, "there must be not be an error on a successful call") + serverMsgs, clientMsgs := s.getServerAndClientMessages(2, 2) + for _, m := range append(serverMsgs, clientMsgs...) { + assert.Equal(s.T(), m["grpc.service"], "mwitkow.testproto.TestService", "all lines must contain service name") + assert.Equal(s.T(), m["grpc.method"], "Ping", "all lines must contain method name") + assert.Equal(s.T(), m["level"], "info", "all payloads must be logged on info level") + } + + serverReq, serverResp := serverMsgs[0], serverMsgs[1] + clientReq, clientResp := clientMsgs[0], clientMsgs[1] + s.T().Log(clientReq) + assert.Contains(s.T(), clientReq, "grpc.request.content", "request payload must be logged in a structured way") + assert.Contains(s.T(), serverReq, "grpc.request.content", "request payload must be logged in a structured way") + assert.Contains(s.T(), clientResp, "grpc.response.content", "response payload must be logged in a structured way") + assert.Contains(s.T(), serverResp, "grpc.response.content", "response payload must be logged in a structured way") +} + +func (s *kitPayloadSuite) TestPingError_LogsOnlyRequestsOnError() { + _, err := s.Client.PingError(s.SimpleCtx(), &pb_testproto.PingRequest{Value: "something", ErrorCodeReturned: uint32(4)}) + + require.Error(s.T(), err, "there must be an error on an unsuccessful call") + serverMsgs, clientMsgs := s.getServerAndClientMessages(1, 1) + for _, m := range append(serverMsgs, clientMsgs...) { + assert.Equal(s.T(), m["grpc.service"], "mwitkow.testproto.TestService", "all lines must contain service name") + assert.Equal(s.T(), m["grpc.method"], "PingError", "all lines must contain method name") + assert.Equal(s.T(), m["level"], "info", "must be logged at the info level") + } + + assert.Contains(s.T(), clientMsgs[0], "grpc.request.content", "request payload must be logged in a structured way") + assert.Contains(s.T(), serverMsgs[0], "grpc.request.content", "request payload must be logged in a structured way") +} + +func (s *kitPayloadSuite) TestPingStream_LogsAllRequestsAndResponses() { + messagesExpected := 20 + stream, err := s.Client.PingStream(s.SimpleCtx()) + + require.NoError(s.T(), err, "no error on stream creation") + for i := 0; i < messagesExpected; i++ { + require.NoError(s.T(), stream.Send(goodPing), "sending must succeed") + } + require.NoError(s.T(), stream.CloseSend(), "no error on send stream") + + for { + pong := &pb_testproto.PingResponse{} + err := stream.RecvMsg(pong) + if err == io.EOF { + break + } + require.NoError(s.T(), err, "no error on receive") + } + + serverMsgs, clientMsgs := s.getServerAndClientMessages(2*messagesExpected, 2*messagesExpected) + for _, m := range append(serverMsgs, clientMsgs...) { + assert.Equal(s.T(), m["grpc.service"], "mwitkow.testproto.TestService", "all lines must contain service name") + assert.Equal(s.T(), m["grpc.method"], "PingStream", "all lines must contain method name") + assert.Equal(s.T(), m["level"], "info", "all lines must logged at info level") + + content := m["grpc.request.content"] != nil || m["grpc.response.content"] != nil + assert.True(s.T(), content, "all messages must contain payloads") + } +} diff --git a/logging/kit/server_interceptors.go b/logging/kit/server_interceptors.go new file mode 100644 index 000000000..5d24ac262 --- /dev/null +++ b/logging/kit/server_interceptors.go @@ -0,0 +1,92 @@ +package kit + +import ( + "path" + "time" + + "context" + + "github.com/go-kit/kit/log" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/kit/ctxkit" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" +) + +var ( + // SystemField is used in every log statement made through grpc_zap. Can be overwritten before any initialization code. + SystemField = "grpc" + // ServerField is used in every server-side log statement made through grpc_zap.Can be overwritten before initialization. + ServerField = "server" +) + +// UnaryServerInterceptor returns a new unary server interceptors that adds kit.Logger to the context. +func UnaryServerInterceptor(logger log.Logger, opts ...Option) grpc.UnaryServerInterceptor { + o := evaluateServerOpt(opts) + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + startTime := time.Now() + newCtx := injectLogger(ctx, logger, info.FullMethod, startTime) + + resp, err := handler(newCtx, req) + if !o.shouldLog(info.FullMethod, err) { + return resp, err + } + + code := o.codeFunc(err) + logCall(newCtx, o, "finished unary call with code "+code.String(), code, startTime, err) + + return resp, err + } +} + +// StreamServerInterceptor returns a new stream server interceptors that adds kit.Logger to the context. +func StreamServerInterceptor(logger log.Logger, opts ...Option) grpc.StreamServerInterceptor { + o := evaluateServerOpt(opts) + return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + startTime := time.Now() + newCtx := injectLogger(stream.Context(), logger, info.FullMethod, startTime) + + wrapped := grpc_middleware.WrapServerStream(stream) + wrapped.WrappedContext = newCtx + + err := handler(srv, wrapped) + if !o.shouldLog(info.FullMethod, err) { + return err + } + + code := o.codeFunc(err) + logCall(newCtx, o, "finished streaming call with code "+code.String(), code, startTime, err) + + return err + } +} + +func injectLogger(ctx context.Context, logger log.Logger, fullMethodString string, start time.Time) context.Context { + f := ctxkit.TagsToFields(ctx) + f = append(f, "grpc.start_time", start.Format(time.RFC3339)) + if d, ok := ctx.Deadline(); ok { + f = append(f, "grpc.request.deadline", d.Format(time.RFC3339)) + } + f = append(f, serverCallFields(fullMethodString)...) + callLog := log.With(logger, f...) + return ctxkit.ToContext(ctx, callLog) +} + +func serverCallFields(fullMethodString string) []interface{} { + service := path.Dir(fullMethodString)[1:] + method := path.Base(fullMethodString) + return []interface{}{ + "system", SystemField, + "span.kind", ServerField, + "grpc.service", service, + "grpc.method", method, + } +} + +func logCall(ctx context.Context, options *options, msg string, code codes.Code, startTime time.Time, err error) { + extractedLogger := ctxkit.Extract(ctx) + extractedLogger = options.levelFunc(code, extractedLogger) + args := []interface{}{"msg", msg, "error", err, "grpc.code", code.String()} + args = append(args, options.durationFunc(time.Since(startTime))...) + _ = extractedLogger.Log(args...) +} diff --git a/logging/kit/server_interceptors_test.go b/logging/kit/server_interceptors_test.go new file mode 100644 index 000000000..5032a3bfb --- /dev/null +++ b/logging/kit/server_interceptors_test.go @@ -0,0 +1,319 @@ +package kit_test + +import ( + "io" + "runtime" + "strings" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_kit "github.com/grpc-ecosystem/go-grpc-middleware/logging/kit" + grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" + pb_testproto "github.com/grpc-ecosystem/go-grpc-middleware/testing/testproto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" +) + +func customCodeToLevel(c codes.Code, logger log.Logger) log.Logger { + if c == codes.Unauthenticated { + // Make this a special case for tests, and an error. + return level.Error(logger) + } + return grpc_kit.DefaultCodeToLevel(c, logger) +} + +func TestKitLoggingSuite(t *testing.T) { + if strings.HasPrefix(runtime.Version(), "go1.7") { + t.Skipf("Skipping due to json.RawMessage incompatibility with go1.7") + return + } + opts := []grpc_kit.Option{ + grpc_kit.WithLevels(customCodeToLevel), + } + b := newKitBaseSuite(t) + b.InterceptorTestSuite.ServerOpts = []grpc.ServerOption{ + grpc_middleware.WithStreamServerChain( + grpc_ctxtags.StreamServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)), + grpc_kit.StreamServerInterceptor(b.logger, opts...)), + grpc_middleware.WithUnaryServerChain( + grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)), + grpc_kit.UnaryServerInterceptor(b.logger, opts...)), + } + suite.Run(t, &kitServerSuite{b}) +} + +type kitServerSuite struct { + *kitBaseSuite +} + +func (s *kitServerSuite) TestPing_WithCustomTags() { + deadline := time.Now().Add(3 * time.Second) + _, err := s.Client.Ping(s.DeadlineCtx(deadline), goodPing) + require.NoError(s.T(), err, "there must be not be an error on a successful call") + + msgs := s.getOutputJSONs() + require.Len(s.T(), msgs, 2, "two log statements should be logged") + for _, m := range msgs { + assert.Equal(s.T(), m["grpc.service"], "mwitkow.testproto.TestService", "all lines must contain service name") + assert.Equal(s.T(), m["grpc.method"], "Ping", "all lines must contain method name") + assert.Equal(s.T(), m["span.kind"], "server", "all lines must contain the kind of call (server)") + assert.Equal(s.T(), m["custom_tags.string"], "something", "all lines must contain `custom_tags.string`") + assert.Equal(s.T(), m["grpc.request.value"], "something", "all lines must contain fields extracted") + assert.Equal(s.T(), m["custom_field"], "custom_value", "all lines must contain `custom_field`") + + assert.Contains(s.T(), m, "custom_tags.int", "all lines must contain `custom_tags.int`") + require.Contains(s.T(), m, "grpc.start_time", "all lines must contain the start time") + _, err := time.Parse(time.RFC3339, m["grpc.start_time"].(string)) + assert.NoError(s.T(), err, "should be able to parse start time as RFC3339") + + require.Contains(s.T(), m, "grpc.request.deadline", "all lines must contain the deadline of the call") + _, err = time.Parse(time.RFC3339, m["grpc.request.deadline"].(string)) + require.NoError(s.T(), err, "should be able to parse deadline as RFC3339") + assert.Equal(s.T(), m["grpc.request.deadline"], deadline.Format(time.RFC3339), "should have the same deadline that was set by the caller") + } + + assert.Equal(s.T(), msgs[0]["msg"], "some ping", "handler's message must contain user message") + + assert.Equal(s.T(), msgs[1]["msg"], "finished unary call with code OK", "handler's message must contain user message") + assert.Equal(s.T(), msgs[1]["level"], "info", "must be logged at info level") + assert.Contains(s.T(), msgs[1], "grpc.time_ms", "interceptor log statement should contain execution time") +} + +func (s *kitServerSuite) TestPingError_WithCustomLevels() { + for _, tcase := range []struct { + code codes.Code + level level.Value + msg string + }{ + { + code: codes.Internal, + level: level.ErrorValue(), + msg: "Internal must remap to ErrorLevel in DefaultCodeToLevel", + }, + { + code: codes.NotFound, + level: level.InfoValue(), + msg: "NotFound must remap to InfoLevel in DefaultCodeToLevel", + }, + { + code: codes.FailedPrecondition, + level: level.WarnValue(), + msg: "FailedPrecondition must remap to WarnLevel in DefaultCodeToLevel", + }, + { + code: codes.Unauthenticated, + level: level.ErrorValue(), + msg: "Unauthenticated is overwritten to DPanicLevel with customCodeToLevel override, which probably didn't work", + }, + } { + s.buffer.Reset() + _, err := s.Client.PingError( + s.SimpleCtx(), + &pb_testproto.PingRequest{Value: "something", ErrorCodeReturned: uint32(tcase.code)}) + require.Error(s.T(), err, "each call here must return an error") + + msgs := s.getOutputJSONs() + require.Len(s.T(), msgs, 1, "only the interceptor log message is printed in PingErr") + + m := msgs[0] + assert.Equal(s.T(), m["grpc.service"], "mwitkow.testproto.TestService", "all lines must contain service name") + assert.Equal(s.T(), m["grpc.method"], "PingError", "all lines must contain method name") + assert.Equal(s.T(), m["grpc.code"], tcase.code.String(), "all lines have the correct gRPC code") + assert.Equal(s.T(), m["level"], tcase.level.String(), tcase.msg) + assert.Equal(s.T(), m["msg"], "finished unary call with code "+tcase.code.String(), "needs the correct end message") + + require.Contains(s.T(), m, "grpc.start_time", "all lines must contain the start time") + _, err = time.Parse(time.RFC3339, m["grpc.start_time"].(string)) + assert.NoError(s.T(), err, "should be able to parse start time as RFC3339") + } +} + +func (s *kitServerSuite) TestPingList_WithCustomTags() { + stream, err := s.Client.PingList(s.SimpleCtx(), goodPing) + require.NoError(s.T(), err, "should not fail on establishing the stream") + for { + _, err := stream.Recv() + if err == io.EOF { + break + } + require.NoError(s.T(), err, "reading stream should not fail") + } + msgs := s.getOutputJSONs() + require.Len(s.T(), msgs, 2, "two log statements should be logged") + + for _, m := range msgs { + assert.Equal(s.T(), m["grpc.service"], "mwitkow.testproto.TestService", "all lines must contain service name") + assert.Equal(s.T(), m["grpc.method"], "PingList", "all lines must contain method name") + assert.Equal(s.T(), m["span.kind"], "server", "all lines must contain the kind of call (server)") + assert.Equal(s.T(), m["custom_tags.string"], "something", "all lines must contain `custom_tags.string` set by AddFields") + assert.Equal(s.T(), m["grpc.request.value"], "something", "all lines must contain fields extracted from goodPing because of test.manual_extractfields.pb") + + assert.Contains(s.T(), m, "custom_tags.int", "all lines must contain `custom_tags.int` set by AddFields") + require.Contains(s.T(), m, "grpc.start_time", "all lines must contain the start time") + _, err := time.Parse(time.RFC3339, m["grpc.start_time"].(string)) + assert.NoError(s.T(), err, "should be able to parse start time as RFC3339") + } + + assert.Equal(s.T(), msgs[0]["msg"], "some pinglist", "handler's message must contain user message") + + assert.Equal(s.T(), msgs[1]["msg"], "finished streaming call with code OK", "handler's message must contain user message") + assert.Equal(s.T(), msgs[1]["level"], "info", "OK codes must be logged on info level.") + assert.Contains(s.T(), msgs[1], "grpc.time_ms", "interceptor log statement should contain execution time") +} + +func TestKitLoggingOverrideSuite(t *testing.T) { + if strings.HasPrefix(runtime.Version(), "go1.7") { + t.Skip("Skipping due to json.RawMessage incompatibility with go1.7") + return + } + opts := []grpc_kit.Option{ + grpc_kit.WithDurationField(grpc_kit.DurationToDurationField), + } + b := newKitBaseSuite(t) + b.InterceptorTestSuite.ServerOpts = []grpc.ServerOption{ + grpc_middleware.WithStreamServerChain( + grpc_ctxtags.StreamServerInterceptor(), + grpc_kit.StreamServerInterceptor(b.logger, opts...)), + grpc_middleware.WithUnaryServerChain( + grpc_ctxtags.UnaryServerInterceptor(), + grpc_kit.UnaryServerInterceptor(b.logger, opts...)), + } + suite.Run(t, &kitServerOverrideSuite{b}) +} + +type kitServerOverrideSuite struct { + *kitBaseSuite +} + +func (s *kitServerOverrideSuite) TestPing_HasOverriddenDuration() { + _, err := s.Client.Ping(s.SimpleCtx(), goodPing) + require.NoError(s.T(), err, "there must be not be an error on a successful call") + msgs := s.getOutputJSONs() + require.Len(s.T(), msgs, 2, "two log statements should be logged") + + for _, m := range msgs { + assert.Equal(s.T(), m["grpc.service"], "mwitkow.testproto.TestService", "all lines must contain service name") + assert.Equal(s.T(), m["grpc.method"], "Ping", "all lines must contain method name") + } + assert.Equal(s.T(), msgs[0]["msg"], "some ping", "handler's message must contain user message") + assert.NotContains(s.T(), msgs[0], "grpc.time_ms", "handler's message must not contain default duration") + assert.NotContains(s.T(), msgs[0], "grpc.duration", "handler's message must not contain overridden duration") + + assert.Equal(s.T(), msgs[1]["msg"], "finished unary call with code OK", "handler's message must contain user message") + assert.Equal(s.T(), msgs[1]["level"], "info", "OK error codes must be logged on info level.") + assert.NotContains(s.T(), msgs[1], "grpc.time_ms", "handler's message must not contain default duration") + assert.Contains(s.T(), msgs[1], "grpc.duration", "handler's message must contain overridden duration") +} + +func (s *kitServerOverrideSuite) TestPingList_HasOverriddenDuration() { + stream, err := s.Client.PingList(s.SimpleCtx(), goodPing) + require.NoError(s.T(), err, "should not fail on establishing the stream") + for { + _, err := stream.Recv() + if err == io.EOF { + break + } + require.NoError(s.T(), err, "reading stream should not fail") + } + msgs := s.getOutputJSONs() + require.Len(s.T(), msgs, 2, "two log statements should be logged") + for _, m := range msgs { + s.T() + assert.Equal(s.T(), m["grpc.service"], "mwitkow.testproto.TestService", "all lines must contain service name") + assert.Equal(s.T(), m["grpc.method"], "PingList", "all lines must contain method name") + } + + assert.Equal(s.T(), msgs[0]["msg"], "some pinglist", "handler's message must contain user message") + assert.NotContains(s.T(), msgs[0], "grpc.time_ms", "handler's message must not contain default duration") + assert.NotContains(s.T(), msgs[0], "grpc.duration", "handler's message must not contain overridden duration") + + assert.Equal(s.T(), msgs[1]["msg"], "finished streaming call with code OK", "handler's message must contain user message") + assert.Equal(s.T(), msgs[1]["level"], "info", "OK error codes must be logged on info level.") + assert.NotContains(s.T(), msgs[1], "grpc.time_ms", "handler's message must not contain default duration") + assert.Contains(s.T(), msgs[1], "grpc.duration", "handler's message must contain overridden duration") +} + +func TestKitServerOverrideSuppressedSuite(t *testing.T) { + if strings.HasPrefix(runtime.Version(), "go1.7") { + t.Skip("Skipping due to json.RawMessage incompatibility with go1.7") + return + } + opts := []grpc_kit.Option{ + grpc_kit.WithDecider(func(method string, err error) bool { + if err != nil && method == "/mwitkow.testproto.TestService/PingError" { + return true + } + return false + }), + } + b := newKitBaseSuite(t) + b.InterceptorTestSuite.ServerOpts = []grpc.ServerOption{ + grpc_middleware.WithStreamServerChain( + grpc_ctxtags.StreamServerInterceptor(), + grpc_kit.StreamServerInterceptor(b.logger, opts...)), + grpc_middleware.WithUnaryServerChain( + grpc_ctxtags.UnaryServerInterceptor(), + grpc_kit.UnaryServerInterceptor(b.logger, opts...)), + } + suite.Run(t, &kitServerOverridenDeciderSuite{b}) +} + +type kitServerOverridenDeciderSuite struct { + *kitBaseSuite +} + +func (s *kitServerOverridenDeciderSuite) TestPing_HasOverriddenDecider() { + _, err := s.Client.Ping(s.SimpleCtx(), goodPing) + require.NoError(s.T(), err, "there must be not be an error on a successful call") + msgs := s.getOutputJSONs() + require.Len(s.T(), msgs, 1, "single log statements should be logged") + + assert.Equal(s.T(), msgs[0]["grpc.service"], "mwitkow.testproto.TestService", "all lines must contain service name") + assert.Equal(s.T(), msgs[0]["grpc.method"], "Ping", "all lines must contain method name") + assert.Equal(s.T(), msgs[0]["msg"], "some ping", "handler's message must contain user message") +} + +func (s *kitServerOverridenDeciderSuite) TestPingError_HasOverriddenDecider() { + code := codes.NotFound + msg := "NotFound must remap to InfoLevel in DefaultCodeToLevel" + + s.buffer.Reset() + _, err := s.Client.PingError( + s.SimpleCtx(), + &pb_testproto.PingRequest{Value: "something", ErrorCodeReturned: uint32(code)}) + require.Error(s.T(), err, "each call here must return an error") + msgs := s.getOutputJSONs() + require.Len(s.T(), msgs, 1, "only the interceptor log message is printed in PingErr") + m := msgs[0] + assert.Equal(s.T(), m["grpc.service"], "mwitkow.testproto.TestService", "all lines must contain service name") + assert.Equal(s.T(), m["grpc.method"], "PingError", "all lines must contain method name") + assert.Equal(s.T(), m["grpc.code"], code.String(), "all lines must contain the correct gRPC code") + assert.Equal(s.T(), m["level"], "info", msg) +} + +func (s *kitServerOverridenDeciderSuite) TestPingList_HasOverriddenDecider() { + stream, err := s.Client.PingList(s.SimpleCtx(), goodPing) + require.NoError(s.T(), err, "should not fail on establishing the stream") + for { + _, err := stream.Recv() + if err == io.EOF { + break + } + require.NoError(s.T(), err, "reading stream should not fail") + } + msgs := s.getOutputJSONs() + require.Len(s.T(), msgs, 1, "single log statements should be logged") + + assert.Equal(s.T(), msgs[0]["grpc.service"], "mwitkow.testproto.TestService", "all lines must contain service name") + assert.Equal(s.T(), msgs[0]["grpc.method"], "PingList", "all lines must contain method name") + assert.Equal(s.T(), msgs[0]["msg"], "some pinglist", "handler's message must contain user message") + + assert.NotContains(s.T(), msgs[0], "grpc.time_ms", "handler's message must not contain default duration") + assert.NotContains(s.T(), msgs[0], "grpc.duration", "handler's message must not contain overridden duration") +} diff --git a/logging/kit/shared_test.go b/logging/kit/shared_test.go new file mode 100644 index 000000000..7dacf30b0 --- /dev/null +++ b/logging/kit/shared_test.go @@ -0,0 +1,95 @@ +package kit_test + +import ( + "bytes" + "encoding/json" + "io" + "testing" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/kit/ctxkit" + + "context" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" + grpc_testing "github.com/grpc-ecosystem/go-grpc-middleware/testing" + pb_testproto "github.com/grpc-ecosystem/go-grpc-middleware/testing/testproto" +) + +var ( + goodPing = &pb_testproto.PingRequest{Value: "something", SleepTimeMs: 9999} +) + +type loggingPingService struct { + pb_testproto.TestServiceServer +} + +func (s *loggingPingService) Ping(ctx context.Context, ping *pb_testproto.PingRequest) (*pb_testproto.PingResponse, error) { + grpc_ctxtags.Extract(ctx).Set("custom_tags.string", "something").Set("custom_tags.int", 1337) + ctxkit.AddFields(ctx, []interface{}{"custom_field", "custom_value"}...) + level.Info(ctxkit.Extract(ctx)).Log("msg", "some ping") + return s.TestServiceServer.Ping(ctx, ping) +} + +func (s *loggingPingService) PingError(ctx context.Context, ping *pb_testproto.PingRequest) (*pb_testproto.Empty, error) { + return s.TestServiceServer.PingError(ctx, ping) +} + +func (s *loggingPingService) PingList(ping *pb_testproto.PingRequest, stream pb_testproto.TestService_PingListServer) error { + grpc_ctxtags.Extract(stream.Context()).Set("custom_tags.string", "something").Set("custom_tags.int", 1337) + ctxkit.AddFields(stream.Context(), []interface{}{"custom_field", "custom_value"}...) + level.Info(ctxkit.Extract(stream.Context())).Log("msg", "some pinglist") + return s.TestServiceServer.PingList(ping, stream) +} + +func (s *loggingPingService) PingEmpty(ctx context.Context, empty *pb_testproto.Empty) (*pb_testproto.PingResponse, error) { + return s.TestServiceServer.PingEmpty(ctx, empty) +} + +type kitBaseSuite struct { + *grpc_testing.InterceptorTestSuite + mutexBuffer *grpc_testing.MutexReadWriter + buffer *bytes.Buffer + logger log.Logger +} + +func newKitBaseSuite(t *testing.T) *kitBaseSuite { + b := &bytes.Buffer{} + muB := grpc_testing.NewMutexReadWriter(b) + logger := log.NewJSONLogger(log.NewSyncWriter(muB)) + return &kitBaseSuite{ + logger: logger, + buffer: b, + mutexBuffer: muB, + InterceptorTestSuite: &grpc_testing.InterceptorTestSuite{ + TestService: &loggingPingService{&grpc_testing.TestPingService{T: t}}, + }, + } +} + +func (s *kitBaseSuite) SetupTest() { + s.mutexBuffer.Lock() + s.buffer.Reset() + s.mutexBuffer.Unlock() +} + +func (s *kitBaseSuite) getOutputJSONs() []map[string]interface{} { + ret := make([]map[string]interface{}, 0) + dec := json.NewDecoder(s.mutexBuffer) + + for { + var val map[string]interface{} + err := dec.Decode(&val) + if err == io.EOF { + break + } + if err != nil { + s.T().Fatalf("failed decoding output from go-kit JSON: %v", err) + } + + ret = append(ret, val) + } + + return ret +}