Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions be/src/pipeline/common/agg_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,15 +275,17 @@ struct AggregateDataContainer {
using IteratorBase<ConstIterator, true>::IteratorBase;
};

ConstIterator begin() const { return ConstIterator(this, 0); }
ConstIterator begin() const { return {this, 0}; }

ConstIterator cbegin() const { return begin(); }

Iterator begin() { return Iterator(this, 0); }
Iterator begin() { return {this, 0}; }

ConstIterator end() const { return ConstIterator(this, _total_count); }
ConstIterator end() const { return {this, _total_count}; }
ConstIterator cend() const { return end(); }
Iterator end() { return Iterator(this, _total_count); }
Iterator end() { return {this, _total_count}; }

[[nodiscard]] uint32_t total_count() const { return _total_count; }

void init_once() {
if (_inited) {
Expand Down
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,15 @@ void BroadcastPBlockHolderMemLimiter::release(const BroadcastPBlockHolder& holde
namespace pipeline {

ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id,
int be_number, RuntimeState* state,
PlanNodeId node_id, int be_number, RuntimeState* state,
ExchangeSinkLocalState* parent)
: HasTaskExecutionCtx(state),
_queue_capacity(0),
_is_finishing(false),
_query_id(query_id),
_dest_node_id(dest_node_id),
_sender_id(send_id),
_node_id(node_id),
_be_number(be_number),
_state(state),
_context(state->get_query_ctx()),
Expand Down Expand Up @@ -408,6 +409,8 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) {
}

void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
LOG(INFO) << "send rpc failed, instance id: " << id << ", _dest_node_id: " << _dest_node_id
<< ",_sender_id: " << _sender_id << ", node id: " << _node_id << ", err: " << err;
_is_finishing = true;
_context->cancel(Status::Cancelled(err));
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
Expand Down
5 changes: 3 additions & 2 deletions be/src/pipeline/exec/exchange_sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ struct ExchangeRpcContext {
// Each ExchangeSinkOperator have one ExchangeSinkBuffer
class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
public:
ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, int be_number,
RuntimeState* state, ExchangeSinkLocalState* parent);
ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, PlanNodeId node_id,
int be_number, RuntimeState* state, ExchangeSinkLocalState* parent);
~ExchangeSinkBuffer() override = default;
void register_sink(TUniqueId);

Expand Down Expand Up @@ -235,6 +235,7 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
PlanNodeId _dest_node_id;
// Sender instance id, unique within a fragment. StreamSender save the variable
int _sender_id;
PlanNodeId _node_id;
int _be_number;
std::atomic<int64_t> _rpc_count = 0;
RuntimeState* _state = nullptr;
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
PUniqueId id;
id.set_hi(_state->query_id().hi);
id.set_lo(_state->query_id().lo);
_sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, p._dest_node_id, _sender_id,
_state->be_number(), state, this);
_sink_buffer = std::make_unique<ExchangeSinkBuffer>(
id, p._dest_node_id, _sender_id, _parent->node_id(), _state->be_number(), state, this);

register_channels(_sink_buffer.get());
_queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
Expand Down
13 changes: 8 additions & 5 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,6 @@ size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) {

if (!_build_side_mutable_block.empty()) {
size_to_reserve += _build_side_mutable_block.allocated_bytes();

// estimating for serialized key
for (auto id : _build_col_ids) {
size_to_reserve += _build_side_mutable_block.get_column_by_position(id)->byte_size();
}
}

const size_t rows = _build_side_mutable_block.rows() + state->batch_size();
Expand All @@ -143,6 +138,14 @@ size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) {
}
size_to_reserve += _evaluate_mem_usage;

if (size_to_reserve > 2L * 1024 * 1024 * 1024) [[unlikely]] {
LOG(INFO) << "**** too big reserve size: " << size_to_reserve << ", rows: " << rows
<< ", bucket_size: " << bucket_size
<< ", mutable block size: " << _build_side_mutable_block.allocated_bytes()
<< ", mutable block cols: " << _build_side_mutable_block.columns()
<< ", _build_col_ids.size: " << _build_col_ids.size();
}

return size_to_reserve;
}

