[Enhancement] Adjust downgrade strategy of chunks_partitioner (#8696)
liuyehcf committed Jul 18, 2022
1 parent 15babd5 commit f355c23
Showing 5 changed files with 119 additions and 54 deletions.
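
In outline, the diff below moves the downgrade decision out of LocalPartitionTopnContext (which keyed it off a hard MAX_PARTITION_NUM = 4096 threshold and kept its own _downgrade_buffer) and into ChunksPartitioner itself: _split_chunk_by_partition now takes the downgrade signal from append_chunk, parks subsequent chunks in a mutex-protected _downgrade_buffer, and exposes is_downgrade(), is_downgrade_buffer_empty(), and consume_from_downgrade_buffer() to callers. The following stand-alone sketch shows that buffering pattern in miniature; ToyPartitioner, its members, and the size-based trigger are illustrative assumptions, not StarRocks code (in the real partitioner the trigger is whatever append_chunk on the hash map reports).

#include <cstddef>
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-ins for StarRocks' ChunkPtr and partition key.
using Chunk = std::vector<std::string>;
using PartitionKey = std::string;

class ToyPartitioner {
public:
    explicit ToyPartitioner(std::size_t max_partitions) : _max_partitions(max_partitions) {}

    // Mirrors the shape of ChunksPartitioner::offer: hash-partition until the
    // partition count crosses the limit, then flip to downgrade mode and buffer chunks.
    void offer(const PartitionKey& key, const Chunk& chunk) {
        if (!_is_downgrade) {
            // Existing partitions keep accepting chunks; new partitions are only
            // created while the partition count stays under the limit.
            if (_hash_map.count(key) > 0 || _hash_map.size() < _max_partitions) {
                _hash_map[key].push_back(chunk);
                return;
            }
            _is_downgrade = true; // too many partitions: give up hash partitioning
        }
        std::lock_guard<std::mutex> l(_buffer_lock);
        _downgrade_buffer.push(chunk);
    }

    bool is_downgrade() const { return _is_downgrade; }

    // Mirrors consume_from_downgrade_buffer: hand back one buffered chunk, if any.
    bool consume_from_downgrade_buffer(Chunk* out) {
        std::lock_guard<std::mutex> l(_buffer_lock);
        if (_downgrade_buffer.empty()) {
            return false;
        }
        *out = _downgrade_buffer.front();
        _downgrade_buffer.pop();
        return true;
    }

private:
    const std::size_t _max_partitions;
    bool _is_downgrade = false;
    std::unordered_map<PartitionKey, std::vector<Chunk>> _hash_map;
    std::queue<Chunk> _downgrade_buffer;
    std::mutex _buffer_lock;
};

int main() {
    ToyPartitioner partitioner(2);
    partitioner.offer("a", {"row1"});
    partitioner.offer("b", {"row2"});
    partitioner.offer("c", {"row3"}); // third distinct key triggers the downgrade
    partitioner.offer("d", {"row4"}); // goes straight to the downgrade buffer
    std::cout << std::boolalpha << partitioner.is_downgrade() << "\n"; // true
    Chunk chunk;
    while (partitioner.consume_from_downgrade_buffer(&chunk)) {
        std::cout << chunk[0] << "\n"; // row3, row4
    }
    return 0;
}

In the real operator, the hash-map side is drained into per-partition ChunksSorters via consume_from_hash_map, while the buffered chunks are pulled one at a time in pull_one_chunk, as the hunks below show.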
32 changes: 8 additions & 24 deletions be/src/exec/pipeline/sort/local_partition_topn_context.cpp
@@ -8,8 +8,6 @@
 
 namespace starrocks::pipeline {
 
-const int32_t LocalPartitionTopnContext::MAX_PARTITION_NUM = 4096;
-
 LocalPartitionTopnContext::LocalPartitionTopnContext(
         const std::vector<TExpr>& t_partition_exprs, SortExecExprs& sort_exec_exprs, std::vector<bool> is_asc_order,
         std::vector<bool> is_null_first, const std::string& sort_keys, int64_t offset, int64_t partition_limit,
@@ -53,18 +51,11 @@ Status LocalPartitionTopnContext::prepare(RuntimeState* state) {
 
 Status LocalPartitionTopnContext::push_one_chunk_to_partitioner(RuntimeState* state,
                                                                 const vectorized::ChunkPtr& chunk) {
-    const auto num_partitions = _chunks_partitioner->num_partitions();
-    if (!_is_downgrade && num_partitions < MAX_PARTITION_NUM) {
-        return _chunks_partitioner->offer(chunk);
-    } else {
+    auto st = _chunks_partitioner->offer(chunk);
+    if (_chunks_partitioner->is_downgrade()) {
         transfer_all_chunks_from_partitioner_to_sorters(state);
-        {
-            std::lock_guard<std::mutex> l(_buffer_lock);
-            _downgrade_buffer.push(chunk);
-        }
-        _is_downgrade = true;
-        return Status::OK();
     }
+    return st;
 }
 
 void LocalPartitionTopnContext::sink_complete() {
@@ -82,8 +73,8 @@ Status LocalPartitionTopnContext::transfer_all_chunks_from_partitioner_to_sorter
                 state, &_sort_exec_exprs.lhs_ordering_expr_ctxs(), &_is_asc_order, &_is_null_first, _sort_keys, _offset,
                 _partition_limit, _topn_type, vectorized::ChunksSorterTopn::tunning_buffered_chunks(_partition_limit));
     }
-    RETURN_IF_ERROR(
-            _chunks_partitioner->accept([this, state](int32_t partition_idx, const vectorized::ChunkPtr& chunk) {
+    RETURN_IF_ERROR(_chunks_partitioner->consume_from_hash_map(
+            [this, state](int32_t partition_idx, const vectorized::ChunkPtr& chunk) {
                 _chunks_sorters[partition_idx]->update(state, chunk);
                 return true;
             }));
@@ -97,8 +88,8 @@ Status LocalPartitionTopnContext::transfer_all_chunks_from_partitioner_to_sorter
 }
 
 bool LocalPartitionTopnContext::has_output() {
-    if (_is_downgrade) {
-        return _sorter_index < _chunks_sorters.size() || !_downgrade_buffer.empty();
+    if (_chunks_partitioner->is_downgrade() && _is_transfered) {
+        return _sorter_index < _chunks_sorters.size() || !_chunks_partitioner->is_downgrade_buffer_empty();
     }
     return _is_sink_complete && _sorter_index < _chunks_sorters.size();
 }
@@ -118,14 +109,7 @@ StatusOr<vectorized::ChunkPtr> LocalPartitionTopnContext::pull_one_chunk() {
             return chunk;
         }
     }
-    if (_downgrade_buffer.empty()) {
-        return nullptr;
-    }
-    {
-        std::lock_guard<std::mutex> l(_buffer_lock);
-        chunk = _downgrade_buffer.front();
-        _downgrade_buffer.pop();
-    }
+    chunk = _chunks_partitioner->consume_from_downgrade_buffer();
    return chunk;
 }

8 changes: 0 additions & 8 deletions be/src/exec/pipeline/sort/local_partition_topn_context.h
@@ -59,14 +59,6 @@ class LocalPartitionTopnContext {
     // The output chunk stream is unordered
     StatusOr<vectorized::ChunkPtr> pull_one_chunk_from_sorters();
 
-    // TODO(hcf) set this value properly, maybe based on cache size
-    static const int32_t MAX_PARTITION_NUM;
-
-    bool _is_downgrade = false;
-    // We simply offer chunk to this buffer when number of partition reaches the threshould MAX_PARTITION_NUM
-    std::queue<vectorized::ChunkPtr> _downgrade_buffer;
-    std::mutex _buffer_lock;
-
     const std::vector<TExpr>& _t_partition_exprs;
     std::vector<ExprContext*> _partition_exprs;
     std::vector<vectorized::PartitionColumnType> _partition_types;

27 changes: 21 additions & 6 deletions be/src/exec/vectorized/partition/chunks_partitioner.cpp
@@ -28,23 +28,38 @@ Status ChunksPartitioner::prepare(RuntimeState* state) {
 Status ChunksPartitioner::offer(const ChunkPtr& chunk) {
     DCHECK(!_partition_it.has_value());
 
-    for (size_t i = 0; i < _partition_exprs.size(); i++) {
-        ASSIGN_OR_RETURN(_partition_columns[i], _partition_exprs[i]->evaluate(chunk.get()));
+    if (!_is_downgrade) {
+        for (size_t i = 0; i < _partition_exprs.size(); i++) {
+            ASSIGN_OR_RETURN(_partition_columns[i], _partition_exprs[i]->evaluate(chunk.get()));
+        }
     }
 
     if (false) {
     }
-#define HASH_MAP_METHOD(NAME)                                                                          \
-    else if (_hash_map_variant.type == PartitionHashMapVariant::Type::NAME) {                          \
-        TRY_CATCH_BAD_ALLOC(split_chunk_by_partition<decltype(_hash_map_variant.NAME)::element_type>(  \
-                *_hash_map_variant.NAME, chunk));                                                      \
+#define HASH_MAP_METHOD(NAME)                                                                           \
+    else if (_hash_map_variant.type == PartitionHashMapVariant::Type::NAME) {                           \
+        TRY_CATCH_BAD_ALLOC(_split_chunk_by_partition<decltype(_hash_map_variant.NAME)::element_type>(  \
+                *_hash_map_variant.NAME, chunk));                                                       \
     }
     APPLY_FOR_PARTITION_VARIANT_ALL(HASH_MAP_METHOD)
 #undef HASH_MAP_METHOD
 
     return Status::OK();
 }
 
+ChunkPtr ChunksPartitioner::consume_from_downgrade_buffer() {
+    vectorized::ChunkPtr chunk = nullptr;
+    if (_downgrade_buffer.empty()) {
+        return chunk;
+    }
+    {
+        std::lock_guard<std::mutex> l(_buffer_lock);
+        chunk = _downgrade_buffer.front();
+        _downgrade_buffer.pop();
+    }
+    return chunk;
+}
+
 int32_t ChunksPartitioner::num_partitions() {
     return _hash_map_variant.size();
 }

33 changes: 25 additions & 8 deletions be/src/exec/vectorized/partition/chunks_partitioner.h
@@ -3,6 +3,7 @@
 #pragma once
 
 #include <any>
+#include <queue>
 
 #include "column/chunk.h"
 #include "column/vectorized_fwd.h"
@@ -36,18 +37,22 @@ class ChunksPartitioner {
     // Number of partitions
     int32_t num_partitions();
 
+    bool is_downgrade() const { return _is_downgrade; }
+
+    bool is_downgrade_buffer_empty() const { return _downgrade_buffer.empty(); }
+
     // Consumers consume from the hash map
     // method signature is: bool consumer(int32_t partition_idx, const ChunkPtr& chunk)
     // The return value of the consumer denote whether to continue or not
     template <typename Consumer>
-    Status accept(Consumer&& consumer) {
+    Status consume_from_hash_map(Consumer&& consumer) {
         // First, fetch chunks from hash map
         if (false) {
         }
-#define HASH_MAP_METHOD(NAME)                                                                                     \
-    else if (_hash_map_variant.type == PartitionHashMapVariant::Type::NAME) {                                     \
-        TRY_CATCH_BAD_ALLOC(fetch_chunks_from_hash_map<typename decltype(_hash_map_variant.NAME)::element_type>(  \
-                *_hash_map_variant.NAME, consumer));                                                              \
+#define HASH_MAP_METHOD(NAME)                                                                                      \
+    else if (_hash_map_variant.type == PartitionHashMapVariant::Type::NAME) {                                      \
+        TRY_CATCH_BAD_ALLOC(_fetch_chunks_from_hash_map<typename decltype(_hash_map_variant.NAME)::element_type>(  \
+                *_hash_map_variant.NAME, consumer));                                                               \
     }
     APPLY_FOR_PARTITION_VARIANT_ALL(HASH_MAP_METHOD)
 #undef HASH_MAP_METHOD
@@ -66,20 +71,27 @@ class ChunksPartitioner {
         return Status::OK();
     }
 
+    // Fetch one chunk from downgrade buffer if any
+    ChunkPtr consume_from_downgrade_buffer();
+
 private:
     bool _is_partition_columns_fixed_size(const std::vector<ExprContext*>& partition_expr_ctxs,
                                           const std::vector<PartitionColumnType>& partition_types, size_t* max_size,
                                           bool* has_null);
     void _init_hash_map_variant();
 
     template <typename HashMapWithKey>
-    void split_chunk_by_partition(HashMapWithKey& hash_map_with_key, const ChunkPtr& chunk) {
-        hash_map_with_key.append_chunk(chunk, _partition_columns, _mem_pool.get(), _obj_pool);
+    void _split_chunk_by_partition(HashMapWithKey& hash_map_with_key, const ChunkPtr& chunk) {
+        _is_downgrade = hash_map_with_key.append_chunk(chunk, _partition_columns, _mem_pool.get(), _obj_pool);
+        if (_is_downgrade) {
+            std::lock_guard<std::mutex> l(_buffer_lock);
+            _downgrade_buffer.push(chunk);
+        }
     }
 
     // Fetch chunks from hash map, return true if reaches eos
     template <typename HashMapWithKey, typename Consumer>
-    bool fetch_chunks_from_hash_map(HashMapWithKey& hash_map_with_key, Consumer&& consumer) {
+    bool _fetch_chunks_from_hash_map(HashMapWithKey& hash_map_with_key, Consumer&& consumer) {
         if (_hash_map_eos) {
             return true;
         }
@@ -177,6 +189,11 @@ class ChunksPartitioner {
     // Hash map which holds chunks of different partitions
     PartitionHashMapVariant _hash_map_variant;
 
+    bool _is_downgrade = false;
+    // We simply buffer chunks when partition cardinality is high
+    std::queue<ChunkPtr> _downgrade_buffer;
+    std::mutex _buffer_lock;
+
     // Iterator of partitions
     std::any _partition_it;
     // Iterator of chunks of current partition
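To make the consumer contract concrete, here is a small stand-alone sketch (assumed names, not the StarRocks classes) of the callback shape consume_from_hash_map expects: the partitioner invokes bool consumer(int32_t partition_idx, const ChunkPtr& chunk) for each buffered chunk and stops early if the consumer returns false, which is how transfer_all_chunks_from_partitioner_to_sorters above feeds each partition's chunks into its ChunksSorter. ToyHashMapDrainer and its storage layout are illustrative assumptions.

#include <cstdint>
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-ins for ChunkPtr and the per-partition storage.
using Chunk = std::string;
using ChunkPtr = std::shared_ptr<Chunk>;

class ToyHashMapDrainer {
public:
    void add(int32_t partition_idx, ChunkPtr chunk) { _partitions[partition_idx].push_back(std::move(chunk)); }

    // Shape of ChunksPartitioner::consume_from_hash_map: call the consumer for
    // every (partition_idx, chunk) pair; a false return value stops the drain early.
    template <typename Consumer>
    void consume_from_hash_map(Consumer&& consumer) {
        for (auto& [partition_idx, chunks] : _partitions) {
            for (auto& chunk : chunks) {
                if (!consumer(partition_idx, chunk)) {
                    return;
                }
            }
        }
        _partitions.clear();
    }

private:
    std::unordered_map<int32_t, std::vector<ChunkPtr>> _partitions;
};

int main() {
    ToyHashMapDrainer drainer;
    drainer.add(0, std::make_shared<Chunk>("chunk-a"));
    drainer.add(1, std::make_shared<Chunk>("chunk-b"));

    // Analogue of the lambda passed in transfer_all_chunks_from_partitioner_to_sorters.
    drainer.consume_from_hash_map([](int32_t partition_idx, const ChunkPtr& chunk) {
        std::cout << "partition " << partition_idx << " -> " << *chunk << "\n";
        return true; // keep consuming
    });
    return 0;
}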
