Skip to content

Commit

Permalink
Merge pull request #508 from go-graphite/emadolsky/add-fetch-size-grp…
Browse files Browse the repository at this point in the history
…c-render

Calculate and add fetch size in gRPC render
  • Loading branch information
emadolsky committed Dec 12, 2022
2 parents 6e5f360 + 4bd34b4 commit b720895
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions carbonserver/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,10 +598,12 @@ func (listener *CarbonserverListener) fetchData(metric, pathExpression string, f
return multi, nil
}

func (listener *CarbonserverListener) streamMetrics(stream grpcv2.CarbonV2_RenderServer, responseChan chan response, storeAndGetMetrics bool) (responses []response, metricsFetched, valuesFetched int, err error) {
func (listener *CarbonserverListener) streamMetrics(stream grpcv2.CarbonV2_RenderServer, responseChan chan response, storeAndGetMetrics bool) (responses []response, metricsFetched, valuesFetched, fetchSize int, err error) {
var metricAccessBatch []string
for renderResponse := range responseChan {
err = stream.Send(renderResponse.proto2())
protoRes := renderResponse.proto2()
fetchSize += protoRes.SizeVT()
err = stream.Send(protoRes)
if err != nil {
return
}
Expand Down Expand Up @@ -676,6 +678,7 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str

metricsFetched := 0
valuesFetched := 0
fetchSize := 0
// TODO: should chan buffer size be configurable?
responseChan := make(chan response, 1000)
var fromCache bool
Expand All @@ -693,7 +696,7 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
metricGlobMap := getMetricGlobMapFromExpandedGlobs(expandedGlobs)
go listener.prepareDataStream(ctx, format, targets, metricGlobMap, responseChan)
var responses []response
responses, metricsFetched, valuesFetched, err = listener.streamMetrics(stream, responseChan, getMetrics)
responses, metricsFetched, valuesFetched, fetchSize, err = listener.streamMetrics(stream, responseChan, getMetrics)
return responses, err
}

Expand All @@ -715,7 +718,7 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
}
close(responseChan)
}()
_, metricsFetched, valuesFetched, err = listener.streamMetrics(stream, responseChan, false)
_, metricsFetched, valuesFetched, fetchSize, err = listener.streamMetrics(stream, responseChan, false)
} else {
atomic.AddUint64(&listener.metrics.QueryCacheMiss, 1)
}
Expand Down Expand Up @@ -747,6 +750,7 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
return status.New(codes.NotFound, "no metrics found").Err()
}

atomic.AddUint64(&listener.metrics.FetchSize, uint64(fetchSize))
logger.Info("fetch served",
zap.Duration("runtime_seconds", time.Since(t0)),
zap.Bool("query_cache_enabled", listener.streamingQueryCacheEnabled),
Expand Down

0 comments on commit b720895

Please sign in to comment.