Skip to content

Commit

Permalink
Merge pull request #501 from Felixoid/align_precision
Browse files Browse the repository at this point in the history
Move precision alignment from per-target zipper to per-request FetchAndEvalExp
  • Loading branch information
Civil committed Jul 22, 2020
2 parents 88f6bc6 + 948b3bc commit b8affd5
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 28 deletions.
4 changes: 4 additions & 0 deletions cmd/carbonapi/http/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (z mockCarbonZipper) TagValues(ctx context.Context, query string, limit int
return []string{}, nil
}

func (z mockCarbonZipper) ScaleToCommonStep() bool {
return true
}

func getGlobResponse() *pb.MultiGlobResponse {
globMtach := pb.GlobMatch{Path: "foo.bar", IsLeaf: true}
var matches []pb.GlobMatch
Expand Down
1 change: 1 addition & 0 deletions cmd/carbonapi/interfaces/zipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ type CarbonZipper interface {
Render(ctx context.Context, request pb.MultiFetchRequest) ([]*types.MetricData, *zipperTypes.Stats, merry.Error)
TagNames(ctx context.Context, query string, limit int64) ([]string, merry.Error)
TagValues(ctx context.Context, query string, limit int64) ([]string, merry.Error)
ScaleToCommonStep() bool
}
7 changes: 4 additions & 3 deletions cmd/carbonapi/zipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,6 @@ func (z zipper) Render(ctx context.Context, request pb.MultiFetchRequest) ([]*ty
Tags: tags,
})
}
if z.z.ScaleToCommonStep {
result = helper.ScaleToCommonStep(result)
}
}

