carbonserver: introducing request-timeout, heavy-glob-query-rate-limiters and api-per-path-rate-limiters for read traffic regulation

Three new types of read-path traffic control configs are introduced in this commit (a combined usage sketch follows the list):

* request-timeout: it controls how long each API call in carbonserver can run.
  The existing timeouts (read, write and idle) apply to the http.Server, but
  this new timeout applies to each API call in carbonserver.

* heavy-glob-query-rate-limiters are relatively narrow controls against queries
  that might cause high CPU and memory consumption by matching over too many
  metrics or nodes at the same time, e.g. queries
  like: "*.*.*.*.*.*.*.*.*keyword*". For these types of queries, trigram might
  handle them better, but for trie and filesystem glob they might be too
  expensive when the index tree is large.

* api-per-path-rate-limiters are used for strict API call rate limiting. All
  registered API paths (see carbonserver.Listen for a full list) can be
  controlled separately with them.
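
The sketch below is not part of the commit; it is a minimal illustration of how the three controls can be wired up programmatically through the constructors and setters added here. The module import path and the stub cache getter are assumptions made for the example.

package main

import (
    "time"

    "github.com/go-graphite/go-carbon/carbonserver"
    "github.com/go-graphite/go-carbon/points"
)

func main() {
    // The listener constructor requires a cache getter; a stub is enough for this sketch.
    listener := carbonserver.NewCarbonserverListener(func(key string) []points.Point { return nil })

    // Global per-API-call timeout, complementing the http.Server read/write/idle timeouts.
    listener.SetRequestTimeout(60 * time.Second)

    // Heavy glob queries: allow at most one in flight for queries starting with 5+ "*." segments.
    gqrl, err := carbonserver.NewGlobQueryRateLimiter(`^(\*\.){5,}`, 1)
    if err != nil {
        panic(err)
    }
    listener.SetHeavyGlobQueryRateLimiters([]*carbonserver.GlobQueryRateLimiter{gqrl})

    // Strict per-path control: one /metrics/list/ call at a time, with its own longer timeout.
    listener.SetAPIPerPathRateLimiter(map[string]*carbonserver.ApiPerPathRatelimiter{
        "/metrics/list/": carbonserver.NewApiPerPathRatelimiter(1, 10*time.Minute),
    })

    // Then start serving as usual, e.g.:
    // listener.Listen(":8080")
}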
bom-d-van committed Jun 2, 2022
1 parent 8c1ea97 commit 9a56947
Showing 6 changed files with 289 additions and 8 deletions.
60 changes: 60 additions & 0 deletions README.md
@@ -338,6 +338,8 @@ metrics-as-counters = false
# Read and Write timeouts for HTTP server
read-timeout = "60s"
write-timeout = "60s"
# Request timeout for each API call
request-timeout = "60s"
# Enable /render cache, it will cache the result for 1 minute
query-cache-enabled = true
# 0 for unlimited
@@ -449,6 +451,64 @@ internal-stats-dir = ""
# To disable this feature, leave the list blank
stats-percentiles = [99, 98, 95, 75, 50]

# heavy-glob-query-rate-limiters is a narrow control against queries that might
# cause high CPU and memory consumption due to matching over too many metrics
# or nodes at the same time, e.g. queries like "*.*.*.*.*.*.*.*.*keyword*". For
# these types of queries, trigram might handle them better, but for
# trie and filesystem glob they are too expensive.
#
# pattern is a Go regular expression: https://pkg.go.dev/regexp/syntax.
#
# When max-inflight-requests is set to 0, matching queries are rejected instantly.
# When max-inflight-requests is a positive integer and there are too many
# concurrent requests, new requests are blocked/delayed until the previous ones
# are completed.
#
# The configs are order sensitive and are applied top down. The current
# implementation is O(n), so it's advised not to add too many rules here,
# as they are checked against every query.
#
# [[carbonserver.heavy-glob-query-rate-limiters]]
# pattern = "^(\*\.){5,}"
# max-inflight-requests = 1
#
# [[carbonserver.heavy-glob-query-rate-limiters]]
# pattern = "^sys(\*\.){7,}"
# max-inflight-requests = 0
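
As a quick illustration (not part of the diff) of how such a pattern classifies incoming glob queries, the standalone Go sketch below uses only regexp from the standard library; the sample query strings are made up.

package main

import (
    "fmt"
    "regexp"
)

func main() {
    // Same regex as the first example above: five or more leading "*." segments.
    re := regexp.MustCompile(`^(\*\.){5,}`)

    queries := []string{
        "*.*.*.*.*.*.*.*.*keyword*", // matched: forces a scan over the whole tree
        "sys.host-*.cpu.*.user",     // not matched: the anchored prefix narrows the scan
    }
    for _, q := range queries {
        fmt.Printf("%-30s limited=%v\n", q, re.MatchString(q))
    }
}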

# api-per-path-rate-limiters are used for strict API call rate limiting. All
# registered API paths (see carbonserver.Listen for a full list) can be
# controlled separately here:
#
# "/_internal/capabilities/"
# "/metrics/find/"
# "/metrics/list/"
# "/metrics/list_query/"
# "/metrics/details/"
# "/render/"
# "/info/"
# "/forcescan"
# "/admin/quota"
# "/admin/info"
# "/robots.txt"
# ...
#
# When max-inflight-requests is set to 0, calls to the path are rejected instantly.
# When max-inflight-requests is a positive integer and there are too many
# concurrent requests, new requests are blocked/delayed until the previous
# ones are completed.
#
# A per-path request-timeout overrides the global request-timeout.
#
# [[carbonserver.api-per-path-rate-limiters]]
# path = "/metrics/list/"
# max-inflight-requests = 1
# request-timeout = "600s"
#
# [[carbonserver.api-per-path-rate-limiters]]
# path = "/metrics/list_query/"
# max-inflight-requests = 3
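
To make the max-inflight-requests semantics concrete, here is a small standalone Go sketch (not part of the commit) of the buffered-channel semaphore idiom the limiters use: capacity 0 rejects immediately, capacity N makes extra callers wait until a slot is released. The helper name tryAcquire is invented for the example.

package main

import (
    "fmt"
    "time"
)

// tryAcquire mimics the limiter: a zero-capacity channel rejects immediately,
// otherwise the caller blocks until one of the N slots is free.
func tryAcquire(sem chan struct{}) bool {
    if cap(sem) == 0 {
        return false // instant rejection
    }
    sem <- struct{}{} // blocks while N requests are already in flight
    return true
}

func main() {
    reject := make(chan struct{}, 0) // max-inflight-requests = 0
    limit1 := make(chan struct{}, 1) // max-inflight-requests = 1

    fmt.Println("capacity 0 admitted:", tryAcquire(reject)) // false

    if tryAcquire(limit1) {
        go func() {
            time.Sleep(100 * time.Millisecond)
            <-limit1 // release the slot when the "request" finishes
        }()
    }
    start := time.Now()
    tryAcquire(limit1) // second caller waits ~100ms for the slot
    fmt.Println("waited:", time.Since(start).Round(10*time.Millisecond))
}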

[dump]
# Enable dump/restore function on USR2 signal
enabled = false
26 changes: 26 additions & 0 deletions carbon/app.go
@@ -11,6 +11,7 @@ import (
"runtime"
"strings"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
@@ -459,6 +460,23 @@ func (app *App) Start(version string) (err error) {
}
}

apiPerPathRateLimiters := map[string]*carbonserver.ApiPerPathRatelimiter{}
for _, rl := range conf.Carbonserver.APIPerPathRateLimiters {
var timeout time.Duration
if rl.RequestTimeout != nil {
timeout = rl.RequestTimeout.Value()
}
apiPerPathRateLimiters[rl.Path] = carbonserver.NewApiPerPathRatelimiter(rl.MaxInflightRequests, timeout)
}
var globQueryRateLimiters []*carbonserver.GlobQueryRateLimiter
for _, rl := range conf.Carbonserver.HeavyGlobQueryRateLimiters {
gqrl, err := carbonserver.NewGlobQueryRateLimiter(rl.Pattern, rl.MaxInflightRequests)
if err != nil {
return fmt.Errorf("failed to init Carbonserver.HeavyGlobQueryRateLimiters %s: %s", rl.Pattern, err)
}
globQueryRateLimiters = append(globQueryRateLimiters, gqrl)
}

// TODO: refactor: do not use var name the same as pkg name
carbonserver := carbonserver.NewCarbonserverListener(core.Get)
carbonserver.SetWhisperData(conf.Whisper.DataDir)
@@ -477,6 +495,7 @@ func (app *App) Start(version string) (err error) {
carbonserver.SetReadTimeout(conf.Carbonserver.ReadTimeout.Value())
carbonserver.SetIdleTimeout(conf.Carbonserver.IdleTimeout.Value())
carbonserver.SetWriteTimeout(conf.Carbonserver.WriteTimeout.Value())
carbonserver.SetRequestTimeout(conf.Carbonserver.RequestTimeout.Value())
carbonserver.SetQueryCacheEnabled(conf.Carbonserver.QueryCacheEnabled)
carbonserver.SetFindCacheEnabled(conf.Carbonserver.FindCacheEnabled)
carbonserver.SetQueryCacheSizeMB(conf.Carbonserver.QueryCacheSizeMB)
@@ -491,6 +510,13 @@ func (app *App) Start(version string) (err error) {
carbonserver.SetMaxInflightRequests(conf.Carbonserver.MaxInflightRequests)
carbonserver.SetNoServiceWhenIndexIsNotReady(conf.Carbonserver.NoServiceWhenIndexIsNotReady)

if len(apiPerPathRateLimiters) > 0 {
carbonserver.SetAPIPerPathRateLimiter(apiPerPathRateLimiters)
}
if len(globQueryRateLimiters) > 0 {
carbonserver.SetHeavyGlobQueryRateLimiters(globQueryRateLimiters)
}

if app.Config.Whisper.Quotas != nil {
if !conf.Carbonserver.ConcurrentIndex || conf.Carbonserver.RealtimeIndex <= 0 {
return errors.New("concurrent-index and realtime-index needs to be enabled for quota control.")
21 changes: 19 additions & 2 deletions carbon/config.go
@@ -108,6 +108,7 @@ type carbonserverConfig struct {
ReadTimeout *Duration `toml:"read-timeout"`
IdleTimeout *Duration `toml:"idle-timeout"`
WriteTimeout *Duration `toml:"write-timeout"`
RequestTimeout *Duration `toml:"request-timeout"`
ScanFrequency *Duration `toml:"scan-frequency"`
QueryCacheEnabled bool `toml:"query-cache-enabled"`
QueryCacheSizeMB int `toml:"query-cache-size-mb"`
@@ -132,8 +133,24 @@ type carbonserverConfig struct {

QuotaUsageReportFrequency *Duration `toml:"quota-usage-report-frequency"`

MaxInflightRequests uint64 `toml:"max-inflight-requests"`
NoServiceWhenIndexIsNotReady bool `toml:"no-service-when-index-is-not-ready"`
NoServiceWhenIndexIsNotReady bool `toml:"no-service-when-index-is-not-ready"`

// TODO: deprecate, replaced by APIPerPathRateLimiters
MaxInflightRequests uint64 `toml:"max-inflight-requests"`

// Only applied in the filtering phase, where matching over the whole index
// or file tree might cause high CPU and memory consumption.
HeavyGlobQueryRateLimiters []struct {
Pattern string `toml:"pattern"`
MaxInflightRequests uint `toml:"max-inflight-requests"`
} `toml:"heavy-glob-query-rate-limiters"`

APIPerPathRateLimiters []struct {
Path string `toml:"path"`
MaxInflightRequests uint `toml:"max-inflight-requests"`
RequestTimeout *Duration `toml:"request-timeout"`
} `toml:"api-per-path-rate-limiters"`
}

type pprofConfig struct {
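A small decoding sketch (not part of the commit) showing how the new array-of-tables config maps onto these struct fields; it assumes a BurntSushi/toml-style decoder and a trimmed-down struct purely for illustration.

package main

import (
    "fmt"

    "github.com/BurntSushi/toml"
)

// Trimmed-down copy of the new config field above, for illustration only.
type carbonserverConfig struct {
    HeavyGlobQueryRateLimiters []struct {
        Pattern             string `toml:"pattern"`
        MaxInflightRequests uint   `toml:"max-inflight-requests"`
    } `toml:"heavy-glob-query-rate-limiters"`
}

type config struct {
    Carbonserver carbonserverConfig `toml:"carbonserver"`
}

func main() {
    raw := `
[[carbonserver.heavy-glob-query-rate-limiters]]
pattern = '^(\*\.){5,}'
max-inflight-requests = 1
`
    var c config
    if _, err := toml.Decode(raw, &c); err != nil {
        panic(err)
    }
    fmt.Printf("%+v\n", c.Carbonserver.HeavyGlobQueryRateLimiters)
}
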
128 changes: 123 additions & 5 deletions carbonserver/carbonserver.go
@@ -30,6 +30,7 @@ import (
_ "net/http/pprof"
"os"
"path/filepath"
"regexp"
"runtime"
"runtime/debug"
"sort"
@@ -213,6 +214,7 @@ type CarbonserverListener struct {
readTimeout time.Duration
idleTimeout time.Duration
writeTimeout time.Duration
requestTimeout time.Duration
whisperData string
buckets int
maxGlobs int
@@ -268,8 +270,11 @@ type CarbonserverListener struct {

interfalInfoCallbacks map[string]func() map[string]interface{}

MaxInflightRequests uint64
// resource control
MaxInflightRequests uint64 // TODO: to deprecate
NoServiceWhenIndexIsNotReady bool
apiPerPathRatelimiter map[string]*ApiPerPathRatelimiter
globQueryRateLimiters []*GlobQueryRateLimiter
}

type prometheus struct {
@@ -468,7 +473,8 @@ func NewCarbonserverListener(cacheGetFunc func(key string) []points.Point) *Carb
returnedMetric: func() {},
returnedPoint: func(int) {},
},
quotaAndUsageMetrics: make(chan []points.Points, 1),
quotaAndUsageMetrics: make(chan []points.Points, 1),
apiPerPathRatelimiter: map[string]*ApiPerPathRatelimiter{},
}
}

@@ -511,6 +517,9 @@ func (listener *CarbonserverListener) SetIdleTimeout(idleTimeout time.Duration)
func (listener *CarbonserverListener) SetWriteTimeout(writeTimeout time.Duration) {
listener.writeTimeout = writeTimeout
}
func (listener *CarbonserverListener) SetRequestTimeout(requestTimeout time.Duration) {
listener.requestTimeout = requestTimeout
}
func (listener *CarbonserverListener) SetCompressed(compressed bool) {
listener.compressed = compressed
}
@@ -583,6 +592,12 @@ func (listener *CarbonserverListener) SetMaxInflightRequests(max uint64) {
func (listener *CarbonserverListener) SetNoServiceWhenIndexIsNotReady(no bool) {
listener.NoServiceWhenIndexIsNotReady = no
}
func (listener *CarbonserverListener) SetHeavyGlobQueryRateLimiters(rls []*GlobQueryRateLimiter) {
listener.globQueryRateLimiters = rls
}
func (listener *CarbonserverListener) SetAPIPerPathRateLimiter(rls map[string]*ApiPerPathRatelimiter) {
listener.apiPerPathRatelimiter = rls
}

// skipcq: RVV-B0011
func (listener *CarbonserverListener) CurrentFileIndex() *fileIndex {
@@ -1170,6 +1185,36 @@ func (listener *CarbonserverListener) expandGlobs(ctx context.Context, query str
}
}()

// Rate limit heavy globbing queries like: *.*.*.*keyword*.
//
// Why: it's expensive to scan the whole index while looking for
// keywords, especially for trie and file system glob.
for _, rl := range listener.globQueryRateLimiters {
if !rl.pattern.MatchString(query) {
continue
}
if cap(rl.maxInflightRequests) == 0 {
err := fmt.Errorf("rejected by query rate limiter: %s", rl.pattern.String())
resultCh <- &ExpandedGlobResponse{query, nil, nil, err}
return
}

rl.maxInflightRequests <- struct{}{}
defer func() {
<-rl.maxInflightRequests
}()

select {
case <-ctx.Done():
err := fmt.Errorf("time out due to query rate limiter: %s", rl.pattern.String())
resultCh <- &ExpandedGlobResponse{query, nil, nil, err}
return
default:
}

break
}

logger := TraceContextToZap(ctx, listener.logger)
matchedCount := 0
defer func(start time.Time) {
@@ -1540,6 +1585,18 @@ func (listener *CarbonserverListener) initStatsDB() error {

func (listener *CarbonserverListener) rateLimitRequest(h http.HandlerFunc) http.HandlerFunc {
return func(wr http.ResponseWriter, req *http.Request) {
// Can't use http.TimeoutHandler here because per-path timeouts need to override the global one
if rl, ok := listener.apiPerPathRatelimiter[req.URL.Path]; listener.requestTimeout > 0 || (ok && rl.timeout > 0) {
timeout := listener.requestTimeout
if ok && rl.timeout > 0 {
timeout = rl.timeout
}

ctx, cancel := context.WithTimeout(req.Context(), timeout)
defer cancel()
req = req.WithContext(ctx)
}

t0 := time.Now()
ctx := req.Context()
accessLogger := TraceContextToZap(ctx, listener.accessLogger.With(
@@ -1558,10 +1615,45 @@ func (listener *CarbonserverListener) rateLimitRequest(h http.HandlerFunc) http.
return
}

inflights := atomic.AddUint64(&listener.metrics.InflightRequests, 1)
defer atomic.AddUint64(&listener.metrics.InflightRequests, ^uint64(0))
if ratelimiter, ok := listener.apiPerPathRatelimiter[req.URL.Path]; ok {
if cap(ratelimiter.maxInflightRequests) == 0 {
http.Error(wr, "Bad request (blocked by api per path rate limiter)", http.StatusBadRequest)
return
}

ratelimiter.maxInflightRequests <- struct{}{}
defer func() {
select {
case <-ratelimiter.maxInflightRequests:
default:
}
}()

select {
case <-ctx.Done():
switch ctx.Err() {
case context.DeadlineExceeded:
listener.prometheus.timeoutRequest()
case context.Canceled:
listener.prometheus.cancelledRequest()
}

accessLogger.Error("request timeout due to per url rate limiting",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.String("reason", "timeout due to per url rate limiting"),
zap.Int("http_code", http.StatusRequestTimeout),
)
http.Error(wr, "Bad request (timeout due to maxInflightRequests)", http.StatusRequestTimeout)
return
default:
}
}

// TODO: to deprecate as it's replaced by per-path rate limiting
//
// rate limit inflight requests
inflights := atomic.AddUint64(&listener.metrics.InflightRequests, 1)
defer atomic.AddUint64(&listener.metrics.InflightRequests, ^uint64(0))
if listener.MaxInflightRequests > 0 && inflights > listener.MaxInflightRequests {
atomic.AddUint64(&listener.metrics.RejectedTooManyRequests, 1)

@@ -1601,7 +1693,6 @@ func (listener *CarbonserverListener) Listen(listen string) error {
listener.timeBuckets = make([]uint64, listener.buckets+1)

carbonserverMux := http.NewServeMux()

wrapHandler := func(h http.HandlerFunc, handlerStatusCodes []uint64) http.HandlerFunc {
return httputil.TrackConnections(
httputil.TimeHandler(
@@ -1615,6 +1706,7 @@ func (listener *CarbonserverListener) Listen(listen string) error {
),
)
}

carbonserverMux.HandleFunc("/_internal/capabilities/", wrapHandler(listener.capabilityHandler, statusCodes["capabilities"]))
carbonserverMux.HandleFunc("/metrics/find/", wrapHandler(listener.findHandler, statusCodes["find"]))
carbonserverMux.HandleFunc("/metrics/list/", wrapHandler(listener.listHandler, statusCodes["list"]))
@@ -1879,3 +1971,29 @@ func (listener *CarbonserverListener) RegisterInternalInfoHandler(name string, f
}
listener.interfalInfoCallbacks[name] = f
}

type GlobQueryRateLimiter struct {
pattern *regexp.Regexp
maxInflightRequests chan struct{}
}

func NewGlobQueryRateLimiter(pattern string, max uint) (*GlobQueryRateLimiter, error) {
exp, err := regexp.Compile(pattern)
if err != nil {
return nil, err
}

return &GlobQueryRateLimiter{pattern: exp, maxInflightRequests: make(chan struct{}, max)}, nil
}

type ApiPerPathRatelimiter struct {
maxInflightRequests chan struct{}
timeout time.Duration
}

func NewApiPerPathRatelimiter(maxInflightRequests uint, timeout time.Duration) *ApiPerPathRatelimiter {
return &ApiPerPathRatelimiter{
maxInflightRequests: make(chan struct{}, maxInflightRequests),
timeout: timeout,
}
}
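
For context (not part of the diff): the request-timeout above is applied in rateLimitRequest by wrapping each request's context with context.WithTimeout, so individual handlers see the deadline via ctx.Done() instead of relying on the http.Server read/write timeouts. A minimal standalone sketch of that pattern, with an invented slowHandler, could look like this.

package main

import (
    "context"
    "fmt"
    "net/http"
    "time"
)

// withRequestTimeout mirrors the idea in rateLimitRequest: attach a deadline
// to the request context rather than to the whole http.Server.
func withRequestTimeout(timeout time.Duration, h http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        ctx, cancel := context.WithTimeout(r.Context(), timeout)
        defer cancel()
        h(w, r.WithContext(ctx))
    }
}

// slowHandler is a hypothetical API call that checks the deadline while working.
func slowHandler(w http.ResponseWriter, r *http.Request) {
    select {
    case <-time.After(2 * time.Second): // pretend to scan the index
        fmt.Fprintln(w, "done")
    case <-r.Context().Done():
        http.Error(w, "request timed out", http.StatusRequestTimeout)
    }
}

func main() {
    http.HandleFunc("/metrics/list/", withRequestTimeout(1*time.Second, slowHandler))
    _ = http.ListenAndServe(":8080", nil)
}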
2 changes: 1 addition & 1 deletion carbonserver/find.go
@@ -383,7 +383,7 @@ GATHER:
zap.String("reason", "can't expand globs"),
zap.Errors("errors", errors),
)
return nil, fmt.Errorf("find failed, can't expand globs")
return nil, fmt.Errorf("find failed, can't expand globs: %v", errors)
} else {
logger.Warn("find partly failed",
zap.Duration("runtime_seconds", time.Since(t0)),