Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-8729: [C++][Dataset] Ensure non-empty batches when only virtual columns are projected #7534

Closed
wants to merge 9 commits into from
30 changes: 29 additions & 1 deletion cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Expand Up @@ -1858,7 +1858,7 @@ void MakeDoubleTable(int num_columns, int num_rows, int nchunks,
fields[i] = ::arrow::field(ss.str(), values->type());
}
auto schema = std::make_shared<::arrow::Schema>(fields);
*out = Table::Make(schema, columns);
*out = Table::Make(schema, columns, num_rows);
}

void MakeSimpleListArray(int num_rows, int max_value_length, const std::string& item_name,
Expand Down Expand Up @@ -2056,6 +2056,34 @@ TEST(TestArrowReadWrite, CoalescedReads) {
TestGetRecordBatchReader(arrow_properties);
}

// ARROW-8729: projecting zero columns must still produce non-null batches
// whose row count matches the data read, so that consumers relying only on
// virtual columns (e.g. partition keys) see the correct number of rows.
TEST(TestArrowReadWrite, GetRecordBatchReaderNoColumns) {
  ArrowReaderProperties properties = default_arrow_reader_properties();
  const int num_rows = 10;
  const int num_columns = 20;

  // Build a table and round-trip it through an in-memory Parquet buffer.
  std::shared_ptr<Table> table;
  ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));
  std::shared_ptr<Buffer> buffer;
  ASSERT_NO_FATAL_FAILURE(
      WriteTableToBuffer(table, num_rows, default_arrow_writer_properties(), &buffer));

  // Open a reader over the serialized buffer.
  FileReaderBuilder builder;
  std::unique_ptr<FileReader> reader;
  ASSERT_OK(builder.Open(std::make_shared<BufferReader>(buffer)));
  ASSERT_OK(builder.properties(properties)->Build(&reader));

  // Request row group 0 with an empty column selection.
  std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
  ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({0}, {}, &rb_reader));

  std::shared_ptr<::arrow::RecordBatch> batch;
  ASSERT_OK(rb_reader->ReadNext(&batch));

  // The batch must exist, carry no columns, and still report every row.
  ASSERT_NE(batch, nullptr);
  ASSERT_EQ(batch->num_columns(), 0);
  ASSERT_EQ(batch->num_rows(), num_rows);
}

