Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add series aggregation DSL function `aggregate` #2294

Merged
Copy path View file
@@ -290,7 +290,7 @@ func (a *Results) Equal(b *Results) (bool, error) {
if a.IgnoreOtherUnjoined != b.IgnoreOtherUnjoined {
return false, fmt.Errorf("ignoreUnjoined flag does not match a: %v, b: %v", a.IgnoreOtherUnjoined, b.IgnoreOtherUnjoined)
}
if a.NaNValue != a.NaNValue {
if a.NaNValue != b.NaNValue {

This comment has been minimized.

Copy link
@kylebrandt

kylebrandt Aug 18, 2018

Member

nice find!

return false, fmt.Errorf("NaNValue does not match a: %v, b: %v", a.NaNValue, b.NaNValue)
}
sortedA := ResultSliceByGroup(a.Results)
Copy path View file
@@ -1,9 +1,11 @@
package expr

import (
"errors"
"fmt"
"math"
"sort"
"strconv"
"strings"
"time"

@@ -44,8 +46,23 @@ func tagRemove(args []parse.Node) (parse.Tags, error) {
}

// seriesFuncTags derives the tags for a function whose first argument is a
// string node, by passing that node's text to tagsFromString.
//
// The first argument is asserted to be a *parse.StringNode without a check, so
// an empty args slice or a different node type panics.
func seriesFuncTags(args []parse.Node) (parse.Tags, error) {
	return tagsFromString(args[0].(*parse.StringNode).Text)
}

// aggrFuncTags derives the tags of an aggr call from its second argument (the
// group string) by passing that argument's text to tagsFromString.
//
// It returns an error when fewer than three arguments are supplied or when the
// second argument is not a string node.
func aggrFuncTags(args []parse.Node) (parse.Tags, error) {
	if len(args) < 3 {
		return nil, errors.New("aggr: expect 3 arguments")
	}
	groupNode, isString := args[1].(*parse.StringNode)
	if !isString {
		return nil, errors.New("aggr: expect group to be string")
	}
	return tagsFromString(groupNode.Text)
}

func tagsFromString(text string) (parse.Tags, error) {
t := make(parse.Tags)
text := args[0].(*parse.StringNode).Text
if text == "" {
return t, nil
}
@@ -199,6 +216,15 @@ var builtins = map[string]parse.Func{
F: Streak,
},

// Aggregation functions
"aggr": {
Args: []models.FuncType{models.TypeSeriesSet, models.TypeString, models.TypeString},
Return: models.TypeSeriesSet,
Tags: aggrFuncTags,
F: Aggr,
Check: aggrCheck,
},

// Group functions
"addtags": {
Args: []models.FuncType{models.TypeVariantSet, models.TypeString},
@@ -385,6 +411,198 @@ var builtins = map[string]parse.Func{
},
}

// Aggr combines multiple series matching the specified groups using an aggregator function. If group
// is empty, all given series are combined, regardless of existing groups.
// Available aggregator functions include: avg (average), p.50 (median; any percentile may be given as
// "p" followed by a fraction between 0 and 1), min (minimum) and max (maximum).

This comment has been minimized.

Copy link
@kylebrandt

kylebrandt Aug 18, 2018

Member

update comment to reflect changes

This comment has been minimized.

Copy link
@kylebrandt

kylebrandt Aug 18, 2018

Member

would be good to have sum as well

This comment has been minimized.

Copy link
@hermanschaaf

hermanschaaf Aug 18, 2018

Author Contributor

Updated the comment and added sum. Also updated the documentation accordingly.

// Aggr merges the series in the given result set with the named aggregator.
//
// groups is a comma-separated list of tag keys (parsed by splitGroups). When
// it yields no keys, every series is merged into a single result with an empty
// group. Otherwise the series are bucketed by their values for those keys, each
// bucket is merged separately, and each merged result carries only the listed
// keys in its group.
//
// Results are emitted in the order in which each bucket was first seen in the
// input, so the output order does not depend on map iteration.
//
// An error is returned when a series lacks one of the requested keys, or when
// the underlying aggr call fails.
func Aggr(e *State, series *Results, groups string, aggregator string) (*Results, error) {
	results := Results{}

	grps := splitGroups(groups)
	if len(grps) == 0 {
		// no groups specified, so we merge all group values
		res, err := aggr(e, series, aggregator)
		if err != nil {
			return &results, err
		}
		res.Group = opentsdb.TagSet{}
		results.Results = append(results.Results, res)
		return &results, nil
	}

	// bucket keeps the tag values next to the series they select, so the
	// values never have to be recovered by re-splitting a joined string.
	type bucket struct {
		vals   []string
		series *Results
	}
	byKey := map[string]*bucket{}
	var ordered []*bucket
	for _, result := range series.Results {
		vals := make([]string, 0, len(grps))
		for _, grp := range grps {
			val, ok := result.Group[grp]
			if !ok {
				return nil, fmt.Errorf("unmatched group in at least one series: %v", grp)
			}
			vals = append(vals, val)
		}
		// The NUL separator is only used to build a lookup key; unlike a comma
		// it is not expected inside a tag value, so distinct value tuples map
		// to distinct keys.
		key := strings.Join(vals, "\x00")
		b, seen := byKey[key]
		if !seen {
			b = &bucket{vals: vals, series: &Results{}}
			byKey[key] = b
			ordered = append(ordered, b)
		}
		b.series.Results = append(b.series.Results, result)
	}

	for _, b := range ordered {
		res, err := aggr(e, b.series, aggregator)
		if err != nil {
			return &results, err
		}
		res.Group = opentsdb.TagSet{}
		for i, grp := range grps {
			res.Group[grp] = b.vals[i]
		}
		results.Results = append(results.Results, res)
	}

	return &results, nil
}

// splitGroups splits a comma-separated list of group names.
//
// Leading and trailing whitespace (spaces, tabs, newlines) is removed from
// every name, and names that are empty after trimming are dropped, so inputs
// such as "", "  " or "host," never produce a group called "". The returned
// slice is always non-nil; it is empty when no names remain.
func splitGroups(groups string) []string {
	parts := strings.Split(groups, ",")
	grps := make([]string, 0, len(parts))
	for _, part := range parts {
		if name := strings.TrimSpace(part); name != "" {
			grps = append(grps, name)
		}
	}
	return grps
}

// aggr merges every series in the given result set into one series using the
// aggregator named by aggfunc, and returns it as the Value of a new Result.
//
// Recognised names are "avg", "min", "max", and a percentile written as "p"
// followed by a number between 0 and 1 inclusive (for example "p.50"). A "p"
// prefix whose remainder does not parse as a number is treated as an unknown
// name. A parsed percentile outside [0, 1], including NaN, is rejected.
func aggr(e *State, series *Results, aggfunc string) (*Result, error) {
	res := Result{}

	var isPerc bool
	var percValue float64
	if len(aggfunc) > 0 && aggfunc[0] == 'p' {
		if v, err := strconv.ParseFloat(aggfunc[1:], 64); err == nil {
			percValue, isPerc = v, true
		}
	}
	// Written as a negated range test so that NaN, for which every comparison
	// is false, is rejected as well.
	if isPerc && !(percValue >= 0 && percValue <= 1) {
		return nil, fmt.Errorf("expr: aggr: percentile number must be greater than or equal to zero 0 and less than or equal 1")
	}

	var newSeries Series
	switch {
	case isPerc:
		newSeries = aggrPercentile(series.Results, percValue)
	case aggfunc == "avg":
		newSeries = aggrAverage(series.Results)
	case aggfunc == "min":
		newSeries = aggrMin(series.Results)
	case aggfunc == "max":
		newSeries = aggrMax(series.Results)
	default:
		return &res, fmt.Errorf("unknown aggfunc: %v. Options are avg, p.50, min, max", aggfunc)
	}

	res.Value = newSeries
	return &res, nil
}

func aggrAverage(series ResultSlice) Series {
newSeries := make(Series)
counts := map[time.Time]int64{}
for _, result := range series {
for t, v := range result.Value.Value().(Series) {
newSeries[t] += v
counts[t] += 1

This comment has been minimized.

Copy link
@kylebrandt

kylebrandt Aug 18, 2018

Member

counts[t]++

This comment has been minimized.

Copy link
@hermanschaaf

hermanschaaf Aug 18, 2018

Author Contributor

That's it, no more Python for me 🐍

}
}
for t := range newSeries {
newSeries[t] /= float64(counts[t])
}
return newSeries
}

// aggrPercentile merges the given series into one series whose value at each
// timestamp is the percValue percentile of all points found at that timestamp.
func aggrPercentile(series ResultSlice, percValue float64) Series {
	// Collect every value observed at each timestamp.
	byTime := map[time.Time][]float64{}
	for _, r := range series {
		for ts, val := range r.Value.Value().(Series) {
			byTime[ts] = append(byTime[ts], val)
		}
	}

	out := make(Series)
	for ts, vals := range byTime {
		// The existing percentile reduction takes a Series, so the collected
		// values are laid out as a synthetic series keyed by made-up
		// timestamps (one per index).
		synthetic := Series{}
		for idx, val := range vals {
			synthetic[time.Unix(int64(idx), 0)] = val
		}
		out[ts] = percentile(synthetic, percValue)
	}
	return out
}

// aggrMin merges the given series into one series holding, for each
// timestamp, the smallest value found at that timestamp.
func aggrMin(series ResultSlice) Series {
	lowest := make(Series)
	for _, r := range series {
		for ts, val := range r.Value.Value().(Series) {
			// The first value seen for a timestamp is taken as-is; later ones
			// replace it only when strictly smaller.
			if current, seen := lowest[ts]; !seen || val < current {
				lowest[ts] = val
			}
		}
	}
	return lowest
}

// aggrMax merges the given series into one series holding, for each
// timestamp, the largest value found at that timestamp.
func aggrMax(series ResultSlice) Series {
	highest := make(Series)
	for _, r := range series {
		for ts, val := range r.Value.Value().(Series) {
			// The first value seen for a timestamp is taken as-is; later ones
			// replace it only when strictly larger.
			if current, seen := highest[ts]; !seen || val > current {
				highest[ts] = val
			}
		}
	}
	return highest
}

// aggrCheck validates the arguments of an aggr call.
//
// It requires at least three arguments, with the third being a string node
// that names the aggregator. Accepted names are "avg", "min", "max", and a
// percentile written as "p" followed by a number between 0 and 1 inclusive.
// A parsed percentile outside that range, including NaN, is rejected. Any
// other name yields an "unrecognized aggregation function" error.
func aggrCheck(t *parse.Tree, f *parse.FuncNode) error {
	if len(f.Args) < 3 {
		return errors.New("aggr: expect 3 arguments")
	}
	nameNode, ok := f.Args[2].(*parse.StringNode)
	if !ok {
		return errors.New("aggr: expect string as aggregator function name")
	}
	name := nameNode.Text
	if len(name) > 0 && name[0] == 'p' {
		if percValue, err := strconv.ParseFloat(name[1:], 64); err == nil {
			// Negated range test so that NaN, for which every comparison is
			// false, is rejected too.
			if !(percValue >= 0 && percValue <= 1) {
				return errors.New("expr: aggr: percentile number must be greater than or equal to zero 0 and less than or equal 1")
			}
			return nil
		}
	}
	switch name {
	case "avg", "min", "max":
		return nil
	}
	return fmt.Errorf("expr: aggr: unrecognized aggregation function %s", name)
}

// V returns the state's vValue converted to a result set by fromScalar.
// The returned error is always nil.
func V(e *State) (*Results, error) {
	scalar := fromScalar(e.vValue)
	return scalar, nil
}
Oops, something went wrong.
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.