Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 46 additions & 23 deletions be/src/exec/exchange/local_exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,30 @@ std::vector<Dependency*> LocalExchangeSinkLocalState::dependencies() const {
return deps;
}

Status LocalExchangeSinkOperatorX::init(RuntimeState* state, ExchangeType type,
const int num_buckets, const bool use_global_hash_shuffle,
Status LocalExchangeSinkOperatorX::init(RuntimeState* state, TLocalPartitionType::type type,
const int num_buckets,
const std::map<int, int>& shuffle_idx_to_instance_idx) {
DCHECK(!_planned_by_fe);
_name = "LOCAL_EXCHANGE_SINK_OPERATOR(" + get_exchange_type_name(type) + ")";
_type = type;
if (_type == ExchangeType::HASH_SHUFFLE) {
_shuffle_idx_to_instance_idx.clear();
_use_global_shuffle = use_global_hash_shuffle;
if (_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) {
// For shuffle join, if data distribution has been broken by previous operator, we
// should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned,
// we should use map shuffle idx to instance idx because all instances will be
// distributed to all BEs. Otherwise, we should use shuffle idx directly.
if (use_global_hash_shuffle) {
_shuffle_idx_to_instance_idx = shuffle_idx_to_instance_idx;
_shuffle_idx_to_instance_idx = shuffle_idx_to_instance_idx;
if (state->query_options().__isset.enable_new_shuffle_hash_method &&
state->query_options().enable_new_shuffle_hash_method) {
_partitioner = std::make_unique<Crc32CHashPartitioner>(_num_partitions);
} else {
for (int i = 0; i < _num_partitions; i++) {
_shuffle_idx_to_instance_idx[i] = i;
}
_partitioner =
std::make_unique<Crc32HashPartitioner<ShuffleChannelIds>>(_num_partitions);
}
RETURN_IF_ERROR(_partitioner->init(_texprs));
} else if (_type == TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE) {
_shuffle_idx_to_instance_idx.clear();
for (int i = 0; i < _num_partitions; i++) {
_shuffle_idx_to_instance_idx[i] = i;
}
if (state->query_options().__isset.enable_new_shuffle_hash_method &&
state->query_options().enable_new_shuffle_hash_method) {
Expand All @@ -64,17 +70,41 @@ Status LocalExchangeSinkOperatorX::init(RuntimeState* state, ExchangeType type,
std::make_unique<Crc32HashPartitioner<ShuffleChannelIds>>(_num_partitions);
}
RETURN_IF_ERROR(_partitioner->init(_texprs));
} else if (_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
} else if (_type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) {
DCHECK_GT(num_buckets, 0);
_partitioner = std::make_unique<Crc32HashPartitioner<ShuffleChannelIds>>(num_buckets);
RETURN_IF_ERROR(_partitioner->init(_texprs));
}
return Status::OK();
}

Status LocalExchangeSinkOperatorX::init_partitioner(RuntimeState* state) {
DCHECK(_planned_by_fe);
// Set operator name to include exchange type (base class init(tnode) only sets generic name).
_name = "LOCAL_EXCHANGE_SINK_OPERATOR(" + get_exchange_type_name(_type) + ")";
if (_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE ||
_type == TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE) {
if (state->query_options().__isset.enable_new_shuffle_hash_method &&
state->query_options().enable_new_shuffle_hash_method) {
_partitioner = std::make_unique<Crc32CHashPartitioner>(_num_partitions);
} else {
_partitioner =
std::make_unique<Crc32HashPartitioner<ShuffleChannelIds>>(_num_partitions);
}
RETURN_IF_ERROR(_partitioner->init(_texprs));
} else if (_type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) {
DCHECK_GT(_num_partitions, 0);
_partitioner = std::make_unique<Crc32HashPartitioner<ShuffleChannelIds>>(_num_partitions);
RETURN_IF_ERROR(_partitioner->init(_texprs));
}
return Status::OK();
}

Status LocalExchangeSinkOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<LocalExchangeSinkLocalState>::prepare(state));
if (_type == ExchangeType::HASH_SHUFFLE || _type == ExchangeType::BUCKET_HASH_SHUFFLE) {
if (_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE ||
_type == TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE ||
_type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) {
RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc()));
RETURN_IF_ERROR(_partitioner->open(state));
}
Expand All @@ -88,11 +118,6 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
SCOPED_TIMER(_init_timer);
_compute_hash_value_timer = ADD_TIMER(custom_profile(), "ComputeHashValueTime");
_distribute_timer = ADD_TIMER(custom_profile(), "DistributeDataTime");
if (_parent->cast<LocalExchangeSinkOperatorX>()._type == ExchangeType::HASH_SHUFFLE) {
custom_profile()->add_info_string(
"UseGlobalShuffle",
std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle));
}
custom_profile()->add_info_string(
"PartitionExprsSize",
std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._partitioned_exprs_num));
Expand All @@ -108,8 +133,7 @@ Status LocalExchangeSinkLocalState::open(RuntimeState* state) {
_exchanger = _shared_state->exchanger.get();
DCHECK(_exchanger != nullptr);

if (_exchanger->get_type() == ExchangeType::HASH_SHUFFLE ||
_exchanger->get_type() == ExchangeType::BUCKET_HASH_SHUFFLE) {
if (is_shuffled_exchange(_exchanger->get_type())) {
auto& p = _parent->cast<LocalExchangeSinkOperatorX>();
RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner));
}
Expand All @@ -132,12 +156,11 @@ Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status exec_statu
std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
"{}, _use_global_shuffle: {}, _channel_id: {}, _num_partitions: {}, "
"{}, _channel_id: {}, _num_partitions: {}, "
"_num_senders: {}, _num_sources: {}, "
"_running_sink_operators: {}, _running_source_operators: {}",
Base::debug_string(indentation_level),
_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle, _channel_id,
_exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources,
Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions,
_exchanger->_num_senders, _exchanger->_num_sources,
_exchanger->_running_sink_operators, _exchanger->_running_source_operators);
return fmt::to_string(debug_string_buffer);
}
Expand Down
28 changes: 20 additions & 8 deletions be/src/exec/exchange/local_exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,17 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX<LocalExchangeS
_texprs(texprs),
_partitioned_exprs_num(texprs.size()),
_shuffle_idx_to_instance_idx(bucket_seq_to_instance_idx) {}

