Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use find cache for glob expansion in grpc render #516

Merged
merged 1 commit into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 4 additions & 12 deletions carbonserver/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,21 +457,12 @@ func (listener *CarbonserverListener) Find(ctx context.Context, req *protov2.Glo
var finalRes *protov2.GlobResponse
var lookups uint32
if listener.findCacheEnabled {
key := query + "&" + format + "grpc"
key := query
size := uint64(100 * 1024 * 1024)
var result interface{}
result, fromCache, err = getWithCache(logger, listener.findCache, key, size, 300,
func() (interface{}, error) {
expandedGlobs, err := listener.getExpandedGlobs(ctx, logger, t0, []string{query})
if err != nil {
return nil, err
}
finalRes = getProtoV2FindResponse(expandedGlobs[0], query)
lookups = expandedGlobs[0].Lookups
if len(finalRes.Matches) == 0 {
return nil, errorNotFound{}
}
return finalRes, nil
return listener.getExpandedGlobs(ctx, logger, t0, []string{query})
})
if err == nil {
listener.prometheus.cacheRequest("find", fromCache)
Expand All @@ -480,7 +471,8 @@ func (listener *CarbonserverListener) Find(ctx context.Context, req *protov2.Glo
} else {
atomic.AddUint64(&listener.metrics.FindCacheMiss, 1)
}
finalRes = result.(*protov2.GlobResponse)
expandedGlobs := result.([]globs)
finalRes = getProtoV2FindResponse(expandedGlobs[0], query)
if len(finalRes.Matches) == 0 {
err = errorNotFound{}
}
Expand Down
24 changes: 21 additions & 3 deletions carbonserver/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,14 +709,29 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
fetchSize := 0
// TODO: should chan buffer size be configurable?
responseChan := make(chan response, 1000)
var fromCache bool
var err error

fetchAndStreamMetricsFunc := func(getMetrics bool) ([]response, error) {
var err error
var findFromCache bool
metricNames := getUniqueMetricNames(targets)
// TODO: pipeline?
expansionT0 := time.Now()
expandedGlobs, err := listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames)
var expandedGlobs []globs
if listener.findCacheEnabled {
key := strings.Join(metricNames, "&")
size := uint64(100 * 1024 * 1024)
var result interface{}
result, findFromCache, err = getWithCache(logger, listener.findCache, key, size, 300,
func() (interface{}, error) {
return listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames)
})
if err == nil {
expandedGlobs = result.([]globs)
tle.FindFromCache = findFromCache
}
} else {
expandedGlobs, err = listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames)
}
tle.GlobExpansionDuration = float64(time.Since(expansionT0)) / float64(time.Second)
if expandedGlobs == nil {
if err != nil {
Expand All @@ -736,6 +751,8 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
return responses, err
}

var fromCache bool
var err error
if listener.streamingQueryCacheEnabled {
key, size := listener.getRenderCacheKeyAndSize(targets, format.String()+"grpc")
var res interface{}
Expand Down Expand Up @@ -816,6 +833,7 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str

type traceLogEntries struct {
FromCache bool `json:"from_cache"`
FindFromCache bool `json:"find_from_cache"`
FetchSize int `json:"fetch_size"`
MetricsFetched int `json:"metrics_fetched"`
ValuesFetched int `json:"values_fetched"`
Expand Down