[Enhancement] improve parquet reader via arrow's prefetch and multi thread (#9472)

* add ArrowReaderProperties to parquet::arrow::FileReader

* support prefetch batch
Lchangliang committed May 19, 2022
1 parent 1355bc1 commit ef65f484df70aede49777157657354b80039b5a7
Showing 3 changed files with 118 additions and 46 deletions.
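The heart of the change is how the Arrow reader is constructed: instead of parquet::arrow::FileReader::Make() over a buffered-stream ParquetFileReader, the reader is now built through parquet::arrow::FileReaderBuilder configured with ArrowReaderProperties, which turns on column-chunk pre-buffering and multi-threaded column decoding. Below is a minimal, self-contained sketch of that construction pattern (the open_reader name and the local file path are placeholders, not Doris code). With pre-buffering handled at the Arrow layer, the constructor's old ReaderProperties buffered-stream setup (enable_buffered_stream(), set_buffer_size(65535)) becomes unnecessary and is dropped in the diff that follows.

#include <arrow/io/api.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <parquet/arrow/reader.h>

// Sketch: open a Parquet file with prefetch + multi-threaded decode enabled.
arrow::Status open_reader(std::unique_ptr<parquet::arrow::FileReader>* reader) {
    parquet::ArrowReaderProperties props = parquet::default_arrow_reader_properties();
    props.set_pre_buffer(true);   // coalesce and prefetch column-chunk I/O
    props.set_use_threads(true);  // decode columns on Arrow's CPU thread pool

    ARROW_ASSIGN_OR_RAISE(auto file, arrow::io::ReadableFile::Open("/tmp/example.parquet"));

    parquet::arrow::FileReaderBuilder builder;
    ARROW_RETURN_NOT_OK(builder.Open(file));  // reads the file footer/metadata
    builder.properties(props);
    return builder.Build(reader);
}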
@@ -736,6 +736,10 @@ CONF_Validator(string_type_length_soft_limit_bytes,
// used for olap scanner to save memory, when the size of unused_object_pool
// is greater than object_pool_buffer_size, release the object in the unused_object_pool.
CONF_Int32(object_pool_buffer_size, "100");

// ParquetReaderWrap prefetch buffer size
CONF_Int32(parquet_reader_max_buffer_size, "50");

} // namespace config

} // namespace doris
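This new knob caps the prefetch queue the commit adds to ParquetReaderWrap: the background thread stops producing once 50 record batches are waiting, so peak memory scales with queue depth times decoded batch size rather than with the whole file. The reader implementation below puts the knob to use.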
@@ -18,9 +18,15 @@

#include <arrow/array.h>
#include <arrow/status.h>
#include <arrow/type_fwd.h>
#include <time.h>

#include <algorithm>
#include <mutex>
#include <thread>

#include "common/logging.h"
#include "common/status.h"
#include "exec/file_reader.h"
#include "gen_cpp/PaloBrokerService_types.h"
#include "gen_cpp/TPaloBrokerService.h"
@@ -44,9 +50,6 @@ ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int32_t num_of_col
_current_line_of_group(0),
_current_line_of_batch(0) {
_parquet = std::shared_ptr<ParquetFile>(new ParquetFile(file_reader));
_properties = parquet::ReaderProperties();
_properties.enable_buffered_stream();
_properties.set_buffer_size(65535);
}

ParquetReaderWrap::~ParquetReaderWrap() {
@@ -55,10 +58,23 @@ ParquetReaderWrap::~ParquetReaderWrap() {
Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
const std::string& timezone) {
try {
// new file reader for parquet file
auto st = parquet::arrow::FileReader::Make(
arrow::default_memory_pool(),
parquet::ParquetFileReader::Open(_parquet, _properties), &_reader);
parquet::ArrowReaderProperties arrow_reader_properties =
parquet::default_arrow_reader_properties();
arrow_reader_properties.set_pre_buffer(true);
arrow_reader_properties.set_use_threads(true);
// Open Parquet file reader
auto reader_builder = parquet::arrow::FileReaderBuilder();
reader_builder.properties(arrow_reader_properties);

auto st = reader_builder.Open(_parquet);

if (!st.ok()) {
LOG(WARNING) << "failed to create parquet file reader, errmsg=" << st.ToString();
return Status::InternalError("Failed to create file reader");
}

st = reader_builder.Build(&_reader);

if (!st.ok()) {
LOG(WARNING) << "failed to create parquet file reader, errmsg=" << st.ToString();
return Status::InternalError("Failed to create file reader");
@@ -85,31 +101,23 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>

_timezone = timezone;

if (_current_line_of_group == 0) { // the first read
RETURN_IF_ERROR(column_indices(tuple_slot_descs));
// read batch
arrow::Status status = _reader->GetRecordBatchReader({_current_group},
_parquet_column_ids, &_rb_batch);
if (!status.ok()) {
LOG(WARNING) << "Get RecordBatch Failed. " << status.ToString();
return Status::InternalError(status.ToString());
}
status = _rb_batch->ReadNext(&_batch);
if (!status.ok()) {
LOG(WARNING) << "The first read record. " << status.ToString();
return Status::InternalError(status.ToString());
}
_current_line_of_batch = 0;
//save column type
std::shared_ptr<arrow::Schema> field_schema = _batch->schema();
for (int i = 0; i < _parquet_column_ids.size(); i++) {
std::shared_ptr<arrow::Field> field = field_schema->field(i);
if (!field) {
LOG(WARNING) << "Get field schema failed. Column order:" << i;
return Status::InternalError(status.ToString());
}
_parquet_column_type.emplace_back(field->type()->id());
RETURN_IF_ERROR(column_indices(tuple_slot_descs));

std::thread thread(&ParquetReaderWrap::prefetch_batch, this);
thread.detach();

// read batch
RETURN_IF_ERROR(read_next_batch());
_current_line_of_batch = 0;
//save column type
std::shared_ptr<arrow::Schema> field_schema = _batch->schema();
for (int i = 0; i < _parquet_column_ids.size(); i++) {
std::shared_ptr<arrow::Field> field = field_schema->field(i);
if (!field) {
LOG(WARNING) << "Get field schema failed. Column order:" << i;
return Status::InternalError(_status.ToString());
}
_parquet_column_type.emplace_back(field->type()->id());
}
return Status::OK();
} catch (parquet::ParquetException& e) {
@@ -121,6 +129,8 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>
}

void ParquetReaderWrap::close() {
_closed = true;
_queue_writer_cond.notify_one();
arrow::Status st = _parquet->Close();
if (!st.ok()) {
LOG(WARNING) << "close parquet file error: " << st.ToString();
@@ -195,25 +205,15 @@ Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>&
_rows_of_group = _file_metadata->RowGroup(_current_group)
->num_rows(); //get rows of the current row group
// read batch
arrow::Status status =
_reader->GetRecordBatchReader({_current_group}, _parquet_column_ids, &_rb_batch);
if (!status.ok()) {
return Status::InternalError("Get RecordBatchReader Failed.");
}
status = _rb_batch->ReadNext(&_batch);
if (!status.ok()) {
return Status::InternalError("Read Batch Error With Libarrow.");
}
RETURN_IF_ERROR(read_next_batch());
_current_line_of_batch = 0;
} else if (_current_line_of_batch >= _batch->num_rows()) {
VLOG_DEBUG << "read_record_batch, current group id:" << _current_group
<< " current line of batch:" << _current_line_of_batch
<< " is larger than batch size:" << _batch->num_rows()
<< ". start to read next batch";
arrow::Status status = _rb_batch->ReadNext(&_batch);
if (!status.ok()) {
return Status::InternalError("Read Batch Error With Libarrow.");
}
// read batch
RETURN_IF_ERROR(read_next_batch());
_current_line_of_batch = 0;
}
return Status::OK();
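Both former inline ReadNext() call sites now funnel through read_next_batch(), which consumes batches produced by the background thread. The producer drains whole row groups through the out-parameter overloads of GetRecordBatchReader() and ReadAll(); a minimal sketch of that per-row-group drain, with drain_row_group as an assumed illustrative name:

#include <arrow/record_batch.h>
#include <arrow/status.h>
#include <parquet/arrow/reader.h>
#include <vector>

// Sketch: read every record batch of one row group, as prefetch_batch() does.
arrow::Status drain_row_group(parquet::arrow::FileReader* reader, int row_group,
                              const std::vector<int>& column_ids,
                              arrow::RecordBatchVector* batches) {
    std::shared_ptr<arrow::RecordBatchReader> rb_reader;
    ARROW_RETURN_NOT_OK(reader->GetRecordBatchReader({row_group}, column_ids, &rb_reader));
    return rb_reader->ReadAll(batches);  // collects all batches of the group
}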
@@ -553,6 +553,55 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
return read_record_batch(tuple_slot_descs, eof);
}

void ParquetReaderWrap::prefetch_batch() {
auto insert_batch = [this](const auto& batch) {
std::unique_lock<std::mutex> lock(_mtx);
while (!_closed && _queue.size() == _max_queue_size) {
_queue_writer_cond.wait_for(lock, std::chrono::seconds(1));
}
if (UNLIKELY(_closed)) {
return;
}
_queue.push_back(batch);
_queue_reader_cond.notify_one();
};
int current_group = 0;
while (true) {
if (_closed || current_group >= _total_groups) {
return;
}
_status = _reader->GetRecordBatchReader({current_group}, _parquet_column_ids, &_rb_batch);
if (!_status.ok()) {
_closed = true;
return;
}
arrow::RecordBatchVector batches;
_status = _rb_batch->ReadAll(&batches);
if (!_status.ok()) {
_closed = true;
return;
}
std::for_each(batches.begin(), batches.end(), insert_batch);
current_group++;
}
}

Status ParquetReaderWrap::read_next_batch() {
std::unique_lock<std::mutex> lock(_mtx);
while (!_closed && _queue.empty()) {
_queue_reader_cond.wait_for(lock, std::chrono::seconds(1));
}

if (UNLIKELY(_closed)) {
return Status::InternalError(_status.message());
}

_batch = _queue.front();
_queue.pop_front();
_queue_writer_cond.notify_one();
return Status::OK();
}
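The producer/consumer handoff above rests on one mutex, two condition variables, and a closed flag; both sides wait with a one-second timeout so they re-check _closed even if a notification is missed during shutdown. The same pattern, isolated into a self-contained class for clarity (all names here are illustrative: Item stands in for std::shared_ptr<arrow::RecordBatch>, cap for config::parquet_reader_max_buffer_size, and close() notifies both sides where the Doris code wakes only the writer):

#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>

template <typename Item>
class BoundedBatchQueue {
public:
    explicit BoundedBatchQueue(size_t cap) : _cap(cap) {}

    // Producer: block while full; give up once closed. Returns false on close.
    bool push(Item item) {
        std::unique_lock<std::mutex> lock(_mtx);
        while (!_closed && _queue.size() == _cap) {
            _writer_cond.wait_for(lock, std::chrono::seconds(1));
        }
        if (_closed) return false;
        _queue.push_back(std::move(item));
        _reader_cond.notify_one();
        return true;
    }

    // Consumer: block while empty; give up once closed. Returns false on close.
    bool pop(Item* out) {
        std::unique_lock<std::mutex> lock(_mtx);
        while (!_closed && _queue.empty()) {
            _reader_cond.wait_for(lock, std::chrono::seconds(1));
        }
        if (_closed) return false;
        *out = std::move(_queue.front());
        _queue.pop_front();
        _writer_cond.notify_one();
        return true;
    }

    void close() {
        std::lock_guard<std::mutex> lock(_mtx);
        _closed = true;
        _reader_cond.notify_all();
        _writer_cond.notify_all();
    }

private:
    const size_t _cap;
    bool _closed = false;
    std::mutex _mtx;
    std::condition_variable _reader_cond;
    std::condition_variable _writer_cond;
    std::deque<Item> _queue;
};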

ParquetFile::ParquetFile(FileReader* file) : _file(file) {}

ParquetFile::~ParquetFile() {
@@ -22,17 +22,24 @@
#include <arrow/io/api.h>
#include <arrow/io/file.h>
#include <arrow/io/interfaces.h>
#include <arrow/status.h>
#include <parquet/api/reader.h>
#include <parquet/api/writer.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
#include <parquet/exception.h>
#include <stdint.h>

#include <atomic>
#include <condition_variable>
#include <list>
#include <map>
#include <mutex>
#include <string>
#include <thread>

#include "common/status.h"
#include "common/config.h"
#include "gen_cpp/PaloBrokerService_types.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/Types_types.h"
@@ -51,7 +58,7 @@ class FileReader;
class ParquetFile : public arrow::io::RandomAccessFile {
public:
ParquetFile(FileReader* file);
virtual ~ParquetFile();
~ParquetFile() override;
arrow::Result<int64_t> Read(int64_t nbytes, void* buffer) override;
arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override;
arrow::Result<int64_t> GetSize() override;
@@ -92,9 +99,12 @@ class ParquetReaderWrap {
Status handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array, uint8_t* buf,
int32_t* wbtyes);

private:
void prefetch_batch();
Status read_next_batch();

private:
const int32_t _num_of_columns_from_file;
parquet::ReaderProperties _properties;
std::shared_ptr<ParquetFile> _parquet;

// parquet file reader object
@@ -113,6 +123,15 @@ class ParquetReaderWrap {
int _current_line_of_batch;

std::string _timezone;

private:
std::atomic<bool> _closed = false;
arrow::Status _status;
std::mutex _mtx;
std::condition_variable _queue_reader_cond;
std::condition_variable _queue_writer_cond;
std::list<std::shared_ptr<arrow::RecordBatch>> _queue;
const size_t _max_queue_size = config::parquet_reader_max_buffer_size;
};

} // namespace doris
