Move precision alignment from per-target zipper to per-request FetchAndEvalExp #501

Merged: 5 commits, Jul 22, 2020

4 changes: 4 additions & 0 deletions cmd/carbonapi/http/main_test.go
@@ -52,6 +52,10 @@ func (z mockCarbonZipper) TagValues(ctx context.Context, query string, limit int
return []string{}, nil
}

func (z mockCarbonZipper) ScaleToCommonStep() bool {
return true
}

func getGlobResponse() *pb.MultiGlobResponse {
globMtach := pb.GlobMatch{Path: "foo.bar", IsLeaf: true}
var matches []pb.GlobMatch
1 change: 1 addition & 0 deletions cmd/carbonapi/interfaces/zipper.go
@@ -19,4 +19,5 @@ type CarbonZipper interface {
Render(ctx context.Context, request pb.MultiFetchRequest) ([]*types.MetricData, *zipperTypes.Stats, merry.Error)
TagNames(ctx context.Context, query string, limit int64) ([]string, merry.Error)
TagValues(ctx context.Context, query string, limit int64) ([]string, merry.Error)
ScaleToCommonStep() bool
}
7 changes: 4 additions & 3 deletions cmd/carbonapi/zipper.go
@@ -98,9 +98,6 @@ func (z zipper) Render(ctx context.Context, request pb.MultiFetchRequest) ([]*ty
Tags: tags,
})
}
if z.z.ScaleToCommonStep {
result = helper.ScaleToCommonStep(result)
}
}

sort.Sort(helper.ByNameNatural(result))
@@ -128,3 +125,7 @@ func (z zipper) TagNames(ctx context.Context, query string, limit int64) ([]stri
func (z zipper) TagValues(ctx context.Context, query string, limit int64) ([]string, merry.Error) {
return z.z.TagValues(ctx, query, limit)
}

func (z zipper) ScaleToCommonStep() bool {
return z.z.ScaleToCommonStep
}
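
The zipper wrapper no longer rescales inside Render; it only exposes the configuration flag, and the decision is made once per request by the evaluator. A minimal, hypothetical sketch of that getter pattern (the config struct and field names below are illustrative stand-ins, not the real carbonapi configuration):

package main

import "fmt"

// zipperConfig stands in for the relevant part of the zipper configuration.
type zipperConfig struct {
	ScaleToCommonStep bool
}

// zipper wraps the configuration the same way cmd/carbonapi/zipper.go now does:
// the getter only reports the flag; it performs no scaling itself.
type zipper struct {
	cfg zipperConfig
}

func (z zipper) ScaleToCommonStep() bool {
	return z.cfg.ScaleToCommonStep
}

func main() {
	z := zipper{cfg: zipperConfig{ScaleToCommonStep: true}}
	fmt.Println(z.ScaleToCommonStep()) // true
}
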
14 changes: 13 additions & 1 deletion expr/expr.go
@@ -25,6 +25,8 @@ func (eval evaluator) FetchAndEvalExp(ctx context.Context, exp parser.Expr, from
multiFetchRequest := pb.MultiFetchRequest{}
metricRequestCache := make(map[string]parser.MetricRequest)
maxDataPoints := utilctx.GetMaxDatapoints(ctx)
// values related to this particular `target=`
targetValues := make(map[parser.MetricRequest][]*types.MetricData)

for _, m := range exp.Metrics() {
fetchRequest := pb.FetchRequest{
@@ -49,10 +51,12 @@

// avoid fetching the same metric several times within one HTTP request, e.g. render?target=a.b&target=a.b
if _, ok := values[metricRequest]; ok {
targetValues[metricRequest] = nil
continue
}

metricRequestCache[m.Metric] = metricRequest
targetValues[metricRequest] = nil
multiFetchRequest.Metrics = append(multiFetchRequest.Metrics, fetchRequest)
}

@@ -76,7 +80,15 @@
}
}

return eval.Eval(ctx, exp, from, until, values)
for m := range targetValues {
targetValues[m] = values[m]
}

if config.Config.ZipperInstance.ScaleToCommonStep() {
targetValues = helper.ScaleValuesToCommonStep(targetValues)
}

return eval.Eval(ctx, exp, from, until, targetValues)
}

// Eval evaluates expressions
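
The net effect of the FetchAndEvalExp change above: per-target values are collected into one map, fetched in a single round trip, and then aligned to a common step once per request (when the zipper config asks for it) before evaluation. A minimal sketch of that request-level call, assuming the default average consolidation and constructing parser.MetricRequest with only its Metric field; this is an illustration, not the actual evaluator code:

package main

import (
	"fmt"

	"github.com/go-graphite/carbonapi/expr/helper"
	"github.com/go-graphite/carbonapi/expr/types"
	"github.com/go-graphite/carbonapi/pkg/parser"
)

func main() {
	// Two targets fetched with different native resolutions (10s and 30s).
	values := map[parser.MetricRequest][]*types.MetricData{
		{Metric: "a.b"}: {types.MakeMetricData("a.b", []float64{1, 2, 3, 4, 5, 6}, 10, 0)},
		{Metric: "c.d"}: {types.MakeMetricData("c.d", []float64{1, 2}, 30, 0)},
	}

	// One call per request: everything is aligned to LCM(10, 30) = 30.
	values = helper.ScaleValuesToCommonStep(values)

	for m, series := range values {
		fmt.Println(m.Metric, series[0].StepTime, series[0].Values)
	}
	// With average consolidation, "a.b" should come out at step 30 with
	// values roughly {2, 5}; "c.d" already has step 30 and stays untouched.
}
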
2 changes: 1 addition & 1 deletion expr/functions/seriesList/function.go
@@ -97,7 +97,7 @@ func (f *seriesList) Do(ctx context.Context, e parser.Expr, from, until int64, v
case "diffSeriesLists":
compute = func(l, r float64) float64 { return l - r }
case "powSeriesLists":
compute = func(l, r float64) float64 { return math.Pow(l, r) }
compute = math.Pow
}

if useConstant {
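
The seriesList tweak is purely cosmetic: math.Pow already has the signature func(float64, float64) float64, so it can be assigned to compute directly instead of being wrapped in a closure. A tiny self-contained illustration:

package main

import (
	"fmt"
	"math"
)

func main() {
	var compute func(l, r float64) float64

	// Wrapper closure, as before the change.
	compute = func(l, r float64) float64 { return math.Pow(l, r) }
	fmt.Println(compute(2, 10)) // 1024

	// Direct function value, as after the change: same behaviour, no wrapper.
	compute = math.Pow
	fmt.Println(compute(2, 10)) // 1024
}
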
2 changes: 1 addition & 1 deletion expr/functions/weightedAverage/function.go
@@ -33,7 +33,7 @@ func New(configFile string) []interfaces.FunctionMetadata {

// weightedAverage(seriesListAvg, seriesListWeight, *nodes)
func (f *weightedAverage) Do(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, error) {
aggKeyPairs := make(map[string]map[string]*types.MetricData, 0)
aggKeyPairs := make(map[string]map[string]*types.MetricData)
var productList []*types.MetricData

avgs, err := helper.GetSeriesArg(e.Args()[0], from, until, values)
66 changes: 56 additions & 10 deletions expr/helper/align.go
@@ -5,6 +5,7 @@ import (
"time"

"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/pkg/parser"
)

// GCD returns greatest common divisor calculated via Euclidean algorithm
@@ -33,25 +34,49 @@ func LCM(args ...int64) int64 {
return lcm
}

// GetCommonStep returns LCM(steps) for slice of metrics.
// minStart and maxStop will be set to closest lower or equal multiple of LCM(steps).
func GetCommonStep(args []*types.MetricData) int64 {
steps := make([]int64, 0, len(args))
// GetCommonStep returns LCM(steps) and a changed flag for a slice of metrics.
// If all metrics have the same step, changed == false.
func GetCommonStep(args []*types.MetricData) (commonStep int64, changed bool) {
steps := make([]int64, 0, 1)
stepsIndex := make(map[int64]struct{})
for _, arg := range args {
steps = append(steps, arg.StepTime)
if _, ok := stepsIndex[arg.StepTime]; !ok {
stepsIndex[arg.StepTime] = struct{}{}
steps = append(steps, arg.StepTime)
}
}
if len(steps) == 1 {
return steps[0], false
}
commonStep := LCM(steps...)
return commonStep
commonStep = LCM(steps...)
return commonStep, true
}

// ScaleToCommonStep returns the metrics aligned to the LCM of all metric steps.
// If commonStep == 0, it is calculated automatically.
// It respects xFilesFactor and fills gaps at the beginning and end with NaNs if needed.
func ScaleToCommonStep(args []*types.MetricData) []*types.MetricData {
commonStep := GetCommonStep(args)
for _, arg := range args {
func ScaleToCommonStep(args []*types.MetricData, commonStep int64) []*types.MetricData {
if commonStep < 0 {
// A negative commonStep doesn't make sense
return args
}

// If it's invoked with commonStep other than 0, changes are applied by default
changed := true
if commonStep == 0 {
commonStep, changed = GetCommonStep(args)
}

if !changed {
return args
}

for a, arg := range args {
if arg.StepTime == commonStep {
continue
}
arg = arg.Copy(true)
args[a] = arg
stepFactor := commonStep / arg.StepTime
newStart := arg.StartTime - (arg.StartTime % commonStep)
if (arg.StartTime % commonStep) != 0 {
@@ -98,6 +123,27 @@ func aggregateBatch(vals []float64, arg *types.MetricData) float64 {
return arg.GetAggregateFunction()(vals)
}

// ScaleValuesToCommonStep aligns the values of a map[parser.MetricRequest][]*types.MetricData to a common step. If any element of []*types.MetricData
// has to change, the original metric is left intact and a copy is modified instead, to avoid spoiling the cache.
func ScaleValuesToCommonStep(values map[parser.MetricRequest][]*types.MetricData) map[parser.MetricRequest][]*types.MetricData {
// Calculate global commonStep
var args []*types.MetricData
for _, metrics := range values {
args = append(args, metrics...)
}

commonStep, changed := GetCommonStep(args)
if !changed {
return values
}

for m, metrics := range values {
values[m] = ScaleToCommonStep(metrics, commonStep)
}

return values
}

// GetBuckets returns the number of buckets for a timeseries (defined by startTime, stopTime and step (bucket) size).
func GetBuckets(start, stop, bucketSize int64) int64 {
return int64(math.Ceil(float64(stop-start) / float64(bucketSize)))
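
As a quick sanity check of the LCM-based alignment above: series with steps 10, 15 and 30 get a common step of LCM(10, 15, 30) = 30, so a step-10 series contributes three raw points to every aggregated bucket. A standalone sketch using local copies of the Euclidean gcd/lcm rather than the carbonapi helpers:

package main

import "fmt"

// gcd and lcm mirror the GCD/LCM helpers in align.go (Euclidean algorithm).
func gcd(a, b int64) int64 {
	for b != 0 {
		a, b = b, a%b
	}
	return a
}

func lcm(args ...int64) int64 {
	l := args[0]
	for _, v := range args[1:] {
		l = l / gcd(l, v) * v
	}
	return l
}

func main() {
	steps := []int64{10, 15, 30}
	common := lcm(steps...)
	fmt.Println("common step:", common) // 30
	for _, s := range steps {
		fmt.Printf("step %d -> %d raw points per bucket\n", s, common/s)
	}
}
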
36 changes: 31 additions & 5 deletions expr/helper/helper_test.go
@@ -119,6 +119,7 @@ func TestGetCommonStep(t *testing.T) {
tests := []struct {
metrics []*types.MetricData
commonStep int64
changed bool
}{
// Different steps and start/stop time
{
@@ -128,6 +129,7 @@
types.MakeMetricData("metric2", make([]float64, 25), 3, 6), // 6..81
},
30,
true,
},
// Same set of points
{
@@ -137,6 +139,7 @@
types.MakeMetricData("metric3", make([]float64, 15), 5, 5), // 5..80
},
5,
false,
},
// Same step, different lengths
{
@@ -146,24 +149,29 @@
types.MakeMetricData("metric3", make([]float64, 4), 5, 35), // 35..55
},
5,
false,
},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("Set %v", i), func(t *testing.T) {
com := GetCommonStep(tt.metrics)
com, changed := GetCommonStep(tt.metrics)
if com != tt.commonStep {
t.Errorf("Result of GetCommonStep: %v; expected is %v", com, tt.commonStep)
}
if changed != tt.changed {
t.Errorf("GetCommonStep changed: %v; expected is %v", changed, tt.changed)
}
})
}
}

func TestScaleToCommonStep(t *testing.T) {
NaN := math.NaN()
tests := []struct {
name string
metrics []*types.MetricData
expected []*types.MetricData
name string
metrics []*types.MetricData
commonStep int64
expected []*types.MetricData
}{
{
"Normal metrics",
@@ -172,6 +180,7 @@
types.MakeMetricData("metric2", []float64{1, 2, 3, 4, 5}, 2, 4), // 4..14
types.MakeMetricData("metric3", []float64{1, 2, 3, 4, 5, 6}, 3, 3), // 3..21
},
0,
[]*types.MetricData{
types.MakeMetricData("metric1", []float64{2, 10, 17}, 6, 0), // 0..18
types.MakeMetricData("metric2", []float64{1, 3, 5}, 6, 0), // 0..18
@@ -186,13 +195,30 @@
types.MakeMetricData("metric3", []float64{1, 2, 3, 4, 5, 6}, 3, 3), // 3..21
types.MakeMetricData("metric6", []float64{1, 2, 3, 4, 5}, 6, 0), // 0..30
},
0,
[]*types.MetricData{
types.MakeMetricData("metric1", []float64{NaN, 72}, 6, 0), // 0..12
types.MakeMetricData("metric2", []float64{NaN, 2, NaN}, 6, 0), // 0..18
types.MakeMetricData("metric3", []float64{NaN, 3, 5, NaN}, 6, 0), // 0..24
types.MakeMetricData("metric6", []float64{1, 2, 3, 4, 5}, 6, 0), // 0..30, unchanged
},
},
{
"Custom common step",
[]*types.MetricData{
types.MakeMetricData("metric1", []float64{NaN, 3, 5, 7, 9, 11, 13, 15, 17}, 1, 3), // 3..12
types.MakeMetricData("metric2", []float64{1, 2, 3, 4, 5}, 2, 4), // 4..14
types.MakeMetricData("metric3", []float64{1, 2, 3, 4, 5, 6}, 3, 3), // 3..21
types.MakeMetricData("metric6", []float64{1, 2, 3, 4, 5}, 6, 0), // 0..30
},
12,
[]*types.MetricData{
types.MakeMetricData("metric1", []float64{10}, 12, 0), // 0..12
types.MakeMetricData("metric2", []float64{2.5, 5}, 12, 0), // 0..18
types.MakeMetricData("metric3", []float64{2, 5}, 12, 0), // 0..24
types.MakeMetricData("metric6", []float64{1.5, 3.5, 5}, 12, 0), // 0..30, unchanged
},
},
}
custom := tests[1].metrics
custom[0].ConsolidationFunc = "sum"
@@ -203,7 +229,7 @@
custom[2].XFilesFactor = 0.51
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := ScaleToCommonStep(tt.metrics)
result := ScaleToCommonStep(tt.metrics, tt.commonStep)
if len(result) != len(tt.expected) {
t.Errorf("Result has different length %v than expected %v", len(result), len(tt.expected))
}
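
To double-check one of the new "Custom common step" expectations by hand: metric2 has values {1, 2, 3, 4, 5} at step 2 starting at 4, so with commonStep 12 its points land in the buckets [0, 12) and [12, 24), which average to 2.5 and 5 as expected. A small standalone program reproducing that arithmetic (it mimics only the bucketing; xFilesFactor and NaN handling are ignored):

package main

import "fmt"

func main() {
	values := []float64{1, 2, 3, 4, 5}
	var step, start, commonStep int64 = 2, 4, 12

	// Bucket every point by its timestamp, the same way the aligned series
	// would group them: newStart is the closest lower multiple of commonStep.
	newStart := start - start%commonStep // 0
	buckets := map[int64][]float64{}
	for i, v := range values {
		ts := start + int64(i)*step
		buckets[(ts-newStart)/commonStep] = append(buckets[(ts-newStart)/commonStep], v)
	}

	// Average each bucket (buckets are contiguous for this input).
	for b := int64(0); b < int64(len(buckets)); b++ {
		sum := 0.0
		for _, v := range buckets[b] {
			sum += v
		}
		fmt.Printf("bucket starting at %d: avg %v\n", newStart+b*commonStep, sum/float64(len(buckets[b])))
	}
	// Output:
	// bucket starting at 0: avg 2.5
	// bucket starting at 12: avg 5
}
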
53 changes: 46 additions & 7 deletions expr/types/types.go
@@ -345,6 +345,44 @@ func (r *MetricData) AggregateValues() {
r.aggregatedValues = aggV
}

// Copy returns a copy of r. If includeValues is set to true, it copies the values as well.
func (r *MetricData) Copy(includeValues bool) *MetricData {
var values, aggregatedValues []float64
values = make([]float64, 0)
aggregatedValues = nil

if includeValues {
values = make([]float64, len(r.Values))
copy(values, r.Values)
if r.aggregatedValues != nil {
aggregatedValues = make([]float64, len(r.aggregatedValues))
copy(aggregatedValues, r.aggregatedValues)
}
}

return &MetricData{
FetchResponse: pb.FetchResponse{
Name: r.Name,
PathExpression: r.PathExpression,
ConsolidationFunc: r.ConsolidationFunc,
StartTime: r.StartTime,
StopTime: r.StopTime,
StepTime: r.StepTime,
XFilesFactor: r.XFilesFactor,
HighPrecisionTimestamps: r.HighPrecisionTimestamps,
Values: values,
AppliedFunctions: r.AppliedFunctions,
RequestStartTime: r.RequestStartTime,
RequestStopTime: r.RequestStopTime,
},
GraphOptions: r.GraphOptions,
ValuesPerPoint: r.ValuesPerPoint,
aggregatedValues: aggregatedValues,
Tags: r.Tags,
AggregateFunction: r.AggregateFunction,
}
}

// MakeMetricData creates new metrics data with given metric timeseries
func MakeMetricData(name string, values []float64, step, start int64) *MetricData {
return makeMetricDataWithTags(name, values, step, start, tags.ExtractTags(name))
@@ -354,13 +392,14 @@ func MakeMetricData(name string, values []float64, step, start int64) *MetricDat
func makeMetricDataWithTags(name string, values []float64, step, start int64, tags map[string]string) *MetricData {
stop := start + int64(len(values))*step

return &MetricData{FetchResponse: pb.FetchResponse{
Name: name,
Values: values,
StartTime: start,
StepTime: step,
StopTime: stop,
},
return &MetricData{
FetchResponse: pb.FetchResponse{
Name: name,
Values: values,
StartTime: start,
StepTime: step,
StopTime: stop,
},
Tags: tags,
}
}
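
The point of the new Copy method is that the request-level rescaling can modify a series without touching data that may also be sitting in a cache. A short usage sketch built only on the functions shown in this diff:

package main

import (
	"fmt"

	"github.com/go-graphite/carbonapi/expr/types"
)

func main() {
	orig := types.MakeMetricData("metric1", []float64{1, 2, 3}, 10, 0)

	// Copy(true) duplicates the Values slice, so mutating the copy
	// cannot spoil the original (possibly cached) data.
	clone := orig.Copy(true)
	clone.Values[0] = 42
	clone.StepTime = 30

	fmt.Println(orig.Values, orig.StepTime)   // [1 2 3] 10 (untouched)
	fmt.Println(clone.Values, clone.StepTime) // [42 2 3] 30
}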