Skip to content

Commit

Permalink
optimisation(carbonserver): http - repurpose listener.findCache from …
Browse files Browse the repository at this point in the history
…storing find http responses to store expanded globs; and re-use this cache in render for expanding globs
  • Loading branch information
Anton Timofieiev committed Feb 4, 2023
1 parent 8192946 commit 441584e
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 56 deletions.
83 changes: 46 additions & 37 deletions carbonserver/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ func (listener *CarbonserverListener) findHandler(wr http.ResponseWriter, req *h
format := req.FormValue("format")
query := req.Form["query"]

var response *findResponse

logger := TraceContextToZap(ctx, listener.logger.With(
zap.String("handler", "find"),
zap.String("url", req.URL.RequestURI()),
Expand Down Expand Up @@ -125,30 +123,9 @@ func (listener *CarbonserverListener) findHandler(wr http.ResponseWriter, req *h
return
}

var err error
fromCache := false
if listener.findCacheEnabled {
key := strings.Join(query, ",") + "&" + format
size := uint64(100 * 1024 * 1024)
var result interface{}
result, fromCache, err = getWithCache(logger, listener.findCache, key, size, 300,
func() (interface{}, error) {
return listener.findMetrics(ctx, logger, t0, formatCode, query)
})
if err == nil {
listener.prometheus.cacheRequest("find", fromCache)
if fromCache {
atomic.AddUint64(&listener.metrics.FindCacheHit, 1)
} else {
atomic.AddUint64(&listener.metrics.FindCacheMiss, 1)
}
response = result.(*findResponse)
if response.files == 0 {
err = errorNotFound{}
}
}
} else {
response, err = listener.findMetrics(ctx, logger, t0, formatCode, query)
response, fromCache, err := listener.findMetrics(ctx, logger, t0, formatCode, query)
if response.files == 0 {
err = errorNotFound{}
}

if err != nil || response == nil {
Expand Down Expand Up @@ -224,13 +201,23 @@ func getProtoV2FindResponse(expandedGlob globs, query string) *protov2.GlobRespo
return res
}

func (listener *CarbonserverListener) findMetrics(ctx context.Context, logger *zap.Logger, t0 time.Time, format responseFormat, names []string) (*findResponse, error) {
func (listener *CarbonserverListener) findMetrics(ctx context.Context, logger *zap.Logger, t0 time.Time, format responseFormat, names []string) (*findResponse, bool, error) {
var result findResponse
metricsCount := uint64(0)
expandedGlobs, err := listener.getExpandedGlobs(ctx, logger, t0, names)

isFromCache, expandedGlobs, err := listener.getExpandedGlobsWithCache(names, logger, ctx)
if expandedGlobs == nil {
return nil, err
return nil, false, err
}
if listener.findCacheEnabled && err == nil {
listener.prometheus.cacheRequest("find", isFromCache)
if isFromCache {
atomic.AddUint64(&listener.metrics.FindCacheHit, 1)
} else {
atomic.AddUint64(&listener.metrics.FindCacheMiss, 1)
}
}

atomic.AddUint64(&listener.metrics.MetricsFound, metricsCount)
switch format {
case protoV3Format, jsonFormat:
Expand Down Expand Up @@ -278,14 +265,14 @@ func (listener *CarbonserverListener) findMetrics(ctx context.Context, logger *z
zap.String("reason", "response encode failed"),
zap.Error(err),
)
return nil, err
return nil, false, err
}

if len(multiResponse.Metrics) == 0 {
return nil, errorNotFound{}
return nil, false, errorNotFound{}
}

return &result, err
return &result, isFromCache, err

case protoV2Format:
result.contentType = httpHeaders.ContentTypeProtobuf
Expand All @@ -309,14 +296,14 @@ func (listener *CarbonserverListener) findMetrics(ctx context.Context, logger *z
zap.String("reason", "response encode failed"),
zap.Error(err),
)
return nil, err
return nil, false, err
}

if len(response.Matches) == 0 {
return nil, errorNotFound{}
return nil, false, errorNotFound{}
}

return &result, err
return &result, isFromCache, err

case pickleFormat:
// [{'metric_path': 'metric', 'intervals': [(x,y)], 'isLeaf': True},]
Expand All @@ -341,9 +328,10 @@ func (listener *CarbonserverListener) findMetrics(ctx context.Context, logger *z
var buf bytes.Buffer
pEnc := pickle.NewEncoder(&buf)
pEnc.Encode(metrics)
return &findResponse{buf.Bytes(), httpHeaders.ContentTypePickle, files, lookups}, nil
return &findResponse{buf.Bytes(), httpHeaders.ContentTypePickle, files, lookups}, isFromCache, nil
}
return nil, nil

return nil, false, nil
}

func (listener *CarbonserverListener) getExpandedGlobs(ctx context.Context, logger *zap.Logger, t0 time.Time, names []string) ([]globs, error) {
Expand Down Expand Up @@ -412,6 +400,27 @@ GATHER:
return expandedGlobs, nil
}

func (listener *CarbonserverListener) getExpandedGlobsWithCache(globNames []string, logger *zap.Logger, ctx context.Context) (bool, []globs, error) {
if !listener.findCacheEnabled {
expandedGlobs, err := listener.getExpandedGlobs(ctx, logger, time.Now(), globNames)
return false, expandedGlobs, err
}

key := strings.Join(globNames, "&")
size := uint64(100 * 1024 * 1024)
expandedGlobs, isFromCache, err := getWithCache(logger, listener.findCache, key, size, 300,
func() (interface{}, error) {
return listener.getExpandedGlobs(ctx, logger, time.Now(), globNames)
})

var expandedGlobsCasted []globs
if err == nil {
expandedGlobs = expandedGlobs.([]globs)
}

return isFromCache, expandedGlobsCasted, err
}

func (listener *CarbonserverListener) Find(ctx context.Context, req *protov2.GlobRequest) (*protov2.GlobResponse, error) {
t0 := time.Now()
span := trace.SpanFromContext(ctx)
Expand Down
23 changes: 4 additions & 19 deletions carbonserver/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,8 +478,9 @@ func (listener *CarbonserverListener) prepareDataProto(ctx context.Context, logg
metricNames := getUniqueMetricNames(targets)
// TODO: pipeline?
expansionT0 := time.Now()
expandedGlobs, err := listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames)
isFromCache, expandedGlobs, err := listener.getExpandedGlobsWithCache(metricNames, logger, ctx)
tle.GlobExpansionDuration = float64(time.Since(expansionT0)) / float64(time.Second)
tle.FindFromCache = isFromCache
if expandedGlobs == nil {
return fetchResponse{nil, contentType, 0, 0, 0, nil}, err
}
Expand Down Expand Up @@ -711,28 +712,12 @@ func (listener *CarbonserverListener) Render(req *protov2.MultiFetchRequest, str
responseChan := make(chan response, 1000)

fetchAndStreamMetricsFunc := func(getMetrics bool) ([]response, error) {
var err error
var findFromCache bool
metricNames := getUniqueMetricNames(targets)
// TODO: pipeline?
expansionT0 := time.Now()
var expandedGlobs []globs
if listener.findCacheEnabled {
key := strings.Join(metricNames, "&")
size := uint64(100 * 1024 * 1024)
var result interface{}
result, findFromCache, err = getWithCache(logger, listener.findCache, key, size, 300,
func() (interface{}, error) {
return listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames)
})
if err == nil {
expandedGlobs = result.([]globs)
tle.FindFromCache = findFromCache
}
} else {
expandedGlobs, err = listener.getExpandedGlobs(ctx, logger, time.Now(), metricNames)
}
isFromCache, expandedGlobs, err := listener.getExpandedGlobsWithCache(metricNames, logger, ctx)
tle.GlobExpansionDuration = float64(time.Since(expansionT0)) / float64(time.Second)
tle.FindFromCache = isFromCache
if expandedGlobs == nil {
if err != nil {
return nil, status.New(codes.InvalidArgument, err.Error()).Err()
Expand Down

0 comments on commit 441584e

Please sign in to comment.