Skip to content

Commit

Permalink
don't hold an entire row group in memory
Browse files Browse the repository at this point in the history
  • Loading branch information
bkietz committed Jul 13, 2020
1 parent d199fc5 commit 75e3b0a
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 125 deletions.
245 changes: 135 additions & 110 deletions cpp/src/parquet/arrow/reader.cc
Expand Up @@ -24,6 +24,7 @@
#include <vector>

#include "arrow/array.h"
#include "arrow/array/util.h"
#include "arrow/buffer.h"
#include "arrow/io/memory.h"
#include "arrow/record_batch.h"
Expand Down Expand Up @@ -104,8 +105,10 @@ std::shared_ptr<std::unordered_set<int>> VectorToSharedSet(
class FileReaderImpl : public FileReader {
public:
FileReaderImpl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader,
const ArrowReaderProperties& properties)
: pool_(pool), reader_(std::move(reader)), reader_properties_(properties) {}
ArrowReaderProperties properties)
: pool_(pool),
reader_(std::move(reader)),
reader_properties_(std::move(properties)) {}

Status Init() {
return SchemaManifest::Make(reader_->metadata()->schema(),
Expand Down Expand Up @@ -143,16 +146,15 @@ class FileReaderImpl : public FileReader {
return Status::OK();
}

int64_t GetTotalRecords(const std::vector<int>& row_groups, int column_chunk = 0) {
// Can throw exception
int64_t records = 0;
for (auto row_group : row_groups) {
records += reader_->metadata()
->RowGroup(row_group)
->ColumnChunk(column_chunk)
->num_values();
// Validates every requested row group index and every requested column index.
//
// Returns the first non-OK Status produced by BoundsCheckRowGroup() or
// BoundsCheckColumn(); falls through to the OK return below only when all
// indices pass. RETURN_NOT_OK is required here: a plain `return` inside the
// loops would exit after inspecting only the first row group and would never
// look at the remaining row groups or at any column index.
Status BoundsCheck(const std::vector<int>& row_groups,
                   const std::vector<int>& column_indices) {
  for (int i : row_groups) {
    RETURN_NOT_OK(BoundsCheckRowGroup(i));
  }
  for (int i : column_indices) {
    RETURN_NOT_OK(BoundsCheckColumn(i));
  }
return records;
return Status::OK();
}

std::shared_ptr<RowGroupReader> RowGroup(int row_group_index) override;
Expand All @@ -175,27 +177,35 @@ class FileReaderImpl : public FileReader {
return GetReader(manifest_.schema_fields[i], ctx, out);
}

Status GetColumn(int i, FileColumnIteratorFactory iterator_factory,
std::unique_ptr<ColumnReader>* out);

Status ReadSchemaField(int i,
const std::shared_ptr<std::unordered_set<int>>& included_leaves,
Status GetFieldReaders(const std::vector<int>& column_indices,
const std::vector<int>& row_groups,
std::shared_ptr<Field>* out_field,
std::shared_ptr<ChunkedArray>* out) {
BEGIN_PARQUET_CATCH_EXCEPTIONS
std::unique_ptr<ColumnReaderImpl> reader;
RETURN_NOT_OK(GetFieldReader(i, included_leaves, row_groups, &reader));

*out_field = reader->field();
std::vector<std::shared_ptr<ColumnReaderImpl>>* out,
std::shared_ptr<::arrow::Schema>* out_schema) {
// We only need to read schema fields which have columns indicated
// in the indices vector
ARROW_ASSIGN_OR_RAISE(std::vector<int> field_indices,
manifest_.GetFieldIndices(column_indices));

auto included_leaves = VectorToSharedSet(column_indices);

out->resize(field_indices.size());
::arrow::FieldVector out_fields(field_indices.size());
for (size_t i = 0; i < out->size(); ++i) {
std::unique_ptr<ColumnReaderImpl> reader;
RETURN_NOT_OK(
GetFieldReader(field_indices[i], included_leaves, row_groups, &reader));

out_fields[i] = reader->field();
out->at(i) = std::move(reader);
}

// TODO(wesm): This calculation doesn't make much sense when we have repeated
// schema nodes
int64_t records_to_read = GetTotalRecords(row_groups, i);
return reader->NextBatch(records_to_read, out);
END_PARQUET_CATCH_EXCEPTIONS
*out_schema = ::arrow::schema(std::move(out_fields), manifest_.schema_metadata);
return Status::OK();
}

Status GetColumn(int i, FileColumnIteratorFactory iterator_factory,
std::unique_ptr<ColumnReader>* out);

// Convenience overload of GetColumn: forwards to the three-argument overload
// with AllRowGroupsFactory() as the iterator factory, writing the resulting
// ColumnReader for column `i` to `out`.
Status GetColumn(int i, std::unique_ptr<ColumnReader>* out) override {
return GetColumn(i, AllRowGroupsFactory(), out);
}
Expand All @@ -205,35 +215,36 @@ class FileReaderImpl : public FileReader {
reader_->metadata()->key_value_metadata(), out);
}

Status ReadSchemaField(int i,
const std::shared_ptr<std::unordered_set<int>>& included_leaves,
const std::vector<int>& row_groups,
std::shared_ptr<ChunkedArray>* out) {
std::shared_ptr<Field> unused;
return ReadSchemaField(i, included_leaves, row_groups, &unused, out);
}
Status ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out) override {
auto included_leaves = VectorToSharedSet(Iota(reader_->metadata()->num_columns()));
std::vector<int> row_groups = Iota(reader_->metadata()->num_row_groups());

Status ReadSchemaField(int i,
const std::shared_ptr<std::unordered_set<int>>& included_leaves,
std::shared_ptr<ChunkedArray>* out) {
return ReadSchemaField(i, included_leaves,
Iota(reader_->metadata()->num_row_groups()), out);
std::unique_ptr<ColumnReaderImpl> reader;
RETURN_NOT_OK(GetFieldReader(i, included_leaves, row_groups, &reader));

return ReadColumn(i, row_groups, reader.get(), out);
}

Status ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out) override {
auto included_leaves = VectorToSharedSet(Iota(reader_->metadata()->num_columns()));
return ReadSchemaField(i, included_leaves,
Iota(reader_->metadata()->num_row_groups()), out);
// Reads column `i` through an already-constructed `reader`.
//
// The number of records requested from the reader is the sum of num_values()
// reported by the file metadata for column chunk `i` in each of `row_groups`.
// The result of reader->NextBatch() is written to `out` and its Status is
// returned. Exceptions raised while consulting the metadata are handled by the
// surrounding PARQUET_CATCH_EXCEPTIONS macros.
Status ReadColumn(int i, const std::vector<int>& row_groups, ColumnReader* reader,
                  std::shared_ptr<ChunkedArray>* out) {
  BEGIN_PARQUET_CATCH_EXCEPTIONS
  // TODO(wesm): This calculation doesn't make much sense when we have repeated
  // schema nodes
  int64_t total_values = 0;
  for (const int group_index : row_groups) {
    // The metadata accessors below can throw
    total_values = total_values + reader_->metadata()
                                      ->RowGroup(group_index)
                                      ->ColumnChunk(i)
                                      ->num_values();
  }
  return reader->NextBatch(total_values, out);
  END_PARQUET_CATCH_EXCEPTIONS
}

Status ReadColumn(int i, const std::vector<int>& row_groups,
std::shared_ptr<ChunkedArray>* out) {
std::unique_ptr<ColumnReader> flat_column_reader;
RETURN_NOT_OK(GetColumn(i, SomeRowGroupsFactory(row_groups), &flat_column_reader));
BEGIN_PARQUET_CATCH_EXCEPTIONS
int64_t records_to_read = GetTotalRecords(row_groups, i);
return flat_column_reader->NextBatch(records_to_read, out);
END_PARQUET_CATCH_EXCEPTIONS
return ReadColumn(i, row_groups, flat_column_reader.get(), out);
}

Status ReadColumn(int i, std::shared_ptr<ChunkedArray>* out) override {
Expand Down Expand Up @@ -738,39 +749,44 @@ Status GetReader(const SchemaField& field, const std::shared_ptr<ReaderContext>&
END_PARQUET_CATCH_EXCEPTIONS
}

Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_group_indices,
Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
const std::vector<int>& column_indices,
std::unique_ptr<RecordBatchReader>* out) {
// row group indices check
for (int row_group_index : row_group_indices) {
RETURN_NOT_OK(BoundsCheckRowGroup(row_group_index));
}
RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));

// column indices check
ARROW_ASSIGN_OR_RAISE(std::vector<int> field_indices,
manifest_.GetFieldIndices(column_indices));
if (reader_properties_.pre_buffer()) {
// PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
BEGIN_PARQUET_CATCH_EXCEPTIONS
reader_->PreBuffer(row_groups, column_indices, reader_properties_.async_context(),
reader_properties_.cache_options());
END_PARQUET_CATCH_EXCEPTIONS
}

std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
std::shared_ptr<::arrow::Schema> batch_schema;
RETURN_NOT_OK(GetSchema(&batch_schema));
RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &batch_schema));

// filter to only arrow::Fields which contain the selected physical columns
{
::arrow::FieldVector selected_fields;
if (readers.empty()) {
int64_t batch_size = properties().batch_size();
auto max_sized_batch =
::arrow::RecordBatch::Make(batch_schema, batch_size, ::arrow::ArrayVector{});

for (int field_idx : field_indices) {
selected_fields.push_back(batch_schema->field(field_idx));
}
::arrow::RecordBatchVector batches;

batch_schema = ::arrow::schema(std::move(selected_fields));
}
for (int row_group : row_groups) {
// create a single RecordBatch with no columns covering a whole row group
int64_t num_rows = parquet_reader()->metadata()->RowGroup(row_group)->num_rows();

if (reader_properties_.pre_buffer()) {
// PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
BEGIN_PARQUET_CATCH_EXCEPTIONS
reader_->PreBuffer(row_group_indices, column_indices,
reader_properties_.async_context(),
reader_properties_.cache_options());
END_PARQUET_CATCH_EXCEPTIONS
batches.insert(batches.end(), num_rows / batch_size, max_sized_batch);

if (int64_t trailing_rows = num_rows % batch_size) {
batches.push_back(max_sized_batch->Slice(0, trailing_rows));
}
}

*out = ::arrow::internal::make_unique<RowGroupRecordBatchReader>(
::arrow::MakeVectorIterator(std::move(batches)), std::move(batch_schema));
return Status::OK();
}

using ::arrow::RecordBatchIterator;
Expand All @@ -779,27 +795,41 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_group_in
// scanned, so it must capture `column_indices` by value (it will not
// otherwise outlive the scope of `GetRecordBatchReader()`). `this` is a non-owning
// pointer so we are relying on the parent FileReader outliving this RecordBatchReader.
auto row_group_index_to_batch_iterator =
[column_indices, this](const int* i) -> ::arrow::Result<RecordBatchIterator> {
std::shared_ptr<::arrow::Table> table;
RETURN_NOT_OK(RowGroup(*i)->ReadTable(column_indices, &table));

auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
table_reader->set_chunksize(properties().batch_size());

// NB: explicitly preserve table so that table_reader doesn't outlive it
return ::arrow::MakeFunctionIterator(
[table, table_reader] { return table_reader->Next(); });
};

::arrow::Iterator<RecordBatchIterator> row_group_batches =
::arrow::MakeMaybeMapIterator(
std::move(row_group_index_to_batch_iterator),
::arrow::MakeVectorPointingIterator(row_group_indices));
// `batches` is an iterator of iterators: every invocation of the function below
// asks each column reader for up to batch_size() rows and returns an iterator
// over the record batches assembled from what was read. `readers` and
// `batch_schema` are captured by value; `this` is captured as a raw pointer.
::arrow::Iterator<RecordBatchIterator> batches = ::arrow::MakeFunctionIterator(
[readers, batch_schema, this]() -> ::arrow::Result<RecordBatchIterator> {
// Get the next chunks for each column
// (chunks[i] contains the chunks for column i)
std::vector<::arrow::ArrayVector> chunks(readers.size());
for (size_t i = 0; i < readers.size(); ++i) {
std::shared_ptr<ChunkedArray> chunk;
RETURN_NOT_OK(readers[i]->NextBatch(properties().batch_size(), &chunk));
// A null or zero-length result from any reader ends the whole iteration.
if (chunk == nullptr || chunk->length() == 0) {
return ::arrow::IterationTraits<RecordBatchIterator>::End();
}
chunks[i] = chunk->chunks();
}

// Transpose chunks to record batches
// NOTE(review): presumably RechunkArraysConsistently re-slices the columns so
// that all of them share the same chunk boundaries; the loops below rely on
// chunks[i][batch_index] existing for every column i -- confirm.
chunks = ::arrow::internal::RechunkArraysConsistently(chunks);

// chunks[0] is indexed unconditionally, so this code requires at least one
// reader.
::arrow::RecordBatchVector batches(chunks[0].size());

for (size_t batch_index = 0; batch_index < batches.size(); ++batch_index) {
::arrow::ArrayVector columns(chunks.size());
for (size_t i = 0; i < columns.size(); ++i) {
// Each array is moved out of `chunks`; it is not read again afterwards.
columns[i] = std::move(chunks[i][batch_index]);
}

// The row count of the batch is taken from its first column.
int64_t num_rows = columns[0]->length();
batches[batch_index] =
::arrow::RecordBatch::Make(batch_schema, num_rows, std::move(columns));
}

return ::arrow::MakeVectorIterator(std::move(batches));
});

*out = ::arrow::internal::make_unique<RowGroupRecordBatchReader>(
::arrow::MakeFlattenIterator(std::move(row_group_batches)),
std::move(batch_schema));
::arrow::MakeFlattenIterator(std::move(batches)), std::move(batch_schema));

return Status::OK();
}
Expand All @@ -819,34 +849,32 @@ Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_facto
}

Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& indices,
const std::vector<int>& column_indices,
std::shared_ptr<Table>* out) {
BEGIN_PARQUET_CATCH_EXCEPTIONS

// We only need to read schema fields which have columns indicated
// in the indices vector
ARROW_ASSIGN_OR_RAISE(std::vector<int> field_indices,
manifest_.GetFieldIndices(indices));
RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));

// PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
if (reader_properties_.pre_buffer()) {
parquet_reader()->PreBuffer(row_groups, indices, reader_properties_.async_context(),
BEGIN_PARQUET_CATCH_EXCEPTIONS
parquet_reader()->PreBuffer(row_groups, column_indices,
reader_properties_.async_context(),
reader_properties_.cache_options());
END_PARQUET_CATCH_EXCEPTIONS
}

::arrow::FieldVector fields(field_indices.size());
::arrow::ChunkedArrayVector columns(fields.size());
std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
std::shared_ptr<::arrow::Schema> result_schema;
RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema));

auto included_leaves = VectorToSharedSet(indices);
::arrow::ChunkedArrayVector columns(readers.size());
auto ReadColumnFunc = [&](size_t i) {
return ReadSchemaField(field_indices[i], included_leaves, row_groups, &fields[i],
&columns[i]);
return ReadColumn(i, row_groups, readers[i].get(), &columns[i]);
};

if (reader_properties_.use_threads()) {
std::vector<Future<Status>> futures(fields.size());
std::vector<Future<Status>> futures(readers.size());
auto pool = ::arrow::internal::GetCpuThreadPool();
for (size_t i = 0; i < fields.size(); ++i) {
for (size_t i = 0; i < readers.size(); ++i) {
ARROW_ASSIGN_OR_RAISE(futures[i], pool->Submit(ReadColumnFunc, i));
}

Expand All @@ -856,13 +884,11 @@ Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
}
RETURN_NOT_OK(final_status);
} else {
for (size_t i = 0; i < fields.size(); ++i) {
for (size_t i = 0; i < readers.size(); ++i) {
RETURN_NOT_OK(ReadColumnFunc(i));
}
}

auto result_schema = ::arrow::schema(std::move(fields), manifest_.schema_metadata);

int64_t num_rows = 0;
if (!columns.empty()) {
num_rows = columns[0]->length();
Expand All @@ -874,7 +900,6 @@ Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,

*out = Table::Make(std::move(result_schema), std::move(columns), num_rows);
return (*out)->Validate();
END_PARQUET_CATCH_EXCEPTIONS
}

std::shared_ptr<RowGroupReader> FileReaderImpl::RowGroup(int row_group_index) {
Expand Down
15 changes: 3 additions & 12 deletions cpp/src/parquet/arrow/reader.h
Expand Up @@ -152,12 +152,8 @@ class PARQUET_EXPORT FileReader {

/// \brief Return a RecordBatchReader of row groups selected from row_group_indices.
///
/// Note that the ordering in row_group_indices matters. Each row group will
/// be held in memory until each slice has been yielded as a RecordBatch, at
/// which point the next row group will be loaded. FileReaders must outlive
/// their RecordBatchReaders. NOTE: in the future we would like to avoid
/// materializing the entire row group in memory before yielding chunks of it
/// in this interface, thus reducing memory use.
/// Note that the ordering in row_group_indices matters. FileReaders must outlive
/// their RecordBatchReaders.
///
/// \returns error Status if row_group_indices contains an invalid index
virtual ::arrow::Status GetRecordBatchReader(
Expand All @@ -171,12 +167,7 @@ class PARQUET_EXPORT FileReader {
/// row_group_indices, whose columns are selected by column_indices.
///
/// Note that the ordering in row_group_indices and column_indices
/// matters. Each row group will be held in memory until each slice has been
/// yielded as a RecordBatch, at which point the next row group will be
/// loaded. FileReaders must outlive their RecordBatchReaders. NOTE: in the
/// future we would like to avoid materializing the entire row group in
/// memory before yielding chunks of it in this interface, thus reducing
/// memory use.
/// matters. FileReaders must outlive their RecordBatchReaders.
///
/// \returns error Status if either row_group_indices or column_indices
/// contains an invalid index
Expand Down

0 comments on commit 75e3b0a

Please sign in to comment.