Skip to content

Commit

Permalink
Add ratelimit stream interceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
emadolsky committed Aug 3, 2022
1 parent b608c44 commit 97b6a90
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 63 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,13 @@ stats-percentiles = [99, 98, 95, 75, 50]
# [[carbonserver.api-per-path-rate-limiters]]
# path = "/metrics/list_query/"
# max-inflight-requests = 3
#
# For gRPC rpcs, path should be full method name.
#
# [[carbonserver.api-per-path-rate-limiters]]
# path = "/carbonapi_v2_grpc.CarbonV2/Render"
# max-inflight-requests = 10
# request-timeout = "40s"

# carbonserver.grpc is the configuration for listening for grpc clients.
# Note: currently, only CarbonV2 Render rpc is implemented.
Expand Down
194 changes: 155 additions & 39 deletions carbonserver/carbonserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/go-graphite/go-carbon/helper/grpcutil"
"io"
"math"
"net"
Expand All @@ -43,12 +42,16 @@ import (

prom "github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"

"github.com/NYTimes/gziphandler"
"github.com/dgryski/go-expirecache"
"github.com/dgryski/go-trigram"
"github.com/dgryski/httputil"
"github.com/go-graphite/go-carbon/helper"
"github.com/go-graphite/go-carbon/helper/grpcutil"
"github.com/go-graphite/go-carbon/helper/stat"
"github.com/go-graphite/go-carbon/points"
grpcv2 "github.com/go-graphite/protocol/carbonapi_v2_grpc"
Expand Down Expand Up @@ -1248,19 +1251,10 @@ func (listener *CarbonserverListener) expandGlobs(ctx context.Context, query str
}()

// why: no need to continue execution if the request is already timeout.
select {
case <-ctx.Done():
switch ctx.Err() {
case context.DeadlineExceeded:
listener.prometheus.timeoutRequest()
case context.Canceled:
listener.prometheus.cancelledRequest()
}

if listener.checkRequestCtx(ctx) != nil {
err := fmt.Errorf("time out due to heavy glob query rate limiter: %s", rl.pattern.String())
resultCh <- &ExpandedGlobResponse{query, nil, nil, err}
return
default:
}

break
Expand Down Expand Up @@ -1635,16 +1629,46 @@ func (listener *CarbonserverListener) initStatsDB() error {
return nil
}

func (listener *CarbonserverListener) getPathRateLimiter(path string) *ApiPerPathRatelimiter {
rl, ok := listener.apiPerPathRatelimiter[path]
if ok {
return rl
}
return nil
}

func (listener *CarbonserverListener) checkRequestCtx(ctx context.Context) error {
select {
case <-ctx.Done():
switch ctx.Err() {
case context.DeadlineExceeded:
listener.prometheus.timeoutRequest()
case context.Canceled:
listener.prometheus.cancelledRequest()
}
return errors.New("context is done")
default:
}
return nil
}

func (listener *CarbonserverListener) shouldBlockForIndex() bool {
return listener.NoServiceWhenIndexIsNotReady && listener.CurrentFileIndex() == nil
}

func (listener *CarbonserverListener) rateLimitRequest(h http.HandlerFunc) http.HandlerFunc {
return func(wr http.ResponseWriter, req *http.Request) {
ratelimiter := listener.getPathRateLimiter(req.URL.Path)
// Can't use http.TimeoutHandler here due to supporting per-path timeout
if ratelimiter, ok := listener.apiPerPathRatelimiter[req.URL.Path]; listener.requestTimeout > 0 || (ok && ratelimiter.timeout > 0) {
timeout := listener.requestTimeout
if ok && ratelimiter.timeout > 0 {
timeout = ratelimiter.timeout
}

ctx, cancel := context.WithTimeout(req.Context(), timeout)
var newTimeout time.Duration
if listener.requestTimeout > 0 {
newTimeout = listener.requestTimeout
}
if ratelimiter != nil && ratelimiter.timeout > 0 {
newTimeout = ratelimiter.timeout
}
if newTimeout > 0 {
ctx, cancel := context.WithTimeout(req.Context(), newTimeout)
defer cancel()
req = req.WithContext(ctx)
}
Expand All @@ -1657,7 +1681,7 @@ func (listener *CarbonserverListener) rateLimitRequest(h http.HandlerFunc) http.
zap.String("peer", req.RemoteAddr),
))

if listener.NoServiceWhenIndexIsNotReady && listener.CurrentFileIndex() == nil {
if listener.shouldBlockForIndex() {
accessLogger.Error("request denied",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "index not ready"),
Expand All @@ -1667,39 +1691,23 @@ func (listener *CarbonserverListener) rateLimitRequest(h http.HandlerFunc) http.
return
}

if ratelimiter, ok := listener.apiPerPathRatelimiter[req.URL.Path]; ok {
if cap(ratelimiter.maxInflightRequests) == 0 {
if ratelimiter != nil {
if ratelimiter.Enter() != nil {
http.Error(wr, "Bad request (blocked by api per path rate limiter)", http.StatusBadRequest)
return
}

ratelimiter.maxInflightRequests <- struct{}{}
defer func() {
select {
case <-ratelimiter.maxInflightRequests:
default:
}
}()
defer ratelimiter.Exit()

// why: if the request is already timeout, there is no
// need to resume execution.
select {
case <-ctx.Done():
switch ctx.Err() {
case context.DeadlineExceeded:
listener.prometheus.timeoutRequest()
case context.Canceled:
listener.prometheus.cancelledRequest()
}

if listener.checkRequestCtx(ctx) != nil {
accessLogger.Error("request timeout due to per url rate limiting",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "timeout due to per url rate limiting"),
zap.Int("http_code", http.StatusRequestTimeout),
)
http.Error(wr, "Bad request (timeout due to maxInflightRequests)", http.StatusRequestTimeout)
return
default:
}
}

Expand Down Expand Up @@ -2000,6 +2008,32 @@ func NewApiPerPathRatelimiter(maxInflightRequests uint, timeout time.Duration) *
}
}

type RatelimiterError string

func (re RatelimiterError) Error() string {
return string(re)
}

var (
BlockedRatelimitError = RatelimiterError("blocked by api per path rate limiter")
)

func (a *ApiPerPathRatelimiter) Enter() error {
if cap(a.maxInflightRequests) == 0 {
return BlockedRatelimitError
}

a.maxInflightRequests <- struct{}{}
return nil
}

func (a *ApiPerPathRatelimiter) Exit() {
select {
case <-a.maxInflightRequests:
default:
}
}

func (listener *CarbonserverListener) ListenGRPC(listen string) error {
var err error
var grpcAddr *net.TCPAddr
Expand All @@ -2017,9 +2051,91 @@ func (listener *CarbonserverListener) ListenGRPC(listen string) error {
opts = append(opts, grpc.ChainStreamInterceptor(
grpcutil.StreamServerTimeHandler(listener.bucketRequestTimesGRPC),
grpcutil.StreamServerStatusMetricHandler(statusCodes, listener.prometheus.request),
listener.StreamServerRatelimitHandler(),
))
grpcServer := grpc.NewServer(opts...) //skipcq: GO-S0902
grpcv2.RegisterCarbonV2Server(grpcServer, listener)
go grpcServer.Serve(listener.grpcListener)
return nil
}

func (listener *CarbonserverListener) StreamServerRatelimitHandler() grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
t0 := time.Now()
fullMethodName := info.FullMethod
wss := grpcutil.GetWrappedStream(ss)
ratelimiter := listener.getPathRateLimiter(fullMethodName) // Can't use http.TimeoutHandler here due to supporting per-path timeout
var newTimeout time.Duration
if listener.requestTimeout > 0 {
newTimeout = listener.requestTimeout
}
if ratelimiter != nil && ratelimiter.timeout > 0 {
newTimeout = ratelimiter.timeout
}
if newTimeout > 0 {
ctx, cancel := context.WithTimeout(wss.Context(), newTimeout)
defer cancel()
wss.SetContext(ctx)
}

ctx := wss.Context()
var reqPeer string
if p, ok := peer.FromContext(ctx); ok {
reqPeer = p.Addr.String()
}
accessLogger := TraceContextToZap(ctx, listener.accessLogger.With(
zap.String("handler", "rate_limit"),
zap.String("payload", wss.Payload()),
zap.String("peer", reqPeer),
))

if listener.shouldBlockForIndex() {
accessLogger.Error("request denied",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "index not ready"),
zap.Int("grpc_code", int(codes.Unavailable)),
)
return status.Error(codes.Unavailable, "Service unavailable (index not ready)")
}

if ratelimiter != nil {
if ratelimiter.Enter() != nil {
accessLogger.Error("request blocked",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "blocked by api per path rate limiter"),
zap.Int("grpc_code", int(codes.InvalidArgument)),
)
return status.Error(codes.InvalidArgument, "blocked by api per path rate limiter")
}
defer ratelimiter.Exit()

// why: if the request is already timeout, there is no
// need to resume execution.
if listener.checkRequestCtx(ctx) != nil {
accessLogger.Error("request timeout due to per url rate limiting",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "timeout due to per url rate limiting"),
zap.Int("grpc_code", int(codes.ResourceExhausted)),
)
return status.Error(codes.ResourceExhausted, "timeout due to maxInflightRequests")
}
}

// TODO: to deprecate as it's replaced by per-path rate limiting?
//
// rate limit inflight requests
inflights := atomic.AddUint64(&listener.metrics.InflightRequests, 1)
defer atomic.AddUint64(&listener.metrics.InflightRequests, ^uint64(0))
if listener.MaxInflightRequests > 0 && inflights > listener.MaxInflightRequests {
atomic.AddUint64(&listener.metrics.RejectedTooManyRequests, 1)
accessLogger.Error("request denied",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "too many requests"),
zap.Int("grpc_code", http.StatusTooManyRequests),
)
return status.Error(codes.ResourceExhausted, "too many requests")
}

return handler(srv, ss)
}
}
22 changes: 2 additions & 20 deletions carbonserver/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,18 +480,9 @@ func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logg
}
}

select {
case <-ctx.Done():
switch ctx.Err() {
case context.DeadlineExceeded:
listener.prometheus.timeoutRequest()
case context.Canceled:
listener.prometheus.cancelledRequest()
}

if listener.checkRequestCtx(ctx) != nil {
err := fmt.Errorf("time out while preparing data proto")
return fetchResponse{nil, contentType, 0, 0, 0, nil}, err
default:
}

if format == protoV2Format || format == jsonFormat {
Expand Down Expand Up @@ -699,17 +690,8 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
}
}

select {
case <-ctx.Done():
switch ctx.Err() {
case context.DeadlineExceeded:
listener.prometheus.timeoutRequest()
case context.Canceled:
listener.prometheus.cancelledRequest()
}

if listener.checkRequestCtx(ctx) != nil {
return status.New(codes.DeadlineExceeded, "time out while preparing data proto").Err()
default:
}

if metricsFetched == 0 && !listener.emptyResultOk {
Expand Down
7 changes: 7 additions & 0 deletions go-carbon.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,13 @@ stats-percentiles = [99, 98, 95, 75, 50]
# [[carbonserver.api-per-path-rate-limiters]]
# path = "/metrics/list_query/"
# max-inflight-requests = 3
#
# For gRPC rpcs, path should be full method name.
#
# [[carbonserver.api-per-path-rate-limiters]]
# path = "/carbonapi_v2_grpc.CarbonV2/Render"
# max-inflight-requests = 10
# request-timeout = "40s"

# carbonserver.grpc is the configuration for listening for grpc clients.
# Note: currently, only CarbonV2 Render rpc is implemented.
Expand Down
Loading

0 comments on commit 97b6a90

Please sign in to comment.