Skip to content
This repository has been archived by the owner on May 10, 2024. It is now read-only.

Commit

Permalink
Read multiple RowGroups at once into an Arrow table
Browse files Browse the repository at this point in the history
  • Loading branch information
xhochy committed Aug 22, 2018
1 parent 991e4a5 commit 94c7246
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 2 deletions.
63 changes: 63 additions & 0 deletions src/parquet/arrow/arrow-reader-writer-benchmark.cc
Expand Up @@ -195,6 +195,69 @@ BENCHMARK_TEMPLATE2(BM_ReadColumn, true, DoubleType);
BENCHMARK_TEMPLATE2(BM_ReadColumn, false, BooleanType);
BENCHMARK_TEMPLATE2(BM_ReadColumn, true, BooleanType);

static void BM_ReadIndividualRowGroups(::benchmark::State& state) {
std::vector<int64_t> values(BENCHMARK_SIZE, 128);
std::shared_ptr<::arrow::Table> table = TableFromVector<Int64Type>(values, true);
auto output = std::make_shared<InMemoryOutputStream>();
// This writes 10 RowGroups
EXIT_NOT_OK(
WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE / 10));
std::shared_ptr<Buffer> buffer = output->GetBuffer();

while (state.KeepRunning()) {
auto reader =
ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
FileReader filereader(::arrow::default_memory_pool(), std::move(reader));

std::vector<std::shared_ptr<::arrow::Table>> tables;
for (int i = 0; i < filereader.num_row_groups(); i++) {
// Only read the even numbered RowGroups
if ((i % 2) == 0) {
std::shared_ptr<::arrow::Table> table;
EXIT_NOT_OK(filereader.RowGroup(i)->ReadTable(&table));
tables.push_back(table);
}
}

std::shared_ptr<::arrow::Table> final_table;
EXIT_NOT_OK(ConcatenateTables(tables, &final_table));
}
SetBytesProcessed<true, Int64Type>(state);
}

BENCHMARK(BM_ReadIndividualRowGroups);

static void BM_ReadMultipleRowGroups(::benchmark::State& state) {
std::vector<int64_t> values(BENCHMARK_SIZE, 128);
std::shared_ptr<::arrow::Table> table = TableFromVector<Int64Type>(values, true);
auto output = std::make_shared<InMemoryOutputStream>();
// This writes 10 RowGroups
EXIT_NOT_OK(
WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE / 10));
std::shared_ptr<Buffer> buffer = output->GetBuffer();

while (state.KeepRunning()) {
auto reader =
ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
FileReader filereader(::arrow::default_memory_pool(), std::move(reader));

std::vector<std::shared_ptr<::arrow::Table>> tables;
std::vector<int> rgs;
for (int i = 0; i < filereader.num_row_groups(); i++) {
// Only read the even numbered RowGroups
if ((i % 2) == 0) {
rgs.push_back(i);
}
}

std::shared_ptr<::arrow::Table> table;
EXIT_NOT_OK(filereader.ReadRowGroups(rgs, &table));
}
SetBytesProcessed<true, Int64Type>(state);
}

BENCHMARK(BM_ReadMultipleRowGroups);

} // namespace benchmark

} // namespace parquet
9 changes: 8 additions & 1 deletion src/parquet/arrow/arrow-reader-writer-test.cc
Expand Up @@ -1614,14 +1614,21 @@ TEST(TestArrowReadWrite, ReadSingleRowGroup) {

ASSERT_EQ(2, reader->num_row_groups());

std::shared_ptr<Table> r1, r2;
std::shared_ptr<Table> r1, r2, r3, r4;
// Read everything
ASSERT_OK_NO_THROW(reader->ReadRowGroup(0, &r1));
ASSERT_OK_NO_THROW(reader->RowGroup(1)->ReadTable(&r2));
ASSERT_OK_NO_THROW(reader->ReadRowGroups({0, 1}, &r3));
ASSERT_OK_NO_THROW(reader->ReadRowGroups({1}, &r4));

std::shared_ptr<Table> concatenated;

ASSERT_OK(ConcatenateTables({r1, r2}, &concatenated));
ASSERT_TRUE(table->Equals(*concatenated));

ASSERT_TRUE(table->Equals(*r3));
ASSERT_TRUE(r2->Equals(*r4));
ASSERT_OK(ConcatenateTables({r1, r4}, &concatenated));
ASSERT_TRUE(table->Equals(*concatenated));
}

