Merge #50721
50721: colexec: add support for DISTINCT and FILTER hash aggregation r=yuzefovich a=yuzefovich

This commit adds support for DISTINCT and FILTERing hash aggregation.
The approach is as follows (a simplified sketch follows this list):
- to handle FILTER, we run a selection operator on the input state
- to handle DISTINCT, we encode the aggregation columns, one tuple at
a time, and update the selection vector to include only the tuples we
haven't seen yet
- we then run the aggregation on the remaining selected tuples
- and finally we restore the state with the original length and
selection vector.
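
To make the approach concrete, here is a minimal, self-contained Go
sketch of the three steps (all names are hypothetical and the encoding
is simplified; the real operators work on coldata batches in
pkg/sql/colexec):

package main

import "fmt"

func main() {
	// One aggregation group (id 0) with six input tuples.
	groups := []int{0, 0, 0, 0, 0, 0}
	vals := []int64{1, 2, 2, 1, 1, 2}
	filter := []bool{true, true, true, false, true, true}

	// Step 1: FILTER -- select only the tuples that pass the filter column.
	sel := make([]int, 0, len(vals))
	for i := range vals {
		if filter[i] {
			sel = append(sel, i)
		}
	}

	// Step 2: DISTINCT -- encode the aggregation columns one tuple at a
	// time and keep only encodings not yet seen within the tuple's group.
	seen := make(map[int]map[string]bool)
	distinctSel := make([]int, 0, len(sel))
	for _, i := range sel {
		enc := fmt.Sprint(vals[i]) // stand-in for the real key encoding
		if seen[groups[i]] == nil {
			seen[groups[i]] = make(map[string]bool)
		}
		if !seen[groups[i]][enc] {
			seen[groups[i]][enc] = true
			distinctSel = append(distinctSel, i)
		}
	}

	// Step 3: run the aggregate on the remaining selected tuples; the
	// original selection vector and length would then be restored.
	var sum int64
	for _, i := range distinctSel {
		sum += vals[i]
	}
	fmt.Println(sum) // 3: the distinct filtered values are 1 and 2
}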

Such handling of the FILTER clause sounds good to me, but the handling
of DISTINCT is somewhat unfortunate: we perform the encoding one tuple
at a time.
Other approaches were prototyped but showed worse performance:
- using the vectorized hash table - the benefit of such an approach is
that we don't reduce ourselves to one tuple at a time (we would be
hashing full columns at once), but the big disadvantage is that the
full tuples are stored in the hash table (instead of an encoded
representation)
- using a single global map per aggregate function that is shared among
all aggregation groups - the benefit of such an approach is that we
only have a handful of maps, but it turned out that such a global map
grows much bigger and performs worse.

Addresses: #39241.
Addresses: #39242.

Release note (sql change): The vectorized execution engine now
natively supports hash aggregation with DISTINCT and FILTER clauses.
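
For example, a query such as SELECT a, count(DISTINCT b), sum(b)
FILTER (WHERE c) FROM t GROUP BY a (an illustrative query, not taken
from this commit) can now run natively in the vectorized engine when a
hash aggregator is planned.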

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
craig[bot] and yuzefovich committed Aug 20, 2020
2 parents 545c4fe + 08b282a commit 6b147c7
Showing 9 changed files with 936 additions and 190 deletions.
6 changes: 3 additions & 3 deletions pkg/sql/colexec/aggregate_funcs.go
@@ -308,8 +308,8 @@ func newAggregateFuncsAlloc(
}

// sizeOfAggregateFunc is the size of some aggregateFunc implementation.
-// countAgg was chosen arbitrarily, but it's important that we use a pointer to
-// the aggregate function struct.
+// countHashAgg was chosen arbitrarily, but it's important that we use a
+// pointer to the aggregate function struct.
const sizeOfAggregateFunc = int64(unsafe.Sizeof(&countHashAgg{}))

func (a *aggregateFuncsAlloc) makeAggregateFuncs() []aggregateFunc {
@@ -319,7 +319,7 @@ func (a *aggregateFuncsAlloc) makeAggregateFuncs() []aggregateFunc {
// of 'allocSize x number of funcs in schema' length. Every
// aggFuncAlloc will allocate allocSize of objects on the newAggFunc
// call below.
-a.allocator.AdjustMemoryUsage(sizeOfAggregateFunc * a.allocSize)
+a.allocator.AdjustMemoryUsage(sizeOfAggregateFunc * int64(len(a.aggFuncAllocs)) * a.allocSize)
a.returnFuncs = make([]aggregateFunc, len(a.aggFuncAllocs)*int(a.allocSize))
}
funcs := a.returnFuncs[:len(a.aggFuncAllocs)]
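
A quick worked example of the accounting fix above, with purely
illustrative numbers: suppose the schema has 4 aggregate functions and
allocSize is 64. makeAggregateFuncs backs a pool of 4 * 64 function
objects, so the correct adjustment is sizeOfAggregateFunc * 4 * 64;
the old expression charged only sizeOfAggregateFunc * 64,
undercounting the pool by a factor of the number of functions.
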
235 changes: 232 additions & 3 deletions pkg/sql/colexec/aggregators_test.go
@@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/colexecbase"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
@@ -51,6 +52,8 @@ type aggregatorTestCase struct {
constArguments [][]execinfrapb.Expression
// spec will be populated during init().
spec *execinfrapb.AggregatorSpec
aggDistinct []bool
aggFilter []int
input tuples
unorderedInput bool
expected tuples
@@ -95,7 +98,7 @@ var aggTypes = []aggType{
outputTypes []*types.T,
_ bool,
) (colexecbase.Operator, error) {
-return NewHashAggregator(allocator, input, inputTypes, spec, evalCtx, constructors, constArguments, outputTypes)
+return NewHashAggregator(allocator, testMemAcc, input, inputTypes, spec, evalCtx, constructors, constArguments, outputTypes)
},
name: "hash",
},
@@ -154,6 +157,13 @@ func (tc *aggregatorTestCase) init() error {
if tc.constArguments != nil {
aggregations[i].Arguments = tc.constArguments[i]
}
if tc.aggDistinct != nil {
aggregations[i].Distinct = tc.aggDistinct[i]
}
if tc.aggFilter != nil && tc.aggFilter[i] != tree.NoColumnIdx {
filterColIdx := uint32(tc.aggFilter[i])
aggregations[i].FilterColIdx = &filterColIdx
}
}
tc.spec = &execinfrapb.AggregatorSpec{
GroupCols: tc.groupCols,
@@ -679,12 +689,133 @@ func TestAggregatorAllFunctions(t *testing.T) {
},
convToDecimal: true,
},

// Test DISTINCT aggregation.
{
aggFns: []execinfrapb.AggregatorSpec_Func{
execinfrapb.AggregatorSpec_ANY_NOT_NULL,
execinfrapb.AggregatorSpec_COUNT,
execinfrapb.AggregatorSpec_COUNT,
execinfrapb.AggregatorSpec_SUM_INT,
execinfrapb.AggregatorSpec_SUM_INT,
},
aggCols: [][]uint32{{0}, {1}, {1}, {1}, {1}},
aggDistinct: []bool{false, false, true, false, true},
typs: []*types.T{types.Int, types.Int},
input: tuples{
{0, 1},
{0, 2},
{0, 2},
{0, nil},
{0, 1},
{0, nil},
{1, 1},
{1, 2},
{1, 2},
},
expected: tuples{
{0, 4, 2, 6, 3},
{1, 3, 2, 5, 3},
},
},

// Test aggregation with FILTERs.
{
aggFns: []execinfrapb.AggregatorSpec_Func{
execinfrapb.AggregatorSpec_ANY_NOT_NULL,
execinfrapb.AggregatorSpec_COUNT_ROWS,
execinfrapb.AggregatorSpec_SUM_INT,
},
aggCols: [][]uint32{{0}, {}, {1}},
aggFilter: []int{tree.NoColumnIdx, 2, 2},
typs: []*types.T{types.Int, types.Int, types.Bool},
input: tuples{
{0, 1, false},
{0, 2, true},
{0, 2, true},
{0, nil, nil},
{0, 1, nil},
{0, nil, true},
{1, 1, true},
{1, 2, nil},
{1, 2, true},
},
expected: tuples{
{0, 3, 4},
{1, 2, 3},
},
},

// Test aggregation with FILTERs when whole groups are filtered out.
{
aggFns: []execinfrapb.AggregatorSpec_Func{
execinfrapb.AggregatorSpec_ANY_NOT_NULL,
execinfrapb.AggregatorSpec_COUNT_ROWS,
execinfrapb.AggregatorSpec_SUM_INT,
},
aggCols: [][]uint32{{0}, {}, {1}},
aggFilter: []int{tree.NoColumnIdx, 2, 2},
typs: []*types.T{types.Int, types.Int, types.Bool},
input: tuples{
{0, 1, false},
{0, nil, nil},
{0, 2, false},
{1, 1, true},
{1, 2, nil},
{1, 2, true},
{2, 1, false},
{2, nil, nil},
{2, 2, nil},
},
expected: tuples{
{0, 0, nil},
{1, 2, 3},
{2, 0, nil},
},
},

// Test aggregation with FILTERs and DISTINCTs intertwined.
{
aggFns: []execinfrapb.AggregatorSpec_Func{
execinfrapb.AggregatorSpec_ANY_NOT_NULL,
execinfrapb.AggregatorSpec_COUNT,
execinfrapb.AggregatorSpec_COUNT,
execinfrapb.AggregatorSpec_COUNT,
execinfrapb.AggregatorSpec_SUM_INT,
execinfrapb.AggregatorSpec_SUM_INT,
execinfrapb.AggregatorSpec_SUM_INT,
},
aggCols: [][]uint32{{0}, {1}, {1}, {1}, {1}, {1}, {1}},
aggDistinct: []bool{false, false, true, true, false, true, true},
aggFilter: []int{tree.NoColumnIdx, 2, tree.NoColumnIdx, 2, 2, tree.NoColumnIdx, 2},
typs: []*types.T{types.Int, types.Int, types.Bool},
input: tuples{
{0, 1, false},
{0, 2, true},
{0, 2, true},
{0, nil, nil},
{0, 1, nil},
{0, nil, true},
{1, 1, true},
{1, 2, nil},
{1, 2, true},
},
expected: tuples{
{0, 2, 2, 1, 4, 3, 2},
{1, 2, 2, 2, 3, 3, 3},
},
},
}

evalCtx := tree.MakeTestingEvalContext(cluster.MakeTestingClusterSettings())
defer evalCtx.Stop(context.Background())
for _, agg := range aggTypes {
for i, tc := range testCases {
if agg.name != "hash" && (tc.aggDistinct != nil || tc.aggFilter != nil) {
// DISTINCT or filtering aggregation is only supported with
// the hash aggregator.
continue
}
log.Infof(context.Background(), "%s/%d", agg.name, i)
if err := tc.init(); err != nil {
t.Fatal(err)
@@ -993,6 +1124,104 @@ func BenchmarkAllOptimizedAggregateFunctions(b *testing.B) {
}
}

func BenchmarkDistinctAggregation(b *testing.B) {
rng, _ := randutil.NewPseudoRand()
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
evalCtx := tree.MakeTestingEvalContext(st)
defer evalCtx.Stop(ctx)
flowCtx := &execinfra.FlowCtx{
EvalCtx: &evalCtx,
Cfg: &execinfra.ServerConfig{
Settings: st,
},
}

typs := []*types.T{types.Int, types.Int}
aggFn := execinfrapb.AggregatorSpec_COUNT
aggSpec := &execinfrapb.AggregatorSpec{
Type: execinfrapb.AggregatorSpec_NON_SCALAR,
GroupCols: []uint32{0},
// TODO(yuzefovich): adjust the spec once we support distinct ordered
// aggregation.
Aggregations: []execinfrapb.AggregatorSpec_Aggregation{{
Func: aggFn,
Distinct: true,
ColIdx: []uint32{1},
}},
}
spec := &execinfrapb.ProcessorSpec{
Input: []execinfrapb.InputSyncSpec{{ColumnTypes: typs}},
Core: execinfrapb.ProcessorCoreUnion{
Aggregator: aggSpec,
},
}
args := &NewColOperatorArgs{
Spec: spec,
StreamingMemAccount: testMemAcc,
}
args.TestingKnobs.UseStreamingMemAccountForBuffering = true

for _, groupSize := range []int{1, 2, 32, 128, coldata.BatchSize() / 2, coldata.BatchSize()} {
for _, distinctProbability := range []float64{0.01, 0.1, 1.0} {
distinctModulo := int(1.0 / distinctProbability)
if (groupSize == 1 && distinctProbability != 1.0) || float64(groupSize)/float64(distinctModulo) < 0.1 {
// Such a combination of groupSize and
// distinctProbability parameters is very unlikely to be
// satisfiable (for example, with groupSize=1 and
// distinctProbability=0.01, every value would have to be
// distinct within the group), so we skip such a
// configuration.
continue
}
for _, hasNulls := range []bool{false, true} {
for _, numInputBatches := range []int{64} {
// TODO(yuzefovich): refactor benchmarkAggregateFunction to
// be more configurable and reuse it here.
b.Run(fmt.Sprintf("%s/groupSize=%d/distinctProb=%.2f/nulls=%t",
aggFn, groupSize, distinctProbability, hasNulls),
func(b *testing.B) {
nTuples := numInputBatches * coldata.BatchSize()
cols := []coldata.Vec{
testAllocator.NewMemColumn(typs[0], nTuples),
testAllocator.NewMemColumn(typs[1], nTuples),
}
groups := cols[0].Int64()
vals := cols[1].Int64()
nGroups := nTuples / groupSize
for i := 0; i < nTuples; i++ {
groups[i] = int64(rng.Intn(nGroups))
vals[i] = int64(rng.Intn(distinctModulo))
if hasNulls && rng.Float64() < nullProbability {
cols[1].Nulls().SetNull(i)
}
}
source := newChunkingBatchSource(typs, cols, nTuples)
args.Inputs = []colexecbase.Operator{source}
r, err := TestNewColOperator(ctx, flowCtx, args)
if err != nil {
b.Fatal(err)
}

a := r.Op
a.Init()
b.ResetTimer()
// Only count the aggregation column.
b.SetBytes(int64(8 * nTuples))
for i := 0; i < b.N; i++ {
// Exhaust aggregator until all batches have been read.
for b := a.Next(ctx); b.Length() != 0; b = a.Next(ctx) {
}
a.(ResettableOperator).reset(ctx)
}
},
)
}
}
}
}
}

func TestHashAggregator(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -1116,8 +1345,8 @@ func TestHashAggregator(t *testing.T) {
log.Infof(context.Background(), "numOfHashBuckets=%d", numOfHashBuckets)
runTests(t, []tuples{tc.input}, tc.expected, unorderedVerifier, func(sources []colexecbase.Operator) (colexecbase.Operator, error) {
a, err := NewHashAggregator(
-testAllocator, sources[0], tc.typs, tc.spec, &evalCtx,
-constructors, constArguments, outputTypes,
+testAllocator, testMemAcc, sources[0], tc.typs, tc.spec,
+&evalCtx, constructors, constArguments, outputTypes,
)
a.(*hashAggregator).testingKnobs.numOfHashBuckets = uint64(numOfHashBuckets)
return a, err
51 changes: 31 additions & 20 deletions pkg/sql/colexec/colbuilder/execplan.go
@@ -127,6 +127,23 @@ func (r *opResult) resetToState(ctx context.Context, arg colexec.NewColOperatorR
*r.NewColOperatorResult = arg
}

func needHashAggregator(aggSpec *execinfrapb.AggregatorSpec) (bool, error) {
var groupCols, orderedCols util.FastIntSet
for _, col := range aggSpec.OrderedGroupCols {
orderedCols.Add(int(col))
}
for _, col := range aggSpec.GroupCols {
if !orderedCols.Contains(int(col)) {
return true, nil
}
groupCols.Add(int(col))
}
if !orderedCols.SubsetOf(groupCols) {
return false, errors.AssertionFailedf("ordered cols must be a subset of grouping cols")
}
return false, nil
}

// isSupported checks whether we have a columnar operator equivalent to a
// processor described by spec. Note that it doesn't perform any other checks
// (like validity of the number of inputs).
@@ -153,12 +170,16 @@ func isSupported(mode sessiondata.VectorizeExecMode, spec *execinfrapb.Processor

case core.Aggregator != nil:
aggSpec := core.Aggregator
needHash, err := needHashAggregator(aggSpec)
if err != nil {
return err
}
for _, agg := range aggSpec.Aggregations {
-if agg.Distinct {
-return errors.Newf("distinct aggregation not supported")
+if agg.Distinct && !needHash {
+return errors.Newf("distinct ordered aggregation not supported")
}
-if agg.FilterColIdx != nil {
-return errors.Newf("filtering aggregation not supported")
+if agg.FilterColIdx != nil && !needHash {
+return errors.Newf("filtering ordered aggregation not supported")
}
}
return nil
@@ -649,21 +670,11 @@ func NewColOperator(
break
}

-var groupCols, orderedCols util.FastIntSet
-for _, col := range aggSpec.OrderedGroupCols {
-orderedCols.Add(int(col))
-}
-needHash := false
-for _, col := range aggSpec.GroupCols {
-if !orderedCols.Contains(int(col)) {
-needHash = true
-}
-groupCols.Add(int(col))
-}
-if !orderedCols.SubsetOf(groupCols) {
-return r, errors.AssertionFailedf("ordered cols must be a subset of grouping cols")
+var needHash bool
+needHash, err = needHashAggregator(aggSpec)
+if err != nil {
+return r, err
}

inputTypes := make([]*types.T, len(spec.Input[0].ColumnTypes))
copy(inputTypes, spec.Input[0].ColumnTypes)
evalCtx := flowCtx.NewEvalCtx()
@@ -691,8 +702,8 @@ func NewColOperator(
evalCtx.SingleDatumAggMemAccount = hashAggregatorMemAccount
result.Op, err = colexec.NewHashAggregator(
colmem.NewAllocator(ctx, hashAggregatorMemAccount, factory),
-inputs[0], inputTypes, aggSpec, evalCtx, constructors,
-constArguments, result.ColumnTypes,
+hashAggregatorMemAccount, inputs[0], inputTypes, aggSpec,
+evalCtx, constructors, constArguments, result.ColumnTypes,
)
} else {
evalCtx.SingleDatumAggMemAccount = streamingMemAccount
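
As a standalone illustration of the needHashAggregator logic added
above (a hedged sketch with hypothetical names: a plain map stands in
for util.FastIntSet, and the subset-check error path is omitted), hash
aggregation is required exactly when some grouping column is not known
to be ordered on the input:

package main

import "fmt"

// needHash reports whether the hash (rather than the ordered) aggregator is
// required: true iff some grouping column has no ordering on the input.
func needHash(groupCols, orderedGroupCols []uint32) bool {
	ordered := make(map[uint32]bool)
	for _, c := range orderedGroupCols {
		ordered[c] = true
	}
	for _, c := range groupCols {
		if !ordered[c] {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(needHash([]uint32{0, 1}, []uint32{0}))    // true: column 1 is unordered
	fmt.Println(needHash([]uint32{0, 1}, []uint32{0, 1})) // false: input fully ordered
}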