sort.Sort(helper.ByNameNatural(result))
Expand Down Expand Up @@ -128,3 +125,7 @@ func (z zipper) TagNames(ctx context.Context, query string, limit int64) ([]stri
func (z zipper) TagValues(ctx context.Context, query string, limit int64) ([]string, merry.Error) {
return z.z.TagValues(ctx, query, limit)
}

func (z zipper) ScaleToCommonStep() bool {
return z.z.ScaleToCommonStep
}
14 changes: 13 additions & 1 deletion expr/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ func (eval evaluator) FetchAndEvalExp(ctx context.Context, exp parser.Expr, from
multiFetchRequest := pb.MultiFetchRequest{}
metricRequestCache := make(map[string]parser.MetricRequest)
maxDataPoints := utilctx.GetMaxDatapoints(ctx)
// values related to this particular `target=`
targetValues := make(map[parser.MetricRequest][]*types.MetricData)

for _, m := range exp.Metrics() {
fetchRequest := pb.FetchRequest{
Expand All @@ -49,10 +51,12 @@ func (eval evaluator) FetchAndEvalExp(ctx context.Context, exp parser.Expr, from

// avoid multiple requests in a http request, E.g render?target=a.b&target=a.b
if _, ok := values[metricRequest]; ok {
targetValues[metricRequest] = nil
continue
}

metricRequestCache[m.Metric] = metricRequest
targetValues[metricRequest] = nil
multiFetchRequest.Metrics = append(multiFetchRequest.Metrics, fetchRequest)
}

Expand All @@ -76,7 +80,15 @@ func (eval evaluator) FetchAndEvalExp(ctx context.Context, exp parser.Expr, from
}
}

return eval.Eval(ctx, exp, from, until, values)
for m := range targetValues {
targetValues[m] = values[m]
}

if config.Config.ZipperInstance.ScaleToCommonStep() {
targetValues = helper.ScaleValuesToCommonStep(targetValues)
}

return eval.Eval(ctx, exp, from, until, targetValues)
}

// Eval evalualtes expressions
Expand Down
2 changes: 1 addition & 1 deletion expr/functions/seriesList/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (f *seriesList) Do(ctx context.Context, e parser.Expr, from, until int64, v
case "diffSeriesLists":
compute = func(l, r float64) float64 { return l - r }
case "powSeriesLists":
compute = func(l, r float64) float64 { return math.Pow(l, r) }
compute = math.Pow
}

if useConstant {
Expand Down
2 changes: 1 addition & 1 deletion expr/functions/weightedAverage/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func New(configFile string) []interfaces.FunctionMetadata {

// weightedAverage(seriesListAvg, seriesListWeight, *nodes)
func (f *weightedAverage) Do(ctx context.Context, e parser.Expr, from, until int64, values map[parser.MetricRequest][]*types.MetricData) ([]*types.MetricData, error) {
aggKeyPairs := make(map[string]map[string]*types.MetricData, 0)
aggKeyPairs := make(map[string]map[string]*types.MetricData)
var productList []*types.MetricData

avgs, err := helper.GetSeriesArg(e.Args()[0], from, until, values)
Expand Down
66 changes: 56 additions & 10 deletions expr/helper/align.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/go-graphite/carbonapi/expr/types"
"github.com/go-graphite/carbonapi/pkg/parser"
)

// GCD returns greatest common divisor calculated via Euclidean algorithm
Expand Down Expand Up @@ -33,25 +34,49 @@ func LCM(args ...int64) int64 {
return lcm
}

// GetCommonStep returns LCM(steps) for slice of metrics.
// minStart and maxStop will be set to closest lower or equal multiple of LCM(steps).
func GetCommonStep(args []*types.MetricData) int64 {
steps := make([]int64, 0, len(args))
// GetCommonStep returns LCM(steps), changed (bool) for slice of metrics.
// If all metrics have the same step, changed == false.
func GetCommonStep(args []*types.MetricData) (commonStep int64, changed bool) {
steps := make([]int64, 0, 1)
stepsIndex := make(map[int64]struct{})
for _, arg := range args {
steps = append(steps, arg.StepTime)
if _, ok := stepsIndex[arg.StepTime]; !ok {
stepsIndex[arg.StepTime] = struct{}{}
steps = append(steps, arg.StepTime)
}
}
if len(steps) == 1 {
return steps[0], false
}
commonStep := LCM(steps...)
return commonStep
commonStep = LCM(steps...)
return commonStep, true
}

// ScaleToCommonStep returns the metrics, aligned LCM of all metrics steps.
// If commonStep == 0, then it will be calculated automatically
// It respects xFilesFactor and fills gaps in the begin and end with NaNs if needed.
func ScaleToCommonStep(args []*types.MetricData) []*types.MetricData {
commonStep := GetCommonStep(args)
for _, arg := range args {
func ScaleToCommonStep(args []*types.MetricData, commonStep int64) []*types.MetricData {
if commonStep < 0 {
// This doesn't make sence
return args
}

// If it's invoked with commonStep other than 0, changes are applied by default
changed := true
if commonStep == 0 {
commonStep, changed = GetCommonStep(args)
}

if !changed {
return args
}

for a, arg := range args {
if arg.StepTime == commonStep {
continue
}
arg = arg.Copy(true)
args[a] = arg
stepFactor := commonStep / arg.StepTime
newStart := arg.StartTime - (arg.StartTime % commonStep)
if (arg.StartTime % commonStep) != 0 {
Expand Down Expand Up @@ -98,6 +123,27 @@ func aggregateBatch(vals []float64, arg *types.MetricData) float64 {
return arg.GetAggregateFunction()(vals)
}

// ScaleValuesToCommonStep returns map[parser.MetricRequest][]*types.MetricData. If any element of []*types.MetricData is changed, it doesn't change original
// metric, but creates the new one to avoid cache spoiling.
func ScaleValuesToCommonStep(values map[parser.MetricRequest][]*types.MetricData) map[parser.MetricRequest][]*types.MetricData {
// Calculate global commonStep
var args []*types.MetricData
for _, metrics := range values {
args = append(args, metrics...)
}

commonStep, changed := GetCommonStep(args)
if !changed {
return values
}

for m, metrics := range values {
values[m] = ScaleToCommonStep(metrics, commonStep)
}

return values
}

// GetBuckets returns amount buckets for timeSeries (defined with startTime, stopTime and step (bucket) size.
func GetBuckets(start, stop, bucketSize int64) int64 {
return int64(math.Ceil(float64(stop-start) / float64(bucketSize)))
Expand Down
36 changes: 31 additions & 5 deletions expr/helper/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func TestGetCommonStep(t *testing.T) {
tests := []struct {
metrics []*types.MetricData
commonStep int64
changed bool
}{
// Different steps and start/stop time
{
Expand All @@ -128,6 +129,7 @@ func TestGetCommonStep(t *testing.T) {
types.MakeMetricData("metric2", make([]float64, 25), 3, 6), // 6..81
},
30,
true,
},
// Same set of points
{
Expand All @@ -137,6 +139,7 @@ func TestGetCommonStep(t *testing.T) {
types.MakeMetricData("metric3", make([]float64, 15), 5, 5), // 5..80
},
5,
false,
},
// Same step, different lengths
{
Expand All @@ -146,24 +149,29 @@ func TestGetCommonStep(t *testing.T) {
types.MakeMetricData("metric3", make([]float64, 4), 5, 35), // 35..55
},
5,
false,
},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("Set %v", i), func(t *testing.T) {
com := GetCommonStep(tt.metrics)
com, changed := GetCommonStep(tt.metrics)
if com != tt.commonStep {
t.Errorf("Result of GetCommonStep: %v; expected is %v", com, tt.commonStep)
}
if changed != tt.changed {
t.Errorf("GetCommonStep changed: %v; expected is %v", changed, tt.changed)
}
})
}
}

func TestScaleToCommonStep(t *testing.T) {
NaN := math.NaN()
tests := []struct {
name string
metrics []*types.MetricData
expected []*types.MetricData
name string
metrics []*types.MetricData
commonStep int64
expected []*types.MetricData
}{
{
"Normal metrics",
Expand All @@ -172,6 +180,7 @@ func TestScaleToCommonStep(t *testing.T) {
types.MakeMetricData("metric2", []float64{1, 2, 3, 4, 5}, 2, 4), // 4..14
types.MakeMetricData("metric3", []float64{1, 2, 3, 4, 5, 6}, 3, 3), // 3..21
},
0,
[]*types.MetricData{
types.MakeMetricData("metric1", []float64{2, 10, 17}, 6, 0), // 0..18
types.MakeMetricData("metric2", []float64{1, 3, 5}, 6, 0), // 0..18
Expand All @@ -186,13 +195,30 @@ func TestScaleToCommonStep(t *testing.T) {
types.MakeMetricData("metric3", []float64{1, 2, 3, 4, 5, 6}, 3, 3), // 3..21
types.MakeMetricData("metric6", []float64{1, 2, 3, 4, 5}, 6, 0), // 0..30
},
0,
[]*types.MetricData{
types.MakeMetricData("metric1", []float64{NaN, 72}, 6, 0), // 0..12
types.MakeMetricData("metric2", []float64{NaN, 2, NaN}, 6, 0), // 0..18
types.MakeMetricData("metric3", []float64{NaN, 3, 5, NaN}, 6, 0), // 0..24
types.MakeMetricData("metric6", []float64{1, 2, 3, 4, 5}, 6, 0), // 0..30, unchanged
},
},
{
"Custom common step",
[]*types.MetricData{
types.MakeMetricData("metric1", []float64{NaN, 3, 5, 7, 9, 11, 13, 15, 17}, 1, 3), // 3..12
types.MakeMetricData("metric2", []float64{1, 2, 3, 4, 5}, 2, 4), // 4..14
types.MakeMetricData("metric3", []float64{1, 2, 3, 4, 5, 6}, 3, 3), // 3..21
types.MakeMetricData("metric6", []float64{1, 2, 3, 4, 5}, 6, 0), // 0..30
},
12,
[]*types.MetricData{
types.MakeMetricData("metric1", []float64{10}, 12, 0), // 0..12
types.MakeMetricData("metric2", []float64{2.5, 5}, 12, 0), // 0..18
types.MakeMetricData("metric3", []float64{2, 5}, 12, 0), // 0..24
types.MakeMetricData("metric6", []float64{1.5, 3.5, 5}, 12, 0), // 0..30, unchanged
},
},
}
custom := tests[1].metrics
custom[0].ConsolidationFunc = "sum"
Expand All @@ -203,7 +229,7 @@ func TestScaleToCommonStep(t *testing.T) {
custom[2].XFilesFactor = 0.51
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := ScaleToCommonStep(tt.metrics)
result := ScaleToCommonStep(tt.metrics, tt.commonStep)
if len(result) != len(tt.expected) {
t.Errorf("Result has different length %v than expected %v", len(result), len(tt.expected))
}
Expand Down
53 changes: 46 additions & 7 deletions expr/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,44 @@ func (r *MetricData) AggregateValues() {
r.aggregatedValues = aggV
}

// Copy returns the copy of r. If includeValues set to true, it copies values as well.
func (r *MetricData) Copy(includeValues bool) *MetricData {
var values, aggregatedValues []float64
values = make([]float64, 0)
aggregatedValues = nil

if includeValues {
values = make([]float64, len(r.Values))
copy(values, r.Values)
if r.aggregatedValues != nil {
aggregatedValues = make([]float64, len(r.aggregatedValues))
copy(aggregatedValues, r.aggregatedValues)
}
}

return &MetricData{
FetchResponse: pb.FetchResponse{
Name: r.Name,
PathExpression: r.PathExpression,
ConsolidationFunc: r.ConsolidationFunc,
StartTime: r.StartTime,
StopTime: r.StopTime,
StepTime: r.StepTime,
XFilesFactor: r.XFilesFactor,
HighPrecisionTimestamps: r.HighPrecisionTimestamps,
Values: values,
AppliedFunctions: r.AppliedFunctions,
RequestStartTime: r.RequestStartTime,
RequestStopTime: r.RequestStopTime,
},
GraphOptions: r.GraphOptions,
ValuesPerPoint: r.ValuesPerPoint,
aggregatedValues: aggregatedValues,
Tags: r.Tags,
AggregateFunction: r.AggregateFunction,
}
}

// MakeMetricData creates new metrics data with given metric timeseries
func MakeMetricData(name string, values []float64, step, start int64) *MetricData {
return makeMetricDataWithTags(name, values, step, start, tags.ExtractTags(name))
Expand All @@ -354,13 +392,14 @@ func MakeMetricData(name string, values []float64, step, start int64) *MetricDat
func makeMetricDataWithTags(name string, values []float64, step, start int64, tags map[string]string) *MetricData {
stop := start + int64(len(values))*step

return &MetricData{FetchResponse: pb.FetchResponse{
Name: name,
Values: values,
StartTime: start,
StepTime: step,
StopTime: stop,
},
return &MetricData{
FetchResponse: pb.FetchResponse{
Name: name,
Values: values,
StartTime: start,
StepTime: step,
StopTime: stop,
},
Tags: tags,
}
}

0 comments on commit b8affd5

Please sign in to comment.