Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC interceptors #483

Merged
merged 5 commits into from
Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,13 @@ stats-percentiles = [99, 98, 95, 75, 50]
# [[carbonserver.api-per-path-rate-limiters]]
# path = "/metrics/list_query/"
# max-inflight-requests = 3
#
# For gRPC RPCs, the path should be the full gRPC method name.
#
# [[carbonserver.api-per-path-rate-limiters]]
# path = "/carbonapi_v2_grpc.CarbonV2/Render"
# max-inflight-requests = 10
# request-timeout = "40s"

# carbonserver.grpc is the configuration for listening for grpc clients.
# Note: currently, only CarbonV2 Render rpc is implemented.
Expand Down
242 changes: 192 additions & 50 deletions carbonserver/carbonserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,16 @@ import (

prom "github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"

"github.com/NYTimes/gziphandler"
"github.com/dgryski/go-expirecache"
"github.com/dgryski/go-trigram"
"github.com/dgryski/httputil"
"github.com/go-graphite/go-carbon/helper"
"github.com/go-graphite/go-carbon/helper/grpcutil"
"github.com/go-graphite/go-carbon/helper/stat"
"github.com/go-graphite/go-carbon/points"
grpcv2 "github.com/go-graphite/protocol/carbonapi_v2_grpc"
Expand Down Expand Up @@ -178,7 +182,7 @@ func (q *QueryItem) FetchOrLock() (interface{}, bool) {
return nil, false
}

select { //nolint:gosimple
select { //nolint:gosimple //skipcq: SCC-S1000
// TODO: Add timeout support
case <-q.QueryFinished:
break
Expand Down Expand Up @@ -224,6 +228,7 @@ type CarbonserverListener struct {
failOnMaxGlobs bool
percentiles []int
scanFrequency time.Duration
scanTicker *time.Ticker
forceScanChan chan struct{}
metricsAsCounters bool
tcpListener *net.TCPListener
Expand Down Expand Up @@ -442,7 +447,7 @@ type jsonMetricDetailsResponse struct {
}

type fileIndex struct {
typ int //nolint:unused,structcheck
typ int //nolint:unused,structcheck //skipcq: SCC-U1000

idx trigram.Index
files []string
Expand Down Expand Up @@ -1247,19 +1252,10 @@ func (listener *CarbonserverListener) expandGlobs(ctx context.Context, query str
}()

// why: no need to continue execution if the request is already timeout.
select {
case <-ctx.Done():
switch ctx.Err() {
case context.DeadlineExceeded:
listener.prometheus.timeoutRequest()
case context.Canceled:
listener.prometheus.cancelledRequest()
}

if listener.checkRequestCtx(ctx) != nil {
err := fmt.Errorf("time out due to heavy glob query rate limiter: %s", rl.pattern.String())
resultCh <- &ExpandedGlobResponse{query, nil, nil, err}
return
default:
}

break
Expand All @@ -1268,7 +1264,7 @@ func (listener *CarbonserverListener) expandGlobs(ctx context.Context, query str
logger := TraceContextToZap(ctx, listener.logger)
matchedCount := 0
defer func(start time.Time) {
dur := time.Now().Sub(start) //nolint:gosimple
dur := time.Since(start)
if dur <= time.Second {
return
}
Expand Down Expand Up @@ -1565,6 +1561,9 @@ func (listener *CarbonserverListener) Stat(send helper.StatCallback) {

func (listener *CarbonserverListener) Stop() error {
close(listener.forceScanChan)
if listener.scanTicker != nil {
listener.scanTicker.Stop()
}
close(listener.exitChan)
if listener.db != nil {
listener.db.Close()
Expand Down Expand Up @@ -1634,16 +1633,46 @@ func (listener *CarbonserverListener) initStatsDB() error {
return nil
}

// getPathRateLimiter returns the per-path rate limiter configured for the
// given path (an HTTP URL path, or a gRPC full method name such as
// "/carbonapi_v2_grpc.CarbonV2/Render"), or nil when none is configured.
func (listener *CarbonserverListener) getPathRateLimiter(path string) *ApiPerPathRatelimiter {
	// Indexing a map with a missing key yields the zero value, which for a
	// pointer type is nil — exactly the "not configured" sentinel we want,
	// so the comma-ok form and explicit nil return are unnecessary.
	return listener.apiPerPathRatelimiter[path]
}

// checkRequestCtx reports whether the request context is already finished.
// It returns nil while the context is live; once the context is done it
// bumps the matching prometheus counter (timeout vs. cancellation) and
// returns a non-nil error.
func (listener *CarbonserverListener) checkRequestCtx(ctx context.Context) error {
	ctxErr := ctx.Err()
	if ctxErr == nil {
		return nil
	}
	switch ctxErr {
	case context.DeadlineExceeded:
		listener.prometheus.timeoutRequest()
	case context.Canceled:
		listener.prometheus.cancelledRequest()
	}
	return errors.New("context is done")
}

// shouldBlockForIndex reports whether requests must be rejected because the
// file index has not been built yet and the listener is configured
// (NoServiceWhenIndexIsNotReady) to refuse service until it is ready.
func (listener *CarbonserverListener) shouldBlockForIndex() bool {
	return listener.NoServiceWhenIndexIsNotReady && listener.CurrentFileIndex() == nil
}

func (listener *CarbonserverListener) rateLimitRequest(h http.HandlerFunc) http.HandlerFunc {
return func(wr http.ResponseWriter, req *http.Request) {
ratelimiter := listener.getPathRateLimiter(req.URL.Path)
// Can't use http.TimeoutHandler here due to supporting per-path timeout
if ratelimiter, ok := listener.apiPerPathRatelimiter[req.URL.Path]; listener.requestTimeout > 0 || (ok && ratelimiter.timeout > 0) {
timeout := listener.requestTimeout
if ok && ratelimiter.timeout > 0 {
timeout = ratelimiter.timeout
}

ctx, cancel := context.WithTimeout(req.Context(), timeout)
var newTimeout time.Duration
if listener.requestTimeout > 0 {
newTimeout = listener.requestTimeout
}
if ratelimiter != nil && ratelimiter.timeout > 0 {
newTimeout = ratelimiter.timeout
}
if newTimeout > 0 {
ctx, cancel := context.WithTimeout(req.Context(), newTimeout)
defer cancel()
req = req.WithContext(ctx)
}
Expand All @@ -1656,7 +1685,7 @@ func (listener *CarbonserverListener) rateLimitRequest(h http.HandlerFunc) http.
zap.String("peer", req.RemoteAddr),
))

if listener.NoServiceWhenIndexIsNotReady && listener.CurrentFileIndex() == nil {
if listener.shouldBlockForIndex() {
accessLogger.Error("request denied",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "index not ready"),
Expand All @@ -1666,39 +1695,23 @@ func (listener *CarbonserverListener) rateLimitRequest(h http.HandlerFunc) http.
return
}

if ratelimiter, ok := listener.apiPerPathRatelimiter[req.URL.Path]; ok {
if cap(ratelimiter.maxInflightRequests) == 0 {
if ratelimiter != nil {
if ratelimiter.Enter() != nil {
http.Error(wr, "Bad request (blocked by api per path rate limiter)", http.StatusBadRequest)
return
}

ratelimiter.maxInflightRequests <- struct{}{}
defer func() {
select {
case <-ratelimiter.maxInflightRequests:
default:
}
}()
defer ratelimiter.Exit()

// why: if the request is already timeout, there is no
// need to resume execution.
select {
case <-ctx.Done():
switch ctx.Err() {
case context.DeadlineExceeded:
listener.prometheus.timeoutRequest()
case context.Canceled:
listener.prometheus.cancelledRequest()
}

if listener.checkRequestCtx(ctx) != nil {
accessLogger.Error("request timeout due to per url rate limiting",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "timeout due to per url rate limiting"),
zap.Int("http_code", http.StatusRequestTimeout),
)
http.Error(wr, "Bad request (timeout due to maxInflightRequests)", http.StatusRequestTimeout)
return
default:
}
}

Expand Down Expand Up @@ -1736,7 +1749,8 @@ func (listener *CarbonserverListener) Listen(listen string) error {
listener.exitChan = make(chan struct{})
if (listener.trigramIndex || listener.trieIndex) && listener.scanFrequency != 0 {
listener.forceScanChan = make(chan struct{})
go listener.fileListUpdater(listener.whisperData, time.Tick(listener.scanFrequency), listener.forceScanChan, listener.exitChan) //nolint:staticcheck
listener.scanTicker = time.NewTicker(listener.scanFrequency)
go listener.fileListUpdater(listener.whisperData, listener.scanTicker.C, listener.forceScanChan, listener.exitChan) //nolint:staticcheck
listener.forceScanChan <- struct{}{}
}

Expand All @@ -1755,7 +1769,7 @@ func (listener *CarbonserverListener) Listen(listen string) error {
handlerStatusCodes,
listener.prometheus.request,
),
listener.bucketRequestTimes,
listener.bucketRequestTimesHTTP,
),
)
}
Expand Down Expand Up @@ -1888,7 +1902,27 @@ func (listener *CarbonserverListener) Listen(listen string) error {
return nil
}

func (listener *CarbonserverListener) bucketRequestTimes(req *http.Request, t time.Duration) {
func (listener *CarbonserverListener) bucketRequestTimesHTTP(req *http.Request, t time.Duration) {
bucket := listener.bucketRequestTimes(t)
if bucket >= listener.buckets {
listener.logger.Info("slow request",
zap.String("url", req.URL.RequestURI()),

Check failure

Code scanning / CodeQL

Log entries created from user input

This log write receives unsanitized user input from [here](1).
zap.String("peer", req.RemoteAddr),
)
}
}

// bucketRequestTimesGRPC records a gRPC request's duration into the shared
// time buckets and logs the request as slow when it lands in the overflow
// bucket. payload and peer are pre-rendered strings since there is no
// *http.Request to extract them from.
func (listener *CarbonserverListener) bucketRequestTimesGRPC(payload, peer string, t time.Duration) {
	if listener.bucketRequestTimes(t) < listener.buckets {
		return
	}
	listener.logger.Info("slow request",
		zap.String("payload", payload),
		zap.String("peer", peer),
	)
}

func (listener *CarbonserverListener) bucketRequestTimes(t time.Duration) int {
listener.prometheus.duration(t)

ms := t.Nanoseconds() / int64(time.Millisecond)
Expand All @@ -1908,13 +1942,10 @@ func (listener *CarbonserverListener) bucketRequestTimes(req *http.Request, t ti
if bucket < listener.buckets {
atomic.AddUint64(&listener.timeBuckets[bucket], 1)
} else {
// Too big? Increment overflow bucket and log
// Too big? Increment overflow bucket
atomic.AddUint64(&listener.timeBuckets[listener.buckets], 1)
listener.logger.Info("slow request",
zap.String("url", req.URL.RequestURI()),
zap.String("peer", req.RemoteAddr),
)
}
return bucket
}

func extractTrigrams(query string) []trigram.T {
Expand Down Expand Up @@ -1982,6 +2013,32 @@ func NewApiPerPathRatelimiter(maxInflightRequests uint, timeout time.Duration) *
}
}

// RatelimiterError is a string-backed error type for rate limiter failures;
// the string backing lets sentinel values be compared by value.
type RatelimiterError string

// Error implements the error interface.
func (e RatelimiterError) Error() string {
	return string(e)
}

var (
	// BlockedRatelimitError is returned by Enter when a per-path rate
	// limiter allows zero inflight requests, i.e. the path is fully blocked.
	BlockedRatelimitError = RatelimiterError("blocked by api per path rate limiter")
)

// Enter acquires an inflight slot from the rate limiter, blocking until one
// is free. It returns BlockedRatelimitError without blocking when the
// limiter was configured with zero max inflight requests, which marks the
// path as fully blocked. A successful Enter must be paired with Exit.
func (a *ApiPerPathRatelimiter) Enter() error {
	// cap == 0 means an unbuffered channel: a send would block forever, so
	// zero capacity is treated as "reject everything" instead.
	if cap(a.maxInflightRequests) == 0 {
		return BlockedRatelimitError
	}

	// Blocks while maxInflightRequests other requests are already in flight.
	a.maxInflightRequests <- struct{}{}
	return nil
}

// Exit releases an inflight slot previously acquired with Enter. The
// non-blocking receive makes Exit a safe no-op when no slot is held
// (e.g. when Enter returned an error).
func (a *ApiPerPathRatelimiter) Exit() {
	select {
	case <-a.maxInflightRequests:
	default:
	}
}

func (listener *CarbonserverListener) ListenGRPC(listen string) error {
var err error
var grpcAddr *net.TCPAddr
Expand All @@ -1996,9 +2053,94 @@ func (listener *CarbonserverListener) ListenGRPC(listen string) error {
}

var opts []grpc.ServerOption

opts = append(opts, grpc.ChainStreamInterceptor(
grpcutil.StreamServerTimeHandler(listener.bucketRequestTimesGRPC),
grpcutil.StreamServerStatusMetricHandler(statusCodes, listener.prometheus.request),
listener.StreamServerRatelimitHandler(),
))
grpcServer := grpc.NewServer(opts...) //skipcq: GO-S0902
grpcv2.RegisterCarbonV2Server(grpcServer, listener)
go grpcServer.Serve(listener.grpcListener)
return nil
}

// StreamServerRatelimitHandler returns a gRPC stream server interceptor that
// applies, in order: a per-path (or listener-wide) request timeout,
// index-readiness gating, per-path inflight rate limiting, and the global
// inflight request limit. Rejections are logged via the access logger and
// surfaced to the client as gRPC status errors.
func (listener *CarbonserverListener) StreamServerRatelimitHandler() grpc.StreamServerInterceptor {
	return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		t0 := time.Now()
		fullMethodName := info.FullMethod
		wss := grpcutil.GetWrappedStream(ss)
		ratelimiter := listener.getPathRateLimiter(fullMethodName)

		// Can't use http.TimeoutHandler here due to supporting per-path
		// timeout: a per-path timeout overrides the listener-wide one.
		var newTimeout time.Duration
		if listener.requestTimeout > 0 {
			newTimeout = listener.requestTimeout
		}
		if ratelimiter != nil && ratelimiter.timeout > 0 {
			newTimeout = ratelimiter.timeout
		}
		if newTimeout > 0 {
			ctx, cancel := context.WithTimeout(wss.Context(), newTimeout)
			defer cancel()
			wss.SetContext(ctx)
		}

		ctx := wss.Context()
		var reqPeer string
		if p, ok := peer.FromContext(ctx); ok {
			reqPeer = p.Addr.String()
		}
		accessLogger := TraceContextToZap(ctx, listener.accessLogger.With(
			zap.String("handler", "rate_limit"),
			zap.String("payload", wss.Payload()),
			zap.String("peer", reqPeer),
		))

		if listener.shouldBlockForIndex() {
			accessLogger.Error("request denied",
				zap.Duration("runtime_seconds", time.Since(t0)),
				zap.String("reason", "index not ready"),
				zap.Int("grpc_code", int(codes.Unavailable)),
			)
			return status.Error(codes.Unavailable, "Service unavailable (index not ready)")
		}

		if ratelimiter != nil {
			if ratelimiter.Enter() != nil {
				accessLogger.Error("request blocked",
					zap.Duration("runtime_seconds", time.Since(t0)),
					zap.String("reason", "blocked by api per path rate limiter"),
					zap.Int("grpc_code", int(codes.InvalidArgument)),
				)
				return status.Error(codes.InvalidArgument, "blocked by api per path rate limiter")
			}
			defer ratelimiter.Exit()

			// why: if the request is already timeout, there is no
			// need to resume execution.
			if listener.checkRequestCtx(ctx) != nil {
				accessLogger.Error("request timeout due to per url rate limiting",
					zap.Duration("runtime_seconds", time.Since(t0)),
					zap.String("reason", "timeout due to per url rate limiting"),
					zap.Int("grpc_code", int(codes.ResourceExhausted)),
				)
				return status.Error(codes.ResourceExhausted, "timeout due to maxInflightRequests")
			}
		}

		// TODO: to deprecate as it's replaced by per-path rate limiting?
		//
		// rate limit inflight requests
		inflights := atomic.AddUint64(&listener.metrics.InflightRequests, 1)
		defer atomic.AddUint64(&listener.metrics.InflightRequests, ^uint64(0))
		if listener.MaxInflightRequests > 0 && inflights > listener.MaxInflightRequests {
			atomic.AddUint64(&listener.metrics.RejectedTooManyRequests, 1)
			accessLogger.Error("request denied",
				zap.Duration("runtime_seconds", time.Since(t0)),
				zap.String("reason", "too many requests"),
				// Fixed: this field previously logged http.StatusTooManyRequests
				// (HTTP 429) under the "grpc_code" key; every sibling log line
				// and the returned status use gRPC codes.
				zap.Int("grpc_code", int(codes.ResourceExhausted)),
			)
			return status.Error(codes.ResourceExhausted, "too many requests")
		}

		return handler(srv, ss)
	}
}
Loading