Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/paimon/core/io/key_value_in_memory_record_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "paimon/common/data/columnar/columnar_row_ref.h"
#include "paimon/common/types/row_kind.h"
#include "paimon/common/utils/arrow/arrow_utils.h"
#include "paimon/common/utils/arrow/mem_utils.h"
#include "paimon/common/utils/arrow/status_utils.h"
#include "paimon/common/utils/fields_comparator.h"
#include "paimon/status.h"
Expand Down Expand Up @@ -62,6 +63,7 @@ KeyValueInMemoryRecordReader::KeyValueInMemoryRecordReader(
user_defined_sequence_fields_(user_defined_sequence_fields),
sequence_fields_ascending_(sequence_fields_ascending),
pool_(pool),
arrow_pool_(GetArrowPool(pool)),
value_struct_array_(struct_array),
row_kinds_(row_kinds),
key_comparator_(key_comparator) {
Expand Down Expand Up @@ -119,9 +121,10 @@ KeyValueInMemoryRecordReader::SortBatch() const {
}
auto sort_options =
arrow::compute::SortOptions(sort_keys, arrow::compute::NullPlacement::AtStart);
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
std::shared_ptr<arrow::Array> sorted_indices,
arrow::compute::SortIndices(arrow::Datum(value_struct_array_), sort_options));
arrow::compute::ExecContext exec_context(arrow_pool_.get());
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> sorted_indices,
arrow::compute::SortIndices(arrow::Datum(value_struct_array_),
sort_options, &exec_context));
auto typed_indices =
arrow::internal::checked_pointer_cast<arrow::NumericArray<arrow::UInt64Type>>(
sorted_indices);
Expand Down
1 change: 1 addition & 0 deletions src/paimon/core/io/key_value_in_memory_record_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class KeyValueInMemoryRecordReader : public KeyValueRecordReader {
std::vector<std::string> user_defined_sequence_fields_;
bool sequence_fields_ascending_ = true;
std::shared_ptr<MemoryPool> pool_;
std::unique_ptr<arrow::MemoryPool> arrow_pool_;
Comment thread
zjw1111 marked this conversation as resolved.
std::shared_ptr<arrow::StructArray> value_struct_array_;
std::vector<RecordBatch::RowKind> row_kinds_;
std::shared_ptr<FieldsComparator> key_comparator_;
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/core/mergetree/external_sort_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ Result<int64_t> ExternalSortBuffer::SpillToDisk(
std::unique_ptr<SpillWriter> spill_writer,
SpillWriter::Create(options_.GetFileSystem(), write_schema_, spill_channel_enumerator_,
spill_channel_manager_, spill_compress_options.compress,
spill_compress_options.zstd_level));
spill_compress_options.zstd_level, pool_));
auto cleanup_guard = ScopeGuard([&]() {
[[maybe_unused]] auto status =
spill_channel_manager_->DeleteChannel(spill_writer->GetChannelId());
Expand Down
5 changes: 4 additions & 1 deletion src/paimon/core/mergetree/spill_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,11 @@ Status SpillReader::Open(const FileIOChannel::ID& channel_id) {
uint64_t file_len = file_status->GetLen();
arrow_input_stream_adapter_ =
std::make_shared<ArrowInputStreamAdapter>(in_stream_, arrow_pool_, file_len);
auto ipc_read_options = arrow::ipc::IpcReadOptions::Defaults();
ipc_read_options.memory_pool = arrow_pool_.get();
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
arrow_reader_, arrow::ipc::RecordBatchFileReader::Open(arrow_input_stream_adapter_));
arrow_reader_,
arrow::ipc::RecordBatchFileReader::Open(arrow_input_stream_adapter_, ipc_read_options));
num_record_batches_ = arrow_reader_->num_record_batches();
current_batch_index_ = 0;
return Status::OK();
Expand Down
14 changes: 10 additions & 4 deletions src/paimon/core/mergetree/spill_reader_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ namespace paimon::test {
class SpillReaderWriterTest : public ::testing::TestWithParam<std::string> {
public:
void SetUp() override {
pool_ = GetDefaultPool();
read_pool_ = GetDefaultPool();
write_pool_ = GetDefaultPool();
test_dir_ = UniqueTestDirectory::Create();
file_system_ = test_dir_->GetFileSystem();

Expand Down Expand Up @@ -64,7 +65,8 @@ class SpillReaderWriterTest : public ::testing::TestWithParam<std::string> {

Result<std::unique_ptr<SpillWriter>> CreateSpillWriter() const {
return SpillWriter::Create(file_system_, write_schema_, channel_enumerator_,
spill_channel_manager_, GetParam(), /*compression_level=*/1);
spill_channel_manager_, GetParam(), /*compression_level=*/1,
write_pool_);
}

FileIOChannel::ID WriteSpillFile(
Expand All @@ -79,11 +81,13 @@ class SpillReaderWriterTest : public ::testing::TestWithParam<std::string> {

Result<std::unique_ptr<SpillReader>> CreateSpillReader(
const FileIOChannel::ID& channel_id) const {
return SpillReader::Create(file_system_, key_schema_, value_schema_, pool_, channel_id);
return SpillReader::Create(file_system_, key_schema_, value_schema_, read_pool_,
channel_id);
}

protected:
std::shared_ptr<MemoryPool> pool_;
std::shared_ptr<MemoryPool> read_pool_;
std::shared_ptr<MemoryPool> write_pool_;
std::shared_ptr<FileSystem> file_system_;
std::unique_ptr<UniqueTestDirectory> test_dir_;
std::unique_ptr<IOManager> io_manager_;
Expand Down Expand Up @@ -116,6 +120,7 @@ TEST_P(SpillReaderWriterTest, TestWriteBatch) {
ASSERT_GT(file_size, 0);
ASSERT_OK(writer->Close());
channel_id_1 = writer->GetChannelId();
ASSERT_GT(write_pool_->MaxMemoryUsage(), 0);
}
// Second writer
{
Expand Down Expand Up @@ -166,6 +171,7 @@ TEST_P(SpillReaderWriterTest, TestWriteBatch) {
ASSERT_EQ(batch_count, 1);
ASSERT_EQ(total_rows, 2);
reader->Close();
ASSERT_GT(read_pool_->MaxMemoryUsage(), 0);
}
// Read back second writer's data
{
Expand Down
16 changes: 11 additions & 5 deletions src/paimon/core/mergetree/spill_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "paimon/common/utils/arrow/arrow_output_stream_adapter.h"
#include "paimon/common/utils/arrow/arrow_utils.h"
#include "paimon/common/utils/arrow/mem_utils.h"
#include "paimon/common/utils/arrow/status_utils.h"
#include "paimon/common/utils/scope_guard.h"
#include "paimon/core/mergetree/spill_channel_manager.h"
Expand All @@ -28,28 +29,33 @@ SpillWriter::SpillWriter(const std::shared_ptr<FileSystem>& fs,
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<FileIOChannel::Enumerator>& channel_enumerator,
const std::shared_ptr<SpillChannelManager>& spill_channel_manager,
const std::string& compression, int32_t compression_level)
const std::string& compression, int32_t compression_level,
const std::shared_ptr<MemoryPool>& pool)
: fs_(fs),
schema_(schema),
channel_enumerator_(channel_enumerator),
spill_channel_manager_(spill_channel_manager),
compression_(compression),
compression_level_(compression_level) {}
compression_level_(compression_level),
arrow_pool_(GetArrowPool(pool)) {}

Result<std::unique_ptr<SpillWriter>> SpillWriter::Create(
const std::shared_ptr<FileSystem>& fs, const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<FileIOChannel::Enumerator>& channel_enumerator,
const std::shared_ptr<SpillChannelManager>& spill_channel_manager,
const std::string& compression, int32_t compression_level) {
std::unique_ptr<SpillWriter> writer(new SpillWriter(
fs, schema, channel_enumerator, spill_channel_manager, compression, compression_level));
const std::string& compression, int32_t compression_level,
const std::shared_ptr<MemoryPool>& pool) {
std::unique_ptr<SpillWriter> writer(new SpillWriter(fs, schema, channel_enumerator,
Comment thread
zjw1111 marked this conversation as resolved.
spill_channel_manager, compression,
compression_level, pool));
PAIMON_RETURN_NOT_OK(writer->Open());
return writer;
}

Status SpillWriter::Open() {
channel_id_ = channel_enumerator_->Next();
auto ipc_write_options = arrow::ipc::IpcWriteOptions::Defaults();
ipc_write_options.memory_pool = arrow_pool_.get();
Comment thread
zjw1111 marked this conversation as resolved.
auto cleanup_guard = ScopeGuard([&]() {
arrow_writer_.reset();
arrow_output_stream_adapter_.reset();
Expand Down
8 changes: 6 additions & 2 deletions src/paimon/core/mergetree/spill_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class Schema;
namespace paimon {

class ArrowOutputStreamAdapter;
class MemoryPool;
class SpillChannelManager;

class SpillWriter {
Expand All @@ -41,7 +42,8 @@ class SpillWriter {
const std::shared_ptr<FileSystem>& fs, const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<FileIOChannel::Enumerator>& channel_enumerator,
const std::shared_ptr<SpillChannelManager>& spill_channel_manager,
const std::string& compression, int32_t compression_level);
const std::string& compression, int32_t compression_level,
const std::shared_ptr<MemoryPool>& pool);

SpillWriter(const SpillWriter&) = delete;
SpillWriter& operator=(const SpillWriter&) = delete;
Expand All @@ -55,7 +57,8 @@ class SpillWriter {
SpillWriter(const std::shared_ptr<FileSystem>& fs, const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<FileIOChannel::Enumerator>& channel_enumerator,
const std::shared_ptr<SpillChannelManager>& spill_channel_manager,
const std::string& compression, int32_t compression_level);
const std::string& compression, int32_t compression_level,
const std::shared_ptr<MemoryPool>& pool);

Status Open();

Expand All @@ -67,6 +70,7 @@ class SpillWriter {
int32_t compression_level_;
std::shared_ptr<OutputStream> out_stream_;
std::shared_ptr<ArrowOutputStreamAdapter> arrow_output_stream_adapter_;
std::unique_ptr<arrow::MemoryPool> arrow_pool_;
Comment thread
zjw1111 marked this conversation as resolved.
std::shared_ptr<arrow::ipc::RecordBatchWriter> arrow_writer_;
FileIOChannel::ID channel_id_;
bool closed_ = false;
Expand Down
Loading