-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
partitioner.go
83 lines (75 loc) · 2.98 KB
/
partitioner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package colexecwindow
import (
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecbase"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
)
// NewWindowSortingPartitioner builds a colexecop.Operator that handles both
// the PARTITION BY and the ORDER BY clauses of a window function: the input is
// sorted on the partitionIdxs columns first and on ordCols second, and the
// partitionColIdx'th column (which is appended if needed) is set to true for
// every tuple that is the first within its partition.
//
// createDiskBackedSorter is invoked exactly once to construct the sorter over
// the combined partitioning and ordering columns.
func NewWindowSortingPartitioner(
	allocator *colmem.Allocator,
	input colexecop.Operator,
	inputTyps []*types.T,
	partitionIdxs []uint32,
	ordCols []execinfrapb.Ordering_Column,
	partitionColIdx int,
	createDiskBackedSorter func(input colexecop.Operator, inputTypes []*types.T, orderingCols []execinfrapb.Ordering_Column) colexecop.Operator,
) colexecop.Operator {
	// The sort key is the partition columns (with only ColIdx set, so every
	// other field keeps its zero value) followed by the ORDER BY columns
	// exactly as they were given.
	sortCols := make([]execinfrapb.Ordering_Column, 0, len(partitionIdxs)+len(ordCols))
	for _, colIdx := range partitionIdxs {
		sortCols = append(sortCols, execinfrapb.Ordering_Column{ColIdx: colIdx})
	}
	sortCols = append(sortCols, ordCols...)
	sorter := createDiskBackedSorter(input, inputTyps, sortCols)

	// The ordered distinct chain runs over the partition columns only;
	// distinctCol is the boolean column it hands back alongside the operator.
	distinctChain, distinctCol := colexecbase.OrderedDistinctColsToOperators(
		sorter, partitionIdxs, inputTyps, false, /* nullsAreDistinct */
	)

	// Wrap the chain so that a Bool vector is requested at partitionColIdx.
	enforcer := colexecutils.NewVectorTypeEnforcer(allocator, distinctChain, types.Bool, partitionColIdx)

	return &windowSortingPartitioner{
		OneInputHelper:  colexecop.MakeOneInputHelper(enforcer),
		allocator:       allocator,
		distinctCol:     distinctCol,
		partitionColIdx: partitionColIdx,
	}
}
// windowSortingPartitioner is the operator returned by
// NewWindowSortingPartitioner. For every non-empty batch produced by its
// input, its Next method copies values from distinctCol into the batch's
// partitionColIdx'th column.
type windowSortingPartitioner struct {
	colexecop.OneInputHelper
	// allocator is the allocator supplied to NewWindowSortingPartitioner. It is
	// also passed to the vector type enforcer created there; Next does not read
	// it.
	allocator *colmem.Allocator
	// distinctCol is the output column of the chain of ordered distinct
	// operators in which true will indicate that a new partition begins with the
	// corresponding tuple.
	distinctCol []bool
	// partitionColIdx is the index of the boolean column in each batch into
	// which Next writes the partition-start markers.
	partitionColIdx int
}
// Next pulls the next batch from the input and populates its
// partitionColIdx'th boolean column from distinctCol. A zero-length input
// batch results in coldata.ZeroBatch being returned.
func (p *windowSortingPartitioner) Next() coldata.Batch {
	batch := p.Input.Next()
	n := batch.Length()
	if n == 0 {
		return coldata.ZeroBatch
	}
	dst := batch.ColVec(p.partitionColIdx).Bool()
	sel := batch.Selection()
	if sel == nil {
		// No selection vector: the first n positions are copied in bulk.
		copy(dst, p.distinctCol[:n])
		return batch
	}
	// With a selection vector, only the positions named by its first n
	// entries are copied.
	for i := 0; i < n; i++ {
		tupleIdx := sel[i]
		dst[tupleIdx] = p.distinctCol[tupleIdx]
	}
	return batch
}