From 50ce4794755cd105ad287fe78e4f7414419ee97f Mon Sep 17 00:00:00 2001 From: Pxl Date: Fri, 22 May 2026 15:56:07 +0800 Subject: [PATCH] [Chore](be) Stop spill hash join repartition on cancel (#63456) Problem Summary: Partitioned hash join spill recovery could continue normal repartition progress after cancellation because some loops stopped on `state->is_cancelled()` but then fell through to completion handling. This could mark partially recovered or repartitioned spill data as complete. This PR returns the cancellation status before advancing partition state, clears recovered build data during close, and replaces a debug-only child EOS assertion with a runtime error. --- .../partitioned_hash_join_probe_operator.cpp | 19 +++++++-- ...titioned_hash_join_probe_operator_test.cpp | 39 +++++++++++++++++++ 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp index 1bbc1972becfb2..25161399c3a603 100644 --- a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp +++ b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp @@ -36,6 +36,7 @@ #include "exec/spill/spill_repartitioner.h" #include "runtime/fragment_mgr.h" #include "runtime/runtime_profile.h" +#include "runtime/runtime_state.h" namespace doris { @@ -217,6 +218,7 @@ Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) { } _current_probe_reader.reset(); } + _recovered_build_block.reset(); // Clean up any remaining spill partition queue entries for (auto& entry : _spill_partition_queue) { @@ -349,7 +351,7 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition( RETURN_IF_ERROR(_current_build_reader->open()); } bool eos = false; - while (!eos) { + while (!eos && !state->is_cancelled()) { Block block; RETURN_IF_ERROR(_current_build_reader->read(&block, &eos)); COUNTER_UPDATE(_recovery_build_rows, block.rows()); @@ -373,6 +375,7 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition( return Status::OK(); // yield — buffer full, more data may remain } } + RETURN_IF_CANCELLED(state); // Build file fully consumed. RETURN_IF_ERROR(_current_build_reader->close()); _current_build_reader.reset(); @@ -409,6 +412,7 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_partition( return Status::OK(); // yield — enough data read } } + RETURN_IF_CANCELLED(state); // Probe file fully consumed. RETURN_IF_ERROR(_current_probe_reader->close()); _current_probe_reader.reset(); @@ -416,6 +420,7 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_partition( return Status::OK(); } +// NOLINTNEXTLINE(readability-function-size,readability-function-cognitive-complexity): existing spill repartition state machine handles build/probe phases together. Status PartitionedHashJoinProbeLocalState::repartition_current_partition( RuntimeState* state, JoinSpillPartitionInfo& partition) { auto& p = _parent->cast(); @@ -474,6 +479,7 @@ Status PartitionedHashJoinProbeLocalState::repartition_current_partition( } } RETURN_IF_ERROR(_repartitioner.finalize()); + RETURN_IF_CANCELLED(state); _recovered_build_block.reset(); _current_build_reader.reset(); // clear any leftover reader state partition.build_file.reset(); @@ -497,9 +503,9 @@ Status PartitionedHashJoinProbeLocalState::repartition_current_partition( while (!done && !state->is_cancelled()) { RETURN_IF_ERROR(_repartitioner.repartition(state, partition.probe_file, &done)); } - partition.probe_file.reset(); - RETURN_IF_ERROR(_repartitioner.finalize()); + RETURN_IF_CANCELLED(state); + partition.probe_file.reset(); _current_probe_reader.reset(); } @@ -698,7 +704,11 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, Block // per-partition build and probe spill streams. After this point every partition // (including the original "level-0" ones) is accessed uniformly via the queue. if (!local_state._spill_queue_initialized) { - DCHECK(local_state._child_eos) << "pull() with is_spilled=true called before child EOS"; + if (UNLIKELY(!local_state._child_eos)) { + return Status::InternalError( + "query:{}, node:{}, pull() with is_spilled=true called before child EOS", + print_id(state->query_id()), node_id()); + } // There maybe some blocks still in partitioned block or probe blocks. Flush them to disk. RETURN_IF_ERROR(local_state.spill_probe_blocks(state, true)); // Close all probe writers so that SpillFile metadata (part_count, etc.) @@ -731,6 +741,7 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, Block return _pull_from_spill_queue(local_state, state, output_block, eos); } +// NOLINTNEXTLINE(readability-function-size,readability-function-cognitive-complexity): existing spill queue pull handles setup, recovery, and probing phases. Status PartitionedHashJoinProbeOperatorX::_pull_from_spill_queue( PartitionedHashJoinProbeLocalState& local_state, RuntimeState* state, Block* output_block, bool* eos) const { diff --git a/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp b/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp index 0bcb609438ea96..2ce5f1921b0768 100644 --- a/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp +++ b/be/test/exec/operator/partitioned_hash_join_probe_operator_test.cpp @@ -1139,6 +1139,45 @@ TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskEmpty) { ASSERT_TRUE(local_state->_recovered_build_block == nullptr); } +TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskCancelledBeforeEmptyEos) { + auto [probe_operator, sink_operator] = _helper.create_operators(); + + std::shared_ptr shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + SpillFileSPtr spill_file; + auto relative_path = fmt::format( + "{}/hash_build-{}-{}", print_id(_helper.runtime_state->query_id()), + probe_operator->node_id(), ExecEnv::GetInstance()->spill_file_mgr()->next_id()); + ASSERT_TRUE(ExecEnv::GetInstance() + ->spill_file_mgr() + ->create_spill_file(relative_path, spill_file) + .ok()); + + { + SpillFileWriterSPtr writer; + ASSERT_TRUE(spill_file + ->create_writer(_helper.runtime_state.get(), + local_state->operator_profile(), writer) + .ok()); + ASSERT_TRUE(writer->close().ok()); + } + + _helper.runtime_state->cancel(Status::Cancelled("test cancel")); + + JoinSpillPartitionInfo partition_info(spill_file, nullptr, 0); + auto status = local_state->recover_build_blocks_from_partition(_helper.runtime_state.get(), + partition_info); + ASSERT_TRUE(status.is()) << status.to_string(); + ASSERT_NE(partition_info.build_file, nullptr); + ASSERT_TRUE(local_state->_recovered_build_block == nullptr); + + ASSERT_TRUE(local_state->close(_helper.runtime_state.get()).ok()); + ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(partition_info.build_file); + partition_info.build_file.reset(); +} + TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskLargeData) { // Similar setup as above... auto [probe_operator, sink_operator] = _helper.create_operators();