diff --git a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp index 8d2a2041ea9922..67040e2762dbc1 100644 --- a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp +++ b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp @@ -53,43 +53,42 @@ bool SpillIcebergTableSinkLocalState::is_blockable() const { } size_t SpillIcebergTableSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool eos) { - if (!_writer || !_writer->current_writer()) { + if (!_writer) { return 0; } - - auto* sort_writer = dynamic_cast(_writer->current_writer().get()); - if (!sort_writer || !sort_writer->sorter()) { + auto current_writer = _writer->current_writer(); + auto* sort_writer = dynamic_cast(current_writer.get()); + if (!sort_writer) { return 0; } - return sort_writer->sorter()->get_reserve_mem_size(state, eos); + return sort_writer->get_reserve_mem_size(state, eos); } size_t SpillIcebergTableSinkLocalState::get_revocable_mem_size(RuntimeState* state) const { - if (!_writer || !_writer->current_writer()) { + if (!_writer) { return 0; } - - auto* sort_writer = dynamic_cast(_writer->current_writer().get()); - if (!sort_writer || !sort_writer->sorter()) { + auto current_writer = _writer->current_writer(); + auto* sort_writer = dynamic_cast(current_writer.get()); + if (!sort_writer) { return 0; } - return sort_writer->sorter()->data_size(); + return sort_writer->data_size(); } Status SpillIcebergTableSinkLocalState::revoke_memory(RuntimeState* state) { - if (!_writer || !_writer->current_writer()) { + if (!_writer) { return Status::OK(); } - - auto* sort_writer = dynamic_cast(_writer->current_writer().get()); - - if (!sort_writer || !sort_writer->sorter()) { + auto current_writer = _writer->current_writer(); + auto* sort_writer = dynamic_cast(current_writer.get()); + if (!sort_writer) { return Status::OK(); } - auto exception_catch_func = [sort_writer]() { + auto exception_catch_func = [current_writer, sort_writer]() { auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return sort_writer->trigger_spill(); }); }(); @@ -173,4 +172,4 @@ void SpillIcebergTableSinkLocalState::_init_spill_counters() { ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteFileCurrentCount", TUnit::UNIT, 1); } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp index 3d36e1f10eb888..3827cb6d92591e 100644 --- a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp +++ b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp @@ -55,6 +55,8 @@ Status VIcebergSortWriter::open(RuntimeState* state, RuntimeProfile* profile, } Status VIcebergSortWriter::write(Block& block) { + std::lock_guard lock(_sorter_mutex); + // Append incoming block data to the sorter's internal buffer RETURN_IF_ERROR(_sorter->append_block(&block)); _update_spill_block_batch_row_count(block); @@ -72,7 +74,34 @@ Status VIcebergSortWriter::write(Block& block) { return Status::OK(); } +size_t VIcebergSortWriter::data_size() const { + std::lock_guard lock(_sorter_mutex); + return _sorter == nullptr ? 0 : _sorter->data_size(); +} + +size_t VIcebergSortWriter::get_reserve_mem_size(RuntimeState* state, bool eos) const { + std::lock_guard lock(_sorter_mutex); + return _sorter == nullptr ? 0 : _sorter->get_reserve_mem_size(state, eos); +} + +Status VIcebergSortWriter::trigger_spill() { + std::lock_guard lock(_sorter_mutex); + if (_closed || _sorter == nullptr) { + return Status::OK(); + } + return _do_spill(); +} + Status VIcebergSortWriter::close(const Status& status) { + std::lock_guard lock(_sorter_mutex); + if (_closed) { + return Status::OK(); + } + Defer mark_closed {[&]() { _closed = true; }}; + return _close_locked(status); +} + +Status VIcebergSortWriter::_close_locked(const Status& status) { // Track the actual internal status of operations performed during close. // This is important because if intermediate operations (like do_sort()) fail, // we need to propagate the actual error status to the underlying partition writer's diff --git a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h index 45858f473c9041..e1e512f0a0cf79 100644 --- a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h +++ b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h @@ -18,7 +18,7 @@ #pragma once #include -#include +#include #include #include @@ -101,12 +101,12 @@ class VIcebergSortWriter : public IPartitionWriterBase { inline size_t written_len() const override { return _iceberg_partition_writer->written_len(); } - // Returns a raw pointer to the FullSorter, used by SpillIcebergTableSinkLocalState - // to query memory usage (data_size, get_reserve_mem_size) - auto sorter() const { return _sorter.get(); } + size_t data_size() const; + + size_t get_reserve_mem_size(RuntimeState* state, bool eos) const; // Called by the memory management system to trigger spilling data to disk - Status trigger_spill() { return _do_spill(); } + Status trigger_spill(); private: // Calculate average row size from the first non-empty block to determine @@ -129,6 +129,8 @@ class VIcebergSortWriter : public IPartitionWriterBase { // Explicitly calls do_sort() before prepare_for_read() to guarantee sorted output. Status _do_spill(); + Status _close_locked(const Status& status); + // Merge all spilled streams and output final sorted data to Parquet/ORC files. // Handles file splitting when output exceeds target file size. Status _combine_files_output(); @@ -168,6 +170,17 @@ class VIcebergSortWriter : public IPartitionWriterBase { std::unique_ptr _sorter; std::unique_ptr _merger; + // Serialize all accesses to _sorter because async writes and revoke spills run on + // different thread pools but touch the same FullSorter instance. + mutable std::mutex _sorter_mutex; + + // Set to true once close() has finished tearing down the sorter / underlying writer. + // Late-arriving revoke spills (which run on a different thread than the async writer) + // must become no-ops after close, otherwise they would write to a fresh spill stream + // whose data never gets merged out (close has already produced the final output and + // cleaned up spill files). + bool _closed = false; + // Queue of spill files waiting to be merged (FIFO order) std::deque _sorted_spill_files; // Files currently being consumed by the merger @@ -185,4 +198,4 @@ class VIcebergSortWriter : public IPartitionWriterBase { RuntimeProfile::Counter* _do_spill_count_counter = nullptr; }; -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/exec/sink/writer/iceberg/viceberg_table_writer.cpp b/be/src/exec/sink/writer/iceberg/viceberg_table_writer.cpp index 4e979bbb04202b..eb2040bb83c1c0 100644 --- a/be/src/exec/sink/writer/iceberg/viceberg_table_writer.cpp +++ b/be/src/exec/sink/writer/iceberg/viceberg_table_writer.cpp @@ -283,7 +283,7 @@ Status VIcebergTableWriter::_write_prepared_block(Block& output_block) { SCOPED_RAW_TIMER(&_partition_writers_write_ns); output_block.erase(_non_write_columns_indices); RETURN_IF_ERROR(writer->write(output_block)); - _current_writer = writer; + _current_writer.store(writer); return Status::OK(); } @@ -326,7 +326,7 @@ Status VIcebergTableWriter::_write_prepared_block(Block& output_block) { SCOPED_RAW_TIMER(&_partition_writers_write_ns); output_block.erase(_non_write_columns_indices); RETURN_IF_ERROR(writer->write(output_block)); - _current_writer = writer; + _current_writer.store(writer); return Status::OK(); } @@ -429,7 +429,7 @@ Status VIcebergTableWriter::_write_prepared_block(Block& output_block) { Block filtered_block; RETURN_IF_ERROR(_filter_block(output_block, &it->second, &filtered_block)); RETURN_IF_ERROR(it->first->write(filtered_block)); - _current_writer = it->first; + _current_writer.store(it->first); } return Status::OK(); } diff --git a/be/src/exec/sink/writer/iceberg/viceberg_table_writer.h b/be/src/exec/sink/writer/iceberg/viceberg_table_writer.h index f94ce4feb6bb52..cc7cec1fdad880 100644 --- a/be/src/exec/sink/writer/iceberg/viceberg_table_writer.h +++ b/be/src/exec/sink/writer/iceberg/viceberg_table_writer.h @@ -19,6 +19,7 @@ #include +#include "common/atomic_shared_ptr.h" #include "core/block/block.h" #include "core/column/column.h" #include "exec/sink/writer/async_result_writer.h" @@ -66,12 +67,17 @@ class VIcebergTableWriter final : public AsyncResultWriter { // Getter for the current partition writer. // Used by SpillIcebergTableSinkLocalState to access the current writer for // memory management operations (get_reserve_mem_size, revocable_mem_size, etc.). - const std::shared_ptr& current_writer() const { return _current_writer; } + // Returns a snapshot by value: the async writer thread updates _current_writer + // concurrently with the spill/revoke path, so callers must hold their own copy + // while operating on it instead of dereferencing the underlying member directly. + std::shared_ptr current_writer() const { return _current_writer.load(); } private: // The currently active partition writer (may be VIcebergPartitionWriter or VIcebergSortWriter). // Updated during write() to track which writer received the most recent data. - std::shared_ptr _current_writer; + // Wrapped in atomic_shared_ptr because revoke_memory / get_revocable_mem_size run on + // a different thread than the async writer that assigns to it. + doris::atomic_shared_ptr _current_writer; class IcebergPartitionColumn { public: IcebergPartitionColumn(const iceberg::PartitionField& field,