Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/benchmark/benchmark_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
#include "benchmark_fastunion.hpp"
#include "benchmark_hll_merge.hpp"
#include "binary_cast_benchmark.hpp"
#include "core/block/block.h"
#include "vec/columns/column_string.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_string.h"

Expand Down
7 changes: 4 additions & 3 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1434,9 +1434,10 @@ DEFINE_mDouble(high_disk_avail_level_diff_usages, "0.15");
DEFINE_Int32(partition_disk_index_lru_size, "10000");
// limit the storage space that query spill files can use
DEFINE_String(spill_storage_root_path, "");
DEFINE_String(spill_storage_limit, "20%"); // 20%
DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s
DEFINE_mInt32(spill_gc_work_time_ms, "2000"); // 2s
DEFINE_String(spill_storage_limit, "20%"); // 20%
DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s
DEFINE_mInt32(spill_gc_work_time_ms, "2000"); // 2s
DEFINE_mInt64(spill_file_part_size_bytes, "1073741824"); // 1GB

// paused query in queue timeout(ms) will be resumed or canceled
DEFINE_Int64(spill_in_paused_queue_timeout_ms, "60000");
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1538,6 +1538,8 @@ DECLARE_String(spill_storage_root_path);
DECLARE_String(spill_storage_limit);
DECLARE_mInt32(spill_gc_interval_ms);
DECLARE_mInt32(spill_gc_work_time_ms);
// Maximum size of each spill part file before rotation (bytes). Default 1GB.
DECLARE_mInt64(spill_file_part_size_bytes);
DECLARE_Int64(spill_in_paused_queue_timeout_ms);
DECLARE_Int64(wait_cancel_release_memory_ms);

Expand Down
15 changes: 10 additions & 5 deletions be/src/exec/operator/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ Status AggSinkLocalState::_merge_with_serialized_key(Block* block) {
}

