QUICKSTEP-70-71 Improve aggregation performance #179
Conversation
7285f90
to
fe2ec54
Compare
Hi @jianqiao, A quick question on Feature 1 using a vector-based aggregation: for a group-by w/ a known bounded range (i.e., the min and max value), do we always choose this approach over the hash-based, or depending on the range size (i.e., if the range is too wide, we may fall back to the hash-based)? Thanks! |
b6a2059
to
0dce4d2
Compare
It is depending on the range size. Currently there is a gflag for setting the range upbound at |
0dce4d2
to
c70485b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My only concern regarding this PR is the way to deal with partitions. I guess we may merge PartitionedHashTablePool
and HashTablePool
so that the later is the special case of the former w/ a single partition.
@@ -61,7 +61,7 @@ class HashTableStateUpserterFast { | |||
* table. The corresponding state (for the same key) in the destination | |||
* hash table will be upserted. | |||
**/ | |||
HashTableStateUpserterFast(const HandleT &handle, | |||
HashTableStateUpserter(const HandleT &handle, | |||
const std::uint8_t *source_state) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Align with the line above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
estimated_num_groups, | ||
&max_num_groups); | ||
|
||
if (can_use_collision_free_aggregation) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor, we actually don't need this extra bool variable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
#endif | ||
|
||
// Supports only single group-by key. | ||
if (aggregate->grouping_expressions().size() != 1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For multiple small group-by keys, I think we could create a multi-dimension array for the same goal as the single key.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes we will have a follow-up PR dealing with multiple group-by keys to improve TPC-H Q01.
break; | ||
} | ||
default: | ||
return false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the reason of supporting only such types about the overflow?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can support more types later. For any type/any number of group-by keys, if we can define a one-to-one mapping function that maps the keys to range-bounded integers, then this aggregation is applicable.
} | ||
|
||
// TODO(jianqiao): Support AggregationID::AVG. | ||
switch (agg_func->getAggregate().getAggregationID()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactor using QUICKSTEP_EQUALS_ANY_CONSTANT
defined in utility/EqualsAnyConstant.hpp
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
|
||
void *aligned_memory_start = this->blob_->getMemoryMutable(); | ||
std::size_t available_memory = num_storage_slots * kSlotSizeBytes; | ||
if (align(alignof(Header), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactor using CHECK
.
Similarly below.
"StorageBlob used to hold resizable " | ||
"SeparateChainingHashTable is too small to meet alignment " | ||
"requirements of SeparateChainingHashTable::Header."); | ||
} else if (aligned_memory_start != this->blob_->getMemoryMutable()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactor using LOG_IF
.
// Separate chaining ensures that any resized hash table with more buckets | ||
// than the original table will be able to hold more entries than the | ||
// original. | ||
DEBUG_ASSERT(retry_num == 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use DCHECK_EQ
instead.
variable_storage_required; | ||
const std::size_t resized_storage_slots = | ||
this->storage_manager_->SlotsNeededForBytes(resized_memory_required); | ||
if (resized_storage_slots == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactor using CHECK
.
return true; | ||
} else { | ||
return false; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor, but we could flip the condition:
if (*entry_num >= header_->buckets_allocated.load(std::memory_order_relaxed)) {
return false;
}
const char *bucket =
static_cast<const char *>(buckets_) + (*entry_num) * bucket_size_;
*key = key_manager_.getKeyComponentTyped(bucket, 0);
*value = reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
++(*entry_num);
return true;
Similarly below.
c70485b
to
0dce4d2
Compare
For the question about
|
Please resync with the master branch, and I will merge it. Thanks. |
0dce4d2
to
8dbac18
Compare
Just rebased on master. |
There is a |
8dbac18
to
2d89e4f
Compare
…egation for range-bounded single integer group-by key. - Supports copy elision for aggregation.
Updated, and tested locally. |
Very impressive algorithmic change @jianqiao !! |
This PR implements two features that improve aggregation performance:
CollisionFreeVectorTable
to support specialized high-performance aggregation.For feature 1, when the group-by attribute is a range-bounded single attribute of
INT
orLONG
type. We can use a vector of typestd::vector<std::atomic<StateT>>
to store the aggregation states, whereStateT
is the aggregation state type (currently restricted toLONG
andDOUBLE
). Then during aggregation, for each tuple, we locate the aggregation state with the group-by key's value as index to the state vector, and concurrently update the state with C++'s atomic primitives.For feature 2, note that the current implementation of aggregation always creates a
ColumnVectorsValueAccessor
to store the results of ALL the input expressions. However, we can avoid the creation of a column vector (thus avoiding copying values into the column vector) if the aggregation is on a simple attribute, e.g.SUM(x)
. Thus by PR, when performing aggregation we prepare two inputValueAccessor
s: one BASE accessor that is created directly from the input relation's storage block, and one DERIVED accessor that is the temporary resultColumnVectorsValueAccessor
. Each aggregation argument may be from the base accessor (meaning that it is a simple attribute) or from the derived accessor (meaning that it is a non-trivial expression that gets evaluated). The two accessors are then properly handled in aggregation handles and aggregation hash tables.Main changes:
expressions/aggregation
: Updated the aggregation handles to support copy elision. Also did some cleanups.relational_operators
: AddedInitializeAggregationOperator
to support parallel initialization of the aggregation state (justmemset
the memory to 0) -- because it takes a relatively long time to do the initialization with single thread if the aggregation hash table is large.storage
: AddedCollisionFreeVectorTable
. RenamedFastHashTable
toPackedPayloadHashTable
, made it support copy elision, and cleaned up the class to remove unused methods. RefactoredAggregationOperationState
to support copy elision and support the new aggregation. Moved aggregation code out ofStorageBlock
.This PR significantly improves some TPC-H queries' performance. For example, it improves TPC-H Q18 from ~27.5s to ~3.5s, with scale factor 100 on a cloudlab machine.
Below shows the TPC-H performance (scale factor 100 on a cloudlab machine) with recently committed optimizations up to this point: