Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add carbonserver render tracing #509

Merged
merged 1 commit into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions carbon/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ func (app *App) Start(version string) (err error) {

carbonserver.SetMaxInflightRequests(conf.Carbonserver.MaxInflightRequests)
carbonserver.SetNoServiceWhenIndexIsNotReady(conf.Carbonserver.NoServiceWhenIndexIsNotReady)
carbonserver.SetRenderTraceLoggingEnabled(conf.Carbonserver.RenderTraceLoggingEnabled)

if conf.Carbonserver.RequestTimeout != nil {
carbonserver.SetRequestTimeout(conf.Carbonserver.RequestTimeout.Value())
Expand Down
2 changes: 2 additions & 0 deletions carbon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ type carbonserverConfig struct {
MaxInflightRequests uint `toml:"max-inflight-requests"`
RequestTimeout *Duration `toml:"request-timeout"`
} `toml:"api-per-path-rate-limiters"`

RenderTraceLoggingEnabled bool `toml:"render-trace-logging-enabled"`
}

type pprofConfig struct {
Expand Down
6 changes: 6 additions & 0 deletions carbonserver/carbonserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ type CarbonserverListener struct {
NoServiceWhenIndexIsNotReady bool
apiPerPathRatelimiter map[string]*ApiPerPathRatelimiter
globQueryRateLimiters []*GlobQueryRateLimiter

renderTraceLoggingEnabled bool
}

type prometheus struct {
Expand Down Expand Up @@ -625,6 +627,10 @@ func (listener *CarbonserverListener) SetAPIPerPathRateLimiter(rls map[string]*A
listener.apiPerPathRatelimiter = rls
}

// SetRenderTraceLoggingEnabled toggles per-request render trace logging.
// When on, render handlers emit a "render trace" log entry with timing
// breakdowns (glob expansion, cache, prepare, stream, marshal, total).
// Call before the listener starts serving; the flag is read without
// synchronization on the request path.
func (listener *CarbonserverListener) SetRenderTraceLoggingEnabled(on bool) {
	listener.renderTraceLoggingEnabled = on
}

// skipcq: RVV-B0011
func (listener *CarbonserverListener) CurrentFileIndex() *fileIndex {
p := listener.fileIdx.Load()
Expand Down
62 changes: 55 additions & 7 deletions carbonserver/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,13 @@ func (listener *CarbonserverListener) renderHandler(wr http.ResponseWriter, req
zap.String("format", format.String()),
))

tle := &traceLogEntries{}
if listener.renderTraceLoggingEnabled {
defer func() {
tle.TotalDuration = float64(time.Since(t0)) / float64(time.Second)
logger.Info("render trace", zap.Any("trace_log_entries", *tle))
}()
}
// Make sure we log which metric caused a panic()
defer func() {
if r := recover(); r != nil {
Expand All @@ -236,7 +243,7 @@ func (listener *CarbonserverListener) renderHandler(wr http.ResponseWriter, req
}
}()

response, fromCache, err := listener.fetchWithCache(ctx, logger, format, targets)
response, fromCache, err := listener.fetchWithCache(ctx, logger, format, targets, tle)

wr.Header().Set("Content-Type", response.contentType)
if err != nil {
Expand Down Expand Up @@ -325,7 +332,7 @@ func (listener *CarbonserverListener) getRenderCacheKeyAndSize(targets map[timeR
return key, size
}

func (listener *CarbonserverListener) fetchWithCache(ctx context.Context, logger *zap.Logger, format responseFormat, targets map[timeRange][]target) (fetchResponse, bool, error) {
func (listener *CarbonserverListener) fetchWithCache(ctx context.Context, logger *zap.Logger, format responseFormat, targets map[timeRange][]target, tle *traceLogEntries) (fetchResponse, bool, error) {
logger = logger.With(
zap.String("function", "fetchWithCache"),
)
Expand All @@ -336,10 +343,12 @@ func (listener *CarbonserverListener) fetchWithCache(ctx context.Context, logger
if listener.queryCacheEnabled {
key, size := listener.getRenderCacheKeyAndSize(targets, format.String())
var res interface{}
cacheT0 := time.Now()
res, fromCache, err = getWithCache(logger, listener.queryCache, key, size, 60,
func() (interface{}, error) {
return listener.prepareDataProto(ctx, logger, format, targets)
return listener.prepareDataProto(ctx, logger, format, targets, tle)
})
tle.CacheDuration = float64(time.Since(cacheT0)) / float64(time.Second)
if err == nil {
response = res.(fetchResponse)
listener.prometheus.cacheRequest("query", fromCache)
Expand All @@ -348,9 +357,10 @@ func (listener *CarbonserverListener) fetchWithCache(ctx context.Context, logger
} else {
atomic.AddUint64(&listener.metrics.QueryCacheMiss, 1)
}
tle.FromCache = fromCache
}
} else {
response, err = listener.prepareDataProto(ctx, logger, format, targets)
response, err = listener.prepareDataProto(ctx, logger, format, targets, tle)
}
return response, fromCache, err
}
Expand Down Expand Up @@ -450,7 +460,7 @@ func (listener *CarbonserverListener) prepareDataStream(ctx context.Context, for
}
}

func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logger *zap.Logger, format responseFormat, targets map[timeRange][]target) (fetchResponse, error) {
func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logger *zap.Logger, format responseFormat, targets map[timeRange][]target, tle *traceLogEntries) (fetchResponse, error) {
contentType := "application/text"
var b []byte
var metricsFetched int
Expand All @@ -464,12 +474,18 @@ func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logg
responseChan := make(chan response, 1000)
metricNames := getUniqueMetricNames(targets)
// TODO: pipeline?
expansionT0 := time.Now()
expandedGlobs, err := listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames)
tle.GlobExpansionDuration = float64(time.Since(expansionT0)) / float64(time.Second)
if expandedGlobs == nil {
return fetchResponse{nil, contentType, 0, 0, 0, nil}, err
}
metricGlobMap := getMetricGlobMapFromExpandedGlobs(expandedGlobs)
go listener.prepareDataStream(ctx, format, targets, metricGlobMap, responseChan)
go func() {
prepareT0 := time.Now()
listener.prepareDataStream(ctx, format, targets, metricGlobMap, responseChan)
tle.PrepareDuration = float64(time.Since(prepareT0)) / float64(time.Second)
}()

var metrics []string
for renderResponse := range responseChan {
Expand Down Expand Up @@ -511,6 +527,7 @@ func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logg
}
}

marshalT0 := time.Now()
switch format {
// We still keep old json format, because it's painful to deal with math.NaN that can occur in new format.
case jsonFormat:
Expand Down Expand Up @@ -562,6 +579,7 @@ func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logg
default:
err = fmt.Errorf("unknown format: %v", format)
}
tle.MarshalDuration = float64(time.Since(marshalT0)) / float64(time.Second)

if err != nil {
return fetchResponse{nil, contentType, 0, 0, 0, nil}, err
Expand Down Expand Up @@ -658,6 +676,13 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
zap.String("peer", reqPeer),
))

tle := &traceLogEntries{}
if listener.renderTraceLoggingEnabled {
defer func() {
tle.TotalDuration = float64(time.Since(t0)) / float64(time.Second)
logger.Info("render trace", zap.Any("trace_log_entries", *tle))
}()
}
// Make sure we log which metric caused a panic()
defer func() {
if r := recover(); r != nil {
Expand Down Expand Up @@ -687,26 +712,36 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
fetchAndStreamMetricsFunc := func(getMetrics bool) ([]response, error) {
metricNames := getUniqueMetricNames(targets)
// TODO: pipeline?
expansionT0 := time.Now()
expandedGlobs, err := listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames)
tle.GlobExpansionDuration = float64(time.Since(expansionT0)) / float64(time.Second)
if expandedGlobs == nil {
if err != nil {
return nil, status.New(codes.InvalidArgument, err.Error()).Err()
}
}
metricGlobMap := getMetricGlobMapFromExpandedGlobs(expandedGlobs)
go listener.prepareDataStream(ctx, format, targets, metricGlobMap, responseChan)
go func() {
prepareT0 := time.Now()
listener.prepareDataStream(ctx, format, targets, metricGlobMap, responseChan)
tle.PrepareDuration = float64(time.Since(prepareT0)) / float64(time.Second)
}()
var responses []response
streamT0 := time.Now()
responses, metricsFetched, valuesFetched, fetchSize, err = listener.streamMetrics(stream, responseChan, getMetrics)
tle.StreamDuration = float64(time.Since(streamT0)) / float64(time.Second)
return responses, err
}

if listener.streamingQueryCacheEnabled {
key, size := listener.getRenderCacheKeyAndSize(targets, format.String()+"grpc")
var res interface{}
cacheT0 := time.Now()
res, fromCache, err = getWithCache(logger, listener.queryCache, key, size, 60,
func() (interface{}, error) {
return fetchAndStreamMetricsFunc(true)
})
tle.CacheDuration = float64(time.Since(cacheT0)) / float64(time.Second)
if err == nil {
listener.prometheus.cacheRequest("query", fromCache)
if fromCache {
Expand All @@ -718,10 +753,13 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
}
close(responseChan)
}()
streamT0 := time.Now()
_, metricsFetched, valuesFetched, fetchSize, err = listener.streamMetrics(stream, responseChan, false)
tle.StreamDuration = float64(time.Since(streamT0)) / float64(time.Second)
} else {
atomic.AddUint64(&listener.metrics.QueryCacheMiss, 1)
}
tle.FromCache = fromCache
}
} else {
_, err = fetchAndStreamMetricsFunc(false)
Expand Down Expand Up @@ -769,3 +807,13 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str

return nil
}

// traceLogEntries accumulates per-request render timing for trace logging
// (emitted as a single "render trace" log line when render-trace-logging-enabled
// is set). All *Duration fields are wall-clock seconds.
//
// NOTE(review): PrepareDuration is assigned from a goroutine wrapping
// prepareDataStream while other fields (and the deferred log read) happen on
// the request goroutine — confirm this is race-free under `go test -race`.
type traceLogEntries struct {
	FromCache             bool    `json:"from_cache"`              // whether the response was served from the query cache
	GlobExpansionDuration float64 `json:"glob_expansion_duration"` // time spent expanding target globs to metric names
	CacheDuration         float64 `json:"cache_duration"`          // time spent inside getWithCache (includes fetch on miss)
	PrepareDuration       float64 `json:"prepare_duration"`        // time spent in prepareDataStream producing responses
	StreamDuration        float64 `json:"stream_duration"`         // time spent streaming metrics to the gRPC client
	MarshalDuration       float64 `json:"marshal_duration"`        // time spent encoding the response in the requested format
	TotalDuration         float64 `json:"total_duration"`          // end-to-end handler duration
}