From f355c2371141945db52ae0e2c8d51006e03afcd2 Mon Sep 17 00:00:00 2001
From: liuyehcf <1559500551@qq.com>
Date: Mon, 18 Jul 2022 20:10:17 +0800
Subject: [PATCH] [Enhancement] Adjust downgrade strategy of chunks_partitioner
 (#8696)

---
 .../sort/local_partition_topn_context.cpp     | 32 ++------
 .../sort/local_partition_topn_context.h       |  8 --
 .../partition/chunks_partitioner.cpp          | 27 +++++--
 .../vectorized/partition/chunks_partitioner.h | 33 +++++++--
 .../vectorized/partition/partition_hash_map.h | 73 +++++++++++++++++--
 5 files changed, 119 insertions(+), 54 deletions(-)

diff --git a/be/src/exec/pipeline/sort/local_partition_topn_context.cpp b/be/src/exec/pipeline/sort/local_partition_topn_context.cpp
index 956b9ea096dcb..adc6b5c098426 100644
--- a/be/src/exec/pipeline/sort/local_partition_topn_context.cpp
+++ b/be/src/exec/pipeline/sort/local_partition_topn_context.cpp
@@ -8,8 +8,6 @@
 
 namespace starrocks::pipeline {
 
-const int32_t LocalPartitionTopnContext::MAX_PARTITION_NUM = 4096;
-
 LocalPartitionTopnContext::LocalPartitionTopnContext(
         const std::vector<TExpr>& t_partition_exprs, SortExecExprs& sort_exec_exprs, std::vector<bool> is_asc_order,
         std::vector<bool> is_null_first, const std::string& sort_keys, int64_t offset, int64_t partition_limit,
@@ -53,18 +51,11 @@ Status LocalPartitionTopnContext::prepare(RuntimeState* state) {
 
 Status LocalPartitionTopnContext::push_one_chunk_to_partitioner(RuntimeState* state,
                                                                 const vectorized::ChunkPtr& chunk) {
-    const auto num_partitions = _chunks_partitioner->num_partitions();
-    if (!_is_downgrade && num_partitions < MAX_PARTITION_NUM) {
-        return _chunks_partitioner->offer(chunk);
-    } else {
+    auto st = _chunks_partitioner->offer(chunk);
+    if (_chunks_partitioner->is_downgrade()) {
         transfer_all_chunks_from_partitioner_to_sorters(state);
-        {
-            std::lock_guard<std::mutex> l(_buffer_lock);
-            _downgrade_buffer.push(chunk);
-        }
-        _is_downgrade = true;
-        return Status::OK();
     }
+    return st;
 }
 
 void LocalPartitionTopnContext::sink_complete() {
@@ -82,8 +73,8 @@ Status LocalPartitionTopnContext::transfer_all_chunks_from_partitioner_to_sorter
                 state, &_sort_exec_exprs.lhs_ordering_expr_ctxs(), &_is_asc_order, &_is_null_first, _sort_keys, _offset,
                 _partition_limit, _topn_type, vectorized::ChunksSorterTopn::tunning_buffered_chunks(_partition_limit));
     }
-    RETURN_IF_ERROR(
-            _chunks_partitioner->accept([this, state](int32_t partition_idx, const vectorized::ChunkPtr& chunk) {
+    RETURN_IF_ERROR(_chunks_partitioner->consume_from_hash_map(
+            [this, state](int32_t partition_idx, const vectorized::ChunkPtr& chunk) {
                 _chunks_sorters[partition_idx]->update(state, chunk);
                 return true;
             }));
@@ -97,8 +88,8 @@ Status LocalPartitionTopnContext::transfer_all_chunks_from_partitioner_to_sorter
 }
 
 bool LocalPartitionTopnContext::has_output() {
-    if (_is_downgrade) {
-        return _sorter_index < _chunks_sorters.size() || !_downgrade_buffer.empty();
+    if (_chunks_partitioner->is_downgrade() && _is_transfered) {
+        return _sorter_index < _chunks_sorters.size() || !_chunks_partitioner->is_downgrade_buffer_empty();
     }
     return _is_sink_complete && _sorter_index < _chunks_sorters.size();
 }
@@ -118,14 +109,7 @@ StatusOr<vectorized::ChunkPtr> LocalPartitionTopnContext::pull_one_chunk() {
             return chunk;
         }
     }
-    if (_downgrade_buffer.empty()) {
-        return nullptr;
-    }
-    {
-        std::lock_guard<std::mutex> l(_buffer_lock);
-        chunk = _downgrade_buffer.front();
-        _downgrade_buffer.pop();
-    }
+    chunk = _chunks_partitioner->consume_from_downgrade_buffer();
     return chunk;
 }
 
diff --git a/be/src/exec/pipeline/sort/local_partition_topn_context.h b/be/src/exec/pipeline/sort/local_partition_topn_context.h
index b42989da2f272..2f1313bdeb054 100644
--- a/be/src/exec/pipeline/sort/local_partition_topn_context.h
+++ b/be/src/exec/pipeline/sort/local_partition_topn_context.h
@@ -59,14 +59,6 @@ class LocalPartitionTopnContext {
     // The output chunk stream is unordered
     StatusOr<vectorized::ChunkPtr> pull_one_chunk_from_sorters();
 
-    // TODO(hcf) set this value properly, maybe based on cache size
-    static const int32_t MAX_PARTITION_NUM;
-
-    bool _is_downgrade = false;
-    // We simply offer chunk to this buffer when number of partition reaches the threshould MAX_PARTITION_NUM
-    std::queue<vectorized::ChunkPtr> _downgrade_buffer;
-    std::mutex _buffer_lock;
-
     const std::vector<TExpr>& _t_partition_exprs;
     std::vector<ExprContext*> _partition_exprs;
     std::vector<vectorized::PartitionColumnType> _partition_types;
diff --git a/be/src/exec/vectorized/partition/chunks_partitioner.cpp b/be/src/exec/vectorized/partition/chunks_partitioner.cpp
index d3efa4dc9f9cc..e3451798b8c03 100644
--- a/be/src/exec/vectorized/partition/chunks_partitioner.cpp
+++ b/be/src/exec/vectorized/partition/chunks_partitioner.cpp
@@ -28,16 +28,18 @@ Status ChunksPartitioner::prepare(RuntimeState* state) {
 Status ChunksPartitioner::offer(const ChunkPtr& chunk) {
     DCHECK(!_partition_it.has_value());
 
-    for (size_t i = 0; i < _partition_exprs.size(); i++) {
-        ASSIGN_OR_RETURN(_partition_columns[i], _partition_exprs[i]->evaluate(chunk.get()));
+    if (!_is_downgrade) {
+        for (size_t i = 0; i < _partition_exprs.size(); i++) {
+            ASSIGN_OR_RETURN(_partition_columns[i], _partition_exprs[i]->evaluate(chunk.get()));
+        }
     }
 
     if (false) {
     }
-#define HASH_MAP_METHOD(NAME)                                                                          \
-    else if (_hash_map_variant.type == PartitionHashMapVariant::Type::NAME) {                          \
-        TRY_CATCH_BAD_ALLOC(split_chunk_by_partition(                                                  \
-                *_hash_map_variant.NAME, chunk));                                                      \
+#define HASH_MAP_METHOD(NAME)                                                                           \
+    else if (_hash_map_variant.type == PartitionHashMapVariant::Type::NAME) {                           \
+        TRY_CATCH_BAD_ALLOC(_split_chunk_by_partition(                                                  \
+                *_hash_map_variant.NAME, chunk));                                                       \
     }
     APPLY_FOR_PARTITION_VARIANT_ALL(HASH_MAP_METHOD)
 #undef HASH_MAP_METHOD
@@ -45,6 +47,19 @@ Status ChunksPartitioner::offer(const ChunkPtr& chunk) {
     return Status::OK();
 }
 
+ChunkPtr ChunksPartitioner::consume_from_downgrade_buffer() {
+    vectorized::ChunkPtr chunk = nullptr;
+    if (_downgrade_buffer.empty()) {
+        return chunk;
+    }
+    {
+        std::lock_guard<std::mutex> l(_buffer_lock);
+        chunk = _downgrade_buffer.front();
+        _downgrade_buffer.pop();
+    }
+    return chunk;
+}
+
 int32_t ChunksPartitioner::num_partitions() {
     return _hash_map_variant.size();
 }
diff --git a/be/src/exec/vectorized/partition/chunks_partitioner.h b/be/src/exec/vectorized/partition/chunks_partitioner.h
index 6d710cb0ed5cf..56ce21510410e 100644
--- a/be/src/exec/vectorized/partition/chunks_partitioner.h
+++ b/be/src/exec/vectorized/partition/chunks_partitioner.h
@@ -3,6 +3,7 @@
 #pragma once
 
 #include <any>
+#include <queue>
 
 #include "column/chunk.h"
 #include "column/vectorized_fwd.h"
@@ -36,18 +37,22 @@ class ChunksPartitioner {
     // Number of partitions
     int32_t num_partitions();
 
+    bool is_downgrade() const { return _is_downgrade; }
+
+    bool is_downgrade_buffer_empty() const { return _downgrade_buffer.empty(); }
+
     // Consumers consume from the hash map
     // method signature is: bool consumer(int32_t partition_idx, const ChunkPtr& chunk)
     // The return value of the consumer denote whether to continue or not
     template <typename Consumer>
-    Status accept(Consumer&& consumer) {
+    Status consume_from_hash_map(Consumer&& consumer) {
         // First, fetch chunks from hash map
         if (false) {
         }
-#define HASH_MAP_METHOD(NAME)                                                                          \
-    else if (_hash_map_variant.type == PartitionHashMapVariant::Type::NAME) {                          \
-        TRY_CATCH_BAD_ALLOC(fetch_chunks_from_hash_map(                                                \
-                *_hash_map_variant.NAME, consumer));                                                   \
+#define HASH_MAP_METHOD(NAME)                                                                           \
+    else if (_hash_map_variant.type == PartitionHashMapVariant::Type::NAME) {                           \
+        TRY_CATCH_BAD_ALLOC(_fetch_chunks_from_hash_map(                                                \
+                *_hash_map_variant.NAME, consumer));                                                    \
     }
     APPLY_FOR_PARTITION_VARIANT_ALL(HASH_MAP_METHOD)
 #undef HASH_MAP_METHOD
@@ -66,6 +71,9 @@ class ChunksPartitioner {
         return Status::OK();
     }
 
+    // Fetch one chunk from downgrade buffer if any
+    ChunkPtr consume_from_downgrade_buffer();
+
 private:
     bool _is_partition_columns_fixed_size(const std::vector<ExprContext*>& partition_expr_ctxs,
                                           const std::vector<PartitionColumnType>& partition_types, size_t* max_size,
@@ -73,13 +81,17 @@ class ChunksPartitioner {
     void _init_hash_map_variant();
 
     template <typename HashMapWithKey>
-    void split_chunk_by_partition(HashMapWithKey& hash_map_with_key, const ChunkPtr& chunk) {
-        hash_map_with_key.append_chunk(chunk, _partition_columns, _mem_pool.get(), _obj_pool);
+    void _split_chunk_by_partition(HashMapWithKey& hash_map_with_key, const ChunkPtr& chunk) {
+        _is_downgrade = hash_map_with_key.append_chunk(chunk, _partition_columns, _mem_pool.get(), _obj_pool);
+        if (_is_downgrade) {
+            std::lock_guard<std::mutex> l(_buffer_lock);
+            _downgrade_buffer.push(chunk);
+        }
     }
 
     // Fetch chunks from hash map, return true if reaches eos
     template <typename HashMapWithKey, typename Consumer>
-    bool fetch_chunks_from_hash_map(HashMapWithKey& hash_map_with_key, Consumer&& consumer) {
+    bool _fetch_chunks_from_hash_map(HashMapWithKey& hash_map_with_key, Consumer&& consumer) {
         if (_hash_map_eos) {
             return true;
         }
@@ -177,6 +189,11 @@ class ChunksPartitioner {
     // Hash map which holds chunks of different partitions
     PartitionHashMapVariant _hash_map_variant;
 
+    bool _is_downgrade = false;
+    // We simply buffer chunks when partition cardinality is high
+    std::queue<ChunkPtr> _downgrade_buffer;
+    std::mutex _buffer_lock;
+
     // Iterator of partitions
     std::any _partition_it;
     // Iterator of chunks of current partition
diff --git a/be/src/exec/vectorized/partition/partition_hash_map.h b/be/src/exec/vectorized/partition/partition_hash_map.h
index 8952f07c405fb..3f25f08dd9716 100644
--- a/be/src/exec/vectorized/partition/partition_hash_map.h
+++ b/be/src/exec/vectorized/partition/partition_hash_map.h
@@ -60,6 +60,9 @@ using FixedSize16SlicePartitionHashMap =
 struct PartitionHashMapBase {
     const int32_t chunk_size;
 
+    bool is_downgrade = false;
+
+    int64_t total_num_rows = 0;
 
     PartitionHashMapBase(int32_t chunk_size) : chunk_size(chunk_size) {}
 
@@ -84,6 +87,17 @@ struct PartitionHashMapBase {
         }
     }
 
+    template <typename HashMap>
+    void check_downgrade(HashMap& hash_map) {
+        if (is_downgrade) {
+            return;
+        }
+        auto partition_num = hash_map.size();
+        if (partition_num > 512 && total_num_rows < 10000 * partition_num) {
+            is_downgrade = true;
+        }
+    }
+
     // Append chunk to the hash_map for one non-nullable key
     // Each key mapped to a PartitionChunks which holds an array of chunks
     // chunk will be divided into multiply parts by partition key, and each parts will be append to the
@@ -92,16 +106,25 @@ struct PartitionHashMapBase {
     template <typename HashMap, typename KeyLoader, typename KeyAllocator>
     void append_chunk_for_one_key(HashMap& hash_map, ChunkPtr chunk, KeyLoader&& key_loader,
                                   KeyAllocator&& key_allocator, ObjectPool* obj_pool) {
+        if (is_downgrade) {
+            return;
+        }
         phmap::flat_hash_set<typename HashMap::key_type, typename HashMap::hasher, typename HashMap::key_equal>
                 visited_keys(chunk->num_rows());
 
         const auto size = chunk->num_rows();
-        for (uint32_t i = 0; i < size; i++) {
+        uint32_t i = 0;
+        for (; !is_downgrade && i < size; i++) {
             const auto& key = key_loader(i);
             visited_keys.insert(key);
+            bool is_new_partition = false;
             auto iter = hash_map.lazy_emplace(key, [&](const auto& ctor) {
+                is_new_partition = true;
                 return ctor(key_allocator(key), obj_pool->add(new PartitionChunks()));
             });
+            if (is_new_partition) {
+                check_downgrade(hash_map);
+            }
             auto& value = *(iter->second);
             if (value.chunks.empty() || value.remain_size <= 0) {
                 if (!value.chunks.empty()) {
@@ -113,11 +136,19 @@ struct PartitionHashMapBase {
             }
             value.select_indexes.push_back(i);
             value.remain_size--;
+            total_num_rows++;
         }
 
         for (const auto& key : visited_keys) {
             flush(*(hash_map[key]), chunk);
         }
+
+        // The first i rows has been pushed into hash_map
+        if (is_downgrade && i > 0) {
+            for (auto& column : chunk->columns()) {
+                column->remove_first_n_values(i);
+            }
+        }
     }
 
     // Append chunk to the hash_map for one nullable key
@@ -130,6 +161,9 @@ struct PartitionHashMapBase {
     void append_chunk_for_one_nullable_key(HashMap& hash_map, PartitionChunks& null_key_value, ChunkPtr chunk,
                                            const NullableColumn* nullable_key_column, KeyLoader&& key_loader,
                                            KeyAllocator&& key_allocator, ObjectPool* obj_pool) {
+        if (is_downgrade) {
+            return;
+        }
         if (nullable_key_column->only_null()) {
             const auto size = chunk->num_rows();
             if (null_key_value.chunks.empty() || null_key_value.remain_size <= 0) {
@@ -151,6 +185,7 @@ struct PartitionHashMapBase {
             }
             null_key_value.chunks.back()->append(*chunk, offset, cur_remain_size);
             null_key_value.remain_size = chunk_size - null_key_value.chunks.back()->num_rows();
+            total_num_rows += size;
         } else {
             phmap::flat_hash_set<typename HashMap::key_type, typename HashMap::hasher,
                                  typename HashMap::key_equal>
@@ -159,16 +194,22 @@ struct PartitionHashMapBase {
                     visited_keys(chunk->num_rows());
 
             const auto& null_flag_data = nullable_key_column->null_column()->get_data();
             const auto size = chunk->num_rows();
-            for (uint32_t i = 0; i < size; i++) {
+            uint32_t i = 0;
+            for (; !is_downgrade && i < size; i++) {
                 PartitionChunks* value_ptr = nullptr;
                 if (null_flag_data[i] == 1) {
                     value_ptr = &null_key_value;
                 } else {
                     const auto& key = key_loader(i);
                     visited_keys.insert(key);
+                    bool is_new_partition = false;
                     auto iter = hash_map.lazy_emplace(key, [&](const auto& ctor) {
+                        is_new_partition = true;
                         return ctor(key_allocator(key), obj_pool->add(new PartitionChunks()));
                     });
+                    if (is_new_partition) {
+                        check_downgrade(hash_map);
+                    }
                     value_ptr = iter->second;
                 }
@@ -183,12 +224,20 @@ struct PartitionHashMapBase {
                 }
                 value.select_indexes.push_back(i);
                 value.remain_size--;
+                total_num_rows++;
             }
 
             for (const auto& key : visited_keys) {
                 flush(*(hash_map[key]), chunk);
             }
             flush(null_key_value, chunk);
+
+            // The first i rows has been pushed into hash_map
+            if (is_downgrade && i > 0) {
+                for (auto& column : chunk->columns()) {
+                    column->remove_first_n_values(i);
+                }
+            }
         }
     }
 };
@@ -202,13 +251,14 @@ struct PartitionHashMapWithOneNumberKey : public PartitionHashMapBase {
 
     PartitionHashMapWithOneNumberKey(int32_t chunk_size) : PartitionHashMapBase(chunk_size) {}
 
-    void append_chunk(ChunkPtr chunk, const Columns& key_columns, MemPool* mem_pool, ObjectPool* obj_pool) {
+    bool append_chunk(ChunkPtr chunk, const Columns& key_columns, MemPool* mem_pool, ObjectPool* obj_pool) {
         DCHECK(!key_columns[0]->is_nullable());
         const auto* key_column = down_cast<ColumnType*>(key_columns[0].get());
         const auto& key_column_data = key_column->get_data();
         append_chunk_for_one_key(
                 hash_map, chunk, [&](uint32_t offset) { return key_column_data[offset]; },
                 [](const FieldType& key) { return key; }, obj_pool);
+        return is_downgrade;
     }
 };
 
@@ -222,7 +272,7 @@ struct PartitionHashMapWithOneNullableNumberKey : public PartitionHashMapBase {
 
     PartitionHashMapWithOneNullableNumberKey(int32_t chunk_size) : PartitionHashMapBase(chunk_size) {}
 
-    void append_chunk(ChunkPtr chunk, const Columns& key_columns, MemPool* mem_pool, ObjectPool* obj_pool) {
+    bool append_chunk(ChunkPtr chunk, const Columns& key_columns, MemPool* mem_pool, ObjectPool* obj_pool) {
         DCHECK(key_columns[0]->is_nullable());
         const auto* nullable_key_column = ColumnHelper::as_raw_column<NullableColumn>(key_columns[0].get());
         const auto& key_column_data = down_cast<ColumnType*>(nullable_key_column->data_column().get())->get_data();
@@ -230,6 +280,7 @@ struct PartitionHashMapWithOneNullableNumberKey : public PartitionHashMapBase {
                 hash_map, null_key_value, chunk, nullable_key_column,
                 [&](uint32_t offset) { return key_column_data[offset]; },
                 [](const FieldType& key) { return key; }, obj_pool);
+        return is_downgrade;
     }
 };
 
@@ -240,7 +291,7 @@ struct PartitionHashMapWithOneStringKey : public PartitionHashMapBase {
 
     PartitionHashMapWithOneStringKey(int32_t chunk_size) : PartitionHashMapBase(chunk_size) {}
 
-    void append_chunk(ChunkPtr chunk, const Columns& key_columns, MemPool* mem_pool, ObjectPool* obj_pool) {
+    bool append_chunk(ChunkPtr chunk, const Columns& key_columns, MemPool* mem_pool, ObjectPool* obj_pool) {
         DCHECK(!key_columns[0]->is_nullable());
         const auto* key_column = down_cast<BinaryColumn*>(key_columns[0].get());
         append_chunk_for_one_key(
@@ -251,6 +302,7 @@ struct PartitionHashMapWithOneStringKey : public PartitionHashMapBase {
                     return Slice{pos, key.size};
                 },
                 obj_pool);
+        return is_downgrade;
     }
 };
 
@@ -262,7 +314,7 @@ struct PartitionHashMapWithOneNullableStringKey : public PartitionHashMapBase {
 
     PartitionHashMapWithOneNullableStringKey(int32_t chunk_size) : PartitionHashMapBase(chunk_size) {}
 
-    void append_chunk(ChunkPtr chunk, const Columns& key_columns, MemPool* mem_pool, ObjectPool* obj_pool) {
+    bool append_chunk(ChunkPtr chunk, const Columns& key_columns, MemPool* mem_pool, ObjectPool* obj_pool) {
         DCHECK(key_columns[0]->is_nullable());
         const auto* nullable_key_column = ColumnHelper::as_raw_column<NullableColumn>(key_columns[0].get());
         const auto* key_column = down_cast<BinaryColumn*>(nullable_key_column->data_column().get());
@@ -275,6 +327,7 @@ struct PartitionHashMapWithOneNullableStringKey : public PartitionHashMapBase {
                     return Slice{pos, key.size};
                 },
                 obj_pool);
+        return is_downgrade;
     }
 };
 
@@ -285,7 +338,9 @@ struct PartitionHashMapWithSerializedKey {
     HashMap hash_map;
 
     PartitionHashMapWithSerializedKey(int32_t chunk_size) {}
-    void append_chunk(ChunkPtr chunk, const Columns& key_columns, MemPool* mem_pool, ObjectPool* obj_pool) {}
+    bool append_chunk(ChunkPtr chunk, const Columns& key_columns, MemPool* mem_pool, ObjectPool* obj_pool) {
+        return false;
+    }
 };
 
 // TODO(hcf) to be implemented
@@ -297,7 +352,9 @@ struct PartitionHashMapWithSerializedKeyFixedSize {
     int fixed_byte_size = -1; // unset state
 
     PartitionHashMapWithSerializedKeyFixedSize(int32_t chunk_size) {}
-    void append_chunk(ChunkPtr chunk, const Columns& key_columns, MemPool* mem_pool, ObjectPool* obj_pool) {}
+    bool append_chunk(ChunkPtr chunk, const Columns& key_columns, MemPool* mem_pool, ObjectPool* obj_pool) {
+        return false;
+    }
 };
 
 } // namespace starrocks::vectorized
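
-- 
Two notes on the strategy this patch implements. Both sketches below are
illustrative commentary only, not part of the patch to apply, and use
simplified stand-in types (Model, DowngradeBuffer, plain int64_t keys) rather
than the StarRocks ones.

First, the downgrade decision moves out of LocalPartitionTopnContext, which
used a hard MAX_PARTITION_NUM = 4096 cap checked before offering a chunk, and
into PartitionHashMapBase, which decides while appending: once the hash map
holds more than 512 partitions and the appended rows average fewer than 10000
per partition, is_downgrade flips, append_chunk stops mid-chunk, and the
already-consumed prefix is trimmed off via remove_first_n_values so the
remainder can be buffered whole. A minimal standalone model of that contract:

    #include <cstdint>
    #include <unordered_map>
    #include <vector>

    // Simplified stand-in for PartitionHashMapBase: tracks partition count and
    // total appended rows, and stops consuming once the heuristic fires.
    struct Model {
        bool is_downgrade = false;
        int64_t total_num_rows = 0;
        std::unordered_map<int64_t, std::vector<int64_t>> partitions;

        // Same condition as check_downgrade() in the patch: many partitions,
        // each too small on average to justify further hashing.
        void check_downgrade() {
            if (is_downgrade) return;
            const auto partition_num = static_cast<int64_t>(partitions.size());
            if (partition_num > 512 && total_num_rows < 10000 * partition_num) {
                is_downgrade = true;
            }
        }

        // Mirrors the new append_chunk contract: consume a prefix of the rows
        // and report how many were taken. The caller trims that prefix (cf.
        // remove_first_n_values) and buffers the remainder as-is.
        size_t append(const std::vector<int64_t>& keys) {
            size_t i = 0;
            for (; !is_downgrade && i < keys.size(); i++) {
                auto [it, is_new_partition] = partitions.try_emplace(keys[i]);
                if (is_new_partition) {
                    check_downgrade();
                }
                it->second.push_back(static_cast<int64_t>(i));
                total_num_rows++;
            }
            return i;
        }
    };

Since check_downgrade is re-evaluated only on rows that create a new
partition, chunks whose rows all land in existing partitions are never
interrupted mid-append.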
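Second, once _is_downgrade flips, offer() skips partition-expression
evaluation entirely and _split_chunk_by_partition pushes the whole remaining
chunk into _downgrade_buffer under _buffer_lock; the operator then drains one
chunk per pull via consume_from_downgrade_buffer(). A hypothetical model of
that handoff is below. It takes the lock before the empty() check, whereas
consume_from_downgrade_buffer() in the patch checks empty() first without the
lock, which presumably relies on the single pulling driver re-polling through
has_output() and tolerating a stale miss.

    #include <mutex>
    #include <optional>
    #include <queue>

    // Hypothetical model of the producer/consumer handoff around the
    // downgrade buffer (DowngradeBuffer is an invented name).
    template <typename T>
    class DowngradeBuffer {
    public:
        // Producer side: the partitioner pushes the unconsumed chunk as soon
        // as append_chunk reports a downgrade.
        void push(T item) {
            std::lock_guard<std::mutex> l(_lock);
            _queue.push(std::move(item));
        }

        // Consumer side, mirroring consume_from_downgrade_buffer(): returns
        // an empty optional when nothing is buffered yet.
        std::optional<T> pop() {
            std::lock_guard<std::mutex> l(_lock);
            if (_queue.empty()) {
                return std::nullopt;
            }
            T item = std::move(_queue.front());
            _queue.pop();
            return item;
        }

    private:
        std::queue<T> _queue;
        std::mutex _lock;
    };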