Commit 307d623

Merge pull request #483 from go-graphite/emadolsky/grpc-interceptors

gRPC interceptors

emadolsky committed Aug 11, 2022
2 parents: 6dd9e73 + 28f0b52

Showing 5 changed files with 343 additions and 70 deletions.
7 changes: 7 additions & 0 deletions README.md
@@ -527,6 +527,13 @@ stats-percentiles = [99, 98, 95, 75, 50]
 # [[carbonserver.api-per-path-rate-limiters]]
 # path = "/metrics/list_query/"
 # max-inflight-requests = 3
+#
+# For gRPC rpcs, path should be full method name.
+#
+# [[carbonserver.api-per-path-rate-limiters]]
+# path = "/carbonapi_v2_grpc.CarbonV2/Render"
+# max-inflight-requests = 10
+# request-timeout = "40s"
 
 # carbonserver.grpc is the configuration for listening for grpc clients.
 # Note: currently, only CarbonV2 Render rpc is implemented.
242 changes: 192 additions & 50 deletions carbonserver/carbonserver.go
@@ -42,12 +42,16 @@ import (
 
     prom "github.com/prometheus/client_golang/prometheus"
     "go.uber.org/zap"
+    "google.golang.org/grpc/codes"
+    "google.golang.org/grpc/peer"
+    "google.golang.org/grpc/status"
 
     "github.com/NYTimes/gziphandler"
     "github.com/dgryski/go-expirecache"
     "github.com/dgryski/go-trigram"
     "github.com/dgryski/httputil"
     "github.com/go-graphite/go-carbon/helper"
+    "github.com/go-graphite/go-carbon/helper/grpcutil"
     "github.com/go-graphite/go-carbon/helper/stat"
     "github.com/go-graphite/go-carbon/points"
     grpcv2 "github.com/go-graphite/protocol/carbonapi_v2_grpc"
@@ -179,7 +183,7 @@ func (q *QueryItem) FetchOrLock() (interface{}, bool) {
         return nil, false
     }
 
-    select { //nolint:gosimple
+    select { //nolint:gosimple //skipcq: SCC-S1000
     // TODO: Add timeout support
     case <-q.QueryFinished:
         break
@@ -225,6 +229,7 @@ type CarbonserverListener struct {
     failOnMaxGlobs    bool
     percentiles       []int
     scanFrequency     time.Duration
+    scanTicker        *time.Ticker
     forceScanChan     chan struct{}
     metricsAsCounters bool
     tcpListener       *net.TCPListener
@@ -443,7 +448,7 @@ type jsonMetricDetailsResponse struct {
 }
 
 type fileIndex struct {
-    typ int //nolint:unused,structcheck
+    typ int //nolint:unused,structcheck //skipcq: SCC-U1000
 
     idx   trigram.Index
     files []string
@@ -1248,19 +1253,10 @@ func (listener *CarbonserverListener) expandGlobs(ctx context.Context, query string
         }()
 
         // why: no need to continue execution if the request is already timeout.
-        select {
-        case <-ctx.Done():
-            switch ctx.Err() {
-            case context.DeadlineExceeded:
-                listener.prometheus.timeoutRequest()
-            case context.Canceled:
-                listener.prometheus.cancelledRequest()
-            }
-
+        if listener.checkRequestCtx(ctx) != nil {
             err := fmt.Errorf("time out due to heavy glob query rate limiter: %s", rl.pattern.String())
             resultCh <- &ExpandedGlobResponse{query, nil, nil, nil, err}
             return
-        default:
         }
 
         break
Expand All @@ -1269,7 +1265,7 @@ func (listener *CarbonserverListener) expandGlobs(ctx context.Context, query str
logger := TraceContextToZap(ctx, listener.logger)
matchedCount := 0
defer func(start time.Time) {
dur := time.Now().Sub(start) //nolint:gosimple
dur := time.Since(start)
if dur <= time.Second {
return
}
@@ -1566,6 +1562,9 @@ func (listener *CarbonserverListener) Stat(send helper.StatCallback) {
 
 func (listener *CarbonserverListener) Stop() error {
     close(listener.forceScanChan)
+    if listener.scanTicker != nil {
+        listener.scanTicker.Stop()
+    }
     close(listener.exitChan)
     if listener.db != nil {
         listener.db.Close()
@@ -1635,16 +1634,46 @@ func (listener *CarbonserverListener) initStatsDB() error {
     return nil
 }
 
+func (listener *CarbonserverListener) getPathRateLimiter(path string) *ApiPerPathRatelimiter {
+    rl, ok := listener.apiPerPathRatelimiter[path]
+    if ok {
+        return rl
+    }
+    return nil
+}
+
+func (listener *CarbonserverListener) checkRequestCtx(ctx context.Context) error {
+    select {
+    case <-ctx.Done():
+        switch ctx.Err() {
+        case context.DeadlineExceeded:
+            listener.prometheus.timeoutRequest()
+        case context.Canceled:
+            listener.prometheus.cancelledRequest()
+        }
+        return errors.New("context is done")
+    default:
+    }
+    return nil
+}
+
+func (listener *CarbonserverListener) shouldBlockForIndex() bool {
+    return listener.NoServiceWhenIndexIsNotReady && listener.CurrentFileIndex() == nil
+}
+
 func (listener *CarbonserverListener) rateLimitRequest(h http.HandlerFunc) http.HandlerFunc {
     return func(wr http.ResponseWriter, req *http.Request) {
+        ratelimiter := listener.getPathRateLimiter(req.URL.Path)
         // Can't use http.TimeoutHandler here due to supporting per-path timeout
-        if ratelimiter, ok := listener.apiPerPathRatelimiter[req.URL.Path]; listener.requestTimeout > 0 || (ok && ratelimiter.timeout > 0) {
-            timeout := listener.requestTimeout
-            if ok && ratelimiter.timeout > 0 {
-                timeout = ratelimiter.timeout
-            }
-
-            ctx, cancel := context.WithTimeout(req.Context(), timeout)
+        var newTimeout time.Duration
+        if listener.requestTimeout > 0 {
+            newTimeout = listener.requestTimeout
+        }
+        if ratelimiter != nil && ratelimiter.timeout > 0 {
+            newTimeout = ratelimiter.timeout
+        }
+        if newTimeout > 0 {
+            ctx, cancel := context.WithTimeout(req.Context(), newTimeout)
             defer cancel()
             req = req.WithContext(ctx)
         }
Expand All @@ -1657,7 +1686,7 @@ func (listener *CarbonserverListener) rateLimitRequest(h http.HandlerFunc) http.
zap.String("peer", req.RemoteAddr),
))

if listener.NoServiceWhenIndexIsNotReady && listener.CurrentFileIndex() == nil {
if listener.shouldBlockForIndex() {
accessLogger.Error("request denied",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "index not ready"),
Expand All @@ -1667,39 +1696,23 @@ func (listener *CarbonserverListener) rateLimitRequest(h http.HandlerFunc) http.
return
}

if ratelimiter, ok := listener.apiPerPathRatelimiter[req.URL.Path]; ok {
if cap(ratelimiter.maxInflightRequests) == 0 {
if ratelimiter != nil {
if ratelimiter.Enter() != nil {
http.Error(wr, "Bad request (blocked by api per path rate limiter)", http.StatusBadRequest)
return
}

ratelimiter.maxInflightRequests <- struct{}{}
defer func() {
select {
case <-ratelimiter.maxInflightRequests:
default:
}
}()
defer ratelimiter.Exit()

// why: if the request is already timeout, there is no
// need to resume execution.
select {
case <-ctx.Done():
switch ctx.Err() {
case context.DeadlineExceeded:
listener.prometheus.timeoutRequest()
case context.Canceled:
listener.prometheus.cancelledRequest()
}

if listener.checkRequestCtx(ctx) != nil {
accessLogger.Error("request timeout due to per url rate limiting",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "timeout due to per url rate limiting"),
zap.Int("http_code", http.StatusRequestTimeout),
)
http.Error(wr, "Bad request (timeout due to maxInflightRequests)", http.StatusRequestTimeout)
return
default:
}
}

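The reworked wrapper above resolves the per-path limiter once via getPathRateLimiter, folds the global and per-path timeouts into a single effective value, and only then enters the semaphore. A minimal, self-contained sketch of that middleware shape, not from this commit (the package layout, the /render path, and all names here are hypothetical):

    package main

    import (
        "context"
        "log"
        "net/http"
        "time"
    )

    // limitAndTimeout mirrors the shape of rateLimitRequest: attach an optional
    // timeout to the request context, block for a slot on a buffered-channel
    // semaphore, then re-check the context before doing real work.
    func limitAndTimeout(timeout time.Duration, maxInflight int, h http.HandlerFunc) http.HandlerFunc {
        inflight := make(chan struct{}, maxInflight)
        return func(wr http.ResponseWriter, req *http.Request) {
            if timeout > 0 {
                ctx, cancel := context.WithTimeout(req.Context(), timeout)
                defer cancel()
                req = req.WithContext(ctx)
            }

            inflight <- struct{}{} // blocks until one of maxInflight slots frees
            defer func() { <-inflight }()

            // If we waited so long that the deadline already passed, skip the handler.
            if req.Context().Err() != nil {
                http.Error(wr, "timed out waiting for an inflight slot", http.StatusRequestTimeout)
                return
            }

            h(wr, req)
        }
    }

    func main() {
        render := func(wr http.ResponseWriter, req *http.Request) {
            wr.Write([]byte("ok\n"))
        }
        http.Handle("/render", limitAndTimeout(40*time.Second, 10, render))
        log.Fatal(http.ListenAndServe(":8080", nil))
    }
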
@@ -1737,7 +1750,8 @@ func (listener *CarbonserverListener) Listen(listen string) error {
     listener.exitChan = make(chan struct{})
     if (listener.trigramIndex || listener.trieIndex) && listener.scanFrequency != 0 {
         listener.forceScanChan = make(chan struct{})
-        go listener.fileListUpdater(listener.whisperData, time.Tick(listener.scanFrequency), listener.forceScanChan, listener.exitChan) //nolint:staticcheck
+        listener.scanTicker = time.NewTicker(listener.scanFrequency)
+        go listener.fileListUpdater(listener.whisperData, listener.scanTicker.C, listener.forceScanChan, listener.exitChan) //nolint:staticcheck
         listener.forceScanChan <- struct{}{}
     }
 
Expand All @@ -1756,7 +1770,7 @@ func (listener *CarbonserverListener) Listen(listen string) error {
handlerStatusCodes,
listener.prometheus.request,
),
listener.bucketRequestTimes,
listener.bucketRequestTimesHTTP,
),
)
}
@@ -1889,7 +1903,27 @@ func (listener *CarbonserverListener) Listen(listen string) error {
     return nil
 }
 
-func (listener *CarbonserverListener) bucketRequestTimes(req *http.Request, t time.Duration) {
+func (listener *CarbonserverListener) bucketRequestTimesHTTP(req *http.Request, t time.Duration) {
+    bucket := listener.bucketRequestTimes(t)
+    if bucket >= listener.buckets {
+        listener.logger.Info("slow request",
+            zap.String("url", req.URL.RequestURI()),
+            zap.String("peer", req.RemoteAddr),
+        )
+    }
+}
+
+func (listener *CarbonserverListener) bucketRequestTimesGRPC(payload, peer string, t time.Duration) {
+    bucket := listener.bucketRequestTimes(t)
+    if bucket >= listener.buckets {
+        listener.logger.Info("slow request",
+            zap.String("payload", payload),
+            zap.String("peer", peer),
+        )
+    }
+}
+
+func (listener *CarbonserverListener) bucketRequestTimes(t time.Duration) int {
     listener.prometheus.duration(t)
 
     ms := t.Nanoseconds() / int64(time.Millisecond)
Expand All @@ -1909,13 +1943,10 @@ func (listener *CarbonserverListener) bucketRequestTimes(req *http.Request, t ti
if bucket < listener.buckets {
atomic.AddUint64(&listener.timeBuckets[bucket], 1)
} else {
// Too big? Increment overflow bucket and log
// Too big? Increment overflow bucket
atomic.AddUint64(&listener.timeBuckets[listener.buckets], 1)
listener.logger.Info("slow request",
zap.String("url", req.URL.RequestURI()),
zap.String("peer", req.RemoteAddr),
)
}
return bucket
}

func extractTrigrams(query string) []trigram.T {
@@ -1983,6 +2014,32 @@ func NewApiPerPathRatelimiter(maxInflightRequests uint, timeout time.Duration) *ApiPerPathRatelimiter {
     }
 }
 
+type RatelimiterError string
+
+func (re RatelimiterError) Error() string {
+    return string(re)
+}
+
+var (
+    BlockedRatelimitError = RatelimiterError("blocked by api per path rate limiter")
+)
+
+func (a *ApiPerPathRatelimiter) Enter() error {
+    if cap(a.maxInflightRequests) == 0 {
+        return BlockedRatelimitError
+    }
+
+    a.maxInflightRequests <- struct{}{}
+    return nil
+}
+
+func (a *ApiPerPathRatelimiter) Exit() {
+    select {
+    case <-a.maxInflightRequests:
+    default:
+    }
+}
+
 func (listener *CarbonserverListener) ListenGRPC(listen string) error {
     var err error
     var grpcAddr *net.TCPAddr
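Enter and Exit, added above, implement a counting semaphore over the maxInflightRequests buffered channel: capacity 0 marks a fully blocked path (a send could never succeed, so Enter short-circuits with BlockedRatelimitError), and the select with a default branch in Exit turns a release without a matching acquire into a no-op rather than a deadlock. A toy standalone demonstration of those channel properties (illustrative only, not from this commit):

    package main

    import "fmt"

    func main() {
        // Capacity 3: up to three tokens can be held without blocking.
        sem := make(chan struct{}, 3)
        for i := 0; i < 3; i++ {
            sem <- struct{}{} // Enter: acquire a slot
        }
        fmt.Println("held:", len(sem), "of", cap(sem)) // held: 3 of 3

        // Exit: non-blocking release; the default branch makes a release
        // without a matching acquire harmless.
        select {
        case <-sem:
        default:
        }
        fmt.Println("held after exit:", len(sem)) // held after exit: 2

        // Capacity 0 models a fully blocked path: no token can ever be
        // buffered, which is why Enter checks cap() before sending.
        blocked := make(chan struct{})
        fmt.Println("blocked path capacity:", cap(blocked)) // 0
    }
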
Expand All @@ -1997,9 +2054,94 @@ func (listener *CarbonserverListener) ListenGRPC(listen string) error {
}

var opts []grpc.ServerOption

opts = append(opts, grpc.ChainStreamInterceptor(
grpcutil.StreamServerTimeHandler(listener.bucketRequestTimesGRPC),
grpcutil.StreamServerStatusMetricHandler(statusCodes, listener.prometheus.request),
listener.StreamServerRatelimitHandler(),
))
grpcServer := grpc.NewServer(opts...) //skipcq: GO-S0902
grpcv2.RegisterCarbonV2Server(grpcServer, listener)
go grpcServer.Serve(listener.grpcListener)
return nil
}

func (listener *CarbonserverListener) StreamServerRatelimitHandler() grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
t0 := time.Now()
fullMethodName := info.FullMethod
wss := grpcutil.GetWrappedStream(ss)
ratelimiter := listener.getPathRateLimiter(fullMethodName) // Can't use http.TimeoutHandler here due to supporting per-path timeout
var newTimeout time.Duration
if listener.requestTimeout > 0 {
newTimeout = listener.requestTimeout
}
if ratelimiter != nil && ratelimiter.timeout > 0 {
newTimeout = ratelimiter.timeout
}
if newTimeout > 0 {
ctx, cancel := context.WithTimeout(wss.Context(), newTimeout)
defer cancel()
wss.SetContext(ctx)
}

ctx := wss.Context()
var reqPeer string
if p, ok := peer.FromContext(ctx); ok {
reqPeer = p.Addr.String()
}
accessLogger := TraceContextToZap(ctx, listener.accessLogger.With(
zap.String("handler", "rate_limit"),
zap.String("payload", wss.Payload()),
zap.String("peer", reqPeer),
))

if listener.shouldBlockForIndex() {
accessLogger.Error("request denied",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "index not ready"),
zap.Int("grpc_code", int(codes.Unavailable)),
)
return status.Error(codes.Unavailable, "Service unavailable (index not ready)")
}

if ratelimiter != nil {
if ratelimiter.Enter() != nil {
accessLogger.Error("request blocked",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "blocked by api per path rate limiter"),
zap.Int("grpc_code", int(codes.InvalidArgument)),
)
return status.Error(codes.InvalidArgument, "blocked by api per path rate limiter")
}
defer ratelimiter.Exit()

// why: if the request is already timeout, there is no
// need to resume execution.
if listener.checkRequestCtx(ctx) != nil {
accessLogger.Error("request timeout due to per url rate limiting",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "timeout due to per url rate limiting"),
zap.Int("grpc_code", int(codes.ResourceExhausted)),
)
return status.Error(codes.ResourceExhausted, "timeout due to maxInflightRequests")
}
}

// TODO: to deprecate as it's replaced by per-path rate limiting?
//
// rate limit inflight requests
inflights := atomic.AddUint64(&listener.metrics.InflightRequests, 1)
defer atomic.AddUint64(&listener.metrics.InflightRequests, ^uint64(0))
if listener.MaxInflightRequests > 0 && inflights > listener.MaxInflightRequests {
atomic.AddUint64(&listener.metrics.RejectedTooManyRequests, 1)
accessLogger.Error("request denied",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "too many requests"),
zap.Int("grpc_code", http.StatusTooManyRequests),
)
return status.Error(codes.ResourceExhausted, "too many requests")
}

return handler(srv, ss)
}
}
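With the chained interceptors in place, every CarbonV2 stream passes through timing, status-metric, and rate-limit handling before reaching the Render implementation. A hedged sketch of a client call, assuming the carbonapi_v2 protobuf types from github.com/go-graphite/protocol and an illustrative listener address of 127.0.0.1:7004; the exact field shapes and the server-streaming signature should be checked against the generated packages:

    package main

    import (
        "context"
        "io"
        "log"
        "time"

        grpcv2 "github.com/go-graphite/protocol/carbonapi_v2_grpc"
        protov2 "github.com/go-graphite/protocol/carbonapi_v2_pb"
        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"
    )

    func main() {
        conn, err := grpc.Dial("127.0.0.1:7004", grpc.WithTransportCredentials(insecure.NewCredentials()))
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()

        client := grpcv2.NewCarbonV2Client(conn)
        ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
        defer cancel()

        // Render is the one rpc implemented so far; it streams FetchResponse
        // messages back for each requested metric.
        stream, err := client.Render(ctx, &protov2.MultiFetchRequest{
            Metrics: []protov2.FetchRequest{{
                Name:      "carbon.agents.*.metricsReceived",
                StartTime: int32(time.Now().Add(-time.Hour).Unix()),
                StopTime:  int32(time.Now().Unix()),
            }},
        })
        if err != nil {
            log.Fatal(err)
        }
        for {
            resp, err := stream.Recv()
            if err == io.EOF {
                break
            }
            if err != nil {
                // e.g. codes.ResourceExhausted from StreamServerRatelimitHandler
                log.Fatal(err)
            }
            log.Printf("metric %s: %d points", resp.Name, len(resp.Values))
        }
    }

A call that trips the per-path limiter configured for /carbonapi_v2_grpc.CarbonV2/Render surfaces here as a gRPC status error rather than an HTTP code.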