Expand Down
50 changes: 49 additions & 1 deletion src/parquet/arrow/reader.cc
Expand Up @@ -228,11 +228,15 @@ class FileReader::Impl {
Status GetSchema(std::shared_ptr<::arrow::Schema>* out);
Status GetSchema(const std::vector<int>& indices,
std::shared_ptr<::arrow::Schema>* out);
Status ReadRowGroup(int row_group_index, std::shared_ptr<Table>* table);
Status ReadRowGroup(int row_group_index, const std::vector<int>& indices,
std::shared_ptr<::arrow::Table>* out);
Status ReadTable(const std::vector<int>& indices, std::shared_ptr<Table>* table);
Status ReadTable(std::shared_ptr<Table>* table);
Status ReadRowGroup(int i, std::shared_ptr<Table>* table);
Status ReadRowGroups(const std::vector<int>& row_groups, std::shared_ptr<Table>* table);
Status ReadRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& indices,
std::shared_ptr<::arrow::Table>* out);

bool CheckForFlatColumn(const ColumnDescriptor* descr);
bool CheckForFlatListColumn(const ColumnDescriptor* descr);
Expand Down Expand Up @@ -562,6 +566,31 @@ Status FileReader::Impl::ReadTable(std::shared_ptr<Table>* table) {
return ReadTable(indices, table);
}

Status FileReader::Impl::ReadRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& indices,
std::shared_ptr<Table>* table) {
// TODO(PARQUET-1393): Modify the record readers to already read this into a single,
// continuous array.
std::vector<std::shared_ptr<Table>> tables;

for (size_t i = 0; i < row_groups.size(); ++i) {
std::shared_ptr<Table> rg_table;
RETURN_NOT_OK(ReadRowGroup(row_groups[i], indices, &rg_table));
tables.push_back(rg_table);
}
return ConcatenateTables(tables, table);
}

Status FileReader::Impl::ReadRowGroups(const std::vector<int>& row_groups,
std::shared_ptr<Table>* table) {
std::vector<int> indices(reader_->metadata()->num_columns());

for (size_t i = 0; i < indices.size(); ++i) {
indices[i] = static_cast<int>(i);
}
return ReadRowGroups(row_groups, indices, table);
}

Status FileReader::Impl::ReadRowGroup(int i, std::shared_ptr<Table>* table) {
std::vector<int> indices(reader_->metadata()->num_columns());

Expand Down Expand Up @@ -683,6 +712,25 @@ Status FileReader::ReadRowGroup(int i, const std::vector<int>& indices,
}
}

Status FileReader::ReadRowGroups(const std::vector<int>& row_groups,
std::shared_ptr<Table>* out) {
try {
return impl_->ReadRowGroups(row_groups, out);
} catch (const ::parquet::ParquetException& e) {
return ::arrow::Status::IOError(e.what());
}
}

Status FileReader::ReadRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& indices,
std::shared_ptr<Table>* out) {
try {
return impl_->ReadRowGroups(row_groups, indices, out);
} catch (const ::parquet::ParquetException& e) {
return ::arrow::Status::IOError(e.what());
}
}

std::shared_ptr<RowGroupReader> FileReader::RowGroup(int row_group_index) {
return std::shared_ptr<RowGroupReader>(
new RowGroupReader(impl_.get(), row_group_index));
Expand Down
7 changes: 7 additions & 0 deletions src/parquet/arrow/reader.h
Expand Up @@ -182,6 +182,13 @@ class PARQUET_EXPORT FileReader {

::arrow::Status ReadRowGroup(int i, std::shared_ptr<::arrow::Table>* out);

::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
const std::vector<int>& column_indices,
std::shared_ptr<::arrow::Table>* out);

::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
std::shared_ptr<::arrow::Table>* out);

/// \brief Scan file contents with one thread, return number of rows
::arrow::Status ScanContents(std::vector<int> columns, const int32_t column_batch_size,
int64_t* num_rows);
Expand Down

0 comments on commit 94c7246

Please sign in to comment.