Skip to content

Commit

Permalink
Limit streaming channel size for gRPC render
Browse files Browse the repository at this point in the history
When the number of files in the expanded globs is smaller than the
maximum, it makes sense to initialize the channel with that size
instead of the maximum.
  • Loading branch information
Emad Mohamadi committed Feb 10, 2023
1 parent e475b2c commit 925f190
Showing 1 changed file with 26 additions and 10 deletions.
36 changes: 26 additions & 10 deletions carbonserver/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,22 @@ func getMetricGlobMapFromExpandedGlobs(expandedGlobs []globs) map[string]globs {
return metricGlobMap
}

// getFoundGlobsFilesCount returns the total number of matched files
// across all expanded glob entries in metricGlobMap.
func getFoundGlobsFilesCount(metricGlobMap map[string]globs) int {
	total := 0
	for _, expanded := range metricGlobMap {
		total += len(expanded.Files)
	}
	return total
}

// getStreamingChannelSize picks the buffer size for the streaming
// response channel: the actual file count when it is small, capped
// at 100 otherwise.
func getStreamingChannelSize(filesCount int) int {
	const maxChannelSize = 100
	if filesCount < maxChannelSize {
		return filesCount
	}
	return maxChannelSize
}

func (listener *CarbonserverListener) prepareDataStream(ctx context.Context, format responseFormat, targets map[timeRange][]target, metricGlobMap map[string]globs, responseChan chan<- response) {
defer close(responseChan)
for tr, ts := range targets {
Expand Down Expand Up @@ -703,7 +719,7 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
valuesFetched := 0
fetchSize := 0

fetchMetricsFunc := func(responseChan chan<- response) error {
fetchMetricsFunc := func() (chan response, error) {
metricNames := getUniqueMetricNames(targets)
// TODO: pipeline?
expansionT0 := time.Now()
Expand All @@ -712,21 +728,22 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
tle.FindFromCache = isExpandCacheHit
if expandedGlobs == nil {
if err != nil {
return status.New(codes.InvalidArgument, err.Error()).Err()
return nil, status.New(codes.InvalidArgument, err.Error()).Err()
}
}
metricGlobMap := getMetricGlobMapFromExpandedGlobs(expandedGlobs)
tle.MetricGlobMapLength = len(metricGlobMap)
filesCount := getFoundGlobsFilesCount(metricGlobMap)
prepareChan := make(chan response, getStreamingChannelSize(filesCount))
go func() {
prepareT0 := time.Now()
listener.prepareDataStream(ctx, format, targets, metricGlobMap, responseChan)
listener.prepareDataStream(ctx, format, targets, metricGlobMap, prepareChan)
tle.PrepareDuration = float64(time.Since(prepareT0)) / float64(time.Second)
}()
return nil
return prepareChan, nil
}

// TODO: should chan buffer size be configurable?
responseChanToStream := make(chan response, 100)
var responseChanToStream chan response
var fromCache bool
var err error
if listener.streamingQueryCacheEnabled {
Expand All @@ -739,13 +756,12 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
switch {
case !found:
atomic.AddUint64(&listener.metrics.QueryCacheMiss, 1)
// TODO: should chan buffer size be configurable?
responseChan := make(chan response, 100)
err = fetchMetricsFunc(responseChan)
responseChan, err := fetchMetricsFunc()
if err != nil {
item.StoreAbort()
tle.CacheDuration = float64(time.Since(cacheT0)) / float64(time.Second)
} else {
responseChanToStream = make(chan response, cap(responseChan))
go func() {
var responses []response
for r := range responseChan {
Expand Down Expand Up @@ -774,7 +790,7 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
listener.prometheus.cacheRequest("query", fromCache)
tle.FromCache = fromCache
} else {
err = fetchMetricsFunc(responseChanToStream)
responseChanToStream, err = fetchMetricsFunc()
}

if err == nil {
Expand Down

0 comments on commit 925f190

Please sign in to comment.