-
Notifications
You must be signed in to change notification settings - Fork 0
/
server_interceptors.go
132 lines (119 loc) · 3.5 KB
/
server_interceptors.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright (c) Improbable Worlds Ltd, All Rights Reserved
package grpc_glog
import (
"bytes"
"context"
"fmt"
"github.com/donetkit/contrib-gin/grpc_middleware"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"path"
"strings"
"time"
"encoding/json"
"google.golang.org/grpc"
)
// UnaryServerInterceptor returns a new unary server interceptors that adds logrus.Entry to the context.
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
o := evaluateServerOpt(opts)
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
log := &LogParams{
TimeStamp: time.Now(),
IP: LogRequestIP(ctx),
}
LogRequest(log, req, info.FullMethod)
resp, err := handler(ctx, req)
if err != nil {
LogStatusError(log, err)
} else {
LogStatusError(log, err)
LogResponse(log, resp)
}
//code := o.codeFunc(err)
//log.StatusCode = code.String()
log.Latency = time.Since(log.TimeStamp)
logStr, _ := json.Marshal(log)
o.logger.Debug(string(logStr))
return resp, err
}
}
// StreamServerInterceptor returns a new streaming server interceptor that adds logrus.Entry to the context.
func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
o := evaluateServerOpt(opts)
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
ctx := stream.Context()
log := &LogParams{
TimeStamp: time.Now(),
IP: LogRequestIP(ctx),
}
wrapped := grpc_middleware.WrapServerStream(stream)
wrapped.WrappedContext = ctx
err := handler(srv, wrapped)
if err != nil {
LogStatusError(log, err)
} else {
LogStatusError(log, err)
//LogResponse(log, resp)
}
log.Latency = time.Since(log.TimeStamp)
logStr, _ := json.Marshal(log)
o.logger.Debug(string(logStr))
return err
}
}
func LogRequestIP(ctx context.Context) string {
if p, ok := peer.FromContext(ctx); ok {
return p.Addr.String()
}
return ""
}
func LogRequest(log *LogParams, req interface{}, fullMethodString string) {
log.Service = path.Dir(fullMethodString)[1:]
log.Method = path.Base(fullMethodString)
if b := GetRawJSON(req); b != nil {
if b.Len() <= 1024*1024 {
log.RequestData = string(b.Bytes())
} else {
log.RequestData = fmt.Sprintf("request data is too large, limit size: %d", 1024*1024)
}
}
}
func LogResponse(log *LogParams, resp interface{}) {
if b := GetRawJSON(resp); b != nil {
if b.Len() <= 1024*1024*2 {
log.ResponseData = string(b.Bytes())
} else {
log.ResponseData = fmt.Sprintf("response data is too large, limit size: %d", 1024*1024*2)
}
}
}
func LogMetadata(log *LogParams, md *metadata.MD) []string {
var dict []string
for i := range *md {
dict = append(dict, i, strings.Join(md.Get(i), ","))
}
return dict
}
func LogUserAgent(log *LogParams, md *metadata.MD) {
if ua := strings.Join(md.Get("user-agent"), ""); ua != "" {
log.RequestUserAgent = ua
}
}
func LogStatusError(log *LogParams, err error) {
statusErr := status.Convert(err)
//statusErr.Details()
log.StatusCode = statusErr.Code().String()
log.ErrorMessage = statusErr.Message()
}
// GetRawJSON converts a Protobuf message to JSON bytes if less than MaxSize.
func GetRawJSON(i interface{}) *bytes.Buffer {
if pb, ok := i.(proto.Message); ok {
b := &bytes.Buffer{}
if err := Marshaller.Marshal(b, pb); err == nil && b.Len() < 2048000 {
return b
}
}
return nil
}