ARROW-91: Basic Parquet read support
Depends on (mainly one-line fixes):

- [x] apache/parquet-cpp#99
- [x] apache/parquet-cpp#98
- [x] apache/parquet-cpp#97

Author: Uwe L. Korn <uwelk@xhochy.com>
Author: Wes McKinney <wesm@apache.org>

Closes #73 from xhochy/arrow-91 and squashes the following commits:

7579fed [Uwe L. Korn] Mark single argument constructor as explicit
47441a1 [Uwe L. Korn] Assert that no exception was thrown
5fa1026 [Uwe L. Korn] Incorporate review comments
8d2db22 [Uwe L. Korn] ARROW-91: Basic Parquet read support
d9940d8 [Wes McKinney] Public API draft
xhochy authored and wesm committed May 10, 2016
1 parent 1f04f7f commit 4bd13b8
Showing 7 changed files with 488 additions and 8 deletions.
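
As a quick orientation before the per-file diff, here is a minimal usage sketch of the public reader API introduced in this commit, following parquet-reader-test.cc below. It is not part of the commit; the helper name ReadFirstColumn and the already-opened pq_reader argument are illustrative assumptions.

// Minimal sketch: wrap a parquet-cpp file reader and decode one column into Arrow.
#include <memory>

#include "arrow/parquet/reader.h"
#include "arrow/util/memory-pool.h"
#include "arrow/util/status.h"

#include "parquet/api/reader.h"

// Hypothetical helper, not part of this commit: reads flat column 0 of an
// already-opened Parquet file into a single Arrow array.
arrow::Status ReadFirstColumn(std::unique_ptr<::parquet::ParquetFileReader> pq_reader,
    std::shared_ptr<arrow::Array>* out) {
  // FileReader takes ownership of the parquet-cpp reader.
  arrow::parquet::FileReader reader(arrow::default_memory_pool(), std::move(pq_reader));
  // ReadFlatColumn decodes all row groups of column 0 into one Array.
  return reader.ReadFlatColumn(0, out);
}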
4 changes: 4 additions & 0 deletions cpp/src/arrow/parquet/CMakeLists.txt
@@ -19,6 +19,7 @@
# arrow_parquet : Arrow <-> Parquet adapter

set(PARQUET_SRCS
  reader.cc
  schema.cc
)

@@ -36,6 +37,9 @@ SET_TARGET_PROPERTIES(arrow_parquet PROPERTIES LINKER_LANGUAGE CXX)
ADD_ARROW_TEST(parquet-schema-test)
ARROW_TEST_LINK_LIBRARIES(parquet-schema-test arrow_parquet)

ADD_ARROW_TEST(parquet-reader-test)
ARROW_TEST_LINK_LIBRARIES(parquet-reader-test arrow_parquet)

# Headers: top level
install(FILES
  DESTINATION include/arrow/parquet)
116 changes: 116 additions & 0 deletions cpp/src/arrow/parquet/parquet-reader-test.cc
@@ -0,0 +1,116 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "gtest/gtest.h"

#include "arrow/test-util.h"
#include "arrow/parquet/reader.h"
#include "arrow/types/primitive.h"
#include "arrow/util/memory-pool.h"
#include "arrow/util/status.h"

#include "parquet/api/reader.h"
#include "parquet/api/writer.h"

using ParquetBuffer = parquet::Buffer;
using parquet::BufferReader;
using parquet::InMemoryOutputStream;
using parquet::Int64Writer;
using parquet::ParquetFileReader;
using parquet::ParquetFileWriter;
using parquet::RandomAccessSource;
using parquet::Repetition;
using parquet::SchemaDescriptor;
using ParquetType = parquet::Type;
using parquet::schema::GroupNode;
using parquet::schema::NodePtr;
using parquet::schema::PrimitiveNode;

namespace arrow {

namespace parquet {

class TestReadParquet : public ::testing::Test {
 public:
  virtual void SetUp() {}

  std::shared_ptr<GroupNode> Int64Schema() {
    auto pnode = PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64);
    NodePtr node_ =
        GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
    return std::static_pointer_cast<GroupNode>(node_);
  }

  std::unique_ptr<ParquetFileReader> Int64File(
      std::vector<int64_t>& values, int num_chunks) {
    std::shared_ptr<GroupNode> schema = Int64Schema();
    std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
    auto file_writer = ParquetFileWriter::Open(sink, schema);
    size_t chunk_size = values.size() / num_chunks;
    for (int i = 0; i < num_chunks; i++) {
      auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
      auto column_writer = static_cast<Int64Writer*>(row_group_writer->NextColumn());
      int64_t* data = values.data() + i * chunk_size;
      column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
      column_writer->Close();
      row_group_writer->Close();
    }
    file_writer->Close();

    std::shared_ptr<ParquetBuffer> buffer = sink->GetBuffer();
    std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
    return ParquetFileReader::Open(std::move(source));
  }

 private:
};

TEST_F(TestReadParquet, SingleColumnInt64) {
  std::vector<int64_t> values(100, 128);
  std::unique_ptr<ParquetFileReader> file_reader = Int64File(values, 1);
  arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader));
  std::unique_ptr<arrow::parquet::FlatColumnReader> column_reader;
  ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader)));
  ASSERT_NE(nullptr, column_reader.get());
  std::shared_ptr<Array> out;
  ASSERT_OK(column_reader->NextBatch(100, &out));
  ASSERT_NE(nullptr, out.get());
  Int64Array* out_array = static_cast<Int64Array*>(out.get());
  for (size_t i = 0; i < values.size(); i++) {
    EXPECT_EQ(values[i], out_array->raw_data()[i]);
  }
}

TEST_F(TestReadParquet, SingleColumnInt64Chunked) {
  std::vector<int64_t> values(100, 128);
  std::unique_ptr<ParquetFileReader> file_reader = Int64File(values, 4);
  arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader));
  std::unique_ptr<arrow::parquet::FlatColumnReader> column_reader;
  ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader)));
  ASSERT_NE(nullptr, column_reader.get());
  std::shared_ptr<Array> out;
  ASSERT_OK(column_reader->NextBatch(100, &out));
  ASSERT_NE(nullptr, out.get());
  Int64Array* out_array = static_cast<Int64Array*>(out.get());
  for (size_t i = 0; i < values.size(); i++) {
    EXPECT_EQ(values[i], out_array->raw_data()[i]);
  }
}

} // namespace parquet

} // namespace arrow
194 changes: 194 additions & 0 deletions cpp/src/arrow/parquet/reader.cc
@@ -0,0 +1,194 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/parquet/reader.h"

#include <queue>

#include "arrow/parquet/schema.h"
#include "arrow/parquet/utils.h"
#include "arrow/schema.h"
#include "arrow/types/primitive.h"
#include "arrow/util/status.h"

using parquet::ColumnReader;
using parquet::TypedColumnReader;

namespace arrow {
namespace parquet {

class FileReader::Impl {
 public:
  Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader);
  virtual ~Impl() {}

  Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
  Status ReadFlatColumn(int i, std::shared_ptr<Array>* out);

 private:
  MemoryPool* pool_;
  std::unique_ptr<::parquet::ParquetFileReader> reader_;
};

class FlatColumnReader::Impl {
 public:
  Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor* descr,
      ::parquet::ParquetFileReader* reader, int column_index);
  virtual ~Impl() {}

  Status NextBatch(int batch_size, std::shared_ptr<Array>* out);
  template <typename ArrowType, typename ParquetType, typename CType>
  Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);

 private:
  void NextRowGroup();

  MemoryPool* pool_;
  const ::parquet::ColumnDescriptor* descr_;
  ::parquet::ParquetFileReader* reader_;
  int column_index_;
  int next_row_group_;
  std::shared_ptr<ColumnReader> column_reader_;
  std::shared_ptr<Field> field_;

  PoolBuffer values_buffer_;
  PoolBuffer def_levels_buffer_;
  PoolBuffer rep_levels_buffer_;
};

FileReader::Impl::Impl(
    MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader)
    : pool_(pool), reader_(std::move(reader)) {}

Status FileReader::Impl::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) {
  std::unique_ptr<FlatColumnReader::Impl> impl(
      new FlatColumnReader::Impl(pool_, reader_->descr()->Column(i), reader_.get(), i));
  *out = std::unique_ptr<FlatColumnReader>(new FlatColumnReader(std::move(impl)));
  return Status::OK();
}

Status FileReader::Impl::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
  std::unique_ptr<FlatColumnReader> flat_column_reader;
  RETURN_NOT_OK(GetFlatColumn(i, &flat_column_reader));
  return flat_column_reader->NextBatch(reader_->num_rows(), out);
}

FileReader::FileReader(
    MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader)
    : impl_(new FileReader::Impl(pool, std::move(reader))) {}

FileReader::~FileReader() {}

Status FileReader::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) {
  return impl_->GetFlatColumn(i, out);
}

Status FileReader::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
  return impl_->ReadFlatColumn(i, out);
}

FlatColumnReader::Impl::Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor* descr,
    ::parquet::ParquetFileReader* reader, int column_index)
    : pool_(pool),
      descr_(descr),
      reader_(reader),
      column_index_(column_index),
      next_row_group_(0),
      values_buffer_(pool),
      def_levels_buffer_(pool),
      rep_levels_buffer_(pool) {
  NodeToField(descr_->schema_node(), &field_);
  NextRowGroup();
}

template <typename ArrowType, typename ParquetType, typename CType>
Status FlatColumnReader::Impl::TypedReadBatch(
    int batch_size, std::shared_ptr<Array>* out) {
  int values_to_read = batch_size;
  NumericBuilder<ArrowType> builder(pool_, field_->type);
  while ((values_to_read > 0) && column_reader_) {
    values_buffer_.Resize(values_to_read * sizeof(CType));
    if (descr_->max_definition_level() > 0) {
      def_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
    }
    if (descr_->max_repetition_level() > 0) {
      rep_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
    }
    auto reader = dynamic_cast<TypedColumnReader<ParquetType>*>(column_reader_.get());
    int64_t values_read;
    CType* values = reinterpret_cast<CType*>(values_buffer_.mutable_data());
    PARQUET_CATCH_NOT_OK(
        values_to_read -= reader->ReadBatch(values_to_read,
            reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()),
            reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data()), values,
            &values_read));
    if (descr_->max_definition_level() == 0) {
      RETURN_NOT_OK(builder.Append(values, values_read));
    } else {
      return Status::NotImplemented("no support for definition levels yet");
    }
    if (!column_reader_->HasNext()) { NextRowGroup(); }
  }
  *out = builder.Finish();
  return Status::OK();
}

#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType, CType)              \
  case Type::ENUM:                                                          \
    return TypedReadBatch<ArrowType, ParquetType, CType>(batch_size, out); \
    break;

Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
  if (!column_reader_) {
    // Exhausted all row groups.
    *out = nullptr;
    return Status::OK();
  }

  if (descr_->max_repetition_level() > 0) {
    return Status::NotImplemented("no support for repetition yet");
  }

  switch (field_->type->type) {
    TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type, int32_t)
    TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type, int64_t)
    TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType, float)
    TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType, double)
    default:
      return Status::NotImplemented(field_->type->ToString());
  }
}

void FlatColumnReader::Impl::NextRowGroup() {
  if (next_row_group_ < reader_->num_row_groups()) {
    column_reader_ = reader_->RowGroup(next_row_group_)->Column(column_index_);
    next_row_group_++;
  } else {
    column_reader_ = nullptr;
  }
}

FlatColumnReader::FlatColumnReader(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}

FlatColumnReader::~FlatColumnReader() {}

Status FlatColumnReader::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
  return impl_->NextBatch(batch_size, out);
}

} // namespace parquet
} // namespace arrow
