Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 55 additions & 57 deletions be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,57 @@ Status PartitionedAggSinkLocalState::_spill_hash_table(RuntimeState* state,
return Status::OK();
}

Status PartitionedAggSinkLocalState::_execute_spill_process(RuntimeState* state,
size_t size_to_revoke) {
Status status;
auto& parent = Base::_parent->template cast<Parent>();
auto query_id = state->query_id();

DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", {
status = Status::InternalError("fault_inject partitioned_agg_sink revoke_memory canceled");
state->get_query_ctx()->cancel(status);
return status;
});

Defer defer {[&]() {
if (!status.ok() || state->is_cancelled()) {
if (!status.ok()) {
LOG(WARNING) << fmt::format(
"Query:{}, agg sink:{}, task:{}, revoke_memory error:{}",
print_id(query_id), Base::_parent->node_id(), state->task_id(), status);
}
_shared_state->close();
} else {
LOG(INFO) << fmt::format(
"Query:{}, agg sink:{}, task:{}, revoke_memory finish, eos:{}, revocable "
"memory:{}",
print_id(state->query_id()), _parent->node_id(), state->task_id(), _eos,
PrettyPrinter::print_bytes(_parent->revocable_mem_size(state)));
}

if (_eos) {
Base::_dependency->set_ready_to_read();
}
state->get_query_ctx()->resource_ctx()->task_controller()->decrease_revoking_tasks_count();
}};

auto* runtime_state = _runtime_state.get();
auto* agg_data = parent._agg_sink_operator->get_agg_data(runtime_state);
status = std::visit(
vectorized::Overload {[&](std::monostate& arg) -> Status {
return Status::InternalError("Unit hash table");
},
[&](auto& agg_method) -> Status {
auto& hash_table = *agg_method.hash_table;
RETURN_IF_CATCH_EXCEPTION(return _spill_hash_table(
state, agg_method, hash_table, size_to_revoke, _eos));
}},
agg_data->method_variant);
RETURN_IF_ERROR(status);
status = parent._agg_sink_operator->reset_hash_table(runtime_state);
return status;
}

Status PartitionedAggSinkLocalState::revoke_memory(
RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
const auto size_to_revoke = _parent->revocable_mem_size(state);
Expand All @@ -423,70 +474,17 @@ Status PartitionedAggSinkLocalState::revoke_memory(
update_profile<true>(sink_local_state->custom_profile());
}

auto& parent = Base::_parent->template cast<Parent>();
auto query_id = state->query_id();

DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_submit_func", {
return Status::Error<INTERNAL_ERROR>(
"fault_inject partitioned_agg_sink revoke_memory submit_func failed");
});

state->get_query_ctx()->resource_ctx()->task_controller()->increase_revoking_tasks_count();

SpillSinkRunnable spill_runnable(
state, spill_context, operator_profile(),
[this, &parent, state, query_id, size_to_revoke] {
Status status;
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", {
status = Status::InternalError(
"fault_inject partitioned_agg_sink "
"revoke_memory canceled");
state->get_query_ctx()->cancel(status);
return status;
});
Defer defer {[&]() {
if (!status.ok() || state->is_cancelled()) {
if (!status.ok()) {
LOG(WARNING) << fmt::format(
"Query:{}, agg sink:{}, task:{}, revoke_memory error:{}",
print_id(query_id), Base::_parent->node_id(), state->task_id(),
status);
}
_shared_state->close();
} else {
LOG(INFO) << fmt::format(
"Query:{}, agg sink:{}, task:{}, revoke_memory finish, eos:{}, "
"revocable memory:{}",
print_id(state->query_id()), _parent->node_id(), state->task_id(),
_eos,
PrettyPrinter::print_bytes(_parent->revocable_mem_size(state)));
}

if (_eos) {
Base::_dependency->set_ready_to_read();
}
state->get_query_ctx()
->resource_ctx()
->task_controller()
->decrease_revoking_tasks_count();
}};
auto* runtime_state = _runtime_state.get();
auto* agg_data = parent._agg_sink_operator->get_agg_data(runtime_state);
status = std::visit(
vectorized::Overload {
[&](std::monostate& arg) -> Status {
return Status::InternalError("Unit hash table");
},
[&](auto& agg_method) -> Status {
auto& hash_table = *agg_method.hash_table;
RETURN_IF_CATCH_EXCEPTION(return _spill_hash_table(
state, agg_method, hash_table, size_to_revoke, _eos));
}},
agg_data->method_variant);
RETURN_IF_ERROR(status);
status = parent._agg_sink_operator->reset_hash_table(runtime_state);
return status;
});
SpillSinkRunnable spill_runnable(state, spill_context, operator_profile(),
[this, state, size_to_revoke] {
return _execute_spill_process(state, size_to_revoke);
});

return spill_runnable.run();
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class PartitionedAggSinkLocalState

Status revoke_memory(RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context);

Status _execute_spill_process(RuntimeState* state, size_t size_to_revoke);

Status setup_in_memory_agg_op(RuntimeState* state);

template <bool spilled>
Expand Down
Loading
Loading