TEST(TestArrowReadWrite, ScanContents) {
const int num_columns = 20;
const int num_rows = 1000;
Expand Down
257 changes: 137 additions & 120 deletions cpp/src/parquet/arrow/reader.cc
Expand Up @@ -104,8 +104,10 @@ std::shared_ptr<std::unordered_set<int>> VectorToSharedSet(
class FileReaderImpl : public FileReader {
public:
FileReaderImpl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader,
const ArrowReaderProperties& properties)
: pool_(pool), reader_(std::move(reader)), reader_properties_(properties) {}
ArrowReaderProperties properties)
: pool_(pool),
reader_(std::move(reader)),
reader_properties_(std::move(properties)) {}

Status Init() {
return SchemaManifest::Make(reader_->metadata()->schema(),
Expand Down Expand Up @@ -143,16 +145,15 @@ class FileReaderImpl : public FileReader {
return Status::OK();
}

int64_t GetTotalRecords(const std::vector<int>& row_groups, int column_chunk = 0) {
// Can throw exception
int64_t records = 0;
for (auto row_group : row_groups) {
records += reader_->metadata()
->RowGroup(row_group)
->ColumnChunk(column_chunk)
->num_values();
Status BoundsCheck(const std::vector<int>& row_groups,
const std::vector<int>& column_indices) {
for (int i : row_groups) {
return BoundsCheckRowGroup(i);
}
for (int i : column_indices) {
return BoundsCheckColumn(i);
}
return records;
return Status::OK();
}

std::shared_ptr<RowGroupReader> RowGroup(int row_group_index) override;
Expand All @@ -175,27 +176,35 @@ class FileReaderImpl : public FileReader {
return GetReader(manifest_.schema_fields[i], ctx, out);
}

Status GetColumn(int i, FileColumnIteratorFactory iterator_factory,
std::unique_ptr<ColumnReader>* out);

Status ReadSchemaField(int i,
const std::shared_ptr<std::unordered_set<int>>& included_leaves,
Status GetFieldReaders(const std::vector<int>& column_indices,
const std::vector<int>& row_groups,
std::shared_ptr<Field>* out_field,
std::shared_ptr<ChunkedArray>* out) {
BEGIN_PARQUET_CATCH_EXCEPTIONS
std::unique_ptr<ColumnReaderImpl> reader;
RETURN_NOT_OK(GetFieldReader(i, included_leaves, row_groups, &reader));

*out_field = reader->field();
std::vector<std::shared_ptr<ColumnReaderImpl>>* out,
std::shared_ptr<::arrow::Schema>* out_schema) {
// We only need to read schema fields which have columns indicated
// in the indices vector
ARROW_ASSIGN_OR_RAISE(std::vector<int> field_indices,
manifest_.GetFieldIndices(column_indices));

auto included_leaves = VectorToSharedSet(column_indices);

out->resize(field_indices.size());
::arrow::FieldVector out_fields(field_indices.size());
for (size_t i = 0; i < out->size(); ++i) {
std::unique_ptr<ColumnReaderImpl> reader;
RETURN_NOT_OK(
GetFieldReader(field_indices[i], included_leaves, row_groups, &reader));

out_fields[i] = reader->field();
out->at(i) = std::move(reader);
}

// TODO(wesm): This calculation doesn't make much sense when we have repeated
// schema nodes
int64_t records_to_read = GetTotalRecords(row_groups, i);
return reader->NextBatch(records_to_read, out);
END_PARQUET_CATCH_EXCEPTIONS
*out_schema = ::arrow::schema(std::move(out_fields), manifest_.schema_metadata);
return Status::OK();
}

Status GetColumn(int i, FileColumnIteratorFactory iterator_factory,
std::unique_ptr<ColumnReader>* out);

Status GetColumn(int i, std::unique_ptr<ColumnReader>* out) override {
return GetColumn(i, AllRowGroupsFactory(), out);
}
Expand All @@ -205,35 +214,36 @@ class FileReaderImpl : public FileReader {
reader_->metadata()->key_value_metadata(), out);
}

Status ReadSchemaField(int i,
const std::shared_ptr<std::unordered_set<int>>& included_leaves,
const std::vector<int>& row_groups,
std::shared_ptr<ChunkedArray>* out) {
std::shared_ptr<Field> unused;
return ReadSchemaField(i, included_leaves, row_groups, &unused, out);
}
Status ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out) override {
auto included_leaves = VectorToSharedSet(Iota(reader_->metadata()->num_columns()));
std::vector<int> row_groups = Iota(reader_->metadata()->num_row_groups());

Status ReadSchemaField(int i,
const std::shared_ptr<std::unordered_set<int>>& included_leaves,
std::shared_ptr<ChunkedArray>* out) {
return ReadSchemaField(i, included_leaves,
Iota(reader_->metadata()->num_row_groups()), out);
std::unique_ptr<ColumnReaderImpl> reader;
RETURN_NOT_OK(GetFieldReader(i, included_leaves, row_groups, &reader));

return ReadColumn(i, row_groups, reader.get(), out);
}

Status ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* out) override {
auto included_leaves = VectorToSharedSet(Iota(reader_->metadata()->num_columns()));
return ReadSchemaField(i, included_leaves,
Iota(reader_->metadata()->num_row_groups()), out);
// Decode the selected row groups of leaf column `i` through `reader` into a
// single ChunkedArray, sizing the batch by summing the column chunks'
// value counts from the file metadata.
Status ReadColumn(int i, const std::vector<int>& row_groups, ColumnReader* reader,
                  std::shared_ptr<ChunkedArray>* out) {
  BEGIN_PARQUET_CATCH_EXCEPTIONS
  // TODO(wesm): This calculation doesn't make much sense when we have repeated
  // schema nodes
  int64_t total_records = 0;
  for (int rg : row_groups) {
    // RowGroup()/ColumnChunk() may throw ParquetException; the surrounding
    // macros convert that into a Status.
    total_records += reader_->metadata()->RowGroup(rg)->ColumnChunk(i)->num_values();
  }
  return reader->NextBatch(total_records, out);
  END_PARQUET_CATCH_EXCEPTIONS
}

Status ReadColumn(int i, const std::vector<int>& row_groups,
std::shared_ptr<ChunkedArray>* out) {
std::unique_ptr<ColumnReader> flat_column_reader;
RETURN_NOT_OK(GetColumn(i, SomeRowGroupsFactory(row_groups), &flat_column_reader));
BEGIN_PARQUET_CATCH_EXCEPTIONS
int64_t records_to_read = GetTotalRecords(row_groups, i);
return flat_column_reader->NextBatch(records_to_read, out);
END_PARQUET_CATCH_EXCEPTIONS
return ReadColumn(i, row_groups, flat_column_reader.get(), out);
}

Status ReadColumn(int i, std::shared_ptr<ChunkedArray>* out) override {
Expand Down Expand Up @@ -738,68 +748,73 @@ Status GetReader(const SchemaField& field, const std::shared_ptr<ReaderContext>&
END_PARQUET_CATCH_EXCEPTIONS
}

Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_group_indices,
Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
const std::vector<int>& column_indices,
std::unique_ptr<RecordBatchReader>* out) {
// row group indices check
for (int row_group_index : row_group_indices) {
RETURN_NOT_OK(BoundsCheckRowGroup(row_group_index));
}
RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));

// column indices check
ARROW_ASSIGN_OR_RAISE(std::vector<int> field_indices,
manifest_.GetFieldIndices(column_indices));
if (reader_properties_.pre_buffer()) {
// PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
BEGIN_PARQUET_CATCH_EXCEPTIONS
reader_->PreBuffer(row_groups, column_indices, reader_properties_.async_context(),
reader_properties_.cache_options());
END_PARQUET_CATCH_EXCEPTIONS
}

std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
std::shared_ptr<::arrow::Schema> batch_schema;
RETURN_NOT_OK(GetSchema(&batch_schema));
RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &batch_schema));

if (readers.empty()) {
// Just generate all batches right now; they're cheap since they have no columns.
int64_t batch_size = properties().batch_size();
auto max_sized_batch =
::arrow::RecordBatch::Make(batch_schema, batch_size, ::arrow::ArrayVector{});

::arrow::RecordBatchVector batches;

for (int row_group : row_groups) {
int64_t num_rows = parquet_reader()->metadata()->RowGroup(row_group)->num_rows();

// filter to only arrow::Fields which contain the selected physical columns
{
::arrow::FieldVector selected_fields;
batches.insert(batches.end(), num_rows / batch_size, max_sized_batch);

for (int field_idx : field_indices) {
selected_fields.push_back(batch_schema->field(field_idx));
if (int64_t trailing_rows = num_rows % batch_size) {
batches.push_back(max_sized_batch->Slice(0, trailing_rows));
}
}

batch_schema = ::arrow::schema(std::move(selected_fields));
}
*out = ::arrow::internal::make_unique<RowGroupRecordBatchReader>(
::arrow::MakeVectorIterator(std::move(batches)), std::move(batch_schema));

if (reader_properties_.pre_buffer()) {
// PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
BEGIN_PARQUET_CATCH_EXCEPTIONS
reader_->PreBuffer(row_group_indices, column_indices,
reader_properties_.async_context(),
reader_properties_.cache_options());
END_PARQUET_CATCH_EXCEPTIONS
return Status::OK();
}

using ::arrow::RecordBatchIterator;

// NB: This lambda will be invoked lazily whenever a new row group must be
// scanned, so it must capture `column_indices` by value (it will not
// otherwise outlive the scope of `GetRecordBatchReader()`). `this` is a non-owning
// pointer so we are relying on the parent FileReader outliving this RecordBatchReader.
auto row_group_index_to_batch_iterator =
[column_indices, this](const int* i) -> ::arrow::Result<RecordBatchIterator> {
std::shared_ptr<::arrow::Table> table;
RETURN_NOT_OK(RowGroup(*i)->ReadTable(column_indices, &table));

auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
table_reader->set_chunksize(properties().batch_size());

// NB: explicitly preserve table so that table_reader doesn't outlive it
return ::arrow::MakeFunctionIterator(
[table, table_reader] { return table_reader->Next(); });
};

::arrow::Iterator<RecordBatchIterator> row_group_batches =
::arrow::MakeMaybeMapIterator(
std::move(row_group_index_to_batch_iterator),
::arrow::MakeVectorPointingIterator(row_group_indices));
// NB: This lambda will be invoked outside the scope of this call to
// `GetRecordBatchReader()`, so it must capture `readers` and `batch_schema` by value.
// `this` is a non-owning pointer so we are relying on the parent FileReader outliving
// this RecordBatchReader.
::arrow::Iterator<RecordBatchIterator> batches = ::arrow::MakeFunctionIterator(
[readers, batch_schema, this]() -> ::arrow::Result<RecordBatchIterator> {
::arrow::ChunkedArrayVector columns(readers.size());
for (size_t i = 0; i < columns.size(); ++i) {
RETURN_NOT_OK(readers[i]->NextBatch(properties().batch_size(), &columns[i]));
if (columns[i] == nullptr || columns[i]->length() == 0) {
return ::arrow::IterationTraits<RecordBatchIterator>::End();
}
}

auto table = ::arrow::Table::Make(batch_schema, std::move(columns));
auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);

// NB: explicitly preserve table so that table_reader doesn't outlive it
return ::arrow::MakeFunctionIterator(
[table, table_reader] { return table_reader->Next(); });
});

*out = ::arrow::internal::make_unique<RowGroupRecordBatchReader>(
::arrow::MakeFlattenIterator(std::move(row_group_batches)),
std::move(batch_schema));
::arrow::MakeFlattenIterator(std::move(batches)), std::move(batch_schema));

return Status::OK();
}
Expand All @@ -819,55 +834,57 @@ Status FileReaderImpl::GetColumn(int i, FileColumnIteratorFactory iterator_facto
}

Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& indices,
const std::vector<int>& column_indices,
std::shared_ptr<Table>* out) {
BEGIN_PARQUET_CATCH_EXCEPTIONS

// We only need to read schema fields which have columns indicated
// in the indices vector
ARROW_ASSIGN_OR_RAISE(std::vector<int> field_indices,
manifest_.GetFieldIndices(indices));
RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));

// PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
if (reader_properties_.pre_buffer()) {
parquet_reader()->PreBuffer(row_groups, indices, reader_properties_.async_context(),
BEGIN_PARQUET_CATCH_EXCEPTIONS
parquet_reader()->PreBuffer(row_groups, column_indices,
reader_properties_.async_context(),
reader_properties_.cache_options());
END_PARQUET_CATCH_EXCEPTIONS
}

int num_fields = static_cast<int>(field_indices.size());
std::vector<std::shared_ptr<Field>> fields(num_fields);
std::vector<std::shared_ptr<ChunkedArray>> columns(num_fields);
std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
std::shared_ptr<::arrow::Schema> result_schema;
RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema));

auto included_leaves = VectorToSharedSet(indices);
auto ReadColumnFunc = [&](int i) {
return ReadSchemaField(field_indices[i], included_leaves, row_groups, &fields[i],
&columns[i]);
::arrow::ChunkedArrayVector columns(readers.size());
auto ReadColumnFunc = [&](size_t i) {
return ReadColumn(static_cast<int>(i), row_groups, readers[i].get(), &columns[i]);
};

if (reader_properties_.use_threads()) {
std::vector<Future<Status>> futures(num_fields);
std::vector<Future<Status>> futures(readers.size());
auto pool = ::arrow::internal::GetCpuThreadPool();
for (int i = 0; i < num_fields; i++) {
for (size_t i = 0; i < readers.size(); ++i) {
ARROW_ASSIGN_OR_RAISE(futures[i], pool->Submit(ReadColumnFunc, i));
}
Status final_status = Status::OK();

Status final_status;
for (auto& fut : futures) {
Status st = fut.status();
if (!st.ok()) {
final_status = std::move(st);
}
final_status &= fut.status();
}
RETURN_NOT_OK(final_status);
} else {
for (int i = 0; i < num_fields; i++) {
for (size_t i = 0; i < readers.size(); ++i) {
RETURN_NOT_OK(ReadColumnFunc(i));
}
}

auto result_schema = ::arrow::schema(fields, manifest_.schema_metadata);
*out = Table::Make(result_schema, columns);
int64_t num_rows = 0;
if (!columns.empty()) {
num_rows = columns[0]->length();
} else {
for (int i : row_groups) {
num_rows += parquet_reader()->metadata()->RowGroup(i)->num_rows();
}
}

*out = Table::Make(std::move(result_schema), std::move(columns), num_rows);
return (*out)->Validate();
END_PARQUET_CATCH_EXCEPTIONS
}

std::shared_ptr<RowGroupReader> FileReaderImpl::RowGroup(int row_group_index) {
Expand Down