size_t AggSinkLocalState::_memory_usage() const {
if (0 == _get_hash_table_size()) {
if (0 == get_hash_table_size()) {
return 0;
}
size_t usage = 0;
Expand Down Expand Up @@ -380,7 +380,7 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(Block* block) {
}

if (!limit && _should_limit_output) {
const size_t hash_table_size = _get_hash_table_size();
const size_t hash_table_size = get_hash_table_size();
_shared_state->reach_limit =
hash_table_size >= Base::_parent->template cast<AggSinkOperatorX>()._limit;
if (_shared_state->do_sort_limit && _shared_state->reach_limit) {
Expand Down Expand Up @@ -499,7 +499,7 @@ Status AggSinkLocalState::_execute_with_serialized_key_helper(Block* block) {
RETURN_IF_ERROR(do_aggregate_evaluators());

if (_should_limit_output && !Base::_shared_state->enable_spill) {
const size_t hash_table_size = _get_hash_table_size();
const size_t hash_table_size = get_hash_table_size();

_shared_state->reach_limit =
hash_table_size >=
Expand All @@ -516,7 +516,7 @@ Status AggSinkLocalState::_execute_with_serialized_key_helper(Block* block) {
return Status::OK();
}

size_t AggSinkLocalState::_get_hash_table_size() const {
size_t AggSinkLocalState::get_hash_table_size() const {
return std::visit(Overload {[&](std::monostate& arg) -> size_t { return 0; },
[&](auto& agg_method) { return agg_method.hash_table->size(); }},
_agg_data->method_variant);
Expand Down Expand Up @@ -872,7 +872,7 @@ Status AggSinkOperatorX::sink(doris::RuntimeState* state, Block* in_block, bool
RETURN_IF_ERROR(local_state._executor->execute(&local_state, in_block));
local_state._executor->update_memusage(&local_state);
COUNTER_SET(local_state._hash_table_size_counter,
(int64_t)local_state._get_hash_table_size());
(int64_t)local_state.get_hash_table_size());
}
if (eos) {
local_state._dependency->set_ready_to_read();
Expand All @@ -899,6 +899,11 @@ size_t AggSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool eos) {
return local_state.get_reserve_mem_size(state, eos);
}

// Returns the hash table size reported by the sink local state looked up for `state`.
size_t AggSinkOperatorX::get_hash_table_size(RuntimeState* state) const {
    return get_local_state(state).get_hash_table_size();
}

Status AggSinkLocalState::close(RuntimeState* state, Status exec_status) {
SCOPED_TIMER(Base::exec_time_counter());
SCOPED_TIMER(Base::_close_timer);
Expand Down
7 changes: 5 additions & 2 deletions be/src/exec/operator/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class AggSinkLocalState : public PipelineXSinkLocalState<AggSharedState> {
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
bool is_blockable() const override;
size_t get_hash_table_size() const;

protected:
friend class AggSinkOperatorX;
Expand Down Expand Up @@ -81,6 +82,7 @@ class AggSinkLocalState : public PipelineXSinkLocalState<AggSharedState> {
Status _merge_with_serialized_key(Block* block);
void _update_memusage_with_serialized_key();
template <bool limit>

Status _execute_with_serialized_key_helper(Block* block);
void _find_in_hash_table(AggregateDataPtr* places, ColumnRawPtrs& key_columns,
uint32_t num_rows);
Expand All @@ -89,7 +91,6 @@ class AggSinkLocalState : public PipelineXSinkLocalState<AggSharedState> {
bool _emplace_into_hash_table_limit(AggregateDataPtr* places, Block* block,
const std::vector<int>& key_locs,
ColumnRawPtrs& key_columns, uint32_t num_rows);
size_t _get_hash_table_size() const;

template <bool limit, bool for_spill = false>
Status _merge_with_serialized_key_helper(Block* block);
Expand Down Expand Up @@ -119,7 +120,7 @@ class AggSinkLocalState : public PipelineXSinkLocalState<AggSharedState> {
PODArray<AggregateDataPtr> _places;
std::vector<char> _deserialize_buffer;

Block _preagg_block = Block();
Block _preagg_block;

AggregatedDataVariants* _agg_data = nullptr;

Expand Down Expand Up @@ -178,6 +179,8 @@ class AggSinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorX<AggSinkLoca

size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;

size_t get_hash_table_size(RuntimeState* state) const;

using DataSinkOperatorX<AggSinkLocalState>::node_id;
using DataSinkOperatorX<AggSinkLocalState>::operator_id;
using DataSinkOperatorX<AggSinkLocalState>::get_local_state;
Expand Down
6 changes: 6 additions & 0 deletions be/src/exec/operator/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,12 @@ Status AggSourceOperatorX::reset_hash_table(RuntimeState* state) {
return Status::OK();
}

// Fills `block` through the local state's serialized-key result path and forwards
// its Status; `eos` is passed through to that call unchanged.
Status AggSourceOperatorX::get_serialized_block(RuntimeState* state, Block* block, bool* eos) {
    // _needs_finalize is intentionally not consulted here: this entry point always
    // takes the serialized intermediate output path.
    return get_local_state(state)._get_results_with_serialized_key(state, block, eos);
}

void AggLocalState::_emplace_into_hash_table(AggregateDataPtr* places, ColumnRawPtrs& key_columns,
uint32_t num_rows) {
std::visit(
Expand Down
7 changes: 7 additions & 0 deletions be/src/exec/operator/aggregation_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ class AggSourceOperatorX : public OperatorX<AggLocalState> {

Status reset_hash_table(RuntimeState* state);

/// Get a block of serialized intermediate aggregate states from the hash table.
/// Unlike get_block() which may finalize, this always outputs the serialized
/// intermediate format (key columns + serialized agg state columns), which is
/// the same format as the spill block. This is needed for repartitioning during
/// multi-level spill recovery: the data must be re-mergeable after repartitioning.
Status get_serialized_block(RuntimeState* state, Block* block, bool* eos);

private:
friend class AggLocalState;

Expand Down
1 change: 0 additions & 1 deletion be/src/exec/operator/multi_cast_data_stream_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ std::shared_ptr<BasicSharedState> MultiCastDataStreamSinkOperatorX::create_share
Status MultiCastDataStreamSinkLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(Base::open(state));
_shared_state->multi_cast_data_streamer->set_sink_profile(operator_profile());
_shared_state->setup_shared_profile(custom_profile());
_shared_state->multi_cast_data_streamer->set_write_dependency(_dependency);
return Status::OK();
}
Expand Down
75 changes: 42 additions & 33 deletions be/src/exec/operator/multi_cast_data_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
#include "exec/operator/multi_cast_data_stream_source.h"
#include "exec/operator/spill_utils.h"
#include "exec/pipeline/dependency.h"
#include "exec/spill/spill_stream_manager.h"
#include "exec/spill/spill_file_manager.h"
#include "exec/spill/spill_file_reader.h"
#include "exec/spill/spill_file_writer.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "util/pretty_printer.h"
Expand Down Expand Up @@ -77,14 +79,14 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, Block* b

if (!_spill_readers[sender_idx].empty()) {
auto reader_item = _spill_readers[sender_idx].front();
if (!reader_item->stream->ready_for_reading()) {
if (!reader_item->spill_file->ready_for_reading()) {
return Status::OK();
}

auto& reader = reader_item->reader;
RETURN_IF_ERROR(reader->open());
if (reader_item->block_offset != 0) {
reader->seek(reader_item->block_offset);
RETURN_IF_ERROR(reader->seek(reader_item->block_offset));
reader_item->block_offset = 0;
}

Expand Down Expand Up @@ -117,9 +119,9 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, Block* b
};

l.unlock();
SpillRecoverRunnable spill_runnable(state, _source_operator_profiles[sender_idx],
catch_exception_func);
return spill_runnable.run();
// Spill recovery runs synchronously. The profile that used to be passed to
// SpillRecoverRunnable was only used for counters that are now tracked
// externally, so invoke the run_spill_task() helper directly.
}

auto& pos_to_pull = _sender_pos_to_read[sender_idx];
Expand Down Expand Up @@ -179,7 +181,7 @@ Status MultiCastDataStreamer::_trigger_spill_if_need(RuntimeState* state, bool*
return Status::OK();
}

SpillStreamSPtr spill_stream;
SpillFileSPtr spill_file;
*triggered = false;
if (_cumulative_mem_size.load() >= config::exchg_node_buffer_size_bytes &&
_multi_cast_blocks.size() >= 4) {
Expand All @@ -205,25 +207,32 @@ Status MultiCastDataStreamer::_trigger_spill_if_need(RuntimeState* state, bool*
}

if (has_reached_end) {
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
state, spill_stream, print_id(state->query_id()), "MultiCastSender", _node_id,
std::numeric_limits<int32_t>::max(), std::numeric_limits<size_t>::max(),
_sink_operator_profile));
auto relative_path = fmt::format("{}/{}-{}-{}-{}", print_id(state->query_id()),
"MultiCastSender", _node_id, state->task_id(),
ExecEnv::GetInstance()->spill_file_mgr()->next_id());
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
relative_path, spill_file));

// Block all senders while spilling.
for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
_block_reading(i);
}

// Write blocks to disk. _start_spill_task updates spill_file->_part_count.
RETURN_IF_ERROR(_start_spill_task(state, spill_file));
DCHECK_EQ(_multi_cast_blocks.size(), 0);

// Create readers AFTER writing so that _part_count is valid.
for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
if (distances[i] < total_count) {
auto reader = spill_stream->create_separate_reader();
reader->set_counters(_source_operator_profiles[i]);
auto reader = spill_file->create_reader(state, _source_operator_profiles[i]);
auto reader_item = std::make_shared<SpillingReader>(
std::move(reader), spill_stream, distances[i], false);
std::move(reader), spill_file, distances[i], false);
_spill_readers[i].emplace_back(std::move(reader_item));
}

_block_reading(i);
_set_ready_for_read(i);
}

RETURN_IF_ERROR(_start_spill_task(state, spill_stream));
DCHECK_EQ(_multi_cast_blocks.size(), 0);

for (auto& pos : _sender_pos_to_read) {
pos = _multi_cast_blocks.end();
}
Expand All @@ -235,7 +244,7 @@ Status MultiCastDataStreamer::_trigger_spill_if_need(RuntimeState* state, bool*
return Status::OK();
}

Status MultiCastDataStreamer::_start_spill_task(RuntimeState* state, SpillStreamSPtr spill_stream) {
Status MultiCastDataStreamer::_start_spill_task(RuntimeState* state, SpillFileSPtr spill_file) {
std::vector<Block> blocks;
for (auto& block : _multi_cast_blocks) {
DCHECK_GT(block._block->rows(), 0);
Expand All @@ -244,18 +253,20 @@ Status MultiCastDataStreamer::_start_spill_task(RuntimeState* state, SpillStream

_multi_cast_blocks.clear();

auto spill_func = [state, blocks = std::move(blocks),
spill_stream = std::move(spill_stream)]() mutable {
auto* sink_profile = _sink_operator_profile;
auto spill_func = [state, blocks = std::move(blocks), spill_file = std::move(spill_file),
sink_profile]() mutable {
const auto blocks_count = blocks.size();
while (!blocks.empty() && !state->is_cancelled()) {
auto block = std::move(blocks.front());
blocks.erase(blocks.begin());

RETURN_IF_ERROR(spill_stream->spill_block(state, block, false));
SpillFileWriterSPtr writer;
RETURN_IF_ERROR(spill_file->create_writer(state, sink_profile, writer));
for (auto& block : blocks) {
if (state->is_cancelled()) break;
RETURN_IF_ERROR(writer->write_block(state, block));
}
RETURN_IF_ERROR(writer->close());
VLOG_DEBUG << "Query: " << print_id(state->query_id()) << " multi cast write "
<< blocks_count << " blocks";
return spill_stream->spill_eof();
return Status::OK();
};

auto exception_catch_func = [spill_func = std::move(spill_func),
Expand All @@ -266,15 +277,13 @@ Status MultiCastDataStreamer::_start_spill_task(RuntimeState* state, SpillStream
if (!status.ok()) {
LOG(WARNING) << "Query: " << query_id
<< " multi cast write failed: " << status.to_string();
} else {
for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
_set_ready_for_read(i);
}
}
// _set_ready_for_read is called by _trigger_spill_if_need after readers
// are created with the correct part_count.
return status;
};

return SpillSinkRunnable(state, nullptr, _sink_operator_profile, exception_catch_func).run();
return run_spill_task(state, exception_catch_func);
}

Status MultiCastDataStreamer::push(RuntimeState* state, doris::Block* block, bool eos) {
Expand Down
9 changes: 5 additions & 4 deletions be/src/exec/operator/multi_cast_data_streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "core/block/block.h"
#include "exec/exchange/vdata_stream_sender.h"
#include "exec/pipeline/dependency.h"
#include "exec/spill/spill_stream.h"
#include "exec/spill/spill_file.h"
#include "runtime/runtime_profile.h"

namespace doris {
Expand All @@ -45,8 +45,9 @@ struct MultiCastBlock {
};

struct SpillingReader {
SpillReaderUPtr reader;
SpillStreamSPtr stream;
SpillFileReaderSPtr reader;
SpillFileSPtr spill_file;

int64_t block_offset {0};
bool all_data_read {false};
};
Expand Down Expand Up @@ -99,7 +100,7 @@ class MultiCastDataStreamer {
Status _copy_block(RuntimeState* state, int32_t sender_idx, Block* block,
MultiCastBlock& multi_cast_block);

Status _start_spill_task(RuntimeState* state, SpillStreamSPtr spill_stream);
Status _start_spill_task(RuntimeState* state, SpillFileSPtr spill_file);

Status _trigger_spill_if_need(RuntimeState* state, bool* triggered);

Expand Down
Loading
Loading