colexec: implement aggregate window functions in the vectorized engine #68081
I've split this PR into separate commits for ease of reviewing. When it comes time to merge, I'll merge the last five commits into one.
1a6f8b3
to
5b44950
Compare
Great stuff! Thanks for breaking it down - it was very easy to review! I only have some nits and a question about closers.
Reviewed 10 of 10 files at r1, 28 of 28 files at r2, 38 of 38 files at r3, 19 of 19 files at r4, 5 of 6 files at r5, 6 of 6 files at r7, 5 of 5 files at r8.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball)
pkg/sql/colexec/colbuilder/execplan.go, line 1284 at r4 (raw file):
    // we check if the returned operator is a Closer.
    if c, ok := result.Root.(colexecop.Closer); ok {
        result.ToClose = append(result.ToClose, c)
I just realized that we're probably forgetting to add buffered window operators to `ToClose`, no? At least I don't immediately see where this is done. Usually we check that all expected closers are actually closed as part of the unit test.
pkg/sql/colexec/colexecagg/aggregate_funcs.go, line 160 at r3 (raw file):
    func (h *unorderedAggregateFuncBase) CurrentOutputIndex() int {
        colexecerror.InternalError(errors.AssertionFailedf("CurrentOutputIndex called with hash aggregation"))
nit: s/hash aggregation/unordered aggregation/.
pkg/sql/colexec/colexecwindow/buffered_window.go, line 304 at r1 (raw file):
    // The spilling queue sets 'maxSetLength' to the length of the batch for
    // bytes-like types, so we have to reset it so that `Set` can be used.
    switch typeconv.TypeFamilyToCanonicalTypeFamily(b.outputColTyp.Family()) {
nit: we could store the canonical type family for the output vector. We also have `CanonicalTypeFamily` method on `coldata.Vec`.
pkg/sql/colexec/colexecwindow/window_aggregator.go, line 39 at r7 (raw file):
    ) colexecop.Operator {
        // Because the buffer is used multiple times per-row, it is important to
        // prevent it from spilling to disk of possible. For this reason, we give the
nit: s/of possible/if possible/.
pkg/sql/colexec/colexecwindow/window_functions_test.go, line 1299 at r8 (raw file):
    }
    for aggFnIdx := 0; aggFnIdx < len(execinfrapb.AggregatorSpec_Func_name); aggFnIdx++ {
nit: I think we need to +1 here because of this:
cockroach/pkg/sql/distsql/columnar_operators_test.go
Lines 123 to 124 in b32885b
    // We need +1 because an entry for index=6 was omitted by mistake.
    numSupportedAggFns := len(execinfrapb.AggregatorSpec_Func_name) + 1
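To illustrate why the `+1` matters, here is a minimal sketch. It assumes the name map is keyed by function index and has a gap at index 6; `funcNames` and `visited` are hypothetical names made up for this example, not the generated `AggregatorSpec_Func_name` map itself.

```go
package main

import "fmt"

// funcNames is a hypothetical stand-in for a generated enum-name map.
// Index 6 is deliberately missing to mimic the gap described above.
var funcNames = map[int32]string{
	0: "fn0", 1: "fn1", 2: "fn2", 3: "fn3", 4: "fn4", 5: "fn5",
	7: "fn7", 8: "fn8",
}

// visited returns the indexes in [0, limit) that have an entry in funcNames.
func visited(limit int) []int32 {
	var out []int32
	for i := int32(0); i < int32(limit); i++ {
		if _, ok := funcNames[i]; ok {
			out = append(out, i)
		}
	}
	return out
}

func main() {
	// With a gap in the keys, len(funcNames) is smaller than the largest
	// index plus one, so the first loop stops before the last entry.
	fmt.Println(visited(len(funcNames)))
	fmt.Println(visited(len(funcNames) + 1))
}
```

With one missing index, looping up to `len(funcNames)` never reaches the function with the largest index, while looping up to `len(funcNames) + 1` does.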
pkg/sql/colexec/execgen/cmd/execgen/agg_gen_util.go, line 45 at r3 (raw file):
        )
    }
    if genWindowVariant {
super nit: to me it seems that a slightly cleaner approach would be to define `aggKinds` local variable over which the loop above iterates and append to `aggKinds` the `windowAggKind` if `getWindowVariant` is `true`.
pkg/sql/distsql/columnar_operators_test.go, line 1083 at r8 (raw file):
    const manyRowsProbability = 0.05
    const fewRows = 10
    var manyRows = 2 * coldata.BatchSize()
nit: maybe randomize this a bit by adding `+rng.Intn(coldata.BatchSize())`.
pkg/sql/distsql/columnar_operators_test.go, line 1139 at r8 (raw file):
    if orderNonPartitionCols {
        // The outputs of this window function is not deterministic if there
nit: s/outputs/output/.
5b44950
to
c2fd97c
Compare
Due to a limitation of the flat buffer implementation, `Set` cannot be called on a `Bytes` column at an index less than `maxSetLength`. In such a case, `Set` will panic, rather than invalidating data beyond the `Set` index. This behavior is intentional, to avoid the subtle bugs that could result from misusing a version of `Set` that invalidated following data, and the performance hit that would result from exactly mimicking slice indexing behavior. However, in some cases we really don't care about the data stored beyond the `Set` index. For example, `SpillingQueue` copies input batches into its own batches, which sets `maxSetLength` to the length of the batch for `Bytes` columns. This will lead to panics if `Set` is called at any index less than the length of the batch, which is problematic if a batch needs to be partially written to, then stored in the queue, and then finished. This patch adds a method `Truncate` to `Bytes` columns that sets `maxSetLength` to the desired value, invalidating any data beyond the new length. This allows for `Set` to be used even in cases where `maxSetLength` is changed. Release note: None
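The interaction between `Set`, `maxSetLength`, and `Truncate` described above can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the actual `coldata.Bytes` code: the `flatBytes` type, its fields, and the exact panic conditions are hypothetical.

```go
package main

import "fmt"

// flatBytes is a hypothetical, simplified flat-buffer column: all values live
// in one contiguous buffer, and value i occupies data[offsets[i]:offsets[i+1]].
type flatBytes struct {
	data         []byte
	offsets      []int
	maxSetLength int // one past the largest index that has been set
}

func newFlatBytes(n int) *flatBytes {
	return &flatBytes{offsets: make([]int, n+1)}
}

// Set appends v as the value at index i. Writing below maxSetLength would
// require shifting every later value, so this sketch panics instead.
func (b *flatBytes) Set(i int, v []byte) {
	if i < b.maxSetLength {
		panic(fmt.Sprintf("cannot Set at index %d: maxSetLength is %d", i, b.maxSetLength))
	}
	// Any indexes skipped between maxSetLength and i become empty values.
	for j := b.maxSetLength; j <= i; j++ {
		b.offsets[j] = len(b.data)
	}
	b.data = append(b.data, v...)
	b.offsets[i+1] = len(b.data)
	b.maxSetLength = i + 1
}

// Get returns the value stored at index i.
func (b *flatBytes) Get(i int) []byte {
	return b.data[b.offsets[i]:b.offsets[i+1]]
}

// Truncate drops every value at index >= n so that Set can be used again
// starting from index n.
func (b *flatBytes) Truncate(n int) {
	if n < 0 || n > b.maxSetLength {
		panic(fmt.Sprintf("cannot Truncate to %d: maxSetLength is %d", n, b.maxSetLength))
	}
	b.data = b.data[:b.offsets[n]]
	b.maxSetLength = n
}

func main() {
	b := newFlatBytes(4)
	b.Set(0, []byte("a"))
	b.Set(1, []byte("bb"))
	b.Set(2, []byte("ccc"))
	// Without this call, b.Set(1, ...) would panic because 1 < maxSetLength.
	b.Truncate(1)
	b.Set(1, []byte("xyz"))
	fmt.Println(string(b.Get(0)), string(b.Get(1)))
}
```

In this sketch, `Truncate(1)` discards the values at indexes 1 and 2, which is exactly the data the caller has said it no longer cares about.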
This commit modifies the `Compute` function of the aggregate function interface to take a `startIdx` argument, which indicates the row starting from which the aggregation should be computed. This will make it easier for window functions to reuse the existing aggregate function logic. Release note: None
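As a rough sketch of what a `startIdx` argument buys, consider a toy sum aggregate. The `sumInt64Agg` type, the `endIdx` parameter, and the `Reset` method are assumptions made for illustration; the real `Compute` signature in `colexecagg` takes more arguments.

```go
package main

import "fmt"

// sumInt64Agg is a hypothetical, much-simplified aggregate function.
type sumInt64Agg struct {
	sum int64
}

// Compute folds rows [startIdx, endIdx) of col into the running sum.
func (a *sumInt64Agg) Compute(col []int64, startIdx, endIdx int) {
	for _, v := range col[startIdx:endIdx] {
		a.sum += v
	}
}

// Reset clears the accumulated state so the aggregate can be reused.
func (a *sumInt64Agg) Reset() {
	a.sum = 0
}

func main() {
	col := []int64{1, 2, 3, 4, 5}
	agg := &sumInt64Agg{}

	// A regular aggregation covers the batch from its first row.
	agg.Compute(col, 0, len(col))
	fmt.Println(agg.sum) // 15

	// A window frame may begin in the middle of the batch.
	agg.Reset()
	agg.Compute(col, 2, len(col))
	fmt.Println(agg.sum) // 12
}
```

Passing the start of the range explicitly lets a caller aggregate over part of a batch without first copying or slicing the input.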
This commit adds variants of the aggregate functions supported by the vectorized engine that can be executed in a window context. Release note: None
This commit pulls common fields that are used to construct window function operators into a struct that is passed as an argument. This simplifies the logic in `NewColOperator` that constructs window functions. Release note: None
This patch allows aggregate functions to be executed in a window context - specifically, the aggregate functions that already have an optimized implementation in the vectorized engine. This is accomplished by reusing the existing aggregate function templates to generate `Window` variants that can be used by `windowAggregator` operators to aggregate over window frames. The window frames are calculated by `windowFramer` operators that return a set of intervals `[startIdx, endIdx)` over which the aggregate function should be evaluated for each row. Note that this implementation uses the potentially quadratic approach (depending on window frame) of aggregating over the entire window frame for every row. For this reason, this commit may constitute a regression in performance for some queries. A future PR will implement the sliding-window optimization, which should make the vectorized implementation strictly better than the row-engine implementation. Release note (sql change): common aggregate functions can now be executed in the vectorized engine. This allows for better memory accounting and faster execution in some cases.
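The per-row evaluation over frame intervals, and why it can be quadratic, can be sketched as below. This assumes a frame of `offset` rows preceding through `offset` rows following within a single partition; `interval`, `rowsFrame`, and `windowSum` are hypothetical names and do not mirror the `windowFramer` or `windowAggregator` operators.

```go
package main

import "fmt"

// interval is a half-open range [start, end) of row indexes.
type interval struct {
	start, end int
}

// rowsFrame returns the frame intervals for row i in a partition of n rows,
// covering offset rows before and after i and clamped to the partition.
func rowsFrame(i, n, offset int) []interval {
	start := i - offset
	if start < 0 {
		start = 0
	}
	end := i + offset + 1
	if end > n {
		end = n
	}
	return []interval{{start: start, end: end}}
}

// windowSum re-aggregates the whole frame for every row. The cost is the sum
// of the frame sizes, which is quadratic when frames span the partition.
func windowSum(col []int64, offset int) []int64 {
	out := make([]int64, len(col))
	for i := range col {
		var sum int64
		for _, iv := range rowsFrame(i, len(col), offset) {
			for _, v := range col[iv.start:iv.end] {
				sum += v
			}
		}
		out[i] = sum
	}
	return out
}

func main() {
	fmt.Println(windowSum([]int64{1, 2, 3, 4, 5}, 1)) // [3 6 9 12 9]
}
```

A sliding-window variant would instead add the rows entering the frame and remove the rows leaving it, avoiding the repeated inner loop.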
c2fd97c
to
7bfe479
Compare
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @yuzefovich)
pkg/sql/colexec/colbuilder/execplan.go, line 1284 at r4 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
I just realized that we're probably forgetting to add buffered window operators to `ToClose`, no? At least I don't immediately see where this is done. Usually we check that all expected closers are actually closed as part of the unit test.
Huh, that's a good catch. I'll just move that block down so that we always execute it.
pkg/sql/colexec/colexecagg/aggregate_funcs.go, line 160 at r3 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: s/hash aggregation/unordered aggregation/.
Done.
pkg/sql/colexec/colexecwindow/buffered_window.go, line 304 at r1 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: we could store the canonical type family for the output vector. We also have `CanonicalTypeFamily` method on `coldata.Vec`.
Done.
pkg/sql/colexec/colexecwindow/window_aggregator.go, line 39 at r7 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: s/of possible/if possible/.
Done.
pkg/sql/colexec/colexecwindow/window_functions_test.go, line 1299 at r8 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: I think we need to +1 here because of this:
cockroach/pkg/sql/distsql/columnar_operators_test.go
Lines 123 to 124 in b32885b
    // We need +1 because an entry for index=6 was omitted by mistake.
    numSupportedAggFns := len(execinfrapb.AggregatorSpec_Func_name) + 1
Done.
pkg/sql/colexec/execgen/cmd/execgen/agg_gen_util.go, line 45 at r3 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
super nit: to me it seems that a slightly cleaner approach would be to define `aggKinds` local variable over which the loop above iterates and append to `aggKinds` the `windowAggKind` if `getWindowVariant` is `true`.
Done.
pkg/sql/distsql/columnar_operators_test.go, line 1083 at r8 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: maybe randomize this a bit by adding `+rng.Intn(coldata.BatchSize())`.
Done.
pkg/sql/distsql/columnar_operators_test.go, line 1139 at r8 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: s/outputs/output/.
Done.
7bfe479
to
9a9f1a2
Compare
This commit adds the window aggregate functions that are supported by the vectorized engine to the window function tests and benchmark. Release note: None
9a9f1a2
to
62af7c5
Compare
Reviewed 10 of 28 files at r11, 38 of 38 files at r12, 16 of 19 files at r13, 6 of 7 files at r14, 5 of 5 files at r15.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @DrewKimball)
TFTR!
Bazel failure looks unrelated to this PR, so I'm borsing now.
bors r+
Build succeeded:
coldata: add Truncate method to Bytes
Due to a limitation of the flat buffer implementation, `Set` cannot
be called on a `Bytes` column at an index less than `maxSetLength`.
In such a case, `Set` will panic, rather than invalidating data beyond
the `Set` index. This behavior is intentional, to avoid the subtle bugs
that could result from misusing a version of `Set` that invalidated
following data, and the performance hit that would result from exactly
mimicking slice indexing behavior.
However, in some cases we really don't care about the data stored beyond
the `Set` index. For example, `SpillingQueue` copies input batches into
its own batches, which sets `maxSetLength` to the length of the batch for
`Bytes` columns. This will lead to panics if `Set` is called at any index
less than the length of the batch, which is problematic if a batch needs
to be partially written to, then stored in the queue, and then finished.
This patch adds a method `Truncate` to `Bytes` columns that sets
`maxSetLength` to the desired value, invalidating any data beyond the new
length. This allows for `Set` to be used even in cases where
`maxSetLength` is changed.
Release note: None
colexecagg: add window variants of optimized aggregate functions
This commit adds variants of the aggregate functions supported by the
vectorized engine that can be executed in a window context.
Release note: None
colexecagg: change aggregate function interface
This commit modifies the `Compute` function of the aggregate function
interface to take a `startIdx` argument, which indicates the row starting
from which the aggregation should be computed. This will make it easier
for window functions to reuse the existing aggregate function logic.
Release note: None
colexec: pull window function constructor fields into struct
This commit pulls common fields that are used to construct window
function operators into a struct that is passed as an argument.
This simplifies the logic in `NewColOperator` that constructs
window functions.
Release note: None
colexec: implement window aggregate functions
This patch allows aggregate functions to be executed in a window
context - specifically, the aggregate functions that already have an
optimized implementation in the vectorized engine. This is accomplished
by reusing the existing aggregate function templates to generate
`Window` variants that can be used by `windowAggregator` operators to
aggregate over window frames. The window frames are calculated by
`windowFramer` operators that return a set of intervals
`[startIdx, endIdx)` over which the aggregate function should be
evaluated for each row.
Note that this implementation uses the potentially quadratic approach
(depending on window frame) of aggregating over the entire window frame
for every row. For this reason, this commit may constitute a regression in
performance for some queries. A future PR will implement the sliding-window
optimization, which should make the vectorized implementation strictly better
than the row-engine implementation.
Release note (sql change): common aggregate functions can now be executed
in the vectorized engine. This allows for better memory accounting and
faster execution in some cases.
colexecwindow: add tests for window aggregate functions
This commit adds the window aggregate functions that are supported
by the vectorized engine to the window function tests and benchmark.
Release note: None
Fixes #37036