-
Notifications
You must be signed in to change notification settings - Fork 67
/
groupby.go
73 lines (67 loc) · 2.03 KB
/
groupby.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package kernel
import (
"errors"
"fmt"
"github.com/brimdata/zed/compiler/ast/dag"
"github.com/brimdata/zed/order"
"github.com/brimdata/zed/pkg/field"
"github.com/brimdata/zed/runtime/expr"
"github.com/brimdata/zed/runtime/op/groupby"
"github.com/brimdata/zed/zbuf"
)
// compileGroupBy turns a dag.Summarize node into a groupby.Op that pulls
// from parent. The grouping keys are compiled first, then the aggregation
// assignments; the first compilation error is returned unchanged. The
// node's limit, input sort direction, and partials-in/partials-out flags
// are handed straight through to groupby.New.
func (b *Builder) compileGroupBy(parent zbuf.Puller, summarize *dag.Summarize) (*groupby.Op, error) {
	keyExprs, err := b.compileAssignments(summarize.Keys)
	if err != nil {
		return nil, err
	}
	aggNames, aggs, err := b.compileAggAssignments(summarize.Aggs)
	if err != nil {
		return nil, err
	}
	return groupby.New(
		b.octx,
		parent,
		keyExprs,
		aggNames,
		aggs,
		summarize.Limit,
		order.Direction(summarize.InputSortDir),
		summarize.PartialsIn,
		summarize.PartialsOut,
	)
}
// compileAggAssignments compiles each assignment with compileAggAssignment
// and returns two parallel results: the destination paths and the
// aggregators, both in the same order as assignments. Compilation stops at
// the first failure, in which case both results are nil and the error is
// returned as is.
func (b *Builder) compileAggAssignments(assignments []dag.Assignment) (field.List, []*expr.Aggregator, error) {
	n := len(assignments)
	paths := make(field.List, n)
	aggregators := make([]*expr.Aggregator, n)
	for i := range assignments {
		path, aggregator, err := b.compileAggAssignment(assignments[i])
		if err != nil {
			return nil, nil, err
		}
		paths[i], aggregators[i] = path, aggregator
	}
	return paths, aggregators, nil
}
// compileAggAssignment compiles a single aggregation assignment of the form
// "LHS := agg(...)". It returns the path held by the LHS and the aggregator
// compiled from the RHS.
//
// An error is returned when the RHS is not a *dag.Agg, when the LHS is not
// a *dag.This, or when compileAgg fails. On any error both the path and the
// aggregator are nil, so callers never see a partially populated result.
func (b *Builder) compileAggAssignment(assignment dag.Assignment) (field.Path, *expr.Aggregator, error) {
	aggAST, ok := assignment.RHS.(*dag.Agg)
	if !ok {
		return nil, nil, errors.New("aggregator is not an aggregation expression")
	}
	this, ok := assignment.LHS.(*dag.This)
	if !ok {
		return nil, nil, fmt.Errorf("internal error: aggregator assignment LHS is not a static path: %#v", assignment.LHS)
	}
	m, err := b.compileAgg(aggAST)
	if err != nil {
		// Do not hand back the path alongside a failure; keep the error
		// returns uniform with the two checks above.
		return nil, nil, err
	}
	return this.Path, m, nil
}
// compileAgg builds an expr.Aggregator from a dag.Agg node. The node's Expr
// and Where expressions are each optional: when present they are compiled
// with compileExpr (Expr first, then Where), and when absent the
// corresponding evaluator passed to expr.NewAggregator is nil. A
// compilation error is returned immediately with a nil aggregator.
func (b *Builder) compileAgg(agg *dag.Agg) (*expr.Aggregator, error) {
	var input, filter expr.Evaluator
	if agg.Expr != nil {
		compiled, err := b.compileExpr(agg.Expr)
		if err != nil {
			return nil, err
		}
		input = compiled
	}
	if agg.Where != nil {
		compiled, err := b.compileExpr(agg.Where)
		if err != nil {
			return nil, err
		}
		filter = compiled
	}
	return expr.NewAggregator(agg.Name, input, filter)
}