From 0e30525c6d5fe1fbb9091a22809b6f617e67b351 Mon Sep 17 00:00:00 2001 From: Emad Mohamadi Date: Mon, 1 Aug 2022 16:52:56 +0200 Subject: [PATCH 1/5] Add time bucket grpc interceptor --- carbonserver/carbonserver.go | 36 ++++++++++++++++++++++++++++-------- helper/grpcutil/grpcutil.go | 31 +++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 8 deletions(-) create mode 100644 helper/grpcutil/grpcutil.go diff --git a/carbonserver/carbonserver.go b/carbonserver/carbonserver.go index 4b3c046c4..009bb00a8 100644 --- a/carbonserver/carbonserver.go +++ b/carbonserver/carbonserver.go @@ -22,6 +22,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/go-graphite/go-carbon/helper/grpcutil" "io" "math" "net" @@ -1755,7 +1756,7 @@ func (listener *CarbonserverListener) Listen(listen string) error { handlerStatusCodes, listener.prometheus.request, ), - listener.bucketRequestTimes, + listener.bucketRequestTimesHTTP, ), ) } @@ -1888,7 +1889,27 @@ func (listener *CarbonserverListener) Listen(listen string) error { return nil } -func (listener *CarbonserverListener) bucketRequestTimes(req *http.Request, t time.Duration) { +func (listener *CarbonserverListener) bucketRequestTimesHTTP(req *http.Request, t time.Duration) { + bucket := listener.bucketRequestTimes(t) + if bucket >= listener.buckets { + listener.logger.Info("slow request", + zap.String("url", req.URL.RequestURI()), + zap.String("peer", req.RemoteAddr), + ) + } +} + +func (listener *CarbonserverListener) bucketRequestTimesGRPC(payload, peer string, t time.Duration) { + bucket := listener.bucketRequestTimes(t) + if bucket >= listener.buckets { + listener.logger.Info("slow request", + zap.String("payload", payload), + zap.String("peer", peer), + ) + } +} + +func (listener *CarbonserverListener) bucketRequestTimes(t time.Duration) int { listener.prometheus.duration(t) ms := t.Nanoseconds() / int64(time.Millisecond) @@ -1908,13 +1929,10 @@ func (listener *CarbonserverListener) bucketRequestTimes(req *http.Request, t ti if bucket < listener.buckets { atomic.AddUint64(&listener.timeBuckets[bucket], 1) } else { - // Too big? Increment overflow bucket and log + // Too big? Increment overflow bucket atomic.AddUint64(&listener.timeBuckets[listener.buckets], 1) - listener.logger.Info("slow request", - zap.String("url", req.URL.RequestURI()), - zap.String("peer", req.RemoteAddr), - ) } + return bucket } func extractTrigrams(query string) []trigram.T { @@ -1996,7 +2014,9 @@ func (listener *CarbonserverListener) ListenGRPC(listen string) error { } var opts []grpc.ServerOption - + opts = append(opts, grpc.ChainUnaryInterceptor( + grpcutil.StreamServerTimeHandler(listener.bucketRequestTimesGRPC), + )) grpcServer := grpc.NewServer(opts...) //skipcq: GO-S0902 grpcv2.RegisterCarbonV2Server(grpcServer, listener) go grpcServer.Serve(listener.grpcListener) diff --git a/helper/grpcutil/grpcutil.go b/helper/grpcutil/grpcutil.go new file mode 100644 index 000000000..769bd74e4 --- /dev/null +++ b/helper/grpcutil/grpcutil.go @@ -0,0 +1,31 @@ +package grpcutil + +import ( + "context" + "fmt" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/peer" +) + +func StreamServerTimeHandler(cb func(payload, peer string, t time.Duration)) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + t0 := time.Now() + defer func() { + t := time.Since(t0) + var payload string + if reqStringer, ok := req.(fmt.Stringer); ok { + payload = reqStringer.String() + } + var reqPeer string + p, ok := peer.FromContext(ctx) + if ok { + reqPeer = p.Addr.String() + } + fmt.Println("INTERCEPTED!") + cb(payload, reqPeer, t) + }() + return handler(ctx, req) + } +} From e6379501d15e7d3452d0030578bd15e01cc5a041 Mon Sep 17 00:00:00 2001 From: Emad Mohamadi Date: Mon, 1 Aug 2022 17:13:39 +0200 Subject: [PATCH 2/5] Add grpc stream server interceptor --- carbonserver/carbonserver.go | 2 +- helper/grpcutil/grpcutil.go | 51 +++++++++++++++++++++++++++++++----- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/carbonserver/carbonserver.go b/carbonserver/carbonserver.go index 009bb00a8..d64a7c55a 100644 --- a/carbonserver/carbonserver.go +++ b/carbonserver/carbonserver.go @@ -2014,7 +2014,7 @@ func (listener *CarbonserverListener) ListenGRPC(listen string) error { } var opts []grpc.ServerOption - opts = append(opts, grpc.ChainUnaryInterceptor( + opts = append(opts, grpc.ChainStreamInterceptor( grpcutil.StreamServerTimeHandler(listener.bucketRequestTimesGRPC), )) grpcServer := grpc.NewServer(opts...) //skipcq: GO-S0902 diff --git a/helper/grpcutil/grpcutil.go b/helper/grpcutil/grpcutil.go index 769bd74e4..1273f8d1e 100644 --- a/helper/grpcutil/grpcutil.go +++ b/helper/grpcutil/grpcutil.go @@ -2,30 +2,67 @@ package grpcutil import ( "context" - "fmt" + "github.com/golang/protobuf/proto" "time" - + "google.golang.org/grpc" "google.golang.org/grpc/peer" ) -func StreamServerTimeHandler(cb func(payload, peer string, t time.Duration)) grpc.UnaryServerInterceptor { +func UnaryServerTimeHandler(cb func(payload, peer string, t time.Duration)) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { t0 := time.Now() defer func() { t := time.Since(t0) var payload string - if reqStringer, ok := req.(fmt.Stringer); ok { + if reqStringer, ok := req.(proto.Message); ok { payload = reqStringer.String() } var reqPeer string - p, ok := peer.FromContext(ctx) - if ok { + if p, ok := peer.FromContext(ctx); ok { reqPeer = p.Addr.String() } - fmt.Println("INTERCEPTED!") cb(payload, reqPeer, t) }() return handler(ctx, req) } } + +func StreamServerTimeHandler(cb func(payload, peer string, t time.Duration)) grpc.StreamServerInterceptor { + return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + t0 := time.Now() + wss := &wrappedStream{ + ServerStream: ss, + } + defer func() { + t := time.Since(t0) + var reqPeer string + if p, ok := peer.FromContext(wss.Context()); ok { + reqPeer = p.Addr.String() + } + cb(wss.payload, reqPeer, t) + }() + return handler(srv, wss) + } +} + +type wrappedStream struct { + grpc.ServerStream + payload string + gotFirst bool +} + +func (ws *wrappedStream) RecvMsg(m interface{}) error { + err := ws.ServerStream.RecvMsg(m) + if err != nil { + return err + } + if ws.gotFirst { + return nil + } + if p, ok := m.(proto.Message); ok { + ws.payload = p.String() + } + ws.gotFirst = true + return nil +} From b608c44178123a197418c99ccb873eb3c4ed938b Mon Sep 17 00:00:00 2001 From: Emad Mohamadi Date: Mon, 1 Aug 2022 19:00:48 +0200 Subject: [PATCH 3/5] Add status metric gRPC interceptor --- carbonserver/carbonserver.go | 1 + helper/grpcutil/grpcutil.go | 46 ++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/carbonserver/carbonserver.go b/carbonserver/carbonserver.go index d64a7c55a..d4fcd3a82 100644 --- a/carbonserver/carbonserver.go +++ b/carbonserver/carbonserver.go @@ -2016,6 +2016,7 @@ func (listener *CarbonserverListener) ListenGRPC(listen string) error { var opts []grpc.ServerOption opts = append(opts, grpc.ChainStreamInterceptor( grpcutil.StreamServerTimeHandler(listener.bucketRequestTimesGRPC), + grpcutil.StreamServerStatusMetricHandler(statusCodes, listener.prometheus.request), )) grpcServer := grpc.NewServer(opts...) //skipcq: GO-S0902 grpcv2.RegisterCarbonV2Server(grpcServer, listener) diff --git a/helper/grpcutil/grpcutil.go b/helper/grpcutil/grpcutil.go index 1273f8d1e..3546b9491 100644 --- a/helper/grpcutil/grpcutil.go +++ b/helper/grpcutil/grpcutil.go @@ -3,6 +3,11 @@ package grpcutil import ( "context" "github.com/golang/protobuf/proto" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "path" + "strings" + "sync/atomic" "time" "google.golang.org/grpc" @@ -66,3 +71,44 @@ func (ws *wrappedStream) RecvMsg(m interface{}) error { ws.gotFirst = true return nil } + +func StreamServerStatusMetricHandler(statusCodes map[string][]uint64, promRequest func(string, int)) grpc.StreamServerInterceptor { + return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + err := handler(srv, ss) + _, methodName := path.Split(info.FullMethod) + endpoint := strings.ToLower(methodName) + var major int + if err != nil { + s, _ := status.FromError(err) + major = getHTTPStatusCodeMajorFromGrpcStatusCode(s.Code()) + } else { + major = getHTTPStatusCodeMajorFromGrpcStatusCode(codes.OK) + } + if globalStatusCodes := statusCodes["combined"]; major < len(globalStatusCodes) { + atomic.AddUint64(&globalStatusCodes[major], 1) + if handlerStatusCodes, ok := statusCodes[endpoint]; ok { + atomic.AddUint64(&handlerStatusCodes[major], 1) + } + } + promRequest(endpoint, major) + return err + } +} + +func getHTTPStatusCodeMajorFromGrpcStatusCode(code codes.Code) int { + switch code { + case codes.OK: + return 1 + case codes.Canceled, + codes.InvalidArgument, + codes.NotFound, + codes.AlreadyExists, + codes.PermissionDenied, + codes.ResourceExhausted, + codes.FailedPrecondition, + codes.Unauthenticated: + return 3 + default: + return 4 + } +} From 5887d8c3bacbf83c1e66bedc601c4be1ab789531 Mon Sep 17 00:00:00 2001 From: Emad Mohamadi Date: Wed, 3 Aug 2022 14:08:06 +0200 Subject: [PATCH 4/5] Add ratelimit stream interceptor --- README.md | 7 ++ carbonserver/carbonserver.go | 194 ++++++++++++++++++++++++++++------- carbonserver/render.go | 22 +--- go-carbon.conf.example | 7 ++ helper/grpcutil/grpcutil.go | 39 +++++-- 5 files changed, 201 insertions(+), 68 deletions(-) diff --git a/README.md b/README.md index 6a8800d36..8687d7b37 100644 --- a/README.md +++ b/README.md @@ -527,6 +527,13 @@ stats-percentiles = [99, 98, 95, 75, 50] # [[carbonserver.api-per-path-rate-limiters]] # path = "/metrics/list_query/" # max-inflight-requests = 3 +# +# For gRPC rpcs, path should be full method name. +# +# [[carbonserver.api-per-path-rate-limiters]] +# path = "/carbonapi_v2_grpc.CarbonV2/Render" +# max-inflight-requests = 10 +# request-timeout = "40s" # carbonserver.grpc is the configuration for listening for grpc clients. # Note: currently, only CarbonV2 Render rpc is implemented. diff --git a/carbonserver/carbonserver.go b/carbonserver/carbonserver.go index d4fcd3a82..e508d7c9a 100644 --- a/carbonserver/carbonserver.go +++ b/carbonserver/carbonserver.go @@ -22,7 +22,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/go-graphite/go-carbon/helper/grpcutil" "io" "math" "net" @@ -43,12 +42,16 @@ import ( prom "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/status" "github.com/NYTimes/gziphandler" "github.com/dgryski/go-expirecache" "github.com/dgryski/go-trigram" "github.com/dgryski/httputil" "github.com/go-graphite/go-carbon/helper" + "github.com/go-graphite/go-carbon/helper/grpcutil" "github.com/go-graphite/go-carbon/helper/stat" "github.com/go-graphite/go-carbon/points" grpcv2 "github.com/go-graphite/protocol/carbonapi_v2_grpc" @@ -1248,19 +1251,10 @@ func (listener *CarbonserverListener) expandGlobs(ctx context.Context, query str }() // why: no need to continue execution if the request is already timeout. - select { - case <-ctx.Done(): - switch ctx.Err() { - case context.DeadlineExceeded: - listener.prometheus.timeoutRequest() - case context.Canceled: - listener.prometheus.cancelledRequest() - } - + if listener.checkRequestCtx(ctx) != nil { err := fmt.Errorf("time out due to heavy glob query rate limiter: %s", rl.pattern.String()) resultCh <- &ExpandedGlobResponse{query, nil, nil, err} return - default: } break @@ -1635,16 +1629,46 @@ func (listener *CarbonserverListener) initStatsDB() error { return nil } +func (listener *CarbonserverListener) getPathRateLimiter(path string) *ApiPerPathRatelimiter { + rl, ok := listener.apiPerPathRatelimiter[path] + if ok { + return rl + } + return nil +} + +func (listener *CarbonserverListener) checkRequestCtx(ctx context.Context) error { + select { + case <-ctx.Done(): + switch ctx.Err() { + case context.DeadlineExceeded: + listener.prometheus.timeoutRequest() + case context.Canceled: + listener.prometheus.cancelledRequest() + } + return errors.New("context is done") + default: + } + return nil +} + +func (listener *CarbonserverListener) shouldBlockForIndex() bool { + return listener.NoServiceWhenIndexIsNotReady && listener.CurrentFileIndex() == nil +} + func (listener *CarbonserverListener) rateLimitRequest(h http.HandlerFunc) http.HandlerFunc { return func(wr http.ResponseWriter, req *http.Request) { + ratelimiter := listener.getPathRateLimiter(req.URL.Path) // Can't use http.TimeoutHandler here due to supporting per-path timeout - if ratelimiter, ok := listener.apiPerPathRatelimiter[req.URL.Path]; listener.requestTimeout > 0 || (ok && ratelimiter.timeout > 0) { - timeout := listener.requestTimeout - if ok && ratelimiter.timeout > 0 { - timeout = ratelimiter.timeout - } - - ctx, cancel := context.WithTimeout(req.Context(), timeout) + var newTimeout time.Duration + if listener.requestTimeout > 0 { + newTimeout = listener.requestTimeout + } + if ratelimiter != nil && ratelimiter.timeout > 0 { + newTimeout = ratelimiter.timeout + } + if newTimeout > 0 { + ctx, cancel := context.WithTimeout(req.Context(), newTimeout) defer cancel() req = req.WithContext(ctx) } @@ -1657,7 +1681,7 @@ func (listener *CarbonserverListener) rateLimitRequest(h http.HandlerFunc) http. zap.String("peer", req.RemoteAddr), )) - if listener.NoServiceWhenIndexIsNotReady && listener.CurrentFileIndex() == nil { + if listener.shouldBlockForIndex() { accessLogger.Error("request denied", zap.Duration("runtime_seconds", time.Since(t0)), zap.String("reason", "index not ready"), @@ -1667,31 +1691,16 @@ func (listener *CarbonserverListener) rateLimitRequest(h http.HandlerFunc) http. return } - if ratelimiter, ok := listener.apiPerPathRatelimiter[req.URL.Path]; ok { - if cap(ratelimiter.maxInflightRequests) == 0 { + if ratelimiter != nil { + if ratelimiter.Enter() != nil { http.Error(wr, "Bad request (blocked by api per path rate limiter)", http.StatusBadRequest) return } - - ratelimiter.maxInflightRequests <- struct{}{} - defer func() { - select { - case <-ratelimiter.maxInflightRequests: - default: - } - }() + defer ratelimiter.Exit() // why: if the request is already timeout, there is no // need to resume execution. - select { - case <-ctx.Done(): - switch ctx.Err() { - case context.DeadlineExceeded: - listener.prometheus.timeoutRequest() - case context.Canceled: - listener.prometheus.cancelledRequest() - } - + if listener.checkRequestCtx(ctx) != nil { accessLogger.Error("request timeout due to per url rate limiting", zap.Duration("runtime_seconds", time.Since(t0)), zap.String("reason", "timeout due to per url rate limiting"), @@ -1699,7 +1708,6 @@ func (listener *CarbonserverListener) rateLimitRequest(h http.HandlerFunc) http. ) http.Error(wr, "Bad request (timeout due to maxInflightRequests)", http.StatusRequestTimeout) return - default: } } @@ -2000,6 +2008,32 @@ func NewApiPerPathRatelimiter(maxInflightRequests uint, timeout time.Duration) * } } +type RatelimiterError string + +func (re RatelimiterError) Error() string { + return string(re) +} + +var ( + BlockedRatelimitError = RatelimiterError("blocked by api per path rate limiter") +) + +func (a *ApiPerPathRatelimiter) Enter() error { + if cap(a.maxInflightRequests) == 0 { + return BlockedRatelimitError + } + + a.maxInflightRequests <- struct{}{} + return nil +} + +func (a *ApiPerPathRatelimiter) Exit() { + select { + case <-a.maxInflightRequests: + default: + } +} + func (listener *CarbonserverListener) ListenGRPC(listen string) error { var err error var grpcAddr *net.TCPAddr @@ -2017,9 +2051,91 @@ func (listener *CarbonserverListener) ListenGRPC(listen string) error { opts = append(opts, grpc.ChainStreamInterceptor( grpcutil.StreamServerTimeHandler(listener.bucketRequestTimesGRPC), grpcutil.StreamServerStatusMetricHandler(statusCodes, listener.prometheus.request), + listener.StreamServerRatelimitHandler(), )) grpcServer := grpc.NewServer(opts...) //skipcq: GO-S0902 grpcv2.RegisterCarbonV2Server(grpcServer, listener) go grpcServer.Serve(listener.grpcListener) return nil } + +func (listener *CarbonserverListener) StreamServerRatelimitHandler() grpc.StreamServerInterceptor { + return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + t0 := time.Now() + fullMethodName := info.FullMethod + wss := grpcutil.GetWrappedStream(ss) + ratelimiter := listener.getPathRateLimiter(fullMethodName) // Can't use http.TimeoutHandler here due to supporting per-path timeout + var newTimeout time.Duration + if listener.requestTimeout > 0 { + newTimeout = listener.requestTimeout + } + if ratelimiter != nil && ratelimiter.timeout > 0 { + newTimeout = ratelimiter.timeout + } + if newTimeout > 0 { + ctx, cancel := context.WithTimeout(wss.Context(), newTimeout) + defer cancel() + wss.SetContext(ctx) + } + + ctx := wss.Context() + var reqPeer string + if p, ok := peer.FromContext(ctx); ok { + reqPeer = p.Addr.String() + } + accessLogger := TraceContextToZap(ctx, listener.accessLogger.With( + zap.String("handler", "rate_limit"), + zap.String("payload", wss.Payload()), + zap.String("peer", reqPeer), + )) + + if listener.shouldBlockForIndex() { + accessLogger.Error("request denied", + zap.Duration("runtime_seconds", time.Since(t0)), + zap.String("reason", "index not ready"), + zap.Int("grpc_code", int(codes.Unavailable)), + ) + return status.Error(codes.Unavailable, "Service unavailable (index not ready)") + } + + if ratelimiter != nil { + if ratelimiter.Enter() != nil { + accessLogger.Error("request blocked", + zap.Duration("runtime_seconds", time.Since(t0)), + zap.String("reason", "blocked by api per path rate limiter"), + zap.Int("grpc_code", int(codes.InvalidArgument)), + ) + return status.Error(codes.InvalidArgument, "blocked by api per path rate limiter") + } + defer ratelimiter.Exit() + + // why: if the request is already timeout, there is no + // need to resume execution. + if listener.checkRequestCtx(ctx) != nil { + accessLogger.Error("request timeout due to per url rate limiting", + zap.Duration("runtime_seconds", time.Since(t0)), + zap.String("reason", "timeout due to per url rate limiting"), + zap.Int("grpc_code", int(codes.ResourceExhausted)), + ) + return status.Error(codes.ResourceExhausted, "timeout due to maxInflightRequests") + } + } + + // TODO: to deprecate as it's replaced by per-path rate limiting? + // + // rate limit inflight requests + inflights := atomic.AddUint64(&listener.metrics.InflightRequests, 1) + defer atomic.AddUint64(&listener.metrics.InflightRequests, ^uint64(0)) + if listener.MaxInflightRequests > 0 && inflights > listener.MaxInflightRequests { + atomic.AddUint64(&listener.metrics.RejectedTooManyRequests, 1) + accessLogger.Error("request denied", + zap.Duration("runtime_seconds", time.Since(t0)), + zap.String("reason", "too many requests"), + zap.Int("grpc_code", http.StatusTooManyRequests), + ) + return status.Error(codes.ResourceExhausted, "too many requests") + } + + return handler(srv, ss) + } +} diff --git a/carbonserver/render.go b/carbonserver/render.go index 5aeef61d2..88cab3e59 100644 --- a/carbonserver/render.go +++ b/carbonserver/render.go @@ -480,18 +480,9 @@ func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logg } } - select { - case <-ctx.Done(): - switch ctx.Err() { - case context.DeadlineExceeded: - listener.prometheus.timeoutRequest() - case context.Canceled: - listener.prometheus.cancelledRequest() - } - + if listener.checkRequestCtx(ctx) != nil { err := fmt.Errorf("time out while preparing data proto") return fetchResponse{nil, contentType, 0, 0, 0, nil}, err - default: } if format == protoV2Format || format == jsonFormat { @@ -699,17 +690,8 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str } } - select { - case <-ctx.Done(): - switch ctx.Err() { - case context.DeadlineExceeded: - listener.prometheus.timeoutRequest() - case context.Canceled: - listener.prometheus.cancelledRequest() - } - + if listener.checkRequestCtx(ctx) != nil { return status.New(codes.DeadlineExceeded, "time out while preparing data proto").Err() - default: } if metricsFetched == 0 && !listener.emptyResultOk { diff --git a/go-carbon.conf.example b/go-carbon.conf.example index 76899421a..93c60a1b6 100644 --- a/go-carbon.conf.example +++ b/go-carbon.conf.example @@ -425,6 +425,13 @@ stats-percentiles = [99, 98, 95, 75, 50] # [[carbonserver.api-per-path-rate-limiters]] # path = "/metrics/list_query/" # max-inflight-requests = 3 +# +# For gRPC rpcs, path should be full method name. +# +# [[carbonserver.api-per-path-rate-limiters]] +# path = "/carbonapi_v2_grpc.CarbonV2/Render" +# max-inflight-requests = 10 +# request-timeout = "40s" # carbonserver.grpc is the configuration for listening for grpc clients. # Note: currently, only CarbonV2 Render rpc is implemented. diff --git a/helper/grpcutil/grpcutil.go b/helper/grpcutil/grpcutil.go index 3546b9491..54dccfe5f 100644 --- a/helper/grpcutil/grpcutil.go +++ b/helper/grpcutil/grpcutil.go @@ -2,16 +2,16 @@ package grpcutil import ( "context" - "github.com/golang/protobuf/proto" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + "fmt" "path" "strings" "sync/atomic" "time" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/peer" + "google.golang.org/grpc/status" ) func UnaryServerTimeHandler(cb func(payload, peer string, t time.Duration)) grpc.UnaryServerInterceptor { @@ -20,7 +20,7 @@ func UnaryServerTimeHandler(cb func(payload, peer string, t time.Duration)) grpc defer func() { t := time.Since(t0) var payload string - if reqStringer, ok := req.(proto.Message); ok { + if reqStringer, ok := req.(fmt.Stringer); ok { payload = reqStringer.String() } var reqPeer string @@ -36,23 +36,32 @@ func UnaryServerTimeHandler(cb func(payload, peer string, t time.Duration)) grpc func StreamServerTimeHandler(cb func(payload, peer string, t time.Duration)) grpc.StreamServerInterceptor { return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { t0 := time.Now() - wss := &wrappedStream{ - ServerStream: ss, - } + wss := GetWrappedStream(ss) defer func() { t := time.Since(t0) var reqPeer string if p, ok := peer.FromContext(wss.Context()); ok { reqPeer = p.Addr.String() } - cb(wss.payload, reqPeer, t) + cb(wss.Payload(), reqPeer, t) }() return handler(srv, wss) } } +func GetWrappedStream(ss grpc.ServerStream) *wrappedStream { + if wss, ok := ss.(*wrappedStream); ok { + return wss + } + return &wrappedStream{ + ServerStream: ss, + ctx: ss.Context(), + } +} + type wrappedStream struct { grpc.ServerStream + ctx context.Context payload string gotFirst bool } @@ -65,13 +74,25 @@ func (ws *wrappedStream) RecvMsg(m interface{}) error { if ws.gotFirst { return nil } - if p, ok := m.(proto.Message); ok { + if p, ok := m.(fmt.Stringer); ok { ws.payload = p.String() } ws.gotFirst = true return nil } +func (ws *wrappedStream) Context() context.Context { + return ws.ctx +} + +func (ws *wrappedStream) SetContext(ctx context.Context) { + ws.ctx = ctx +} + +func (ws *wrappedStream) Payload() string { + return ws.payload +} + func StreamServerStatusMetricHandler(statusCodes map[string][]uint64, promRequest func(string, int)) grpc.StreamServerInterceptor { return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { err := handler(srv, ss) From 28f0b5250d413c76bd726a8091981c80f7bf8d17 Mon Sep 17 00:00:00 2001 From: Emad Mohamadi Date: Mon, 8 Aug 2022 11:36:55 +0200 Subject: [PATCH 5/5] Refactor minor issues --- carbonserver/carbonserver.go | 13 +++++++++---- helper/grpcutil/grpcutil.go | 16 ++++++++-------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/carbonserver/carbonserver.go b/carbonserver/carbonserver.go index e508d7c9a..b7fa527ae 100644 --- a/carbonserver/carbonserver.go +++ b/carbonserver/carbonserver.go @@ -182,7 +182,7 @@ func (q *QueryItem) FetchOrLock() (interface{}, bool) { return nil, false } - select { //nolint:gosimple + select { //nolint:gosimple //skipcq: SCC-S1000 // TODO: Add timeout support case <-q.QueryFinished: break @@ -228,6 +228,7 @@ type CarbonserverListener struct { failOnMaxGlobs bool percentiles []int scanFrequency time.Duration + scanTicker *time.Ticker forceScanChan chan struct{} metricsAsCounters bool tcpListener *net.TCPListener @@ -446,7 +447,7 @@ type jsonMetricDetailsResponse struct { } type fileIndex struct { - typ int //nolint:unused,structcheck + typ int //nolint:unused,structcheck //skipcq: SCC-U1000 idx trigram.Index files []string @@ -1263,7 +1264,7 @@ func (listener *CarbonserverListener) expandGlobs(ctx context.Context, query str logger := TraceContextToZap(ctx, listener.logger) matchedCount := 0 defer func(start time.Time) { - dur := time.Now().Sub(start) //nolint:gosimple + dur := time.Since(start) if dur <= time.Second { return } @@ -1560,6 +1561,9 @@ func (listener *CarbonserverListener) Stat(send helper.StatCallback) { func (listener *CarbonserverListener) Stop() error { close(listener.forceScanChan) + if listener.scanTicker != nil { + listener.scanTicker.Stop() + } close(listener.exitChan) if listener.db != nil { listener.db.Close() @@ -1745,7 +1749,8 @@ func (listener *CarbonserverListener) Listen(listen string) error { listener.exitChan = make(chan struct{}) if (listener.trigramIndex || listener.trieIndex) && listener.scanFrequency != 0 { listener.forceScanChan = make(chan struct{}) - go listener.fileListUpdater(listener.whisperData, time.Tick(listener.scanFrequency), listener.forceScanChan, listener.exitChan) //nolint:staticcheck + listener.scanTicker = time.NewTicker(listener.scanFrequency) + go listener.fileListUpdater(listener.whisperData, listener.scanTicker.C, listener.forceScanChan, listener.exitChan) //nolint:staticcheck listener.forceScanChan <- struct{}{} } diff --git a/helper/grpcutil/grpcutil.go b/helper/grpcutil/grpcutil.go index 54dccfe5f..5c60fca7c 100644 --- a/helper/grpcutil/grpcutil.go +++ b/helper/grpcutil/grpcutil.go @@ -49,24 +49,24 @@ func StreamServerTimeHandler(cb func(payload, peer string, t time.Duration)) grp } } -func GetWrappedStream(ss grpc.ServerStream) *wrappedStream { - if wss, ok := ss.(*wrappedStream); ok { +func GetWrappedStream(ss grpc.ServerStream) *WrappedStream { + if wss, ok := ss.(*WrappedStream); ok { return wss } - return &wrappedStream{ + return &WrappedStream{ ServerStream: ss, ctx: ss.Context(), } } -type wrappedStream struct { +type WrappedStream struct { grpc.ServerStream ctx context.Context payload string gotFirst bool } -func (ws *wrappedStream) RecvMsg(m interface{}) error { +func (ws *WrappedStream) RecvMsg(m interface{}) error { err := ws.ServerStream.RecvMsg(m) if err != nil { return err @@ -81,15 +81,15 @@ func (ws *wrappedStream) RecvMsg(m interface{}) error { return nil } -func (ws *wrappedStream) Context() context.Context { +func (ws *WrappedStream) Context() context.Context { return ws.ctx } -func (ws *wrappedStream) SetContext(ctx context.Context) { +func (ws *WrappedStream) SetContext(ctx context.Context) { ws.ctx = ctx } -func (ws *wrappedStream) Payload() string { +func (ws *WrappedStream) Payload() string { return ws.payload }