Expand Down
25 changes: 13 additions & 12 deletions be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ size_t PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state) {
}

Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
const auto size_to_revoke = _parent->revocable_mem_size(state);
VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node "
<< Base::_parent->node_id()
<< " revoke_memory, size: " << _parent->revocable_mem_size(state)
Expand Down Expand Up @@ -282,7 +283,7 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
state->get_query_ctx()->increase_revoking_tasks_count();
auto spill_runnable = std::make_shared<SpillRunnable>(
state, _shared_state->shared_from_this(),
[this, &parent, state, query_id, submit_timer] {
[this, &parent, state, query_id, size_to_revoke, submit_timer] {
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", {
auto st = Status::InternalError(
"fault_inject partitioned_agg_sink "
Expand Down Expand Up @@ -316,17 +317,17 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
}};
auto* runtime_state = _runtime_state.get();
auto* agg_data = parent._agg_sink_operator->get_agg_data(runtime_state);
Base::_shared_state->sink_status =
std::visit(vectorized::Overload {
[&](std::monostate& arg) -> Status {
return Status::InternalError("Unit hash table");
},
[&](auto& agg_method) -> Status {
auto& hash_table = *agg_method.hash_table;
RETURN_IF_CATCH_EXCEPTION(return _spill_hash_table(
state, agg_method, hash_table, _eos));
}},
agg_data->method_variant);
Base::_shared_state->sink_status = std::visit(
vectorized::Overload {
[&](std::monostate& arg) -> Status {
return Status::InternalError("Unit hash table");
},
[&](auto& agg_method) -> Status {
auto& hash_table = *agg_method.hash_table;
RETURN_IF_CATCH_EXCEPTION(return _spill_hash_table(
state, agg_method, hash_table, size_to_revoke, _eos));
}},
agg_data->method_variant);
RETURN_IF_ERROR(Base::_shared_state->sink_status);
Base::_shared_state->sink_status =
parent._agg_sink_operator->reset_hash_table(runtime_state);
Expand Down
21 changes: 18 additions & 3 deletions be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "pipeline/exec/operator.h"
#include "vec/exprs/vectorized_agg_fn.h"
#include "vec/exprs/vexpr.h"
#include "vec/spill/spill_stream.h"
#include "vec/spill/spill_stream_manager.h"

namespace doris::pipeline {
Expand Down Expand Up @@ -57,7 +58,7 @@ class PartitionedAggSinkLocalState
};
template <typename HashTableCtxType, typename HashTableType>
Status _spill_hash_table(RuntimeState* state, HashTableCtxType& context,
HashTableType& hash_table, bool eos) {
HashTableType& hash_table, const size_t size_to_revoke, bool eos) {
Status status;
Defer defer {[&]() {
if (!status.ok()) {
Expand All @@ -69,8 +70,22 @@ class PartitionedAggSinkLocalState

Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();

static int spill_batch_rows = 4096;
int row_count = 0;
const auto total_rows =
Base::_shared_state->in_mem_shared_state->aggregate_data_container->total_count();

const size_t size_to_revoke_ = std::max<size_t>(size_to_revoke, 1);

// `spill_batch_rows` will be between 4k and 1M
// and each block to spill will not be larger than 32MB(`MAX_SPILL_WRITE_BATCH_MEM`)
const auto spill_batch_rows = std::min<size_t>(
1024 * 1024,
std::max<size_t>(4096, vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM *
total_rows / size_to_revoke_));

VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
<< ", spill_batch_rows: " << spill_batch_rows << ", total rows: " << total_rows
<< ", size_to_revoke: " << size_to_revoke;
size_t row_count = 0;

std::vector<TmpSpillInfo<typename HashTableType::key_type>> spill_infos(
Base::_shared_state->partition_count);
Expand Down
16 changes: 14 additions & 2 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,8 @@ Status PipelineTask::execute(bool* eos) {
_root->reset_reserve_mem_size(_state);
DCHECK_EQ(_root->get_reserve_mem_size(_state), 0);

if (reserve_size > 0) {
auto workload_group = _state->get_query_ctx()->workload_group();
if (workload_group && reserve_size > 0) {
auto st = thread_context()->try_reserve_memory(reserve_size);
if (!st.ok()) {
VLOG_DEBUG << "query: " << print_id(query_id)
Expand All @@ -384,7 +385,18 @@ Status PipelineTask::execute(bool* eos) {
<< ", sink name: " << _sink->get_name()
<< ", node id: " << _sink->node_id() << " failed: " << st.to_string()
<< ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str();
{
bool is_high_wartermark = false;
bool is_low_wartermark = false;
workload_group->check_mem_used(&is_low_wartermark, &is_high_wartermark);
if (is_low_wartermark || is_high_wartermark) {
/// The larger reserved memory size is likely due to a larger available revocable size.
/// If the available memory for revoking is large enough, here trigger revoking proactively.
if (_sink->revocable_mem_size(_state) > 512L * 1024 * 1024) {
LOG(INFO) << "query: " << print_id(query_id)
<< " has big memory to revoke.";
RETURN_IF_ERROR(_sink->revoke_memory(_state));
}

_memory_sufficient_dependency->block();
_state->get_query_ctx()->get_pipe_exec_scheduler()->add_paused_task(this);
continue;
Expand Down
7 changes: 6 additions & 1 deletion be/src/vec/sink/vdata_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,12 @@ Status Channel<Parent>::open(RuntimeState* state) {
_brpc_request->set_sender_id(_parent->sender_id());
_brpc_request->set_be_number(_be_number);

_brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000;
const auto& query_options = state->query_options();
if (query_options.__isset.query_timeout) {
_brpc_timeout_ms = query_options.query_timeout * 1000;
} else {
_brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000;
}

_serializer.set_is_local(_is_local);

Expand Down