Commit c980e81

Merge pull request #509 from go-graphite/emadolsky/add-carbonserver-render-time-trace

Add carbonserver render tracing

emadolsky committed Dec 14, 2022
2 parents: b720895 + 4a96559

Showing 4 changed files with 64 additions and 7 deletions.
1 change: 1 addition & 0 deletions carbon/app.go
@@ -508,6 +508,7 @@ func (app *App) Start(version string) (err error) {

carbonserver.SetMaxInflightRequests(conf.Carbonserver.MaxInflightRequests)
carbonserver.SetNoServiceWhenIndexIsNotReady(conf.Carbonserver.NoServiceWhenIndexIsNotReady)
carbonserver.SetRenderTraceLoggingEnabled(conf.Carbonserver.RenderTraceLoggingEnabled)

if conf.Carbonserver.RequestTimeout != nil {
carbonserver.SetRequestTimeout(conf.Carbonserver.RequestTimeout.Value())
2 changes: 2 additions & 0 deletions carbon/config.go
@@ -155,6 +155,8 @@ type carbonserverConfig struct {
MaxInflightRequests uint `toml:"max-inflight-requests"`
RequestTimeout *Duration `toml:"request-timeout"`
} `toml:"api-per-path-rate-limiters"`

RenderTraceLoggingEnabled bool `toml:"render-trace-logging-enabled"`
}

type pprofConfig struct {
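The new flag surfaces in go-carbon's TOML configuration via the tag above. A minimal sketch of enabling it, assuming the stock [carbonserver] section of go-carbon.conf (only the new key comes from this diff; the section name and placement are assumptions from the standard config layout). With the key omitted, the field keeps Go's zero value, so tracing stays off:

[carbonserver]
# ... existing carbonserver settings ...
render-trace-logging-enabled = true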
6 changes: 6 additions & 0 deletions carbonserver/carbonserver.go
@@ -291,6 +291,8 @@ type CarbonserverListener struct {
NoServiceWhenIndexIsNotReady bool
apiPerPathRatelimiter map[string]*ApiPerPathRatelimiter
globQueryRateLimiters []*GlobQueryRateLimiter

renderTraceLoggingEnabled bool
}

type prometheus struct {
@@ -625,6 +627,10 @@ func (listener *CarbonserverListener) SetAPIPerPathRateLimiter(rls map[string]*A
listener.apiPerPathRatelimiter = rls
}

func (listener *CarbonserverListener) SetRenderTraceLoggingEnabled(enabled bool) {
listener.renderTraceLoggingEnabled = enabled
}

// skipcq: RVV-B0011
func (listener *CarbonserverListener) CurrentFileIndex() *fileIndex {
p := listener.fileIdx.Load()
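The render.go changes in the next file all follow one pattern: capture a start time, run a stage, store the elapsed seconds on the trace entry, and emit the whole entry from a deferred log call when the flag is on. A minimal, self-contained sketch of that pattern using the same zap calls as the diff; the handle function and its simulated work are illustrative, not part of the change:

package main

import (
	"time"

	"go.uber.org/zap"
)

// Illustrative subset of the trace entry added in this commit.
type traceLogEntries struct {
	PrepareDuration float64 `json:"prepare_duration"`
	TotalDuration   float64 `json:"total_duration"`
}

func handle(logger *zap.Logger, traceEnabled bool) {
	t0 := time.Now()
	tle := &traceLogEntries{}
	if traceEnabled {
		// Emit one trace log per request, after all stage durations are filled in.
		defer func() {
			tle.TotalDuration = float64(time.Since(t0)) / float64(time.Second)
			logger.Info("render trace", zap.Any("trace_log_entries", *tle))
		}()
	}

	prepareT0 := time.Now()
	time.Sleep(10 * time.Millisecond) // stand-in for a real stage such as prepareDataStream
	tle.PrepareDuration = float64(time.Since(prepareT0)) / float64(time.Second)
}

func main() {
	logger, _ := zap.NewProduction()
	defer logger.Sync()
	handle(logger, true)
}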
62 changes: 55 additions & 7 deletions carbonserver/render.go
@@ -218,6 +218,13 @@ func (listener *CarbonserverListener) renderHandler(wr http.ResponseWriter, req
zap.String("format", format.String()),
))

tle := &traceLogEntries{}
if listener.renderTraceLoggingEnabled {
defer func() {
tle.TotalDuration = float64(time.Since(t0)) / float64(time.Second)
logger.Info("render trace", zap.Any("trace_log_entries", *tle))
}()
}
// Make sure we log which metric caused a panic()
defer func() {
if r := recover(); r != nil {
@@ -236,7 +243,7 @@
}
}()

response, fromCache, err := listener.fetchWithCache(ctx, logger, format, targets)
response, fromCache, err := listener.fetchWithCache(ctx, logger, format, targets, tle)

wr.Header().Set("Content-Type", response.contentType)
if err != nil {
@@ -325,7 +332,7 @@ func (listener *CarbonserverListener) getRenderCacheKeyAndSize(targets map[timeR
return key, size
}

func (listener *CarbonserverListener) fetchWithCache(ctx context.Context, logger *zap.Logger, format responseFormat, targets map[timeRange][]target) (fetchResponse, bool, error) {
func (listener *CarbonserverListener) fetchWithCache(ctx context.Context, logger *zap.Logger, format responseFormat, targets map[timeRange][]target, tle *traceLogEntries) (fetchResponse, bool, error) {
logger = logger.With(
zap.String("function", "fetchWithCache"),
)
@@ -336,10 +343,12 @@ func (listener *CarbonserverListener) fetchWithCache(ctx context.Context, logger
if listener.queryCacheEnabled {
key, size := listener.getRenderCacheKeyAndSize(targets, format.String())
var res interface{}
cacheT0 := time.Now()
res, fromCache, err = getWithCache(logger, listener.queryCache, key, size, 60,
func() (interface{}, error) {
return listener.prepareDataProto(ctx, logger, format, targets)
return listener.prepareDataProto(ctx, logger, format, targets, tle)
})
tle.CacheDuration = float64(time.Since(cacheT0)) / float64(time.Second)
if err == nil {
response = res.(fetchResponse)
listener.prometheus.cacheRequest("query", fromCache)
@@ -348,9 +357,10 @@ func (listener *CarbonserverListener) fetchWithCache(ctx context.Context, logger
} else {
atomic.AddUint64(&listener.metrics.QueryCacheMiss, 1)
}
tle.FromCache = fromCache
}
} else {
response, err = listener.prepareDataProto(ctx, logger, format, targets)
response, err = listener.prepareDataProto(ctx, logger, format, targets, tle)
}
return response, fromCache, err
}
@@ -450,7 +460,7 @@ func (listener *CarbonserverListener) prepareDataStream(ctx context.Context, for
}
}

func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logger *zap.Logger, format responseFormat, targets map[timeRange][]target) (fetchResponse, error) {
func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logger *zap.Logger, format responseFormat, targets map[timeRange][]target, tle *traceLogEntries) (fetchResponse, error) {
contentType := "application/text"
var b []byte
var metricsFetched int
@@ -464,12 +474,18 @@ func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logg
responseChan := make(chan response, 1000)
metricNames := getUniqueMetricNames(targets)
// TODO: pipeline?
expansionT0 := time.Now()
expandedGlobs, err := listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames)
tle.GlobExpansionDuration = float64(time.Since(expansionT0)) / float64(time.Second)
if expandedGlobs == nil {
return fetchResponse{nil, contentType, 0, 0, 0, nil}, err
}
metricGlobMap := getMetricGlobMapFromExpandedGlobs(expandedGlobs)
go listener.prepareDataStream(ctx, format, targets, metricGlobMap, responseChan)
go func() {
prepareT0 := time.Now()
listener.prepareDataStream(ctx, format, targets, metricGlobMap, responseChan)
tle.PrepareDuration = float64(time.Since(prepareT0)) / float64(time.Second)
}()

var metrics []string
for renderResponse := range responseChan {
@@ -511,6 +527,7 @@ func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logg
}
}

marshalT0 := time.Now()
switch format {
// We still keep old json format, because it's painful to deal with math.NaN that can occur in new format.
case jsonFormat:
@@ -562,6 +579,7 @@ func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logg
default:
err = fmt.Errorf("unknown format: %v", format)
}
tle.MarshalDuration = float64(time.Since(marshalT0)) / float64(time.Second)

if err != nil {
return fetchResponse{nil, contentType, 0, 0, 0, nil}, err
@@ -658,6 +676,13 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
zap.String("peer", reqPeer),
))

tle := &traceLogEntries{}
if listener.renderTraceLoggingEnabled {
defer func() {
tle.TotalDuration = float64(time.Since(t0)) / float64(time.Second)
logger.Info("render trace", zap.Any("trace_log_entries", *tle))
}()
}
// Make sure we log which metric caused a panic()
defer func() {
if r := recover(); r != nil {
@@ -687,26 +712,36 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
fetchAndStreamMetricsFunc := func(getMetrics bool) ([]response, error) {
metricNames := getUniqueMetricNames(targets)
// TODO: pipeline?
expansionT0 := time.Now()
expandedGlobs, err := listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames)
tle.GlobExpansionDuration = float64(time.Since(expansionT0)) / float64(time.Second)
if expandedGlobs == nil {
if err != nil {
return nil, status.New(codes.InvalidArgument, err.Error()).Err()
}
}
metricGlobMap := getMetricGlobMapFromExpandedGlobs(expandedGlobs)
go listener.prepareDataStream(ctx, format, targets, metricGlobMap, responseChan)
go func() {
prepareT0 := time.Now()
listener.prepareDataStream(ctx, format, targets, metricGlobMap, responseChan)
tle.PrepareDuration = float64(time.Since(prepareT0)) / float64(time.Second)
}()
var responses []response
streamT0 := time.Now()
responses, metricsFetched, valuesFetched, fetchSize, err = listener.streamMetrics(stream, responseChan, getMetrics)
tle.StreamDuration = float64(time.Since(streamT0)) / float64(time.Second)
return responses, err
}

if listener.streamingQueryCacheEnabled {
key, size := listener.getRenderCacheKeyAndSize(targets, format.String()+"grpc")
var res interface{}
cacheT0 := time.Now()
res, fromCache, err = getWithCache(logger, listener.queryCache, key, size, 60,
func() (interface{}, error) {
return fetchAndStreamMetricsFunc(true)
})
tle.CacheDuration = float64(time.Since(cacheT0)) / float64(time.Second)
if err == nil {
listener.prometheus.cacheRequest("query", fromCache)
if fromCache {
@@ -718,10 +753,13 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
}
close(responseChan)
}()
streamT0 := time.Now()
_, metricsFetched, valuesFetched, fetchSize, err = listener.streamMetrics(stream, responseChan, false)
tle.StreamDuration = float64(time.Since(streamT0)) / float64(time.Second)
} else {
atomic.AddUint64(&listener.metrics.QueryCacheMiss, 1)
}
tle.FromCache = fromCache
}
} else {
_, err = fetchAndStreamMetricsFunc(false)
@@ -769,3 +807,13 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str

return nil
}

type traceLogEntries struct {
FromCache bool `json:"from_cache"`
GlobExpansionDuration float64 `json:"glob_expansion_duration"`
CacheDuration float64 `json:"cache_duration"`
PrepareDuration float64 `json:"prepare_duration"`
StreamDuration float64 `json:"stream_duration"`
MarshalDuration float64 `json:"marshal_duration"`
TotalDuration float64 `json:"total_duration"`
}
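Because zap.Any falls back to reflection-based encoding, zap's JSON encoder serialises this struct with encoding/json, so the json tags above become the field names in the emitted entry. For the HTTP render path, a trace line should look roughly like the following; the timestamp, caller info, and the request-scoped fields added via logger.With (url, peer, format, and so on) depend on the logger configuration and are omitted here as assumptions:

{"level":"info","msg":"render trace","trace_log_entries":{"from_cache":false,"glob_expansion_duration":0.0042,"cache_duration":0.031,"prepare_duration":0.027,"stream_duration":0,"marshal_duration":0.0009,"total_duration":0.036}}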
