/
request_audit.go
132 lines (117 loc) · 3.29 KB
/
request_audit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package interceptors
import (
"context"
logrus "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"net"
"strings"
"time"
)
var healthCheckMethodName = "/grpc.health.v1.Health/Check"
// SetHealthCheckMethodName changes the default health check method name
func SetHealthCheckMethodName(methodName string) {
healthCheckMethodName = methodName
}
// Logging request information for Unary requests
func UnaryAuditServiceRequest() grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (_ interface{}, err error) {
// Extract all needed info for audit from the RPC call
peer, ok := peer.FromContext(ctx)
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "missing peer info")
}
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "missing metadata")
}
start := time.Now()
resp, err := handler(ctx, req)
logRequest(
start,
info.FullMethod,
md["user-agent"],
peer.Addr,
info.FullMethod,
err,
)
return resp, err // passing up the chain the response and the err
}
}
// Logging request information for Stream requests
func StreamAuditServiceRequest() grpc.StreamServerInterceptor {
return func(
srv interface{},
stream grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler) (err error) {
peer, ok := peer.FromContext(stream.Context())
if !ok {
return status.Errorf(codes.InvalidArgument, "missing peer info")
}
md, ok := metadata.FromIncomingContext(stream.Context())
if !ok {
return status.Errorf(codes.InvalidArgument, "missing metadata")
}
start := time.Now()
err = handler(srv, stream)
logRequest(
start,
info.FullMethod,
md["user-agent"],
peer.Addr,
info.FullMethod,
err,
)
return err
}
}
func logRequest(start time.Time, requestMethod string, userAgents []string, ip net.Addr, fullMethod string, err error) {
if isHealthCheckRequest(requestMethod) {
return
}
sts := status.Convert(err)
auditEntry := logrus.Fields{
"user-agent": userAgents,
"peer": ip,
"took_ns": time.Since(start),
"status": sts.Code().String(),
"err": sts.Message(),
"err-details": sts.Details(),
}
log := logrus.WithFields(auditEntry)
switch sts.Code() {
case codes.OK:
log.Info("gRPC call succeeded")
// Caused by invalid client requests (http 4xx equiv.)
case codes.Canceled,
codes.InvalidArgument,
codes.NotFound,
codes.AlreadyExists,
codes.PermissionDenied,
codes.FailedPrecondition,
codes.Aborted,
codes.OutOfRange,
codes.Unimplemented, // usually caused by client requesting invalid operation (even though it matches 501)
codes.Unauthenticated:
log.Warn("gRPC call failed")
// Server errors (http 5xx equiv.):
// Unknown, DeadlineExceeded, ResourceExhausted, Internal, Unavailable, DataLoss
// (ResourceExhausted is somewhere in between, from user quota exhausted to OOM, rather have it on error for now)
default:
log.Error("gRPC call errored")
}
}
func isHealthCheckRequest(requestMethod string) bool {
if strings.Contains(requestMethod, "Health/Check") {
return true
}
return false
}