LocalExchangeSinkOperatorX(int operator_id, int dest_id, const TPlanNode& tnode,
int num_partitions,
const std::map<int, int>& shuffle_id_to_instance_idx)
: Base(operator_id, tnode, dest_id),
_type(tnode.local_exchange_node.partition_type),
_num_partitions(num_partitions),
_texprs(tnode.local_exchange_node.distribute_expr_lists),
_partitioned_exprs_num(tnode.local_exchange_node.distribute_expr_lists.size()),
_shuffle_idx_to_instance_idx(shuffle_id_to_instance_idx),
_planned_by_fe(true) {}
#ifdef BE_TEST
LocalExchangeSinkOperatorX(const std::vector<TExpr>& texprs,
const std::map<int, int>& bucket_seq_to_instance_idx)
Expand All @@ -89,18 +100,19 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX<LocalExchangeS
_shuffle_idx_to_instance_idx(bucket_seq_to_instance_idx) {}
#endif

Status init(const TPlanNode& tnode, RuntimeState* state) override {
return Status::InternalError("{} should not init with TPlanNode", Base::_name);
}

Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode", Base::_name);
}

Status init(RuntimeState* state, ExchangeType type, const int num_buckets,
const bool use_global_hash_shuffle,
Status init(RuntimeState* state, TLocalPartitionType::type type, const int num_buckets,
const std::map<int, int>& shuffle_idx_to_instance_idx) override;

// Initialize partitioner for FE-planned local exchange nodes. The FE-planned constructor
// already sets _type, _num_partitions, _texprs, and _shuffle_idx_to_instance_idx from the
// TPlanNode, but does not create the partitioner. This method creates the partitioner so
// that prepare() can call _partitioner->prepare() without null dereference.
Status init_partitioner(RuntimeState* state);

Status prepare(RuntimeState* state) override;

Status sink(RuntimeState* state, Block* in_block, bool eos) override;
Expand All @@ -115,13 +127,13 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX<LocalExchangeS
private:
friend class LocalExchangeSinkLocalState;
friend class ShuffleExchanger;
ExchangeType _type;
TLocalPartitionType::type _type;
const int _num_partitions;
const std::vector<TExpr>& _texprs;
const size_t _partitioned_exprs_num;
std::unique_ptr<PartitionerBase> _partitioner;
std::map<int, int> _shuffle_idx_to_instance_idx;
bool _use_global_shuffle = false;
const bool _planned_by_fe = false;
};

} // namespace doris
5 changes: 2 additions & 3 deletions be/src/exec/exchange/local_exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo&
DCHECK(_exchanger != nullptr);
_get_block_failed_counter =
ADD_COUNTER_WITH_LEVEL(custom_profile(), "GetBlockFailedTime", TUnit::UNIT, 1);
if (_exchanger->get_type() == ExchangeType::HASH_SHUFFLE ||
_exchanger->get_type() == ExchangeType::BUCKET_HASH_SHUFFLE) {
if (is_shuffled_exchange(_exchanger->get_type())) {
_copy_data_timer = ADD_TIMER(custom_profile(), "CopyDataTime");
}

Expand Down Expand Up @@ -60,7 +59,7 @@ Status LocalExchangeSourceLocalState::close(RuntimeState* state) {
}

std::vector<Dependency*> LocalExchangeSourceLocalState::dependencies() const {
if ((_exchanger->get_type() == ExchangeType::PASS_TO_ONE) && _channel_id != 0) {
if ((_exchanger->get_type() == TLocalPartitionType::PASS_TO_ONE) && _channel_id != 0) {
// If this is a PASS_TO_ONE exchange and is not the first task, source operators always
// return empty result so no dependencies here.
return {};
Expand Down
38 changes: 33 additions & 5 deletions be/src/exec/exchange/local_exchange_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,47 @@ class LocalExchangeSourceOperatorX final : public OperatorX<LocalExchangeSourceL
public:
using Base = OperatorX<LocalExchangeSourceLocalState>;
LocalExchangeSourceOperatorX(ObjectPool* pool, int id) : Base(pool, id, id) {}
LocalExchangeSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
const DescriptorTbl& descs)
: Base(pool, tnode, operator_id, descs),
_exchange_type(tnode.local_exchange_node.partition_type),
_planned_by_fe(true) {}
#ifdef BE_TEST
LocalExchangeSourceOperatorX() = default;
#endif
Status init(ExchangeType type) override {
Status init(TLocalPartitionType::type type) override {
DCHECK(!_planned_by_fe);
_op_name = "LOCAL_EXCHANGE_OPERATOR(" + get_exchange_type_name(type) + ")";
_exchange_type = type;
return Status::OK();
}
Status prepare(RuntimeState* state) override { return Status::OK(); }
Status prepare(RuntimeState* state) override {
if (_planned_by_fe) {
RETURN_IF_ERROR(Base::prepare(state));
// Base::prepare() resets _op_name from tnode node_type; restore the type-qualified name.
_op_name = "LOCAL_EXCHANGE_OPERATOR(" + get_exchange_type_name(_exchange_type) + ")";
return Status::OK();
}
return Status::OK();
}
const RowDescriptor& intermediate_row_desc() const override {
if (_planned_by_fe) {
return Base::intermediate_row_desc();
}
return _child->intermediate_row_desc();
}
RowDescriptor& row_descriptor() override { return _child->row_descriptor(); }
const RowDescriptor& row_desc() const override { return _child->row_desc(); }
RowDescriptor& row_descriptor() override {
if (_planned_by_fe) {
return Base::row_descriptor();
}
return _child->row_descriptor();
}
const RowDescriptor& row_desc() const override {
if (_planned_by_fe) {
return Base::row_desc();
}
return _child->row_desc();
}

Status get_block(RuntimeState* state, Block* block, bool* eos) override;

Expand All @@ -85,7 +112,8 @@ class LocalExchangeSourceOperatorX final : public OperatorX<LocalExchangeSourceL
private:
friend class LocalExchangeSourceLocalState;

ExchangeType _exchange_type;
TLocalPartitionType::type _exchange_type;
const bool _planned_by_fe = false;
};

} // namespace doris
Loading
Loading