
exec: hash aggregation #34770

Merged
merged 2 commits on Apr 11, 2019

Conversation

jordanlewis (Member)

This PR supersedes #32685, though the first commit is largely the same.

Add the hash aggregator. It currently reuses the hash join's hash table. The way it works is:

  1. build a hash table of its entire input, throwing away non-distinct inputs
  2. for every key, probe the hash table, populating a linked list array that keeps track of which indexes of the hash table are identical
  3. traverse the input keys again, determining a single unique key for each group of identical keys. for each unique key, traverse the linked list, marking the index of each identical key in the selection vector
  4. point the ordered aggregator at the selection vector, so it gets groups of rows with identical equality columns.

This works fine, but it's kind of slow. I think the reason it's slow is that it has to make a lot of passes over the input and a lot of random-access jumps through the large linked-list array.

I'd like to get this committed for now, but it probably needs some work. An idea is to change it so the aggregation is performed on demand. This would require reworking the individual aggregator functions to be a bit more general, so they don't assume usage in the ordered aggregator paradigm.

Incidentally, this PR also fixes some bugs with the ordered aggregator benchmarks, revealing the true cost of decimals the way they're currently set up in the engine. This will need some work too. Possibly we can take a hint from @danhhz's idea in #34763 to denormalize the decimals into bytes, but I think it'll be expensive either way.

BenchmarkAggregator/ANY_NOT_NULL/hash/Int64/groupSize=1/numInputBatches=64-8                 500           2819938 ns/op         185.92 MB/s
BenchmarkAggregator/ANY_NOT_NULL/hash/Int64/groupSize=2/numInputBatches=64-8                 500           2809348 ns/op         186.62 MB/s
BenchmarkAggregator/ANY_NOT_NULL/hash/Int64/groupSize=512/numInputBatches=64-8               300           4777525 ns/op         109.74 MB/s
BenchmarkAggregator/ANY_NOT_NULL/hash/Int64/groupSize=1024/numInputBatches=64-8              200           6364571 ns/op          82.38 MB/s
BenchmarkAggregator/ANY_NOT_NULL/hash/Decimal/groupSize=1/numInputBatches=64-8               300           3984492 ns/op         131.58 MB/s
BenchmarkAggregator/ANY_NOT_NULL/hash/Decimal/groupSize=2/numInputBatches=64-8               300           3583325 ns/op         146.31 MB/s
BenchmarkAggregator/ANY_NOT_NULL/hash/Decimal/groupSize=512/numInputBatches=64-8             200           5713770 ns/op          91.76 MB/s
BenchmarkAggregator/ANY_NOT_NULL/hash/Decimal/groupSize=1024/numInputBatches=64-8            200           6946676 ns/op          75.47 MB/s
BenchmarkAggregator/ANY_NOT_NULL/ordered/Int64/groupSize=1/numInputBatches=64-8             5000            247468 ns/op        2118.61 MB/s
BenchmarkAggregator/ANY_NOT_NULL/ordered/Int64/groupSize=2/numInputBatches=64-8             5000            200777 ns/op        2611.28 MB/s
BenchmarkAggregator/ANY_NOT_NULL/ordered/Int64/groupSize=512/numInputBatches=64-8          10000            159597 ns/op        3285.07 MB/s
BenchmarkAggregator/ANY_NOT_NULL/ordered/Int64/groupSize=1024/numInputBatches=64-8         10000            157453 ns/op        3329.79 MB/s
BenchmarkAggregator/ANY_NOT_NULL/ordered/Decimal/groupSize=1/numInputBatches=64-8           2000            594968 ns/op         881.20 MB/s
BenchmarkAggregator/ANY_NOT_NULL/ordered/Decimal/groupSize=2/numInputBatches=64-8           3000            335296 ns/op        1563.66 MB/s
BenchmarkAggregator/ANY_NOT_NULL/ordered/Decimal/groupSize=512/numInputBatches=64-8        10000            158746 ns/op        3302.67 MB/s
BenchmarkAggregator/ANY_NOT_NULL/ordered/Decimal/groupSize=1024/numInputBatches=64-8       10000            157495 ns/op        3328.92 MB/s
BenchmarkAggregator/COUNT_ROWS/hash/Int64/groupSize=1/numInputBatches=64-8                   500           2688660 ns/op         195.00 MB/s
BenchmarkAggregator/COUNT_ROWS/hash/Int64/groupSize=2/numInputBatches=64-8                   500           2780675 ns/op         188.55 MB/s
BenchmarkAggregator/COUNT_ROWS/hash/Int64/groupSize=512/numInputBatches=64-8                 300           4715001 ns/op         111.20 MB/s
BenchmarkAggregator/COUNT_ROWS/hash/Int64/groupSize=1024/numInputBatches=64-8                200           6511204 ns/op          80.52 MB/s
BenchmarkAggregator/COUNT_ROWS/hash/Decimal/groupSize=1/numInputBatches=64-8                 500           2658724 ns/op         197.20 MB/s
BenchmarkAggregator/COUNT_ROWS/hash/Decimal/groupSize=2/numInputBatches=64-8                 500           2757335 ns/op         190.14 MB/s
BenchmarkAggregator/COUNT_ROWS/hash/Decimal/groupSize=512/numInputBatches=64-8               300           4861616 ns/op         107.84 MB/s
BenchmarkAggregator/COUNT_ROWS/hash/Decimal/groupSize=1024/numInputBatches=64-8              200           6405883 ns/op          81.84 MB/s
BenchmarkAggregator/COUNT_ROWS/ordered/Int64/groupSize=1/numInputBatches=64-8               5000            300708 ns/op        1743.51 MB/s
BenchmarkAggregator/COUNT_ROWS/ordered/Int64/groupSize=2/numInputBatches=64-8               5000            275161 ns/op        1905.38 MB/s
BenchmarkAggregator/COUNT_ROWS/ordered/Int64/groupSize=512/numInputBatches=64-8             5000            260265 ns/op        2014.43 MB/s
BenchmarkAggregator/COUNT_ROWS/ordered/Int64/groupSize=1024/numInputBatches=64-8            5000            264332 ns/op        1983.44 MB/s
BenchmarkAggregator/COUNT_ROWS/ordered/Decimal/groupSize=1/numInputBatches=64-8             5000            298114 ns/op        1758.68 MB/s
BenchmarkAggregator/COUNT_ROWS/ordered/Decimal/groupSize=2/numInputBatches=64-8             5000            270498 ns/op        1938.23 MB/s
BenchmarkAggregator/COUNT_ROWS/ordered/Decimal/groupSize=512/numInputBatches=64-8           5000            268750 ns/op        1950.84 MB/s
BenchmarkAggregator/COUNT_ROWS/ordered/Decimal/groupSize=1024/numInputBatches=64-8          5000            257533 ns/op        2035.80 MB/s
BenchmarkAggregator/SUM/hash/Int64/groupSize=1/numInputBatches=64-8                          500           2836210 ns/op         184.86 MB/s
BenchmarkAggregator/SUM/hash/Int64/groupSize=2/numInputBatches=64-8                          500           2932260 ns/op         178.80 MB/s
BenchmarkAggregator/SUM/hash/Int64/groupSize=512/numInputBatches=64-8                        300           5039520 ns/op         104.04 MB/s
BenchmarkAggregator/SUM/hash/Int64/groupSize=1024/numInputBatches=64-8                       200           6537244 ns/op          80.20 MB/s
BenchmarkAggregator/SUM/hash/Decimal/groupSize=1/numInputBatches=64-8                        100          14244380 ns/op          36.81 MB/s
BenchmarkAggregator/SUM/hash/Decimal/groupSize=2/numInputBatches=64-8                        100          13545369 ns/op          38.71 MB/s
BenchmarkAggregator/SUM/hash/Decimal/groupSize=512/numInputBatches=64-8                       20          52605020 ns/op           9.97 MB/s
BenchmarkAggregator/SUM/hash/Decimal/groupSize=1024/numInputBatches=64-8                      20          56025125 ns/op           9.36 MB/s
BenchmarkAggregator/SUM/ordered/Int64/groupSize=1/numInputBatches=64-8                      5000            297225 ns/op        1763.94 MB/s
BenchmarkAggregator/SUM/ordered/Int64/groupSize=2/numInputBatches=64-8                      5000            279924 ns/op        1872.96 MB/s
BenchmarkAggregator/SUM/ordered/Int64/groupSize=512/numInputBatches=64-8                    5000            261397 ns/op        2005.72 MB/s
BenchmarkAggregator/SUM/ordered/Int64/groupSize=1024/numInputBatches=64-8                   5000            268299 ns/op        1954.11 MB/s
BenchmarkAggregator/SUM/ordered/Decimal/groupSize=1/numInputBatches=64-8                     100          10972373 ns/op          47.78 MB/s
BenchmarkAggregator/SUM/ordered/Decimal/groupSize=2/numInputBatches=64-8                     100          10065695 ns/op          52.09 MB/s
BenchmarkAggregator/SUM/ordered/Decimal/groupSize=512/numInputBatches=64-8                    30          46431390 ns/op          11.29 MB/s
BenchmarkAggregator/SUM/ordered/Decimal/groupSize=1024/numInputBatches=64-8                   30          48812805 ns/op          10.74 MB/s


@jordanlewis (Member Author)

Another problem with the hash table implementation in the joiner and the aggregator is that it has a fixed number of buckets. This doesn't affect the correctness of the algorithm, since the hash table is a linked-list hash table, but it does make the algorithm slower (more hash collisions). We should extend the hash table to be resizable.

@yuzefovich left a comment (Member)

Overall, this looks good to me (I left only nit comments), but, honestly, I still do not fully understand these algorithms. I want to do some step-by-step debugging and will do another pass afterwards.

Reviewed 8 of 11 files at r1, 10 of 10 files at r4.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto, @georgeutsin, and @jordanlewis)


pkg/sql/exec/aggregator.go, line 301 at r4 (raw file):

// indices and their corresponding column types.
func extractGroupTypes(groupCols []uint32, colTypes []types.T) []types.T {
	groupTyps := make([]types.T, len(groupCols))

[super nit]: I wonder if there is any logic behind when to choose Typs vs Types; personally, I'd use the latter everywhere.


pkg/sql/exec/aggregator_test.go, line 381 at r4 (raw file):

}

func TestAggregatorKitchenSink(t *testing.T) {

just curious: what's up with "KitchenSink"?


pkg/sql/exec/aggregator_test.go, line 520 at r4 (raw file):

			for _, agg := range aggTypes {
				for _, typ := range []types.T{types.Int64, types.Decimal} {
					if aggFn == distsqlpb.AggregatorSpec_AVG && typ == types.Decimal {

What is the reason for skipping this test configuration?


pkg/sql/exec/aggregator_test.go, line 569 at r4 (raw file):

									// Only count the int64 column.
									b.SetBytes(int64(8 * nTuples))

I don't see the timer being reset or stopped anywhere, but I think I've seen that in all the other benchmarks; any reason for this? Is it because the setup of this benchmark is very light?


pkg/sql/exec/hash_aggregator.go, line 31 at r4 (raw file):

	spec aggregatorSpec

	// isAggregating represents whether the hashAggregator is in the building or

nit: isAggregating has apparently been renamed (and above in the big comment).


pkg/sql/exec/hash_aggregator.go, line 43 at r4 (raw file):

}

// NewHashAggregator creates a hashed aggregator on the given grouping

nit: I don't think I've seen "hashed aggregator", only "hash aggregator", but English is my second language, so feel free to disregard this comment.


pkg/sql/exec/hash_aggregator.go, line 156 at r4 (raw file):

}

// Reset resets the hashAggregator for another run. Primarily used for

pkg/sql/exec/hash_aggregator.go, line 192 at r4 (raw file):

// hashAggregatorBatchOp is a helper operator used by the hashAggregator as
// input to its orderedAggregator. Once the building of the hashTable is
// completed, this batch operator exists to return the next batch of input to

The wording of this sentence is a little weird to me, maybe "this batch operator returns ...".


pkg/sql/exec/hash_aggregator.go, line 201 at r4 (raw file):

	// batched with the hashAggregatorBatchOp operator.
	sel []uint64
	// distinct represents whether each corresponding row is part of a new group.

nit: s/is part/is a part/ and in the comment below (feel free to disregard this comment as well).


pkg/sql/exec/hashjoiner.go, line 337 at r4 (raw file):

	visited []bool

	// head indicates whether each of the corresponding key is the head of the

pkg/sql/exec/hashjoiner.go, line 499 at r4 (raw file):

}

// rehash takes a element of a key (tuple representing a row of equality

nit: s/a/an/. Also, I was confused as if the method is missing from here. I'd probably move the description of rehash into a big comment above initHash and would add that rehash is generated.


pkg/sql/exec/hashjoiner.go, line 527 at r4 (raw file):

}

// insertKeys builds the hash map from the compute hash values.

pkg/sql/exec/hashjoiner.go, line 627 at r4 (raw file):

	}

	// buckets is used to store the computed hash value of each key.

pkg/sql/exec/hashjoiner.go, line 847 at r4 (raw file):

}

// check performs a equality check between the current key in the groupID bucket

pkg/sql/exec/hashjoiner.go, line 849 at r4 (raw file):

// check performs a equality check between the current key in the groupID bucket
// and the probe key at that index. If there is a match, the hashTable's same
// array is updated to lazily populate the a linked list of identical build

nit: s/the a/the/.


pkg/sql/exec/utils_test.go, line 378 at r4 (raw file):

func (c *chunkingBatchSource) Next() ColBatch {
	if c.curIdx >= c.len {
		c.batch.SetLength(0)

This if seems redundant to me. Maybe we should return after setting length to 0? Or this if block can be removed because we will make empty slices below and set the length to 0 anyway?

@yuzefovich left a comment (Member)

Did another pass with a better understanding of what's going on and found several other nits (some not in your changes), but this :lgtm:

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @asubiotto, @georgeutsin, and @jordanlewis)


pkg/sql/distsqlrun/column_exec_setup.go, line 366 at r4 (raw file):

// planExpressionOperators plans a chain of operators to execute the provided
// expression. It returns the the tail of the chain, as well as the column index

nit: double the.


pkg/sql/exec/aggregator_test.go, line 35 at r4 (raw file):

type aggregatorTestCase struct {
	// groupCols, groupTypes, and aggCols will be set to their default values

nit: it should be groupCols, colTypes, aggFns, and aggCols.... Also, another super nit is to have these four variables in the same order in different places (in default var, here, as arguments to new within aggType struct).


pkg/sql/exec/hash_aggregator.go, line 24 at r4 (raw file):

// hashAggregator is an operator that performs an aggregation based on
// specified grouping columns. This operator performs a hash table build at the
// first call to `Next()` which consumes the entire build source. After the

nit: I think we're usually not using back ticks, and here I'd not use it since it's clear with the parenthesis that it's a function name/call.


pkg/sql/exec/hash_aggregator.go, line 156 at r4 (raw file):

Previously, yuzefovich wrote…

pkg/sql/exec/hashjoiner.go, line 337 at r4 (raw file):

Previously, yuzefovich wrote…

nit: probably keys.


pkg/sql/exec/hashjoiner.go, line 499 at r4 (raw file):

Previously, yuzefovich wrote…

nit: s/a/an/. Also, I was confused as if the method is missing from here. I'd probably move the description of rehash into a big comment above initHash and would add that rehash is generated.

Ok, just noticed the comment about rehash being generated, but still I'd probably move it into the big comment above.


pkg/sql/exec/hashjoiner.go, line 527 at r4 (raw file):

Previously, yuzefovich wrote…

nit: probably computed.


pkg/sql/exec/hashjoiner.go, line 528 at r4 (raw file):

// insertKeys builds the hash map from the compute hash values.
func (ht *hashTable) insertKeys(buckets []uint64) {

I wonder whether there is a reason to pass in buckets as an argument here: currently it is only called once, with ht.next passed in, and logically I think buckets should always be ht.next. Also, the method could probably be renamed to something like buildNextChains.


pkg/sql/exec/hashjoiner.go, line 627 at r4 (raw file):

Previously, yuzefovich wrote…

nit: buckets has been renamed, so the comment should be adjusted (this was intended for the original, for-some-reason-empty comment).


pkg/sql/exec/hashjoiner.go, line 793 at r4 (raw file):

				// removed from the toCheck array.
				for nToCheck > 0 {
					// Continue searching along the hash table next chains for the corresponding

nit: this comment is the exact copy of the comment one line above, probably only one is necessary.


pkg/sql/exec/hashjoiner.go, line 840 at r4 (raw file):

}

// checkCols performs a column by columns checkCol on the key columns.

nit: probably s/column by columns/column by column/.


pkg/sql/exec/hashjoiner.go, line 847 at r4 (raw file):

Previously, yuzefovich wrote…

nit: s/a equality/an equality/ (this was intended to be in the first for-some-reason-empty comment).

@georgeutsin left a comment (Contributor)

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @asubiotto and @jordanlewis)


pkg/sql/exec/aggregator_test.go, line 457 at r4 (raw file):

						}

						typs := []types.T{types.Int64, types.Int64}

Would it make sense to bring the definition of typs up before cols and use typs[0], typs[1] instead to keep things consistent?

@asubiotto left a comment (Contributor)

LGTM. I definitely think we'll have to explore reworking this to perform on-demand aggregations, but let's get this in.

Reviewed 11 of 11 files at r1, 1 of 1 files at r2, 3 of 3 files at r3.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @jordanlewis)


pkg/sql/distsqlrun/column_exec_setup.go, line 275 at r1 (raw file):

			if len(core.JoinReader.LookupColumns) == 0 {
				jr, err = newIndexJoiner(
					flowCtx, spec.ProcessorID, core.JoinReader, input, post, nil /* output */)

This was an empty post because if there's any post process spec, that will be taken care of below. Am I missing something here? Might be better to not touch this in the aggregator PR and create a new PR for these changes if there is a problem.


pkg/sql/exec/aggregator.go, line 112 at r1 (raw file):

// NewOrderedAggregator creates an ordered aggregator on the given grouping
// columns.

I think it would be good to keep some sort of explanation of the inputs.


pkg/sql/exec/aggregator.go, line 113 at r1 (raw file):

// NewOrderedAggregator creates an ordered aggregator on the given grouping
// columns.
// TODO(asubiotto): Take in distsqlrun.AggregatorSpec_Func. This is currently

Can remove this TODO if you feel like it.


pkg/sql/exec/coalescer_test.go, line 87 at r1 (raw file):

	for colIdx := 0; colIdx < nCols; colIdx++ {
		col := batch.ColVec(colIdx).Int64()
		for i := uint16(0); i < ColBatchSize; i++ {

nit: why not start this off as an int64?


pkg/sql/exec/hash_aggregator.go, line 45 at r1 (raw file):

// NewHashAggregator creates a hashed aggregator on the given grouping
// columns. The input specifications to this function are the same as that of
// the NewOrderedAggregator function.

I like the explanation in the PR description, it might be nice to put that here as well.


pkg/sql/exec/hash_aggregator.go, line 58 at r1 (raw file):

	// aggregation.
	nCols := uint32(len(colTypes))
	keepCol := make([]bool, nCols)

nit: can you use FastIntSet here?
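
For context, the suggestion replaces the []bool keepCol slice with a set abstraction; the idea can be sketched with a plain bitset (the real util.FastIntSet API may differ from this illustration):

```go
package main

import "fmt"

// intSet is a minimal bitset over small non-negative integers, standing in
// for the kind of set the reviewer suggests instead of a []bool slice.
type intSet struct{ words []uint64 }

func (s *intSet) add(i int) {
	// Grow the word slice lazily to cover bit i.
	for len(s.words) <= i/64 {
		s.words = append(s.words, 0)
	}
	s.words[i/64] |= 1 << uint(i%64)
}

func (s *intSet) contains(i int) bool {
	return i/64 < len(s.words) && s.words[i/64]&(1<<uint(i%64)) != 0
}

func main() {
	var keep intSet
	// Hypothetical example: keep grouping column 0 and aggregated columns 2, 5.
	for _, col := range []int{0, 2, 5} {
		keep.add(col)
	}
	fmt.Println(keep.contains(2), keep.contains(3)) // true false
}
```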


pkg/sql/exec/sum_agg_tmpl.go, line 89 at r4 (raw file):

	copy(a.scratch.vec, zero_TYPEBatch)
	a.scratch.curIdx = -1
	a.done = false

🤦‍♂️

@jordanlewis left a comment (Member Author)

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @asubiotto, @georgeutsin, @jordanlewis, and @yuzefovich)


pkg/sql/distsqlrun/column_exec_setup.go, line 275 at r1 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

This was an empty post because if there's any post process spec, that will be taken care of below. Am I missing something here? Might be better to not touch this in the aggregator PR and create a new PR for these changes if there is a problem.

Made a separate PR for this.


pkg/sql/distsqlrun/column_exec_setup.go, line 366 at r4 (raw file):

Previously, yuzefovich wrote…

[nit]: double the.

Done.


pkg/sql/exec/aggregator.go, line 112 at r1 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

I think it would be good to keep some sort of explanation of the inputs

Done - not sure why I deleted that whole thing.


pkg/sql/exec/aggregator.go, line 113 at r1 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Can remove this TODO if you feel like it.

Done.


pkg/sql/exec/aggregator.go, line 301 at r4 (raw file):

Previously, yuzefovich wrote…

[super nit]: I wonder if there is any logic behind when to choose Typs vs Types; personally, I'd use the latter everywhere.

Done.


pkg/sql/exec/aggregator_test.go, line 35 at r4 (raw file):

Previously, yuzefovich wrote…

[nit]: it should be groupCols, colTypes, aggFns, and aggCols.... Also, another super nit is to have these four variables in the same order in different places (in default var, here, as arguments to new within aggType struct).

Done.


pkg/sql/exec/aggregator_test.go, line 381 at r4 (raw file):

Previously, yuzefovich wrote…

just curious: what's up with "KitchenSink"?

"the kitchen sink" is an expression that means something like "a lot of random stuff". Changed this to be easier to understand.


pkg/sql/exec/aggregator_test.go, line 457 at r4 (raw file):

Previously, georgeutsin (George Utsin) wrote…

Would it make sense to bring the definition of typs up before cols and use typs[0], typs[1] instead to keep things consistent?

Done.


pkg/sql/exec/aggregator_test.go, line 520 at r4 (raw file):

Previously, yuzefovich wrote…

What is the reason for skipping this test configuration?

I'm not sure, good eye, I deleted it.


pkg/sql/exec/aggregator_test.go, line 569 at r4 (raw file):

Previously, yuzefovich wrote…

I don't see the timer being reset or stopped anywhere, but I think I've seen that in all the other benchmarks; any reason for this? Is it because the setup of this benchmark is very light?

It's not strictly necessary to do it, but it's a good habit - I added it here.
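
The benchmark discipline discussed here (SetBytes so Go reports MB/s, ResetTimer to exclude setup, StopTimer before teardown) looks roughly like this. The summation loop is a stand-in for the operator under test, not the actual exec code:

```go
package main

import (
	"fmt"
	"testing"
)

func BenchmarkSumInt64(b *testing.B) {
	// Setup: build the input outside the timed region.
	input := make([]int64, 1<<16) // stand-in for the benchmark's input batches
	for i := range input {
		input[i] = int64(i)
	}
	// Count only the int64 column, 8 bytes per row, so ns/op also yields MB/s.
	b.SetBytes(int64(8 * len(input)))
	b.ResetTimer() // exclude the setup cost above from the measurement
	var sum int64
	for n := 0; n < b.N; n++ {
		sum = 0
		for _, v := range input {
			sum += v
		}
	}
	b.StopTimer() // exclude any teardown below
	_ = sum       // keep the result live so the loop isn't optimized away
}

func main() {
	// testing.Benchmark lets us run the benchmark outside `go test`.
	res := testing.Benchmark(BenchmarkSumInt64)
	fmt.Println(res.N > 0)
}
```

Even when setup is light, resetting the timer keeps the reported ns/op honest if the setup later grows.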


pkg/sql/exec/coalescer_test.go, line 87 at r1 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

nit: Why not start this off as an int64

Done.


pkg/sql/exec/hash_aggregator.go, line 45 at r1 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

I like the explanation in the PR description, it might be nice to put that here as well.

Done.


pkg/sql/exec/hash_aggregator.go, line 58 at r1 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

nit: can you use FastIntSet here?

Done.


pkg/sql/exec/hash_aggregator.go, line 24 at r4 (raw file):

Previously, yuzefovich wrote…

[nit]: I think we're usually not using back ticks, and here I'd not use it since it's clear with the parenthesis that it's a function name/call.

Done.


pkg/sql/exec/hash_aggregator.go, line 31 at r4 (raw file):

Previously, yuzefovich wrote…

[nit]: isAggregating has apparently been renamed (and above in the big comment).

Oops, thanks for catching! Done.


pkg/sql/exec/hash_aggregator.go, line 43 at r4 (raw file):

Previously, yuzefovich wrote…

[nit]: I don't think I've seen "hashed aggregator", only "hash aggregator", but English is my second language, so feel free to disregard this comment.

Done.


pkg/sql/exec/hash_aggregator.go, line 192 at r4 (raw file):

Previously, yuzefovich wrote…

The wording of this sentence is a little weird to me, maybe "this batch operator returns ...".

Done.


pkg/sql/exec/hash_aggregator.go, line 201 at r4 (raw file):

Previously, yuzefovich wrote…

[nit]: s/is part/is a part/ and in the comment below (feel free to disregard this comment as well).

Hm, I think "is part" is more natural here, at least to my ear, so I'll leave it.


pkg/sql/exec/hashjoiner.go, line 337 at r4 (raw file):

Previously, yuzefovich wrote…

[nit]: probably keys.

Done.


pkg/sql/exec/hashjoiner.go, line 499 at r4 (raw file):

Previously, yuzefovich wrote…

Ok, just noticed the comment about rehash being generated, but still I'd probably move it into the big comment above.

Done.


pkg/sql/exec/hashjoiner.go, line 527 at r4 (raw file):

Previously, yuzefovich wrote…

[nit]: probably computed.

Done.


pkg/sql/exec/hashjoiner.go, line 528 at r4 (raw file):

Previously, yuzefovich wrote…

I wonder whether there is a reason to pass in buckets as an argument here: currently it is only called once with ht.next passed in, and logically I think buckets should always be ht.next. Also, probably the method could be renamed to something like buildNextChains.

Done, good suggestions.


pkg/sql/exec/hashjoiner.go, line 627 at r4 (raw file):

Previously, yuzefovich wrote…

[nit]: buckets have been renamed, so the comment should be adjusted (that was intended original for-some-reason-empty comment).

Done.


pkg/sql/exec/hashjoiner.go, line 793 at r4 (raw file):

Previously, yuzefovich wrote…

[nit]: this comment is the exact copy of the comment one line above, probably only one is necessary.

🤔 done.


pkg/sql/exec/hashjoiner.go, line 840 at r4 (raw file):

Previously, yuzefovich wrote…

[nit]: probably s/column by columns/column by column/.

Done.


pkg/sql/exec/hashjoiner.go, line 847 at r4 (raw file):

Previously, yuzefovich wrote…

[nit]: s/a equality/an equality/ (this was intended to be in the first for-some-reason-empty comment).

Done.


pkg/sql/exec/hashjoiner.go, line 849 at r4 (raw file):

Previously, yuzefovich wrote…

[nit]: s/the a/the/.

Done.


pkg/sql/exec/sum_agg_tmpl.go, line 89 at r4 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

🤦‍♂️

:(


pkg/sql/exec/utils_test.go, line 378 at r4 (raw file):

Previously, yuzefovich wrote…

This if seems redundant to me. Maybe we should return after setting length to 0? Or this if block can be removed because we will make empty slices below and set the length to 0 anyway?

Done.

@jordanlewis jordanlewis force-pushed the hash-agg branch 4 times, most recently from b0a1809 to f1debbf on April 4, 2019 at 18:51
changangela and others added 2 commits April 11, 2019 17:47
hashAggregator is an operator that performs an aggregation based on specified
grouping columns. This operator performs a blocking hash table build at the
beginning, and the grouped buckets are then passed as input to an
orderedAggregator. The output row ordering of this aggregator is arbitrary.

Release note: None

fixup! exec: add the hashed aggregator
All aggregate tests now test both the hash and ordered aggregators.

The benchmarks weren't working right - they vastly overestimated the
speed of the operator, unfortunately, because their reset methods didn't
work.

This commit splits the benchmark into one for ints and one for decimals,
to showcase how slow decimals are in comparison. It also fixes the reset
methods, and coalesces the hash and ordered aggregator benchmarks into
one.

BenchmarkAggregator/SUM/ordered/Decimal/groupSize=1/numInputBatches=64-8                     100          10972373 ns/op          47.78 MB/s
BenchmarkAggregator/SUM/ordered/Decimal/groupSize=2/numInputBatches=64-8                     100          10065695 ns/op          52.09 MB/s
BenchmarkAggregator/SUM/ordered/Decimal/groupSize=512/numInputBatches=64-8                    30          46431390 ns/op          11.29 MB/s
BenchmarkAggregator/SUM/ordered/Decimal/groupSize=1024/numInputBatches=64-8                   30          48812805 ns/op          10.74 MB/s
```
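For illustration, the linked-list grouping described in steps 2–3 above can be sketched in simplified Go. All names here are hypothetical and the real vectorized code chains rows inside the hash table itself rather than through a map; this is only a sketch of the idea of chaining identical keys and then emitting one selection vector per group.

```go
package main

import "fmt"

// groupSelections chains rows with identical keys through a "next" array,
// then walks each chain to build a per-group selection vector. This mirrors,
// in simplified form, the linked-list approach described above; it is not
// the actual CockroachDB implementation.
func groupSelections(keys []int64) [][]int {
	// next[i] points at the previous row seen with the same key (-1 = none).
	next := make([]int, len(keys))
	head := make(map[int64]int) // key -> index of the last row with that key
	for i := range keys {
		next[i] = -1
		if prev, ok := head[keys[i]]; ok {
			next[i] = prev
		}
		head[keys[i]] = i
	}
	// For each distinct key, traverse its chain, collecting row indices into
	// a selection vector that an ordered aggregator could then consume.
	var groups [][]int
	for _, last := range head {
		var sel []int
		for i := last; i != -1; i = next[i] {
			sel = append(sel, i)
		}
		groups = append(groups, sel)
	}
	return groups
}

func main() {
	groups := groupSelections([]int64{7, 3, 7, 7, 3})
	fmt.Println(len(groups)) // 2 distinct keys -> 2 groups
}
```

The random-access chain walk here is also where the cost noted above comes from: each hop through `next` is a data-dependent jump across a large array.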

Release note: None
@jordanlewis (Member Author)

bors r+

craig bot pushed a commit that referenced this pull request Apr 11, 2019
34770: exec: hash aggregation r=jordanlewis a=jordanlewis


Co-authored-by: changangela <angelachang27@gmail.com>
Co-authored-by: Jordan Lewis <jordanthelewis@gmail.com>
@craig (craig bot) commented Apr 11, 2019

Build succeeded

craig bot merged commit d9a285f into cockroachdb:master Apr 11, 2019
jordanlewis deleted the hash-agg branch April 20, 2019 14:30