Add series aggregation DSL function `aggregate` #2294

Merged
@@ -13,7 +13,7 @@ import (
"github.com/GaryBoone/GoStats/stats"
"github.com/MiniProfiler/go/miniprofiler"
"github.com/jinzhu/now"
)

func tagQuery(args []parse.Node) (parse.Tags, error) {
n := args[0].(*parse.StringNode)
@@ -384,6 +384,161 @@ var builtins = map[string]parse.Func{
F: V,
MapFunc: true,
},
"aggregate": {
Args: []models.FuncType{models.TypeSeriesSet, models.TypeString, models.TypeString},
Return: models.TypeSeriesSet,
Tags: tagFirst,

kylebrandt (Member) commented on Aug 17, 2018:
This should be a function that extracts what the resulting tag keys will be, so it will come from the groups arg to Aggregate (args[1]). The reason is that if you look in expr/parse, the nodes have a Tag method which makes sure at parse time that the resulting tags of a function make sense in any sort of union operations.

hermanschaaf (Author, Contributor) commented on Aug 17, 2018:
Ah, thanks, that makes sense now. I updated it, please have another look when you get a chance.

F: Aggregate,
},
}
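For illustration, the kind of tags-extraction function the reviewer describes above might look roughly like this. This is a hypothetical sketch mirroring the tagQuery signature shown at the top of the diff; the tagAggregate name, the parse.Tags set type, and the StringNode.Text field are assumptions, not code from this PR.

```go
// tagAggregate (hypothetical) derives the result tag keys from the groups
// argument (args[1]) so union operations can be checked at parse time.
func tagAggregate(args []parse.Node) (parse.Tags, error) {
	tags := make(parse.Tags) // assumes parse.Tags is a set keyed by tag name
	n := args[1].(*parse.StringNode)
	for _, g := range strings.Split(n.Text, ",") {
		if g = strings.TrimSpace(g); g != "" {
			tags[g] = struct{}{}
		}
	}
	return tags, nil
}
```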

// Aggregate combines multiple series matching the specified groups using an aggregator function. If groups
// is empty, all given series are combined, regardless of existing groups.
// Available aggregator functions include: avg (average), p50 (median), min (minimum) and max (maximum).

kylebrandt (Member) commented on Aug 18, 2018:
update comment to reflect changes

kylebrandt (Member) commented on Aug 18, 2018:
would be good to have sum as well

hermanschaaf (Author, Contributor) commented on Aug 18, 2018:
Updated the comment and added sum. Also updated the documentation accordingly.

func Aggregate(e *State, T miniprofiler.Timer, series *Results, groups string, aggregator string) (*Results, error) {
results := Results{}

grps := splitGroups(groups)
if len(grps) == 0 {
// no groups specified, so we merge all group values
res, err := aggregate(e, T, series, aggregator)
if err != nil {
return &results, err
}
res.Group = opentsdb.TagSet{}
results.Results = append(results.Results, res)
} else {

kylebrandt (Member) commented on Aug 17, 2018:
Drop the else, return `&results, nil` above in the if condition, and then the rest of the code does not need to be indented. Same idea as under https://github.com/golang/go/wiki/CodeReviewComments#indent-error-flow in terms of keeping minimal indentation.
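A sketch of the restructuring being suggested, shown against this diff's own Aggregate body (editorial illustration, not the diff as submitted):

```go
// Return early in the no-groups case so the grouped path needs no extra indentation.
grps := splitGroups(groups)
if len(grps) == 0 {
	// no groups specified, so we merge all group values
	res, err := aggregate(e, T, series, aggregator)
	if err != nil {
		return &results, err
	}
	res.Group = opentsdb.TagSet{}
	results.Results = append(results.Results, res)
	return &results, nil
}
// ... the grouped path continues here without an else branch.
```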

hermanschaaf (Author, Contributor) commented on Aug 17, 2018:
Done 👍

// at least one group specified, so we work out what
// the new group values will be
newGroups := map[string]*Results{}
for _, result := range series.Results {
vals := []string{}
for _, grp := range grps {
if val, ok := result.Group[grp]; ok {
vals = append(vals, val)

kylebrandt (Member) commented on Aug 17, 2018:
No need for the else; you can `continue` in the if statement, and then have the error after the if statement. Same idea of minimal indentation.
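A sketch of the flow being suggested for the inner loop (editorial illustration of the same code, not the diff as submitted):

```go
// Continue on a match; the error falls through after the if statement.
for _, grp := range grps {
	if val, ok := result.Group[grp]; ok {
		vals = append(vals, val)
		continue
	}
	return nil, fmt.Errorf("unmatched group in at least one series: %v", grp)
}
```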

hermanschaaf (Author, Contributor) commented on Aug 17, 2018:
Done 👍

} else {
return nil, fmt.Errorf("unmatched group in at least one series: %v", grp)
}
}
groupName := strings.Join(vals, ",")
if _, ok := newGroups[groupName]; !ok {
newGroups[groupName] = &Results{}
}
newGroups[groupName].Results = append(newGroups[groupName].Results, result)
}

for groupName, series := range newGroups {
res, err := aggregate(e, T, series, aggregator)
if err != nil {
return &results, err
}
vs := strings.Split(groupName, ",")
res.Group = opentsdb.TagSet{}
for i := 0; i < len(grps); i++ {
res.Group.Merge(opentsdb.TagSet{grps[i]: vs[i]})
}
results.Results = append(results.Results, res)
}
}

return &results, nil
}

// splitGroups splits a string of groups by comma, trims any surrounding whitespace,
// and returns an empty slice if the string is empty.
func splitGroups(groups string) []string {
if len(groups) == 0 {
return []string{}
}
grps := strings.Split(groups, ",")
for i, grp := range grps {
grps[i] = strings.Trim(grp, " ")
}
return grps
}

func aggregate(e *State, T miniprofiler.Timer, series *Results, aggregator string) (*Result, error) {
res := Result{}
newSeries := make(Series)

switch aggregator {

kylebrandt (Member) commented on Aug 17, 2018:
Probably useful to mimic what we do in http://bosun.org/expressions#windowquery-string-duration-string-period-string-num-scalar-funcname-string-seriesset :

In addition to supporting Bosun’s reduction functions that take one argument, percentile operations may be done by setting funcName to p followed by a number between 0 and 1 (inclusive). For example, "p.25" will be the 25th percentile and "p.999" will be the 99.9th percentile. "p0" and "p1" are min and max respectively (however, in these cases it is recommended to use "min" and "max" for the sake of clarity).

hermanschaaf (Author, Contributor) commented on Aug 17, 2018:
Nice, I wasn't aware that it's done that way in the window query. Will make it so 👍

case "avg":
newSeries = aggregateAverage(series.Results)
case "p50":
newSeries = aggregateMedian(series.Results)
case "min":
newSeries = aggregateMin(series.Results)
case "max":
newSeries = aggregateMax(series.Results)
default:
return &res, fmt.Errorf("unknown aggregator: %v. Options are avg, p50, min, max", aggregator)
}

res.Value = newSeries
return &res, nil
}
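Following the window-style percentile convention suggested in the review thread above, the dispatch in aggregate could accept arbitrary percentiles instead of a fixed list (replacing the "p50" spelling with "p.5"). A hedged sketch only: the aggregatePercentile helper is assumed, and this is not the code merged in this PR.

```go
// Hypothetical percentile dispatch (requires strconv): "p.25" is the 25th
// percentile; "p0" and "p1" behave like min and max.
if strings.HasPrefix(aggregator, "p") {
	p, err := strconv.ParseFloat(aggregator[1:], 64)
	if err != nil || p < 0 || p > 1 {
		return &res, fmt.Errorf("invalid percentile aggregator: %v", aggregator)
	}
	res.Value = aggregatePercentile(series.Results, p) // assumed helper
	return &res, nil
}
```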

func aggregateAverage(series ResultSlice) Series {
newSeries := make(Series)
counts := map[time.Time]int64{}
for _, result := range series {
for t, v := range result.Value.Value().(Series) {
newSeries[t] += v
counts[t] += 1

kylebrandt (Member) commented on Aug 18, 2018:
counts[t]++

hermanschaaf (Author, Contributor) commented on Aug 18, 2018:
That's it, no more Python for me 🐍

}
}
for t := range newSeries {
newSeries[t] /= float64(counts[t])
}
return newSeries
}

func aggregateMedian(series ResultSlice) Series {
newSeries := make(Series)
merged := map[time.Time][]float64{}
for _, result := range series {
for t, v := range result.Value.Value().(Series) {
merged[t] = append(merged[t], v)
}
}
for t := range merged {
sort.Float64s(merged[t])
l := len(merged[t])
if l%2 == 1 {
newSeries[t] = merged[t][l/2]
} else {
newSeries[t] = (merged[t][l/2-1] + merged[t][l/2]) / 2
}
}
return newSeries
}

func aggregateMin(series ResultSlice) Series {
newSeries := make(Series)
for _, result := range series {
for t, v := range result.Value.Value().(Series) {
if _, ok := newSeries[t]; !ok {
newSeries[t] = v
} else if v < newSeries[t] {
newSeries[t] = v
}
}
}
return newSeries
}

func aggregateMax(series ResultSlice) Series {
newSeries := make(Series)
for _, result := range series {
for t, v := range result.Value.Value().(Series) {
if _, ok := newSeries[t]; !ok {
newSeries[t] = v
} else if v > newSeries[t] {
newSeries[t] = v
}
}
}
return newSeries
}
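The review above asks for sum as well; a minimal sketch of such a helper, following the same pattern as the existing aggregators (the author reports adding it in a later commit, so this is illustrative rather than the diff shown here):

```go
// Sketch of a sum aggregator in the style of the helpers above.
func aggregateSum(series ResultSlice) Series {
	newSeries := make(Series)
	for _, result := range series {
		for t, v := range result.Value.Value().(Series) {
			newSeries[t] += v // accumulate values sharing a timestamp
		}
	}
	return newSeries
}
```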

func V(e *State, T miniprofiler.Timer) (*Results, error) {
@@ -257,3 +257,153 @@ func TestTail(t *testing.T) {
}
}
}

func TestAggregate(t *testing.T) {
seriesA := `series("foo=bar", 0, 1)`
seriesB := `series("foo=baz", 0, 3)`
seriesC := `series("foo=bat", 0, 5)`

// test median aggregator
err := testExpression(exprInOut{
fmt.Sprintf("aggregate(merge(%v, %v, %v), \"\", \"p50\")", seriesA, seriesB, seriesC),
Results{
Results: ResultSlice{
&Result{
Value: Series{
time.Unix(0, 0): 3,
},
Group: opentsdb.TagSet{},
},
},
},
false,
})
if err != nil {
t.Error(err)
}

// test average aggregator
err = testExpression(exprInOut{
fmt.Sprintf("aggregate(merge(%v, %v, %v), \"\", \"avg\")", seriesA, seriesB, seriesC),
Results{
Results: ResultSlice{
&Result{
Value: Series{
time.Unix(0, 0): 3,
},
Group: opentsdb.TagSet{},
},
},
},
false,
})
if err != nil {
t.Error(err)
}

// test min aggregator
err = testExpression(exprInOut{
fmt.Sprintf("aggregate(merge(%v, %v, %v), \"\", \"min\")", seriesA, seriesB, seriesC),
Results{
Results: ResultSlice{
&Result{
Value: Series{
time.Unix(0, 0): 1,
},
Group: opentsdb.TagSet{},
},
},
},
false,
})
if err != nil {
t.Error(err)
}

// test max aggregator
err = testExpression(exprInOut{
fmt.Sprintf("aggregate(merge(%v, %v, %v), \"\", \"max\")", seriesA, seriesB, seriesC),
Results{
Results: ResultSlice{
&Result{
Value: Series{
time.Unix(0, 0): 5,
},
Group: opentsdb.TagSet{},
},
},
},
false,
})
if err != nil {
t.Error(err)
}

// check that unknown aggregator errors out
err = testExpression(exprInOut{
fmt.Sprintf("aggregate(merge(%v, %v, %v), \"\", \"unknown\")", seriesA, seriesB, seriesC),
Results{},
false,
})
if err == nil {
t.Errorf("expected unknown aggregator to return error")
}
}

func TestAggregateWithGroups(t *testing.T) {
seriesA := `series("color=blue,type=apple,name=bob", 0, 1)`
seriesB := `series("color=blue,type=apple", 1, 3)`
seriesC := `series("color=green,type=apple", 0, 5)`

// test aggregator with single group
err := testExpression(exprInOut{
fmt.Sprintf("aggregate(merge(%v, %v, %v), \"color\", \"p50\")", seriesA, seriesB, seriesC),
Results{
Results: ResultSlice{
&Result{
Value: Series{
time.Unix(0, 0): 1,
time.Unix(1, 0): 3,
},
Group: opentsdb.TagSet{"color": "blue"},
},
&Result{
Value: Series{
time.Unix(0, 0): 5,
},
Group: opentsdb.TagSet{"color": "green"},
},
},
},
false,
})
if err != nil {
t.Error(err)
}

// test aggregator with multiple groups
err = testExpression(exprInOut{
fmt.Sprintf("aggregate(merge(%v, %v, %v), \"color,type\", \"p50\")", seriesA, seriesB, seriesC),
Results{
Results: ResultSlice{
&Result{
Value: Series{
time.Unix(0, 0): 1,
time.Unix(1, 0): 3,
},
Group: opentsdb.TagSet{"color": "blue", "type": "apple"},
},
&Result{
Value: Series{
time.Unix(0, 0): 5,
},
Group: opentsdb.TagSet{"color": "green", "type": "apple"},
},
},
},
false,
})
if err != nil {
t.Error(err)
}
}
@@ -593,6 +593,24 @@ Returns the length of the longest streak of values that evaluate to true (i.e. m

Sum.

# Aggregation Functions

Aggregation functions take a seriesSet and return a new seriesSet.

## aggregate(series seriesSet, groups string, aggregator string) seriesSet
{: .exprFunc}

Takes a seriesSet and combines it into a new seriesSet with the groups specified, using an aggregator to merge any series that share the matching tag values. If groups is empty, all series are combined into a single series, regardless of existing tags. The available aggregator functions are: avg (average), p50 (median), min (minimum) and max (maximum).

This can be particularly useful for removing anomalies when comparing time series across periods using the over function.

Example:

```
$weeks = over("sum:1m-avg:os.cpu{region=*,color=*}", "24h", "1w", 3)
$agg = aggregate($weeks, "region,color", "p50")
```
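
Passing an empty groups argument collapses everything into a single series, regardless of tags; for example (the q query here is illustrative only):

```
$cpu = q("sum:1m-avg:os.cpu{host=*}", "1h", "")
$all = aggregate($cpu, "", "avg")
```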

# Group Functions

Group functions modify the OpenTSDB groups.