From b405a829ac63fd320813a452964b706b04d6a196 Mon Sep 17 00:00:00 2001 From: Hu Shenggang Date: Wed, 3 Jun 2026 18:34:22 +0800 Subject: [PATCH] [refactor](be) Remove redundant spill state ### What problem does this PR solve? Issue Number: None Related PR: None Problem Summary: Several spill helpers and profile counters no longer carry useful state after synchronous spill execution and current spill-file accounting. Remove the dead run_spill_task wrapper, unused writer-side cumulative counters, unused spill file current count and total size profile fields, and the stale SpillSortLocalState::_opened guard. Keep SpillFileReader::seek and SpillFile-owned byte accounting intact. Update test helpers to stop registering the removed current-count counter. ### Release note None ### Check List (For Author) - Test: Manual test - ./build-support/clang-format.sh - ./build-support/check-format.sh - git diff --check - DORIS_HOME=$PWD ninja -C be/ut_build_ASAN affected source/test object targets - Behavior changed: No - Does this need documentation: No --- be/src/exec/operator/multi_cast_data_streamer.cpp | 7 ++----- be/src/exec/operator/operator.h | 7 ------- .../partitioned_aggregation_sink_operator.cpp | 4 +--- .../spill_iceberg_table_sink_operator.cpp | 6 ++---- .../exec/operator/spill_sort_source_operator.cpp | 6 +----- be/src/exec/operator/spill_sort_source_operator.h | 4 +--- be/src/exec/operator/spill_utils.h | 15 --------------- be/src/exec/spill/spill_file_writer.cpp | 5 +---- be/src/exec/spill/spill_file_writer.h | 3 --- be/src/runtime/runtime_profile_counter_names.h | 1 - .../operator/spillable_operator_test_helper.cpp | 3 +-- .../pipeline/multi_cast_data_streamer_test.cpp | 4 ---- be/test/vec/spill/spill_file_test.cpp | 1 - be/test/vec/spill/spill_repartitioner_test.cpp | 1 - 14 files changed, 9 insertions(+), 58 deletions(-) diff --git a/be/src/exec/operator/multi_cast_data_streamer.cpp b/be/src/exec/operator/multi_cast_data_streamer.cpp index 403d81110180bc..84eaf058a348b7 100644 --- a/be/src/exec/operator/multi_cast_data_streamer.cpp +++ b/be/src/exec/operator/multi_cast_data_streamer.cpp @@ -30,7 +30,6 @@ #include "common/status.h" #include "core/block/block.h" #include "exec/operator/multi_cast_data_stream_source.h" -#include "exec/operator/spill_utils.h" #include "exec/pipeline/dependency.h" #include "exec/spill/spill_file_manager.h" #include "exec/spill/spill_file_reader.h" @@ -121,9 +120,7 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, Block* b }; l.unlock(); - // spill is synchronous; the profile passed to the runnable was only - // used for counters that are now tracked externally, so call helper - return run_spill_task(state, catch_exception_func); + return catch_exception_func(); } auto& pos_to_pull = _sender_pos_to_read[sender_idx]; @@ -279,7 +276,7 @@ Status MultiCastDataStreamer::_start_spill_task(RuntimeState* state, SpillFileSP return status; }; - return run_spill_task(state, exception_catch_func); + return exception_catch_func(); } Status MultiCastDataStreamer::push(RuntimeState* state, doris::Block* block, bool eos) { diff --git a/be/src/exec/operator/operator.h b/be/src/exec/operator/operator.h index 565a650a0daf74..ddd4939b31247a 100644 --- a/be/src/exec/operator/operator.h +++ b/be/src/exec/operator/operator.h @@ -417,8 +417,6 @@ class PipelineXSpillLocalState : public PipelineXLocalState { _spill_file_current_size = ADD_COUNTER_WITH_LEVEL( Base::custom_profile(), profile::SPILL_WRITE_FILE_CURRENT_BYTES, TUnit::BYTES, 1); - _spill_file_current_count = ADD_COUNTER_WITH_LEVEL( - Base::custom_profile(), profile::SPILL_WRITE_FILE_CURRENT_COUNT, TUnit::UNIT, 1); } // Total time of spill, including spill task scheduling time, @@ -441,9 +439,6 @@ class PipelineXSpillLocalState : public PipelineXLocalState { // Total bytes of spill data written to disk file(after serialized) RuntimeProfile::Counter* _spill_write_file_total_size = nullptr; RuntimeProfile::Counter* _spill_file_total_count = nullptr; - RuntimeProfile::Counter* _spill_file_current_count = nullptr; - // Spilled file total size - RuntimeProfile::Counter* _spill_file_total_size = nullptr; // Current spilled file size RuntimeProfile::Counter* _spill_file_current_size = nullptr; @@ -801,8 +796,6 @@ class PipelineXSpillSinkLocalState : public PipelineXSinkLocalStatedata_size(); } -Status SpillIcebergTableSinkLocalState::revoke_memory(RuntimeState* state) { +Status SpillIcebergTableSinkLocalState::revoke_memory(RuntimeState* /*state*/) { if (!_writer) { return Status::OK(); } @@ -95,7 +94,7 @@ Status SpillIcebergTableSinkLocalState::revoke_memory(RuntimeState* state) { return status; }; - return run_spill_task(state, exception_catch_func); + return exception_catch_func(); } SpillIcebergTableSinkOperatorX::SpillIcebergTableSinkOperatorX( @@ -169,7 +168,6 @@ void SpillIcebergTableSinkLocalState::_init_spill_counters() { ADD_COUNTER_WITH_LEVEL(profile, "SpillReadRows", TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(profile, "SpillReadFileCount", TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteFileCurrentBytes", TUnit::BYTES, 1); - ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteFileCurrentCount", TUnit::UNIT, 1); } } // namespace doris diff --git a/be/src/exec/operator/spill_sort_source_operator.cpp b/be/src/exec/operator/spill_sort_source_operator.cpp index a745bf2858d45b..b5d4ae3aec7a7b 100644 --- a/be/src/exec/operator/spill_sort_source_operator.cpp +++ b/be/src/exec/operator/spill_sort_source_operator.cpp @@ -50,10 +50,6 @@ Status SpillSortLocalState::init(RuntimeState* state, LocalStateInfo& info) { Status SpillSortLocalState::open(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); - if (_opened) { - return Status::OK(); - } - RETURN_IF_ERROR(setup_in_memory_sort_op(state)); return Base::open(state); } @@ -262,4 +258,4 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState* state, Block* block, bo return Status::OK(); } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/exec/operator/spill_sort_source_operator.h b/be/src/exec/operator/spill_sort_source_operator.h index 196ab6474b2006..679d21a09bc039 100644 --- a/be/src/exec/operator/spill_sort_source_operator.h +++ b/be/src/exec/operator/spill_sort_source_operator.h @@ -57,8 +57,6 @@ class SpillSortLocalState final : public PipelineXSpillLocalState _runtime_state; - bool _opened = false; - std::vector _current_merging_files; /// Readers held alive during merge; one per SpillFile, reads parts sequentially. std::vector _current_merging_readers; @@ -90,4 +88,4 @@ class SpillSortSourceOperatorX : public OperatorX { std::unique_ptr _sort_source_operator; }; -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/exec/operator/spill_utils.h b/be/src/exec/operator/spill_utils.h index 131c1c9b8b78e2..dff39d5212feda 100644 --- a/be/src/exec/operator/spill_utils.h +++ b/be/src/exec/operator/spill_utils.h @@ -70,21 +70,6 @@ struct SpillContext { } }; -// helper to execute a spill function synchronously. The old code used -// SpillRunnable/SpillSinkRunnable/SpillRecoverRunnable wrappers to track -// counters and optionally notify a SpillContext. Since spill operations are -// now performed synchronously and external code already maintains any -// necessary counters, those wrappers are no longer necessary. We keep a -// small utility to run the provided callbacks and forward cancellation. -inline Status run_spill_task(RuntimeState* state, std::function exec_func, - std::function fin_cb = {}) { - RETURN_IF_ERROR(exec_func()); - if (fin_cb) { - RETURN_IF_ERROR(fin_cb()); - } - return Status::OK(); -} - template inline void update_profile_from_inner_profile(const std::string& name, RuntimeProfile* runtime_profile, diff --git a/be/src/exec/spill/spill_file_writer.cpp b/be/src/exec/spill/spill_file_writer.cpp index 60ddb68c26b6e0..d68d52c96401d6 100644 --- a/be/src/exec/spill/spill_file_writer.cpp +++ b/be/src/exec/spill/spill_file_writer.cpp @@ -97,7 +97,6 @@ Status SpillFileWriter::_close_current_part(const std::shared_ptr& sp int64_t meta_size = _part_meta.size(); _part_written_bytes += meta_size; - _total_written_bytes += meta_size; COUNTER_UPDATE(_write_file_total_size, meta_size); if (_resource_ctx) { _resource_ctx->io_context()->update_spill_write_bytes_to_local_storage(meta_size); @@ -118,7 +117,6 @@ Status SpillFileWriter::_close_current_part(const std::shared_ptr& sp // Advance to next part ++_current_part_index; - ++_total_parts; if (spill_file) { spill_file->increment_part_count(); } @@ -251,7 +249,6 @@ Status SpillFileWriter::_write_internal(const Block& block, } COUNTER_UPDATE(_write_block_counter, 1); _part_written_bytes += buff_size; - _total_written_bytes += buff_size; ++_part_written_blocks; // Incrementally update SpillFile so gc() can always // decrement the correct amount from _data_dir. @@ -269,4 +266,4 @@ Status SpillFileWriter::_write_internal(const Block& block, return status; } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/exec/spill/spill_file_writer.h b/be/src/exec/spill/spill_file_writer.h index 215685933f4b49..eb8c7a9d9eb0a8 100644 --- a/be/src/exec/spill/spill_file_writer.h +++ b/be/src/exec/spill/spill_file_writer.h @@ -89,9 +89,6 @@ class SpillFileWriter { size_t _part_max_sub_block_size = 0; std::string _part_meta; - // ── Cumulative state ── - int64_t _total_written_bytes = 0; - size_t _total_parts = 0; bool _closed = false; // ── Counters ── diff --git a/be/src/runtime/runtime_profile_counter_names.h b/be/src/runtime/runtime_profile_counter_names.h index bd4d2e878a86f7..5813d5e0ea95f9 100644 --- a/be/src/runtime/runtime_profile_counter_names.h +++ b/be/src/runtime/runtime_profile_counter_names.h @@ -104,7 +104,6 @@ inline constexpr char SPILL_WRITE_ROWS[] = "SpillWriteRows"; inline constexpr char SPILL_WRITE_FILE_BYTES[] = "SpillWriteFileBytes"; inline constexpr char SPILL_WRITE_FILE_TOTAL_COUNT[] = "SpillWriteFileTotalCount"; inline constexpr char SPILL_WRITE_FILE_CURRENT_BYTES[] = "SpillWriteFileCurrentBytes"; -inline constexpr char SPILL_WRITE_FILE_CURRENT_COUNT[] = "SpillWriteFileCurrentCount"; // ============================================================ // Spill read counters (Source-only) diff --git a/be/test/exec/operator/spillable_operator_test_helper.cpp b/be/test/exec/operator/spillable_operator_test_helper.cpp index fb87b156545db0..d51d6d17be16c5 100644 --- a/be/test/exec/operator/spillable_operator_test_helper.cpp +++ b/be/test/exec/operator/spillable_operator_test_helper.cpp @@ -60,7 +60,6 @@ void SpillableOperatorTestHelper::SetUp() { ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadRows", TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadFileCount", TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileTotalCount", TUnit::UNIT, 1); - ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileCurrentCount", TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileCurrentBytes", TUnit::UNIT, 1); operator_profile->add_child(custom_profile.get(), true); @@ -107,4 +106,4 @@ void SpillableOperatorTestHelper::TearDown() { SAFE_DELETE(ExecEnv::GetInstance()->_spill_file_mgr); } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp b/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp index 61589327909d63..e9f1a58aeccf78 100644 --- a/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp +++ b/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp @@ -65,8 +65,6 @@ class MultiCastDataStreamerTest : public testing::Test { ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillReadFileCount", TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileTotalCount", TUnit::UNIT, 1); - ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileCurrentCount", TUnit::UNIT, - 1); ADD_COUNTER_WITH_LEVEL(custom_profile.get(), "SpillWriteFileCurrentBytes", TUnit::UNIT, 1); } @@ -111,8 +109,6 @@ class MultiCastDataStreamerTest : public testing::Test { TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(source_custom_profiles[i].get(), "SpillWriteFileCurrentBytes", TUnit::BYTES, 1); - ADD_COUNTER_WITH_LEVEL(source_custom_profiles[i].get(), "SpillWriteFileCurrentCount", - TUnit::UNIT, 1); multi_cast_data_streamer->set_source_profile(i, source_profiles[i].get()); } diff --git a/be/test/vec/spill/spill_file_test.cpp b/be/test/vec/spill/spill_file_test.cpp index 67740acdcd2e97..09173a7a343c18 100644 --- a/be/test/vec/spill/spill_file_test.cpp +++ b/be/test/vec/spill/spill_file_test.cpp @@ -72,7 +72,6 @@ class SpillFileTest : public testing::Test { ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadRows", TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadFileCount", TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileTotalCount", TUnit::UNIT, 1); - ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileCurrentCount", TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileCurrentBytes", TUnit::UNIT, 1); _profile->add_child(_custom_profile.get(), true); diff --git a/be/test/vec/spill/spill_repartitioner_test.cpp b/be/test/vec/spill/spill_repartitioner_test.cpp index 53ffcf9f0ac00c..01da4719cad0d0 100644 --- a/be/test/vec/spill/spill_repartitioner_test.cpp +++ b/be/test/vec/spill/spill_repartitioner_test.cpp @@ -74,7 +74,6 @@ class SpillRepartitionerTest : public testing::Test { ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadRows", TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadFileCount", TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileTotalCount", TUnit::UNIT, 1); - ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileCurrentCount", TUnit::UNIT, 1); ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileCurrentBytes", TUnit::UNIT, 1); _profile->add_child(_custom_profile.get(), true);