diff --git a/README.md b/README.md
index 6a8800d36..8687d7b37 100644
--- a/README.md
+++ b/README.md
@@ -527,6 +527,13 @@ stats-percentiles = [99, 98, 95, 75, 50]
 # [[carbonserver.api-per-path-rate-limiters]]
 # path = "/metrics/list_query/"
 # max-inflight-requests = 3
+#
+# For gRPC rpcs, path should be full method name.
+#
+# [[carbonserver.api-per-path-rate-limiters]]
+# path = "/carbonapi_v2_grpc.CarbonV2/Render"
+# max-inflight-requests = 10
+# request-timeout = "40s"
 
 # carbonserver.grpc is the configuration for listening for grpc clients.
 # Note: currently, only CarbonV2 Render rpc is implemented.
diff --git a/carbonserver/carbonserver.go b/carbonserver/carbonserver.go
index 24450882c..a52e18a1a 100644
--- a/carbonserver/carbonserver.go
+++ b/carbonserver/carbonserver.go
@@ -42,12 +42,16 @@ import (
 
 	prom "github.com/prometheus/client_golang/prometheus"
 	"go.uber.org/zap"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/peer"
+	"google.golang.org/grpc/status"
 
 	"github.com/NYTimes/gziphandler"
 	"github.com/dgryski/go-expirecache"
 	"github.com/dgryski/go-trigram"
 	"github.com/dgryski/httputil"
 	"github.com/go-graphite/go-carbon/helper"
+	"github.com/go-graphite/go-carbon/helper/grpcutil"
 	"github.com/go-graphite/go-carbon/helper/stat"
 	"github.com/go-graphite/go-carbon/points"
 	grpcv2 "github.com/go-graphite/protocol/carbonapi_v2_grpc"
@@ -179,7 +183,7 @@ func (q *QueryItem) FetchOrLock() (interface{}, bool) {
 		return nil, false
 	}
 
-	select { //nolint:gosimple
+	select { //nolint:gosimple //skipcq: SCC-S1000
 	// TODO: Add timeout support
 	case <-q.QueryFinished:
 		break
@@ -225,6 +229,7 @@ type CarbonserverListener struct {
 	failOnMaxGlobs    bool
 	percentiles       []int
 	scanFrequency     time.Duration
+	scanTicker        *time.Ticker
 	forceScanChan     chan struct{}
 	metricsAsCounters bool
 	tcpListener       *net.TCPListener
@@ -443,7 +448,7 @@ type jsonMetricDetailsResponse struct {
 }
 
 type fileIndex struct {
-	typ int //nolint:unused,structcheck
+	typ int //nolint:unused,structcheck //skipcq: SCC-U1000
 
 	idx   trigram.Index
 	files []string
@@ -1248,19 +1253,10 @@ func (listener *CarbonserverListener) expandGlobs(ctx context.Context, query str
 		}()
 
 		// why: no need to continue execution if the request is already timeout.
-		select {
-		case <-ctx.Done():
-			switch ctx.Err() {
-			case context.DeadlineExceeded:
-				listener.prometheus.timeoutRequest()
-			case context.Canceled:
-				listener.prometheus.cancelledRequest()
-			}
-
+		if listener.checkRequestCtx(ctx) != nil {
 			err := fmt.Errorf("time out due to heavy glob query rate limiter: %s", rl.pattern.String())
 			resultCh <- &ExpandedGlobResponse{query, nil, nil, nil, err}
 			return
-		default:
 		}
 
 		break
@@ -1269,7 +1265,7 @@ func (listener *CarbonserverListener) expandGlobs(ctx context.Context, query str
 	logger := TraceContextToZap(ctx, listener.logger)
 	matchedCount := 0
 	defer func(start time.Time) {
-		dur := time.Now().Sub(start) //nolint:gosimple
+		dur := time.Since(start)
 		if dur <= time.Second {
 			return
 		}
@@ -1566,6 +1562,9 @@ func (listener *CarbonserverListener) Stat(send helper.StatCallback) {
 
 func (listener *CarbonserverListener) Stop() error {
 	close(listener.forceScanChan)
+	if listener.scanTicker != nil {
+		listener.scanTicker.Stop()
+	}
 	close(listener.exitChan)
 	if listener.db != nil {
 		listener.db.Close()
@@ -1635,16 +1634,51 @@ func (listener *CarbonserverListener) initStatsDB() error {
 	return nil
 }
 
+// getPathRateLimiter returns the rate limiter configured for path (an HTTP URL
+// path or a full gRPC method name), or nil when none is configured.
+func (listener *CarbonserverListener) getPathRateLimiter(path string) *ApiPerPathRatelimiter {
+	// A missing key yields the zero value of the map's element type, i.e. nil.
+	return listener.apiPerPathRatelimiter[path]
+}
+
+// checkRequestCtx reports whether ctx is already done. When it is, the matching
+// timeout/cancellation prometheus counter is bumped and ctx.Err() is returned;
+// otherwise it returns nil without blocking.
+func (listener *CarbonserverListener) checkRequestCtx(ctx context.Context) error {
+	select {
+	case <-ctx.Done():
+		err := ctx.Err()
+		switch err {
+		case context.DeadlineExceeded:
+			listener.prometheus.timeoutRequest()
+		case context.Canceled:
+			listener.prometheus.cancelledRequest()
+		}
+		return err
+	default:
+	}
+	return nil
+}
+
+// shouldBlockForIndex reports whether requests must be refused because
+// NoServiceWhenIndexIsNotReady is set and no file index is available yet.
+func (listener *CarbonserverListener) shouldBlockForIndex() bool {
+	return listener.NoServiceWhenIndexIsNotReady && listener.CurrentFileIndex() == nil
+}
+
 func (listener *CarbonserverListener) rateLimitRequest(h http.HandlerFunc) http.HandlerFunc {
 	return func(wr http.ResponseWriter, req *http.Request) {
+		ratelimiter := listener.getPathRateLimiter(req.URL.Path)
 		// Can't use http.TimeoutHandler here due to supporting per-path timeout
-		if ratelimiter, ok := listener.apiPerPathRatelimiter[req.URL.Path]; listener.requestTimeout > 0 || (ok && ratelimiter.timeout > 0) {
-			timeout := listener.requestTimeout
-			if ok && ratelimiter.timeout > 0 {
-				timeout = ratelimiter.timeout
-			}
-
-			ctx, cancel := context.WithTimeout(req.Context(), timeout)
+		var newTimeout time.Duration
+		if listener.requestTimeout > 0 {
+			newTimeout = listener.requestTimeout
+		}
+		if ratelimiter != nil && ratelimiter.timeout > 0 {
+			newTimeout = ratelimiter.timeout
+		}
+		if newTimeout > 0 {
+			ctx, cancel := context.WithTimeout(req.Context(), newTimeout)
 			defer cancel()
 			req = req.WithContext(ctx)
 		}
@@ -1657,7 +1691,7 @@ func (listener *CarbonserverListener) rateLimitRequest(h http.HandlerFunc) http.
 			zap.String("peer", req.RemoteAddr),
 		))
 
-		if listener.NoServiceWhenIndexIsNotReady && listener.CurrentFileIndex() == nil {
+		if listener.shouldBlockForIndex() {
 			accessLogger.Error("request denied",
 				zap.Duration("runtime_seconds", time.Since(t0)),
 				zap.String("reason", "index not ready"),
@@ -1667,31 +1701,16 @@ func (listener *CarbonserverListener) rateLimitRequest(h http.HandlerFunc) http.
 			return
 		}
 
-		if ratelimiter, ok := listener.apiPerPathRatelimiter[req.URL.Path]; ok {
-			if cap(ratelimiter.maxInflightRequests) == 0 {
+		if ratelimiter != nil {
+			if ratelimiter.Enter() != nil {
 				http.Error(wr, "Bad request (blocked by api per path rate limiter)", http.StatusBadRequest)
 				return
 			}
-
-			ratelimiter.maxInflightRequests <- struct{}{}
-			defer func() {
-				select {
-				case <-ratelimiter.maxInflightRequests:
-				default:
-				}
-			}()
+			defer ratelimiter.Exit()
 
 			// why: if the request is already timeout, there is no
 			// need to resume execution.
-			select {
-			case <-ctx.Done():
-				switch ctx.Err() {
-				case context.DeadlineExceeded:
-					listener.prometheus.timeoutRequest()
-				case context.Canceled:
-					listener.prometheus.cancelledRequest()
-				}
-
+			if listener.checkRequestCtx(ctx) != nil {
 				accessLogger.Error("request timeout due to per url rate limiting",
 					zap.Duration("runtime_seconds", time.Since(t0)),
 					zap.String("reason", "timeout due to per url rate limiting"),
@@ -1699,7 +1718,6 @@ func (listener *CarbonserverListener) rateLimitRequest(h http.HandlerFunc) http.
 				)
 				http.Error(wr, "Bad request (timeout due to maxInflightRequests)", http.StatusRequestTimeout)
 				return
-			default:
 			}
 		}
 
@@ -1737,7 +1755,15 @@ func (listener *CarbonserverListener) Listen(listen string) error {
 	listener.exitChan = make(chan struct{})
 	if (listener.trigramIndex || listener.trieIndex) && listener.scanFrequency != 0 {
 		listener.forceScanChan = make(chan struct{})
-		go listener.fileListUpdater(listener.whisperData, time.Tick(listener.scanFrequency), listener.forceScanChan, listener.exitChan) //nolint:staticcheck
+		// time.NewTicker panics on a non-positive interval, whereas the time.Tick
+		// call it replaces returned a nil channel. Keep that behaviour so a
+		// negative scan frequency still means "scan only when forced".
+		var scanTick <-chan time.Time
+		if listener.scanFrequency > 0 {
+			listener.scanTicker = time.NewTicker(listener.scanFrequency)
+			scanTick = listener.scanTicker.C
+		}
+		go listener.fileListUpdater(listener.whisperData, scanTick, listener.forceScanChan, listener.exitChan)
 		listener.forceScanChan <- struct{}{}
 	}
 
@@ -1756,7 +1782,7 @@ func (listener *CarbonserverListener) Listen(listen string) error {
 					handlerStatusCodes,
 					listener.prometheus.request,
 				),
-				listener.bucketRequestTimes,
+				listener.bucketRequestTimesHTTP,
 			),
 		)
 	}
@@ -1889,7 +1915,33 @@ func (listener *CarbonserverListener) Listen(listen string) error {
 	return nil
 }
 
-func (listener *CarbonserverListener) bucketRequestTimes(req *http.Request, t time.Duration) {
+// bucketRequestTimesHTTP records the duration of an HTTP request and logs the
+// request as slow when it was counted in the overflow bucket.
+func (listener *CarbonserverListener) bucketRequestTimesHTTP(req *http.Request, t time.Duration) {
+	bucket := listener.bucketRequestTimes(t)
+	if bucket >= listener.buckets {
+		listener.logger.Info("slow request",
+			zap.String("url", req.URL.RequestURI()),
+			zap.String("peer", req.RemoteAddr),
+		)
+	}
+}
+
+// bucketRequestTimesGRPC records the duration of a gRPC request and logs the
+// request as slow when it was counted in the overflow bucket.
+func (listener *CarbonserverListener) bucketRequestTimesGRPC(payload, reqPeer string, t time.Duration) {
+	bucket := listener.bucketRequestTimes(t)
+	if bucket >= listener.buckets {
+		listener.logger.Info("slow request",
+			zap.String("payload", payload),
+			zap.String("peer", reqPeer),
+		)
+	}
+}
+
+// bucketRequestTimes feeds t into the prometheus duration metric and the time
+// buckets. It returns the bucket index; >= listener.buckets means overflow.
+func (listener *CarbonserverListener) bucketRequestTimes(t time.Duration) int {
 	listener.prometheus.duration(t)
 
 	ms := t.Nanoseconds() / int64(time.Millisecond)
@@ -1909,13 +1961,10 @@ func (listener *CarbonserverListener) bucketRequestTimes(req *http.Request, t ti
 	if bucket < listener.buckets {
 		atomic.AddUint64(&listener.timeBuckets[bucket], 1)
 	} else {
-		// Too big? Increment overflow bucket and log
+		// Too big? Increment overflow bucket
 		atomic.AddUint64(&listener.timeBuckets[listener.buckets], 1)
-		listener.logger.Info("slow request",
-			zap.String("url", req.URL.RequestURI()),
-			zap.String("peer", req.RemoteAddr),
-		)
 	}
+	return bucket
 }
 
 func extractTrigrams(query string) []trigram.T {
@@ -1983,6 +2032,40 @@ func NewApiPerPathRatelimiter(maxInflightRequests uint, timeout time.Duration) *
 	}
 }
 
+// RatelimiterError is the error type returned by ApiPerPathRatelimiter.
+type RatelimiterError string
+
+// Error implements the error interface.
+func (re RatelimiterError) Error() string {
+	return string(re)
+}
+
+var (
+	// BlockedRatelimitError is returned by Enter when the limiter's in-flight
+	// channel has zero capacity.
+	BlockedRatelimitError = RatelimiterError("blocked by api per path rate limiter")
+)
+
+// Enter takes an in-flight slot, blocking while the channel is full. It returns
+// BlockedRatelimitError right away when the channel has zero capacity.
+func (a *ApiPerPathRatelimiter) Enter() error {
+	if cap(a.maxInflightRequests) == 0 {
+		return BlockedRatelimitError
+	}
+
+	a.maxInflightRequests <- struct{}{}
+	return nil
+}
+
+// Exit gives back a slot taken by Enter. It never blocks: when no slot is
+// currently held the call is a no-op.
+func (a *ApiPerPathRatelimiter) Exit() {
+	select {
+	case <-a.maxInflightRequests:
+	default:
+	}
+}
+
 func (listener *CarbonserverListener) ListenGRPC(listen string) error {
 	var err error
 	var grpcAddr *net.TCPAddr
@@ -1997,9 +2080,104 @@ func (listener *CarbonserverListener) ListenGRPC(listen string) error {
 	}
 
 	var opts []grpc.ServerOption
-
+	opts = append(opts, grpc.ChainStreamInterceptor(
+		grpcutil.StreamServerTimeHandler(listener.bucketRequestTimesGRPC),
+		grpcutil.StreamServerStatusMetricHandler(statusCodes, listener.prometheus.request),
+		listener.StreamServerRatelimitHandler(),
+	))
 	grpcServer := grpc.NewServer(opts...) //skipcq: GO-S0902
 	grpcv2.RegisterCarbonV2Server(grpcServer, listener)
 	go grpcServer.Serve(listener.grpcListener)
 	return nil
 }
+
+// StreamServerRatelimitHandler returns a gRPC stream interceptor that follows
+// the same steps as rateLimitRequest does for HTTP: it applies the global or
+// per-method request timeout, refuses requests while the index is not ready,
+// and enforces the per-method and global in-flight limits. Per-method limiters
+// are looked up by info.FullMethod.
+func (listener *CarbonserverListener) StreamServerRatelimitHandler() grpc.StreamServerInterceptor {
+	return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+		t0 := time.Now()
+		fullMethodName := info.FullMethod
+		wss := grpcutil.GetWrappedStream(ss)
+		ratelimiter := listener.getPathRateLimiter(fullMethodName)
+		// A per-method timeout, when set, overrides the global request timeout.
+		var newTimeout time.Duration
+		if listener.requestTimeout > 0 {
+			newTimeout = listener.requestTimeout
+		}
+		if ratelimiter != nil && ratelimiter.timeout > 0 {
+			newTimeout = ratelimiter.timeout
+		}
+		if newTimeout > 0 {
+			ctx, cancel := context.WithTimeout(wss.Context(), newTimeout)
+			defer cancel()
+			wss.SetContext(ctx)
+		}
+
+		ctx := wss.Context()
+		var reqPeer string
+		if p, ok := peer.FromContext(ctx); ok {
+			reqPeer = p.Addr.String()
+		}
+		// NOTE: the payload is filled in by WrappedStream.RecvMsg, so unless an
+		// earlier interceptor already read the first message it is still empty
+		// here and in the rejection logs below.
+		accessLogger := TraceContextToZap(ctx, listener.accessLogger.With(
+			zap.String("handler", "rate_limit"),
+			zap.String("payload", wss.Payload()),
+			zap.String("peer", reqPeer),
+		))
+
+		if listener.shouldBlockForIndex() {
+			accessLogger.Error("request denied",
+				zap.Duration("runtime_seconds", time.Since(t0)),
+				zap.String("reason", "index not ready"),
+				zap.Int("grpc_code", int(codes.Unavailable)),
+			)
+			return status.Error(codes.Unavailable, "Service unavailable (index not ready)")
+		}
+
+		if ratelimiter != nil {
+			if ratelimiter.Enter() != nil {
+				accessLogger.Error("request blocked",
+					zap.Duration("runtime_seconds", time.Since(t0)),
+					zap.String("reason", "blocked by api per path rate limiter"),
+					zap.Int("grpc_code", int(codes.InvalidArgument)),
+				)
+				return status.Error(codes.InvalidArgument, "blocked by api per path rate limiter")
+			}
+			defer ratelimiter.Exit()
+
+			// why: if the request is already timeout, there is no
+			// need to resume execution.
+			if listener.checkRequestCtx(ctx) != nil {
+				accessLogger.Error("request timeout due to per url rate limiting",
+					zap.Duration("runtime_seconds", time.Since(t0)),
+					zap.String("reason", "timeout due to per url rate limiting"),
+					zap.Int("grpc_code", int(codes.ResourceExhausted)),
+				)
+				return status.Error(codes.ResourceExhausted, "timeout due to maxInflightRequests")
+			}
+		}
+
+		// TODO: to deprecate as it's replaced by per-path rate limiting?
+		//
+		// rate limit inflight requests
+		inflights := atomic.AddUint64(&listener.metrics.InflightRequests, 1)
+		defer atomic.AddUint64(&listener.metrics.InflightRequests, ^uint64(0))
+		if listener.MaxInflightRequests > 0 && inflights > listener.MaxInflightRequests {
+			atomic.AddUint64(&listener.metrics.RejectedTooManyRequests, 1)
+			accessLogger.Error("request denied",
+				zap.Duration("runtime_seconds", time.Since(t0)),
+				zap.String("reason", "too many requests"),
+				zap.Int("grpc_code", int(codes.ResourceExhausted)),
+			)
+			return status.Error(codes.ResourceExhausted, "too many requests")
+		}
+
+		// Pass the wrapped stream so the handler sees the timeout context set above.
+		return handler(srv, wss)
+	}
+}
diff --git a/carbonserver/render.go b/carbonserver/render.go
index 8470051cf..40925cd5f 100644
--- a/carbonserver/render.go
+++ b/carbonserver/render.go
@@ -482,18 +482,9 @@ func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logg
 		}
 	}
 
-	select {
-	case <-ctx.Done():
-		switch ctx.Err() {
-		case context.DeadlineExceeded:
-			listener.prometheus.timeoutRequest()
-		case context.Canceled:
-			listener.prometheus.cancelledRequest()
-		}
-
+	if listener.checkRequestCtx(ctx) != nil {
 		err := fmt.Errorf("time out while preparing data proto")
 		return fetchResponse{nil, contentType, 0, 0, 0, nil}, err
-	default:
 	}
 
 	if format == protoV2Format || format == jsonFormat {
@@ -705,17 +696,8 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
 		}
 	}
 
-	select {
-	case <-ctx.Done():
-		switch ctx.Err() {
-		case context.DeadlineExceeded:
-			listener.prometheus.timeoutRequest()
-		case context.Canceled:
-			listener.prometheus.cancelledRequest()
-		}
-
+	if listener.checkRequestCtx(ctx) != nil {
 		return status.New(codes.DeadlineExceeded, "time out while preparing data proto").Err()
-	default:
 	}
 
 	if metricsFetched == 0 && !listener.emptyResultOk {
diff --git a/go-carbon.conf.example b/go-carbon.conf.example
index 76899421a..93c60a1b6 100644
--- a/go-carbon.conf.example
+++ b/go-carbon.conf.example
@@ -425,6 +425,13 @@ stats-percentiles = [99, 98, 95, 75, 50]
 # [[carbonserver.api-per-path-rate-limiters]]
 # path = "/metrics/list_query/"
 # max-inflight-requests = 3
+#
+# For gRPC rpcs, path should be full method name.
+#
+# [[carbonserver.api-per-path-rate-limiters]]
+# path = "/carbonapi_v2_grpc.CarbonV2/Render"
+# max-inflight-requests = 10
+# request-timeout = "40s"
 
 # carbonserver.grpc is the configuration for listening for grpc clients.
 # Note: currently, only CarbonV2 Render rpc is implemented.
diff --git a/helper/grpcutil/grpcutil.go b/helper/grpcutil/grpcutil.go
new file mode 100644
index 000000000..5c60fca7c
--- /dev/null
+++ b/helper/grpcutil/grpcutil.go
@@ -0,0 +1,158 @@
+// Package grpcutil provides gRPC server interceptors for request timing and
+// status-code accounting, plus the ServerStream wrapper they share.
+package grpcutil
+
+import (
+	"context"
+	"fmt"
+	"path"
+	"strings"
+	"sync/atomic"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/peer"
+	"google.golang.org/grpc/status"
+)
+
+// UnaryServerTimeHandler returns a unary interceptor that measures how long the
+// handler runs and reports it to cb together with the request payload (when the
+// request implements fmt.Stringer) and the peer address.
+func UnaryServerTimeHandler(cb func(payload, peer string, t time.Duration)) grpc.UnaryServerInterceptor {
+	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
+		t0 := time.Now()
+		defer func() {
+			t := time.Since(t0)
+			var payload string
+			if reqStringer, ok := req.(fmt.Stringer); ok {
+				payload = reqStringer.String()
+			}
+			var reqPeer string
+			if p, ok := peer.FromContext(ctx); ok {
+				reqPeer = p.Addr.String()
+			}
+			cb(payload, reqPeer, t)
+		}()
+		return handler(ctx, req)
+	}
+}
+
+// StreamServerTimeHandler returns a stream interceptor that measures how long
+// the handler runs and reports it to cb together with the first received
+// message (when it implements fmt.Stringer) and the peer address. The handler
+// is invoked with a *WrappedStream so later interceptors can reuse it.
+func StreamServerTimeHandler(cb func(payload, peer string, t time.Duration)) grpc.StreamServerInterceptor {
+	return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+		t0 := time.Now()
+		wss := GetWrappedStream(ss)
+		defer func() {
+			t := time.Since(t0)
+			var reqPeer string
+			if p, ok := peer.FromContext(wss.Context()); ok {
+				reqPeer = p.Addr.String()
+			}
+			cb(wss.Payload(), reqPeer, t)
+		}()
+		return handler(srv, wss)
+	}
+}
+
+// GetWrappedStream returns ss itself when it already is a *WrappedStream and
+// otherwise wraps it, seeding the wrapper with the stream's current context.
+func GetWrappedStream(ss grpc.ServerStream) *WrappedStream {
+	if wss, ok := ss.(*WrappedStream); ok {
+		return wss
+	}
+	return &WrappedStream{
+		ServerStream: ss,
+		ctx:          ss.Context(),
+	}
+}
+
+// WrappedStream decorates a grpc.ServerStream with a replaceable context and
+// remembers the string form of the first message received on it.
+type WrappedStream struct {
+	grpc.ServerStream
+	ctx      context.Context
+	payload  string
+	gotFirst bool
+}
+
+// RecvMsg receives into m via the underlying stream and, for the first
+// successfully received message only, records m.String() as the payload when m
+// implements fmt.Stringer.
+func (ws *WrappedStream) RecvMsg(m interface{}) error {
+	err := ws.ServerStream.RecvMsg(m)
+	if err != nil {
+		return err
+	}
+	if ws.gotFirst {
+		return nil
+	}
+	if p, ok := m.(fmt.Stringer); ok {
+		ws.payload = p.String()
+	}
+	ws.gotFirst = true
+	return nil
+}
+
+// Context returns the context currently attached to the stream.
+func (ws *WrappedStream) Context() context.Context {
+	return ws.ctx
+}
+
+// SetContext replaces the context returned by Context.
+func (ws *WrappedStream) SetContext(ctx context.Context) {
+	ws.ctx = ctx
+}
+
+// Payload returns the recorded first-message payload, or "" if none was recorded.
+func (ws *WrappedStream) Payload() string {
+	return ws.payload
+}
+
+// StreamServerStatusMetricHandler returns a stream interceptor that maps the
+// handler's gRPC status to a status-class index and counts it in statusCodes
+// (under "combined" and under the lower-cased method name, when such an entry
+// exists) and via promRequest.
+func StreamServerStatusMetricHandler(statusCodes map[string][]uint64, promRequest func(string, int)) grpc.StreamServerInterceptor {
+	return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+		err := handler(srv, ss)
+		_, methodName := path.Split(info.FullMethod)
+		endpoint := strings.ToLower(methodName)
+		// status.Code yields codes.OK for a nil error and codes.Unknown for
+		// errors that carry no gRPC status.
+		major := getHTTPStatusCodeMajorFromGrpcStatusCode(status.Code(err))
+		if globalStatusCodes := statusCodes["combined"]; major < len(globalStatusCodes) {
+			atomic.AddUint64(&globalStatusCodes[major], 1)
+			if handlerStatusCodes, ok := statusCodes[endpoint]; ok && major < len(handlerStatusCodes) {
+				atomic.AddUint64(&handlerStatusCodes[major], 1)
+			}
+		}
+		promRequest(endpoint, major)
+		return err
+	}
+}
+
+// getHTTPStatusCodeMajorFromGrpcStatusCode maps a gRPC code to the index used
+// by the status-code counters: 1 for OK, 3 for the caller-side error codes
+// listed below and 4 for everything else (presumably the HTTP 2xx, 4xx and
+// 5xx slots -- confirm against the HTTP status accounting).
+func getHTTPStatusCodeMajorFromGrpcStatusCode(code codes.Code) int {
+	switch code {
+	case codes.OK:
+		return 1
+	case codes.Canceled,
+		codes.InvalidArgument,
+		codes.NotFound,
+		codes.AlreadyExists,
+		codes.PermissionDenied,
+		codes.ResourceExhausted,
+		codes.FailedPrecondition,
+		codes.Unauthenticated:
+		return 3
+	default:
+		return 4
+	}
+}